Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 60 additions & 96 deletions cmd/storage-consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import (
"github.com/pingcap/ticdc/cmd/util"
"github.com/pingcap/ticdc/downstreamadapter/sink"
"github.com/pingcap/ticdc/downstreamadapter/sink/helper"
"github.com/pingcap/ticdc/pkg/cloudstorage"
commonType "github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/sink/cloudstorage"
"github.com/pingcap/ticdc/pkg/sink/codec/canal"
"github.com/pingcap/ticdc/pkg/sink/codec/common"
"github.com/pingcap/ticdc/pkg/sink/codec/csv"
Expand All @@ -44,10 +44,9 @@ import (
)

const (
defaultChangefeedName = "storage-consumer"
defaultLogInterval = 5 * time.Second
fakePartitionNumForSchemaFile = -1
metadataFileName = "metadata"
defaultChangefeedName = "storage-consumer"
defaultLogInterval = 5 * time.Second
metadataFileName = "metadata"
)

type (
Expand Down Expand Up @@ -77,8 +76,8 @@ type consumer struct {
// tableDDLWatermark maintains a map of <`schema`.`table`, max executed DDL table version>.
// DML files with smaller table versions are considered stale replays and should be ignored.
tableDDLWatermark map[string]uint64
// tableDefMap maintains a map of <`schema`.`table`, tableDef slice sorted by TableVersion>
tableDefMap map[string]map[uint64]*cloudstorage.TableDefinition
// schemaFileMap maintains a map of <`schema`.`table`, schema files by TableVersion>
schemaFileMap map[string]map[uint64]*cloudstorage.SchemaFile
tableIDGenerator *fakeTableIDGenerator
errCh chan error

Expand Down Expand Up @@ -164,7 +163,7 @@ func newConsumer(ctx context.Context) (*consumer, error) {
tableDMLIdxMap: make(map[cloudstorage.DmlPathKey]fileIndexKeyMap),
eventsGroup: make(map[int64]*util.EventsGroup),
tableDDLWatermark: make(map[string]uint64),
tableDefMap: make(map[string]map[uint64]*cloudstorage.TableDefinition),
schemaFileMap: make(map[string]map[uint64]*cloudstorage.SchemaFile),
tableIDGenerator: &fakeTableIDGenerator{
tableIDs: make(map[string]int64),
},
Expand Down Expand Up @@ -311,7 +310,7 @@ func (c *consumer) appendRow2Group(dml *event.DMLEvent, enableTableAcrossNodes b
func (c *consumer) appendDMLEvents(
ctx context.Context,
tableID int64,
tableDetail cloudstorage.TableDefinition,
schemaFile cloudstorage.SchemaFile,
pathKey cloudstorage.DmlPathKey,
fileIdx *cloudstorage.FileIndex,
) error {
Expand All @@ -323,7 +322,7 @@ func (c *consumer) appendDMLEvents(
}
var decoder common.Decoder

tableInfo, err := tableDetail.ToTableInfo()
tableInfo, err := schemaFile.ToTableInfo()
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -424,7 +423,7 @@ func (c *consumer) flushDMLEvents(ctx context.Context, tableID int64) error {

func (c *consumer) parseDMLFilePath(ctx context.Context, path string) error {
var dmlkey cloudstorage.DmlPathKey
dispatcherID, err := dmlkey.ParseIndexFilePath(
_, err := dmlkey.ParseIndexFilePath(
putil.GetOrZero(c.replicationCfg.Sink.DateSeparator),
path,
)
Expand All @@ -443,32 +442,27 @@ func (c *consumer) parseDMLFilePath(ctx context.Context, path string) error {
return errors.Trace(err)
}
fileName := strings.TrimSuffix(string(data), "\n")
fileIdx, err := cloudstorage.FetchIndexFromFileName(fileName, c.fileExtension)
fileIndex, err := cloudstorage.ParseFileIndexFromFileName(fileName, c.fileExtension)
if err != nil {
return err
}
fileIndex := &cloudstorage.FileIndex{
FileIndexKey: cloudstorage.FileIndexKey{
DispatcherID: dispatcherID,
EnableTableAcrossNodes: dispatcherID != "",
},
Idx: fileIdx,
}

m, ok := c.tableDMLIdxMap[dmlkey]
if !ok {
c.tableDMLIdxMap[dmlkey] = fileIndexKeyMap{
fileIndex.FileIndexKey: fileIndex.Idx,
}
} else if fileIndex.Idx >= m[fileIndex.FileIndexKey] {
return nil
}
if fileIndex.Idx >= m[fileIndex.FileIndexKey] {
c.tableDMLIdxMap[dmlkey][fileIndex.FileIndexKey] = fileIndex.Idx
}
return nil
}

func (c *consumer) parseSchemaFilePath(ctx context.Context, path string) error {
var schemaKey cloudstorage.SchemaPathKey
checksumInFile, err := schemaKey.ParseSchemaFilePath(path)
_, err := schemaKey.ParseSchemaFilePath(path)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -480,40 +474,26 @@ func (c *consumer) parseSchemaFilePath(ctx context.Context, path string) error {
return nil
}
key := schemaKey.GetKey()
if tableDefs, ok := c.tableDefMap[key]; ok {
if _, ok := tableDefs[schemaKey.TableVersion]; ok {
// Skip if tableDef already exists.
if schemaFiles, ok := c.schemaFileMap[key]; ok {
if _, ok := schemaFiles[schemaKey.TableVersion]; ok {
// Skip if schema file already exists.
return nil
}
} else {
c.tableDefMap[key] = make(map[uint64]*cloudstorage.TableDefinition)
c.schemaFileMap[key] = make(map[uint64]*cloudstorage.SchemaFile)
}

// Read tableDef from schema file and check checksum.
var tableDef cloudstorage.TableDefinition
schemaContent, err := c.externalStorage.ReadFile(ctx, path)
if err != nil {
return errors.Trace(err)
}
err = json.Unmarshal(schemaContent, &tableDef)
if err != nil {
return errors.Trace(err)
}
checksumInMem, err := tableDef.Sum32(nil)
// Read schema file and check checksum.
_, schemaFile, err := cloudstorage.Parse(ctx, c.externalStorage, path)
if err != nil {
if errors.ErrStorageSinkInvalidFileName.Equal(err) {
log.Panic("checksum mismatch", zap.Error(err), zap.String("path", path))
}
return errors.Trace(err)
}
if checksumInMem != checksumInFile || schemaKey.TableVersion != tableDef.TableVersion {
log.Panic("checksum mismatch",
zap.Uint32("checksumInMem", checksumInMem),
zap.Uint32("checksumInFile", checksumInFile),
zap.Uint64("tableversionInMem", schemaKey.TableVersion),
zap.Uint64("tableversionInFile", tableDef.TableVersion),
zap.String("path", path))
}

// Update tableDefMap.
c.tableDefMap[key][tableDef.TableVersion] = &tableDef
// Update schemaFileMap.
c.schemaFileMap[key][schemaFile.TableVersion] = &schemaFile

// Fake a dml key for schema.json file, which is useful for putting DDL
// in front of the DML files when sorting.
Expand All @@ -530,46 +510,42 @@ func (c *consumer) parseSchemaFilePath(ctx context.Context, path string) error {
//
// the DDL event recorded in schema.json should be executed first, then the DML events
// in csv files can be executed.
dmlkey := cloudstorage.DmlPathKey{
SchemaPathKey: schemaKey,
PartitionNum: fakePartitionNumForSchemaFile,
Date: "",
}
dmlkey := cloudstorage.NewSchemaFileDMLPathKey(schemaKey)
if _, ok := c.tableDMLIdxMap[dmlkey]; !ok {
c.tableDMLIdxMap[dmlkey] = fileIndexKeyMap{}
} else {
// duplicate table schema file found, this should not happen.
// duplicate schema file found, this should not happen.
log.Panic("duplicate schema file found",
zap.String("path", path), zap.Any("tableDef", tableDef),
zap.String("path", path), zap.Any("schemaFile", schemaFile),
zap.Any("schemaKey", schemaKey), zap.Any("dmlkey", dmlkey))
}
return nil
}

func (c *consumer) mustGetTableDef(key cloudstorage.SchemaPathKey) cloudstorage.TableDefinition {
var tableDef *cloudstorage.TableDefinition
if tableDefs, ok := c.tableDefMap[key.GetKey()]; ok {
tableDef = tableDefs[key.TableVersion]
func (c *consumer) mustGetSchemaFile(key cloudstorage.SchemaPathKey) cloudstorage.SchemaFile {
var schemaFile *cloudstorage.SchemaFile
if schemaFiles, ok := c.schemaFileMap[key.GetKey()]; ok {
schemaFile = schemaFiles[key.TableVersion]
}
if tableDef == nil {
log.Panic("tableDef not found", zap.Any("key", key), zap.Any("tableDefMap", c.tableDefMap))
if schemaFile == nil {
log.Panic("schema file not found", zap.Any("key", key), zap.Any("schemaFileMap", c.schemaFileMap))
}
return *tableDef
return *schemaFile
}

func getRenameTableOldTableKey(tableDef cloudstorage.TableDefinition) (string, bool) {
if tableDef.Type != byte(timodel.ActionRenameTable) {
func getRenameTableOldTableKey(schemaFile cloudstorage.SchemaFile) (string, bool) {
if schemaFile.Type != byte(timodel.ActionRenameTable) {
return "", false
}
schemaName := tableDef.Schema
stmt, err := parser.New().ParseOneStmt(tableDef.Query, "", "")
schemaName := schemaFile.Schema
stmt, err := parser.New().ParseOneStmt(schemaFile.Query, "", "")
if err != nil {
log.Panic("parse statement failed", zap.Any("DDL", tableDef.Query), zap.Error(err))
log.Panic("parse statement failed", zap.Any("DDL", schemaFile.Query), zap.Error(err))
}
// The query in job maybe "RENAME TABLE table1 to table2"
renameStmt, ok := stmt.(*ast.RenameTableStmt)
if !ok || len(renameStmt.TableToTables) == 0 {
log.Panic("invalid rename table statement", zap.Any("DDL", tableDef.Query))
log.Panic("invalid rename table statement", zap.Any("DDL", schemaFile.Query))
}
oldTable := renameStmt.TableToTables[0].OldTable
if oldTable.Schema.O != "" {
Expand All @@ -579,14 +555,14 @@ func getRenameTableOldTableKey(tableDef cloudstorage.TableDefinition) (string, b
return commonType.QuoteSchema(schemaName, tableName), true
}

func (c *consumer) updateTableDDLWatermark(tableDef cloudstorage.TableDefinition) string {
key := commonType.QuoteSchema(tableDef.Schema, tableDef.Table)
if c.tableDDLWatermark[key] < tableDef.TableVersion {
c.tableDDLWatermark[key] = tableDef.TableVersion
func (c *consumer) updateTableDDLWatermark(schemaFile cloudstorage.SchemaFile) string {
key := commonType.QuoteSchema(schemaFile.Schema, schemaFile.Table)
if c.tableDDLWatermark[key] < schemaFile.TableVersion {
c.tableDDLWatermark[key] = schemaFile.TableVersion
}
if oldTableKey, ok := getRenameTableOldTableKey(tableDef); ok {
if c.tableDDLWatermark[oldTableKey] < tableDef.TableVersion {
c.tableDDLWatermark[oldTableKey] = tableDef.TableVersion
if oldTableKey, ok := getRenameTableOldTableKey(schemaFile); ok {
if c.tableDDLWatermark[oldTableKey] < schemaFile.TableVersion {
c.tableDDLWatermark[oldTableKey] = schemaFile.TableVersion
}
}
return key
Expand All @@ -606,23 +582,11 @@ func (c *consumer) handleNewFiles(
keys = append(keys, k)
}
sort.Slice(keys, func(i, j int) bool {
if keys[i].TableVersion != keys[j].TableVersion {
return keys[i].TableVersion < keys[j].TableVersion
}
if keys[i].PartitionNum != keys[j].PartitionNum {
return keys[i].PartitionNum < keys[j].PartitionNum
}
if keys[i].Date != keys[j].Date {
return keys[i].Date < keys[j].Date
}
if keys[i].Schema != keys[j].Schema {
return keys[i].Schema < keys[j].Schema
}
return keys[i].Table < keys[j].Table
return cloudstorage.CompareDMLPathKey(keys[i], keys[j]) < 0
})

for order, key := range keys {
tableDef := c.mustGetTableDef(key.SchemaPathKey)
schemaFile := c.mustGetSchemaFile(key.SchemaPathKey)
Comment thread
3AceShowHand marked this conversation as resolved.
tableKey := key.GetKey()
ddlWatermark := c.tableDDLWatermark[tableKey]
log.Info("storage consumer handle file key",
Expand All @@ -637,12 +601,12 @@ func (c *consumer) handleNewFiles(

// if the key is a fake dml path key which is mainly used for
// sorting schema.json file before the dml files, then execute the ddl query.
if key.PartitionNum == fakePartitionNumForSchemaFile && len(key.Date) == 0 && len(tableDef.Query) > 0 {
if key.IsSchemaFileDMLPathKey() && len(schemaFile.Query) > 0 {
if key.TableVersion <= ddlWatermark {
log.Warn("DDL event replayed with stale table version, ignore it",
zap.String("schema", key.Schema), zap.String("table", key.Table),
zap.Uint64("tableVersion", key.TableVersion), zap.Uint64("ddlWatermark", ddlWatermark),
zap.String("query", tableDef.Query))
zap.String("query", schemaFile.Query))
continue
}

Expand All @@ -655,26 +619,26 @@ func (c *consumer) handleNewFiles(
zap.String("table", key.Table),
zap.Uint64("tableVersion", key.TableVersion),
zap.Uint64("ddlWatermark", ddlWatermark),
zap.String("query", tableDef.Query))
zap.String("query", schemaFile.Query))

ddlEvent, err := tableDef.ToDDLEvent()
ddlEvent, err := schemaFile.ToDDLEvent()
if err != nil {
return err
}
if err := c.sink.WriteBlockEvent(ddlEvent); err != nil {
return errors.Trace(err)
}
watermarkKey := c.updateTableDDLWatermark(tableDef)
// TODO: need to cleanup tableDefMap in the future.
watermarkKey := c.updateTableDDLWatermark(schemaFile)
// TODO: need to cleanup schemaFileMap in the future.
log.Info("execute ddl event successfully",
zap.String("query", tableDef.Query),
zap.String("query", schemaFile.Query),
zap.String("schema", key.Schema), zap.String("table", key.Table),
zap.Uint64("ddlWatermark", c.tableDDLWatermark[tableKey]),
zap.String("watermarkKey", watermarkKey))
continue
}

// The table schema has already moved to a newer DDL version on downstream.
// The downstream table has already moved to a newer DDL version.
// DML files produced with an older table version should be ignored.
if key.TableVersion < ddlWatermark {
log.Warn("DML files replayed with stale table version, ignore them",
Expand Down Expand Up @@ -708,7 +672,7 @@ func (c *consumer) handleNewFiles(
zap.Bool("enableTableAcrossNodes", indexKey.EnableTableAcrossNodes),
zap.Uint64("fileIndex", i),
zap.String("path", filePath))
if err := c.appendDMLEvents(ctx, tableID, tableDef, key, fileIndex); err != nil {
if err := c.appendDMLEvents(ctx, tableID, schemaFile, key, fileIndex); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion downstreamadapter/sink/cloudstorage/buffer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (

"github.com/pingcap/ticdc/downstreamadapter/sink/cloudstorage/spool"
"github.com/pingcap/ticdc/downstreamadapter/sink/metrics"
"github.com/pingcap/ticdc/pkg/cloudstorage"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/sink/cloudstorage"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion downstreamadapter/sink/cloudstorage/buffer_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
"time"

"github.com/pingcap/ticdc/downstreamadapter/sink/cloudstorage/spool"
"github.com/pingcap/ticdc/pkg/cloudstorage"
commonType "github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/sink/cloudstorage"
"github.com/pingcap/ticdc/pkg/sink/codec/common"
"github.com/stretchr/testify/require"
)
Expand Down
2 changes: 1 addition & 1 deletion downstreamadapter/sink/cloudstorage/dml_writers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (

"github.com/pingcap/ticdc/downstreamadapter/sink/cloudstorage/spool"
sinkmetrics "github.com/pingcap/ticdc/downstreamadapter/sink/metrics"
"github.com/pingcap/ticdc/pkg/cloudstorage"
commonType "github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/sink/cloudstorage"
"github.com/pingcap/ticdc/pkg/sink/codec/common"
"github.com/pingcap/ticdc/utils/chann"
"github.com/pingcap/tidb/pkg/objstore/storeapi"
Expand Down
2 changes: 1 addition & 1 deletion downstreamadapter/sink/cloudstorage/encoder_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import (
"time"

"github.com/pingcap/ticdc/downstreamadapter/sink/helper"
"github.com/pingcap/ticdc/pkg/cloudstorage"
commonType "github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/sink/cloudstorage"
"github.com/pingcap/ticdc/pkg/sink/codec/common"
timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
Expand Down
Loading
Loading