diff --git a/deployments/Dockerfile b/deployments/Dockerfile index d2cb46c6b0..355c1189df 100644 --- a/deployments/Dockerfile +++ b/deployments/Dockerfile @@ -4,7 +4,7 @@ WORKDIR /go/src/github.com/pingcap/ticdc COPY . . ENV CDC_ENABLE_VENDOR=0 ARG NEXT_GEN -RUN if [ -n "${NEXT_GEN}" ]; then NEXT_GEN=1 make cdc; else make cdc; fi +RUN if [ -n "${NEXT_GEN}" ]; then NEXT_GEN=1 LEGACY_SAFEPOINT=1 make cdc; else make cdc; fi FROM alpine:3.15 RUN apk add --no-cache tzdata bash curl socat diff --git a/design.md b/design.md new file mode 100644 index 0000000000..64fd453a10 --- /dev/null +++ b/design.md @@ -0,0 +1,366 @@ +# MySQL Sink DML Barrier Design + +## Problem + +Issue #5353 requires MySQL sink block events, especially syncpoints, to wait until all DMLs enqueued before the block event are actually flushed to downstream. The current direct fix tracks every DML with a `PostFlush` callback in `downstreamadapter/sink/mysql/sink.go` and makes `FlushDMLBeforeBlock` wait for those callbacks. + +That is correct on ordering, but it adds per-DML overhead even when syncpoint is disabled or block events are rare. The desired optimization is to pay the synchronization cost only when a block event needs a DML flush barrier. + +The new barrier must be careful about the existing MySQL sink pipeline. `Sink.AddDMLEvent` does not write directly to a worker queue. It first admits the DML into the conflict detector, and the conflict detector may delay assigning the DML to a writer queue until conflicting transactions are flushed or a blocked cache becomes available. A correct barrier therefore cannot only append tokens to the current writer queues; it must also order itself after DMLs that were admitted into the conflict detector but have not yet reached any writer queue. + +## Goals + +- Preserve the ordering guarantee: a block event must not be written or reported as flushed before earlier MySQL DMLs reach downstream. +- Avoid adding per-DML `PostFlush` callbacks only for block-event barriers. +- Keep the barrier local to MySQL sink internals. +- Return an error instead of hanging if a DML writer fails or the sink closes while a barrier is waiting. + +## Non-Goals + +- Do not change dispatcher block-event semantics. +- Do not change MySQL DML conflict detection rules. +- Do not change DML SQL generation or batching behavior. +- Do not make every sink implement the same barrier mechanism. + +## Current Flow + +The relevant dispatcher path is implemented in `downstreamadapter/dispatcher/basic_dispatcher.go`: + +```text +BasicDispatcher.DealWithBlockEvent + -> sink.FlushDMLBeforeBlock(blockEvent) + -> sink.WriteBlockEvent(blockEvent) or report WAITING status +``` + +`DealWithBlockEvent` calls `FlushDMLBeforeBlock` before both non-blocking block-event writes and blocking `WAITING` reports (`basic_dispatcher.go:1007` and `basic_dispatcher.go:1201`). This is the point where MySQL DMLs already accepted by the sink must be known to have reached downstream. + +For MySQL DMLs, the current flow is: + +```text +Sink.AddDMLEvent + -> create pendingDML and attach event.AddPostFlushFunc + -> conflictDetector.Add(event) + -> slots.Add(node) + -> node.TrySendToTxnCache(cacheID) + -> resolvedTxnCaches[cacheID].add(event) + -> runDMLWriter(ctx, cacheID) + -> inputCh.GetMultipleNoGroup(buffer) + -> dmlWriter[cacheID].Flush(events) + -> event.PostFlush() + -> pendingDML.complete(nil) +``` + +Important code locations: + +- `downstreamadapter/sink/mysql/sink.go:320-347`: current per-DML pending tracking and `FlushDMLBeforeBlock` wait. +- `downstreamadapter/sink/mysql/sink.go:216-302`: DML writer loop and batch flushing. +- `downstreamadapter/sink/mysql/causality/conflict_detector.go:101-129`: conflict-detector admission and slot-node callbacks. +- `downstreamadapter/sink/mysql/causality/txn_cache.go:45-119`: bounded writer queues and `BlockStrategyWaitEmpty` behavior. +- `pkg/sink/mysql/mysql_writer.go:208-255`: downstream DML flush and `PostFlush` invocation after successful execution. +- `pkg/common/event/dml_event.go:654-676`: `PostFlush` runs flush callbacks first and then calls idempotent `PostEnqueue` as a fallback wake-up path. + +`cfg.WorkerCount` creates both the number of MySQL DML writers and the number of conflict-detector output caches, so `cacheID == dmlWriter index`. + +## Existing Correctness Constraints + +The implementation must preserve these properties from the current pipeline: + +1. A DML slot node is removed only through the `PostFlush` callback registered in `ConflictDetector.Add` (`conflict_detector.go:106-108`). Conflict dependencies therefore represent downstream flush completion, not only queue assignment. +2. A DML can be admitted before a block event but remain outside writer queues if it conflicts with an earlier DML or if `BlockStrategyWaitEmpty` rejects insertion while the cache is blocked. +3. `runDMLWriter` receives batches from `UnlimitedChannel.GetMultipleNoGroup`. If a batch contains both DML items and barrier tokens, the writer must process the batch in exact queue order. +4. `Writer.Flush` calls `event.PostFlush()` only after successful downstream execution. A barrier acknowledgement must happen after the same success boundary for all preceding DMLs. +5. `FlushDMLBeforeBlock` has no context parameter in the sink interface, so close and writer-error paths must actively fail waiters. + +## Proposed Design: Broadcast Barrier Token + +Add a MySQL-sink-only barrier protocol that broadcasts a barrier token to all DML writer queues through the conflict detector. `FlushDMLBeforeBlock` waits for all barrier acknowledgements. + +High-level flow: + +```text +FlushDMLBeforeBlock(blockEvent) + -> barrier := newDMLBarrier(workerCount) + -> conflictDetector.BroadcastBarrier(barrier) + -> wait barrier until all writer queues ack or one returns error + +runDMLWriter(ctx, idx) + -> receive DML events and barrier tokens from cache idx + -> flush all DML events before the barrier token + -> ack barrier from writer idx +``` + +This makes the barrier an in-band queue item. Because each DML writer consumes its queue sequentially, receiving the barrier means all earlier events in that writer queue have already been flushed or are being flushed immediately before acknowledgement. + +The key implementation detail is that `BroadcastBarrier` must not bypass the conflict detector's unresolved DML nodes. It should create a conflict-detector fence that is ordered after all DML nodes already admitted to the detector, and only then broadcast one in-band token to every writer queue. + +The resulting shape is: + +```text + existing DML path +AddDMLEvent --------------------------------------+ + | + v + conflict slots + | +FlushDMLBeforeBlock | + -> BroadcastBarrier --------------------> barrier fence + | + all prior DML flushed + | + v + cache 0 cache 1 ... cache N + | | | + v v v + writer 0 writer 1 ... writer N + | | | + +---- barrier acks ---+ +``` + +## Interface Shape + +### Writer Queue Item + +The existing `txnCache` only carries `*commonEvent.DMLEvent`. Generalize the item type to a small value struct instead of using `interface{}` so normal DMLs do not allocate an extra wrapper object: + +```go +type writerItem struct { + dml *commonEvent.DMLEvent + barrier *dmlBarrier +} + +func newDMLItem(event *commonEvent.DMLEvent) writerItem { + return writerItem{dml: event} +} + +func newBarrierItem(barrier *dmlBarrier) writerItem { + return writerItem{barrier: barrier} +} +``` + +Then change the cache interface and output channel types: + +```go +type txnCache interface { + add(item writerItem) bool + forceAdd(item writerItem) bool + out() *chann.UnlimitedChannel[writerItem, any] +} +``` + +`add` preserves the current cache-full semantics for DMLs. `forceAdd` is only for barrier tokens after the conflict-detector fence is resolved; it appends the token even when `BlockStrategyWaitEmpty` is currently blocked. This avoids a deadlock where the barrier is waiting for writer progress, while the blocked-cache policy prevents the token needed to observe that progress from entering the queue. + +`utils/chann.UnlimitedChannel.Push` currently logs and returns nothing when the channel is closed. `forceAdd` cannot reliably report broadcast failure with that API. The implementation should add a narrow cache-level or channel-level push method that returns whether the item was accepted, and `forceAdd` should return false when the output channel has already been closed. Do not infer success from absence of a log. + +The normal hot path remains simple: + +- `ConflictDetector.Add` creates `writerItem{dml: event}` as a stack value. +- No barrier object or `PostFlush` callback is attached for block-event waiting. +- The DML writer reuses preallocated buffers when converting queued `writerItem` values into `[]*commonEvent.DMLEvent` batches for `Writer.Flush`. + +### Conflict Detector + +Add a broadcast method: + +```go +func (d *ConflictDetector) BroadcastBarrier(barrier *dmlBarrier) error +``` + +`BroadcastBarrier` must define a precise snapshot point. All DMLs whose `ConflictDetector.Add` call completes before the snapshot must be ordered before the barrier. DMLs admitted after the snapshot are not required by that barrier and must not be allowed to overtake the barrier tokens. + +Implement this with a lightweight barrier gate inside the conflict detector: + +1. Serialize `Add` and `BroadcastBarrier` admission with a small mutex in `ConflictDetector`. This mutex protects only admission and barrier snapshot state; it must not be held during downstream IO or barrier waiting. +2. When broadcasting, create a barrier fence node that depends on the current in-flight DML frontier. The frontier is the set of slot tail nodes currently stored in `Slots`; those nodes are removed only after downstream `PostFlush`, so depending on them is sufficient to wait for all earlier conflicting chains. +3. Publish the barrier fence as the current global fence before releasing the admission mutex. DMLs admitted after this point should depend on the current global fence before normal conflict-key resolution, so they cannot be assigned to writer queues before the barrier tokens. +4. When the barrier fence resolves, append one `writerItem{barrier: barrier}` to every resolved transaction cache with `forceAdd`. +5. When every writer acknowledges the barrier, remove the global fence and notify DML nodes that depended on it. + +This makes the barrier a flush-order fence, not a DML conflict rule change. DML conflict-key detection remains unchanged for DML-to-DML ordering; the barrier only creates a temporary global ordering point around a block event. + +The `Slots` helper needed by the barrier should be explicit and small: + +```go +func (s *Slots) SnapshotTailNodes() []*Node +``` + +`SnapshotTailNodes` must lock slot mutexes in stable slot order, collect unique tail nodes from `slot.nodes`, and return them. The number of slots is fixed (`defaultConflictDetectorSlots`, currently `16 * 1024`), and barriers are rare, so this O(slot-count) scan is acceptable outside the DML hot path. The helper must not run on every DML. + +### Barrier + +The barrier should collect one acknowledgement per DML writer: + +```go +type dmlBarrier struct { + done chan struct{} + // mutex-protected first error, remaining ack count, and ack bitmap +} +``` + +Required operations: + +- `Ack(writerID int)` for successful writer flush up to the barrier. +- `Fail(err error)` for writer error, broadcast failure, sink close, or run-context cancellation. +- `Wait() error` for `FlushDMLBeforeBlock`. + +`Ack` must be idempotent per writer. A small bitmap or `[]bool` sized by `workerCount` is sufficient because the worker count is fixed for the sink. Duplicate acknowledgements should be ignored after the first successful acknowledgement from the same writer. + +`Fail` must unblock all waiters and preserve the first error. Completion must be idempotent to tolerate races between normal acknowledgements, writer failures, and sink close. If `workerCount == 0` is ever observed, sink construction should fail earlier because the existing MySQL sink requires at least one DML writer. + +`FlushDMLBeforeBlock` should register the barrier in the sink's outstanding-barrier set before calling `BroadcastBarrier`, unregister it after `Wait` returns, and fail it immediately if broadcasting returns an error. + +## MySQL Sink Writer Loop + +`runDMLWriter` currently batches only `*commonEvent.DMLEvent` values. After introducing `writerItem`, it must scan each returned batch in queue order and flush only contiguous DML ranges before acknowledging a barrier. + +Conceptual loop: + +```text +itemBuffer := make([]writerItem, 0, maxTxnRows) +dmlBuffer := make([]*commonEvent.DMLEvent, 0, maxTxnRows) + +for { + items, ok := inputCh.GetMultipleNoGroup(itemBuffer) + if !ok: + fail outstanding barriers and return + + for each item in items in order: + if item is DML: + append item.dml to dmlBuffer + if dmlBuffer row count reaches maxTxnRows: + flush dmlBuffer + clear dmlBuffer + + if item is barrier: + flush dmlBuffer + clear dmlBuffer + item.barrier.Ack(writerID) + + flush dmlBuffer + clear dmlBuffer + clear itemBuffer +} +``` + +The writer must acknowledge a barrier only after `Writer.Flush` returns nil for all DMLs before that token. If `Writer.Flush` returns an error while flushing DMLs before a barrier, the writer must call `barrier.Fail(err)` before returning the writer error. In practice, the sink should also call `failOutstandingBarriers(err)` on any writer error, because the writer might fail before it has dequeued a waiting barrier token. + +Metrics should keep the same meaning as today: + +- `WorkerEventRowCount` observes only DML events. +- `WorkerHandledRows` counts only flushed DML rows. +- `WorkerFlushDuration`, `WorkerBatchFlushDuration`, and `WorkerTotalDuration` measure DML flush work; barrier-only batches should not inflate handled-row metrics. + +The normal DML path should avoid new per-DML heap allocations. Reuse `itemBuffer` and `dmlBuffer` for the lifetime of the writer goroutine. Do not log per barrier on the normal success path; use tests and metrics rather than high-cardinality logs. + +## Ordering Guarantees + +The design relies on these invariants: + +1. Dispatcher calls `FlushDMLBeforeBlock` after enqueueing earlier DMLs and before writing or reporting the block event. +2. `BroadcastBarrier` defines a snapshot after all completed `ConflictDetector.Add` calls that happened before the barrier. +3. The conflict-detector barrier fence depends on every in-flight DML frontier node visible at the snapshot. Because DML slot nodes are removed only after `PostFlush`, the fence cannot resolve before those DMLs reach downstream. +4. DMLs admitted after the snapshot depend on the active barrier fence, so they cannot be assigned to writer queues ahead of the barrier token. +5. Once the fence resolves, `BroadcastBarrier` appends one barrier token to every writer queue. +6. Each DML writer processes its queue in order. +7. A writer acknowledges the barrier only after all DMLs before that barrier token have reached downstream successfully. +8. `FlushDMLBeforeBlock` returns only after every writer has acknowledged the barrier, so the subsequent block event cannot overtake earlier DMLs. + +This is stronger than waiting for `PostEnqueue` and equivalent to waiting for `PostFlush` for all DMLs before the block event, but without registering a block-event pending callback on every DML. + +## Error and Close Semantics + +The barrier must never wait forever. + +- If `BroadcastBarrier` cannot install the fence or cannot append all tokens because the detector is closed, `FlushDMLBeforeBlock` must fail the barrier and return an error immediately. +- If any DML writer fails, the sink must fail all outstanding barriers with the writer error before returning from `runDMLWriter`. +- If a writer fails while flushing DMLs immediately before a barrier token, it should call `barrier.Fail(err)` directly and then return the same error. +- If the sink closes while a barrier is waiting, `Close` must fail all outstanding barriers with `context.Canceled`, close the conflict detector notification path, close writers, and close the DB as it does today. +- If the `Run` context is canceled, `runDMLWriter` should fail outstanding barriers with `ctx.Err()` before returning. +- Barrier completion must be idempotent to tolerate races between writer failure, sink close, broadcast failure, and normal acknowledgements. + +Use repository error conventions for newly generated errors. Existing writer errors are already wrapped by the MySQL writer path and can be propagated. For new barrier-specific internal errors, use an existing suitable predefined error such as `errors.ErrMySQLTxnError.GenWithStackByArgs(...)` or add a narrowly named predefined error if the implementation needs a distinct RFC code. Do not introduce bare `fmt.Errorf` or unwrapped third-party errors in new code. + +## Cache Full Behavior + +The current MySQL sink creates causality caches with `BlockStrategyWaitEmpty` (`downstreamadapter/sink/mysql/sink.go:175-180`). With this strategy, a normal DML insertion can return false when a cache is blocked until empty. + +Barrier tokens should use a separate `forceAdd` path after the barrier fence resolves: + +- Normal DMLs keep using `add`, so existing cache pressure behavior is unchanged. +- Barrier tokens use `forceAdd`, because they are control markers and must be able to enter every queue to let waiting block events finish. +- `forceAdd` must still fail if the underlying channel is closed, so `FlushDMLBeforeBlock` can return an error instead of hanging. +- The channel or cache API must expose accepted-or-closed status for this path; the current `UnlimitedChannel.Push` return type is insufficient for this requirement. + +Allowing a small number of barrier tokens to bypass the cache-size policy is safe because the number of tokens is bounded by `workerCount * activeBarrierCount`, and block events are rare compared with DML events. This is simpler and safer than retrying through the existing notification loop, which can deadlock if no new DML notification is produced after the cache becomes empty. + +## Multiple and Concurrent Barriers + +Multiple block events can call `FlushDMLBeforeBlock` concurrently through the block-event executor paths. The implementation should support independent barriers rather than using a single global waiter. + +Rules: + +- Each call creates a distinct `dmlBarrier` with its own acknowledgement state and error. +- `BroadcastBarrier` serializes barrier fence installation so barriers preserve call order. +- If barrier B is installed while barrier A is active, B should depend on A's fence. This keeps token order stable in every writer queue. +- Failing the sink or detector fails all outstanding barriers. +- Successfully completing one barrier must not close or mutate another barrier's state. + +This keeps the semantics equivalent to separate snapshots while avoiding a shared condition variable that can miss notifications. + +## Performance Considerations + +The optimization target is the steady-state DML hot path when no block-event barrier is active. + +Expected hot-path properties: + +- No per-DML block-barrier `PostFlush` callback is registered in `Sink.AddDMLEvent`. +- No per-DML barrier object is allocated. +- `writerItem` is a small value type stored directly in the existing unlimited channel. +- DML writer buffers are allocated once per writer goroutine and reused. +- The O(slot-count) `SnapshotTailNodes` scan runs only when `FlushDMLBeforeBlock` is called. + +The implementation should include a microbenchmark or reuse existing causality benchmarks to compare DML-only throughput before and after the queue item type change. The acceptable result is no material regression in normal DML throughput and allocation count. If the value-struct queue introduces measurable allocation growth, the implementation should first optimize buffer reuse and escape behavior rather than adding broader abstractions. + +## Advantages + +- No per-DML `PostFlush` callback is needed for block-event ordering. +- Runtime overhead is paid only when `FlushDMLBeforeBlock` is called, except for the small queue item shape change. +- The barrier is aligned with existing writer queues, so it naturally waits for each writer's prior DMLs. +- The conflict-detector fence closes the correctness gap for DMLs that are admitted but not yet assigned to a writer queue. +- The design handles DMLs distributed across all workers, including conflict-free round-robin DMLs. + +## Risks + +- Generalizing the queue item type changes the causality cache and writer loop internals. The change should stay inside `downstreamadapter/sink/mysql` and `downstreamadapter/sink/mysql/causality`. +- Barrier latency is bounded by the slowest DML writer and by earlier unresolved conflict chains. +- Incorrect handling of mixed DML/barrier batches can break ordering. +- Queue-close, writer-error, and sink-close races must be tested to avoid deadlocks. +- If the barrier fence does not cover DMLs already admitted to the conflict detector, block events can still overtake unresolved DMLs. +- If post-barrier DMLs are not ordered behind an active barrier fence, they can enter writer queues before barrier tokens and create unnecessary latency or incorrect ordering. + +## Test Plan + +Add focused tests for: + +1. A DML below syncpoint primary ts is delayed in a MySQL DML writer; syncpoint waits until the barrier ack. +2. DMLs routed to multiple DML writers all flush before the barrier completes. +3. A DML writer error before barrier ack makes `FlushDMLBeforeBlock` return an error instead of hanging. +4. Sink close while a barrier is waiting unblocks the wait with an error. +5. Barrier ordering with a mixed queue: DML, barrier, DML. The first DML must flush before ack; the second DML must not be required for that barrier. +6. Cache-full or blocked-cache behavior does not deadlock `BroadcastBarrier`. +7. A DML admitted to `ConflictDetector.Add` before `BroadcastBarrier`, but not yet assigned to any writer queue because it conflicts with an earlier DML, is flushed before the barrier completes. +8. A DML admitted after `BroadcastBarrier` does not overtake the active barrier token in any writer queue. +9. Two concurrent `FlushDMLBeforeBlock` calls complete independently and preserve barrier token order in every writer. +10. Closing the conflict detector or writer queue while broadcasting returns an error instead of leaving `Wait` blocked. + +The existing tests `TestMysqlSinkFlushDMLBeforeBlockReturnsOnDMLError` and `TestMysqlSinkFlushDMLBeforeBlockWaitsForDMLPostFlush` in `downstreamadapter/sink/mysql/sink_test.go` should be adapted to the barrier implementation rather than removed. The failpoint regression around delayed DML `PostFlush` remains valuable because it verifies that the barrier waits for downstream flush completion, not only SQL execution or queue dequeue. + +## Migration Plan + +1. Introduce `writerItem`, update `txnCache` channel types, and keep normal DML insertion behavior unchanged. +2. Add `dmlBarrier` and sink-level outstanding-barrier tracking with idempotent ack/fail semantics. +3. Add `Slots.SnapshotTailNodes` and the conflict-detector barrier fence used by `BroadcastBarrier`. +4. Add `ConflictDetector.BroadcastBarrier` without changing DML-to-DML conflict-key rules. +5. Update `runDMLWriter` to process DML and barrier items in queue order while reusing buffers. +6. Replace per-DML pending callback tracking in `Sink.AddDMLEvent` and `FlushDMLBeforeBlock` with broadcast barrier waiting. +7. Keep the failpoint regression from issue #5353 and add error, close, cache-full, unresolved-conflict, and concurrent-barrier tests. +8. Run focused MySQL sink and causality tests, then run a DML-only benchmark or allocation check to confirm the normal hot path does not regress materially. diff --git a/downstreamadapter/sink/mysql/causality/barrier_test.go b/downstreamadapter/sink/mysql/causality/barrier_test.go new file mode 100644 index 0000000000..88769fa44a --- /dev/null +++ b/downstreamadapter/sink/mysql/causality/barrier_test.go @@ -0,0 +1,183 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package causality + +import ( + "errors" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type testBarrier struct { + mu sync.Mutex + done chan struct{} + err error + remaining int + doneFuncs []func() +} + +func newTestBarrier(workerCount int) *testBarrier { + return &testBarrier{done: make(chan struct{}), remaining: workerCount} +} + +func (b *testBarrier) Ack(int) { + b.mu.Lock() + if b.remaining == 0 { + b.mu.Unlock() + return + } + b.remaining-- + if b.remaining > 0 { + b.mu.Unlock() + return + } + doneFuncs := b.doneFuncs + b.doneFuncs = nil + close(b.done) + b.mu.Unlock() + for _, f := range doneFuncs { + f() + } +} + +func (b *testBarrier) Fail(err error) { + b.mu.Lock() + if b.remaining == 0 { + b.mu.Unlock() + return + } + b.err = err + b.remaining = 0 + doneFuncs := b.doneFuncs + b.doneFuncs = nil + close(b.done) + b.mu.Unlock() + for _, f := range doneFuncs { + f() + } +} + +func (b *testBarrier) OnDone(f func()) { + b.mu.Lock() + if b.remaining == 0 { + b.mu.Unlock() + f() + return + } + b.doneFuncs = append(b.doneFuncs, f) + b.mu.Unlock() +} + +func TestBroadcastBarrierEnqueuesOneTokenPerWriter(t *testing.T) { + detector := New(4, TxnCacheOption{Count: 2, Size: 1, BlockStrategy: BlockStrategyWaitEmpty}, testChangefeedID()) + barrier := newTestBarrier(2) + + require.NoError(t, detector.BroadcastBarrier(barrier)) + + for i := 0; i < 2; i++ { + items, ok := detector.GetOutChByCacheID(i).GetMultipleNoGroup(make([]WriterItem, 0, 1)) + require.True(t, ok) + require.Len(t, items, 1) + require.True(t, barrier == items[0].Barrier) + items[0].Barrier.Ack(i) + } + + require.Eventually(t, func() bool { + select { + case <-barrier.done: + return true + default: + return false + } + }, time.Second, 10*time.Millisecond) +} + +func TestBroadcastBarrierReturnsErrorWhenDetectorClosed(t *testing.T) { + detector := New(4, TxnCacheOption{Count: 1, Size: 1, BlockStrategy: BlockStrategyWaitEmpty}, testChangefeedID()) + detector.CloseNotifiedNodes() + + err := detector.BroadcastBarrier(newTestBarrier(1)) + require.Error(t, err) +} + +func TestRemovalOnlyFenceDoesNotResolveDependersOnAssignment(t *testing.T) { + assigned := false + fence := &Node{id: genNextNodeID(), assignedTo: unassigned, resolveByRemovalOnly: true} + fence.RandCacheID = func() cacheID { return 0 } + fence.TrySendToTxnCache = func(cacheID) bool { return true } + fence.OnNotified = func(callback func()) { callback() } + + depender := &Node{id: genNextNodeID(), assignedTo: unassigned} + depender.RandCacheID = func() cacheID { return 0 } + depender.TrySendToTxnCache = func(cacheID) bool { + assigned = true + return true + } + depender.OnNotified = func(callback func()) { callback() } + + depender.dependOn(map[int64]*Node{fence.nodeID(): fence}) + fence.maybeResolve() + require.False(t, assigned) + + fence.remove() + require.True(t, assigned) +} + +func TestTxnCacheForceAddBypassesBlockedCache(t *testing.T) { + cache := newTxnCache(TxnCacheOption{Count: 1, Size: 1, BlockStrategy: BlockStrategyWaitEmpty}) + require.True(t, cache.add(NewDMLItem(nil))) + require.True(t, cache.add(NewDMLItem(nil))) + require.False(t, cache.add(NewDMLItem(nil))) + + barrier := newTestBarrier(1) + require.True(t, cache.forceAdd(NewBarrierItem(barrier))) + + items, ok := cache.out().GetMultipleNoGroup(make([]WriterItem, 0, 3)) + require.True(t, ok) + require.Len(t, items, 3) + require.True(t, barrier == items[2].Barrier) +} + +func TestTxnCacheForceAddFailsWhenClosed(t *testing.T) { + cache := newTxnCache(TxnCacheOption{Count: 1, Size: 1, BlockStrategy: BlockStrategyWaitEmpty}) + cache.out().Close() + + barrier := newTestBarrier(1) + require.False(t, cache.forceAdd(NewBarrierItem(barrier))) + barrier.Fail(errors.New("closed")) + require.Error(t, barrier.err) +} + +func BenchmarkTxnCacheAddDMLItem(b *testing.B) { + cache := newTxnCache(TxnCacheOption{Count: 1, Size: 4096, BlockStrategy: BlockStrategyWaitAvailable}) + buffer := make([]WriterItem, 0, 1024) + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + if !cache.add(NewDMLItem(nil)) { + b.Fatal("cache unexpectedly rejected DML item") + } + if (i+1)%cap(buffer) == 0 { + var ok bool + buffer, ok = cache.out().GetMultipleNoGroup(buffer) + if !ok { + b.Fatal("cache closed") + } + buffer = buffer[:0] + } + } +} diff --git a/downstreamadapter/sink/mysql/causality/conflict_detector.go b/downstreamadapter/sink/mysql/causality/conflict_detector.go index 29296e2f6f..46d81b0dbc 100644 --- a/downstreamadapter/sink/mysql/causality/conflict_detector.go +++ b/downstreamadapter/sink/mysql/causality/conflict_detector.go @@ -15,6 +15,7 @@ package causality import ( "context" + "sync" "time" "github.com/pingcap/log" @@ -51,6 +52,9 @@ type ConflictDetector struct { changefeedID common.ChangeFeedID metricConflictDetectDuration prometheus.Observer + + admissionMu sync.Mutex + activeFence *Node } // New creates a new ConflictDetector. @@ -99,6 +103,9 @@ func (d *ConflictDetector) Run(ctx context.Context) error { // NOTE: if multiple threads access this concurrently, // ConflictKeys must be sorted by the slot index. func (d *ConflictDetector) Add(event *commonEvent.DMLEvent) { + d.admissionMu.Lock() + defer d.admissionMu.Unlock() + start := time.Now() hashes := ConflictKeys(event) node := d.slots.AllocNode(hashes) @@ -118,27 +125,88 @@ func (d *ConflictDetector) Add(event *commonEvent.DMLEvent) { node.RandCacheID = func() int64 { return d.nextCacheID.Add(1) % int64(len(d.resolvedTxnCaches)) } - node.OnNotified = func(callback func()) { - if !d.notifyGuardWaitGroup.AddIf(func() bool { return !d.notifyClosed.Load() }) { - return - } - defer d.notifyGuardWaitGroup.Done() - - d.notifiedNodes.Push(callback) - } - d.slots.Add(node) + node.OnNotified = d.onNodeNotified + extraDependencies := d.activeFenceDependency() + d.slots.AddWithDependencies(node, extraDependencies) } // sendToCache should not call txn.Callback if it returns an error. func (d *ConflictDetector) sendToCache(event *commonEvent.DMLEvent, id int64) bool { cache := d.resolvedTxnCaches[id] - ok := cache.add(event) + ok := cache.add(NewDMLItem(event)) return ok } +// BroadcastBarrier installs a removal-only fence after all DMLs admitted so far +// and broadcasts one barrier token to every writer queue after that fence resolves. +func (d *ConflictDetector) BroadcastBarrier(barrier Barrier) error { + if d.notifyClosed.Load() { + return errors.ErrMySQLTxnError.GenWithStackByArgs("broadcast barrier on closed conflict detector") + } + + d.admissionMu.Lock() + + dependencyNodes := make(map[int64]*Node) + for _, node := range d.slots.SnapshotTailNodes() { + dependencyNodes[node.nodeID()] = node + } + for id, node := range d.activeFenceDependency() { + dependencyNodes[id] = node + } + + fence := &Node{ + id: genNextNodeID(), + assignedTo: unassigned, + resolveByRemovalOnly: true, + } + fence.TrySendToTxnCache = func(cacheID) bool { + item := NewBarrierItem(barrier) + for _, cache := range d.resolvedTxnCaches { + if !cache.forceAdd(item) { + err := errors.ErrMySQLTxnError.GenWithStackByArgs("broadcast barrier to closed DML writer queue") + go barrier.Fail(err) + return true + } + } + return true + } + fence.RandCacheID = func() cacheID { return 0 } + fence.OnNotified = d.onNodeNotified + + d.activeFence = fence + fence.dependOn(dependencyNodes) + d.admissionMu.Unlock() + + barrier.OnDone(func() { + d.admissionMu.Lock() + if d.activeFence == fence { + d.activeFence = nil + } + d.admissionMu.Unlock() + fence.remove() + }) + return nil +} + +func (d *ConflictDetector) activeFenceDependency() map[int64]*Node { + if d.activeFence == nil { + return nil + } + return map[int64]*Node{d.activeFence.nodeID(): d.activeFence} +} + +func (d *ConflictDetector) onNodeNotified(callback func()) { + if !d.notifyGuardWaitGroup.AddIf(func() bool { return !d.notifyClosed.Load() }) { + return + } + defer d.notifyGuardWaitGroup.Done() + + d.notifiedNodes.Push(callback) +} + // GetOutChByCacheID returns the output channel by cacheID. // Note txns in single cache should be executed sequentially. -func (d *ConflictDetector) GetOutChByCacheID(id int) *chann.UnlimitedChannel[*commonEvent.DMLEvent, any] { +func (d *ConflictDetector) GetOutChByCacheID(id int) *chann.UnlimitedChannel[WriterItem, any] { return d.resolvedTxnCaches[id].out() } diff --git a/downstreamadapter/sink/mysql/causality/helper_test.go b/downstreamadapter/sink/mysql/causality/helper_test.go index 94417f2137..3b7f0d9ac2 100644 --- a/downstreamadapter/sink/mysql/causality/helper_test.go +++ b/downstreamadapter/sink/mysql/causality/helper_test.go @@ -26,6 +26,10 @@ import ( "github.com/stretchr/testify/require" ) +func testChangefeedID() common.ChangeFeedID { + return common.NewChangefeedID4Test("test", "test") +} + func TestGenKeyListUsesSchemaIndexWithVirtualGeneratedColumn(t *testing.T) { t.Parallel() diff --git a/downstreamadapter/sink/mysql/causality/node.go b/downstreamadapter/sink/mysql/causality/node.go index 58d71cb82a..5f49a5e549 100644 --- a/downstreamadapter/sink/mysql/causality/node.go +++ b/downstreamadapter/sink/mysql/causality/node.go @@ -65,8 +65,9 @@ type Node struct { // Following fields are protected by `mu`. mu sync.Mutex - assignedTo cacheID - removed bool + assignedTo cacheID + removed bool + resolveByRemovalOnly bool // dependers is an ordered set for all nodes that // conflict with the current node. @@ -99,7 +100,7 @@ func (n *Node) dependOn(dependencyNodes map[int64]*Node) { target.mu.Lock() defer target.mu.Unlock() - if target.assignedTo != unassigned { + if target.assignedTo != unassigned && !n.resolveByRemovalOnly && !target.resolveByRemovalOnly { // The target has already been assigned to a cache. // In this case, record the cache ID in `resolvedList`, and this node // probably can be sent to the same cache and executed sequentially. @@ -170,9 +171,11 @@ func (n *Node) tryAssignTo(cacheID int64) bool { if n.dependers != nil { // `mu` must be holded during accessing dependers. n.dependers.Ascend(func(node *Node) bool { - resolvedDependencies := atomic.AddInt32(&node.resolvedDependencies, 1) - atomic.StoreInt64(&node.resolvedList[resolvedDependencies-1], n.assignedTo) - node.OnNotified(node.maybeResolve) + if !n.resolveByRemovalOnly { + resolvedDependencies := atomic.AddInt32(&node.resolvedDependencies, 1) + atomic.StoreInt64(&node.resolvedList[resolvedDependencies-1], n.assignedTo) + node.OnNotified(node.maybeResolve) + } return true }) } @@ -215,6 +218,10 @@ func (n *Node) tryResolve() (int64, bool) { return assignedToAny, true } + if n.resolveByRemovalOnly { + return unassigned, false + } + resolvedDependencies := atomic.LoadInt32(&n.resolvedDependencies) if resolvedDependencies == n.totalDependencies { firstDep := atomic.LoadInt64(&n.resolvedList[0]) diff --git a/downstreamadapter/sink/mysql/causality/slot.go b/downstreamadapter/sink/mysql/causality/slot.go index 56039c3e2c..61423ce38f 100644 --- a/downstreamadapter/sink/mysql/causality/slot.go +++ b/downstreamadapter/sink/mysql/causality/slot.go @@ -67,8 +67,17 @@ func (s *Slots) AllocNode(hashes map[uint64]struct{}) *Node { // Add adds an elem to the slots and calls DependOn for elem. func (s *Slots) Add(elem *Node) { + s.AddWithDependencies(elem, nil) +} + +// AddWithDependencies adds elem to the slots and makes it also depend on extra +// nodes that are outside normal conflict-key detection, such as barrier fences. +func (s *Slots) AddWithDependencies(elem *Node, extraDependencies map[int64]*Node) { hashes := elem.sortedKeysHash dependencyNodes := make(map[int64]*Node, len(hashes)) + for id, node := range extraDependencies { + dependencyNodes[id] = node + } var lastSlot uint64 = math.MaxUint64 for _, hash := range hashes { @@ -110,6 +119,26 @@ func (s *Slots) Add(elem *Node) { } } +// SnapshotTailNodes returns the unique current slot tail nodes. It is used only +// by rare barrier broadcasts and intentionally scans all slots outside the DML +// hot path. +func (s *Slots) SnapshotTailNodes() []*Node { + nodes := make(map[int64]*Node) + for i := range s.slots { + s.slots[i].mu.Lock() + for _, node := range s.slots[i].nodes { + nodes[node.nodeID()] = node + } + s.slots[i].mu.Unlock() + } + + result := make([]*Node, 0, len(nodes)) + for _, node := range nodes { + result = append(result, node) + } + return result +} + // Remove removes an element from the Slots. func (s *Slots) Remove(elem *Node) { elem.remove() diff --git a/downstreamadapter/sink/mysql/causality/txn_cache.go b/downstreamadapter/sink/mysql/causality/txn_cache.go index ef74ac260f..09a3df0a86 100644 --- a/downstreamadapter/sink/mysql/causality/txn_cache.go +++ b/downstreamadapter/sink/mysql/causality/txn_cache.go @@ -21,6 +21,30 @@ import ( "github.com/pingcap/ticdc/utils/chann" ) +// Barrier is an in-band DML writer control marker. It is acknowledged by each +// writer only after all earlier DMLs in that writer queue have flushed. +type Barrier interface { + Ack(writerID int) + Fail(err error) + OnDone(func()) +} + +// WriterItem is the value carried by MySQL DML writer queues. +type WriterItem struct { + DML *commonEvent.DMLEvent + Barrier Barrier +} + +// NewDMLItem creates a writer queue item for a DML event. +func NewDMLItem(event *commonEvent.DMLEvent) WriterItem { + return WriterItem{DML: event} +} + +// NewBarrierItem creates a writer queue item for a barrier token. +func NewBarrierItem(barrier Barrier) WriterItem { + return WriterItem{Barrier: barrier} +} + const ( // BlockStrategyWaitAvailable means the cache will block until there is an available slot. BlockStrategyWaitAvailable BlockStrategy = "waitAvailable" @@ -45,9 +69,12 @@ type TxnCacheOption struct { // In current implementation, the conflict detector will push txn to the txnCache. type txnCache interface { // add adds a event to the Cache. - add(txn *commonEvent.DMLEvent) bool + add(item WriterItem) bool + // forceAdd appends a control item even when the cache is blocked, but fails + // if the underlying channel has already been closed. + forceAdd(item WriterItem) bool // out returns a unlimited channel to receive events which are ready to be executed. - out() *chann.UnlimitedChannel[*commonEvent.DMLEvent, any] + out() *chann.UnlimitedChannel[WriterItem, any] } func newTxnCache(opt TxnCacheOption) txnCache { @@ -57,9 +84,9 @@ func newTxnCache(opt TxnCacheOption) txnCache { switch opt.BlockStrategy { case BlockStrategyWaitAvailable: - return &boundedTxnCache{ch: chann.NewUnlimitedChannel[*commonEvent.DMLEvent, any](nil, nil), upperSize: opt.Size} + return &boundedTxnCache{ch: chann.NewUnlimitedChannel[WriterItem, any](nil, nil), upperSize: opt.Size} case BlockStrategyWaitEmpty: - return &boundedTxnCacheWithBlock{ch: chann.NewUnlimitedChannel[*commonEvent.DMLEvent, any](nil, nil), upperSize: opt.Size} + return &boundedTxnCacheWithBlock{ch: chann.NewUnlimitedChannel[WriterItem, any](nil, nil), upperSize: opt.Size} default: return nil } @@ -69,35 +96,39 @@ func newTxnCache(opt TxnCacheOption) txnCache { // //nolint:unused type boundedTxnCache struct { - ch *chann.UnlimitedChannel[*commonEvent.DMLEvent, any] + ch *chann.UnlimitedChannel[WriterItem, any] upperSize int } //nolint:unused -func (w *boundedTxnCache) add(txn *commonEvent.DMLEvent) bool { +func (w *boundedTxnCache) add(item WriterItem) bool { if w.ch.Len() > w.upperSize { return false } - w.ch.Push(txn) - return true + return w.ch.PushIfNotClosed(item) +} + +//nolint:unused +func (w *boundedTxnCache) forceAdd(item WriterItem) bool { + return w.ch.PushIfNotClosed(item) } //nolint:unused -func (w *boundedTxnCache) out() *chann.UnlimitedChannel[*commonEvent.DMLEvent, any] { +func (w *boundedTxnCache) out() *chann.UnlimitedChannel[WriterItem, any] { return w.ch } // boundedTxnCacheWithBlock is a special boundedWorker. Once the cache // is full, it will block until all cached txns are consumed. type boundedTxnCacheWithBlock struct { - ch *chann.UnlimitedChannel[*commonEvent.DMLEvent, any] + ch *chann.UnlimitedChannel[WriterItem, any] //nolint:unused isBlocked atomic.Bool upperSize int } //nolint:unused -func (w *boundedTxnCacheWithBlock) add(txn *commonEvent.DMLEvent) bool { +func (w *boundedTxnCacheWithBlock) add(item WriterItem) bool { if w.isBlocked.Load() && w.ch.Len() <= 0 { w.isBlocked.Store(false) } @@ -107,13 +138,17 @@ func (w *boundedTxnCacheWithBlock) add(txn *commonEvent.DMLEvent) bool { w.isBlocked.CompareAndSwap(false, true) return false } - w.ch.Push(txn) - return true + return w.ch.PushIfNotClosed(item) } return false } //nolint:unused -func (w *boundedTxnCacheWithBlock) out() *chann.UnlimitedChannel[*commonEvent.DMLEvent, any] { +func (w *boundedTxnCacheWithBlock) forceAdd(item WriterItem) bool { + return w.ch.PushIfNotClosed(item) +} + +//nolint:unused +func (w *boundedTxnCacheWithBlock) out() *chann.UnlimitedChannel[WriterItem, any] { return w.ch } diff --git a/downstreamadapter/sink/mysql/sink.go b/downstreamadapter/sink/mysql/sink.go index 958f669c29..791cdfeb99 100644 --- a/downstreamadapter/sink/mysql/sink.go +++ b/downstreamadapter/sink/mysql/sink.go @@ -18,6 +18,7 @@ import ( "database/sql" "net/url" "strconv" + "sync" "time" "github.com/pingcap/log" @@ -67,6 +68,91 @@ type Sink struct { // variable @@tidb_cdc_active_active_sync_stats and is shared by all DML writers. // It is nil when disabled or unsupported by downstream. activeActiveSyncStatsCollector *mysql.ActiveActiveSyncStatsCollector + + barrierMu sync.Mutex + outstandingBarriers map[*dmlBarrier]struct{} +} + +type dmlBarrier struct { + mu sync.Mutex + done chan struct{} + err error + remaining int + acked []bool + doneFuncs []func() +} + +func newDMLBarrier(workerCount int) *dmlBarrier { + barrier := &dmlBarrier{ + done: make(chan struct{}), + remaining: workerCount, + acked: make([]bool, workerCount), + } + if workerCount == 0 { + barrier.Fail(errors.ErrMySQLTxnError.GenWithStackByArgs("mysql DML barrier has no writers")) + } + return barrier +} + +func (b *dmlBarrier) Ack(writerID int) { + b.mu.Lock() + if b.err != nil || b.remaining == 0 || writerID < 0 || writerID >= len(b.acked) || b.acked[writerID] { + b.mu.Unlock() + return + } + b.acked[writerID] = true + b.remaining-- + if b.remaining > 0 { + b.mu.Unlock() + return + } + doneFuncs := b.doneFuncs + b.doneFuncs = nil + close(b.done) + b.mu.Unlock() + runBarrierDoneFuncs(doneFuncs) +} + +func (b *dmlBarrier) Fail(err error) { + if err == nil { + err = errors.ErrMySQLTxnError.GenWithStackByArgs("mysql DML barrier failed") + } + b.mu.Lock() + if b.err != nil || b.remaining == 0 { + b.mu.Unlock() + return + } + b.err = err + b.remaining = 0 + doneFuncs := b.doneFuncs + b.doneFuncs = nil + close(b.done) + b.mu.Unlock() + runBarrierDoneFuncs(doneFuncs) +} + +func (b *dmlBarrier) Wait() error { + <-b.done + b.mu.Lock() + defer b.mu.Unlock() + return b.err +} + +func (b *dmlBarrier) OnDone(f func()) { + b.mu.Lock() + if b.remaining == 0 { + b.mu.Unlock() + f() + return + } + b.doneFuncs = append(b.doneFuncs, f) + b.mu.Unlock() +} + +func runBarrierDoneFuncs(funcs []func()) { + for _, f := range funcs { + f() + } } // Verify is used to verify the sink uri and config is valid @@ -156,6 +242,7 @@ func NewMySQLSink( bdrMode: bdrMode, enableActiveActive: enableActiveActive, activeActiveSyncStatsCollector: activeActiveSyncStatsCollector, + outstandingBarriers: make(map[*dmlBarrier]struct{}), } for i := 0; i < len(result.dmlWriter); i++ { result.dmlWriter[i] = mysql.NewWriter(ctx, i, db, cfg, changefeedID, stat, activeActiveSyncStatsCollector) @@ -205,27 +292,35 @@ func (s *Sink) runDMLWriter(ctx context.Context, idx int) error { writer := s.dmlWriter[idx] totalStart := time.Now() - buffer := make([]*commonEvent.DMLEvent, 0, s.maxTxnRows) + itemBuffer := make([]causality.WriterItem, 0, s.maxTxnRows) + dmlBuffer := make([]*commonEvent.DMLEvent, 0, s.maxTxnRows) for { select { case <-ctx.Done(): - return errors.Trace(ctx.Err()) + err := errors.Trace(ctx.Err()) + s.failOutstandingBarriers(err) + return err default: - txnEvents, ok := inputCh.GetMultipleNoGroup(buffer) + items, ok := inputCh.GetMultipleNoGroup(itemBuffer) if !ok { - return errors.Trace(ctx.Err()) + err := errors.Trace(ctx.Err()) + s.failOutstandingBarriers(err) + return err } - if len(txnEvents) == 0 { - buffer = buffer[:0] + if len(items) == 0 { + itemBuffer = itemBuffer[:0] continue } start := time.Now() singleFlushStart := time.Now() - flushEvent := func(beginIndex, endIndex int, rowCount int32) error { + flushDMLs := func(events []*commonEvent.DMLEvent, rowCount int32) error { + if len(events) == 0 { + return nil + } workerHandledRows.Add(float64(rowCount)) - err := writer.Flush(txnEvents[beginIndex:endIndex]) + err := writer.Flush(events) if err != nil { return errors.Trace(err) } @@ -234,23 +329,40 @@ func (s *Sink) runDMLWriter(ctx context.Context, idx int) error { return nil } - beginIndex, rowCount := 0, txnEvents[0].Len() - workerEventRowCount.Observe(float64(rowCount)) - for i := 1; i < len(txnEvents); i++ { - workerEventRowCount.Observe(float64(txnEvents[i].Len())) - if rowCount+txnEvents[i].Len() > int32(s.maxTxnRows) { - if err := flushEvent(beginIndex, i, rowCount); err != nil { - return errors.Trace(err) + rowCount := int32(0) + for _, item := range items { + if item.DML != nil { + workerEventRowCount.Observe(float64(item.DML.Len())) + if rowCount+item.DML.Len() > int32(s.maxTxnRows) { + if err := flushDMLs(dmlBuffer, rowCount); err != nil { + s.failOutstandingBarriers(err) + return err + } + dmlBuffer = dmlBuffer[:0] + rowCount = 0 } - beginIndex, rowCount = i, txnEvents[i].Len() - } else { - rowCount += txnEvents[i].Len() + dmlBuffer = append(dmlBuffer, item.DML) + rowCount += item.DML.Len() + continue + } + + if item.Barrier != nil { + if err := flushDMLs(dmlBuffer, rowCount); err != nil { + item.Barrier.Fail(err) + s.failOutstandingBarriers(err) + return err + } + dmlBuffer = dmlBuffer[:0] + rowCount = 0 + item.Barrier.Ack(idx) } } // flush last batch - if err := flushEvent(beginIndex, len(txnEvents), rowCount); err != nil { - return errors.Trace(err) + if err := flushDMLs(dmlBuffer, rowCount); err != nil { + s.failOutstandingBarriers(err) + return err } + dmlBuffer = dmlBuffer[:0] workerBatchFlushDuration.Observe(time.Since(start).Seconds()) // we record total time to calculate the worker busy ratio. @@ -258,7 +370,7 @@ func (s *Sink) runDMLWriter(ctx context.Context, idx int) error { // flush time and total time workerTotalDuration.Observe(time.Since(totalStart).Seconds()) totalStart = time.Now() - buffer = buffer[:0] + itemBuffer = itemBuffer[:0] } } } @@ -284,7 +396,40 @@ func (s *Sink) AddDMLEvent(event *commonEvent.DMLEvent) { } func (s *Sink) FlushDMLBeforeBlock(_ commonEvent.BlockEvent) error { - return nil + barrier := newDMLBarrier(len(s.dmlWriter)) + s.registerBarrier(barrier) + defer s.unregisterBarrier(barrier) + + if err := s.conflictDetector.BroadcastBarrier(barrier); err != nil { + barrier.Fail(err) + return err + } + return barrier.Wait() +} + +func (s *Sink) registerBarrier(barrier *dmlBarrier) { + s.barrierMu.Lock() + s.outstandingBarriers[barrier] = struct{}{} + s.barrierMu.Unlock() +} + +func (s *Sink) unregisterBarrier(barrier *dmlBarrier) { + s.barrierMu.Lock() + delete(s.outstandingBarriers, barrier) + s.barrierMu.Unlock() +} + +func (s *Sink) failOutstandingBarriers(err error) { + s.barrierMu.Lock() + barriers := make([]*dmlBarrier, 0, len(s.outstandingBarriers)) + for barrier := range s.outstandingBarriers { + barriers = append(barriers, barrier) + } + s.barrierMu.Unlock() + + for _, barrier := range barriers { + barrier.Fail(err) + } } func (s *Sink) WriteBlockEvent(event commonEvent.BlockEvent) error { @@ -403,6 +548,7 @@ func (s *Sink) GetTableRecoveryInfo( } func (s *Sink) Close() { + s.failOutstandingBarriers(context.Canceled) s.conflictDetector.CloseNotifiedNodes() s.ddlWriter.Close() for _, w := range s.dmlWriter { diff --git a/downstreamadapter/sink/mysql/sink_test.go b/downstreamadapter/sink/mysql/sink_test.go index b1697a52c2..2c8d0b8f86 100644 --- a/downstreamadapter/sink/mysql/sink_test.go +++ b/downstreamadapter/sink/mysql/sink_test.go @@ -223,6 +223,166 @@ func TestMysqlSinkMeetsDMLError(t *testing.T) { require.False(t, sink.IsNormal()) } +func TestMysqlSinkFlushDMLBeforeBlockReturnsOnDMLError(t *testing.T) { + sink, mock := MysqlSinkForTest() + + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + createTableSQL := "create table t (id int primary key, name varchar(32));" + job := helper.DDL2Job(createTableSQL) + require.NotNil(t, job) + + dmlEvent := helper.DML2Event("test", "t", "insert into t values (1, 'test')") + dmlEvent.CommitTs = 100 + + mock.ExpectExec("BEGIN;INSERT INTO `test`.`t` (`id`,`name`) VALUES (?,?);COMMIT;"). + WithArgs(1, "test"). + WillDelayFor(100 * time.Millisecond). + WillReturnError(errors.New("connect: connection refused")) + + sink.AddDMLEvent(dmlEvent) + + syncPointEvent := commonEvent.NewSyncPointEvent(common.NewDispatcherID(), 200, 0, 0) + flushDone := make(chan error, 1) + go func() { + flushDone <- sink.FlushDMLBeforeBlock(syncPointEvent) + }() + + require.Eventually(t, func() bool { + select { + case err := <-flushDone: + require.Error(t, err) + return true + default: + return false + } + }, time.Second, 10*time.Millisecond) + require.False(t, sink.IsNormal()) + require.NoError(t, mock.ExpectationsWereMet()) +} + +func TestMysqlSinkFlushDMLBeforeBlockWaitsForDMLPostFlush(t *testing.T) { + sink, mock := MysqlSinkForTest() + + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + createTableSQL := "create table t (id int primary key, name varchar(32));" + job := helper.DDL2Job(createTableSQL) + require.NotNil(t, job) + + postFlushEntered := make(chan struct{}) + releasePostFlush := make(chan struct{}) + var dmlFlushed atomic.Bool + dmlEvent := helper.DML2Event("test", "t", "insert into t values (1, 'test')") + dmlEvent.CommitTs = 100 + dmlEvent.AddPostFlushFunc(func() { + close(postFlushEntered) + <-releasePostFlush + dmlFlushed.Store(true) + }) + + mock.ExpectExec("BEGIN;INSERT INTO `test`.`t` (`id`,`name`) VALUES (?,?);COMMIT;"). + WithArgs(1, "test"). + WillReturnResult(sqlmock.NewResult(1, 1)) + + sink.AddDMLEvent(dmlEvent) + require.Eventually(t, func() bool { + return mock.ExpectationsWereMet() == nil + }, time.Second, 10*time.Millisecond) + require.Eventually(t, func() bool { + select { + case <-postFlushEntered: + return true + default: + return false + } + }, time.Second, 10*time.Millisecond) + require.False(t, dmlFlushed.Load()) + + syncPointEvent := commonEvent.NewSyncPointEvent(common.NewDispatcherID(), 200, 0, 0) + flushDone := make(chan error, 1) + go func() { + flushDone <- sink.FlushDMLBeforeBlock(syncPointEvent) + }() + + require.Never(t, func() bool { + select { + case err := <-flushDone: + require.NoError(t, err) + return true + default: + return false + } + }, 200*time.Millisecond, 10*time.Millisecond) + + close(releasePostFlush) + + require.Eventually(t, func() bool { + select { + case err := <-flushDone: + require.NoError(t, err) + return true + default: + return false + } + }, time.Second, 10*time.Millisecond) + require.True(t, dmlFlushed.Load()) +} + +func TestDMLBarrierAckFailAndDoneAreIdempotent(t *testing.T) { + barrier := newDMLBarrier(2) + doneCount := atomic.Int64{} + barrier.OnDone(func() { doneCount.Add(1) }) + + barrier.Ack(0) + barrier.Ack(0) + require.Never(t, func() bool { + select { + case <-barrier.done: + return true + default: + return false + } + }, 50*time.Millisecond, 10*time.Millisecond) + + barrier.Ack(1) + require.NoError(t, barrier.Wait()) + barrier.Fail(errors.New("late failure")) + barrier.Ack(1) + require.Equal(t, int64(1), doneCount.Load()) + + lateDone := atomic.Bool{} + barrier.OnDone(func() { lateDone.Store(true) }) + require.True(t, lateDone.Load()) +} + +func TestMysqlSinkCloseUnblocksWaitingBarrier(t *testing.T) { + _, sink, mock := getMysqlSink() + barrier := newDMLBarrier(1) + sink.registerBarrier(barrier) + mock.ExpectClose() + + flushDone := make(chan error, 1) + go func() { flushDone <- barrier.Wait() }() + + sink.Close() + + require.Eventually(t, func() bool { + select { + case err := <-flushDone: + require.Error(t, err) + return true + default: + return false + } + }, time.Second, 10*time.Millisecond) + require.NoError(t, mock.ExpectationsWereMet()) +} + // test the situation meets error when executing DDL // whether the sink state is correct func TestMysqlSinkMeetsDDLError(t *testing.T) { diff --git a/pkg/sink/mysql/mysql_writer.go b/pkg/sink/mysql/mysql_writer.go index 4c38fadaab..dc24c70c8b 100644 --- a/pkg/sink/mysql/mysql_writer.go +++ b/pkg/sink/mysql/mysql_writer.go @@ -21,6 +21,7 @@ import ( "time" lru "github.com/hashicorp/golang-lru" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" @@ -249,6 +250,7 @@ func (w *Writer) Flush(events []*commonEvent.DMLEvent) error { } for _, event := range events { + failpoint.Inject("MySQLSinkDelayDMLPostFlush", nil) event.PostFlush() } diff --git a/utils/chann/unlimited_chann.go b/utils/chann/unlimited_chann.go index 9ef1a631d7..9aeb5a1de8 100644 --- a/utils/chann/unlimited_chann.go +++ b/utils/chann/unlimited_chann.go @@ -80,6 +80,24 @@ func (c *UnlimitedChannel[T, G]) Push(values ...T) { c.cond.Signal() } +// PushIfNotClosed pushes values to the channel and reports whether they were +// accepted. It is intended for control paths that must distinguish a closed +// channel from a successful enqueue. +func (c *UnlimitedChannel[T, G]) PushIfNotClosed(values ...T) bool { + c.mu.Lock() + defer c.mu.Unlock() + + if c.closed { + return false + } + for _, v := range values { + c.queue.PushBack(v) + } + + c.cond.Signal() + return true +} + // Get retrieves an element from the channel. // Return the element and a boolean indicating whether the channel is available. // Return false if the channel is closed.