Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 122 additions & 65 deletions downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,51 +283,23 @@ func (m *DispatcherOrchestrator) handleBootstrapRequest(
return nil
}
if exists {
// Check and potentially add a table trigger event dispatcher.
// This is necessary during maintainer node migration, as the existing
// dispatcher manager on the new node may not have a table trigger
// event dispatcher configured yet.
if req.TableTriggerEventDispatcherId != nil {
tableTriggerDispatcher := manager.GetTableTriggerEventDispatcher()
if tableTriggerDispatcher == nil {
err = manager.NewTableTriggerEventDispatcher(
req.TableTriggerEventDispatcherId,
req.StartTs,
false,
)
if err != nil {
if dispatchermanager.IsWritePathClosedError(err) {
log.Info("dispatcher manager write path closed while creating table trigger event dispatcher",
zap.Stringer("changefeedID", cfId), zap.Error(err))
return nil
}
log.Error("failed to create new table trigger event dispatcher",
zap.Stringer("changefeedID", cfId), zap.Error(err))
manager.MaintainerFenceMu.Unlock()
return m.handleDispatcherError(from, req.ChangefeedID, maintainerEpoch, err)
}
}
// A higher-epoch maintainer may reuse an existing DispatcherManager with
// the same table trigger ID. A fresh trigger ID is allowed only after the
// old maintainer handover leaves no old trigger on this node. If a
// different trigger ID is still present, the scheduler has violated the
// handover ordering; replacing it in place can duplicate DDL ownership,
// so fail the bootstrap instead.
if err := ensureBootstrapTableTriggerEventDispatcher(
cfId, manager, req.TableTriggerEventDispatcherId, req.StartTs,
); err != nil {
manager.MaintainerFenceMu.Unlock()
return m.handleDispatcherError(from, req.ChangefeedID, maintainerEpoch, err)
}
if req.TableTriggerRedoDispatcherId != nil {
tableTriggerRedoDispatcher := manager.GetTableTriggerRedoDispatcher()
if tableTriggerRedoDispatcher == nil {
err = manager.NewTableTriggerRedoDispatcher(
req.TableTriggerRedoDispatcherId,
req.StartTs,
false,
)
if err != nil {
if dispatchermanager.IsWritePathClosedError(err) {
log.Info("dispatcher manager write path closed while creating table trigger redo dispatcher",
zap.Stringer("changefeedID", cfId), zap.Error(err))
return nil
}
log.Error("failed to create new table trigger redo dispatcher",
zap.Stringer("changefeedID", cfId), zap.Error(err))
manager.MaintainerFenceMu.Unlock()
return m.handleDispatcherError(from, req.ChangefeedID, maintainerEpoch, err)
}
}
if err := ensureBootstrapTableTriggerRedoDispatcher(
cfId, manager, req.TableTriggerRedoDispatcherId, req.StartTs,
); err != nil {
manager.MaintainerFenceMu.Unlock()
return m.handleDispatcherError(from, req.ChangefeedID, maintainerEpoch, err)
}
}

Expand All @@ -354,6 +326,70 @@ func (m *DispatcherOrchestrator) handleBootstrapRequest(
return m.sendResponse(from, messaging.MaintainerManagerTopic, response)
}

// ensureBootstrapTableTriggerEventDispatcher verifies that a reused manager has
// the bootstrap owner's table trigger, or creates it when no trigger exists yet.
func ensureBootstrapTableTriggerEventDispatcher(
cfId common.ChangeFeedID,
manager *dispatchermanager.DispatcherManager,
id *heartbeatpb.DispatcherID,
startTs uint64,
) error {
if id == nil {
return nil
}
expectedID := common.NewDispatcherIDFromPB(id)
tableTriggerDispatcher := manager.GetTableTriggerEventDispatcher()
if tableTriggerDispatcher == nil {
if err := manager.NewTableTriggerEventDispatcher(id, startTs, false); err != nil {
log.Error("failed to create table trigger event dispatcher",
zap.Stringer("changefeedID", cfId), zap.Error(err))
return err
}
Comment on lines +343 to +347

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If manager.NewTableTriggerEventDispatcher fails because the write path is closed, it is an expected state during shutdown or fencing. Logging it as an Error and returning the error to the maintainer is a regression from the original behavior (where it was logged as Info and returned nil). We should handle IsWritePathClosedError specially and return nil to allow clean shutdown.

		if err := manager.NewTableTriggerEventDispatcher(id, startTs, false); err != nil {
			if dispatchermanager.IsWritePathClosedError(err) {
				log.Info("dispatcher manager write path closed while creating table trigger event dispatcher",
					zap.Stringer("changefeedID", cfId), zap.Error(err))
				return nil
			}
			log.Error("failed to create table trigger event dispatcher",
				zap.Stringer("changefeedID", cfId), zap.Error(err))
			return err
		}

return nil
}
if tableTriggerDispatcher.GetId() == expectedID {
return nil
}
log.Error("table trigger event dispatcher id mismatch during bootstrap",
zap.Stringer("changefeedID", cfId),
zap.Stringer("expectedDispatcherID", expectedID),
zap.Stringer("actualDispatcherID", tableTriggerDispatcher.GetId()))
return errors.ErrChangefeedInitTableTriggerDispatcherFailed.
GenWithStackByArgs("table trigger event dispatcher id mismatch during bootstrap")
}

// ensureBootstrapTableTriggerRedoDispatcher verifies that a reused manager has
// the bootstrap owner's redo table trigger, or creates it when no trigger exists yet.
func ensureBootstrapTableTriggerRedoDispatcher(
cfId common.ChangeFeedID,
manager *dispatchermanager.DispatcherManager,
id *heartbeatpb.DispatcherID,
startTs uint64,
) error {
if id == nil {
return nil
}
expectedID := common.NewDispatcherIDFromPB(id)
tableTriggerRedoDispatcher := manager.GetTableTriggerRedoDispatcher()
if tableTriggerRedoDispatcher == nil {
if err := manager.NewTableTriggerRedoDispatcher(id, startTs, false); err != nil {
log.Error("failed to create table trigger redo dispatcher",
zap.Stringer("changefeedID", cfId), zap.Error(err))
return err
}
Comment on lines +375 to +379

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Similarly, if manager.NewTableTriggerRedoDispatcher fails because the write path is closed, we should handle IsWritePathClosedError specially and return nil instead of logging an Error and returning the error to the maintainer.

		if err := manager.NewTableTriggerRedoDispatcher(id, startTs, false); err != nil {
			if dispatchermanager.IsWritePathClosedError(err) {
				log.Info("dispatcher manager write path closed while creating table trigger redo dispatcher",
					zap.Stringer("changefeedID", cfId), zap.Error(err))
				return nil
			}
			log.Error("failed to create table trigger redo dispatcher",
				zap.Stringer("changefeedID", cfId), zap.Error(err))
			return err
		}

return nil
}
if tableTriggerRedoDispatcher.GetId() == expectedID {
return nil
}
log.Error("table trigger redo dispatcher id mismatch during bootstrap",
zap.Stringer("changefeedID", cfId),
zap.Stringer("expectedDispatcherID", expectedID),
zap.Stringer("actualDispatcherID", tableTriggerRedoDispatcher.GetId()))
return errors.ErrChangefeedInitTableTriggerDispatcherFailed.
GenWithStackByArgs("table trigger redo dispatcher id mismatch during bootstrap")
}

// handlePostBootstrapRequest handles the maintainer post-bootstrap request message.
// It initializes the table trigger event dispatcher with table schema information,
// which serves as the initial state for the table schema store. After initialization,
Expand Down Expand Up @@ -443,7 +479,6 @@ func (m *DispatcherOrchestrator) handlePostBootstrapRequest(
}

if m.fenced.Load() {
manager.MaintainerFenceMu.Unlock()
manager.LocalFence()
return nil
}
Comment on lines 481 to 484

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In handlePostBootstrapRequest, if m.fenced.Load() is true, the function returns early without unlocking manager.MaintainerFenceMu. This will cause a mutex leak and potential deadlocks during shutdown or subsequent requests. We should unlock manager.MaintainerFenceMu before calling manager.LocalFence() and returning.

Suggested change
if m.fenced.Load() {
manager.MaintainerFenceMu.Unlock()
manager.LocalFence()
return nil
}
if m.fenced.Load() {
manager.MaintainerFenceMu.Unlock()
manager.LocalFence()
return nil
}

Expand Down Expand Up @@ -722,36 +757,58 @@ func retrieveOperatorsForBootstrapResponse(
manager *dispatchermanager.DispatcherManager,
response *heartbeatpb.MaintainerBootstrapResponse,
) {
reportedDispatchers := make(map[reportedDispatcherKey]struct{}, len(response.Spans))
for _, span := range response.Spans {
reportedDispatchers[reportedDispatcherKey{
id: common.NewDispatcherIDFromPB(span.ID),
mode: span.Mode,
}] = struct{}{}
}

manager.GetCurrentOperatorMap().Range(func(_, value any) bool {
req := value.(dispatchermanager.SchedulerDispatcherRequest)
requestAllowed := manager.IsMaintainerRequestAllowed(req.From, req.MaintainerEpoch)
dispatcherID := common.NewDispatcherIDFromPB(req.Config.DispatcherID)
if common.IsRedoMode(req.Config.Mode) {
if manager.IsRedoReady() {
_, ok := manager.GetRedoDispatcherMap().Get(dispatcherID)
// Log error if dispatcher not found and action is not create
// It's possible that the dispatcher is not found when the action is create
// because the dispatcher may be created after the operator is stored
if !ok && req.ScheduleAction != heartbeatpb.ScheduleAction_Create {
log.Error("Redo dispatcher not found, this should not happen",
zap.String("changefeed", changefeedID.String()),
zap.String("dispatcherID", req.Config.DispatcherID.String()),
)
}
dispatcherExistsKnown := !common.IsRedoMode(req.Config.Mode) || manager.IsRedoReady()
_, dispatcherReported := reportedDispatchers[reportedDispatcherKey{
id: dispatcherID,
mode: req.Config.Mode,
}]
if !requestAllowed {
// Restore stale remove only when the same bootstrap snapshot reports the dispatcher.
// This keeps the working span and cleanup intent consistent even if live maps change during cleanup.
if req.ScheduleAction != heartbeatpb.ScheduleAction_Remove || !dispatcherReported {
return true
}
} else {
_, ok := manager.GetDispatcherMap().Get(dispatcherID)
// Log error if dispatcher not found and action is not create
// It's possible that the dispatcher is not found when the action is create
// because the dispatcher may be created after the operator is stored
if !ok && req.ScheduleAction != heartbeatpb.ScheduleAction_Create {
log.Info("include stale remove operator in bootstrap response",
zap.String("changefeed", changefeedID.String()),
zap.String("dispatcherID", req.Config.DispatcherID.String()),
zap.String("from", req.From.String()),
zap.Uint64("requestMaintainerEpoch", req.MaintainerEpoch),
zap.Uint64("currentMaintainerEpoch", manager.GetMaintainerEpoch()),
zap.String("currentMaintainer", manager.GetMaintainerID().String()))
}
// Log error if dispatcher not found and action is not create.
// It's possible that the dispatcher is not found when the action is create
// because the dispatcher may be created after the operator is stored.
if dispatcherExistsKnown && !dispatcherReported && req.ScheduleAction != heartbeatpb.ScheduleAction_Create {
if common.IsRedoMode(req.Config.Mode) {
log.Error("Redo dispatcher not found, this should not happen",
zap.String("changefeed", changefeedID.String()),
zap.String("dispatcherID", req.Config.DispatcherID.String()))
} else {
log.Error("Dispatcher not found, this should not happen",
zap.String("changefeed", changefeedID.String()),
zap.String("dispatcherID", req.Config.DispatcherID.String()),
)
zap.String("dispatcherID", req.Config.DispatcherID.String()))
}
}
response.Operators = append(response.Operators,
proto.Clone(req.ScheduleDispatcherRequest).(*heartbeatpb.ScheduleDispatcherRequest))
return true
})
}

type reportedDispatcherKey struct {
id common.DispatcherID
mode int64
}
Loading