Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deployments/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ WORKDIR /go/src/github.com/pingcap/ticdc
COPY . .
ENV CDC_ENABLE_VENDOR=0
ARG NEXT_GEN
RUN if [ -n "${NEXT_GEN}" ]; then NEXT_GEN=1 make cdc; else make cdc; fi
RUN if [ -n "${NEXT_GEN}" ]; then NEXT_GEN=1 LEGACY_SAFEPOINT=1 make cdc; else make cdc; fi

FROM alpine:3.15
RUN apk add --no-cache tzdata bash curl socat
Expand Down
366 changes: 366 additions & 0 deletions design.md

Large diffs are not rendered by default.

183 changes: 183 additions & 0 deletions downstreamadapter/sink/mysql/causality/barrier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package causality

import (
"errors"

Check failure on line 17 in downstreamadapter/sink/mysql/causality/barrier_test.go

View workflow job for this annotation

GitHub Actions / Check

import 'errors' is not allowed from list 'no-direct-errors': use github.com/pingcap/ticdc/pkg/errors outside pkg/errors (depguard)
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
)

// testBarrier is a small in-memory barrier used by the tests in this file.
// It completes either when every expected worker has acked or on the first
// Fail call, whichever happens first.
type testBarrier struct {
	mu        sync.Mutex
	done      chan struct{} // closed once, when the barrier completes
	err       error         // recorded by Fail if the barrier was still pending
	remaining int           // acks still outstanding; 0 means completed
	doneFuncs []func()      // callbacks registered via OnDone while pending
}

// newTestBarrier returns a pending barrier that expects workerCount acks.
func newTestBarrier(workerCount int) *testBarrier {
	b := new(testBarrier)
	b.done = make(chan struct{})
	b.remaining = workerCount
	return b
}

// finishLocked closes done and hands back the callbacks that were waiting
// for completion. The caller must hold b.mu and must run the returned
// callbacks only after releasing it.
func (b *testBarrier) finishLocked() []func() {
	waiting := b.doneFuncs
	b.doneFuncs = nil
	close(b.done)
	return waiting
}

// Ack records one worker acknowledgement. The ack that brings the outstanding
// count down completes the barrier and runs the registered callbacks; acks on
// an already completed barrier are ignored.
func (b *testBarrier) Ack(int) {
	var waiting []func()

	b.mu.Lock()
	if b.remaining != 0 {
		b.remaining--
		if b.remaining <= 0 {
			waiting = b.finishLocked()
		}
	}
	b.mu.Unlock()

	// Callbacks run without the lock so they may call back into the barrier.
	for _, cb := range waiting {
		cb()
	}
}

// Fail completes a still-pending barrier with err and runs the registered
// callbacks. It is a no-op on a barrier that has already completed.
func (b *testBarrier) Fail(err error) {
	var waiting []func()

	b.mu.Lock()
	if b.remaining != 0 {
		b.err = err
		b.remaining = 0
		waiting = b.finishLocked()
	}
	b.mu.Unlock()

	for _, cb := range waiting {
		cb()
	}
}

// OnDone registers f to run when the barrier completes. If the barrier has
// already completed, f runs immediately on the calling goroutine.
func (b *testBarrier) OnDone(f func()) {
	b.mu.Lock()
	completed := b.remaining == 0
	if !completed {
		b.doneFuncs = append(b.doneFuncs, f)
	}
	b.mu.Unlock()

	if completed {
		f()
	}
}

func TestBroadcastBarrierEnqueuesOneTokenPerWriter(t *testing.T) {
detector := New(4, TxnCacheOption{Count: 2, Size: 1, BlockStrategy: BlockStrategyWaitEmpty}, testChangefeedID())
barrier := newTestBarrier(2)

require.NoError(t, detector.BroadcastBarrier(barrier))

for i := 0; i < 2; i++ {

Check failure on line 91 in downstreamadapter/sink/mysql/causality/barrier_test.go

View workflow job for this annotation

GitHub Actions / Check

rangeint: for loop can be modernized using range over int (modernize)
items, ok := detector.GetOutChByCacheID(i).GetMultipleNoGroup(make([]WriterItem, 0, 1))
require.True(t, ok)
require.Len(t, items, 1)
require.True(t, barrier == items[0].Barrier)
items[0].Barrier.Ack(i)
}

require.Eventually(t, func() bool {
select {
case <-barrier.done:
return true
default:
return false
}
}, time.Second, 10*time.Millisecond)
}

func TestBroadcastBarrierReturnsErrorWhenDetectorClosed(t *testing.T) {
detector := New(4, TxnCacheOption{Count: 1, Size: 1, BlockStrategy: BlockStrategyWaitEmpty}, testChangefeedID())
detector.CloseNotifiedNodes()

err := detector.BroadcastBarrier(newTestBarrier(1))
require.Error(t, err)
}

func TestRemovalOnlyFenceDoesNotResolveDependersOnAssignment(t *testing.T) {
assigned := false
fence := &Node{id: genNextNodeID(), assignedTo: unassigned, resolveByRemovalOnly: true}
fence.RandCacheID = func() cacheID { return 0 }
fence.TrySendToTxnCache = func(cacheID) bool { return true }
fence.OnNotified = func(callback func()) { callback() }

depender := &Node{id: genNextNodeID(), assignedTo: unassigned}
depender.RandCacheID = func() cacheID { return 0 }
depender.TrySendToTxnCache = func(cacheID) bool {
assigned = true
return true
}
depender.OnNotified = func(callback func()) { callback() }

depender.dependOn(map[int64]*Node{fence.nodeID(): fence})
fence.maybeResolve()
require.False(t, assigned)

fence.remove()
require.True(t, assigned)
}

// TestTxnCacheForceAddBypassesBlockedCache checks that forceAdd still accepts
// a barrier item after add has started rejecting DML items, and that the
// barrier is delivered after the DML items that were accepted earlier.
func TestTxnCacheForceAddBypassesBlockedCache(t *testing.T) {
	cache := newTxnCache(TxnCacheOption{Count: 1, Size: 1, BlockStrategy: BlockStrategyWaitEmpty})

	// With these options the first two adds are accepted and the third is not.
	for range 2 {
		require.True(t, cache.add(NewDMLItem(nil)))
	}
	require.False(t, cache.add(NewDMLItem(nil)))

	barrier := newTestBarrier(1)
	require.True(t, cache.forceAdd(NewBarrierItem(barrier)))

	drained, open := cache.out().GetMultipleNoGroup(make([]WriterItem, 0, 3))
	require.True(t, open)
	require.Len(t, drained, 3)
	// The barrier token is the last of the three drained items.
	require.True(t, drained[2].Barrier == barrier)
}

func TestTxnCacheForceAddFailsWhenClosed(t *testing.T) {
cache := newTxnCache(TxnCacheOption{Count: 1, Size: 1, BlockStrategy: BlockStrategyWaitEmpty})
cache.out().Close()

barrier := newTestBarrier(1)
require.False(t, cache.forceAdd(NewBarrierItem(barrier)))
barrier.Fail(errors.New("closed"))
require.Error(t, barrier.err)
Comment on lines +155 to +162

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

This test masks the barrier-failure path.

Lines 160-162 only prove that forceAdd returns false; the test then sets the barrier error itself. That means it still passes if the production close path forgets to propagate the failure to the barrier, so the regression you care about remains untested. Please drive the failure through the real caller that reacts to forceAdd == false and assert on that outcome instead.

As per coding guidelines, **/*_test.go: Prefer focused deterministic tests; see docs/agents/testing.md before adding or changing tests.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@downstreamadapter/sink/mysql/causality/barrier_test.go` around lines 155 -
162, The test currently calls cache.forceAdd directly and then sets barrier.Fail
itself, masking the production behavior; instead, locate and invoke the real
production caller that calls forceAdd (search for usages of forceAdd) — call
that higher-level method with NewBarrierItem(barrier) after cache.out().Close(),
then assert that barrier.err was set by that production path (verify barrier.err
is non-nil) rather than calling barrier.Fail manually; keep the test focused and
deterministic (use newTestBarrier, forceAdd caller, and require.Error on
barrier.err).

Source: Coding guidelines

}

func BenchmarkTxnCacheAddDMLItem(b *testing.B) {
cache := newTxnCache(TxnCacheOption{Count: 1, Size: 4096, BlockStrategy: BlockStrategyWaitAvailable})
buffer := make([]WriterItem, 0, 1024)
b.ReportAllocs()

for i := 0; i < b.N; i++ {
if !cache.add(NewDMLItem(nil)) {
b.Fatal("cache unexpectedly rejected DML item")
}
if (i+1)%cap(buffer) == 0 {
var ok bool
buffer, ok = cache.out().GetMultipleNoGroup(buffer)
if !ok {
b.Fatal("cache closed")
}
buffer = buffer[:0]
}
}
}
90 changes: 79 additions & 11 deletions downstreamadapter/sink/mysql/causality/conflict_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import (
"context"
"maps"
"sync"
"time"

"github.com/pingcap/log"
Expand Down Expand Up @@ -51,6 +52,9 @@

changefeedID common.ChangeFeedID
metricConflictDetectDuration prometheus.Observer

admissionMu sync.Mutex

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Using a standard sync.Mutex for admissionMu serializes all concurrent Add calls on the DML ingestion hot path. Since barriers are rare, changing this to sync.RWMutex allows concurrent Add calls to proceed in parallel under a read lock, significantly improving throughput.

Suggested change
admissionMu sync.Mutex
admissionMu sync.RWMutex

activeFence *Node
}

// New creates a new ConflictDetector.
Expand Down Expand Up @@ -99,6 +103,9 @@
// NOTE: if multiple threads access this concurrently,
// ConflictKeys must be sorted by the slot index.
func (d *ConflictDetector) Add(event *commonEvent.DMLEvent) {
d.admissionMu.Lock()
defer d.admissionMu.Unlock()
Comment on lines +106 to +107

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Acquire a read lock (RLock) instead of a write lock (Lock) to allow concurrent Add calls to run in parallel. This avoids serializing the DML ingestion hot path.

Suggested change
d.admissionMu.Lock()
defer d.admissionMu.Unlock()
d.admissionMu.RLock()
defer d.admissionMu.RUnlock()


start := time.Now()
hashes := ConflictKeys(event)
node := d.slots.AllocNode(hashes)
Expand All @@ -118,27 +125,88 @@
node.RandCacheID = func() int64 {
return d.nextCacheID.Add(1) % int64(len(d.resolvedTxnCaches))
}
node.OnNotified = func(callback func()) {
if !d.notifyGuardWaitGroup.AddIf(func() bool { return !d.notifyClosed.Load() }) {
return
}
defer d.notifyGuardWaitGroup.Done()

d.notifiedNodes.Push(callback)
}
d.slots.Add(node)
node.OnNotified = d.onNodeNotified
extraDependencies := d.activeFenceDependency()
d.slots.AddWithDependencies(node, extraDependencies)
}

// sendToCache wraps event in a DML writer item and offers it to the resolved
// txn cache selected by id, reporting whether the cache accepted it.
// It must not invoke the transaction callback when the item is not accepted.
func (d *ConflictDetector) sendToCache(event *commonEvent.DMLEvent, id int64) bool {
	// Caches hold WriterItem values, so the event is wrapped before it is added.
	return d.resolvedTxnCaches[id].add(NewDMLItem(event))
}

// BroadcastBarrier installs a removal-only fence after all DMLs admitted so far
// and broadcasts one barrier token to every writer queue after that fence resolves.
func (d *ConflictDetector) BroadcastBarrier(barrier Barrier) error {
if d.notifyClosed.Load() {
return errors.ErrMySQLTxnError.GenWithStackByArgs("broadcast barrier on closed conflict detector")
}

d.admissionMu.Lock()

dependencyNodes := make(map[int64]*Node)
for _, node := range d.slots.SnapshotTailNodes() {
dependencyNodes[node.nodeID()] = node
}
for id, node := range d.activeFenceDependency() {
dependencyNodes[id] = node

Check failure on line 154 in downstreamadapter/sink/mysql/causality/conflict_detector.go

View workflow job for this annotation

GitHub Actions / Check

mapsloop: Replace m[k]=v loop with maps.Copy (modernize)
}

fence := &Node{
id: genNextNodeID(),
assignedTo: unassigned,
resolveByRemovalOnly: true,
}
fence.TrySendToTxnCache = func(cacheID) bool {
item := NewBarrierItem(barrier)
for _, cache := range d.resolvedTxnCaches {
if !cache.forceAdd(item) {
err := errors.ErrMySQLTxnError.GenWithStackByArgs("broadcast barrier to closed DML writer queue")
go barrier.Fail(err)
return true
}
}
return true
}
fence.RandCacheID = func() cacheID { return 0 }
fence.OnNotified = d.onNodeNotified

d.activeFence = fence
fence.dependOn(dependencyNodes)
d.admissionMu.Unlock()

barrier.OnDone(func() {
d.admissionMu.Lock()
if d.activeFence == fence {
d.activeFence = nil
}
d.admissionMu.Unlock()
fence.remove()
})
return nil
}

// activeFenceDependency returns the currently active fence as a one-entry
// dependency map keyed by its node ID, or nil when no fence is active.
// The callers in this file invoke it while holding admissionMu.
func (d *ConflictDetector) activeFenceDependency() map[int64]*Node {
	fence := d.activeFence
	if fence == nil {
		return nil
	}
	return map[int64]*Node{fence.nodeID(): fence}
}

// onNodeNotified queues callback on notifiedNodes. The callback is dropped
// when the guard wait group refuses the add because notifyClosed is set.
func (d *ConflictDetector) onNodeNotified(callback func()) {
	stillOpen := func() bool { return !d.notifyClosed.Load() }
	if d.notifyGuardWaitGroup.AddIf(stillOpen) {
		// Hold the guard for the duration of the push.
		defer d.notifyGuardWaitGroup.Done()
		d.notifiedNodes.Push(callback)
	}
}

// GetOutChByCacheID returns the output channel by cacheID.
// Note txns in single cache should be executed sequentially.
func (d *ConflictDetector) GetOutChByCacheID(id int) *chann.UnlimitedChannel[*commonEvent.DMLEvent, any] {
func (d *ConflictDetector) GetOutChByCacheID(id int) *chann.UnlimitedChannel[WriterItem, any] {
return d.resolvedTxnCaches[id].out()
}

Expand Down
4 changes: 4 additions & 0 deletions downstreamadapter/sink/mysql/causality/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (
"github.com/stretchr/testify/require"
)

// testChangefeedID returns a fixed changefeed ID for tests, built from the
// same value for both arguments of NewChangefeedID4Test.
func testChangefeedID() common.ChangeFeedID {
	const name = "test"
	return common.NewChangefeedID4Test(name, name)
}

func TestGenKeyListUsesSchemaIndexWithVirtualGeneratedColumn(t *testing.T) {
t.Parallel()

Expand Down
19 changes: 13 additions & 6 deletions downstreamadapter/sink/mysql/causality/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ type Node struct {
// Following fields are protected by `mu`.
mu sync.Mutex

assignedTo cacheID
removed bool
assignedTo cacheID
removed bool
resolveByRemovalOnly bool

// dependers is an ordered set for all nodes that
// conflict with the current node.
Expand Down Expand Up @@ -99,7 +100,7 @@ func (n *Node) dependOn(dependencyNodes map[int64]*Node) {
target.mu.Lock()
defer target.mu.Unlock()

if target.assignedTo != unassigned {
if target.assignedTo != unassigned && !n.resolveByRemovalOnly && !target.resolveByRemovalOnly {
// The target has already been assigned to a cache.
// In this case, record the cache ID in `resolvedList`, and this node
// probably can be sent to the same cache and executed sequentially.
Expand Down Expand Up @@ -170,9 +171,11 @@ func (n *Node) tryAssignTo(cacheID int64) bool {
if n.dependers != nil {
// `mu` must be holded during accessing dependers.
n.dependers.Ascend(func(node *Node) bool {
resolvedDependencies := atomic.AddInt32(&node.resolvedDependencies, 1)
atomic.StoreInt64(&node.resolvedList[resolvedDependencies-1], n.assignedTo)
node.OnNotified(node.maybeResolve)
if !n.resolveByRemovalOnly {
resolvedDependencies := atomic.AddInt32(&node.resolvedDependencies, 1)
atomic.StoreInt64(&node.resolvedList[resolvedDependencies-1], n.assignedTo)
node.OnNotified(node.maybeResolve)
}
return true
})
}
Expand Down Expand Up @@ -215,6 +218,10 @@ func (n *Node) tryResolve() (int64, bool) {
return assignedToAny, true
}

if n.resolveByRemovalOnly {
return unassigned, false
}

resolvedDependencies := atomic.LoadInt32(&n.resolvedDependencies)
if resolvedDependencies == n.totalDependencies {
firstDep := atomic.LoadInt64(&n.resolvedList[0])
Expand Down
Loading
Loading