diff --git a/downstreamadapter/eventcollector/dispatcher_stat.go b/downstreamadapter/eventcollector/dispatcher_stat.go index 4ec845debf..b6619d2c2b 100644 --- a/downstreamadapter/eventcollector/dispatcher_stat.go +++ b/downstreamadapter/eventcollector/dispatcher_stat.go @@ -70,8 +70,8 @@ type dispatcherStat struct { gotSyncpointOnTS atomic.Bool // tableInfo is the latest table info of the dispatcher's corresponding table. tableInfo atomic.Value - // tableInfoVersion is the latest table info version of the dispatcher's corresponding table. - // It is updated by ddl event + // tableInfoVersion is the latest schema version delivered to this dispatcher. + // It may advance even when tableInfo is not replaced. tableInfoVersion atomic.Uint64 } @@ -360,10 +360,6 @@ func (d *dispatcherStat) handleBatchDataEvents(events []dispatcher.DispatcherEve batchDML := event.Event.(*commonEvent.BatchDMLEvent) batchDML.AssembleRows(tableInfo) for _, dml := range batchDML.DMLEvents { - // DMLs in the same batch share the same updateTs in their table info, - // but they may reference different table info objects, - // so each needs to be initialized separately. - dml.TableInfo.InitPrivateFields() dml.TableInfoVersion = tableInfoVersion dmlEvent := dispatcher.NewDispatcherEvent(event.From, dml) if d.shouldForwardEventByCommitTs(dmlEvent) { @@ -433,15 +429,47 @@ func (d *dispatcherStat) handleSingleDataEvents(events []dispatcher.DispatcherEv return false } events[0].Event = ddl - d.tableInfoVersion.Store(ddl.FinishedTs) - if ddl.TableInfo != nil { - d.tableInfo.Store(ddl.TableInfo) - } + d.updateTableInfoByDDL(ddl) } d.updateCommitTsStateByEvents(state, events) return d.target.HandleEvents(events, func() { d.wake() }) } +func (d *dispatcherStat) updateTableInfoByDDL(ddl *commonEvent.DDLEvent) { + tableSpan := d.target.GetTableSpan() + if tableSpan == nil || tableSpan.TableID == common.DDLSpanTableID { + return + } + + // EXCHANGE PARTITION can change the schema version of a physical table dispatcher + // while ddl.TableInfo carries another logical table. The storage sink uses + // tableInfoVersion to decide whether a DML belongs to an old schema, so advance + // it for every DDL delivered to this dispatcher. + // TODO: Revisit whether the storage sink should discard DML solely by comparing + // tableInfoVersion with existing schema files. + d.tableInfoVersion.Store(ddl.FinishedTs) + + if ddl.TableInfo == nil { + return + } + + // A table dispatcher can receive DDLs unrelated to its own table for barrier + // coordination, for example CREATE VIEW is tracked in every table's DDL history. + // The cached table info is used to assemble subsequent DML rows. For partition + // tables, the dispatcher span ID is a physical partition ID while TableInfo + // carries the logical table ID, so compare with the cached table info first. + expectedTableID := tableSpan.TableID + current := d.tableInfo.Load() + if current != nil { + expectedTableID = current.(*common.TableInfo).TableName.TableID + } + if ddl.TableInfo.TableName.TableID != expectedTableID { + return + } + + d.tableInfo.Store(ddl.TableInfo) +} + func (d *dispatcherStat) handleDataEvents(events ...dispatcher.DispatcherEvent) bool { switch events[0].GetType() { case commonEvent.TypeDMLEvent, diff --git a/downstreamadapter/eventcollector/dispatcher_stat_test.go b/downstreamadapter/eventcollector/dispatcher_stat_test.go index d84013b610..0c777bb1ac 100644 --- a/downstreamadapter/eventcollector/dispatcher_stat_test.go +++ b/downstreamadapter/eventcollector/dispatcher_stat_test.go @@ -45,6 +45,7 @@ type mockDispatcher struct { handleError func(err error) events []dispatcher.DispatcherEvent checkPointTs uint64 + tableSpan *heartbeatpb.TableSpan skipSyncpointAtStartTs bool router routing.Router @@ -76,6 +77,9 @@ func (m *mockDispatcher) GetChangefeedID() common.ChangeFeedID { } func (m *mockDispatcher) GetTableSpan() *heartbeatpb.TableSpan { + if m.tableSpan != nil { + return m.tableSpan + } return &heartbeatpb.TableSpan{ TableID: 1, } @@ -1538,57 +1542,48 @@ func TestRegisterTo(t *testing.T) { } func TestHandleDDLEventTableInfoUpdate(t *testing.T) { - t.Parallel() + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + helper.Tk().MustExec("use test") + + tableDDL := helper.DDL2Event("CREATE TABLE `products` (`id` INT PRIMARY KEY)") + viewDDL := helper.DDL2Event("CREATE VIEW `transient_view` AS SELECT 1 AS `id`") localServerID := node.ID("local") remoteServerID := node.ID("remote") - t.Run("stores ddl table info", func(t *testing.T) { - var capturedEvent *commonEvent.DDLEvent - mockDisp := newMockDispatcher(common.NewDispatcherID(), 0) - mockDisp.handleEvents = func(events []dispatcher.DispatcherEvent, wakeCallback func()) bool { - if len(events) > 0 { - capturedEvent = events[0].Event.(*commonEvent.DDLEvent) - } - return false - } - - stat := newDispatcherStat(mockDisp, newTestEventCollector(localServerID), nil) - stat.session.connState.setEventServiceID(remoteServerID) - stat.currentEpoch.Store(newDispatcherEpochState(10, 1, stat.target.GetStartTs())) - stat.lastEventCommitTs.Store(50) + mockDisp := newMockDispatcher(common.NewDispatcherID(), 0) + mockDisp.tableSpan = &heartbeatpb.TableSpan{TableID: tableDDL.TableInfo.TableName.TableID} + mockDisp.handleEvents = func(events []dispatcher.DispatcherEvent, wakeCallback func()) bool { + return false + } - tableInfo := &common.TableInfo{ - TableName: common.TableName{ - Schema: "source_db", - Table: "users", - TableID: 1, - }, - } + stat := newDispatcherStat(mockDisp, newTestEventCollector(localServerID), nil) + stat.session.connState.setEventServiceID(remoteServerID) + stat.currentEpoch.Store(newDispatcherEpochState(10, 1, stat.target.GetStartTs())) + stat.lastEventCommitTs.Store(50) - ddlEvent := &commonEvent.DDLEvent{ - Version: commonEvent.DDLEventVersion1, - Query: "ALTER TABLE `source_db`.`users` ADD COLUMN `c1` INT", - FinishedTs: 100, - Epoch: 10, - Seq: 2, - TableInfo: tableInfo, - } + tableDDL.Epoch = 10 + tableDDL.Seq = 2 + stat.handleDataEvents(dispatcher.DispatcherEvent{From: &remoteServerID, Event: tableDDL}) + + storedTableInfo := stat.tableInfo.Load().(*common.TableInfo) + require.NotNil(t, storedTableInfo) + require.Same(t, tableDDL.TableInfo, storedTableInfo) + require.Equal(t, "test", storedTableInfo.TableName.Schema) + require.Equal(t, "products", storedTableInfo.TableName.Table) + require.Equal(t, tableDDL.TableInfo.TableName.TableID, storedTableInfo.TableName.TableID) + require.Equal(t, tableDDL.FinishedTs, stat.tableInfoVersion.Load()) + require.Len(t, mockDisp.events, 1) + require.Same(t, tableDDL, mockDisp.events[0].Event) - events := []dispatcher.DispatcherEvent{ - {From: &remoteServerID, Event: ddlEvent}, - } + viewDDL.Epoch = 10 + viewDDL.Seq = 3 + stat.handleDataEvents(dispatcher.DispatcherEvent{From: &remoteServerID, Event: viewDDL}) - stat.handleDataEvents(events...) - - storedTableInfo := stat.tableInfo.Load().(*common.TableInfo) - require.NotNil(t, storedTableInfo) - require.Same(t, tableInfo, storedTableInfo) - require.Equal(t, "source_db", storedTableInfo.TableName.Schema) - require.Equal(t, "users", storedTableInfo.TableName.Table) - require.Equal(t, int64(1), storedTableInfo.TableName.TableID) - require.Equal(t, uint64(100), stat.tableInfoVersion.Load()) - require.NotNil(t, capturedEvent) - require.Same(t, ddlEvent, capturedEvent) - }) + storedTableInfo = stat.tableInfo.Load().(*common.TableInfo) + require.Same(t, tableDDL.TableInfo, storedTableInfo) + require.Equal(t, viewDDL.FinishedTs, stat.tableInfoVersion.Load()) + require.Len(t, mockDisp.events, 2) + require.Same(t, viewDDL, mockDisp.events[1].Event) } diff --git a/downstreamadapter/routing/ddl_query_rewriter.go b/downstreamadapter/routing/ddl_query_rewriter.go index 3fce06c0f6..9bab897cb2 100644 --- a/downstreamadapter/routing/ddl_query_rewriter.go +++ b/downstreamadapter/routing/ddl_query_rewriter.go @@ -41,7 +41,7 @@ func (r Router) rewriteParserBackedDDLQuery(ddl *commonEvent.DDLEvent) (string, ) for i := range queries { query := queries[i] - newQuery, err := r.rewriteSingleDDLQuery(query) + newQuery, err := r.rewriteSingleDDLQuery(query, ddl.GetSchemaName()) if err != nil { return "", err } @@ -78,7 +78,7 @@ func splitMultiStmtDDLQuery(query string) ([]string, error) { return queries, nil } -func (r Router) rewriteSingleDDLQuery(query string) (string, error) { +func (r Router) rewriteSingleDDLQuery(query string, defaultSchema string) (string, error) { p := parser.New() stmt, err := p.ParseOneStmt(query, "", "") if err != nil { @@ -89,6 +89,7 @@ func (r Router) rewriteSingleDDLQuery(query string) (string, error) { if len(sourceTables) == 0 { return query, nil } + fillDefaultSchema(sourceTables, defaultSchema) var ( routed bool @@ -119,6 +120,18 @@ func (r Router) rewriteSingleDDLQuery(query string) (string, error) { return newQuery, nil } +func fillDefaultSchema(tables []commonEvent.SchemaTableName, defaultSchema string) { + if defaultSchema == "" { + return + } + + for i := range tables { + if tables[i].SchemaName == "" && tables[i].TableName != "" { + tables[i].SchemaName = defaultSchema + } + } +} + // tableNameExtractor extracts table names from DDL AST nodes. // ref: https://github.com/pingcap/tidb/blob/09feccb529be2830944e11f5fed474020f50370f/server/sql_info_fetcher.go#L46 type tableNameExtractor struct { diff --git a/downstreamadapter/routing/router_apply_test.go b/downstreamadapter/routing/router_apply_test.go index de97c77c57..4dfce5cef0 100644 --- a/downstreamadapter/routing/router_apply_test.go +++ b/downstreamadapter/routing/router_apply_test.go @@ -742,7 +742,7 @@ func TestRewriteParserBackedDDLQueryError(t *testing.T) { TargetTable: TablePlaceholder, }}) - _, err := router.rewriteSingleDDLQuery("INVALID SQL !!!") + _, err := router.rewriteSingleDDLQuery("INVALID SQL !!!", "") code, ok := errors.RFCCode(err) require.True(t, ok) require.Equal(t, errors.ErrTableRoutingFailed.RFCCode(), code) diff --git a/downstreamadapter/routing/router_supported_ddl_test.go b/downstreamadapter/routing/router_supported_ddl_test.go index ca33b5f1d8..e3f5765aca 100644 --- a/downstreamadapter/routing/router_supported_ddl_test.go +++ b/downstreamadapter/routing/router_supported_ddl_test.go @@ -16,10 +16,12 @@ package routing import ( "testing" + "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/config" ddlutil "github.com/pingcap/tidb/pkg/ddl/util" timodel "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/parser/ast" "github.com/stretchr/testify/require" ) @@ -1007,7 +1009,7 @@ func TestRewriteDDLQueryWithRoutingSupportsParserBackedDDLTypes(t *testing.T) { } } -func TestApplyToDDLEventSupportsCreateTables(t *testing.T) { +func TestApplyToDDLEventRoutesDDLEventMetadata(t *testing.T) { router := newTestRouter(t, false, []*config.DispatchRule{{ Matcher: []string{"source_db.*"}, TargetSchema: "target_db", @@ -1016,25 +1018,73 @@ func TestApplyToDDLEventSupportsCreateTables(t *testing.T) { helper := event.NewEventTestHelper(t) defer helper.Close() - schemaDDL := helper.DDL2Event("CREATE DATABASE `source_db`") - // TiDB ActionCreateTables is same-schema only. Cross-schema CREATE TABLE - // statements are emitted as separate DDL jobs upstream, not one ActionCreateTables event. - ddl := helper.BatchCreateTableDDLs2Event("source_db", - "CREATE TABLE `source_db`.`t1` (`id` INT PRIMARY KEY)", - "CREATE TABLE `source_db`.`t2` (`id` INT PRIMARY KEY)", - ) + schemaDDL := helper.DDL2Event("CREATE DATABASE `source_db`") routedSchema, err := router.ApplyToDDLEvent(schemaDDL) require.NoError(t, err) require.Contains(t, routedSchema.Query, "`target_db`") - routed, err := router.ApplyToDDLEvent(ddl) + helper.Tk().MustExec("USE `source_db`") + singleCreateDDL := helper.DDL2Event("CREATE TABLE `source_table` (`id` INT PRIMARY KEY)") + singleCreateDDL.DispatcherID = common.NewDispatcherID() + singleCreateDDL.Seq = 1 + singleCreateDDL.Epoch = 2 + singleCreateDDL.TiDBOnly = true + singleCreateDDL.BDRMode = string(ast.BDRRolePrimary) + singleCreateDDL.PostTxnFlushed = []func(){func() {}, func() {}} + + originalQuery := singleCreateDDL.Query + originalTableInfo := singleCreateDDL.TableInfo + require.Equal(t, "source_db", singleCreateDDL.SchemaName) + require.NotContains(t, originalQuery, "`source_db`") + routedSingleCreate, err := router.ApplyToDDLEvent(singleCreateDDL) + require.NoError(t, err) + require.NotSame(t, singleCreateDDL, routedSingleCreate) + require.Equal(t, singleCreateDDL.Version, routedSingleCreate.Version) + require.Equal(t, singleCreateDDL.DispatcherID, routedSingleCreate.DispatcherID) + require.Equal(t, singleCreateDDL.Type, routedSingleCreate.Type) + require.Equal(t, singleCreateDDL.SchemaID, routedSingleCreate.SchemaID) + require.Equal(t, singleCreateDDL.SchemaName, routedSingleCreate.SchemaName) + require.Equal(t, singleCreateDDL.TableName, routedSingleCreate.TableName) + require.Equal(t, singleCreateDDL.FinishedTs, routedSingleCreate.FinishedTs) + require.Equal(t, singleCreateDDL.Seq, routedSingleCreate.Seq) + require.Equal(t, singleCreateDDL.Epoch, routedSingleCreate.Epoch) + require.Equal(t, singleCreateDDL.TiDBOnly, routedSingleCreate.TiDBOnly) + require.Equal(t, singleCreateDDL.BDRMode, routedSingleCreate.BDRMode) + require.Equal(t, originalQuery, singleCreateDDL.Query) + require.Same(t, originalTableInfo, singleCreateDDL.TableInfo) + require.Equal(t, "source_db", singleCreateDDL.GetTargetSchemaName()) + require.Equal(t, "source_table", singleCreateDDL.GetTargetTableName()) + require.Equal(t, "target_db", routedSingleCreate.GetTargetSchemaName()) + require.Equal(t, "source_table_r", routedSingleCreate.GetTargetTableName()) + require.Contains(t, routedSingleCreate.Query, "`target_db`.`source_table_r`") + require.NotContains(t, routedSingleCreate.Query, "`source_db`") + require.NotSame(t, originalTableInfo, routedSingleCreate.TableInfo) + require.Equal(t, "target_db", routedSingleCreate.TableInfo.GetTargetSchemaName()) + require.Equal(t, "source_table_r", routedSingleCreate.TableInfo.GetTargetTableName()) + require.Len(t, routedSingleCreate.PostTxnFlushed, 2) + require.Len(t, singleCreateDDL.PostTxnFlushed, 2) + require.NotEqual(t, &singleCreateDDL.PostTxnFlushed[0], &routedSingleCreate.PostTxnFlushed[0]) + routedSingleCreate.AddPostFlushFunc(func() {}) + require.Len(t, routedSingleCreate.PostTxnFlushed, 3) + require.Len(t, singleCreateDDL.PostTxnFlushed, 2) + + createTablesDDL := helper.BatchCreateTableDDLs2Event("source_db", + "CREATE TABLE `source_db`.`t1` (`id` INT PRIMARY KEY)", + "CREATE TABLE `source_db`.`t2` (`id` INT PRIMARY KEY)", + ) + routedCreateTables, err := router.ApplyToDDLEvent(createTablesDDL) require.NoError(t, err) - require.Contains(t, routed.Query, "CREATE TABLE `target_db`.`t1_r`") - require.Contains(t, routed.Query, "CREATE TABLE `target_db`.`t2_r`") - require.Len(t, routed.MultipleTableInfos, 2) - require.Equal(t, "target_db", routed.MultipleTableInfos[0].GetTargetSchemaName()) - require.Equal(t, "t1_r", routed.MultipleTableInfos[0].GetTargetTableName()) - require.Equal(t, "target_db", routed.MultipleTableInfos[1].GetTargetSchemaName()) - require.Equal(t, "t2_r", routed.MultipleTableInfos[1].GetTargetTableName()) + require.Contains(t, routedCreateTables.Query, "CREATE TABLE `target_db`.`t1_r`") + require.Contains(t, routedCreateTables.Query, "CREATE TABLE `target_db`.`t2_r`") + require.Len(t, createTablesDDL.MultipleTableInfos, 2) + require.Len(t, routedCreateTables.MultipleTableInfos, 2) + require.Equal(t, "source_db", createTablesDDL.MultipleTableInfos[0].GetTargetSchemaName()) + require.Equal(t, "t1", createTablesDDL.MultipleTableInfos[0].GetTargetTableName()) + require.Equal(t, "source_db", createTablesDDL.MultipleTableInfos[1].GetTargetSchemaName()) + require.Equal(t, "t2", createTablesDDL.MultipleTableInfos[1].GetTargetTableName()) + require.Equal(t, "target_db", routedCreateTables.MultipleTableInfos[0].GetTargetSchemaName()) + require.Equal(t, "t1_r", routedCreateTables.MultipleTableInfos[0].GetTargetTableName()) + require.Equal(t, "target_db", routedCreateTables.MultipleTableInfos[1].GetTargetSchemaName()) + require.Equal(t, "t2_r", routedCreateTables.MultipleTableInfos[1].GetTargetTableName()) } diff --git a/pkg/common/event/active_active_test.go b/pkg/common/event/active_active_test.go index 83ac73a466..5b00fd8fcb 100644 --- a/pkg/common/event/active_active_test.go +++ b/pkg/common/event/active_active_test.go @@ -369,7 +369,6 @@ func newTestTableInfo(t *testing.T, activeActive, softDelete bool) *commonpkg.Ta ti := commonpkg.WrapTableInfo("test", table) ti.ActiveActiveTable = activeActive ti.SoftDeleteTable = softDelete - ti.InitPrivateFields() return ti } diff --git a/pkg/common/event/ddl_event.go b/pkg/common/event/ddl_event.go index 599883be92..c22bd4b02b 100644 --- a/pkg/common/event/ddl_event.go +++ b/pkg/common/event/ddl_event.go @@ -522,13 +522,6 @@ func (t *DDLEvent) decodeV1(data []byte) error { return err } - for _, info := range t.MultipleTableInfos { - info.InitPrivateFields() - } - if t.TableInfo != nil { - t.TableInfo.InitPrivateFields() - } - return nil } diff --git a/pkg/common/event/ddl_event_test.go b/pkg/common/event/ddl_event_test.go index 629e8ad756..60fb7e67cb 100644 --- a/pkg/common/event/ddl_event_test.go +++ b/pkg/common/event/ddl_event_test.go @@ -21,8 +21,6 @@ import ( "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/tidb/pkg/meta/model" - "github.com/pingcap/tidb/pkg/parser/ast" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -242,7 +240,6 @@ func TestDDLEvent(t *testing.T) { }, Err: errors.ErrDDLEventError.GenWithStackByArgs("test").Error(), } - ddlEvent.TableInfo.InitPrivateFields() // Test normal marshal/unmarshal data, err := ddlEvent.Marshal() @@ -523,201 +520,3 @@ CREATE TABLE test2 (id INT); }) } } - -// TestNewRoutedDDLEvent ensures routed DDL construction preserves the origin event -// while producing an independent routed event for downstream use. -func TestNewRoutedDDLEvent(t *testing.T) { - helper := NewEventTestHelper(t) - defer helper.Close() - - helper.tk.MustExec("use test") - ddlJob := helper.DDL2Job(createTableSQL) - require.NotNil(t, ddlJob) - - // Create original DDL event with all fields populated - originalTableInfo := common.WrapTableInfo(ddlJob.SchemaName, ddlJob.BinlogInfo.TableInfo) - originalTableInfo.InitPrivateFields() - - multipleTableInfo1 := common.WrapTableInfo("schema1", ddlJob.BinlogInfo.TableInfo) - multipleTableInfo1.InitPrivateFields() - multipleTableInfo2 := common.WrapTableInfo("schema2", ddlJob.BinlogInfo.TableInfo) - multipleTableInfo2.InitPrivateFields() - - postFlushFunc1 := func() {} - postFlushFunc2 := func() {} - - original := &DDLEvent{ - Version: DDLEventVersion1, - DispatcherID: common.NewDispatcherID(), - Type: byte(ddlJob.Type), - SchemaID: ddlJob.SchemaID, - SchemaName: ddlJob.SchemaName, - TableName: ddlJob.TableName, - Query: ddlJob.Query, - TableInfo: originalTableInfo, - FinishedTs: ddlJob.BinlogInfo.FinishedTS, - Seq: 1, - Epoch: 2, - MultipleTableInfos: []*common.TableInfo{multipleTableInfo1, multipleTableInfo2}, - PostTxnFlushed: []func(){postFlushFunc1, postFlushFunc2}, - TiDBOnly: true, - BDRMode: "test-mode", - } - - newRoutedTableInfo := originalTableInfo.CloneWithRouting("routed_schema", "test") - routedMultipleTableInfos := []*common.TableInfo{ - multipleTableInfo1.CloneWithRouting("routed_schema1", "table1"), - multipleTableInfo2.CloneWithRouting("routed_schema2", "table2"), - } - - routed := NewRoutedDDLEvent( - original, - "CREATE TABLE routed_schema.test ...", - "routed_schema", - "", - "", - "", - newRoutedTableInfo, - routedMultipleTableInfos, - original.BlockedTableNames, - ) - require.NotNil(t, routed) - - // Verify that the routed event is a separate object. - require.False(t, original == routed, "routed event should be a different object") - - // Verify that non-routing fields are copied as-is. - require.Equal(t, original.Version, routed.Version) - require.Equal(t, original.DispatcherID, routed.DispatcherID) - require.Equal(t, original.Type, routed.Type) - require.Equal(t, original.SchemaID, routed.SchemaID) - require.Equal(t, original.SchemaName, routed.SchemaName) - require.Equal(t, original.TableName, routed.TableName) - require.Equal(t, original.FinishedTs, routed.FinishedTs) - require.Equal(t, original.Seq, routed.Seq) - require.Equal(t, original.Epoch, routed.Epoch) - require.Equal(t, original.TiDBOnly, routed.TiDBOnly) - require.Equal(t, original.BDRMode, routed.BDRMode) - - // Verify that MultipleTableInfos is a new slice so later mutations remain isolated. - require.False(t, &original.MultipleTableInfos[0] == &routed.MultipleTableInfos[0], "MultipleTableInfos should be a new slice") - - // Verify that PostTxnFlushed is an independent copy (not shared) - // This is defensive: currently DDL events arrive with nil PostTxnFlushed, - // but we copy it to prevent races if callbacks are ever added before building the routed event. - require.NotNil(t, routed.PostTxnFlushed) - require.Equal(t, 2, len(routed.PostTxnFlushed), "PostTxnFlushed should have same length as original") - require.Equal(t, 2, len(original.PostTxnFlushed), "Original PostTxnFlushed should remain unchanged") - // Verify independent backing arrays. - require.NotEqual(t, &original.PostTxnFlushed[0], &routed.PostTxnFlushed[0], "PostTxnFlushed should have independent backing arrays") - - // Verify that appending to the routed event doesn't affect the original. - routed.AddPostFlushFunc(func() {}) - require.Equal(t, 3, len(routed.PostTxnFlushed), "Routed event should have appended callback") - require.Equal(t, 2, len(original.PostTxnFlushed), "Original should be unaffected by routed event append") - - // Verify that routed state doesn't affect the original. - require.Equal(t, ddlJob.SchemaName, original.SchemaName, "Original SchemaName should be unchanged") - require.Equal(t, ddlJob.Query, original.Query, "Original Query should be unchanged") - require.True(t, original.TableInfo == originalTableInfo, "Original TableInfo should be unchanged") - require.True(t, original.MultipleTableInfos[0] == multipleTableInfo1, "Original MultipleTableInfos[0] should be unchanged") - require.True(t, original.MultipleTableInfos[1] == multipleTableInfo2, "Original MultipleTableInfos[1] should be unchanged") - - // Verify that the routed event has the routed state. - require.Equal(t, "routed_schema", routed.GetTargetSchemaName()) - require.Equal(t, "CREATE TABLE routed_schema.test ...", routed.Query) - require.True(t, routed.TableInfo == newRoutedTableInfo) - require.Equal(t, "routed_schema", routed.TableInfo.TableName.TargetSchema) - require.Equal(t, original.SchemaName, routed.GetSchemaName()) - require.Equal(t, original.TableName, routed.GetTableName()) - require.True(t, routed.MultipleTableInfos[0] == routedMultipleTableInfos[0]) - require.True(t, routed.MultipleTableInfos[1] == routedMultipleTableInfos[1]) - - // Test nil origin event. - var nilEvent *DDLEvent - routedNil := NewRoutedDDLEvent(nilEvent, "", "", "", "", "", nil, nil, nil) - require.Nil(t, routedNil) -} - -func TestNewRoutedDDLEventPreservesSourceFields(t *testing.T) { - original := &DDLEvent{ - SchemaName: "source_db", - TableName: "new_orders", - ExtraSchemaName: "source_db", - ExtraTableName: "old_orders", - targetSchemaName: "target_db", - targetTableName: "new_orders_routed", - targetExtraSchemaName: "target_db", - targetExtraTableName: "old_orders_routed", - } - - routed := NewRoutedDDLEvent( - original, - original.Query, - "target_db_v2", - "new_orders_routed_v2", - "target_db_v2", - "old_orders_routed_v2", - original.TableInfo, - original.MultipleTableInfos, - original.BlockedTableNames, - ) - - require.Equal(t, "source_db", routed.GetSchemaName()) - require.Equal(t, "new_orders", routed.GetTableName()) - require.Equal(t, "source_db", routed.GetExtraSchemaName()) - require.Equal(t, "old_orders", routed.GetExtraTableName()) - require.Equal(t, "target_db_v2", routed.GetTargetSchemaName()) - require.Equal(t, "new_orders_routed_v2", routed.GetTargetTableName()) - require.Equal(t, "target_db_v2", routed.GetTargetExtraSchemaName()) - require.Equal(t, "old_orders_routed_v2", routed.GetTargetExtraTableName()) -} - -func TestGetEventsForRenameTablesPreservesSourceAndTargetNames(t *testing.T) { - sourceTable1 := common.WrapTableInfo("new_db1", &model.TableInfo{ - ID: 100, - Name: ast.NewCIStr("new_table1"), - UpdateTS: 10, - }) - sourceTable2 := common.WrapTableInfo("new_db2", &model.TableInfo{ - ID: 101, - Name: ast.NewCIStr("new_table2"), - UpdateTS: 11, - }) - - ddl := &DDLEvent{ - Type: byte(model.ActionRenameTables), - Query: "RENAME TABLE `old_target_db1`.`old_target_table1` TO `new_target_db1`.`new_target_table1`; RENAME TABLE `old_target_db2`.`old_target_table2` TO `new_target_db2`.`new_target_table2`", - MultipleTableInfos: []*common.TableInfo{ - sourceTable1.CloneWithRouting("new_target_db1", "new_target_table1"), - sourceTable2.CloneWithRouting("new_target_db2", "new_target_table2"), - }, - TableNameChange: &TableNameChange{ - DropName: []SchemaTableName{ - {SchemaName: "old_db1", TableName: "old_table1"}, - {SchemaName: "old_db2", TableName: "old_table2"}, - }, - }, - } - - events := ddl.GetEvents() - require.Len(t, events, 2) - - require.Equal(t, "new_db1", events[0].SchemaName) - require.Equal(t, "new_table1", events[0].TableName) - require.Equal(t, "new_target_db1", events[0].GetTargetSchemaName()) - require.Equal(t, "new_target_table1", events[0].GetTargetTableName()) - require.Equal(t, "old_db1", events[0].ExtraSchemaName) - require.Equal(t, "old_table1", events[0].ExtraTableName) - require.Equal(t, "old_target_db1", events[0].GetTargetExtraSchemaName()) - require.Equal(t, "old_target_table1", events[0].GetTargetExtraTableName()) - - require.Equal(t, "new_db2", events[1].SchemaName) - require.Equal(t, "new_table2", events[1].TableName) - require.Equal(t, "new_target_db2", events[1].GetTargetSchemaName()) - require.Equal(t, "new_target_table2", events[1].GetTargetTableName()) - require.Equal(t, "old_db2", events[1].ExtraSchemaName) - require.Equal(t, "old_table2", events[1].ExtraTableName) - require.Equal(t, "old_target_db2", events[1].GetTargetExtraSchemaName()) - require.Equal(t, "old_target_table2", events[1].GetTargetExtraTableName()) -} diff --git a/pkg/common/event/dml_event.go b/pkg/common/event/dml_event.go index b5d3b7d3a0..4b95f28d2b 100644 --- a/pkg/common/event/dml_event.go +++ b/pkg/common/event/dml_event.go @@ -283,10 +283,6 @@ func (b *BatchDMLEvent) AssembleRows(tableInfo *common.TableInfo) { log.Panic("DMLEvent: TableInfo is nil") } - defer func() { - b.TableInfo.InitPrivateFields() - }() - // For local events (same node), rows are already set. if b.Rows != nil { if !tableInfo.TableName.IsRouted() { @@ -296,6 +292,9 @@ func (b *BatchDMLEvent) AssembleRows(tableInfo *common.TableInfo) { originVersion := b.TableInfo.GetUpdateTS() routedVersion := tableInfo.GetUpdateTS() if originVersion != routedVersion { + // TODO: Analyze partition DDL cases where local rows can be + // decoded with a source TableInfo version different from the + // routed TableInfo cached in the dispatcher. log.Panic("table version mismatch when set routed table info", zap.Uint64("originTableVersion", originVersion), zap.Uint64("routedTableVersion", routedVersion)) diff --git a/pkg/common/event/handshake_event.go b/pkg/common/event/handshake_event.go index 9c0c0cb1f1..a8b1f3b01d 100644 --- a/pkg/common/event/handshake_event.go +++ b/pkg/common/event/handshake_event.go @@ -205,8 +205,5 @@ func (e *HandshakeEvent) decodeV1(data []byte) error { return err } - // Initialize private fields after unmarshaling - e.TableInfo.InitPrivateFields() - return nil } diff --git a/pkg/common/event/util.go b/pkg/common/event/util.go index 1e12adab95..ecbf60bcde 100644 --- a/pkg/common/event/util.go +++ b/pkg/common/event/util.go @@ -141,7 +141,6 @@ func (s *EventTestHelper) storeTableInfo(schemaName string, tableInfo *timodel.T if info == nil { return } - info.InitPrivateFields() key := toTableInfosKey(info.GetSchemaName(), info.GetTableName()) if tableInfo.Partition != nil { if _, ok := s.partitionIDs[key]; !ok { diff --git a/pkg/common/table_info.go b/pkg/common/table_info.go index b40d5fbe65..19b6bf2141 100644 --- a/pkg/common/table_info.go +++ b/pkg/common/table_info.go @@ -117,7 +117,7 @@ type TableInfo struct { } func (ti *TableInfo) InitPrivateFields() { - if ti == nil { + if ti == nil || ti.columnSchema == nil { return } @@ -142,10 +142,10 @@ func (ti *TableInfo) InitPrivateFields() { // CloneWithRouting creates a shallow copy of TableInfo with routing applied. // The new TableInfo shares the same columnSchema, View, Sequence pointers -// but has its own TableName (with TargetSchema/TargetTable set) and uninitialized preSQLs. +// but has its own TableName (with TargetSchema/TargetTable set) and preSQLs. // This is safe because: // - columnSchema, View, Sequence are read-only after creation -// - preSQLs will be initialized later via InitPrivateFields() using the new TableName +// - preSQLs is initialized using the new TableName before the clone is returned // - TableName is a value type that gets copied func (ti *TableInfo) CloneWithRouting(targetSchema, targetTable string) *TableInfo { if ti == nil { @@ -178,6 +178,7 @@ func (ti *TableInfo) CloneWithRouting(targetSchema, targetTable string) *TableIn }) } + cloned.InitPrivateFields() return cloned } @@ -230,6 +231,7 @@ func UnmarshalJSONToTableInfo(data []byte) (*TableInfo, error) { if err != nil { return nil, err } + ti.InitPrivateFields() // when this tableInfo is released, we need to cut down the reference count of the columnSchema // This function should be appear when tableInfo is created as a pair. @@ -673,6 +675,7 @@ func newTableInfo(schema string, table string, tableID int64, isPartition bool, func NewTableInfo(schemaName string, tableName string, tableID int64, isPartition bool, columnSchema *columnSchema, tableInfo *model.TableInfo) *TableInfo { ti := newTableInfo(schemaName, tableName, tableID, isPartition, columnSchema, tableInfo) + ti.InitPrivateFields() // when this tableInfo is released, we need to cut down the reference count of the columnSchema // This function should be appeared when tableInfo is created as a pair. @@ -698,7 +701,5 @@ func WrapTableInfo(schemaName string, info *model.TableInfo) *TableInfo { // do not call this method on the production code. func NewTableInfo4Decoder(schema string, tableInfo *model.TableInfo) *TableInfo { cs := NewColumnSchema4Decoder(tableInfo) - result := newTableInfo(schema, tableInfo.Name.O, tableInfo.ID, tableInfo.GetPartitionInfo() != nil, cs, tableInfo) - result.InitPrivateFields() - return result + return NewTableInfo(schema, tableInfo.Name.O, tableInfo.ID, tableInfo.GetPartitionInfo() != nil, cs, tableInfo) } diff --git a/pkg/common/table_info_test.go b/pkg/common/table_info_test.go index 5be97d2f37..b45d8c3a9c 100644 --- a/pkg/common/table_info_test.go +++ b/pkg/common/table_info_test.go @@ -162,6 +162,10 @@ func TestUnmarshalJSONToTableInfoRoundTrip(t *testing.T) { Columns: []*model.ColumnInfo{idCol, nameCol}, }) require.NotNil(t, source) + require.Contains(t, source.GetPreInsertSQL(), QuoteSchema("test", "t_roundtrip")) + + routed := source.CloneWithRouting("target_db", "target_table") + require.Contains(t, routed.GetPreInsertSQL(), QuoteSchema("target_db", "target_table")) data, err := source.Marshal() require.NoError(t, err) @@ -176,6 +180,7 @@ func TestUnmarshalJSONToTableInfoRoundTrip(t *testing.T) { require.Equal(t, len(source.GetColumns()), len(decoded.GetColumns())) require.Equal(t, source.GetColumns()[0].Name.O, decoded.GetColumns()[0].Name.O) require.Equal(t, source.GetColumns()[1].Name.O, decoded.GetColumns()[1].Name.O) + require.Contains(t, decoded.GetPreInsertSQL(), QuoteSchema("test", "t_roundtrip")) } func TestUnquoteName(t *testing.T) { diff --git a/pkg/sink/mysql/helper.go b/pkg/sink/mysql/helper.go index f795d8dff1..3bf2507618 100644 --- a/pkg/sink/mysql/helper.go +++ b/pkg/sink/mysql/helper.go @@ -406,7 +406,7 @@ func CreateMysqlDBConn(dsnStr string) (*sql.DB, error) { } func needSwitchDB(event *commonEvent.DDLEvent) bool { - if len(event.GetSchemaName()) == 0 { + if len(event.GetTargetSchemaName()) == 0 { return false } if event.GetDDLType() == timodel.ActionCreateSchema || event.GetDDLType() == timodel.ActionDropSchema { diff --git a/pkg/sink/mysql/mysql_writer_ddl.go b/pkg/sink/mysql/mysql_writer_ddl.go index 6b06d71781..48009fcf5f 100644 --- a/pkg/sink/mysql/mysql_writer_ddl.go +++ b/pkg/sink/mysql/mysql_writer_ddl.go @@ -106,7 +106,7 @@ func (w *Writer) execDDL(event *commonEvent.DDLEvent) error { } if shouldSwitchDB { - _, err = tx.ExecContext(ctx, "USE "+common.QuoteName(event.GetSchemaName())+";") + _, err = tx.ExecContext(ctx, "USE "+common.QuoteName(event.GetTargetSchemaName())+";") if err != nil { if rbErr := tx.Rollback(); rbErr != nil { log.Error("Failed to rollback", zap.Error(err)) diff --git a/pkg/sink/mysql/mysql_writer_dml_active_active_test.go b/pkg/sink/mysql/mysql_writer_dml_active_active_test.go index f3cea842a1..e33a1da614 100644 --- a/pkg/sink/mysql/mysql_writer_dml_active_active_test.go +++ b/pkg/sink/mysql/mysql_writer_dml_active_active_test.go @@ -48,6 +48,26 @@ func TestBuildActiveActiveUpsertSQLMultiRows(t *testing.T) { require.Equal(t, common.RowTypeInsert, rowTypes) } +func TestBuildActiveActiveUpsertSQLUsesRoutedTargetTable(t *testing.T) { + writer, _, _ := newTestMysqlWriter(t) + defer writer.db.Close() + + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + job := helper.DDL2Job("create table t (id int primary key, name varchar(32), _tidb_origin_ts bigint unsigned null, _tidb_softdelete_time timestamp null);") + require.NotNil(t, job) + + event := helper.DML2Event("test", "t", "insert into t values (1, 'alice', 10, NULL)") + event.TableInfo = helper.GetTableInfo(job).CloneWithRouting("target_db", "target_table") + + rows, commitTs := writer.collectActiveActiveRows(event) + sql, _, _ := buildActiveActiveUpsertSQL(event.TableInfo, rows, commitTs) + require.Contains(t, sql, "INSERT INTO `target_db`.`target_table`") + require.NotContains(t, sql, "`test`.`t`") +} + func TestActiveActiveNormalSQLs(t *testing.T) { writer, _, _ := newTestMysqlWriter(t) defer writer.db.Close() diff --git a/pkg/sink/mysql/mysql_writer_test.go b/pkg/sink/mysql/mysql_writer_test.go index 7b169aaab6..75930c7677 100644 --- a/pkg/sink/mysql/mysql_writer_test.go +++ b/pkg/sink/mysql/mysql_writer_test.go @@ -25,9 +25,11 @@ import ( lru "github.com/hashicorp/golang-lru" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/ticdc/downstreamadapter/routing" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/config/kerneltype" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/metrics" @@ -354,6 +356,52 @@ func TestMysqlWriter_Flush_EmptyEvents(t *testing.T) { require.NoError(t, err) } +func TestMysqlWriterExecDDLUsesRoutedSchemaName(t *testing.T) { + router, err := routing.NewRouter( + common.NewChangefeedID4Test("test", "test"), + true, + []*config.DispatchRule{{ + Matcher: []string{"source_db.*"}, + TargetSchema: "target_db", + TargetTable: "{table}_routed", + }}, + ) + require.NoError(t, err) + + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + createSchemaDDL := helper.DDL2Event("CREATE DATABASE `source_db`") + routedCreateSchemaDDL, err := router.ApplyToDDLEvent(createSchemaDDL) + require.NoError(t, err) + require.Equal(t, "target_db", routedCreateSchemaDDL.GetTargetSchemaName()) + + writer, db, mock := newTestMysqlWriter(t) + defer db.Close() + mock.ExpectBegin() + mock.ExpectExec("SET TIMESTAMP = DEFAULT").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec(routedCreateSchemaDDL.Query).WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + require.NoError(t, writer.execDDL(routedCreateSchemaDDL)) + require.NoError(t, mock.ExpectationsWereMet()) + + createTableDDL := helper.DDL2Event("CREATE TABLE `source_db`.`source_table` (`id` INT PRIMARY KEY)") + routedCreateTableDDL, err := router.ApplyToDDLEvent(createTableDDL) + require.NoError(t, err) + require.Equal(t, "target_db", routedCreateTableDDL.GetTargetSchemaName()) + require.Equal(t, "source_table_routed", routedCreateTableDDL.GetTargetTableName()) + + writer, db, mock = newTestMysqlWriter(t) + defer db.Close() + mock.ExpectBegin() + mock.ExpectExec("USE `target_db`;").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("SET TIMESTAMP = DEFAULT").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec(routedCreateTableDDL.Query).WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + require.NoError(t, writer.execDDL(routedCreateTableDDL)) + require.NoError(t, mock.ExpectationsWereMet()) +} + func TestMysqlWriter_FlushSyncPointEvent(t *testing.T) { writer, db, mock := newTestMysqlWriter(t) defer db.Close() diff --git a/pkg/sink/mysql/sql_builder.go b/pkg/sink/mysql/sql_builder.go index 54e0ec75da..97b672573d 100644 --- a/pkg/sink/mysql/sql_builder.go +++ b/pkg/sink/mysql/sql_builder.go @@ -180,7 +180,7 @@ func buildInsert( // sql: `DELETE FROM `test`.`t` WHERE x = ? AND y >= ? LIMIT 1` func buildDelete(tableInfo *common.TableInfo, row commonEvent.RowChange) (string, []interface{}) { var builder strings.Builder - quoteTable := tableInfo.TableName.QuoteString() + quoteTable := tableInfo.TableName.QuoteTargetString() builder.WriteString("DELETE FROM ") builder.WriteString(quoteTable) builder.WriteString(" WHERE ") @@ -308,7 +308,7 @@ func buildActiveActiveUpsertSQL( var builder strings.Builder builder.WriteString("INSERT INTO ") - builder.WriteString(tableInfo.TableName.QuoteString()) + builder.WriteString(tableInfo.TableName.QuoteTargetString()) builder.WriteString(" (") for i, colName := range insertColumns { if i > 0 { diff --git a/pkg/sink/mysql/sql_builder_test.go b/pkg/sink/mysql/sql_builder_test.go index c88dc06830..5dde1c1e00 100644 --- a/pkg/sink/mysql/sql_builder_test.go +++ b/pkg/sink/mysql/sql_builder_test.go @@ -182,6 +182,23 @@ func TestBuildInsert(t *testing.T) { require.Equal(t, exportedArgs, args) } +func TestBuildDMLUsesRoutedTargetTable(t *testing.T) { + insert, deleteRow, updateRow, tableInfo := getRowForTest(t) + routedTableInfo := tableInfo.CloneWithRouting("target_db", "target_table") + + insertSQL, _ := buildInsert(routedTableInfo, insert, false) + require.Contains(t, insertSQL, "INSERT INTO `target_db`.`target_table`") + require.NotContains(t, insertSQL, "`test`.`t`") + + deleteSQL, _ := buildDelete(routedTableInfo, deleteRow) + require.Contains(t, deleteSQL, "DELETE FROM `target_db`.`target_table`") + require.NotContains(t, deleteSQL, "`test`.`t`") + + updateSQL, _ := buildUpdate(routedTableInfo, updateRow) + require.Contains(t, updateSQL, "UPDATE `target_db`.`target_table`") + require.NotContains(t, updateSQL, "`test`.`t`") +} + func TestBuildDelete(t *testing.T) { helper := event.NewEventTestHelper(t) defer helper.Close() diff --git a/pkg/sink/sqlmodel/multi_row.go b/pkg/sink/sqlmodel/multi_row.go index 6d6e33c5c8..673a1e7b5d 100644 --- a/pkg/sink/sqlmodel/multi_row.go +++ b/pkg/sink/sqlmodel/multi_row.go @@ -116,7 +116,7 @@ func GenInsertSQL(tp DMLType, changes ...*RowChange) (string, []interface{}) { } else { buf.WriteString("INSERT INTO ") } - buf.WriteString(first.targetTable.QuoteString()) + buf.WriteString(first.targetTable.QuoteTargetString()) buf.WriteString(" (") columnNum := 0 var skipColIdx []int @@ -228,7 +228,7 @@ func genDeleteSQLV2(changes ...*RowChange) (string, []interface{}) { var buf strings.Builder buf.Grow(1024) buf.WriteString("DELETE FROM ") - buf.WriteString(first.targetTable.QuoteString()) + buf.WriteString(first.targetTable.QuoteTargetString()) buf.WriteString(" WHERE (") // v2 uses the first row to define the tuple shape of the trailing IN list. @@ -282,7 +282,7 @@ func genUpdateSQLV2(changes ...*RowChange) (string, []any) { // Generate UPDATE `db`.`table` SET first := changes[0] buf.WriteString("UPDATE ") - buf.WriteString(first.targetTable.QuoteString()) + buf.WriteString(first.targetTable.QuoteTargetString()) buf.WriteString(" SET ") // Pre-generate essential sub statements used after WHEN, WHERE. diff --git a/pkg/sink/sqlmodel/multi_row_test.go b/pkg/sink/sqlmodel/multi_row_test.go index b2e474c4d4..f552d9cf86 100644 --- a/pkg/sink/sqlmodel/multi_row_test.go +++ b/pkg/sink/sqlmodel/multi_row_test.go @@ -41,6 +41,69 @@ func TestGenDeleteMultiRows(t *testing.T) { require.Equal(t, []interface{}{1, 3}, args) } +func TestGenMultiRowSQLUsesRoutedTargetTable(t *testing.T) { + t.Parallel() + + sourceTableInfo, routedTableInfo := mockRoutedTableInfo( + t, + "CREATE TABLE tb1 (id INT PRIMARY KEY, name INT)", + "target_db", + "target_tb", + ) + + sourceTable := &sourceTableInfo.TableName + targetTable := &routedTableInfo.TableName + + insertChanges := []*RowChange{ + NewRowChange(sourceTable, targetTable, nil, []interface{}{1, 2}, sourceTableInfo, routedTableInfo, nil), + NewRowChange(sourceTable, targetTable, nil, []interface{}{3, 4}, sourceTableInfo, routedTableInfo, nil), + } + insertSQL, _ := GenInsertSQL(DMLInsert, insertChanges...) + require.Contains(t, insertSQL, "INSERT INTO `target_db`.`target_tb`") + require.NotContains(t, insertSQL, "`db`.`tb1`") + + deleteV2Changes := []*RowChange{ + NewRowChange(sourceTable, targetTable, []interface{}{1, 2}, nil, sourceTableInfo, routedTableInfo, nil), + NewRowChange(sourceTable, targetTable, []interface{}{3, 4}, nil, sourceTableInfo, routedTableInfo, nil), + } + deleteV2SQL, _ := GenDeleteSQL(DefaultWhereClause, deleteV2Changes...) + require.Contains(t, deleteV2SQL, "DELETE FROM `target_db`.`target_tb`") + require.NotContains(t, deleteV2SQL, "`db`.`tb1`") + + updateV2Changes := []*RowChange{ + NewRowChange(sourceTable, targetTable, []interface{}{1, 2}, []interface{}{1, 20}, sourceTableInfo, routedTableInfo, nil), + NewRowChange(sourceTable, targetTable, []interface{}{3, 4}, []interface{}{3, 40}, sourceTableInfo, routedTableInfo, nil), + } + updateV2SQL, _ := GenUpdateSQL(DefaultWhereClause, updateV2Changes...) + require.Contains(t, updateV2SQL, "UPDATE `target_db`.`target_tb`") + require.NotContains(t, updateV2SQL, "`db`.`tb1`") + + nullSourceTableInfo, nullRoutedTableInfo := mockRoutedTableInfo( + t, + "CREATE TABLE tb2 (id INT, name INT)", + "target_db", + "target_tb_v1", + ) + nullSourceTable := &nullSourceTableInfo.TableName + nullTargetTable := &nullRoutedTableInfo.TableName + + deleteV1Changes := []*RowChange{ + NewRowChange(nullSourceTable, nullTargetTable, []interface{}{1, nil}, nil, nullSourceTableInfo, nullRoutedTableInfo, nil), + NewRowChange(nullSourceTable, nullTargetTable, []interface{}{3, 4}, nil, nullSourceTableInfo, nullRoutedTableInfo, nil), + } + deleteV1SQL, _ := GenDeleteSQL(DefaultWhereClause, deleteV1Changes...) + require.Contains(t, deleteV1SQL, "DELETE FROM `target_db`.`target_tb_v1`") + require.NotContains(t, deleteV1SQL, "`db`.`tb2`") + + updateV1Changes := []*RowChange{ + NewRowChange(nullSourceTable, nullTargetTable, []interface{}{1, nil}, []interface{}{1, 20}, nullSourceTableInfo, nullRoutedTableInfo, nil), + NewRowChange(nullSourceTable, nullTargetTable, []interface{}{3, 4}, []interface{}{3, 40}, nullSourceTableInfo, nullRoutedTableInfo, nil), + } + updateV1SQL, _ := GenUpdateSQL(DefaultWhereClause, updateV1Changes...) + require.Contains(t, updateV1SQL, "UPDATE `target_db`.`target_tb_v1`") + require.NotContains(t, updateV1SQL, "`db`.`tb2`") +} + func TestGenDeleteMultiRowsWithNullFallbackToV1(t *testing.T) { t.Parallel() diff --git a/pkg/sink/sqlmodel/multi_row_v1.go b/pkg/sink/sqlmodel/multi_row_v1.go index fbb15f97fb..f9b2b3d38e 100644 --- a/pkg/sink/sqlmodel/multi_row_v1.go +++ b/pkg/sink/sqlmodel/multi_row_v1.go @@ -38,7 +38,7 @@ func genDeleteSQLV1(changes ...*RowChange) (string, []interface{}) { var buf strings.Builder buf.Grow(1024) buf.WriteString("DELETE FROM ") - buf.WriteString(first.targetTable.QuoteString()) + buf.WriteString(first.targetTable.QuoteTargetString()) buf.WriteString(" WHERE (") allArgs := make([]interface{}, 0, len(changes)*CommonIndexColumnsCount) @@ -67,7 +67,7 @@ func genUpdateSQLV1(changes ...*RowChange) (string, []any) { // Generate UPDATE `db`.`table` SET first := changes[0] buf.WriteString("UPDATE ") - buf.WriteString(first.targetTable.QuoteString()) + buf.WriteString(first.targetTable.QuoteTargetString()) buf.WriteString(" SET ") // Pre-generate essential sub statements used after WHEN and in the final WHERE. diff --git a/pkg/sink/sqlmodel/row_change.go b/pkg/sink/sqlmodel/row_change.go index dfd5678e9c..30491cdd46 100644 --- a/pkg/sink/sqlmodel/row_change.go +++ b/pkg/sink/sqlmodel/row_change.go @@ -173,7 +173,7 @@ func (r *RowChange) String() string { // TargetTableID returns a ID string for target table. func (r *RowChange) TargetTableID() string { - return r.targetTable.QuoteString() + return r.targetTable.QuoteTargetString() } // SourceTableInfo returns the TableInfo of source table. @@ -274,7 +274,7 @@ func (r *RowChange) genDeleteSQL() (string, []interface{}) { var buf strings.Builder buf.Grow(1024) buf.WriteString("DELETE FROM ") - buf.WriteString(r.targetTable.QuoteString()) + buf.WriteString(r.targetTable.QuoteTargetString()) buf.WriteString(" WHERE ") whereArgs := r.genWhere(&buf) buf.WriteString(" LIMIT 1") @@ -293,7 +293,7 @@ func (r *RowChange) genUpdateSQL() (string, []interface{}) { var buf strings.Builder buf.Grow(2048) buf.WriteString("UPDATE ") - buf.WriteString(r.targetTable.QuoteString()) + buf.WriteString(r.targetTable.QuoteTargetString()) buf.WriteString(" SET ") // Build target generated columns lower names set to accelerate following check diff --git a/pkg/sink/sqlmodel/row_change_test.go b/pkg/sink/sqlmodel/row_change_test.go index f6d67c5e27..34548481a7 100644 --- a/pkg/sink/sqlmodel/row_change_test.go +++ b/pkg/sink/sqlmodel/row_change_test.go @@ -40,6 +40,11 @@ func mockTableInfo(t *testing.T, sql string) *common.TableInfo { return common.WrapTableInfo("db", rawTi) } +func mockRoutedTableInfo(t *testing.T, createTableSQL, targetSchema, targetTable string) (*common.TableInfo, *common.TableInfo) { + sourceTableInfo := mockTableInfo(t, createTableSQL) + return sourceTableInfo, sourceTableInfo.CloneWithRouting(targetSchema, targetTable) +} + type dpanicSuite struct { suite.Suite } @@ -92,6 +97,34 @@ func TestNewRowChange(t *testing.T) { require.Equal(t, expected, actual) } +func TestGenSQLUsesRoutedTargetTable(t *testing.T) { + sourceTableInfo, routedTableInfo := mockRoutedTableInfo( + t, + "CREATE TABLE tb1 (id INT PRIMARY KEY, name INT)", + "target_db", + "target_tb", + ) + + sourceTable := &sourceTableInfo.TableName + targetTable := &routedTableInfo.TableName + + insertChange := NewRowChange(sourceTable, targetTable, nil, []interface{}{1, 2}, sourceTableInfo, routedTableInfo, nil) + insertSQL, _ := insertChange.GenSQL(DMLInsert) + require.Contains(t, insertSQL, "`target_db`.`target_tb`") + require.NotContains(t, insertSQL, "`db`.`tb1`") + require.Equal(t, "`target_db`.`target_tb`", insertChange.TargetTableID()) + + deleteChange := NewRowChange(sourceTable, targetTable, []interface{}{1, 2}, nil, sourceTableInfo, routedTableInfo, nil) + deleteSQL, _ := deleteChange.GenSQL(DMLDelete) + require.Contains(t, deleteSQL, "DELETE FROM `target_db`.`target_tb`") + require.NotContains(t, deleteSQL, "`db`.`tb1`") + + updateChange := NewRowChange(sourceTable, targetTable, []interface{}{1, 2}, []interface{}{1, 3}, sourceTableInfo, routedTableInfo, nil) + updateSQL, _ := updateChange.GenSQL(DMLUpdate) + require.Contains(t, updateSQL, "UPDATE `target_db`.`target_tb`") + require.NotContains(t, updateSQL, "`db`.`tb1`") +} + func (s *dpanicSuite) TestRowChangeType() { change := &RowChange{preValues: []interface{}{1}} change.calculateType() diff --git a/tests/integration_tests/run_light_it_in_ci.sh b/tests/integration_tests/run_light_it_in_ci.sh index b70dd44e6b..a896e3ea26 100755 --- a/tests/integration_tests/run_light_it_in_ci.sh +++ b/tests/integration_tests/run_light_it_in_ci.sh @@ -48,7 +48,7 @@ mysql_groups=( # G07 'fail_over_ddl_H changefeed_update_config synced_status_with_redo' # G08 - 'capture_session_done_during_task changefeed_dup_error_restart mysql_sink_retry fail_over_ddl_I' + 'capture_session_done_during_task changefeed_dup_error_restart mysql_sink_retry fail_over_ddl_I table_route' # G09 'sequence cdc_server_tips ddl_sequence server_config_compatibility log_redaction fail_over_ddl_J' # G10 diff --git a/tests/integration_tests/table_route/conf/changefeed.toml b/tests/integration_tests/table_route/conf/changefeed.toml new file mode 100644 index 0000000000..0e9f9193b9 --- /dev/null +++ b/tests/integration_tests/table_route/conf/changefeed.toml @@ -0,0 +1,18 @@ +# Changefeed configuration for table route integration test +# This tests schema and table routing for MySQL sinks + +[filter] +rules = ['source_db.*', 'source_extra_db.*'] + +[sink] +# Dispatch rules with schema and table routing +# Route source_db.* to target_db.* with table suffix "_routed" +[[sink.dispatchers]] +matcher = ['source_db.*'] +target-schema = 'target_db' +target-table = '{table}_routed' + +[[sink.dispatchers]] +matcher = ['source_extra_db.*'] +target-schema = 'target_extra_db' +target-table = '{table}_routed' diff --git a/tests/integration_tests/table_route/conf/diff_config.toml b/tests/integration_tests/table_route/conf/diff_config.toml new file mode 100644 index 0000000000..4a4afde771 --- /dev/null +++ b/tests/integration_tests/table_route/conf/diff_config.toml @@ -0,0 +1,128 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/table_route/sync_diff/output" + + source-instances = ["tidb0"] + + target-instance = "mysql1" + + target-check-tables = [ + "target_db.users_routed", + "target_db.orders_routed", + "target_db.products_routed", + "target_db.products_backup_routed", + "target_db.renamed_table_routed", + "target_db.multi_rename_a_new_routed", + "target_db.multi_rename_b_new_routed", + "target_db.partitioned_events_routed", + "target_db.truncate_test_routed", + "target_db.finish_mark_routed", + "target_extra_db.external_users_routed", + "target_extra_db.cross_move_target_routed", + ] + +[data-sources] +[data-sources.tidb0] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + route-rules = [ + "users", + "orders", + "products", + "products_backup", + "renamed_table", + "multi_rename_a_new", + "multi_rename_b_new", + "partitioned_events", + "truncate_test", + "finish_mark", + "external_users", + "cross_move_target", + ] + +[data-sources.mysql1] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" + +[routes.users] +schema-pattern = "source_db" +table-pattern = "users" +target-schema = "target_db" +target-table = "users_routed" + +[routes.orders] +schema-pattern = "source_db" +table-pattern = "orders" +target-schema = "target_db" +target-table = "orders_routed" + +[routes.products] +schema-pattern = "source_db" +table-pattern = "products" +target-schema = "target_db" +target-table = "products_routed" + +[routes.products_backup] +schema-pattern = "source_db" +table-pattern = "products_backup" +target-schema = "target_db" +target-table = "products_backup_routed" + +[routes.renamed_table] +schema-pattern = "source_db" +table-pattern = "renamed_table" +target-schema = "target_db" +target-table = "renamed_table_routed" + +[routes.multi_rename_a_new] +schema-pattern = "source_db" +table-pattern = "multi_rename_a_new" +target-schema = "target_db" +target-table = "multi_rename_a_new_routed" + +[routes.multi_rename_b_new] +schema-pattern = "source_db" +table-pattern = "multi_rename_b_new" +target-schema = "target_db" +target-table = "multi_rename_b_new_routed" + +[routes.partitioned_events] +schema-pattern = "source_db" +table-pattern = "partitioned_events" +target-schema = "target_db" +target-table = "partitioned_events_routed" + +[routes.truncate_test] +schema-pattern = "source_db" +table-pattern = "truncate_test" +target-schema = "target_db" +target-table = "truncate_test_routed" + +[routes.finish_mark] +schema-pattern = "source_db" +table-pattern = "finish_mark" +target-schema = "target_db" +target-table = "finish_mark_routed" + +[routes.external_users] +schema-pattern = "source_extra_db" +table-pattern = "external_users" +target-schema = "target_extra_db" +target-table = "external_users_routed" + +[routes.cross_move_target] +schema-pattern = "source_extra_db" +table-pattern = "cross_move_target" +target-schema = "target_extra_db" +target-table = "cross_move_target_routed" diff --git a/tests/integration_tests/table_route/data/test.sql b/tests/integration_tests/table_route/data/test.sql new file mode 100644 index 0000000000..247d1b0bf4 --- /dev/null +++ b/tests/integration_tests/table_route/data/test.sql @@ -0,0 +1,215 @@ +-- Test mixed DDL and DML operations for table route. +DROP DATABASE IF EXISTS source_db; +DROP DATABASE IF EXISTS source_extra_db; +CREATE DATABASE source_db; +CREATE DATABASE source_extra_db; +USE source_db; + +-- ============================================ +-- DDL: CREATE TABLE with initial DML +-- ============================================ +CREATE TABLE users ( + id INT PRIMARY KEY, + name VARCHAR(100), + email VARCHAR(100) +); + +CREATE TABLE orders ( + id INT PRIMARY KEY, + user_id INT, + amount DECIMAL(10, 2) +); + +INSERT INTO users VALUES (1, 'Alice', 'alice@example.com'); +INSERT INTO users VALUES (2, 'Bob', 'bob@example.com'); + +INSERT INTO orders VALUES (1, 1, 100.00); +INSERT INTO orders VALUES (2, 2, 200.00); + +-- ============================================ +-- DML: INSERT more data +-- ============================================ +INSERT INTO users VALUES (3, 'Charlie', 'charlie@example.com'); +INSERT INTO orders VALUES (3, 3, 300.00); + +-- ============================================ +-- DML: UPDATE data +-- ============================================ +UPDATE users SET email = 'alice_updated@example.com' WHERE id = 1; +UPDATE orders SET amount = 150.00 WHERE id = 1; + +-- ============================================ +-- DML: DELETE data +-- ============================================ +DELETE FROM orders WHERE id = 2; + +-- ============================================ +-- DDL: ALTER TABLE ADD COLUMN +-- ============================================ +ALTER TABLE users ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP; + +-- ============================================ +-- DDL: CREATE TABLE (new table should be routed) +-- ============================================ +CREATE TABLE products ( + id INT PRIMARY KEY, + name VARCHAR(100), + price DECIMAL(10, 2) +); + +-- Widget starts at 29.99 (>= 15.00), so DELETE WHERE price < 15.00 won't affect it +-- unless the UPDATE (price = 12.99) is applied first +INSERT INTO products VALUES (1, 'Widget', 29.99); +INSERT INTO products VALUES (2, 'Gadget', 19.99); + +-- ============================================ +-- DDL: CREATE TABLE LIKE +-- ============================================ +CREATE TABLE products_backup LIKE products; + +INSERT INTO products_backup VALUES (1, 'Widget', 29.99); + +-- ============================================ +-- DDL: ALTER TABLE DROP COLUMN +-- ============================================ +ALTER TABLE users DROP COLUMN created_at; + +-- ============================================ +-- DDL: ALTER TABLE ADD INDEX +-- ============================================ +ALTER TABLE orders ADD INDEX idx_user_id (user_id); + +-- ============================================ +-- DDL: CROSS DATABASE +-- ============================================ +CREATE TABLE `source_extra_db`.`external_users` LIKE `source_db`.`users`; +INSERT INTO `source_extra_db`.`external_users` + SELECT `id`, `name`, `email` FROM `source_db`.`users` WHERE `id` <= 2; +UPDATE `source_extra_db`.`external_users` SET `email` = 'external_alice@example.com' WHERE `id` = 1; + +CREATE TABLE `source_db`.`cross_move_source` ( + id INT PRIMARY KEY, + value VARCHAR(50) +); +INSERT INTO `source_db`.`cross_move_source` VALUES (1, 'move_source'); +RENAME TABLE `source_db`.`cross_move_source` TO `source_extra_db`.`cross_move_target`; +INSERT INTO `source_extra_db`.`cross_move_target` VALUES (2, 'move_target'); + +-- ============================================ +-- DDL: RENAME TABLE +-- ============================================ +CREATE TABLE temp_table ( + id INT PRIMARY KEY, + value VARCHAR(50) +); +INSERT INTO temp_table VALUES (1, 'test'); + +RENAME TABLE temp_table TO renamed_table; + +-- Verify renamed table works with DML +INSERT INTO renamed_table VALUES (2, 'test2'); +UPDATE renamed_table SET value = 'updated' WHERE id = 1; + +-- ============================================ +-- DDL: RENAME TABLE with multiple table pairs +-- ============================================ +CREATE TABLE multi_rename_a ( + id INT PRIMARY KEY, + value VARCHAR(50) +); +CREATE TABLE multi_rename_b ( + id INT PRIMARY KEY, + value VARCHAR(50) +); +INSERT INTO multi_rename_a VALUES (1, 'a'); +INSERT INTO multi_rename_b VALUES (1, 'b'); + +RENAME TABLE multi_rename_a TO multi_rename_a_new, multi_rename_b TO multi_rename_b_new; + +INSERT INTO multi_rename_a_new VALUES (2, 'a2'); +UPDATE multi_rename_b_new SET value = 'b2' WHERE id = 1; + +-- ============================================ +-- DDL: CREATE VIEW and DROP VIEW +-- ============================================ +CREATE VIEW `source_db`.`user_order_view` AS + SELECT `u`.`id`, `u`.`name`, `o`.`amount` + FROM `source_db`.`users` AS `u` + JOIN `source_db`.`orders` AS `o` ON `u`.`id` = `o`.`user_id`; + +CREATE VIEW `source_db`.`transient_view` AS + SELECT `id`, `name` FROM `source_db`.`users`; + +DROP VIEW `source_db`.`transient_view`; + +-- ============================================ +-- DDL: PARTITION TABLE +-- ============================================ +CREATE TABLE partitioned_events ( + id INT, + bucket INT NOT NULL, + value VARCHAR(50), + PRIMARY KEY (id, bucket) +) PARTITION BY RANGE (bucket) ( + PARTITION p0 VALUES LESS THAN (10), + PARTITION p1 VALUES LESS THAN (20) +); + +INSERT INTO partitioned_events VALUES (1, 5, 'p0'); +INSERT INTO partitioned_events VALUES (2, 15, 'p1'); +ALTER TABLE partitioned_events ADD PARTITION (PARTITION p2 VALUES LESS THAN (30)); +INSERT INTO partitioned_events VALUES (3, 25, 'p2'); +ALTER TABLE partitioned_events TRUNCATE PARTITION p0; +INSERT INTO partitioned_events VALUES (4, 6, 'p0_after_truncate'); +ALTER TABLE partitioned_events DROP PARTITION p1; +INSERT INTO partitioned_events VALUES (5, 26, 'p2_more'); + +-- ============================================ +-- DDL: TRUNCATE TABLE +-- ============================================ +CREATE TABLE truncate_test ( + id INT PRIMARY KEY, + data VARCHAR(100) +); +INSERT INTO truncate_test VALUES (1, 'will be truncated'); +INSERT INTO truncate_test VALUES (2, 'also truncated'); + +TRUNCATE TABLE truncate_test; + +-- Insert new data after truncate +INSERT INTO truncate_test VALUES (10, 'after truncate'); + +-- ============================================ +-- DDL: DROP TABLE +-- ============================================ +CREATE TABLE to_be_dropped ( + id INT PRIMARY KEY +); +INSERT INTO to_be_dropped VALUES (1); + +DROP TABLE to_be_dropped; + +-- ============================================ +-- Mixed operations on existing tables +-- ============================================ +-- More inserts +INSERT INTO users VALUES (4, 'Diana', 'diana@example.com'); +INSERT INTO users VALUES (5, 'Eve', 'eve@example.com'); + +-- Batch update +UPDATE users SET name = CONCAT(name, '_v2') WHERE id IN (3, 4); + +-- More deletes +DELETE FROM users WHERE id = 5; + +-- Update with multiple columns +UPDATE products SET name = 'Super Widget', price = 12.99 WHERE id = 1; + +-- Delete with condition +DELETE FROM products WHERE price < 15.00; + +-- ============================================ +-- Create finish marker table +-- ============================================ +CREATE TABLE finish_mark (id INT PRIMARY KEY); +INSERT INTO finish_mark VALUES (1); diff --git a/tests/integration_tests/table_route/run.sh b/tests/integration_tests/table_route/run.sh new file mode 100755 index 0000000000..3775e0c78b --- /dev/null +++ b/tests/integration_tests/table_route/run.sh @@ -0,0 +1,55 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source "$CUR/../_utils/test_prepare" +WORK_DIR="$OUT_DIR/$TEST_NAME" +CDC_BINARY=cdc.test +SINK_TYPE="$1" + +function run() { + if [ "$SINK_TYPE" != "mysql" ]; then + return + fi + + rm -rf "$WORK_DIR" && mkdir -p "$WORK_DIR" + + start_tidb_cluster --workdir "$WORK_DIR" + + run_cdc_server --workdir "$WORK_DIR" --binary "$CDC_BINARY" --cluster-id "$KEYSPACE_NAME" + + SINK_URI="mysql://normal:123456@${DOWN_TIDB_HOST}:${DOWN_TIDB_PORT}/" + cdc_cli_changefeed create --sink-uri="$SINK_URI" --config="$CUR/conf/changefeed.toml" + + run_sql_file "$CUR/data/test.sql" "$UP_TIDB_HOST" "$UP_TIDB_PORT" + + check_table_exists target_db.finish_mark_routed "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" 90 + check_sync_diff "$WORK_DIR" "$CUR/conf/diff_config.toml" 120 + + check_table_not_exists source_db.users "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" + check_table_not_exists source_db.orders "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" + check_table_not_exists source_extra_db.external_users "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" + check_table_not_exists target_db.temp_table_routed "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" + check_table_not_exists target_db.cross_move_source_routed "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" + check_table_not_exists target_db.multi_rename_a_routed "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" + check_table_not_exists target_db.multi_rename_b_routed "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" + check_table_not_exists target_db.to_be_dropped_routed "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" + run_sql "SHOW CREATE VIEW target_db.user_order_view_routed" "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" + check_contains "user_order_view_routed" + check_contains "users_routed" + check_contains "orders_routed" + check_table_not_exists target_db.transient_view_routed "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" + + run_sql "DROP DATABASE source_extra_db" "$UP_TIDB_HOST" "$UP_TIDB_PORT" + run_sql "DROP DATABASE source_db" "$UP_TIDB_HOST" "$UP_TIDB_PORT" + check_db_not_exists target_extra_db "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" 90 + check_db_not_exists target_db "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" 90 + + cleanup_process "$CDC_BINARY" +} + +trap 'stop_test "$WORK_DIR"' EXIT +run "$@" +check_logs "$WORK_DIR" +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"