From 3b20080e3c3d1231f36aa970d4102a6adc92d6aa Mon Sep 17 00:00:00 2001 From: wlwilliamx <53336371+wlwilliamx@users.noreply.github.com> Date: Fri, 12 Jun 2026 12:50:57 +0800 Subject: [PATCH] This is an automated cherry-pick of #4832 Signed-off-by: ti-chi-bot --- api/v2/changefeed.go | 8 + api/v2/changefeed_test.go | 264 ++++++++++++++++++ .../changefeed/changefeed_db_backend.go | 6 +- coordinator/changefeed/etcd_backend.go | 41 ++- coordinator/changefeed/etcd_backend_test.go | 60 +++- .../changefeed/mock/changefeed_db_backend.go | 22 +- coordinator/controller.go | 33 ++- coordinator/controller_test.go | 110 +++++++- coordinator/coordinator.go | 4 + pkg/server/coordinator.go | 2 + server/module_election.go | 26 +- server/module_election_test.go | 45 +++ server/server.go | 2 +- 13 files changed, 592 insertions(+), 31 deletions(-) create mode 100644 api/v2/changefeed_test.go create mode 100644 server/module_election_test.go diff --git a/api/v2/changefeed.go b/api/v2/changefeed.go index 2524389312..e1718ed5cd 100644 --- a/api/v2/changefeed.go +++ b/api/v2/changefeed.go @@ -746,6 +746,14 @@ func (h *OpenAPIV2) ResumeChangefeed(c *gin.Context) { } middleware.SetChangefeedOperationTarget(c, cfInfo.ChangefeedID.Keyspace(), cfInfo.ChangefeedID.Name()) + // Resume validation must use persisted metadata because stopped changefeeds + // can be edited outside the coordinator process during legacy migration. + cfInfo, err = co.GetPersistedChangefeedInfo(ctx, cfInfo.ChangefeedID) + if err != nil { + _ = c.Error(err) + return + } + // If there is no overrideCheckpointTs, then check whether the currentCheckpointTs is smaller than gc safepoint or not. newCheckpointTs := status.CheckpointTs overwriteCheckpointTs := false diff --git a/api/v2/changefeed_test.go b/api/v2/changefeed_test.go new file mode 100644 index 0000000000..389631954e --- /dev/null +++ b/api/v2/changefeed_test.go @@ -0,0 +1,264 @@ +// Copyright 2026 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package v2 + +import ( + "context" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/gin-gonic/gin" + "github.com/pingcap/kvproto/pkg/keyspacepb" + "github.com/pingcap/ticdc/maintainer" + "github.com/pingcap/ticdc/pkg/api" + "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/config" + cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/etcd" + "github.com/pingcap/ticdc/pkg/liveness" + "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/pkg/server" + "github.com/pingcap/ticdc/pkg/util" + "github.com/stretchr/testify/require" + pd "github.com/tikv/pd/client" +) + +// TestValidateResumeChangefeedState covers the API-side guard that runs before +// resume GC safepoint/barrier setup. Running states must fail fast, while states +// that are actually stopped can proceed to the remaining resume validation. 
+func TestValidateResumeChangefeedState(t *testing.T) { + for _, state := range []config.FeedState{config.StateStopped, config.StateFailed, config.StateFinished} { + require.NoError(t, validateResumeChangefeedState(state)) + } + + for _, state := range []config.FeedState{config.StateNormal, config.StateWarning, config.StatePending} { + err := validateResumeChangefeedState(state) + require.True(t, cerror.ErrChangefeedUpdateRefused.Equal(err)) + require.Contains(t, err.Error(), string(state)) + } +} + +// TestResumeChangefeedRejectsNormalBeforeGC covers the HTTP resume regression: +// a normal changefeed must fail before the handler requests PD/etcd clients for +// GC safepoint/barrier setup or calls the coordinator resume path. +func TestResumeChangefeedRejectsNormalBeforeGC(t *testing.T) { + gin.SetMode(gin.TestMode) + + co := &resumeNormalCoordinator{} + srv := &resumeNormalServer{coordinator: co} + h := &OpenAPIV2{server: srv} + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest(http.MethodPost, "/api/v2/changefeeds/test/resume?keyspace=default", nil) + c.Params = gin.Params{{Key: api.APIOpVarChangefeedID, Value: "test"}} + c.Set("ctx-keyspace", &keyspacepb.KeyspaceMeta{ + Id: common.DefaultKeyspaceID, + State: keyspacepb.KeyspaceState_ENABLED, + }) + + h.ResumeChangefeed(c) + + require.Len(t, c.Errors, 1) + require.True(t, cerror.ErrChangefeedUpdateRefused.Equal(c.Errors.Last().Err)) + require.False(t, srv.pdClientRequested) + require.False(t, srv.etcdClientRequested) + require.False(t, co.resumeCalled) +} + +type resumeNormalServer struct { + coordinator server.Coordinator + pdClientRequested bool + etcdClientRequested bool +} + +func (s *resumeNormalServer) Run(ctx context.Context) error { return nil } + +func (s *resumeNormalServer) Close() {} + +func (s *resumeNormalServer) SelfInfo() (*node.Info, error) { return nil, nil } + +func (s *resumeNormalServer) Liveness() liveness.Liveness { return 
liveness.CaptureAlive } + +func (s *resumeNormalServer) GetCoordinator() (server.Coordinator, error) { + return s.coordinator, nil +} + +func (s *resumeNormalServer) IsCoordinator() bool { return true } + +func (s *resumeNormalServer) GetCoordinatorInfo(ctx context.Context) (*node.Info, error) { + return nil, nil +} + +func (s *resumeNormalServer) GetPdClient() pd.Client { + s.pdClientRequested = true + return nil +} + +func (s *resumeNormalServer) GetEtcdClient() etcd.CDCEtcdClient { + s.etcdClientRequested = true + return nil +} + +func (s *resumeNormalServer) GetMaintainerManager() *maintainer.Manager { return nil } + +type resumeNormalCoordinator struct { + resumeCalled bool +} + +func (c *resumeNormalCoordinator) Stop() {} + +func (c *resumeNormalCoordinator) Run(ctx context.Context) error { return nil } + +func (c *resumeNormalCoordinator) ListChangefeeds(ctx context.Context, keyspace string) ([]*config.ChangeFeedInfo, []*config.ChangeFeedStatus, error) { + return nil, nil, nil +} + +func (c *resumeNormalCoordinator) GetChangefeed(ctx context.Context, changefeedDisplayName common.ChangeFeedDisplayName) (*config.ChangeFeedInfo, *config.ChangeFeedStatus, error) { + changefeedID := common.NewChangeFeedIDWithName(changefeedDisplayName.Name, changefeedDisplayName.Keyspace) + return &config.ChangeFeedInfo{ + ChangefeedID: changefeedID, + State: config.StateNormal, + }, &config.ChangeFeedStatus{ + CheckpointTs: 123, + }, nil +} + +func (c *resumeNormalCoordinator) GetPersistedChangefeedInfo(ctx context.Context, id common.ChangeFeedID) (*config.ChangeFeedInfo, error) { + return &config.ChangeFeedInfo{ + ChangefeedID: id, + State: config.StateNormal, + }, nil +} + +func (c *resumeNormalCoordinator) CreateChangefeed(ctx context.Context, info *config.ChangeFeedInfo) error { + return nil +} + +func (c *resumeNormalCoordinator) RemoveChangefeed(ctx context.Context, id common.ChangeFeedID) (uint64, error) { + return 0, nil +} + +func (c *resumeNormalCoordinator) 
PauseChangefeed(ctx context.Context, id common.ChangeFeedID) error { + return nil +} + +func (c *resumeNormalCoordinator) ResumeChangefeed(ctx context.Context, id common.ChangeFeedID, newCheckpointTs uint64, overwriteCheckpointTs bool) error { + c.resumeCalled = true + return nil +} + +func (c *resumeNormalCoordinator) UpdateChangefeed(ctx context.Context, change *config.ChangeFeedInfo) error { + return nil +} + +func (c *resumeNormalCoordinator) RequestResolvedTsFromLogCoordinator(ctx context.Context, changefeedDisplayName common.ChangeFeedDisplayName) { +} + +func (c *resumeNormalCoordinator) DrainNode(ctx context.Context, target node.ID) (int, error) { + return 0, nil +} + +func (c *resumeNormalCoordinator) Initialized() bool { return true } + +// TestMaskSinkURIForError verifies that error messages mask sensitive sink URI +// fields. It checks both a valid URI with secret query parameters and an invalid +// URI parse error that previously exposed raw credentials. +func TestMaskSinkURIForError(t *testing.T) { + sinkURI := "kafka://127.0.0.1:9092/topic?protocol=canal-json" + + "&sasl-user=ticdc&sasl-password=verysecure&secret-access-key=rawsecret" + + maskedURI := maskSinkURIForError(sinkURI) + require.NotContains(t, maskedURI, "verysecure") + require.NotContains(t, maskedURI, "rawsecret") + require.Contains(t, maskedURI, "sasl-password=xxxxx") + require.Contains(t, maskedURI, "secret-access-key=xxxxx") + require.Contains(t, maskedURI, "sasl-user=ticdc") + + invalidURI := "mysql://root:verysecure@127.0.0.1/%zz" + require.Equal(t, "", maskSinkURIForError(invalidURI)) + + err := genSinkURIInvalidError(invalidURI, mustParseURLError(t, invalidURI)) + require.NotContains(t, err.Error(), "verysecure") + require.Contains(t, err.Error(), "") + require.Contains(t, err.Error(), `parse ""`) + require.Contains(t, err.Error(), "invalid URL escape") +} + +func mustParseURLError(t *testing.T, rawURL string) error { + t.Helper() + + _, err := url.Parse(rawURL) + require.Error(t, 
err) + return err +} + +// TestVerifyRouteConflict covers route conflict detection for eligible and +// ineligible source tables. It exercises the safe cases first, then verifies +// that conflicts report both the shared target table and conflicting sources. +func TestVerifyRouteConflict(t *testing.T) { + t.Parallel() + + changefeedID := common.NewChangeFeedIDWithName("test-changefeed", common.DefaultKeyspaceName) + replicaCfg := config.GetDefaultReplicaConfig() + replicaCfg.Sink.DispatchRules = []*config.DispatchRule{ + {Matcher: []string{"db1.*"}, TargetSchema: "archive", TargetTable: "{table}"}, + {Matcher: []string{"db2.*"}, TargetSchema: "archive", TargetTable: "{table}"}, + } + + eligibleTables := []common.TableName{{Schema: "db1", Table: "orders"}} + ineligibleTables := []common.TableName{{Schema: "db2", Table: "orders"}} + + replicaCfg.ForceReplicate = util.AddressOf(false) + replicaCfg.IgnoreIneligibleTable = util.AddressOf(true) + require.NoError(t, verifyRouteConflict(changefeedID, eligibleTables, ineligibleTables, replicaCfg)) + + replicaCfg.IgnoreIneligibleTable = util.AddressOf(false) + require.NoError(t, verifyRouteConflict(changefeedID, eligibleTables, ineligibleTables, replicaCfg)) + + err := verifyRouteConflict( + changefeedID, + []common.TableName{{Schema: "db1", Table: "orders"}, {Schema: "db2", Table: "orders"}}, + ineligibleTables, + replicaCfg, + ) + require.Error(t, err) + require.True(t, cerror.ErrTableRouteConflict.Equal(err)) + + replicaCfg.ForceReplicate = util.AddressOf(true) + err = verifyRouteConflict(changefeedID, eligibleTables, ineligibleTables, replicaCfg) + require.Error(t, err) + require.True(t, cerror.ErrTableRouteConflict.Equal(err)) + require.Contains(t, err.Error(), "target `archive`.`orders`") + require.Contains(t, err.Error(), "source `db1`.`orders`") + require.Contains(t, err.Error(), "source `db2`.`orders`") + + replicaCfg.ForceReplicate = util.AddressOf(false) + replicaCfg.Sink.DispatchRules = []*config.DispatchRule{ + 
{Matcher: []string{"db2.*"}, TargetSchema: "db1", TargetTable: "{table}"}, + } + err = verifyRouteConflict( + changefeedID, + []common.TableName{{Schema: "db1", Table: "orders"}, {Schema: "db2", Table: "orders"}}, + nil, + replicaCfg, + ) + require.Error(t, err) + require.True(t, cerror.ErrTableRouteConflict.Equal(err)) + require.Contains(t, err.Error(), "target `db1`.`orders`") + require.Contains(t, err.Error(), "source `db1`.`orders`") + require.Contains(t, err.Error(), "source `db2`.`orders`") +} diff --git a/coordinator/changefeed/changefeed_db_backend.go b/coordinator/changefeed/changefeed_db_backend.go index 2f92850a87..1eacd755e3 100644 --- a/coordinator/changefeed/changefeed_db_backend.go +++ b/coordinator/changefeed/changefeed_db_backend.go @@ -24,6 +24,8 @@ import ( type Backend interface { // GetAllChangefeeds returns all changefeeds from the backend db, include stopped and failed changefeeds GetAllChangefeeds(ctx context.Context) (map[common.ChangeFeedID]*ChangefeedMetaWrapper, error) + // GetChangefeedInfo returns the latest persisted changefeed info from the backend db. + GetChangefeedInfo(ctx context.Context, id common.ChangeFeedID) (*config.ChangeFeedInfo, error) // CreateChangefeed saves changefeed info and status to db CreateChangefeed(ctx context.Context, info *config.ChangeFeedInfo) error // UpdateChangefeed updates changefeed info to db @@ -34,8 +36,8 @@ type Backend interface { DeleteChangefeed(ctx context.Context, id common.ChangeFeedID) error // SetChangefeedProgress persists the operation progress status to db for a changefeed SetChangefeedProgress(ctx context.Context, id common.ChangeFeedID, progress config.Progress) error - // ResumeChangefeed persists the resumed status to db for a changefeed - ResumeChangefeed(ctx context.Context, id common.ChangeFeedID, newCheckpointTs uint64) error + // ResumeChangefeed persists the resumed status to db for a changefeed and returns the resumed info. 
+ ResumeChangefeed(ctx context.Context, id common.ChangeFeedID, newCheckpointTs uint64) (*config.ChangeFeedInfo, error) // UpdateChangefeedCheckpointTs persists the checkpointTs for changefeeds UpdateChangefeedCheckpointTs(ctx context.Context, checkpointTs map[common.ChangeFeedID]uint64) error } diff --git a/coordinator/changefeed/etcd_backend.go b/coordinator/changefeed/etcd_backend.go index 6d9de399e3..c6378a03a2 100644 --- a/coordinator/changefeed/etcd_backend.go +++ b/coordinator/changefeed/etcd_backend.go @@ -124,6 +124,21 @@ func (b *EtcdBackend) GetAllChangefeeds(ctx context.Context) (map[common.ChangeF return cfMap, nil } +// GetChangefeedInfo returns the latest persisted changefeed info from etcd. +func (b *EtcdBackend) GetChangefeedInfo(ctx context.Context, id common.ChangeFeedID) (*config.ChangeFeedInfo, error) { + info, err := b.etcdClient.GetChangeFeedInfo(ctx, id.DisplayName) + if err != nil { + return nil, errors.Trace(err) + } + // Old metadata may not embed ChangefeedID in the value. Keep the backend + // lookup key as the source of truth so callers can safely use the returned + // info for validation and in-memory replacement. + if info.ChangefeedID.Name() == "" { + info.ChangefeedID = id + } + return info, nil +} + func (b *EtcdBackend) CreateChangefeed(ctx context.Context, info *config.ChangeFeedInfo, ) error { @@ -248,17 +263,25 @@ func (b *EtcdBackend) DeleteChangefeed(ctx context.Context, return nil } +// ResumeChangefeed persists a resumed changefeed and returns the metadata used by the caller. 
func (b *EtcdBackend) ResumeChangefeed(ctx context.Context, id common.ChangeFeedID, newCheckpointTs uint64, -) error { - info, err := b.etcdClient.GetChangeFeedInfo(ctx, id.DisplayName) +) (*config.ChangeFeedInfo, error) { + info, err := b.GetChangefeedInfo(ctx, id) if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) + } + // Legacy stopped changefeeds can contain sparse metadata that was completed + // during coordinator bootstrap. Complete it again before persisting the + // resumed state so backend-loaded metadata does not drop compatibility defaults. + if info.Config == nil { + info.Config = config.GetDefaultReplicaConfig() } + info.VerifyAndComplete() info.State = config.StateNormal newStr, err := info.Marshal() if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } infoKey := etcd.GetEtcdKeyChangeFeedInfo(b.etcdClient.GetClusterID(), id.DisplayName) opsThen := []clientv3.Op{ @@ -267,13 +290,13 @@ func (b *EtcdBackend) ResumeChangefeed(ctx context.Context, if newCheckpointTs > 0 { status, _, err := b.etcdClient.GetChangeFeedStatus(ctx, id) if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } status.CheckpointTs = newCheckpointTs status.Progress = config.ProgressNone jobValue, err := status.Marshal() if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } jobKey := etcd.GetEtcdKeyJob(b.etcdClient.GetClusterID(), id.DisplayName) opsThen = append(opsThen, clientv3.OpPut(jobKey, jobValue)) @@ -281,13 +304,13 @@ func (b *EtcdBackend) ResumeChangefeed(ctx context.Context, putResp, err := b.etcdClient.GetEtcdClient().Txn(ctx, nil, opsThen, []clientv3.Op{}) if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } if !putResp.Succeeded { err = cerror.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("resume changefeed %s", info.ChangefeedID.Name())) - return errors.Trace(err) + return nil, errors.Trace(err) } - return nil + return info, nil } func (b *EtcdBackend) 
SetChangefeedProgress(ctx context.Context, id common.ChangeFeedID, progress config.Progress) error { diff --git a/coordinator/changefeed/etcd_backend_test.go b/coordinator/changefeed/etcd_backend_test.go index a801fbcaec..66a6fc831f 100644 --- a/coordinator/changefeed/etcd_backend_test.go +++ b/coordinator/changefeed/etcd_backend_test.go @@ -193,6 +193,11 @@ func TestDeleteChangefeed(t *testing.T) { } func TestResumeChangefeed(t *testing.T) { + // Scenario: resuming a stopped changefeed persists the normal state and + // returns the metadata that was actually loaded from etcd. + // Steps: + // 1) Load legacy changefeed info without an embedded ChangefeedID. + // 2) Resume the changefeed and assert the returned info is normalized. ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -210,8 +215,61 @@ func TestResumeChangefeed(t *testing.T) { cdcClient.EXPECT().GetChangeFeedStatus(gomock.Any(), changefeedID).Return(status, int64(0), nil).Times(1) etcdClient.EXPECT().Txn(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&clientv3.TxnResponse{Succeeded: true}, nil).Times(1) - err := backend.ResumeChangefeed(context.Background(), changefeedID, 200) + resumedInfo, err := backend.ResumeChangefeed(context.Background(), changefeedID, 200) require.Nil(t, err) + require.Equal(t, config.StateNormal, resumedInfo.State) + require.Equal(t, changefeedID, resumedInfo.ChangefeedID) +} + +func TestResumeChangefeedCompletesLegacySchedulerDefaults(t *testing.T) { + // Scenario: an old owner persisted a stopped changefeed with only explicit scheduler fields. + // Steps: resume that sparse metadata, inspect the etcd put payload, and verify resume persists + // compatibility defaults such as RegionCountPerSpan before returning the resumed info. 
+ ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cdcClient := etcd.NewMockCDCEtcdClient(ctrl) + etcdClient := etcd.NewMockClient(ctrl) + cdcClient.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes() + cdcClient.EXPECT().GetClusterID().Return("test-cluster-id").AnyTimes() + backend := NewEtcdBackend(cdcClient) + + changefeedID := common.NewChangeFeedIDWithName("test-scheduler-defaults", common.DefaultKeyspaceName) + enableTableAcrossNodes := false + regionThreshold := 20 + writeKeyThreshold := 10485760 + info := &config.ChangeFeedInfo{ + ChangefeedID: changefeedID, + Config: config.GetDefaultReplicaConfig(), + State: config.StateStopped, + SinkURI: "mysql://127.0.0.1:3306", + } + info.Config.Scheduler = &config.ChangefeedSchedulerConfig{ + EnableTableAcrossNodes: &enableTableAcrossNodes, + RegionThreshold: &regionThreshold, + WriteKeyThreshold: &writeKeyThreshold, + } + + cdcClient.EXPECT().GetChangeFeedInfo(gomock.Any(), changefeedID.DisplayName).Return(info, nil).Times(1) + etcdClient.EXPECT().Txn(gomock.Any(), gomock.Any(), NewFuncMatcher(func(i interface{}) bool { + ops := i.([]clientv3.Op) + require.Len(t, ops, 1) + require.True(t, ops[0].IsPut()) + + persistedInfo := &config.ChangeFeedInfo{} + require.NoError(t, persistedInfo.Unmarshal(ops[0].ValueBytes())) + require.Equal(t, config.StateNormal, persistedInfo.State) + require.NotNil(t, persistedInfo.Config) + require.NotNil(t, persistedInfo.Config.Scheduler) + require.NotNil(t, persistedInfo.Config.Scheduler.RegionCountPerSpan) + require.Greater(t, *persistedInfo.Config.Scheduler.RegionCountPerSpan, 0) + return true + }), gomock.Any()).Return(&clientv3.TxnResponse{Succeeded: true}, nil).Times(1) + + resumedInfo, err := backend.ResumeChangefeed(context.Background(), changefeedID, 0) + require.NoError(t, err) + require.NotNil(t, resumedInfo.Config.Scheduler.RegionCountPerSpan) + require.Greater(t, *resumedInfo.Config.Scheduler.RegionCountPerSpan, 0) } func TestSetChangefeedProgress(t *testing.T) { 
diff --git a/coordinator/changefeed/mock/changefeed_db_backend.go b/coordinator/changefeed/mock/changefeed_db_backend.go index 80671c5c39..8db908a7b4 100644 --- a/coordinator/changefeed/mock/changefeed_db_backend.go +++ b/coordinator/changefeed/mock/changefeed_db_backend.go @@ -80,6 +80,21 @@ func (mr *MockBackendMockRecorder) GetAllChangefeeds(ctx interface{}) *gomock.Ca return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllChangefeeds", reflect.TypeOf((*MockBackend)(nil).GetAllChangefeeds), ctx) } +// GetChangefeedInfo mocks base method. +func (m *MockBackend) GetChangefeedInfo(ctx context.Context, id common.ChangeFeedID) (*config.ChangeFeedInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetChangefeedInfo", ctx, id) + ret0, _ := ret[0].(*config.ChangeFeedInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetChangefeedInfo indicates an expected call of GetChangefeedInfo. +func (mr *MockBackendMockRecorder) GetChangefeedInfo(ctx, id interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetChangefeedInfo", reflect.TypeOf((*MockBackend)(nil).GetChangefeedInfo), ctx, id) +} + // PauseChangefeed mocks base method. func (m *MockBackend) PauseChangefeed(ctx context.Context, id common.ChangeFeedID) error { m.ctrl.T.Helper() @@ -95,11 +110,12 @@ func (mr *MockBackendMockRecorder) PauseChangefeed(ctx, id interface{}) *gomock. } // ResumeChangefeed mocks base method. 
-func (m *MockBackend) ResumeChangefeed(ctx context.Context, id common.ChangeFeedID, newCheckpointTs uint64) error { +func (m *MockBackend) ResumeChangefeed(ctx context.Context, id common.ChangeFeedID, newCheckpointTs uint64) (*config.ChangeFeedInfo, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ResumeChangefeed", ctx, id, newCheckpointTs) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].(*config.ChangeFeedInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 } // ResumeChangefeed indicates an expected call of ResumeChangefeed. diff --git a/coordinator/controller.go b/coordinator/controller.go index 5f01c837df..f843a2e936 100644 --- a/coordinator/controller.go +++ b/coordinator/controller.go @@ -863,11 +863,17 @@ func (c *Controller) ResumeChangefeed( return nil } - if err := c.backend.ResumeChangefeed(ctx, id, newCheckpointTs); err != nil { + resumedInfo, err := c.backend.ResumeChangefeed(ctx, id, newCheckpointTs) + if err != nil { return err } + if resumedInfo == nil { + return errors.New("resumed changefeed info is nil") + } - clone, err := cf.GetInfo().Clone() + // Use the backend-returned info so direct metadata edits made while the + // changefeed was stopped are not overwritten by the stale in-memory copy. + clone, err := resumedInfo.Clone() if err != nil { return err } @@ -923,6 +929,10 @@ func (c *Controller) ListChangefeeds(_ context.Context, keyspace string) ([]*con return infos, statuses, nil } +// GetChangefeed returns a copy of the changefeed info and the current status. +// API callers mutate the returned info when validating update requests, so the +// copy prevents those writes from racing with coordinator goroutines that read +// the in-memory changefeed state. 
func (c *Controller) GetChangefeed( _ context.Context, changefeedDisplayName common.ChangeFeedDisplayName, @@ -939,6 +949,11 @@ func (c *Controller) GetChangefeed( return nil, nil, errors.ErrChangeFeedNotExists.GenWithStackByArgs(changefeedDisplayName.Name) } + info, err := cf.GetInfo().Clone() + if err != nil { + return nil, nil, errors.Trace(err) + } + maintainerID := cf.GetNodeID() nodeInfo := c.nodeManager.GetNodeInfo(maintainerID) maintainerAddr := "" @@ -947,7 +962,19 @@ func (c *Controller) GetChangefeed( } status := &config.ChangeFeedStatus{CheckpointTs: cf.GetStatus().CheckpointTs, LastSyncedTs: cf.GetStatus().LastSyncedTs, LogCoordinatorResolvedTs: cf.GetLogCoordinatorResolvedTs()} status.SetMaintainerAddr(maintainerAddr) - return cf.GetInfo(), status, nil + return info, status, nil +} + +// GetPersistedChangefeedInfo returns the latest changefeed info persisted in the backend. +// +// Use this for resume-time validation because stopped changefeed metadata can +// be changed outside the coordinator process, for example during metadata +// migration or by legacy tooling. GetChangefeed intentionally returns the +// coordinator's in-memory copy. +func (c *Controller) GetPersistedChangefeedInfo(ctx context.Context, id common.ChangeFeedID) (*config.ChangeFeedInfo, error) { + c.apiLock.RLock() + defer c.apiLock.RUnlock() + return c.backend.GetChangefeedInfo(ctx, id) } // getChangefeed returns the changefeed by id, return nil if not found diff --git a/coordinator/controller_test.go b/coordinator/controller_test.go index dace08fe7e..c0a369175f 100644 --- a/coordinator/controller_test.go +++ b/coordinator/controller_test.go @@ -15,6 +15,7 @@ package coordinator import ( "context" + "sync" "testing" "time" @@ -257,6 +258,9 @@ func TestMaintainerHeartbeatAdmissionRequiresInitializedSender(t *testing.T) { } func TestResumeChangefeed(t *testing.T) { + // Scenario: resume should propagate backend failures and update in-memory state after success. 
+ // Steps: try a missing changefeed, simulate a backend resume failure, then return a persisted + // changefeed info from the backend and verify the stopped changefeed becomes normal. ctrl := gomock.NewController(t) backend := mock_changefeed.NewMockBackend(ctrl) changefeedDB := changefeed.NewChangefeedDB(1216) @@ -276,11 +280,11 @@ func TestResumeChangefeed(t *testing.T) { // no changefeed require.NotNil(t, controller.ResumeChangefeed(context.Background(), common.NewChangeFeedIDWithName("test2", common.DefaultKeyspaceName), 12, true)) - backend.EXPECT().ResumeChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("failed")).Times(1) + backend.EXPECT().ResumeChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("failed")).Times(1) require.NotNil(t, controller.ResumeChangefeed(context.Background(), cfID, 12, true)) require.Equal(t, config.StateFailed, changefeedDB.GetByID(cfID).GetInfo().State) - backend.EXPECT().ResumeChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1) + backend.EXPECT().ResumeChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).Return(cf.GetInfo(), nil).Times(1) require.Nil(t, controller.ResumeChangefeed(context.Background(), cfID, 12, false)) require.Equal(t, config.StateNormal, changefeedDB.GetByID(cfID).GetInfo().State) } @@ -312,6 +316,9 @@ func TestResumeChangefeedNormalState(t *testing.T) { } func TestResumeChangefeedOverwriteUpdatesLastSavedCheckpointTs(t *testing.T) { + // Scenario: overwrite resume should reset the persisted checkpoint baseline. + // Steps: resume a stopped changefeed with overwriteCheckpointTs and verify the in-memory + // last saved checkpoint is updated to the requested checkpoint. 
ctrl := gomock.NewController(t) backend := mock_changefeed.NewMockBackend(ctrl) changefeedDB := changefeed.NewChangefeedDB(1216) @@ -330,12 +337,15 @@ func TestResumeChangefeedOverwriteUpdatesLastSavedCheckpointTs(t *testing.T) { changefeedDB.AddStoppedChangefeed(cf) newCheckpointTs := uint64(120) - backend.EXPECT().ResumeChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1) + backend.EXPECT().ResumeChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).Return(cf.GetInfo(), nil).Times(1) require.Nil(t, controller.ResumeChangefeed(context.Background(), cfID, newCheckpointTs, true)) require.Equal(t, newCheckpointTs, changefeedDB.GetByID(cfID).GetLastSavedCheckPointTs()) } func TestResumeChangefeedIgnoresStaleMaintainerErrorAndSchedules(t *testing.T) { + // Scenario: a stale maintainer error kept in memory must not block manual resume. + // Steps: install an errored in-memory status, resume from the backend, and verify + // the changefeed is scheduled with a clean status. ctrl := gomock.NewController(t) backend := mock_changefeed.NewMockBackend(ctrl) changefeedDB := changefeed.NewChangefeedDB(1216) @@ -364,7 +374,7 @@ func TestResumeChangefeedIgnoresStaleMaintainerErrorAndSchedules(t *testing.T) { _, _, err := cf.ForceUpdateStatus(stale) require.NotNil(t, err) - backend.EXPECT().ResumeChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1) + backend.EXPECT().ResumeChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).Return(cf.GetInfo(), nil).Times(1) require.NoError(t, controller.ResumeChangefeed(context.Background(), cfID, 100, false)) // The changefeed should be enqueued for scheduling and should not be blocked by the stale error. 
@@ -378,6 +388,38 @@ func TestResumeChangefeedIgnoresStaleMaintainerErrorAndSchedules(t *testing.T) {
 	require.True(t, cf.ShouldRun())
 }
 
+func TestResumeChangefeedUsesBackendReturnedInfo(t *testing.T) {
+	// Scenario: stopped changefeed metadata can be edited directly in the backend while
+	// the coordinator still has an older in-memory copy.
+	// Steps: resume the changefeed with backend-returned info whose sink URI differs from
+	// memory, then verify the in-memory changefeed uses the backend value instead of overwriting it.
+	ctrl := gomock.NewController(t)
+	backend := mock_changefeed.NewMockBackend(ctrl)
+	changefeedDB := changefeed.NewChangefeedDB(1216)
+	controller := &Controller{
+		backend:      backend,
+		changefeedDB: changefeedDB,
+	}
+	cfID := common.NewChangeFeedIDWithName("test-backend-info", common.DefaultKeyspaceName)
+	cf := changefeed.NewChangefeed(cfID, &config.ChangeFeedInfo{
+		ChangefeedID: cfID,
+		Config:       config.GetDefaultReplicaConfig(),
+		State:        config.StateStopped,
+		SinkURI:      "mysql://downstream:3306",
+	}, 100, true)
+	changefeedDB.AddStoppedChangefeed(cf)
+
+	backendInfo, err := cf.GetInfo().Clone()
+	require.NoError(t, err)
+	backendInfo.SinkURI = "mysql://upstream:4000"
+	backendInfo.State = config.StateNormal
+	backend.EXPECT().ResumeChangefeed(gomock.Any(), cfID, uint64(100)).Return(backendInfo, nil).Times(1)
+
+	require.NoError(t, controller.ResumeChangefeed(context.Background(), cfID, 100, false))
+	require.Equal(t, "mysql://upstream:4000", changefeedDB.GetByID(cfID).GetInfo().SinkURI)
+	require.Equal(t, config.StateNormal, changefeedDB.GetByID(cfID).GetInfo().State)
+}
+
 func TestPauseChangefeed(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	backend := mock_changefeed.NewMockBackend(ctrl)
@@ -466,6 +508,9 @@ func TestUpdateChangefeed(t *testing.T) {
 }
 
 func TestGetChangefeed(t *testing.T) {
+	// Scenario: API-facing GetChangefeed must not expose coordinator-owned mutable info.
+	// Steps: fetch a changefeed, mutate the returned copy, and verify the in-memory
+	// changefeed state is unchanged while status fields are still reported.
 	ctrl := gomock.NewController(t)
 	backend := mock_changefeed.NewMockBackend(ctrl)
 	changefeedDB := changefeed.NewChangefeedDB(1216)
@@ -490,10 +535,67 @@ func TestGetChangefeed(t *testing.T) {
 	require.Equal(t, ret.State, config.StateStopped)
 	require.Equal(t, uint64(1), status.CheckpointTs)
 
+	ret.SinkURI = "kafka://127.0.0.1:9092"
+	ret.Config = nil
+	storedInfo := changefeedDB.GetByID(cfID).GetInfo()
+	require.Equal(t, "mysql://127.0.0.1:3306", storedInfo.SinkURI)
+	require.NotNil(t, storedInfo.Config)
+
 	_, _, err = controller.GetChangefeed(context.Background(), common.NewChangeFeedDisplayName("test1", "default"))
 	require.True(t, errors.ErrChangeFeedNotExists.Equal(err))
 }
 
+func TestGetChangefeedReturnedInfoMutationDoesNotRaceWithStoredInfo(t *testing.T) {
+	// Scenario: API handlers may mutate the info returned from GetChangefeed while coordinator
+	// goroutines read the stored info.
+	// Steps: mutate the returned copy and read the stored info concurrently, then verify the stored fields remain unchanged.
+	ctrl := gomock.NewController(t)
+	backend := mock_changefeed.NewMockBackend(ctrl)
+	changefeedDB := changefeed.NewChangefeedDB(1216)
+	nodeManager := watcher.NewNodeManager(nil, nil)
+	controller := &Controller{
+		backend:      backend,
+		changefeedDB: changefeedDB,
+		nodeManager:  nodeManager,
+	}
+	cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
+	cf := changefeed.NewChangefeed(cfID, &config.ChangeFeedInfo{
+		ChangefeedID: cfID,
+		Config:       config.GetDefaultReplicaConfig(),
+		State:        config.StateStopped,
+		SinkURI:      "mysql://127.0.0.1:3306",
+	}, 1, true)
+	changefeedDB.AddStoppedChangefeed(cf)
+
+	ret, _, err := controller.GetChangefeed(context.Background(), cfID.DisplayName)
+	require.NoError(t, err)
+
+	var (
+		wg            sync.WaitGroup
+		storedChanged bool
+	)
+	wg.Add(2)
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 1000; i++ {
+			ret.SinkURI = "kafka://127.0.0.1:9092"
+			ret.TargetTs = uint64(i + 1)
+		}
+	}()
+	go func() {
+		defer wg.Done()
+		storedInfo := changefeedDB.GetByID(cfID).GetInfo()
+		for i := 0; i < 1000; i++ {
+			if storedInfo.SinkURI != "mysql://127.0.0.1:3306" || storedInfo.TargetTs != 0 {
+				storedChanged = true
+				return
+			}
+		}
+	}()
+	wg.Wait()
+	require.False(t, storedChanged)
+}
+
 func TestRemoveChangefeed(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	backend := mock_changefeed.NewMockBackend(ctrl)
diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go
index aff6d9ab11..db750fe8d1 100644
--- a/coordinator/coordinator.go
+++ b/coordinator/coordinator.go
@@ -430,6 +430,10 @@ func (c *coordinator) GetChangefeed(ctx context.Context, changefeedDisplayName c
 	return c.controller.GetChangefeed(ctx, changefeedDisplayName)
 }
 
+func (c *coordinator) GetPersistedChangefeedInfo(ctx context.Context, id common.ChangeFeedID) (*config.ChangeFeedInfo, error) {
+	return c.controller.GetPersistedChangefeedInfo(ctx, id)
+}
+
 func (c *coordinator) DrainNode(ctx context.Context, target node.ID) (int, error) {
 	return
c.controller.DrainNode(ctx, target) } diff --git a/pkg/server/coordinator.go b/pkg/server/coordinator.go index f2ac7e73e7..cc66d5716e 100644 --- a/pkg/server/coordinator.go +++ b/pkg/server/coordinator.go @@ -35,6 +35,8 @@ type Coordinator interface { ListChangefeeds(ctx context.Context, keyspace string) ([]*config.ChangeFeedInfo, []*config.ChangeFeedStatus, error) // GetChangefeed returns a changefeed GetChangefeed(ctx context.Context, changefeedDisplayName common.ChangeFeedDisplayName) (*config.ChangeFeedInfo, *config.ChangeFeedStatus, error) + // GetPersistedChangefeedInfo returns the latest backend-persisted changefeed info. + GetPersistedChangefeedInfo(ctx context.Context, id common.ChangeFeedID) (*config.ChangeFeedInfo, error) // CreateChangefeed creates a new changefeed CreateChangefeed(ctx context.Context, info *config.ChangeFeedInfo) error // RemoveChangefeed gets a changefeed diff --git a/server/module_election.go b/server/module_election.go index 6b0df0e266..f617cc7689 100644 --- a/server/module_election.go +++ b/server/module_election.go @@ -38,19 +38,25 @@ type elector struct { // election used for coordinator election *concurrency.Election // election used for log coordinator - logElection *concurrency.Election - svr *server + logElection *concurrency.Election + coordinatorMaxTaskConcurrency int + coordinatorCheckBalanceInterval time.Duration + svr *server } -func NewElector(server *server) common.SubModule { +// NewElector creates the coordinator elector with scheduler settings captured from server startup config. 
+func NewElector(server *server, schedulerCfg *config.SchedulerConfig) common.SubModule {
 	election := concurrency.NewElection(server.session,
 		etcd.CaptureOwnerKey(server.EtcdClient.GetClusterID()))
 	logElection := concurrency.NewElection(server.session,
 		etcd.LogCoordinatorKey(server.EtcdClient.GetClusterID()))
+	maxTaskConcurrency, checkBalanceInterval := coordinatorSchedulerSettings(schedulerCfg)
 	return &elector{
-		election:    election,
-		logElection: logElection,
-		svr:         server,
+		election:                        election,
+		logElection:                     logElection,
+		coordinatorMaxTaskConcurrency:   maxTaskConcurrency,
+		coordinatorCheckBalanceInterval: checkBalanceInterval,
+		svr:                             server,
 	}
 }
 
@@ -140,8 +146,8 @@ func (e *elector) campaignCoordinator(ctx context.Context) error {
 			changefeed.NewEtcdBackend(e.svr.EtcdClient),
 			e.svr.EtcdClient.GetGCServiceID(),
 			coordinatorVersion,
-			10000,
-			time.Minute,
+			e.coordinatorMaxTaskConcurrency,
+			e.coordinatorCheckBalanceInterval,
 		)
 		e.svr.setCoordinator(co)
 		err = co.Run(ctx)
@@ -191,6 +197,17 @@ func (e *elector) campaignCoordinator(ctx context.Context) error {
 	}
 }
 
+// coordinatorSchedulerSettings returns the coordinator's max task concurrency and balance-check
+// interval from schedulerCfg. A nil schedulerCfg yields the limits that campaignCoordinator
+// used to hard-code (10000 tasks, one minute), so a missing scheduler section cannot panic startup.
+func coordinatorSchedulerSettings(schedulerCfg *config.SchedulerConfig) (int, time.Duration) {
+	if schedulerCfg == nil {
+		// Keep the legacy hard-coded limits rather than dereferencing a nil config.
+		return 10000, time.Minute
+	}
+	return schedulerCfg.MaxTaskConcurrency, time.Duration(schedulerCfg.CheckBalanceInterval)
+}
+
 func (e *elector) campaignLogCoordinator(ctx context.Context) error {
 	// Limit the frequency of elections to avoid putting too much pressure on the etcd server
 	rl := rate.NewLimiter(rate.Every(time.Second), 1 /* burst */)
diff --git a/server/module_election_test.go b/server/module_election_test.go
new file mode 100644
index 0000000000..26ff16684a
--- /dev/null
+++ b/server/module_election_test.go
@@ -0,0 +1,54 @@
+// Copyright 2026 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+	"testing"
+	"time"
+
+	"github.com/pingcap/ticdc/pkg/config"
+	"github.com/stretchr/testify/require"
+)
+
+func TestCoordinatorSchedulerSettingsUsesCapturedConfig(t *testing.T) {
+	// Scenario: the coordinator should honor the scheduler config captured during server startup.
+	// Steps: install a different global config, read settings from an explicit scheduler config,
+	// and verify both the concurrency limit and balance interval come from the captured config.
+	original := config.GetGlobalServerConfig()
+	t.Cleanup(func() {
+		config.StoreGlobalServerConfig(original)
+	})
+
+	globalCfg := config.GetDefaultServerConfig()
+	globalCfg.Debug.Scheduler.MaxTaskConcurrency = 9
+	globalCfg.Debug.Scheduler.CheckBalanceInterval = config.TomlDuration(44 * time.Second)
+	config.StoreGlobalServerConfig(globalCfg)
+
+	cfg := config.GetDefaultServerConfig()
+	cfg.Debug.Scheduler.MaxTaskConcurrency = 3
+	cfg.Debug.Scheduler.CheckBalanceInterval = config.TomlDuration(22 * time.Second)
+
+	maxTaskConcurrency, checkBalanceInterval := coordinatorSchedulerSettings(cfg.Debug.Scheduler)
+	require.Equal(t, 3, maxTaskConcurrency)
+	require.Equal(t, 22*time.Second, checkBalanceInterval)
+}
+
+func TestCoordinatorSchedulerSettingsNilConfigUsesLegacyDefaults(t *testing.T) {
+	// Scenario: a server config without a scheduler section must not panic coordinator startup.
+	// Steps: read settings from a nil scheduler config and verify the values that were
+	// hard-coded before the settings became configurable are returned.
+	maxTaskConcurrency, checkBalanceInterval := coordinatorSchedulerSettings(nil)
+	require.Equal(t, 10000, maxTaskConcurrency)
+	require.Equal(t, time.Minute, checkBalanceInterval)
+}
diff --git a/server/server.go b/server/server.go
index 421f4f4762..569c85e8da 100644
--- a/server/server.go
+++ b/server/server.go
@@ -214,7 +214,7 @@ func (c *server) initialize(ctx context.Context) error {
 
 	c.nodeModules = []common.SubModule{
 		nodeManager,
-		NewElector(c),
+		NewElector(c, conf.Debug.Scheduler),
 	}
 
 	c.subModules = []common.SubModule{