diff --git a/downstreamadapter/dispatcher/basic_dispatcher.go b/downstreamadapter/dispatcher/basic_dispatcher.go index 53d4485d10..a33ede5e2e 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher.go +++ b/downstreamadapter/dispatcher/basic_dispatcher.go @@ -295,6 +295,14 @@ func (d *BasicDispatcher) AddDMLEventsToSink(events []*commonEvent.DMLEvent, wak // be rewritten into deletes when enable-active-active is disabled). filteredEvents := make([]*commonEvent.DMLEvent, 0, len(events)) for _, event := range events { + if d.blockEventStatus.isDMLCompletedOrObsolete(event.GetCommitTs()) { + log.Info("skip obsolete dml event", + zap.Stringer("dispatcher", d.id), + zap.Uint64("commitTs", event.GetCommitTs()), + zap.Uint64("seq", event.GetSeq())) + continue + } + // FilterDMLEvent returns the original event for normal tables and only // allocates a new event when the table needs active-active or soft-delete // processing. Skip is true when every row in the event is dropped, or when @@ -915,6 +923,10 @@ func (d *BasicDispatcher) reportBlockedEventDone( actionCommitTs uint64, actionIsSyncPoint bool, ) { + d.blockEventStatus.recordCompleted(BlockEventIdentifier{ + CommitTs: actionCommitTs, + IsSyncPoint: actionIsSyncPoint, + }) d.offerDoneBlockStatus(actionCommitTs, actionIsSyncPoint) GetDispatcherStatusDynamicStream().Wake(d.id) } @@ -1072,7 +1084,9 @@ func (d *BasicDispatcher) DealWithBlockEvent(event commonEvent.BlockEvent) { shouldBlock := d.shouldBlock(event) shouldHoldBlocked := d.shouldHoldBlockEvent(event) if shouldBlock && shouldHoldBlocked { - d.holdBlockEvent(event) + if !d.completeObsoleteBlockEvent(event) { + d.holdBlockEvent(event) + } return } // Non-blocking DDLs are not coordinated through barrier WRITE/PASS, so @@ -1098,10 +1112,7 @@ func (d *BasicDispatcher) DealWithBlockEvent(event commonEvent.BlockEvent) { needsMaintainerACK := !shouldBlock && d.IsTableTriggerDispatcher() && needsScheduleStatus needsAddTableCheckpointBlocker := !shouldBlock && d.IsTableTriggerDispatcher() && hasNeedAddedTables - identifier := BlockEventIdentifier{ - CommitTs: event.GetCommitTs(), - IsSyncPoint: false, - } + identifier := blockEventIdentifier(event) if needsMaintainerACK { // Register maintainer-visible DDLs before submitting downstream IO so // following DB/All DDLs cannot pass this pending schedule update. @@ -1135,6 +1146,9 @@ func (d *BasicDispatcher) DealWithBlockEvent(event commonEvent.BlockEvent) { } if shouldBlock { failpoint.Inject("BlockAfterFlush", nil) + if d.completeObsoleteBlockEvent(event) { + return + } d.reportBlockedEventToMaintainer(event) return } @@ -1275,10 +1289,7 @@ func (d *BasicDispatcher) reportBlockedEventToMaintainer(event commonEvent.Block d.pendingACKCount.Add(1) } d.blockEventStatus.setBlockEvent(event, heartbeatpb.BlockStage_WAITING) - identifier := BlockEventIdentifier{ - CommitTs: event.GetCommitTs(), - IsSyncPoint: event.GetType() == commonEvent.TypeSyncPointEvent, - } + identifier := blockEventIdentifier(event) // WAITING retries reuse this protobuf object, so clone mutable metadata once // here and keep resend on the same immutable payload. status := &heartbeatpb.TableSpanBlockStatus{ @@ -1300,6 +1311,20 @@ func (d *BasicDispatcher) reportBlockedEventToMaintainer(event commonEvent.Block d.offerBlockStatus(status) } +func (d *BasicDispatcher) completeObsoleteBlockEvent(event commonEvent.BlockEvent) bool { + if !d.blockEventStatus.isCompletedOrObsolete(event) { + return false + } + identifier := blockEventIdentifier(event) + log.Info("skip obsolete block event", + zap.Stringer("dispatcher", d.id), + zap.Uint64("commitTs", identifier.CommitTs), + zap.Bool("isSyncPoint", identifier.IsSyncPoint)) + d.PassBlockEventToSink(event) + d.reportBlockedEventDone(identifier.CommitTs, identifier.IsSyncPoint) + return true +} + func (d *BasicDispatcher) flushBlockedEventAndReportToMaintainer(event commonEvent.BlockEvent) { d.sharedInfo.GetBlockEventExecutor().Submit(d, func() { failpoint.Inject("BlockOrWaitBeforeFlush", nil) @@ -1308,6 +1333,9 @@ func (d *BasicDispatcher) flushBlockedEventAndReportToMaintainer(event commonEve return } failpoint.Inject("BlockAfterFlush", nil) + if d.completeObsoleteBlockEvent(event) { + return + } d.reportBlockedEventToMaintainer(event) }) } diff --git a/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go b/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go index 38d112571c..ca6f581b63 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go +++ b/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go @@ -13,7 +13,9 @@ package dispatcher import ( + "context" "testing" + "time" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/pkg/common" @@ -126,6 +128,89 @@ func TestDDLEventsAlwaysValidateActiveActive(t *testing.T) { } } +func TestHandleEventsSkipsDMLBeforeCompletedBlockEvent(t *testing.T) { + sharedInfo := newTestSharedInfo(false, false, nil) + dispatcherSink := newDispatcherTestSink(t, common.MysqlSinkType) + tableSpan := &heartbeatpb.TableSpan{TableID: 1, StartKey: []byte{0}, EndKey: []byte{1}} + dispatcher := NewBasicDispatcher( + common.NewDispatcherID(), + tableSpan, + 100, + 1, + NewSchemaIDToDispatchers(), + false, + false, + 4096, + 0, + 200, + common.DefaultMode, + dispatcherSink.Sink(), + sharedInfo, + ) + + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + helper.Tk().MustExec("use test") + helper.DDL2Event("create table t (id int primary key, v int)") + oldDML := helper.DML2Event("test", "t", "insert into t values (1, 1)") + oldDML.DispatcherID = dispatcher.id + oldDML.StartTs = 110 + oldDML.CommitTs = 120 + newDML := helper.DML2Event("test", "t", "insert into t values (2, 2)") + newDML.DispatcherID = dispatcher.id + newDML.StartTs = 130 + newDML.CommitTs = 140 + + dispatcher.blockEventStatus.recordCompleted(BlockEventIdentifier{CommitTs: 120}) + block := dispatcher.handleEvents([]DispatcherEvent{{Event: oldDML}, {Event: newDML}}, func() {}) + require.True(t, block) + + dmls := dispatcherSink.GetDMLs() + require.Len(t, dmls, 1) + require.Equal(t, uint64(140), dmls[0].CommitTs) +} + +func TestHeldObsoleteBlockEventCompletesWithoutWaitingReport(t *testing.T) { + sharedInfo := newTestSharedInfo(false, false, nil) + dispatcherSink := newDispatcherTestSink(t, common.MysqlSinkType) + dispatcherID := common.NewDispatcherID() + dispatcher := NewBasicDispatcher( + dispatcherID, + common.KeyspaceDDLSpan(common.DefaultKeyspaceID), + 100, + common.DDLSpanSchemaID, + NewSchemaIDToDispatchers(), + false, + false, + 4096, + 0, + 200, + common.DefaultMode, + dispatcherSink.Sink(), + sharedInfo, + ) + + event := commonEvent.NewSyncPointEvent(dispatcherID, 120, 1, 0) + dispatcher.pendingACKCount.Store(1) + dispatcher.DealWithBlockEvent(event) + require.NotNil(t, dispatcher.holdingBlockEvent) + require.Equal(t, 0, dispatcher.resendTaskMap.Len()) + + dispatcher.blockEventStatus.recordCompleted(BlockEventIdentifier{CommitTs: 120, IsSyncPoint: true}) + dispatcher.pendingACKCount.Store(0) + dispatcher.tryDealWithHeldBlockEvent() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + status := dispatcher.TakeBlockStatus(ctx) + require.NotNil(t, status) + require.Equal(t, heartbeatpb.BlockStage_DONE, status.State.Stage) + require.Equal(t, uint64(120), status.State.BlockTs) + require.True(t, status.State.IsSyncPoint) + require.Equal(t, 0, dispatcher.resendTaskMap.Len()) + require.Nil(t, dispatcher.blockEventStatus.getEvent()) +} + func newTestBasicDispatcher(t *testing.T, sinkType common.SinkType, enableActiveActive bool) *BasicDispatcher { t.Helper() sharedInfo := newTestSharedInfo(enableActiveActive, false, nil) diff --git a/downstreamadapter/dispatcher/helper.go b/downstreamadapter/dispatcher/helper.go index a9530c9085..b87537b43e 100644 --- a/downstreamadapter/dispatcher/helper.go +++ b/downstreamadapter/dispatcher/helper.go @@ -161,6 +161,7 @@ type BlockEventStatus struct { blockPendingEvent commonEvent.BlockEvent blockStage heartbeatpb.BlockStage blockCommitTs uint64 + completed BlockEventIdentifier } func (b *BlockEventStatus) clear() { @@ -181,6 +182,32 @@ func (b *BlockEventStatus) setBlockEvent(event commonEvent.BlockEvent, blockStag b.blockCommitTs = event.GetCommitTs() } +func (b *BlockEventStatus) isCompletedOrObsolete(event commonEvent.BlockEvent) bool { + b.mutex.Lock() + defer b.mutex.Unlock() + + if b.completed.CommitTs == 0 { + return false + } + return compareBlockEventIdentifier(blockEventIdentifier(event), b.completed) <= 0 +} + +func (b *BlockEventStatus) isDMLCompletedOrObsolete(commitTs uint64) bool { + b.mutex.Lock() + defer b.mutex.Unlock() + + return b.completed.CommitTs != 0 && commitTs <= b.completed.CommitTs +} + +func (b *BlockEventStatus) recordCompleted(identifier BlockEventIdentifier) { + b.mutex.Lock() + defer b.mutex.Unlock() + + if b.completed.CommitTs == 0 || compareBlockEventIdentifier(identifier, b.completed) > 0 { + b.completed = identifier + } +} + func (b *BlockEventStatus) updateBlockStage(blockStage heartbeatpb.BlockStage) { b.mutex.Lock() defer b.mutex.Unlock() @@ -214,7 +241,8 @@ func (b *BlockEventStatus) actionMatchs(action *heartbeatpb.DispatcherAction) bo return false } - return b.blockCommitTs == action.CommitTs + pendingIsSyncPoint := b.blockPendingEvent.GetType() == commonEvent.TypeSyncPointEvent + return b.blockCommitTs == action.CommitTs && pendingIsSyncPoint == action.IsSyncPoint } // ignoredStatusMatches checks whether the ignored status is for the current pending ddl/sync point event. @@ -244,6 +272,29 @@ func (b *BlockEventStatus) getEventCommitTs() (uint64, bool) { return b.blockCommitTs, true } +func blockEventIdentifier(event commonEvent.BlockEvent) BlockEventIdentifier { + return BlockEventIdentifier{ + CommitTs: event.GetCommitTs(), + IsSyncPoint: event.GetType() == commonEvent.TypeSyncPointEvent, + } +} + +func compareBlockEventIdentifier(a, b BlockEventIdentifier) int { + if a.CommitTs < b.CommitTs { + return -1 + } + if a.CommitTs > b.CommitTs { + return 1 + } + if a.IsSyncPoint == b.IsSyncPoint { + return 0 + } + if !a.IsSyncPoint && b.IsSyncPoint { + return -1 + } + return 1 +} + type SchemaIDToDispatchers struct { mutex sync.RWMutex m map[int64]map[common.DispatcherID]interface{} diff --git a/downstreamadapter/dispatcher/helper_test.go b/downstreamadapter/dispatcher/helper_test.go new file mode 100644 index 0000000000..e811a41e19 --- /dev/null +++ b/downstreamadapter/dispatcher/helper_test.go @@ -0,0 +1,48 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package dispatcher + +import ( + "testing" + + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/pkg/common" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/stretchr/testify/require" +) + +func TestBlockEventStatusCompletedWatermark(t *testing.T) { + var status BlockEventStatus + ddl10 := &commonEvent.DDLEvent{FinishedTs: 10} + syncpoint10 := commonEvent.NewSyncPointEvent(common.NewDispatcherID(), 10, 1, 0) + ddl11 := &commonEvent.DDLEvent{FinishedTs: 11} + + status.recordCompleted(BlockEventIdentifier{CommitTs: 10, IsSyncPoint: false}) + require.True(t, status.isCompletedOrObsolete(ddl10)) + require.False(t, status.isCompletedOrObsolete(syncpoint10)) + require.False(t, status.isCompletedOrObsolete(ddl11)) + + status.recordCompleted(BlockEventIdentifier{CommitTs: 10, IsSyncPoint: true}) + require.True(t, status.isCompletedOrObsolete(ddl10)) + require.True(t, status.isCompletedOrObsolete(syncpoint10)) + require.False(t, status.isCompletedOrObsolete(ddl11)) +} + +func TestBlockEventStatusActionMatchesSyncPointFlag(t *testing.T) { + var status BlockEventStatus + status.setBlockEvent(&commonEvent.DDLEvent{FinishedTs: 10}, heartbeatpb.BlockStage_WAITING) + + require.True(t, status.actionMatchs(&heartbeatpb.DispatcherAction{CommitTs: 10})) + require.False(t, status.actionMatchs(&heartbeatpb.DispatcherAction{CommitTs: 10, IsSyncPoint: true})) +} diff --git a/downstreamadapter/eventcollector/dispatcher_stat.go b/downstreamadapter/eventcollector/dispatcher_stat.go index 350cb0dae7..aa69d1119f 100644 --- a/downstreamadapter/eventcollector/dispatcher_stat.go +++ b/downstreamadapter/eventcollector/dispatcher_stat.go @@ -137,6 +137,12 @@ func (d *dispatcherStat) advanceEpochForReset(resetTs uint64) uint64 { currentState := d.loadCurrentEpochState() nextState := newDispatcherEpochState(currentState.epoch+1, 0, resetTs) if d.currentEpoch.CompareAndSwap(currentState, nextState) { + // The new epoch replays events from resetTs. Commit-ts based + // deduplication from the old epoch must not filter replayed DDL or + // SyncPoint events. + d.lastEventCommitTs.Store(resetTs) + d.gotDDLOnTs.Store(false) + d.gotSyncpointOnTS.Store(false) return nextState.epoch } } diff --git a/downstreamadapter/eventcollector/dispatcher_stat_test.go b/downstreamadapter/eventcollector/dispatcher_stat_test.go index f2d7b17635..9922784954 100644 --- a/downstreamadapter/eventcollector/dispatcher_stat_test.go +++ b/downstreamadapter/eventcollector/dispatcher_stat_test.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/node" "github.com/pingcap/ticdc/pkg/routing" + "github.com/pingcap/ticdc/utils/dynstream" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" @@ -552,6 +553,53 @@ func TestUpdateCommitTsStateByEvents(t *testing.T) { require.Equal(t, uint64(110), state.maxEventTs.Load()) } +func TestAdvanceEpochForResetClearsCommitTsFilter(t *testing.T) { + t.Parallel() + + dispatcherID := common.NewDispatcherID() + eventServiceID := node.ID("event-service-1") + mockDisp := newMockDispatcher(dispatcherID, 100) + mockDisp.handleEvents = func(events []dispatcher.DispatcherEvent, wakeCallback func()) (block bool) { + return len(events) > 0 + } + + stat := newDispatcherStatForTest(mockDisp, nil) + stat.currentEpoch.Store(newDispatcherEpochState(10, 3, stat.target.GetStartTs())) + stat.lastEventCommitTs.Store(220) + stat.gotDDLOnTs.Store(true) + stat.gotSyncpointOnTS.Store(true) + + epoch := stat.advanceEpochForReset(150) + require.Equal(t, uint64(11), epoch) + require.Equal(t, uint64(150), stat.lastEventCommitTs.Load()) + require.False(t, stat.gotDDLOnTs.Load()) + require.False(t, stat.gotSyncpointOnTS.Load()) + + handshake := commonEvent.NewHandshakeEvent(dispatcherID, 160, epoch, &common.TableInfo{}) + stat.handleHandshakeEvent(dispatcher.DispatcherEvent{ + From: &eventServiceID, + Event: &handshake, + }) + + ddl := &commonEvent.DDLEvent{ + Version: commonEvent.DDLEventVersion1, + FinishedTs: 180, + Seq: 2, + Epoch: epoch, + } + require.True(t, stat.handleSingleDataEvents([]dispatcher.DispatcherEvent{ + { + From: &eventServiceID, + Event: ddl, + }, + })) + require.Len(t, mockDisp.events, 1) + require.Same(t, ddl, mockDisp.events[0].Event) + require.Equal(t, uint64(180), stat.lastEventCommitTs.Load()) + require.True(t, stat.gotDDLOnTs.Load()) + require.False(t, stat.gotSyncpointOnTS.Load()) +} + func TestHandleSignalEvent(t *testing.T) { localServerID := node.ID("local-server") remoteServerID := node.ID("remote-server") @@ -897,6 +945,42 @@ func TestInitialLocalReadyCallbackIsOneShot(t *testing.T) { requireNoDispatcherRequest(t, mockEventCollector) } +func TestReleasePathFeedbackResetsCurrentEventService(t *testing.T) { + localServerID := node.ID("local-server") + dispatcherID := common.NewDispatcherID() + cfID := common.NewChangeFeedIDWithName("release_path_test", common.DefaultKeyspaceName) + mockDisp := newMockDispatcher(dispatcherID, 10) + mockDisp.changefeedID = cfID + mockDisp.checkPointTs = 20 + mockEventCollector := newTestEventCollector(localServerID) + stat := newDispatcherStat(mockDisp, mockEventCollector, nil) + setSessionState(stat.session, localServerID, false, "") + mockEventCollector.dispatcherMap.Store(dispatcherID, stat) + mockEventCollector.changefeedMap.Store(cfID.ID(), newChangefeedStat(cfID)) + + released := false + feedback := dynstream.Feedback[common.GID, common.DispatcherID, *dispatcherStat]{ + Area: cfID.ID(), + Path: dispatcherID, + FeedbackType: dynstream.ReleasePath, + } + mockEventCollector.handleReleasePathFeedback(feedback, func(path common.DispatcherID) { + released = true + require.Equal(t, dispatcherID, path) + }, "DS") + + require.True(t, released) + cfStatValue, ok := mockEventCollector.changefeedMap.Load(cfID.ID()) + require.True(t, ok) + require.Equal(t, uint32(1), cfStatValue.(*changefeedStat).memoryReleaseCount.Load()) + requireDispatcherRequests( + t, + readDispatcherRequests(t, mockEventCollector, 1), + dispatcherRequestRecord{to: localServerID, action: eventpb.ActionType_ACTION_TYPE_RESET}, + ) + requireNoDispatcherRequest(t, mockEventCollector) +} + func TestIsFromCurrentEpoch(t *testing.T) { t.Parallel() diff --git a/downstreamadapter/eventcollector/event_collector.go b/downstreamadapter/eventcollector/event_collector.go index 6456f1ada2..7319279a54 100644 --- a/downstreamadapter/eventcollector/event_collector.go +++ b/downstreamadapter/eventcollector/event_collector.go @@ -448,24 +448,38 @@ func (c *EventCollector) processDSFeedback(ctx context.Context) error { return context.Cause(ctx) case feedback := <-c.ds.Feedback(): if feedback.FeedbackType == dynstream.ReleasePath { - if v, ok := c.changefeedMap.Load(feedback.Area); ok { - v.(*changefeedStat).memoryReleaseCount.Add(1) - } - log.Info("release dispatcher memory in DS", zap.Any("dispatcherID", feedback.Path)) - c.ds.Release(feedback.Path) + c.handleReleasePathFeedback(feedback, c.ds.Release, "DS") } case feedback := <-c.redoDs.Feedback(): if feedback.FeedbackType == dynstream.ReleasePath { - if v, ok := c.changefeedMap.Load(feedback.Area); ok { - v.(*changefeedStat).memoryReleaseCount.Add(1) - } - log.Info("release dispatcher memory in redo DS", zap.Any("dispatcherID", feedback.Path)) - c.redoDs.Release(feedback.Path) + c.handleReleasePathFeedback(feedback, c.redoDs.Release, "redo DS") } } } } +func (c *EventCollector) handleReleasePathFeedback( + feedback dynstream.Feedback[common.GID, common.DispatcherID, *dispatcherStat], + release func(common.DispatcherID), + streamName string, +) { + if v, ok := c.changefeedMap.Load(feedback.Area); ok { + v.(*changefeedStat).memoryReleaseCount.Add(1) + } + log.Info("release dispatcher memory in "+streamName, zap.Any("dispatcherID", feedback.Path)) + release(feedback.Path) + + stat := c.getDispatcherStatByID(feedback.Path) + if stat == nil { + return + } + log.Info("reset dispatcher after releasing queued events", + zap.Stringer("changefeedID", stat.target.GetChangefeedID()), + zap.Stringer("dispatcherID", feedback.Path), + zap.String("stream", streamName)) + stat.session.resetCurrentEventService() +} + func (c *EventCollector) sendDispatcherRequests(ctx context.Context) error { for { select {