From 96bb0b4cc9c67df72fdb12048609fc9dc7b278a4 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 16 Jun 2026 20:45:02 +0800 Subject: [PATCH 1/3] downstreamadapter,eventservice: skip obsolete replayed events --- .../dispatcher/basic_dispatcher.go | 36 +++++++- .../basic_dispatcher_active_active_test.go | 85 +++++++++++++++++++ downstreamadapter/dispatcher/helper.go | 55 +++++++++++- downstreamadapter/dispatcher/helper_test.go | 48 +++++++++++ .../eventcollector/dispatcher_stat.go | 6 ++ .../eventcollector/dispatcher_stat_test.go | 84 ++++++++++++++++++ .../eventcollector/event_collector.go | 34 +++++--- pkg/eventservice/event_scanner.go | 6 +- pkg/eventservice/event_scanner_test.go | 67 ++++++++++++++- 9 files changed, 405 insertions(+), 16 deletions(-) create mode 100644 downstreamadapter/dispatcher/helper_test.go diff --git a/downstreamadapter/dispatcher/basic_dispatcher.go b/downstreamadapter/dispatcher/basic_dispatcher.go index 6c68254851..ef30b8cfc5 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher.go +++ b/downstreamadapter/dispatcher/basic_dispatcher.go @@ -284,6 +284,14 @@ func (d *BasicDispatcher) AddDMLEventsToSink(events []*commonEvent.DMLEvent, wak // be rewritten into deletes when enable-active-active is disabled). filteredEvents := make([]*commonEvent.DMLEvent, 0, len(events)) for _, event := range events { + if d.blockEventStatus.isDMLCompletedOrObsolete(event.GetCommitTs()) { + log.Info("skip obsolete dml event", + zap.Stringer("dispatcher", d.id), + zap.Uint64("commitTs", event.GetCommitTs()), + zap.Uint64("seq", event.GetSeq())) + continue + } + // FilterDMLEvent returns the original event for normal tables and only // allocates a new event when the table needs active-active or soft-delete // processing. Skip is true when every row in the event is dropped, or when @@ -902,6 +910,10 @@ func (d *BasicDispatcher) reportBlockedEventDone( actionCommitTs uint64, actionIsSyncPoint bool, ) { + d.blockEventStatus.recordCompleted(BlockEventIdentifier{ + CommitTs: actionCommitTs, + IsSyncPoint: actionIsSyncPoint, + }) d.offerDoneBlockStatus(actionCommitTs, actionIsSyncPoint) GetDispatcherStatusDynamicStream().Wake(d.id) } @@ -1043,7 +1055,9 @@ func (d *BasicDispatcher) DealWithBlockEvent(event commonEvent.BlockEvent) { shouldBlock := d.shouldBlock(event) shouldHoldBlocked := d.shouldHoldBlockEvent(event) if shouldBlock && shouldHoldBlocked { - d.holdBlockEvent(event) + if !d.completeObsoleteBlockEvent(event) { + d.holdBlockEvent(event) + } return } // Writing a block event may involve downstream IO (e.g. executing DDL), so it must not block @@ -1089,6 +1103,9 @@ func (d *BasicDispatcher) DealWithBlockEvent(event commonEvent.BlockEvent) { } if shouldBlock { failpoint.Inject("BlockAfterFlush", nil) + if d.completeObsoleteBlockEvent(event) { + return + } d.reportBlockedEventToMaintainer(event) return } @@ -1273,6 +1290,20 @@ func (d *BasicDispatcher) reportBlockedEventToMaintainer(event commonEvent.Block d.offerBlockStatus(status) } +func (d *BasicDispatcher) completeObsoleteBlockEvent(event commonEvent.BlockEvent) bool { + if !d.blockEventStatus.isCompletedOrObsolete(event) { + return false + } + identifier := blockEventIdentifier(event) + log.Info("skip obsolete block event", + zap.Stringer("dispatcher", d.id), + zap.Uint64("commitTs", identifier.CommitTs), + zap.Bool("isSyncPoint", identifier.IsSyncPoint)) + d.PassBlockEventToSink(event) + d.reportBlockedEventDone(identifier.CommitTs, identifier.IsSyncPoint) + return true +} + func (d *BasicDispatcher) flushBlockedEventAndReportToMaintainer(event commonEvent.BlockEvent) { d.sharedInfo.GetBlockEventExecutor().Submit(d, func() { failpoint.Inject("BlockOrWaitBeforeFlush", nil) @@ -1281,6 +1312,9 @@ func (d *BasicDispatcher) flushBlockedEventAndReportToMaintainer(event commonEve return } failpoint.Inject("BlockAfterFlush", nil) + if d.completeObsoleteBlockEvent(event) { + return + } d.reportBlockedEventToMaintainer(event) }) } diff --git a/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go b/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go index 38d112571c..ca6f581b63 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go +++ b/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go @@ -13,7 +13,9 @@ package dispatcher import ( + "context" "testing" + "time" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/pkg/common" @@ -126,6 +128,89 @@ func TestDDLEventsAlwaysValidateActiveActive(t *testing.T) { } } +func TestHandleEventsSkipsDMLBeforeCompletedBlockEvent(t *testing.T) { + sharedInfo := newTestSharedInfo(false, false, nil) + dispatcherSink := newDispatcherTestSink(t, common.MysqlSinkType) + tableSpan := &heartbeatpb.TableSpan{TableID: 1, StartKey: []byte{0}, EndKey: []byte{1}} + dispatcher := NewBasicDispatcher( + common.NewDispatcherID(), + tableSpan, + 100, + 1, + NewSchemaIDToDispatchers(), + false, + false, + 4096, + 0, + 200, + common.DefaultMode, + dispatcherSink.Sink(), + sharedInfo, + ) + + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + helper.Tk().MustExec("use test") + helper.DDL2Event("create table t (id int primary key, v int)") + oldDML := helper.DML2Event("test", "t", "insert into t values (1, 1)") + oldDML.DispatcherID = dispatcher.id + oldDML.StartTs = 110 + oldDML.CommitTs = 120 + newDML := helper.DML2Event("test", "t", "insert into t values (2, 2)") + newDML.DispatcherID = dispatcher.id + newDML.StartTs = 130 + newDML.CommitTs = 140 + + dispatcher.blockEventStatus.recordCompleted(BlockEventIdentifier{CommitTs: 120}) + block := dispatcher.handleEvents([]DispatcherEvent{{Event: oldDML}, {Event: newDML}}, func() {}) + require.True(t, block) + + dmls := dispatcherSink.GetDMLs() + require.Len(t, dmls, 1) + require.Equal(t, uint64(140), dmls[0].CommitTs) +} + +func TestHeldObsoleteBlockEventCompletesWithoutWaitingReport(t *testing.T) { + sharedInfo := newTestSharedInfo(false, false, nil) + dispatcherSink := newDispatcherTestSink(t, common.MysqlSinkType) + dispatcherID := common.NewDispatcherID() + dispatcher := NewBasicDispatcher( + dispatcherID, + common.KeyspaceDDLSpan(common.DefaultKeyspaceID), + 100, + common.DDLSpanSchemaID, + NewSchemaIDToDispatchers(), + false, + false, + 4096, + 0, + 200, + common.DefaultMode, + dispatcherSink.Sink(), + sharedInfo, + ) + + event := commonEvent.NewSyncPointEvent(dispatcherID, 120, 1, 0) + dispatcher.pendingACKCount.Store(1) + dispatcher.DealWithBlockEvent(event) + require.NotNil(t, dispatcher.holdingBlockEvent) + require.Equal(t, 0, dispatcher.resendTaskMap.Len()) + + dispatcher.blockEventStatus.recordCompleted(BlockEventIdentifier{CommitTs: 120, IsSyncPoint: true}) + dispatcher.pendingACKCount.Store(0) + dispatcher.tryDealWithHeldBlockEvent() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + status := dispatcher.TakeBlockStatus(ctx) + require.NotNil(t, status) + require.Equal(t, heartbeatpb.BlockStage_DONE, status.State.Stage) + require.Equal(t, uint64(120), status.State.BlockTs) + require.True(t, status.State.IsSyncPoint) + require.Equal(t, 0, dispatcher.resendTaskMap.Len()) + require.Nil(t, dispatcher.blockEventStatus.getEvent()) +} + func newTestBasicDispatcher(t *testing.T, sinkType common.SinkType, enableActiveActive bool) *BasicDispatcher { t.Helper() sharedInfo := newTestSharedInfo(enableActiveActive, false, nil) diff --git a/downstreamadapter/dispatcher/helper.go b/downstreamadapter/dispatcher/helper.go index c76b333832..580464df6c 100644 --- a/downstreamadapter/dispatcher/helper.go +++ b/downstreamadapter/dispatcher/helper.go @@ -86,6 +86,8 @@ type BlockEventStatus struct { blockPendingEvent commonEvent.BlockEvent blockStage heartbeatpb.BlockStage blockCommitTs uint64 + completed BlockEventIdentifier + hasCompleted bool } func (b *BlockEventStatus) clear() { @@ -106,6 +108,33 @@ func (b *BlockEventStatus) setBlockEvent(event commonEvent.BlockEvent, blockStag b.blockCommitTs = event.GetCommitTs() } +func (b *BlockEventStatus) isCompletedOrObsolete(event commonEvent.BlockEvent) bool { + b.mutex.Lock() + defer b.mutex.Unlock() + + if !b.hasCompleted { + return false + } + return compareBlockEventIdentifier(blockEventIdentifier(event), b.completed) <= 0 +} + +func (b *BlockEventStatus) isDMLCompletedOrObsolete(commitTs uint64) bool { + b.mutex.Lock() + defer b.mutex.Unlock() + + return b.hasCompleted && commitTs <= b.completed.CommitTs +} + +func (b *BlockEventStatus) recordCompleted(identifier BlockEventIdentifier) { + b.mutex.Lock() + defer b.mutex.Unlock() + + if !b.hasCompleted || compareBlockEventIdentifier(identifier, b.completed) > 0 { + b.completed = identifier + b.hasCompleted = true + } +} + func (b *BlockEventStatus) updateBlockStage(blockStage heartbeatpb.BlockStage) { b.mutex.Lock() defer b.mutex.Unlock() @@ -139,7 +168,8 @@ func (b *BlockEventStatus) actionMatchs(action *heartbeatpb.DispatcherAction) bo return false } - return b.blockCommitTs == action.CommitTs + pendingIsSyncPoint := b.blockPendingEvent.GetType() == commonEvent.TypeSyncPointEvent + return b.blockCommitTs == action.CommitTs && pendingIsSyncPoint == action.IsSyncPoint } // ignoredStatusMatches checks whether the ignored status is for the current pending ddl/sync point event. @@ -169,6 +199,29 @@ func (b *BlockEventStatus) getEventCommitTs() (uint64, bool) { return b.blockCommitTs, true } +func blockEventIdentifier(event commonEvent.BlockEvent) BlockEventIdentifier { + return BlockEventIdentifier{ + CommitTs: event.GetCommitTs(), + IsSyncPoint: event.GetType() == commonEvent.TypeSyncPointEvent, + } +} + +func compareBlockEventIdentifier(a, b BlockEventIdentifier) int { + if a.CommitTs < b.CommitTs { + return -1 + } + if a.CommitTs > b.CommitTs { + return 1 + } + if a.IsSyncPoint == b.IsSyncPoint { + return 0 + } + if !a.IsSyncPoint && b.IsSyncPoint { + return -1 + } + return 1 +} + type SchemaIDToDispatchers struct { mutex sync.RWMutex m map[int64]map[common.DispatcherID]interface{} diff --git a/downstreamadapter/dispatcher/helper_test.go b/downstreamadapter/dispatcher/helper_test.go new file mode 100644 index 0000000000..e811a41e19 --- /dev/null +++ b/downstreamadapter/dispatcher/helper_test.go @@ -0,0 +1,48 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package dispatcher + +import ( + "testing" + + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/pkg/common" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/stretchr/testify/require" +) + +func TestBlockEventStatusCompletedWatermark(t *testing.T) { + var status BlockEventStatus + ddl10 := &commonEvent.DDLEvent{FinishedTs: 10} + syncpoint10 := commonEvent.NewSyncPointEvent(common.NewDispatcherID(), 10, 1, 0) + ddl11 := &commonEvent.DDLEvent{FinishedTs: 11} + + status.recordCompleted(BlockEventIdentifier{CommitTs: 10, IsSyncPoint: false}) + require.True(t, status.isCompletedOrObsolete(ddl10)) + require.False(t, status.isCompletedOrObsolete(syncpoint10)) + require.False(t, status.isCompletedOrObsolete(ddl11)) + + status.recordCompleted(BlockEventIdentifier{CommitTs: 10, IsSyncPoint: true}) + require.True(t, status.isCompletedOrObsolete(ddl10)) + require.True(t, status.isCompletedOrObsolete(syncpoint10)) + require.False(t, status.isCompletedOrObsolete(ddl11)) +} + +func TestBlockEventStatusActionMatchesSyncPointFlag(t *testing.T) { + var status BlockEventStatus + status.setBlockEvent(&commonEvent.DDLEvent{FinishedTs: 10}, heartbeatpb.BlockStage_WAITING) + + require.True(t, status.actionMatchs(&heartbeatpb.DispatcherAction{CommitTs: 10})) + require.False(t, status.actionMatchs(&heartbeatpb.DispatcherAction{CommitTs: 10, IsSyncPoint: true})) +} diff --git a/downstreamadapter/eventcollector/dispatcher_stat.go b/downstreamadapter/eventcollector/dispatcher_stat.go index 350cb0dae7..aa69d1119f 100644 --- a/downstreamadapter/eventcollector/dispatcher_stat.go +++ b/downstreamadapter/eventcollector/dispatcher_stat.go @@ -137,6 +137,12 @@ func (d *dispatcherStat) advanceEpochForReset(resetTs uint64) uint64 { currentState := d.loadCurrentEpochState() nextState := newDispatcherEpochState(currentState.epoch+1, 0, resetTs) if d.currentEpoch.CompareAndSwap(currentState, nextState) { + // The new epoch replays events from resetTs. Commit-ts based + // deduplication from the old epoch must not filter replayed DDL or + // SyncPoint events. + d.lastEventCommitTs.Store(resetTs) + d.gotDDLOnTs.Store(false) + d.gotSyncpointOnTS.Store(false) return nextState.epoch } } diff --git a/downstreamadapter/eventcollector/dispatcher_stat_test.go b/downstreamadapter/eventcollector/dispatcher_stat_test.go index f2d7b17635..9922784954 100644 --- a/downstreamadapter/eventcollector/dispatcher_stat_test.go +++ b/downstreamadapter/eventcollector/dispatcher_stat_test.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/node" "github.com/pingcap/ticdc/pkg/routing" + "github.com/pingcap/ticdc/utils/dynstream" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" @@ -552,6 +553,53 @@ func TestUpdateCommitTsStateByEvents(t *testing.T) { require.Equal(t, uint64(110), state.maxEventTs.Load()) } +func TestAdvanceEpochForResetClearsCommitTsFilter(t *testing.T) { + t.Parallel() + + dispatcherID := common.NewDispatcherID() + eventServiceID := node.ID("event-service-1") + mockDisp := newMockDispatcher(dispatcherID, 100) + mockDisp.handleEvents = func(events []dispatcher.DispatcherEvent, wakeCallback func()) (block bool) { + return len(events) > 0 + } + + stat := newDispatcherStatForTest(mockDisp, nil) + stat.currentEpoch.Store(newDispatcherEpochState(10, 3, stat.target.GetStartTs())) + stat.lastEventCommitTs.Store(220) + stat.gotDDLOnTs.Store(true) + stat.gotSyncpointOnTS.Store(true) + + epoch := stat.advanceEpochForReset(150) + require.Equal(t, uint64(11), epoch) + require.Equal(t, uint64(150), stat.lastEventCommitTs.Load()) + require.False(t, stat.gotDDLOnTs.Load()) + require.False(t, stat.gotSyncpointOnTS.Load()) + + handshake := commonEvent.NewHandshakeEvent(dispatcherID, 160, epoch, &common.TableInfo{}) + stat.handleHandshakeEvent(dispatcher.DispatcherEvent{ + From: &eventServiceID, + Event: &handshake, + }) + + ddl := &commonEvent.DDLEvent{ + Version: commonEvent.DDLEventVersion1, + FinishedTs: 180, + Seq: 2, + Epoch: epoch, + } + require.True(t, stat.handleSingleDataEvents([]dispatcher.DispatcherEvent{ + { + From: &eventServiceID, + Event: ddl, + }, + })) + require.Len(t, mockDisp.events, 1) + require.Same(t, ddl, mockDisp.events[0].Event) + require.Equal(t, uint64(180), stat.lastEventCommitTs.Load()) + require.True(t, stat.gotDDLOnTs.Load()) + require.False(t, stat.gotSyncpointOnTS.Load()) +} + func TestHandleSignalEvent(t *testing.T) { localServerID := node.ID("local-server") remoteServerID := node.ID("remote-server") @@ -897,6 +945,42 @@ func TestInitialLocalReadyCallbackIsOneShot(t *testing.T) { requireNoDispatcherRequest(t, mockEventCollector) } +func TestReleasePathFeedbackResetsCurrentEventService(t *testing.T) { + localServerID := node.ID("local-server") + dispatcherID := common.NewDispatcherID() + cfID := common.NewChangeFeedIDWithName("release_path_test", common.DefaultKeyspaceName) + mockDisp := newMockDispatcher(dispatcherID, 10) + mockDisp.changefeedID = cfID + mockDisp.checkPointTs = 20 + mockEventCollector := newTestEventCollector(localServerID) + stat := newDispatcherStat(mockDisp, mockEventCollector, nil) + setSessionState(stat.session, localServerID, false, "") + mockEventCollector.dispatcherMap.Store(dispatcherID, stat) + mockEventCollector.changefeedMap.Store(cfID.ID(), newChangefeedStat(cfID)) + + released := false + feedback := dynstream.Feedback[common.GID, common.DispatcherID, *dispatcherStat]{ + Area: cfID.ID(), + Path: dispatcherID, + FeedbackType: dynstream.ReleasePath, + } + mockEventCollector.handleReleasePathFeedback(feedback, func(path common.DispatcherID) { + released = true + require.Equal(t, dispatcherID, path) + }, "DS") + + require.True(t, released) + cfStatValue, ok := mockEventCollector.changefeedMap.Load(cfID.ID()) + require.True(t, ok) + require.Equal(t, uint32(1), cfStatValue.(*changefeedStat).memoryReleaseCount.Load()) + requireDispatcherRequests( + t, + readDispatcherRequests(t, mockEventCollector, 1), + dispatcherRequestRecord{to: localServerID, action: eventpb.ActionType_ACTION_TYPE_RESET}, + ) + requireNoDispatcherRequest(t, mockEventCollector) +} + func TestIsFromCurrentEpoch(t *testing.T) { t.Parallel() diff --git a/downstreamadapter/eventcollector/event_collector.go b/downstreamadapter/eventcollector/event_collector.go index 6456f1ada2..7319279a54 100644 --- a/downstreamadapter/eventcollector/event_collector.go +++ b/downstreamadapter/eventcollector/event_collector.go @@ -448,24 +448,38 @@ func (c *EventCollector) processDSFeedback(ctx context.Context) error { return context.Cause(ctx) case feedback := <-c.ds.Feedback(): if feedback.FeedbackType == dynstream.ReleasePath { - if v, ok := c.changefeedMap.Load(feedback.Area); ok { - v.(*changefeedStat).memoryReleaseCount.Add(1) - } - log.Info("release dispatcher memory in DS", zap.Any("dispatcherID", feedback.Path)) - c.ds.Release(feedback.Path) + c.handleReleasePathFeedback(feedback, c.ds.Release, "DS") } case feedback := <-c.redoDs.Feedback(): if feedback.FeedbackType == dynstream.ReleasePath { - if v, ok := c.changefeedMap.Load(feedback.Area); ok { - v.(*changefeedStat).memoryReleaseCount.Add(1) - } - log.Info("release dispatcher memory in redo DS", zap.Any("dispatcherID", feedback.Path)) - c.redoDs.Release(feedback.Path) + c.handleReleasePathFeedback(feedback, c.redoDs.Release, "redo DS") } } } } +func (c *EventCollector) handleReleasePathFeedback( + feedback dynstream.Feedback[common.GID, common.DispatcherID, *dispatcherStat], + release func(common.DispatcherID), + streamName string, +) { + if v, ok := c.changefeedMap.Load(feedback.Area); ok { + v.(*changefeedStat).memoryReleaseCount.Add(1) + } + log.Info("release dispatcher memory in "+streamName, zap.Any("dispatcherID", feedback.Path)) + release(feedback.Path) + + stat := c.getDispatcherStatByID(feedback.Path) + if stat == nil { + return + } + log.Info("reset dispatcher after releasing queued events", + zap.Stringer("changefeedID", stat.target.GetChangefeedID()), + zap.Stringer("dispatcherID", feedback.Path), + zap.String("stream", streamName)) + stat.session.resetCurrentEventService() +} + func (c *EventCollector) sendDispatcherRequests(ctx context.Context) error { for { select { diff --git a/pkg/eventservice/event_scanner.go b/pkg/eventservice/event_scanner.go index b082ccfc13..d4a948e870 100644 --- a/pkg/eventservice/event_scanner.go +++ b/pkg/eventservice/event_scanner.go @@ -229,9 +229,11 @@ func (s *eventScanner) scanAndMergeEvents( if err != nil { return false, err } - // table is deleted, still append remaining DDL event and resolved event. + // The table has been deleted, so the current raw event cannot be + // decoded as DML. Resolve to its commit ts to skip it; resolving to + // rawEvent.CRTs-1 can equal the scan start and cause a no-progress loop. if tableInfo == nil { - err = finalizeScan(merger, processor, session, rawEvent.CRTs-1) + err = finalizeScan(merger, processor, session, rawEvent.CRTs) return false, err } diff --git a/pkg/eventservice/event_scanner_test.go b/pkg/eventservice/event_scanner_test.go index 7a76adeae9..bbf1cbdeb9 100644 --- a/pkg/eventservice/event_scanner_test.go +++ b/pkg/eventservice/event_scanner_test.go @@ -449,6 +449,7 @@ func TestEventScannerWithDeleteTable(t *testing.T) { dml0 := kvEvents[0] dml1 := kvEvents[1] dml2 := kvEvents[2] + dml3 := kvEvents[3] mockSchemaStore.DeleteTable(tableID, dml2.CRTs) disp.receivedResolvedTs.Store(resolvedTs) ok, dataRange := broker.getScanTaskDataRange(disp) @@ -480,10 +481,12 @@ func TestEventScannerWithDeleteTable(t *testing.T) { require.Equal(t, batchDML1.DMLEvents[0].GetCommitTs(), dml1.CRTs) require.Equal(t, batchDML1.DMLEvents[1].GetCommitTs(), dml2.CRTs) - // resolvedTs + // resolvedTs skips the first raw event after the table is deleted, so the + // next scan range will not keep seeing the same deleted-table event. e = events[3] require.Equal(t, e.GetType(), event.TypeResolvedEvent) - require.Equal(t, dml2.CRTs, e.GetCommitTs()) + require.Equal(t, dml3.CRTs, e.GetCommitTs()) + require.Greater(t, e.GetCommitTs(), dml2.CRTs) } // TestEventScannerWithDDL tests cases where scanning is interrupted at DDL events @@ -1567,6 +1570,66 @@ func TestScanAndMergeEventsSingleUKUpdate(t *testing.T) { require.True(t, sess.scannedBytes > 0) // Some bytes were processed } +func TestScanAndMergeEventsSkipsDeletedTableTxn(t *testing.T) { + helper := event.NewEventTestHelper(t) + defer helper.Close() + + ddlEvent, kvEvents := genEvents(helper, + `create table test.t_deleted(id int primary key, c char(50))`, + `insert into test.t_deleted(id,c) values (1, "c1")`) + require.Len(t, kvEvents, 1) + rawEvent := kvEvents[0] + tableID := ddlEvent.GetTableID() + + schemaStore := &schemaStoreWithErr{ + mockSchemaStore: NewMockSchemaStore(), + getTableInfoError: &schemastore.TableDeletedError{}, + } + scanner := &eventScanner{ + mounter: &mockMounter{}, + schemaGetter: schemaStore, + } + + disInfo := newMockDispatcherInfoForTest(t) + disInfo.span.TableID = tableID + dispatcherID := common.NewDispatcherID() + disp := &dispatcherStat{ + info: disInfo, + id: dispatcherID, + isRemoved: atomic.Bool{}, + } + + dataRange := common.DataRange{ + Span: &heartbeatpb.TableSpan{ + TableID: tableID, + }, + CommitTsStart: rawEvent.CRTs - 1, + CommitTsEnd: rawEvent.CRTs + 100, + } + sess := &session{ + ctx: context.Background(), + dispatcherStat: disp, + dataRange: dataRange, + startTime: time.Now(), + events: make([]event.Event, 0), + } + merger := newEventMerger(nil) + + isInterrupted, err := scanner.scanAndMergeEvents(sess, merger, &mockEventIterator{ + events: []*common.RawKVEntry{rawEvent}, + }) + require.NoError(t, err) + require.False(t, isInterrupted) + require.Zero(t, sess.dmlCount) + require.Len(t, sess.events, 1) + + resolvedEvent, ok := sess.events[0].(event.ResolvedEvent) + require.True(t, ok) + require.Equal(t, dispatcherID, resolvedEvent.DispatcherID) + require.Equal(t, rawEvent.CRTs, resolvedEvent.ResolvedTs) + require.Greater(t, resolvedEvent.ResolvedTs, dataRange.CommitTsStart) +} + type schemaStoreWithErr struct { *mockSchemaStore getTableInfoError error From ee536334829a2b19b46aa1c5cada099d9a78ed84 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Thu, 18 Jun 2026 14:52:02 +0800 Subject: [PATCH 2/3] downstreamadapter: simplify block event status --- downstreamadapter/dispatcher/basic_dispatcher.go | 10 ++-------- downstreamadapter/dispatcher/helper.go | 8 +++----- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/downstreamadapter/dispatcher/basic_dispatcher.go b/downstreamadapter/dispatcher/basic_dispatcher.go index eaa01fd3ec..a33ede5e2e 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher.go +++ b/downstreamadapter/dispatcher/basic_dispatcher.go @@ -1112,10 +1112,7 @@ func (d *BasicDispatcher) DealWithBlockEvent(event commonEvent.BlockEvent) { needsMaintainerACK := !shouldBlock && d.IsTableTriggerDispatcher() && needsScheduleStatus needsAddTableCheckpointBlocker := !shouldBlock && d.IsTableTriggerDispatcher() && hasNeedAddedTables - identifier := BlockEventIdentifier{ - CommitTs: event.GetCommitTs(), - IsSyncPoint: false, - } + identifier := blockEventIdentifier(event) if needsMaintainerACK { // Register maintainer-visible DDLs before submitting downstream IO so // following DB/All DDLs cannot pass this pending schedule update. @@ -1292,10 +1289,7 @@ func (d *BasicDispatcher) reportBlockedEventToMaintainer(event commonEvent.Block d.pendingACKCount.Add(1) } d.blockEventStatus.setBlockEvent(event, heartbeatpb.BlockStage_WAITING) - identifier := BlockEventIdentifier{ - CommitTs: event.GetCommitTs(), - IsSyncPoint: event.GetType() == commonEvent.TypeSyncPointEvent, - } + identifier := blockEventIdentifier(event) // WAITING retries reuse this protobuf object, so clone mutable metadata once // here and keep resend on the same immutable payload. status := &heartbeatpb.TableSpanBlockStatus{ diff --git a/downstreamadapter/dispatcher/helper.go b/downstreamadapter/dispatcher/helper.go index 1886fd8b9f..b87537b43e 100644 --- a/downstreamadapter/dispatcher/helper.go +++ b/downstreamadapter/dispatcher/helper.go @@ -162,7 +162,6 @@ type BlockEventStatus struct { blockStage heartbeatpb.BlockStage blockCommitTs uint64 completed BlockEventIdentifier - hasCompleted bool } func (b *BlockEventStatus) clear() { @@ -187,7 +186,7 @@ func (b *BlockEventStatus) isCompletedOrObsolete(event commonEvent.BlockEvent) b b.mutex.Lock() defer b.mutex.Unlock() - if !b.hasCompleted { + if b.completed.CommitTs == 0 { return false } return compareBlockEventIdentifier(blockEventIdentifier(event), b.completed) <= 0 @@ -197,16 +196,15 @@ func (b *BlockEventStatus) isDMLCompletedOrObsolete(commitTs uint64) bool { b.mutex.Lock() defer b.mutex.Unlock() - return b.hasCompleted && commitTs <= b.completed.CommitTs + return b.completed.CommitTs != 0 && commitTs <= b.completed.CommitTs } func (b *BlockEventStatus) recordCompleted(identifier BlockEventIdentifier) { b.mutex.Lock() defer b.mutex.Unlock() - if !b.hasCompleted || compareBlockEventIdentifier(identifier, b.completed) > 0 { + if b.completed.CommitTs == 0 || compareBlockEventIdentifier(identifier, b.completed) > 0 { b.completed = identifier - b.hasCompleted = true } } From 024a92e5949b3c589875c77ba032fb8364a849c2 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Wed, 24 Jun 2026 09:24:43 +0800 Subject: [PATCH 3/3] eventservice: split deleted table scan fix --- pkg/eventservice/event_scanner.go | 6 +-- pkg/eventservice/event_scanner_test.go | 67 +------------------------- 2 files changed, 4 insertions(+), 69 deletions(-) diff --git a/pkg/eventservice/event_scanner.go b/pkg/eventservice/event_scanner.go index d4a948e870..b082ccfc13 100644 --- a/pkg/eventservice/event_scanner.go +++ b/pkg/eventservice/event_scanner.go @@ -229,11 +229,9 @@ func (s *eventScanner) scanAndMergeEvents( if err != nil { return false, err } - // The table has been deleted, so the current raw event cannot be - // decoded as DML. Resolve to its commit ts to skip it; resolving to - // rawEvent.CRTs-1 can equal the scan start and cause a no-progress loop. + // table is deleted, still append remaining DDL event and resolved event. if tableInfo == nil { - err = finalizeScan(merger, processor, session, rawEvent.CRTs) + err = finalizeScan(merger, processor, session, rawEvent.CRTs-1) return false, err } diff --git a/pkg/eventservice/event_scanner_test.go b/pkg/eventservice/event_scanner_test.go index bbf1cbdeb9..7a76adeae9 100644 --- a/pkg/eventservice/event_scanner_test.go +++ b/pkg/eventservice/event_scanner_test.go @@ -449,7 +449,6 @@ func TestEventScannerWithDeleteTable(t *testing.T) { dml0 := kvEvents[0] dml1 := kvEvents[1] dml2 := kvEvents[2] - dml3 := kvEvents[3] mockSchemaStore.DeleteTable(tableID, dml2.CRTs) disp.receivedResolvedTs.Store(resolvedTs) ok, dataRange := broker.getScanTaskDataRange(disp) @@ -481,12 +480,10 @@ func TestEventScannerWithDeleteTable(t *testing.T) { require.Equal(t, batchDML1.DMLEvents[0].GetCommitTs(), dml1.CRTs) require.Equal(t, batchDML1.DMLEvents[1].GetCommitTs(), dml2.CRTs) - // resolvedTs skips the first raw event after the table is deleted, so the - // next scan range will not keep seeing the same deleted-table event. + // resolvedTs e = events[3] require.Equal(t, e.GetType(), event.TypeResolvedEvent) - require.Equal(t, dml3.CRTs, e.GetCommitTs()) - require.Greater(t, e.GetCommitTs(), dml2.CRTs) + require.Equal(t, dml2.CRTs, e.GetCommitTs()) } // TestEventScannerWithDDL tests cases where scanning is interrupted at DDL events @@ -1570,66 +1567,6 @@ func TestScanAndMergeEventsSingleUKUpdate(t *testing.T) { require.True(t, sess.scannedBytes > 0) // Some bytes were processed } -func TestScanAndMergeEventsSkipsDeletedTableTxn(t *testing.T) { - helper := event.NewEventTestHelper(t) - defer helper.Close() - - ddlEvent, kvEvents := genEvents(helper, - `create table test.t_deleted(id int primary key, c char(50))`, - `insert into test.t_deleted(id,c) values (1, "c1")`) - require.Len(t, kvEvents, 1) - rawEvent := kvEvents[0] - tableID := ddlEvent.GetTableID() - - schemaStore := &schemaStoreWithErr{ - mockSchemaStore: NewMockSchemaStore(), - getTableInfoError: &schemastore.TableDeletedError{}, - } - scanner := &eventScanner{ - mounter: &mockMounter{}, - schemaGetter: schemaStore, - } - - disInfo := newMockDispatcherInfoForTest(t) - disInfo.span.TableID = tableID - dispatcherID := common.NewDispatcherID() - disp := &dispatcherStat{ - info: disInfo, - id: dispatcherID, - isRemoved: atomic.Bool{}, - } - - dataRange := common.DataRange{ - Span: &heartbeatpb.TableSpan{ - TableID: tableID, - }, - CommitTsStart: rawEvent.CRTs - 1, - CommitTsEnd: rawEvent.CRTs + 100, - } - sess := &session{ - ctx: context.Background(), - dispatcherStat: disp, - dataRange: dataRange, - startTime: time.Now(), - events: make([]event.Event, 0), - } - merger := newEventMerger(nil) - - isInterrupted, err := scanner.scanAndMergeEvents(sess, merger, &mockEventIterator{ - events: []*common.RawKVEntry{rawEvent}, - }) - require.NoError(t, err) - require.False(t, isInterrupted) - require.Zero(t, sess.dmlCount) - require.Len(t, sess.events, 1) - - resolvedEvent, ok := sess.events[0].(event.ResolvedEvent) - require.True(t, ok) - require.Equal(t, dispatcherID, resolvedEvent.DispatcherID) - require.Equal(t, rawEvent.CRTs, resolvedEvent.ResolvedTs) - require.Greater(t, resolvedEvent.ResolvedTs, dataRange.CommitTsStart) -} - type schemaStoreWithErr struct { *mockSchemaStore getTableInfoError error