diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index e21c3aeb26..a7272f9775 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -65,6 +65,7 @@ type eventBroker struct { eventStore eventstore.EventStore schemaStore schemastore.SchemaStore mounter event.Mounter + newMounter func() event.Mounter timezone string // msgSender is used to send the events to the dispatchers. msgSender messaging.MessageSender @@ -131,6 +132,7 @@ func newEventBroker( eventStore: eventStore, pdClock: pdClock, mounter: event.NewMounter(tz, integrity), + newMounter: func() event.Mounter { return event.NewMounter(tz, integrity) }, timezone: tz.String(), schemaStore: schemaStore, changefeedMap: sync.Map{}, @@ -697,7 +699,13 @@ func (c *eventBroker) doScan(ctx context.Context, task scanTask) { return } - scanner := newEventScanner(c.eventStore, c.schemaStore, c.mounter, task.info.GetMode()) + scanner := newEventScanner( + c.eventStore, + c.schemaStore, + c.mounter, + task.info.GetMode(), + withEventScannerMounterFactory(c.newMounter), + withEventScannerParallelDecodeWorkers(defaultParallelDecodeWorkers)) scannedBytes, events, interrupted, err := scanner.scan(ctx, task, dataRange, sl) if scannedBytes < 0 { releaseQuota(available, uint64(sl.maxDMLBytes)) diff --git a/pkg/eventservice/event_scanner.go b/pkg/eventservice/event_scanner.go index ca94eef006..1e08ab3dcc 100644 --- a/pkg/eventservice/event_scanner.go +++ b/pkg/eventservice/event_scanner.go @@ -15,6 +15,7 @@ package eventservice import ( "context" + "runtime" "time" "github.com/pingcap/log" @@ -29,6 +30,12 @@ import ( "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/tidb/pkg/util/chunk" "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +const ( + defaultParallelDecodeWorkers = 4 + parallelDecodeMinRows = 1024 ) // eventGetter is the interface for getting iterator of events @@ -60,7 +67,45 @@ type eventScanner struct { eventGetter eventGetter schemaGetter schemaGetter mounter event.Mounter + newMounter func() event.Mounter mode int64 + + parallelDecodeWorkers int +} + +type eventScannerOption func(*eventScanner) + +func withEventScannerMounterFactory(newMounter func() event.Mounter) eventScannerOption { + return func(s *eventScanner) { + s.newMounter = newMounter + } +} + +func withEventScannerParallelDecodeWorkers(workers int) eventScannerOption { + return func(s *eventScanner) { + s.parallelDecodeWorkers = normalizeParallelDecodeWorkers(workers) + } +} + +func normalizeParallelDecodeWorkers(workers int) int { + if workers <= 1 { + return workers + } + if maxProcs := runtime.GOMAXPROCS(0); maxProcs > 0 && workers > maxProcs { + return maxProcs + } + return workers +} + +func cloneRawKVEntry(raw *common.RawKVEntry) *common.RawKVEntry { + if raw == nil { + return nil + } + cloned := *raw + cloned.Key = append([]byte(nil), raw.Key...) + cloned.Value = append([]byte(nil), raw.Value...) + cloned.OldValue = append([]byte(nil), raw.OldValue...) + return &cloned } // newEventScanner creates a new EventScanner @@ -69,13 +114,18 @@ func newEventScanner( schemaStore schemastore.SchemaStore, mounter event.Mounter, mode int64, + opts ...eventScannerOption, ) *eventScanner { - return &eventScanner{ + scanner := &eventScanner{ eventGetter: eventStore, schemaGetter: schemaStore, mounter: mounter, mode: mode, } + for _, opt := range opts { + opt(scanner) + } + return scanner } // scan retrieves and processes events from both eventStore and schemaStore based on the provided scanTask and limits. @@ -212,7 +262,10 @@ func (s *eventScanner) scanAndMergeEvents( dispatcher.filter, dispatcher.info.IsOutputRawChangeEvent(), s.mode, - dispatcher.info.EnableIgnoreUpdateOnlyColumns()) + dispatcher.info.EnableIgnoreUpdateOnlyColumns(), + withDMLProcessorMounterFactory(s.newMounter), + withDMLProcessorParallelDecodeWorkers(s.parallelDecodeWorkers)) + txnRows := make([]*common.RawKVEntry, 0) for { shouldStop, err := s.checkScanConditions(session) @@ -225,6 +278,13 @@ func (s *eventScanner) scanAndMergeEvents( rawEvent, isNewTxn := iter.Next() if rawEvent == nil { + if err = processor.appendRows(session.ctx, txnRows); err != nil { + log.Error("append txn rows failed", zap.Error(err), + zap.Stringer("dispatcherID", session.dispatcherStat.id), + zap.Int64("tableID", tableID), + zap.Int64("mode", s.mode)) + return false, err + } err = finalizeScan(merger, processor, session, session.dataRange.CommitTsEnd) return false, err } @@ -237,10 +297,25 @@ func (s *eventScanner) scanAndMergeEvents( } // table is deleted, still append remaining DDL event and resolved event. if tableInfo == nil { + if err = processor.appendRows(session.ctx, txnRows); err != nil { + log.Error("append txn rows failed", zap.Error(err), + zap.Stringer("dispatcherID", session.dispatcherStat.id), + zap.Int64("tableID", tableID), + zap.Int64("mode", s.mode)) + return false, err + } err = finalizeScan(merger, processor, session, rawEvent.CRTs-1) return false, err } + if err = processor.appendRows(session.ctx, txnRows); err != nil { + log.Error("append txn rows failed", zap.Error(err), + zap.Stringer("dispatcherID", session.dispatcherStat.id), + zap.Int64("tableID", tableID), + zap.Int64("mode", s.mode)) + return false, err + } + txnRows = txnRows[:0] if err = s.commitTxn(session, merger, processor, rawEvent.CRTs, tableInfo.GetUpdateTS()); err != nil { return false, err } @@ -257,15 +332,7 @@ func (s *eventScanner) scanAndMergeEvents( } } - if err = processor.appendRow(rawEvent); err != nil { - log.Error("append row failed", zap.Error(err), - zap.Stringer("dispatcherID", session.dispatcherStat.id), - zap.Int64("tableID", tableID), - zap.Uint64("startTs", rawEvent.StartTs), - zap.Uint64("commitTs", rawEvent.CRTs), - zap.Int64("mode", s.mode)) - return false, err - } + txnRows = append(txnRows, cloneRawKVEntry(rawEvent)) } } @@ -682,13 +749,88 @@ func (t *TxnEvent) AppendRow( return t.CurrentDMLEvent.AppendRow(rawEvent, decode, filter, filterContext) } +func (t *TxnEvent) AppendDecodedRow( + decoded *decodedDMLRow, + dmlFilter filter.Filter, + filterContext filter.DMLFilterContext, +) error { + if decoded == nil || decoded.count == 0 { + return nil + } + if t.shouldSplitTxn && (t.CurrentDMLEvent.Len() >= t.DMLEventMaxRows || t.CurrentDMLEvent.GetSize() >= t.DMLEventMaxBytes) { + newDMLEvent := event.NewDMLEvent( + t.CurrentDMLEvent.DispatcherID, + t.CurrentDMLEvent.PhysicalTableID, + t.CurrentDMLEvent.StartTs, + t.CurrentDMLEvent.CommitTs, + t.CurrentDMLEvent.TableInfo) + t.CurrentDMLEvent = newDMLEvent + if err := t.BatchDML.AppendDMLEvent(newDMLEvent); err != nil { + return err + } + } + + var preRow, row chunk.Row + switch decoded.rowType { + case common.RowTypeInsert: + row = decoded.rows.GetRow(decoded.rowIndex) + case common.RowTypeDelete: + preRow = decoded.rows.GetRow(decoded.rowIndex) + default: + log.Panic("TxnEvent.AppendDecodedRow: unsupported row type", + zap.Uint8("rowType", uint8(decoded.rowType)), + zap.Any("raw", decoded.raw), + zap.Any("tableInfo", t.CurrentDMLEvent.TableInfo)) + } + + if dmlFilter != nil { + start := time.Now() + skip, err := dmlFilter.ShouldIgnoreDML( + decoded.rowType, + preRow, + row, + t.CurrentDMLEvent.TableInfo, + decoded.raw.StartTs, + filterContext) + event.DMLIgnoreComputeDuration.Observe(time.Since(start).Seconds()) + if err != nil { + return errors.Trace(err) + } + if skip { + return nil + } + } + + t.CurrentDMLEvent.Rows.Append(decoded.rows, decoded.rowIndex, decoded.rowIndex+decoded.count) + for range decoded.count { + t.CurrentDMLEvent.RowTypes = append(t.CurrentDMLEvent.RowTypes, decoded.rowType) + keyCopy := make([]byte, len(decoded.raw.Key)) + copy(keyCopy, decoded.raw.Key) + t.CurrentDMLEvent.RowKeys = append(t.CurrentDMLEvent.RowKeys, keyCopy) + } + t.CurrentDMLEvent.Length++ + t.CurrentDMLEvent.ApproximateSize += decoded.raw.GetSize() + if decoded.checksum != nil { + t.CurrentDMLEvent.Checksum = append(t.CurrentDMLEvent.Checksum, decoded.checksum) + } + return nil +} + // dmlProcessor handles DML event processing and batching type dmlProcessor struct { mounter event.Mounter + newMounter func() event.Mounter schemaGetter schemaGetter filter filter.Filter filterContext filter.DMLFilterContext + // dmlTypeFilterCache caches the pre-decode filter result within the current transaction. + // The cache is reset when a new transaction starts. It is safe because tableInfo + // and startTs are fixed for the current transaction. + dmlTypeFilterCache [3]struct { + valid bool + ignore bool + } // insertRowCache is used to cache the split update event's insert part of the current transaction. // It will be used to append to the current DML event when the transaction is finished. @@ -701,6 +843,22 @@ type dmlProcessor struct { batchDML *event.BatchDMLEvent outputRawChangeEvent bool mode int64 + + parallelDecodeWorkers int +} + +type dmlProcessorOption func(*dmlProcessor) + +func withDMLProcessorMounterFactory(newMounter func() event.Mounter) dmlProcessorOption { + return func(p *dmlProcessor) { + p.newMounter = newMounter + } +} + +func withDMLProcessorParallelDecodeWorkers(workers int) dmlProcessorOption { + return func(p *dmlProcessor) { + p.parallelDecodeWorkers = normalizeParallelDecodeWorkers(workers) + } } // newDMLProcessor creates a new DML processor @@ -708,12 +866,13 @@ func newDMLProcessor( mounter event.Mounter, schemaGetter schemaGetter, dmlFilter filter.Filter, outputRawChangeEvent bool, mode int64, enableIgnoreUpdateOnlyColumns bool, + opts ...dmlProcessorOption, ) *dmlProcessor { filterContext := filter.DMLFilterContext{} if enableIgnoreUpdateOnlyColumns { filterContext.EnableIgnoreUpdateOnlyColumns = true } - return &dmlProcessor{ + processor := &dmlProcessor{ mounter: mounter, schemaGetter: schemaGetter, filter: dmlFilter, @@ -723,6 +882,10 @@ func newDMLProcessor( outputRawChangeEvent: outputRawChangeEvent, mode: mode, } + for _, opt := range opts { + opt(processor) + } + return processor } // startTxn should be called after flush the current transaction @@ -737,6 +900,7 @@ func (p *dmlProcessor) startTxn( if p.currentTxn != nil { log.Panic("there is a transaction not flushed yet") } + p.resetDMLTypeFilterCache() var err error p.currentTxn, err = newTxnEvent(p.batchDML, dispatcherID, tableID, tableInfo, startTs, commitTs, shouldSplitTxn) return err @@ -755,6 +919,165 @@ func (p *dmlProcessor) commitTxn() error { return nil } +func (p *dmlProcessor) appendRows(ctx context.Context, rawEvents []*common.RawKVEntry) error { + if len(rawEvents) == 0 { + return nil + } + if p.canParallelDecode(rawEvents) { + return p.appendRowsParallel(ctx, rawEvents) + } + for _, rawEvent := range rawEvents { + if err := p.appendRow(rawEvent); err != nil { + return err + } + } + return nil +} + +func (p *dmlProcessor) canParallelDecode(rawEvents []*common.RawKVEntry) bool { + if p.currentTxn == nil || p.newMounter == nil || len(rawEvents) < parallelDecodeMinRows { + return false + } + if p.outputRawChangeEvent || p.parallelDecodeWorkers <= 1 { + return false + } + for _, rawEvent := range rawEvents { + if rawEvent.IsUpdate() { + return false + } + } + return true +} + +type decodedDMLRow struct { + raw *common.RawKVEntry + rowType common.RowType + rows *chunk.Chunk + checksum *integrity.Checksum + count int + rowIndex int +} + +type decodedDMLRowSegment struct { + rawEvents []*common.RawKVEntry + rowTypes []common.RowType + rows *chunk.Chunk + checksums []*integrity.Checksum +} + +func (p *dmlProcessor) appendRowsParallel(ctx context.Context, rawEvents []*common.RawKVEntry) error { + rowsToDecode := make([]*common.RawKVEntry, 0, len(rawEvents)) + rowTypes := make([]common.RowType, 0, len(rawEvents)) + for _, rawEvent := range rawEvents { + rawEvent.Key = event.RemoveKeyspacePrefix(rawEvent.Key) + rawType := rawEvent.GetType() + updateMetricEventServiceSendDMLTypeCount(p.mode, rawType, false) + + rowType := common.RowTypeInsert + if rawEvent.IsDelete() { + rowType = common.RowTypeDelete + } + ignore, err := p.shouldIgnoreDMLByEventType(rowType, rawEvent.StartTs) + if err != nil { + return err + } + if ignore { + continue + } + if len(rawEvent.Value) == 0 && len(rawEvent.OldValue) == 0 { + log.Debug("the value and old_value of the raw kv entry are both nil, skip it", zap.String("raw", rawEvent.String())) + continue + } + rowsToDecode = append(rowsToDecode, rawEvent) + rowTypes = append(rowTypes, rowType) + } + if len(rowsToDecode) == 0 { + return nil + } + + workerCount := min(p.parallelDecodeWorkers, len(rowsToDecode)) + results := make([]decodedDMLRowSegment, workerCount) + g, ctx := errgroup.WithContext(ctx) + rowsPerWorker := (len(rowsToDecode) + workerCount - 1) / workerCount + for workerIndex := range workerCount { + start := workerIndex * rowsPerWorker + end := min(start+rowsPerWorker, len(rowsToDecode)) + if start >= end { + continue + } + g.Go(func() error { + decoded, err := p.decodeDMLRowSegment(ctx, p.newMounter(), rowsToDecode[start:end], rowTypes[start:end]) + if err != nil { + return err + } + results[workerIndex] = decoded + return nil + }) + } + if err := g.Wait(); err != nil { + return err + } + + for i := range results { + segment := &results[i] + for j := range segment.rawEvents { + decoded := decodedDMLRow{ + raw: segment.rawEvents[j], + rowType: segment.rowTypes[j], + rows: segment.rows, + checksum: segment.checksums[j], + count: 1, + rowIndex: j, + } + if err := p.currentTxn.AppendDecodedRow(&decoded, p.filter, p.filterContext); err != nil { + return err + } + } + } + return nil +} + +func (p *dmlProcessor) decodeDMLRowSegment( + ctx context.Context, + mounter event.Mounter, + rawEvents []*common.RawKVEntry, + rowTypes []common.RowType, +) (decodedDMLRowSegment, error) { + rows := chunk.NewChunkWithCapacity(p.currentTxn.CurrentDMLEvent.TableInfo.GetFieldSlice(), len(rawEvents)) + checksums := make([]*integrity.Checksum, len(rawEvents)) + for i, rawEvent := range rawEvents { + select { + case <-ctx.Done(): + return decodedDMLRowSegment{}, ctx.Err() + default: + } + + start := time.Now() + count, checksum, err := mounter.DecodeToChunk(rawEvent, p.currentTxn.CurrentDMLEvent.TableInfo, rows) + event.DMLDecodeDuration.Observe(time.Since(start).Seconds()) + if err != nil { + return decodedDMLRowSegment{}, err + } + if count <= 0 { + log.Panic("dmlProcessor.decodeDMLRowSegment: no rows decoded from the raw KV entry", + zap.String("raw", rawEvent.String())) + } + if count != 1 { + log.Panic("dmlProcessor.decodeDMLRowSegment: insert/delete row count should be 1", + zap.Int("count", count), + zap.Any("raw", rawEvent), + zap.Any("tableInfo", p.currentTxn.CurrentDMLEvent.TableInfo)) + } + checksums[i] = checksum + } + return decodedDMLRowSegment{ + rawEvents: rawEvents, + rowTypes: rowTypes, + rows: rows, + checksums: checksums, + }, nil +} + // appendRow appends a row to the current DML event. // // This method processes a raw KV entry and appends it to the current DML event. It handles @@ -790,6 +1113,17 @@ func (p *dmlProcessor) appendRow(rawEvent *common.RawKVEntry) error { rawType := rawEvent.GetType() if !rawEvent.IsUpdate() { updateMetricEventServiceSendDMLTypeCount(p.mode, rawType, false) + rowType := common.RowTypeInsert + if rawEvent.IsDelete() { + rowType = common.RowTypeDelete + } + ignore, err := p.shouldIgnoreDMLByEventType(rowType, rawEvent.StartTs) + if err != nil { + return err + } + if ignore { + return nil + } return p.currentTxn.AppendRow(rawEvent, p.mounter.DecodeToChunk, p.filter, p.filterContext) } @@ -821,6 +1155,46 @@ func (p *dmlProcessor) appendRow(rawEvent *common.RawKVEntry) error { return p.currentTxn.AppendRow(deleteRow, p.mounter.DecodeToChunk, p.filter, p.filterContext) } +func (p *dmlProcessor) shouldIgnoreDMLByEventType(rowType common.RowType, startTs uint64) (bool, error) { + idx := int(rowType) + if idx >= 0 && idx < len(p.dmlTypeFilterCache) { + if p.dmlTypeFilterCache[idx].valid { + return p.dmlTypeFilterCache[idx].ignore, nil + } + } + + if p.filter == nil { + p.setDMLTypeFilterCache(rowType, false) + return false, nil + } + ignore, err := p.filter.ShouldIgnoreDMLByEventType( + rowType, + p.currentTxn.CurrentDMLEvent.TableInfo, + startTs, + ) + if err != nil { + return false, errors.Trace(err) + } + p.setDMLTypeFilterCache(rowType, ignore) + return ignore, nil +} + +func (p *dmlProcessor) setDMLTypeFilterCache(rowType common.RowType, ignore bool) { + idx := int(rowType) + if idx < 0 || idx >= len(p.dmlTypeFilterCache) { + return + } + p.dmlTypeFilterCache[idx].valid = true + p.dmlTypeFilterCache[idx].ignore = ignore +} + +func (p *dmlProcessor) resetDMLTypeFilterCache() { + for i := range p.dmlTypeFilterCache { + p.dmlTypeFilterCache[i].valid = false + p.dmlTypeFilterCache[i].ignore = false + } +} + // getCurrentBatchDML returns the current batch DML event func (p *dmlProcessor) getCurrentBatchDML() *event.BatchDMLEvent { return p.batchDML diff --git a/pkg/eventservice/event_scanner_benchmark_test.go b/pkg/eventservice/event_scanner_benchmark_test.go new file mode 100644 index 0000000000..2ee1b8f117 --- /dev/null +++ b/pkg/eventservice/event_scanner_benchmark_test.go @@ -0,0 +1,310 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventservice + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/ticdc/eventpb" + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/logservice/eventstore" + bf "github.com/pingcap/ticdc/pkg/binlog-filter" + "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/ticdc/pkg/filter" + "github.com/pingcap/ticdc/pkg/integrity" +) + +type disableDMLTypeFastPathFilter struct { + filter.Filter +} + +func (f disableDMLTypeFastPathFilter) ShouldIgnoreDMLByEventType( + common.RowType, + *common.TableInfo, + uint64, +) (bool, error) { + return false, nil +} + +type benchmarkEventGetter struct { + raw *common.RawKVEntry + rows int +} + +func (g *benchmarkEventGetter) GetIterator( + common.DispatcherID, + common.DataRange, +) (eventstore.EventIterator, error) { + return &singleTxnIterator{ + raw: g.raw, + rows: g.rows, + }, nil +} + +type singleTxnIterator struct { + raw *common.RawKVEntry + rows int + next int +} + +func (i *singleTxnIterator) Next() (*common.RawKVEntry, bool) { + if i.next >= i.rows { + return nil, false + } + isNewTxn := i.next == 0 + i.next++ + return i.raw, isNewTxn +} + +func (i *singleTxnIterator) Close() (int64, error) { + return int64(i.next), nil +} + +func newBenchmarkDispatcherInfo( + startTs uint64, + dispatcherID common.DispatcherID, + tableID int64, +) *mockDispatcherInfo { + return &mockDispatcherInfo{ + clusterID: 1, + serverID: "server1", + id: dispatcherID, + changefeedID: common.NewChangefeedID4Test("default", "bench"), + topic: "topic1", + span: &heartbeatpb.TableSpan{ + TableID: tableID, + StartKey: []byte("a"), + EndKey: []byte("z"), + }, + startTs: startTs, + actionType: eventpb.ActionType_ACTION_TYPE_REGISTER, + filterConfig: &eventpb.FilterConfig{ + FilterConfig: &eventpb.InnerFilterConfig{ + Rules: []string{"*.*"}, + }, + }, + bdrMode: false, + integrity: config.GetDefaultReplicaConfig().Integrity, + } +} + +func BenchmarkDMLProcessorIgnoreDelete(b *testing.B) { + helper := event.NewEventTestHelper(b) + defer helper.Close() + + ddlEvent, kvEvents := genEvents(helper, `create table test.t(id int primary key, c char(50))`, + `insert into test.t(id,c) values (1, "c1")`) + tableInfo := ddlEvent.TableInfo + tableID := ddlEvent.GetTableID() + deleteRow := insertToDeleteRow(kvEvents[0]) + dispatcherID := common.NewDispatcherID() + mockSchemaGetter := NewMockSchemaStore() + mockSchemaGetter.AppendDDLEvent(tableID, ddlEvent) + + ignoreDeleteFilter, err := filter.NewFilter(&config.FilterConfig{ + Rules: []string{"test.*"}, + EventFilters: []*config.EventFilterRule{ + { + Matcher: []string{"test.t"}, + IgnoreEvent: []bf.EventType{bf.DeleteEvent}, + }, + }, + }, "UTC", false, false) + if err != nil { + b.Fatal(err) + } + + bench := func(b *testing.B, changefeedFilter filter.Filter) { + processor := newDMLProcessor( + event.NewMounter(time.UTC, &integrity.Config{}), + mockSchemaGetter, + changefeedFilter, + false, + common.DefaultMode, + false, + ) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := processor.startTxn(dispatcherID, tableID, tableInfo, deleteRow.StartTs, deleteRow.CRTs, false); err != nil { + b.Fatal(err) + } + if err := processor.appendRow(deleteRow); err != nil { + b.Fatal(err) + } + if err := processor.commitTxn(); err != nil { + b.Fatal(err) + } + processor.resetBatchDML() + } + } + + b.Run("fast_path", func(b *testing.B) { + bench(b, ignoreDeleteFilter) + }) + b.Run("after_decode", func(b *testing.B) { + bench(b, disableDMLTypeFastPathFilter{Filter: ignoreDeleteFilter}) + }) +} + +func BenchmarkEventScannerIgnoreDelete(b *testing.B) { + helper := event.NewEventTestHelper(b) + defer helper.Close() + + ddlEvent, kvEvents := genEvents(helper, `create table test.t(id int primary key, c char(50))`, + `insert into test.t(id,c) values (1, "c1")`) + tableID := ddlEvent.GetTableID() + deleteRow := insertToDeleteRow(kvEvents[0]) + dispatcherID := common.NewDispatcherID() + mockSchemaGetter := NewMockSchemaStore() + mockSchemaGetter.AppendDDLEvent(tableID, ddlEvent) + + ignoreDeleteFilter, err := filter.NewFilter(&config.FilterConfig{ + Rules: []string{"test.*"}, + EventFilters: []*config.EventFilterRule{ + { + Matcher: []string{"test.t"}, + IgnoreEvent: []bf.EventType{bf.DeleteEvent}, + }, + }, + }, "UTC", false, false) + if err != nil { + b.Fatal(err) + } + + rowsPerScan := 50000 + bench := func(b *testing.B, changefeedFilter filter.Filter) { + disInfo := newBenchmarkDispatcherInfo( + ddlEvent.FinishedTs, + dispatcherID, + tableID, + ) + changefeedStatus := newChangefeedStatus(disInfo.GetChangefeedID(), 0) + changefeedStatus.filter = changefeedFilter + disp := newDispatcherStat(disInfo, 1, 1, nil, changefeedStatus) + makeDispatcherReady(disp) + scanner := newEventScanner( + &benchmarkEventGetter{raw: deleteRow, rows: rowsPerScan}, + mockSchemaGetter, + event.NewMounter(time.UTC, &integrity.Config{}), + common.DefaultMode, + ) + dataRange := common.DataRange{ + Span: disInfo.GetTableSpan(), + CommitTsStart: ddlEvent.FinishedTs, + CommitTsEnd: deleteRow.CRTs + 1, + } + limit := scanLimit{maxDMLBytes: 1 << 60} + + b.ReportAllocs() + b.ResetTimer() + start := time.Now() + for i := 0; i < b.N; i++ { + _, _, interrupted, err := scanner.scan(context.Background(), disp, dataRange, limit) + if err != nil { + b.Fatal(err) + } + if interrupted { + b.Fatal("scan interrupted") + } + } + b.ReportMetric(float64(b.N*rowsPerScan)/time.Since(start).Seconds(), "rows/s") + } + + b.Run("fast_path", func(b *testing.B) { + bench(b, ignoreDeleteFilter) + }) + b.Run("after_decode", func(b *testing.B) { + bench(b, disableDMLTypeFastPathFilter{Filter: ignoreDeleteFilter}) + }) +} + +func BenchmarkEventScannerNoIgnoreDelete(b *testing.B) { + helper := event.NewEventTestHelper(b) + defer helper.Close() + + ddlEvent, kvEvents := genEvents(helper, `create table test.t(id int primary key, c char(50))`, + `insert into test.t(id,c) values (1, "c1")`) + tableID := ddlEvent.GetTableID() + deleteRow := insertToDeleteRow(kvEvents[0]) + dispatcherID := common.NewDispatcherID() + mockSchemaGetter := NewMockSchemaStore() + mockSchemaGetter.AppendDDLEvent(tableID, ddlEvent) + + changefeedFilter, err := filter.NewFilter(&config.FilterConfig{ + Rules: []string{"test.*"}, + }, "UTC", false, false) + if err != nil { + b.Fatal(err) + } + + rowsPerScan := 50000 + bench := func(b *testing.B, parallel bool) { + disInfo := newBenchmarkDispatcherInfo( + ddlEvent.FinishedTs, + dispatcherID, + tableID, + ) + changefeedStatus := newChangefeedStatus(disInfo.GetChangefeedID(), 0) + changefeedStatus.filter = changefeedFilter + disp := newDispatcherStat(disInfo, 1, 1, nil, changefeedStatus) + makeDispatcherReady(disp) + opts := []eventScannerOption{} + if parallel { + opts = append(opts, + withEventScannerMounterFactory(func() event.Mounter { + return event.NewMounter(time.UTC, &integrity.Config{}) + }), + withEventScannerParallelDecodeWorkers(defaultParallelDecodeWorkers)) + } + scanner := newEventScanner( + &benchmarkEventGetter{raw: deleteRow, rows: rowsPerScan}, + mockSchemaGetter, + event.NewMounter(time.UTC, &integrity.Config{}), + common.DefaultMode, + opts..., + ) + dataRange := common.DataRange{ + Span: disInfo.GetTableSpan(), + CommitTsStart: ddlEvent.FinishedTs, + CommitTsEnd: deleteRow.CRTs + 1, + } + limit := scanLimit{maxDMLBytes: 1 << 60} + + b.ReportAllocs() + b.ResetTimer() + start := time.Now() + for i := 0; i < b.N; i++ { + _, _, interrupted, err := scanner.scan(context.Background(), disp, dataRange, limit) + if err != nil { + b.Fatal(err) + } + if interrupted { + b.Fatal("scan interrupted") + } + } + b.ReportMetric(float64(b.N*rowsPerScan)/time.Since(start).Seconds(), "rows/s") + } + + b.Run("sequential", func(b *testing.B) { + bench(b, false) + }) + b.Run("parallel", func(b *testing.B) { + bench(b, true) + }) +} diff --git a/pkg/eventservice/event_scanner_test.go b/pkg/eventservice/event_scanner_test.go index 28520686b2..e5653d02c4 100644 --- a/pkg/eventservice/event_scanner_test.go +++ b/pkg/eventservice/event_scanner_test.go @@ -16,6 +16,7 @@ package eventservice import ( "context" "errors" + "fmt" "testing" "time" @@ -23,8 +24,10 @@ import ( "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/logservice/eventstore" "github.com/pingcap/ticdc/logservice/schemastore" + bf "github.com/pingcap/ticdc/pkg/binlog-filter" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/filter" "github.com/pingcap/ticdc/pkg/integrity" "github.com/pingcap/tidb/pkg/util/chunk" @@ -32,8 +35,34 @@ import ( "go.uber.org/atomic" ) +type rawEventGetter struct { + events []*common.RawKVEntry +} + +func (g *rawEventGetter) GetIterator( + common.DispatcherID, + common.DataRange, +) (eventstore.EventIterator, error) { + return &mockEventIterator{events: append([]*common.RawKVEntry(nil), g.events...)}, nil +} + type mockMounter struct { event.Mounter + decodeCount atomic.Int64 +} + +type countingDMLTypeFilter struct { + filter.Filter + dmlTypeCallCount atomic.Int64 +} + +func (f *countingDMLTypeFilter) ShouldIgnoreDMLByEventType( + dmlType common.RowType, + tableInfo *common.TableInfo, + startTs uint64, +) (bool, error) { + f.dmlTypeCallCount.Inc() + return f.Filter.ShouldIgnoreDMLByEventType(dmlType, tableInfo, startTs) } type stubEventGetter struct { @@ -52,6 +81,7 @@ func makeDispatcherReady(disp *dispatcherStat) { } func (m *mockMounter) DecodeToChunk(rawKV *common.RawKVEntry, tableInfo *common.TableInfo, chk *chunk.Chunk) (int, *integrity.Checksum, error) { + m.decodeCount.Inc() if rawKV.IsUpdate() { return 2, nil, nil } else { @@ -964,13 +994,13 @@ func TestDMLProcessorAppendRow(t *testing.T) { dispatcherID := common.NewDispatcherID() // Create a mock mounter and schema getter - mockMounter := &mockMounter{} + testMounter := &mockMounter{} mockSchemaGetter := NewMockSchemaStore() mockSchemaGetter.AppendDDLEvent(tableID, ddlEvent) // Test case 1: appendRow before txn started, illegal usage. t.Run("NoCurrentDMLEvent", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false, common.DefaultMode, false) + processor := newDMLProcessor(testMounter, mockSchemaGetter, nil, false, common.DefaultMode, false) rawEvent := kvEvents[0] require.Panics(t, func() { @@ -980,7 +1010,7 @@ func TestDMLProcessorAppendRow(t *testing.T) { // Test case 2: appendRow for insert operation (non-update) t.Run("AppendInsertRow", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false, common.DefaultMode, false) + processor := newDMLProcessor(testMounter, mockSchemaGetter, nil, false, common.DefaultMode, false) firstEvent := kvEvents[0] processor.startTxn(dispatcherID, tableID, tableInfo, firstEvent.StartTs, firstEvent.CRTs, false) @@ -998,7 +1028,7 @@ func TestDMLProcessorAppendRow(t *testing.T) { // Test case 3: appendRow for delete operation (non-update) t.Run("AppendDeleteRow", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false, common.DefaultMode, false) + processor := newDMLProcessor(testMounter, mockSchemaGetter, nil, false, common.DefaultMode, false) rawEvent := kvEvents[0] deleteRow := insertToDeleteRow(rawEvent) @@ -1015,9 +1045,75 @@ func TestDMLProcessorAppendRow(t *testing.T) { require.Empty(t, processor.insertRowCache) }) + t.Run("IgnoreDeleteByEventTypeSkipsDecode", func(t *testing.T) { + mounter := &mockMounter{} + changefeedFilter, err := filter.NewFilter(&config.FilterConfig{ + Rules: []string{"test.*"}, + EventFilters: []*config.EventFilterRule{ + { + Matcher: []string{"test.t"}, + IgnoreEvent: []bf.EventType{bf.DeleteEvent}, + }, + }, + }, "UTC", false, false) + require.NoError(t, err) + processor := newDMLProcessor(mounter, mockSchemaGetter, changefeedFilter, false, common.DefaultMode, false) + + rawEvent := kvEvents[0] + deleteRow := insertToDeleteRow(rawEvent) + + processor.startTxn(dispatcherID, tableID, tableInfo, rawEvent.StartTs, rawEvent.CRTs, false) + err = processor.appendRow(deleteRow) + require.NoError(t, err) + + require.Equal(t, int64(0), mounter.decodeCount.Load()) + require.Equal(t, int32(0), processor.batchDML.Len()) + require.Empty(t, processor.insertRowCache) + }) + + t.Run("IgnoreDeleteByEventTypeCachedWithinTxn", func(t *testing.T) { + mounter := &mockMounter{} + changefeedFilter, err := filter.NewFilter(&config.FilterConfig{ + Rules: []string{"test.*"}, + EventFilters: []*config.EventFilterRule{ + { + Matcher: []string{"test.t"}, + IgnoreEvent: []bf.EventType{bf.DeleteEvent}, + }, + }, + }, "UTC", false, false) + require.NoError(t, err) + countingFilter := &countingDMLTypeFilter{Filter: changefeedFilter} + processor := newDMLProcessor(mounter, mockSchemaGetter, countingFilter, false, common.DefaultMode, false) + + rawEvent := kvEvents[0] + deleteRow := insertToDeleteRow(rawEvent) + + processor.startTxn(dispatcherID, tableID, tableInfo, rawEvent.StartTs, rawEvent.CRTs, false) + for range 3 { + err = processor.appendRow(deleteRow) + require.NoError(t, err) + } + require.Equal(t, int64(1), countingFilter.dmlTypeCallCount.Load()) + require.Equal(t, int64(0), mounter.decodeCount.Load()) + + err = processor.commitTxn() + require.NoError(t, err) + + processor.startTxn(dispatcherID, tableID, tableInfo, rawEvent.StartTs+1, rawEvent.CRTs+1, false) + nextTxnDeleteRow := insertToDeleteRow(kvEvents[1]) + nextTxnDeleteRow.StartTs = rawEvent.StartTs + 1 + nextTxnDeleteRow.CRTs = rawEvent.CRTs + 1 + err = processor.appendRow(nextTxnDeleteRow) + require.NoError(t, err) + + require.Equal(t, int64(2), countingFilter.dmlTypeCallCount.Load()) + require.Equal(t, int64(0), mounter.decodeCount.Load()) + }) + // Test case 4: appendRow for update operation without unique key change t.Run("AppendUpdateRowWithoutUKChange", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false, common.DefaultMode, false) + processor := newDMLProcessor(testMounter, mockSchemaGetter, nil, false, common.DefaultMode, false) // Create a current DML event first rawEvent := kvEvents[0] @@ -1041,7 +1137,7 @@ func TestDMLProcessorAppendRow(t *testing.T) { // Test case 5: appendRow for update operation with unique key change (split update) t.Run("AppendUpdateRowWithUKChange", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false, common.DefaultMode, false) + processor := newDMLProcessor(testMounter, mockSchemaGetter, nil, false, common.DefaultMode, false) // Create a current DML event first rawEvent := kvEvents[0] @@ -1069,7 +1165,7 @@ func TestDMLProcessorAppendRow(t *testing.T) { // Test case 6: Test multiple appendRow calls t.Run("MultipleAppendRows", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false, common.DefaultMode, false) + processor := newDMLProcessor(testMounter, mockSchemaGetter, nil, false, common.DefaultMode, false) // Create a current DML event first rawEvent := kvEvents[0] @@ -1642,6 +1738,111 @@ func TestGetTableInfo4Txn(t *testing.T) { }) } +func TestEventScannerParallelDecodeMatchesSequential(t *testing.T) { + helper := event.NewEventTestHelper(t) + defer helper.Close() + + rowCount := parallelDecodeMinRows + 1 + ddlEvent, _ := genEvents(helper, `create table test.t(id int primary key, c char(50))`) + tableID := ddlEvent.GetTableID() + kvEvents := make([]*common.RawKVEntry, 0, rowCount) + for i := 1; i <= rowCount; i++ { + events := helper.DML2RawKv( + tableID, + ddlEvent.FinishedTs, + fmt.Sprintf(`insert into test.t(id,c) values (%d, "c%d")`, i, i)) + require.Len(t, events, 1) + kvEvents = append(kvEvents, events[0]) + } + + txnStartTs := kvEvents[0].StartTs + txnCommitTs := kvEvents[0].CRTs + deleteRows := make([]*common.RawKVEntry, 0, len(kvEvents)) + for _, kvEvent := range kvEvents { + deleteRow := insertToDeleteRow(kvEvent) + deleteRow.StartTs = txnStartTs + deleteRow.CRTs = txnCommitTs + deleteRows = append(deleteRows, deleteRow) + } + dispatcherID := common.NewDispatcherID() + mockSchemaGetter := NewMockSchemaStore() + mockSchemaGetter.AppendDDLEvent(tableID, ddlEvent) + + changefeedFilter, err := filter.NewFilter(&config.FilterConfig{ + Rules: []string{"test.*"}, + }, "UTC", false, false) + require.NoError(t, err) + + runScan := func(parallel bool) []event.Event { + disInfo := newBenchmarkDispatcherInfo(ddlEvent.FinishedTs, dispatcherID, tableID) + changefeedStatus := newChangefeedStatus(disInfo.GetChangefeedID(), 0) + changefeedStatus.filter = changefeedFilter + disp := newDispatcherStat(disInfo, 1, 1, nil, changefeedStatus) + makeDispatcherReady(disp) + + opts := []eventScannerOption{} + if parallel { + opts = append(opts, + withEventScannerMounterFactory(func() event.Mounter { + return event.NewMounter(time.UTC, &integrity.Config{}) + }), + withEventScannerParallelDecodeWorkers(defaultParallelDecodeWorkers)) + } + scanner := newEventScanner( + &rawEventGetter{events: deleteRows}, + mockSchemaGetter, + event.NewMounter(time.UTC, &integrity.Config{}), + common.DefaultMode, + opts..., + ) + dataRange := common.DataRange{ + Span: disInfo.GetTableSpan(), + CommitTsStart: ddlEvent.FinishedTs, + CommitTsEnd: deleteRows[len(deleteRows)-1].CRTs + 1, + } + _, events, interrupted, err := scanner.scan(context.Background(), disp, dataRange, scanLimit{maxDMLBytes: 1 << 60}) + require.NoError(t, err) + require.False(t, interrupted) + return events + } + + sequentialEvents := runScan(false) + parallelEvents := runScan(true) + require.Equal(t, len(sequentialEvents), len(parallelEvents)) + require.Equal(t, sequentialEvents[len(sequentialEvents)-1].GetCommitTs(), parallelEvents[len(parallelEvents)-1].GetCommitTs()) + + seqBatch, ok := sequentialEvents[0].(*event.BatchDMLEvent) + require.True(t, ok) + parBatch, ok := parallelEvents[0].(*event.BatchDMLEvent) + require.True(t, ok) + require.Equal(t, seqBatch.Len(), parBatch.Len()) + require.Equal(t, seqBatch.GetCommitTs(), parBatch.GetCommitTs()) + require.Equal(t, len(seqBatch.DMLEvents), len(parBatch.DMLEvents)) + require.Equal(t, int32(rowCount), parBatch.Len()) + + collectDeleteIDs := func(batch *event.BatchDMLEvent) []int64 { + ids := make([]int64, 0, batch.Len()) + for _, dml := range batch.DMLEvents { + for { + row, ok := dml.GetNextRow() + if !ok { + break + } + require.Equal(t, common.RowTypeDelete, row.RowType) + ids = append(ids, row.PreRow.GetInt64(0)) + } + } + return ids + } + seqIDs := collectDeleteIDs(seqBatch) + parIDs := collectDeleteIDs(parBatch) + require.Equal(t, seqIDs, parIDs) + require.Len(t, parIDs, rowCount) + for i, id := range parIDs { + require.Equal(t, int64(i+1), id) + } +} + func TestHasDDLAtLastCommitTs(t *testing.T) { helper := event.NewEventTestHelper(t) defer helper.Close() diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go index 89298fd71e..2a6c284225 100644 --- a/pkg/filter/filter.go +++ b/pkg/filter/filter.go @@ -47,6 +47,9 @@ const ( type Filter interface { // ShouldIgnoreDML returns true if the DML event should not be handled. ShouldIgnoreDML(dmlType common.RowType, preRow, row chunk.Row, tableInfo *common.TableInfo, startTs uint64, ctx DMLFilterContext) (bool, error) + // ShouldIgnoreDMLByEventType returns true only when filters that do not need + // decoded row values can determine the DML should be ignored. + ShouldIgnoreDMLByEventType(dmlType common.RowType, tableInfo *common.TableInfo, startTs uint64) (bool, error) // ShouldDiscardDDL returns true if the DDL event should not be handled. ShouldDiscardDDL(schema, table string, ddlType timodel.ActionType, tableInfo *common.TableInfo) bool // ShouldIgnoreDDL returns true if the DDL event should not be sent to downstream. @@ -133,9 +136,29 @@ func NewFilter(cfg *config.FilterConfig, tz string, caseSensitive bool, forceRep // 4. By update-only-column config. // 5. By row value expression. func (f *filter) ShouldIgnoreDML(dmlType common.RowType, preRow, row chunk.Row, tableInfo *common.TableInfo, startTs uint64, filterContext DMLFilterContext) (bool, error) { + ignore, err := f.ShouldIgnoreDMLByEventType(dmlType, tableInfo, startTs) + if ignore || err != nil { + return ignore, err + } + if filterContext.EnableIgnoreUpdateOnlyColumns { + ignoreByUpdateOnlyColumns, err := f.updateOnlyColumnsFilter.shouldSkipDML(dmlType, preRow, row, tableInfo) + if err != nil { + return false, err + } + if ignoreByUpdateOnlyColumns { + return true, nil + } + } + return f.dmlExprFilter.shouldSkipDML(dmlType, preRow, row, tableInfo) +} + +func (f *filter) ShouldIgnoreDMLByEventType(dmlType common.RowType, tableInfo *common.TableInfo, startTs uint64) (bool, error) { if f.shouldIgnoreStartTs(startTs) { return true, nil } + if tableInfo == nil { + return false, nil + } if f.ShouldIgnoreTable(tableInfo.GetSchemaName(), tableInfo.GetTableName()) { return true, nil @@ -148,16 +171,7 @@ func (f *filter) ShouldIgnoreDML(dmlType common.RowType, preRow, row chunk.Row, if ignoreByEventType { return true, nil } - if filterContext.EnableIgnoreUpdateOnlyColumns { - ignoreByUpdateOnlyColumns, err := f.updateOnlyColumnsFilter.shouldSkipDML(dmlType, preRow, row, tableInfo) - if err != nil { - return false, err - } - if ignoreByUpdateOnlyColumns { - return true, nil - } - } - return f.dmlExprFilter.shouldSkipDML(dmlType, preRow, row, tableInfo) + return false, nil } // ShouldDiscardDDL checks if a DDL event should be discarded by conditions below: diff --git a/pkg/filter/filter_test.go b/pkg/filter/filter_test.go index a357c5b380..4984ca1ff8 100644 --- a/pkg/filter/filter_test.go +++ b/pkg/filter/filter_test.go @@ -21,6 +21,7 @@ import ( bf "github.com/pingcap/ticdc/pkg/binlog-filter" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/mysql" @@ -702,6 +703,52 @@ func TestColumnValueEqual(t *testing.T) { } } +func TestShouldIgnoreDMLByEventType(t *testing.T) { + t.Parallel() + + cfg := &config.FilterConfig{ + Rules: []string{"test.*", "!test.t4"}, + IgnoreTxnStartTs: []uint64{100}, + EventFilters: []*config.EventFilterRule{ + { + Matcher: []string{"test.t2"}, + IgnoreEvent: []bf.EventType{bf.InsertEvent}, + }, + { + Matcher: []string{"test.t3"}, + IgnoreDeleteValueExpr: util.AddressOf("id = 1"), + }, + }, + } + f, err := NewFilter(cfg, "UTC", false, false) + require.NoError(t, err) + + ti1 := mustNewCommonTableInfo("test", "t1", []*model.ColumnInfo{newColumnInfo(1, "id", mysql.TypeLong, mysql.PriKeyFlag)}, nil) + ti2 := mustNewCommonTableInfo("test", "t2", []*model.ColumnInfo{newColumnInfo(1, "id", mysql.TypeLong, mysql.PriKeyFlag)}, nil) + ti3 := mustNewCommonTableInfo("test", "t3", []*model.ColumnInfo{newColumnInfo(1, "id", mysql.TypeLong, mysql.PriKeyFlag)}, nil) + ti4 := mustNewCommonTableInfo("test", "t4", []*model.ColumnInfo{newColumnInfo(1, "id", mysql.TypeLong, mysql.PriKeyFlag)}, nil) + + ignore, err := f.ShouldIgnoreDMLByEventType(common.RowTypeInsert, ti1, 100) + require.NoError(t, err) + require.True(t, ignore) + + ignore, err = f.ShouldIgnoreDMLByEventType(common.RowTypeInsert, ti4, 0) + require.NoError(t, err) + require.True(t, ignore) + + ignore, err = f.ShouldIgnoreDMLByEventType(common.RowTypeInsert, ti2, 0) + require.NoError(t, err) + require.True(t, ignore) + + ignore, err = f.ShouldIgnoreDMLByEventType(common.RowTypeDelete, ti3, 0) + require.NoError(t, err) + require.False(t, ignore, "expression filters need decoded row values") + + ignore, err = f.ShouldIgnoreDMLByEventType(common.RowTypeInsert, ti1, 0) + require.NoError(t, err) + require.False(t, ignore) +} + func TestShouldIgnoreDMLCaseSensitivity(t *testing.T) { t.Parallel()