diff --git a/pkg/eventservice/event_scanner.go b/pkg/eventservice/event_scanner.go index ca94eef006..52c44ace2e 100644 --- a/pkg/eventservice/event_scanner.go +++ b/pkg/eventservice/event_scanner.go @@ -682,6 +682,9 @@ func (t *TxnEvent) AppendRow( return t.CurrentDMLEvent.AppendRow(rawEvent, decode, filter, filterContext) } +// dmlTypeFilterCacheSize follows common.RowType iota values: delete, insert, update. +const dmlTypeFilterCacheSize = int(common.RowTypeUpdate) + 1 + // dmlProcessor handles DML event processing and batching type dmlProcessor struct { mounter event.Mounter @@ -689,6 +692,13 @@ type dmlProcessor struct { filter filter.Filter filterContext filter.DMLFilterContext + // dmlTypeFilterCache caches the pre-decode filter result within the current transaction. + // The cache is reset when a new transaction starts. It is safe because tableInfo + // and startTs are fixed for the current transaction. + dmlTypeFilterCache [dmlTypeFilterCacheSize]struct { + valid bool + ignore bool + } // insertRowCache is used to cache the split update event's insert part of the current transaction. // It will be used to append to the current DML event when the transaction is finished. @@ -737,6 +747,7 @@ func (p *dmlProcessor) startTxn( if p.currentTxn != nil { log.Panic("there is a transaction not flushed yet") } + p.resetDMLTypeFilterCache() var err error p.currentTxn, err = newTxnEvent(p.batchDML, dispatcherID, tableID, tableInfo, startTs, commitTs, shouldSplitTxn) return err @@ -790,6 +801,13 @@ func (p *dmlProcessor) appendRow(rawEvent *common.RawKVEntry) error { rawType := rawEvent.GetType() if !rawEvent.IsUpdate() { updateMetricEventServiceSendDMLTypeCount(p.mode, rawType, false) + ignore, err := p.shouldIgnoreRawEventByDMLType(rawEvent) + if err != nil { + return err + } + if ignore { + return nil + } return p.currentTxn.AppendRow(rawEvent, p.mounter.DecodeToChunk, p.filter, p.filterContext) } @@ -797,6 +815,15 @@ func (p *dmlProcessor) appendRow(rawEvent *common.RawKVEntry) error { shouldSplit bool err error ) + ignore, err := p.shouldIgnoreDMLByEventType(common.RowTypeUpdate, rawEvent.StartTs) + if err != nil { + return err + } + if ignore { + updateMetricEventServiceSendDMLTypeCount(p.mode, rawType, false) + return nil + } + if !p.outputRawChangeEvent { shouldSplit, err = event.IsUKChanged(rawEvent, p.currentTxn.CurrentDMLEvent.TableInfo) if err != nil { @@ -817,10 +844,73 @@ func (p *dmlProcessor) appendRow(rawEvent *common.RawKVEntry) error { if err != nil { return err } - p.insertRowCache = append(p.insertRowCache, insertRow) + ignoreInsert, err := p.shouldIgnoreRawEventByDMLType(insertRow) + if err != nil { + return err + } + if !ignoreInsert { + p.insertRowCache = append(p.insertRowCache, insertRow) + } + ignoreDelete, err := p.shouldIgnoreRawEventByDMLType(deleteRow) + if err != nil { + return err + } + if ignoreDelete { + return nil + } return p.currentTxn.AppendRow(deleteRow, p.mounter.DecodeToChunk, p.filter, p.filterContext) } +func (p *dmlProcessor) shouldIgnoreRawEventByDMLType(rawEvent *common.RawKVEntry) (bool, error) { + rowType := common.RowTypeInsert + if rawEvent.IsDelete() { + rowType = common.RowTypeDelete + } else if rawEvent.IsUpdate() { + rowType = common.RowTypeUpdate + } + return p.shouldIgnoreDMLByEventType(rowType, rawEvent.StartTs) +} + +func (p *dmlProcessor) shouldIgnoreDMLByEventType(rowType common.RowType, startTs uint64) (bool, error) { + idx := int(rowType) + if idx >= 0 && idx < len(p.dmlTypeFilterCache) { + if p.dmlTypeFilterCache[idx].valid { + return p.dmlTypeFilterCache[idx].ignore, nil + } + } + + if p.filter == nil { + p.setDMLTypeFilterCache(rowType, false) + return false, nil + } + ignore, err := p.filter.ShouldIgnoreDMLByEventType( + rowType, + p.currentTxn.CurrentDMLEvent.TableInfo, + startTs, + ) + if err != nil { + return false, errors.Trace(err) + } + p.setDMLTypeFilterCache(rowType, ignore) + return ignore, nil +} + +func (p *dmlProcessor) setDMLTypeFilterCache(rowType common.RowType, ignore bool) { + idx := int(rowType) + if idx < 0 || idx >= len(p.dmlTypeFilterCache) { + return + } + p.dmlTypeFilterCache[idx].valid = true + p.dmlTypeFilterCache[idx].ignore = ignore +} + +func (p *dmlProcessor) resetDMLTypeFilterCache() { + for i := range p.dmlTypeFilterCache { + p.dmlTypeFilterCache[i].valid = false + p.dmlTypeFilterCache[i].ignore = false + } +} + // getCurrentBatchDML returns the current batch DML event func (p *dmlProcessor) getCurrentBatchDML() *event.BatchDMLEvent { return p.batchDML diff --git a/pkg/eventservice/event_scanner_benchmark_test.go b/pkg/eventservice/event_scanner_benchmark_test.go new file mode 100644 index 0000000000..1bd9f3e5ae --- /dev/null +++ b/pkg/eventservice/event_scanner_benchmark_test.go @@ -0,0 +1,235 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventservice + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/ticdc/eventpb" + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/logservice/eventstore" + bf "github.com/pingcap/ticdc/pkg/binlog-filter" + "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/ticdc/pkg/filter" + "github.com/pingcap/ticdc/pkg/integrity" +) + +type disableDMLTypeFastPathFilter struct { + filter.Filter +} + +func (f disableDMLTypeFastPathFilter) ShouldIgnoreDMLByEventType( + common.RowType, + *common.TableInfo, + uint64, +) (bool, error) { + return false, nil +} + +type benchmarkEventGetter struct { + raw *common.RawKVEntry + rows int +} + +func (g *benchmarkEventGetter) GetIterator( + common.DispatcherID, + common.DataRange, +) (eventstore.EventIterator, error) { + return &singleTxnIterator{ + raw: g.raw, + rows: g.rows, + }, nil +} + +type singleTxnIterator struct { + raw *common.RawKVEntry + rows int + next int +} + +func (i *singleTxnIterator) Next() (*common.RawKVEntry, bool) { + if i.next >= i.rows { + return nil, false + } + isNewTxn := i.next == 0 + i.next++ + return i.raw, isNewTxn +} + +func (i *singleTxnIterator) Close() (int64, error) { + return int64(i.next), nil +} + +func newBenchmarkDispatcherInfo( + startTs uint64, + dispatcherID common.DispatcherID, + tableID int64, +) *mockDispatcherInfo { + return &mockDispatcherInfo{ + clusterID: 1, + serverID: "server1", + id: dispatcherID, + changefeedID: common.NewChangefeedID4Test("default", "bench"), + topic: "topic1", + span: &heartbeatpb.TableSpan{ + TableID: tableID, + StartKey: []byte("a"), + EndKey: []byte("z"), + }, + startTs: startTs, + actionType: eventpb.ActionType_ACTION_TYPE_REGISTER, + filterConfig: &eventpb.FilterConfig{ + FilterConfig: &eventpb.InnerFilterConfig{ + Rules: []string{"*.*"}, + }, + }, + bdrMode: false, + integrity: config.GetDefaultReplicaConfig().Integrity, + } +} + +func BenchmarkDMLProcessorIgnoreDelete(b *testing.B) { + helper := event.NewEventTestHelper(b) + defer helper.Close() + + ddlEvent, kvEvents := genEvents(helper, `create table test.t(id int primary key, c char(50))`, + `insert into test.t(id,c) values (1, "c1")`) + tableInfo := ddlEvent.TableInfo + tableID := ddlEvent.GetTableID() + deleteRow := insertToDeleteRow(kvEvents[0]) + dispatcherID := common.NewDispatcherID() + mockSchemaGetter := NewMockSchemaStore() + mockSchemaGetter.AppendDDLEvent(tableID, ddlEvent) + + ignoreDeleteFilter, err := filter.NewFilter(&config.FilterConfig{ + Rules: []string{"test.*"}, + EventFilters: []*config.EventFilterRule{ + { + Matcher: []string{"test.t"}, + IgnoreEvent: []bf.EventType{bf.DeleteEvent}, + }, + }, + }, "UTC", false, false) + if err != nil { + b.Fatal(err) + } + + bench := func(b *testing.B, changefeedFilter filter.Filter) { + processor := newDMLProcessor( + event.NewMounter(time.UTC, &integrity.Config{}), + mockSchemaGetter, + changefeedFilter, + false, + common.DefaultMode, + false, + ) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := processor.startTxn(dispatcherID, tableID, tableInfo, deleteRow.StartTs, deleteRow.CRTs, false); err != nil { + b.Fatal(err) + } + if err := processor.appendRow(deleteRow); err != nil { + b.Fatal(err) + } + if err := processor.commitTxn(); err != nil { + b.Fatal(err) + } + processor.resetBatchDML() + } + } + + b.Run("fast_path", func(b *testing.B) { + bench(b, ignoreDeleteFilter) + }) + b.Run("after_decode", func(b *testing.B) { + bench(b, disableDMLTypeFastPathFilter{Filter: ignoreDeleteFilter}) + }) +} + +func BenchmarkEventScannerIgnoreDelete(b *testing.B) { + helper := event.NewEventTestHelper(b) + defer helper.Close() + + ddlEvent, kvEvents := genEvents(helper, `create table test.t(id int primary key, c char(50))`, + `insert into test.t(id,c) values (1, "c1")`) + tableID := ddlEvent.GetTableID() + deleteRow := insertToDeleteRow(kvEvents[0]) + dispatcherID := common.NewDispatcherID() + mockSchemaGetter := NewMockSchemaStore() + mockSchemaGetter.AppendDDLEvent(tableID, ddlEvent) + + ignoreDeleteFilter, err := filter.NewFilter(&config.FilterConfig{ + Rules: []string{"test.*"}, + EventFilters: []*config.EventFilterRule{ + { + Matcher: []string{"test.t"}, + IgnoreEvent: []bf.EventType{bf.DeleteEvent}, + }, + }, + }, "UTC", false, false) + if err != nil { + b.Fatal(err) + } + + rowsPerScan := 50000 + bench := func(b *testing.B, changefeedFilter filter.Filter) { + disInfo := newBenchmarkDispatcherInfo( + ddlEvent.FinishedTs, + dispatcherID, + tableID, + ) + changefeedStatus := newChangefeedStatus(disInfo.GetChangefeedID(), 0) + changefeedStatus.filter = changefeedFilter + disp := newDispatcherStat(disInfo, 1, 1, nil, changefeedStatus) + makeDispatcherReady(disp) + scanner := newEventScanner( + &benchmarkEventGetter{raw: deleteRow, rows: rowsPerScan}, + mockSchemaGetter, + event.NewMounter(time.UTC, &integrity.Config{}), + common.DefaultMode, + ) + dataRange := common.DataRange{ + Span: disInfo.GetTableSpan(), + CommitTsStart: ddlEvent.FinishedTs, + CommitTsEnd: deleteRow.CRTs + 1, + } + limit := scanLimit{maxDMLBytes: 1 << 60} + + b.ReportAllocs() + b.ResetTimer() + start := time.Now() + for i := 0; i < b.N; i++ { + _, _, interrupted, err := scanner.scan(context.Background(), disp, dataRange, limit) + if err != nil { + b.Fatal(err) + } + if interrupted { + b.Fatal("scan interrupted") + } + } + b.ReportMetric(float64(b.N*rowsPerScan)/time.Since(start).Seconds(), "rows/s") + } + + b.Run("fast_path", func(b *testing.B) { + bench(b, ignoreDeleteFilter) + }) + b.Run("after_decode", func(b *testing.B) { + bench(b, disableDMLTypeFastPathFilter{Filter: ignoreDeleteFilter}) + }) +} diff --git a/pkg/eventservice/event_scanner_test.go b/pkg/eventservice/event_scanner_test.go index 28520686b2..942728af2a 100644 --- a/pkg/eventservice/event_scanner_test.go +++ b/pkg/eventservice/event_scanner_test.go @@ -23,8 +23,10 @@ import ( "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/logservice/eventstore" "github.com/pingcap/ticdc/logservice/schemastore" + bf "github.com/pingcap/ticdc/pkg/binlog-filter" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/filter" "github.com/pingcap/ticdc/pkg/integrity" "github.com/pingcap/tidb/pkg/util/chunk" @@ -34,6 +36,21 @@ import ( type mockMounter struct { event.Mounter + decodeCount atomic.Int64 +} + +type countingDMLTypeFilter struct { + filter.Filter + dmlTypeCallCount atomic.Int64 +} + +func (f *countingDMLTypeFilter) ShouldIgnoreDMLByEventType( + dmlType common.RowType, + tableInfo *common.TableInfo, + startTs uint64, +) (bool, error) { + f.dmlTypeCallCount.Inc() + return f.Filter.ShouldIgnoreDMLByEventType(dmlType, tableInfo, startTs) } type stubEventGetter struct { @@ -52,6 +69,7 @@ func makeDispatcherReady(disp *dispatcherStat) { } func (m *mockMounter) DecodeToChunk(rawKV *common.RawKVEntry, tableInfo *common.TableInfo, chk *chunk.Chunk) (int, *integrity.Checksum, error) { + m.decodeCount.Inc() if rawKV.IsUpdate() { return 2, nil, nil } else { @@ -964,13 +982,13 @@ func TestDMLProcessorAppendRow(t *testing.T) { dispatcherID := common.NewDispatcherID() // Create a mock mounter and schema getter - mockMounter := &mockMounter{} + testMounter := &mockMounter{} mockSchemaGetter := NewMockSchemaStore() mockSchemaGetter.AppendDDLEvent(tableID, ddlEvent) // Test case 1: appendRow before txn started, illegal usage. t.Run("NoCurrentDMLEvent", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false, common.DefaultMode, false) + processor := newDMLProcessor(testMounter, mockSchemaGetter, nil, false, common.DefaultMode, false) rawEvent := kvEvents[0] require.Panics(t, func() { @@ -980,7 +998,7 @@ func TestDMLProcessorAppendRow(t *testing.T) { // Test case 2: appendRow for insert operation (non-update) t.Run("AppendInsertRow", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false, common.DefaultMode, false) + processor := newDMLProcessor(testMounter, mockSchemaGetter, nil, false, common.DefaultMode, false) firstEvent := kvEvents[0] processor.startTxn(dispatcherID, tableID, tableInfo, firstEvent.StartTs, firstEvent.CRTs, false) @@ -998,7 +1016,7 @@ func TestDMLProcessorAppendRow(t *testing.T) { // Test case 3: appendRow for delete operation (non-update) t.Run("AppendDeleteRow", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false, common.DefaultMode, false) + processor := newDMLProcessor(testMounter, mockSchemaGetter, nil, false, common.DefaultMode, false) rawEvent := kvEvents[0] deleteRow := insertToDeleteRow(rawEvent) @@ -1015,9 +1033,75 @@ func TestDMLProcessorAppendRow(t *testing.T) { require.Empty(t, processor.insertRowCache) }) + t.Run("IgnoreDeleteByEventTypeSkipsDecode", func(t *testing.T) { + mounter := &mockMounter{} + changefeedFilter, err := filter.NewFilter(&config.FilterConfig{ + Rules: []string{"test.*"}, + EventFilters: []*config.EventFilterRule{ + { + Matcher: []string{"test.t"}, + IgnoreEvent: []bf.EventType{bf.DeleteEvent}, + }, + }, + }, "UTC", false, false) + require.NoError(t, err) + processor := newDMLProcessor(mounter, mockSchemaGetter, changefeedFilter, false, common.DefaultMode, false) + + rawEvent := kvEvents[0] + deleteRow := insertToDeleteRow(rawEvent) + + processor.startTxn(dispatcherID, tableID, tableInfo, rawEvent.StartTs, rawEvent.CRTs, false) + err = processor.appendRow(deleteRow) + require.NoError(t, err) + + require.Equal(t, int64(0), mounter.decodeCount.Load()) + require.Equal(t, int32(0), processor.batchDML.Len()) + require.Empty(t, processor.insertRowCache) + }) + + t.Run("IgnoreDeleteByEventTypeCachedWithinTxn", func(t *testing.T) { + mounter := &mockMounter{} + changefeedFilter, err := filter.NewFilter(&config.FilterConfig{ + Rules: []string{"test.*"}, + EventFilters: []*config.EventFilterRule{ + { + Matcher: []string{"test.t"}, + IgnoreEvent: []bf.EventType{bf.DeleteEvent}, + }, + }, + }, "UTC", false, false) + require.NoError(t, err) + countingFilter := &countingDMLTypeFilter{Filter: changefeedFilter} + processor := newDMLProcessor(mounter, mockSchemaGetter, countingFilter, false, common.DefaultMode, false) + + rawEvent := kvEvents[0] + deleteRow := insertToDeleteRow(rawEvent) + + processor.startTxn(dispatcherID, tableID, tableInfo, rawEvent.StartTs, rawEvent.CRTs, false) + for range 3 { + err = processor.appendRow(deleteRow) + require.NoError(t, err) + } + require.Equal(t, int64(1), countingFilter.dmlTypeCallCount.Load()) + require.Equal(t, int64(0), mounter.decodeCount.Load()) + + err = processor.commitTxn() + require.NoError(t, err) + + processor.startTxn(dispatcherID, tableID, tableInfo, rawEvent.StartTs+1, rawEvent.CRTs+1, false) + nextTxnDeleteRow := insertToDeleteRow(kvEvents[1]) + nextTxnDeleteRow.StartTs = rawEvent.StartTs + 1 + nextTxnDeleteRow.CRTs = rawEvent.CRTs + 1 + err = processor.appendRow(nextTxnDeleteRow) + require.NoError(t, err) + + require.Equal(t, int64(2), countingFilter.dmlTypeCallCount.Load()) + require.Equal(t, int64(0), mounter.decodeCount.Load()) + }) + // Test case 4: appendRow for update operation without unique key change t.Run("AppendUpdateRowWithoutUKChange", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false, common.DefaultMode, false) + processor := newDMLProcessor(testMounter, mockSchemaGetter, nil, false, common.DefaultMode, false) // Create a current DML event first rawEvent := kvEvents[0] @@ -1041,7 +1125,7 @@ func TestDMLProcessorAppendRow(t *testing.T) { // Test case 5: appendRow for update operation with unique key change (split update) t.Run("AppendUpdateRowWithUKChange", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false, common.DefaultMode, false) + processor := newDMLProcessor(testMounter, mockSchemaGetter, nil, false, common.DefaultMode, false) // Create a current DML event first rawEvent := kvEvents[0] @@ -1069,7 +1153,7 @@ func TestDMLProcessorAppendRow(t *testing.T) { // Test case 6: Test multiple appendRow calls t.Run("MultipleAppendRows", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false, common.DefaultMode, false) + processor := newDMLProcessor(testMounter, mockSchemaGetter, nil, false, common.DefaultMode, false) // Create a current DML event first rawEvent := kvEvents[0] @@ -1118,6 +1202,121 @@ func TestDMLProcessorAppendRow(t *testing.T) { require.True(t, ok) require.Equal(t, common.RowTypeUpdate, nextRow.RowType) }) + + t.Run("IgnoreUpdateByEventTypeSkipsDecode", func(t *testing.T) { + mounter := &mockMounter{} + changefeedFilter, err := filter.NewFilter(&config.FilterConfig{ + Rules: []string{"test.*"}, + EventFilters: []*config.EventFilterRule{ + { + Matcher: []string{"test.t"}, + IgnoreEvent: []bf.EventType{bf.UpdateEvent}, + }, + }, + }, "UTC", false, false) + require.NoError(t, err) + processor := newDMLProcessor(mounter, mockSchemaGetter, changefeedFilter, false, common.DefaultMode, false) + + insertSQL := "insert into test.t(id,a,b) values (103, 'a103', 'b103')" + updateSQL := "update test.t set b = 'b103_updated' where id = 103" + _, updateEvent := helper.DML2UpdateEvent("test", "t", insertSQL, updateSQL) + + processor.startTxn(dispatcherID, tableID, tableInfo, updateEvent.StartTs, updateEvent.CRTs, false) + err = processor.appendRow(updateEvent) + require.NoError(t, err) + + require.Equal(t, int64(0), mounter.decodeCount.Load()) + require.Equal(t, int32(0), processor.batchDML.Len()) + require.Empty(t, processor.insertRowCache) + }) + + t.Run("IgnoreUpdateByEventTypeSkipsSplitCheck", func(t *testing.T) { + mounter := &mockMounter{} + changefeedFilter, err := filter.NewFilter(&config.FilterConfig{ + Rules: []string{"test.*"}, + EventFilters: []*config.EventFilterRule{ + { + Matcher: []string{"test.t"}, + IgnoreEvent: []bf.EventType{bf.UpdateEvent}, + }, + }, + }, "UTC", false, false) + require.NoError(t, err) + processor := newDMLProcessor(mounter, mockSchemaGetter, changefeedFilter, false, common.DefaultMode, false) + + insertSQL := "insert into test.t(id,a,b) values (104, 'a104', 'b104')" + updateSQL := "update test.t set a = 'a104_updated' where id = 104" + _, updateEvent := helper.DML2UpdateEvent("test", "t", insertSQL, updateSQL) + + processor.startTxn(dispatcherID, tableID, tableInfo, updateEvent.StartTs, updateEvent.CRTs, false) + err = processor.appendRow(updateEvent) + require.NoError(t, err) + + require.Equal(t, int64(0), mounter.decodeCount.Load()) + require.Equal(t, int32(0), processor.batchDML.Len()) + require.Empty(t, processor.insertRowCache) + }) + + t.Run("SplitUpdateSkipsIgnoredDeleteBeforeDecode", func(t *testing.T) { + mounter := &mockMounter{} + changefeedFilter, err := filter.NewFilter(&config.FilterConfig{ + Rules: []string{"test.*"}, + EventFilters: []*config.EventFilterRule{ + { + Matcher: []string{"test.t"}, + IgnoreEvent: []bf.EventType{bf.DeleteEvent}, + }, + }, + }, "UTC", false, false) + require.NoError(t, err) + processor := newDMLProcessor(mounter, mockSchemaGetter, changefeedFilter, false, common.DefaultMode, false) + + insertSQL := "insert into test.t(id,a,b) values (105, 'a105', 'b105')" + updateSQL := "update test.t set a = 'a105_updated' where id = 105" + _, updateEvent := helper.DML2UpdateEvent("test", "t", insertSQL, updateSQL) + + processor.startTxn(dispatcherID, tableID, tableInfo, updateEvent.StartTs, updateEvent.CRTs, false) + err = processor.appendRow(updateEvent) + require.NoError(t, err) + require.Len(t, processor.insertRowCache, 1) + + err = processor.commitTxn() + require.NoError(t, err) + + require.Equal(t, int64(1), mounter.decodeCount.Load()) + require.Equal(t, int32(1), processor.batchDML.Len()) + require.Empty(t, processor.insertRowCache) + }) + + t.Run("SplitUpdateSkipsIgnoredInsertBeforeDecode", func(t *testing.T) { + mounter := &mockMounter{} + changefeedFilter, err := filter.NewFilter(&config.FilterConfig{ + Rules: []string{"test.*"}, + EventFilters: []*config.EventFilterRule{ + { + Matcher: []string{"test.t"}, + IgnoreEvent: []bf.EventType{bf.InsertEvent}, + }, + }, + }, "UTC", false, false) + require.NoError(t, err) + processor := newDMLProcessor(mounter, mockSchemaGetter, changefeedFilter, false, common.DefaultMode, false) + + insertSQL := "insert into test.t(id,a,b) values (106, 'a106', 'b106')" + updateSQL := "update test.t set a = 'a106_updated' where id = 106" + _, updateEvent := helper.DML2UpdateEvent("test", "t", insertSQL, updateSQL) + + processor.startTxn(dispatcherID, tableID, tableInfo, updateEvent.StartTs, updateEvent.CRTs, false) + err = processor.appendRow(updateEvent) + require.NoError(t, err) + require.Empty(t, processor.insertRowCache) + + err = processor.commitTxn() + require.NoError(t, err) + + require.Equal(t, int64(1), mounter.decodeCount.Load()) + require.Equal(t, int32(1), processor.batchDML.Len()) + }) } func TestScanSession(t *testing.T) { diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go index 89298fd71e..2a6c284225 100644 --- a/pkg/filter/filter.go +++ b/pkg/filter/filter.go @@ -47,6 +47,9 @@ const ( type Filter interface { // ShouldIgnoreDML returns true if the DML event should not be handled. ShouldIgnoreDML(dmlType common.RowType, preRow, row chunk.Row, tableInfo *common.TableInfo, startTs uint64, ctx DMLFilterContext) (bool, error) + // ShouldIgnoreDMLByEventType returns true only when filters that do not need + // decoded row values can determine the DML should be ignored. + ShouldIgnoreDMLByEventType(dmlType common.RowType, tableInfo *common.TableInfo, startTs uint64) (bool, error) // ShouldDiscardDDL returns true if the DDL event should not be handled. ShouldDiscardDDL(schema, table string, ddlType timodel.ActionType, tableInfo *common.TableInfo) bool // ShouldIgnoreDDL returns true if the DDL event should not be sent to downstream. @@ -133,9 +136,29 @@ func NewFilter(cfg *config.FilterConfig, tz string, caseSensitive bool, forceRep // 4. By update-only-column config. // 5. By row value expression. func (f *filter) ShouldIgnoreDML(dmlType common.RowType, preRow, row chunk.Row, tableInfo *common.TableInfo, startTs uint64, filterContext DMLFilterContext) (bool, error) { + ignore, err := f.ShouldIgnoreDMLByEventType(dmlType, tableInfo, startTs) + if ignore || err != nil { + return ignore, err + } + if filterContext.EnableIgnoreUpdateOnlyColumns { + ignoreByUpdateOnlyColumns, err := f.updateOnlyColumnsFilter.shouldSkipDML(dmlType, preRow, row, tableInfo) + if err != nil { + return false, err + } + if ignoreByUpdateOnlyColumns { + return true, nil + } + } + return f.dmlExprFilter.shouldSkipDML(dmlType, preRow, row, tableInfo) +} + +func (f *filter) ShouldIgnoreDMLByEventType(dmlType common.RowType, tableInfo *common.TableInfo, startTs uint64) (bool, error) { if f.shouldIgnoreStartTs(startTs) { return true, nil } + if tableInfo == nil { + return false, nil + } if f.ShouldIgnoreTable(tableInfo.GetSchemaName(), tableInfo.GetTableName()) { return true, nil @@ -148,16 +171,7 @@ func (f *filter) ShouldIgnoreDML(dmlType common.RowType, preRow, row chunk.Row, if ignoreByEventType { return true, nil } - if filterContext.EnableIgnoreUpdateOnlyColumns { - ignoreByUpdateOnlyColumns, err := f.updateOnlyColumnsFilter.shouldSkipDML(dmlType, preRow, row, tableInfo) - if err != nil { - return false, err - } - if ignoreByUpdateOnlyColumns { - return true, nil - } - } - return f.dmlExprFilter.shouldSkipDML(dmlType, preRow, row, tableInfo) + return false, nil } // ShouldDiscardDDL checks if a DDL event should be discarded by conditions below: diff --git a/pkg/filter/filter_test.go b/pkg/filter/filter_test.go index a357c5b380..4984ca1ff8 100644 --- a/pkg/filter/filter_test.go +++ b/pkg/filter/filter_test.go @@ -21,6 +21,7 @@ import ( bf "github.com/pingcap/ticdc/pkg/binlog-filter" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/mysql" @@ -702,6 +703,52 @@ func TestColumnValueEqual(t *testing.T) { } } +func TestShouldIgnoreDMLByEventType(t *testing.T) { + t.Parallel() + + cfg := &config.FilterConfig{ + Rules: []string{"test.*", "!test.t4"}, + IgnoreTxnStartTs: []uint64{100}, + EventFilters: []*config.EventFilterRule{ + { + Matcher: []string{"test.t2"}, + IgnoreEvent: []bf.EventType{bf.InsertEvent}, + }, + { + Matcher: []string{"test.t3"}, + IgnoreDeleteValueExpr: util.AddressOf("id = 1"), + }, + }, + } + f, err := NewFilter(cfg, "UTC", false, false) + require.NoError(t, err) + + ti1 := mustNewCommonTableInfo("test", "t1", []*model.ColumnInfo{newColumnInfo(1, "id", mysql.TypeLong, mysql.PriKeyFlag)}, nil) + ti2 := mustNewCommonTableInfo("test", "t2", []*model.ColumnInfo{newColumnInfo(1, "id", mysql.TypeLong, mysql.PriKeyFlag)}, nil) + ti3 := mustNewCommonTableInfo("test", "t3", []*model.ColumnInfo{newColumnInfo(1, "id", mysql.TypeLong, mysql.PriKeyFlag)}, nil) + ti4 := mustNewCommonTableInfo("test", "t4", []*model.ColumnInfo{newColumnInfo(1, "id", mysql.TypeLong, mysql.PriKeyFlag)}, nil) + + ignore, err := f.ShouldIgnoreDMLByEventType(common.RowTypeInsert, ti1, 100) + require.NoError(t, err) + require.True(t, ignore) + + ignore, err = f.ShouldIgnoreDMLByEventType(common.RowTypeInsert, ti4, 0) + require.NoError(t, err) + require.True(t, ignore) + + ignore, err = f.ShouldIgnoreDMLByEventType(common.RowTypeInsert, ti2, 0) + require.NoError(t, err) + require.True(t, ignore) + + ignore, err = f.ShouldIgnoreDMLByEventType(common.RowTypeDelete, ti3, 0) + require.NoError(t, err) + require.False(t, ignore, "expression filters need decoded row values") + + ignore, err = f.ShouldIgnoreDMLByEventType(common.RowTypeInsert, ti1, 0) + require.NoError(t, err) + require.False(t, ignore) +} + func TestShouldIgnoreDMLCaseSensitivity(t *testing.T) { t.Parallel()