maintainer,dispatcher: fence stale maintainer epochs #5435
📝 Walkthrough
This PR introduces maintainer epoch fencing across the TiCDC control plane.
Changes: Maintainer Epoch Fencing
Sequence Diagram(s)
sequenceDiagram
rect rgba(100, 149, 237, 0.5)
Note over MaintainerManager,Maintainer: Maintainer side
MaintainerManager->>Maintainer: AddMaintainer(epoch=N)
Maintainer->>Maintainer: mayRegisterMaintainerForAdd(epoch)?
Maintainer->>DispatcherNode: MaintainerBootstrapRequest(MaintainerEpoch=N)
end
rect rgba(144, 238, 144, 0.5)
Note over DispatcherNode,DispatcherManager: Dispatcher side
DispatcherNode->>DispatcherOrchestrator: handleBootstrapRequest(epoch=N)
DispatcherOrchestrator->>DispatcherOrchestrator: check closedMaintainerEpochs >= N?
DispatcherOrchestrator->>DispatcherManager: NewDispatcherManager(maintainerEpoch=N)
DispatcherOrchestrator->>DispatcherManager: TryUpdateMaintainer(from, N)
DispatcherNode->>DispatcherManager: ScheduleDispatcherRequest(From=maintainer, MaintainerEpoch=N)
DispatcherManager->>DispatcherManager: Lock(MaintainerFenceMu) → IsMaintainerRequestAllowed?
DispatcherManager->>DispatcherManager: preCheckForSchedulerHandler → handleScheduleCreate
end
rect rgba(255, 160, 122, 0.5)
Note over Maintainer,DispatcherNode: Stale epoch rejection
MaintainerManager->>Maintainer: AddMaintainer(epoch=N-1)
Maintainer->>Maintainer: isNewerMaintainerEpoch? → reject
DispatcherNode->>DispatcherManager: ScheduleDispatcherRequest(MaintainerEpoch=N-1)
DispatcherManager->>DispatcherManager: IsMaintainerRequestAllowed? → drop + log
end
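The `closedMaintainerEpochs >= N?` step in the diagram can be pictured with the minimal sketch below. It is an illustration under assumptions, not the PR's code: the `closedEpochs` type, its string keys, and both method names are hypothetical, and the handling of the epoch-0 compatibility sentinel is deliberately left out.

```go
package main

import (
	"fmt"
	"sync"
)

// closedEpochs is a hypothetical stand-in for the orchestrator's tombstone
// table: for each changefeed it remembers the highest maintainer epoch whose
// dispatcher manager has already been closed.
type closedEpochs struct {
	mu     sync.Mutex
	closed map[string]uint64
}

func newClosedEpochs() *closedEpochs {
	return &closedEpochs{closed: make(map[string]uint64)}
}

// markClosed records a tombstone and never lowers an existing one.
func (c *closedEpochs) markClosed(changefeed string, epoch uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if epoch > c.closed[changefeed] {
		c.closed[changefeed] = epoch
	}
}

// allowBootstrap reports whether a bootstrap request stamped with epoch may
// create a manager: it is refused when a close at the same or a newer epoch
// has already completed (closed >= epoch).
func (c *closedEpochs) allowBootstrap(changefeed string, epoch uint64) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	closedEpoch, ok := c.closed[changefeed]
	return !ok || epoch > closedEpoch
}

func main() {
	tombstones := newClosedEpochs()
	tombstones.markClosed("cf-1", 3)
	for _, epoch := range []uint64{2, 3, 4} {
		fmt.Printf("bootstrap epoch=%d allowed=%v\n", epoch, tombstones.allowBootstrap("cf-1", epoch))
	}
}
```

The point of the tombstone is that a delayed bootstrap request cannot recreate a manager that a close has already torn down; only a strictly newer epoch gets through.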
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Code Review
This pull request introduces a robust maintainer epoch fencing mechanism to prevent stale maintainers from mutating dispatcher states or sending outdated control requests. Key changes include tracking active maintainer epochs, carrying sender node IDs and epochs in heartbeat responses and schedule requests, and implementing tombstones for closed managers to block delayed bootstrap requests. The review feedback highlights critical concurrency and performance improvements, specifically recommending the use of read locks to prevent data races on maintainer metadata and advising to release the registry lock before performing potentially slow close operations on maintainers.
func (e *DispatcherManager) IsMaintainerRequestAllowed(from node.ID, maintainerEpoch uint64) bool {
	e.meta.Lock()
	defer e.meta.Unlock()
	if maintainerEpoch == 0 {
		return e.meta.maintainerEpoch == 0 && (e.meta.maintainerID == "" || e.meta.maintainerID == from)
	}
	return e.meta.maintainerEpoch == maintainerEpoch && e.meta.maintainerID == from
}
The IsMaintainerRequestAllowed method currently acquires a write lock (e.meta.Lock()) even though it only performs read operations. Since e.meta embeds sync.RWMutex, it is highly recommended to use RLock() and RUnlock() here to allow concurrent read access and avoid unnecessary serialization of heartbeat and scheduler checks.
Additionally, GetMaintainerID() and GetMaintainerEpoch() (which are called concurrently in other parts of the codebase, such as logging inside preCheckForSchedulerHandler) currently access e.meta fields without any locking. This introduces a data race with TryUpdateMaintainer which writes to these fields. They should also be updated to use RLock() and RUnlock() to ensure thread safety.
Suggested change
 func (e *DispatcherManager) IsMaintainerRequestAllowed(from node.ID, maintainerEpoch uint64) bool {
-	e.meta.Lock()
-	defer e.meta.Unlock()
+	e.meta.RLock()
+	defer e.meta.RUnlock()
 	if maintainerEpoch == 0 {
 		return e.meta.maintainerEpoch == 0 && (e.meta.maintainerID == "" || e.meta.maintainerID == from)
 	}
 	return e.meta.maintainerEpoch == maintainerEpoch && e.meta.maintainerID == from
 }
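To make the locking recommendation concrete, here is a small self-contained sketch of the pattern: read-only checks and getters take `RLock`, and only the update path takes the write lock. The `managerMeta` type, its string node IDs, and the method names are hypothetical simplifications; in particular, `update` overwrites unconditionally, whereas the real `TryUpdateMaintainer` presumably compares epochs first.

```go
package main

import (
	"fmt"
	"sync"
)

// managerMeta is a hypothetical, trimmed-down stand-in for the dispatcher
// manager's metadata. It embeds sync.RWMutex as the review comment describes.
type managerMeta struct {
	sync.RWMutex
	maintainerID    string
	maintainerEpoch uint64
}

// isRequestAllowed only reads the fields, so a read lock is enough and
// concurrent checks do not serialize against each other.
func (m *managerMeta) isRequestAllowed(from string, epoch uint64) bool {
	m.RLock()
	defer m.RUnlock()
	if epoch == 0 {
		return m.maintainerEpoch == 0 && (m.maintainerID == "" || m.maintainerID == from)
	}
	return m.maintainerEpoch == epoch && m.maintainerID == from
}

// getMaintainerEpoch takes the read lock as well, so it cannot race with update.
func (m *managerMeta) getMaintainerEpoch() uint64 {
	m.RLock()
	defer m.RUnlock()
	return m.maintainerEpoch
}

// update is the only writer and therefore takes the exclusive lock.
func (m *managerMeta) update(from string, epoch uint64) {
	m.Lock()
	defer m.Unlock()
	m.maintainerID = from
	m.maintainerEpoch = epoch
}

func main() {
	meta := &managerMeta{}
	var wg sync.WaitGroup
	// Readers and a writer run concurrently; under `go run -race` this
	// layout should report no data race.
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = meta.isRequestAllowed("node-a", 5)
			_ = meta.getMaintainerEpoch()
		}()
	}
	meta.update("node-a", 5)
	wg.Wait()
	fmt.Println(meta.isRequestAllowed("node-a", 5), meta.isRequestAllowed("node-a", 0))
}
```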
func (p *managerMaintainerSet) registerMaintainerForAdd(
	changefeedID common.ChangeFeedID,
	requestEpoch uint64,
	newMaintainer func() *Maintainer,
) *Maintainer {
	p.registryMu.Lock()
	defer p.registryMu.Unlock()

	registered, loaded := p.registry.Load(changefeedID)
	if !loaded {
		maintainer := newMaintainer()
		p.registry.Store(changefeedID, maintainer)
		return maintainer
	}
	existing := registered.(*Maintainer)
	if !canRegisterAfterExistingMaintainer(existing, requestEpoch) {
		logRejectedAddMaintainer(changefeedID, existing, requestEpoch)
		return nil
	}
	// The old maintainer has fully stopped, so it is safe to release the
	// shared metric labels before the new maintainer creates its own metric
	// children for the same changefeed.
	existing.Close()
	maintainer := newMaintainer()
	p.registry.Store(changefeedID, maintainer)
	return maintainer
}
Calling existing.Close() inside registerMaintainerForAdd while holding the node-wide p.registryMu lock can block all other maintainer registrations or cleanups on this node if the close operation takes time (e.g., waiting for background goroutines to exit or cleaning up resources). Since existing is already fully stopped (as verified by canRegisterAfterExistingMaintainer), it is safe to release the lock before calling existing.Close().
func (p *managerMaintainerSet) registerMaintainerForAdd(
	changefeedID common.ChangeFeedID,
	requestEpoch uint64,
	newMaintainer func() *Maintainer,
) *Maintainer {
	p.registryMu.Lock()
	registered, loaded := p.registry.Load(changefeedID)
	if !loaded {
		maintainer := newMaintainer()
		p.registry.Store(changefeedID, maintainer)
		p.registryMu.Unlock()
		return maintainer
	}
	existing := registered.(*Maintainer)
	if !canRegisterAfterExistingMaintainer(existing, requestEpoch) {
		logRejectedAddMaintainer(changefeedID, existing, requestEpoch)
		p.registryMu.Unlock()
		return nil
	}
	maintainer := newMaintainer()
	p.registry.Store(changefeedID, maintainer)
	p.registryMu.Unlock()
	existing.Close()
	return maintainer
}
func (p *managerMaintainerSet) cleanupRemovedMaintainer(key, value interface{}) {
	p.registryMu.Lock()
	defer p.registryMu.Unlock()

	cf := value.(*Maintainer)
	if !cf.removed.Load() {
		return
	}
	// Range can observe a removed maintainer just before a newer epoch replaces it.
	// Only the value still stored in the registry owns the shared metric labels.
	if !p.registry.CompareAndDelete(key, cf) {
		return
	}
	cf.Close()
	log.Info("maintainer removed, remove it from dynamic stream",
		zap.Stringer("changefeedID", cf.changefeedID),
		zap.Uint64("checkpointTs", cf.getWatermark().CheckpointTs),
	)
}
Similarly to registerMaintainerForAdd, calling cf.Close() while holding the node-wide p.registryMu lock can block other maintainers' operations on this node. It is safer and more performant to release the lock before calling cf.Close().
Suggested change
 func (p *managerMaintainerSet) cleanupRemovedMaintainer(key, value interface{}) {
 	p.registryMu.Lock()
-	defer p.registryMu.Unlock()
 	cf := value.(*Maintainer)
 	if !cf.removed.Load() {
+		p.registryMu.Unlock()
 		return
 	}
-	// Range can observe a removed maintainer just before a newer epoch replaces it.
-	// Only the value still stored in the registry owns the shared metric labels.
 	if !p.registry.CompareAndDelete(key, cf) {
+		p.registryMu.Unlock()
 		return
 	}
+	p.registryMu.Unlock()
 	cf.Close()
 	log.Info("maintainer removed, remove it from dynamic stream",
 		zap.Stringer("changefeedID", cf.changefeedID),
 		zap.Uint64("checkpointTs", cf.getWatermark().CheckpointTs),
 	)
 }
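The two comments above describe the same pattern: detach the entry while holding the registry lock, then run the potentially slow `Close()` after releasing it. The sketch below shows that pattern in isolation. The `registry` and `resource` types and the `cleanup` method are hypothetical stand-ins, and a plain map replaces the `sync.Map` used in the PR.

```go
package main

import (
	"fmt"
	"sync"
)

// resource is a hypothetical stand-in for a maintainer that has a
// potentially slow Close method.
type resource struct {
	removed bool
	onClose func() // injected so callers can observe when Close runs
}

func (r *resource) Close() {
	if r.onClose != nil {
		r.onClose()
	}
}

// registry is a hypothetical node-wide registry guarded by a single mutex.
type registry struct {
	mu    sync.Mutex
	items map[string]*resource
}

// cleanup detaches a removed entry while holding the lock and closes it only
// after the lock is released, so a slow Close cannot stall other callers.
func (g *registry) cleanup(key string) bool {
	g.mu.Lock()
	r, ok := g.items[key]
	if !ok || !r.removed {
		g.mu.Unlock()
		return false
	}
	delete(g.items, key)
	g.mu.Unlock()

	r.Close() // runs without holding g.mu
	return true
}

func main() {
	g := &registry{items: map[string]*resource{}}
	g.items["cf-1"] = &resource{removed: true, onClose: func() {
		fmt.Println("closing cf-1 outside the registry lock")
	}}
	fmt.Println("cleaned:", g.cleanup("cf-1"))
}
```

One ordering detail may be worth verifying before applying the same idea to `registerMaintainerForAdd`: the original comment there says the old maintainer's shared metric labels are released before the new maintainer creates its own metric children, while the suggested version calls `newMaintainer()` before `existing.Close()`. Whether that reordering is harmless depends on what `Close()` does to the labels, which this page does not show.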
[APPROVALNOTIFIER] This PR is NOT APPROVED
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go (2)
725-754: 🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win
Filter stale operators before returning bootstrap state.
After TryUpdateMaintainer moves the manager to a newer owner/epoch, currentOperatorMap can still contain requests from the previous owner. This loop appends every stored request, so a new maintainer can restore stale operators during bootstrap.
Proposed fix
-	manager.GetCurrentOperatorMap().Range(func(_, value any) bool {
+	manager.GetCurrentOperatorMap().Range(func(key, value any) bool {
 		req := value.(dispatchermanager.SchedulerDispatcherRequest)
+		if !manager.IsMaintainerRequestAllowed(req.From, req.MaintainerEpoch) {
+			manager.GetCurrentOperatorMap().Delete(key)
+			return true
+		}
 		dispatcherID := common.NewDispatcherIDFromPB(req.Config.DispatcherID)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go` around lines 725 - 754, The loop in the Range function iterates through all operators in manager.GetCurrentOperatorMap() and appends them to response.Operators without filtering for stale operators. After TryUpdateMaintainer updates the manager to a newer owner/epoch, the currentOperatorMap can still contain requests from the previous owner, which causes a new maintainer to restore stale operators during bootstrap. Add a filter condition before appending the operator to response.Operators that checks if the operator belongs to the current owner/epoch and only includes it if it does. This ensures stale operators from previous owners are not included in the bootstrap response.
274-303: 🩺 Stability & Availability | 🔴 Critical | ⚡ Quick win
Unlock MaintainerFenceMu on write-path-closed returns.
The new fence lock is held when these write-path-closed branches return. Lines 302, 323, 423, and 436 leave the mutex locked, so later control messages for this dispatcher manager can block forever.
Proposed fix
 if err != nil {
 	if dispatchermanager.IsWritePathClosedError(err) {
 		log.Info("dispatcher manager write path closed while creating table trigger event dispatcher",
 			zap.Stringer("changefeedID", cfId), zap.Error(err))
+		manager.MaintainerFenceMu.Unlock()
 		return nil
 	}
@@
 if err != nil {
 	if dispatchermanager.IsWritePathClosedError(err) {
 		log.Info("dispatcher manager write path closed while creating table trigger redo dispatcher",
 			zap.Stringer("changefeedID", cfId), zap.Error(err))
+		manager.MaintainerFenceMu.Unlock()
 		return nil
 	}
@@
 if err != nil {
 	if dispatchermanager.IsWritePathClosedError(err) {
 		log.Info("dispatcher manager write path closed while initializing table trigger event dispatcher",
 			zap.Any("changefeedID", cfId.Name()), zap.Error(err))
+		manager.MaintainerFenceMu.Unlock()
 		return nil
 	}
@@
 if err != nil {
 	if dispatchermanager.IsWritePathClosedError(err) {
 		log.Info("dispatcher manager write path closed while initializing table trigger redo dispatcher",
 			zap.Any("changefeedID", cfId.Name()), zap.Error(err))
+		manager.MaintainerFenceMu.Unlock()
 		return nil
 	}
Also applies to: 320-324, 379-424, 433-436
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go` around lines 274 - 303, The MaintainerFenceMu mutex is locked at the beginning of the function but is not being unlocked before returning in the write-path-closed error handling branches. Locate all the places where dispatchermanager.IsWritePathClosedError(err) is checked and returns nil without unlocking (including the one in NewTableTriggerEventDispatcher error handling and any other similar patterns). Before each of these return statements, add manager.MaintainerFenceMu.Unlock() to ensure the lock is released. This prevents deadlocks that would occur when other control messages attempt to acquire this lock.
🧹 Nitpick comments (2)
maintainer/maintainer_test.go (1)
266-307: 🎯 Functional Correctness | 🔵 Trivial | ⚡ Quick win
Add strict-mode epoch-0 response coverage.
The new tests cover non-zero mismatch cases, but epoch 0 is the compatibility sentinel and takes a separate branch. Add a strict-maintainer response test so unfenced stale responses cannot regress back to being accepted.
Proposed test coverage
 func TestMaintainerPostBootstrapResponseRequiresCurrentEpoch(t *testing.T) {
 	cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
 	m := &Maintainer{
 		changefeedID: cfID,
 		info:         &config.ChangeFeedInfo{Epoch: 2},
@@
 	))
 	require.NotNil(t, m.postBootstrapMsg)

+	m.onMaintainerPostBootstrapResponse(messaging.NewSingleTargetMessage(
+		node.ID("compat"),
+		messaging.MaintainerManagerTopic,
+		&heartbeatpb.MaintainerPostBootstrapResponse{
+			ChangefeedID:    cfID.ToPB(),
+			MaintainerEpoch: 0,
+		},
+	))
+	require.NotNil(t, m.postBootstrapMsg)
+
 	m.onMaintainerPostBootstrapResponse(messaging.NewSingleTargetMessage(
 		node.ID("current"),
 		messaging.MaintainerManagerTopic,
 		&heartbeatpb.MaintainerPostBootstrapResponse{
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@maintainer/maintainer_test.go` around lines 266 - 307, Add a test case to verify that a strict maintainer rejects post-bootstrap responses with epoch 0, which is the compatibility sentinel. Create a new test function similar to TestMaintainerPostBootstrapResponseRequiresCurrentEpoch that constructs a strict Maintainer with Epoch: 2 in the ChangeFeedInfo, calls onMaintainerPostBootstrapResponse with a response containing epoch 0, and asserts that postBootstrapMsg remains non-nil (indicating the response was rejected). This ensures stale epoch-0 responses cannot regress back to being accepted by strict maintainers.
maintainer/span/span_controller_test.go (1)
490-493: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win
Assert the propagated maintainer epoch.
Line 490 now passes 7 into NewAddDispatcherMessage, but the test still only checks StartTs. Add an assertion so this changed test covers the new message contract.
Proposed test assertion
 msg := task.NewAddDispatcherMessage("node1", heartbeatpb.OperatorType_O_Add, 7)
 req := msg.Message[0].(*heartbeatpb.ScheduleDispatcherRequest)
 require.Equal(t, uint64(20), req.Config.StartTs)
+require.Equal(t, uint64(7), req.MaintainerEpoch)
As per coding guidelines, **/*_test.go: Prefer focused deterministic tests; see docs/agents/testing.md before adding or changing tests.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@maintainer/span/span_controller_test.go` around lines 490 - 493, The test in the span_controller_test.go file passes a maintainer epoch value of 7 to the NewAddDispatcherMessage function on line 490, but the subsequent assertion only verifies the StartTs field on line 492. Add an additional assertion after the existing require.Equal call for StartTs to verify that the epoch value (7) is properly propagated in the request's MaintainerEpoch field. This ensures the test covers the complete contract of the NewAddDispatcherMessage call with all its parameters.
Source: Coding guidelines
📒 Files selected for processing (41)
downstreamadapter/dispatchermanager/dispatcher_manager.go
downstreamadapter/dispatchermanager/dispatcher_manager_helper.go
downstreamadapter/dispatchermanager/dispatcher_manager_info.go
downstreamadapter/dispatchermanager/dispatcher_manager_redo.go
downstreamadapter/dispatchermanager/dispatcher_manager_test.go
downstreamadapter/dispatchermanager/heartbeat_collector.go
downstreamadapter/dispatchermanager/helper.go
downstreamadapter/dispatchermanager/helper_test.go
downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go
downstreamadapter/dispatcherorchestrator/helper.go
maintainer/barrier.go
maintainer/barrier_event.go
maintainer/barrier_test.go
maintainer/maintainer.go
maintainer/maintainer_controller.go
maintainer/maintainer_controller_bootstrap.go
maintainer/maintainer_controller_helper.go
maintainer/maintainer_controller_test.go
maintainer/maintainer_manager_maintainers.go
maintainer/maintainer_manager_test.go
maintainer/maintainer_test.go
maintainer/operator/operator_add.go
maintainer/operator/operator_add_test.go
maintainer/operator/operator_controller.go
maintainer/operator/operator_controller_test.go
maintainer/operator/operator_merge.go
maintainer/operator/operator_merge_test.go
maintainer/operator/operator_move.go
maintainer/operator/operator_move_test.go
maintainer/operator/operator_remove.go
maintainer/operator/operator_remove_test.go
maintainer/operator/operator_split.go
maintainer/operator/operator_split_test.go
maintainer/replica/replication_span.go
maintainer/replica/replication_span_test.go
maintainer/scheduler/balance.go
maintainer/scheduler/balance_splits.go
maintainer/scheduler/basic.go
maintainer/scheduler/drain.go
maintainer/scheduler/drain_test.go
maintainer/span/span_controller_test.go
if existing, operatorExists := dispatcherManager.currentOperatorMap.Load(dispatcherID); operatorExists {
	existingReq := existing.(SchedulerDispatcherRequest)
	if !dispatcherManager.IsMaintainerRequestAllowed(existingReq.From, existingReq.MaintainerEpoch) {
		dispatcherManager.currentOperatorMap.Delete(dispatcherID)
	} else {
		// Create requests must be serialized per dispatcherID; otherwise we can end up creating multiple
		// dispatchers for the same span/dispatcherID.
		if req.ScheduleAction == heartbeatpb.ScheduleAction_Create {
			return common.DispatcherID{}, false
		}
		// Remove requests are allowed to proceed: removeDispatcher is idempotent and the incoming request
		// may carry a newer OperatorType for maintainer bootstrap/failover reconstruction.
	}
🩺 Stability & Availability | 🟠 Major | ⚡ Quick win
Clear existing operators when a Remove targets a missing dispatcher.
Line 294 allows a current-epoch existing operator to remain while the Remove proceeds, but Lines 384-413 emit/skip the terminal remove path without deleting that existing entry when the dispatcher is absent. If that entry is a prior Create, later Create retries are dropped by Lines 301-303 and the dispatcher can stay stuck.
Proposed fix
if _, exists := dispatcherManager.redoDispatcherMap.Get(dispatcherID); exists {
dispatcherManager.currentOperatorMap.Store(dispatcherID, req)
log.Debug("store current working remove operator for redo dispatcher",
zap.String("changefeedID", req.ChangefeedID.String()),
zap.String("dispatcherID", dispatcherID.String()),
zap.Any("operator", req),
)
} else {
+ dispatcherManager.currentOperatorMap.Delete(dispatcherID)
log.Debug("redo dispatcher not found, skip remove operator store",
zap.String("changefeedID", req.ChangefeedID.String()),
zap.String("dispatcherID", dispatcherID.String()),
zap.Any("operator", req),
)
@@
if _, exists := dispatcherManager.dispatcherMap.Get(dispatcherID); exists {
dispatcherManager.currentOperatorMap.Store(dispatcherID, req)
log.Debug("store current working remove operator",
zap.String("changefeedID", req.ChangefeedID.String()),
zap.String("dispatcherID", dispatcherID.String()),
zap.Any("operator", req),
)
} else {
+ dispatcherManager.currentOperatorMap.Delete(dispatcherID)
log.Debug("dispatcher not found, skip remove operator store",
zap.String("changefeedID", req.ChangefeedID.String()),
zap.String("dispatcherID", dispatcherID.String()),
zap.Any("operator", req),
)
Also applies to: 371-413
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@downstreamadapter/dispatchermanager/helper.go` around lines 294 - 306, When a
Remove request targets a missing dispatcher in the code around lines 384-413,
the existing operator entry is not being deleted from
dispatcherManager.currentOperatorMap. This causes subsequent Create retries to
be dropped by the check at lines 301-303 (which returns false when
ScheduleAction_Create is detected), leaving the dispatcher stuck in an
inconsistent state. In the Remove request handling path when the dispatcher is
absent, add a call to dispatcherManager.currentOperatorMap.Delete(dispatcherID)
to clear the existing operator entry before proceeding, similar to the deletion
already done in the IsMaintainerRequestAllowed check at line 298.
func (m *Maintainer) isMaintainerEpochResponseAllowed(responseEpoch uint64) bool {
	return common.MaintainerEpochMatches(responseEpoch, m.currentMaintainerEpoch())
}
🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win
Reject epoch-0 responses once the maintainer is strict.
common.MaintainerEpochMatches accepts responseEpoch == 0, so a maintainer with a non-zero current epoch can still accept unfenced bootstrap/post-bootstrap/close responses. That leaves the stale-response path open after strict epochs are in use; epoch 0 should only be accepted while this maintainer is still in compatibility mode.
Proposed fix
func (m *Maintainer) isMaintainerEpochResponseAllowed(responseEpoch uint64) bool {
- return common.MaintainerEpochMatches(responseEpoch, m.currentMaintainerEpoch())
+ currentEpoch := m.currentMaintainerEpoch()
+ if responseEpoch == 0 {
+ return currentEpoch == 0
+ }
+ return currentEpoch == 0 || responseEpoch == currentEpoch
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@maintainer/maintainer.go` around lines 1028 - 1030, The
isMaintainerEpochResponseAllowed method in the Maintainer struct needs to be
updated to reject epoch-0 responses once the maintainer enters strict mode.
Currently, common.MaintainerEpochMatches accepts responseEpoch == 0 regardless
of the currentMaintainerEpoch value, which leaves the stale-response path open
after strict epochs are in use. Modify isMaintainerEpochResponseAllowed to
reject responses with epoch 0 when the currentMaintainerEpoch is non-zero,
allowing epoch-0 responses only while the maintainer is still in compatibility
mode (when currentMaintainerEpoch is 0).
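The predicate proposed above is easier to review as a truth table. The sketch below copies its logic into a free function (the name `epochResponseAllowed` is hypothetical) and prints the result for each combination of response and current epoch.

```go
package main

import "fmt"

// epochResponseAllowed mirrors the predicate proposed in the review:
// an epoch-0 (unfenced) response is accepted only while the receiver itself
// is still at epoch 0; a non-zero response must match the current epoch
// unless the receiver is still in compatibility mode.
func epochResponseAllowed(responseEpoch, currentEpoch uint64) bool {
	if responseEpoch == 0 {
		return currentEpoch == 0
	}
	return currentEpoch == 0 || responseEpoch == currentEpoch
}

func main() {
	cases := []struct{ response, current uint64 }{
		{0, 0}, // both in compatibility mode
		{0, 2}, // unfenced response to a strict maintainer
		{2, 0}, // fenced response to a compatibility-mode maintainer
		{2, 2}, // matching epochs
		{1, 2}, // stale epoch
	}
	for _, c := range cases {
		fmt.Printf("response=%d current=%d allowed=%v\n",
			c.response, c.current, epochResponseAllowed(c.response, c.current))
	}
}
```

The only row that differs from accepting epoch 0 unconditionally is `response=0, current=2`, which is exactly the stale-response path the comment wants closed.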
op := operator.NewSplitDispatcherOperator(
	s.spanController,
	checkResult.SplitSpan,
	splitSpans,
	checkResult.SplitTargetNodes,
	s.operatorController.MaintainerEpoch(),
	func(span *replica.SpanReplication, node node.ID) bool {
		return s.operatorController.AddOperator(operator.NewAddDispatcherOperator(
			s.spanController,
			span,
			node,
			heartbeatpb.OperatorType_O_Split,
			s.operatorController.MaintainerEpoch(),
		))
	},
🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win
Capture one epoch snapshot for split and child add operators.
Line 140 and Line 147 read MaintainerEpoch() at different times. If epoch advances between split creation and postFinish, child add operators can be stamped with a different epoch than the split operator that produced them.
Suggested fix
if len(splitSpans) > 1 {
+ maintainerEpoch := s.operatorController.MaintainerEpoch()
op := operator.NewSplitDispatcherOperator(
s.spanController,
checkResult.SplitSpan,
splitSpans,
checkResult.SplitTargetNodes,
- s.operatorController.MaintainerEpoch(),
+ maintainerEpoch,
func(span *replica.SpanReplication, node node.ID) bool {
return s.operatorController.AddOperator(operator.NewAddDispatcherOperator(
s.spanController,
span,
node,
heartbeatpb.OperatorType_O_Split,
- s.operatorController.MaintainerEpoch(),
+ maintainerEpoch,
))
},
)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@maintainer/scheduler/balance_splits.go` around lines 135 - 149, The
MaintainerEpoch() method is being called at two different times: once when
creating the NewSplitDispatcherOperator and again when creating the
NewAddDispatcherOperator within the callback. If the epoch advances between
these calls, the split and child add operators will be stamped with different
epochs. Capture the epoch value once before creating the
NewSplitDispatcherOperator by storing s.operatorController.MaintainerEpoch() in
a local variable, then reuse that same variable in both the
NewSplitDispatcherOperator constructor and the NewAddDispatcherOperator
constructor inside the callback function.
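The difference between reading the epoch inside the callback and capturing it once can be shown with a toy controller. Everything in this sketch (`controller`, `buildLate`, `buildSnapshot`) is a hypothetical illustration of the closure behavior, not the scheduler's real types.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// controller is a hypothetical stand-in for the operator controller; its
// epoch can advance at any time.
type controller struct {
	epoch atomic.Uint64
}

func (c *controller) MaintainerEpoch() uint64 { return c.epoch.Load() }

// buildLate reads the epoch again when the callback runs, so the parent and
// child stamps can diverge if the epoch advances in between.
func buildLate(c *controller) (parent uint64, child func() uint64) {
	return c.MaintainerEpoch(), func() uint64 { return c.MaintainerEpoch() }
}

// buildSnapshot reads the epoch once and lets the callback close over that
// value, so parent and child always carry the same stamp.
func buildSnapshot(c *controller) (parent uint64, child func() uint64) {
	epoch := c.MaintainerEpoch()
	return epoch, func() uint64 { return epoch }
}

func main() {
	c := &controller{}
	c.epoch.Store(7)
	lateParent, lateChild := buildLate(c)
	snapParent, snapChild := buildSnapshot(c)

	c.epoch.Store(8) // the epoch advances before the callbacks fire

	fmt.Println("late:    ", lateParent, lateChild())
	fmt.Println("snapshot:", snapParent, snapChild())
}
```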
What problem does this PR solve?
Issue Number: ref #5083
This is PR 2 of 3 split from #5182 and is stacked on PR 1.
Background:
PR 1 persists a monotonic maintainer epoch before new ownership is scheduled.
The receiver side still needs to stamp outbound requests and reject control
messages from stale maintainer owners.
Motivation:
Without a receiver-local owner and epoch fence, a delayed old maintainer can
continue to send schedule, heartbeat, post-bootstrap, merge, or close messages
after a newer maintainer has taken over the same changefeed. Those stale
messages can mutate dispatcher state or complete operators that no longer
belong to the active owner.
What is changed and how it works?
This PR adds the receiver-side stale maintainer fence:
heartbeat, and remove messages with the current maintainer epoch.
operator controllers.
current maintainer flow.
still-running local maintainer.
compatibility.
heartbeat, merge, bootstrap, and close side effects.
newer owner can replace an older pending request.
a manager after a close has already completed.
Stack:
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
No expected performance regression. The new mutexes serialize only
per-changefeed control operations such as bootstrap, close, schedule, merge, and
heartbeat response handling. They are not in the downstream event write path.
The change is wire-compatible. Epoch 0 remains accepted only for the current
compatibility-mode owner until the receiver observes a non-zero maintainer epoch.
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note
Validation
make fmt
go test ./maintainer ./maintainer/operator ./maintainer/replica ./downstreamadapter/dispatchermanager ./downstreamadapter/dispatcherorchestrator