Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {

verifyTableGroup := v2.Group("/verify_table")
verifyTableGroup.POST("", api.VerifyTable)
getAllTablesGroup := v2.Group("/get_all_tables")
getAllTablesGroup.POST("", api.GetAllTables)

// processor apis
// Note: They are not useful in new arch cdc,
Expand Down
164 changes: 137 additions & 27 deletions api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (h *OpenAPIV2) CreateChangefeed(c *gin.Context) {
return
}

ineligibleTables, _, err := getVerifiedTables(ctx, replicaCfg, kvStorage, cfg.StartTs, scheme, topic, protocol)
ineligibleTables, eligibleTables, allTables, err := getVerifiedTables(ctx, replicaCfg, kvStorage, cfg.StartTs, scheme, topic, protocol)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -284,7 +284,11 @@ func (h *OpenAPIV2) CreateChangefeed(c *gin.Context) {
log.Info("Create changefeed successfully!",
zap.String("id", info.ChangefeedID.Name()),
zap.String("state", string(info.State)),
zap.String("changefeedInfo", info.String()))
zap.String("changefeedInfo", info.String()),
zap.Int("eligibleTablesLength", len(eligibleTables)),
zap.Int("ineligibleTablesLength", len(ineligibleTables)),
zap.Int("allTablesLength", len(allTables)),
)

c.JSON(getStatus(c), CfInfoToAPIModel(
info,
Expand Down Expand Up @@ -352,7 +356,46 @@ func (h *OpenAPIV2) ListChangeFeeds(c *gin.Context) {
c.JSON(http.StatusOK, toListResponse(c, commonInfos))
}

// VerifyTable verify table, return ineligibleTables and EligibleTables.
// GetAllTables return ineligibleTables and EligibleTables.
func (h *OpenAPIV2) GetAllTables(c *gin.Context) {
ctx := c.Request.Context()
cfg := &ChangefeedConfig{ReplicaConfig: GetDefaultReplicaConfig()}

if err := c.BindJSON(&cfg); err != nil {
_ = c.Error(errors.WrapError(errors.ErrAPIInvalidParam, err))
return
}

// fill replicaConfig
replicaCfg := cfg.ReplicaConfig.ToInternalReplicaConfig()

keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
keyspaceName := GetKeyspaceValueWithDefault(c)
kvStorage, err := keyspaceManager.GetStorage(ctx, keyspaceName)
if err != nil {
_ = c.Error(err)
return
}

f, err := filter.NewFilter(replicaCfg.Filter, "", util.GetOrZero(replicaCfg.CaseSensitive), util.GetOrZero(replicaCfg.ForceReplicate))
if err != nil {
_ = c.Error(err)
return
}
_, ineligibleTables, eligibleTables, allTables, err := schemastore.
VerifyTables(f, kvStorage, cfg.StartTs)
if err != nil {
_ = c.Error(err)
return
}
Comment thread
wk989898 marked this conversation as resolved.
tables := &Tables{
IneligibleTables: toAPIModelFunc(ineligibleTables),
EligibleTables: toAPIModelFunc(eligibleTables),
AllTables: toAPIModelFunc(allTables),
}
c.JSON(http.StatusOK, tables)
}

func (h *OpenAPIV2) VerifyTable(c *gin.Context) {
ctx := c.Request.Context()
cfg := &ChangefeedConfig{ReplicaConfig: GetDefaultReplicaConfig()}
Expand Down Expand Up @@ -395,7 +438,7 @@ func (h *OpenAPIV2) VerifyTable(c *gin.Context) {
_ = c.Error(err)
return
}
ineligibleTables, eligibleTables, err := getVerifiedTables(ctx, replicaCfg, kvStorage, cfg.StartTs, scheme, topic, protocol)
ineligibleTables, eligibleTables, allTables, err := getVerifiedTables(ctx, replicaCfg, kvStorage, cfg.StartTs, scheme, topic, protocol)
if err != nil {
_ = c.Error(err)
return
Expand All @@ -405,18 +448,10 @@ func (h *OpenAPIV2) VerifyTable(c *gin.Context) {
zap.Bool("ignoreIneligibleTable", util.GetOrZero(cfg.ReplicaConfig.IgnoreIneligibleTable)),
)

toAPIModelFunc := func(tbls []string) []TableName {
var apiModels []TableName
for _, tbl := range tbls {
apiModels = append(apiModels, TableName{
Table: tbl,
})
}
return apiModels
}
tables := &Tables{
IneligibleTables: toAPIModelFunc(ineligibleTables),
EligibleTables: toAPIModelFunc(eligibleTables),
AllTables: toAPIModelFunc(allTables),
}
c.JSON(http.StatusOK, tables)
}
Expand Down Expand Up @@ -466,6 +501,19 @@ func shouldShowRunningError(state config.FeedState) bool {
}
}

func toAPIModelFunc(tbls []common.TableName) []TableName {
var apiModels []TableName
for _, tbl := range tbls {
apiModels = append(apiModels, TableName{
Schema: tbl.Schema,
Table: tbl.Table,
TableID: tbl.TableID,
IsPartition: tbl.IsPartition,
})
}
return apiModels
}

func CfInfoToAPIModel(
info *config.ChangeFeedInfo,
status *config.ChangeFeedStatus,
Expand Down Expand Up @@ -666,8 +714,10 @@ func (h *OpenAPIV2) ResumeChangefeed(c *gin.Context) {

// If there is no overrideCheckpointTs, then check whether the currentCheckpointTs is smaller than gc safepoint or not.
newCheckpointTs := status.CheckpointTs
overwriteCheckpointTs := false
if cfg.OverwriteCheckpointTs != 0 {
newCheckpointTs = cfg.OverwriteCheckpointTs
overwriteCheckpointTs = true
}

keyspaceMeta := middleware.GetKeyspaceFromContext(c)
Expand Down Expand Up @@ -709,13 +759,57 @@ func (h *OpenAPIV2) ResumeChangefeed(c *gin.Context) {
}
}()

err = co.ResumeChangefeed(ctx, cfInfo.ChangefeedID, newCheckpointTs, cfg.OverwriteCheckpointTs != 0)
var (
eligibleTables []common.TableName
ineligibleTables []common.TableName
allTables []common.TableName
)
if overwriteCheckpointTs {
sinkURIParsed, err := url.Parse(cfInfo.SinkURI)
if err != nil {
_ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, cfInfo.SinkURI))
return
}
scheme := sinkURIParsed.Scheme
topic := ""
if config.IsMQScheme(scheme) {
topic, err = helper.GetTopic(sinkURIParsed)
if err != nil {
_ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, cfInfo.SinkURI))
return
}
}
protocol, _ := config.ParseSinkProtocolFromString(util.GetOrZero(cfInfo.Config.Sink.Protocol))

keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
kvStorage, err := keyspaceManager.GetStorage(ctx, keyspaceName)
if err != nil {
_ = c.Error(err)
return
}
ineligibleTables, eligibleTables, allTables, err = getVerifiedTables(ctx, cfInfo.Config, kvStorage, newCheckpointTs, scheme, topic, protocol)
if err != nil {
_ = c.Error(err)
return
}
}
Comment on lines +767 to +795
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This block of code for getting verified tables is very similar to the logic in CreateChangefeed, UpdateChangefeed, and the new GetAllTables handler. To improve maintainability and reduce code duplication, consider extracting this logic into a shared helper function.


err = co.ResumeChangefeed(ctx, cfInfo.ChangefeedID, newCheckpointTs, overwriteCheckpointTs)
if err != nil {
needRemoveGCSafePoint = true
_ = c.Error(err)
return
}
c.Errors = nil
log.Info("Resume changefeed successfully!",
zap.String("id", cfInfo.ChangefeedID.Name()),
zap.String("state", string(cfInfo.State)),
zap.String("changefeedInfo", cfInfo.String()),
zap.Bool("overwriteCheckpointTs", overwriteCheckpointTs),
zap.Int("eligibleTablesLength", len(eligibleTables)),
zap.Int("ineligibleTablesLength", len(ineligibleTables)),
zap.Int("allTablesLength", len(allTables)),
)
c.JSON(getStatus(c), &EmptyResponse{})
}

Expand Down Expand Up @@ -813,6 +907,11 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) {
return
}

var (
ineligibleTables []common.TableName
eligibleTables []common.TableName
allTables []common.TableName
)
if configUpdated || sinkURIUpdated {
// verify replicaConfig
sinkURIParsed, err := url.Parse(oldCfInfo.SinkURI)
Expand Down Expand Up @@ -846,7 +945,7 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) {
}

// use checkpointTs get snapshot from kv storage
ineligibleTables, _, err := getVerifiedTables(ctx, oldCfInfo.Config, kvStorage, status.CheckpointTs, scheme, topic, protocol)
ineligibleTables, eligibleTables, allTables, err = getVerifiedTables(ctx, oldCfInfo.Config, kvStorage, status.CheckpointTs, scheme, topic, protocol)
if err != nil {
_ = c.Error(errors.ErrChangefeedUpdateRefused.GenWithStackByCause(err))
return
Expand All @@ -871,6 +970,17 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) {
return
}

log.Info("Update changefeed successfully!",
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also add cost time in this log.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The LogMiddleware will print the cost time

zap.String("id", oldCfInfo.ChangefeedID.Name()),
zap.String("state", string(oldCfInfo.State)),
zap.String("changefeedInfo", oldCfInfo.String()),
zap.Bool("configUpdated", configUpdated),
zap.Bool("sinkURIUpdated", sinkURIUpdated),
zap.Int("eligibleTablesLength", len(eligibleTables)),
zap.Int("ineligibleTablesLength", len(ineligibleTables)),
zap.Int("allTables", len(allTables)),
)

c.JSON(getStatus(c), CfInfoToAPIModel(oldCfInfo, status, nil))
}

Expand Down Expand Up @@ -1493,51 +1603,51 @@ func getVerifiedTables(
replicaConfig *config.ReplicaConfig,
storage tidbkv.Storage, startTs uint64,
scheme string, topic string, protocol config.Protocol,
) ([]string, []string, error) {
) ([]common.TableName, []common.TableName, []common.TableName, error) {
f, err := filter.NewFilter(replicaConfig.Filter, "", util.GetOrZero(replicaConfig.CaseSensitive), util.GetOrZero(replicaConfig.ForceReplicate))
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
tableInfos, ineligibleTables, eligibleTables, err := schemastore.
tableInfos, ineligibleTables, eligibleTables, allTables, err := schemastore.
VerifyTables(f, storage, startTs)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
log.Info("verifyTables completed",
zap.Int("tableCount", len(tableInfos)),
zap.Uint64("startTs", startTs))

err = f.Verify(tableInfos)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if !config.IsMQScheme(scheme) {
return ineligibleTables, eligibleTables, nil
return ineligibleTables, eligibleTables, allTables, nil
}

eventRouter, err := eventrouter.NewEventRouter(replicaConfig.Sink, topic, config.IsPulsarScheme(scheme), protocol == config.ProtocolAvro)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
err = eventRouter.VerifyTables(tableInfos)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

selectors, err := columnselector.New(replicaConfig.Sink)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
err = selectors.VerifyTables(tableInfos, eventRouter)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

if ctx.Err() != nil {
return nil, nil, errors.Trace(ctx.Err())
return nil, nil, nil, errors.Trace(ctx.Err())
}

return ineligibleTables, eligibleTables, nil
return ineligibleTables, eligibleTables, allTables, nil
}

func GetKeyspaceValueWithDefault(c *gin.Context) string {
Expand Down
3 changes: 2 additions & 1 deletion api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ type Tso struct {
LogicTime int64 `json:"logic_time"`
}

// Tables contains IneligibleTables and EligibleTables
// Tables contains IneligibleTables, EligibleTables and AllTables
type Tables struct {
IneligibleTables []TableName `json:"ineligible_tables,omitempty"`
EligibleTables []TableName `json:"eligible_tables,omitempty"`
AllTables []TableName `json:"all_tables,omitempty"`
}

// TableName contains table information
Expand Down
9 changes: 8 additions & 1 deletion cmd/cdc/cli/cli_changefeed_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type createChangefeedOptions struct {
disableGCSafePointCheck bool
startTs uint64
timezone string
verbose bool

cfg *config.ReplicaConfig
}
Expand All @@ -126,6 +127,7 @@ func (o *createChangefeedOptions) addFlags(cmd *cobra.Command) {
cmd.PersistentFlags().BoolVarP(&o.disableGCSafePointCheck, "disable-gc-check", "", false, "Disable GC safe point check")
cmd.PersistentFlags().Uint64Var(&o.startTs, "start-ts", 0, "Start ts of changefeed")
cmd.PersistentFlags().StringVar(&o.timezone, "tz", "SYSTEM", "timezone used when checking sink uri (changefeed timezone is determined by cdc server)")
cmd.PersistentFlags().BoolVarP(&o.verbose, "verbose", "v", false, "Print verbose information when creating a changefeed. Caution: This will list all tables to be replicated by the changefeed. If the number of tables is extremely large, it may flood your screen.")
// we don't support specify these flags below when cdc version >= 6.2.0
_ = cmd.PersistentFlags().MarkHidden("tz")
}
Expand Down Expand Up @@ -337,7 +339,12 @@ func (o *createChangefeedOptions) run(ctx context.Context, cmd *cobra.Command) e
if err != nil {
return err
}
cmd.Printf("Create changefeed successfully!\nID: %s\nInfo: %s\n", info.ID, infoStr)
cmd.Printf("Create changefeed successfully!\nID: %s\nInfo: %s\nIneligibleTablesCount: %d\nEligibleTablesCount: %d\nAllTablesCount: %d\n", info.ID, infoStr, len(tables.IneligibleTables), len(tables.EligibleTables), len(tables.AllTables))
if o.verbose {
cmd.Printf("EligibleTables: %s\n", formatTableNames(tables.EligibleTables))
cmd.Printf("IneligibleTables: %s\n", formatTableNames(tables.IneligibleTables))
cmd.Printf("AllTables: %s\n", formatTableNames(tables.AllTables))
}
return nil
}

Expand Down
Loading