diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index efb768404c..d4781a527b 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -353,6 +353,9 @@ func (m *Maintainer) HandleEvent(event *Event) bool { } func (m *Maintainer) checkNodeChanged() { + if m.removing.Load() { + return + } m.nodeChanged.Lock() defer m.nodeChanged.Unlock() if m.nodeChanged.changed { @@ -549,6 +552,13 @@ func (m *Maintainer) onRemoveMaintainer(cascade, changefeedRemoved bool) { m.removing.Store(true) m.cascadeRemoving.Store(cascade) m.changefeedRemoved.Store(changefeedRemoved) + // Freeze ordinary scheduling on the old maintainer before we start the close flow. + // Only the DDL trigger close operator is allowed to keep running. + allowedDispatcherIDs := []common.DispatcherID{m.ddlSpan.ID} + if m.enableRedo { + allowedDispatcherIDs = append(allowedDispatcherIDs, m.redoDDLSpan.ID) + } + m.controller.EnterRemovingMode(allowedDispatcherIDs...) closed := m.tryCloseChangefeed() if closed { m.removed.Store(true) @@ -899,6 +909,14 @@ func (m *Maintainer) onHeartbeatRequest(msg *messaging.TargetMessage) { // Process operator status updates AFTER checkpointTsByCapture is updated // This ensures when operators complete, checkpointTsByCapture already contains the complete heartbeat // Works with calCheckpointTs constraint ordering to prevent checkpoint advancing past new dispatcher startTs + if m.removing.Load() { + // Once RemoveMaintainer starts, we still need status updates for the close flow itself + // (for example DDL-trigger close operators reaching terminal states), but we must not run + // failover self-healing. A late Stopped/Working heartbeat from a closing dispatcher manager + // would otherwise mark spans absent or remove/recreate dispatchers after shutdown has begun. + m.controller.handleStatus(msg.From, req.Statuses, false) + return + } m.controller.HandleStatus(msg.From, req.Statuses) } @@ -915,7 +933,7 @@ func (m *Maintainer) onError(from node.ID, err *heartbeatpb.RunningError) { func (m *Maintainer) onBlockStateRequest(msg *messaging.TargetMessage) { // the barrier is not initialized - if !m.initialized.Load() { + if !m.initialized.Load() || m.removing.Load() { return } req := msg.Message[0].(*heartbeatpb.BlockStatusRequest) @@ -1049,8 +1067,12 @@ func (m *Maintainer) onMaintainerCloseResponse(from node.ID, response *heartbeat func (m *Maintainer) handleResendMessage() { // resend closing message - if m.removing.Load() && m.cascadeRemoving.Load() { - m.trySendMaintainerCloseRequestToAllNode() + if m.removing.Load() { + // After RemoveMaintainer starts, the old maintainer must stop resending bootstrap/barrier + // traffic. Otherwise stale control-plane messages can race with the new maintainer. + if m.cascadeRemoving.Load() { + m.trySendMaintainerCloseRequestToAllNode() + } return } // resend bootstrap message diff --git a/maintainer/maintainer_controller.go b/maintainer/maintainer_controller.go index 5cee76ba9c..7cd830f1f6 100644 --- a/maintainer/maintainer_controller.go +++ b/maintainer/maintainer_controller.go @@ -157,8 +157,12 @@ func NewController(changefeedID common.ChangeFeedID, return controller } -// HandleStatus handle the status report from the node +// HandleStatus handles the status report from the node. func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableSpanStatus) { + c.handleStatus(from, statusList, true) +} + +func (c *Controller) handleStatus(from node.ID, statusList []*heartbeatpb.TableSpanStatus, allowSelfHealing bool) { // HandleStatus reconciles runtime dispatcher reports with maintainer-side state. // // In the steady state, spanController (desired tasks), operatorController (in-flight scheduling), @@ -170,6 +174,10 @@ func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableS // The rules below make the system converge: // 1) Orphan Working dispatcher without an operator => actively remove it to avoid leaks. // 2) Non-working dispatcher without an operator => mark the span absent so scheduler can recreate it. + // + // During maintainer removal we still need status bookkeeping so close/remove can observe terminal + // states, but we must disable the self-healing branches. Otherwise a late Stopped/Working heartbeat + // can recreate dispatchers for a changefeed that is already shutting down. for _, status := range statusList { dispatcherID := common.NewDispatcherIDFromPB(status.ID) operatorController := c.getOperatorController(status.Mode) @@ -178,6 +186,9 @@ func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableS operatorController.UpdateOperatorStatus(dispatcherID, from, status) stm := spanController.GetTaskByID(dispatcherID) if stm == nil { + if !allowSelfHealing { + continue + } // If maintainer doesn't know this dispatcherID, most statuses are late/outdated and can be ignored. // We only need to act when the runtime says the dispatcher is Working, because that implies there's // still an active dispatcher consuming resources and potentially producing output. @@ -207,6 +218,47 @@ func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableS continue } spanController.UpdateStatus(stm, status) +<<<<<<< HEAD +======= + + if !allowSelfHealing { + continue + } + + // Fallback: dispatcher becomes non-working without an operator. + // + // In normal scheduling flow, a dispatcher should transition to Stopped/Removed as part of a maintainer + // operator (Remove/Move/Split...). However, after maintainer failover we can lose operatorController state + // while dispatcher managers keep executing the already-issued requests. + // + // A real example is a "remove request in transit" during bootstrap: + // - Old maintainer sends a Remove (e.g. the remove-origin phase of Move), but the request hasn't reached + // dispatcher manager yet. + // - New maintainer bootstraps from dispatcher manager snapshots and sees the dispatcher as Working, with + // no in-flight operator reported in bootstrap response. + // - After bootstrap, the in-transit Remove arrives, the dispatcher is removed, and the new maintainer + // observes a terminal status without a corresponding operator. + // + // In these cases we'd observe a non-working status but have no operator to drive the follow-up + // rescheduling, so we mark the span absent to let the scheduler recreate it. + // + // Safety against message reordering/resend: + // - We only reach here when stm != nil and stm.GetNodeID() == from (checked above). If the span was already + // rebound to a different node, we skip it, so late statuses from the old node won't trigger rescheduling. + // - MarkSpanAbsent is idempotent and only affects the scheduler state, so even if we get duplicate terminal + // statuses, the worst case is an extra no-op absent mark. + if status.ComponentStatus == heartbeatpb.ComponentState_Stopped || + status.ComponentStatus == heartbeatpb.ComponentState_Removed { + if op := operatorController.GetOperator(dispatcherID); op == nil { + log.Warn("dispatcher becomes non-working without operator, mark span absent for rescheduling", + zap.String("changefeed", c.changefeedID.Name()), + zap.String("from", from.String()), + zap.String("dispatcherID", dispatcherID.String()), + zap.Any("status", status)) + spanController.MarkSpanAbsent(stm) + } + } +>>>>>>> 776315e72 (maintainer: quiesce control plane during remove handoff (#4828)) } } @@ -241,6 +293,15 @@ func (c *Controller) RemoveNode(id node.ID) { c.operatorController.OnNodeRemoved(id) } +// EnterRemovingMode freezes normal scheduling on the old maintainer while keeping the +// DDL trigger dispatcher close path alive. +func (c *Controller) EnterRemovingMode(allowedDispatcherIDs ...common.DispatcherID) { + c.operatorController.QuiesceExcept(allowedDispatcherIDs...) + if c.redoOperatorController != nil { + c.redoOperatorController.QuiesceExcept(allowedDispatcherIDs...) + } +} + func (c *Controller) GetMinRedoCheckpointTs(minCheckpointTs uint64) uint64 { minCheckpointTsForOperator := c.redoOperatorController.GetMinCheckpointTs(minCheckpointTs) minCheckpointTsForSpan := c.redoSpanController.GetMinCheckpointTsForNonReplicatingSpans(minCheckpointTs) diff --git a/maintainer/maintainer_test.go b/maintainer/maintainer_test.go index 1df5a11e28..887e205312 100644 --- a/maintainer/maintainer_test.go +++ b/maintainer/maintainer_test.go @@ -400,6 +400,159 @@ func TestMaintainer_GetMaintainerStatusUsesCommittedCheckpoint(t *testing.T) { require.Equal(t, uint64(50), status.LastSyncedTs) } +func TestMaintainerHeartbeatDuringRemovingSkipsFailoverRecovery(t *testing.T) { + buildMaintainer := func(t *testing.T) (*Maintainer, *replica.SpanReplication, node.ID) { + t.Helper() + testutil.SetUpTestServices(t) + + nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) + captureID := node.ID("node1") + nodeManager.GetAliveNodes()[captureID] = &node.Info{ID: captureID} + + cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName) + ddlDispatcherID := common.NewDispatcherID() + ddlSpan := replica.NewWorkingSpanReplication(cfID, ddlDispatcherID, + common.DDLSpanSchemaID, + common.KeyspaceDDLSpan(common.DefaultKeyspaceID), &heartbeatpb.TableSpanStatus{ + ID: ddlDispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 10, + Mode: common.DefaultMode, + }, captureID, false) + refresher := replica.NewRegionCountRefresher(cfID, time.Minute) + controller := NewController(cfID, 10, &mockThreadPool{}, + config.GetDefaultReplicaConfig(), ddlSpan, nil, 1000, 0, refresher, common.DefaultKeyspace, false, testBalanceMoveBatchSize) + + totalSpan := common.TableIDToComparableSpan(common.DefaultKeyspaceID, 1) + dispatcherID := common.NewDispatcherID() + workingSpan := replica.NewWorkingSpanReplication(cfID, dispatcherID, + 1, + &heartbeatpb.TableSpan{ + TableID: totalSpan.TableID, + StartKey: totalSpan.StartKey, + EndKey: totalSpan.EndKey, + KeyspaceID: common.DefaultKeyspaceID, + }, &heartbeatpb.TableSpanStatus{ + ID: dispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 10, + Mode: common.DefaultMode, + }, captureID, false) + controller.spanController.AddReplicatingSpan(workingSpan) + + m := &Maintainer{ + changefeedID: cfID, + controller: controller, + checkpointTsByCapture: newWatermarkCaptureMap(), + redoTsByCapture: newWatermarkCaptureMap(), + statusChanged: atomic.NewBool(false), + } + m.watermark.Watermark = &heartbeatpb.Watermark{} + m.runningErrors.m = make(map[node.ID]*heartbeatpb.RunningError) + m.initialized.Store(true) + return m, workingSpan, captureID + } + + makeHeartbeat := func(dispatcherID common.DispatcherID, from node.ID) *messaging.TargetMessage { + req := &heartbeatpb.HeartBeatRequest{ + Watermark: &heartbeatpb.Watermark{ + CheckpointTs: 20, + ResolvedTs: 20, + }, + Statuses: []*heartbeatpb.TableSpanStatus{ + { + ID: dispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Stopped, + CheckpointTs: 20, + Mode: common.DefaultMode, + }, + }, + } + return &messaging.TargetMessage{ + From: from, + Type: messaging.TypeHeartBeatRequest, + Message: []messaging.IOTypeT{req}, + } + } + + // Normal failover recovery should still mark a non-working span absent when the runtime + // reports Stopped but maintainer has no operator for it. + t.Run("normal maintainer still self heals", func(t *testing.T) { + m, workingSpan, captureID := buildMaintainer(t) + + m.onHeartbeatRequest(makeHeartbeat(workingSpan.ID, captureID)) + + require.Equal(t, 1, m.controller.spanController.GetAbsentSize()) + require.Equal(t, heartbeatpb.ComponentState_Stopped, workingSpan.GetStatus().ComponentStatus) + require.Equal(t, node.ID(""), workingSpan.GetNodeID()) + }) + + // When RemoveMaintainer has started, the same late Stopped heartbeat must only update runtime + // status bookkeeping. Re-marking the span absent here would let the scheduler recreate a + // dispatcher while the changefeed is shutting down. + t.Run("removing maintainer skips self healing", func(t *testing.T) { + m, workingSpan, captureID := buildMaintainer(t) + m.removing.Store(true) + + m.onHeartbeatRequest(makeHeartbeat(workingSpan.ID, captureID)) + + require.Equal(t, 0, m.controller.spanController.GetAbsentSize()) + require.Equal(t, heartbeatpb.ComponentState_Stopped, workingSpan.GetStatus().ComponentStatus) + require.Equal(t, captureID, workingSpan.GetNodeID()) + require.Zero(t, m.controller.operatorController.OperatorSize()) + }) +} + +func TestMaintainerRemovingSuppressesLegacyControlPlaneActions(t *testing.T) { + mockMC := messaging.NewMockMessageCenter() + nodeManager := watcher.NewNodeManager(nil, nil) + nodeManager.GetAliveNodes()["node1"] = &node.Info{ID: "node1"} + nodeManager.GetAliveNodes()["node2"] = &node.Info{ID: "node2"} + + cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName) + m := &Maintainer{ + changefeedID: cfID, + mc: mockMC, + nodeManager: nodeManager, + closedNodes: make(map[node.ID]struct{}), + statusChanged: atomic.NewBool(false), + postBootstrapMsg: &heartbeatpb.MaintainerPostBootstrapRequest{ + ChangefeedID: cfID.ToPB(), + }, + } + m.watermark.Watermark = &heartbeatpb.Watermark{CheckpointTs: 100, ResolvedTs: 100} + m.runningErrors.m = make(map[node.ID]*heartbeatpb.RunningError) + m.initialized.Store(true) + m.removing.Store(true) + + // Removing maintainer must not keep resending bootstrap/post-bootstrap or barrier traffic. + // The only remaining control-plane action should be cascade close requests. + m.handleResendMessage() + require.Len(t, mockMC.GetMessageChannel(), 0) + + // Block status handling must also stop once removal starts, otherwise the old maintainer + // can still schedule DDL-driven add/remove operations after handoff begins. + m.onBlockStateRequest(&messaging.TargetMessage{ + From: "node1", + Type: messaging.TypeBlockStatusRequest, + Message: []messaging.IOTypeT{&heartbeatpb.BlockStatusRequest{ + ChangefeedID: cfID.ToPB(), + Mode: common.DefaultMode, + }}, + }) + require.Len(t, mockMC.GetMessageChannel(), 0) + + m.cascadeRemoving.Store(true) + m.handleResendMessage() + require.Len(t, mockMC.GetMessageChannel(), 2) + for i := 0; i < 2; i++ { + msg := <-mockMC.GetMessageChannel() + require.Equal(t, messaging.TypeMaintainerCloseRequest, msg.Type) + req := msg.Message[0].(*heartbeatpb.MaintainerCloseRequest) + require.Equal(t, cfID.ToPB(), req.ChangefeedID) + } +} + func TestMaintainerCalculateNewCheckpointTs(t *testing.T) { t.Run("uses reported checkpoint", func(t *testing.T) { m, selfNodeID := newMaintainerForCheckpointCalculationTest(t) @@ -428,6 +581,52 @@ func TestMaintainerCalculateNewCheckpointTs(t *testing.T) { require.False(t, canUpdate) require.Nil(t, newWatermark) }) + + t.Run("removing keeps blocked add operator checkpoint constraint", func(t *testing.T) { + // Scenario: the old maintainer enters removing mode while an Add operator is still in flight. + // Steps: report a higher capture watermark, quiesce all ordinary operators, and verify the + // checkpoint calculation remains capped at the in-flight add span's start checkpoint. + m, selfNodeID := newMaintainerForCheckpointCalculationTest(t) + nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) + nodeManager.GetAliveNodes()[selfNodeID] = &node.Info{ID: selfNodeID} + + dispatcherID := common.NewDispatcherID() + replicaSet := replica.NewWorkingSpanReplication( + m.changefeedID, + dispatcherID, + 1, + testutil.GetTableSpanByID(101), + &heartbeatpb.TableSpanStatus{ + ID: dispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 10, + Mode: common.DefaultMode, + }, + "", + false, + ) + m.controller.spanController.AddAbsentReplicaSet(replicaSet) + require.True(t, m.controller.operatorController.AddOperator(operator.NewAddDispatcherOperator( + m.controller.spanController, + replicaSet, + selfNodeID, + heartbeatpb.OperatorType_O_Add, + ))) + + m.removing.Store(true) + m.controller.EnterRemovingMode(m.ddlSpan.ID) + m.checkpointTsByCapture.Set(selfNodeID, heartbeatpb.Watermark{ + CheckpointTs: 100, + ResolvedTs: 100, + }) + + newWatermark, canUpdate := m.calculateNewCheckpointTs() + + require.True(t, canUpdate) + require.NotNil(t, newWatermark) + require.Equal(t, uint64(10), newWatermark.CheckpointTs) + require.Equal(t, uint64(10), newWatermark.ResolvedTs) + }) } func TestMaintainerCalCheckpointTsSkipsInvalidGlobalCheckpoint(t *testing.T) { @@ -527,6 +726,7 @@ func newMaintainerForCheckpointCalculationTest(t testing.TB) (*Maintainer, node. changefeedID: cfID, selfNode: selfNode, controller: controller, + ddlSpan: ddlSpan, pdClock: pdutil.NewClock4Test(), bootstrapper: bootstrapper, checkpointTsByCapture: newWatermarkCaptureMap(), diff --git a/maintainer/operator/operator_controller.go b/maintainer/operator/operator_controller.go index 5e5ee1af79..f876b41dd6 100644 --- a/maintainer/operator/operator_controller.go +++ b/maintainer/operator/operator_controller.go @@ -53,10 +53,21 @@ type Controller struct { nodeManager *watcher.NodeManager splitter *split.Splitter + // admissionMu serializes removing-mode quiesce with normal operator side effects. + // A normal operator must hold the read side from its final allow check through + // Start or Schedule/SendCommand so it cannot cross the handoff boundary after + // QuiesceExcept has made the controller quiescing. + admissionMu sync.RWMutex mu sync.RWMutex // protect the following fields operators map[common.DispatcherID]*operator.OperatorWithTime[common.DispatcherID, *heartbeatpb.TableSpanStatus] runningQueue operator.OperatorQueue[common.DispatcherID, *heartbeatpb.TableSpanStatus] mode int64 + // quiescing freezes ordinary operators while the old maintainer is being removed. + // Only dispatcher IDs in allowedOperatorIDs may continue to run, which keeps the + // DDL trigger dispatcher close path alive without letting stale schedulers recreate + // ordinary table dispatchers during handoff. + quiescing bool + allowedOperatorIDs map[common.DispatcherID]struct{} // lastWarnTime tracks the last warning time for each operator to avoid spam logs lastWarnTime map[common.DispatcherID]time.Time } @@ -69,19 +80,62 @@ func NewOperatorController( mode int64, ) *Controller { return &Controller{ - changefeedID: changefeedID, - batchSize: batchSize, - operators: make(map[common.DispatcherID]*operator.OperatorWithTime[common.DispatcherID, *heartbeatpb.TableSpanStatus]), - runningQueue: make(operator.OperatorQueue[common.DispatcherID, *heartbeatpb.TableSpanStatus], 0), - role: "maintainer", - spanController: spanController, - nodeManager: appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName), - messageCenter: appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter), - mode: mode, - lastWarnTime: make(map[common.DispatcherID]time.Time), + changefeedID: changefeedID, + batchSize: batchSize, + operators: make(map[common.DispatcherID]*operator.OperatorWithTime[common.DispatcherID, *heartbeatpb.TableSpanStatus]), + runningQueue: make(operator.OperatorQueue[common.DispatcherID, *heartbeatpb.TableSpanStatus], 0), + role: "maintainer", + spanController: spanController, + nodeManager: appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName), + messageCenter: appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter), + mode: mode, + allowedOperatorIDs: make(map[common.DispatcherID]struct{}), + lastWarnTime: make(map[common.DispatcherID]time.Time), } } +// QuiesceExcept freezes the controller so only the listed dispatcher IDs remain active. +// +// This is used when a maintainer enters removing mode. The old maintainer must stop +// issuing or advancing ordinary table operators, but the DDL trigger dispatcher close +// operator still needs to complete. +func (oc *Controller) QuiesceExcept(ids ...common.DispatcherID) { + oc.admissionMu.Lock() + defer oc.admissionMu.Unlock() + + oc.mu.Lock() + defer oc.mu.Unlock() + + oc.quiescing = true + clear(oc.allowedOperatorIDs) + for _, id := range ids { + if id.IsZero() { + continue + } + oc.allowedOperatorIDs[id] = struct{}{} + } +} + +func (oc *Controller) isOperatorAllowedLocked(id common.DispatcherID) bool { + if !oc.quiescing { + return true + } + _, ok := oc.allowedOperatorIDs[id] + return ok +} + +func (oc *Controller) isOperatorAllowed(id common.DispatcherID) bool { + oc.mu.RLock() + defer oc.mu.RUnlock() + return oc.isOperatorAllowedLocked(id) +} + +func (oc *Controller) isQuiescing() bool { + oc.mu.RLock() + defer oc.mu.RUnlock() + return oc.quiescing +} + // Execute poll the operator from the queue and execute it // It will be called in the thread pool. func (oc *Controller) Execute() time.Time { @@ -95,16 +149,7 @@ func (oc *Controller) Execute() time.Time { continue } - msg := op.Schedule() - - if msg != nil { - _ = oc.messageCenter.SendCommand(msg) - log.Debug("send command to dispatcher", - zap.String("role", oc.role), - zap.Stringer("changefeedID", oc.changefeedID), - zap.String("operator", op.String()), - zap.Any("msg", msg.Message)) - } + oc.scheduleOperator(op) executedCounter++ if executedCounter >= oc.batchSize { return time.Now().Add(nextPollInterval) @@ -112,6 +157,27 @@ func (oc *Controller) Execute() time.Time { } } +func (oc *Controller) scheduleOperator(op operator.Operator[common.DispatcherID, *heartbeatpb.TableSpanStatus]) { + oc.admissionMu.RLock() + defer oc.admissionMu.RUnlock() + + if !oc.isOperatorAllowed(op.ID()) { + return + } + + msg := op.Schedule() + if msg == nil { + return + } + + _ = oc.messageCenter.SendCommand(msg) + log.Debug("send command to dispatcher", + zap.String("role", oc.role), + zap.Stringer("changefeedID", oc.changefeedID), + zap.String("operator", op.String()), + zap.Any("msg", msg.Message)) +} + // RemoveTasksBySchemaID remove all tasks by schema id. // it is only by the barrier when the schema is dropped by ddl func (oc *Controller) RemoveTasksBySchemaID(schemaID int64) { @@ -142,7 +208,19 @@ func (oc *Controller) RemoveTasksByTableIDs(tables ...int64) { // AddOperator adds an operator to the controller, if the operator already exists, return false. func (oc *Controller) AddOperator(op operator.Operator[common.DispatcherID, *heartbeatpb.TableSpanStatus]) bool { + oc.admissionMu.RLock() + defer oc.admissionMu.RUnlock() + oc.mu.RLock() + if !oc.isOperatorAllowedLocked(op.ID()) { + oc.mu.RUnlock() + log.Info("add operator failed, controller is quiescing", + zap.String("role", oc.role), + zap.Stringer("changefeedID", oc.changefeedID), + zap.String("dispatcherID", op.ID().String()), + zap.String("operator", op.String())) + return false + } if old, ok := oc.operators[op.ID()]; ok { oc.mu.RUnlock() log.Info("add operator failed, operator already exists", @@ -161,11 +239,16 @@ func (oc *Controller) AddOperator(op operator.Operator[common.DispatcherID, *hea zap.String("operator", op.String())) return false } - oc.pushOperator(op) - return true + return oc.pushOperatorWithAdmission(op) } func (oc *Controller) UpdateOperatorStatus(id common.DispatcherID, from node.ID, status *heartbeatpb.TableSpanStatus) { + oc.admissionMu.RLock() + defer oc.admissionMu.RUnlock() + + if !oc.isOperatorAllowed(id) { + return + } oc.mu.RLock() op, ok := oc.operators[id] oc.mu.RUnlock() @@ -179,6 +262,12 @@ func (oc *Controller) UpdateOperatorStatus(id common.DispatcherID, from node.ID, // the controller will mark all spans on the node as absent if no operator is handling it, // then the controller will notify all operators. func (oc *Controller) OnNodeRemoved(n node.ID) { + oc.admissionMu.RLock() + defer oc.admissionMu.RUnlock() + + if oc.isQuiescing() { + return + } for _, span := range oc.spanController.GetTaskByNodeID(n) { oc.mu.RLock() _, ok := oc.operators[span.ID] @@ -269,6 +358,12 @@ func (oc *Controller) pollQueueingOperator() ( op := item.OP opID := op.ID() oc.mu.Unlock() + if !oc.isOperatorAllowed(opID) { + // Quiescing is terminal for the old maintainer. Frozen ordinary operators must + // stop executing, but they stay in operators so GetMinCheckpointTs still applies + // their checkpoint safety constraints until the maintainer is closed. + return nil, true + } if item.IsRemoved.Load() { return nil, true } @@ -350,6 +445,17 @@ func (oc *Controller) cancelOperator(opID common.DispatcherID) { } func (oc *Controller) removeReplicaSet(op *removeDispatcherOperator) { + oc.admissionMu.RLock() + defer oc.admissionMu.RUnlock() + + if !oc.isOperatorAllowed(op.ID()) { + log.Info("skip remove operator while controller is quiescing", + zap.String("role", oc.role), + zap.Stringer("changefeedID", oc.changefeedID), + zap.String("dispatcherID", op.ID().String()), + zap.String("operator", op.String())) + return + } oc.mu.RLock() old, ok := oc.operators[op.ID()] oc.mu.RUnlock() @@ -362,11 +468,26 @@ func (oc *Controller) removeReplicaSet(op *removeDispatcherOperator) { old.OP.OnTaskRemoved() oc.finalizeOperator(old, op.ID()) } - oc.pushOperator(op) + oc.pushOperatorWithAdmission(op) } // pushOperator add an operator to the controller queue. -func (oc *Controller) pushOperator(op operator.Operator[common.DispatcherID, *heartbeatpb.TableSpanStatus]) { +func (oc *Controller) pushOperator(op operator.Operator[common.DispatcherID, *heartbeatpb.TableSpanStatus]) bool { + oc.admissionMu.RLock() + defer oc.admissionMu.RUnlock() + + if !oc.isOperatorAllowed(op.ID()) { + log.Info("skip operator while controller is quiescing", + zap.String("role", oc.role), + zap.Stringer("changefeedID", oc.changefeedID), + zap.String("dispatcherID", op.ID().String()), + zap.String("operator", op.String())) + return false + } + return oc.pushOperatorWithAdmission(op) +} + +func (oc *Controller) pushOperatorWithAdmission(op operator.Operator[common.DispatcherID, *heartbeatpb.TableSpanStatus]) bool { log.Info("add operator to running queue", zap.String("role", oc.role), zap.Stringer("changefeedID", oc.changefeedID), @@ -390,6 +511,7 @@ func (oc *Controller) pushOperator(op operator.Operator[common.DispatcherID, *he metrics.OperatorCount.WithLabelValues(common.DefaultKeyspaceName, oc.changefeedID.Name(), op.Type(), common.StringMode(oc.mode)).Inc() metrics.TotalOperatorCount.WithLabelValues(common.DefaultKeyspaceName, oc.changefeedID.Name(), op.Type(), common.StringMode(oc.mode)).Inc() + return true } func (oc *Controller) checkAffectedNodes(op operator.Operator[common.DispatcherID, *heartbeatpb.TableSpanStatus]) { diff --git a/maintainer/operator/operator_controller_test.go b/maintainer/operator/operator_controller_test.go index a0097029a7..576a6bc532 100644 --- a/maintainer/operator/operator_controller_test.go +++ b/maintainer/operator/operator_controller_test.go @@ -18,6 +18,7 @@ import ( "sync" syncatomic "sync/atomic" "testing" + "time" "unsafe" "github.com/pingcap/ticdc/heartbeatpb" @@ -141,6 +142,117 @@ func (o *neverFinishOperator) OnTaskRemoved() {} func (o *neverFinishOperator) String() string { return "never-finish" } func (o *neverFinishOperator) BlockTsForward() bool { return false } +type countingOperator struct { + id common.DispatcherID + targetNode node.ID + blockTsForward bool + scheduleCount syncatomic.Int32 + checkCount syncatomic.Int32 + nodeRemovedCount syncatomic.Int32 +} + +func (o *countingOperator) ID() common.DispatcherID { return o.id } +func (o *countingOperator) Type() string { return "add" } +func (o *countingOperator) Start() {} +func (o *countingOperator) Schedule() *messaging.TargetMessage { + o.scheduleCount.Add(1) + return messaging.NewSingleTargetMessage(o.targetNode, messaging.MaintainerManagerTopic, &heartbeatpb.RemoveMaintainerRequest{}) +} +func (o *countingOperator) IsFinished() bool { return false } +func (o *countingOperator) PostFinish() {} +func (o *countingOperator) Check(node.ID, *heartbeatpb.TableSpanStatus) { + o.checkCount.Add(1) +} + +func (o *countingOperator) OnNodeRemove(node.ID) { + o.nodeRemovedCount.Add(1) +} +func (o *countingOperator) AffectedNodes() []node.ID { return []node.ID{o.targetNode} } +func (o *countingOperator) OnTaskRemoved() {} +func (o *countingOperator) String() string { return "counting-operator" } +func (o *countingOperator) BlockTsForward() bool { return o.blockTsForward } + +type blockingScheduleOperator struct { + id common.DispatcherID + targetNode node.ID + + scheduleEntered chan struct{} + releaseSchedule chan struct{} + scheduleOnce sync.Once + scheduleCount syncatomic.Int32 +} + +func newBlockingScheduleOperator(id common.DispatcherID, targetNode node.ID) *blockingScheduleOperator { + return &blockingScheduleOperator{ + id: id, + targetNode: targetNode, + scheduleEntered: make(chan struct{}), + releaseSchedule: make(chan struct{}), + } +} + +func (o *blockingScheduleOperator) ID() common.DispatcherID { return o.id } +func (o *blockingScheduleOperator) Type() string { return "add" } +func (o *blockingScheduleOperator) Start() {} +func (o *blockingScheduleOperator) Schedule() *messaging.TargetMessage { + o.scheduleCount.Add(1) + o.scheduleOnce.Do(func() { close(o.scheduleEntered) }) + <-o.releaseSchedule + return messaging.NewSingleTargetMessage(o.targetNode, messaging.MaintainerManagerTopic, &heartbeatpb.RemoveMaintainerRequest{}) +} + +func (o *blockingScheduleOperator) IsFinished() bool { return false } +func (o *blockingScheduleOperator) PostFinish() {} +func (o *blockingScheduleOperator) Check(node.ID, *heartbeatpb.TableSpanStatus) { +} + +func (o *blockingScheduleOperator) OnNodeRemove(node.ID) { +} +func (o *blockingScheduleOperator) AffectedNodes() []node.ID { return []node.ID{o.targetNode} } +func (o *blockingScheduleOperator) OnTaskRemoved() {} +func (o *blockingScheduleOperator) String() string { return "blocking-schedule" } +func (o *blockingScheduleOperator) BlockTsForward() bool { return false } + +type blockingStartOperator struct { + id common.DispatcherID + targetNode node.ID + + startEntered chan struct{} + releaseStart chan struct{} + startOnce sync.Once + startCount syncatomic.Int32 +} + +func newBlockingStartOperator(id common.DispatcherID, targetNode node.ID) *blockingStartOperator { + return &blockingStartOperator{ + id: id, + targetNode: targetNode, + startEntered: make(chan struct{}), + releaseStart: make(chan struct{}), + } +} + +func (o *blockingStartOperator) ID() common.DispatcherID { return o.id } +func (o *blockingStartOperator) Type() string { return "add" } +func (o *blockingStartOperator) Start() { + o.startCount.Add(1) + o.startOnce.Do(func() { close(o.startEntered) }) + <-o.releaseStart +} + +func (o *blockingStartOperator) Schedule() *messaging.TargetMessage { return nil } +func (o *blockingStartOperator) IsFinished() bool { return false } +func (o *blockingStartOperator) PostFinish() {} +func (o *blockingStartOperator) Check(node.ID, *heartbeatpb.TableSpanStatus) { +} + +func (o *blockingStartOperator) OnNodeRemove(node.ID) { +} +func (o *blockingStartOperator) AffectedNodes() []node.ID { return []node.ID{o.targetNode} } +func (o *blockingStartOperator) OnTaskRemoved() {} +func (o *blockingStartOperator) String() string { return "blocking-start" } +func (o *blockingStartOperator) BlockTsForward() bool { return false } + func setAliveNodes(nodeManager *watcher.NodeManager, alive map[node.ID]*node.Info) { type nodeMap = map[node.ID]*node.Info v := reflect.ValueOf(nodeManager).Elem().FieldByName("nodes") @@ -242,3 +354,237 @@ func TestController_RemoveReplicaSet_ReplacesRemoveOperatorOnTaskRemoved(t *test require.Equal(t, int32(0), postFinishCount.Load()) require.NotNil(t, oc.GetOperator(replicaSet.ID)) } + +func TestController_QuiesceExceptFreezesNonAllowedOperators(t *testing.T) { + // Scenario: removing mode allows only the DDL close operator to keep running. + // Steps: quiesce the controller with one allowed dispatcher, then verify the + // allowed operator still accepts status and schedules, while the frozen operator + // does not run but still blocks checkpoint advancement. + messageCenter := messaging.NewMockMessageCenter() + appcontext.SetService(appcontext.MessageCenter, messageCenter) + + spanController, changefeedID, replicaSet, nodeA, _ := setupTestEnvironment(t) + spanController.AddReplicatingSpan(replicaSet) + + allowedID := common.NewDispatcherID() + allowedReplica := setupReplicaSetWithID(t, changefeedID, allowedID, nodeA) + allowedReplica.UpdateStatus(&heartbeatpb.TableSpanStatus{ + ID: allowedID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 20, + Mode: common.DefaultMode, + }) + spanController.AddReplicatingSpan(allowedReplica) + + blockedID := common.NewDispatcherID() + blockedReplica := setupReplicaSetWithID(t, changefeedID, blockedID, nodeA) + spanController.AddReplicatingSpan(blockedReplica) + + nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) + setAliveNodes(nodeManager, map[node.ID]*node.Info{nodeA: {ID: nodeA}}) + + oc := NewOperatorController(changefeedID, spanController, 10, common.DefaultMode) + allowedOp := &countingOperator{id: allowedID, targetNode: nodeA, blockTsForward: true} + blockedOp := &countingOperator{id: blockedID, targetNode: nodeA, blockTsForward: true} + require.True(t, oc.AddOperator(allowedOp)) + require.True(t, oc.AddOperator(blockedOp)) + + oc.QuiesceExcept(allowedID) + + oc.UpdateOperatorStatus(allowedID, nodeA, &heartbeatpb.TableSpanStatus{ID: allowedID.ToPB()}) + oc.UpdateOperatorStatus(blockedID, nodeA, &heartbeatpb.TableSpanStatus{ID: blockedID.ToPB()}) + require.Equal(t, int32(1), allowedOp.checkCount.Load()) + require.Equal(t, int32(0), blockedOp.checkCount.Load()) + + oc.OnNodeRemoved(nodeA) + require.Equal(t, int32(0), allowedOp.nodeRemovedCount.Load()) + require.Equal(t, int32(0), blockedOp.nodeRemovedCount.Load()) + require.Equal(t, 0, spanController.GetAbsentSize()) + + newBlockedID := common.NewDispatcherID() + newBlockedReplica := setupReplicaSetWithID(t, changefeedID, newBlockedID, nodeA) + spanController.AddReplicatingSpan(newBlockedReplica) + require.False(t, oc.AddOperator(&countingOperator{id: newBlockedID, targetNode: nodeA})) + + require.Equal(t, uint64(10), oc.GetMinCheckpointTs(^uint64(0))) + + next := oc.Execute() + require.False(t, next.IsZero()) + require.Equal(t, int32(1), allowedOp.scheduleCount.Load()) + require.Equal(t, int32(0), blockedOp.scheduleCount.Load()) + require.Len(t, messageCenter.GetMessageChannel(), 1) + require.Equal(t, 2, oc.OperatorSize()) +} + +func TestController_QuiesceExceptDropsBlockedOnlyQueueFromExecution(t *testing.T) { + // Scenario: after removing starts, the running queue can contain only frozen ordinary operators. + // Steps: poll a quiesced controller with one non-allowed operator and verify it leaves the heap, + // remains in the operator map for checkpoint safety, and the next poll terminates the Execute loop. + messageCenter := messaging.NewMockMessageCenter() + appcontext.SetService(appcontext.MessageCenter, messageCenter) + + spanController, changefeedID, replicaSet, nodeA, _ := setupTestEnvironment(t) + spanController.AddReplicatingSpan(replicaSet) + + nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) + setAliveNodes(nodeManager, map[node.ID]*node.Info{nodeA: {ID: nodeA}}) + + oc := NewOperatorController(changefeedID, spanController, 10, common.DefaultMode) + blockedOp := &countingOperator{id: replicaSet.ID, targetNode: nodeA, blockTsForward: true} + require.True(t, oc.AddOperator(blockedOp)) + + oc.QuiesceExcept(common.NewDispatcherID()) + + op, next := oc.pollQueueingOperator() + require.Nil(t, op) + require.True(t, next) + require.Equal(t, 0, oc.runningQueue.Len()) + require.Equal(t, 1, oc.OperatorSize()) + require.Equal(t, uint64(1000), oc.GetMinCheckpointTs(^uint64(0))) + + op, next = oc.pollQueueingOperator() + require.Nil(t, op) + require.False(t, next) + require.Equal(t, int32(0), blockedOp.scheduleCount.Load()) +} + +func TestController_QuiesceExceptWaitsForInFlightSchedule(t *testing.T) { + // Scenario: Execute has already passed the queue poll and is inside a normal operator's Schedule. + // Steps: block Schedule with a channel, start QuiesceExcept, verify quiesce cannot return until + // Schedule/SendCommand leaves the admission boundary, then verify later Execute calls do not reschedule it. + messageCenter := messaging.NewMockMessageCenter() + appcontext.SetService(appcontext.MessageCenter, messageCenter) + + spanController, changefeedID, replicaSet, nodeA, _ := setupTestEnvironment(t) + spanController.AddReplicatingSpan(replicaSet) + + nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) + setAliveNodes(nodeManager, map[node.ID]*node.Info{nodeA: {ID: nodeA}}) + + oc := NewOperatorController(changefeedID, spanController, 10, common.DefaultMode) + op := newBlockingScheduleOperator(replicaSet.ID, nodeA) + require.True(t, oc.AddOperator(op)) + + executeDone := make(chan struct{}) + go func() { + defer close(executeDone) + oc.Execute() + }() + <-op.scheduleEntered + + quiesceDone := make(chan struct{}) + go func() { + defer close(quiesceDone) + oc.QuiesceExcept(common.NewDispatcherID()) + }() + + require.Never(t, func() bool { + select { + case <-quiesceDone: + return true + default: + return false + } + }, 100*time.Millisecond, 10*time.Millisecond) + + close(op.releaseSchedule) + require.Eventually(t, func() bool { + select { + case <-executeDone: + return true + default: + return false + } + }, time.Second, 10*time.Millisecond) + require.Eventually(t, func() bool { + select { + case <-quiesceDone: + return true + default: + return false + } + }, time.Second, 10*time.Millisecond) + + require.Equal(t, int32(1), op.scheduleCount.Load()) + require.Len(t, messageCenter.GetMessageChannel(), 1) + + oc.Execute() + require.Equal(t, int32(1), op.scheduleCount.Load()) + require.Len(t, messageCenter.GetMessageChannel(), 1) +} + +func TestController_QuiesceExceptWaitsForInFlightPush(t *testing.T) { + // Scenario: a normal operator has passed admission and is inside Start while removing mode begins. + // Steps: block Start with a channel, start QuiesceExcept, verify quiesce cannot return until Start + // finishes, then verify a later ordinary operator is rejected without being started. + messageCenter := messaging.NewMockMessageCenter() + appcontext.SetService(appcontext.MessageCenter, messageCenter) + + spanController, changefeedID, replicaSet, nodeA, _ := setupTestEnvironment(t) + spanController.AddReplicatingSpan(replicaSet) + + nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) + setAliveNodes(nodeManager, map[node.ID]*node.Info{nodeA: {ID: nodeA}}) + + oc := NewOperatorController(changefeedID, spanController, 10, common.DefaultMode) + op := newBlockingStartOperator(replicaSet.ID, nodeA) + + addResult := make(chan bool, 1) + go func() { + addResult <- oc.AddOperator(op) + }() + <-op.startEntered + + quiesceDone := make(chan struct{}) + go func() { + defer close(quiesceDone) + oc.QuiesceExcept(common.NewDispatcherID()) + }() + + require.Never(t, func() bool { + select { + case <-quiesceDone: + return true + default: + return false + } + }, 100*time.Millisecond, 10*time.Millisecond) + + close(op.releaseStart) + require.Eventually(t, func() bool { return len(addResult) == 1 }, time.Second, 10*time.Millisecond) + require.True(t, <-addResult) + require.Eventually(t, func() bool { + select { + case <-quiesceDone: + return true + default: + return false + } + }, time.Second, 10*time.Millisecond) + require.Equal(t, int32(1), op.startCount.Load()) + + blockedID := common.NewDispatcherID() + blockedReplica := setupReplicaSetWithID(t, changefeedID, blockedID, nodeA) + spanController.AddReplicatingSpan(blockedReplica) + blockedOp := newBlockingStartOperator(blockedID, nodeA) + require.False(t, oc.AddOperator(blockedOp)) + require.Equal(t, int32(0), blockedOp.startCount.Load()) +} + +func setupReplicaSetWithID( + t *testing.T, + changefeedID common.ChangeFeedID, + dispatcherID common.DispatcherID, + nodeID node.ID, +) *replica.SpanReplication { + t.Helper() + + tableID := int64(dispatcherID.Low + 100) + span := testutil.GetTableSpanByID(tableID) + return replica.NewWorkingSpanReplication(changefeedID, dispatcherID, 1, span, &heartbeatpb.TableSpanStatus{ + ID: dispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 10, + Mode: common.DefaultMode, + }, nodeID, false) +}