Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 44 additions & 31 deletions coordinator/changefeed/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package changefeed

import (
"encoding/json"
"net/url"
"sync"

Expand All @@ -41,7 +40,6 @@ type Changefeed struct {
nodeIDMu sync.Mutex
nodeID node.ID

configBytes []byte
// it's saved to the backend db
lastSavedCheckpointTs *atomic.Uint64
logCoordinatorResolvedTs *atomic.Uint64
Expand All @@ -57,21 +55,20 @@ func NewChangefeed(cfID common.ChangeFeedID,
checkpointTs uint64,
isNew bool,
) *Changefeed {
if info == nil {
log.Panic("changefeed info is nil", zap.Stringer("changefeedID", cfID))
}
if info.Config == nil {
log.Panic("changefeed config is nil", zap.Stringer("changefeedID", cfID))
}
uri, err := url.Parse(info.SinkURI)
if err != nil {
log.Panic("unable to parse sink-uri",
zap.String("url", info.SinkURI), zap.Error(err))
}
bytes, err := json.Marshal(info)
if err != nil {
log.Panic("unable to marshal changefeed config",
zap.Error(err))
}

res := &Changefeed{
ID: cfID,
info: atomic.NewPointer(info),
configBytes: bytes,
lastSavedCheckpointTs: atomic.NewUint64(checkpointTs),
logCoordinatorResolvedTs: atomic.NewUint64(checkpointTs),
sinkType: getSinkType(uri.Scheme),
Expand Down Expand Up @@ -100,7 +97,7 @@ func NewChangefeed(cfID common.ChangeFeedID,

// GetInfo returns the latest ChangeFeedInfo stored in memory.
//
// It may return nil if the changefeed hasn't been fully initialized.
// Changefeed keeps info non-nil after construction.
func (c *Changefeed) GetInfo() *config.ChangeFeedInfo {
if c == nil || c.info == nil {
return nil
Expand All @@ -109,15 +106,13 @@ func (c *Changefeed) GetInfo() *config.ChangeFeedInfo {
}

// SetInfo updates the in-memory ChangeFeedInfo for the changefeed.
//
// It lazily initializes the internal pointer for uninitialized changefeeds
// (primarily used by unit tests).
//
// If the receiver is nil, it does nothing.
func (c *Changefeed) SetInfo(info *config.ChangeFeedInfo) {
if c == nil {
return
}
if info == nil {
log.Panic("changefeed info is nil", zap.Stringer("changefeedID", c.ID))
}
if c.info == nil {
c.info = atomic.NewPointer(info)
return
Expand Down Expand Up @@ -243,10 +238,12 @@ func (c *Changefeed) GetStatusForResume() *heartbeatpb.MaintainerStatus {
}

clone := &heartbeatpb.MaintainerStatus{
CheckpointTs: status.CheckpointTs,
FeedState: status.FeedState,
State: status.State,
// we don't clone the errors from status, because the old error is meaningless for the resume action, but only blocks.
CheckpointTs: status.CheckpointTs,
FeedState: status.FeedState,
State: status.State,
MaintainerEpoch: status.MaintainerEpoch,
// Resume creates a new maintainer owner, so errors reported by the
// previous owner must not block the resumed in-memory status.
Err: []*heartbeatpb.RunningError{},
}

Expand All @@ -272,21 +269,27 @@ func (c *Changefeed) GetLastSavedCheckPointTs() uint64 {
}

func (c *Changefeed) NewAddMaintainerMessage(server node.ID) *messaging.TargetMessage {
info := c.GetInfo()
configData, err := info.MarshalWithTruncation(false)
if err != nil {
Comment thread
3AceShowHand marked this conversation as resolved.
log.Panic("unable to marshal changefeed config", zap.Error(err))
}
checkpointTs := c.GetLastSavedCheckPointTs()
if status := c.GetStatus(); status != nil {
checkpointTs = status.CheckpointTs
}
return messaging.NewSingleTargetMessage(server,
messaging.MaintainerManagerTopic,
&heartbeatpb.AddMaintainerRequest{
Id: c.ID.ToPB(),
CheckpointTs: c.GetStatus().CheckpointTs,
Config: c.configBytes,
CheckpointTs: checkpointTs,
Config: []byte(configData),
IsNewChangefeed: c.isNew,
KeyspaceId: c.GetKeyspaceID(),
KeyspaceId: info.KeyspaceID,
MaintainerEpoch: info.Epoch,
})
Comment on lines 281 to 290

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

c.GetStatus() can return nil (as explicitly handled in GetStatusForResume). Calling c.GetStatus().CheckpointTs directly without a nil check can cause a nil pointer dereference panic. We should safely fall back to c.GetLastSavedCheckPointTs() if the status is nil.

	checkpointTs := c.GetLastSavedCheckPointTs()
	if status := c.GetStatus(); status != nil {
		checkpointTs = status.CheckpointTs
	}
	return messaging.NewSingleTargetMessage(server,
		messaging.MaintainerManagerTopic,
		&heartbeatpb.AddMaintainerRequest{
			Id:              c.ID.ToPB(),
			CheckpointTs:    checkpointTs,
			Config:          []byte(configData),
			IsNewChangefeed: c.isNew,
			KeyspaceId:      info.KeyspaceID,
			MaintainerEpoch: info.Epoch,
		})

}

func (c *Changefeed) NewRemoveMaintainerMessage(server node.ID, casCade, removed bool) *messaging.TargetMessage {
return RemoveMaintainerMessage(c.GetKeyspaceID(), c.ID, server, casCade, removed)
}

func (c *Changefeed) NewCheckpointTsMessage(ts uint64) *messaging.TargetMessage {
return messaging.NewSingleTargetMessage(c.GetNodeID(),
messaging.MaintainerManagerTopic,
Expand All @@ -296,15 +299,25 @@ func (c *Changefeed) NewCheckpointTsMessage(ts uint64) *messaging.TargetMessage
})
}

func RemoveMaintainerMessage(keyspaceID uint32, id common.ChangeFeedID, server node.ID, casCade bool, removed bool) *messaging.TargetMessage {
// RemoveMaintainerMessage builds the fenced remove request sent to a maintainer owner.
// The maintainer epoch identifies the owner generation that is allowed to stop.
func RemoveMaintainerMessage(
keyspaceID uint32,
id common.ChangeFeedID,
server node.ID,
casCade bool,
removed bool,
maintainerEpoch uint64,
) *messaging.TargetMessage {
casCade = casCade || removed
return messaging.NewSingleTargetMessage(server,
messaging.MaintainerManagerTopic,
&heartbeatpb.RemoveMaintainerRequest{
Id: id.ToPB(),
Cascade: casCade,
Removed: removed,
KeyspaceId: keyspaceID,
Id: id.ToPB(),
Cascade: casCade,
Removed: removed,
KeyspaceId: keyspaceID,
MaintainerEpoch: maintainerEpoch,
})
}

Expand Down
21 changes: 19 additions & 2 deletions coordinator/changefeed/changefeed_db_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,19 @@ import (
"github.com/pingcap/ticdc/pkg/config"
)

// EpochBumpOptions carries metadata persisted together with a changefeed epoch bump.
type EpochBumpOptions struct {
CheckpointTs uint64
Progress config.Progress
// UpdateStatus controls whether CheckpointTs and Progress overwrite the
// persisted status read by the bump transaction.
UpdateStatus bool
State *config.FeedState
Error *config.RunningError
// UpdateError controls whether Error overwrites the persisted runtime error.
UpdateError bool
}

// Backend is the metastore for the changefeed
type Backend interface {
// GetAllChangefeeds returns all changefeeds from the backend db, include stopped and failed changefeeds
Expand All @@ -30,14 +43,18 @@ type Backend interface {
CreateChangefeed(ctx context.Context, info *config.ChangeFeedInfo) error
// UpdateChangefeed updates changefeed info to db
UpdateChangefeed(ctx context.Context, info *config.ChangeFeedInfo, checkpointTs uint64, progress config.Progress) error
// ResumeChangefeed persists the resumed status with a new owner epoch.
ResumeChangefeed(ctx context.Context, id common.ChangeFeedID, candidateEpoch uint64, checkpointTs uint64) (*config.ChangeFeedInfo, error)
// BumpChangefeedEpoch is the low-level ownership boundary used before a
// coordinator path can create a new maintainer owner. It only reads and
// updates stored status when UpdateStatus is set.
BumpChangefeedEpoch(ctx context.Context, id common.ChangeFeedID, candidateEpoch uint64, options EpochBumpOptions) (*config.ChangeFeedInfo, error)
// PauseChangefeed persists the pause status to db for a changefeed
PauseChangefeed(ctx context.Context, id common.ChangeFeedID) error
// DeleteChangefeed removes all related info of a changefeed from db
DeleteChangefeed(ctx context.Context, id common.ChangeFeedID) error
// SetChangefeedProgress persists the operation progress status to db for a changefeed
SetChangefeedProgress(ctx context.Context, id common.ChangeFeedID, progress config.Progress) error
// ResumeChangefeed persists the resumed status to db for a changefeed and returns the resumed info.
ResumeChangefeed(ctx context.Context, id common.ChangeFeedID, newCheckpointTs uint64) (*config.ChangeFeedInfo, error)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove this method ?

// UpdateChangefeedCheckpointTs persists the checkpointTs for changefeeds
UpdateChangefeedCheckpointTs(ctx context.Context, checkpointTs map[common.ChangeFeedID]uint64) error
}
Expand Down
55 changes: 36 additions & 19 deletions coordinator/changefeed/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package changefeed

import (
"encoding/json"
"testing"

"github.com/pingcap/ticdc/heartbeatpb"
Expand Down Expand Up @@ -42,6 +43,21 @@ func TestNewChangefeed(t *testing.T) {
require.True(t, cf.NeedCheckpointTsMessage())
}

func TestNewChangefeedRejectsInvalidInfo(t *testing.T) {
t.Parallel()

cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
require.Panics(t, func() {
NewChangefeed(cfID, nil, 100, true)
})
require.Panics(t, func() {
NewChangefeed(cfID, &config.ChangeFeedInfo{
SinkURI: "kafka://127.0.0.1:9092",
State: config.StateNormal,
}, 100, true)
})
}

func TestChangefeed_GetSetInfo(t *testing.T) {
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
info := &config.ChangeFeedInfo{
Expand All @@ -58,6 +74,9 @@ func TestChangefeed_GetSetInfo(t *testing.T) {
}
cf.SetInfo(newInfo)
require.Equal(t, newInfo, cf.GetInfo())
require.Panics(t, func() {
cf.SetInfo(nil)
})
}

func TestChangefeed_GetSetNodeID(t *testing.T) {
Expand Down Expand Up @@ -227,28 +246,22 @@ func TestChangefeed_NewAddMaintainerMessage(t *testing.T) {
SinkURI: "kafka://127.0.0.1:9092",
State: config.StateNormal,
Config: config.GetDefaultReplicaConfig(),
Epoch: 7,
}
info.KeyspaceID = 123
cf := NewChangefeed(cfID, info, 100, true)

server := node.ID("server-1")
msg := cf.NewAddMaintainerMessage(server)
require.Equal(t, server, msg.To)
require.Equal(t, messaging.MaintainerManagerTopic, msg.Topic)
}

func TestChangefeed_NewRemoveMaintainerMessage(t *testing.T) {
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
info := &config.ChangeFeedInfo{
SinkURI: "kafka://127.0.0.1:9092",
State: config.StateNormal,
Config: config.GetDefaultReplicaConfig(),
}
cf := NewChangefeed(cfID, info, 100, true)

server := node.ID("server-1")
msg := cf.NewRemoveMaintainerMessage(server, true, true)
require.Equal(t, server, msg.To)
require.Equal(t, messaging.MaintainerManagerTopic, msg.Topic)
req := msg.Message[0].(*heartbeatpb.AddMaintainerRequest)
require.Equal(t, info.KeyspaceID, req.KeyspaceId)
require.Equal(t, info.Epoch, req.MaintainerEpoch)
configInfo := &config.ChangeFeedInfo{}
require.NoError(t, json.Unmarshal(req.Config, configInfo))
require.Equal(t, info.Epoch, configInfo.Epoch)
require.Equal(t, info.SinkURI, configInfo.SinkURI)
}

func TestChangefeed_NewCheckpointTsMessage(t *testing.T) {
Expand All @@ -269,9 +282,11 @@ func TestChangefeed_NewCheckpointTsMessage(t *testing.T) {
func TestRemoveMaintainerMessage(t *testing.T) {
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
server := node.ID("server-1")
msg := RemoveMaintainerMessage(common.DefaultKeyspaceID, cfID, server, true, true)
msg := RemoveMaintainerMessage(common.DefaultKeyspaceID, cfID, server, true, true, 10)
require.Equal(t, server, msg.To)
require.Equal(t, messaging.MaintainerManagerTopic, msg.Topic)
req := msg.Message[0].(*heartbeatpb.RemoveMaintainerRequest)
require.Equal(t, uint64(10), req.MaintainerEpoch)
}

func TestChangefeedGetStatusForResume(t *testing.T) {
Expand All @@ -283,9 +298,10 @@ func TestChangefeedGetStatusForResume(t *testing.T) {
Name: "test-changefeed",
Keyspace: "test-keyspace",
},
CheckpointTs: 789,
FeedState: "normal",
State: heartbeatpb.ComponentState_Working,
CheckpointTs: 789,
FeedState: "normal",
State: heartbeatpb.ComponentState_Working,
MaintainerEpoch: 42,
Err: []*heartbeatpb.RunningError{
{
Time: "2024-01-01 00:00:00",
Expand All @@ -312,6 +328,7 @@ func TestChangefeedGetStatusForResume(t *testing.T) {
require.Equal(t, originalStatus.CheckpointTs, clonedStatus.CheckpointTs)
require.Equal(t, originalStatus.FeedState, clonedStatus.FeedState)
require.Equal(t, originalStatus.State, clonedStatus.State)
require.Equal(t, originalStatus.MaintainerEpoch, clonedStatus.MaintainerEpoch)

require.Equal(t, 0, len(clonedStatus.Err))
}
Expand Down
Loading
Loading