Skip to content
Open
25 changes: 14 additions & 11 deletions downstreamadapter/dispatchermanager/dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ type DispatcherManager struct {
maintainerEpoch uint64
maintainerID node.ID
}
// MaintainerFenceMu serializes maintainer owner/epoch changes with request
// fence checks and scheduler side effects.
MaintainerFenceMu sync.Mutex

pdClock pdutil.Clock

Expand All @@ -112,15 +115,12 @@ type DispatcherManager struct {
dispatcherMap *DispatcherMap[*dispatcher.EventDispatcher]
// redoDispatcherMap stores all the redo dispatchers in the DispatcherManager, including the table trigger redo dispatcher
redoDispatcherMap *DispatcherMap[*dispatcher.RedoDispatcher]
// currentOperatorMap stores at most one in-flight scheduling request per dispatcherID (event and redo).
// currentOperatorMap stores one in-flight scheduling request per dispatcherID.
//
// It is used for:
// - suppressing duplicate maintainer requests for the same dispatcher,
// - reporting unfinished requests during bootstrap so a new maintainer can restore operators,
// - cleaning up remove requests when a dispatcher is fully removed.
//
// Entries must be deleted on completion (create -> after creation; remove -> on cleanup), otherwise
// future maintainer requests for the same dispatcherID will be ignored.
// The value carries sender and maintainer epoch so bootstrap recovery can
// return only current-epoch operators, and precheck can replace stale entries.
// Entries must be deleted on completion, otherwise future requests for the
// same dispatcherID will be ignored.
currentOperatorMap sync.Map // map[common.DispatcherID]SchedulerDispatcherRequest (in dispatcher manager, not heartbeatpb)
// schemaIDToDispatchers is shared in the DispatcherManager;
// it stores all the information about schemaID->Dispatchers
Expand Down Expand Up @@ -208,6 +208,7 @@ func NewDispatcherManager(
tableTriggerRedoDispatcherID *heartbeatpb.DispatcherID,
startTs uint64,
maintainerID node.ID,
maintainerEpoch uint64,
newChangefeed bool,
registerInitializing func(*DispatcherManager) bool,
) (manager *DispatcherManager, err error) {
Expand Down Expand Up @@ -255,8 +256,10 @@ func NewDispatcherManager(
metricRedoCreateDispatcherDuration: metrics.CreateDispatcherDuration.WithLabelValues(changefeedID.Keyspace(), changefeedID.Name(), "redoDispatcher"),
}

// Set the epoch and maintainerID of the event dispatcher manager
manager.meta.maintainerEpoch = cfConfig.Epoch
// Trust only the explicit request maintainer epoch for receiver fencing. The
// config epoch may be newer than an old rolling-upgrade request and must not
// turn epoch 0 compatibility traffic into strict-mode traffic.
manager.meta.maintainerEpoch = maintainerEpoch
manager.meta.maintainerID = maintainerID
cleanupManager := manager
defer func() {
Expand Down Expand Up @@ -451,7 +454,7 @@ func (e *DispatcherManager) NewTableTriggerEventDispatcher(id *heartbeatpb.Dispa
infos := map[common.DispatcherID]dispatcherCreateInfo{}
dispatcherID := common.NewDispatcherIDFromPB(id)
infos[dispatcherID] = dispatcherCreateInfo{
Id: dispatcherID,
ID: dispatcherID,
TableSpan: common.KeyspaceDDLSpan(e.keyspaceID),
StartTs: startTs,
SchemaID: 0,
Expand Down
25 changes: 13 additions & 12 deletions downstreamadapter/dispatchermanager/dispatcher_manager_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func prepareCreateDispatcher[T dispatcher.Dispatcher](infos map[common.Dispatche
schemaIds := make([]int64, 0, len(infos))
skipDMLAsStartTsList := make([]bool, 0, len(infos))
for _, info := range infos {
id := info.Id
id := info.ID
if _, ok := dispatcherMap.Get(id); ok {
continue
}
Expand Down Expand Up @@ -266,17 +266,7 @@ func removeDispatcher[T dispatcher.Dispatcher](e *DispatcherManager,
}
}

// Submit async remove task to thread pool
task := &RemoveDispatcherTask{
manager: e,
dispatcherItem: dispatcherItem,
retryCount: 0,
}
scheduler := GetRemoveDispatcherTaskScheduler()
taskHandle := scheduler.Submit(task, time.Now())

// Save taskHandle for later cancellation
e.removeTaskHandles.Store(id, taskHandle)
e.submitRemoveDispatcherTask(dispatcherItem)

dispatcherItem.SetTryRemoving()

Expand All @@ -296,6 +286,17 @@ func removeDispatcher[T dispatcher.Dispatcher](e *DispatcherManager,
}
}

// submitRemoveDispatcherTask wraps dispatcherItem in a RemoveDispatcherTask
// (retry count starting at zero), hands it to the scheduler returned by
// GetRemoveDispatcherTaskScheduler together with the current time, and records
// the returned task handle in removeTaskHandles under the dispatcher's ID so
// the handle can be looked up later.
func (e *DispatcherManager) submitRemoveDispatcherTask(dispatcherItem dispatcher.Dispatcher) {
	removeTask := &RemoveDispatcherTask{manager: e, dispatcherItem: dispatcherItem, retryCount: 0}
	handle := GetRemoveDispatcherTaskScheduler().Submit(removeTask, time.Now())
	e.removeTaskHandles.Store(dispatcherItem.GetId(), handle)
}

// closeAllDispatchers is called when the event dispatcher manager is closing
func closeAllDispatchers[T dispatcher.Dispatcher](changefeedID common.ChangeFeedID,
dispatcherMap *DispatcherMap[T],
Expand Down
38 changes: 34 additions & 4 deletions downstreamadapter/dispatchermanager/dispatcher_manager_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import (
"github.com/pingcap/ticdc/pkg/node"
)

// event_dispatcher_mananger_info.go is used to store the basic info and function of the event dispatcher manager
// dispatcher_manager_info.go stores the basic info and functions of the dispatcher manager.

type dispatcherCreateInfo struct {
Id common.DispatcherID
ID common.DispatcherID
TableSpan *heartbeatpb.TableSpan
StartTs uint64
SchemaID int64
Expand All @@ -52,10 +52,40 @@ func (e *DispatcherManager) GetMaintainerID() node.ID {
return e.meta.maintainerID
}

func (e *DispatcherManager) SetMaintainerID(maintainerID node.ID) {
// TryUpdateMaintainer records the active maintainer owner and epoch.
// Maintainer epoch 0 is accepted only while the manager is still in compatibility
// mode. Once a non-zero epoch is known, epoch 0 must never downgrade the receiver
// back to compatibility mode.
func (e *DispatcherManager) TryUpdateMaintainer(from node.ID, maintainerEpoch uint64) bool {
e.meta.Lock()
defer e.meta.Unlock()
e.meta.maintainerID = maintainerID
if maintainerEpoch == 0 {
if e.meta.maintainerEpoch != 0 {
return false
}
e.meta.maintainerID = from
return true
}
if e.meta.maintainerEpoch > maintainerEpoch {
return false
}
if e.meta.maintainerEpoch == maintainerEpoch && e.meta.maintainerID != "" && e.meta.maintainerID != from {
return false
}
e.meta.maintainerEpoch = maintainerEpoch
e.meta.maintainerID = from
return true
}

// IsMaintainerRequestAllowed reports whether a request sent by `from` with the
// given maintainerEpoch matches the maintainer owner/epoch currently recorded
// in e.meta. The check runs while holding the e.meta lock.
//
//   - Non-zero epoch: both the recorded epoch and the recorded maintainer ID
//     must equal the request's values.
//   - Epoch 0: accepted only while the recorded epoch is still 0, and only if
//     no maintainer ID is recorded yet or the recorded ID equals `from`.
func (e *DispatcherManager) IsMaintainerRequestAllowed(from node.ID, maintainerEpoch uint64) bool {
	e.meta.Lock()
	defer e.meta.Unlock()

	knownEpoch, knownOwner := e.meta.maintainerEpoch, e.meta.maintainerID
	if maintainerEpoch != 0 {
		// Request carries an explicit epoch: require an exact owner/epoch match.
		return knownEpoch == maintainerEpoch && knownOwner == from
	}
	if knownEpoch != 0 {
		// A non-zero epoch is already recorded, so epoch-0 requests are rejected.
		return false
	}
	return knownOwner == "" || knownOwner == from
}
Comment on lines +82 to 89

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The IsMaintainerRequestAllowed method currently acquires a write lock (e.meta.Lock()) even though it only performs read operations. Since e.meta embeds sync.RWMutex, it is highly recommended to use RLock() and RUnlock() here to allow concurrent read access and avoid unnecessary serialization of heartbeat and scheduler checks.

Additionally, GetMaintainerID() and GetMaintainerEpoch() (which are called concurrently in other parts of the codebase, such as logging inside preCheckForSchedulerHandler) currently access e.meta fields without any locking. This introduces a data race with TryUpdateMaintainer which writes to these fields. They should also be updated to use RLock() and RUnlock() to ensure thread safety.

Suggested change
func (e *DispatcherManager) IsMaintainerRequestAllowed(from node.ID, maintainerEpoch uint64) bool {
e.meta.Lock()
defer e.meta.Unlock()
if maintainerEpoch == 0 {
return e.meta.maintainerEpoch == 0 && (e.meta.maintainerID == "" || e.meta.maintainerID == from)
}
return e.meta.maintainerEpoch == maintainerEpoch && e.meta.maintainerID == from
}
func (e *DispatcherManager) IsMaintainerRequestAllowed(from node.ID, maintainerEpoch uint64) bool {
e.meta.RLock()
defer e.meta.RUnlock()
if maintainerEpoch == 0 {
return e.meta.maintainerEpoch == 0 && (e.meta.maintainerID == "" || e.meta.maintainerID == from)
}
return e.meta.maintainerEpoch == maintainerEpoch && e.meta.maintainerID == from
}


func (e *DispatcherManager) GetMaintainerEpoch() uint64 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (e *DispatcherManager) NewTableTriggerRedoDispatcher(id *heartbeatpb.Dispat
infos := map[common.DispatcherID]dispatcherCreateInfo{}
dispatcherID := common.NewDispatcherIDFromPB(id)
infos[dispatcherID] = dispatcherCreateInfo{
Id: dispatcherID,
ID: dispatcherID,
TableSpan: common.KeyspaceDDLSpan(e.keyspaceID),
StartTs: startTs,
SchemaID: 0,
Expand Down
20 changes: 4 additions & 16 deletions downstreamadapter/dispatchermanager/dispatcher_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pingcap/ticdc/downstreamadapter/eventcollector"
"github.com/pingcap/ticdc/downstreamadapter/sink"
"github.com/pingcap/ticdc/downstreamadapter/sink/mock"
"github.com/pingcap/ticdc/downstreamadapter/sink/mysql"
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/logservice/schemastore"
"github.com/pingcap/ticdc/pkg/common"
Expand All @@ -37,7 +36,6 @@ import (
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/pkg/pdutil"
"github.com/pingcap/ticdc/pkg/routing"
mysqlcfg "github.com/pingcap/ticdc/pkg/sink/mysql"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/utils/threadpool"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -500,20 +498,9 @@ func TestMergeDispatcherInvalidIDs(t *testing.T) {

func TestTryCloseRemovedRequestAfterClosedReturnsImmediatelyAndTriggersCleanup(t *testing.T) {
changefeedID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
mysqlConfig := mysqlcfg.New()
mysqlConfig.EnableDDLTs = false
mysqlSink := mysql.NewMySQLSink(
context.Background(),
changefeedID,
mysqlConfig,
nil,
false,
false,
time.Minute,
)
manager := &DispatcherManager{
changefeedID: changefeedID,
sink: mysqlSink,
sink: newDispatcherManagerTestSink(t, common.BlackHoleSinkType),
}
manager.closed.Store(true)

Expand Down Expand Up @@ -664,6 +651,7 @@ func TestNewDispatcherManagerReturnsFenceErrorWhenInitializingRegistrationReject
nil,
1,
node.ID("maintainer"),
1,
true,
func(manager *DispatcherManager) bool {
hookCalled.Store(true)
Expand Down Expand Up @@ -795,7 +783,7 @@ func TestCreateDispatcherByInfoKeepsCreateOperatorWhenFenced(t *testing.T) {
manager := createTestManager(t)
manager.writePathClosed.Store(true)
dispatcherID := common.NewDispatcherID()
createReq := NewSchedulerDispatcherRequest(&heartbeatpb.ScheduleDispatcherRequest{
createReq := NewSchedulerDispatcherRequest(node.ID("maintainer"), &heartbeatpb.ScheduleDispatcherRequest{
ChangefeedID: manager.changefeedID.ToPB(),
Config: &heartbeatpb.DispatcherConfig{
DispatcherID: dispatcherID.ToPB(),
Expand All @@ -812,7 +800,7 @@ func TestCreateDispatcherByInfoKeepsCreateOperatorWhenFenced(t *testing.T) {

createDispatcherByInfo(manager, map[common.DispatcherID]dispatcherCreateInfo{
dispatcherID: {
Id: dispatcherID,
ID: dispatcherID,
TableSpan: &heartbeatpb.TableSpan{
TableID: 1,
},
Expand Down
6 changes: 3 additions & 3 deletions downstreamadapter/dispatchermanager/heartbeat_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,12 +263,12 @@ func (c *HeartBeatCollector) RecvMessages(_ context.Context, msg *messaging.Targ
heartbeatResponse := msg.Message[0].(*heartbeatpb.HeartBeatResponse)
c.heartBeatResponseDynamicStream.Push(
common.NewChangefeedGIDFromPB(heartbeatResponse.ChangefeedID),
NewHeartBeatResponse(heartbeatResponse))
NewHeartBeatResponse(msg.From, heartbeatResponse))
case messaging.TypeScheduleDispatcherRequest:
schedulerDispatcherRequest := msg.Message[0].(*heartbeatpb.ScheduleDispatcherRequest)
c.schedulerDispatcherRequestDynamicStream.Push(
common.NewChangefeedGIDFromPB(schedulerDispatcherRequest.ChangefeedID),
NewSchedulerDispatcherRequest(schedulerDispatcherRequest))
NewSchedulerDispatcherRequest(msg.From, schedulerDispatcherRequest))
// TODO: check metrics
metrics.HandleDispatcherRequsetCounter.WithLabelValues("default", schedulerDispatcherRequest.ChangefeedID.Name, "receive").Inc()
case messaging.TypeCheckpointTsMessage:
Expand All @@ -290,7 +290,7 @@ func (c *HeartBeatCollector) RecvMessages(_ context.Context, msg *messaging.Targ
mergeDispatcherRequest := msg.Message[0].(*heartbeatpb.MergeDispatcherRequest)
c.mergeDispatcherRequestDynamicStream.Push(
common.NewChangefeedGIDFromPB(mergeDispatcherRequest.ChangefeedID),
NewMergeDispatcherRequest(mergeDispatcherRequest))
NewMergeDispatcherRequest(msg.From, mergeDispatcherRequest))
default:
log.Warn("unknown message type, ignore it",
zap.String("type", msg.Type.String()),
Expand Down
Loading
Loading