Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 81 additions & 74 deletions cmd/kafka-consumer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,6 @@ func (w *writer) flushDDLEvent(ctx context.Context, ddl *event.DDLEvent) error {
tableIDs := w.getBlockTableIDs(ddl)
commitTs := ddl.GetCommitTs()
resolvedEvents := make([]*event.DMLEvent, 0)
// resolvedGroups records which EventsGroup has flushed events so we can
// advance its AppliedWatermark after the flush is fully finished.
resolvedGroups := make([]struct {
group *util.EventsGroup
maxCommitTs uint64
}, 0)
for tableID := range tableIDs {
for _, progress := range w.progresses {
g, ok := progress.eventsGroup[tableID]
Expand All @@ -171,19 +165,7 @@ func (w *writer) flushDDLEvent(ctx context.Context, ddl *event.DDLEvent) error {
}
before := len(resolvedEvents)
resolvedEvents = g.ResolveInto(commitTs, resolvedEvents)
resolvedCount := len(resolvedEvents) - before
if resolvedCount == 0 {
continue
}

resolvedGroups = append(resolvedGroups, struct {
group *util.EventsGroup
maxCommitTs uint64
}{
group: g,
maxCommitTs: resolvedEvents[len(resolvedEvents)-1].GetCommitTs(),
})
total += resolvedCount
total += len(resolvedEvents) - before
}
}

Expand Down Expand Up @@ -211,11 +193,6 @@ func (w *writer) flushDDLEvent(ctx context.Context, ddl *event.DDLEvent) error {
log.Info("flush DML events before DDL done", zap.Uint64("DDLCommitTs", commitTs),
zap.Int("total", total), zap.Duration("duration", time.Since(start)),
zap.Any("tables", tableIDs))
for _, item := range resolvedGroups {
if item.maxCommitTs > item.group.AppliedWatermark {
item.group.AppliedWatermark = item.maxCommitTs
}
}
return w.mysqlSink.WriteBlockEvent(ddl)
case <-ticker.C:
log.Warn("DML events cannot be flushed in time",
Expand Down Expand Up @@ -293,29 +270,11 @@ func (w *writer) flushDMLEventsByWatermark(ctx context.Context) error {

watermark := w.globalWatermark()
resolvedEvents := make([]*event.DMLEvent, 0)
// resolvedGroups records which EventsGroup has flushed events so we can
// advance its AppliedWatermark after the flush is fully finished.
resolvedGroups := make([]struct {
group *util.EventsGroup
maxCommitTs uint64
}, 0)
for _, p := range w.progresses {
for _, group := range p.eventsGroup {
before := len(resolvedEvents)
resolvedEvents = group.ResolveInto(watermark, resolvedEvents)
resolvedCount := len(resolvedEvents) - before
if resolvedCount == 0 {
continue
}

resolvedGroups = append(resolvedGroups, struct {
group *util.EventsGroup
maxCommitTs uint64
}{
group: group,
maxCommitTs: resolvedEvents[len(resolvedEvents)-1].GetCommitTs(),
})
total += resolvedCount
total += len(resolvedEvents) - before
}
}
if total == 0 {
Expand Down Expand Up @@ -343,11 +302,6 @@ func (w *writer) flushDMLEventsByWatermark(ctx context.Context) error {
case <-done:
log.Info("flush DML events done", zap.Uint64("watermark", watermark),
zap.Int("total", total), zap.Duration("duration", time.Since(start)))
for _, item := range resolvedGroups {
if item.maxCommitTs > item.group.AppliedWatermark {
item.group.AppliedWatermark = item.maxCommitTs
}
}
return nil
case <-ticker.C:
log.Warn("DML events cannot be flushed in time", zap.Uint64("watermark", watermark),
Expand Down Expand Up @@ -547,18 +501,40 @@ func (w *writer) onDDL(ddl *event.DDLEvent) {
// e.g. create partition table + drop table(rename table) + create normal table: the partitionTableAccessor should drop the table when the table become normal.
switch model.ActionType(ddl.Type) {
case model.ActionCreateTable:
if w.markPartitionTableFromDDL(ddl) {
return
}
stmt, err := parser.New().ParseOneStmt(ddl.Query, "", "")
if err != nil {
log.Panic("parse ddl query failed", zap.String("query", ddl.Query), zap.Error(err))
}
if v, ok := stmt.(*ast.CreateTableStmt); ok && v.Partition != nil {
w.partitionTableAccessor.Add(ddl.GetSchemaName(), ddl.GetTableName())
w.addPartitionTable(ddl.GetSchemaName(), ddl.GetTableName())
}
case model.ActionRenameTable:
if w.partitionTableAccessor.IsPartitionTable(ddl.ExtraSchemaName, ddl.ExtraTableName) {
w.partitionTableAccessor.Add(ddl.GetSchemaName(), ddl.GetTableName())
w.addPartitionTable(ddl.GetSchemaName(), ddl.GetTableName())
}
w.markPartitionTableFromDDL(ddl)
}
}

func (w *writer) markPartitionTableFromDDL(ddl *event.DDLEvent) bool {
if ddl.TableInfo == nil || !ddl.TableInfo.IsPartitionTable() {
return false
}

w.addPartitionTable(ddl.GetSchemaName(), ddl.GetTableName())
w.addPartitionTable(ddl.TableInfo.GetSchemaName(), ddl.TableInfo.GetTableName())
w.addPartitionTable(ddl.TableInfo.GetTargetSchemaName(), ddl.TableInfo.GetTargetTableName())
return true
}

func (w *writer) addPartitionTable(schema, table string) {
if schema == "" || table == "" {
return
}
w.partitionTableAccessor.Add(schema, table)
}

func (w *writer) checkPartition(row *event.DMLEvent, partition int32, offset kafka.Offset) {
Expand Down Expand Up @@ -603,39 +579,70 @@ func (w *writer) appendRow2Group(dml *event.DMLEvent, progress *partitionProgres
group = util.NewEventsGroup(progress.partition, tableID)
progress.eventsGroup[tableID] = group
}
// IMPORTANT: Kafka offsets are append-only, but CommitTs can go backwards after
// a TiCDC restart/retry (at-least-once replay). We must not drop such events
// solely based on a "seen" watermark (e.g. HighWatermark). The only safe
// ignore condition is "already flushed to downstream".
if commitTs <= group.AppliedWatermark {
log.Warn("DML event replayed after applied, ignore it",
if commitTs < progress.watermark {
log.Warn("DML Event fallback row, since less than the partition watermark, ignore it",
zap.Int64("tableID", tableID), zap.Int32("partition", group.Partition),
zap.Uint64("commitTs", commitTs), zap.Any("offset", offset),
zap.Uint64("appliedWatermark", group.AppliedWatermark), zap.Uint64("highWatermark", group.HighWatermark),
zap.Uint64("partitionWatermark", progress.watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
zap.String("schema", schema), zap.String("table", table), zap.Any("protocol", w.protocol))
zap.Uint64("watermark", progress.watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
zap.String("schema", schema), zap.String("table", table))
return
}
forceInsert := commitTs < group.HighWatermark || commitTs < progress.watermark || w.enableTableAcrossNodes
if forceInsert {
log.Warn("DML event commit ts fallback, append with forceInsert",
if commitTs >= group.HighWatermark {
group.Append(dml, false)
log.Debug("DML event append to the group",
zap.Int32("partition", group.Partition), zap.Any("offset", offset),
zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", group.HighWatermark),
zap.Uint64("appliedWatermark", group.AppliedWatermark),
zap.Uint64("partitionWatermark", progress.watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
zap.Uint64("commitTs", commitTs), zap.Uint64("HighWatermark", group.HighWatermark),
zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID),
zap.Stringer("eventType", dml.RowTypes[0]))
return
}
if w.enableTableAcrossNodes {
log.Warn("DML events fallback, but enableTableAcrossNodes is true, still append it",
zap.Int32("partition", group.Partition), zap.Any("offset", offset),
zap.Uint64("commitTs", commitTs), zap.Uint64("HighWatermark", group.HighWatermark),
zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID),
zap.Stringer("eventType", dml.RowTypes[0]), zap.Any("protocol", w.protocol),
zap.Bool("IsPartition", dml.TableInfo.TableName.IsPartition))
zap.Stringer("eventType", dml.RowTypes[0]))
group.Append(dml, true)
return
}
group.Append(dml, false)
log.Debug("DML event append to the group",
zap.Int32("partition", group.Partition), zap.Any("offset", offset),
zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", group.HighWatermark),
zap.Uint64("appliedWatermark", group.AppliedWatermark),
zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID),
zap.Stringer("eventType", dml.RowTypes[0]))
switch w.protocol {
case config.ProtocolSimple, config.ProtocolDebezium:
// simple protocol set the table id for all row message, it can be known which table the row message belongs to,
// also consider the table partition.
// open protocol set the partition table id if the table is partitioned.
// for normal table, the table id is generated by the fake table id generator by using schema and table name.
// so one event group for one normal table or one table partition, replayed messages can be ignored.
log.Warn("DML event fallback row, since less than the group high watermark, ignore it",
zap.Int32("partition", progress.partition), zap.Any("offset", offset),
zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", group.HighWatermark),
zap.Any("partitionWatermark", progress.watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID),
zap.Stringer("eventType", dml.RowTypes[0]),
// zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns),
zap.Any("protocol", w.protocol), zap.Bool("IsPartition", dml.TableInfo.TableName.IsPartition))
case config.ProtocolCanalJSON, config.ProtocolOpen, config.ProtocolAvro:
// for partition table, the canal-json, avro and open-protocol message cannot assign physical table id to each dml message,
// we cannot distinguish whether it's a real fallback event or not, still append it.
if w.partitionTableAccessor.IsPartitionTable(schema, table) {
log.Warn("DML events fallback, but it's canal-json, avro or open-protocol and the table is a partition table, still append it",
zap.Int32("partition", group.Partition), zap.Any("offset", offset),
zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", group.HighWatermark),
zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID),
zap.Stringer("eventType", dml.RowTypes[0]))
group.Append(dml, true)
return
}
log.Warn("DML event fallback row, since less than the group high watermark, ignore it",
zap.Int32("partition", progress.partition), zap.Any("offset", offset),
zap.Uint64("commitTs", commitTs), zap.Uint64("HighWatermark", group.HighWatermark),
zap.Any("partitionWatermark", progress.watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID),
zap.Stringer("eventType", dml.RowTypes[0]),
// zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns),
zap.Any("protocol", w.protocol), zap.Bool("IsPartition", dml.TableInfo.TableName.IsPartition))
default:
log.Panic("unknown protocol", zap.Any("protocol", w.protocol))
}
}

func openDB(ctx context.Context, dsn string) (*sql.DB, error) {
Expand Down
76 changes: 35 additions & 41 deletions cmd/kafka-consumer/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
codecCommon "github.com/pingcap/ticdc/pkg/sink/codec/common"
codeccommon "github.com/pingcap/ticdc/pkg/sink/codec/common"
timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -82,7 +82,7 @@ func TestWriterWrite_executesIndependentCreateTableWithoutWatermark(t *testing.T
},
}

w.Write(ctx, codecCommon.MessageTypeDDL)
w.Write(ctx, codeccommon.MessageTypeDDL)

require.Equal(t, []string{"CREATE TABLE `test`.`t` (`id` INT PRIMARY KEY)"}, *ddls)
require.Empty(t, w.ddlList)
Expand Down Expand Up @@ -128,12 +128,12 @@ func TestWriterWrite_preservesOrderWhenBlockedDDLNotReady(t *testing.T) {
},
}

w.Write(ctx, codecCommon.MessageTypeDDL)
w.Write(ctx, codeccommon.MessageTypeDDL)
require.Empty(t, *ddls)
require.Len(t, w.ddlList, 2)

p.watermark = 200
w.Write(ctx, codecCommon.MessageTypeDDL)
w.Write(ctx, codeccommon.MessageTypeDDL)
require.Equal(t, []string{
"ALTER TABLE `test`.`t` ADD COLUMN `c2` INT",
"CREATE TABLE `test`.`t2` (`id` INT PRIMARY KEY)",
Expand Down Expand Up @@ -174,12 +174,12 @@ func TestWriterWrite_doesNotBypassWatermarkForCreateTableLike(t *testing.T) {
},
}

w.Write(ctx, codecCommon.MessageTypeDDL)
w.Write(ctx, codeccommon.MessageTypeDDL)
require.Empty(t, *ddls)
require.Len(t, w.ddlList, 1)

p.watermark = 200
w.Write(ctx, codecCommon.MessageTypeDDL)
w.Write(ctx, codeccommon.MessageTypeDDL)
require.Equal(t, []string{"CREATE TABLE `test`.`t2` LIKE `test`.`t1`"}, *ddls)
require.Empty(t, w.ddlList)
}
Expand Down Expand Up @@ -256,7 +256,7 @@ func TestWriterWrite_handlesOutOfOrderDDLsByCommitTs(t *testing.T) {
},
}

w.Write(ctx, codecCommon.MessageTypeDDL)
w.Write(ctx, codeccommon.MessageTypeDDL)

require.Equal(t, []string{
"CREATE TABLE `common_1`.`add_and_drop_columns` (`id` INT(11) NOT NULL PRIMARY KEY)",
Expand All @@ -268,59 +268,53 @@ func TestWriterWrite_handlesOutOfOrderDDLsByCommitTs(t *testing.T) {
require.Equal(t, "CREATE TABLE `common_1`.`a` (`a` BIGINT PRIMARY KEY,`b` INT)", w.ddlList[0].Query)
}

func TestAppendRow2Group_DoesNotDropCommitTsFallbackBeforeApplied(t *testing.T) {
// Scenario:
// 1) TiCDC writes DML messages to Kafka in commitTs order.
// 2) Under network partition / changefeed restart, TiCDC may replay older commitTs,
// which will be appended to Kafka at a larger offset (commitTs appears to go backwards).
//
// The kafka-consumer must not drop these "fallback commitTs" events unless they have
// already been flushed to downstream (AppliedWatermark), otherwise the replay cannot
// heal the missing window.
func TestOnDDLMarksRoutedCreateTableLikePartitionTableForAvro(t *testing.T) {
replicaCfg := config.GetDefaultReplicaConfig()
eventRouter, err := eventrouter.NewEventRouter(replicaCfg.Sink, "test-topic", false, false)
eventRouter, err := eventrouter.NewEventRouter(replicaCfg.Sink, "test-topic", false, true)
require.NoError(t, err)

w := &writer{
progresses: []*partitionProgress{{partition: 0, eventsGroup: make(map[int64]*util.EventsGroup)}},
eventRouter: eventRouter,
protocol: config.ProtocolCanalJSON,
partitionTableAccessor: codecCommon.NewPartitionTableAccessor(),
protocol: config.ProtocolAvro,
partitionTableAccessor: codeccommon.NewPartitionTableAccessor(),
}

ddl := &commonEvent.DDLEvent{
Query: "CREATE TABLE `target`.`dst` LIKE `target`.`src`",
SchemaName: "source",
TableName: "dst",
Type: byte(timodel.ActionCreateTable),
TableInfo: &common.TableInfo{
TableName: common.TableName{
Schema: "source",
Table: "dst",
IsPartition: true,
TargetSchema: "target",
TargetTable: "dst",
},
},
}
w.onDDL(ddl)
require.True(t, w.partitionTableAccessor.IsPartitionTable("target", "dst"))

newDMLEvent := func(tableID int64, commitTs uint64) *commonEvent.DMLEvent {
newDMLEvent := func(commitTs uint64) *commonEvent.DMLEvent {
return &commonEvent.DMLEvent{
PhysicalTableID: tableID,
PhysicalTableID: 1,
CommitTs: commitTs,
RowTypes: []common.RowType{common.RowTypeUpdate},
Rows: chunk.NewChunkWithCapacity(nil, 0),
TableInfo: &common.TableInfo{
TableName: common.TableName{Schema: "test", Table: "t"},
TableName: common.TableName{Schema: "target", Table: "dst"},
},
}
}

progress := w.progresses[0]
w.appendRow2Group(newDMLEvent(200), progress, kafka.Offset(10))
w.appendRow2Group(newDMLEvent(100), progress, kafka.Offset(11))

// Step 1: observe a larger commitTs first (e.g. produced before restart).
w.appendRow2Group(newDMLEvent(1, 200), progress, kafka.Offset(10))

// Step 2: observe a smaller commitTs later (e.g. replayed after restart).
w.appendRow2Group(newDMLEvent(1, 100), progress, kafka.Offset(11))

group := progress.eventsGroup[1]
require.NotNil(t, group)

resolvedEvents := make([]*commonEvent.DMLEvent, 0)
// Expect: commitTs=100 is still kept and can be resolved.
resolved := group.ResolveInto(150, nil)
resolved := progress.eventsGroup[1].ResolveInto(150, nil)
require.Len(t, resolved, 1)
require.Equal(t, uint64(100), resolved[0].CommitTs)

// Step 3: once downstream has flushed beyond commitTs=100, the replay is safe to ignore.
resolvedEvents = make([]*commonEvent.DMLEvent, 0)
group.AppliedWatermark = 200
w.appendRow2Group(newDMLEvent(1, 100), progress, kafka.Offset(12))
resolved = group.ResolveInto(150, resolvedEvents)
require.Empty(t, resolved)
}
Loading
Loading