From 641a7ec8a348401084857882ff032f8d7a67b80c Mon Sep 17 00:00:00 2001 From: wlwilliamx Date: Thu, 16 Apr 2026 00:20:53 +0800 Subject: [PATCH 1/4] server: honor scheduler concurrency for coordinator Use the validated server scheduler config when constructing the coordinator so maintainer scheduling concurrency respects max-task-concurrency instead of a hard-coded value. --- server/module_election.go | 10 +++++++-- server/module_election_test.go | 41 ++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 2 deletions(-) create mode 100644 server/module_election_test.go diff --git a/server/module_election.go b/server/module_election.go index 6b0df0e266..06d0776dc5 100644 --- a/server/module_election.go +++ b/server/module_election.go @@ -134,14 +134,15 @@ func (e *elector) campaignCoordinator(ctx context.Context) error { log.Info("campaign coordinator successfully", zap.String("nodeID", nodeID), zap.Int64("coordinatorVersion", coordinatorVersion)) + maxTaskConcurrency, checkBalanceInterval := coordinatorSchedulerSettings() co := coordinator.New( e.svr.info, e.svr.pdClient, changefeed.NewEtcdBackend(e.svr.EtcdClient), e.svr.EtcdClient.GetGCServiceID(), coordinatorVersion, - 10000, - time.Minute, + maxTaskConcurrency, + checkBalanceInterval, ) e.svr.setCoordinator(co) err = co.Run(ctx) @@ -191,6 +192,11 @@ func (e *elector) campaignCoordinator(ctx context.Context) error { } } +func coordinatorSchedulerSettings() (int, time.Duration) { + schedulerCfg := config.GetGlobalServerConfig().Debug.Scheduler + return schedulerCfg.MaxTaskConcurrency, time.Duration(schedulerCfg.CheckBalanceInterval) +} + func (e *elector) campaignLogCoordinator(ctx context.Context) error { // Limit the frequency of elections to avoid putting too much pressure on the etcd server rl := rate.NewLimiter(rate.Every(time.Second), 1 /* burst */) diff --git a/server/module_election_test.go b/server/module_election_test.go new file mode 100644 index 0000000000..29026943ee --- /dev/null +++ b/server/module_election_test.go @@ -0,0 +1,41 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "testing" + "time" + + "github.com/pingcap/ticdc/pkg/config" + "github.com/stretchr/testify/require" +) + +func TestCoordinatorSchedulerSettingsUsesGlobalConfig(t *testing.T) { + // Scenario: the coordinator should honor the validated server scheduler config. + // Steps: install a temporary global config, read the coordinator settings, and + // verify both the concurrency limit and balance interval match the config. + original := config.GetGlobalServerConfig() + t.Cleanup(func() { + config.StoreGlobalServerConfig(original) + }) + + cfg := config.GetDefaultServerConfig() + cfg.Debug.Scheduler.MaxTaskConcurrency = 3 + cfg.Debug.Scheduler.CheckBalanceInterval = config.TomlDuration(22 * time.Second) + config.StoreGlobalServerConfig(cfg) + + maxTaskConcurrency, checkBalanceInterval := coordinatorSchedulerSettings() + require.Equal(t, 3, maxTaskConcurrency) + require.Equal(t, 22*time.Second, checkBalanceInterval) +} From 78116317ec961e8d7a61f0e5a1ef7cf79cb6ba48 Mon Sep 17 00:00:00 2001 From: wlwilliamx Date: Tue, 9 Jun 2026 11:32:16 +0800 Subject: [PATCH 2/4] coordinator,server: fix changefeed update data race --- coordinator/controller.go | 11 +++++- coordinator/controller_test.go | 61 ++++++++++++++++++++++++++++++++++ server/module_election.go | 26 +++++++++------ server/module_election_test.go | 16 +++++---- server/server.go | 2 +- 5 files changed, 97 insertions(+), 19 deletions(-) diff --git a/coordinator/controller.go b/coordinator/controller.go index 1071d653b6..d4b92c6187 100644 --- a/coordinator/controller.go +++ b/coordinator/controller.go @@ -853,6 +853,10 @@ func (c *Controller) ListChangefeeds(_ context.Context, keyspace string) ([]*con return infos, statuses, nil } +// GetChangefeed returns a copy of the changefeed info and the current status. +// API callers mutate the returned info when validating update requests, so the +// copy prevents those writes from racing with coordinator goroutines that read +// the in-memory changefeed state. func (c *Controller) GetChangefeed( _ context.Context, changefeedDisplayName common.ChangeFeedDisplayName, @@ -869,6 +873,11 @@ func (c *Controller) GetChangefeed( return nil, nil, errors.ErrChangeFeedNotExists.GenWithStackByArgs(changefeedDisplayName.Name) } + info, err := cf.GetInfo().Clone() + if err != nil { + return nil, nil, errors.Trace(err) + } + maintainerID := cf.GetNodeID() nodeInfo := c.nodeManager.GetNodeInfo(maintainerID) maintainerAddr := "" @@ -877,7 +886,7 @@ func (c *Controller) GetChangefeed( } status := &config.ChangeFeedStatus{CheckpointTs: cf.GetStatus().CheckpointTs, LastSyncedTs: cf.GetStatus().LastSyncedTs, LogCoordinatorResolvedTs: cf.GetLogCoordinatorResolvedTs()} status.SetMaintainerAddr(maintainerAddr) - return cf.GetInfo(), status, nil + return info, status, nil } // getChangefeed returns the changefeed by id, return nil if not found diff --git a/coordinator/controller_test.go b/coordinator/controller_test.go index 6702501cd5..f7dc1f96a1 100644 --- a/coordinator/controller_test.go +++ b/coordinator/controller_test.go @@ -15,6 +15,7 @@ package coordinator import ( "context" + "sync" "testing" "time" @@ -329,6 +330,9 @@ func TestUpdateChangefeed(t *testing.T) { } func TestGetChangefeed(t *testing.T) { + // Scenario: API-facing GetChangefeed must not expose coordinator-owned mutable info. + // Steps: fetch a changefeed, mutate the returned copy, and verify the in-memory + // changefeed state is unchanged while status fields are still reported. ctrl := gomock.NewController(t) backend := mock_changefeed.NewMockBackend(ctrl) changefeedDB := changefeed.NewChangefeedDB(1216) @@ -353,10 +357,67 @@ func TestGetChangefeed(t *testing.T) { require.Equal(t, ret.State, config.StateStopped) require.Equal(t, uint64(1), status.CheckpointTs) + ret.SinkURI = "kafka://127.0.0.1:9092" + ret.Config = nil + storedInfo := changefeedDB.GetByID(cfID).GetInfo() + require.Equal(t, "mysql://127.0.0.1:3306", storedInfo.SinkURI) + require.NotNil(t, storedInfo.Config) + _, _, err = controller.GetChangefeed(context.Background(), common.NewChangeFeedDisplayName("test1", "default")) require.True(t, errors.ErrChangeFeedNotExists.Equal(err)) } +func TestGetChangefeedReturnedInfoMutationDoesNotRaceWithStoredInfo(t *testing.T) { + // Scenario: API handlers may mutate the info returned from GetChangefeed while coordinator + // goroutines read the stored info. Steps: mutate the returned copy and read the stored + // info concurrently, then verify the stored fields remain unchanged. + ctrl := gomock.NewController(t) + backend := mock_changefeed.NewMockBackend(ctrl) + changefeedDB := changefeed.NewChangefeedDB(1216) + nodeManager := watcher.NewNodeManager(nil, nil) + controller := &Controller{ + backend: backend, + changefeedDB: changefeedDB, + nodeManager: nodeManager, + } + cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName) + cf := changefeed.NewChangefeed(cfID, &config.ChangeFeedInfo{ + ChangefeedID: cfID, + Config: config.GetDefaultReplicaConfig(), + State: config.StateStopped, + SinkURI: "mysql://127.0.0.1:3306", + }, 1, true) + changefeedDB.AddStoppedChangefeed(cf) + + ret, _, err := controller.GetChangefeed(context.Background(), cfID.DisplayName) + require.NoError(t, err) + + var ( + wg sync.WaitGroup + storedChanged bool + ) + wg.Add(2) + go func() { + defer wg.Done() + for i := 0; i < 1000; i++ { + ret.SinkURI = "kafka://127.0.0.1:9092" + ret.TargetTs = uint64(i + 1) + } + }() + go func() { + defer wg.Done() + storedInfo := changefeedDB.GetByID(cfID).GetInfo() + for i := 0; i < 1000; i++ { + if storedInfo.SinkURI != "mysql://127.0.0.1:3306" || storedInfo.TargetTs != 0 { + storedChanged = true + return + } + } + }() + wg.Wait() + require.False(t, storedChanged) +} + func TestRemoveChangefeed(t *testing.T) { ctrl := gomock.NewController(t) backend := mock_changefeed.NewMockBackend(ctrl) diff --git a/server/module_election.go b/server/module_election.go index 06d0776dc5..f617cc7689 100644 --- a/server/module_election.go +++ b/server/module_election.go @@ -38,19 +38,25 @@ type elector struct { // election used for coordinator election *concurrency.Election // election used for log coordinator - logElection *concurrency.Election - svr *server + logElection *concurrency.Election + coordinatorMaxTaskConcurrency int + coordinatorCheckBalanceInterval time.Duration + svr *server } -func NewElector(server *server) common.SubModule { +// NewElector creates the coordinator elector with scheduler settings captured from server startup config. +func NewElector(server *server, schedulerCfg *config.SchedulerConfig) common.SubModule { election := concurrency.NewElection(server.session, etcd.CaptureOwnerKey(server.EtcdClient.GetClusterID())) logElection := concurrency.NewElection(server.session, etcd.LogCoordinatorKey(server.EtcdClient.GetClusterID())) + maxTaskConcurrency, checkBalanceInterval := coordinatorSchedulerSettings(schedulerCfg) return &elector{ - election: election, - logElection: logElection, - svr: server, + election: election, + logElection: logElection, + coordinatorMaxTaskConcurrency: maxTaskConcurrency, + coordinatorCheckBalanceInterval: checkBalanceInterval, + svr: server, } } @@ -134,15 +140,14 @@ func (e *elector) campaignCoordinator(ctx context.Context) error { log.Info("campaign coordinator successfully", zap.String("nodeID", nodeID), zap.Int64("coordinatorVersion", coordinatorVersion)) - maxTaskConcurrency, checkBalanceInterval := coordinatorSchedulerSettings() co := coordinator.New( e.svr.info, e.svr.pdClient, changefeed.NewEtcdBackend(e.svr.EtcdClient), e.svr.EtcdClient.GetGCServiceID(), coordinatorVersion, - maxTaskConcurrency, - checkBalanceInterval, + e.coordinatorMaxTaskConcurrency, + e.coordinatorCheckBalanceInterval, ) e.svr.setCoordinator(co) err = co.Run(ctx) @@ -192,8 +197,7 @@ func (e *elector) campaignCoordinator(ctx context.Context) error { } } -func coordinatorSchedulerSettings() (int, time.Duration) { - schedulerCfg := config.GetGlobalServerConfig().Debug.Scheduler +func coordinatorSchedulerSettings(schedulerCfg *config.SchedulerConfig) (int, time.Duration) { return schedulerCfg.MaxTaskConcurrency, time.Duration(schedulerCfg.CheckBalanceInterval) } diff --git a/server/module_election_test.go b/server/module_election_test.go index 29026943ee..26ff16684a 100644 --- a/server/module_election_test.go +++ b/server/module_election_test.go @@ -21,21 +21,25 @@ import ( "github.com/stretchr/testify/require" ) -func TestCoordinatorSchedulerSettingsUsesGlobalConfig(t *testing.T) { - // Scenario: the coordinator should honor the validated server scheduler config. - // Steps: install a temporary global config, read the coordinator settings, and - // verify both the concurrency limit and balance interval match the config. +func TestCoordinatorSchedulerSettingsUsesCapturedConfig(t *testing.T) { + // Scenario: the coordinator should honor the scheduler config captured during server startup. + // Steps: install a different global config, read settings from an explicit scheduler config, + // and verify both the concurrency limit and balance interval come from the captured config. original := config.GetGlobalServerConfig() t.Cleanup(func() { config.StoreGlobalServerConfig(original) }) + globalCfg := config.GetDefaultServerConfig() + globalCfg.Debug.Scheduler.MaxTaskConcurrency = 9 + globalCfg.Debug.Scheduler.CheckBalanceInterval = config.TomlDuration(44 * time.Second) + config.StoreGlobalServerConfig(globalCfg) + cfg := config.GetDefaultServerConfig() cfg.Debug.Scheduler.MaxTaskConcurrency = 3 cfg.Debug.Scheduler.CheckBalanceInterval = config.TomlDuration(22 * time.Second) - config.StoreGlobalServerConfig(cfg) - maxTaskConcurrency, checkBalanceInterval := coordinatorSchedulerSettings() + maxTaskConcurrency, checkBalanceInterval := coordinatorSchedulerSettings(cfg.Debug.Scheduler) require.Equal(t, 3, maxTaskConcurrency) require.Equal(t, 22*time.Second, checkBalanceInterval) } diff --git a/server/server.go b/server/server.go index 421f4f4762..569c85e8da 100644 --- a/server/server.go +++ b/server/server.go @@ -214,7 +214,7 @@ func (c *server) initialize(ctx context.Context) error { c.nodeModules = []common.SubModule{ nodeManager, - NewElector(c), + NewElector(c, conf.Debug.Scheduler), } c.subModules = []common.SubModule{ From a6b7b833dab77230fab7a334ace2dd3f06c39f30 Mon Sep 17 00:00:00 2001 From: wlwilliamx Date: Wed, 10 Jun 2026 10:22:11 +0800 Subject: [PATCH 3/4] coordinator,api: validate resume with persisted metadata --- api/v2/changefeed.go | 7 +++ .../changefeed/changefeed_db_backend.go | 6 ++- coordinator/changefeed/etcd_backend.go | 33 +++++++++---- coordinator/changefeed/etcd_backend_test.go | 9 +++- .../changefeed/mock/changefeed_db_backend.go | 22 +++++++-- coordinator/controller.go | 22 ++++++++- coordinator/controller_test.go | 49 +++++++++++++++++-- coordinator/coordinator.go | 4 ++ pkg/server/coordinator.go | 2 + 9 files changed, 133 insertions(+), 21 deletions(-) diff --git a/api/v2/changefeed.go b/api/v2/changefeed.go index 29b7c8bf2c..5d459cd8a6 100644 --- a/api/v2/changefeed.go +++ b/api/v2/changefeed.go @@ -745,6 +745,13 @@ func (h *OpenAPIV2) ResumeChangefeed(c *gin.Context) { _ = c.Error(err) return } + // Resume validation must use persisted metadata because stopped changefeeds + // can be edited outside the coordinator process during legacy migration. + cfInfo, err = co.GetPersistedChangefeedInfo(ctx, cfInfo.ChangefeedID) + if err != nil { + _ = c.Error(err) + return + } // If there is no overrideCheckpointTs, then check whether the currentCheckpointTs is smaller than gc safepoint or not. newCheckpointTs := status.CheckpointTs diff --git a/coordinator/changefeed/changefeed_db_backend.go b/coordinator/changefeed/changefeed_db_backend.go index 2f92850a87..1eacd755e3 100644 --- a/coordinator/changefeed/changefeed_db_backend.go +++ b/coordinator/changefeed/changefeed_db_backend.go @@ -24,6 +24,8 @@ import ( type Backend interface { // GetAllChangefeeds returns all changefeeds from the backend db, include stopped and failed changefeeds GetAllChangefeeds(ctx context.Context) (map[common.ChangeFeedID]*ChangefeedMetaWrapper, error) + // GetChangefeedInfo returns the latest persisted changefeed info from the backend db. + GetChangefeedInfo(ctx context.Context, id common.ChangeFeedID) (*config.ChangeFeedInfo, error) // CreateChangefeed saves changefeed info and status to db CreateChangefeed(ctx context.Context, info *config.ChangeFeedInfo) error // UpdateChangefeed updates changefeed info to db @@ -34,8 +36,8 @@ type Backend interface { DeleteChangefeed(ctx context.Context, id common.ChangeFeedID) error // SetChangefeedProgress persists the operation progress status to db for a changefeed SetChangefeedProgress(ctx context.Context, id common.ChangeFeedID, progress config.Progress) error - // ResumeChangefeed persists the resumed status to db for a changefeed - ResumeChangefeed(ctx context.Context, id common.ChangeFeedID, newCheckpointTs uint64) error + // ResumeChangefeed persists the resumed status to db for a changefeed and returns the resumed info. + ResumeChangefeed(ctx context.Context, id common.ChangeFeedID, newCheckpointTs uint64) (*config.ChangeFeedInfo, error) // UpdateChangefeedCheckpointTs persists the checkpointTs for changefeeds UpdateChangefeedCheckpointTs(ctx context.Context, checkpointTs map[common.ChangeFeedID]uint64) error } diff --git a/coordinator/changefeed/etcd_backend.go b/coordinator/changefeed/etcd_backend.go index 6d9de399e3..eca17a9024 100644 --- a/coordinator/changefeed/etcd_backend.go +++ b/coordinator/changefeed/etcd_backend.go @@ -124,6 +124,21 @@ func (b *EtcdBackend) GetAllChangefeeds(ctx context.Context) (map[common.ChangeF return cfMap, nil } +// GetChangefeedInfo returns the latest persisted changefeed info from etcd. +func (b *EtcdBackend) GetChangefeedInfo(ctx context.Context, id common.ChangeFeedID) (*config.ChangeFeedInfo, error) { + info, err := b.etcdClient.GetChangeFeedInfo(ctx, id.DisplayName) + if err != nil { + return nil, errors.Trace(err) + } + // Old metadata may not embed ChangefeedID in the value. Keep the backend + // lookup key as the source of truth so callers can safely use the returned + // info for validation and in-memory replacement. + if info.ChangefeedID.Name() == "" { + info.ChangefeedID = id + } + return info, nil +} + func (b *EtcdBackend) CreateChangefeed(ctx context.Context, info *config.ChangeFeedInfo, ) error { @@ -250,15 +265,15 @@ func (b *EtcdBackend) DeleteChangefeed(ctx context.Context, func (b *EtcdBackend) ResumeChangefeed(ctx context.Context, id common.ChangeFeedID, newCheckpointTs uint64, -) error { - info, err := b.etcdClient.GetChangeFeedInfo(ctx, id.DisplayName) +) (*config.ChangeFeedInfo, error) { + info, err := b.GetChangefeedInfo(ctx, id) if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } info.State = config.StateNormal newStr, err := info.Marshal() if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } infoKey := etcd.GetEtcdKeyChangeFeedInfo(b.etcdClient.GetClusterID(), id.DisplayName) opsThen := []clientv3.Op{ @@ -267,13 +282,13 @@ func (b *EtcdBackend) ResumeChangefeed(ctx context.Context, if newCheckpointTs > 0 { status, _, err := b.etcdClient.GetChangeFeedStatus(ctx, id) if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } status.CheckpointTs = newCheckpointTs status.Progress = config.ProgressNone jobValue, err := status.Marshal() if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } jobKey := etcd.GetEtcdKeyJob(b.etcdClient.GetClusterID(), id.DisplayName) opsThen = append(opsThen, clientv3.OpPut(jobKey, jobValue)) @@ -281,13 +296,13 @@ func (b *EtcdBackend) ResumeChangefeed(ctx context.Context, putResp, err := b.etcdClient.GetEtcdClient().Txn(ctx, nil, opsThen, []clientv3.Op{}) if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } if !putResp.Succeeded { err = cerror.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("resume changefeed %s", info.ChangefeedID.Name())) - return errors.Trace(err) + return nil, errors.Trace(err) } - return nil + return info, nil } func (b *EtcdBackend) SetChangefeedProgress(ctx context.Context, id common.ChangeFeedID, progress config.Progress) error { diff --git a/coordinator/changefeed/etcd_backend_test.go b/coordinator/changefeed/etcd_backend_test.go index a801fbcaec..f2e6ab47a2 100644 --- a/coordinator/changefeed/etcd_backend_test.go +++ b/coordinator/changefeed/etcd_backend_test.go @@ -193,6 +193,11 @@ func TestDeleteChangefeed(t *testing.T) { } func TestResumeChangefeed(t *testing.T) { + // Scenario: resuming a stopped changefeed persists the normal state and + // returns the metadata that was actually loaded from etcd. + // Steps: + // 1) Load legacy changefeed info without an embedded ChangefeedID. + // 2) Resume the changefeed and assert the returned info is normalized. ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -210,8 +215,10 @@ func TestResumeChangefeed(t *testing.T) { cdcClient.EXPECT().GetChangeFeedStatus(gomock.Any(), changefeedID).Return(status, int64(0), nil).Times(1) etcdClient.EXPECT().Txn(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&clientv3.TxnResponse{Succeeded: true}, nil).Times(1) - err := backend.ResumeChangefeed(context.Background(), changefeedID, 200) + resumedInfo, err := backend.ResumeChangefeed(context.Background(), changefeedID, 200) require.Nil(t, err) + require.Equal(t, config.StateNormal, resumedInfo.State) + require.Equal(t, changefeedID, resumedInfo.ChangefeedID) } func TestSetChangefeedProgress(t *testing.T) { diff --git a/coordinator/changefeed/mock/changefeed_db_backend.go b/coordinator/changefeed/mock/changefeed_db_backend.go index 80671c5c39..8db908a7b4 100644 --- a/coordinator/changefeed/mock/changefeed_db_backend.go +++ b/coordinator/changefeed/mock/changefeed_db_backend.go @@ -80,6 +80,21 @@ func (mr *MockBackendMockRecorder) GetAllChangefeeds(ctx interface{}) *gomock.Ca return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllChangefeeds", reflect.TypeOf((*MockBackend)(nil).GetAllChangefeeds), ctx) } +// GetChangefeedInfo mocks base method. +func (m *MockBackend) GetChangefeedInfo(ctx context.Context, id common.ChangeFeedID) (*config.ChangeFeedInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetChangefeedInfo", ctx, id) + ret0, _ := ret[0].(*config.ChangeFeedInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetChangefeedInfo indicates an expected call of GetChangefeedInfo. +func (mr *MockBackendMockRecorder) GetChangefeedInfo(ctx, id interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetChangefeedInfo", reflect.TypeOf((*MockBackend)(nil).GetChangefeedInfo), ctx, id) +} + // PauseChangefeed mocks base method. func (m *MockBackend) PauseChangefeed(ctx context.Context, id common.ChangeFeedID) error { m.ctrl.T.Helper() @@ -95,11 +110,12 @@ func (mr *MockBackendMockRecorder) PauseChangefeed(ctx, id interface{}) *gomock. } // ResumeChangefeed mocks base method. -func (m *MockBackend) ResumeChangefeed(ctx context.Context, id common.ChangeFeedID, newCheckpointTs uint64) error { +func (m *MockBackend) ResumeChangefeed(ctx context.Context, id common.ChangeFeedID, newCheckpointTs uint64) (*config.ChangeFeedInfo, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ResumeChangefeed", ctx, id, newCheckpointTs) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].(*config.ChangeFeedInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 } // ResumeChangefeed indicates an expected call of ResumeChangefeed. diff --git a/coordinator/controller.go b/coordinator/controller.go index d4b92c6187..69cc4e2b69 100644 --- a/coordinator/controller.go +++ b/coordinator/controller.go @@ -793,11 +793,17 @@ func (c *Controller) ResumeChangefeed( return nil } - if err := c.backend.ResumeChangefeed(ctx, id, newCheckpointTs); err != nil { + resumedInfo, err := c.backend.ResumeChangefeed(ctx, id, newCheckpointTs) + if err != nil { return err } + if resumedInfo == nil { + return errors.New("resumed changefeed info is nil") + } - clone, err := cf.GetInfo().Clone() + // Use the backend-returned info so direct metadata edits made while the + // changefeed was stopped are not overwritten by the stale in-memory copy. + clone, err := resumedInfo.Clone() if err != nil { return err } @@ -889,6 +895,18 @@ func (c *Controller) GetChangefeed( return info, status, nil } +// GetPersistedChangefeedInfo returns the latest changefeed info persisted in the backend. +// +// Use this for resume-time validation because stopped changefeed metadata can +// be changed outside the coordinator process, for example during metadata +// migration or by legacy tooling. GetChangefeed intentionally returns the +// coordinator's in-memory copy. +func (c *Controller) GetPersistedChangefeedInfo(ctx context.Context, id common.ChangeFeedID) (*config.ChangeFeedInfo, error) { + c.apiLock.RLock() + defer c.apiLock.RUnlock() + return c.backend.GetChangefeedInfo(ctx, id) +} + // getChangefeed returns the changefeed by id, return nil if not found func (c *Controller) getChangefeed(id common.ChangeFeedID) *changefeed.Changefeed { return c.changefeedDB.GetByID(id) diff --git a/coordinator/controller_test.go b/coordinator/controller_test.go index f7dc1f96a1..48954cff89 100644 --- a/coordinator/controller_test.go +++ b/coordinator/controller_test.go @@ -121,6 +121,9 @@ func TestOnPeriodTaskAdvanceLiveness(t *testing.T) { } func TestResumeChangefeed(t *testing.T) { + // Scenario: resume should propagate backend failures and update in-memory state after success. + // Steps: try a missing changefeed, simulate a backend resume failure, then return a persisted + // changefeed info from the backend and verify the stopped changefeed becomes normal. ctrl := gomock.NewController(t) backend := mock_changefeed.NewMockBackend(ctrl) changefeedDB := changefeed.NewChangefeedDB(1216) @@ -140,11 +143,11 @@ func TestResumeChangefeed(t *testing.T) { // no changefeed require.NotNil(t, controller.ResumeChangefeed(context.Background(), common.NewChangeFeedIDWithName("test2", common.DefaultKeyspaceName), 12, true)) - backend.EXPECT().ResumeChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("failed")).Times(1) + backend.EXPECT().ResumeChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("failed")).Times(1) require.NotNil(t, controller.ResumeChangefeed(context.Background(), cfID, 12, true)) require.Equal(t, config.StateFailed, changefeedDB.GetByID(cfID).GetInfo().State) - backend.EXPECT().ResumeChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1) + backend.EXPECT().ResumeChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).Return(cf.GetInfo(), nil).Times(1) require.Nil(t, controller.ResumeChangefeed(context.Background(), cfID, 12, false)) require.Equal(t, config.StateNormal, changefeedDB.GetByID(cfID).GetInfo().State) } @@ -176,6 +179,9 @@ func TestResumeChangefeedNormalState(t *testing.T) { } func TestResumeChangefeedOverwriteUpdatesLastSavedCheckpointTs(t *testing.T) { + // Scenario: overwrite resume should reset the persisted checkpoint baseline. + // Steps: resume a stopped changefeed with overwriteCheckpointTs and verify the in-memory + // last saved checkpoint is updated to the requested checkpoint. ctrl := gomock.NewController(t) backend := mock_changefeed.NewMockBackend(ctrl) changefeedDB := changefeed.NewChangefeedDB(1216) @@ -194,12 +200,15 @@ func TestResumeChangefeedOverwriteUpdatesLastSavedCheckpointTs(t *testing.T) { changefeedDB.AddStoppedChangefeed(cf) newCheckpointTs := uint64(120) - backend.EXPECT().ResumeChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1) + backend.EXPECT().ResumeChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).Return(cf.GetInfo(), nil).Times(1) require.Nil(t, controller.ResumeChangefeed(context.Background(), cfID, newCheckpointTs, true)) require.Equal(t, newCheckpointTs, changefeedDB.GetByID(cfID).GetLastSavedCheckPointTs()) } func TestResumeChangefeedIgnoresStaleMaintainerErrorAndSchedules(t *testing.T) { + // Scenario: a stale maintainer error kept in memory must not block manual resume. + // Steps: install an errored in-memory status, resume from the backend, and verify + // the changefeed is scheduled with a clean status. ctrl := gomock.NewController(t) backend := mock_changefeed.NewMockBackend(ctrl) changefeedDB := changefeed.NewChangefeedDB(1216) @@ -228,7 +237,7 @@ func TestResumeChangefeedIgnoresStaleMaintainerErrorAndSchedules(t *testing.T) { _, _, err := cf.ForceUpdateStatus(stale) require.NotNil(t, err) - backend.EXPECT().ResumeChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1) + backend.EXPECT().ResumeChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).Return(cf.GetInfo(), nil).Times(1) require.NoError(t, controller.ResumeChangefeed(context.Background(), cfID, 100, false)) // The changefeed should be enqueued for scheduling and should not be blocked by the stale error. @@ -242,6 +251,38 @@ func TestResumeChangefeedIgnoresStaleMaintainerErrorAndSchedules(t *testing.T) { require.True(t, cf.ShouldRun()) } +func TestResumeChangefeedUsesBackendReturnedInfo(t *testing.T) { + // Scenario: stopped changefeed metadata can be edited directly in the backend while + // the coordinator still has an older in-memory copy. Steps: resume the changefeed + // with backend-returned info whose sink URI differs from memory, then verify the + // in-memory changefeed uses the backend value instead of overwriting it. + ctrl := gomock.NewController(t) + backend := mock_changefeed.NewMockBackend(ctrl) + changefeedDB := changefeed.NewChangefeedDB(1216) + controller := &Controller{ + backend: backend, + changefeedDB: changefeedDB, + } + cfID := common.NewChangeFeedIDWithName("test-backend-info", common.DefaultKeyspaceName) + cf := changefeed.NewChangefeed(cfID, &config.ChangeFeedInfo{ + ChangefeedID: cfID, + Config: config.GetDefaultReplicaConfig(), + State: config.StateStopped, + SinkURI: "mysql://downstream:3306", + }, 100, true) + changefeedDB.AddStoppedChangefeed(cf) + + backendInfo, err := cf.GetInfo().Clone() + require.NoError(t, err) + backendInfo.SinkURI = "mysql://upstream:4000" + backendInfo.State = config.StateNormal + backend.EXPECT().ResumeChangefeed(gomock.Any(), cfID, uint64(100)).Return(backendInfo, nil).Times(1) + + require.NoError(t, controller.ResumeChangefeed(context.Background(), cfID, 100, false)) + require.Equal(t, "mysql://upstream:4000", changefeedDB.GetByID(cfID).GetInfo().SinkURI) + require.Equal(t, config.StateNormal, changefeedDB.GetByID(cfID).GetInfo().State) +} + func TestPauseChangefeed(t *testing.T) { ctrl := gomock.NewController(t) backend := mock_changefeed.NewMockBackend(ctrl) diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index eb7fb67c9c..c6f735224c 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -410,6 +410,10 @@ func (c *coordinator) GetChangefeed(ctx context.Context, changefeedDisplayName c return c.controller.GetChangefeed(ctx, changefeedDisplayName) } +func (c *coordinator) GetPersistedChangefeedInfo(ctx context.Context, id common.ChangeFeedID) (*config.ChangeFeedInfo, error) { + return c.controller.GetPersistedChangefeedInfo(ctx, id) +} + func (c *coordinator) Initialized() bool { return c.controller.initialized.Load() } diff --git a/pkg/server/coordinator.go b/pkg/server/coordinator.go index 299dfe2938..35a6d940ff 100644 --- a/pkg/server/coordinator.go +++ b/pkg/server/coordinator.go @@ -34,6 +34,8 @@ type Coordinator interface { ListChangefeeds(ctx context.Context, keyspace string) ([]*config.ChangeFeedInfo, []*config.ChangeFeedStatus, error) // GetChangefeed returns a changefeed GetChangefeed(ctx context.Context, changefeedDisplayName common.ChangeFeedDisplayName) (*config.ChangeFeedInfo, *config.ChangeFeedStatus, error) + // GetPersistedChangefeedInfo returns the latest backend-persisted changefeed info. + GetPersistedChangefeedInfo(ctx context.Context, id common.ChangeFeedID) (*config.ChangeFeedInfo, error) // CreateChangefeed creates a new changefeed CreateChangefeed(ctx context.Context, info *config.ChangeFeedInfo) error // RemoveChangefeed gets a changefeed From e42cb78dd356e7a882f025a14faf8f72f2812478 Mon Sep 17 00:00:00 2001 From: wlwilliamx Date: Fri, 12 Jun 2026 11:49:33 +0800 Subject: [PATCH 4/4] coordinator: complete changefeed metadata on resume --- coordinator/changefeed/etcd_backend.go | 8 ++++ coordinator/changefeed/etcd_backend_test.go | 51 +++++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/coordinator/changefeed/etcd_backend.go b/coordinator/changefeed/etcd_backend.go index eca17a9024..c6378a03a2 100644 --- a/coordinator/changefeed/etcd_backend.go +++ b/coordinator/changefeed/etcd_backend.go @@ -263,6 +263,7 @@ func (b *EtcdBackend) DeleteChangefeed(ctx context.Context, return nil } +// ResumeChangefeed persists a resumed changefeed and returns the metadata used by the caller. func (b *EtcdBackend) ResumeChangefeed(ctx context.Context, id common.ChangeFeedID, newCheckpointTs uint64, ) (*config.ChangeFeedInfo, error) { @@ -270,6 +271,13 @@ func (b *EtcdBackend) ResumeChangefeed(ctx context.Context, if err != nil { return nil, errors.Trace(err) } + // Legacy stopped changefeeds can contain sparse metadata that was completed + // during coordinator bootstrap. Complete it again before persisting the + // resumed state so backend-loaded metadata does not drop compatibility defaults. + if info.Config == nil { + info.Config = config.GetDefaultReplicaConfig() + } + info.VerifyAndComplete() info.State = config.StateNormal newStr, err := info.Marshal() if err != nil { diff --git a/coordinator/changefeed/etcd_backend_test.go b/coordinator/changefeed/etcd_backend_test.go index 42959e69b2..3cab212d6f 100644 --- a/coordinator/changefeed/etcd_backend_test.go +++ b/coordinator/changefeed/etcd_backend_test.go @@ -221,6 +221,57 @@ func TestResumeChangefeed(t *testing.T) { require.Equal(t, changefeedID, resumedInfo.ChangefeedID) } +func TestResumeChangefeedCompletesLegacySchedulerDefaults(t *testing.T) { + // Scenario: an old owner persisted a stopped changefeed with only explicit scheduler fields. + // Steps: resume that sparse metadata, inspect the etcd put payload, and verify resume persists + // compatibility defaults such as RegionCountPerSpan before returning the resumed info. + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cdcClient := etcd.NewMockCDCEtcdClient(ctrl) + etcdClient := etcd.NewMockClient(ctrl) + cdcClient.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes() + cdcClient.EXPECT().GetClusterID().Return("test-cluster-id").AnyTimes() + backend := NewEtcdBackend(cdcClient) + + changefeedID := common.NewChangeFeedIDWithName("test-scheduler-defaults", common.DefaultKeyspaceName) + enableTableAcrossNodes := false + regionThreshold := 20 + writeKeyThreshold := 10485760 + info := &config.ChangeFeedInfo{ + ChangefeedID: changefeedID, + Config: config.GetDefaultReplicaConfig(), + State: config.StateStopped, + SinkURI: "mysql://127.0.0.1:3306", + } + info.Config.Scheduler = &config.ChangefeedSchedulerConfig{ + EnableTableAcrossNodes: &enableTableAcrossNodes, + RegionThreshold: ®ionThreshold, + WriteKeyThreshold: &writeKeyThreshold, + } + + cdcClient.EXPECT().GetChangeFeedInfo(gomock.Any(), changefeedID.DisplayName).Return(info, nil).Times(1) + etcdClient.EXPECT().Txn(gomock.Any(), gomock.Any(), NewFuncMatcher(func(i interface{}) bool { + ops := i.([]clientv3.Op) + require.Len(t, ops, 1) + require.True(t, ops[0].IsPut()) + + persistedInfo := &config.ChangeFeedInfo{} + require.NoError(t, persistedInfo.Unmarshal(ops[0].ValueBytes())) + require.Equal(t, config.StateNormal, persistedInfo.State) + require.NotNil(t, persistedInfo.Config) + require.NotNil(t, persistedInfo.Config.Scheduler) + require.NotNil(t, persistedInfo.Config.Scheduler.RegionCountPerSpan) + require.Greater(t, *persistedInfo.Config.Scheduler.RegionCountPerSpan, 0) + return true + }), gomock.Any()).Return(&clientv3.TxnResponse{Succeeded: true}, nil).Times(1) + + resumedInfo, err := backend.ResumeChangefeed(context.Background(), changefeedID, 0) + require.NoError(t, err) + require.NotNil(t, resumedInfo.Config.Scheduler.RegionCountPerSpan) + require.Greater(t, *resumedInfo.Config.Scheduler.RegionCountPerSpan, 0) +} + func TestSetChangefeedProgress(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish()