diff --git a/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go b/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go index bd5afde268..8cc320b750 100644 --- a/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go +++ b/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go @@ -283,51 +283,23 @@ func (m *DispatcherOrchestrator) handleBootstrapRequest( return nil } if exists { - // Check and potentially add a table trigger event dispatcher. - // This is necessary during maintainer node migration, as the existing - // dispatcher manager on the new node may not have a table trigger - // event dispatcher configured yet. - if req.TableTriggerEventDispatcherId != nil { - tableTriggerDispatcher := manager.GetTableTriggerEventDispatcher() - if tableTriggerDispatcher == nil { - err = manager.NewTableTriggerEventDispatcher( - req.TableTriggerEventDispatcherId, - req.StartTs, - false, - ) - if err != nil { - if dispatchermanager.IsWritePathClosedError(err) { - log.Info("dispatcher manager write path closed while creating table trigger event dispatcher", - zap.Stringer("changefeedID", cfId), zap.Error(err)) - return nil - } - log.Error("failed to create new table trigger event dispatcher", - zap.Stringer("changefeedID", cfId), zap.Error(err)) - manager.MaintainerFenceMu.Unlock() - return m.handleDispatcherError(from, req.ChangefeedID, maintainerEpoch, err) - } - } + // A higher-epoch maintainer may reuse an existing DispatcherManager with + // the same table trigger ID. A fresh trigger ID is allowed only after the + // old maintainer handover leaves no old trigger on this node. If a + // different trigger ID is still present, the scheduler has violated the + // handover ordering; replacing it in place can duplicate DDL ownership, + // so fail the bootstrap instead. + if err := ensureBootstrapTableTriggerEventDispatcher( + cfId, manager, req.TableTriggerEventDispatcherId, req.StartTs, + ); err != nil { + manager.MaintainerFenceMu.Unlock() + return m.handleDispatcherError(from, req.ChangefeedID, maintainerEpoch, err) } - if req.TableTriggerRedoDispatcherId != nil { - tableTriggerRedoDispatcher := manager.GetTableTriggerRedoDispatcher() - if tableTriggerRedoDispatcher == nil { - err = manager.NewTableTriggerRedoDispatcher( - req.TableTriggerRedoDispatcherId, - req.StartTs, - false, - ) - if err != nil { - if dispatchermanager.IsWritePathClosedError(err) { - log.Info("dispatcher manager write path closed while creating table trigger redo dispatcher", - zap.Stringer("changefeedID", cfId), zap.Error(err)) - return nil - } - log.Error("failed to create new table trigger redo dispatcher", - zap.Stringer("changefeedID", cfId), zap.Error(err)) - manager.MaintainerFenceMu.Unlock() - return m.handleDispatcherError(from, req.ChangefeedID, maintainerEpoch, err) - } - } + if err := ensureBootstrapTableTriggerRedoDispatcher( + cfId, manager, req.TableTriggerRedoDispatcherId, req.StartTs, + ); err != nil { + manager.MaintainerFenceMu.Unlock() + return m.handleDispatcherError(from, req.ChangefeedID, maintainerEpoch, err) } } @@ -354,6 +326,70 @@ func (m *DispatcherOrchestrator) handleBootstrapRequest( return m.sendResponse(from, messaging.MaintainerManagerTopic, response) } +// ensureBootstrapTableTriggerEventDispatcher verifies that a reused manager has +// the bootstrap owner's table trigger, or creates it when no trigger exists yet. +func ensureBootstrapTableTriggerEventDispatcher( + cfId common.ChangeFeedID, + manager *dispatchermanager.DispatcherManager, + id *heartbeatpb.DispatcherID, + startTs uint64, +) error { + if id == nil { + return nil + } + expectedID := common.NewDispatcherIDFromPB(id) + tableTriggerDispatcher := manager.GetTableTriggerEventDispatcher() + if tableTriggerDispatcher == nil { + if err := manager.NewTableTriggerEventDispatcher(id, startTs, false); err != nil { + log.Error("failed to create table trigger event dispatcher", + zap.Stringer("changefeedID", cfId), zap.Error(err)) + return err + } + return nil + } + if tableTriggerDispatcher.GetId() == expectedID { + return nil + } + log.Error("table trigger event dispatcher id mismatch during bootstrap", + zap.Stringer("changefeedID", cfId), + zap.Stringer("expectedDispatcherID", expectedID), + zap.Stringer("actualDispatcherID", tableTriggerDispatcher.GetId())) + return errors.ErrChangefeedInitTableTriggerDispatcherFailed. + GenWithStackByArgs("table trigger event dispatcher id mismatch during bootstrap") +} + +// ensureBootstrapTableTriggerRedoDispatcher verifies that a reused manager has +// the bootstrap owner's redo table trigger, or creates it when no trigger exists yet. +func ensureBootstrapTableTriggerRedoDispatcher( + cfId common.ChangeFeedID, + manager *dispatchermanager.DispatcherManager, + id *heartbeatpb.DispatcherID, + startTs uint64, +) error { + if id == nil { + return nil + } + expectedID := common.NewDispatcherIDFromPB(id) + tableTriggerRedoDispatcher := manager.GetTableTriggerRedoDispatcher() + if tableTriggerRedoDispatcher == nil { + if err := manager.NewTableTriggerRedoDispatcher(id, startTs, false); err != nil { + log.Error("failed to create table trigger redo dispatcher", + zap.Stringer("changefeedID", cfId), zap.Error(err)) + return err + } + return nil + } + if tableTriggerRedoDispatcher.GetId() == expectedID { + return nil + } + log.Error("table trigger redo dispatcher id mismatch during bootstrap", + zap.Stringer("changefeedID", cfId), + zap.Stringer("expectedDispatcherID", expectedID), + zap.Stringer("actualDispatcherID", tableTriggerRedoDispatcher.GetId())) + return errors.ErrChangefeedInitTableTriggerDispatcherFailed. + GenWithStackByArgs("table trigger redo dispatcher id mismatch during bootstrap") +} + // handlePostBootstrapRequest handles the maintainer post-bootstrap request message. // It initializes the table trigger event dispatcher with table schema information, // which serves as the initial state for the table schema store. After initialization, @@ -443,7 +479,6 @@ func (m *DispatcherOrchestrator) handlePostBootstrapRequest( } if m.fenced.Load() { - manager.MaintainerFenceMu.Unlock() manager.LocalFence() return nil } @@ -722,32 +757,49 @@ func retrieveOperatorsForBootstrapResponse( manager *dispatchermanager.DispatcherManager, response *heartbeatpb.MaintainerBootstrapResponse, ) { + reportedDispatchers := make(map[reportedDispatcherKey]struct{}, len(response.Spans)) + for _, span := range response.Spans { + reportedDispatchers[reportedDispatcherKey{ + id: common.NewDispatcherIDFromPB(span.ID), + mode: span.Mode, + }] = struct{}{} + } + manager.GetCurrentOperatorMap().Range(func(_, value any) bool { req := value.(dispatchermanager.SchedulerDispatcherRequest) + requestAllowed := manager.IsMaintainerRequestAllowed(req.From, req.MaintainerEpoch) dispatcherID := common.NewDispatcherIDFromPB(req.Config.DispatcherID) - if common.IsRedoMode(req.Config.Mode) { - if manager.IsRedoReady() { - _, ok := manager.GetRedoDispatcherMap().Get(dispatcherID) - // Log error if dispatcher not found and action is not create - // It's possible that the dispatcher is not found when the action is create - // because the dispatcher may be created after the operator is stored - if !ok && req.ScheduleAction != heartbeatpb.ScheduleAction_Create { - log.Error("Redo dispatcher not found, this should not happen", - zap.String("changefeed", changefeedID.String()), - zap.String("dispatcherID", req.Config.DispatcherID.String()), - ) - } + dispatcherExistsKnown := !common.IsRedoMode(req.Config.Mode) || manager.IsRedoReady() + _, dispatcherReported := reportedDispatchers[reportedDispatcherKey{ + id: dispatcherID, + mode: req.Config.Mode, + }] + if !requestAllowed { + // Restore stale remove only when the same bootstrap snapshot reports the dispatcher. + // This keeps the working span and cleanup intent consistent even if live maps change during cleanup. + if req.ScheduleAction != heartbeatpb.ScheduleAction_Remove || !dispatcherReported { + return true } - } else { - _, ok := manager.GetDispatcherMap().Get(dispatcherID) - // Log error if dispatcher not found and action is not create - // It's possible that the dispatcher is not found when the action is create - // because the dispatcher may be created after the operator is stored - if !ok && req.ScheduleAction != heartbeatpb.ScheduleAction_Create { + log.Info("include stale remove operator in bootstrap response", + zap.String("changefeed", changefeedID.String()), + zap.String("dispatcherID", req.Config.DispatcherID.String()), + zap.String("from", req.From.String()), + zap.Uint64("requestMaintainerEpoch", req.MaintainerEpoch), + zap.Uint64("currentMaintainerEpoch", manager.GetMaintainerEpoch()), + zap.String("currentMaintainer", manager.GetMaintainerID().String())) + } + // Log error if dispatcher not found and action is not create. + // It's possible that the dispatcher is not found when the action is create + // because the dispatcher may be created after the operator is stored. + if dispatcherExistsKnown && !dispatcherReported && req.ScheduleAction != heartbeatpb.ScheduleAction_Create { + if common.IsRedoMode(req.Config.Mode) { + log.Error("Redo dispatcher not found, this should not happen", + zap.String("changefeed", changefeedID.String()), + zap.String("dispatcherID", req.Config.DispatcherID.String())) + } else { log.Error("Dispatcher not found, this should not happen", zap.String("changefeed", changefeedID.String()), - zap.String("dispatcherID", req.Config.DispatcherID.String()), - ) + zap.String("dispatcherID", req.Config.DispatcherID.String())) } } response.Operators = append(response.Operators, @@ -755,3 +807,8 @@ func retrieveOperatorsForBootstrapResponse( return true }) } + +type reportedDispatcherKey struct { + id common.DispatcherID + mode int64 +} diff --git a/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.go b/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.go index 0e3781249a..4e14b6e282 100644 --- a/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.go +++ b/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.go @@ -15,15 +15,20 @@ package dispatcherorchestrator import ( "context" + "encoding/json" "fmt" "testing" "time" "github.com/pingcap/ticdc/downstreamadapter/dispatchermanager" + "github.com/pingcap/ticdc/downstreamadapter/eventcollector" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/pkg/common" + appcontext "github.com/pingcap/ticdc/pkg/common/context" + "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/pkg/pdutil" "github.com/stretchr/testify/require" ) @@ -173,6 +178,7 @@ func newTestDispatcherOrchestrator() *DispatcherOrchestrator { orchestrator := &DispatcherOrchestrator{ dispatcherManagers: make(map[common.ChangeFeedID]*dispatchermanager.DispatcherManager), initializingDispatcherManagers: make(map[common.ChangeFeedID]*dispatchermanager.DispatcherManager), + closedMaintainerEpochs: make(map[common.ChangeFeedID]uint64), shards: make([]*orchestratorShard, dispatcherOrchestratorShardCount), } for i := range orchestrator.shards { @@ -305,6 +311,75 @@ func TestPendingMessageQueue_CloseRequestRemovedTrueOverridesPendingFalse(t *tes require.True(t, req.Removed) } +func TestPendingMessageQueue_StaleRemovedCloseCannotOverrideNewerEpochClose(t *testing.T) { + t.Parallel() + + q := newPendingMessageQueue() + cfID := common.NewChangeFeedIDWithName("cf", "default") + key := pendingMessageKey{ + changefeedID: cfID, + msgType: messaging.TypeMaintainerCloseRequest, + } + + newerClose := messaging.NewSingleTargetMessage( + node.ID("to"), + messaging.DispatcherManagerManagerTopic, + &heartbeatpb.MaintainerCloseRequest{ + ChangefeedID: cfID.ToPB(), + MaintainerEpoch: 2, + Removed: false, + }, + ) + staleRemovedClose := messaging.NewSingleTargetMessage( + node.ID("to"), + messaging.DispatcherManagerManagerTopic, + &heartbeatpb.MaintainerCloseRequest{ + ChangefeedID: cfID.ToPB(), + MaintainerEpoch: 1, + Removed: true, + }, + ) + + require.True(t, q.TryEnqueue(key, newerClose)) + require.False(t, q.TryEnqueue(key, staleRemovedClose)) + + poppedMsg, ok := q.Pop() + require.True(t, ok) + req := poppedMsg.Message[0].(*heartbeatpb.MaintainerCloseRequest) + require.Equal(t, uint64(2), req.MaintainerEpoch) + require.False(t, req.Removed) +} + +func TestPendingMessageQueue_NewerMaintainerEpochOverridesPendingRequest(t *testing.T) { + t.Parallel() + + q := newPendingMessageQueue() + cfID := common.NewChangeFeedIDWithName("cf", "default") + key := pendingMessageKey{ + changefeedID: cfID, + msgType: messaging.TypeMaintainerBootstrapRequest, + } + + oldMsg := messaging.NewSingleTargetMessage( + node.ID("to"), + messaging.DispatcherManagerManagerTopic, + &heartbeatpb.MaintainerBootstrapRequest{ChangefeedID: cfID.ToPB(), MaintainerEpoch: 1}, + ) + newMsg := messaging.NewSingleTargetMessage( + node.ID("to"), + messaging.DispatcherManagerManagerTopic, + &heartbeatpb.MaintainerBootstrapRequest{ChangefeedID: cfID.ToPB(), MaintainerEpoch: 2}, + ) + + require.True(t, q.TryEnqueue(key, oldMsg)) + require.True(t, q.TryEnqueue(key, newMsg)) + + poppedMsg, ok := q.Pop() + require.True(t, ok) + req := poppedMsg.Message[0].(*heartbeatpb.MaintainerBootstrapRequest) + require.Equal(t, uint64(2), req.MaintainerEpoch) +} + func TestPendingMessageQueue_CloseRequestUpgradeAfterPopKeepsReturnedMessageStable(t *testing.T) { t.Parallel() @@ -515,3 +590,282 @@ func TestDispatcherOrchestratorLocalFenceFencesInitializingManagersImmediately(t err := manager.InitalizeTableTriggerEventDispatcher(nil) require.True(t, dispatchermanager.IsWritePathClosedError(err)) } + +func TestBootstrapResponseRestoresCurrentOperatorsAndStaleRemoves(t *testing.T) { + appcontext.SetService(appcontext.DefaultPDClock, pdutil.NewClock4Test()) + appcontext.SetService(appcontext.MessageCenter, messaging.NewMockMessageCenter()) + heartbeatCollector := dispatchermanager.NewHeartBeatCollector(node.ID("receiver")) + heartbeatCollector.Run(context.Background()) + appcontext.SetService(appcontext.HeartbeatCollector, heartbeatCollector) + t.Cleanup(heartbeatCollector.Close) + appcontext.SetService(appcontext.EventCollector, eventcollector.New(node.ID("receiver"))) + + cfID := common.NewChangeFeedIDWithName("cf", "default") + currentDispatcherID := common.NewDispatcherID() + oldCreateDispatcherID := common.NewDispatcherID() + staleRemoveDispatcherID := common.NewDispatcherID() + manager, err := dispatchermanager.NewDispatcherManager( + 0, + cfID, + newBootstrapResponseTestChangefeedConfig(cfID), + staleRemoveDispatcherID.ToPB(), + nil, + 100, + node.ID("current-maintainer"), + 2, + false, + nil, + ) + require.NoError(t, err) + t.Cleanup(func() { + manager.TryClose(false) + }) + + manager.GetCurrentOperatorMap().Store( + currentDispatcherID, + dispatchermanager.NewSchedulerDispatcherRequest( + node.ID("current-maintainer"), + newBootstrapResponseTestScheduleRequest(cfID, currentDispatcherID, 2), + ), + ) + manager.GetCurrentOperatorMap().Store( + oldCreateDispatcherID, + dispatchermanager.NewSchedulerDispatcherRequest( + node.ID("old-maintainer"), + newBootstrapResponseTestScheduleRequest(cfID, oldCreateDispatcherID, 1), + ), + ) + staleRemoveReq := newBootstrapResponseTestScheduleRequest(cfID, staleRemoveDispatcherID, 1) + staleRemoveReq.ScheduleAction = heartbeatpb.ScheduleAction_Remove + staleRemoveReq.OperatorType = heartbeatpb.OperatorType_O_Move + manager.GetCurrentOperatorMap().Store( + staleRemoveDispatcherID, + dispatchermanager.NewSchedulerDispatcherRequest( + node.ID("old-maintainer"), + staleRemoveReq, + ), + ) + + response := createBootstrapResponse(cfID.ToPB(), manager, 0, 0) + require.Len(t, response.Operators, 2) + operators := make(map[common.DispatcherID]*heartbeatpb.ScheduleDispatcherRequest) + for _, op := range response.Operators { + operators[common.NewDispatcherIDFromPB(op.Config.DispatcherID)] = op + } + currentOp, ok := operators[currentDispatcherID] + require.True(t, ok) + require.Equal(t, uint64(2), currentOp.MaintainerEpoch) + require.Equal(t, heartbeatpb.ScheduleAction_Create, currentOp.ScheduleAction) + require.NotContains(t, operators, oldCreateDispatcherID) + staleRemoveOp, ok := operators[staleRemoveDispatcherID] + require.True(t, ok) + require.Equal(t, uint64(1), staleRemoveOp.MaintainerEpoch) + require.Equal(t, heartbeatpb.ScheduleAction_Remove, staleRemoveOp.ScheduleAction) + require.Equal(t, heartbeatpb.OperatorType_O_Move, staleRemoveOp.OperatorType) +} + +func TestBootstrapResponseUsesSpanSnapshotForStaleRemove(t *testing.T) { + appcontext.SetService(appcontext.DefaultPDClock, pdutil.NewClock4Test()) + appcontext.SetService(appcontext.MessageCenter, messaging.NewMockMessageCenter()) + heartbeatCollector := dispatchermanager.NewHeartBeatCollector(node.ID("receiver")) + heartbeatCollector.Run(context.Background()) + appcontext.SetService(appcontext.HeartbeatCollector, heartbeatCollector) + t.Cleanup(heartbeatCollector.Close) + + cfID := common.NewChangeFeedIDWithName("cf", "default") + manager, err := dispatchermanager.NewDispatcherManager( + 0, + cfID, + newBootstrapResponseTestChangefeedConfig(cfID), + nil, + nil, + 100, + node.ID("current-maintainer"), + 2, + false, + nil, + ) + require.NoError(t, err) + t.Cleanup(func() { + manager.TryClose(false) + }) + + reportedRemoveDispatcherID := common.NewDispatcherID() + reportedRemoveReq := newBootstrapResponseTestScheduleRequest(cfID, reportedRemoveDispatcherID, 1) + reportedRemoveReq.ScheduleAction = heartbeatpb.ScheduleAction_Remove + reportedRemoveReq.OperatorType = heartbeatpb.OperatorType_O_Move + manager.GetCurrentOperatorMap().Store( + reportedRemoveDispatcherID, + dispatchermanager.NewSchedulerDispatcherRequest(node.ID("old-maintainer"), reportedRemoveReq), + ) + + missingRemoveDispatcherID := common.NewDispatcherID() + missingRemoveReq := newBootstrapResponseTestScheduleRequest(cfID, missingRemoveDispatcherID, 1) + missingRemoveReq.ScheduleAction = heartbeatpb.ScheduleAction_Remove + missingRemoveReq.OperatorType = heartbeatpb.OperatorType_O_Split + manager.GetCurrentOperatorMap().Store( + missingRemoveDispatcherID, + dispatchermanager.NewSchedulerDispatcherRequest(node.ID("old-maintainer"), missingRemoveReq), + ) + + response := &heartbeatpb.MaintainerBootstrapResponse{ + ChangefeedID: cfID.ToPB(), + Spans: []*heartbeatpb.BootstrapTableSpan{ + { + ID: reportedRemoveDispatcherID.ToPB(), + Span: &heartbeatpb.TableSpan{TableID: 1}, + ComponentStatus: heartbeatpb.ComponentState_Working, + Mode: common.DefaultMode, + }, + }, + } + retrieveOperatorsForBootstrapResponse(cfID.ToPB(), manager, response) + + require.Len(t, response.Operators, 1) + require.Equal(t, reportedRemoveDispatcherID, common.NewDispatcherIDFromPB(response.Operators[0].Config.DispatcherID)) + require.Equal(t, heartbeatpb.ScheduleAction_Remove, response.Operators[0].ScheduleAction) + require.Equal(t, heartbeatpb.OperatorType_O_Move, response.Operators[0].OperatorType) +} + +func TestHandleCloseRequestAcksStaleMaintainerEpoch(t *testing.T) { + mc := messaging.NewMockMessageCenter() + appcontext.SetService(appcontext.DefaultPDClock, pdutil.NewClock4Test()) + appcontext.SetService(appcontext.MessageCenter, mc) + heartbeatCollector := dispatchermanager.NewHeartBeatCollector(node.ID("receiver")) + heartbeatCollector.Run(context.Background()) + appcontext.SetService(appcontext.HeartbeatCollector, heartbeatCollector) + t.Cleanup(heartbeatCollector.Close) + + cfID := common.NewChangeFeedIDWithName("cf", "default") + manager, err := dispatchermanager.NewDispatcherManager( + 0, + cfID, + newBootstrapResponseTestChangefeedConfig(cfID), + nil, + nil, + 100, + node.ID("current-maintainer"), + 2, + false, + nil, + ) + require.NoError(t, err) + t.Cleanup(func() { + manager.TryClose(false) + }) + + orchestrator := &DispatcherOrchestrator{ + mc: mc, + dispatcherManagers: map[common.ChangeFeedID]*dispatchermanager.DispatcherManager{cfID: manager}, + closedMaintainerEpochs: make(map[common.ChangeFeedID]uint64), + } + err = orchestrator.handleCloseRequest(node.ID("old-maintainer"), &heartbeatpb.MaintainerCloseRequest{ + ChangefeedID: cfID.ToPB(), + MaintainerEpoch: 1, + }) + require.NoError(t, err) + + responseMsg := <-mc.GetMessageChannel() + response := responseMsg.Message[0].(*heartbeatpb.MaintainerCloseResponse) + require.True(t, response.Success) + require.Equal(t, uint64(1), response.MaintainerEpoch) + require.Contains(t, orchestrator.dispatcherManagers, cfID) + require.NotContains(t, orchestrator.closedMaintainerEpochs, cfID) +} + +func TestHandleBootstrapRequestRejectsClosedOlderMaintainerEpoch(t *testing.T) { + cfID := common.NewChangeFeedIDWithName("cf", "default") + + orchestrator := &DispatcherOrchestrator{ + mc: messaging.NewMockMessageCenter(), + dispatcherManagers: make(map[common.ChangeFeedID]*dispatchermanager.DispatcherManager), + closedMaintainerEpochs: map[common.ChangeFeedID]uint64{ + cfID: 2, + }, + } + err := orchestrator.handleBootstrapRequest(node.ID("old-maintainer"), &heartbeatpb.MaintainerBootstrapRequest{ + ChangefeedID: cfID.ToPB(), + Config: []byte("invalid config"), + MaintainerEpoch: 1, + }) + require.NoError(t, err) + require.Empty(t, orchestrator.dispatcherManagers) +} + +func TestHandleCloseRequestDoesNotRecordEpochZeroTombstoneForCompatClose(t *testing.T) { + cfID := common.NewChangeFeedIDWithName("cf", "default") + + orchestrator := &DispatcherOrchestrator{ + mc: messaging.NewMockMessageCenter(), + dispatcherManagers: make(map[common.ChangeFeedID]*dispatchermanager.DispatcherManager), + closedMaintainerEpochs: make(map[common.ChangeFeedID]uint64), + } + + err := orchestrator.handleCloseRequest(node.ID("compat-maintainer"), &heartbeatpb.MaintainerCloseRequest{ + ChangefeedID: cfID.ToPB(), + Removed: false, + MaintainerEpoch: 0, + }) + require.NoError(t, err) + _, ok := orchestrator.closedMaintainerEpochs[cfID] + require.False(t, ok) +} + +func TestHandleCloseRequestRecordsEpochZeroTombstoneForRemovedChangefeed(t *testing.T) { + cfID := common.NewChangeFeedIDWithName("cf", "default") + configBytes, err := json.Marshal(newBootstrapResponseTestChangefeedConfig(cfID)) + require.NoError(t, err) + + orchestrator := &DispatcherOrchestrator{ + mc: messaging.NewMockMessageCenter(), + dispatcherManagers: make(map[common.ChangeFeedID]*dispatchermanager.DispatcherManager), + closedMaintainerEpochs: make(map[common.ChangeFeedID]uint64), + } + + err = orchestrator.handleCloseRequest(node.ID("compat-maintainer"), &heartbeatpb.MaintainerCloseRequest{ + ChangefeedID: cfID.ToPB(), + Removed: true, + MaintainerEpoch: 0, + }) + require.NoError(t, err) + closedEpoch, ok := orchestrator.closedMaintainerEpochs[cfID] + require.True(t, ok) + require.Zero(t, closedEpoch) + + err = orchestrator.handleBootstrapRequest(node.ID("compat-maintainer"), &heartbeatpb.MaintainerBootstrapRequest{ + ChangefeedID: cfID.ToPB(), + Config: configBytes, + MaintainerEpoch: 0, + }) + require.NoError(t, err) + require.Empty(t, orchestrator.dispatcherManagers) +} + +func newBootstrapResponseTestChangefeedConfig(cfID common.ChangeFeedID) *config.ChangefeedConfig { + replicaConfig := config.GetDefaultReplicaConfig() + return &config.ChangefeedConfig{ + ChangefeedID: cfID, + SinkURI: "blackhole://", + SinkConfig: replicaConfig.Sink, + Filter: replicaConfig.Filter, + } +} + +func newBootstrapResponseTestScheduleRequest( + cfID common.ChangeFeedID, + dispatcherID common.DispatcherID, + maintainerEpoch uint64, +) *heartbeatpb.ScheduleDispatcherRequest { + return &heartbeatpb.ScheduleDispatcherRequest{ + ChangefeedID: cfID.ToPB(), + Config: &heartbeatpb.DispatcherConfig{ + Span: &heartbeatpb.TableSpan{TableID: 1}, + StartTs: 100, + DispatcherID: dispatcherID.ToPB(), + Mode: common.DefaultMode, + }, + ScheduleAction: heartbeatpb.ScheduleAction_Create, + OperatorType: heartbeatpb.OperatorType_O_Add, + MaintainerEpoch: maintainerEpoch, + } +} diff --git a/tests/integration_tests/in_flight_syncpoint_during_scheduling/run.sh b/tests/integration_tests/in_flight_syncpoint_during_scheduling/run.sh index d7c67a6e11..a33e6df9e4 100755 --- a/tests/integration_tests/in_flight_syncpoint_during_scheduling/run.sh +++ b/tests/integration_tests/in_flight_syncpoint_during_scheduling/run.sh @@ -117,8 +117,10 @@ EOF # Merge the split table while the syncpoint is pending. merge_table_with_retry $merge_table_id "$CHANGEFEED_ID" 10 - ensure 60 "grep -q \"merge dispatcher uses pending block event to calculate start ts\" $WORK_DIR/cdc0-1.log" - merge_line=$(grep "merge dispatcher uses pending block event to calculate start ts" $WORK_DIR/cdc0-1.log | grep -E "pendingCommitTs[^0-9]*${syncpoint_ts}" | tail -n 1 || true) + # The merge task runs in the dispatcher manager that owns the source split spans. + # After node2 joins, those spans may be scheduled to any CDC node. + ensure 60 "grep \"merge dispatcher uses pending block event to calculate start ts\" $WORK_DIR/cdc*.log | grep -Eq \"pendingCommitTs[^0-9]*${syncpoint_ts}\"" + merge_line=$(grep "merge dispatcher uses pending block event to calculate start ts" $WORK_DIR/cdc*.log | grep -E "pendingCommitTs[^0-9]*${syncpoint_ts}" | tail -n 1 || true) if [ -z "$merge_line" ]; then echo "failed to find merge startTs decision log line" exit 1