Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 37 additions & 9 deletions downstreamadapter/dispatcher/basic_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,14 @@ func (d *BasicDispatcher) AddDMLEventsToSink(events []*commonEvent.DMLEvent, wak
// be rewritten into deletes when enable-active-active is disabled).
filteredEvents := make([]*commonEvent.DMLEvent, 0, len(events))
for _, event := range events {
if d.blockEventStatus.isDMLCompletedOrObsolete(event.GetCommitTs()) {
log.Info("skip obsolete dml event",
zap.Stringer("dispatcher", d.id),
zap.Uint64("commitTs", event.GetCommitTs()),
zap.Uint64("seq", event.GetSeq()))
continue
}

// FilterDMLEvent returns the original event for normal tables and only
// allocates a new event when the table needs active-active or soft-delete
// processing. Skip is true when every row in the event is dropped, or when
Expand Down Expand Up @@ -915,6 +923,10 @@ func (d *BasicDispatcher) reportBlockedEventDone(
actionCommitTs uint64,
actionIsSyncPoint bool,
) {
d.blockEventStatus.recordCompleted(BlockEventIdentifier{
CommitTs: actionCommitTs,
IsSyncPoint: actionIsSyncPoint,
})
d.offerDoneBlockStatus(actionCommitTs, actionIsSyncPoint)
GetDispatcherStatusDynamicStream().Wake(d.id)
}
Expand Down Expand Up @@ -1072,7 +1084,9 @@ func (d *BasicDispatcher) DealWithBlockEvent(event commonEvent.BlockEvent) {
shouldBlock := d.shouldBlock(event)
shouldHoldBlocked := d.shouldHoldBlockEvent(event)
if shouldBlock && shouldHoldBlocked {
d.holdBlockEvent(event)
if !d.completeObsoleteBlockEvent(event) {
d.holdBlockEvent(event)
}
return
}
// Non-blocking DDLs are not coordinated through barrier WRITE/PASS, so
Expand All @@ -1098,10 +1112,7 @@ func (d *BasicDispatcher) DealWithBlockEvent(event commonEvent.BlockEvent) {
needsMaintainerACK := !shouldBlock && d.IsTableTriggerDispatcher() &&
needsScheduleStatus
needsAddTableCheckpointBlocker := !shouldBlock && d.IsTableTriggerDispatcher() && hasNeedAddedTables
identifier := BlockEventIdentifier{
CommitTs: event.GetCommitTs(),
IsSyncPoint: false,
}
identifier := blockEventIdentifier(event)
if needsMaintainerACK {
// Register maintainer-visible DDLs before submitting downstream IO so
// following DB/All DDLs cannot pass this pending schedule update.
Expand Down Expand Up @@ -1135,6 +1146,9 @@ func (d *BasicDispatcher) DealWithBlockEvent(event commonEvent.BlockEvent) {
}
if shouldBlock {
failpoint.Inject("BlockAfterFlush", nil)
if d.completeObsoleteBlockEvent(event) {
return
}
d.reportBlockedEventToMaintainer(event)
return
}
Expand Down Expand Up @@ -1275,10 +1289,7 @@ func (d *BasicDispatcher) reportBlockedEventToMaintainer(event commonEvent.Block
d.pendingACKCount.Add(1)
}
d.blockEventStatus.setBlockEvent(event, heartbeatpb.BlockStage_WAITING)
identifier := BlockEventIdentifier{
CommitTs: event.GetCommitTs(),
IsSyncPoint: event.GetType() == commonEvent.TypeSyncPointEvent,
}
identifier := blockEventIdentifier(event)
// WAITING retries reuse this protobuf object, so clone mutable metadata once
// here and keep resend on the same immutable payload.
status := &heartbeatpb.TableSpanBlockStatus{
Expand All @@ -1300,6 +1311,20 @@ func (d *BasicDispatcher) reportBlockedEventToMaintainer(event commonEvent.Block
d.offerBlockStatus(status)
}

func (d *BasicDispatcher) completeObsoleteBlockEvent(event commonEvent.BlockEvent) bool {
if !d.blockEventStatus.isCompletedOrObsolete(event) {
return false
}
identifier := blockEventIdentifier(event)
log.Info("skip obsolete block event",
zap.Stringer("dispatcher", d.id),
zap.Uint64("commitTs", identifier.CommitTs),
zap.Bool("isSyncPoint", identifier.IsSyncPoint))
d.PassBlockEventToSink(event)
d.reportBlockedEventDone(identifier.CommitTs, identifier.IsSyncPoint)
return true
}

func (d *BasicDispatcher) flushBlockedEventAndReportToMaintainer(event commonEvent.BlockEvent) {
d.sharedInfo.GetBlockEventExecutor().Submit(d, func() {
failpoint.Inject("BlockOrWaitBeforeFlush", nil)
Expand All @@ -1308,6 +1333,9 @@ func (d *BasicDispatcher) flushBlockedEventAndReportToMaintainer(event commonEve
return
}
failpoint.Inject("BlockAfterFlush", nil)
if d.completeObsoleteBlockEvent(event) {
return
}
d.reportBlockedEventToMaintainer(event)
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
package dispatcher

import (
"context"
"testing"
"time"

"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/pkg/common"
Expand Down Expand Up @@ -126,6 +128,89 @@ func TestDDLEventsAlwaysValidateActiveActive(t *testing.T) {
}
}

func TestHandleEventsSkipsDMLBeforeCompletedBlockEvent(t *testing.T) {
sharedInfo := newTestSharedInfo(false, false, nil)
dispatcherSink := newDispatcherTestSink(t, common.MysqlSinkType)
tableSpan := &heartbeatpb.TableSpan{TableID: 1, StartKey: []byte{0}, EndKey: []byte{1}}
dispatcher := NewBasicDispatcher(
common.NewDispatcherID(),
tableSpan,
100,
1,
NewSchemaIDToDispatchers(),
false,
false,
4096,
0,
200,
common.DefaultMode,
dispatcherSink.Sink(),
sharedInfo,
)

helper := commonEvent.NewEventTestHelper(t)
defer helper.Close()
helper.Tk().MustExec("use test")
helper.DDL2Event("create table t (id int primary key, v int)")
oldDML := helper.DML2Event("test", "t", "insert into t values (1, 1)")
oldDML.DispatcherID = dispatcher.id
oldDML.StartTs = 110
oldDML.CommitTs = 120
newDML := helper.DML2Event("test", "t", "insert into t values (2, 2)")
newDML.DispatcherID = dispatcher.id
newDML.StartTs = 130
newDML.CommitTs = 140

dispatcher.blockEventStatus.recordCompleted(BlockEventIdentifier{CommitTs: 120})
block := dispatcher.handleEvents([]DispatcherEvent{{Event: oldDML}, {Event: newDML}}, func() {})
require.True(t, block)

dmls := dispatcherSink.GetDMLs()
require.Len(t, dmls, 1)
require.Equal(t, uint64(140), dmls[0].CommitTs)
}

func TestHeldObsoleteBlockEventCompletesWithoutWaitingReport(t *testing.T) {
sharedInfo := newTestSharedInfo(false, false, nil)
dispatcherSink := newDispatcherTestSink(t, common.MysqlSinkType)
dispatcherID := common.NewDispatcherID()
dispatcher := NewBasicDispatcher(
dispatcherID,
common.KeyspaceDDLSpan(common.DefaultKeyspaceID),
100,
common.DDLSpanSchemaID,
NewSchemaIDToDispatchers(),
false,
false,
4096,
0,
200,
common.DefaultMode,
dispatcherSink.Sink(),
sharedInfo,
)

event := commonEvent.NewSyncPointEvent(dispatcherID, 120, 1, 0)
dispatcher.pendingACKCount.Store(1)
dispatcher.DealWithBlockEvent(event)
require.NotNil(t, dispatcher.holdingBlockEvent)
require.Equal(t, 0, dispatcher.resendTaskMap.Len())

dispatcher.blockEventStatus.recordCompleted(BlockEventIdentifier{CommitTs: 120, IsSyncPoint: true})
dispatcher.pendingACKCount.Store(0)
dispatcher.tryDealWithHeldBlockEvent()

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
status := dispatcher.TakeBlockStatus(ctx)
require.NotNil(t, status)
require.Equal(t, heartbeatpb.BlockStage_DONE, status.State.Stage)
require.Equal(t, uint64(120), status.State.BlockTs)
require.True(t, status.State.IsSyncPoint)
require.Equal(t, 0, dispatcher.resendTaskMap.Len())
require.Nil(t, dispatcher.blockEventStatus.getEvent())
}

func newTestBasicDispatcher(t *testing.T, sinkType common.SinkType, enableActiveActive bool) *BasicDispatcher {
t.Helper()
sharedInfo := newTestSharedInfo(enableActiveActive, false, nil)
Expand Down
53 changes: 52 additions & 1 deletion downstreamadapter/dispatcher/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ type BlockEventStatus struct {
blockPendingEvent commonEvent.BlockEvent
blockStage heartbeatpb.BlockStage
blockCommitTs uint64
completed BlockEventIdentifier
}

func (b *BlockEventStatus) clear() {
Expand All @@ -181,6 +182,32 @@ func (b *BlockEventStatus) setBlockEvent(event commonEvent.BlockEvent, blockStag
b.blockCommitTs = event.GetCommitTs()
}

func (b *BlockEventStatus) isCompletedOrObsolete(event commonEvent.BlockEvent) bool {
b.mutex.Lock()
defer b.mutex.Unlock()

if b.completed.CommitTs == 0 {
return false
}
return compareBlockEventIdentifier(blockEventIdentifier(event), b.completed) <= 0
}

func (b *BlockEventStatus) isDMLCompletedOrObsolete(commitTs uint64) bool {
b.mutex.Lock()
defer b.mutex.Unlock()

return b.completed.CommitTs != 0 && commitTs <= b.completed.CommitTs
}

func (b *BlockEventStatus) recordCompleted(identifier BlockEventIdentifier) {
b.mutex.Lock()
defer b.mutex.Unlock()

if b.completed.CommitTs == 0 || compareBlockEventIdentifier(identifier, b.completed) > 0 {
b.completed = identifier
}
}

func (b *BlockEventStatus) updateBlockStage(blockStage heartbeatpb.BlockStage) {
b.mutex.Lock()
defer b.mutex.Unlock()
Expand Down Expand Up @@ -214,7 +241,8 @@ func (b *BlockEventStatus) actionMatchs(action *heartbeatpb.DispatcherAction) bo
return false
}

return b.blockCommitTs == action.CommitTs
pendingIsSyncPoint := b.blockPendingEvent.GetType() == commonEvent.TypeSyncPointEvent
return b.blockCommitTs == action.CommitTs && pendingIsSyncPoint == action.IsSyncPoint
}

// ignoredStatusMatches checks whether the ignored status is for the current pending ddl/sync point event.
Expand Down Expand Up @@ -244,6 +272,29 @@ func (b *BlockEventStatus) getEventCommitTs() (uint64, bool) {
return b.blockCommitTs, true
}

func blockEventIdentifier(event commonEvent.BlockEvent) BlockEventIdentifier {
return BlockEventIdentifier{
CommitTs: event.GetCommitTs(),
IsSyncPoint: event.GetType() == commonEvent.TypeSyncPointEvent,
}
}

func compareBlockEventIdentifier(a, b BlockEventIdentifier) int {
if a.CommitTs < b.CommitTs {
return -1
}
if a.CommitTs > b.CommitTs {
return 1
}
if a.IsSyncPoint == b.IsSyncPoint {
return 0
}
if !a.IsSyncPoint && b.IsSyncPoint {
return -1
}
return 1
}

type SchemaIDToDispatchers struct {
mutex sync.RWMutex
m map[int64]map[common.DispatcherID]interface{}
Expand Down
48 changes: 48 additions & 0 deletions downstreamadapter/dispatcher/helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package dispatcher

import (
"testing"

"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/stretchr/testify/require"
)

func TestBlockEventStatusCompletedWatermark(t *testing.T) {
var status BlockEventStatus
ddl10 := &commonEvent.DDLEvent{FinishedTs: 10}
syncpoint10 := commonEvent.NewSyncPointEvent(common.NewDispatcherID(), 10, 1, 0)
ddl11 := &commonEvent.DDLEvent{FinishedTs: 11}

status.recordCompleted(BlockEventIdentifier{CommitTs: 10, IsSyncPoint: false})
require.True(t, status.isCompletedOrObsolete(ddl10))
require.False(t, status.isCompletedOrObsolete(syncpoint10))
require.False(t, status.isCompletedOrObsolete(ddl11))

status.recordCompleted(BlockEventIdentifier{CommitTs: 10, IsSyncPoint: true})
require.True(t, status.isCompletedOrObsolete(ddl10))
require.True(t, status.isCompletedOrObsolete(syncpoint10))
require.False(t, status.isCompletedOrObsolete(ddl11))
}

func TestBlockEventStatusActionMatchesSyncPointFlag(t *testing.T) {
var status BlockEventStatus
status.setBlockEvent(&commonEvent.DDLEvent{FinishedTs: 10}, heartbeatpb.BlockStage_WAITING)

require.True(t, status.actionMatchs(&heartbeatpb.DispatcherAction{CommitTs: 10}))
require.False(t, status.actionMatchs(&heartbeatpb.DispatcherAction{CommitTs: 10, IsSyncPoint: true}))
}
6 changes: 6 additions & 0 deletions downstreamadapter/eventcollector/dispatcher_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ func (d *dispatcherStat) advanceEpochForReset(resetTs uint64) uint64 {
currentState := d.loadCurrentEpochState()
nextState := newDispatcherEpochState(currentState.epoch+1, 0, resetTs)
if d.currentEpoch.CompareAndSwap(currentState, nextState) {
// The new epoch replays events from resetTs. Commit-ts based
// deduplication from the old epoch must not filter replayed DDL or
// SyncPoint events.
d.lastEventCommitTs.Store(resetTs)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a concrete example to explain why these three lines is necessary?

d.gotDDLOnTs.Store(false)
d.gotSyncpointOnTS.Store(false)
return nextState.epoch
}
}
Expand Down
Loading
Loading