eventservice: improve dml processing efficiency #5473
Code Review
This pull request introduces parallel decoding of DML events in the eventScanner and dmlProcessor to improve performance, along with a fast-path cache (dmlTypeFilterCache) to skip decoding for ignored event types early. The review feedback highlights a critical bug where the dmlTypeFilterCache array size of 3 is too small to cache delete events (index 3), causing the cache to be bypassed. Additionally, the reviewer suggested optimizing sequential execution by conditionally bypassing the buffering and cloning of raw KV entries when parallel decoding is disabled.
        dispatcher.info.EnableIgnoreUpdateOnlyColumns(),
        withDMLProcessorMounterFactory(s.newMounter),
        withDMLProcessorParallelDecodeWorkers(s.parallelDecodeWorkers))
    txnRows := make([]*common.RawKVEntry, 0)
Define a boolean flag enableParallel to check if parallel decoding is actually enabled and possible. This allows us to conditionally bypass buffering and cloning for sequential runs, avoiding unnecessary allocations and CPU overhead.
Suggested change:
-    txnRows := make([]*common.RawKVEntry, 0)
+    enableParallel := s.parallelDecodeWorkers > 1 && s.newMounter != nil && !dispatcher.info.IsOutputRawChangeEvent()
+    txnRows := make([]*common.RawKVEntry, 0)
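The effect of gating on such a flag can be sketched in isolation. In this minimal example, `rawKV`, `cloneRawKV`, and `collect` are hypothetical stand-ins for `common.RawKVEntry`, `cloneRawKVEntry`, and the scanner loop; it only shows that the sequential path skips both the buffer and the deep copy.

```go
package main

import "fmt"

// rawKV is a hypothetical stand-in for common.RawKVEntry.
type rawKV struct {
	Key, Value []byte
}

// cloneRawKV deep-copies an entry so the copy stays valid even if the
// caller later reuses the original's byte slices.
func cloneRawKV(e *rawKV) *rawKV {
	return &rawKV{
		Key:   append([]byte(nil), e.Key...),
		Value: append([]byte(nil), e.Value...),
	}
}

// collect buffers cloned rows when parallel is true; otherwise it hands
// each row straight to appendRow and buffers nothing.
func collect(rows []*rawKV, parallel bool, appendRow func(*rawKV) error) ([]*rawKV, error) {
	var buffered []*rawKV
	for _, r := range rows {
		if parallel {
			buffered = append(buffered, cloneRawKV(r))
			continue
		}
		if err := appendRow(r); err != nil {
			return nil, err
		}
	}
	return buffered, nil
}

func main() {
	rows := []*rawKV{{Key: []byte("a")}, {Key: []byte("b")}}
	direct := 0
	count := func(*rawKV) error { direct++; return nil }

	buffered, _ := collect(rows, false, count)
	fmt.Println("sequential:", len(buffered), "buffered,", direct, "appended directly")

	direct = 0
	buffered, _ = collect(rows, true, count)
	fmt.Println("parallel:", len(buffered), "buffered,", direct, "appended directly")
}
```

Whether the saved clone is significant depends on row size and transaction width, which this sketch does not measure.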
            zap.Int64("mode", s.mode))
        return false, err
    }
    txnRows = append(txnRows, cloneRawKVEntry(rawEvent))
If parallel decoding is enabled, buffer and clone the raw KV entry. Otherwise, directly append the row sequentially to avoid the memory allocation and CPU overhead of cloning.
    if enableParallel {
        txnRows = append(txnRows, cloneRawKVEntry(rawEvent))
    } else {
        if err = processor.appendRow(rawEvent); err != nil {
            log.Error("append row failed", zap.Error(err),
                zap.Stringer("dispatcherID", session.dispatcherStat.id),
                zap.Int64("tableID", tableID),
                zap.Uint64("startTs", rawEvent.StartTs),
                zap.Uint64("commitTs", rawEvent.CRTs),
                zap.Int64("mode", s.mode))
            return false, err
        }
    }

    dmlTypeFilterCache [3]struct {
        valid  bool
        ignore bool
    }
The dmlTypeFilterCache array size of 3 is too small to cache RowTypeDelete events. In TiCDC, common.RowType is defined as:
    const (
        RowTypeUnknown RowType = iota
        RowTypeInsert
        RowTypeUpdate
        RowTypeDelete
    )
This means RowTypeDelete has an integer value of 3. Since the array size is 3, any attempt to cache or retrieve a delete event will result in an out-of-bounds index (idx >= 3), causing the cache to be completely bypassed for all delete events. Increasing the array size to 4 fixes this issue and ensures delete events are cached correctly.
Suggested change:
-    dmlTypeFilterCache [3]struct {
-        valid  bool
-        ignore bool
-    }
+    dmlTypeFilterCache [4]struct {
+        valid  bool
+        ignore bool
+    }
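The size mismatch can be reproduced in a few lines. This sketch assumes the enum values quoted in the review comment and a bounds-checked store; `rowType`, `cacheSlot`, and `store` are hypothetical names used for illustration, not TiCDC identifiers.

```go
package main

import "fmt"

// rowType mirrors the enum as quoted in the review (an assumption here).
type rowType int

const (
	rowTypeUnknown rowType = iota // 0
	rowTypeInsert                 // 1
	rowTypeUpdate                 // 2
	rowTypeDelete                 // 3
)

type cacheSlot struct {
	valid  bool
	ignore bool
}

// store records a filter decision for t and reports whether the slot
// exists; an index outside the cache is skipped instead of panicking.
func store(cache []cacheSlot, t rowType, ignore bool) bool {
	idx := int(t)
	if idx < 0 || idx >= len(cache) {
		return false
	}
	cache[idx] = cacheSlot{valid: true, ignore: ignore}
	return true
}

func main() {
	var small [3]cacheSlot
	var large [4]cacheSlot

	fmt.Println("size 3 caches delete:", store(small[:], rowTypeDelete, true))
	fmt.Println("size 4 caches delete:", store(large[:], rowTypeDelete, true))
}
```

With these values, a three-element array covers indices 0 through 2 only, so the delete decision is never stored; a four-element array has a slot for it.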
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note