From 632330358254d147d548d568711c62a9ee509e4d Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Fri, 24 Apr 2026 17:23:58 +0800 Subject: [PATCH] maintainer: clean skipped syncpoint barrier events Reset syncpoint WAITING bookkeeping when a skipped syncpoint is shortcut into the pass phase so stale barrier coverage does not linger. Reconcile forwarded dispatchers for skipped syncpoints during resend and add focused tests for the shortcut and cleanup paths. --- maintainer/barrier_event.go | 191 +++++++++++++++++++++++++------ maintainer/barrier_event_test.go | 75 ++++++++++++ maintainer/barrier_test.go | 97 ++++++++++++++++ 3 files changed, 331 insertions(+), 32 deletions(-) diff --git a/maintainer/barrier_event.go b/maintainer/barrier_event.go index cf81e05a5e..6c4f12b9a5 100644 --- a/maintainer/barrier_event.go +++ b/maintainer/barrier_event.go @@ -73,7 +73,8 @@ type BarrierEvent struct { rangeChecker range_checker.RangeChecker lastResendTime time.Time - lastWarningLogTime time.Time + lastWarningLogTime time.Time + lastForwardReconcileTime time.Time } func NewBlockEvent(cfID common.ChangeFeedID, @@ -106,7 +107,8 @@ func NewBlockEvent(cfID common.ChangeFeedID, reportedDispatchers: make(map[common.DispatcherID]struct{}), lastResendTime: time.Time{}, - lastWarningLogTime: time.Now(), + lastWarningLogTime: time.Now(), + lastForwardReconcileTime: time.Time{}, } if status.BlockTables != nil { @@ -172,6 +174,92 @@ func (be *BarrierEvent) createRangeCheckerForTypeDB() { log.Info("create range checker for block event", zap.Any("influcenceType", be.blockedDispatchers.InfluenceType), zap.Any("commitTs", be.commitTs), zap.Int64("mode", be.mode)) } +func (be *BarrierEvent) ensureRangeCheckerForCurrentTasks() { + if be.rangeChecker != nil { + return + } + + switch be.blockedDispatchers.InfluenceType { + case heartbeatpb.InfluenceType_Normal: + if be.dynamicSplitEnabled { + be.rangeChecker = range_checker.NewTableSpanRangeChecker(be.spanController.GetkeyspaceID(), be.blockedDispatchers.TableIDs) + } else { + be.rangeChecker = range_checker.NewTableCountChecker(be.blockedDispatchers.TableIDs) + } + case heartbeatpb.InfluenceType_DB: + be.createRangeCheckerForTypeDB() + case heartbeatpb.InfluenceType_All: + be.createRangeCheckerForTypeAll() + } +} + +// shortcutSyncPointToPassPhase switches a syncpoint from the WAITING coverage phase +// to the PASS/DONE coverage phase. The first phase tracks who has reached the barrier, +// while the second phase tracks who has finished it. We must reset the first-phase +// bookkeeping before reconciling forward progress, otherwise stale WAITING reports may +// be mistaken for finished dispatchers and keep the event in an inconsistent state. +func (be *BarrierEvent) shortcutSyncPointToPassPhase() { + be.ensureRangeCheckerForCurrentTasks() + if be.rangeChecker != nil { + be.rangeChecker.Reset() + } + be.reportedDispatchers = make(map[common.DispatcherID]struct{}) + be.selected.Store(true) + be.writerDispatcherAdvanced = true + be.reconcileForwardedDispatchers() + be.lastForwardReconcileTime = time.Now() +} + +func (be *BarrierEvent) reconcileForwardedDispatchers() { + for _, replication := range be.collectRelevantReplications() { + if forwardBarrierEvent(replication, be) { + be.markDispatcherEventDone(replication.ID) + } + } +} + +func (be *BarrierEvent) collectRelevantReplications() []*replica.SpanReplication { + switch be.blockedDispatchers.InfluenceType { + case heartbeatpb.InfluenceType_DB: + replications := be.spanController.GetTasksBySchemaID(be.blockedDispatchers.SchemaID) + ddlDispatcher := be.spanController.GetDDLDispatcher() + if ddlDispatcher != nil { + replications = append(replications, ddlDispatcher) + } + return dedupReplications(replications) + case heartbeatpb.InfluenceType_All: + return be.spanController.GetAllTasks() + case heartbeatpb.InfluenceType_Normal: + replications := make([]*replica.SpanReplication, 0) + for _, tableID := range be.blockedDispatchers.TableIDs { + replications = append(replications, be.spanController.GetTasksByTableID(tableID)...) + } + return dedupReplications(replications) + default: + return nil + } +} + +func dedupReplications(replications []*replica.SpanReplication) []*replica.SpanReplication { + if len(replications) <= 1 { + return replications + } + + seen := make(map[common.DispatcherID]struct{}, len(replications)) + result := make([]*replica.SpanReplication, 0, len(replications)) + for _, replication := range replications { + if replication == nil { + continue + } + if _, ok := seen[replication.ID]; ok { + continue + } + seen[replication.ID] = struct{}{} + result = append(result, replication) + } + return result +} + func (be *BarrierEvent) checkEventAction(dispatcherID common.DispatcherID) (*heartbeatpb.DispatcherStatus, node.ID) { if !be.allDispatcherReported() { return nil, "" @@ -498,37 +586,61 @@ func (be *BarrierEvent) checkBlockedDispatchers() { replications := be.spanController.GetTasksByTableID(tableId) for _, replication := range replications { if forwardBarrierEvent(replication, be) { + if be.isSyncPoint { + be.shortcutSyncPointToPassPhase() + log.Info("one related dispatcher has forward checkpointTs, shortcut syncpoint to pass phase", + zap.String("changefeed", be.cfID.Name()), + zap.Uint64("commitTs", be.commitTs), + zap.Int64("tableId", tableId), + zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs), + zap.String("dispatcher", replication.ID.String()), + zap.Int64("mode", be.mode), + ) + } else { + // one related table has forward checkpointTs, means the block event can be advanced + be.selected.Store(true) + be.writerDispatcherAdvanced = true + log.Info("one related dispatcher has forward checkpointTs, means the block event can be advanced", + zap.String("changefeed", be.cfID.Name()), + zap.Uint64("commitTs", be.commitTs), + zap.Int64("tableId", tableId), + zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs), + zap.String("dispatcher", replication.ID.String()), + zap.Int64("mode", be.mode), + ) + } + return + } + } + } + case heartbeatpb.InfluenceType_DB: + schemaID := be.blockedDispatchers.SchemaID + replications := be.spanController.GetTasksBySchemaID(schemaID) + for _, replication := range replications { + if forwardBarrierEvent(replication, be) { + if be.isSyncPoint { + be.shortcutSyncPointToPassPhase() + log.Info("one related dispatcher has forward checkpointTs, shortcut syncpoint to pass phase", + zap.String("changefeed", be.cfID.Name()), + zap.Uint64("commitTs", be.commitTs), + zap.Int64("schemaID", schemaID), + zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs), + zap.String("dispatcher", replication.ID.String()), + zap.Int64("mode", be.mode), + ) + } else { // one related table has forward checkpointTs, means the block event can be advanced be.selected.Store(true) be.writerDispatcherAdvanced = true log.Info("one related dispatcher has forward checkpointTs, means the block event can be advanced", zap.String("changefeed", be.cfID.Name()), zap.Uint64("commitTs", be.commitTs), - zap.Int64("tableId", tableId), + zap.Int64("schemaID", schemaID), zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs), zap.String("dispatcher", replication.ID.String()), zap.Int64("mode", be.mode), ) - return } - } - } - case heartbeatpb.InfluenceType_DB: - schemaID := be.blockedDispatchers.SchemaID - replications := be.spanController.GetTasksBySchemaID(schemaID) - for _, replication := range replications { - if forwardBarrierEvent(replication, be) { - // one related table has forward checkpointTs, means the block event can be advanced - be.selected.Store(true) - be.writerDispatcherAdvanced = true - log.Info("one related dispatcher has forward checkpointTs, means the block event can be advanced", - zap.String("changefeed", be.cfID.Name()), - zap.Uint64("commitTs", be.commitTs), - zap.Int64("schemaID", schemaID), - zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs), - zap.String("dispatcher", replication.ID.String()), - zap.Int64("mode", be.mode), - ) return } } @@ -536,16 +648,27 @@ func (be *BarrierEvent) checkBlockedDispatchers() { replications := be.spanController.GetAllTasks() for _, replication := range replications { if forwardBarrierEvent(replication, be) { - // one related table has forward checkpointTs, means the block event can be advanced - be.selected.Store(true) - be.writerDispatcherAdvanced = true - log.Info("one related dispatcher has forward checkpointTs, means the block event can be advanced", - zap.String("changefeed", be.cfID.Name()), - zap.Uint64("commitTs", be.commitTs), - zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs), - zap.String("dispatcher", replication.ID.String()), - zap.Int64("mode", be.mode), - ) + if be.isSyncPoint { + be.shortcutSyncPointToPassPhase() + log.Info("one related dispatcher has forward checkpointTs, shortcut syncpoint to pass phase", + zap.String("changefeed", be.cfID.Name()), + zap.Uint64("commitTs", be.commitTs), + zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs), + zap.String("dispatcher", replication.ID.String()), + zap.Int64("mode", be.mode), + ) + } else { + // one related table has forward checkpointTs, means the block event can be advanced + be.selected.Store(true) + be.writerDispatcherAdvanced = true + log.Info("one related dispatcher has forward checkpointTs, means the block event can be advanced", + zap.String("changefeed", be.cfID.Name()), + zap.Uint64("commitTs", be.commitTs), + zap.Uint64("checkpointTs", replication.GetStatus().CheckpointTs), + zap.String("dispatcher", replication.ID.String()), + zap.Int64("mode", be.mode), + ) + } return } } @@ -675,6 +798,10 @@ func (be *BarrierEvent) resend(mode int64) []*messaging.TargetMessage { msgs = []*messaging.TargetMessage{be.newWriterActionMessage(stm.GetNodeID(), mode)} } else { + if be.isSyncPoint && time.Since(be.lastForwardReconcileTime) > time.Second*10 { + be.reconcileForwardedDispatchers() + be.lastForwardReconcileTime = time.Now() + } // the writer dispatcher is advanced, resend pass action return be.sendPassAction(mode) } diff --git a/maintainer/barrier_event_test.go b/maintainer/barrier_event_test.go index 79d3faae62..18b7bafcab 100644 --- a/maintainer/barrier_event_test.go +++ b/maintainer/barrier_event_test.go @@ -188,6 +188,81 @@ func TestResendAction(t *testing.T) { require.Equal(t, resp.DispatcherStatuses[0].Action.CommitTs, uint64(10)) } +func TestShortcutSyncPointToPassPhaseResetsWaitingCoverage(t *testing.T) { + testutil.SetUpTestServices(t) + tableTriggerEventDispatcherID := common.NewDispatcherID() + cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName) + ddlSpan := replica.NewWorkingSpanReplication(cfID, tableTriggerEventDispatcherID, + common.DDLSpanSchemaID, + common.KeyspaceDDLSpan(common.DefaultKeyspaceID), &heartbeatpb.TableSpanStatus{ + ID: tableTriggerEventDispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 1, + }, "node1", false) + spanController := span.NewController(cfID, ddlSpan, nil, nil, nil, common.DefaultKeyspaceID, common.DefaultMode) + operatorController := operator.NewOperatorController(cfID, spanController, 1000, common.DefaultMode) + + spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, 1) + spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 2}, 1) + + dispatcher1 := spanController.GetTasksByTableID(1)[0] + dispatcher2 := spanController.GetTasksByTableID(2)[0] + for _, dispatcher := range []*replica.SpanReplication{dispatcher1, dispatcher2} { + spanController.BindSpanToNode("", "node1", dispatcher) + spanController.MarkSpanReplicating(dispatcher) + } + + event := NewBlockEvent(cfID, tableTriggerEventDispatcherID, spanController, operatorController, &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 10, + BlockTables: &heartbeatpb.InfluencedTables{ + InfluenceType: heartbeatpb.InfluenceType_All, + }, + IsSyncPoint: true, + }, false, common.DefaultMode) + + // First-phase bookkeeping tracks who has reached WAITING. These entries must be + // discarded once the syncpoint is shortcut directly to the PASS/DONE phase. + event.markDispatcherEventDone(dispatcher1.ID) + event.markDispatcherEventDone(spanController.GetDDLDispatcherID()) + require.Contains(t, event.reportedDispatchers, dispatcher1.ID) + require.Contains(t, event.reportedDispatchers, spanController.GetDDLDispatcherID()) + require.False(t, event.rangeChecker.IsFullyCovered()) + + dispatcher2.UpdateStatus(&heartbeatpb.TableSpanStatus{ + ID: dispatcher2.ID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 11, + Mode: common.DefaultMode, + }) + + event.checkBlockedDispatchers() + require.True(t, event.selected.Load()) + require.True(t, event.writerDispatcherAdvanced) + require.True(t, event.writerDispatcher.IsZero()) + require.Contains(t, event.reportedDispatchers, dispatcher2.ID) + require.NotContains(t, event.reportedDispatchers, dispatcher1.ID) + require.NotContains(t, event.reportedDispatchers, spanController.GetDDLDispatcherID()) + require.False(t, event.rangeChecker.IsFullyCovered()) + require.False(t, event.allDispatcherReported()) + + dispatcher1.UpdateStatus(&heartbeatpb.TableSpanStatus{ + ID: dispatcher1.ID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 11, + Mode: common.DefaultMode, + }) + ddlSpan.UpdateStatus(&heartbeatpb.TableSpanStatus{ + ID: spanController.GetDDLDispatcherID().ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 11, + Mode: common.DefaultMode, + }) + + event.reconcileForwardedDispatchers() + require.True(t, event.allDispatcherReported()) +} + func TestSendPassActionTypeDBIncludesWriterNode(t *testing.T) { testutil.SetUpTestServices(t) nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) diff --git a/maintainer/barrier_test.go b/maintainer/barrier_test.go index 5e982762a5..12f35a7475 100644 --- a/maintainer/barrier_test.go +++ b/maintainer/barrier_test.go @@ -1169,6 +1169,103 @@ func TestSyncPointBlockPerf(t *testing.T) { log.Info("duration", zap.Duration("duration", time.Since(now))) } +func TestSkippedSyncPointEventIsRemovedByReconcile(t *testing.T) { + testutil.SetUpTestServices(t) + nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) + nodeManager.GetAliveNodes()["node1"] = &node.Info{ID: "node1"} + + tableTriggerEventDispatcherID := common.NewDispatcherID() + cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName) + ddlSpan := replica.NewWorkingSpanReplication(cfID, tableTriggerEventDispatcherID, + common.DDLSpanSchemaID, + common.KeyspaceDDLSpan(common.DefaultKeyspaceID), &heartbeatpb.TableSpanStatus{ + ID: tableTriggerEventDispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 1, + }, "node1", false) + spanController := span.NewController(cfID, ddlSpan, nil, nil, nil, common.DefaultKeyspaceID, common.DefaultMode) + operatorController := operator.NewOperatorController(cfID, spanController, 1000, common.DefaultMode) + + spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, 1) + spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 2}, 1) + + dispatcher1 := spanController.GetTasksByTableID(1)[0] + dispatcher2 := spanController.GetTasksByTableID(2)[0] + for _, dispatcher := range []*replica.SpanReplication{dispatcher1, dispatcher2} { + spanController.BindSpanToNode("", "node1", dispatcher) + spanController.MarkSpanReplicating(dispatcher) + } + + barrier := NewBarrier(spanController, operatorController, false, nil, common.DefaultMode) + _ = barrier.HandleStatus("node1", &heartbeatpb.BlockStatusRequest{ + ChangefeedID: cfID.ToPB(), + BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{ + { + ID: spanController.GetDDLDispatcherID().ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 10, + BlockTables: &heartbeatpb.InfluencedTables{ + InfluenceType: heartbeatpb.InfluenceType_All, + }, + IsSyncPoint: true, + }, + }, + { + ID: dispatcher1.ID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 10, + BlockTables: &heartbeatpb.InfluencedTables{ + InfluenceType: heartbeatpb.InfluenceType_All, + }, + IsSyncPoint: true, + }, + }, + }, + }) + + key := getEventKey(10, true) + event, ok := barrier.blockedEvents.Get(key) + require.True(t, ok) + require.False(t, event.selected.Load()) + + // dispatcher2 recreates/skips this syncpoint and moves directly beyond it. The + // maintainer should switch the event to the PASS/DONE phase without selecting a writer. + dispatcher2.UpdateStatus(&heartbeatpb.TableSpanStatus{ + ID: dispatcher2.ID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 11, + Mode: common.DefaultMode, + }) + + _ = barrier.Resend() + event, ok = barrier.blockedEvents.Get(key) + require.True(t, ok) + require.True(t, event.selected.Load()) + require.True(t, event.writerDispatcherAdvanced) + require.True(t, event.writerDispatcher.IsZero()) + + // The remaining dispatchers eventually move beyond the skipped syncpoint as well. + dispatcher1.UpdateStatus(&heartbeatpb.TableSpanStatus{ + ID: dispatcher1.ID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 11, + Mode: common.DefaultMode, + }) + ddlSpan.UpdateStatus(&heartbeatpb.TableSpanStatus{ + ID: spanController.GetDDLDispatcherID().ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 11, + Mode: common.DefaultMode, + }) + event.lastForwardReconcileTime = time.Now().Add(-11 * time.Second) + + _ = barrier.Resend() + _, ok = barrier.blockedEvents.Get(key) + require.False(t, ok) +} + // TestBarrierEventWithDispatcherReallocation tests the barrier's behavior when dispatchers are reallocated // during a blocking event. The test verifies that: // 1. When dispatchers are removed and new ones are created to replace them