diff --git a/api/v2/changefeed.go b/api/v2/changefeed.go index 7c04e43666..e7f98e2da5 100644 --- a/api/v2/changefeed.go +++ b/api/v2/changefeed.go @@ -771,6 +771,14 @@ func (h *OpenAPIV2) ResumeChangefeed(c *gin.Context) { } middleware.SetChangefeedOperationTarget(c, cfInfo.ChangefeedID.Keyspace(), cfInfo.ChangefeedID.Name()) + // Resume validation must use persisted metadata because stopped changefeeds + // can be edited outside the coordinator process during legacy migration. + cfInfo, err = co.GetPersistedChangefeedInfo(ctx, cfInfo.ChangefeedID) + if err != nil { + _ = c.Error(err) + return + } + // If there is no overrideCheckpointTs, then check whether the currentCheckpointTs is smaller than gc safepoint or not. newCheckpointTs := status.CheckpointTs overwriteCheckpointTs := false diff --git a/api/v2/changefeed_test.go b/api/v2/changefeed_test.go index 81660b9c26..389631954e 100644 --- a/api/v2/changefeed_test.go +++ b/api/v2/changefeed_test.go @@ -137,6 +137,13 @@ func (c *resumeNormalCoordinator) GetChangefeed(ctx context.Context, changefeedD }, nil } +func (c *resumeNormalCoordinator) GetPersistedChangefeedInfo(ctx context.Context, id common.ChangeFeedID) (*config.ChangeFeedInfo, error) { + return &config.ChangeFeedInfo{ + ChangefeedID: id, + State: config.StateNormal, + }, nil +} + func (c *resumeNormalCoordinator) CreateChangefeed(ctx context.Context, info *config.ChangeFeedInfo) error { return nil } diff --git a/coordinator/changefeed/changefeed_db_backend.go b/coordinator/changefeed/changefeed_db_backend.go index 2f92850a87..1eacd755e3 100644 --- a/coordinator/changefeed/changefeed_db_backend.go +++ b/coordinator/changefeed/changefeed_db_backend.go @@ -24,6 +24,8 @@ import ( type Backend interface { // GetAllChangefeeds returns all changefeeds from the backend db, include stopped and failed changefeeds GetAllChangefeeds(ctx context.Context) (map[common.ChangeFeedID]*ChangefeedMetaWrapper, error) + // GetChangefeedInfo returns the latest persisted 
changefeed info from the backend db. + GetChangefeedInfo(ctx context.Context, id common.ChangeFeedID) (*config.ChangeFeedInfo, error) // CreateChangefeed saves changefeed info and status to db CreateChangefeed(ctx context.Context, info *config.ChangeFeedInfo) error // UpdateChangefeed updates changefeed info to db @@ -34,8 +36,8 @@ type Backend interface { DeleteChangefeed(ctx context.Context, id common.ChangeFeedID) error // SetChangefeedProgress persists the operation progress status to db for a changefeed SetChangefeedProgress(ctx context.Context, id common.ChangeFeedID, progress config.Progress) error - // ResumeChangefeed persists the resumed status to db for a changefeed - ResumeChangefeed(ctx context.Context, id common.ChangeFeedID, newCheckpointTs uint64) error + // ResumeChangefeed persists the resumed status to db for a changefeed and returns the resumed info. + ResumeChangefeed(ctx context.Context, id common.ChangeFeedID, newCheckpointTs uint64) (*config.ChangeFeedInfo, error) // UpdateChangefeedCheckpointTs persists the checkpointTs for changefeeds UpdateChangefeedCheckpointTs(ctx context.Context, checkpointTs map[common.ChangeFeedID]uint64) error } diff --git a/coordinator/changefeed/etcd_backend.go b/coordinator/changefeed/etcd_backend.go index 6d9de399e3..c6378a03a2 100644 --- a/coordinator/changefeed/etcd_backend.go +++ b/coordinator/changefeed/etcd_backend.go @@ -124,6 +124,21 @@ func (b *EtcdBackend) GetAllChangefeeds(ctx context.Context) (map[common.ChangeF return cfMap, nil } +// GetChangefeedInfo returns the latest persisted changefeed info from etcd. +func (b *EtcdBackend) GetChangefeedInfo(ctx context.Context, id common.ChangeFeedID) (*config.ChangeFeedInfo, error) { + info, err := b.etcdClient.GetChangeFeedInfo(ctx, id.DisplayName) + if err != nil { + return nil, errors.Trace(err) + } + // Old metadata may not embed ChangefeedID in the value. 
Keep the backend + // lookup key as the source of truth so callers can safely use the returned + // info for validation and in-memory replacement. + if info.ChangefeedID.Name() == "" { + info.ChangefeedID = id + } + return info, nil +} + func (b *EtcdBackend) CreateChangefeed(ctx context.Context, info *config.ChangeFeedInfo, ) error { @@ -248,17 +263,25 @@ func (b *EtcdBackend) DeleteChangefeed(ctx context.Context, return nil } +// ResumeChangefeed persists a resumed changefeed and returns the metadata used by the caller. func (b *EtcdBackend) ResumeChangefeed(ctx context.Context, id common.ChangeFeedID, newCheckpointTs uint64, -) error { - info, err := b.etcdClient.GetChangeFeedInfo(ctx, id.DisplayName) +) (*config.ChangeFeedInfo, error) { + info, err := b.GetChangefeedInfo(ctx, id) if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) + } + // Legacy stopped changefeeds can contain sparse metadata that was completed + // during coordinator bootstrap. Complete it again before persisting the + // resumed state so backend-loaded metadata does not drop compatibility defaults. 
+ if info.Config == nil { + info.Config = config.GetDefaultReplicaConfig() } + info.VerifyAndComplete() info.State = config.StateNormal newStr, err := info.Marshal() if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } infoKey := etcd.GetEtcdKeyChangeFeedInfo(b.etcdClient.GetClusterID(), id.DisplayName) opsThen := []clientv3.Op{ @@ -267,13 +290,13 @@ func (b *EtcdBackend) ResumeChangefeed(ctx context.Context, if newCheckpointTs > 0 { status, _, err := b.etcdClient.GetChangeFeedStatus(ctx, id) if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } status.CheckpointTs = newCheckpointTs status.Progress = config.ProgressNone jobValue, err := status.Marshal() if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } jobKey := etcd.GetEtcdKeyJob(b.etcdClient.GetClusterID(), id.DisplayName) opsThen = append(opsThen, clientv3.OpPut(jobKey, jobValue)) @@ -281,13 +304,13 @@ func (b *EtcdBackend) ResumeChangefeed(ctx context.Context, putResp, err := b.etcdClient.GetEtcdClient().Txn(ctx, nil, opsThen, []clientv3.Op{}) if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } if !putResp.Succeeded { err = cerror.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("resume changefeed %s", info.ChangefeedID.Name())) - return errors.Trace(err) + return nil, errors.Trace(err) } - return nil + return info, nil } func (b *EtcdBackend) SetChangefeedProgress(ctx context.Context, id common.ChangeFeedID, progress config.Progress) error { diff --git a/coordinator/changefeed/etcd_backend_test.go b/coordinator/changefeed/etcd_backend_test.go index 223c1db098..3cab212d6f 100644 --- a/coordinator/changefeed/etcd_backend_test.go +++ b/coordinator/changefeed/etcd_backend_test.go @@ -193,6 +193,11 @@ func TestDeleteChangefeed(t *testing.T) { } func TestResumeChangefeed(t *testing.T) { + // Scenario: resuming a stopped changefeed persists the normal state and + // returns the metadata that was actually loaded from etcd. 
+ // Steps: + // 1) Load legacy changefeed info without an embedded ChangefeedID. + // 2) Resume the changefeed and assert the returned info is normalized. ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -210,8 +215,61 @@ func TestResumeChangefeed(t *testing.T) { cdcClient.EXPECT().GetChangeFeedStatus(gomock.Any(), changefeedID).Return(status, int64(0), nil).Times(1) etcdClient.EXPECT().Txn(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&clientv3.TxnResponse{Succeeded: true}, nil).Times(1) - err := backend.ResumeChangefeed(context.Background(), changefeedID, 200) + resumedInfo, err := backend.ResumeChangefeed(context.Background(), changefeedID, 200) require.Nil(t, err) + require.Equal(t, config.StateNormal, resumedInfo.State) + require.Equal(t, changefeedID, resumedInfo.ChangefeedID) +} + +func TestResumeChangefeedCompletesLegacySchedulerDefaults(t *testing.T) { + // Scenario: an old owner persisted a stopped changefeed with only explicit scheduler fields. + // Steps: resume that sparse metadata, inspect the etcd put payload, and verify resume persists + // compatibility defaults such as RegionCountPerSpan before returning the resumed info. 
+ ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cdcClient := etcd.NewMockCDCEtcdClient(ctrl) + etcdClient := etcd.NewMockClient(ctrl) + cdcClient.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes() + cdcClient.EXPECT().GetClusterID().Return("test-cluster-id").AnyTimes() + backend := NewEtcdBackend(cdcClient) + + changefeedID := common.NewChangeFeedIDWithName("test-scheduler-defaults", common.DefaultKeyspaceName) + enableTableAcrossNodes := false + regionThreshold := 20 + writeKeyThreshold := 10485760 + info := &config.ChangeFeedInfo{ + ChangefeedID: changefeedID, + Config: config.GetDefaultReplicaConfig(), + State: config.StateStopped, + SinkURI: "mysql://127.0.0.1:3306", + } + info.Config.Scheduler = &config.ChangefeedSchedulerConfig{ + EnableTableAcrossNodes: &enableTableAcrossNodes, + RegionThreshold: &regionThreshold, + WriteKeyThreshold: &writeKeyThreshold, + } + + cdcClient.EXPECT().GetChangeFeedInfo(gomock.Any(), changefeedID.DisplayName).Return(info, nil).Times(1) + etcdClient.EXPECT().Txn(gomock.Any(), gomock.Any(), NewFuncMatcher(func(i interface{}) bool { + ops := i.([]clientv3.Op) + require.Len(t, ops, 1) + require.True(t, ops[0].IsPut()) + + persistedInfo := &config.ChangeFeedInfo{} + require.NoError(t, persistedInfo.Unmarshal(ops[0].ValueBytes())) + require.Equal(t, config.StateNormal, persistedInfo.State) + require.NotNil(t, persistedInfo.Config) + require.NotNil(t, persistedInfo.Config.Scheduler) + require.NotNil(t, persistedInfo.Config.Scheduler.RegionCountPerSpan) + require.Greater(t, *persistedInfo.Config.Scheduler.RegionCountPerSpan, 0) + return true + }), gomock.Any()).Return(&clientv3.TxnResponse{Succeeded: true}, nil).Times(1) + + resumedInfo, err := backend.ResumeChangefeed(context.Background(), changefeedID, 0) + require.NoError(t, err) + require.NotNil(t, resumedInfo.Config.Scheduler.RegionCountPerSpan) + require.Greater(t, *resumedInfo.Config.Scheduler.RegionCountPerSpan, 0) } func TestSetChangefeedProgress(t *testing.T) { 
diff --git a/coordinator/changefeed/mock/changefeed_db_backend.go b/coordinator/changefeed/mock/changefeed_db_backend.go index 80671c5c39..8db908a7b4 100644 --- a/coordinator/changefeed/mock/changefeed_db_backend.go +++ b/coordinator/changefeed/mock/changefeed_db_backend.go @@ -80,6 +80,21 @@ func (mr *MockBackendMockRecorder) GetAllChangefeeds(ctx interface{}) *gomock.Ca return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllChangefeeds", reflect.TypeOf((*MockBackend)(nil).GetAllChangefeeds), ctx) } +// GetChangefeedInfo mocks base method. +func (m *MockBackend) GetChangefeedInfo(ctx context.Context, id common.ChangeFeedID) (*config.ChangeFeedInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetChangefeedInfo", ctx, id) + ret0, _ := ret[0].(*config.ChangeFeedInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetChangefeedInfo indicates an expected call of GetChangefeedInfo. +func (mr *MockBackendMockRecorder) GetChangefeedInfo(ctx, id interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetChangefeedInfo", reflect.TypeOf((*MockBackend)(nil).GetChangefeedInfo), ctx, id) +} + // PauseChangefeed mocks base method. func (m *MockBackend) PauseChangefeed(ctx context.Context, id common.ChangeFeedID) error { m.ctrl.T.Helper() @@ -95,11 +110,12 @@ func (mr *MockBackendMockRecorder) PauseChangefeed(ctx, id interface{}) *gomock. } // ResumeChangefeed mocks base method. 
-func (m *MockBackend) ResumeChangefeed(ctx context.Context, id common.ChangeFeedID, newCheckpointTs uint64) error { +func (m *MockBackend) ResumeChangefeed(ctx context.Context, id common.ChangeFeedID, newCheckpointTs uint64) (*config.ChangeFeedInfo, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ResumeChangefeed", ctx, id, newCheckpointTs) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].(*config.ChangeFeedInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 } // ResumeChangefeed indicates an expected call of ResumeChangefeed. diff --git a/coordinator/controller.go b/coordinator/controller.go index 228b37a822..16ca6bf66a 100644 --- a/coordinator/controller.go +++ b/coordinator/controller.go @@ -899,11 +899,17 @@ func (c *Controller) ResumeChangefeed( return err } - if err := c.backend.ResumeChangefeed(ctx, id, newCheckpointTs); err != nil { + resumedInfo, err := c.backend.ResumeChangefeed(ctx, id, newCheckpointTs) + if err != nil { return err } + if resumedInfo == nil { + return errors.New("resumed changefeed info is nil") + } - clone, err := cf.GetInfo().Clone() + // Use the backend-returned info so direct metadata edits made while the + // changefeed was stopped are not overwritten by the stale in-memory copy. + clone, err := resumedInfo.Clone() if err != nil { return err } @@ -959,6 +965,10 @@ func (c *Controller) ListChangefeeds(_ context.Context, keyspace string) ([]*con return infos, statuses, nil } +// GetChangefeed returns a copy of the changefeed info and the current status. +// API callers mutate the returned info when validating update requests, so the +// copy prevents those writes from racing with coordinator goroutines that read +// the in-memory changefeed state. 
func (c *Controller) GetChangefeed( _ context.Context, changefeedDisplayName common.ChangeFeedDisplayName, @@ -975,6 +985,11 @@ func (c *Controller) GetChangefeed( return nil, nil, errors.ErrChangeFeedNotExists.GenWithStackByArgs(changefeedDisplayName.Name) } + info, err := cf.GetInfo().Clone() + if err != nil { + return nil, nil, errors.Trace(err) + } + maintainerID := cf.GetNodeID() nodeInfo := c.nodeManager.GetNodeInfo(maintainerID) maintainerAddr := "" @@ -983,7 +998,19 @@ func (c *Controller) GetChangefeed( } status := &config.ChangeFeedStatus{CheckpointTs: cf.GetStatus().CheckpointTs, LastSyncedTs: cf.GetStatus().LastSyncedTs, LogCoordinatorResolvedTs: cf.GetLogCoordinatorResolvedTs()} status.SetMaintainerAddr(maintainerAddr) - return cf.GetInfo(), status, nil + return info, status, nil +} + +// GetPersistedChangefeedInfo returns the latest changefeed info persisted in the backend. +// +// Use this for resume-time validation because stopped changefeed metadata can +// be changed outside the coordinator process, for example during metadata +// migration or by legacy tooling. GetChangefeed intentionally returns the +// coordinator's in-memory copy. +func (c *Controller) GetPersistedChangefeedInfo(ctx context.Context, id common.ChangeFeedID) (*config.ChangeFeedInfo, error) { + c.apiLock.RLock() + defer c.apiLock.RUnlock() + return c.backend.GetChangefeedInfo(ctx, id) } // getChangefeed returns the changefeed by id, return nil if not found diff --git a/coordinator/controller_test.go b/coordinator/controller_test.go index bbceabef4f..648d1f6e60 100644 --- a/coordinator/controller_test.go +++ b/coordinator/controller_test.go @@ -15,6 +15,7 @@ package coordinator import ( "context" + "sync" "testing" "time" @@ -257,6 +258,9 @@ func TestMaintainerHeartbeatAdmissionRequiresInitializedSender(t *testing.T) { } func TestResumeChangefeed(t *testing.T) { + // Scenario: resume should propagate backend failures and update in-memory state after success. 
+ // Steps: try a missing changefeed, simulate a backend resume failure, then return a persisted + // changefeed info from the backend and verify the stopped changefeed becomes normal. ctrl := gomock.NewController(t) backend := mock_changefeed.NewMockBackend(ctrl) changefeedDB := changefeed.NewChangefeedDB(1216) @@ -276,11 +280,11 @@ func TestResumeChangefeed(t *testing.T) { // no changefeed require.NotNil(t, controller.ResumeChangefeed(context.Background(), common.NewChangeFeedIDWithName("test2", common.DefaultKeyspaceName), 12, true)) - backend.EXPECT().ResumeChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("failed")).Times(1) + backend.EXPECT().ResumeChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("failed")).Times(1) require.NotNil(t, controller.ResumeChangefeed(context.Background(), cfID, 12, true)) require.Equal(t, config.StateFailed, changefeedDB.GetByID(cfID).GetInfo().State) - backend.EXPECT().ResumeChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1) + backend.EXPECT().ResumeChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).Return(cf.GetInfo(), nil).Times(1) require.Nil(t, controller.ResumeChangefeed(context.Background(), cfID, 12, false)) require.Equal(t, config.StateNormal, changefeedDB.GetByID(cfID).GetInfo().State) } @@ -313,6 +317,9 @@ func TestResumeChangefeedNormalState(t *testing.T) { } func TestResumeChangefeedOverwriteUpdatesLastSavedCheckpointTs(t *testing.T) { + // Scenario: overwrite resume should reset the persisted checkpoint baseline. + // Steps: resume a stopped changefeed with overwriteCheckpointTs and verify the in-memory + // last saved checkpoint is updated to the requested checkpoint. 
ctrl := gomock.NewController(t) backend := mock_changefeed.NewMockBackend(ctrl) changefeedDB := changefeed.NewChangefeedDB(1216) @@ -331,12 +338,15 @@ func TestResumeChangefeedOverwriteUpdatesLastSavedCheckpointTs(t *testing.T) { changefeedDB.AddStoppedChangefeed(cf) newCheckpointTs := uint64(120) - backend.EXPECT().ResumeChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1) + backend.EXPECT().ResumeChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).Return(cf.GetInfo(), nil).Times(1) require.Nil(t, controller.ResumeChangefeed(context.Background(), cfID, newCheckpointTs, true)) require.Equal(t, newCheckpointTs, changefeedDB.GetByID(cfID).GetLastSavedCheckPointTs()) } func TestResumeChangefeedIgnoresStaleMaintainerErrorAndSchedules(t *testing.T) { + // Scenario: a stale maintainer error kept in memory must not block manual resume. + // Steps: install an errored in-memory status, resume from the backend, and verify + // the changefeed is scheduled with a clean status. ctrl := gomock.NewController(t) backend := mock_changefeed.NewMockBackend(ctrl) changefeedDB := changefeed.NewChangefeedDB(1216) @@ -365,7 +375,7 @@ func TestResumeChangefeedIgnoresStaleMaintainerErrorAndSchedules(t *testing.T) { _, _, err := cf.ForceUpdateStatus(stale) require.NotNil(t, err) - backend.EXPECT().ResumeChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1) + backend.EXPECT().ResumeChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).Return(cf.GetInfo(), nil).Times(1) require.NoError(t, controller.ResumeChangefeed(context.Background(), cfID, 100, false)) // The changefeed should be enqueued for scheduling and should not be blocked by the stale error. 
@@ -379,6 +389,38 @@ func TestResumeChangefeedIgnoresStaleMaintainerErrorAndSchedules(t *testing.T) { require.True(t, cf.ShouldRun()) } +func TestResumeChangefeedUsesBackendReturnedInfo(t *testing.T) { + // Scenario: stopped changefeed metadata can be edited directly in the backend while + // the coordinator still has an older in-memory copy. Steps: resume the changefeed + // with backend-returned info whose sink URI differs from memory, then verify the + // in-memory changefeed uses the backend value instead of overwriting it. + ctrl := gomock.NewController(t) + backend := mock_changefeed.NewMockBackend(ctrl) + changefeedDB := changefeed.NewChangefeedDB(1216) + controller := &Controller{ + backend: backend, + changefeedDB: changefeedDB, + } + cfID := common.NewChangeFeedIDWithName("test-backend-info", common.DefaultKeyspaceName) + cf := changefeed.NewChangefeed(cfID, &config.ChangeFeedInfo{ + ChangefeedID: cfID, + Config: config.GetDefaultReplicaConfig(), + State: config.StateStopped, + SinkURI: "mysql://downstream:3306", + }, 100, true) + changefeedDB.AddStoppedChangefeed(cf) + + backendInfo, err := cf.GetInfo().Clone() + require.NoError(t, err) + backendInfo.SinkURI = "mysql://upstream:4000" + backendInfo.State = config.StateNormal + backend.EXPECT().ResumeChangefeed(gomock.Any(), cfID, uint64(100)).Return(backendInfo, nil).Times(1) + + require.NoError(t, controller.ResumeChangefeed(context.Background(), cfID, 100, false)) + require.Equal(t, "mysql://upstream:4000", changefeedDB.GetByID(cfID).GetInfo().SinkURI) + require.Equal(t, config.StateNormal, changefeedDB.GetByID(cfID).GetInfo().State) +} + func TestPauseChangefeed(t *testing.T) { ctrl := gomock.NewController(t) backend := mock_changefeed.NewMockBackend(ctrl) @@ -467,6 +509,9 @@ func TestUpdateChangefeed(t *testing.T) { } func TestGetChangefeed(t *testing.T) { + // Scenario: API-facing GetChangefeed must not expose coordinator-owned mutable info. 
+ // Steps: fetch a changefeed, mutate the returned copy, and verify the in-memory + // changefeed state is unchanged while status fields are still reported. ctrl := gomock.NewController(t) backend := mock_changefeed.NewMockBackend(ctrl) changefeedDB := changefeed.NewChangefeedDB(1216) @@ -491,10 +536,67 @@ func TestGetChangefeed(t *testing.T) { require.Equal(t, ret.State, config.StateStopped) require.Equal(t, uint64(1), status.CheckpointTs) + ret.SinkURI = "kafka://127.0.0.1:9092" + ret.Config = nil + storedInfo := changefeedDB.GetByID(cfID).GetInfo() + require.Equal(t, "mysql://127.0.0.1:3306", storedInfo.SinkURI) + require.NotNil(t, storedInfo.Config) + _, _, err = controller.GetChangefeed(context.Background(), common.NewChangeFeedDisplayName("test1", "default")) require.True(t, errors.ErrChangeFeedNotExists.Equal(err)) } +func TestGetChangefeedReturnedInfoMutationDoesNotRaceWithStoredInfo(t *testing.T) { + // Scenario: API handlers may mutate the info returned from GetChangefeed while coordinator + // goroutines read the stored info. Steps: mutate the returned copy and read the stored + // info concurrently, then verify the stored fields remain unchanged. 
+ ctrl := gomock.NewController(t) + backend := mock_changefeed.NewMockBackend(ctrl) + changefeedDB := changefeed.NewChangefeedDB(1216) + nodeManager := watcher.NewNodeManager(nil, nil) + controller := &Controller{ + backend: backend, + changefeedDB: changefeedDB, + nodeManager: nodeManager, + } + cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName) + cf := changefeed.NewChangefeed(cfID, &config.ChangeFeedInfo{ + ChangefeedID: cfID, + Config: config.GetDefaultReplicaConfig(), + State: config.StateStopped, + SinkURI: "mysql://127.0.0.1:3306", + }, 1, true) + changefeedDB.AddStoppedChangefeed(cf) + + ret, _, err := controller.GetChangefeed(context.Background(), cfID.DisplayName) + require.NoError(t, err) + + var ( + wg sync.WaitGroup + storedChanged bool + ) + wg.Add(2) + go func() { + defer wg.Done() + for i := 0; i < 1000; i++ { + ret.SinkURI = "kafka://127.0.0.1:9092" + ret.TargetTs = uint64(i + 1) + } + }() + go func() { + defer wg.Done() + storedInfo := changefeedDB.GetByID(cfID).GetInfo() + for i := 0; i < 1000; i++ { + if storedInfo.SinkURI != "mysql://127.0.0.1:3306" || storedInfo.TargetTs != 0 { + storedChanged = true + return + } + } + }() + wg.Wait() + require.False(t, storedChanged) +} + func TestRemoveChangefeed(t *testing.T) { ctrl := gomock.NewController(t) backend := mock_changefeed.NewMockBackend(ctrl) diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index bf2967aa33..be777947ec 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -425,6 +425,10 @@ func (c *coordinator) GetChangefeed(ctx context.Context, changefeedDisplayName c return c.controller.GetChangefeed(ctx, changefeedDisplayName) } +func (c *coordinator) GetPersistedChangefeedInfo(ctx context.Context, id common.ChangeFeedID) (*config.ChangeFeedInfo, error) { + return c.controller.GetPersistedChangefeedInfo(ctx, id) +} + func (c *coordinator) DrainNode(ctx context.Context, target node.ID) (int, error) { return 
c.controller.DrainNode(ctx, target) } diff --git a/pkg/server/coordinator.go b/pkg/server/coordinator.go index f2ac7e73e7..cc66d5716e 100644 --- a/pkg/server/coordinator.go +++ b/pkg/server/coordinator.go @@ -35,6 +35,8 @@ type Coordinator interface { ListChangefeeds(ctx context.Context, keyspace string) ([]*config.ChangeFeedInfo, []*config.ChangeFeedStatus, error) // GetChangefeed returns a changefeed GetChangefeed(ctx context.Context, changefeedDisplayName common.ChangeFeedDisplayName) (*config.ChangeFeedInfo, *config.ChangeFeedStatus, error) + // GetPersistedChangefeedInfo returns the latest backend-persisted changefeed info. + GetPersistedChangefeedInfo(ctx context.Context, id common.ChangeFeedID) (*config.ChangeFeedInfo, error) // CreateChangefeed creates a new changefeed CreateChangefeed(ctx context.Context, info *config.ChangeFeedInfo) error // RemoveChangefeed gets a changefeed diff --git a/server/module_election.go b/server/module_election.go index 6b0df0e266..f617cc7689 100644 --- a/server/module_election.go +++ b/server/module_election.go @@ -38,19 +38,25 @@ type elector struct { // election used for coordinator election *concurrency.Election // election used for log coordinator - logElection *concurrency.Election - svr *server + logElection *concurrency.Election + coordinatorMaxTaskConcurrency int + coordinatorCheckBalanceInterval time.Duration + svr *server } -func NewElector(server *server) common.SubModule { +// NewElector creates the coordinator elector with scheduler settings captured from server startup config. 
+func NewElector(server *server, schedulerCfg *config.SchedulerConfig) common.SubModule { election := concurrency.NewElection(server.session, etcd.CaptureOwnerKey(server.EtcdClient.GetClusterID())) logElection := concurrency.NewElection(server.session, etcd.LogCoordinatorKey(server.EtcdClient.GetClusterID())) + maxTaskConcurrency, checkBalanceInterval := coordinatorSchedulerSettings(schedulerCfg) return &elector{ - election: election, - logElection: logElection, - svr: server, + election: election, + logElection: logElection, + coordinatorMaxTaskConcurrency: maxTaskConcurrency, + coordinatorCheckBalanceInterval: checkBalanceInterval, + svr: server, } } @@ -140,8 +146,8 @@ func (e *elector) campaignCoordinator(ctx context.Context) error { changefeed.NewEtcdBackend(e.svr.EtcdClient), e.svr.EtcdClient.GetGCServiceID(), coordinatorVersion, - 10000, - time.Minute, + e.coordinatorMaxTaskConcurrency, + e.coordinatorCheckBalanceInterval, ) e.svr.setCoordinator(co) err = co.Run(ctx) @@ -191,6 +197,10 @@ func (e *elector) campaignCoordinator(ctx context.Context) error { } } +func coordinatorSchedulerSettings(schedulerCfg *config.SchedulerConfig) (int, time.Duration) { + return schedulerCfg.MaxTaskConcurrency, time.Duration(schedulerCfg.CheckBalanceInterval) +} + func (e *elector) campaignLogCoordinator(ctx context.Context) error { // Limit the frequency of elections to avoid putting too much pressure on the etcd server rl := rate.NewLimiter(rate.Every(time.Second), 1 /* burst */) diff --git a/server/module_election_test.go b/server/module_election_test.go new file mode 100644 index 0000000000..26ff16684a --- /dev/null +++ b/server/module_election_test.go @@ -0,0 +1,45 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "testing" + "time" + + "github.com/pingcap/ticdc/pkg/config" + "github.com/stretchr/testify/require" +) + +func TestCoordinatorSchedulerSettingsUsesCapturedConfig(t *testing.T) { + // Scenario: the coordinator should honor the scheduler config captured during server startup. + // Steps: install a different global config, read settings from an explicit scheduler config, + // and verify both the concurrency limit and balance interval come from the captured config. + original := config.GetGlobalServerConfig() + t.Cleanup(func() { + config.StoreGlobalServerConfig(original) + }) + + globalCfg := config.GetDefaultServerConfig() + globalCfg.Debug.Scheduler.MaxTaskConcurrency = 9 + globalCfg.Debug.Scheduler.CheckBalanceInterval = config.TomlDuration(44 * time.Second) + config.StoreGlobalServerConfig(globalCfg) + + cfg := config.GetDefaultServerConfig() + cfg.Debug.Scheduler.MaxTaskConcurrency = 3 + cfg.Debug.Scheduler.CheckBalanceInterval = config.TomlDuration(22 * time.Second) + + maxTaskConcurrency, checkBalanceInterval := coordinatorSchedulerSettings(cfg.Debug.Scheduler) + require.Equal(t, 3, maxTaskConcurrency) + require.Equal(t, 22*time.Second, checkBalanceInterval) +} diff --git a/server/server.go b/server/server.go index f5c9b5212b..a2357c5bd3 100644 --- a/server/server.go +++ b/server/server.go @@ -216,7 +216,7 @@ func (c *server) initialize(ctx context.Context) error { c.nodeModules = []common.SubModule{ nodeManager, - NewElector(c), + NewElector(c, conf.Debug.Scheduler), } c.subModules = []common.SubModule{