Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions maintainer/maintainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,9 @@ func (m *Maintainer) HandleEvent(event *Event) bool {
}

func (m *Maintainer) checkNodeChanged() {
if m.removing.Load() {
return
}
m.nodeChanged.Lock()
defer m.nodeChanged.Unlock()
if m.nodeChanged.changed {
Expand Down Expand Up @@ -549,6 +552,13 @@ func (m *Maintainer) onRemoveMaintainer(cascade, changefeedRemoved bool) {
m.removing.Store(true)
m.cascadeRemoving.Store(cascade)
m.changefeedRemoved.Store(changefeedRemoved)
// Freeze ordinary scheduling on the old maintainer before we start the close flow.
// Only the DDL trigger close operator is allowed to keep running.
allowedDispatcherIDs := []common.DispatcherID{m.ddlSpan.ID}
if m.enableRedo {
allowedDispatcherIDs = append(allowedDispatcherIDs, m.redoDDLSpan.ID)
}
m.controller.EnterRemovingMode(allowedDispatcherIDs...)
closed := m.tryCloseChangefeed()
if closed {
m.removed.Store(true)
Expand Down Expand Up @@ -899,6 +909,14 @@ func (m *Maintainer) onHeartbeatRequest(msg *messaging.TargetMessage) {
// Process operator status updates AFTER checkpointTsByCapture is updated
// This ensures when operators complete, checkpointTsByCapture already contains the complete heartbeat
// Works with calCheckpointTs constraint ordering to prevent checkpoint advancing past new dispatcher startTs
if m.removing.Load() {
// Once RemoveMaintainer starts, we still need status updates for the close flow itself
// (for example DDL-trigger close operators reaching terminal states), but we must not run
// failover self-healing. A late Stopped/Working heartbeat from a closing dispatcher manager
// would otherwise mark spans absent or remove/recreate dispatchers after shutdown has begun.
m.controller.handleStatus(msg.From, req.Statuses, false)
return
}
m.controller.HandleStatus(msg.From, req.Statuses)
}

Expand All @@ -915,7 +933,7 @@ func (m *Maintainer) onError(from node.ID, err *heartbeatpb.RunningError) {

func (m *Maintainer) onBlockStateRequest(msg *messaging.TargetMessage) {
// the barrier is not initialized
if !m.initialized.Load() {
if !m.initialized.Load() || m.removing.Load() {
return
}
req := msg.Message[0].(*heartbeatpb.BlockStatusRequest)
Expand Down Expand Up @@ -1049,8 +1067,12 @@ func (m *Maintainer) onMaintainerCloseResponse(from node.ID, response *heartbeat

func (m *Maintainer) handleResendMessage() {
// resend closing message
if m.removing.Load() && m.cascadeRemoving.Load() {
m.trySendMaintainerCloseRequestToAllNode()
if m.removing.Load() {
// After RemoveMaintainer starts, the old maintainer must stop resending bootstrap/barrier
// traffic. Otherwise stale control-plane messages can race with the new maintainer.
if m.cascadeRemoving.Load() {
m.trySendMaintainerCloseRequestToAllNode()
}
return
}
// resend bootstrap message
Expand Down
63 changes: 62 additions & 1 deletion maintainer/maintainer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,12 @@
return controller
}

// HandleStatus handle the status report from the node
// HandleStatus handles the status report from the node.
func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableSpanStatus) {
c.handleStatus(from, statusList, true)
}

func (c *Controller) handleStatus(from node.ID, statusList []*heartbeatpb.TableSpanStatus, allowSelfHealing bool) {
// HandleStatus reconciles runtime dispatcher reports with maintainer-side state.
//
// In the steady state, spanController (desired tasks), operatorController (in-flight scheduling),
Expand All @@ -170,6 +174,10 @@
// The rules below make the system converge:
// 1) Orphan Working dispatcher without an operator => actively remove it to avoid leaks.
// 2) Non-working dispatcher without an operator => mark the span absent so scheduler can recreate it.
//
// During maintainer removal we still need status bookkeeping so close/remove can observe terminal
// states, but we must disable the self-healing branches. Otherwise a late Stopped/Working heartbeat
// can recreate dispatchers for a changefeed that is already shutting down.
for _, status := range statusList {
dispatcherID := common.NewDispatcherIDFromPB(status.ID)
operatorController := c.getOperatorController(status.Mode)
Expand All @@ -178,6 +186,9 @@
operatorController.UpdateOperatorStatus(dispatcherID, from, status)
stm := spanController.GetTaskByID(dispatcherID)
if stm == nil {
if !allowSelfHealing {
continue
}
// If maintainer doesn't know this dispatcherID, most statuses are late/outdated and can be ignored.
// We only need to act when the runtime says the dispatcher is Working, because that implies there's
// still an active dispatcher consuming resources and potentially producing output.
Expand Down Expand Up @@ -207,6 +218,47 @@
continue
}
spanController.UpdateStatus(stm, status)
<<<<<<< HEAD

Check failure on line 221 in maintainer/maintainer_controller.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected <<, expected }
=======

Check failure on line 222 in maintainer/maintainer_controller.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected ==, expected }

if !allowSelfHealing {
continue
}

// Fallback: dispatcher becomes non-working without an operator.
//
// In normal scheduling flow, a dispatcher should transition to Stopped/Removed as part of a maintainer
// operator (Remove/Move/Split...). However, after maintainer failover we can lose operatorController state
// while dispatcher managers keep executing the already-issued requests.
//
// A real example is a "remove request in transit" during bootstrap:
// - Old maintainer sends a Remove (e.g. the remove-origin phase of Move), but the request hasn't reached
// dispatcher manager yet.
// - New maintainer bootstraps from dispatcher manager snapshots and sees the dispatcher as Working, with
// no in-flight operator reported in bootstrap response.
// - After bootstrap, the in-transit Remove arrives, the dispatcher is removed, and the new maintainer
// observes a terminal status without a corresponding operator.
//
// In these cases we'd observe a non-working status but have no operator to drive the follow-up
// rescheduling, so we mark the span absent to let the scheduler recreate it.
//
// Safety against message reordering/resend:
// - We only reach here when stm != nil and stm.GetNodeID() == from (checked above). If the span was already
// rebound to a different node, we skip it, so late statuses from the old node won't trigger rescheduling.
// - MarkSpanAbsent is idempotent and only affects the scheduler state, so even if we get duplicate terminal
// statuses, the worst case is an extra no-op absent mark.
if status.ComponentStatus == heartbeatpb.ComponentState_Stopped ||
status.ComponentStatus == heartbeatpb.ComponentState_Removed {
if op := operatorController.GetOperator(dispatcherID); op == nil {
log.Warn("dispatcher becomes non-working without operator, mark span absent for rescheduling",
zap.String("changefeed", c.changefeedID.Name()),
zap.String("from", from.String()),
zap.String("dispatcherID", dispatcherID.String()),
zap.Any("status", status))
spanController.MarkSpanAbsent(stm)
}
}
>>>>>>> 776315e72 (maintainer: quiesce control plane during remove handoff (#4828))

Check failure on line 261 in maintainer/maintainer_controller.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

invalid character U+0023 '#'
Comment on lines +221 to +261

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There are unresolved git merge conflict markers (<<<<<<< HEAD, =======, >>>>>>> 776315e72) in this file. Please resolve the conflict and remove the markers to ensure the code compiles successfully.

		if !allowSelfHealing {
			continue
		}

		// Fallback: dispatcher becomes non-working without an operator.
		//
		// In normal scheduling flow, a dispatcher should transition to Stopped/Removed as part of a maintainer
		// operator (Remove/Move/Split...). However, after maintainer failover we can lose operatorController state
		// while dispatcher managers keep executing the already-issued requests.
		//
		// A real example is a "remove request in transit" during bootstrap:
		// - Old maintainer sends a Remove (e.g. the remove-origin phase of Move), but the request hasn't reached
		//   dispatcher manager yet.
		// - New maintainer bootstraps from dispatcher manager snapshots and sees the dispatcher as Working, with
		//   no in-flight operator reported in bootstrap response.
		// - After bootstrap, the in-transit Remove arrives, the dispatcher is removed, and the new maintainer
		//   observes a terminal status without a corresponding operator.
		//
		// In these cases we'd observe a non-working status but have no operator to drive the follow-up
		// rescheduling, so we mark the span absent to let the scheduler recreate it.
		//
		// Safety against message reordering/resend:
		// - We only reach here when stm != nil and stm.GetNodeID() == from (checked above). If the span was already
		//   rebound to a different node, we skip it, so late statuses from the old node won't trigger rescheduling.
		// - MarkSpanAbsent is idempotent and only affects the scheduler state, so even if we get duplicate terminal
		//   statuses, the worst case is an extra no-op absent mark.
		if status.ComponentStatus == heartbeatpb.ComponentState_Stopped ||
			status.ComponentStatus == heartbeatpb.ComponentState_Removed {
			if op := operatorController.GetOperator(dispatcherID); op == nil {
				log.Warn("dispatcher becomes non-working without operator, mark span absent for rescheduling",
					zap.String("changefeed", c.changefeedID.Name()),
					zap.String("from", from.String()),
					zap.String("dispatcherID", dispatcherID.String()),
					zap.Any("status", status))
				spanController.MarkSpanAbsent(stm)
			}
		}

}
}

Expand Down Expand Up @@ -241,6 +293,15 @@
c.operatorController.OnNodeRemoved(id)
}

// EnterRemovingMode freezes normal scheduling on the old maintainer while keeping the
// DDL trigger dispatcher close path alive.
func (c *Controller) EnterRemovingMode(allowedDispatcherIDs ...common.DispatcherID) {
c.operatorController.QuiesceExcept(allowedDispatcherIDs...)
if c.redoOperatorController != nil {
c.redoOperatorController.QuiesceExcept(allowedDispatcherIDs...)
}
}

func (c *Controller) GetMinRedoCheckpointTs(minCheckpointTs uint64) uint64 {
minCheckpointTsForOperator := c.redoOperatorController.GetMinCheckpointTs(minCheckpointTs)
minCheckpointTsForSpan := c.redoSpanController.GetMinCheckpointTsForNonReplicatingSpans(minCheckpointTs)
Expand Down
200 changes: 200 additions & 0 deletions maintainer/maintainer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,159 @@ func TestMaintainer_GetMaintainerStatusUsesCommittedCheckpoint(t *testing.T) {
require.Equal(t, uint64(50), status.LastSyncedTs)
}

func TestMaintainerHeartbeatDuringRemovingSkipsFailoverRecovery(t *testing.T) {
buildMaintainer := func(t *testing.T) (*Maintainer, *replica.SpanReplication, node.ID) {
t.Helper()
testutil.SetUpTestServices(t)

nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)
captureID := node.ID("node1")
nodeManager.GetAliveNodes()[captureID] = &node.Info{ID: captureID}

cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
ddlDispatcherID := common.NewDispatcherID()
ddlSpan := replica.NewWorkingSpanReplication(cfID, ddlDispatcherID,
common.DDLSpanSchemaID,
common.KeyspaceDDLSpan(common.DefaultKeyspaceID), &heartbeatpb.TableSpanStatus{
ID: ddlDispatcherID.ToPB(),
ComponentStatus: heartbeatpb.ComponentState_Working,
CheckpointTs: 10,
Mode: common.DefaultMode,
}, captureID, false)
refresher := replica.NewRegionCountRefresher(cfID, time.Minute)
controller := NewController(cfID, 10, &mockThreadPool{},
config.GetDefaultReplicaConfig(), ddlSpan, nil, 1000, 0, refresher, common.DefaultKeyspace, false, testBalanceMoveBatchSize)

totalSpan := common.TableIDToComparableSpan(common.DefaultKeyspaceID, 1)
dispatcherID := common.NewDispatcherID()
workingSpan := replica.NewWorkingSpanReplication(cfID, dispatcherID,
1,
&heartbeatpb.TableSpan{
TableID: totalSpan.TableID,
StartKey: totalSpan.StartKey,
EndKey: totalSpan.EndKey,
KeyspaceID: common.DefaultKeyspaceID,
}, &heartbeatpb.TableSpanStatus{
ID: dispatcherID.ToPB(),
ComponentStatus: heartbeatpb.ComponentState_Working,
CheckpointTs: 10,
Mode: common.DefaultMode,
}, captureID, false)
controller.spanController.AddReplicatingSpan(workingSpan)

m := &Maintainer{
changefeedID: cfID,
controller: controller,
checkpointTsByCapture: newWatermarkCaptureMap(),
redoTsByCapture: newWatermarkCaptureMap(),
statusChanged: atomic.NewBool(false),
}
m.watermark.Watermark = &heartbeatpb.Watermark{}
m.runningErrors.m = make(map[node.ID]*heartbeatpb.RunningError)
m.initialized.Store(true)
return m, workingSpan, captureID
}

makeHeartbeat := func(dispatcherID common.DispatcherID, from node.ID) *messaging.TargetMessage {
req := &heartbeatpb.HeartBeatRequest{
Watermark: &heartbeatpb.Watermark{
CheckpointTs: 20,
ResolvedTs: 20,
},
Statuses: []*heartbeatpb.TableSpanStatus{
{
ID: dispatcherID.ToPB(),
ComponentStatus: heartbeatpb.ComponentState_Stopped,
CheckpointTs: 20,
Mode: common.DefaultMode,
},
},
}
return &messaging.TargetMessage{
From: from,
Type: messaging.TypeHeartBeatRequest,
Message: []messaging.IOTypeT{req},
}
}

// Normal failover recovery should still mark a non-working span absent when the runtime
// reports Stopped but maintainer has no operator for it.
t.Run("normal maintainer still self heals", func(t *testing.T) {
m, workingSpan, captureID := buildMaintainer(t)

m.onHeartbeatRequest(makeHeartbeat(workingSpan.ID, captureID))

require.Equal(t, 1, m.controller.spanController.GetAbsentSize())
require.Equal(t, heartbeatpb.ComponentState_Stopped, workingSpan.GetStatus().ComponentStatus)
require.Equal(t, node.ID(""), workingSpan.GetNodeID())
})

// When RemoveMaintainer has started, the same late Stopped heartbeat must only update runtime
// status bookkeeping. Re-marking the span absent here would let the scheduler recreate a
// dispatcher while the changefeed is shutting down.
t.Run("removing maintainer skips self healing", func(t *testing.T) {
m, workingSpan, captureID := buildMaintainer(t)
m.removing.Store(true)

m.onHeartbeatRequest(makeHeartbeat(workingSpan.ID, captureID))

require.Equal(t, 0, m.controller.spanController.GetAbsentSize())
require.Equal(t, heartbeatpb.ComponentState_Stopped, workingSpan.GetStatus().ComponentStatus)
require.Equal(t, captureID, workingSpan.GetNodeID())
require.Zero(t, m.controller.operatorController.OperatorSize())
})
}

func TestMaintainerRemovingSuppressesLegacyControlPlaneActions(t *testing.T) {
mockMC := messaging.NewMockMessageCenter()
nodeManager := watcher.NewNodeManager(nil, nil)
nodeManager.GetAliveNodes()["node1"] = &node.Info{ID: "node1"}
nodeManager.GetAliveNodes()["node2"] = &node.Info{ID: "node2"}

cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
m := &Maintainer{
changefeedID: cfID,
mc: mockMC,
nodeManager: nodeManager,
closedNodes: make(map[node.ID]struct{}),
statusChanged: atomic.NewBool(false),
postBootstrapMsg: &heartbeatpb.MaintainerPostBootstrapRequest{
ChangefeedID: cfID.ToPB(),
},
}
m.watermark.Watermark = &heartbeatpb.Watermark{CheckpointTs: 100, ResolvedTs: 100}
m.runningErrors.m = make(map[node.ID]*heartbeatpb.RunningError)
m.initialized.Store(true)
m.removing.Store(true)

// Removing maintainer must not keep resending bootstrap/post-bootstrap or barrier traffic.
// The only remaining control-plane action should be cascade close requests.
m.handleResendMessage()
require.Len(t, mockMC.GetMessageChannel(), 0)

// Block status handling must also stop once removal starts, otherwise the old maintainer
// can still schedule DDL-driven add/remove operations after handoff begins.
m.onBlockStateRequest(&messaging.TargetMessage{
From: "node1",
Type: messaging.TypeBlockStatusRequest,
Message: []messaging.IOTypeT{&heartbeatpb.BlockStatusRequest{
ChangefeedID: cfID.ToPB(),
Mode: common.DefaultMode,
}},
})
require.Len(t, mockMC.GetMessageChannel(), 0)

m.cascadeRemoving.Store(true)
m.handleResendMessage()
require.Len(t, mockMC.GetMessageChannel(), 2)
for i := 0; i < 2; i++ {
msg := <-mockMC.GetMessageChannel()
require.Equal(t, messaging.TypeMaintainerCloseRequest, msg.Type)
req := msg.Message[0].(*heartbeatpb.MaintainerCloseRequest)
require.Equal(t, cfID.ToPB(), req.ChangefeedID)
}
}

func TestMaintainerCalculateNewCheckpointTs(t *testing.T) {
t.Run("uses reported checkpoint", func(t *testing.T) {
m, selfNodeID := newMaintainerForCheckpointCalculationTest(t)
Expand Down Expand Up @@ -428,6 +581,52 @@ func TestMaintainerCalculateNewCheckpointTs(t *testing.T) {
require.False(t, canUpdate)
require.Nil(t, newWatermark)
})

t.Run("removing keeps blocked add operator checkpoint constraint", func(t *testing.T) {
// Scenario: the old maintainer enters removing mode while an Add operator is still in flight.
// Steps: report a higher capture watermark, quiesce all ordinary operators, and verify the
// checkpoint calculation remains capped at the in-flight add span's start checkpoint.
m, selfNodeID := newMaintainerForCheckpointCalculationTest(t)
nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)
nodeManager.GetAliveNodes()[selfNodeID] = &node.Info{ID: selfNodeID}

dispatcherID := common.NewDispatcherID()
replicaSet := replica.NewWorkingSpanReplication(
m.changefeedID,
dispatcherID,
1,
testutil.GetTableSpanByID(101),
&heartbeatpb.TableSpanStatus{
ID: dispatcherID.ToPB(),
ComponentStatus: heartbeatpb.ComponentState_Working,
CheckpointTs: 10,
Mode: common.DefaultMode,
},
"",
false,
)
m.controller.spanController.AddAbsentReplicaSet(replicaSet)
require.True(t, m.controller.operatorController.AddOperator(operator.NewAddDispatcherOperator(
m.controller.spanController,
replicaSet,
selfNodeID,
heartbeatpb.OperatorType_O_Add,
)))

m.removing.Store(true)
m.controller.EnterRemovingMode(m.ddlSpan.ID)
m.checkpointTsByCapture.Set(selfNodeID, heartbeatpb.Watermark{
CheckpointTs: 100,
ResolvedTs: 100,
})

newWatermark, canUpdate := m.calculateNewCheckpointTs()

require.True(t, canUpdate)
require.NotNil(t, newWatermark)
require.Equal(t, uint64(10), newWatermark.CheckpointTs)
require.Equal(t, uint64(10), newWatermark.ResolvedTs)
})
}

func TestMaintainerCalCheckpointTsSkipsInvalidGlobalCheckpoint(t *testing.T) {
Expand Down Expand Up @@ -527,6 +726,7 @@ func newMaintainerForCheckpointCalculationTest(t testing.TB) (*Maintainer, node.
changefeedID: cfID,
selfNode: selfNode,
controller: controller,
ddlSpan: ddlSpan,
pdClock: pdutil.NewClock4Test(),
bootstrapper: bootstrapper,
checkpointTsByCapture: newWatermarkCaptureMap(),
Expand Down
Loading
Loading