From 8729a9dd54e573d98193ef9894d873d685594cf9 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 7 May 2026 16:28:49 +0800 Subject: [PATCH 01/12] mysql,sqlmodel: support table route in mysql sink --- pkg/common/event/ddl_event_test.go | 155 ++++++-------- pkg/sink/mysql/helper.go | 2 +- pkg/sink/mysql/mysql_writer_ddl.go | 2 +- .../mysql_writer_dml_active_active_test.go | 20 ++ pkg/sink/mysql/mysql_writer_test.go | 60 ++++++ pkg/sink/mysql/sql_builder.go | 4 +- pkg/sink/mysql/sql_builder_test.go | 18 ++ pkg/sink/sqlmodel/multi_row.go | 6 +- pkg/sink/sqlmodel/multi_row_test.go | 61 ++++++ pkg/sink/sqlmodel/multi_row_v1.go | 4 +- pkg/sink/sqlmodel/row_change.go | 6 +- pkg/sink/sqlmodel/row_change_test.go | 33 +++ tests/integration_tests/run_light_it_in_ci.sh | 2 +- tests/integration_tests/table_route/README.md | 35 ++++ .../table_route/conf/changefeed.toml | 13 ++ .../table_route/conf/diff_config.toml | 112 ++++++++++ .../table_route/data/test.sql | 196 ++++++++++++++++++ tests/integration_tests/table_route/run.sh | 51 +++++ 18 files changed, 680 insertions(+), 100 deletions(-) create mode 100644 tests/integration_tests/table_route/README.md create mode 100644 tests/integration_tests/table_route/conf/changefeed.toml create mode 100644 tests/integration_tests/table_route/conf/diff_config.toml create mode 100644 tests/integration_tests/table_route/data/test.sql create mode 100755 tests/integration_tests/table_route/run.sh diff --git a/pkg/common/event/ddl_event_test.go b/pkg/common/event/ddl_event_test.go index 629e8ad756..258ba064d8 100644 --- a/pkg/common/event/ddl_event_test.go +++ b/pkg/common/event/ddl_event_test.go @@ -21,8 +21,6 @@ import ( "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/tidb/pkg/meta/model" - "github.com/pingcap/tidb/pkg/parser/ast" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -530,55 +528,39 @@ func TestNewRoutedDDLEvent(t *testing.T) { helper := NewEventTestHelper(t) defer helper.Close() - helper.tk.MustExec("use test") - ddlJob := helper.DDL2Job(createTableSQL) - require.NotNil(t, ddlJob) - - // Create original DDL event with all fields populated - originalTableInfo := common.WrapTableInfo(ddlJob.SchemaName, ddlJob.BinlogInfo.TableInfo) - originalTableInfo.InitPrivateFields() + helper.Tk().MustExec("CREATE DATABASE `source_db`") + original := helper.DDL2Event("CREATE TABLE `source_db`.`source_table` (`id` INT PRIMARY KEY)") - multipleTableInfo1 := common.WrapTableInfo("schema1", ddlJob.BinlogInfo.TableInfo) - multipleTableInfo1.InitPrivateFields() - multipleTableInfo2 := common.WrapTableInfo("schema2", ddlJob.BinlogInfo.TableInfo) - multipleTableInfo2.InitPrivateFields() + require.NotNil(t, original) + require.NotNil(t, original.TableInfo) + require.Empty(t, original.targetSchemaName) + require.Empty(t, original.targetTableName) + require.Empty(t, original.targetExtraSchemaName) + require.Empty(t, original.targetExtraTableName) + originalQuery := original.Query postFlushFunc1 := func() {} postFlushFunc2 := func() {} - original := &DDLEvent{ - Version: DDLEventVersion1, - DispatcherID: common.NewDispatcherID(), - Type: byte(ddlJob.Type), - SchemaID: ddlJob.SchemaID, - SchemaName: ddlJob.SchemaName, - TableName: ddlJob.TableName, - Query: ddlJob.Query, - TableInfo: originalTableInfo, - FinishedTs: ddlJob.BinlogInfo.FinishedTS, - Seq: 1, - Epoch: 2, - MultipleTableInfos: []*common.TableInfo{multipleTableInfo1, multipleTableInfo2}, - PostTxnFlushed: []func(){postFlushFunc1, postFlushFunc2}, - TiDBOnly: true, - BDRMode: "test-mode", - } + original.DispatcherID = common.NewDispatcherID() + original.Seq = 1 + original.Epoch = 2 + original.PostTxnFlushed = []func(){postFlushFunc1, postFlushFunc2} + original.TiDBOnly = true + original.BDRMode = "test-mode" - newRoutedTableInfo := originalTableInfo.CloneWithRouting("routed_schema", "test") - routedMultipleTableInfos := []*common.TableInfo{ - multipleTableInfo1.CloneWithRouting("routed_schema1", "table1"), - multipleTableInfo2.CloneWithRouting("routed_schema2", "table2"), - } + originalTableInfo := original.TableInfo + newRoutedTableInfo := originalTableInfo.CloneWithRouting("routed_schema", "source_table_routed") routed := NewRoutedDDLEvent( original, - "CREATE TABLE routed_schema.test ...", + "CREATE TABLE `routed_schema`.`source_table_routed` (`id` INT PRIMARY KEY)", "routed_schema", - "", + "source_table_routed", "", "", newRoutedTableInfo, - routedMultipleTableInfos, + nil, original.BlockedTableNames, ) require.NotNil(t, routed) @@ -599,9 +581,6 @@ func TestNewRoutedDDLEvent(t *testing.T) { require.Equal(t, original.TiDBOnly, routed.TiDBOnly) require.Equal(t, original.BDRMode, routed.BDRMode) - // Verify that MultipleTableInfos is a new slice so later mutations remain isolated. - require.False(t, &original.MultipleTableInfos[0] == &routed.MultipleTableInfos[0], "MultipleTableInfos should be a new slice") - // Verify that PostTxnFlushed is an independent copy (not shared) // This is defensive: currently DDL events arrive with nil PostTxnFlushed, // but we copy it to prevent races if callbacks are ever added before building the routed event. @@ -617,21 +596,18 @@ func TestNewRoutedDDLEvent(t *testing.T) { require.Equal(t, 2, len(original.PostTxnFlushed), "Original should be unaffected by routed event append") // Verify that routed state doesn't affect the original. - require.Equal(t, ddlJob.SchemaName, original.SchemaName, "Original SchemaName should be unchanged") - require.Equal(t, ddlJob.Query, original.Query, "Original Query should be unchanged") + require.Equal(t, "source_db", original.SchemaName, "Original SchemaName should be unchanged") + require.Equal(t, originalQuery, original.Query, "Original Query should be unchanged") require.True(t, original.TableInfo == originalTableInfo, "Original TableInfo should be unchanged") - require.True(t, original.MultipleTableInfos[0] == multipleTableInfo1, "Original MultipleTableInfos[0] should be unchanged") - require.True(t, original.MultipleTableInfos[1] == multipleTableInfo2, "Original MultipleTableInfos[1] should be unchanged") // Verify that the routed event has the routed state. require.Equal(t, "routed_schema", routed.GetTargetSchemaName()) - require.Equal(t, "CREATE TABLE routed_schema.test ...", routed.Query) + require.Equal(t, "source_table_routed", routed.GetTargetTableName()) + require.Equal(t, "CREATE TABLE `routed_schema`.`source_table_routed` (`id` INT PRIMARY KEY)", routed.Query) require.True(t, routed.TableInfo == newRoutedTableInfo) require.Equal(t, "routed_schema", routed.TableInfo.TableName.TargetSchema) require.Equal(t, original.SchemaName, routed.GetSchemaName()) require.Equal(t, original.TableName, routed.GetTableName()) - require.True(t, routed.MultipleTableInfos[0] == routedMultipleTableInfos[0]) - require.True(t, routed.MultipleTableInfos[1] == routedMultipleTableInfos[1]) // Test nil origin event. var nilEvent *DDLEvent @@ -640,25 +616,25 @@ func TestNewRoutedDDLEvent(t *testing.T) { } func TestNewRoutedDDLEventPreservesSourceFields(t *testing.T) { - original := &DDLEvent{ - SchemaName: "source_db", - TableName: "new_orders", - ExtraSchemaName: "source_db", - ExtraTableName: "old_orders", - targetSchemaName: "target_db", - targetTableName: "new_orders_routed", - targetExtraSchemaName: "target_db", - targetExtraTableName: "old_orders_routed", - } + helper := NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("CREATE DATABASE `source_db`") + helper.Tk().MustExec("CREATE TABLE `source_db`.`old_orders` (`id` INT PRIMARY KEY)") + original := helper.DDL2Event("RENAME TABLE `source_db`.`old_orders` TO `source_db`.`new_orders`") + require.Empty(t, original.targetSchemaName) + require.Empty(t, original.targetTableName) + require.Empty(t, original.targetExtraSchemaName) + require.Empty(t, original.targetExtraTableName) routed := NewRoutedDDLEvent( original, - original.Query, + "RENAME TABLE `target_db_v2`.`old_orders_routed_v2` TO `target_db_v2`.`new_orders_routed_v2`", "target_db_v2", "new_orders_routed_v2", "target_db_v2", "old_orders_routed_v2", - original.TableInfo, + original.TableInfo.CloneWithRouting("target_db_v2", "new_orders_routed_v2"), original.MultipleTableInfos, original.BlockedTableNames, ) @@ -674,49 +650,54 @@ func TestNewRoutedDDLEventPreservesSourceFields(t *testing.T) { } func TestGetEventsForRenameTablesPreservesSourceAndTargetNames(t *testing.T) { - sourceTable1 := common.WrapTableInfo("new_db1", &model.TableInfo{ - ID: 100, - Name: ast.NewCIStr("new_table1"), - UpdateTS: 10, - }) - sourceTable2 := common.WrapTableInfo("new_db2", &model.TableInfo{ - ID: 101, - Name: ast.NewCIStr("new_table2"), - UpdateTS: 11, - }) - - ddl := &DDLEvent{ - Type: byte(model.ActionRenameTables), - Query: "RENAME TABLE `old_target_db1`.`old_target_table1` TO `new_target_db1`.`new_target_table1`; RENAME TABLE `old_target_db2`.`old_target_table2` TO `new_target_db2`.`new_target_table2`", - MultipleTableInfos: []*common.TableInfo{ - sourceTable1.CloneWithRouting("new_target_db1", "new_target_table1"), - sourceTable2.CloneWithRouting("new_target_db2", "new_target_table2"), + helper := NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("CREATE DATABASE `rename_db`") + helper.Tk().MustExec("CREATE TABLE `rename_db`.`old_table1` (`id` INT PRIMARY KEY)") + helper.Tk().MustExec("CREATE TABLE `rename_db`.`old_table2` (`id` INT PRIMARY KEY)") + original := helper.DDL2Event( + "RENAME TABLE `rename_db`.`old_table1` TO `rename_db`.`new_table1`, `rename_db`.`old_table2` TO `rename_db`.`new_table2`") + require.Empty(t, original.targetSchemaName) + require.Empty(t, original.targetTableName) + require.Empty(t, original.targetExtraSchemaName) + require.Empty(t, original.targetExtraTableName) + + ddl := NewRoutedDDLEvent( + original, + "RENAME TABLE `old_target_db1`.`old_target_table1` TO `new_target_db1`.`new_target_table1`; RENAME TABLE `old_target_db2`.`old_target_table2` TO `new_target_db2`.`new_target_table2`", + "", + "", + "", + "", + original.TableInfo, + []*common.TableInfo{ + original.MultipleTableInfos[0].CloneWithRouting("new_target_db1", "new_target_table1"), + original.MultipleTableInfos[1].CloneWithRouting("new_target_db2", "new_target_table2"), }, - TableNameChange: &TableNameChange{ - DropName: []SchemaTableName{ - {SchemaName: "old_db1", TableName: "old_table1"}, - {SchemaName: "old_db2", TableName: "old_table2"}, - }, + []SchemaTableName{ + {SchemaName: "old_target_db1", TableName: "old_target_table1"}, + {SchemaName: "old_target_db2", TableName: "old_target_table2"}, }, - } + ) events := ddl.GetEvents() require.Len(t, events, 2) - require.Equal(t, "new_db1", events[0].SchemaName) + require.Equal(t, "rename_db", events[0].SchemaName) require.Equal(t, "new_table1", events[0].TableName) require.Equal(t, "new_target_db1", events[0].GetTargetSchemaName()) require.Equal(t, "new_target_table1", events[0].GetTargetTableName()) - require.Equal(t, "old_db1", events[0].ExtraSchemaName) + require.Equal(t, "rename_db", events[0].ExtraSchemaName) require.Equal(t, "old_table1", events[0].ExtraTableName) require.Equal(t, "old_target_db1", events[0].GetTargetExtraSchemaName()) require.Equal(t, "old_target_table1", events[0].GetTargetExtraTableName()) - require.Equal(t, "new_db2", events[1].SchemaName) + require.Equal(t, "rename_db", events[1].SchemaName) require.Equal(t, "new_table2", events[1].TableName) require.Equal(t, "new_target_db2", events[1].GetTargetSchemaName()) require.Equal(t, "new_target_table2", events[1].GetTargetTableName()) - require.Equal(t, "old_db2", events[1].ExtraSchemaName) + require.Equal(t, "rename_db", events[1].ExtraSchemaName) require.Equal(t, "old_table2", events[1].ExtraTableName) require.Equal(t, "old_target_db2", events[1].GetTargetExtraSchemaName()) require.Equal(t, "old_target_table2", events[1].GetTargetExtraTableName()) diff --git a/pkg/sink/mysql/helper.go b/pkg/sink/mysql/helper.go index f795d8dff1..3bf2507618 100644 --- a/pkg/sink/mysql/helper.go +++ b/pkg/sink/mysql/helper.go @@ -406,7 +406,7 @@ func CreateMysqlDBConn(dsnStr string) (*sql.DB, error) { } func needSwitchDB(event *commonEvent.DDLEvent) bool { - if len(event.GetSchemaName()) == 0 { + if len(event.GetTargetSchemaName()) == 0 { return false } if event.GetDDLType() == timodel.ActionCreateSchema || event.GetDDLType() == timodel.ActionDropSchema { diff --git a/pkg/sink/mysql/mysql_writer_ddl.go b/pkg/sink/mysql/mysql_writer_ddl.go index 6b06d71781..48009fcf5f 100644 --- a/pkg/sink/mysql/mysql_writer_ddl.go +++ b/pkg/sink/mysql/mysql_writer_ddl.go @@ -106,7 +106,7 @@ func (w *Writer) execDDL(event *commonEvent.DDLEvent) error { } if shouldSwitchDB { - _, err = tx.ExecContext(ctx, "USE "+common.QuoteName(event.GetSchemaName())+";") + _, err = tx.ExecContext(ctx, "USE "+common.QuoteName(event.GetTargetSchemaName())+";") if err != nil { if rbErr := tx.Rollback(); rbErr != nil { log.Error("Failed to rollback", zap.Error(err)) diff --git a/pkg/sink/mysql/mysql_writer_dml_active_active_test.go b/pkg/sink/mysql/mysql_writer_dml_active_active_test.go index f3cea842a1..e33a1da614 100644 --- a/pkg/sink/mysql/mysql_writer_dml_active_active_test.go +++ b/pkg/sink/mysql/mysql_writer_dml_active_active_test.go @@ -48,6 +48,26 @@ func TestBuildActiveActiveUpsertSQLMultiRows(t *testing.T) { require.Equal(t, common.RowTypeInsert, rowTypes) } +func TestBuildActiveActiveUpsertSQLUsesRoutedTargetTable(t *testing.T) { + writer, _, _ := newTestMysqlWriter(t) + defer writer.db.Close() + + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + job := helper.DDL2Job("create table t (id int primary key, name varchar(32), _tidb_origin_ts bigint unsigned null, _tidb_softdelete_time timestamp null);") + require.NotNil(t, job) + + event := helper.DML2Event("test", "t", "insert into t values (1, 'alice', 10, NULL)") + event.TableInfo = helper.GetTableInfo(job).CloneWithRouting("target_db", "target_table") + + rows, commitTs := writer.collectActiveActiveRows(event) + sql, _, _ := buildActiveActiveUpsertSQL(event.TableInfo, rows, commitTs) + require.Contains(t, sql, "INSERT INTO `target_db`.`target_table`") + require.NotContains(t, sql, "`test`.`t`") +} + func TestActiveActiveNormalSQLs(t *testing.T) { writer, _, _ := newTestMysqlWriter(t) defer writer.db.Close() diff --git a/pkg/sink/mysql/mysql_writer_test.go b/pkg/sink/mysql/mysql_writer_test.go index 7b169aaab6..449d1e9ff2 100644 --- a/pkg/sink/mysql/mysql_writer_test.go +++ b/pkg/sink/mysql/mysql_writer_test.go @@ -354,6 +354,66 @@ func TestMysqlWriter_Flush_EmptyEvents(t *testing.T) { require.NoError(t, err) } +func TestMysqlWriterExecDDLUsesRoutedSchemaName(t *testing.T) { + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("CREATE DATABASE `source_db`") + helper.Tk().MustExec("CREATE TABLE `source_db`.`source_table` (`id` INT PRIMARY KEY)") + alterDDL := helper.DDL2Event("ALTER TABLE `source_db`.`source_table` ADD COLUMN age INT") + require.Equal(t, "source_db", alterDDL.GetTargetSchemaName()) + require.Equal(t, "source_table", alterDDL.GetTargetTableName()) + routedAlterDDL := commonEvent.NewRoutedDDLEvent( + alterDDL, + "ALTER TABLE `target_db`.`target_table` ADD COLUMN age INT", + "target_db", + "target_table", + "", + "", + alterDDL.TableInfo.CloneWithRouting("target_db", "target_table"), + nil, + nil, + ) + + writer, db, mock := newTestMysqlWriter(t) + defer db.Close() + mock.ExpectBegin() + mock.ExpectExec("USE `target_db`;").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("SET TIMESTAMP = DEFAULT").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("ALTER TABLE `target_db`.`target_table` ADD COLUMN age INT").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + require.NoError(t, writer.execDDL(routedAlterDDL)) + require.NoError(t, mock.ExpectationsWereMet()) + + helper.Tk().MustExec("CREATE TABLE `source_db`.`orders_old` (`id` INT PRIMARY KEY)") + renameDDL := helper.DDL2Event("RENAME TABLE `source_db`.`orders_old` TO `source_db`.`orders_new`") + require.Equal(t, "source_db", renameDDL.GetTargetSchemaName()) + require.Equal(t, "orders_new", renameDDL.GetTargetTableName()) + require.Equal(t, "source_db", renameDDL.GetTargetExtraSchemaName()) + require.Equal(t, "orders_old", renameDDL.GetTargetExtraTableName()) + routedRenameDDL := commonEvent.NewRoutedDDLEvent( + renameDDL, + "RENAME TABLE `old_target_db`.`orders_old` TO `new_target_db`.`orders_new`", + "new_target_db", + "orders_new", + "old_target_db", + "orders_old", + renameDDL.TableInfo.CloneWithRouting("new_target_db", "orders_new"), + nil, + nil, + ) + + writer, db, mock = newTestMysqlWriter(t) + defer db.Close() + mock.ExpectBegin() + mock.ExpectExec("USE `new_target_db`;").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("SET TIMESTAMP = DEFAULT").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("RENAME TABLE `old_target_db`.`orders_old` TO `new_target_db`.`orders_new`").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + require.NoError(t, writer.execDDL(routedRenameDDL)) + require.NoError(t, mock.ExpectationsWereMet()) +} + func TestMysqlWriter_FlushSyncPointEvent(t *testing.T) { writer, db, mock := newTestMysqlWriter(t) defer db.Close() diff --git a/pkg/sink/mysql/sql_builder.go b/pkg/sink/mysql/sql_builder.go index 54e0ec75da..97b672573d 100644 --- a/pkg/sink/mysql/sql_builder.go +++ b/pkg/sink/mysql/sql_builder.go @@ -180,7 +180,7 @@ func buildInsert( // sql: `DELETE FROM `test`.`t` WHERE x = ? AND y >= ? LIMIT 1` func buildDelete(tableInfo *common.TableInfo, row commonEvent.RowChange) (string, []interface{}) { var builder strings.Builder - quoteTable := tableInfo.TableName.QuoteString() + quoteTable := tableInfo.TableName.QuoteTargetString() builder.WriteString("DELETE FROM ") builder.WriteString(quoteTable) builder.WriteString(" WHERE ") @@ -308,7 +308,7 @@ func buildActiveActiveUpsertSQL( var builder strings.Builder builder.WriteString("INSERT INTO ") - builder.WriteString(tableInfo.TableName.QuoteString()) + builder.WriteString(tableInfo.TableName.QuoteTargetString()) builder.WriteString(" (") for i, colName := range insertColumns { if i > 0 { diff --git a/pkg/sink/mysql/sql_builder_test.go b/pkg/sink/mysql/sql_builder_test.go index c88dc06830..1cb2247deb 100644 --- a/pkg/sink/mysql/sql_builder_test.go +++ b/pkg/sink/mysql/sql_builder_test.go @@ -182,6 +182,24 @@ func TestBuildInsert(t *testing.T) { require.Equal(t, exportedArgs, args) } +func TestBuildDMLUsesRoutedTargetTable(t *testing.T) { + insert, deleteRow, updateRow, tableInfo := getRowForTest(t) + routedTableInfo := tableInfo.CloneWithRouting("target_db", "target_table") + routedTableInfo.InitPrivateFields() + + insertSQL, _ := buildInsert(routedTableInfo, insert, false) + require.Contains(t, insertSQL, "INSERT INTO `target_db`.`target_table`") + require.NotContains(t, insertSQL, "`test`.`t`") + + deleteSQL, _ := buildDelete(routedTableInfo, deleteRow) + require.Contains(t, deleteSQL, "DELETE FROM `target_db`.`target_table`") + require.NotContains(t, deleteSQL, "`test`.`t`") + + updateSQL, _ := buildUpdate(routedTableInfo, updateRow) + require.Contains(t, updateSQL, "UPDATE `target_db`.`target_table`") + require.NotContains(t, updateSQL, "`test`.`t`") +} + func TestBuildDelete(t *testing.T) { helper := event.NewEventTestHelper(t) defer helper.Close() diff --git a/pkg/sink/sqlmodel/multi_row.go b/pkg/sink/sqlmodel/multi_row.go index 6d6e33c5c8..673a1e7b5d 100644 --- a/pkg/sink/sqlmodel/multi_row.go +++ b/pkg/sink/sqlmodel/multi_row.go @@ -116,7 +116,7 @@ func GenInsertSQL(tp DMLType, changes ...*RowChange) (string, []interface{}) { } else { buf.WriteString("INSERT INTO ") } - buf.WriteString(first.targetTable.QuoteString()) + buf.WriteString(first.targetTable.QuoteTargetString()) buf.WriteString(" (") columnNum := 0 var skipColIdx []int @@ -228,7 +228,7 @@ func genDeleteSQLV2(changes ...*RowChange) (string, []interface{}) { var buf strings.Builder buf.Grow(1024) buf.WriteString("DELETE FROM ") - buf.WriteString(first.targetTable.QuoteString()) + buf.WriteString(first.targetTable.QuoteTargetString()) buf.WriteString(" WHERE (") // v2 uses the first row to define the tuple shape of the trailing IN list. @@ -282,7 +282,7 @@ func genUpdateSQLV2(changes ...*RowChange) (string, []any) { // Generate UPDATE `db`.`table` SET first := changes[0] buf.WriteString("UPDATE ") - buf.WriteString(first.targetTable.QuoteString()) + buf.WriteString(first.targetTable.QuoteTargetString()) buf.WriteString(" SET ") // Pre-generate essential sub statements used after WHEN, WHERE. diff --git a/pkg/sink/sqlmodel/multi_row_test.go b/pkg/sink/sqlmodel/multi_row_test.go index b2e474c4d4..4f61e12adf 100644 --- a/pkg/sink/sqlmodel/multi_row_test.go +++ b/pkg/sink/sqlmodel/multi_row_test.go @@ -41,6 +41,67 @@ func TestGenDeleteMultiRows(t *testing.T) { require.Equal(t, []interface{}{1, 3}, args) } +func TestGenMultiRowSQLUsesRoutedTargetTable(t *testing.T) { + sourceTableInfo, routedTableInfo := mockRoutedTableInfo( + t, + "CREATE TABLE tb1 (id INT PRIMARY KEY, name INT)", + "target_db", + "target_tb", + ) + + sourceTable := &sourceTableInfo.TableName + targetTable := &routedTableInfo.TableName + + insertChanges := []*RowChange{ + NewRowChange(sourceTable, targetTable, nil, []interface{}{1, 2}, sourceTableInfo, routedTableInfo, nil), + NewRowChange(sourceTable, targetTable, nil, []interface{}{3, 4}, sourceTableInfo, routedTableInfo, nil), + } + insertSQL, _ := GenInsertSQL(DMLInsert, insertChanges...) + require.Contains(t, insertSQL, "INSERT INTO `target_db`.`target_tb`") + require.NotContains(t, insertSQL, "`db`.`tb1`") + + deleteV2Changes := []*RowChange{ + NewRowChange(sourceTable, targetTable, []interface{}{1, 2}, nil, sourceTableInfo, routedTableInfo, nil), + NewRowChange(sourceTable, targetTable, []interface{}{3, 4}, nil, sourceTableInfo, routedTableInfo, nil), + } + deleteV2SQL, _ := GenDeleteSQL(DefaultWhereClause, deleteV2Changes...) + require.Contains(t, deleteV2SQL, "DELETE FROM `target_db`.`target_tb`") + require.NotContains(t, deleteV2SQL, "`db`.`tb1`") + + updateV2Changes := []*RowChange{ + NewRowChange(sourceTable, targetTable, []interface{}{1, 2}, []interface{}{1, 20}, sourceTableInfo, routedTableInfo, nil), + NewRowChange(sourceTable, targetTable, []interface{}{3, 4}, []interface{}{3, 40}, sourceTableInfo, routedTableInfo, nil), + } + updateV2SQL, _ := GenUpdateSQL(DefaultWhereClause, updateV2Changes...) + require.Contains(t, updateV2SQL, "UPDATE `target_db`.`target_tb`") + require.NotContains(t, updateV2SQL, "`db`.`tb1`") + + nullSourceTableInfo, nullRoutedTableInfo := mockRoutedTableInfo( + t, + "CREATE TABLE tb2 (id INT, name INT)", + "target_db", + "target_tb_v1", + ) + nullSourceTable := &nullSourceTableInfo.TableName + nullTargetTable := &nullRoutedTableInfo.TableName + + deleteV1Changes := []*RowChange{ + NewRowChange(nullSourceTable, nullTargetTable, []interface{}{1, nil}, nil, nullSourceTableInfo, nullRoutedTableInfo, nil), + NewRowChange(nullSourceTable, nullTargetTable, []interface{}{3, 4}, nil, nullSourceTableInfo, nullRoutedTableInfo, nil), + } + deleteV1SQL, _ := GenDeleteSQL(DefaultWhereClause, deleteV1Changes...) + require.Contains(t, deleteV1SQL, "DELETE FROM `target_db`.`target_tb_v1`") + require.NotContains(t, deleteV1SQL, "`db`.`tb2`") + + updateV1Changes := []*RowChange{ + NewRowChange(nullSourceTable, nullTargetTable, []interface{}{1, nil}, []interface{}{1, 20}, nullSourceTableInfo, nullRoutedTableInfo, nil), + NewRowChange(nullSourceTable, nullTargetTable, []interface{}{3, 4}, []interface{}{3, 40}, nullSourceTableInfo, nullRoutedTableInfo, nil), + } + updateV1SQL, _ := GenUpdateSQL(DefaultWhereClause, updateV1Changes...) + require.Contains(t, updateV1SQL, "UPDATE `target_db`.`target_tb_v1`") + require.NotContains(t, updateV1SQL, "`db`.`tb2`") +} + func TestGenDeleteMultiRowsWithNullFallbackToV1(t *testing.T) { t.Parallel() diff --git a/pkg/sink/sqlmodel/multi_row_v1.go b/pkg/sink/sqlmodel/multi_row_v1.go index fbb15f97fb..f9b2b3d38e 100644 --- a/pkg/sink/sqlmodel/multi_row_v1.go +++ b/pkg/sink/sqlmodel/multi_row_v1.go @@ -38,7 +38,7 @@ func genDeleteSQLV1(changes ...*RowChange) (string, []interface{}) { var buf strings.Builder buf.Grow(1024) buf.WriteString("DELETE FROM ") - buf.WriteString(first.targetTable.QuoteString()) + buf.WriteString(first.targetTable.QuoteTargetString()) buf.WriteString(" WHERE (") allArgs := make([]interface{}, 0, len(changes)*CommonIndexColumnsCount) @@ -67,7 +67,7 @@ func genUpdateSQLV1(changes ...*RowChange) (string, []any) { // Generate UPDATE `db`.`table` SET first := changes[0] buf.WriteString("UPDATE ") - buf.WriteString(first.targetTable.QuoteString()) + buf.WriteString(first.targetTable.QuoteTargetString()) buf.WriteString(" SET ") // Pre-generate essential sub statements used after WHEN and in the final WHERE. diff --git a/pkg/sink/sqlmodel/row_change.go b/pkg/sink/sqlmodel/row_change.go index dfd5678e9c..30491cdd46 100644 --- a/pkg/sink/sqlmodel/row_change.go +++ b/pkg/sink/sqlmodel/row_change.go @@ -173,7 +173,7 @@ func (r *RowChange) String() string { // TargetTableID returns a ID string for target table. func (r *RowChange) TargetTableID() string { - return r.targetTable.QuoteString() + return r.targetTable.QuoteTargetString() } // SourceTableInfo returns the TableInfo of source table. @@ -274,7 +274,7 @@ func (r *RowChange) genDeleteSQL() (string, []interface{}) { var buf strings.Builder buf.Grow(1024) buf.WriteString("DELETE FROM ") - buf.WriteString(r.targetTable.QuoteString()) + buf.WriteString(r.targetTable.QuoteTargetString()) buf.WriteString(" WHERE ") whereArgs := r.genWhere(&buf) buf.WriteString(" LIMIT 1") @@ -293,7 +293,7 @@ func (r *RowChange) genUpdateSQL() (string, []interface{}) { var buf strings.Builder buf.Grow(2048) buf.WriteString("UPDATE ") - buf.WriteString(r.targetTable.QuoteString()) + buf.WriteString(r.targetTable.QuoteTargetString()) buf.WriteString(" SET ") // Build target generated columns lower names set to accelerate following check diff --git a/pkg/sink/sqlmodel/row_change_test.go b/pkg/sink/sqlmodel/row_change_test.go index f6d67c5e27..34548481a7 100644 --- a/pkg/sink/sqlmodel/row_change_test.go +++ b/pkg/sink/sqlmodel/row_change_test.go @@ -40,6 +40,11 @@ func mockTableInfo(t *testing.T, sql string) *common.TableInfo { return common.WrapTableInfo("db", rawTi) } +func mockRoutedTableInfo(t *testing.T, createTableSQL, targetSchema, targetTable string) (*common.TableInfo, *common.TableInfo) { + sourceTableInfo := mockTableInfo(t, createTableSQL) + return sourceTableInfo, sourceTableInfo.CloneWithRouting(targetSchema, targetTable) +} + type dpanicSuite struct { suite.Suite } @@ -92,6 +97,34 @@ func TestNewRowChange(t *testing.T) { require.Equal(t, expected, actual) } +func TestGenSQLUsesRoutedTargetTable(t *testing.T) { + sourceTableInfo, routedTableInfo := mockRoutedTableInfo( + t, + "CREATE TABLE tb1 (id INT PRIMARY KEY, name INT)", + "target_db", + "target_tb", + ) + + sourceTable := &sourceTableInfo.TableName + targetTable := &routedTableInfo.TableName + + insertChange := NewRowChange(sourceTable, targetTable, nil, []interface{}{1, 2}, sourceTableInfo, routedTableInfo, nil) + insertSQL, _ := insertChange.GenSQL(DMLInsert) + require.Contains(t, insertSQL, "`target_db`.`target_tb`") + require.NotContains(t, insertSQL, "`db`.`tb1`") + require.Equal(t, "`target_db`.`target_tb`", insertChange.TargetTableID()) + + deleteChange := NewRowChange(sourceTable, targetTable, []interface{}{1, 2}, nil, sourceTableInfo, routedTableInfo, nil) + deleteSQL, _ := deleteChange.GenSQL(DMLDelete) + require.Contains(t, deleteSQL, "DELETE FROM `target_db`.`target_tb`") + require.NotContains(t, deleteSQL, "`db`.`tb1`") + + updateChange := NewRowChange(sourceTable, targetTable, []interface{}{1, 2}, []interface{}{1, 3}, sourceTableInfo, routedTableInfo, nil) + updateSQL, _ := updateChange.GenSQL(DMLUpdate) + require.Contains(t, updateSQL, "UPDATE `target_db`.`target_tb`") + require.NotContains(t, updateSQL, "`db`.`tb1`") +} + func (s *dpanicSuite) TestRowChangeType() { change := &RowChange{preValues: []interface{}{1}} change.calculateType() diff --git a/tests/integration_tests/run_light_it_in_ci.sh b/tests/integration_tests/run_light_it_in_ci.sh index b70dd44e6b..a896e3ea26 100755 --- a/tests/integration_tests/run_light_it_in_ci.sh +++ b/tests/integration_tests/run_light_it_in_ci.sh @@ -48,7 +48,7 @@ mysql_groups=( # G07 'fail_over_ddl_H changefeed_update_config synced_status_with_redo' # G08 - 'capture_session_done_during_task changefeed_dup_error_restart mysql_sink_retry fail_over_ddl_I' + 'capture_session_done_during_task changefeed_dup_error_restart mysql_sink_retry fail_over_ddl_I table_route' # G09 'sequence cdc_server_tips ddl_sequence server_config_compatibility log_redaction fail_over_ddl_J' # G10 diff --git a/tests/integration_tests/table_route/README.md b/tests/integration_tests/table_route/README.md new file mode 100644 index 0000000000..5cccc7c874 --- /dev/null +++ b/tests/integration_tests/table_route/README.md @@ -0,0 +1,35 @@ +# Table Route Integration Test + +This test verifies that table route works correctly for MySQL sinks. + +## What it tests + +1. **Schema rewrite**: Routes `source_db.*` to `target_db.*` +2. **Table rewrite**: Appends `_routed` suffix to table names +3. **DML output**: INSERT, UPDATE, DELETE operations use the routed schema/table +4. **DDL output**: CREATE DATABASE, CREATE TABLE, ALTER TABLE, RENAME TABLE, view DDL, partition DDL, TRUNCATE TABLE, DROP TABLE, and DROP DATABASE use the routed schema/table + +## Configuration + +The test uses the following routing rules in `conf/changefeed.toml`: + +```toml +[[sink.dispatchers]] +matcher = ['source_db.*'] +target-schema = 'target_db' +target-table = '{table}_routed' +``` + +This routes: +- `source_db.users` -> `target_db.users_routed` +- `source_db.orders` -> `target_db.orders_routed` +- etc. + +## Test flow + +1. Create changefeed with table route rules +2. Execute one SQL workload on the upstream source schema +3. Wait until the routed finish marker appears downstream +4. Use sync diff with table route rules to compare upstream source tables and downstream routed target tables +5. Verify dropped and renamed-away target tables do not remain downstream +6. Drop the upstream source database and verify the routed downstream database is gone diff --git a/tests/integration_tests/table_route/conf/changefeed.toml b/tests/integration_tests/table_route/conf/changefeed.toml new file mode 100644 index 0000000000..9b0258e9aa --- /dev/null +++ b/tests/integration_tests/table_route/conf/changefeed.toml @@ -0,0 +1,13 @@ +# Changefeed configuration for table route integration test +# This tests schema and table routing for MySQL sinks + +[filter] +rules = ['source_db.*'] + +[sink] +# Dispatch rules with schema and table routing +# Route source_db.* to target_db.* with table suffix "_routed" +[[sink.dispatchers]] +matcher = ['source_db.*'] +target-schema = 'target_db' +target-table = '{table}_routed' diff --git a/tests/integration_tests/table_route/conf/diff_config.toml b/tests/integration_tests/table_route/conf/diff_config.toml new file mode 100644 index 0000000000..4734113499 --- /dev/null +++ b/tests/integration_tests/table_route/conf/diff_config.toml @@ -0,0 +1,112 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/table_route/sync_diff/output" + + source-instances = ["tidb0"] + + target-instance = "mysql1" + + target-check-tables = [ + "target_db.users_routed", + "target_db.orders_routed", + "target_db.products_routed", + "target_db.products_backup_routed", + "target_db.renamed_table_routed", + "target_db.multi_rename_a_new_routed", + "target_db.multi_rename_b_new_routed", + "target_db.partitioned_events_routed", + "target_db.truncate_test_routed", + "target_db.finish_mark_routed", + ] + +[data-sources] +[data-sources.tidb0] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + route-rules = [ + "users", + "orders", + "products", + "products_backup", + "renamed_table", + "multi_rename_a_new", + "multi_rename_b_new", + "partitioned_events", + "truncate_test", + "finish_mark", + ] + +[data-sources.mysql1] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" + +[routes.users] +schema-pattern = "source_db" +table-pattern = "users" +target-schema = "target_db" +target-table = "users_routed" + +[routes.orders] +schema-pattern = "source_db" +table-pattern = "orders" +target-schema = "target_db" +target-table = "orders_routed" + +[routes.products] +schema-pattern = "source_db" +table-pattern = "products" +target-schema = "target_db" +target-table = "products_routed" + +[routes.products_backup] +schema-pattern = "source_db" +table-pattern = "products_backup" +target-schema = "target_db" +target-table = "products_backup_routed" + +[routes.renamed_table] +schema-pattern = "source_db" +table-pattern = "renamed_table" +target-schema = "target_db" +target-table = "renamed_table_routed" + +[routes.multi_rename_a_new] +schema-pattern = "source_db" +table-pattern = "multi_rename_a_new" +target-schema = "target_db" +target-table = "multi_rename_a_new_routed" + +[routes.multi_rename_b_new] +schema-pattern = "source_db" +table-pattern = "multi_rename_b_new" +target-schema = "target_db" +target-table = "multi_rename_b_new_routed" + +[routes.partitioned_events] +schema-pattern = "source_db" +table-pattern = "partitioned_events" +target-schema = "target_db" +target-table = "partitioned_events_routed" + +[routes.truncate_test] +schema-pattern = "source_db" +table-pattern = "truncate_test" +target-schema = "target_db" +target-table = "truncate_test_routed" + +[routes.finish_mark] +schema-pattern = "source_db" +table-pattern = "finish_mark" +target-schema = "target_db" +target-table = "finish_mark_routed" diff --git a/tests/integration_tests/table_route/data/test.sql b/tests/integration_tests/table_route/data/test.sql new file mode 100644 index 0000000000..4491f768d1 --- /dev/null +++ b/tests/integration_tests/table_route/data/test.sql @@ -0,0 +1,196 @@ +-- Test mixed DDL and DML operations for table route. +DROP DATABASE IF EXISTS source_db; +CREATE DATABASE source_db; +USE source_db; + +-- ============================================ +-- DDL: CREATE TABLE with initial DML +-- ============================================ +CREATE TABLE users ( + id INT PRIMARY KEY, + name VARCHAR(100), + email VARCHAR(100) +); + +CREATE TABLE orders ( + id INT PRIMARY KEY, + user_id INT, + amount DECIMAL(10, 2) +); + +INSERT INTO users VALUES (1, 'Alice', 'alice@example.com'); +INSERT INTO users VALUES (2, 'Bob', 'bob@example.com'); + +INSERT INTO orders VALUES (1, 1, 100.00); +INSERT INTO orders VALUES (2, 2, 200.00); + +-- ============================================ +-- DML: INSERT more data +-- ============================================ +INSERT INTO users VALUES (3, 'Charlie', 'charlie@example.com'); +INSERT INTO orders VALUES (3, 3, 300.00); + +-- ============================================ +-- DML: UPDATE data +-- ============================================ +UPDATE users SET email = 'alice_updated@example.com' WHERE id = 1; +UPDATE orders SET amount = 150.00 WHERE id = 1; + +-- ============================================ +-- DML: DELETE data +-- ============================================ +DELETE FROM orders WHERE id = 2; + +-- ============================================ +-- DDL: ALTER TABLE ADD COLUMN +-- ============================================ +ALTER TABLE users ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP; + +-- ============================================ +-- DDL: CREATE TABLE (new table should be routed) +-- ============================================ +CREATE TABLE products ( + id INT PRIMARY KEY, + name VARCHAR(100), + price DECIMAL(10, 2) +); + +-- Widget starts at 29.99 (>= 15.00), so DELETE WHERE price < 15.00 won't affect it +-- unless the UPDATE (price = 12.99) is applied first +INSERT INTO products VALUES (1, 'Widget', 29.99); +INSERT INTO products VALUES (2, 'Gadget', 19.99); + +-- ============================================ +-- DDL: CREATE TABLE LIKE +-- ============================================ +CREATE TABLE products_backup LIKE products; + +INSERT INTO products_backup VALUES (1, 'Widget', 29.99); + +-- ============================================ +-- DDL: ALTER TABLE DROP COLUMN +-- ============================================ +ALTER TABLE users DROP COLUMN created_at; + +-- ============================================ +-- DDL: ALTER TABLE ADD INDEX +-- ============================================ +ALTER TABLE orders ADD INDEX idx_user_id (user_id); + +-- ============================================ +-- DDL: RENAME TABLE +-- ============================================ +CREATE TABLE temp_table ( + id INT PRIMARY KEY, + value VARCHAR(50) +); +INSERT INTO temp_table VALUES (1, 'test'); + +RENAME TABLE temp_table TO renamed_table; + +-- Verify renamed table works with DML +INSERT INTO renamed_table VALUES (2, 'test2'); +UPDATE renamed_table SET value = 'updated' WHERE id = 1; + +-- ============================================ +-- DDL: RENAME TABLE with multiple table pairs +-- ============================================ +CREATE TABLE multi_rename_a ( + id INT PRIMARY KEY, + value VARCHAR(50) +); +CREATE TABLE multi_rename_b ( + id INT PRIMARY KEY, + value VARCHAR(50) +); +INSERT INTO multi_rename_a VALUES (1, 'a'); +INSERT INTO multi_rename_b VALUES (1, 'b'); + +RENAME TABLE multi_rename_a TO multi_rename_a_new, multi_rename_b TO multi_rename_b_new; + +INSERT INTO multi_rename_a_new VALUES (2, 'a2'); +UPDATE multi_rename_b_new SET value = 'b2' WHERE id = 1; + +-- ============================================ +-- DDL: CREATE VIEW and DROP VIEW +-- ============================================ +CREATE VIEW `source_db`.`user_order_view` AS + SELECT `u`.`id`, `u`.`name`, `o`.`amount` + FROM `source_db`.`users` AS `u` + JOIN `source_db`.`orders` AS `o` ON `u`.`id` = `o`.`user_id`; + +CREATE VIEW `source_db`.`transient_view` AS + SELECT `id`, `name` FROM `source_db`.`users`; + +DROP VIEW `source_db`.`transient_view`; + +-- ============================================ +-- DDL: PARTITION TABLE +-- ============================================ +CREATE TABLE partitioned_events ( + id INT PRIMARY KEY, + bucket INT NOT NULL, + value VARCHAR(50) +) PARTITION BY RANGE (bucket) ( + PARTITION p0 VALUES LESS THAN (10), + PARTITION p1 VALUES LESS THAN (20) +); + +INSERT INTO partitioned_events VALUES (1, 5, 'p0'); +INSERT INTO partitioned_events VALUES (2, 15, 'p1'); +ALTER TABLE partitioned_events ADD PARTITION (PARTITION p2 VALUES LESS THAN (30)); +INSERT INTO partitioned_events VALUES (3, 25, 'p2'); +ALTER TABLE partitioned_events TRUNCATE PARTITION p0; +INSERT INTO partitioned_events VALUES (4, 6, 'p0_after_truncate'); +ALTER TABLE partitioned_events DROP PARTITION p1; +INSERT INTO partitioned_events VALUES (5, 26, 'p2_more'); + +-- ============================================ +-- DDL: TRUNCATE TABLE +-- ============================================ +CREATE TABLE truncate_test ( + id INT PRIMARY KEY, + data VARCHAR(100) +); +INSERT INTO truncate_test VALUES (1, 'will be truncated'); +INSERT INTO truncate_test VALUES (2, 'also truncated'); + +TRUNCATE TABLE truncate_test; + +-- Insert new data after truncate +INSERT INTO truncate_test VALUES (10, 'after truncate'); + +-- ============================================ +-- DDL: DROP TABLE +-- ============================================ +CREATE TABLE to_be_dropped ( + id INT PRIMARY KEY +); +INSERT INTO to_be_dropped VALUES (1); + +DROP TABLE to_be_dropped; + +-- ============================================ +-- Mixed operations on existing tables +-- ============================================ +-- More inserts +INSERT INTO users VALUES (4, 'Diana', 'diana@example.com'); +INSERT INTO users VALUES (5, 'Eve', 'eve@example.com'); + +-- Batch update +UPDATE users SET name = CONCAT(name, '_v2') WHERE id IN (3, 4); + +-- More deletes +DELETE FROM users WHERE id = 5; + +-- Update with multiple columns +UPDATE products SET name = 'Super Widget', price = 12.99 WHERE id = 1; + +-- Delete with condition +DELETE FROM products WHERE price < 15.00; + +-- ============================================ +-- Create finish marker table +-- ============================================ +CREATE TABLE finish_mark (id INT PRIMARY KEY); +INSERT INTO finish_mark VALUES (1); diff --git a/tests/integration_tests/table_route/run.sh b/tests/integration_tests/table_route/run.sh new file mode 100755 index 0000000000..8f2b6a597f --- /dev/null +++ b/tests/integration_tests/table_route/run.sh @@ -0,0 +1,51 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function run() { + if [ "$SINK_TYPE" != "mysql" ]; then + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --cluster-id "$KEYSPACE_NAME" + + SINK_URI="mysql://normal:123456@${DOWN_TIDB_HOST}:${DOWN_TIDB_PORT}/" + cdc_cli_changefeed create --sink-uri="$SINK_URI" --config="$CUR/conf/changefeed.toml" + + run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + check_table_exists target_db.finish_mark_routed ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 120 + + check_table_not_exists source_db.users ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_not_exists source_db.orders ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_not_exists target_db.temp_table_routed ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_not_exists target_db.multi_rename_a_routed ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_not_exists target_db.multi_rename_b_routed ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_not_exists target_db.to_be_dropped_routed ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + run_sql "SHOW CREATE VIEW target_db.user_order_view_routed" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_contains "user_order_view_routed" + check_contains "users_routed" + check_contains "orders_routed" + check_table_not_exists target_db.transient_view_routed ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + + run_sql "DROP DATABASE source_db" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + check_db_not_exists target_db ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 + + cleanup_process $CDC_BINARY +} + +trap 'stop_test $WORK_DIR' EXIT +run "$@" +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" From 41025cef84cc8bac3a2e60d81bda14f947720a42 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 7 May 2026 18:03:46 +0800 Subject: [PATCH 02/12] adjust tests --- .../routing/router_supported_ddl_test.go | 120 ++++++++++++ pkg/common/event/ddl_event.go | 32 +++- pkg/common/event/ddl_event_test.go | 181 ------------------ pkg/sink/mysql/mysql_writer_test.go | 66 +++---- 4 files changed, 176 insertions(+), 223 deletions(-) diff --git a/downstreamadapter/routing/router_supported_ddl_test.go b/downstreamadapter/routing/router_supported_ddl_test.go index ca33b5f1d8..28b1e98bb0 100644 --- a/downstreamadapter/routing/router_supported_ddl_test.go +++ b/downstreamadapter/routing/router_supported_ddl_test.go @@ -16,10 +16,12 @@ package routing import ( "testing" + "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/config" ddlutil "github.com/pingcap/tidb/pkg/ddl/util" timodel "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/parser/ast" "github.com/stretchr/testify/require" ) @@ -1033,8 +1035,126 @@ func TestApplyToDDLEventSupportsCreateTables(t *testing.T) { require.Contains(t, routed.Query, "CREATE TABLE `target_db`.`t1_r`") require.Contains(t, routed.Query, "CREATE TABLE `target_db`.`t2_r`") require.Len(t, routed.MultipleTableInfos, 2) + require.Len(t, ddl.MultipleTableInfos, 2) + require.Equal(t, "source_db", ddl.MultipleTableInfos[0].GetTargetSchemaName()) + require.Equal(t, "t1", ddl.MultipleTableInfos[0].GetTargetTableName()) + require.Equal(t, "source_db", ddl.MultipleTableInfos[1].GetTargetSchemaName()) + require.Equal(t, "t2", ddl.MultipleTableInfos[1].GetTargetTableName()) require.Equal(t, "target_db", routed.MultipleTableInfos[0].GetTargetSchemaName()) require.Equal(t, "t1_r", routed.MultipleTableInfos[0].GetTargetTableName()) require.Equal(t, "target_db", routed.MultipleTableInfos[1].GetTargetSchemaName()) require.Equal(t, "t2_r", routed.MultipleTableInfos[1].GetTargetTableName()) } + +func TestApplyToDDLEventCopiesRoutedEventWithoutMutatingOrigin(t *testing.T) { + router := newTestRouter(t, false, []*config.DispatchRule{{ + Matcher: []string{"source_db.*"}, + TargetSchema: "target_db", + TargetTable: "{table}_r", + }}) + + helper := event.NewEventTestHelper(t) + defer helper.Close() + + schemaDDL := helper.DDL2Event("CREATE DATABASE `source_db`") + routedSchema, err := router.ApplyToDDLEvent(schemaDDL) + require.NoError(t, err) + require.Contains(t, routedSchema.Query, "`target_db`") + + ddl := helper.DDL2Event("CREATE TABLE `source_db`.`source_table` (`id` INT PRIMARY KEY)") + ddl.DispatcherID = common.NewDispatcherID() + ddl.Seq = 1 + ddl.Epoch = 2 + ddl.TiDBOnly = true + ddl.BDRMode = string(ast.BDRRolePrimary) + ddl.PostTxnFlushed = []func(){func() {}, func() {}} + + originalQuery := ddl.Query + originalTableInfo := ddl.TableInfo + routed, err := router.ApplyToDDLEvent(ddl) + require.NoError(t, err) + require.NotSame(t, ddl, routed) + + require.Equal(t, ddl.Version, routed.Version) + require.Equal(t, ddl.DispatcherID, routed.DispatcherID) + require.Equal(t, ddl.Type, routed.Type) + require.Equal(t, ddl.SchemaID, routed.SchemaID) + require.Equal(t, ddl.SchemaName, routed.SchemaName) + require.Equal(t, ddl.TableName, routed.TableName) + require.Equal(t, ddl.FinishedTs, routed.FinishedTs) + require.Equal(t, ddl.Seq, routed.Seq) + require.Equal(t, ddl.Epoch, routed.Epoch) + require.Equal(t, ddl.TiDBOnly, routed.TiDBOnly) + require.Equal(t, ddl.BDRMode, routed.BDRMode) + + require.Equal(t, originalQuery, ddl.Query) + require.Same(t, originalTableInfo, ddl.TableInfo) + require.Equal(t, "source_db", ddl.GetTargetSchemaName()) + require.Equal(t, "source_table", ddl.GetTargetTableName()) + + require.Equal(t, "target_db", routed.GetTargetSchemaName()) + require.Equal(t, "source_table_r", routed.GetTargetTableName()) + require.Contains(t, routed.Query, "`target_db`.`source_table_r`") + require.NotSame(t, originalTableInfo, routed.TableInfo) + require.Equal(t, "target_db", routed.TableInfo.GetTargetSchemaName()) + require.Equal(t, "source_table_r", routed.TableInfo.GetTargetTableName()) + + require.Len(t, routed.PostTxnFlushed, 2) + require.Len(t, ddl.PostTxnFlushed, 2) + require.NotEqual(t, &ddl.PostTxnFlushed[0], &routed.PostTxnFlushed[0]) + routed.AddPostFlushFunc(func() {}) + require.Len(t, routed.PostTxnFlushed, 3) + require.Len(t, ddl.PostTxnFlushed, 2) +} + +func TestApplyToDDLEventRenameTablesGetEventsPreserveSourceAndTargetNames(t *testing.T) { + router := newTestRouter(t, false, []*config.DispatchRule{{ + Matcher: []string{"source_db.*"}, + TargetSchema: "target_db", + TargetTable: "{table}_r", + }}) + + helper := event.NewEventTestHelper(t) + defer helper.Close() + + schemaDDL := helper.DDL2Event("CREATE DATABASE `source_db`") + routedSchema, err := router.ApplyToDDLEvent(schemaDDL) + require.NoError(t, err) + require.Contains(t, routedSchema.Query, "`target_db`") + + table1DDL := helper.DDL2Event("CREATE TABLE `source_db`.`old_table1` (`id` INT PRIMARY KEY)") + routedTable1, err := router.ApplyToDDLEvent(table1DDL) + require.NoError(t, err) + require.Contains(t, routedTable1.Query, "`target_db`.`old_table1_r`") + + table2DDL := helper.DDL2Event("CREATE TABLE `source_db`.`old_table2` (`id` INT PRIMARY KEY)") + routedTable2, err := router.ApplyToDDLEvent(table2DDL) + require.NoError(t, err) + require.Contains(t, routedTable2.Query, "`target_db`.`old_table2_r`") + + renameDDL := helper.DDL2Event( + "RENAME TABLE `source_db`.`old_table1` TO `source_db`.`new_table1`, `source_db`.`old_table2` TO `source_db`.`new_table2`") + routed, err := router.ApplyToDDLEvent(renameDDL) + require.NoError(t, err) + + events := routed.GetEvents() + require.Len(t, events, 2) + + require.Equal(t, "source_db", events[0].SchemaName) + require.Equal(t, "new_table1", events[0].TableName) + require.Equal(t, "target_db", events[0].GetTargetSchemaName()) + require.Equal(t, "new_table1_r", events[0].GetTargetTableName()) + require.Equal(t, "source_db", events[0].ExtraSchemaName) + require.Equal(t, "old_table1", events[0].ExtraTableName) + require.Equal(t, "target_db", events[0].GetTargetExtraSchemaName()) + require.Equal(t, "old_table1_r", events[0].GetTargetExtraTableName()) + + require.Equal(t, "source_db", events[1].SchemaName) + require.Equal(t, "new_table2", events[1].TableName) + require.Equal(t, "target_db", events[1].GetTargetSchemaName()) + require.Equal(t, "new_table2_r", events[1].GetTargetTableName()) + require.Equal(t, "source_db", events[1].ExtraSchemaName) + require.Equal(t, "old_table2", events[1].ExtraTableName) + require.Equal(t, "target_db", events[1].GetTargetExtraSchemaName()) + require.Equal(t, "old_table2_r", events[1].GetTargetExtraTableName()) +} diff --git a/pkg/common/event/ddl_event.go b/pkg/common/event/ddl_event.go index 599883be92..d39c933154 100644 --- a/pkg/common/event/ddl_event.go +++ b/pkg/common/event/ddl_event.go @@ -257,19 +257,23 @@ func (d *DDLEvent) GetTableID() int64 { func (d *DDLEvent) GetEvents() []*DDLEvent { // Some ddl event may be multi-events, we need to split it into multiple messages. // Such as rename table test.table1 to test.table10, test.table2 to test.table20 - switch model.ActionType(d.Type) { + actionType := model.ActionType(d.Type) + switch actionType { case model.ActionCreateTables, model.ActionRenameTables: events := make([]*DDLEvent, 0, len(d.MultipleTableInfos)) queries, err := SplitQueries(d.Query) if err != nil { log.Panic("split queries failed", zap.Error(err)) } + if actionType == model.ActionRenameTables && len(queries) == 1 && len(d.MultipleTableInfos) > 1 { + queries = splitRenameTablesQuery(queries[0]) + } if len(queries) != len(d.MultipleTableInfos) { log.Panic("queries length should be equal to multipleTableInfos length", zap.String("query", d.Query), zap.Any("multipleTableInfos", d.MultipleTableInfos)) } t := model.ActionCreateTable - if model.ActionType(d.Type) == model.ActionRenameTables { + if actionType == model.ActionRenameTables { t = model.ActionRenameTable } for i, info := range d.MultipleTableInfos { @@ -285,7 +289,7 @@ func (d *DDLEvent) GetEvents() []*DDLEvent { StartTs: d.StartTs, FinishedTs: d.FinishedTs, } - if model.ActionType(d.Type) == model.ActionRenameTables { + if actionType == model.ActionRenameTables { event.ExtraSchemaName = d.TableNameChange.DropName[i].SchemaName event.ExtraTableName = d.TableNameChange.DropName[i].TableName targetExtraSchemaName, targetExtraTableName := extractRenameTargetExtraFromQuery(queries[i]) @@ -300,6 +304,28 @@ func (d *DDLEvent) GetEvents() []*DDLEvent { return []*DDLEvent{d} } +func splitRenameTablesQuery(query string) []string { + stmt, err := parser.New().ParseOneStmt(query, "", "") + if err != nil { + log.Panic("parse rename tables query failed", zap.String("query", query), zap.Error(err)) + } + renameStmt, ok := stmt.(*ast.RenameTableStmt) + if !ok || len(renameStmt.TableToTables) == 0 { + log.Panic("unexpected rename tables query", zap.String("query", query), zap.Any("stmt", stmt)) + } + + queries := make([]string, 0, len(renameStmt.TableToTables)) + for _, tableToTable := range renameStmt.TableToTables { + singleStmt := &ast.RenameTableStmt{TableToTables: []*ast.TableToTable{tableToTable}} + restoredQuery, err := Restore(singleStmt) + if err != nil { + log.Panic("restore split rename query failed", zap.String("query", query), zap.Error(err)) + } + queries = append(queries, restoredQuery+";") + } + return queries +} + func extractRenameTargetExtraFromQuery(query string) (string, string) { stmt, err := parser.New().ParseOneStmt(query, "", "") if err != nil { diff --git a/pkg/common/event/ddl_event_test.go b/pkg/common/event/ddl_event_test.go index 258ba064d8..41efb79b86 100644 --- a/pkg/common/event/ddl_event_test.go +++ b/pkg/common/event/ddl_event_test.go @@ -521,184 +521,3 @@ CREATE TABLE test2 (id INT); }) } } - -// TestNewRoutedDDLEvent ensures routed DDL construction preserves the origin event -// while producing an independent routed event for downstream use. -func TestNewRoutedDDLEvent(t *testing.T) { - helper := NewEventTestHelper(t) - defer helper.Close() - - helper.Tk().MustExec("CREATE DATABASE `source_db`") - original := helper.DDL2Event("CREATE TABLE `source_db`.`source_table` (`id` INT PRIMARY KEY)") - - require.NotNil(t, original) - require.NotNil(t, original.TableInfo) - require.Empty(t, original.targetSchemaName) - require.Empty(t, original.targetTableName) - require.Empty(t, original.targetExtraSchemaName) - require.Empty(t, original.targetExtraTableName) - - originalQuery := original.Query - postFlushFunc1 := func() {} - postFlushFunc2 := func() {} - - original.DispatcherID = common.NewDispatcherID() - original.Seq = 1 - original.Epoch = 2 - original.PostTxnFlushed = []func(){postFlushFunc1, postFlushFunc2} - original.TiDBOnly = true - original.BDRMode = "test-mode" - - originalTableInfo := original.TableInfo - newRoutedTableInfo := originalTableInfo.CloneWithRouting("routed_schema", "source_table_routed") - - routed := NewRoutedDDLEvent( - original, - "CREATE TABLE `routed_schema`.`source_table_routed` (`id` INT PRIMARY KEY)", - "routed_schema", - "source_table_routed", - "", - "", - newRoutedTableInfo, - nil, - original.BlockedTableNames, - ) - require.NotNil(t, routed) - - // Verify that the routed event is a separate object. - require.False(t, original == routed, "routed event should be a different object") - - // Verify that non-routing fields are copied as-is. - require.Equal(t, original.Version, routed.Version) - require.Equal(t, original.DispatcherID, routed.DispatcherID) - require.Equal(t, original.Type, routed.Type) - require.Equal(t, original.SchemaID, routed.SchemaID) - require.Equal(t, original.SchemaName, routed.SchemaName) - require.Equal(t, original.TableName, routed.TableName) - require.Equal(t, original.FinishedTs, routed.FinishedTs) - require.Equal(t, original.Seq, routed.Seq) - require.Equal(t, original.Epoch, routed.Epoch) - require.Equal(t, original.TiDBOnly, routed.TiDBOnly) - require.Equal(t, original.BDRMode, routed.BDRMode) - - // Verify that PostTxnFlushed is an independent copy (not shared) - // This is defensive: currently DDL events arrive with nil PostTxnFlushed, - // but we copy it to prevent races if callbacks are ever added before building the routed event. - require.NotNil(t, routed.PostTxnFlushed) - require.Equal(t, 2, len(routed.PostTxnFlushed), "PostTxnFlushed should have same length as original") - require.Equal(t, 2, len(original.PostTxnFlushed), "Original PostTxnFlushed should remain unchanged") - // Verify independent backing arrays. - require.NotEqual(t, &original.PostTxnFlushed[0], &routed.PostTxnFlushed[0], "PostTxnFlushed should have independent backing arrays") - - // Verify that appending to the routed event doesn't affect the original. - routed.AddPostFlushFunc(func() {}) - require.Equal(t, 3, len(routed.PostTxnFlushed), "Routed event should have appended callback") - require.Equal(t, 2, len(original.PostTxnFlushed), "Original should be unaffected by routed event append") - - // Verify that routed state doesn't affect the original. - require.Equal(t, "source_db", original.SchemaName, "Original SchemaName should be unchanged") - require.Equal(t, originalQuery, original.Query, "Original Query should be unchanged") - require.True(t, original.TableInfo == originalTableInfo, "Original TableInfo should be unchanged") - - // Verify that the routed event has the routed state. - require.Equal(t, "routed_schema", routed.GetTargetSchemaName()) - require.Equal(t, "source_table_routed", routed.GetTargetTableName()) - require.Equal(t, "CREATE TABLE `routed_schema`.`source_table_routed` (`id` INT PRIMARY KEY)", routed.Query) - require.True(t, routed.TableInfo == newRoutedTableInfo) - require.Equal(t, "routed_schema", routed.TableInfo.TableName.TargetSchema) - require.Equal(t, original.SchemaName, routed.GetSchemaName()) - require.Equal(t, original.TableName, routed.GetTableName()) - - // Test nil origin event. - var nilEvent *DDLEvent - routedNil := NewRoutedDDLEvent(nilEvent, "", "", "", "", "", nil, nil, nil) - require.Nil(t, routedNil) -} - -func TestNewRoutedDDLEventPreservesSourceFields(t *testing.T) { - helper := NewEventTestHelper(t) - defer helper.Close() - - helper.Tk().MustExec("CREATE DATABASE `source_db`") - helper.Tk().MustExec("CREATE TABLE `source_db`.`old_orders` (`id` INT PRIMARY KEY)") - original := helper.DDL2Event("RENAME TABLE `source_db`.`old_orders` TO `source_db`.`new_orders`") - require.Empty(t, original.targetSchemaName) - require.Empty(t, original.targetTableName) - require.Empty(t, original.targetExtraSchemaName) - require.Empty(t, original.targetExtraTableName) - - routed := NewRoutedDDLEvent( - original, - "RENAME TABLE `target_db_v2`.`old_orders_routed_v2` TO `target_db_v2`.`new_orders_routed_v2`", - "target_db_v2", - "new_orders_routed_v2", - "target_db_v2", - "old_orders_routed_v2", - original.TableInfo.CloneWithRouting("target_db_v2", "new_orders_routed_v2"), - original.MultipleTableInfos, - original.BlockedTableNames, - ) - - require.Equal(t, "source_db", routed.GetSchemaName()) - require.Equal(t, "new_orders", routed.GetTableName()) - require.Equal(t, "source_db", routed.GetExtraSchemaName()) - require.Equal(t, "old_orders", routed.GetExtraTableName()) - require.Equal(t, "target_db_v2", routed.GetTargetSchemaName()) - require.Equal(t, "new_orders_routed_v2", routed.GetTargetTableName()) - require.Equal(t, "target_db_v2", routed.GetTargetExtraSchemaName()) - require.Equal(t, "old_orders_routed_v2", routed.GetTargetExtraTableName()) -} - -func TestGetEventsForRenameTablesPreservesSourceAndTargetNames(t *testing.T) { - helper := NewEventTestHelper(t) - defer helper.Close() - - helper.Tk().MustExec("CREATE DATABASE `rename_db`") - helper.Tk().MustExec("CREATE TABLE `rename_db`.`old_table1` (`id` INT PRIMARY KEY)") - helper.Tk().MustExec("CREATE TABLE `rename_db`.`old_table2` (`id` INT PRIMARY KEY)") - original := helper.DDL2Event( - "RENAME TABLE `rename_db`.`old_table1` TO `rename_db`.`new_table1`, `rename_db`.`old_table2` TO `rename_db`.`new_table2`") - require.Empty(t, original.targetSchemaName) - require.Empty(t, original.targetTableName) - require.Empty(t, original.targetExtraSchemaName) - require.Empty(t, original.targetExtraTableName) - - ddl := NewRoutedDDLEvent( - original, - "RENAME TABLE `old_target_db1`.`old_target_table1` TO `new_target_db1`.`new_target_table1`; RENAME TABLE `old_target_db2`.`old_target_table2` TO `new_target_db2`.`new_target_table2`", - "", - "", - "", - "", - original.TableInfo, - []*common.TableInfo{ - original.MultipleTableInfos[0].CloneWithRouting("new_target_db1", "new_target_table1"), - original.MultipleTableInfos[1].CloneWithRouting("new_target_db2", "new_target_table2"), - }, - []SchemaTableName{ - {SchemaName: "old_target_db1", TableName: "old_target_table1"}, - {SchemaName: "old_target_db2", TableName: "old_target_table2"}, - }, - ) - - events := ddl.GetEvents() - require.Len(t, events, 2) - - require.Equal(t, "rename_db", events[0].SchemaName) - require.Equal(t, "new_table1", events[0].TableName) - require.Equal(t, "new_target_db1", events[0].GetTargetSchemaName()) - require.Equal(t, "new_target_table1", events[0].GetTargetTableName()) - require.Equal(t, "rename_db", events[0].ExtraSchemaName) - require.Equal(t, "old_table1", events[0].ExtraTableName) - require.Equal(t, "old_target_db1", events[0].GetTargetExtraSchemaName()) - require.Equal(t, "old_target_table1", events[0].GetTargetExtraTableName()) - - require.Equal(t, "rename_db", events[1].SchemaName) - require.Equal(t, "new_table2", events[1].TableName) - require.Equal(t, "new_target_db2", events[1].GetTargetSchemaName()) - require.Equal(t, "new_target_table2", events[1].GetTargetTableName()) - require.Equal(t, "rename_db", events[1].ExtraSchemaName) - require.Equal(t, "old_table2", events[1].ExtraTableName) - require.Equal(t, "old_target_db2", events[1].GetTargetExtraSchemaName()) - require.Equal(t, "old_target_table2", events[1].GetTargetExtraTableName()) -} diff --git a/pkg/sink/mysql/mysql_writer_test.go b/pkg/sink/mysql/mysql_writer_test.go index 449d1e9ff2..75930c7677 100644 --- a/pkg/sink/mysql/mysql_writer_test.go +++ b/pkg/sink/mysql/mysql_writer_test.go @@ -25,9 +25,11 @@ import ( lru "github.com/hashicorp/golang-lru" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/ticdc/downstreamadapter/routing" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/config/kerneltype" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/metrics" @@ -355,62 +357,48 @@ func TestMysqlWriter_Flush_EmptyEvents(t *testing.T) { } func TestMysqlWriterExecDDLUsesRoutedSchemaName(t *testing.T) { + router, err := routing.NewRouter( + common.NewChangefeedID4Test("test", "test"), + true, + []*config.DispatchRule{{ + Matcher: []string{"source_db.*"}, + TargetSchema: "target_db", + TargetTable: "{table}_routed", + }}, + ) + require.NoError(t, err) + helper := commonEvent.NewEventTestHelper(t) defer helper.Close() - helper.Tk().MustExec("CREATE DATABASE `source_db`") - helper.Tk().MustExec("CREATE TABLE `source_db`.`source_table` (`id` INT PRIMARY KEY)") - alterDDL := helper.DDL2Event("ALTER TABLE `source_db`.`source_table` ADD COLUMN age INT") - require.Equal(t, "source_db", alterDDL.GetTargetSchemaName()) - require.Equal(t, "source_table", alterDDL.GetTargetTableName()) - routedAlterDDL := commonEvent.NewRoutedDDLEvent( - alterDDL, - "ALTER TABLE `target_db`.`target_table` ADD COLUMN age INT", - "target_db", - "target_table", - "", - "", - alterDDL.TableInfo.CloneWithRouting("target_db", "target_table"), - nil, - nil, - ) + createSchemaDDL := helper.DDL2Event("CREATE DATABASE `source_db`") + routedCreateSchemaDDL, err := router.ApplyToDDLEvent(createSchemaDDL) + require.NoError(t, err) + require.Equal(t, "target_db", routedCreateSchemaDDL.GetTargetSchemaName()) writer, db, mock := newTestMysqlWriter(t) defer db.Close() mock.ExpectBegin() - mock.ExpectExec("USE `target_db`;").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectExec("SET TIMESTAMP = DEFAULT").WillReturnResult(sqlmock.NewResult(1, 1)) - mock.ExpectExec("ALTER TABLE `target_db`.`target_table` ADD COLUMN age INT").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec(routedCreateSchemaDDL.Query).WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() - require.NoError(t, writer.execDDL(routedAlterDDL)) + require.NoError(t, writer.execDDL(routedCreateSchemaDDL)) require.NoError(t, mock.ExpectationsWereMet()) - helper.Tk().MustExec("CREATE TABLE `source_db`.`orders_old` (`id` INT PRIMARY KEY)") - renameDDL := helper.DDL2Event("RENAME TABLE `source_db`.`orders_old` TO `source_db`.`orders_new`") - require.Equal(t, "source_db", renameDDL.GetTargetSchemaName()) - require.Equal(t, "orders_new", renameDDL.GetTargetTableName()) - require.Equal(t, "source_db", renameDDL.GetTargetExtraSchemaName()) - require.Equal(t, "orders_old", renameDDL.GetTargetExtraTableName()) - routedRenameDDL := commonEvent.NewRoutedDDLEvent( - renameDDL, - "RENAME TABLE `old_target_db`.`orders_old` TO `new_target_db`.`orders_new`", - "new_target_db", - "orders_new", - "old_target_db", - "orders_old", - renameDDL.TableInfo.CloneWithRouting("new_target_db", "orders_new"), - nil, - nil, - ) + createTableDDL := helper.DDL2Event("CREATE TABLE `source_db`.`source_table` (`id` INT PRIMARY KEY)") + routedCreateTableDDL, err := router.ApplyToDDLEvent(createTableDDL) + require.NoError(t, err) + require.Equal(t, "target_db", routedCreateTableDDL.GetTargetSchemaName()) + require.Equal(t, "source_table_routed", routedCreateTableDDL.GetTargetTableName()) writer, db, mock = newTestMysqlWriter(t) defer db.Close() mock.ExpectBegin() - mock.ExpectExec("USE `new_target_db`;").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("USE `target_db`;").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectExec("SET TIMESTAMP = DEFAULT").WillReturnResult(sqlmock.NewResult(1, 1)) - mock.ExpectExec("RENAME TABLE `old_target_db`.`orders_old` TO `new_target_db`.`orders_new`").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec(routedCreateTableDDL.Query).WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() - require.NoError(t, writer.execDDL(routedRenameDDL)) + require.NoError(t, writer.execDDL(routedCreateTableDDL)) require.NoError(t, mock.ExpectationsWereMet()) } From ea7ed99730ad33d86a06fa5b7872782c7eb36bf0 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 7 May 2026 18:21:37 +0800 Subject: [PATCH 03/12] adjust tests --- .../routing/router_supported_ddl_test.go | 186 +++++++----------- 1 file changed, 69 insertions(+), 117 deletions(-) diff --git a/downstreamadapter/routing/router_supported_ddl_test.go b/downstreamadapter/routing/router_supported_ddl_test.go index 28b1e98bb0..90c18ad1a0 100644 --- a/downstreamadapter/routing/router_supported_ddl_test.go +++ b/downstreamadapter/routing/router_supported_ddl_test.go @@ -1009,44 +1009,7 @@ func TestRewriteDDLQueryWithRoutingSupportsParserBackedDDLTypes(t *testing.T) { } } -func TestApplyToDDLEventSupportsCreateTables(t *testing.T) { - router := newTestRouter(t, false, []*config.DispatchRule{{ - Matcher: []string{"source_db.*"}, - TargetSchema: "target_db", - TargetTable: "{table}_r", - }}) - - helper := event.NewEventTestHelper(t) - defer helper.Close() - schemaDDL := helper.DDL2Event("CREATE DATABASE `source_db`") - // TiDB ActionCreateTables is same-schema only. Cross-schema CREATE TABLE - // statements are emitted as separate DDL jobs upstream, not one ActionCreateTables event. - ddl := helper.BatchCreateTableDDLs2Event("source_db", - "CREATE TABLE `source_db`.`t1` (`id` INT PRIMARY KEY)", - "CREATE TABLE `source_db`.`t2` (`id` INT PRIMARY KEY)", - ) - - routedSchema, err := router.ApplyToDDLEvent(schemaDDL) - require.NoError(t, err) - require.Contains(t, routedSchema.Query, "`target_db`") - - routed, err := router.ApplyToDDLEvent(ddl) - require.NoError(t, err) - require.Contains(t, routed.Query, "CREATE TABLE `target_db`.`t1_r`") - require.Contains(t, routed.Query, "CREATE TABLE `target_db`.`t2_r`") - require.Len(t, routed.MultipleTableInfos, 2) - require.Len(t, ddl.MultipleTableInfos, 2) - require.Equal(t, "source_db", ddl.MultipleTableInfos[0].GetTargetSchemaName()) - require.Equal(t, "t1", ddl.MultipleTableInfos[0].GetTargetTableName()) - require.Equal(t, "source_db", ddl.MultipleTableInfos[1].GetTargetSchemaName()) - require.Equal(t, "t2", ddl.MultipleTableInfos[1].GetTargetTableName()) - require.Equal(t, "target_db", routed.MultipleTableInfos[0].GetTargetSchemaName()) - require.Equal(t, "t1_r", routed.MultipleTableInfos[0].GetTargetTableName()) - require.Equal(t, "target_db", routed.MultipleTableInfos[1].GetTargetSchemaName()) - require.Equal(t, "t2_r", routed.MultipleTableInfos[1].GetTargetTableName()) -} - -func TestApplyToDDLEventCopiesRoutedEventWithoutMutatingOrigin(t *testing.T) { +func TestApplyToDDLEventRoutesDDLEventMetadata(t *testing.T) { router := newTestRouter(t, false, []*config.DispatchRule{{ Matcher: []string{"source_db.*"}, TargetSchema: "target_db", @@ -1061,100 +1024,89 @@ func TestApplyToDDLEventCopiesRoutedEventWithoutMutatingOrigin(t *testing.T) { require.NoError(t, err) require.Contains(t, routedSchema.Query, "`target_db`") - ddl := helper.DDL2Event("CREATE TABLE `source_db`.`source_table` (`id` INT PRIMARY KEY)") - ddl.DispatcherID = common.NewDispatcherID() - ddl.Seq = 1 - ddl.Epoch = 2 - ddl.TiDBOnly = true - ddl.BDRMode = string(ast.BDRRolePrimary) - ddl.PostTxnFlushed = []func(){func() {}, func() {}} - - originalQuery := ddl.Query - originalTableInfo := ddl.TableInfo - routed, err := router.ApplyToDDLEvent(ddl) + singleCreateDDL := helper.DDL2Event("CREATE TABLE `source_db`.`source_table` (`id` INT PRIMARY KEY)") + singleCreateDDL.DispatcherID = common.NewDispatcherID() + singleCreateDDL.Seq = 1 + singleCreateDDL.Epoch = 2 + singleCreateDDL.TiDBOnly = true + singleCreateDDL.BDRMode = string(ast.BDRRolePrimary) + singleCreateDDL.PostTxnFlushed = []func(){func() {}, func() {}} + + originalQuery := singleCreateDDL.Query + originalTableInfo := singleCreateDDL.TableInfo + routedSingleCreate, err := router.ApplyToDDLEvent(singleCreateDDL) require.NoError(t, err) - require.NotSame(t, ddl, routed) - - require.Equal(t, ddl.Version, routed.Version) - require.Equal(t, ddl.DispatcherID, routed.DispatcherID) - require.Equal(t, ddl.Type, routed.Type) - require.Equal(t, ddl.SchemaID, routed.SchemaID) - require.Equal(t, ddl.SchemaName, routed.SchemaName) - require.Equal(t, ddl.TableName, routed.TableName) - require.Equal(t, ddl.FinishedTs, routed.FinishedTs) - require.Equal(t, ddl.Seq, routed.Seq) - require.Equal(t, ddl.Epoch, routed.Epoch) - require.Equal(t, ddl.TiDBOnly, routed.TiDBOnly) - require.Equal(t, ddl.BDRMode, routed.BDRMode) - - require.Equal(t, originalQuery, ddl.Query) - require.Same(t, originalTableInfo, ddl.TableInfo) - require.Equal(t, "source_db", ddl.GetTargetSchemaName()) - require.Equal(t, "source_table", ddl.GetTargetTableName()) - - require.Equal(t, "target_db", routed.GetTargetSchemaName()) - require.Equal(t, "source_table_r", routed.GetTargetTableName()) - require.Contains(t, routed.Query, "`target_db`.`source_table_r`") - require.NotSame(t, originalTableInfo, routed.TableInfo) - require.Equal(t, "target_db", routed.TableInfo.GetTargetSchemaName()) - require.Equal(t, "source_table_r", routed.TableInfo.GetTargetTableName()) - - require.Len(t, routed.PostTxnFlushed, 2) - require.Len(t, ddl.PostTxnFlushed, 2) - require.NotEqual(t, &ddl.PostTxnFlushed[0], &routed.PostTxnFlushed[0]) - routed.AddPostFlushFunc(func() {}) - require.Len(t, routed.PostTxnFlushed, 3) - require.Len(t, ddl.PostTxnFlushed, 2) -} - -func TestApplyToDDLEventRenameTablesGetEventsPreserveSourceAndTargetNames(t *testing.T) { - router := newTestRouter(t, false, []*config.DispatchRule{{ - Matcher: []string{"source_db.*"}, - TargetSchema: "target_db", - TargetTable: "{table}_r", - }}) - - helper := event.NewEventTestHelper(t) - defer helper.Close() - - schemaDDL := helper.DDL2Event("CREATE DATABASE `source_db`") - routedSchema, err := router.ApplyToDDLEvent(schemaDDL) - require.NoError(t, err) - require.Contains(t, routedSchema.Query, "`target_db`") - - table1DDL := helper.DDL2Event("CREATE TABLE `source_db`.`old_table1` (`id` INT PRIMARY KEY)") - routedTable1, err := router.ApplyToDDLEvent(table1DDL) - require.NoError(t, err) - require.Contains(t, routedTable1.Query, "`target_db`.`old_table1_r`") - - table2DDL := helper.DDL2Event("CREATE TABLE `source_db`.`old_table2` (`id` INT PRIMARY KEY)") - routedTable2, err := router.ApplyToDDLEvent(table2DDL) + require.NotSame(t, singleCreateDDL, routedSingleCreate) + require.Equal(t, singleCreateDDL.Version, routedSingleCreate.Version) + require.Equal(t, singleCreateDDL.DispatcherID, routedSingleCreate.DispatcherID) + require.Equal(t, singleCreateDDL.Type, routedSingleCreate.Type) + require.Equal(t, singleCreateDDL.SchemaID, routedSingleCreate.SchemaID) + require.Equal(t, singleCreateDDL.SchemaName, routedSingleCreate.SchemaName) + require.Equal(t, singleCreateDDL.TableName, routedSingleCreate.TableName) + require.Equal(t, singleCreateDDL.FinishedTs, routedSingleCreate.FinishedTs) + require.Equal(t, singleCreateDDL.Seq, routedSingleCreate.Seq) + require.Equal(t, singleCreateDDL.Epoch, routedSingleCreate.Epoch) + require.Equal(t, singleCreateDDL.TiDBOnly, routedSingleCreate.TiDBOnly) + require.Equal(t, singleCreateDDL.BDRMode, routedSingleCreate.BDRMode) + require.Equal(t, originalQuery, singleCreateDDL.Query) + require.Same(t, originalTableInfo, singleCreateDDL.TableInfo) + require.Equal(t, "source_db", singleCreateDDL.GetTargetSchemaName()) + require.Equal(t, "source_table", singleCreateDDL.GetTargetTableName()) + require.Equal(t, "target_db", routedSingleCreate.GetTargetSchemaName()) + require.Equal(t, "source_table_r", routedSingleCreate.GetTargetTableName()) + require.Contains(t, routedSingleCreate.Query, "`target_db`.`source_table_r`") + require.NotSame(t, originalTableInfo, routedSingleCreate.TableInfo) + require.Equal(t, "target_db", routedSingleCreate.TableInfo.GetTargetSchemaName()) + require.Equal(t, "source_table_r", routedSingleCreate.TableInfo.GetTargetTableName()) + require.Len(t, routedSingleCreate.PostTxnFlushed, 2) + require.Len(t, singleCreateDDL.PostTxnFlushed, 2) + require.NotEqual(t, &singleCreateDDL.PostTxnFlushed[0], &routedSingleCreate.PostTxnFlushed[0]) + routedSingleCreate.AddPostFlushFunc(func() {}) + require.Len(t, routedSingleCreate.PostTxnFlushed, 3) + require.Len(t, singleCreateDDL.PostTxnFlushed, 2) + + createTablesDDL := helper.BatchCreateTableDDLs2Event("source_db", + "CREATE TABLE `source_db`.`t1` (`id` INT PRIMARY KEY)", + "CREATE TABLE `source_db`.`t2` (`id` INT PRIMARY KEY)", + ) + routedCreateTables, err := router.ApplyToDDLEvent(createTablesDDL) require.NoError(t, err) - require.Contains(t, routedTable2.Query, "`target_db`.`old_table2_r`") + require.Contains(t, routedCreateTables.Query, "CREATE TABLE `target_db`.`t1_r`") + require.Contains(t, routedCreateTables.Query, "CREATE TABLE `target_db`.`t2_r`") + require.Len(t, createTablesDDL.MultipleTableInfos, 2) + require.Len(t, routedCreateTables.MultipleTableInfos, 2) + require.Equal(t, "source_db", createTablesDDL.MultipleTableInfos[0].GetTargetSchemaName()) + require.Equal(t, "t1", createTablesDDL.MultipleTableInfos[0].GetTargetTableName()) + require.Equal(t, "source_db", createTablesDDL.MultipleTableInfos[1].GetTargetSchemaName()) + require.Equal(t, "t2", createTablesDDL.MultipleTableInfos[1].GetTargetTableName()) + require.Equal(t, "target_db", routedCreateTables.MultipleTableInfos[0].GetTargetSchemaName()) + require.Equal(t, "t1_r", routedCreateTables.MultipleTableInfos[0].GetTargetTableName()) + require.Equal(t, "target_db", routedCreateTables.MultipleTableInfos[1].GetTargetSchemaName()) + require.Equal(t, "t2_r", routedCreateTables.MultipleTableInfos[1].GetTargetTableName()) renameDDL := helper.DDL2Event( - "RENAME TABLE `source_db`.`old_table1` TO `source_db`.`new_table1`, `source_db`.`old_table2` TO `source_db`.`new_table2`") - routed, err := router.ApplyToDDLEvent(renameDDL) + "RENAME TABLE `source_db`.`t1` TO `source_db`.`t1_new`, `source_db`.`t2` TO `source_db`.`t2_new`") + routedRename, err := router.ApplyToDDLEvent(renameDDL) require.NoError(t, err) - events := routed.GetEvents() + events := routedRename.GetEvents() require.Len(t, events, 2) require.Equal(t, "source_db", events[0].SchemaName) - require.Equal(t, "new_table1", events[0].TableName) + require.Equal(t, "t1_new", events[0].TableName) require.Equal(t, "target_db", events[0].GetTargetSchemaName()) - require.Equal(t, "new_table1_r", events[0].GetTargetTableName()) + require.Equal(t, "t1_new_r", events[0].GetTargetTableName()) require.Equal(t, "source_db", events[0].ExtraSchemaName) - require.Equal(t, "old_table1", events[0].ExtraTableName) + require.Equal(t, "t1", events[0].ExtraTableName) require.Equal(t, "target_db", events[0].GetTargetExtraSchemaName()) - require.Equal(t, "old_table1_r", events[0].GetTargetExtraTableName()) + require.Equal(t, "t1_r", events[0].GetTargetExtraTableName()) require.Equal(t, "source_db", events[1].SchemaName) - require.Equal(t, "new_table2", events[1].TableName) + require.Equal(t, "t2_new", events[1].TableName) require.Equal(t, "target_db", events[1].GetTargetSchemaName()) - require.Equal(t, "new_table2_r", events[1].GetTargetTableName()) + require.Equal(t, "t2_new_r", events[1].GetTargetTableName()) require.Equal(t, "source_db", events[1].ExtraSchemaName) - require.Equal(t, "old_table2", events[1].ExtraTableName) + require.Equal(t, "t2", events[1].ExtraTableName) require.Equal(t, "target_db", events[1].GetTargetExtraSchemaName()) - require.Equal(t, "old_table2_r", events[1].GetTargetExtraTableName()) + require.Equal(t, "t2_r", events[1].GetTargetExtraTableName()) } From 95730109fae358759393c94dd5c85ea73573fe7a Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 7 May 2026 23:11:14 +0800 Subject: [PATCH 04/12] adjust more code --- .../routing/router_supported_ddl_test.go | 26 -------------- pkg/common/event/ddl_event.go | 32 ++--------------- pkg/sink/sqlmodel/multi_row_test.go | 2 ++ tests/integration_tests/table_route/README.md | 35 ------------------- 4 files changed, 5 insertions(+), 90 deletions(-) delete mode 100644 tests/integration_tests/table_route/README.md diff --git a/downstreamadapter/routing/router_supported_ddl_test.go b/downstreamadapter/routing/router_supported_ddl_test.go index 90c18ad1a0..8742e3d817 100644 --- a/downstreamadapter/routing/router_supported_ddl_test.go +++ b/downstreamadapter/routing/router_supported_ddl_test.go @@ -1083,30 +1083,4 @@ func TestApplyToDDLEventRoutesDDLEventMetadata(t *testing.T) { require.Equal(t, "t1_r", routedCreateTables.MultipleTableInfos[0].GetTargetTableName()) require.Equal(t, "target_db", routedCreateTables.MultipleTableInfos[1].GetTargetSchemaName()) require.Equal(t, "t2_r", routedCreateTables.MultipleTableInfos[1].GetTargetTableName()) - - renameDDL := helper.DDL2Event( - "RENAME TABLE `source_db`.`t1` TO `source_db`.`t1_new`, `source_db`.`t2` TO `source_db`.`t2_new`") - routedRename, err := router.ApplyToDDLEvent(renameDDL) - require.NoError(t, err) - - events := routedRename.GetEvents() - require.Len(t, events, 2) - - require.Equal(t, "source_db", events[0].SchemaName) - require.Equal(t, "t1_new", events[0].TableName) - require.Equal(t, "target_db", events[0].GetTargetSchemaName()) - require.Equal(t, "t1_new_r", events[0].GetTargetTableName()) - require.Equal(t, "source_db", events[0].ExtraSchemaName) - require.Equal(t, "t1", events[0].ExtraTableName) - require.Equal(t, "target_db", events[0].GetTargetExtraSchemaName()) - require.Equal(t, "t1_r", events[0].GetTargetExtraTableName()) - - require.Equal(t, "source_db", events[1].SchemaName) - require.Equal(t, "t2_new", events[1].TableName) - require.Equal(t, "target_db", events[1].GetTargetSchemaName()) - require.Equal(t, "t2_new_r", events[1].GetTargetTableName()) - require.Equal(t, "source_db", events[1].ExtraSchemaName) - require.Equal(t, "t2", events[1].ExtraTableName) - require.Equal(t, "target_db", events[1].GetTargetExtraSchemaName()) - require.Equal(t, "t2_r", events[1].GetTargetExtraTableName()) } diff --git a/pkg/common/event/ddl_event.go b/pkg/common/event/ddl_event.go index d39c933154..599883be92 100644 --- a/pkg/common/event/ddl_event.go +++ b/pkg/common/event/ddl_event.go @@ -257,23 +257,19 @@ func (d *DDLEvent) GetTableID() int64 { func (d *DDLEvent) GetEvents() []*DDLEvent { // Some ddl event may be multi-events, we need to split it into multiple messages. // Such as rename table test.table1 to test.table10, test.table2 to test.table20 - actionType := model.ActionType(d.Type) - switch actionType { + switch model.ActionType(d.Type) { case model.ActionCreateTables, model.ActionRenameTables: events := make([]*DDLEvent, 0, len(d.MultipleTableInfos)) queries, err := SplitQueries(d.Query) if err != nil { log.Panic("split queries failed", zap.Error(err)) } - if actionType == model.ActionRenameTables && len(queries) == 1 && len(d.MultipleTableInfos) > 1 { - queries = splitRenameTablesQuery(queries[0]) - } if len(queries) != len(d.MultipleTableInfos) { log.Panic("queries length should be equal to multipleTableInfos length", zap.String("query", d.Query), zap.Any("multipleTableInfos", d.MultipleTableInfos)) } t := model.ActionCreateTable - if actionType == model.ActionRenameTables { + if model.ActionType(d.Type) == model.ActionRenameTables { t = model.ActionRenameTable } for i, info := range d.MultipleTableInfos { @@ -289,7 +285,7 @@ func (d *DDLEvent) GetEvents() []*DDLEvent { StartTs: d.StartTs, FinishedTs: d.FinishedTs, } - if actionType == model.ActionRenameTables { + if model.ActionType(d.Type) == model.ActionRenameTables { event.ExtraSchemaName = d.TableNameChange.DropName[i].SchemaName event.ExtraTableName = d.TableNameChange.DropName[i].TableName targetExtraSchemaName, targetExtraTableName := extractRenameTargetExtraFromQuery(queries[i]) @@ -304,28 +300,6 @@ func (d *DDLEvent) GetEvents() []*DDLEvent { return []*DDLEvent{d} } -func splitRenameTablesQuery(query string) []string { - stmt, err := parser.New().ParseOneStmt(query, "", "") - if err != nil { - log.Panic("parse rename tables query failed", zap.String("query", query), zap.Error(err)) - } - renameStmt, ok := stmt.(*ast.RenameTableStmt) - if !ok || len(renameStmt.TableToTables) == 0 { - log.Panic("unexpected rename tables query", zap.String("query", query), zap.Any("stmt", stmt)) - } - - queries := make([]string, 0, len(renameStmt.TableToTables)) - for _, tableToTable := range renameStmt.TableToTables { - singleStmt := &ast.RenameTableStmt{TableToTables: []*ast.TableToTable{tableToTable}} - restoredQuery, err := Restore(singleStmt) - if err != nil { - log.Panic("restore split rename query failed", zap.String("query", query), zap.Error(err)) - } - queries = append(queries, restoredQuery+";") - } - return queries -} - func extractRenameTargetExtraFromQuery(query string) (string, string) { stmt, err := parser.New().ParseOneStmt(query, "", "") if err != nil { diff --git a/pkg/sink/sqlmodel/multi_row_test.go b/pkg/sink/sqlmodel/multi_row_test.go index 4f61e12adf..f552d9cf86 100644 --- a/pkg/sink/sqlmodel/multi_row_test.go +++ b/pkg/sink/sqlmodel/multi_row_test.go @@ -42,6 +42,8 @@ func TestGenDeleteMultiRows(t *testing.T) { } func TestGenMultiRowSQLUsesRoutedTargetTable(t *testing.T) { + t.Parallel() + sourceTableInfo, routedTableInfo := mockRoutedTableInfo( t, "CREATE TABLE tb1 (id INT PRIMARY KEY, name INT)", diff --git a/tests/integration_tests/table_route/README.md b/tests/integration_tests/table_route/README.md deleted file mode 100644 index 5cccc7c874..0000000000 --- a/tests/integration_tests/table_route/README.md +++ /dev/null @@ -1,35 +0,0 @@ -# Table Route Integration Test - -This test verifies that table route works correctly for MySQL sinks. - -## What it tests - -1. **Schema rewrite**: Routes `source_db.*` to `target_db.*` -2. **Table rewrite**: Appends `_routed` suffix to table names -3. **DML output**: INSERT, UPDATE, DELETE operations use the routed schema/table -4. **DDL output**: CREATE DATABASE, CREATE TABLE, ALTER TABLE, RENAME TABLE, view DDL, partition DDL, TRUNCATE TABLE, DROP TABLE, and DROP DATABASE use the routed schema/table - -## Configuration - -The test uses the following routing rules in `conf/changefeed.toml`: - -```toml -[[sink.dispatchers]] -matcher = ['source_db.*'] -target-schema = 'target_db' -target-table = '{table}_routed' -``` - -This routes: -- `source_db.users` -> `target_db.users_routed` -- `source_db.orders` -> `target_db.orders_routed` -- etc. - -## Test flow - -1. Create changefeed with table route rules -2. Execute one SQL workload on the upstream source schema -3. Wait until the routed finish marker appears downstream -4. Use sync diff with table route rules to compare upstream source tables and downstream routed target tables -5. Verify dropped and renamed-away target tables do not remain downstream -6. Drop the upstream source database and verify the routed downstream database is gone From 0ca311213591b984924be11b8b9cd8906e4aa48b Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 7 May 2026 23:42:53 +0800 Subject: [PATCH 05/12] fix test, the ddl may does not have default schema name --- downstreamadapter/routing/ddl_query_rewriter.go | 17 +++++++++++++++-- downstreamadapter/routing/router_apply_test.go | 2 +- .../routing/router_supported_ddl_test.go | 6 +++++- .../integration_tests/table_route/data/test.sql | 5 +++-- 4 files changed, 24 insertions(+), 6 deletions(-) diff --git a/downstreamadapter/routing/ddl_query_rewriter.go b/downstreamadapter/routing/ddl_query_rewriter.go index 3fce06c0f6..9bab897cb2 100644 --- a/downstreamadapter/routing/ddl_query_rewriter.go +++ b/downstreamadapter/routing/ddl_query_rewriter.go @@ -41,7 +41,7 @@ func (r Router) rewriteParserBackedDDLQuery(ddl *commonEvent.DDLEvent) (string, ) for i := range queries { query := queries[i] - newQuery, err := r.rewriteSingleDDLQuery(query) + newQuery, err := r.rewriteSingleDDLQuery(query, ddl.GetSchemaName()) if err != nil { return "", err } @@ -78,7 +78,7 @@ func splitMultiStmtDDLQuery(query string) ([]string, error) { return queries, nil } -func (r Router) rewriteSingleDDLQuery(query string) (string, error) { +func (r Router) rewriteSingleDDLQuery(query string, defaultSchema string) (string, error) { p := parser.New() stmt, err := p.ParseOneStmt(query, "", "") if err != nil { @@ -89,6 +89,7 @@ func (r Router) rewriteSingleDDLQuery(query string) (string, error) { if len(sourceTables) == 0 { return query, nil } + fillDefaultSchema(sourceTables, defaultSchema) var ( routed bool @@ -119,6 +120,18 @@ func (r Router) rewriteSingleDDLQuery(query string) (string, error) { return newQuery, nil } +func fillDefaultSchema(tables []commonEvent.SchemaTableName, defaultSchema string) { + if defaultSchema == "" { + return + } + + for i := range tables { + if tables[i].SchemaName == "" && tables[i].TableName != "" { + tables[i].SchemaName = defaultSchema + } + } +} + // tableNameExtractor extracts table names from DDL AST nodes. // ref: https://github.com/pingcap/tidb/blob/09feccb529be2830944e11f5fed474020f50370f/server/sql_info_fetcher.go#L46 type tableNameExtractor struct { diff --git a/downstreamadapter/routing/router_apply_test.go b/downstreamadapter/routing/router_apply_test.go index de97c77c57..4dfce5cef0 100644 --- a/downstreamadapter/routing/router_apply_test.go +++ b/downstreamadapter/routing/router_apply_test.go @@ -742,7 +742,7 @@ func TestRewriteParserBackedDDLQueryError(t *testing.T) { TargetTable: TablePlaceholder, }}) - _, err := router.rewriteSingleDDLQuery("INVALID SQL !!!") + _, err := router.rewriteSingleDDLQuery("INVALID SQL !!!", "") code, ok := errors.RFCCode(err) require.True(t, ok) require.Equal(t, errors.ErrTableRoutingFailed.RFCCode(), code) diff --git a/downstreamadapter/routing/router_supported_ddl_test.go b/downstreamadapter/routing/router_supported_ddl_test.go index 8742e3d817..e3f5765aca 100644 --- a/downstreamadapter/routing/router_supported_ddl_test.go +++ b/downstreamadapter/routing/router_supported_ddl_test.go @@ -1024,7 +1024,8 @@ func TestApplyToDDLEventRoutesDDLEventMetadata(t *testing.T) { require.NoError(t, err) require.Contains(t, routedSchema.Query, "`target_db`") - singleCreateDDL := helper.DDL2Event("CREATE TABLE `source_db`.`source_table` (`id` INT PRIMARY KEY)") + helper.Tk().MustExec("USE `source_db`") + singleCreateDDL := helper.DDL2Event("CREATE TABLE `source_table` (`id` INT PRIMARY KEY)") singleCreateDDL.DispatcherID = common.NewDispatcherID() singleCreateDDL.Seq = 1 singleCreateDDL.Epoch = 2 @@ -1034,6 +1035,8 @@ func TestApplyToDDLEventRoutesDDLEventMetadata(t *testing.T) { originalQuery := singleCreateDDL.Query originalTableInfo := singleCreateDDL.TableInfo + require.Equal(t, "source_db", singleCreateDDL.SchemaName) + require.NotContains(t, originalQuery, "`source_db`") routedSingleCreate, err := router.ApplyToDDLEvent(singleCreateDDL) require.NoError(t, err) require.NotSame(t, singleCreateDDL, routedSingleCreate) @@ -1055,6 +1058,7 @@ func TestApplyToDDLEventRoutesDDLEventMetadata(t *testing.T) { require.Equal(t, "target_db", routedSingleCreate.GetTargetSchemaName()) require.Equal(t, "source_table_r", routedSingleCreate.GetTargetTableName()) require.Contains(t, routedSingleCreate.Query, "`target_db`.`source_table_r`") + require.NotContains(t, routedSingleCreate.Query, "`source_db`") require.NotSame(t, originalTableInfo, routedSingleCreate.TableInfo) require.Equal(t, "target_db", routedSingleCreate.TableInfo.GetTargetSchemaName()) require.Equal(t, "source_table_r", routedSingleCreate.TableInfo.GetTargetTableName()) diff --git a/tests/integration_tests/table_route/data/test.sql b/tests/integration_tests/table_route/data/test.sql index 4491f768d1..7432add2e4 100644 --- a/tests/integration_tests/table_route/data/test.sql +++ b/tests/integration_tests/table_route/data/test.sql @@ -128,9 +128,10 @@ DROP VIEW `source_db`.`transient_view`; -- DDL: PARTITION TABLE -- ============================================ CREATE TABLE partitioned_events ( - id INT PRIMARY KEY, + id INT, bucket INT NOT NULL, - value VARCHAR(50) + value VARCHAR(50), + PRIMARY KEY (id, bucket) ) PARTITION BY RANGE (bucket) ( PARTITION p0 VALUES LESS THAN (10), PARTITION p1 VALUES LESS THAN (20) From d641c7f16b0bc03d4ffa39dade66b18822a3e04b Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 7 May 2026 23:47:50 +0800 Subject: [PATCH 06/12] fix integration test --- tests/integration_tests/table_route/run.sh | 44 +++++++++++----------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/tests/integration_tests/table_route/run.sh b/tests/integration_tests/table_route/run.sh index 8f2b6a597f..3859f72916 100755 --- a/tests/integration_tests/table_route/run.sh +++ b/tests/integration_tests/table_route/run.sh @@ -3,49 +3,49 @@ set -eu CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -source $CUR/../_utils/test_prepare -WORK_DIR=$OUT_DIR/$TEST_NAME +source "$CUR/../_utils/test_prepare" +WORK_DIR="$OUT_DIR/$TEST_NAME" CDC_BINARY=cdc.test -SINK_TYPE=$1 +SINK_TYPE="$1" function run() { if [ "$SINK_TYPE" != "mysql" ]; then return fi - rm -rf $WORK_DIR && mkdir -p $WORK_DIR + rm -rf "$WORK_DIR" && mkdir -p "$WORK_DIR" - start_tidb_cluster --workdir $WORK_DIR + start_tidb_cluster --workdir "$WORK_DIR" - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --cluster-id "$KEYSPACE_NAME" + run_cdc_server --workdir "$WORK_DIR" --binary "$CDC_BINARY" --cluster-id "$KEYSPACE_NAME" SINK_URI="mysql://normal:123456@${DOWN_TIDB_HOST}:${DOWN_TIDB_PORT}/" cdc_cli_changefeed create --sink-uri="$SINK_URI" --config="$CUR/conf/changefeed.toml" - run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql_file "$CUR/data/test.sql" "$UP_TIDB_HOST" "$UP_TIDB_PORT" - check_table_exists target_db.finish_mark_routed ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 - check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 120 + check_table_exists target_db.finish_mark_routed "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" 90 + check_sync_diff "$WORK_DIR" "$CUR/conf/diff_config.toml" 120 - check_table_not_exists source_db.users ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - check_table_not_exists source_db.orders ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - check_table_not_exists target_db.temp_table_routed ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - check_table_not_exists target_db.multi_rename_a_routed ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - check_table_not_exists target_db.multi_rename_b_routed ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - check_table_not_exists target_db.to_be_dropped_routed ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - run_sql "SHOW CREATE VIEW target_db.user_order_view_routed" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_not_exists source_db.users "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" + check_table_not_exists source_db.orders "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" + check_table_not_exists target_db.temp_table_routed "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" + check_table_not_exists target_db.multi_rename_a_routed "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" + check_table_not_exists target_db.multi_rename_b_routed "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" + check_table_not_exists target_db.to_be_dropped_routed "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" + run_sql "SHOW CREATE VIEW target_db.user_order_view_routed" "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" check_contains "user_order_view_routed" check_contains "users_routed" check_contains "orders_routed" - check_table_not_exists target_db.transient_view_routed ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_not_exists target_db.transient_view_routed "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" - run_sql "DROP DATABASE source_db" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - check_db_not_exists target_db ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 + run_sql "DROP DATABASE source_db" "$UP_TIDB_HOST" "$UP_TIDB_PORT" + check_db_not_exists target_db "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" 90 - cleanup_process $CDC_BINARY + cleanup_process "$CDC_BINARY" } -trap 'stop_test $WORK_DIR' EXIT +trap 'stop_test "$WORK_DIR"' EXIT run "$@" -check_logs $WORK_DIR +check_logs "$WORK_DIR" echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" From 3a85d403dea3403c98ac098231fb7b1800bf1a0d Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 7 May 2026 23:54:53 +0800 Subject: [PATCH 07/12] add cross db tests --- .../table_route/conf/changefeed.toml | 7 ++++++- .../table_route/conf/diff_config.toml | 16 ++++++++++++++++ .../table_route/data/test.sql | 18 ++++++++++++++++++ tests/integration_tests/table_route/run.sh | 4 ++++ 4 files changed, 44 insertions(+), 1 deletion(-) diff --git a/tests/integration_tests/table_route/conf/changefeed.toml b/tests/integration_tests/table_route/conf/changefeed.toml index 9b0258e9aa..0e9f9193b9 100644 --- a/tests/integration_tests/table_route/conf/changefeed.toml +++ b/tests/integration_tests/table_route/conf/changefeed.toml @@ -2,7 +2,7 @@ # This tests schema and table routing for MySQL sinks [filter] -rules = ['source_db.*'] +rules = ['source_db.*', 'source_extra_db.*'] [sink] # Dispatch rules with schema and table routing @@ -11,3 +11,8 @@ rules = ['source_db.*'] matcher = ['source_db.*'] target-schema = 'target_db' target-table = '{table}_routed' + +[[sink.dispatchers]] +matcher = ['source_extra_db.*'] +target-schema = 'target_extra_db' +target-table = '{table}_routed' diff --git a/tests/integration_tests/table_route/conf/diff_config.toml b/tests/integration_tests/table_route/conf/diff_config.toml index 4734113499..4a4afde771 100644 --- a/tests/integration_tests/table_route/conf/diff_config.toml +++ b/tests/integration_tests/table_route/conf/diff_config.toml @@ -24,6 +24,8 @@ check-struct-only = false "target_db.partitioned_events_routed", "target_db.truncate_test_routed", "target_db.finish_mark_routed", + "target_extra_db.external_users_routed", + "target_extra_db.cross_move_target_routed", ] [data-sources] @@ -43,6 +45,8 @@ check-struct-only = false "partitioned_events", "truncate_test", "finish_mark", + "external_users", + "cross_move_target", ] [data-sources.mysql1] @@ -110,3 +114,15 @@ schema-pattern = "source_db" table-pattern = "finish_mark" target-schema = "target_db" target-table = "finish_mark_routed" + +[routes.external_users] +schema-pattern = "source_extra_db" +table-pattern = "external_users" +target-schema = "target_extra_db" +target-table = "external_users_routed" + +[routes.cross_move_target] +schema-pattern = "source_extra_db" +table-pattern = "cross_move_target" +target-schema = "target_extra_db" +target-table = "cross_move_target_routed" diff --git a/tests/integration_tests/table_route/data/test.sql b/tests/integration_tests/table_route/data/test.sql index 7432add2e4..247d1b0bf4 100644 --- a/tests/integration_tests/table_route/data/test.sql +++ b/tests/integration_tests/table_route/data/test.sql @@ -1,6 +1,8 @@ -- Test mixed DDL and DML operations for table route. DROP DATABASE IF EXISTS source_db; +DROP DATABASE IF EXISTS source_extra_db; CREATE DATABASE source_db; +CREATE DATABASE source_extra_db; USE source_db; -- ============================================ @@ -77,6 +79,22 @@ ALTER TABLE users DROP COLUMN created_at; -- ============================================ ALTER TABLE orders ADD INDEX idx_user_id (user_id); +-- ============================================ +-- DDL: CROSS DATABASE +-- ============================================ +CREATE TABLE `source_extra_db`.`external_users` LIKE `source_db`.`users`; +INSERT INTO `source_extra_db`.`external_users` + SELECT `id`, `name`, `email` FROM `source_db`.`users` WHERE `id` <= 2; +UPDATE `source_extra_db`.`external_users` SET `email` = 'external_alice@example.com' WHERE `id` = 1; + +CREATE TABLE `source_db`.`cross_move_source` ( + id INT PRIMARY KEY, + value VARCHAR(50) +); +INSERT INTO `source_db`.`cross_move_source` VALUES (1, 'move_source'); +RENAME TABLE `source_db`.`cross_move_source` TO `source_extra_db`.`cross_move_target`; +INSERT INTO `source_extra_db`.`cross_move_target` VALUES (2, 'move_target'); + -- ============================================ -- DDL: RENAME TABLE -- ============================================ diff --git a/tests/integration_tests/table_route/run.sh b/tests/integration_tests/table_route/run.sh index 3859f72916..3775e0c78b 100755 --- a/tests/integration_tests/table_route/run.sh +++ b/tests/integration_tests/table_route/run.sh @@ -29,7 +29,9 @@ function run() { check_table_not_exists source_db.users "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" check_table_not_exists source_db.orders "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" + check_table_not_exists source_extra_db.external_users "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" check_table_not_exists target_db.temp_table_routed "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" + check_table_not_exists target_db.cross_move_source_routed "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" check_table_not_exists target_db.multi_rename_a_routed "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" check_table_not_exists target_db.multi_rename_b_routed "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" check_table_not_exists target_db.to_be_dropped_routed "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" @@ -39,7 +41,9 @@ function run() { check_contains "orders_routed" check_table_not_exists target_db.transient_view_routed "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" + run_sql "DROP DATABASE source_extra_db" "$UP_TIDB_HOST" "$UP_TIDB_PORT" run_sql "DROP DATABASE source_db" "$UP_TIDB_HOST" "$UP_TIDB_PORT" + check_db_not_exists target_extra_db "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" 90 check_db_not_exists target_db "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" 90 cleanup_process "$CDC_BINARY" From 6fd9d544608723c0d1b9d93217e3a55f86339d74 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 8 May 2026 15:18:54 +0800 Subject: [PATCH 08/12] ignore unrelated DDL to the dispatcher --- .../eventcollector/dispatcher_stat.go | 33 ++- .../eventcollector/dispatcher_stat_test.go | 212 ++++++++++++++---- 2 files changed, 202 insertions(+), 43 deletions(-) diff --git a/downstreamadapter/eventcollector/dispatcher_stat.go b/downstreamadapter/eventcollector/dispatcher_stat.go index 5960cfc314..0281d66d35 100644 --- a/downstreamadapter/eventcollector/dispatcher_stat.go +++ b/downstreamadapter/eventcollector/dispatcher_stat.go @@ -550,15 +550,40 @@ func (d *dispatcherStat) handleSingleDataEvents(events []dispatcher.DispatcherEv return false } events[0].Event = ddl - d.tableInfoVersion.Store(ddl.FinishedTs) - if ddl.TableInfo != nil { - d.tableInfo.Store(ddl.TableInfo) - } + d.updateTableInfoByDDL(ddl) } d.updateCommitTsStateByEvents(state, events) return d.target.HandleEvents(events, func() { d.wake() }) } +func (d *dispatcherStat) updateTableInfoByDDL(ddl *commonEvent.DDLEvent) { + if ddl.TableInfo == nil { + return + } + + tableSpan := d.target.GetTableSpan() + if tableSpan == nil || tableSpan.TableID == common.DDLSpanTableID { + return + } + + // A table dispatcher can receive DDLs unrelated to its own table for barrier + // coordination, for example CREATE VIEW is tracked in every table's DDL history. + // The cached table info is used to assemble subsequent DML rows. For partition + // tables, the dispatcher span ID is a physical partition ID while TableInfo + // carries the logical table ID, so compare with the cached table info first. + expectedTableID := tableSpan.TableID + current := d.tableInfo.Load() + if current != nil { + expectedTableID = current.(*common.TableInfo).TableName.TableID + } + if ddl.TableInfo.TableName.TableID != expectedTableID { + return + } + + d.tableInfoVersion.Store(ddl.FinishedTs) + d.tableInfo.Store(ddl.TableInfo) +} + func (d *dispatcherStat) handleDataEvents(events ...dispatcher.DispatcherEvent) bool { switch events[0].GetType() { case commonEvent.TypeDMLEvent, diff --git a/downstreamadapter/eventcollector/dispatcher_stat_test.go b/downstreamadapter/eventcollector/dispatcher_stat_test.go index 372e116cf9..63830a05ce 100644 --- a/downstreamadapter/eventcollector/dispatcher_stat_test.go +++ b/downstreamadapter/eventcollector/dispatcher_stat_test.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" @@ -1512,52 +1513,185 @@ func TestHandleDDLEventTableInfoUpdate(t *testing.T) { localServerID := node.ID("local") remoteServerID := node.ID("remote") - t.Run("stores ddl table info", func(t *testing.T) { - var capturedEvent *commonEvent.DDLEvent - mockDisp := newMockDispatcher(common.NewDispatcherID(), 0) - mockDisp.handleEvents = func(events []dispatcher.DispatcherEvent, wakeCallback func()) bool { - if len(events) > 0 { - capturedEvent = events[0].Event.(*commonEvent.DDLEvent) - } - return false + var capturedEvent *commonEvent.DDLEvent + mockDisp := newMockDispatcher(common.NewDispatcherID(), 0) + mockDisp.handleEvents = func(events []dispatcher.DispatcherEvent, wakeCallback func()) bool { + if len(events) > 0 { + capturedEvent = events[0].Event.(*commonEvent.DDLEvent) + } + return false + } + + stat := newDispatcherStat(mockDisp, newTestEventCollector(localServerID), nil) + stat.connState.setEventServiceID(remoteServerID) + stat.currentEpoch.Store(newDispatcherEpochState(10, 1, stat.target.GetStartTs())) + stat.lastEventCommitTs.Store(50) + + tableInfo := &common.TableInfo{ + TableName: common.TableName{ + Schema: "source_db", + Table: "users", + TableID: 1, + }, + } + + ddlEvent := &commonEvent.DDLEvent{ + Version: commonEvent.DDLEventVersion1, + Query: "ALTER TABLE `source_db`.`users` ADD COLUMN `c1` INT", + FinishedTs: 100, + Epoch: 10, + Seq: 2, + TableInfo: tableInfo, + } + + events := []dispatcher.DispatcherEvent{ + {From: &remoteServerID, Event: ddlEvent}, + } + + stat.handleDataEvents(events...) + + storedTableInfo := stat.tableInfo.Load().(*common.TableInfo) + require.NotNil(t, storedTableInfo) + require.Same(t, tableInfo, storedTableInfo) + require.Equal(t, "source_db", storedTableInfo.TableName.Schema) + require.Equal(t, "users", storedTableInfo.TableName.Table) + require.Equal(t, int64(1), storedTableInfo.TableName.TableID) + require.Equal(t, uint64(100), stat.tableInfoVersion.Load()) + require.NotNil(t, capturedEvent) + require.Same(t, ddlEvent, capturedEvent) +} + +func TestHandleDDLEventDoesNotOverwriteTableInfoForAnotherTable(t *testing.T) { + t.Parallel() + + localServerID := node.ID("local") + remoteServerID := node.ID("remote") + + var capturedEvent *commonEvent.DDLEvent + mockDisp := newMockDispatcher(common.NewDispatcherID(), 0) + mockDisp.handleEvents = func(events []dispatcher.DispatcherEvent, wakeCallback func()) bool { + if len(events) > 0 { + capturedEvent = events[0].Event.(*commonEvent.DDLEvent) } + return false + } - stat := newDispatcherStat(mockDisp, newTestEventCollector(localServerID), nil) - stat.connState.setEventServiceID(remoteServerID) - stat.currentEpoch.Store(newDispatcherEpochState(10, 1, stat.target.GetStartTs())) - stat.lastEventCommitTs.Store(50) + router, err := routing.NewRouter(mockChangefeedID, false, []*config.DispatchRule{ + { + Matcher: []string{"source_db.*"}, + TargetSchema: "target_db", + TargetTable: "{table}_routed", + }, + }) + require.NoError(t, err) + mockDisp.router = router - tableInfo := &common.TableInfo{ + stat := newDispatcherStat(mockDisp, newTestEventCollector(localServerID), nil) + stat.connState.setEventServiceID(remoteServerID) + stat.currentEpoch.Store(newDispatcherEpochState(10, 1, stat.target.GetStartTs())) + stat.lastEventCommitTs.Store(150) + + originalTableInfo := &common.TableInfo{ + TableName: common.TableName{ + Schema: "source_db", + Table: "products", + TableID: 1, + TargetSchema: "target_db", + TargetTable: "products_routed", + }, + UpdateTS: 100, + } + stat.tableInfo.Store(originalTableInfo) + stat.tableInfoVersion.Store(100) + + ddlEvent := &commonEvent.DDLEvent{ + Version: commonEvent.DDLEventVersion1, + Type: byte(model.ActionCreateView), + SchemaName: "source_db", + TableName: "transient_view", + Query: "CREATE VIEW `source_db`.`transient_view` AS SELECT `id` FROM `source_db`.`users`", + StartTs: 199, + FinishedTs: 200, + Epoch: 10, + Seq: 2, + TableInfo: &common.TableInfo{ TableName: common.TableName{ Schema: "source_db", - Table: "users", - TableID: 1, + Table: "transient_view", + TableID: 2, }, - } + UpdateTS: 200, + }, + } - ddlEvent := &commonEvent.DDLEvent{ - Version: commonEvent.DDLEventVersion1, - Query: "ALTER TABLE `source_db`.`users` ADD COLUMN `c1` INT", - FinishedTs: 100, - Epoch: 10, - Seq: 2, - TableInfo: tableInfo, - } + events := []dispatcher.DispatcherEvent{ + {From: &remoteServerID, Event: ddlEvent}, + } - events := []dispatcher.DispatcherEvent{ - {From: &remoteServerID, Event: ddlEvent}, - } + stat.handleDataEvents(events...) - stat.handleDataEvents(events...) - - storedTableInfo := stat.tableInfo.Load().(*common.TableInfo) - require.NotNil(t, storedTableInfo) - require.Same(t, tableInfo, storedTableInfo) - require.Equal(t, "source_db", storedTableInfo.TableName.Schema) - require.Equal(t, "users", storedTableInfo.TableName.Table) - require.Equal(t, int64(1), storedTableInfo.TableName.TableID) - require.Equal(t, uint64(100), stat.tableInfoVersion.Load()) - require.NotNil(t, capturedEvent) - require.Same(t, ddlEvent, capturedEvent) - }) + storedTableInfo := stat.tableInfo.Load().(*common.TableInfo) + require.Same(t, originalTableInfo, storedTableInfo) + require.Equal(t, uint64(100), stat.tableInfoVersion.Load()) + require.NotNil(t, capturedEvent) + require.NotSame(t, ddlEvent, capturedEvent) + require.Equal(t, "target_db", capturedEvent.TableInfo.GetTargetSchemaName()) + require.Equal(t, "transient_view_routed", capturedEvent.TableInfo.GetTargetTableName()) +} + +func TestHandleDDLEventUpdatesPartitionLogicalTableInfo(t *testing.T) { + t.Parallel() + + localServerID := node.ID("local") + remoteServerID := node.ID("remote") + + mockDisp := newMockDispatcher(common.NewDispatcherID(), 0) + stat := newDispatcherStat(mockDisp, newTestEventCollector(localServerID), nil) + stat.connState.setEventServiceID(remoteServerID) + stat.currentEpoch.Store(newDispatcherEpochState(10, 1, stat.target.GetStartTs())) + stat.lastEventCommitTs.Store(150) + + originalTableInfo := &common.TableInfo{ + TableName: common.TableName{ + Schema: "source_db", + Table: "partitioned_events", + TableID: 10, + IsPartition: true, + }, + UpdateTS: 100, + } + stat.tableInfo.Store(originalTableInfo) + stat.tableInfoVersion.Store(100) + + updatedTableInfo := &common.TableInfo{ + TableName: common.TableName{ + Schema: "source_db", + Table: "partitioned_events", + TableID: 10, + IsPartition: true, + }, + UpdateTS: 200, + } + ddlEvent := &commonEvent.DDLEvent{ + Version: commonEvent.DDLEventVersion1, + Type: byte(model.ActionAddTablePartition), + SchemaName: "source_db", + TableName: "partitioned_events", + Query: "ALTER TABLE `source_db`.`partitioned_events` ADD PARTITION (PARTITION p1 VALUES LESS THAN (100))", + StartTs: 199, + FinishedTs: 200, + Epoch: 10, + Seq: 2, + TableInfo: updatedTableInfo, + } + + events := []dispatcher.DispatcherEvent{ + {From: &remoteServerID, Event: ddlEvent}, + } + + stat.handleDataEvents(events...) + + storedTableInfo := stat.tableInfo.Load().(*common.TableInfo) + require.Same(t, updatedTableInfo, storedTableInfo) + require.Equal(t, uint64(200), stat.tableInfoVersion.Load()) } From 2115c0662b644dec3e655fbb6c3399e838c4add2 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 8 May 2026 15:27:19 +0800 Subject: [PATCH 09/12] update tests --- .../eventcollector/dispatcher_stat_test.go | 197 +++--------------- 1 file changed, 29 insertions(+), 168 deletions(-) diff --git a/downstreamadapter/eventcollector/dispatcher_stat_test.go b/downstreamadapter/eventcollector/dispatcher_stat_test.go index 63830a05ce..235d1050e8 100644 --- a/downstreamadapter/eventcollector/dispatcher_stat_test.go +++ b/downstreamadapter/eventcollector/dispatcher_stat_test.go @@ -28,7 +28,6 @@ import ( "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/node" - "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" @@ -46,6 +45,7 @@ type mockDispatcher struct { handleError func(err error) events []dispatcher.DispatcherEvent checkPointTs uint64 + tableSpan *heartbeatpb.TableSpan skipSyncpointAtStartTs bool router routing.Router @@ -77,6 +77,9 @@ func (m *mockDispatcher) GetChangefeedID() common.ChangeFeedID { } func (m *mockDispatcher) GetTableSpan() *heartbeatpb.TableSpan { + if m.tableSpan != nil { + return m.tableSpan + } return &heartbeatpb.TableSpan{ TableID: 1, } @@ -1508,17 +1511,19 @@ func TestRegisterTo(t *testing.T) { } func TestHandleDDLEventTableInfoUpdate(t *testing.T) { - t.Parallel() + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + helper.Tk().MustExec("use test") + + tableDDL := helper.DDL2Event("CREATE TABLE `products` (`id` INT PRIMARY KEY)") + viewDDL := helper.DDL2Event("CREATE VIEW `transient_view` AS SELECT 1 AS `id`") localServerID := node.ID("local") remoteServerID := node.ID("remote") - var capturedEvent *commonEvent.DDLEvent mockDisp := newMockDispatcher(common.NewDispatcherID(), 0) + mockDisp.tableSpan = &heartbeatpb.TableSpan{TableID: tableDDL.TableInfo.TableName.TableID} mockDisp.handleEvents = func(events []dispatcher.DispatcherEvent, wakeCallback func()) bool { - if len(events) > 0 { - capturedEvent = events[0].Event.(*commonEvent.DDLEvent) - } return false } @@ -1527,171 +1532,27 @@ func TestHandleDDLEventTableInfoUpdate(t *testing.T) { stat.currentEpoch.Store(newDispatcherEpochState(10, 1, stat.target.GetStartTs())) stat.lastEventCommitTs.Store(50) - tableInfo := &common.TableInfo{ - TableName: common.TableName{ - Schema: "source_db", - Table: "users", - TableID: 1, - }, - } - - ddlEvent := &commonEvent.DDLEvent{ - Version: commonEvent.DDLEventVersion1, - Query: "ALTER TABLE `source_db`.`users` ADD COLUMN `c1` INT", - FinishedTs: 100, - Epoch: 10, - Seq: 2, - TableInfo: tableInfo, - } - - events := []dispatcher.DispatcherEvent{ - {From: &remoteServerID, Event: ddlEvent}, - } - - stat.handleDataEvents(events...) + tableDDL.Epoch = 10 + tableDDL.Seq = 2 + stat.handleDataEvents(dispatcher.DispatcherEvent{From: &remoteServerID, Event: tableDDL}) storedTableInfo := stat.tableInfo.Load().(*common.TableInfo) require.NotNil(t, storedTableInfo) - require.Same(t, tableInfo, storedTableInfo) - require.Equal(t, "source_db", storedTableInfo.TableName.Schema) - require.Equal(t, "users", storedTableInfo.TableName.Table) - require.Equal(t, int64(1), storedTableInfo.TableName.TableID) - require.Equal(t, uint64(100), stat.tableInfoVersion.Load()) - require.NotNil(t, capturedEvent) - require.Same(t, ddlEvent, capturedEvent) -} - -func TestHandleDDLEventDoesNotOverwriteTableInfoForAnotherTable(t *testing.T) { - t.Parallel() - - localServerID := node.ID("local") - remoteServerID := node.ID("remote") - - var capturedEvent *commonEvent.DDLEvent - mockDisp := newMockDispatcher(common.NewDispatcherID(), 0) - mockDisp.handleEvents = func(events []dispatcher.DispatcherEvent, wakeCallback func()) bool { - if len(events) > 0 { - capturedEvent = events[0].Event.(*commonEvent.DDLEvent) - } - return false - } - - router, err := routing.NewRouter(mockChangefeedID, false, []*config.DispatchRule{ - { - Matcher: []string{"source_db.*"}, - TargetSchema: "target_db", - TargetTable: "{table}_routed", - }, - }) - require.NoError(t, err) - mockDisp.router = router - - stat := newDispatcherStat(mockDisp, newTestEventCollector(localServerID), nil) - stat.connState.setEventServiceID(remoteServerID) - stat.currentEpoch.Store(newDispatcherEpochState(10, 1, stat.target.GetStartTs())) - stat.lastEventCommitTs.Store(150) - - originalTableInfo := &common.TableInfo{ - TableName: common.TableName{ - Schema: "source_db", - Table: "products", - TableID: 1, - TargetSchema: "target_db", - TargetTable: "products_routed", - }, - UpdateTS: 100, - } - stat.tableInfo.Store(originalTableInfo) - stat.tableInfoVersion.Store(100) - - ddlEvent := &commonEvent.DDLEvent{ - Version: commonEvent.DDLEventVersion1, - Type: byte(model.ActionCreateView), - SchemaName: "source_db", - TableName: "transient_view", - Query: "CREATE VIEW `source_db`.`transient_view` AS SELECT `id` FROM `source_db`.`users`", - StartTs: 199, - FinishedTs: 200, - Epoch: 10, - Seq: 2, - TableInfo: &common.TableInfo{ - TableName: common.TableName{ - Schema: "source_db", - Table: "transient_view", - TableID: 2, - }, - UpdateTS: 200, - }, - } - - events := []dispatcher.DispatcherEvent{ - {From: &remoteServerID, Event: ddlEvent}, - } - - stat.handleDataEvents(events...) - - storedTableInfo := stat.tableInfo.Load().(*common.TableInfo) - require.Same(t, originalTableInfo, storedTableInfo) - require.Equal(t, uint64(100), stat.tableInfoVersion.Load()) - require.NotNil(t, capturedEvent) - require.NotSame(t, ddlEvent, capturedEvent) - require.Equal(t, "target_db", capturedEvent.TableInfo.GetTargetSchemaName()) - require.Equal(t, "transient_view_routed", capturedEvent.TableInfo.GetTargetTableName()) -} - -func TestHandleDDLEventUpdatesPartitionLogicalTableInfo(t *testing.T) { - t.Parallel() - - localServerID := node.ID("local") - remoteServerID := node.ID("remote") - - mockDisp := newMockDispatcher(common.NewDispatcherID(), 0) - stat := newDispatcherStat(mockDisp, newTestEventCollector(localServerID), nil) - stat.connState.setEventServiceID(remoteServerID) - stat.currentEpoch.Store(newDispatcherEpochState(10, 1, stat.target.GetStartTs())) - stat.lastEventCommitTs.Store(150) - - originalTableInfo := &common.TableInfo{ - TableName: common.TableName{ - Schema: "source_db", - Table: "partitioned_events", - TableID: 10, - IsPartition: true, - }, - UpdateTS: 100, - } - stat.tableInfo.Store(originalTableInfo) - stat.tableInfoVersion.Store(100) - - updatedTableInfo := &common.TableInfo{ - TableName: common.TableName{ - Schema: "source_db", - Table: "partitioned_events", - TableID: 10, - IsPartition: true, - }, - UpdateTS: 200, - } - ddlEvent := &commonEvent.DDLEvent{ - Version: commonEvent.DDLEventVersion1, - Type: byte(model.ActionAddTablePartition), - SchemaName: "source_db", - TableName: "partitioned_events", - Query: "ALTER TABLE `source_db`.`partitioned_events` ADD PARTITION (PARTITION p1 VALUES LESS THAN (100))", - StartTs: 199, - FinishedTs: 200, - Epoch: 10, - Seq: 2, - TableInfo: updatedTableInfo, - } - - events := []dispatcher.DispatcherEvent{ - {From: &remoteServerID, Event: ddlEvent}, - } + require.Same(t, tableDDL.TableInfo, storedTableInfo) + require.Equal(t, "test", storedTableInfo.TableName.Schema) + require.Equal(t, "products", storedTableInfo.TableName.Table) + require.Equal(t, tableDDL.TableInfo.TableName.TableID, storedTableInfo.TableName.TableID) + require.Equal(t, tableDDL.FinishedTs, stat.tableInfoVersion.Load()) + require.Len(t, mockDisp.events, 1) + require.Same(t, tableDDL, mockDisp.events[0].Event) - stat.handleDataEvents(events...) + viewDDL.Epoch = 10 + viewDDL.Seq = 3 + stat.handleDataEvents(dispatcher.DispatcherEvent{From: &remoteServerID, Event: viewDDL}) - storedTableInfo := stat.tableInfo.Load().(*common.TableInfo) - require.Same(t, updatedTableInfo, storedTableInfo) - require.Equal(t, uint64(200), stat.tableInfoVersion.Load()) + storedTableInfo = stat.tableInfo.Load().(*common.TableInfo) + require.Same(t, tableDDL.TableInfo, storedTableInfo) + require.Equal(t, tableDDL.FinishedTs, stat.tableInfoVersion.Load()) + require.Len(t, mockDisp.events, 2) + require.Same(t, viewDDL, mockDisp.events[1].Event) } From 3f2737ecbe9eee1a32aec26d77f6d2de47936e74 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 8 May 2026 15:40:51 +0800 Subject: [PATCH 10/12] update comments --- downstreamadapter/eventcollector/dispatcher_stat.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/downstreamadapter/eventcollector/dispatcher_stat.go b/downstreamadapter/eventcollector/dispatcher_stat.go index 4b284232ab..b0ef145303 100644 --- a/downstreamadapter/eventcollector/dispatcher_stat.go +++ b/downstreamadapter/eventcollector/dispatcher_stat.go @@ -71,7 +71,7 @@ type dispatcherStat struct { // tableInfo is the latest table info of the dispatcher's corresponding table. tableInfo atomic.Value // tableInfoVersion is the latest table info version of the dispatcher's corresponding table. - // It is updated by ddl event + // It should be updated together with tableInfo, because following DMLs use them as one schema snapshot. tableInfoVersion atomic.Uint64 } From b90a0aaf8f82ffa0a24e7411ee0c86acf9f51357 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 8 May 2026 16:56:08 +0800 Subject: [PATCH 11/12] update code --- .../eventcollector/dispatcher_stat.go | 19 +++++++++++++------ .../eventcollector/dispatcher_stat_test.go | 2 +- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/downstreamadapter/eventcollector/dispatcher_stat.go b/downstreamadapter/eventcollector/dispatcher_stat.go index b0ef145303..9fcbc94bb2 100644 --- a/downstreamadapter/eventcollector/dispatcher_stat.go +++ b/downstreamadapter/eventcollector/dispatcher_stat.go @@ -70,8 +70,8 @@ type dispatcherStat struct { gotSyncpointOnTS atomic.Bool // tableInfo is the latest table info of the dispatcher's corresponding table. tableInfo atomic.Value - // tableInfoVersion is the latest table info version of the dispatcher's corresponding table. - // It should be updated together with tableInfo, because following DMLs use them as one schema snapshot. + // tableInfoVersion is the latest schema version delivered to this dispatcher. + // It may advance even when tableInfo is not replaced. tableInfoVersion atomic.Uint64 } @@ -440,12 +440,20 @@ func (d *dispatcherStat) handleSingleDataEvents(events []dispatcher.DispatcherEv } func (d *dispatcherStat) updateTableInfoByDDL(ddl *commonEvent.DDLEvent) { - if ddl.TableInfo == nil { + tableSpan := d.target.GetTableSpan() + if tableSpan == nil || tableSpan.TableID == common.DDLSpanTableID { return } - tableSpan := d.target.GetTableSpan() - if tableSpan == nil || tableSpan.TableID == common.DDLSpanTableID { + // EXCHANGE PARTITION can change the schema version of a physical table dispatcher + // while ddl.TableInfo carries another logical table. The storage sink uses + // tableInfoVersion to decide whether a DML belongs to an old schema, so advance + // it for every DDL delivered to this dispatcher. + // TODO: Revisit whether the storage sink should discard DML solely by comparing + // tableInfoVersion with existing schema files. + d.tableInfoVersion.Store(ddl.FinishedTs) + + if ddl.TableInfo == nil { return } @@ -463,7 +471,6 @@ func (d *dispatcherStat) updateTableInfoByDDL(ddl *commonEvent.DDLEvent) { return } - d.tableInfoVersion.Store(ddl.FinishedTs) d.tableInfo.Store(ddl.TableInfo) } diff --git a/downstreamadapter/eventcollector/dispatcher_stat_test.go b/downstreamadapter/eventcollector/dispatcher_stat_test.go index 76c993ea31..0c777bb1ac 100644 --- a/downstreamadapter/eventcollector/dispatcher_stat_test.go +++ b/downstreamadapter/eventcollector/dispatcher_stat_test.go @@ -1583,7 +1583,7 @@ func TestHandleDDLEventTableInfoUpdate(t *testing.T) { storedTableInfo = stat.tableInfo.Load().(*common.TableInfo) require.Same(t, tableDDL.TableInfo, storedTableInfo) - require.Equal(t, tableDDL.FinishedTs, stat.tableInfoVersion.Load()) + require.Equal(t, viewDDL.FinishedTs, stat.tableInfoVersion.Load()) require.Len(t, mockDisp.events, 2) require.Same(t, viewDDL, mockDisp.events[1].Event) } From a1b3e84f95a81e6f8438a9ba7fe448e90e3aaf92 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 8 May 2026 21:22:57 +0800 Subject: [PATCH 12/12] adjust InitPrivateFields method call --- downstreamadapter/eventcollector/dispatcher_stat.go | 4 ---- pkg/common/event/active_active_test.go | 1 - pkg/common/event/ddl_event.go | 7 ------- pkg/common/event/ddl_event_test.go | 1 - pkg/common/event/dml_event.go | 7 +++---- pkg/common/event/handshake_event.go | 3 --- pkg/common/event/util.go | 1 - pkg/common/table_info.go | 13 +++++++------ pkg/common/table_info_test.go | 5 +++++ pkg/sink/mysql/sql_builder_test.go | 1 - 10 files changed, 15 insertions(+), 28 deletions(-) diff --git a/downstreamadapter/eventcollector/dispatcher_stat.go b/downstreamadapter/eventcollector/dispatcher_stat.go index 9fcbc94bb2..b6619d2c2b 100644 --- a/downstreamadapter/eventcollector/dispatcher_stat.go +++ b/downstreamadapter/eventcollector/dispatcher_stat.go @@ -360,10 +360,6 @@ func (d *dispatcherStat) handleBatchDataEvents(events []dispatcher.DispatcherEve batchDML := event.Event.(*commonEvent.BatchDMLEvent) batchDML.AssembleRows(tableInfo) for _, dml := range batchDML.DMLEvents { - // DMLs in the same batch share the same updateTs in their table info, - // but they may reference different table info objects, - // so each needs to be initialized separately. - dml.TableInfo.InitPrivateFields() dml.TableInfoVersion = tableInfoVersion dmlEvent := dispatcher.NewDispatcherEvent(event.From, dml) if d.shouldForwardEventByCommitTs(dmlEvent) { diff --git a/pkg/common/event/active_active_test.go b/pkg/common/event/active_active_test.go index 83ac73a466..5b00fd8fcb 100644 --- a/pkg/common/event/active_active_test.go +++ b/pkg/common/event/active_active_test.go @@ -369,7 +369,6 @@ func newTestTableInfo(t *testing.T, activeActive, softDelete bool) *commonpkg.Ta ti := commonpkg.WrapTableInfo("test", table) ti.ActiveActiveTable = activeActive ti.SoftDeleteTable = softDelete - ti.InitPrivateFields() return ti } diff --git a/pkg/common/event/ddl_event.go b/pkg/common/event/ddl_event.go index 599883be92..c22bd4b02b 100644 --- a/pkg/common/event/ddl_event.go +++ b/pkg/common/event/ddl_event.go @@ -522,13 +522,6 @@ func (t *DDLEvent) decodeV1(data []byte) error { return err } - for _, info := range t.MultipleTableInfos { - info.InitPrivateFields() - } - if t.TableInfo != nil { - t.TableInfo.InitPrivateFields() - } - return nil } diff --git a/pkg/common/event/ddl_event_test.go b/pkg/common/event/ddl_event_test.go index 41efb79b86..60fb7e67cb 100644 --- a/pkg/common/event/ddl_event_test.go +++ b/pkg/common/event/ddl_event_test.go @@ -240,7 +240,6 @@ func TestDDLEvent(t *testing.T) { }, Err: errors.ErrDDLEventError.GenWithStackByArgs("test").Error(), } - ddlEvent.TableInfo.InitPrivateFields() // Test normal marshal/unmarshal data, err := ddlEvent.Marshal() diff --git a/pkg/common/event/dml_event.go b/pkg/common/event/dml_event.go index b5d3b7d3a0..4b95f28d2b 100644 --- a/pkg/common/event/dml_event.go +++ b/pkg/common/event/dml_event.go @@ -283,10 +283,6 @@ func (b *BatchDMLEvent) AssembleRows(tableInfo *common.TableInfo) { log.Panic("DMLEvent: TableInfo is nil") } - defer func() { - b.TableInfo.InitPrivateFields() - }() - // For local events (same node), rows are already set. if b.Rows != nil { if !tableInfo.TableName.IsRouted() { @@ -296,6 +292,9 @@ func (b *BatchDMLEvent) AssembleRows(tableInfo *common.TableInfo) { originVersion := b.TableInfo.GetUpdateTS() routedVersion := tableInfo.GetUpdateTS() if originVersion != routedVersion { + // TODO: Analyze partition DDL cases where local rows can be + // decoded with a source TableInfo version different from the + // routed TableInfo cached in the dispatcher. log.Panic("table version mismatch when set routed table info", zap.Uint64("originTableVersion", originVersion), zap.Uint64("routedTableVersion", routedVersion)) diff --git a/pkg/common/event/handshake_event.go b/pkg/common/event/handshake_event.go index 9c0c0cb1f1..a8b1f3b01d 100644 --- a/pkg/common/event/handshake_event.go +++ b/pkg/common/event/handshake_event.go @@ -205,8 +205,5 @@ func (e *HandshakeEvent) decodeV1(data []byte) error { return err } - // Initialize private fields after unmarshaling - e.TableInfo.InitPrivateFields() - return nil } diff --git a/pkg/common/event/util.go b/pkg/common/event/util.go index 1e12adab95..ecbf60bcde 100644 --- a/pkg/common/event/util.go +++ b/pkg/common/event/util.go @@ -141,7 +141,6 @@ func (s *EventTestHelper) storeTableInfo(schemaName string, tableInfo *timodel.T if info == nil { return } - info.InitPrivateFields() key := toTableInfosKey(info.GetSchemaName(), info.GetTableName()) if tableInfo.Partition != nil { if _, ok := s.partitionIDs[key]; !ok { diff --git a/pkg/common/table_info.go b/pkg/common/table_info.go index b40d5fbe65..19b6bf2141 100644 --- a/pkg/common/table_info.go +++ b/pkg/common/table_info.go @@ -117,7 +117,7 @@ type TableInfo struct { } func (ti *TableInfo) InitPrivateFields() { - if ti == nil { + if ti == nil || ti.columnSchema == nil { return } @@ -142,10 +142,10 @@ func (ti *TableInfo) InitPrivateFields() { // CloneWithRouting creates a shallow copy of TableInfo with routing applied. // The new TableInfo shares the same columnSchema, View, Sequence pointers -// but has its own TableName (with TargetSchema/TargetTable set) and uninitialized preSQLs. +// but has its own TableName (with TargetSchema/TargetTable set) and preSQLs. // This is safe because: // - columnSchema, View, Sequence are read-only after creation -// - preSQLs will be initialized later via InitPrivateFields() using the new TableName +// - preSQLs is initialized using the new TableName before the clone is returned // - TableName is a value type that gets copied func (ti *TableInfo) CloneWithRouting(targetSchema, targetTable string) *TableInfo { if ti == nil { @@ -178,6 +178,7 @@ func (ti *TableInfo) CloneWithRouting(targetSchema, targetTable string) *TableIn }) } + cloned.InitPrivateFields() return cloned } @@ -230,6 +231,7 @@ func UnmarshalJSONToTableInfo(data []byte) (*TableInfo, error) { if err != nil { return nil, err } + ti.InitPrivateFields() // when this tableInfo is released, we need to cut down the reference count of the columnSchema // This function should be appear when tableInfo is created as a pair. @@ -673,6 +675,7 @@ func newTableInfo(schema string, table string, tableID int64, isPartition bool, func NewTableInfo(schemaName string, tableName string, tableID int64, isPartition bool, columnSchema *columnSchema, tableInfo *model.TableInfo) *TableInfo { ti := newTableInfo(schemaName, tableName, tableID, isPartition, columnSchema, tableInfo) + ti.InitPrivateFields() // when this tableInfo is released, we need to cut down the reference count of the columnSchema // This function should be appeared when tableInfo is created as a pair. @@ -698,7 +701,5 @@ func WrapTableInfo(schemaName string, info *model.TableInfo) *TableInfo { // do not call this method on the production code. func NewTableInfo4Decoder(schema string, tableInfo *model.TableInfo) *TableInfo { cs := NewColumnSchema4Decoder(tableInfo) - result := newTableInfo(schema, tableInfo.Name.O, tableInfo.ID, tableInfo.GetPartitionInfo() != nil, cs, tableInfo) - result.InitPrivateFields() - return result + return NewTableInfo(schema, tableInfo.Name.O, tableInfo.ID, tableInfo.GetPartitionInfo() != nil, cs, tableInfo) } diff --git a/pkg/common/table_info_test.go b/pkg/common/table_info_test.go index 5be97d2f37..b45d8c3a9c 100644 --- a/pkg/common/table_info_test.go +++ b/pkg/common/table_info_test.go @@ -162,6 +162,10 @@ func TestUnmarshalJSONToTableInfoRoundTrip(t *testing.T) { Columns: []*model.ColumnInfo{idCol, nameCol}, }) require.NotNil(t, source) + require.Contains(t, source.GetPreInsertSQL(), QuoteSchema("test", "t_roundtrip")) + + routed := source.CloneWithRouting("target_db", "target_table") + require.Contains(t, routed.GetPreInsertSQL(), QuoteSchema("target_db", "target_table")) data, err := source.Marshal() require.NoError(t, err) @@ -176,6 +180,7 @@ func TestUnmarshalJSONToTableInfoRoundTrip(t *testing.T) { require.Equal(t, len(source.GetColumns()), len(decoded.GetColumns())) require.Equal(t, source.GetColumns()[0].Name.O, decoded.GetColumns()[0].Name.O) require.Equal(t, source.GetColumns()[1].Name.O, decoded.GetColumns()[1].Name.O) + require.Contains(t, decoded.GetPreInsertSQL(), QuoteSchema("test", "t_roundtrip")) } func TestUnquoteName(t *testing.T) { diff --git a/pkg/sink/mysql/sql_builder_test.go b/pkg/sink/mysql/sql_builder_test.go index 1cb2247deb..5dde1c1e00 100644 --- a/pkg/sink/mysql/sql_builder_test.go +++ b/pkg/sink/mysql/sql_builder_test.go @@ -185,7 +185,6 @@ func TestBuildInsert(t *testing.T) { func TestBuildDMLUsesRoutedTargetTable(t *testing.T) { insert, deleteRow, updateRow, tableInfo := getRowForTest(t) routedTableInfo := tableInfo.CloneWithRouting("target_db", "target_table") - routedTableInfo.InitPrivateFields() insertSQL, _ := buildInsert(routedTableInfo, insert, false) require.Contains(t, insertSQL, "INSERT INTO `target_db`.`target_table`")