diff --git a/cmd/storage-consumer/consumer.go b/cmd/storage-consumer/consumer.go index 8e54ae3532..ee45aadc98 100644 --- a/cmd/storage-consumer/consumer.go +++ b/cmd/storage-consumer/consumer.go @@ -18,6 +18,7 @@ import ( "encoding/json" "fmt" "sort" + "strconv" "strings" "time" @@ -25,11 +26,11 @@ import ( "github.com/pingcap/ticdc/cmd/util" "github.com/pingcap/ticdc/downstreamadapter/sink" "github.com/pingcap/ticdc/downstreamadapter/sink/helper" + "github.com/pingcap/ticdc/pkg/cloudstorage" commonType "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/ticdc/pkg/sink/cloudstorage" "github.com/pingcap/ticdc/pkg/sink/codec/canal" "github.com/pingcap/ticdc/pkg/sink/codec/common" "github.com/pingcap/ticdc/pkg/sink/codec/csv" @@ -44,10 +45,9 @@ import ( ) const ( - defaultChangefeedName = "storage-consumer" - defaultLogInterval = 5 * time.Second - fakePartitionNumForSchemaFile = -1 - metadataFileName = "metadata" + defaultChangefeedName = "storage-consumer" + defaultLogInterval = 5 * time.Second + metadataFileName = "metadata" ) type ( @@ -72,13 +72,13 @@ type consumer struct { fileExtension string sink sink.Sink // tableDMLIdxMap maintains a map of - tableDMLIdxMap map[cloudstorage.DmlPathKey]fileIndexKeyMap + tableDMLIdxMap map[cloudstorage.DMLPathKey]fileIndexKeyMap eventsGroup map[int64]*util.EventsGroup // tableDDLWatermark maintains a map of <`schema`.`table`, max executed DDL table version>. // DML files with smaller table versions are considered stale replays and should be ignored. tableDDLWatermark map[string]uint64 - // tableDefMap maintains a map of <`schema`.`table`, tableDef slice sorted by TableVersion> - tableDefMap map[string]map[uint64]*cloudstorage.TableDefinition + // schemaFileMap maintains a map of <`schema`.`table`, schema files by TableVersion> + schemaFileMap map[string]map[uint64]*cloudstorage.SchemaFile tableIDGenerator *fakeTableIDGenerator errCh chan error @@ -161,10 +161,10 @@ func newConsumer(ctx context.Context) (*consumer, error) { fileExtension: extension, sink: sink, errCh: errCh, - tableDMLIdxMap: make(map[cloudstorage.DmlPathKey]fileIndexKeyMap), + tableDMLIdxMap: make(map[cloudstorage.DMLPathKey]fileIndexKeyMap), eventsGroup: make(map[int64]*util.EventsGroup), tableDDLWatermark: make(map[string]uint64), - tableDefMap: make(map[string]map[uint64]*cloudstorage.TableDefinition), + schemaFileMap: make(map[string]map[uint64]*cloudstorage.SchemaFile), tableIDGenerator: &fakeTableIDGenerator{ tableIDs: make(map[string]int64), }, @@ -173,9 +173,9 @@ func newConsumer(ctx context.Context) (*consumer, error) { // map1 - map2 func diffDMLMaps( - map1, map2 map[cloudstorage.DmlPathKey]fileIndexKeyMap, -) map[cloudstorage.DmlPathKey]fileIndexRange { - resMap := make(map[cloudstorage.DmlPathKey]fileIndexRange) // DmlPathKey -> FileIndexKey -> indexRange + map1, map2 map[cloudstorage.DMLPathKey]fileIndexKeyMap, +) map[cloudstorage.DMLPathKey]fileIndexRange { + resMap := make(map[cloudstorage.DMLPathKey]fileIndexRange) // DmlPathKey -> FileIndexKey -> indexRange for dmlPathKey1, fileIndexKeyMap1 := range map1 { dmlPathKey2, ok := map2[dmlPathKey1] if !ok { @@ -231,11 +231,11 @@ func (c *consumer) getGlobalCheckpointTs(ctx context.Context) error { // getNewFiles returns newly created dml files in specific ranges that are visible under checkpointTs. func (c *consumer) getNewFiles( ctx context.Context, -) (map[cloudstorage.DmlPathKey]fileIndexRange, error) { - tableDMLMap := make(map[cloudstorage.DmlPathKey]fileIndexRange) +) (map[cloudstorage.DMLPathKey]fileIndexRange, error) { + tableDMLMap := make(map[cloudstorage.DMLPathKey]fileIndexRange) opt := &storeapi.WalkOption{SubDir: ""} - origDMLIdxMap := make(map[cloudstorage.DmlPathKey]fileIndexKeyMap, len(c.tableDMLIdxMap)) + origDMLIdxMap := make(map[cloudstorage.DMLPathKey]fileIndexKeyMap, len(c.tableDMLIdxMap)) for k, v := range c.tableDMLIdxMap { m := make(fileIndexKeyMap) for fileIndexKey, val := range v { @@ -311,8 +311,8 @@ func (c *consumer) appendRow2Group(dml *event.DMLEvent, enableTableAcrossNodes b func (c *consumer) appendDMLEvents( ctx context.Context, tableID int64, - tableDetail cloudstorage.TableDefinition, - pathKey cloudstorage.DmlPathKey, + schemaFile cloudstorage.SchemaFile, + pathKey cloudstorage.DMLPathKey, fileIdx *cloudstorage.FileIndex, ) error { filePath := pathKey.GenerateDMLFilePath(fileIdx, c.fileExtension, fileIndexWidth) @@ -322,15 +322,9 @@ func (c *consumer) appendDMLEvents( return errors.Trace(err) } var decoder common.Decoder - - tableInfo, err := tableDetail.ToTableInfo() - if err != nil { - return errors.Trace(err) - } - switch c.codecCfg.Protocol { case config.ProtocolCsv: - decoder, err = csv.NewDecoder(ctx, c.codecCfg, tableInfo, content) + decoder, err = csv.NewDecoder(ctx, c.codecCfg, schemaFile.TableInfo(), content) if err != nil { return errors.Trace(err) } @@ -423,14 +417,11 @@ func (c *consumer) flushDMLEvents(ctx context.Context, tableID int64) error { } func (c *consumer) parseDMLFilePath(ctx context.Context, path string) error { - var dmlkey cloudstorage.DmlPathKey - dispatcherID, err := dmlkey.ParseIndexFilePath( + var dmlkey cloudstorage.DMLPathKey + dmlkey.ParseIndexFilePath( putil.GetOrZero(c.replicationCfg.Sink.DateSeparator), path, ) - if err != nil { - return errors.Trace(err) - } if c.globalCheckpointTs > 0 && dmlkey.TableVersion > c.globalCheckpointTs { log.Debug("skip dml index file by checkpoint", zap.String("path", path), @@ -443,16 +434,10 @@ func (c *consumer) parseDMLFilePath(ctx context.Context, path string) error { return errors.Trace(err) } fileName := strings.TrimSuffix(string(data), "\n") - fileIdx, err := cloudstorage.FetchIndexFromFileName(fileName, c.fileExtension) + fileIndex, err := cloudstorage.ParseFileIndexFromFileName(fileName, c.fileExtension) if err != nil { - return err - } - fileIndex := &cloudstorage.FileIndex{ - FileIndexKey: cloudstorage.FileIndexKey{ - DispatcherID: dispatcherID, - EnableTableAcrossNodes: dispatcherID != "", - }, - Idx: fileIdx, + log.Panic("parse file index from file name failed", + zap.String("fileName", fileName), zap.Error(err)) } m, ok := c.tableDMLIdxMap[dmlkey] @@ -460,7 +445,9 @@ func (c *consumer) parseDMLFilePath(ctx context.Context, path string) error { c.tableDMLIdxMap[dmlkey] = fileIndexKeyMap{ fileIndex.FileIndexKey: fileIndex.Idx, } - } else if fileIndex.Idx >= m[fileIndex.FileIndexKey] { + return nil + } + if fileIndex.Idx >= m[fileIndex.FileIndexKey] { c.tableDMLIdxMap[dmlkey][fileIndex.FileIndexKey] = fileIndex.Idx } return nil @@ -468,10 +455,7 @@ func (c *consumer) parseDMLFilePath(ctx context.Context, path string) error { func (c *consumer) parseSchemaFilePath(ctx context.Context, path string) error { var schemaKey cloudstorage.SchemaPathKey - checksumInFile, err := schemaKey.ParseSchemaFilePath(path) - if err != nil { - return errors.Trace(err) - } + schemaKey.Parse(path) if c.globalCheckpointTs > 0 && schemaKey.TableVersion > c.globalCheckpointTs { log.Debug("skip schema file by checkpoint", zap.String("path", path), @@ -480,40 +464,41 @@ func (c *consumer) parseSchemaFilePath(ctx context.Context, path string) error { return nil } key := schemaKey.GetKey() - if tableDefs, ok := c.tableDefMap[key]; ok { - if _, ok := tableDefs[schemaKey.TableVersion]; ok { - // Skip if tableDef already exists. + if schemaFiles, ok := c.schemaFileMap[key]; ok { + if _, ok := schemaFiles[schemaKey.TableVersion]; ok { + // Skip if schema file already exists. return nil } } else { - c.tableDefMap[key] = make(map[uint64]*cloudstorage.TableDefinition) + c.schemaFileMap[key] = make(map[uint64]*cloudstorage.SchemaFile) } - // Read tableDef from schema file and check checksum. - var tableDef cloudstorage.TableDefinition - schemaContent, err := c.externalStorage.ReadFile(ctx, path) + // Read schema file. + data, err := c.externalStorage.ReadFile(ctx, path) if err != nil { return errors.Trace(err) } - err = json.Unmarshal(schemaContent, &tableDef) - if err != nil { - return errors.Trace(err) + var schemaFile cloudstorage.SchemaFile + if err := json.Unmarshal(data, &schemaFile); err != nil { + log.Panic("unmarshal schema file failed, this should not happen", + zap.ByteString("content", data), zap.Error(err)) } - checksumInMem, err := tableDef.Sum32(nil) + schemaFileName := path[strings.LastIndex(path, "/")+1:] + checksumText := strings.TrimSuffix(schemaFileName[strings.LastIndex(schemaFileName, "_")+1:], ".json") + checksum, err := strconv.ParseUint(checksumText, 10, 32) if err != nil { - return errors.Trace(err) + log.Panic("parse schema file checksum failed, this should not happen", + zap.String("path", path), zap.Error(err)) } - if checksumInMem != checksumInFile || schemaKey.TableVersion != tableDef.TableVersion { - log.Panic("checksum mismatch", - zap.Uint32("checksumInMem", checksumInMem), - zap.Uint32("checksumInFile", checksumInFile), - zap.Uint64("tableversionInMem", schemaKey.TableVersion), - zap.Uint64("tableversionInFile", tableDef.TableVersion), - zap.String("path", path)) + checksumInMem := schemaFile.Checksum() + if checksumInMem != uint32(checksum) { + log.Panic("checksum mismatch in the schema file", + zap.String("path", path), + zap.Uint32("checksum", uint32(checksum)), + zap.Uint32("checksumInMem", checksumInMem)) } - - // Update tableDefMap. - c.tableDefMap[key][tableDef.TableVersion] = &tableDef + // Update schemaFileMap. + c.schemaFileMap[key][schemaFile.TableVersion] = &schemaFile // Fake a dml key for schema.json file, which is useful for putting DDL // in front of the DML files when sorting. @@ -530,46 +515,42 @@ func (c *consumer) parseSchemaFilePath(ctx context.Context, path string) error { // // the DDL event recorded in schema.json should be executed first, then the DML events // in csv files can be executed. - dmlkey := cloudstorage.DmlPathKey{ - SchemaPathKey: schemaKey, - PartitionNum: fakePartitionNumForSchemaFile, - Date: "", - } + dmlkey := cloudstorage.NewSchemaFileDMLPathKey(schemaKey) if _, ok := c.tableDMLIdxMap[dmlkey]; !ok { c.tableDMLIdxMap[dmlkey] = fileIndexKeyMap{} } else { - // duplicate table schema file found, this should not happen. + // duplicate schema file found, this should not happen. log.Panic("duplicate schema file found", - zap.String("path", path), zap.Any("tableDef", tableDef), + zap.String("path", path), zap.Any("schemaFile", schemaFile), zap.Any("schemaKey", schemaKey), zap.Any("dmlkey", dmlkey)) } return nil } -func (c *consumer) mustGetTableDef(key cloudstorage.SchemaPathKey) cloudstorage.TableDefinition { - var tableDef *cloudstorage.TableDefinition - if tableDefs, ok := c.tableDefMap[key.GetKey()]; ok { - tableDef = tableDefs[key.TableVersion] +func (c *consumer) mustGetSchemaFile(key cloudstorage.SchemaPathKey) cloudstorage.SchemaFile { + var schemaFile *cloudstorage.SchemaFile + if schemaFiles, ok := c.schemaFileMap[key.GetKey()]; ok { + schemaFile = schemaFiles[key.TableVersion] } - if tableDef == nil { - log.Panic("tableDef not found", zap.Any("key", key), zap.Any("tableDefMap", c.tableDefMap)) + if schemaFile == nil { + log.Panic("schema file not found", zap.Any("key", key), zap.Any("schemaFileMap", c.schemaFileMap)) } - return *tableDef + return *schemaFile } -func getRenameTableOldTableKey(tableDef cloudstorage.TableDefinition) (string, bool) { - if tableDef.Type != byte(timodel.ActionRenameTable) { +func getRenameTableOldTableKey(schemaFile cloudstorage.SchemaFile) (string, bool) { + if schemaFile.Type != byte(timodel.ActionRenameTable) { return "", false } - schemaName := tableDef.Schema - stmt, err := parser.New().ParseOneStmt(tableDef.Query, "", "") + schemaName := schemaFile.Schema + stmt, err := parser.New().ParseOneStmt(schemaFile.Query, "", "") if err != nil { - log.Panic("parse statement failed", zap.Any("DDL", tableDef.Query), zap.Error(err)) + log.Panic("parse statement failed", zap.Any("DDL", schemaFile.Query), zap.Error(err)) } // The query in job maybe "RENAME TABLE table1 to table2" renameStmt, ok := stmt.(*ast.RenameTableStmt) if !ok || len(renameStmt.TableToTables) == 0 { - log.Panic("invalid rename table statement", zap.Any("DDL", tableDef.Query)) + log.Panic("invalid rename table statement", zap.Any("DDL", schemaFile.Query)) } oldTable := renameStmt.TableToTables[0].OldTable if oldTable.Schema.O != "" { @@ -579,14 +560,14 @@ func getRenameTableOldTableKey(tableDef cloudstorage.TableDefinition) (string, b return commonType.QuoteSchema(schemaName, tableName), true } -func (c *consumer) updateTableDDLWatermark(tableDef cloudstorage.TableDefinition) string { - key := commonType.QuoteSchema(tableDef.Schema, tableDef.Table) - if c.tableDDLWatermark[key] < tableDef.TableVersion { - c.tableDDLWatermark[key] = tableDef.TableVersion +func (c *consumer) updateTableDDLWatermark(schemaFile cloudstorage.SchemaFile) string { + key := commonType.QuoteSchema(schemaFile.Schema, schemaFile.Table) + if c.tableDDLWatermark[key] < schemaFile.TableVersion { + c.tableDDLWatermark[key] = schemaFile.TableVersion } - if oldTableKey, ok := getRenameTableOldTableKey(tableDef); ok { - if c.tableDDLWatermark[oldTableKey] < tableDef.TableVersion { - c.tableDDLWatermark[oldTableKey] = tableDef.TableVersion + if oldTableKey, ok := getRenameTableOldTableKey(schemaFile); ok { + if c.tableDDLWatermark[oldTableKey] < schemaFile.TableVersion { + c.tableDDLWatermark[oldTableKey] = schemaFile.TableVersion } } return key @@ -594,35 +575,23 @@ func (c *consumer) updateTableDDLWatermark(tableDef cloudstorage.TableDefinition func (c *consumer) handleNewFiles( ctx context.Context, - dmlFileMap map[cloudstorage.DmlPathKey]fileIndexRange, + dmlFileMap map[cloudstorage.DMLPathKey]fileIndexRange, round uint64, ) error { if len(dmlFileMap) == 0 { log.Info("no new dml files found since last round", zap.Uint64("round", round)) return nil } - keys := make([]cloudstorage.DmlPathKey, 0, len(dmlFileMap)) + keys := make([]cloudstorage.DMLPathKey, 0, len(dmlFileMap)) for k := range dmlFileMap { keys = append(keys, k) } sort.Slice(keys, func(i, j int) bool { - if keys[i].TableVersion != keys[j].TableVersion { - return keys[i].TableVersion < keys[j].TableVersion - } - if keys[i].PartitionNum != keys[j].PartitionNum { - return keys[i].PartitionNum < keys[j].PartitionNum - } - if keys[i].Date != keys[j].Date { - return keys[i].Date < keys[j].Date - } - if keys[i].Schema != keys[j].Schema { - return keys[i].Schema < keys[j].Schema - } - return keys[i].Table < keys[j].Table + return cloudstorage.CompareDMLPathKey(keys[i], keys[j]) < 0 }) for order, key := range keys { - tableDef := c.mustGetTableDef(key.SchemaPathKey) + schemaFile := c.mustGetSchemaFile(key.SchemaPathKey) tableKey := key.GetKey() ddlWatermark := c.tableDDLWatermark[tableKey] log.Info("storage consumer handle file key", @@ -637,12 +606,12 @@ func (c *consumer) handleNewFiles( // if the key is a fake dml path key which is mainly used for // sorting schema.json file before the dml files, then execute the ddl query. - if key.PartitionNum == fakePartitionNumForSchemaFile && len(key.Date) == 0 && len(tableDef.Query) > 0 { + if key.IsSchemaFileDMLPathKey() && len(schemaFile.Query) > 0 { if key.TableVersion <= ddlWatermark { log.Warn("DDL event replayed with stale table version, ignore it", zap.String("schema", key.Schema), zap.String("table", key.Table), zap.Uint64("tableVersion", key.TableVersion), zap.Uint64("ddlWatermark", ddlWatermark), - zap.String("query", tableDef.Query)) + zap.String("query", schemaFile.Query)) continue } @@ -655,26 +624,23 @@ func (c *consumer) handleNewFiles( zap.String("table", key.Table), zap.Uint64("tableVersion", key.TableVersion), zap.Uint64("ddlWatermark", ddlWatermark), - zap.String("query", tableDef.Query)) + zap.String("query", schemaFile.Query)) - ddlEvent, err := tableDef.ToDDLEvent() - if err != nil { - return err - } + ddlEvent := schemaFile.DDLEvent() if err := c.sink.WriteBlockEvent(ddlEvent); err != nil { return errors.Trace(err) } - watermarkKey := c.updateTableDDLWatermark(tableDef) - // TODO: need to cleanup tableDefMap in the future. + watermarkKey := c.updateTableDDLWatermark(schemaFile) + // TODO: need to cleanup schemaFileMap in the future. log.Info("execute ddl event successfully", - zap.String("query", tableDef.Query), + zap.String("query", schemaFile.Query), zap.String("schema", key.Schema), zap.String("table", key.Table), zap.Uint64("ddlWatermark", c.tableDDLWatermark[tableKey]), zap.String("watermarkKey", watermarkKey)) continue } - // The table schema has already moved to a newer DDL version on downstream. + // The downstream table has already moved to a newer DDL version. // DML files produced with an older table version should be ignored. if key.TableVersion < ddlWatermark { log.Warn("DML files replayed with stale table version, ignore them", @@ -708,7 +674,7 @@ func (c *consumer) handleNewFiles( zap.Bool("enableTableAcrossNodes", indexKey.EnableTableAcrossNodes), zap.Uint64("fileIndex", i), zap.String("path", filePath)) - if err := c.appendDMLEvents(ctx, tableID, tableDef, key, fileIndex); err != nil { + if err := c.appendDMLEvents(ctx, tableID, schemaFile, key, fileIndex); err != nil { return err } } diff --git a/cmd/storage-consumer/main.go b/cmd/storage-consumer/main.go index 70256977f4..30329e5bd9 100644 --- a/cmd/storage-consumer/main.go +++ b/cmd/storage-consumer/main.go @@ -57,9 +57,15 @@ func init() { config.DefaultFileIndexWidth, "file index width") flag.BoolVar(&enableProfiling, "enable-profiling", false, "whether to enable profiling") flag.StringVar(&timezone, "tz", "System", "Specify time zone of storage consumer") +} + +func main() { + var consumer *consumer + var err error + flag.Parse() - err := logger.InitLogger(&logger.Config{ + err = logger.InitLogger(&logger.Config{ Level: logLevel, File: logFile, }) @@ -79,11 +85,6 @@ func init() { log.Error("invalid storage scheme, the scheme of upstream-uri must be file/s3/azblob/gcs") os.Exit(1) } -} - -func main() { - var consumer *consumer - var err error if enableProfiling { go func() { diff --git a/coordinator/changefeed/etcd_backend_test.go b/coordinator/changefeed/etcd_backend_test.go index a316fc80e9..e8e1f1fac6 100644 --- a/coordinator/changefeed/etcd_backend_test.go +++ b/coordinator/changefeed/etcd_backend_test.go @@ -428,7 +428,7 @@ func TestDeleteChangefeed(t *testing.T) { changefeedID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName) - etcdClient.EXPECT().Txn(gomock.Any(), gomock.Any(), NewFuncMatcher(func(i interface{}) bool { + etcdClient.EXPECT().Txn(gomock.Any(), gomock.Any(), NewFuncMatcher(func(i any) bool { ops := i.([]clientv3.Op) require.Len(t, ops, 2) require.True(t, ops[0].IsDelete()) diff --git a/downstreamadapter/sink/cloudstorage/buffer_manager.go b/downstreamadapter/sink/cloudstorage/buffer_manager.go index 6c028bc8d7..6ad0451167 100644 --- a/downstreamadapter/sink/cloudstorage/buffer_manager.go +++ b/downstreamadapter/sink/cloudstorage/buffer_manager.go @@ -20,9 +20,9 @@ import ( "github.com/pingcap/ticdc/downstreamadapter/sink/cloudstorage/spool" "github.com/pingcap/ticdc/downstreamadapter/sink/metrics" + "github.com/pingcap/ticdc/pkg/cloudstorage" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/ticdc/pkg/sink/cloudstorage" ) const ( diff --git a/downstreamadapter/sink/cloudstorage/buffer_manager_test.go b/downstreamadapter/sink/cloudstorage/buffer_manager_test.go index c77678e304..48518eadd1 100644 --- a/downstreamadapter/sink/cloudstorage/buffer_manager_test.go +++ b/downstreamadapter/sink/cloudstorage/buffer_manager_test.go @@ -19,10 +19,10 @@ import ( "time" "github.com/pingcap/ticdc/downstreamadapter/sink/cloudstorage/spool" + "github.com/pingcap/ticdc/pkg/cloudstorage" commonType "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/ticdc/pkg/sink/cloudstorage" "github.com/pingcap/ticdc/pkg/sink/codec/common" "github.com/stretchr/testify/require" ) diff --git a/downstreamadapter/sink/cloudstorage/dml_writers.go b/downstreamadapter/sink/cloudstorage/dml_writers.go index fc94265ffa..37f54fd258 100644 --- a/downstreamadapter/sink/cloudstorage/dml_writers.go +++ b/downstreamadapter/sink/cloudstorage/dml_writers.go @@ -19,10 +19,10 @@ import ( "github.com/pingcap/ticdc/downstreamadapter/sink/cloudstorage/spool" sinkmetrics "github.com/pingcap/ticdc/downstreamadapter/sink/metrics" + "github.com/pingcap/ticdc/pkg/cloudstorage" commonType "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/metrics" - "github.com/pingcap/ticdc/pkg/sink/cloudstorage" "github.com/pingcap/ticdc/pkg/sink/codec/common" "github.com/pingcap/ticdc/utils/chann" "github.com/pingcap/tidb/pkg/objstore/storeapi" diff --git a/downstreamadapter/sink/cloudstorage/encoder_group_test.go b/downstreamadapter/sink/cloudstorage/encoder_group_test.go index acde673e5a..269702ec04 100644 --- a/downstreamadapter/sink/cloudstorage/encoder_group_test.go +++ b/downstreamadapter/sink/cloudstorage/encoder_group_test.go @@ -20,10 +20,10 @@ import ( "time" "github.com/pingcap/ticdc/downstreamadapter/sink/helper" + "github.com/pingcap/ticdc/pkg/cloudstorage" commonType "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/config" - "github.com/pingcap/ticdc/pkg/sink/cloudstorage" "github.com/pingcap/ticdc/pkg/sink/codec/common" timodel "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser/ast" diff --git a/downstreamadapter/sink/cloudstorage/sink.go b/downstreamadapter/sink/cloudstorage/sink.go index 8a64f45396..7b53755b98 100644 --- a/downstreamadapter/sink/cloudstorage/sink.go +++ b/downstreamadapter/sink/cloudstorage/sink.go @@ -22,12 +22,12 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/downstreamadapter/sink/helper" + "github.com/pingcap/ticdc/pkg/cloudstorage" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/metrics" - "github.com/pingcap/ticdc/pkg/sink/cloudstorage" "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/objstore/storeapi" @@ -63,9 +63,8 @@ type sink struct { lastCheckpointTs atomic.Uint64 lastSendCheckpointTsTime time.Time - tableSchemaStore *commonEvent.TableSchemaStore - cron *cron.Cron - statistics *metrics.Statistics + cron *cron.Cron + statistics *metrics.Statistics isNormal *atomic.Bool cleanupJobs []func() /* only for test */ @@ -244,37 +243,31 @@ func (s *sink) writeDDLEvent(event *commonEvent.DDLEvent) error { } sourceTableInfo := event.MultipleTableInfos[1] - var def cloudstorage.TableDefinition - def.FromTableInfo( + schemaEvent := *event + schemaEvent.TableInfo = event.TableInfo.CloneWithRouting( event.GetTargetExtraSchemaName(), event.GetTargetExtraTableName(), - event.TableInfo, - event.FinishedTs, - s.cfg.OutputColumnID, ) - def.Query = event.Query - def.Type = event.Type - if err := s.writeFile(event, def); err != nil { + var schemaFile cloudstorage.SchemaFile + schemaFile.Build(&schemaEvent, s.cfg.OutputColumnID) + if err := s.writeFile(event, schemaFile); err != nil { return err } - var sourceTableDef cloudstorage.TableDefinition - sourceTableDef.FromTableInfo( + sourceEvent := *event + sourceEvent.TableInfo = sourceTableInfo.CloneWithRouting( event.GetTargetSchemaName(), event.GetTargetTableName(), - sourceTableInfo, - event.FinishedTs, - s.cfg.OutputColumnID, ) - sourceEvent := *event - sourceEvent.TableInfo = sourceTableInfo - if err := s.writeFile(&sourceEvent, sourceTableDef); err != nil { + var sourceSchemaFile cloudstorage.SchemaFile + sourceSchemaFile.Build(&sourceEvent, s.cfg.OutputColumnID) + if err := s.writeFile(&sourceEvent, sourceSchemaFile); err != nil { return err } } else { for _, e := range event.GetEvents() { - var def cloudstorage.TableDefinition - def.FromDDLEvent(e, s.cfg.OutputColumnID) - if err := s.writeFile(e, def); err != nil { + var schemaFile cloudstorage.SchemaFile + schemaFile.Build(e, s.cfg.OutputColumnID) + if err := s.writeFile(e, schemaFile); err != nil { return err } } @@ -291,22 +284,15 @@ func (s *sink) writeDDLEvent(event *commonEvent.DDLEvent) error { return nil } -func (s *sink) writeFile(v *commonEvent.DDLEvent, def cloudstorage.TableDefinition) error { +func (s *sink) writeFile(v *commonEvent.DDLEvent, schemaFile cloudstorage.SchemaFile) error { // skip write database-level event for 'use-table-id-as-path' mode - if s.cfg.UseTableIDAsPath && def.Table == "" { + if s.cfg.UseTableIDAsPath && schemaFile.Table == "" { return nil } - encodedDef, err := def.MarshalWithQuery() - if err != nil { - return err - } - - path, err := def.GenerateSchemaFilePath(s.cfg.UseTableIDAsPath, v.GetTableID()) - if err != nil { - return err - } + encodedSchemaFile := schemaFile.Marshal() + path := schemaFile.Path(s.cfg.UseTableIDAsPath, v.GetTableID()) return s.statistics.RecordDDLExecution(func() (string, error) { - err = s.storage.WriteFile(s.ctx, path, encodedDef) + err := s.storage.WriteFile(s.ctx, path, encodedSchemaFile) if err != nil { return "", err } @@ -382,8 +368,7 @@ func (s *sink) sendCheckpointTs(ctx context.Context) error { } } -func (s *sink) SetTableSchemaStore(tableSchemaStore *commonEvent.TableSchemaStore) { - s.tableSchemaStore = tableSchemaStore +func (s *sink) SetTableSchemaStore(_ *commonEvent.TableSchemaStore) { } func (s *sink) initCron( diff --git a/downstreamadapter/sink/cloudstorage/sink_test.go b/downstreamadapter/sink/cloudstorage/sink_test.go index 69169c0ac9..1edcd30fb9 100644 --- a/downstreamadapter/sink/cloudstorage/sink_test.go +++ b/downstreamadapter/sink/cloudstorage/sink_test.go @@ -26,11 +26,11 @@ import ( "testing" "time" + "github.com/pingcap/ticdc/pkg/cloudstorage" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/pdutil" - pkgcloudstorage "github.com/pingcap/ticdc/pkg/sink/cloudstorage" "github.com/pingcap/ticdc/pkg/util" timodel "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser/ast" @@ -219,7 +219,7 @@ func TestIgnoreCallsAfterRunError(t *testing.T) { func TestCloudStorageSinkBatchConfig(t *testing.T) { sink := &sink{ - cfg: &pkgcloudstorage.Config{ + cfg: &cloudstorage.Config{ FileSize: 2048, }, } @@ -273,7 +273,7 @@ func TestWriteDDLEvent(t *testing.T) { err = cloudStorageSink.WriteBlockEvent(ddlEvent) require.NoError(t, err) - tableSchema, err := os.ReadFile(path.Join(tableDir, "schema_100_4192708364.json")) + schemaContent, err := os.ReadFile(path.Join(tableDir, "schema_100_4192708364.json")) require.NoError(t, err) require.JSONEq(t, `{ "Table": "table1", @@ -295,7 +295,7 @@ func TestWriteDDLEvent(t *testing.T) { } ], "TableColumnsTotal": 2 - }`, string(tableSchema)) + }`, string(schemaContent)) t.Run("flush dml before write ddl", verifyWriteDDLEventFlushDMLBeforeBlock) } @@ -408,9 +408,9 @@ func TestWriteDDLEventWithTableIDAsPath(t *testing.T) { require.NoError(t, err) tableDir := path.Join(parentDir, "20/meta/") - tableSchema, err := os.ReadFile(path.Join(tableDir, "schema_100_4192708364.json")) + schemaContent, err := os.ReadFile(path.Join(tableDir, "schema_100_4192708364.json")) require.NoError(t, err) - require.Contains(t, string(tableSchema), `"Table": "table1"`) + require.Contains(t, string(schemaContent), `"Table": "table1"`) } func TestSkipDatabaseSchemaWithTableIDAsPath(t *testing.T) { @@ -514,7 +514,7 @@ func TestWriteDDLEventWithInvalidExchangePartitionEvent(t *testing.T) { } } -func readSchemaDefinitionForTest(t *testing.T, parentDir, schema, table string) pkgcloudstorage.TableDefinition { +func readSchemaFileForTest(t *testing.T, parentDir, schema, table string) cloudstorage.SchemaFile { t.Helper() files, err := os.ReadDir(filepath.Join(parentDir, schema, table, "meta")) @@ -524,9 +524,9 @@ func readSchemaDefinitionForTest(t *testing.T, parentDir, schema, table string) content, err := os.ReadFile(filepath.Join(parentDir, schema, table, "meta", files[0].Name())) require.NoError(t, err) - var def pkgcloudstorage.TableDefinition - require.NoError(t, json.Unmarshal(content, &def)) - return def + var schemaFile cloudstorage.SchemaFile + require.NoError(t, json.Unmarshal(content, &schemaFile)) + return schemaFile } func TestWriteExchangePartitionDDLEventUsesTargetNames(t *testing.T) { @@ -605,15 +605,15 @@ func TestWriteExchangePartitionDDLEventUsesTargetNames(t *testing.T) { err = cloudStorageSink.WriteBlockEvent(routedEvent) require.NoError(t, err) - exchangeDef := readSchemaDefinitionForTest(t, parentDir, "target_db", "exchange_table_routed") - require.Equal(t, "target_db", exchangeDef.Schema) - require.Equal(t, "exchange_table_routed", exchangeDef.Table) - require.Equal(t, "partition_value", exchangeDef.Columns[1].Name) + exchangeSchemaFile := readSchemaFileForTest(t, parentDir, "target_db", "exchange_table_routed") + require.Equal(t, "target_db", exchangeSchemaFile.Schema) + require.Equal(t, "exchange_table_routed", exchangeSchemaFile.Table) + require.Equal(t, "partition_value", exchangeSchemaFile.Columns[1].Name) - partitionedDef := readSchemaDefinitionForTest(t, parentDir, "target_db", "partitioned_routed") - require.Equal(t, "target_db", partitionedDef.Schema) - require.Equal(t, "partitioned_routed", partitionedDef.Table) - require.Equal(t, "exchange_value", partitionedDef.Columns[1].Name) + partitionedSchemaFile := readSchemaFileForTest(t, parentDir, "target_db", "partitioned_routed") + require.Equal(t, "target_db", partitionedSchemaFile.Schema) + require.Equal(t, "partitioned_routed", partitionedSchemaFile.Table) + require.Equal(t, "exchange_value", partitionedSchemaFile.Columns[1].Name) _, err = os.Stat(filepath.Join(parentDir, "source_db")) require.ErrorIs(t, err, os.ErrNotExist) @@ -710,7 +710,7 @@ func TestCleanupExpiredFiles(t *testing.T) { cloudStorageSink := &sink{ changefeedID: common.NewChangefeedID4Test("test", "test"), - cfg: &pkgcloudstorage.Config{ + cfg: &cloudstorage.Config{ DateSeparator: config.DateSeparatorDay.String(), FileExpirationDays: 1, FileCleanupCronSpec: util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec), diff --git a/downstreamadapter/sink/cloudstorage/task.go b/downstreamadapter/sink/cloudstorage/task.go index d832547d4a..3b9ff1a082 100644 --- a/downstreamadapter/sink/cloudstorage/task.go +++ b/downstreamadapter/sink/cloudstorage/task.go @@ -17,10 +17,10 @@ import ( "context" "sync/atomic" + "github.com/pingcap/ticdc/pkg/cloudstorage" commonType "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/ticdc/pkg/sink/cloudstorage" "github.com/pingcap/ticdc/pkg/sink/codec/common" ) diff --git a/downstreamadapter/sink/cloudstorage/writer.go b/downstreamadapter/sink/cloudstorage/writer.go index 0ac5083d07..da7da3d87c 100644 --- a/downstreamadapter/sink/cloudstorage/writer.go +++ b/downstreamadapter/sink/cloudstorage/writer.go @@ -22,10 +22,10 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/downstreamadapter/sink/cloudstorage/spool" "github.com/pingcap/ticdc/downstreamadapter/sink/metrics" + "github.com/pingcap/ticdc/pkg/cloudstorage" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/errors" pmetrics "github.com/pingcap/ticdc/pkg/metrics" - "github.com/pingcap/ticdc/pkg/sink/cloudstorage" "github.com/pingcap/tidb/pkg/objstore/storeapi" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -179,15 +179,7 @@ func (d *writer) flushMessages(ctx context.Context) error { zap.Error(err)) return err } - indexFilePath, err := d.filePathGenerator.GenerateIndexFilePath(table, date) - if err != nil { - log.Error("failed to generate index file path", - zap.String("keyspace", keyspace), - zap.String("changefeed", changefeed), - zap.Int("shardID", d.shardID), - zap.Error(err)) - return err - } + indexFilePath := d.filePathGenerator.GenerateIndexFilePath(table, date) payload, err := buildPayload(d.spool, tableTask) if err != nil { diff --git a/downstreamadapter/sink/cloudstorage/writer_test.go b/downstreamadapter/sink/cloudstorage/writer_test.go index 64000931da..250c7b00a6 100644 --- a/downstreamadapter/sink/cloudstorage/writer_test.go +++ b/downstreamadapter/sink/cloudstorage/writer_test.go @@ -27,12 +27,12 @@ import ( "time" "github.com/pingcap/ticdc/downstreamadapter/sink/cloudstorage/spool" + "github.com/pingcap/ticdc/pkg/cloudstorage" commonType "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/pdutil" - "github.com/pingcap/ticdc/pkg/sink/cloudstorage" "github.com/pingcap/ticdc/pkg/sink/codec/common" "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tidb/pkg/meta/model" diff --git a/go.mod b/go.mod index 18d4f3b7c2..372f022fa3 100644 --- a/go.mod +++ b/go.mod @@ -394,7 +394,7 @@ require ( ) replace ( - github.com/IBM/sarama v1.41.2 => github.com/pingcap/sarama v1.41.2-pingcap-20260508 + github.com/IBM/sarama v1.41.2 => github.com/pingcap/sarama v1.41.2-pingcap-20260622.1 // Downgrade grpc to v1.63.2, as well as other related modules. github.com/apache/arrow-go/v18 => github.com/joechenrh/arrow-go/v18 v18.0.0-20250911101656-62c34c9a3b82 diff --git a/go.sum b/go.sum index 470d2e6320..a3e32540d5 100644 --- a/go.sum +++ b/go.sum @@ -759,8 +759,8 @@ github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9 h1:qG9BSvlWFEE5otQGa github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9/go.mod h1:ORfBOFp1eteu2odzsyaxI+b8TzJwgjwyQcGhI+9SfEA= github.com/pingcap/metering_sdk v0.0.0-20260324055927-14fead745f1d h1:5JCgncG9X7tOsqKqbIXpV2VG4mu/hv3RvvZewqFj0U4= github.com/pingcap/metering_sdk v0.0.0-20260324055927-14fead745f1d/go.mod h1:HMNxmg0/lrn3SPGJ6LTZqP0WwEpcXMu9s/4TWJbzT8w= -github.com/pingcap/sarama v1.41.2-pingcap-20260508 h1:3ZFtYLUGMMZeA6U0iz3EyFnNGPHu3qOuPLj5wXxHmeU= -github.com/pingcap/sarama v1.41.2-pingcap-20260508/go.mod h1:PIL6ZKKKhm19IbQpmpJcFnybAi1yXtgLAitDAeBdNCw= +github.com/pingcap/sarama v1.41.2-pingcap-20260622.1 h1:TrtpL+fs51pUc21CZ8mzIotOlJchN0G9rwfA9VtcyTw= +github.com/pingcap/sarama v1.41.2-pingcap-20260622.1/go.mod h1:PIL6ZKKKhm19IbQpmpJcFnybAi1yXtgLAitDAeBdNCw= github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 h1:T4pXRhBflzDeAhmOQHNPRRogMYxP13V7BkYw3ZsoSfE= github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5/go.mod h1:rlimy0GcTvjiJqvD5mXTRr8O2eNZPBrcUgiWVYp9530= github.com/pingcap/tidb v1.1.0-beta.0.20260604031706-f9faeaf4828f h1:Z+Ez33+LxWbKwM88th19M/v81zwFTjbsKFv1qXQk134= diff --git a/pkg/sink/cloudstorage/config.go b/pkg/cloudstorage/config.go similarity index 100% rename from pkg/sink/cloudstorage/config.go rename to pkg/cloudstorage/config.go diff --git a/pkg/sink/cloudstorage/config_test.go b/pkg/cloudstorage/config_test.go similarity index 90% rename from pkg/sink/cloudstorage/config_test.go rename to pkg/cloudstorage/config_test.go index 7fb96a85e2..d19926515d 100644 --- a/pkg/sink/cloudstorage/config_test.go +++ b/pkg/cloudstorage/config_test.go @@ -14,7 +14,6 @@ package cloudstorage import ( - "context" "net/url" "testing" "time" @@ -44,7 +43,7 @@ func TestConfigApply(t *testing.T) { err = replicaConfig.ValidateAndAdjust(sinkURI) require.NoError(t, err) cfg := NewConfig() - err = cfg.Apply(context.TODO(), sinkURI, replicaConfig.Sink, false) + err = cfg.Apply(t.Context(), sinkURI, replicaConfig.Sink, false) require.NoError(t, err) require.Equal(t, expected, cfg) } @@ -126,7 +125,7 @@ func TestVerifySinkURIParams(t *testing.T) { sinkURI, err := url.Parse(tc.uri) require.NoError(t, err) cfg := NewConfig() - err = cfg.Apply(context.TODO(), sinkURI, config.GetDefaultReplicaConfig().Sink, true) + err = cfg.Apply(t.Context(), sinkURI, config.GetDefaultReplicaConfig().Sink, true) if tc.expectedErr == "" { require.NoError(t, err) require.LessOrEqual(t, cfg.WorkerCount, maxWorkerCount) @@ -150,7 +149,7 @@ func TestMergeConfig(t *testing.T) { OutputColumnID: aws.Bool(false), } c := NewConfig() - err = c.Apply(context.TODO(), sinkURI, replicaConfig.Sink, true) + err = c.Apply(t.Context(), sinkURI, replicaConfig.Sink, true) require.NoError(t, err) require.Equal(t, 12, c.WorkerCount) require.Equal(t, 1485760, c.FileSize) @@ -167,7 +166,7 @@ func TestMergeConfig(t *testing.T) { OutputColumnID: aws.Bool(false), } c = NewConfig() - err = c.Apply(context.TODO(), sinkURI, replicaConfig.Sink, true) + err = c.Apply(t.Context(), sinkURI, replicaConfig.Sink, true) require.NoError(t, err) require.Equal(t, 64, c.WorkerCount) require.Equal(t, 33554432, c.FileSize) @@ -185,7 +184,7 @@ func TestSpoolDiskQuotaConfig(t *testing.T) { } cfg := NewConfig() - err = cfg.Apply(context.Background(), sinkURI, replicaConfig.Sink, true) + err = cfg.Apply(t.Context(), sinkURI, replicaConfig.Sink, true) require.NoError(t, err) require.Equal(t, int64(2147483648), cfg.SpoolDiskQuota) @@ -193,7 +192,7 @@ func TestSpoolDiskQuotaConfig(t *testing.T) { sinkURI, err = url.Parse(uri) require.NoError(t, err) cfg = NewConfig() - err = cfg.Apply(context.Background(), sinkURI, replicaConfig.Sink, true) + err = cfg.Apply(t.Context(), sinkURI, replicaConfig.Sink, true) require.NoError(t, err) require.Equal(t, int64(3221225472), cfg.SpoolDiskQuota) } @@ -212,7 +211,7 @@ func TestSpoolBaseDirConfig(t *testing.T) { } cfg := NewConfig() - err = cfg.Apply(context.Background(), sinkURI, replicaConfig.Sink, true) + err = cfg.Apply(t.Context(), sinkURI, replicaConfig.Sink, true) require.NoError(t, err) require.Equal(t, uriSpoolBaseDir, cfg.SpoolBaseDir) @@ -221,7 +220,7 @@ func TestSpoolBaseDirConfig(t *testing.T) { require.NoError(t, err) cfg = NewConfig() - err = cfg.Apply(context.Background(), sinkURI, replicaConfig.Sink, true) + err = cfg.Apply(t.Context(), sinkURI, replicaConfig.Sink, true) require.NoError(t, err) require.Equal(t, configSpoolBaseDir, cfg.SpoolBaseDir) } @@ -232,7 +231,7 @@ func TestInvalidSpoolBaseDirConfig(t *testing.T) { require.NoError(t, err) cfg := NewConfig() - err = cfg.Apply(context.Background(), sinkURI, config.GetDefaultReplicaConfig().Sink, true) + err = cfg.Apply(t.Context(), sinkURI, config.GetDefaultReplicaConfig().Sink, true) require.Error(t, err) require.True(t, errors.ErrStorageSinkInvalidConfig.Equal(err)) @@ -245,7 +244,7 @@ func TestInvalidSpoolBaseDirConfig(t *testing.T) { } cfg = NewConfig() - err = cfg.Apply(context.Background(), sinkURI, replicaConfig.Sink, true) + err = cfg.Apply(t.Context(), sinkURI, replicaConfig.Sink, true) require.Error(t, err) require.True(t, errors.ErrStorageSinkInvalidConfig.Equal(err)) } diff --git a/pkg/sink/cloudstorage/path.go b/pkg/cloudstorage/generator.go similarity index 67% rename from pkg/sink/cloudstorage/path.go rename to pkg/cloudstorage/generator.go index 6dc7230fa0..e2825aafd9 100644 --- a/pkg/sink/cloudstorage/path.go +++ b/pkg/cloudstorage/generator.go @@ -28,9 +28,9 @@ import ( "github.com/pingcap/log" commonType "github.com/pingcap/ticdc/pkg/common" appcontext "github.com/pingcap/ticdc/pkg/common/context" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/ticdc/pkg/hash" "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tidb/pkg/objstore/storeapi" @@ -50,22 +50,24 @@ const ( // The database schema is stored in the following path: // /meta/schema_{tableVersion}_{checksum}.json dbSchemaPrefix = "%s/meta/" - // The table schema is stored in the following path: + // The table-level schema file is stored in the following path: // //meta/schema_{tableVersion}_{checksum}.json - tableSchemaPrefix = "%s/%s/meta/" + tableMetaPrefix = "%s/%s/meta/" // When use-table-id-as-path, schema is omitted: /meta/... tableIDPrefix = "%s/meta/" ) var schemaRE = regexp.MustCompile(`meta/schema_\d+_\d{10}\.json$`) -// IsSchemaFile checks whether the file is a schema file. +// IsSchemaFile reports whether path matches a schema file under a meta +// directory. func IsSchemaFile(path string) bool { return schemaRE.MatchString(path) } -// mustParseSchemaName parses the version from the schema file name. -func mustParseSchemaName(path string) (uint64, uint32) { +// mustParseSchemaFileName returns tableVersion and checksum encoded in a schema +// file name. Invalid names panic. +func mustParseSchemaFileName(path string) (uint64, uint32) { reportErr := func(reason string, fields ...zap.Field) { fields = append([]zap.Field{ zap.String("schemaPath", path), @@ -94,50 +96,34 @@ func mustParseSchemaName(path string) (uint64, uint32) { return tableVersion, uint32(tableChecksum) } +// generateSchemaFilePath returns the schema file path. +// When table is empty, output is /meta/schema__.json. +// When omitSchema is true, output is /meta/schema__.json. +// Otherwise output is /
/meta/schema__.json. func generateSchemaFilePath( schema, table string, tableVersion uint64, checksum uint32, omitSchema bool, -) (string, error) { - if schema == "" || tableVersion == 0 { - return "", errors.ErrInternalCheckFailed.GenWithStack( - "invalid schema or tableVersion, schema=%q table=%q tableVersion=%d", - schema, table, tableVersion, - ) - } - - var dir string +) string { + name := fmt.Sprintf(schemaFileNameFormat, tableVersion, checksum) if omitSchema { - if table == "" { - return "", errors.ErrInternalCheckFailed.GenWithStackByArgs( - "table cannot be empty when 'use-table-id-as-path' is true", - ) - } - // use-table-id-as-path: omit schema, path is /meta/ - dir = fmt.Sprintf(tableIDPrefix, table) - } else { - if table == "" { - // Generate db schema file path. - dir = fmt.Sprintf(dbSchemaPrefix, schema) - } else { - // Generate table schema file path. - dir = fmt.Sprintf(tableSchemaPrefix, schema, table) - } + return path.Join(fmt.Sprintf(tableIDPrefix, table), name) } - name := fmt.Sprintf(schemaFileNameFormat, tableVersion, checksum) - return path.Join(dir, name), nil + if table == "" { + return path.Join(fmt.Sprintf(dbSchemaPrefix, schema), name) + } + return path.Join(fmt.Sprintf(tableMetaPrefix, schema, table), name) } -func generateTablePath(tableName string, tableID int64, useTableIDAsPath bool) (string, error) { +// generateTablePath returns either the table name or physical table ID path +// segment according to useTableIDAsPath. +func generateTablePath(tableName string, tableID int64, useTableIDAsPath bool) string { if useTableIDAsPath { - if tableID <= 0 { - return "", errors.ErrInternalCheckFailed.GenWithStackByArgs( - "invalid table id for table-id path", - ) - } - return fmt.Sprintf("%d", tableID), nil + return fmt.Sprintf("%d", tableID) } - return tableName, nil + return tableName } +// generateDataFileName returns CDC or +// CDC__. fileIndexWidth controls zero padding. func generateDataFileName(enableTableAcrossNodes bool, dispatcherID string, index uint64, extension string, fileIndexWidth int) string { indexFmt := "%0" + strconv.Itoa(fileIndexWidth) + "d" if enableTableAcrossNodes { @@ -160,7 +146,7 @@ type VersionedTableName struct { // tables, we need to use the physical table ID instead of the // logical table ID.(Especially when the table is a partitioned table). TableNameWithPhysicTableID commonType.TableName - // TableInfoVersion is the table schema version carried with incoming DML. + // TableInfoVersion is the schema file version carried with incoming DML. // Source: // 1. DDL finishedTs for schema-changing DDLs. // 2. Checkpoint/startTs during dispatcher recover/move. @@ -176,7 +162,8 @@ type VersionedTableName struct { DispatcherID commonType.DispatcherID } -// FilePathGenerator is used to generate data file path and index file path. +// FilePathGenerator generates schema, data, and index paths for one storage +// sink. type FilePathGenerator struct { changefeedID commonType.ChangeFeedID extension string @@ -187,7 +174,6 @@ type FilePathGenerator struct { // VersionedTableName and date bucket. fileIndex map[VersionedTableName]*indexWithDate - hasher *hash.PositionInertia // versionMap maps an input VersionedTableName to the effective table version // used in output directory: // /
//... @@ -196,7 +182,8 @@ type FilePathGenerator struct { versionMap map[VersionedTableName]uint64 } -// NewFilePathGenerator creates a FilePathGenerator. +// NewFilePathGenerator creates a FilePathGenerator for one changefeed storage +// sink. extension is the data file suffix used by GenerateDataFilePath. func NewFilePathGenerator( changefeedID commonType.ChangeFeedID, config *Config, @@ -211,14 +198,14 @@ func NewFilePathGenerator( storage: storage, pdClock: pdClock, fileIndex: make(map[VersionedTableName]*indexWithDate), - hasher: hash.NewPositionInertia(), versionMap: make(map[VersionedTableName]uint64), } } -// CheckOrWriteSchema checks whether the schema file exists in the storage and -// write scheme.json if necessary. -// It returns true if there is a newer schema version in storage than the passed table version. +// CheckOrWriteSchema ensures the schema file for table/tableInfo exists. +// It records the effective table version used by later data/index path +// generation. It returns true when storage already contains a newer schema file +// with the same checksum and a version greater than table.TableInfoVersion. func (f *FilePathGenerator) CheckOrWriteSchema( ctx context.Context, table VersionedTableName, @@ -231,30 +218,18 @@ func (f *FilePathGenerator) CheckOrWriteSchema( keyspace := f.changefeedID.Keyspace() changefeed := f.changefeedID.Name() - var def TableDefinition - def.FromTableInfo( - tableInfo.GetTargetSchemaName(), - tableInfo.GetTargetTableName(), - tableInfo, - table.TableInfoVersion, - f.config.OutputColumnID, - ) - if !def.IsTableSchema() { - // only check schema for table - log.Error("invalid table schema", - zap.String("keyspace", keyspace), - zap.String("changefeedID", changefeed), - zap.Any("versionedTableName", table), - zap.Any("tableInfo", tableInfo)) - return false, errors.ErrInternalCheckFailed.GenWithStackByArgs("invalid table schema in FilePathGenerator") + event := &commonEvent.DDLEvent{ + SchemaName: tableInfo.GetTargetSchemaName(), + TableName: tableInfo.GetTargetTableName(), + TableInfo: tableInfo, + FinishedTs: table.TableInfoVersion, } + var schemaFile SchemaFile + schemaFile.Build(event, f.config.OutputColumnID) // Case 1: point check if the schema file exists. - tblSchemaFile, err := def.GenerateSchemaFilePath(f.config.UseTableIDAsPath, table.TableNameWithPhysicTableID.TableID) - if err != nil { - return false, err - } - exist, err := f.storage.FileExists(ctx, tblSchemaFile) + schemaFilePath := schemaFile.Path(f.config.UseTableIDAsPath, table.TableNameWithPhysicTableID.TableID) + exist, err := f.storage.FileExists(ctx, schemaFilePath) if err != nil { return false, err } @@ -263,18 +238,13 @@ func (f *FilePathGenerator) CheckOrWriteSchema( return false, nil } // walk the table meta path to find the last schema file - _, checksum := mustParseSchemaName(tblSchemaFile) - schemaFileCnt := 0 + _, checksum := mustParseSchemaFileName(schemaFilePath) + schemaFileCount := 0 lastVersion := uint64(0) - tablePathPart, err := generateTablePath(def.Table, table.TableNameWithPhysicTableID.TableID, f.config.UseTableIDAsPath) - if err != nil { - return false, err - } - var subDir string + tablePathPart := generateTablePath(schemaFile.Table, table.TableNameWithPhysicTableID.TableID, f.config.UseTableIDAsPath) + subDir := fmt.Sprintf(tableMetaPrefix, schemaFile.Schema, tablePathPart) if f.config.UseTableIDAsPath { subDir = fmt.Sprintf(tableIDPrefix, tablePathPart) - } else { - subDir = fmt.Sprintf(tableSchemaPrefix, def.Schema, tablePathPart) } checksumSuffix := fmt.Sprintf("%010d.json", checksum) hasNewerSchemaVersion := false @@ -282,20 +252,11 @@ func (f *FilePathGenerator) CheckOrWriteSchema( SubDir: subDir, /* use subDir to prevent walk the whole storage */ ObjPrefix: "schema_", }, func(path string, _ int64) error { - schemaFileCnt++ + schemaFileCount++ if !strings.HasSuffix(path, checksumSuffix) { return nil } - version, parsedChecksum := mustParseSchemaName(path) - if parsedChecksum != checksum { - log.Error("invalid schema file name", - zap.String("keyspace", keyspace), - zap.String("changefeedID", changefeed), - zap.String("path", path), zap.Any("checksum", checksum)) - return errors.ErrInternalCheckFailed.GenWithStack( - "invalid schema filename in storage sink, expected checksum: %d, actual checksum: %d", - checksum, parsedChecksum) - } + version, _ := mustParseSchemaFileName(path) if version > table.TableInfoVersion { hasNewerSchemaVersion = true } @@ -312,14 +273,14 @@ func (f *FilePathGenerator) CheckOrWriteSchema( } // Case 2: the table meta path is not empty. - if schemaFileCnt != 0 && lastVersion != 0 { - log.Info("table schema file with exact version not found, using latest available", + if schemaFileCount != 0 && lastVersion != 0 { + log.Info("schema file with exact version not found, using latest available", zap.String("keyspace", keyspace), zap.String("changefeedID", changefeed), zap.Any("versionedTableName", table), zap.Uint64("tableVersion", lastVersion), zap.Uint32("checksum", checksum)) - // record the last version of the table schema file. + // record the last version of the schema file. // we don't need to write schema file to external storage again. f.versionMap[table] = lastVersion return false, nil @@ -328,28 +289,25 @@ func (f *FilePathGenerator) CheckOrWriteSchema( // Case 3: the table meta path is empty, which happens when: // a. the table is existed before changefeed started. We need to write schema file to external storage. // b. the schema file is deleted by the consumer. We write schema file to external storage too. - if schemaFileCnt != 0 && lastVersion == 0 { - log.Warn("no table schema file found in an non-empty meta path", + if schemaFileCount != 0 && lastVersion == 0 { + log.Warn("no schema file found in a non-empty meta path", zap.String("keyspace", keyspace), zap.String("changefeedID", changefeed), zap.Any("versionedTableName", table), zap.Uint32("checksum", checksum)) } - encodedDetail, err := def.MarshalWithQuery() - if err != nil { - return false, err - } + encodedSchemaFile := schemaFile.Marshal() f.versionMap[table] = table.TableInfoVersion - return false, f.storage.WriteFile(ctx, tblSchemaFile, encodedDetail) + return false, f.storage.WriteFile(ctx, schemaFilePath, encodedSchemaFile) } -// SetClock is used for unit test +// SetClock sets the clock used by GenerateDateStr. It is used by tests. func (f *FilePathGenerator) SetClock(pdClock pdutil.Clock) { f.pdClock = pdClock } -// GenerateDateStr generates a date string base on current time -// and the date-separator configuration item. +// GenerateDateStr returns the current date bucket from the date-separator +// config: "2006", "2006-01", "2006-01-02", or "" when disabled. func (f *FilePathGenerator) GenerateDateStr() string { var dateStr string @@ -368,29 +326,29 @@ func (f *FilePathGenerator) GenerateDateStr() string { return dateStr } -// GenerateIndexFilePath generates a canonical path for index file. -func (f *FilePathGenerator) GenerateIndexFilePath(tbl VersionedTableName, date string) (string, error) { - dir, err := f.generateDataDirPath(tbl, date) - if err != nil { - return "", err - } +// GenerateIndexFilePath returns the index file path for tbl and date. +// The directory uses the effective table version recorded by CheckOrWriteSchema. +// Output is /meta/CDC.index or +// /meta/CDC_.index. +func (f *FilePathGenerator) GenerateIndexFilePath(tbl VersionedTableName, date string) string { + dir := f.generateDataDirPath(tbl, date) name := defaultIndexFileName if f.config.EnableTableAcrossNodes { name = fmt.Sprintf(defaultTableAcrossNodesIndexFileName, tbl.DispatcherID.String()) } - return path.Join(dir, name), nil + return path.Join(dir, name) } -// GenerateDataFilePath generates a canonical path for data file. +// GenerateDataFilePath returns the next available data file path for tbl and +// date. It updates the in-memory file index and may read storage to avoid +// colliding with existing files. Storage errors are returned. func (f *FilePathGenerator) GenerateDataFilePath( ctx context.Context, tbl VersionedTableName, date string, ) (string, error) { - dir, err := f.generateDataDirPath(tbl, date) - if err != nil { - return "", err - } + dir := f.generateDataDirPath(tbl, date) loadedIndexFile := false - if idx, ok := f.fileIndex[tbl]; !ok { + idx, ok := f.fileIndex[tbl] + if !ok { fileIdx, err := f.getFileIdxFromIndexFile(ctx, tbl, date) if err != nil { return "", err @@ -401,7 +359,8 @@ func (f *FilePathGenerator) GenerateDataFilePath( index: fileIdx, } loadedIndexFile = true - } else { + } + if ok { idx.currDate = date } // if date changed, reset the counter @@ -447,58 +406,54 @@ func (f *FilePathGenerator) GenerateDataFilePath( } } -func (f *FilePathGenerator) generateDataDirPath(tbl VersionedTableName, date string) (string, error) { - var elems []string - - tableVersion, ok := f.versionMap[tbl] - if !ok || tableVersion == 0 { - return "", errors.ErrInternalCheckFailed.GenWithStackByArgs( - "table schema version is not initialized", - ) - } - +// generateDataDirPath returns the data directory for tbl and date. +// The table version comes from versionMap, which is set by CheckOrWriteSchema. +// Output matches DMLPathKey.generateDMLDataDirPath. +func (f *FilePathGenerator) generateDataDirPath(tbl VersionedTableName, date string) string { + tableVersion := f.versionMap[tbl] if f.config.UseTableIDAsPath { - tablePathPart, err := generateTablePath( + tableIDPathPart := generateTablePath( tbl.TableNameWithPhysicTableID.Table, tbl.TableNameWithPhysicTableID.TableID, true, ) - if err != nil { - return "", err - } - elems = append(elems, tablePathPart) - } else { - elems = append(elems, tbl.TableNameWithPhysicTableID.Schema) - tablePathPart, err := generateTablePath( - tbl.TableNameWithPhysicTableID.Table, - tbl.TableNameWithPhysicTableID.TableID, - false, - ) - if err != nil { - return "", err - } - elems = append(elems, tablePathPart) - } - elems = append(elems, fmt.Sprintf("%d", tableVersion)) - - if f.config.EnablePartitionSeparator && tbl.TableNameWithPhysicTableID.IsPartition && !f.config.UseTableIDAsPath { - elems = append(elems, fmt.Sprintf("%d", tbl.TableNameWithPhysicTableID.TableID)) - } - - if len(date) != 0 { - elems = append(elems, date) - } - - return path.Join(elems...), nil + return DMLPathKey{ + SchemaPathKey: SchemaPathKey{ + Schema: tableIDPathPart, + TableVersion: tableVersion, + }, + UseTableIDAsPath: true, + TableID: tbl.TableNameWithPhysicTableID.TableID, + Date: date, + }.generateDMLDataDirPath() + } + + tablePathPart := generateTablePath( + tbl.TableNameWithPhysicTableID.Table, + tbl.TableNameWithPhysicTableID.TableID, + false, + ) + var partitionNum int64 + if f.config.EnablePartitionSeparator && tbl.TableNameWithPhysicTableID.IsPartition { + partitionNum = tbl.TableNameWithPhysicTableID.TableID + } + return DMLPathKey{ + SchemaPathKey: SchemaPathKey{ + Schema: tbl.TableNameWithPhysicTableID.Schema, + Table: tablePathPart, + TableVersion: tableVersion, + }, + PartitionNum: partitionNum, + Date: date, + }.generateDMLDataDirPath() } +// getFileIdxFromIndexFile returns the max file index recorded in the index +// file for tbl/date. Missing index files return 0. func (f *FilePathGenerator) getFileIdxFromIndexFile( ctx context.Context, tbl VersionedTableName, date string, ) (uint64, error) { - indexFile, err := f.GenerateIndexFilePath(tbl, date) - if err != nil { - return 0, err - } + indexFile := f.GenerateIndexFilePath(tbl, date) exist, err := f.storage.FileExists(ctx, indexFile) if err != nil { return 0, err @@ -512,27 +467,48 @@ func (f *FilePathGenerator) getFileIdxFromIndexFile( return 0, err } fileName := strings.TrimSuffix(string(data), "\n") - return FetchIndexFromFileName(fileName, f.extension) + fileIndex, err := ParseFileIndexFromFileName(fileName, f.extension) + if err != nil { + return 0, err + } + return fileIndex.Idx, nil } -func FetchIndexFromFileName(fileName string, extension string) (uint64, error) { +// ParseFileIndexFromFileName returns the dispatcher ID and numeric index +// encoded in a data file name. extension must match the file suffix. +func ParseFileIndexFromFileName(fileName string, extension string) (FileIndex, error) { if len(fileName) < minFileNamePrefixLen+len(extension) || !strings.HasPrefix(fileName, "CDC") || !strings.HasSuffix(fileName, extension) { - return 0, errors.ErrStorageSinkInvalidFileName.GenWithStack("filename in storage sink is invalid: %q", fileName) + return FileIndex{}, errors.ErrStorageSinkInvalidFileName.GenWithStack( + "filename in storage sink is invalid: %q", fileName) } // CDC[_{dispatcherID}_]{num}.fileExtension - pathRE, err := regexp.Compile(`CDC(?:_(\w+)_)?(\d+).\w+`) - if err != nil { - return 0, err - } - - matches := pathRE.FindStringSubmatch(fileName) - if len(matches) != 3 { - return 0, errors.ErrStorageSinkInvalidFileName.GenWithStack("cannot match dml path pattern for %q", fileName) + name := strings.TrimSuffix(strings.TrimPrefix(fileName, "CDC"), extension) + dispatcherID := "" + idxStr := name + if strings.HasPrefix(name, "_") { + idxSep := strings.LastIndex(name, "_") + if idxSep <= 1 { + return FileIndex{}, errors.ErrStorageSinkInvalidFileName.GenWithStack( + "cannot match dml path pattern for %q", fileName) + } + dispatcherID = name[1:idxSep] + idxStr = name[idxSep+1:] } - return strconv.ParseUint(matches[2], 10, 64) + idx, err := strconv.ParseUint(idxStr, 10, 64) + if err != nil { + return FileIndex{}, errors.WrapError( + errors.ErrStorageSinkInvalidFileName, err, "cannot match dml path pattern for %q", fileName) + } + return FileIndex{ + FileIndexKey: FileIndexKey{ + DispatcherID: dispatcherID, + EnableTableAcrossNodes: dispatcherID != "", + }, + Idx: idx, + }, nil } var dateSeparatorDayRegexp *regexp.Regexp diff --git a/pkg/sink/cloudstorage/main_test.go b/pkg/cloudstorage/main_test.go similarity index 100% rename from pkg/sink/cloudstorage/main_test.go rename to pkg/cloudstorage/main_test.go diff --git a/pkg/cloudstorage/path_key.go b/pkg/cloudstorage/path_key.go new file mode 100644 index 0000000000..c60e58d725 --- /dev/null +++ b/pkg/cloudstorage/path_key.go @@ -0,0 +1,283 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package cloudstorage + +import ( + "cmp" + "path" + "strconv" + "strings" + + "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/config" + "go.uber.org/zap" +) + +const schemaFilePartitionNum int64 = -1 + +// SchemaPathKey identifies the schema path scope parsed from or used to build +// cloud storage paths. +type SchemaPathKey struct { + // Schema is the first directory level in storage sink paths. + // Example: /
//... + Schema string + // Table is the second directory level for table-level schema file and data paths. + // For database-level schema files, this field is empty and the path is + // /meta/schema_{tableVersion}_{checksum}.json. + Table string + // TableVersion is the schema version encoded in the path. + // In CDC it is carried by tableInfoVersion, and for DDL-related versions it + // is typically equal to the DDL finishedTs. + TableVersion uint64 +} + +// GetKey returns the quoted schema/table key used by consumer maps. +// For database-level schema files, Table is empty. +func (s *SchemaPathKey) GetKey() string { + return common.QuoteSchema(s.Schema, s.Table) +} + +// Parse fills SchemaPathKey from a schema file path. +// Input: +// - /meta/schema__.json +// - /
/meta/schema__.json +// +// Output fields are Schema, Table, and TableVersion. Table is empty for +// database-level schema files. Invalid paths panic. +func (s *SchemaPathKey) Parse(path string) { + // For /
/meta/schema_{tableVersion}_{checksum}.json, the parts + // should be ["", "
", "meta", "schema_{tableVersion}_{checksum}.json"]. + matches := strings.Split(path, "/") + + var schema, table string + schema = matches[0] + switch len(matches) { + case 3: + table = "" + case 4: + table = matches[1] + default: + log.Panic("cannot match schema path pattern", zap.String("path", path)) + } + + if matches[len(matches)-2] != "meta" { + log.Panic("cannot match schema path pattern", zap.String("path", path)) + } + + schemaFileName := matches[len(matches)-1] + version, _ := mustParseSchemaFileName(schemaFileName) + + *s = SchemaPathKey{ + Schema: schema, + Table: table, + TableVersion: version, + } +} + +type FileIndexKey struct { + // DispatcherID is used in file name only when table-across-nodes is enabled. + // File pattern: CDC_{dispatcherID}_{index}.{ext} + DispatcherID string + // EnableTableAcrossNodes controls whether dispatcher ID is embedded in + // data/index file names to avoid collisions across captures. + EnableTableAcrossNodes bool +} + +type FileIndex struct { + FileIndexKey + // Idx is the monotonically increasing file sequence number in one + // directory scope (schema/table/version[/partition][/date] or + // tableID/version[/date]). + Idx uint64 +} + +// DMLPathKey is the key of dml path. +type DMLPathKey struct { + SchemaPathKey + // UseTableIDAsPath controls whether TableID is used as the first path + // element instead of Schema/Table. + UseTableIDAsPath bool + // TableID is set when UseTableIDAsPath is true. + TableID int64 + // PartitionNum is an optional path level for partition table output. + // It is present only when partition-separator is enabled. + PartitionNum int64 + // Date is an optional path level controlled by date-separator + // (year/month/day/none). + Date string +} + +// NewSchemaFileDMLPathKey returns the synthetic DML path key used to order a +// schema file before data files with the same schema version. +func NewSchemaFileDMLPathKey(schemaKey SchemaPathKey) DMLPathKey { + return DMLPathKey{ + SchemaPathKey: schemaKey, + PartitionNum: schemaFilePartitionNum, + } +} + +// IsSchemaFileDMLPathKey checks whether the key represents a schema file marker. +func (d DMLPathKey) IsSchemaFileDMLPathKey() bool { + return d.PartitionNum == schemaFilePartitionNum && d.Date == "" +} + +// CompareDMLPathKey compares DML path keys in cloud storage replay order. +func CompareDMLPathKey(x, y DMLPathKey) int { + if r := cmp.Compare(x.TableVersion, y.TableVersion); r != 0 { + return r + } + if r := cmp.Compare(x.PartitionNum, y.PartitionNum); r != 0 { + return r + } + if r := cmp.Compare(x.Date, y.Date); r != 0 { + return r + } + if x.UseTableIDAsPath != y.UseTableIDAsPath { + if x.UseTableIDAsPath { + return 1 + } + return -1 + } + if r := cmp.Compare(x.TableID, y.TableID); r != 0 { + return r + } + if r := cmp.Compare(x.Schema, y.Schema); r != 0 { + return r + } + return cmp.Compare(x.Table, y.Table) +} + +// GenerateDMLFilePath returns the full data file path. +// The receiver supplies the data directory fields. fileIndex supplies the +// dispatcher ID and sequence number used in the file name. extension is the +// file suffix, for example ".json" or ".csv". +func (d *DMLPathKey) GenerateDMLFilePath( + fileIndex *FileIndex, extension string, fileIndexWidth int, +) string { + fileName := generateDataFileName( + fileIndex.EnableTableAcrossNodes, fileIndex.DispatcherID, + fileIndex.Idx, extension, fileIndexWidth) + return path.Join(d.generateDMLDataDirPath(), fileName) +} + +// generateDMLDataDirPath returns the canonical data directory path. +// Output is either /[/date] or +// /
/[/partition][/date]. +func (d DMLPathKey) generateDMLDataDirPath() string { + elems := make([]string, 0, 5) + if d.UseTableIDAsPath { + elems = append(elems, strconv.FormatInt(d.TableID, 10)) + elems = append(elems, strconv.FormatUint(d.TableVersion, 10)) + if d.Date != "" { + elems = append(elems, d.Date) + } + return path.Join(elems...) + } + elems = append(elems, d.Schema, d.Table) + elems = append(elems, strconv.FormatUint(d.TableVersion, 10)) + if d.PartitionNum != 0 { + elems = append(elems, strconv.FormatInt(d.PartitionNum, 10)) + } + if d.Date != "" { + elems = append(elems, d.Date) + } + return path.Join(elems...) +} + +func (d *DMLPathKey) parseDMLDataDir( + dateSeparator string, parts []string, filePath string, +) { + var ( + key DMLPathKey + version string + tableID string + partition string + hasDate bool + ) + switch dateSeparator { + case config.DateSeparatorNone.String(): + case config.DateSeparatorYear.String(), + config.DateSeparatorMonth.String(), + config.DateSeparatorDay.String(): + hasDate = true + default: + log.Panic("invalid date separator", zap.String("dateSeparator", dateSeparator)) + } + + switch { + case !hasDate && len(parts) == 2: + key.Schema = parts[0] + tableID = parts[0] + version = parts[1] + case !hasDate && len(parts) == 3: + key.Schema, key.Table = parts[0], parts[1] + version = parts[2] + case !hasDate && len(parts) == 4: + key.Schema, key.Table = parts[0], parts[1] + partition = parts[3] + version = parts[2] + case hasDate && len(parts) == 3: + key.Schema = parts[0] + tableID = parts[0] + key.Date = parts[2] + version = parts[1] + case hasDate && len(parts) == 4: + key.Schema, key.Table = parts[0], parts[1] + key.Date = parts[3] + version = parts[2] + case hasDate && len(parts) == 5: + key.Schema, key.Table = parts[0], parts[1] + partition = parts[3] + key.Date = parts[4] + version = parts[2] + default: + log.Panic("cannot match dml path pattern", zap.String("path", filePath)) + } + + if tableID != "" { + tableIDNum, err := strconv.ParseInt(tableID, 10, 64) + if err != nil { + log.Panic("parse table id failed", zap.String("value", tableID), zap.Error(err)) + } + key.UseTableIDAsPath = true + key.TableID = tableIDNum + } + if partition != "" { + partitionNum, err := strconv.ParseInt(partition, 10, 64) + if err != nil { + log.Panic("parse partition number failed", zap.String("value", partition), zap.Error(err)) + } + key.PartitionNum = partitionNum + } + tableVersion, err := strconv.ParseUint(version, 10, 64) + if err != nil { + log.Panic("parse table version failed", zap.String("value", version), zap.Error(err)) + } + key.TableVersion = tableVersion + *d = key +} + +// ParseIndexFilePath fills DMLPathKey from an index file path. +// Input is /meta/CDC.index or +// /meta/CDC_.index. Only the data directory portion is +// parsed here; the file index itself is stored in the index file content and is +// read by the caller. Invalid paths panic. +func (d *DMLPathKey) ParseIndexFilePath(dateSeparator, path string) { + parts := strings.Split(path, "/") + if len(parts) < 4 || parts[len(parts)-2] != "meta" { + log.Panic("cannot match dml path pattern", zap.String("path", path)) + } + d.parseDMLDataDir(dateSeparator, parts[:len(parts)-2], path) +} diff --git a/pkg/sink/cloudstorage/path_key_test.go b/pkg/cloudstorage/path_key_test.go similarity index 50% rename from pkg/sink/cloudstorage/path_key_test.go rename to pkg/cloudstorage/path_key_test.go index f768f7cf74..80d3814761 100644 --- a/pkg/sink/cloudstorage/path_key_test.go +++ b/pkg/cloudstorage/path_key_test.go @@ -27,7 +27,6 @@ func TestSchemaPathKey(t *testing.T) { testCases := []struct { path string schemakey SchemaPathKey - checksum uint32 }{ // Test for database schema path: /meta/schema_{tableVersion}_{checksum}.json { @@ -37,9 +36,8 @@ func TestSchemaPathKey(t *testing.T) { Table: "", TableVersion: 1, }, - checksum: 2, }, - // Test for table schema path: /
/meta/schema_{tableVersion}_{checksum}.json + // Test for table-level schema file path: /
/meta/schema_{tableVersion}_{checksum}.json { path: "test_schema/test_table/meta/schema_11_22.json", schemakey: SchemaPathKey{ @@ -47,19 +45,16 @@ func TestSchemaPathKey(t *testing.T) { Table: "test_table", TableVersion: 11, }, - checksum: 22, }, } for _, tc := range testCases { var schemaKey SchemaPathKey - checksum, err := schemaKey.ParseSchemaFilePath(tc.path) - require.NoError(t, err) + schemaKey.Parse(tc.path) require.Equal(t, tc.schemakey, schemaKey) - require.Equal(t, tc.checksum, checksum) } } -func TestDmlPathKey(t *testing.T) { +func TestGenerateDMLFilePath(t *testing.T) { t.Parallel() dispatcherID := common.NewDispatcherID() @@ -68,16 +63,14 @@ func TestDmlPathKey(t *testing.T) { fileIndexWidth int extension string path string - indexPath string - dmlkey DmlPathKey + dmlkey DMLPathKey }{ { index: 10, fileIndexWidth: 20, extension: ".csv", path: fmt.Sprintf("schema1/table1/123456/2023-05-09/CDC_%s_00000000000000000010.csv", dispatcherID.String()), - indexPath: fmt.Sprintf("schema1/table1/123456/2023-05-09/meta/CDC_%s.index", dispatcherID.String()), - dmlkey: DmlPathKey{ + dmlkey: DMLPathKey{ SchemaPathKey: SchemaPathKey{ Schema: "schema1", Table: "table1", @@ -87,23 +80,76 @@ func TestDmlPathKey(t *testing.T) { Date: "2023-05-09", }, }, + { + index: 10, + fileIndexWidth: 20, + extension: ".csv", + path: fmt.Sprintf("12345/123456/CDC_%s_00000000000000000010.csv", dispatcherID.String()), + dmlkey: DMLPathKey{ + SchemaPathKey: SchemaPathKey{ + Schema: "12345", + TableVersion: 123456, + }, + UseTableIDAsPath: true, + TableID: 12345, + }, + }, + { + index: 10, + fileIndexWidth: 20, + extension: ".csv", + path: fmt.Sprintf("schema1/table1/123456/55/2023-05-09/CDC_%s_00000000000000000010.csv", dispatcherID.String()), + dmlkey: DMLPathKey{ + SchemaPathKey: SchemaPathKey{ + Schema: "schema1", + Table: "table1", + TableVersion: 123456, + }, + PartitionNum: 55, + Date: "2023-05-09", + }, + }, } for _, tc := range testCases { - var dmlkey DmlPathKey - id, err := dmlkey.ParseIndexFilePath("day", tc.indexPath) - require.NoError(t, err) - require.Equal(t, tc.dmlkey, dmlkey) - require.Equal(t, id, dispatcherID.String()) - fileIndex := &FileIndex{ FileIndexKey: FileIndexKey{ - DispatcherID: id, - EnableTableAcrossNodes: id != "", + DispatcherID: dispatcherID.String(), + EnableTableAcrossNodes: true, }, Idx: tc.index, } - fileName := dmlkey.GenerateDMLFilePath(fileIndex, tc.extension, tc.fileIndexWidth) + fileName := tc.dmlkey.GenerateDMLFilePath(fileIndex, tc.extension, tc.fileIndexWidth) require.Equal(t, tc.path, fileName) } } + +func TestSchemaFileDMLPathKeyOrder(t *testing.T) { + t.Parallel() + + schemaKey := SchemaPathKey{ + Schema: "schema1", + Table: "table1", + TableVersion: 123456, + } + schemaDMLKey := NewSchemaFileDMLPathKey(schemaKey) + require.True(t, schemaDMLKey.IsSchemaFileDMLPathKey()) + + dataDMLKey := DMLPathKey{ + SchemaPathKey: schemaKey, + Date: "2023-05-09", + } + require.Less(t, CompareDMLPathKey(schemaDMLKey, dataDMLKey), 0) + require.Greater(t, CompareDMLPathKey(dataDMLKey, schemaDMLKey), 0) + require.Zero(t, CompareDMLPathKey(schemaDMLKey, NewSchemaFileDMLPathKey(schemaKey))) + + tableIDPathKey := DMLPathKey{ + SchemaPathKey: SchemaPathKey{ + Schema: "12345", + TableVersion: schemaKey.TableVersion, + }, + UseTableIDAsPath: true, + TableID: 12345, + } + require.NotZero(t, CompareDMLPathKey(dataDMLKey, tableIDPathKey)) +} diff --git a/pkg/sink/cloudstorage/path_test.go b/pkg/cloudstorage/path_test.go similarity index 85% rename from pkg/sink/cloudstorage/path_test.go rename to pkg/cloudstorage/path_test.go index 7c1d5fbed6..09b1c23ce4 100644 --- a/pkg/sink/cloudstorage/path_test.go +++ b/pkg/cloudstorage/path_test.go @@ -71,8 +71,7 @@ func testFilePathGenerator(ctx context.Context, t *testing.T, dir string) *FileP func TestGenerateDataFilePath(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() + ctx := t.Context() table := VersionedTableName{ TableNameWithPhysicTableID: commonType.TableName{ @@ -152,8 +151,7 @@ func TestGenerateDataFilePath(t *testing.T) { func TestGenerateDataFilePathWithTableIDAsPath(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() + ctx := t.Context() table := VersionedTableName{ TableNameWithPhysicTableID: commonType.TableName{ @@ -176,59 +174,134 @@ func TestGenerateDataFilePathWithTableIDAsPath(t *testing.T) { require.Equal(t, fmt.Sprintf("12345/5/CDC_%s_000001.json", table.DispatcherID.String()), path) } -func TestFetchIndexFromFileName(t *testing.T) { +func TestGenerateAndParseIndexFilePath(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() + ctx := t.Context() - dir := t.TempDir() - f := testFilePathGenerator(ctx, t, dir) testCases := []struct { - fileName string - wantErr string + name string + dateSeparator string + date string + useTableIDAsPath bool + enablePartition bool + table VersionedTableName + expectedDMLPathKey DMLPathKey }{ { - fileName: "CDC000011.json", - wantErr: "", - }, - { - fileName: "CDC1000000.json", - wantErr: "", + name: "schema table with date", + dateSeparator: config.DateSeparatorDay.String(), + date: "2023-05-09", + table: VersionedTableName{ + TableNameWithPhysicTableID: commonType.TableName{ + Schema: "test", + Table: "table1", + }, + TableInfoVersion: 5, + DispatcherID: commonType.NewDispatcherID(), + }, + expectedDMLPathKey: DMLPathKey{ + SchemaPathKey: SchemaPathKey{ + Schema: "test", + Table: "table1", + TableVersion: 5, + }, + Date: "2023-05-09", + }, }, { - fileName: "CDC1.json", - wantErr: "filename in storage sink is invalid", + name: "table id path", + dateSeparator: config.DateSeparatorNone.String(), + useTableIDAsPath: true, + table: VersionedTableName{ + TableNameWithPhysicTableID: commonType.TableName{ + Schema: "test", + Table: "table1", + TableID: 12345, + }, + TableInfoVersion: 5, + DispatcherID: commonType.NewDispatcherID(), + }, + expectedDMLPathKey: DMLPathKey{ + SchemaPathKey: SchemaPathKey{ + Schema: "12345", + TableVersion: 5, + }, + UseTableIDAsPath: true, + TableID: 12345, + }, }, { - fileName: "cdc000001.json", - wantErr: "filename in storage sink is invalid", + name: "partition with date", + dateSeparator: config.DateSeparatorDay.String(), + date: "2023-05-09", + enablePartition: true, + table: VersionedTableName{ + TableNameWithPhysicTableID: commonType.TableName{ + Schema: "test", + Table: "table1", + TableID: 55, + IsPartition: true, + }, + TableInfoVersion: 5, + DispatcherID: commonType.NewDispatcherID(), + }, + expectedDMLPathKey: DMLPathKey{ + SchemaPathKey: SchemaPathKey{ + Schema: "test", + Table: "table1", + TableVersion: 5, + }, + PartitionNum: 55, + Date: "2023-05-09", + }, }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + f := testFilePathGenerator(ctx, t, t.TempDir()) + f.config.DateSeparator = tc.dateSeparator + f.config.UseTableIDAsPath = tc.useTableIDAsPath + f.config.EnablePartitionSeparator = tc.enablePartition + f.versionMap[tc.table] = tc.table.TableInfoVersion + + indexPath := f.GenerateIndexFilePath(tc.table, tc.date) + var pathKey DMLPathKey + pathKey.ParseIndexFilePath(tc.dateSeparator, indexPath) + require.Equal(t, tc.expectedDMLPathKey, pathKey) + }) + } +} + +func TestParseFileIndexFromFileName(t *testing.T) { + t.Parallel() + + ctx := t.Context() + + dir := t.TempDir() + f := testFilePathGenerator(ctx, t, dir) + testCases := []struct { + fileName string + }{ { - fileName: "CDC000005.xxx", - wantErr: "filename in storage sink is invalid", + fileName: "CDC000011.json", }, { - fileName: "CDChello.json", - wantErr: "filename in storage sink is invalid", + fileName: "CDC1000000.json", }, } for _, tc := range testCases { - _, err := FetchIndexFromFileName(tc.fileName, f.extension) - if len(tc.wantErr) != 0 { - require.Contains(t, err.Error(), tc.wantErr) - } else { - require.NoError(t, err) - } + _, err := ParseFileIndexFromFileName(tc.fileName, f.extension) + require.NoError(t, err) } } func TestGenerateDataFilePathWithIndexFile(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() + ctx := t.Context() dir := t.TempDir() f := testFilePathGenerator(ctx, t, dir) @@ -248,9 +321,8 @@ func TestGenerateDataFilePathWithIndexFile(t *testing.T) { } f.versionMap[table] = table.TableInfoVersion date := f.GenerateDateStr() - indexFilePath, err := f.GenerateIndexFilePath(table, date) - require.NoError(t, err) - err = f.storage.WriteFile(ctx, indexFilePath, []byte(fmt.Sprintf("CDC_%s_000005.json\n", dispatcherID.String()))) + indexFilePath := f.GenerateIndexFilePath(table, date) + err := f.storage.WriteFile(ctx, indexFilePath, fmt.Appendf(nil, "CDC_%s_000005.json\n", dispatcherID.String())) require.NoError(t, err) dataFilePath, err := f.GenerateDataFilePath(ctx, table, date) @@ -280,8 +352,7 @@ func TestGenerateDataFilePathResyncIndexFile(t *testing.T) { f2.versionMap[table] = table.TableInfoVersion date := "" - indexFilePath, err := f1.GenerateIndexFilePath(table, date) - require.NoError(t, err) + indexFilePath := f1.GenerateIndexFilePath(table, date) // Simulate dispatcher moved between captures: // 1) f1 generates CDC_..._000001 and writes index file. @@ -337,8 +408,7 @@ func TestGenerateDataFilePathReconcilesStaleIndexFile(t *testing.T) { date := "" firstDataFile := fmt.Sprintf("test/table1/5/CDC_%s_000001.json", dispatcherID.String()) secondDataFile := fmt.Sprintf("test/table1/5/CDC_%s_000002.json", dispatcherID.String()) - indexFilePath, err := f.GenerateIndexFilePath(table, date) - require.NoError(t, err) + indexFilePath := f.GenerateIndexFilePath(table, date) require.NoError(t, f.storage.WriteFile(ctx, firstDataFile, []byte("test1"))) require.NoError(t, f.storage.WriteFile(ctx, secondDataFile, []byte("test2"))) require.NoError(t, f.storage.WriteFile(ctx, indexFilePath, fmt.Appendf(nil, "CDC_%s_000001.json\n", dispatcherID.String()))) @@ -361,7 +431,7 @@ func TestIsSchemaFile(t *testing.T) { "schema2/meta/schema_123_0123456789.json", true, }, { - "valid table schema /
/meta/", + "valid table-level schema file /
/meta/", "schema1/table1/meta/schema_123_0123456789.json", true, }, {"valid special prefix", "meta/meta/schema_123_0123456789.json", true}, @@ -385,8 +455,7 @@ func TestIsSchemaFile(t *testing.T) { func TestCheckOrWriteSchema(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx := t.Context() dir := t.TempDir() f := testFilePathGenerator(ctx, t, dir) @@ -457,8 +526,7 @@ func TestCheckOrWriteSchema(t *testing.T) { func TestCheckOrWriteSchemaUsesRoutedTargetNames(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx := t.Context() dir := t.TempDir() f := testFilePathGenerator(ctx, t, dir) @@ -499,8 +567,7 @@ func TestCheckOrWriteSchemaUsesRoutedTargetNames(t *testing.T) { func TestRemoveExpiredFilesWithoutPartition(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx := t.Context() dir := t.TempDir() uri := fmt.Sprintf("file:///%s?flush-interval=2s", dir) storage, err := util.GetExternalStorageWithDefaultTimeout(ctx, uri) diff --git a/pkg/sink/cloudstorage/table_definition.go b/pkg/cloudstorage/schema_file.go similarity index 52% rename from pkg/sink/cloudstorage/table_definition.go rename to pkg/cloudstorage/schema_file.go index 220e4e05b5..f295c4c979 100644 --- a/pkg/sink/cloudstorage/table_definition.go +++ b/pkg/cloudstorage/schema_file.go @@ -21,7 +21,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" - "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/hash" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser/ast" @@ -32,26 +31,27 @@ import ( ) const ( - defaultTableDefinitionVersion = 1 - marshalPrefix = "" - marshalIndent = " " + defaultSchemaFileVersion = 1 + marshalPrefix = "" + marshalIndent = " " ) -// TableCol denotes the column info for a table definition. +// TableCol is one column entry persisted in a schema file. type TableCol struct { - ID string `json:"ColumnId,omitempty"` - Name string `json:"ColumnName" ` - Tp string `json:"ColumnType"` - Default interface{} `json:"ColumnDefault,omitempty"` - Precision string `json:"ColumnPrecision,omitempty"` - Scale string `json:"ColumnScale,omitempty"` - Nullable string `json:"ColumnNullable,omitempty"` - IsPK string `json:"ColumnIsPk,omitempty"` - Elems []string `json:"ColumnElems,omitempty"` + ID string `json:"ColumnId,omitempty"` + Name string `json:"ColumnName" ` + Tp string `json:"ColumnType"` + Default any `json:"ColumnDefault,omitempty"` + Precision string `json:"ColumnPrecision,omitempty"` + Scale string `json:"ColumnScale,omitempty"` + Nullable string `json:"ColumnNullable,omitempty"` + IsPK string `json:"ColumnIsPk,omitempty"` + Elems []string `json:"ColumnElems,omitempty"` } -// FromTiColumnInfo converts from TiDB ColumnInfo to TableCol. -func (t *TableCol) FromTiColumnInfo(col *model.ColumnInfo, outputColumnID bool) { +// fromTiColumnInfo fills TableCol from a TiDB column. outputColumnID controls +// whether ColumnId is written into the schema file payload. +func (t *TableCol) fromTiColumnInfo(col *model.ColumnInfo, outputColumnID bool) { defaultFlen, defaultDecimal := mysql.GetDefaultFieldLengthAndDecimal(col.GetType()) isDecimalNotDefault := col.GetDecimal() != defaultDecimal && col.GetDecimal() != 0 && @@ -105,18 +105,10 @@ func (t *TableCol) FromTiColumnInfo(col *model.ColumnInfo, outputColumnID bool) } } -// ToTiColumnInfo converts from TableCol to TiDB ColumnInfo. -func (t *TableCol) ToTiColumnInfo(colID int64) (*model.ColumnInfo, error) { +// toTiColumnInfo returns a TiDB column reconstructed from TableCol. colID is +// used as the returned column ID. +func (t *TableCol) toTiColumnInfo(colID int64) *model.ColumnInfo { col := new(model.ColumnInfo) - - if t.ID != "" { - var err error - col.ID, err = strconv.ParseInt(t.ID, 10, 64) - if err != nil { - return nil, errors.WrapError(errors.ErrInternalCheckFailed, err) - } - } - col.ID = colID col.Name = ast.NewCIStr(t.Name) tp := types.StrToType(strings.ToLower(strings.TrimSuffix(t.Tp, " UNSIGNED"))) @@ -136,58 +128,47 @@ func (t *TableCol) ToTiColumnInfo(colID int64) (*model.ColumnInfo, error) { } else { col.SetCharset(charset.CharsetUTF8MB4) } - setFlen := func(precision string) error { + setFlen := func(precision string) { if len(precision) > 0 { flen, err := strconv.Atoi(precision) if err != nil { - return errors.WrapError(errors.ErrInternalCheckFailed, err) + log.Panic("parse precision failed, this should not happen", + zap.String("precision", precision), zap.Error(err)) } col.SetFlen(flen) } - return nil } - setDecimal := func(scale string) error { + setDecimal := func(scale string) { if len(scale) > 0 { decimal, err := strconv.Atoi(scale) if err != nil { - return errors.WrapError(errors.ErrInternalCheckFailed, err) + log.Panic("parse scale failed, this should not happen", + zap.String("scale", scale), zap.Error(err)) } col.SetDecimal(decimal) } - return nil } switch col.GetType() { case mysql.TypeTimestamp, mysql.TypeDatetime, mysql.TypeDuration: - err := setDecimal(t.Scale) - if err != nil { - return nil, err - } + setDecimal(t.Scale) case mysql.TypeDouble, mysql.TypeFloat, mysql.TypeNewDecimal: - err := setFlen(t.Precision) - if err != nil { - return nil, err - } - err = setDecimal(t.Scale) - if err != nil { - return nil, err - } + setFlen(t.Precision) + setDecimal(t.Scale) case mysql.TypeTiny, mysql.TypeShort, mysql.TypeInt24, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeBit, mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeBlob, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeYear: - err := setFlen(t.Precision) - if err != nil { - return nil, err - } + setFlen(t.Precision) case mysql.TypeEnum, mysql.TypeSet: col.SetElems(t.Elems) } - return col, nil + return col } -// TableDefinition is the detailed table definition used for cloud storage sink. -// TODO: find a better name for this struct. -type TableDefinition struct { +// SchemaFile is the payload persisted as schema_*.json. +// It carries table structure in Columns/TotalColumns and DDL replay metadata in +// Query/Type/TableVersion. +type SchemaFile struct { Table string `json:"Table"` Schema string `json:"Schema"` Version uint64 `json:"Version"` @@ -201,9 +182,8 @@ type TableDefinition struct { TotalColumns int `json:"TableColumnsTotal"` } -// tableDefWithoutQuery is the table definition without query, which ignores the -// Query, Type and TableVersion field. -type tableDefWithoutQuery struct { +// checksumPayload ignores DDL replay fields and path metadata. +type checksumPayload struct { Table string `json:"Table"` Schema string `json:"Schema"` Version uint64 `json:"Version"` @@ -211,61 +191,55 @@ type tableDefWithoutQuery struct { TotalColumns int `json:"TableColumnsTotal"` } -// FromDDLEvent converts from DDLEvent to TableDefinition. -func (t *TableDefinition) FromDDLEvent(event *commonEvent.DDLEvent, outputColumnID bool) { - t.FromTableInfo(event.GetTargetSchemaName(), event.GetTargetTableName(), event.TableInfo, event.FinishedTs, outputColumnID) - t.Query = event.Query - t.Type = event.Type -} - -// ToDDLEvent converts from TableDefinition to DDLEvent. -func (t *TableDefinition) ToDDLEvent() (*commonEvent.DDLEvent, error) { - tableInfo, err := t.ToTableInfo() - if err != nil { - return nil, err - } +// DDLEvent returns the DDL event represented by SchemaFile. +// The returned event includes a TableInfo rebuilt from Columns plus the Query, +// Type, schema/table name, and FinishedTs from the schema file. +func (t *SchemaFile) DDLEvent() *commonEvent.DDLEvent { return &commonEvent.DDLEvent{ - TableInfo: tableInfo, + TableInfo: t.TableInfo(), FinishedTs: t.TableVersion, Type: t.Type, Query: t.Query, SchemaName: t.Schema, TableName: t.Table, BlockedTables: &commonEvent.InfluencedTables{InfluenceType: commonEvent.InfluenceTypeAll}, - }, nil + } } -// FromTableInfo converts from TableInfo to TableDefinition. -func (t *TableDefinition) FromTableInfo( - schemaName string, tableName string, info *common.TableInfo, tableInfoVersion uint64, outputColumnID bool, -) { - t.Version = defaultTableDefinitionVersion - t.TableVersion = tableInfoVersion +// Build fills SchemaFile from a DDL event for persistence. +// outputColumnID controls whether column IDs are written. If event.TableInfo is +// nil, only schema/table name and DDL metadata are filled. +func (t *SchemaFile) Build(event *commonEvent.DDLEvent, outputColumnID bool) { + t.Version = defaultSchemaFileVersion + t.TableVersion = event.FinishedTs + t.Query = event.Query + t.Type = event.Type - t.Schema = schemaName - t.Table = tableName + info := event.TableInfo if info == nil { + t.Schema = event.GetTargetSchemaName() + t.Table = event.GetTargetTableName() return } + t.Schema = info.GetTargetSchemaName() + t.Table = info.GetTargetTableName() t.TotalColumns = len(info.GetColumns()) for _, col := range info.GetColumns() { var tableCol TableCol - tableCol.FromTiColumnInfo(col, outputColumnID) + tableCol.fromTiColumnInfo(col, outputColumnID) t.Columns = append(t.Columns, tableCol) } } -// ToTableInfo converts from TableDefinition to DDLEvent. -func (t *TableDefinition) ToTableInfo() (*common.TableInfo, error) { +// TableInfo returns decoder TableInfo rebuilt from SchemaFile columns. +// It uses deterministic mock column IDs starting from 100. +func (t *SchemaFile) TableInfo() *common.TableInfo { tidbTableInfo := &model.TableInfo{ Name: ast.NewCIStr(t.Table), } nextMockID := int64(100) // 100 is an arbitrary number for _, col := range t.Columns { - tiCol, err := col.ToTiColumnInfo(nextMockID) - if err != nil { - return nil, err - } + tiCol := col.toTiColumnInfo(nextMockID) if mysql.HasPriKeyFlag(tiCol.GetFlag()) { // use PKIsHandle to make sure that the primary keys can be detected tidbTableInfo.PKIsHandle = true @@ -273,29 +247,22 @@ func (t *TableDefinition) ToTableInfo() (*common.TableInfo, error) { tidbTableInfo.Columns = append(tidbTableInfo.Columns, tiCol) nextMockID++ } - info := common.NewTableInfo4Decoder(t.Schema, tidbTableInfo) - return info, nil + return common.NewTableInfo4Decoder(t.Schema, tidbTableInfo) } -// IsTableSchema returns whether the TableDefinition is a table schema. -func (t *TableDefinition) IsTableSchema() bool { - if len(t.Columns) != t.TotalColumns { - log.Panic("invalid table definition", zap.Any("tableDef", t)) - } - return t.TotalColumns != 0 -} - -// MarshalWithQuery marshals TableDefinition with Query field. -func (t *TableDefinition) MarshalWithQuery() ([]byte, error) { +// Marshal returns the indented JSON payload written to schema files. +// Marshal failures panic. +func (t *SchemaFile) Marshal() []byte { data, err := json.MarshalIndent(t, marshalPrefix, marshalIndent) if err != nil { - return nil, errors.WrapError(errors.ErrMarshalFailed, err) + log.Panic("marshal the schema file failed, this should not happen", + zap.Any("schemaFile", t), zap.Error(err)) } - return data, nil + return data } -// marshalWithoutQuery marshals TableDefinition without Query field. -func (t *TableDefinition) marshalWithoutQuery() ([]byte, error) { +// marshalForChecksum marshals fields covered by the path checksum. +func (t *SchemaFile) marshalForChecksum() []byte { // sort columns by name sortedColumns := make([]TableCol, len(t.Columns)) copy(sortedColumns, t.Columns) @@ -303,71 +270,41 @@ func (t *TableDefinition) marshalWithoutQuery() ([]byte, error) { return sortedColumns[i].Name < sortedColumns[j].Name }) - defWithoutQuery := tableDefWithoutQuery{ + payload := checksumPayload{ Table: t.Table, Schema: t.Schema, Columns: sortedColumns, TotalColumns: t.TotalColumns, } - data, err := json.MarshalIndent(defWithoutQuery, marshalPrefix, marshalIndent) + data, err := json.MarshalIndent(payload, marshalPrefix, marshalIndent) if err != nil { - return nil, errors.WrapError(errors.ErrMarshalFailed, err) + log.Panic("marshal for the schema file checksum failed, this should not happen", + zap.Any("payload", payload), zap.Error(err)) } - return data, nil + return data } -// Sum32 returns the 32-bits hash value of TableDefinition. -func (t *TableDefinition) Sum32(hasher *hash.PositionInertia) (uint32, error) { - if hasher == nil { - hasher = hash.NewPositionInertia() - } +// Checksum returns the checksum used in schema file names. +func (t *SchemaFile) Checksum() uint32 { + hasher := hash.NewPositionInertia() hasher.Reset() - data, err := t.marshalWithoutQuery() - if err != nil { - return 0, err - } + data := t.marshalForChecksum() hasher.Write(data) - return hasher.Sum32(), nil + return hasher.Sum32() } -// GenerateSchemaFilePath generates the schema file path for TableDefinition -// with optional table id path. -func (t *TableDefinition) GenerateSchemaFilePath(useTableIDAsPath bool, tableID int64) (string, error) { - checksum, err := t.Sum32(nil) - if err != nil { - return "", err - } - if t.Schema == "" { - return "", errors.ErrInternalCheckFailed.GenWithStackByArgs("schema cannot be empty") - } - if t.TableVersion == 0 { - return "", errors.ErrInternalCheckFailed.GenWithStackByArgs("table version cannot be zero") - } - if len(t.Columns) != t.TotalColumns { - return "", errors.ErrInternalCheckFailed.GenWithStackByArgs("invalid table definition") - } - isTableSchema := t.TotalColumns != 0 - if !isTableSchema && t.Table != "" { - return "", errors.ErrInternalCheckFailed.GenWithStackByArgs("invalid table definition") - } - if useTableIDAsPath && isTableSchema && tableID <= 0 { - return "", errors.ErrInternalCheckFailed.GenWithStackByArgs("invalid table id for table-id path") - } - +// Path returns the schema file path for this payload. +// Database-level schema files use /meta/... +// Table-level schema files use /
/meta/... or /meta/... +// when useTableIDAsPath is true. The file name includes the generated checksum. +func (t *SchemaFile) Path(useTableIDAsPath bool, tableID int64) string { + tableLevel := t.TotalColumns != 0 table := t.Table - if isTableSchema { - tablePath, err := generateTablePath(t.Table, tableID, useTableIDAsPath) - if err != nil { - return "", err - } - table = tablePath - } - omitSchema := useTableIDAsPath && isTableSchema - path, err := generateSchemaFilePath(t.Schema, table, t.TableVersion, checksum, omitSchema) - if err != nil { - return "", err + if tableLevel { + table = generateTablePath(t.Table, tableID, useTableIDAsPath) } - return path, nil + omitSchema := useTableIDAsPath && tableLevel + return generateSchemaFilePath(t.Schema, table, t.TableVersion, t.Checksum(), omitSchema) } diff --git a/pkg/sink/cloudstorage/table_definition_test.go b/pkg/cloudstorage/schema_file_test.go similarity index 74% rename from pkg/sink/cloudstorage/table_definition_test.go rename to pkg/cloudstorage/schema_file_test.go index 342059f571..fa31e955c4 100644 --- a/pkg/sink/cloudstorage/table_definition_test.go +++ b/pkg/cloudstorage/schema_file_test.go @@ -28,7 +28,7 @@ import ( "github.com/stretchr/testify/require" ) -func generateTableDef() (TableDefinition, *common.TableInfo) { +func generateSchemaFile() (SchemaFile, *common.TableInfo) { var columns []*timodel.ColumnInfo ft := types.NewFieldType(mysql.TypeLong) ft.SetFlag(mysql.PriKeyFlag | mysql.NotNullFlag) @@ -72,56 +72,15 @@ func generateTableDef() (TableDefinition, *common.TableInfo) { Columns: columns, UpdateTS: 100, }) - var def TableDefinition - def.FromTableInfo(tableInfo.GetSchemaName(), tableInfo.GetTableName(), tableInfo, tableInfo.GetUpdateTS(), false) - return def, tableInfo -} - -func TestFromDDLEventUsesTargetNames(t *testing.T) { - t.Parallel() - - idFieldType := types.NewFieldType(mysql.TypeLong) - idFieldType.SetFlag(mysql.PriKeyFlag | mysql.NotNullFlag) - routedTableInfo := common.WrapTableInfo("source_db", &timodel.TableInfo{ - ID: 20, - Name: ast.NewCIStr("source_table"), - UpdateTS: 100, - Columns: []*timodel.ColumnInfo{ - { - ID: 1, - Name: ast.NewCIStr("id"), - FieldType: *idFieldType, - State: timodel.StatePublic, - }, - }, - }).CloneWithRouting("target_db", "target_table") - sourceDDL := &commonEvent.DDLEvent{ - Version: commonEvent.DDLEventVersion1, - Type: byte(timodel.ActionCreateTable), - SchemaName: "source_db", - TableName: "source_table", - Query: "CREATE TABLE `source_db`.`source_table` (`id` INT PRIMARY KEY)", - TableInfo: routedTableInfo, - FinishedTs: 100, + event := &commonEvent.DDLEvent{ + SchemaName: tableInfo.GetSchemaName(), + TableName: tableInfo.GetTableName(), + TableInfo: tableInfo, + FinishedTs: tableInfo.GetUpdateTS(), } - - routedDDL := commonEvent.NewRoutedDDLEvent( - sourceDDL, - "CREATE TABLE `target_db`.`target_table` (`id` INT PRIMARY KEY)", - "target_db", - "target_table", - "", - "", - routedTableInfo, - nil, - nil, - ) - - var def TableDefinition - def.FromDDLEvent(routedDDL, false) - require.Equal(t, "target_db", def.Schema) - require.Equal(t, "target_table", def.Table) - require.Contains(t, def.Query, "`target_db`.`target_table`") + var schemaFile SchemaFile + schemaFile.Build(event, false) + return schemaFile, tableInfo } func TestTableCol(t *testing.T) { @@ -413,21 +372,20 @@ func TestTableCol(t *testing.T) { } col := &timodel.ColumnInfo{FieldType: *ft} var tableCol TableCol - tableCol.FromTiColumnInfo(col, false) + tableCol.fromTiColumnInfo(col, false) encodedCol, err := json.Marshal(tableCol) require.Nil(t, err, tc.name) require.JSONEq(t, tc.expected, string(encodedCol), tc.name) - _, err = tableCol.ToTiColumnInfo(100) - require.NoError(t, err) + _ = tableCol.toTiColumnInfo(100) } } -func TestTableDefinition(t *testing.T) { +func TestSchemaFile(t *testing.T) { t.Parallel() - def, tableInfo := generateTableDef() - encodedDef, err := json.MarshalIndent(def, "", " ") + schemaFile, tableInfo := generateSchemaFile() + encodedSchemaFile, err := json.MarshalIndent(schemaFile, "", " ") require.NoError(t, err) require.JSONEq(t, `{ "Table": "table1", @@ -465,9 +423,9 @@ func TestTableDefinition(t *testing.T) { } ], "TableColumnsTotal": 4 - }`, string(encodedDef)) + }`, string(encodedSchemaFile)) - def = TableDefinition{} + schemaFile = SchemaFile{} event := &commonEvent.DDLEvent{ FinishedTs: tableInfo.GetUpdateTS(), Type: byte(timodel.ActionAddColumn), @@ -476,8 +434,8 @@ func TestTableDefinition(t *testing.T) { SchemaName: "schema1", TableName: "table1", } - def.FromDDLEvent(event, false) - encodedDef, err = json.MarshalIndent(def, "", " ") + schemaFile.Build(event, false) + encodedSchemaFile, err = json.MarshalIndent(schemaFile, "", " ") require.NoError(t, err) require.JSONEq(t, `{ "Table": "table1", @@ -515,97 +473,70 @@ func TestTableDefinition(t *testing.T) { } ], "TableColumnsTotal": 4 - }`, string(encodedDef)) + }`, string(encodedSchemaFile)) - tableInfo, err = def.ToTableInfo() - require.NoError(t, err) + tableInfo = schemaFile.TableInfo() require.Len(t, tableInfo.GetColumns(), 4) - event, err = def.ToDDLEvent() - require.NoError(t, err) + event = schemaFile.DDLEvent() require.Equal(t, byte(timodel.ActionAddColumn), event.Type) require.Equal(t, uint64(100), event.FinishedTs) } -func TestTableDefinitionGenFilePath(t *testing.T) { +func TestSchemaFileGenFilePath(t *testing.T) { t.Parallel() - schemaDef := &TableDefinition{ + dbSchemaFile := &SchemaFile{ Schema: "schema1", - Version: defaultTableDefinitionVersion, + Version: defaultSchemaFileVersion, TableVersion: 100, } - schemaPath, err := schemaDef.GenerateSchemaFilePath(false, 0) - require.NoError(t, err) + schemaPath := dbSchemaFile.Path(false, 0) require.Equal(t, "schema1/meta/schema_100_3233644819.json", schemaPath) - schemaPath, err = schemaDef.GenerateSchemaFilePath(true, 0) - require.NoError(t, err) + schemaPath = dbSchemaFile.Path(true, 0) require.Equal(t, "schema1/meta/schema_100_3233644819.json", schemaPath) - def, _ := generateTableDef() - tablePath, err := def.GenerateSchemaFilePath(false, 0) - require.NoError(t, err) + schemaFile, _ := generateSchemaFile() + tablePath := schemaFile.Path(false, 0) require.Equal(t, "schema1/table1/meta/schema_100_3752767265.json", tablePath) - tablePath, err = def.GenerateSchemaFilePath(true, 12345) - require.NoError(t, err) + tablePath = schemaFile.Path(true, 12345) require.Equal(t, "12345/meta/schema_100_3752767265.json", tablePath) } -func TestGenerateSchemaFilePathValidation(t *testing.T) { +func TestParseSchemaFile(t *testing.T) { t.Parallel() - def, _ := generateTableDef() - - // empty schema - emptySchemaDef := &TableDefinition{Schema: "", Table: "t1", TableVersion: 100, TotalColumns: 1, Columns: []TableCol{{}}} - _, err := emptySchemaDef.GenerateSchemaFilePath(false, 0) - require.Error(t, err) - require.Contains(t, err.Error(), "schema cannot be empty") - - // zero table version - zeroVersionDef := &TableDefinition{Schema: "s1", Table: "t1", TableVersion: 0, TotalColumns: 1, Columns: []TableCol{{}}} - _, err = zeroVersionDef.GenerateSchemaFilePath(false, 0) - require.Error(t, err) - require.Contains(t, err.Error(), "table version cannot be zero") + schemaFile, _ := generateSchemaFile() + encodedSchemaFile := schemaFile.Marshal() - // use-table-id-as-path with invalid tableID - _, err = def.GenerateSchemaFilePath(true, 0) - require.Error(t, err) - require.Contains(t, err.Error(), "invalid table id for table-id path") - _, err = def.GenerateSchemaFilePath(true, -1) - require.Error(t, err) - require.Contains(t, err.Error(), "invalid table id for table-id path") - - // invalid table definition - invalidDef := &TableDefinition{Schema: "s1", Table: "t1", TableVersion: 100, TotalColumns: 1, Columns: nil} - _, err = invalidDef.GenerateSchemaFilePath(false, 0) - require.Error(t, err) - require.Contains(t, err.Error(), "invalid table definition") + var got SchemaFile + require.NoError(t, json.Unmarshal(encodedSchemaFile, &got)) + require.Equal(t, schemaFile.Schema, got.Schema) + require.Equal(t, schemaFile.Table, got.Table) + require.Equal(t, schemaFile.TableVersion, got.TableVersion) + require.Len(t, got.Columns, len(schemaFile.Columns)) } -func TestTableDefinitionSum32(t *testing.T) { +func TestSchemaFileChecksum(t *testing.T) { t.Parallel() - def, _ := generateTableDef() - checksum1, err := def.Sum32(nil) - require.NoError(t, err) - checksum2, err := def.Sum32(nil) - require.NoError(t, err) + schemaFile, _ := generateSchemaFile() + checksum1 := schemaFile.Checksum() + checksum2 := schemaFile.Checksum() require.Equal(t, checksum1, checksum2) - n := len(def.Columns) + n := len(schemaFile.Columns) newCol := make([]TableCol, n) - copy(newCol, def.Columns) - newDef := def - newDef.Columns = newCol + copy(newCol, schemaFile.Columns) + newSchemaFile := schemaFile + newSchemaFile.Columns = newCol - for i := 0; i < n; i++ { + for i := range n { target := rand.Intn(n) - newDef.Columns[i], newDef.Columns[target] = newDef.Columns[target], newDef.Columns[i] - newChecksum, err := newDef.Sum32(nil) - require.NoError(t, err) + newSchemaFile.Columns[i], newSchemaFile.Columns[target] = newSchemaFile.Columns[target], newSchemaFile.Columns[i] + newChecksum := newSchemaFile.Checksum() require.Equal(t, checksum1, newChecksum) } } diff --git a/pkg/logger/log.go b/pkg/logger/log.go index d4bce8d2d5..480e627c60 100644 --- a/pkg/logger/log.go +++ b/pkg/logger/log.go @@ -17,6 +17,7 @@ import ( "bytes" "context" "io" + stdlog "log" "os" "strconv" "strings" @@ -257,6 +258,11 @@ func initMySQLLogger() error { // initSaramaLogger hacks logger used in sarama lib func initSaramaLogger(level zapcore.Level) error { + if zapcore.InfoLevel.Enabled(level) { + sarama.Logger = stdlog.New(io.Discard, "[Sarama] ", stdlog.LstdFlags) + return nil + } + logger, err := zap.NewStdLogAt(log.L().With(zap.String("component", "sarama")), level) if err != nil { return errors.Trace(err) diff --git a/pkg/sink/cloudstorage/path_key.go b/pkg/sink/cloudstorage/path_key.go deleted file mode 100644 index 31f4291819..0000000000 --- a/pkg/sink/cloudstorage/path_key.go +++ /dev/null @@ -1,182 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package cloudstorage - -import ( - "fmt" - "regexp" - "strconv" - "strings" - - "github.com/pingcap/ticdc/pkg/common" - "github.com/pingcap/ticdc/pkg/config" - "github.com/pingcap/ticdc/pkg/errors" -) - -// SchemaPathKey is the key of schema path. -type SchemaPathKey struct { - // Schema is the first directory level in storage sink paths. - // Example: /
//... - Schema string - // Table is the second directory level for table schema/data paths. - // For database-level schema files, this field is empty and the path is - // /meta/schema_{tableVersion}_{checksum}.json. - Table string - // TableVersion is the schema version encoded in the path. - // In CDC it is carried by tableInfoVersion, and for DDL-related versions it - // is typically equal to the DDL finishedTs. - TableVersion uint64 -} - -// GetKey returns the key of schema path. -func (s *SchemaPathKey) GetKey() string { - return common.QuoteSchema(s.Schema, s.Table) -} - -// ParseSchemaFilePath parses the schema file path and returns the table version and checksum. -func (s *SchemaPathKey) ParseSchemaFilePath(path string) (uint32, error) { - // For /
/meta/schema_{tableVersion}_{checksum}.json, the parts - // should be ["", "
", "meta", "schema_{tableVersion}_{checksum}.json"]. - matches := strings.Split(path, "/") - - var schema, table string - schema = matches[0] - switch len(matches) { - case 3: - table = "" - case 4: - table = matches[1] - default: - return 0, errors.Trace(fmt.Errorf("cannot match schema path pattern for %s", path)) - } - - if matches[len(matches)-2] != "meta" { - return 0, errors.Trace(fmt.Errorf("cannot match schema path pattern for %s", path)) - } - - schemaFileName := matches[len(matches)-1] - version, checksum := mustParseSchemaName(schemaFileName) - - *s = SchemaPathKey{ - Schema: schema, - Table: table, - TableVersion: version, - } - return checksum, nil -} - -type FileIndexKey struct { - // DispatcherID is used in file name only when table-across-nodes is enabled. - // File pattern: CDC_{dispatcherID}_{index}.{ext} - DispatcherID string - // EnableTableAcrossNodes controls whether dispatcher ID is embedded in - // data/index file names to avoid collisions across captures. - EnableTableAcrossNodes bool -} - -type FileIndex struct { - FileIndexKey - // Idx is the monotonically increasing file sequence number in one - // directory scope (schema/table/version[/partition][/date]). - Idx uint64 -} - -// DmlPathKey is the key of dml path. -type DmlPathKey struct { - SchemaPathKey - // PartitionNum is an optional path level for partition table output. - // It is present only when partition-separator is enabled. - PartitionNum int64 - // Date is an optional path level controlled by date-separator - // (year/month/day/none). - Date string -} - -// GenerateDMLFilePath generates the dml file path. -func (d *DmlPathKey) GenerateDMLFilePath( - fileIndex *FileIndex, extension string, fileIndexWidth int, -) string { - var elems []string - - elems = append(elems, d.Schema) - elems = append(elems, d.Table) - elems = append(elems, fmt.Sprintf("%d", d.TableVersion)) - - if d.PartitionNum != 0 { - elems = append(elems, fmt.Sprintf("%d", d.PartitionNum)) - } - if len(d.Date) != 0 { - elems = append(elems, d.Date) - } - elems = append(elems, generateDataFileName(fileIndex.EnableTableAcrossNodes, fileIndex.DispatcherID, fileIndex.Idx, extension, fileIndexWidth)) - - return strings.Join(elems, "/") -} - -// ParseIndexFilePath parses the index file path and returns the max file index. -// index file path pattern is as follows: -// {schema}/{table}/{table-version-separator}/{partition-separator}/{date-separator}/meta/, where -// partition-separator and date-separator could be empty. -// DML file name pattern is as follows: CDC_{dispatcherID}.index or CDC.index -func (d *DmlPathKey) ParseIndexFilePath(dateSeparator, path string) (string, error) { - var partitionNum int64 - - str := `(\w+)\/(\w+)\/(\d+)\/(\d+)?\/*` - switch dateSeparator { - case config.DateSeparatorNone.String(): - str += `(\d{4})*` - case config.DateSeparatorYear.String(): - str += `(\d{4})\/` - case config.DateSeparatorMonth.String(): - str += `(\d{4}-\d{2})\/` - case config.DateSeparatorDay.String(): - str += `(\d{4}-\d{2}-\d{2})\/` - } - str += `meta\/` - // CDC[_{dispatcherID}].index - str += `CDC(?:_(\w+))?.index` - pathRE, err := regexp.Compile(str) - if err != nil { - return "", err - } - - matches := pathRE.FindStringSubmatch(path) - if len(matches) != 7 { - return "", fmt.Errorf("cannot match dml path pattern for %s", path) - } - - version, err := strconv.ParseUint(matches[3], 10, 64) - if err != nil { - return "", err - } - - if len(matches[4]) > 0 { - partitionNum, err = strconv.ParseInt(matches[4], 10, 64) - if err != nil { - return "", err - } - } - - *d = DmlPathKey{ - SchemaPathKey: SchemaPathKey{ - Schema: matches[1], - Table: matches[2], - TableVersion: version, - }, - PartitionNum: partitionNum, - Date: matches[5], - } - - return matches[6], nil -}