[DNM] Branch for CSE testing#5367
Conversation
Signed-off-by: Ping Yu <yuping@pingcap.com>
Signed-off-by: Ping Yu <yuping@pingcap.com>
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
📝 WalkthroughWalkthroughThis PR implements a broadcast barrier mechanism in the MySQL sink causality detector to ensure ChangesMySQL Sink DML Barrier Implementation
🎯 4 (Complex) | ⏱️ ~45 minutes Suggested labels
Suggested reviewers
🚥 Pre-merge checks | ✅ 3 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request implements a DML barrier design for the MySQL sink, allowing block events (such as syncpoints) to wait until all prior enqueued DMLs are flushed downstream. It introduces a WriterItem struct to carry either DML events or barrier tokens, updates the ConflictDetector to broadcast barriers using a removal-only fence, and adapts the writer loop to process these items sequentially. The review feedback highlights critical improvements: fixing a potential deadlock in newDMLBarrier when workerCount is zero, using a sync.RWMutex instead of a standard Mutex to avoid serializing the DML ingestion hot path in ConflictDetector.Add, and utilizing Go's clear() built-in to prevent memory leaks when resetting pointer-holding buffers.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| func newDMLBarrier(workerCount int) *dmlBarrier { | ||
| barrier := &dmlBarrier{ | ||
| done: make(chan struct{}), | ||
| remaining: workerCount, | ||
| acked: make([]bool, workerCount), | ||
| } | ||
| if workerCount == 0 { | ||
| barrier.Fail(errors.ErrMySQLTxnError.GenWithStackByArgs("mysql DML barrier has no writers")) | ||
| } | ||
| return barrier | ||
| } |
There was a problem hiding this comment.
If workerCount is 0, barrier.Fail is called when remaining is already 0. In Fail, the check if b.err != nil || b.remaining == 0 will trigger an early return, meaning b.err is never set and b.done is never closed. This causes Wait() to block forever (deadlock). Directly initialize the failed state when workerCount == 0 to avoid this.
| func newDMLBarrier(workerCount int) *dmlBarrier { | |
| barrier := &dmlBarrier{ | |
| done: make(chan struct{}), | |
| remaining: workerCount, | |
| acked: make([]bool, workerCount), | |
| } | |
| if workerCount == 0 { | |
| barrier.Fail(errors.ErrMySQLTxnError.GenWithStackByArgs("mysql DML barrier has no writers")) | |
| } | |
| return barrier | |
| } | |
| func newDMLBarrier(workerCount int) *dmlBarrier { | |
| barrier := &dmlBarrier{ | |
| done: make(chan struct{}), | |
| remaining: workerCount, | |
| acked: make([]bool, workerCount), | |
| } | |
| if workerCount == 0 { | |
| barrier.err = errors.ErrMySQLTxnError.GenWithStackByArgs("mysql DML barrier has no writers") | |
| close(barrier.done) | |
| } | |
| return barrier | |
| } |
| changefeedID common.ChangeFeedID | ||
| metricConflictDetectDuration prometheus.Observer | ||
|
|
||
| admissionMu sync.Mutex |
There was a problem hiding this comment.
Using a standard sync.Mutex for admissionMu serializes all concurrent Add calls on the DML ingestion hot path. Since barriers are rare, changing this to sync.RWMutex allows concurrent Add calls to proceed in parallel under a read lock, significantly improving throughput.
| admissionMu sync.Mutex | |
| admissionMu sync.RWMutex |
| d.admissionMu.Lock() | ||
| defer d.admissionMu.Unlock() |
There was a problem hiding this comment.
| if err := flushDMLs(dmlBuffer, rowCount); err != nil { | ||
| s.failOutstandingBarriers(err) | ||
| return err | ||
| } | ||
| dmlBuffer = dmlBuffer[:0] | ||
| workerBatchFlushDuration.Observe(time.Since(start).Seconds()) | ||
|
|
||
| // we record total time to calculate the worker busy ratio. | ||
| // so we record the total time after flushing, to unified statistics on | ||
| // flush time and total time | ||
| workerTotalDuration.Observe(time.Since(totalStart).Seconds()) | ||
| totalStart = time.Now() | ||
| buffer = buffer[:0] | ||
| itemBuffer = itemBuffer[:0] |
There was a problem hiding this comment.
Resetting dmlBuffer and itemBuffer using [:0] does not clear the underlying array elements. Since these slices hold pointers (*commonEvent.DMLEvent and Barrier), the referenced objects will not be garbage collected while the goroutine is blocked waiting for new items. Use Go's built-in clear() to nil out the elements before resetting the slice length.
if err := flushDMLs(dmlBuffer, rowCount); err != nil {
s.failOutstandingBarriers(err)
return err
}
clear(dmlBuffer)
dmlBuffer = dmlBuffer[:0]
workerBatchFlushDuration.Observe(time.Since(start).Seconds())
// we record total time to calculate the worker busy ratio.
// so we record the total time after flushing, to unified statistics on
// flush time and total time
workerTotalDuration.Observe(time.Since(totalStart).Seconds())
totalStart = time.Now()
clear(itemBuffer)
itemBuffer = itemBuffer[:0]|
[FORMAT CHECKER NOTIFICATION] Notice: To remove the 📖 For more info, you can check the "Contribute Code" section in the development guide. |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@downstreamadapter/sink/mysql/causality/barrier_test.go`:
- Around line 155-162: The test currently calls cache.forceAdd directly and then
sets barrier.Fail itself, masking the production behavior; instead, locate and
invoke the real production caller that calls forceAdd (search for usages of
forceAdd) — call that higher-level method with NewBarrierItem(barrier) after
cache.out().Close(), then assert that barrier.err was set by that production
path (verify barrier.err is non-nil) rather than calling barrier.Fail manually;
keep the test focused and deterministic (use newTestBarrier, forceAdd caller,
and require.Error on barrier.err).
In `@pkg/sink/mysql/mysql_writer.go`:
- Around line 252-254: The failpoint Inject call is inside the per-event loop so
it runs once per event; move failpoint.Inject("MySQLSinkDelayDMLPostFlush", nil)
to just before the for _, event := range events { loop (the block that calls
event.PostFlush()) so the delay is injected once per flush instead of N times;
ensure you keep the event.PostFlush() calls unchanged and only relocate the
inject call.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 964400f9-9eda-461e-afff-d48fe3694f45
📒 Files selected for processing (12)
deployments/Dockerfiledesign.mddownstreamadapter/sink/mysql/causality/barrier_test.godownstreamadapter/sink/mysql/causality/conflict_detector.godownstreamadapter/sink/mysql/causality/helper_test.godownstreamadapter/sink/mysql/causality/node.godownstreamadapter/sink/mysql/causality/slot.godownstreamadapter/sink/mysql/causality/txn_cache.godownstreamadapter/sink/mysql/sink.godownstreamadapter/sink/mysql/sink_test.gopkg/sink/mysql/mysql_writer.goutils/chann/unlimited_chann.go
| func TestTxnCacheForceAddFailsWhenClosed(t *testing.T) { | ||
| cache := newTxnCache(TxnCacheOption{Count: 1, Size: 1, BlockStrategy: BlockStrategyWaitEmpty}) | ||
| cache.out().Close() | ||
|
|
||
| barrier := newTestBarrier(1) | ||
| require.False(t, cache.forceAdd(NewBarrierItem(barrier))) | ||
| barrier.Fail(errors.New("closed")) | ||
| require.Error(t, barrier.err) |
There was a problem hiding this comment.
This test masks the barrier-failure path.
Lines 160-162 only prove that forceAdd returns false; the test then sets the barrier error itself. That means it still passes if the production close path forgets to propagate the failure to the barrier, so the regression you care about remains untested. Please drive the failure through the real caller that reacts to forceAdd == false and assert on that outcome instead.
As per coding guidelines, **/*_test.go: Prefer focused deterministic tests; see docs/agents/testing.md before adding or changing tests.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@downstreamadapter/sink/mysql/causality/barrier_test.go` around lines 155 -
162, The test currently calls cache.forceAdd directly and then sets barrier.Fail
itself, masking the production behavior; instead, locate and invoke the real
production caller that calls forceAdd (search for usages of forceAdd) — call
that higher-level method with NewBarrierItem(barrier) after cache.out().Close(),
then assert that barrier.err was set by that production path (verify barrier.err
is non-nil) rather than calling barrier.Fail manually; keep the test focused and
deterministic (use newTestBarrier, forceAdd caller, and require.Error on
barrier.err).
Source: Coding guidelines
| for _, event := range events { | ||
| failpoint.Inject("MySQLSinkDelayDMLPostFlush", nil) | ||
| event.PostFlush() |
There was a problem hiding this comment.
Inject MySQLSinkDelayDMLPostFlush once per flush, not once per event
failpoint.Inject("MySQLSinkDelayDMLPostFlush", nil) is inside the for _, event := range events loop, so a batch of N DML events triggers the failpoint N times; move the inject before the loop to gate the post-flush phase once per flush.
Suggested change
- for _, event := range events {
- failpoint.Inject("MySQLSinkDelayDMLPostFlush", nil)
+ failpoint.Inject("MySQLSinkDelayDMLPostFlush", nil)
+ for _, event := range events {
event.PostFlush()
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| for _, event := range events { | |
| failpoint.Inject("MySQLSinkDelayDMLPostFlush", nil) | |
| event.PostFlush() | |
| failpoint.Inject("MySQLSinkDelayDMLPostFlush", nil) | |
| for _, event := range events { | |
| event.PostFlush() | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pkg/sink/mysql/mysql_writer.go` around lines 252 - 254, The failpoint Inject
call is inside the per-event loop so it runs once per event; move
failpoint.Inject("MySQLSinkDelayDMLPostFlush", nil) to just before the for _,
event := range events { loop (the block that calls event.PostFlush()) so the
delay is injected once per flush instead of N times; ensure you keep the
event.PostFlush() calls unchanged and only relocate the inject call.
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
Release Notes
New Features
Documentation
Tests