Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 91 additions & 1 deletion pkg/eventservice/event_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,13 +682,23 @@ func (t *TxnEvent) AppendRow(
return t.CurrentDMLEvent.AppendRow(rawEvent, decode, filter, filterContext)
}

// dmlTypeFilterCacheSize follows common.RowType iota values: delete, insert, update.
const dmlTypeFilterCacheSize = int(common.RowTypeUpdate) + 1

// dmlProcessor handles DML event processing and batching
type dmlProcessor struct {
mounter event.Mounter
schemaGetter schemaGetter

filter filter.Filter
filterContext filter.DMLFilterContext
// dmlTypeFilterCache caches the pre-decode filter result within the current transaction.
// The cache is reset when a new transaction starts. It is safe because tableInfo
// and startTs are fixed for the current transaction.
dmlTypeFilterCache [dmlTypeFilterCacheSize]struct {
valid bool
ignore bool
}

// insertRowCache is used to cache the split update event's insert part of the current transaction.
// It will be used to append to the current DML event when the transaction is finished.
Expand Down Expand Up @@ -737,6 +747,7 @@ func (p *dmlProcessor) startTxn(
if p.currentTxn != nil {
log.Panic("there is a transaction not flushed yet")
}
p.resetDMLTypeFilterCache()
var err error
p.currentTxn, err = newTxnEvent(p.batchDML, dispatcherID, tableID, tableInfo, startTs, commitTs, shouldSplitTxn)
return err
Expand Down Expand Up @@ -790,13 +801,29 @@ func (p *dmlProcessor) appendRow(rawEvent *common.RawKVEntry) error {
rawType := rawEvent.GetType()
if !rawEvent.IsUpdate() {
updateMetricEventServiceSendDMLTypeCount(p.mode, rawType, false)
ignore, err := p.shouldIgnoreRawEventByDMLType(rawEvent)
if err != nil {
return err
}
if ignore {
return nil
}
return p.currentTxn.AppendRow(rawEvent, p.mounter.DecodeToChunk, p.filter, p.filterContext)
}

var (
shouldSplit bool
err error
)
ignore, err := p.shouldIgnoreDMLByEventType(common.RowTypeUpdate, rawEvent.StartTs)
if err != nil {
return err
}
if ignore {
updateMetricEventServiceSendDMLTypeCount(p.mode, rawType, false)
return nil
}

if !p.outputRawChangeEvent {
shouldSplit, err = event.IsUKChanged(rawEvent, p.currentTxn.CurrentDMLEvent.TableInfo)
if err != nil {
Expand All @@ -817,10 +844,73 @@ func (p *dmlProcessor) appendRow(rawEvent *common.RawKVEntry) error {
if err != nil {
return err
}
p.insertRowCache = append(p.insertRowCache, insertRow)
ignoreInsert, err := p.shouldIgnoreRawEventByDMLType(insertRow)
if err != nil {
return err
}
if !ignoreInsert {
p.insertRowCache = append(p.insertRowCache, insertRow)
}
ignoreDelete, err := p.shouldIgnoreRawEventByDMLType(deleteRow)
if err != nil {
return err
}
if ignoreDelete {
return nil
}
return p.currentTxn.AppendRow(deleteRow, p.mounter.DecodeToChunk, p.filter, p.filterContext)
}

func (p *dmlProcessor) shouldIgnoreRawEventByDMLType(rawEvent *common.RawKVEntry) (bool, error) {
rowType := common.RowTypeInsert
if rawEvent.IsDelete() {
rowType = common.RowTypeDelete
} else if rawEvent.IsUpdate() {
rowType = common.RowTypeUpdate
}
return p.shouldIgnoreDMLByEventType(rowType, rawEvent.StartTs)
}

func (p *dmlProcessor) shouldIgnoreDMLByEventType(rowType common.RowType, startTs uint64) (bool, error) {
idx := int(rowType)
if idx >= 0 && idx < len(p.dmlTypeFilterCache) {
if p.dmlTypeFilterCache[idx].valid {
return p.dmlTypeFilterCache[idx].ignore, nil
}
}

if p.filter == nil {
p.setDMLTypeFilterCache(rowType, false)
return false, nil
}
ignore, err := p.filter.ShouldIgnoreDMLByEventType(
rowType,
p.currentTxn.CurrentDMLEvent.TableInfo,
startTs,
)
if err != nil {
return false, errors.Trace(err)
}
p.setDMLTypeFilterCache(rowType, ignore)
return ignore, nil
}

func (p *dmlProcessor) setDMLTypeFilterCache(rowType common.RowType, ignore bool) {
idx := int(rowType)
if idx < 0 || idx >= len(p.dmlTypeFilterCache) {
return
}
p.dmlTypeFilterCache[idx].valid = true
p.dmlTypeFilterCache[idx].ignore = ignore
}

func (p *dmlProcessor) resetDMLTypeFilterCache() {
for i := range p.dmlTypeFilterCache {
p.dmlTypeFilterCache[i].valid = false
p.dmlTypeFilterCache[i].ignore = false
}
}

// getCurrentBatchDML returns the current batch DML event
func (p *dmlProcessor) getCurrentBatchDML() *event.BatchDMLEvent {
return p.batchDML
Expand Down
235 changes: 235 additions & 0 deletions pkg/eventservice/event_scanner_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package eventservice

import (
"context"
"testing"
"time"

"github.com/pingcap/ticdc/eventpb"
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/logservice/eventstore"
bf "github.com/pingcap/ticdc/pkg/binlog-filter"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/integrity"
)

type disableDMLTypeFastPathFilter struct {
filter.Filter
}

func (f disableDMLTypeFastPathFilter) ShouldIgnoreDMLByEventType(
common.RowType,
*common.TableInfo,
uint64,
) (bool, error) {
return false, nil
}

type benchmarkEventGetter struct {
raw *common.RawKVEntry
rows int
}

func (g *benchmarkEventGetter) GetIterator(
common.DispatcherID,
common.DataRange,
) (eventstore.EventIterator, error) {
return &singleTxnIterator{
raw: g.raw,
rows: g.rows,
}, nil
}

type singleTxnIterator struct {
raw *common.RawKVEntry
rows int
next int
}

func (i *singleTxnIterator) Next() (*common.RawKVEntry, bool) {
if i.next >= i.rows {
return nil, false
}
isNewTxn := i.next == 0
i.next++
return i.raw, isNewTxn
}

func (i *singleTxnIterator) Close() (int64, error) {
return int64(i.next), nil
}

func newBenchmarkDispatcherInfo(
startTs uint64,
dispatcherID common.DispatcherID,
tableID int64,
) *mockDispatcherInfo {
return &mockDispatcherInfo{
clusterID: 1,
serverID: "server1",
id: dispatcherID,
changefeedID: common.NewChangefeedID4Test("default", "bench"),
topic: "topic1",
span: &heartbeatpb.TableSpan{
TableID: tableID,
StartKey: []byte("a"),
EndKey: []byte("z"),
},
startTs: startTs,
actionType: eventpb.ActionType_ACTION_TYPE_REGISTER,
filterConfig: &eventpb.FilterConfig{
FilterConfig: &eventpb.InnerFilterConfig{
Rules: []string{"*.*"},
},
},
bdrMode: false,
integrity: config.GetDefaultReplicaConfig().Integrity,
}
}

func BenchmarkDMLProcessorIgnoreDelete(b *testing.B) {
helper := event.NewEventTestHelper(b)
defer helper.Close()

ddlEvent, kvEvents := genEvents(helper, `create table test.t(id int primary key, c char(50))`,
`insert into test.t(id,c) values (1, "c1")`)
tableInfo := ddlEvent.TableInfo
tableID := ddlEvent.GetTableID()
deleteRow := insertToDeleteRow(kvEvents[0])
dispatcherID := common.NewDispatcherID()
mockSchemaGetter := NewMockSchemaStore()
mockSchemaGetter.AppendDDLEvent(tableID, ddlEvent)

ignoreDeleteFilter, err := filter.NewFilter(&config.FilterConfig{
Rules: []string{"test.*"},
EventFilters: []*config.EventFilterRule{
{
Matcher: []string{"test.t"},
IgnoreEvent: []bf.EventType{bf.DeleteEvent},
},
},
}, "UTC", false, false)
if err != nil {
b.Fatal(err)
}

bench := func(b *testing.B, changefeedFilter filter.Filter) {
processor := newDMLProcessor(
event.NewMounter(time.UTC, &integrity.Config{}),
mockSchemaGetter,
changefeedFilter,
false,
common.DefaultMode,
false,
)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := processor.startTxn(dispatcherID, tableID, tableInfo, deleteRow.StartTs, deleteRow.CRTs, false); err != nil {
b.Fatal(err)
}
if err := processor.appendRow(deleteRow); err != nil {
b.Fatal(err)
}
if err := processor.commitTxn(); err != nil {
b.Fatal(err)
}
processor.resetBatchDML()
}
}

b.Run("fast_path", func(b *testing.B) {
bench(b, ignoreDeleteFilter)
})
b.Run("after_decode", func(b *testing.B) {
bench(b, disableDMLTypeFastPathFilter{Filter: ignoreDeleteFilter})
})
}

func BenchmarkEventScannerIgnoreDelete(b *testing.B) {
helper := event.NewEventTestHelper(b)
defer helper.Close()

ddlEvent, kvEvents := genEvents(helper, `create table test.t(id int primary key, c char(50))`,
`insert into test.t(id,c) values (1, "c1")`)
tableID := ddlEvent.GetTableID()
deleteRow := insertToDeleteRow(kvEvents[0])
dispatcherID := common.NewDispatcherID()
mockSchemaGetter := NewMockSchemaStore()
mockSchemaGetter.AppendDDLEvent(tableID, ddlEvent)

ignoreDeleteFilter, err := filter.NewFilter(&config.FilterConfig{
Rules: []string{"test.*"},
EventFilters: []*config.EventFilterRule{
{
Matcher: []string{"test.t"},
IgnoreEvent: []bf.EventType{bf.DeleteEvent},
},
},
}, "UTC", false, false)
if err != nil {
b.Fatal(err)
}

rowsPerScan := 50000
bench := func(b *testing.B, changefeedFilter filter.Filter) {
disInfo := newBenchmarkDispatcherInfo(
ddlEvent.FinishedTs,
dispatcherID,
tableID,
)
changefeedStatus := newChangefeedStatus(disInfo.GetChangefeedID(), 0)
changefeedStatus.filter = changefeedFilter
disp := newDispatcherStat(disInfo, 1, 1, nil, changefeedStatus)
makeDispatcherReady(disp)
scanner := newEventScanner(
&benchmarkEventGetter{raw: deleteRow, rows: rowsPerScan},
mockSchemaGetter,
event.NewMounter(time.UTC, &integrity.Config{}),
common.DefaultMode,
)
dataRange := common.DataRange{
Span: disInfo.GetTableSpan(),
CommitTsStart: ddlEvent.FinishedTs,
CommitTsEnd: deleteRow.CRTs + 1,
}
limit := scanLimit{maxDMLBytes: 1 << 60}

b.ReportAllocs()
b.ResetTimer()
start := time.Now()
for i := 0; i < b.N; i++ {
_, _, interrupted, err := scanner.scan(context.Background(), disp, dataRange, limit)
if err != nil {
b.Fatal(err)
}
if interrupted {
b.Fatal("scan interrupted")
}
}
b.ReportMetric(float64(b.N*rowsPerScan)/time.Since(start).Seconds(), "rows/s")
}

b.Run("fast_path", func(b *testing.B) {
bench(b, ignoreDeleteFilter)
})
b.Run("after_decode", func(b *testing.B) {
bench(b, disableDMLTypeFastPathFilter{Filter: ignoreDeleteFilter})
})
}
Loading
Loading