From e12b1e480cc87820a49c8768cbbd5dc2a4afbc45 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Thu, 18 Jun 2026 03:38:14 +0000 Subject: [PATCH 1/2] init Signed-off-by: wk989898 --- cmd/kafka-consumer/writer.go | 129 +++++++++++++---------------- cmd/kafka-consumer/writer_test.go | 76 ++--------------- cmd/pulsar-consumer/writer.go | 107 +++++++++--------------- cmd/pulsar-consumer/writer_test.go | 58 ------------- cmd/util/event_group.go | 58 ++----------- cmd/util/event_group_test.go | 44 ---------- 6 files changed, 107 insertions(+), 365 deletions(-) diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go index 88c058f2cc..00626f0956 100644 --- a/cmd/kafka-consumer/writer.go +++ b/cmd/kafka-consumer/writer.go @@ -157,12 +157,6 @@ func (w *writer) flushDDLEvent(ctx context.Context, ddl *event.DDLEvent) error { tableIDs := w.getBlockTableIDs(ddl) commitTs := ddl.GetCommitTs() resolvedEvents := make([]*event.DMLEvent, 0) - // resolvedGroups records which EventsGroup has flushed events so we can - // advance its AppliedWatermark after the flush is fully finished. - resolvedGroups := make([]struct { - group *util.EventsGroup - maxCommitTs uint64 - }, 0) for tableID := range tableIDs { for _, progress := range w.progresses { g, ok := progress.eventsGroup[tableID] @@ -171,19 +165,7 @@ func (w *writer) flushDDLEvent(ctx context.Context, ddl *event.DDLEvent) error { } before := len(resolvedEvents) resolvedEvents = g.ResolveInto(commitTs, resolvedEvents) - resolvedCount := len(resolvedEvents) - before - if resolvedCount == 0 { - continue - } - - resolvedGroups = append(resolvedGroups, struct { - group *util.EventsGroup - maxCommitTs uint64 - }{ - group: g, - maxCommitTs: resolvedEvents[len(resolvedEvents)-1].GetCommitTs(), - }) - total += resolvedCount + total += len(resolvedEvents) - before } } @@ -211,11 +193,6 @@ func (w *writer) flushDDLEvent(ctx context.Context, ddl *event.DDLEvent) error { log.Info("flush DML events before DDL done", zap.Uint64("DDLCommitTs", commitTs), zap.Int("total", total), zap.Duration("duration", time.Since(start)), zap.Any("tables", tableIDs)) - for _, item := range resolvedGroups { - if item.maxCommitTs > item.group.AppliedWatermark { - item.group.AppliedWatermark = item.maxCommitTs - } - } return w.mysqlSink.WriteBlockEvent(ddl) case <-ticker.C: log.Warn("DML events cannot be flushed in time", @@ -293,29 +270,11 @@ func (w *writer) flushDMLEventsByWatermark(ctx context.Context) error { watermark := w.globalWatermark() resolvedEvents := make([]*event.DMLEvent, 0) - // resolvedGroups records which EventsGroup has flushed events so we can - // advance its AppliedWatermark after the flush is fully finished. - resolvedGroups := make([]struct { - group *util.EventsGroup - maxCommitTs uint64 - }, 0) for _, p := range w.progresses { for _, group := range p.eventsGroup { before := len(resolvedEvents) resolvedEvents = group.ResolveInto(watermark, resolvedEvents) - resolvedCount := len(resolvedEvents) - before - if resolvedCount == 0 { - continue - } - - resolvedGroups = append(resolvedGroups, struct { - group *util.EventsGroup - maxCommitTs uint64 - }{ - group: group, - maxCommitTs: resolvedEvents[len(resolvedEvents)-1].GetCommitTs(), - }) - total += resolvedCount + total += len(resolvedEvents) - before } } if total == 0 { @@ -343,11 +302,6 @@ func (w *writer) flushDMLEventsByWatermark(ctx context.Context) error { case <-done: log.Info("flush DML events done", zap.Uint64("watermark", watermark), zap.Int("total", total), zap.Duration("duration", time.Since(start))) - for _, item := range resolvedGroups { - if item.maxCommitTs > item.group.AppliedWatermark { - item.group.AppliedWatermark = item.maxCommitTs - } - } return nil case <-ticker.C: log.Warn("DML events cannot be flushed in time", zap.Uint64("watermark", watermark), @@ -603,39 +557,70 @@ func (w *writer) appendRow2Group(dml *event.DMLEvent, progress *partitionProgres group = util.NewEventsGroup(progress.partition, tableID) progress.eventsGroup[tableID] = group } - // IMPORTANT: Kafka offsets are append-only, but CommitTs can go backwards after - // a TiCDC restart/retry (at-least-once replay). We must not drop such events - // solely based on a "seen" watermark (e.g. HighWatermark). The only safe - // ignore condition is "already flushed to downstream". - if commitTs <= group.AppliedWatermark { - log.Warn("DML event replayed after applied, ignore it", + if commitTs < progress.watermark { + log.Warn("DML Event fallback row, since less than the partition watermark, ignore it", zap.Int64("tableID", tableID), zap.Int32("partition", group.Partition), zap.Uint64("commitTs", commitTs), zap.Any("offset", offset), - zap.Uint64("appliedWatermark", group.AppliedWatermark), zap.Uint64("highWatermark", group.HighWatermark), - zap.Uint64("partitionWatermark", progress.watermark), zap.Any("watermarkOffset", progress.watermarkOffset), - zap.String("schema", schema), zap.String("table", table), zap.Any("protocol", w.protocol)) + zap.Uint64("watermark", progress.watermark), zap.Any("watermarkOffset", progress.watermarkOffset), + zap.String("schema", schema), zap.String("table", table)) return } - forceInsert := commitTs < group.HighWatermark || commitTs < progress.watermark || w.enableTableAcrossNodes - if forceInsert { - log.Warn("DML event commit ts fallback, append with forceInsert", + if commitTs >= group.HighWatermark { + group.Append(dml, false) + log.Debug("DML event append to the group", zap.Int32("partition", group.Partition), zap.Any("offset", offset), - zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", group.HighWatermark), - zap.Uint64("appliedWatermark", group.AppliedWatermark), - zap.Uint64("partitionWatermark", progress.watermark), zap.Any("watermarkOffset", progress.watermarkOffset), + zap.Uint64("commitTs", commitTs), zap.Uint64("HighWatermark", group.HighWatermark), zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID), - zap.Stringer("eventType", dml.RowTypes[0]), zap.Any("protocol", w.protocol), - zap.Bool("IsPartition", dml.TableInfo.TableName.IsPartition)) + zap.Stringer("eventType", dml.RowTypes[0])) + return + } + if w.enableTableAcrossNodes { + log.Warn("DML events fallback, but enableTableAcrossNodes is true, still append it", + zap.Int32("partition", group.Partition), zap.Any("offset", offset), + zap.Uint64("commitTs", commitTs), zap.Uint64("HighWatermark", group.HighWatermark), + zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID), + zap.Stringer("eventType", dml.RowTypes[0])) group.Append(dml, true) return } - group.Append(dml, false) - log.Info("DML event append to the group", - zap.Int32("partition", group.Partition), zap.Any("offset", offset), - zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", group.HighWatermark), - zap.Uint64("appliedWatermark", group.AppliedWatermark), - zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID), - zap.Stringer("eventType", dml.RowTypes[0])) + switch w.protocol { + case config.ProtocolSimple, config.ProtocolDebezium: + // simple protocol set the table id for all row message, it can be known which table the row message belongs to, + // also consider the table partition. + // open protocol set the partition table id if the table is partitioned. + // for normal table, the table id is generated by the fake table id generator by using schema and table name. + // so one event group for one normal table or one table partition, replayed messages can be ignored. + log.Warn("DML event fallback row, since less than the group high watermark, ignore it", + zap.Int32("partition", progress.partition), zap.Any("offset", offset), + zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", group.HighWatermark), + zap.Any("partitionWatermark", progress.watermark), zap.Any("watermarkOffset", progress.watermarkOffset), + zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID), + zap.Stringer("eventType", dml.RowTypes[0]), + // zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns), + zap.Any("protocol", w.protocol), zap.Bool("IsPartition", dml.TableInfo.TableName.IsPartition)) + case config.ProtocolCanalJSON, config.ProtocolOpen, config.ProtocolAvro: + // for partition table, the canal-json, avro and open-protocol message cannot assign physical table id to each dml message, + // we cannot distinguish whether it's a real fallback event or not, still append it. + if w.partitionTableAccessor.IsPartitionTable(schema, table) { + log.Warn("DML events fallback, but it's canal-json, avro or open-protocol and the table is a partition table, still append it", + zap.Int32("partition", group.Partition), zap.Any("offset", offset), + zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", group.HighWatermark), + zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID), + zap.Stringer("eventType", dml.RowTypes[0])) + group.Append(dml, true) + return + } + log.Warn("DML event fallback row, since less than the group high watermark, ignore it", + zap.Int32("partition", progress.partition), zap.Any("offset", offset), + zap.Uint64("commitTs", commitTs), zap.Uint64("HighWatermark", group.HighWatermark), + zap.Any("partitionWatermark", progress.watermark), zap.Any("watermarkOffset", progress.watermarkOffset), + zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID), + zap.Stringer("eventType", dml.RowTypes[0]), + // zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns), + zap.Any("protocol", w.protocol), zap.Bool("IsPartition", dml.TableInfo.TableName.IsPartition)) + default: + log.Panic("unknown protocol", zap.Any("protocol", w.protocol)) + } } func openDB(ctx context.Context, dsn string) (*sql.DB, error) { diff --git a/cmd/kafka-consumer/writer_test.go b/cmd/kafka-consumer/writer_test.go index ec9aa34a43..0ce56a7c82 100644 --- a/cmd/kafka-consumer/writer_test.go +++ b/cmd/kafka-consumer/writer_test.go @@ -17,17 +17,12 @@ import ( "context" "testing" - "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/golang/mock/gomock" - "github.com/pingcap/ticdc/cmd/util" - "github.com/pingcap/ticdc/downstreamadapter/sink/eventrouter" sinkmock "github.com/pingcap/ticdc/downstreamadapter/sink/mock" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" - "github.com/pingcap/ticdc/pkg/config" - codecCommon "github.com/pingcap/ticdc/pkg/sink/codec/common" + codeccommon "github.com/pingcap/ticdc/pkg/sink/codec/common" timodel "github.com/pingcap/tidb/pkg/meta/model" - "github.com/pingcap/tidb/pkg/util/chunk" "github.com/stretchr/testify/require" ) @@ -82,7 +77,7 @@ func TestWriterWrite_executesIndependentCreateTableWithoutWatermark(t *testing.T }, } - w.Write(ctx, codecCommon.MessageTypeDDL) + w.Write(ctx, codeccommon.MessageTypeDDL) require.Equal(t, []string{"CREATE TABLE `test`.`t` (`id` INT PRIMARY KEY)"}, *ddls) require.Empty(t, w.ddlList) @@ -128,12 +123,12 @@ func TestWriterWrite_preservesOrderWhenBlockedDDLNotReady(t *testing.T) { }, } - w.Write(ctx, codecCommon.MessageTypeDDL) + w.Write(ctx, codeccommon.MessageTypeDDL) require.Empty(t, *ddls) require.Len(t, w.ddlList, 2) p.watermark = 200 - w.Write(ctx, codecCommon.MessageTypeDDL) + w.Write(ctx, codeccommon.MessageTypeDDL) require.Equal(t, []string{ "ALTER TABLE `test`.`t` ADD COLUMN `c2` INT", "CREATE TABLE `test`.`t2` (`id` INT PRIMARY KEY)", @@ -174,12 +169,12 @@ func TestWriterWrite_doesNotBypassWatermarkForCreateTableLike(t *testing.T) { }, } - w.Write(ctx, codecCommon.MessageTypeDDL) + w.Write(ctx, codeccommon.MessageTypeDDL) require.Empty(t, *ddls) require.Len(t, w.ddlList, 1) p.watermark = 200 - w.Write(ctx, codecCommon.MessageTypeDDL) + w.Write(ctx, codeccommon.MessageTypeDDL) require.Equal(t, []string{"CREATE TABLE `test`.`t2` LIKE `test`.`t1`"}, *ddls) require.Empty(t, w.ddlList) } @@ -256,7 +251,7 @@ func TestWriterWrite_handlesOutOfOrderDDLsByCommitTs(t *testing.T) { }, } - w.Write(ctx, codecCommon.MessageTypeDDL) + w.Write(ctx, codeccommon.MessageTypeDDL) require.Equal(t, []string{ "CREATE TABLE `common_1`.`add_and_drop_columns` (`id` INT(11) NOT NULL PRIMARY KEY)", @@ -267,60 +262,3 @@ func TestWriterWrite_handlesOutOfOrderDDLsByCommitTs(t *testing.T) { require.Len(t, w.ddlList, 1) require.Equal(t, "CREATE TABLE `common_1`.`a` (`a` BIGINT PRIMARY KEY,`b` INT)", w.ddlList[0].Query) } - -func TestAppendRow2Group_DoesNotDropCommitTsFallbackBeforeApplied(t *testing.T) { - // Scenario: - // 1) TiCDC writes DML messages to Kafka in commitTs order. - // 2) Under network partition / changefeed restart, TiCDC may replay older commitTs, - // which will be appended to Kafka at a larger offset (commitTs appears to go backwards). - // - // The kafka-consumer must not drop these "fallback commitTs" events unless they have - // already been flushed to downstream (AppliedWatermark), otherwise the replay cannot - // heal the missing window. - replicaCfg := config.GetDefaultReplicaConfig() - eventRouter, err := eventrouter.NewEventRouter(replicaCfg.Sink, "test-topic", false, false) - require.NoError(t, err) - - w := &writer{ - progresses: []*partitionProgress{{partition: 0, eventsGroup: make(map[int64]*util.EventsGroup)}}, - eventRouter: eventRouter, - protocol: config.ProtocolCanalJSON, - partitionTableAccessor: codecCommon.NewPartitionTableAccessor(), - } - - newDMLEvent := func(tableID int64, commitTs uint64) *commonEvent.DMLEvent { - return &commonEvent.DMLEvent{ - PhysicalTableID: tableID, - CommitTs: commitTs, - RowTypes: []common.RowType{common.RowTypeUpdate}, - Rows: chunk.NewChunkWithCapacity(nil, 0), - TableInfo: &common.TableInfo{ - TableName: common.TableName{Schema: "test", Table: "t"}, - }, - } - } - - progress := w.progresses[0] - - // Step 1: observe a larger commitTs first (e.g. produced before restart). - w.appendRow2Group(newDMLEvent(1, 200), progress, kafka.Offset(10)) - - // Step 2: observe a smaller commitTs later (e.g. replayed after restart). - w.appendRow2Group(newDMLEvent(1, 100), progress, kafka.Offset(11)) - - group := progress.eventsGroup[1] - require.NotNil(t, group) - - resolvedEvents := make([]*commonEvent.DMLEvent, 0) - // Expect: commitTs=100 is still kept and can be resolved. - resolved := group.ResolveInto(150, nil) - require.Len(t, resolved, 1) - require.Equal(t, uint64(100), resolved[0].CommitTs) - - // Step 3: once downstream has flushed beyond commitTs=100, the replay is safe to ignore. - resolvedEvents = make([]*commonEvent.DMLEvent, 0) - group.AppliedWatermark = 200 - w.appendRow2Group(newDMLEvent(1, 100), progress, kafka.Offset(12)) - resolved = group.ResolveInto(150, resolvedEvents) - require.Empty(t, resolved) -} diff --git a/cmd/pulsar-consumer/writer.go b/cmd/pulsar-consumer/writer.go index fbbf94c3aa..bc30fb9752 100644 --- a/cmd/pulsar-consumer/writer.go +++ b/cmd/pulsar-consumer/writer.go @@ -150,12 +150,6 @@ func (w *writer) flushDDLEvent(ctx context.Context, ddl *commonEvent.DDLEvent) e tableIDs := w.getBlockTableIDs(ddl) commitTs := ddl.GetCommitTs() resolvedEvents := make([]*commonEvent.DMLEvent, 0) - // resolvedGroups records which EventsGroup has flushed events so we can - // advance its AppliedWatermark after the flush is fully finished. - resolvedGroups := make([]struct { - group *util.EventsGroup - maxCommitTs uint64 - }, 0) for tableID := range tableIDs { for _, progress := range w.progresses { g, ok := progress.eventsGroup[tableID] @@ -164,19 +158,7 @@ func (w *writer) flushDDLEvent(ctx context.Context, ddl *commonEvent.DDLEvent) e } before := len(resolvedEvents) resolvedEvents = g.ResolveInto(commitTs, resolvedEvents) - resolvedCount := len(resolvedEvents) - before - if resolvedCount == 0 { - continue - } - - resolvedGroups = append(resolvedGroups, struct { - group *util.EventsGroup - maxCommitTs uint64 - }{ - group: g, - maxCommitTs: resolvedEvents[len(resolvedEvents)-1].GetCommitTs(), - }) - total += resolvedCount + total += len(resolvedEvents) - before } } @@ -204,11 +186,6 @@ func (w *writer) flushDDLEvent(ctx context.Context, ddl *commonEvent.DDLEvent) e log.Info("flush DML events before DDL done", zap.Uint64("DDLCommitTs", commitTs), zap.Int("total", total), zap.Duration("duration", time.Since(start)), zap.Any("tables", tableIDs)) - for _, item := range resolvedGroups { - if item.maxCommitTs > item.group.AppliedWatermark { - item.group.AppliedWatermark = item.maxCommitTs - } - } return w.mysqlSink.WriteBlockEvent(ddl) case <-ticker.C: log.Warn("DML events cannot be flushed in time", @@ -286,29 +263,11 @@ func (w *writer) flushDMLEventsByWatermark(ctx context.Context) error { watermark := w.globalWatermark() resolvedEvents := make([]*commonEvent.DMLEvent, 0) - // resolvedGroups records which EventsGroup has flushed events so we can - // advance its AppliedWatermark after the flush is fully finished. - resolvedGroups := make([]struct { - group *util.EventsGroup - maxCommitTs uint64 - }, 0) for _, p := range w.progresses { for _, group := range p.eventsGroup { before := len(resolvedEvents) resolvedEvents = group.ResolveInto(watermark, resolvedEvents) - resolvedCount := len(resolvedEvents) - before - if resolvedCount == 0 { - continue - } - - resolvedGroups = append(resolvedGroups, struct { - group *util.EventsGroup - maxCommitTs uint64 - }{ - group: group, - maxCommitTs: resolvedEvents[len(resolvedEvents)-1].GetCommitTs(), - }) - total += resolvedCount + total += len(resolvedEvents) - before } } if total == 0 { @@ -334,11 +293,6 @@ func (w *writer) flushDMLEventsByWatermark(ctx context.Context) error { case <-done: log.Info("flush DML events done", zap.Uint64("watermark", watermark), zap.Int("total", total), zap.Duration("duration", time.Since(start))) - for _, item := range resolvedGroups { - if item.maxCommitTs > item.group.AppliedWatermark { - item.group.AppliedWatermark = item.maxCommitTs - } - } return nil case <-ticker.C: log.Warn("DML events cannot be flushed in time", zap.Uint64("watermark", watermark), @@ -510,33 +464,48 @@ func (w *writer) appendRow2Group(dml *commonEvent.DMLEvent, progress *partitionP group = util.NewEventsGroup(progress.partition, tableID) progress.eventsGroup[tableID] = group } - if commitTs <= group.AppliedWatermark { - log.Warn("DML event replayed after applied, ignore it", + if commitTs < progress.watermark { + log.Warn("DML Event fallback row, since less than the partition watermark, ignore it", zap.Int64("tableID", tableID), zap.Int32("partition", group.Partition), - zap.Uint64("commitTs", commitTs), - zap.Uint64("appliedWatermark", group.AppliedWatermark), zap.Uint64("highWatermark", group.HighWatermark), - zap.Uint64("partitionWatermark", progress.watermark), - zap.String("schema", schema), zap.String("table", table), zap.Any("protocol", w.protocol)) + zap.Uint64("commitTs", commitTs), zap.Uint64("watermark", progress.watermark), + zap.String("schema", schema), zap.String("table", table)) + return + } + if commitTs >= group.HighWatermark { + group.Append(dml, false) + log.Info("DML event append to the group", + zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", group.HighWatermark), + zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID), + zap.Stringer("eventType", dml.RowTypes[0])) return } - forceInsert := commitTs < group.HighWatermark || commitTs < progress.watermark || w.enableTableAcrossNodes - if forceInsert { - log.Warn("DML event commit ts fallback, append with forceInsert", - zap.Int32("partition", group.Partition), + if w.enableTableAcrossNodes { + log.Warn("DML events fallback, but enableTableAcrossNodes is true, still append it", zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", group.HighWatermark), - zap.Uint64("appliedWatermark", group.AppliedWatermark), - zap.Uint64("partitionWatermark", progress.watermark), zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID), - zap.Stringer("eventType", dml.RowTypes[0]), zap.Any("protocol", w.protocol), - zap.Bool("IsPartition", dml.TableInfo.TableName.IsPartition)) + zap.Stringer("eventType", dml.RowTypes[0])) group.Append(dml, true) return } - group.Append(dml, false) - log.Info("DML event append to the group", - zap.Int32("partition", group.Partition), - zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", group.HighWatermark), - zap.Uint64("appliedWatermark", group.AppliedWatermark), - zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID), - zap.Stringer("eventType", dml.RowTypes[0])) + switch w.protocol { + case config.ProtocolCanalJSON: + // for partition table, the canal-json message cannot assign physical table id to each dml message, + // we cannot distinguish whether it's a real fallback event or not, still append it. + if w.partitionTableAccessor.IsPartitionTable(schema, table) { + log.Warn("DML events fallback, but it's canal-json and partition table, still append it", + zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", group.HighWatermark), + zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID), + zap.Stringer("eventType", dml.RowTypes[0])) + group.Append(dml, true) + return + } + log.Warn("DML event fallback row, since less than the group high watermark, ignore it", + zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", group.HighWatermark), + zap.Any("partitionWatermark", progress.watermark), zap.Any("watermark", progress.watermark), + zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID), + zap.Stringer("eventType", dml.RowTypes[0]), + zap.Any("protocol", w.protocol), zap.Bool("IsPartition", dml.TableInfo.TableName.IsPartition)) + default: + log.Panic("unknown protocol", zap.Any("protocol", w.protocol)) + } } diff --git a/cmd/pulsar-consumer/writer_test.go b/cmd/pulsar-consumer/writer_test.go index d1323b0157..6de6300074 100644 --- a/cmd/pulsar-consumer/writer_test.go +++ b/cmd/pulsar-consumer/writer_test.go @@ -18,14 +18,11 @@ import ( "testing" "github.com/golang/mock/gomock" - "github.com/pingcap/ticdc/cmd/util" sinkmock "github.com/pingcap/ticdc/downstreamadapter/sink/mock" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" - "github.com/pingcap/ticdc/pkg/config" codeccommon "github.com/pingcap/ticdc/pkg/sink/codec/common" timodel "github.com/pingcap/tidb/pkg/meta/model" - "github.com/pingcap/tidb/pkg/util/chunk" "github.com/stretchr/testify/require" ) @@ -265,58 +262,3 @@ func TestWriterWrite_handlesOutOfOrderDDLsByCommitTs(t *testing.T) { require.Len(t, w.ddlList, 1) require.Equal(t, "CREATE TABLE `common_1`.`a` (`a` BIGINT PRIMARY KEY,`b` INT)", w.ddlList[0].Query) } - -func TestAppendRow2Group_DoesNotDropCommitTsFallbackBeforeApplied(t *testing.T) { - // Scenario: - // 1) TiCDC writes DML messages to Pulsar in commitTs order. - // 2) Under network partition / changefeed restart, TiCDC may replay older commitTs - // at a later time (commitTs appears to go backwards). - // - // The pulsar-consumer must not drop these "fallback commitTs" events unless they - // have already been flushed to downstream (AppliedWatermark), otherwise replayed - // messages cannot heal missing windows. - w := &writer{ - progresses: []*partitionProgress{ - { - partition: 0, - eventsGroup: make(map[int64]*util.EventsGroup), - }, - }, - protocol: config.ProtocolCanalJSON, - partitionTableAccessor: codeccommon.NewPartitionTableAccessor(), - } - - newDMLEvent := func(tableID int64, commitTs uint64) *commonEvent.DMLEvent { - return &commonEvent.DMLEvent{ - PhysicalTableID: tableID, - CommitTs: commitTs, - RowTypes: []common.RowType{common.RowTypeUpdate}, - Rows: chunk.NewChunkWithCapacity(nil, 0), - TableInfo: &common.TableInfo{ - TableName: common.TableName{Schema: "test", Table: "t"}, - }, - } - } - - progress := w.progresses[0] - - // Step 1: observe a larger commitTs first (e.g. produced before restart). - w.appendRow2Group(newDMLEvent(1, 200), progress) - - // Step 2: observe a smaller commitTs later (e.g. replayed after restart). - w.appendRow2Group(newDMLEvent(1, 100), progress) - - group := progress.eventsGroup[1] - require.NotNil(t, group) - - // Expect: commitTs=100 is still kept and can be resolved. - resolved := group.ResolveInto(150, nil) - require.Len(t, resolved, 1) - require.Equal(t, uint64(100), resolved[0].CommitTs) - - // Step 3: once downstream has flushed beyond commitTs=100, replay is safe to ignore. - group.AppliedWatermark = 200 - w.appendRow2Group(newDMLEvent(1, 100), progress) - resolved = group.ResolveInto(150, nil) - require.Empty(t, resolved) -} diff --git a/cmd/util/event_group.go b/cmd/util/event_group.go index 95ff621510..f81fb54e6f 100644 --- a/cmd/util/event_group.go +++ b/cmd/util/event_group.go @@ -18,7 +18,6 @@ import ( "sort" "github.com/pingcap/log" - "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "go.uber.org/zap" ) @@ -28,20 +27,8 @@ type EventsGroup struct { Partition int32 tableID int64 - events []*commonEvent.DMLEvent - // HighWatermark is the maximum CommitTs ever observed in this group. - // - // It is a "seen" watermark, not a "flushed/applied" watermark. When the - // consumer reads faster than it flushes to downstream, HighWatermark can be - // larger than AppliedWatermark. + events []*commonEvent.DMLEvent HighWatermark uint64 - // AppliedWatermark is the maximum CommitTs that has been successfully flushed - // to the downstream for this group. - // - // It is used to distinguish "safe to ignore" replays (CommitTs <= - // AppliedWatermark) from "still needed" events that arrive late due to sink - // retries / restarts. - AppliedWatermark uint64 } // NewEventsGroup will create new event group. @@ -64,47 +51,23 @@ func (g *EventsGroup) Append(row *commonEvent.DMLEvent, force bool) { lastDMLEvent = g.events[len(g.events)-1] } - mergeDMLEvent := func(dst, src *commonEvent.DMLEvent) { - dst.Rows.Append(src.Rows, 0, src.Rows.NumRows()) - dst.RowTypes = append(dst.RowTypes, src.RowTypes...) - dst.Length += src.Length - dst.PostTxnFlushed = append(dst.PostTxnFlushed, src.PostTxnFlushed...) - } - if lastDMLEvent == nil || lastDMLEvent.GetCommitTs() < row.GetCommitTs() { g.events = append(g.events, row) return } if lastDMLEvent.GetCommitTs() == row.GetCommitTs() { - mergeDMLEvent(lastDMLEvent, row) + lastDMLEvent.Rows.Append(row.Rows, 0, row.Rows.NumRows()) + lastDMLEvent.RowTypes = append(lastDMLEvent.RowTypes, row.RowTypes...) + lastDMLEvent.Length += row.Length + lastDMLEvent.PostTxnFlushed = append(lastDMLEvent.PostTxnFlushed, row.PostTxnFlushed...) return } if force { - // A smaller CommitTs can appear at a larger Kafka offset after a TiCDC - // restart/retry (at-least-once replay). In this case we need to insert the - // event by CommitTs order. If the CommitTs already exists, merge it so one - // upstream transaction isn't split into multiple downstream transactions. i := sort.Search(len(g.events), func(i int) bool { return g.events[i].CommitTs > row.CommitTs }) - if i > 0 && g.events[i-1].CommitTs == row.CommitTs { - previous := g.events[i-1] - // If the table info version is incompatible, we cannot merge the events, - // and the event may be replayed again after the table info is updated. - // So we just skip this event to avoid potential panic in the downstream. - if !compareTableInfo(previous, row) { - log.Warn("skip replayed DML event due to incompatible table info, the event may be replayed again after the table info is updated", - zap.Int32("partition", g.Partition), zap.Int64("tableID", g.tableID), - zap.Uint64("commitTs", row.CommitTs), - zap.Any("previous", previous), - zap.Any("now", row)) - return - } - mergeDMLEvent(previous, row) - return - } g.events = slices.Insert(g.events, i, row) return } @@ -113,17 +76,6 @@ func (g *EventsGroup) Append(row *commonEvent.DMLEvent, force bool) { zap.Uint64("lastCommitTs", lastDMLEvent.GetCommitTs()), zap.Uint64("commitTs", row.GetCommitTs())) } -func compareTableInfo(previous, now *commonEvent.DMLEvent) bool { - previousInfo := previous.TableInfo.ToTiDBTableInfo() - nowInfo := now.TableInfo.ToTiDBTableInfo() - if previousInfo.UpdateTS > nowInfo.UpdateTS { - log.Panic("previous dml event has bigger table info version", - zap.Any("previous", previous), - zap.Any("now", now)) - } - return common.NewColumnSchema4Decoder(previousInfo).SameWithTableInfo(nowInfo) -} - // ResolveInto appends all events with CommitTs <= resolve into dst and removes them from the group. // ResolveInto copies pointers into dst first, then clears the // resolved prefix so Go GC can reclaim resolved events once downstream is done with them. diff --git a/cmd/util/event_group_test.go b/cmd/util/event_group_test.go index f989cbaff9..8a731bcac6 100644 --- a/cmd/util/event_group_test.go +++ b/cmd/util/event_group_test.go @@ -16,54 +16,10 @@ package util import ( "testing" - "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" - timodel "github.com/pingcap/tidb/pkg/meta/model" - "github.com/pingcap/tidb/pkg/parser/ast" - "github.com/pingcap/tidb/pkg/util/chunk" "github.com/stretchr/testify/require" ) -func TestEventsGroupAppendForceMergesExistingCommitTs(t *testing.T) { - // Scenario: - // 1) An upstream transaction (commitTs=100) is split into multiple messages. - // 2) Due to sink retry/restart, a later transaction (commitTs=200) is observed first. - // 3) A "late" fragment of the commitTs=100 transaction arrives afterwards. - // - // The EventsGroup must merge the late fragment into the existing commitTs=100 event, - // instead of turning it into a second commitTs=100 item (which would split one upstream - // transaction into multiple downstream transactions). - group := NewEventsGroup(0, 1) - - newDMLEvent := func(commitTs uint64) *commonEvent.DMLEvent { - return &commonEvent.DMLEvent{ - CommitTs: commitTs, - RowTypes: []common.RowType{common.RowTypeUpdate}, - Rows: chunk.NewChunkWithCapacity(nil, 0), - Length: 0, - TableInfo: common.NewTableInfo4Decoder("test", &timodel.TableInfo{ - ID: 100, - Name: ast.NewCIStr("t"), - Columns: []*timodel.ColumnInfo{ - {Name: ast.NewCIStr("a")}, - }, - }), - } - } - - group.Append(newDMLEvent(100), false) - group.Append(newDMLEvent(200), false) - group.Append(newDMLEvent(100), true) - - require.Equal(t, uint64(200), group.HighWatermark) - - var dst []*commonEvent.DMLEvent - dst = group.ResolveInto(150, dst) - require.Len(t, dst, 1) - require.Equal(t, uint64(100), dst[0].CommitTs) - require.Len(t, dst[0].RowTypes, 2) -} - func TestEventsGroupResolveIntoAppendsAndClearsResolvedPrefix(t *testing.T) { // Scenario: A consumer resolves a prefix of events by watermark/commit-ts and appends them // into a downstream batch slice. We must clear the resolved prefix in the group's backing From 4c02cd7967cc867fccea7ad45c39ac815a52d191 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Thu, 18 Jun 2026 06:01:03 +0000 Subject: [PATCH 2/2] update Signed-off-by: wk989898 --- cmd/kafka-consumer/writer.go | 26 ++++++++++++-- cmd/kafka-consumer/writer_test.go | 56 ++++++++++++++++++++++++++++++ cmd/pulsar-consumer/writer.go | 26 ++++++++++++-- cmd/pulsar-consumer/writer_test.go | 51 +++++++++++++++++++++++++++ 4 files changed, 155 insertions(+), 4 deletions(-) diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go index 00626f0956..8f6beff3bb 100644 --- a/cmd/kafka-consumer/writer.go +++ b/cmd/kafka-consumer/writer.go @@ -501,18 +501,40 @@ func (w *writer) onDDL(ddl *event.DDLEvent) { // e.g. create partition table + drop table(rename table) + create normal table: the partitionTableAccessor should drop the table when the table become normal. switch model.ActionType(ddl.Type) { case model.ActionCreateTable: + if w.markPartitionTableFromDDL(ddl) { + return + } stmt, err := parser.New().ParseOneStmt(ddl.Query, "", "") if err != nil { log.Panic("parse ddl query failed", zap.String("query", ddl.Query), zap.Error(err)) } if v, ok := stmt.(*ast.CreateTableStmt); ok && v.Partition != nil { - w.partitionTableAccessor.Add(ddl.GetSchemaName(), ddl.GetTableName()) + w.addPartitionTable(ddl.GetSchemaName(), ddl.GetTableName()) } case model.ActionRenameTable: if w.partitionTableAccessor.IsPartitionTable(ddl.ExtraSchemaName, ddl.ExtraTableName) { - w.partitionTableAccessor.Add(ddl.GetSchemaName(), ddl.GetTableName()) + w.addPartitionTable(ddl.GetSchemaName(), ddl.GetTableName()) } + w.markPartitionTableFromDDL(ddl) + } +} + +func (w *writer) markPartitionTableFromDDL(ddl *event.DDLEvent) bool { + if ddl.TableInfo == nil || !ddl.TableInfo.IsPartitionTable() { + return false + } + + w.addPartitionTable(ddl.GetSchemaName(), ddl.GetTableName()) + w.addPartitionTable(ddl.TableInfo.GetSchemaName(), ddl.TableInfo.GetTableName()) + w.addPartitionTable(ddl.TableInfo.GetTargetSchemaName(), ddl.TableInfo.GetTargetTableName()) + return true +} + +func (w *writer) addPartitionTable(schema, table string) { + if schema == "" || table == "" { + return } + w.partitionTableAccessor.Add(schema, table) } func (w *writer) checkPartition(row *event.DMLEvent, partition int32, offset kafka.Offset) { diff --git a/cmd/kafka-consumer/writer_test.go b/cmd/kafka-consumer/writer_test.go index 0ce56a7c82..0ff0c3d859 100644 --- a/cmd/kafka-consumer/writer_test.go +++ b/cmd/kafka-consumer/writer_test.go @@ -17,12 +17,17 @@ import ( "context" "testing" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/golang/mock/gomock" + "github.com/pingcap/ticdc/cmd/util" + "github.com/pingcap/ticdc/downstreamadapter/sink/eventrouter" sinkmock "github.com/pingcap/ticdc/downstreamadapter/sink/mock" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/config" codeccommon "github.com/pingcap/ticdc/pkg/sink/codec/common" timodel "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/util/chunk" "github.com/stretchr/testify/require" ) @@ -262,3 +267,54 @@ func TestWriterWrite_handlesOutOfOrderDDLsByCommitTs(t *testing.T) { require.Len(t, w.ddlList, 1) require.Equal(t, "CREATE TABLE `common_1`.`a` (`a` BIGINT PRIMARY KEY,`b` INT)", w.ddlList[0].Query) } + +func TestOnDDLMarksRoutedCreateTableLikePartitionTableForAvro(t *testing.T) { + replicaCfg := config.GetDefaultReplicaConfig() + eventRouter, err := eventrouter.NewEventRouter(replicaCfg.Sink, "test-topic", false, true) + require.NoError(t, err) + + w := &writer{ + progresses: []*partitionProgress{{partition: 0, eventsGroup: make(map[int64]*util.EventsGroup)}}, + eventRouter: eventRouter, + protocol: config.ProtocolAvro, + partitionTableAccessor: codeccommon.NewPartitionTableAccessor(), + } + + ddl := &commonEvent.DDLEvent{ + Query: "CREATE TABLE `target`.`dst` LIKE `target`.`src`", + SchemaName: "source", + TableName: "dst", + Type: byte(timodel.ActionCreateTable), + TableInfo: &common.TableInfo{ + TableName: common.TableName{ + Schema: "source", + Table: "dst", + IsPartition: true, + TargetSchema: "target", + TargetTable: "dst", + }, + }, + } + w.onDDL(ddl) + require.True(t, w.partitionTableAccessor.IsPartitionTable("target", "dst")) + + newDMLEvent := func(commitTs uint64) *commonEvent.DMLEvent { + return &commonEvent.DMLEvent{ + PhysicalTableID: 1, + CommitTs: commitTs, + RowTypes: []common.RowType{common.RowTypeUpdate}, + Rows: chunk.NewChunkWithCapacity(nil, 0), + TableInfo: &common.TableInfo{ + TableName: common.TableName{Schema: "target", Table: "dst"}, + }, + } + } + + progress := w.progresses[0] + w.appendRow2Group(newDMLEvent(200), progress, kafka.Offset(10)) + w.appendRow2Group(newDMLEvent(100), progress, kafka.Offset(11)) + + resolved := progress.eventsGroup[1].ResolveInto(150, nil) + require.Len(t, resolved, 1) + require.Equal(t, uint64(100), resolved[0].CommitTs) +} diff --git a/cmd/pulsar-consumer/writer.go b/cmd/pulsar-consumer/writer.go index bc30fb9752..fed72367e8 100644 --- a/cmd/pulsar-consumer/writer.go +++ b/cmd/pulsar-consumer/writer.go @@ -438,18 +438,40 @@ func (w *writer) onDDL(ddl *commonEvent.DDLEvent) { // e.g. create partition table + drop table(rename table) + create normal table: the partitionTableAccessor should drop the table when the table become normal. switch timodel.ActionType(ddl.Type) { case timodel.ActionCreateTable: + if w.markPartitionTableFromDDL(ddl) { + return + } stmt, err := parser.New().ParseOneStmt(ddl.Query, "", "") if err != nil { log.Panic("parse ddl query failed", zap.String("query", ddl.Query), zap.Error(err)) } if v, ok := stmt.(*ast.CreateTableStmt); ok && v.Partition != nil { - w.partitionTableAccessor.Add(ddl.GetSchemaName(), ddl.GetTableName()) + w.addPartitionTable(ddl.GetSchemaName(), ddl.GetTableName()) } case timodel.ActionRenameTable: if w.partitionTableAccessor.IsPartitionTable(ddl.ExtraSchemaName, ddl.ExtraTableName) { - w.partitionTableAccessor.Add(ddl.GetSchemaName(), ddl.GetTableName()) + w.addPartitionTable(ddl.GetSchemaName(), ddl.GetTableName()) } + w.markPartitionTableFromDDL(ddl) + } +} + +func (w *writer) markPartitionTableFromDDL(ddl *commonEvent.DDLEvent) bool { + if ddl.TableInfo == nil || !ddl.TableInfo.IsPartitionTable() { + return false + } + + w.addPartitionTable(ddl.GetSchemaName(), ddl.GetTableName()) + w.addPartitionTable(ddl.TableInfo.GetSchemaName(), ddl.TableInfo.GetTableName()) + w.addPartitionTable(ddl.TableInfo.GetTargetSchemaName(), ddl.TableInfo.GetTargetTableName()) + return true +} + +func (w *writer) addPartitionTable(schema, table string) { + if schema == "" || table == "" { + return } + w.partitionTableAccessor.Add(schema, table) } func (w *writer) appendRow2Group(dml *commonEvent.DMLEvent, progress *partitionProgress) { diff --git a/cmd/pulsar-consumer/writer_test.go b/cmd/pulsar-consumer/writer_test.go index 6de6300074..9e83ae63d9 100644 --- a/cmd/pulsar-consumer/writer_test.go +++ b/cmd/pulsar-consumer/writer_test.go @@ -18,11 +18,14 @@ import ( "testing" "github.com/golang/mock/gomock" + "github.com/pingcap/ticdc/cmd/util" sinkmock "github.com/pingcap/ticdc/downstreamadapter/sink/mock" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/config" codeccommon "github.com/pingcap/ticdc/pkg/sink/codec/common" timodel "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/util/chunk" "github.com/stretchr/testify/require" ) @@ -262,3 +265,51 @@ func TestWriterWrite_handlesOutOfOrderDDLsByCommitTs(t *testing.T) { require.Len(t, w.ddlList, 1) require.Equal(t, "CREATE TABLE `common_1`.`a` (`a` BIGINT PRIMARY KEY,`b` INT)", w.ddlList[0].Query) } + +func TestOnDDLMarksRoutedCreateTableLikePartitionTable(t *testing.T) { + w := &writer{ + progresses: []*partitionProgress{ + {partition: 0, eventsGroup: make(map[int64]*util.EventsGroup)}, + }, + protocol: config.ProtocolCanalJSON, + partitionTableAccessor: codeccommon.NewPartitionTableAccessor(), + } + + ddl := &commonEvent.DDLEvent{ + Query: "CREATE TABLE `target`.`dst` LIKE `target`.`src`", + SchemaName: "source", + TableName: "dst", + Type: byte(timodel.ActionCreateTable), + TableInfo: &common.TableInfo{ + TableName: common.TableName{ + Schema: "source", + Table: "dst", + IsPartition: true, + TargetSchema: "target", + TargetTable: "dst", + }, + }, + } + w.onDDL(ddl) + require.True(t, w.partitionTableAccessor.IsPartitionTable("target", "dst")) + + newDMLEvent := func(commitTs uint64) *commonEvent.DMLEvent { + return &commonEvent.DMLEvent{ + PhysicalTableID: 1, + CommitTs: commitTs, + RowTypes: []common.RowType{common.RowTypeUpdate}, + Rows: chunk.NewChunkWithCapacity(nil, 0), + TableInfo: &common.TableInfo{ + TableName: common.TableName{Schema: "target", Table: "dst"}, + }, + } + } + + progress := w.progresses[0] + w.appendRow2Group(newDMLEvent(200), progress) + w.appendRow2Group(newDMLEvent(100), progress) + + resolved := progress.eventsGroup[1].ResolveInto(150, nil) + require.Len(t, resolved, 1) + require.Equal(t, uint64(100), resolved[0].CommitTs) +}