diff --git a/api/v2/changefeed.go b/api/v2/changefeed.go index 954ee75082..d2a76176bc 100644 --- a/api/v2/changefeed.go +++ b/api/v2/changefeed.go @@ -1770,7 +1770,8 @@ func verifyTable4MQ( return nil } - eventRouter, err := eventrouter.NewEventRouter(replicaConfig.Sink, topic, config.IsPulsarScheme(scheme), protocol == config.ProtocolAvro) + isAvroLike := protocol == config.ProtocolAvro || protocol == config.ProtocolDebeziumAvro + eventRouter, err := eventrouter.NewEventRouter(replicaConfig.Sink, topic, config.IsPulsarScheme(scheme), isAvroLike) if err != nil { return err } diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go index 8f6beff3bb..897a42c4ff 100644 --- a/cmd/kafka-consumer/writer.go +++ b/cmd/kafka-consumer/writer.go @@ -119,7 +119,8 @@ func newWriter(ctx context.Context, o *option) *writer { w.progresses[i] = newPartitionProgress(int32(i), decoder) } - eventRouter, err := eventrouter.NewEventRouter(o.sinkConfig, o.topic, false, o.protocol == config.ProtocolAvro) + isAvroLike := o.protocol == config.ProtocolAvro || o.protocol == config.ProtocolDebeziumAvro + eventRouter, err := eventrouter.NewEventRouter(o.sinkConfig, o.topic, false, isAvroLike) if err != nil { log.Panic("initialize the event router failed", zap.Any("protocol", o.protocol), zap.Any("topic", o.topic), @@ -493,7 +494,8 @@ func (w *writer) onDDL(ddl *event.DDLEvent) { return } switch w.protocol { - case config.ProtocolCanalJSON, config.ProtocolOpen, config.ProtocolAvro, config.ProtocolSimple, config.ProtocolDebezium: + case config.ProtocolCanalJSON, config.ProtocolOpen, config.ProtocolAvro, config.ProtocolSimple, + config.ProtocolDebezium, config.ProtocolDebeziumAvro: default: return } diff --git a/downstreamadapter/sink/helper/helper.go b/downstreamadapter/sink/helper/helper.go index 47cd949220..11dd7b85de 100644 --- a/downstreamadapter/sink/helper/helper.go +++ b/downstreamadapter/sink/helper/helper.go @@ -100,7 +100,7 @@ func GetProtocol(protocolStr string) 
(config.Protocol, error) { // GetFileExtension returns the extension for specific protocol func GetFileExtension(protocol config.Protocol) string { switch protocol { - case config.ProtocolAvro, config.ProtocolCanalJSON, config.ProtocolMaxwell, + case config.ProtocolAvro, config.ProtocolDebeziumAvro, config.ProtocolCanalJSON, config.ProtocolMaxwell, config.ProtocolOpen, config.ProtocolSimple: return ".json" case config.ProtocolCraft: diff --git a/downstreamadapter/sink/kafka/helper.go b/downstreamadapter/sink/kafka/helper.go index de6ce2fb71..4b83310595 100644 --- a/downstreamadapter/sink/kafka/helper.go +++ b/downstreamadapter/sink/kafka/helper.go @@ -77,8 +77,9 @@ func newKafkaSinkComponentWithFactory(ctx context.Context, return kafkaComponent, protocol, errors.WrapError(errors.ErrKafkaNewProducer, err) } + isAvroLike := protocol == config.ProtocolAvro || protocol == config.ProtocolDebeziumAvro kafkaComponent.eventRouter, err = eventrouter.NewEventRouter( - sinkConfig, topic, false, protocol == config.ProtocolAvro) + sinkConfig, topic, false, isAvroLike) if err != nil { return kafkaComponent, protocol, errors.Trace(err) } diff --git a/pkg/config/sink.go b/pkg/config/sink.go index c0f9342953..dca4d50572 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -144,7 +144,8 @@ type SinkConfig struct { DispatchRules []*DispatchRule `toml:"dispatchers" json:"dispatchers,omitempty"` ColumnSelectors []*ColumnSelector `toml:"column-selectors" json:"column-selectors,omitempty"` - // SchemaRegistry is only available when the downstream is MQ using avro protocol. + // SchemaRegistry is only available when the downstream is MQ using avro protocol + // or debezium-avro protocol with Confluent Avro encoding. SchemaRegistry *string `toml:"schema-registry" json:"schema-registry,omitempty"` // EncoderConcurrency is only available when the downstream is MQ. 
EncoderConcurrency *int `toml:"encoder-concurrency" json:"encoder-concurrency,omitempty"` @@ -965,7 +966,7 @@ func (s *SinkConfig) ValidateProtocol(scheme string) error { if s.OpenProtocol != nil { outputOldValue = s.OpenProtocol.OutputOldValue } - case ProtocolDebezium: + case ProtocolDebezium, ProtocolDebeziumAvro: if s.Debezium != nil { outputOldValue = s.Debezium.OutputOldValue } diff --git a/pkg/config/sink_protocol.go b/pkg/config/sink_protocol.go index c9b9a7fd5a..113d66eab9 100644 --- a/pkg/config/sink_protocol.go +++ b/pkg/config/sink_protocol.go @@ -41,6 +41,7 @@ const ( ProtocolCsv ProtocolDebezium ProtocolSimple + ProtocolDebeziumAvro ) // IsBatchEncode returns whether the protocol is a batch encoder. @@ -71,6 +72,8 @@ func ParseSinkProtocolFromString(protocol string) (Protocol, error) { return ProtocolCsv, nil case "debezium": return ProtocolDebezium, nil + case "debezium-avro": + return ProtocolDebeziumAvro, nil case "simple": return ProtocolSimple, nil default: @@ -101,6 +104,8 @@ func (p Protocol) String() string { return "debezium" case ProtocolSimple: return "simple" + case ProtocolDebeziumAvro: + return "debezium-avro" default: panic("unreachable") } diff --git a/pkg/config/sink_protocol_test.go b/pkg/config/sink_protocol_test.go index 27e7106b3c..05b8525fda 100644 --- a/pkg/config/sink_protocol_test.go +++ b/pkg/config/sink_protocol_test.go @@ -62,6 +62,10 @@ func TestParseSinkProtocolFromString(t *testing.T) { protocol: "open-protocol", expectedProtocolEnum: ProtocolOpen, }, + { + protocol: "debezium-avro", + expectedProtocolEnum: ProtocolDebeziumAvro, + }, } for _, tc := range testCases { @@ -109,6 +113,10 @@ func TestString(t *testing.T) { protocolEnum: ProtocolOpen, expectedProtocol: "open-protocol", }, + { + protocolEnum: ProtocolDebeziumAvro, + expectedProtocol: "debezium-avro", + }, } for _, tc := range testCases { @@ -151,6 +159,10 @@ func TestIsBatchEncoder(t *testing.T) { protocolEnum: ProtocolOpen, expect: true, }, + { + protocolEnum: 
ProtocolDebeziumAvro, + expect: false, + }, } for _, tc := range testCases { diff --git a/pkg/sink/codec/builder.go b/pkg/sink/codec/builder.go index 8a17146921..e003710bfb 100644 --- a/pkg/sink/codec/builder.go +++ b/pkg/sink/codec/builder.go @@ -41,6 +41,8 @@ func NewEventEncoder(ctx context.Context, cfg *common.Config) (common.EventEncod return canal.NewJSONRowEventEncoder(ctx, cfg) case config.ProtocolDebezium: return debezium.NewBatchEncoder(cfg, config.GetGlobalServerConfig().ClusterID), nil + case config.ProtocolDebeziumAvro: + return debezium.NewAvroBatchEncoder(ctx, cfg, config.GetGlobalServerConfig().ClusterID) case config.ProtocolSimple: return simple.NewEncoder(ctx, cfg) default: @@ -67,6 +69,8 @@ func NewEventDecoder( return simple.NewDecoder(ctx, codecConfig, upstreamTiDB) case config.ProtocolDebezium: return debezium.NewDecoder(codecConfig, idx, upstreamTiDB), nil + case config.ProtocolDebeziumAvro: + return debezium.NewAvroDecoder(ctx, codecConfig, idx, upstreamTiDB) default: } log.Panic("Protocol not supported", zap.Any("Protocol", codecConfig.Protocol)) diff --git a/pkg/sink/codec/common/config.go b/pkg/sink/codec/common/config.go index 83486f3da3..bb3d9fcbc6 100644 --- a/pkg/sink/codec/common/config.go +++ b/pkg/sink/codec/common/config.go @@ -55,7 +55,8 @@ type Config struct { OutputRowKey bool - // avro only + // The Avro options below apply only to the avro and + // debezium-avro protocols. 
AvroConfluentSchemaRegistry string AvroDecimalHandlingMode string AvroBigintUnsignedHandlingMode string @@ -236,9 +237,10 @@ func (c *Config) Apply(sinkURI *url.URL, sinkConfig *config.SinkConfig) error { sinkConfig.KafkaConfig.GlueSchemaRegistryConfig != nil { c.AvroGlueSchemaRegistry = sinkConfig.KafkaConfig.GlueSchemaRegistryConfig } - if c.Protocol == config.ProtocolAvro && util.GetOrZero(sinkConfig.ForceReplicate) { + if (c.Protocol == config.ProtocolAvro || c.Protocol == config.ProtocolDebeziumAvro) && + util.GetOrZero(sinkConfig.ForceReplicate) { return errors.ErrCodecInvalidConfig.GenWithStack( - `force-replicate must be disabled, when using avro protocol`) + `force-replicate must be disabled, when using avro or debezium-avro protocol`) } if sinkConfig != nil { @@ -353,30 +355,57 @@ func (c *Config) WithChangefeedID(id common.ChangeFeedID) *Config { // Validate the Config func (c *Config) Validate() error { if c.EnableTiDBExtension && - (c.Protocol != config.ProtocolCanalJSON && c.Protocol != config.ProtocolAvro && c.Protocol != config.ProtocolDebezium) { + (c.Protocol != config.ProtocolCanalJSON && c.Protocol != config.ProtocolAvro && + c.Protocol != config.ProtocolDebezium && c.Protocol != config.ProtocolDebeziumAvro) { log.Warn("ignore invalid config, enable-tidb-extension"+ - "only supports canal-json/avro/debezium protocol", + "only supports canal-json/avro/debezium/debezium-avro protocol", zap.Bool("enableTidbExtension", c.EnableTiDBExtension), zap.String("protocol", c.Protocol.String())) } - if c.Protocol == config.ProtocolAvro { + if c.Protocol == config.ProtocolDebezium && + (c.AvroConfluentSchemaRegistry != "" || c.AvroGlueSchemaRegistry != nil) { + return errors.ErrCodecInvalidConfig.GenWithStack( + `Debezium protocol does not support schema registry; use protocol "debezium-avro"`, + ) + } + + if c.Protocol == config.ProtocolAvro || c.Protocol == config.ProtocolDebeziumAvro { if c.AvroConfluentSchemaRegistry != "" && c.AvroGlueSchemaRegistry != 
nil { + protocol := "Avro" + if c.Protocol == config.ProtocolDebeziumAvro { + protocol = "Debezium Avro" + } return errors.ErrCodecInvalidConfig.GenWithStack( - `Avro protocol requires only one of "%s" or "%s" to specify the schema registry`, + `%s protocol requires only one of "%s" or "%s" to specify the schema registry`, + protocol, codecOPTAvroSchemaRegistry, coderOPTAvroGlueSchemaRegistry, ) } if c.AvroConfluentSchemaRegistry == "" && c.AvroGlueSchemaRegistry == nil { + protocol := "Avro" + if c.Protocol == config.ProtocolDebeziumAvro { + protocol = "Debezium Avro" + } return errors.ErrCodecInvalidConfig.GenWithStack( - `Avro protocol requires parameter "%s" or "%s" to specify the schema registry`, + `%s protocol requires parameter "%s" or "%s" to specify the schema registry`, + protocol, codecOPTAvroSchemaRegistry, coderOPTAvroGlueSchemaRegistry, ) } + if c.Protocol == config.ProtocolDebeziumAvro && c.AvroGlueSchemaRegistry != nil { + return errors.ErrCodecInvalidConfig.GenWithStack( + `Debezium Avro protocol only supports "%s" for Confluent Avro Schema Registry`, + codecOPTAvroSchemaRegistry, + ) + } + } + + if c.Protocol == config.ProtocolAvro { if c.AvroDecimalHandlingMode != DecimalHandlingModePrecise && c.AvroDecimalHandlingMode != DecimalHandlingModeString { return errors.ErrCodecInvalidConfig.GenWithStack( @@ -410,6 +439,28 @@ func (c *Config) Validate() error { } } + if c.Protocol == config.ProtocolDebeziumAvro { + if c.AvroDecimalHandlingMode != DecimalHandlingModePrecise && + c.AvroDecimalHandlingMode != DecimalHandlingModeString { + return errors.ErrCodecInvalidConfig.GenWithStack( + `%s value could only be "%s" or "%s"`, + codecOPTAvroDecimalHandlingMode, + DecimalHandlingModeString, + DecimalHandlingModePrecise, + ) + } + + if c.AvroBigintUnsignedHandlingMode != BigintUnsignedHandlingModeLong && + c.AvroBigintUnsignedHandlingMode != BigintUnsignedHandlingModeString { + return errors.ErrCodecInvalidConfig.GenWithStack( + `%s value could only be 
"%s" or "%s"`, + codecOPTAvroBigintUnsignedHandlingMode, + BigintUnsignedHandlingModeLong, + BigintUnsignedHandlingModeString, + ) + } + } + if c.MaxMessageBytes <= 0 { return errors.ErrCodecInvalidConfig.Wrap( errors.Errorf("invalid max-message-bytes %d", c.MaxMessageBytes), diff --git a/pkg/sink/codec/common/config_test.go b/pkg/sink/codec/common/config_test.go new file mode 100644 index 0000000000..474ed689d6 --- /dev/null +++ b/pkg/sink/codec/common/config_test.go @@ -0,0 +1,40 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package common + +import ( + "testing" + + "github.com/pingcap/ticdc/pkg/config" + "github.com/stretchr/testify/require" +) + +func TestDebeziumAvroSchemaRegistryConfig(t *testing.T) { + t.Parallel() + + cfg := NewConfig(config.ProtocolDebeziumAvro) + cfg.AvroConfluentSchemaRegistry = "http://127.0.0.1:8081" + require.NoError(t, cfg.Validate()) + + cfg = NewConfig(config.ProtocolDebeziumAvro) + require.ErrorContains(t, cfg.Validate(), `Debezium Avro protocol requires parameter "schema-registry"`) + + cfg = NewConfig(config.ProtocolDebeziumAvro) + cfg.AvroGlueSchemaRegistry = &config.GlueSchemaRegistryConfig{} + require.ErrorContains(t, cfg.Validate(), `Debezium Avro protocol only supports "schema-registry"`) + + cfg = NewConfig(config.ProtocolDebezium) + cfg.AvroConfluentSchemaRegistry = "http://127.0.0.1:8081" + require.ErrorContains(t, cfg.Validate(), `Debezium protocol does not support schema registry`) +} diff --git a/pkg/sink/codec/debezium/avro.go b/pkg/sink/codec/debezium/avro.go new file mode 100644 index 0000000000..0fec1cc024 --- /dev/null +++ b/pkg/sink/codec/debezium/avro.go @@ -0,0 +1,589 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package debezium + +import ( + "bytes" + "context" + "encoding/base64" + "encoding/json" + "math" + "math/big" + "strconv" + "strings" + + "github.com/linkedin/goavro/v2" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/sink/codec/common" +) + +const ( + debeziumAvroKeySchemaSuffix = "-key" + debeziumAvroValueSchemaSuffix = "-value" + + debeziumAvroConnectFieldKey = "connect.field" + debeziumAvroTiDBTypeKey = "tidb_type" + debeziumAvroDecimalName = "org.apache.kafka.connect.data.Decimal" +) + +type debeziumAvroMessage struct { + Schema *debeziumConnectSchema `json:"schema"` + Payload any `json:"payload"` +} + +type debeziumConnectSchema struct { + Type string `json:"type"` + Optional bool `json:"optional"` + Name string `json:"name"` + Version int `json:"version"` + Field string `json:"field"` + Fields []*debeziumConnectSchema `json:"fields"` + Items *debeziumConnectSchema `json:"items"` + Parameters map[string]string `json:"parameters"` + TiDBType string `json:"tidb_type"` +} + +type debeziumAvroSchemaConverter struct { + definedNames map[string]struct{} +} + +func (d *BatchEncoder) appendAvroRowChangedEvent( + ctx context.Context, + topic string, + e *commonEvent.RowEvent, +) error { + keyBuf := bytes.Buffer{} + if err := d.codec.EncodeKey(e, &keyBuf); err != nil { + return errors.Trace(err) + } + + valueBuf := bytes.Buffer{} + if err := d.codec.EncodeValue(e, &valueBuf); err != nil { + return errors.Trace(err) + } + + message, err := d.encodeAvroMessage( + ctx, + topic, + keyBuf.Bytes(), + valueBuf.Bytes(), + e.TableInfo.GetUpdateTS(), + ) + if err != nil { + return err + } + message.Callback = e.Callback + message.IncRowsCount() + + d.messages = append(d.messages, message) + return nil +} + +func (d *BatchEncoder) encodeAvroMessage( + ctx context.Context, + topic string, + keyJSON []byte, + valueJSON []byte, + schemaVersion uint64, +) (*common.Message, error) { + key, err := 
d.encodeAvroPayload( + ctx, + topic, + debeziumAvroKeySchemaSuffix, + keyJSON, + schemaVersion, + ) + if err != nil { + return nil, err + } + + value, err := d.encodeAvroPayload( + ctx, + topic, + debeziumAvroValueSchemaSuffix, + valueJSON, + schemaVersion, + ) + if err != nil { + return nil, err + } + + return common.NewMsg(key, value), nil +} + +func (d *BatchEncoder) encodeAvroPayload( + ctx context.Context, + topic string, + subjectSuffix string, + data []byte, + schemaVersion uint64, +) ([]byte, error) { + message, err := unmarshalDebeziumAvroMessage(data) + if err != nil { + return nil, err + } + if message.Schema == nil { + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("schema is missing") + } + + converter := newDebeziumAvroSchemaConverter() + avroSchema, err := converter.toAvroSchema(message.Schema, "") + if err != nil { + return nil, err + } + schemaBytes, err := json.Marshal(avroSchema) + if err != nil { + return nil, errors.WrapError(errors.ErrAvroMarshalFailed, err) + } + + subject := debeziumAvroSubject(topic, subjectSuffix, message.Schema.Name) + avroCodec, header, err := d.schemaM.GetCachedOrRegister( + ctx, + subject, + schemaVersion, + func() (string, error) { + return string(schemaBytes), nil + }, + ) + if err != nil { + return nil, errors.Trace(err) + } + + native, err := converter.toNative(message.Schema, message.Payload, "") + if err != nil { + return nil, err + } + binaryData, err := avroCodec.BinaryFromNative(nil, native) + if err != nil { + return nil, errors.WrapError(errors.ErrAvroEncodeToBinary, err) + } + + result := make([]byte, 0, len(header)+len(binaryData)) + result = append(result, header...) + result = append(result, binaryData...) 
+ return result, nil +} + +func unmarshalDebeziumAvroMessage(data []byte) (*debeziumAvroMessage, error) { + decoder := json.NewDecoder(bytes.NewReader(data)) + decoder.UseNumber() + + var message debeziumAvroMessage + if err := decoder.Decode(&message); err != nil { + return nil, errors.WrapError(errors.ErrDebeziumInvalidMessage, err) + } + return &message, nil +} + +func newDebeziumAvroSchemaConverter() *debeziumAvroSchemaConverter { + return &debeziumAvroSchemaConverter{ + definedNames: make(map[string]struct{}), + } +} + +func debeziumAvroSubject(topic string, subjectSuffix string, schemaName string) string { + if topic != "" { + return topic + subjectSuffix + } + if schemaName != "" { + return schemaName + } + return "debezium" + subjectSuffix +} + +func (c *debeziumAvroSchemaConverter) toAvroSchema( + schema *debeziumConnectSchema, + fallbackName string, +) (any, error) { + if schema == nil { + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("schema is nil") + } + + switch schema.Type { + case "struct": + fullName := avroFullName(schema.Name, fallbackName) + if _, exists := c.definedNames[fullName]; exists { + return fullName, nil + } + c.definedNames[fullName] = struct{}{} + + name, namespace := splitAvroFullName(fullName) + record := map[string]any{ + "type": "record", + "name": name, + "fields": make([]any, 0, len(schema.Fields)), + } + if namespace != "" { + record["namespace"] = namespace + } + addConnectMetadata(record, schema) + + fields := record["fields"].([]any) + for _, fieldSchema := range schema.Fields { + fieldName := avroFieldName(fieldSchema.Field) + fieldType, err := c.toAvroSchema(fieldSchema, fieldName) + if err != nil { + return nil, err + } + + field := map[string]any{ + "name": fieldName, + "type": fieldType, + } + if fieldSchema.Field != "" { + field[debeziumAvroConnectFieldKey] = fieldSchema.Field + } + if fieldSchema.TiDBType != "" { + field[debeziumAvroTiDBTypeKey] = fieldSchema.TiDBType + } + if fieldSchema.Optional { 
+ field["type"] = []any{"null", fieldType} + field["default"] = nil + } + fields = append(fields, field) + } + record["fields"] = fields + return record, nil + case "array": + if schema.Items == nil { + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("array schema is missing items") + } + items, err := c.toAvroSchema(schema.Items, fallbackName+"Item") + if err != nil { + return nil, err + } + arraySchema := map[string]any{ + "type": "array", + "items": items, + } + addConnectMetadata(arraySchema, schema) + return arraySchema, nil + default: + if isDebeziumAvroDecimalSchema(schema) { + precision, scale, err := decimalSchemaPrecisionAndScale(schema) + if err != nil { + return nil, err + } + decimalSchema := map[string]any{ + "type": "bytes", + "logicalType": "decimal", + "precision": precision, + "scale": scale, + } + addConnectMetadata(decimalSchema, schema) + return decimalSchema, nil + } + + avroType, err := connectPrimitiveToAvro(schema.Type) + if err != nil { + return nil, err + } + if !hasConnectMetadata(schema) && schema.Type != "int8" && schema.Type != "int16" { + return avroType, nil + } + primitive := map[string]any{ + "type": avroType, + } + if schema.Type == "int8" || schema.Type == "int16" { + primitive["connect.type"] = schema.Type + } + addConnectMetadata(primitive, schema) + return primitive, nil + } +} + +func (c *debeziumAvroSchemaConverter) toNative( + schema *debeziumConnectSchema, + value any, + fallbackName string, +) (any, error) { + if value == nil { + return nil, nil + } + + switch schema.Type { + case "struct": + valueMap, ok := value.(map[string]any) + if !ok { + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("struct payload is not an object") + } + + native := make(map[string]any, len(schema.Fields)) + for _, fieldSchema := range schema.Fields { + fieldName := avroFieldName(fieldSchema.Field) + rawValue := valueMap[fieldSchema.Field] + if rawValue == nil && fieldSchema.Field != fieldName { + rawValue = 
valueMap[fieldName] + } + + fieldValue, err := c.toNative(fieldSchema, rawValue, fieldName) + if err != nil { + return nil, err + } + if fieldSchema.Optional { + if fieldValue == nil { + native[fieldName] = nil + } else { + native[fieldName] = goavro.Union( + avroUnionBranchName(fieldSchema, fieldName), + fieldValue, + ) + } + } else { + native[fieldName] = fieldValue + } + } + return native, nil + case "array": + values, ok := value.([]any) + if !ok { + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("array payload is not an array") + } + native := make([]any, 0, len(values)) + for _, item := range values { + itemValue, err := c.toNative(schema.Items, item, fallbackName+"Item") + if err != nil { + return nil, err + } + if schema.Items.Optional && itemValue != nil { + itemValue = goavro.Union( + avroUnionBranchName(schema.Items, fallbackName+"Item"), + itemValue, + ) + } + native = append(native, itemValue) + } + return native, nil + case "boolean": + v, ok := value.(bool) + if !ok { + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("boolean payload is invalid") + } + return v, nil + case "string": + v, ok := value.(string) + if !ok { + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("string payload is invalid") + } + return v, nil + case "bytes": + if isDebeziumAvroDecimalSchema(schema) { + v, ok := value.(string) + if !ok { + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("decimal payload is invalid") + } + rat, ok := new(big.Rat).SetString(v) + if !ok { + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("decimal payload is invalid") + } + return rat, nil + } + + v, ok := value.(string) + if !ok { + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("bytes payload is invalid") + } + data, err := base64.StdEncoding.DecodeString(v) + if err != nil { + return nil, errors.WrapError(errors.ErrDebeziumInvalidMessage, err) + } + return data, nil + case "int8", "int16", "int32": 
+ v, err := numberToInt64(value) + if err != nil { + return nil, err + } + return int64ToInt32(v) + case "int64": + return numberToInt64(value) + case "float": + v, err := numberToFloat64(value) + if err != nil { + return nil, err + } + return float32(v), nil + case "double": + return numberToFloat64(value) + default: + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("unsupported schema type " + schema.Type) + } +} + +func connectPrimitiveToAvro(connectType string) (string, error) { + switch connectType { + case "boolean": + return "boolean", nil + case "string": + return "string", nil + case "bytes": + return "bytes", nil + case "int8", "int16", "int32": + return "int", nil + case "int64": + return "long", nil + case "float": + return "float", nil + case "double": + return "double", nil + default: + return "", errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("unsupported schema type " + connectType) + } +} + +func isDebeziumAvroDecimalSchema(schema *debeziumConnectSchema) bool { + return schema.Type == "bytes" && schema.Name == debeziumAvroDecimalName +} + +func decimalSchemaPrecisionAndScale(schema *debeziumConnectSchema) (int, int, error) { + if schema.Parameters == nil { + return 0, 0, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("decimal schema is missing parameters") + } + precision, err := strconv.Atoi(schema.Parameters["precision"]) + if err != nil { + return 0, 0, errors.WrapError(errors.ErrDebeziumInvalidMessage, err) + } + scale, err := strconv.Atoi(schema.Parameters["scale"]) + if err != nil { + return 0, 0, errors.WrapError(errors.ErrDebeziumInvalidMessage, err) + } + return precision, scale, nil +} + +func addConnectMetadata(avroSchema map[string]any, schema *debeziumConnectSchema) { + if schema.Name != "" { + avroSchema["connect.name"] = schema.Name + } + if schema.Version != 0 { + avroSchema["connect.version"] = schema.Version + } + if len(schema.Parameters) != 0 { + avroSchema["connect.parameters"] = schema.Parameters + } 
+} + +func hasConnectMetadata(schema *debeziumConnectSchema) bool { + return schema.Name != "" || schema.Version != 0 || len(schema.Parameters) != 0 +} + +func avroFullName(connectName string, fallbackName string) string { + if connectName != "" { + return connectName + } + if fallbackName != "" { + return avroFieldName(fallbackName) + } + return "ConnectDefault" +} + +func splitAvroFullName(fullName string) (name string, namespace string) { + idx := strings.LastIndex(fullName, ".") + if idx < 0 { + return avroFieldName(fullName), "" + } + return avroFieldName(fullName[idx+1:]), fullName[:idx] +} + +func avroFieldName(field string) string { + return common.SanitizeName(field) +} + +func avroUnionBranchName(schema *debeziumConnectSchema, fallbackName string) string { + if isDebeziumAvroDecimalSchema(schema) { + return "bytes.decimal" + } + + switch schema.Type { + case "struct": + return avroFullName(schema.Name, fallbackName) + case "array": + return "array" + case "int8", "int16", "int32": + return "int" + case "int64": + return "long" + case "float": + return "float" + case "double": + return "double" + default: + return schema.Type + } +} + +func numberToInt64(value any) (int64, error) { + switch v := value.(type) { + case json.Number: + i, err := v.Int64() + if err == nil { + return i, nil + } + f, err := v.Float64() + if err != nil { + return 0, errors.WrapError(errors.ErrDebeziumInvalidMessage, err) + } + return int64(f), nil + case int: + return int64(v), nil + case int32: + return int64(v), nil + case int64: + return v, nil + case uint64: + return uint64ToInt64(v) + case float64: + return int64(v), nil + default: + return 0, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("number payload is invalid") + } +} + +func int64ToInt32(value int64) (int32, error) { + if value < math.MinInt32 || value > math.MaxInt32 { + return 0, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("number payload is out of int32 range") + } + return int32(value), nil +} + +func 
uint64ToInt64(value uint64) (int64, error) { + if value > math.MaxInt64 { + return 0, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("number payload is out of int64 range") + } + return int64(value), nil +} + +func numberToFloat64(value any) (float64, error) { + switch v := value.(type) { + case json.Number: + f, err := v.Float64() + if err != nil { + return 0, errors.WrapError(errors.ErrDebeziumInvalidMessage, err) + } + return f, nil + case int: + return float64(v), nil + case int32: + return float64(v), nil + case int64: + return float64(v), nil + case uint64: + return float64(v), nil + case float32: + return float64(v), nil + case float64: + return v, nil + default: + return 0, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("number payload is invalid") + } +} diff --git a/pkg/sink/codec/debezium/avro_decoder.go b/pkg/sink/codec/debezium/avro_decoder.go new file mode 100644 index 0000000000..083fc3481d --- /dev/null +++ b/pkg/sink/codec/debezium/avro_decoder.go @@ -0,0 +1,731 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package debezium + +import ( + "context" + "database/sql" + "encoding/binary" + "encoding/json" + "io" + "math/big" + "net/http" + "strconv" + "strings" + "sync" + + "github.com/linkedin/goavro/v2" + "github.com/pingcap/log" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/errors" + codecavro "github.com/pingcap/ticdc/pkg/sink/codec/avro" + "github.com/pingcap/ticdc/pkg/sink/codec/common" + "go.uber.org/zap" +) + +const confluentAvroHeaderLen = 5 + +type avroDecoder struct { + ctx context.Context + registryURL string + httpClient *http.Client + inner *decoder + + mu sync.RWMutex + schemas map[int]*registeredDebeziumAvroSchema +} + +type registeredDebeziumAvroSchema struct { + schema any + namedSchemas map[string]any + codec *goavro.Codec +} + +// NewAvroDecoder returns a Debezium decoder for Confluent Avro wire-format +// messages. It decodes the Avro payload and then delegates Debezium event +// semantics to the JSON decoder. +func NewAvroDecoder( + ctx context.Context, + config *common.Config, + idx int, + db *sql.DB, +) (common.Decoder, error) { + registryURL := strings.TrimRight(config.AvroConfluentSchemaRegistry, "/") + if registryURL == "" { + return nil, errors.ErrAvroSchemaAPIError.GenWithStackByArgs("schema registry URI is empty") + } + + return &avroDecoder{ + ctx: ctx, + registryURL: registryURL, + httpClient: http.DefaultClient, + inner: NewDecoder(config, idx, db).(*decoder), + schemas: make(map[int]*registeredDebeziumAvroSchema), + }, nil +} + +func (d *avroDecoder) AddKeyValue(key, value []byte) { + keyJSON, err := d.toDebeziumJSON(key) + if err != nil { + log.Panic("decode Debezium Avro key failed", zap.Error(err), zap.Int("keySize", len(key))) + } + valueJSON, err := d.toDebeziumJSON(value) + if err != nil { + log.Panic("decode Debezium Avro value failed", zap.Error(err), zap.Int("valueSize", len(value))) + } + d.inner.AddKeyValue(keyJSON, valueJSON) +} + +func (d *avroDecoder) HasNext() 
(common.MessageType, bool) { + return d.inner.HasNext() +} + +func (d *avroDecoder) NextResolvedEvent() uint64 { + return d.inner.NextResolvedEvent() +} + +func (d *avroDecoder) NextDMLEvent() *commonEvent.DMLEvent { + return d.inner.NextDMLEvent() +} + +func (d *avroDecoder) NextDDLEvent() *commonEvent.DDLEvent { + return d.inner.NextDDLEvent() +} + +func (d *avroDecoder) toDebeziumJSON(data []byte) ([]byte, error) { + payload, schema, err := d.decodeConfluentAvroMessage(data) + if err != nil { + return nil, err + } + message := map[string]any{ + "schema": schema, + "payload": payload, + } + result, err := json.Marshal(message) + if err != nil { + return nil, errors.WrapError(errors.ErrDebeziumInvalidMessage, err) + } + return result, nil +} + +func (d *avroDecoder) decodeConfluentAvroMessage(data []byte) (any, map[string]any, error) { + if len(data) == 0 { + return nil, nil, errors.ErrDebeziumEmptyValueMessage.GenWithStackByArgs() + } + if len(data) < confluentAvroHeaderLen { + return nil, nil, errors.ErrAvroInvalidMessage.GenWithStackByArgs("confluent header is too short") + } + if data[0] != 0 { + return nil, nil, errors.ErrAvroInvalidMessage.GenWithStackByArgs("invalid confluent magic byte") + } + + schemaID := int(binary.BigEndian.Uint32(data[1:confluentAvroHeaderLen])) + registeredSchema, err := d.getSchema(schemaID) + if err != nil { + return nil, nil, err + } + + native, _, err := registeredSchema.codec.NativeFromBinary(data[confluentAvroHeaderLen:]) + if err != nil { + return nil, nil, errors.WrapError(errors.ErrAvroInvalidMessage, err) + } + + payload, err := avroNativeToConnectPayload( + registeredSchema.schema, + native, + registeredSchema.namedSchemas, + ) + if err != nil { + return nil, nil, err + } + schema, err := avroSchemaToConnectSchema( + registeredSchema.schema, + "", + nil, + registeredSchema.namedSchemas, + ) + if err != nil { + return nil, nil, err + } + return payload, schema, nil +} + +func (d *avroDecoder) getSchema(schemaID int) 
(*registeredDebeziumAvroSchema, error) { + d.mu.RLock() + schema, ok := d.schemas[schemaID] + d.mu.RUnlock() + if ok { + return schema, nil + } + + uri := d.registryURL + "/schemas/ids/" + strconv.Itoa(schemaID) + req, err := http.NewRequestWithContext(d.ctx, http.MethodGet, uri, nil) + if err != nil { + return nil, errors.WrapError(errors.ErrAvroSchemaAPIError, err) + } + req.Header.Add( + "Accept", + "application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, "+ + "application/json", + ) + + resp, err := d.httpClient.Do(req) + if err != nil { + return nil, errors.WrapError(errors.ErrAvroSchemaAPIError, err) + } + defer func() { + _ = resp.Body.Close() + }() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, errors.WrapError(errors.ErrAvroSchemaAPIError, err) + } + if resp.StatusCode != http.StatusOK { + return nil, errors.ErrAvroSchemaAPIError.GenWithStackByArgs( + "failed to query schema id " + strconv.Itoa(schemaID)) + } + + var lookupResp struct { + Schema string `json:"schema"` + } + if err := json.Unmarshal(body, &lookupResp); err != nil { + return nil, errors.WrapError(errors.ErrAvroSchemaAPIError, err) + } + + codec, err := codecavro.GenCodec(lookupResp.Schema) + if err != nil { + return nil, errors.WrapError(errors.ErrAvroSchemaAPIError, err) + } + + decoder := json.NewDecoder(strings.NewReader(lookupResp.Schema)) + decoder.UseNumber() + var schemaDef any + if err := decoder.Decode(&schemaDef); err != nil { + return nil, errors.WrapError(errors.ErrAvroSchemaAPIError, err) + } + + namedSchemas := make(map[string]any) + collectAvroNamedSchemas(schemaDef, namedSchemas) + + schema = ®isteredDebeziumAvroSchema{ + schema: schemaDef, + namedSchemas: namedSchemas, + codec: codec, + } + d.mu.Lock() + d.schemas[schemaID] = schema + d.mu.Unlock() + return schema, nil +} + +func avroNativeToConnectPayload(schema any, value any, namedSchemas map[string]any) (any, error) { + switch typedSchema := schema.(type) { + case []any: + if 
value == nil { + return nil, nil + } + branchSchema, branchValue, err := avroUnionBranch(typedSchema, value) + if err != nil { + return nil, err + } + return avroNativeToConnectPayload(branchSchema, branchValue, namedSchemas) + case map[string]any: + rawType, ok := typedSchema["type"] + if !ok { + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("avro schema is missing type") + } + if unionType, ok := rawType.([]any); ok { + return avroNativeToConnectPayload(unionType, value, namedSchemas) + } + typeName, ok := rawType.(string) + if !ok { + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("avro schema type is invalid") + } + switch typeName { + case "record": + valueMap, ok := value.(map[string]any) + if !ok { + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("avro record payload is invalid") + } + fields, ok := typedSchema["fields"].([]any) + if !ok { + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("avro record schema is missing fields") + } + result := make(map[string]any, len(fields)) + for _, rawField := range fields { + field, ok := rawField.(map[string]any) + if !ok { + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("avro field schema is invalid") + } + avroFieldName, ok := field["name"].(string) + if !ok { + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("avro field is missing name") + } + connectFieldName := avroConnectFieldName(field, avroFieldName) + rawValue, exists := valueMap[avroFieldName] + if !exists && connectFieldName != avroFieldName { + rawValue, exists = valueMap[connectFieldName] + } + if !exists { + rawValue, exists = avroMissingFieldValue(field) + } + if !exists { + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs( + "avro record payload is missing field " + avroFieldName) + } + fieldValue, err := avroNativeToConnectPayload( + field["type"], + rawValue, + namedSchemas, + ) + if err != nil { + return nil, err + } + 
result[connectFieldName] = fieldValue + } + return result, nil + case "array": + items, ok := typedSchema["items"] + if !ok { + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("avro array schema is missing items") + } + if value == nil { + return []any{}, nil + } + values, ok := value.([]any) + if !ok { + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("avro array payload is invalid") + } + result := make([]any, 0, len(values)) + for _, item := range values { + itemValue, err := avroNativeToConnectPayload(items, item, namedSchemas) + if err != nil { + return nil, err + } + result = append(result, itemValue) + } + return result, nil + case "bytes": + if avroSchemaIsDecimal(typedSchema) { + return avroDecimalNativeToString(typedSchema, value) + } + return value, nil + default: + return value, nil + } + case string: + if namedSchema, ok := namedSchemas[typedSchema]; ok { + return avroNativeToConnectPayload(namedSchema, value, namedSchemas) + } + return value, nil + default: + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("avro schema is invalid") + } +} + +func avroSchemaToConnectSchema( + schema any, + fieldName string, + fieldMeta map[string]any, + namedSchemas map[string]any, +) (map[string]any, error) { + switch typedSchema := schema.(type) { + case []any: + branchSchema, _, err := avroNonNullUnionBranch(typedSchema) + if err != nil { + return nil, err + } + connectSchema, err := avroSchemaToConnectSchema( + branchSchema, + fieldName, + fieldMeta, + namedSchemas, + ) + if err != nil { + return nil, err + } + connectSchema["optional"] = true + return connectSchema, nil + case map[string]any: + rawType, ok := typedSchema["type"] + if !ok { + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("avro schema is missing type") + } + if unionType, ok := rawType.([]any); ok { + return avroSchemaToConnectSchema(unionType, fieldName, fieldMeta, namedSchemas) + } + typeName, ok := rawType.(string) + if !ok { + 
return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("avro schema type is invalid") + } + switch typeName { + case "record": + connectSchema := newConnectSchema("struct", false, fieldName, typedSchema, fieldMeta) + fields, ok := typedSchema["fields"].([]any) + if !ok { + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("avro record schema is missing fields") + } + connectFields := make([]any, 0, len(fields)) + for _, rawField := range fields { + field, ok := rawField.(map[string]any) + if !ok { + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("avro field schema is invalid") + } + avroFieldName, ok := field["name"].(string) + if !ok { + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("avro field is missing name") + } + fieldSchema, err := avroSchemaToConnectSchema( + field["type"], + avroConnectFieldName(field, avroFieldName), + field, + namedSchemas, + ) + if err != nil { + return nil, err + } + connectFields = append(connectFields, fieldSchema) + } + connectSchema["fields"] = connectFields + return connectSchema, nil + case "array": + connectSchema := newConnectSchema("array", false, fieldName, typedSchema, fieldMeta) + items, ok := typedSchema["items"] + if !ok { + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("avro array schema is missing items") + } + connectItems, err := avroSchemaToConnectSchema(items, "", nil, namedSchemas) + if err != nil { + return nil, err + } + connectSchema["items"] = connectItems + return connectSchema, nil + default: + connectType, err := avroPrimitiveToConnectType(typeName, typedSchema) + if err != nil { + return nil, err + } + return newConnectSchema(connectType, false, fieldName, typedSchema, fieldMeta), nil + } + case string: + if namedSchema, ok := namedSchemas[typedSchema]; ok { + return avroSchemaToConnectSchema(namedSchema, fieldName, fieldMeta, namedSchemas) + } + connectType, err := avroPrimitiveToConnectType(typedSchema, nil) + if err != nil { + 
return nil, err + } + return newConnectSchema(connectType, false, fieldName, nil, fieldMeta), nil + default: + return nil, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("avro schema is invalid") + } +} + +func collectAvroNamedSchemas(schema any, namedSchemas map[string]any) { + switch typedSchema := schema.(type) { + case []any: + for _, branch := range typedSchema { + collectAvroNamedSchemas(branch, namedSchemas) + } + case map[string]any: + rawType := typedSchema["type"] + if unionType, ok := rawType.([]any); ok { + collectAvroNamedSchemas(unionType, namedSchemas) + return + } + typeName, _ := rawType.(string) + switch typeName { + case "record": + name := avroBranchName(typedSchema) + if name != "" { + namedSchemas[name] = typedSchema + shortName := avroShortBranchName(name) + if _, exists := namedSchemas[shortName]; shortName != "" && !exists { + namedSchemas[shortName] = typedSchema + } + } + fields, _ := typedSchema["fields"].([]any) + for _, rawField := range fields { + field, ok := rawField.(map[string]any) + if !ok { + continue + } + collectAvroNamedSchemas(field["type"], namedSchemas) + } + case "array": + collectAvroNamedSchemas(typedSchema["items"], namedSchemas) + } + } +} + +func newConnectSchema( + connectType string, + optional bool, + fieldName string, + schemaMeta map[string]any, + fieldMeta map[string]any, +) map[string]any { + connectSchema := map[string]any{ + "type": connectType, + "optional": optional, + } + if fieldName != "" { + connectSchema["field"] = fieldName + } + addConnectSchemaMetadata(connectSchema, schemaMeta) + addConnectFieldMetadata(connectSchema, fieldMeta) + return connectSchema +} + +func addConnectSchemaMetadata(connectSchema map[string]any, schemaMeta map[string]any) { + if schemaMeta == nil { + return + } + if name, ok := schemaMeta["connect.name"].(string); ok && name != "" { + connectSchema["name"] = name + } + if version, ok := schemaMeta["connect.version"]; ok { + connectSchema["version"] = version + } + if 
parameters, ok := schemaMeta["connect.parameters"].(map[string]any); ok { + connectSchema["parameters"] = parameters + } + if tidbType, ok := schemaMeta[debeziumAvroTiDBTypeKey].(string); ok && tidbType != "" { + connectSchema[debeziumAvroTiDBTypeKey] = tidbType + } +} + +func addConnectFieldMetadata(connectSchema map[string]any, fieldMeta map[string]any) { + if fieldMeta == nil { + return + } + if tidbType, ok := fieldMeta[debeziumAvroTiDBTypeKey].(string); ok && tidbType != "" { + connectSchema[debeziumAvroTiDBTypeKey] = tidbType + } +} + +func avroPrimitiveToConnectType(avroType string, schemaMeta map[string]any) (string, error) { + if schemaMeta != nil { + if connectType, ok := schemaMeta["connect.type"].(string); ok && connectType != "" { + return connectType, nil + } + } + switch avroType { + case "boolean": + return "boolean", nil + case "string": + return "string", nil + case "bytes": + return "bytes", nil + case "int": + return "int32", nil + case "long": + return "int64", nil + case "float": + return "float", nil + case "double": + return "double", nil + default: + return "", errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("unsupported avro type " + avroType) + } +} + +func avroSchemaIsDecimal(schema map[string]any) bool { + typeName, _ := schema["type"].(string) + logicalType, _ := schema["logicalType"].(string) + return typeName == "bytes" && logicalType == "decimal" +} + +func avroDecimalNativeToString(schema map[string]any, value any) (string, error) { + scale, err := avroDecimalScale(schema) + if err != nil { + return "", err + } + switch v := value.(type) { + case *big.Rat: + return v.FloatString(scale), nil + case big.Rat: + return v.FloatString(scale), nil + case string: + return v, nil + default: + return "", errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("decimal payload is invalid") + } +} + +func avroDecimalScale(schema map[string]any) (int, error) { + switch scale := schema["scale"].(type) { + case float64: + return int(scale), nil + 
case int: + return scale, nil + case int32: + return int(scale), nil + case int64: + return int(scale), nil + case json.Number: + value, err := scale.Int64() + if err != nil { + return 0, errors.WrapError(errors.ErrDebeziumInvalidMessage, err) + } + return int(value), nil + default: + return 0, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("decimal schema is missing scale") + } +} + +func avroUnionBranch(union []any, value any) (any, any, error) { + if value == nil { + return nil, nil, nil + } + var wrappedBranchName string + var wrappedBranchValue any + hasWrappedBranch := false + if branchValueMap, ok := value.(map[string]any); ok && len(branchValueMap) == 1 { + for branchName, branchValue := range branchValueMap { + wrappedBranchName = branchName + wrappedBranchValue = branchValue + hasWrappedBranch = true + for _, branchSchema := range union { + if avroBranchName(branchSchema) == branchName { + return branchSchema, branchValue, nil + } + } + } + } + + branchSchema, isSingleNonNullBranch, err := avroNonNullUnionBranch(union) + if err != nil { + return nil, nil, err + } + if hasWrappedBranch && + isSingleNonNullBranch && + avroShortBranchName(branchSchema) == avroShortBranchName(wrappedBranchName) { + return branchSchema, wrappedBranchValue, nil + } + return branchSchema, value, nil +} + +func avroNonNullUnionBranch(union []any) (any, bool, error) { + var result any + count := 0 + for _, branch := range union { + if avroBranchName(branch) != "null" { + if count == 0 { + result = branch + } + count++ + } + } + if count > 0 { + return result, count == 1, nil + } + return nil, false, errors.ErrDebeziumInvalidMessage.GenWithStackByArgs("avro union has no non-null branch") +} + +func avroBranchName(schema any) string { + switch typedSchema := schema.(type) { + case string: + return typedSchema + case map[string]any: + typeName, _ := typedSchema["type"].(string) + switch typeName { + case "record": + name, _ := typedSchema["name"].(string) + namespace, _ := 
// avroSchemaIsArray reports whether a JSON-decoded Avro schema node denotes an
// array: either the bare type name "array" or a definition map whose "type"
// entry is the string "array". Unions and every other node shape report false.
func avroSchemaIsArray(schema any) bool {
	if name, isName := schema.(string); isName {
		return name == "array"
	}
	definition, isDefinition := schema.(map[string]any)
	if !isDefinition {
		return false
	}
	// A non-string "type" (for example a nested union) leaves declared empty.
	declared, _ := definition["type"].(string)
	return declared == "array"
}
b/pkg/sink/codec/debezium/avro_test.go @@ -0,0 +1,591 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package debezium + +import ( + "context" + "encoding/binary" + "encoding/json" + "fmt" + "io" + "net/http" + "testing" + "time" + + "github.com/pingcap/ticdc/downstreamadapter/sink/columnselector" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/ticdc/pkg/sink/codec/avro" + "github.com/pingcap/ticdc/pkg/sink/codec/common" + "github.com/stretchr/testify/require" +) + +func TestDebeziumConfluentAvroEncodeRowEvent(t *testing.T) { + ctx := context.Background() + _, err := avro.SetupEncoderAndSchemaRegistry4Testing( + ctx, + common.NewConfig(config.ProtocolAvro), + ) + require.NoError(t, err) + defer avro.TeardownEncoderAndSchemaRegistry4Testing() + + helper := NewSQLTestHelper(t, "foo", ` + create table foo( + id int primary key, + name varchar(16), + bin varbinary(16), + price decimal(10, 4), + ubig bigint unsigned, + v bigint null + )`) + defer helper.Close() + + dmls := helper.helper.DML2Event("test", "foo", + "insert into foo values (1, 'alice', x'010203', 12.3400, 18446744073709551615, null)") + row, ok := dmls.GetNextRow() + require.True(t, ok) + + cfg := common.NewConfig(config.ProtocolDebeziumAvro) + cfg.AvroConfluentSchemaRegistry = "http://127.0.0.1:8081" + cfg.AvroBigintUnsignedHandlingMode = common.BigintUnsignedHandlingModeString + cfg.DebeziumDisableSchema = true + cfg.TimeZone = time.UTC + + encoder, err := 
NewAvroBatchEncoder(ctx, cfg, "dbserver1") + require.NoError(t, err) + require.NoError(t, encoder.AppendRowChangedEvent(ctx, "dbserver1.test.foo", &commonEvent.RowEvent{ + TableInfo: helper.tableInfo, + CommitTs: 1, + Event: row, + ColumnSelector: columnselector.NewDefaultColumnSelector(), + Callback: func() {}, + })) + + messages := encoder.Build() + require.Len(t, messages, 1) + require.Equal(t, byte(0), messages[0].Key[0]) + require.Equal(t, byte(0), messages[0].Value[0]) + + key := decodeConfluentAvroForTest(t, messages[0].Key) + require.Equal(t, int32(1), unwrapAvroUnionForTest(t, key["id"], "int")) + + value := decodeConfluentAvroForTest(t, messages[0].Value) + require.Equal(t, "c", value["op"]) + require.Nil(t, value["before"]) + require.NotContains(t, value, "transaction") + require.IsType(t, int64(0), value["ts_ms"]) + + afterUnion, ok := value["after"].(map[string]any) + require.True(t, ok) + after, ok := afterUnion["dbserver1.test.foo"].(map[string]any) + require.True(t, ok) + require.Equal(t, int32(1), unwrapAvroUnionForTest(t, after["id"], "int")) + require.Equal(t, "alice", unwrapAvroUnionForTest(t, after["name"], "string")) + require.Equal(t, []byte{1, 2, 3}, unwrapAvroUnionForTest(t, after["bin"], "bytes")) + require.Equal(t, "18446744073709551615", unwrapAvroUnionForTest(t, after["ubig"], "string")) + require.Nil(t, after["v"]) + + source, ok := value["source"].(map[string]any) + require.True(t, ok) + require.Equal(t, "test", source["db"]) + require.Equal(t, "foo", source["table"]) + require.Nil(t, source["snapshot"]) + require.Nil(t, source["thread"]) + require.Equal(t, "dbserver1", source["name"]) + + valueSchema := decodeConfluentAvroSchemaForTest(t, messages[0].Value) + require.Contains(t, valueSchema, `"name":"fooEnvelope"`) + require.Contains(t, valueSchema, `"name":"foo"`) + require.Contains(t, valueSchema, `"name":"Source"`) + require.Contains(t, valueSchema, `"logicalType":"decimal"`) + require.NotContains(t, valueSchema, 
`"field":"transaction"`) +} + +func TestDebeziumConfluentAvroDecodeRowEvent(t *testing.T) { + ctx := context.Background() + _, err := avro.SetupEncoderAndSchemaRegistry4Testing( + ctx, + common.NewConfig(config.ProtocolAvro), + ) + require.NoError(t, err) + defer avro.TeardownEncoderAndSchemaRegistry4Testing() + + helper := NewSQLTestHelper(t, "foo", ` + create table foo( + id int primary key, + name varchar(16), + bin varbinary(16), + price decimal(10, 4), + ubig bigint unsigned, + v bigint null + )`) + defer helper.Close() + + dmls := helper.helper.DML2Event("test", "foo", + "insert into foo values (1, 'alice', x'010203', 12.3400, 18446744073709551615, null)") + row, ok := dmls.GetNextRow() + require.True(t, ok) + + cfg := common.NewConfig(config.ProtocolDebeziumAvro) + cfg.AvroConfluentSchemaRegistry = "http://127.0.0.1:8081" + cfg.AvroBigintUnsignedHandlingMode = common.BigintUnsignedHandlingModeString + cfg.EnableTiDBExtension = true + cfg.TimeZone = time.UTC + + commitTs := uint64(123) + encoder, err := NewAvroBatchEncoder(ctx, cfg, "dbserver1") + require.NoError(t, err) + require.NoError(t, encoder.AppendRowChangedEvent(ctx, "dbserver1.test.foo", &commonEvent.RowEvent{ + TableInfo: helper.tableInfo, + CommitTs: commitTs, + Event: row, + ColumnSelector: columnselector.NewDefaultColumnSelector(), + Callback: func() {}, + })) + + messages := encoder.Build() + require.Len(t, messages, 1) + + decoder, err := NewAvroDecoder(ctx, cfg, 0, nil) + require.NoError(t, err) + decoder.AddKeyValue(messages[0].Key, messages[0].Value) + + messageType, hasNext := decoder.HasNext() + require.True(t, hasNext) + require.Equal(t, common.MessageTypeRow, messageType) + + decoded := decoder.NextDMLEvent() + require.Equal(t, commitTs, decoded.CommitTs) + require.Equal(t, "test", decoded.TableInfo.GetSchemaName()) + require.Equal(t, "foo", decoded.TableInfo.GetTableName()) + + change, ok := decoded.GetNextRow() + require.True(t, ok) + common.CompareRow(t, row, helper.tableInfo, 
change, decoded.TableInfo) +} + +func TestDebeziumConfluentAvroDecodeAccountDMLEvents(t *testing.T) { + ctx := context.Background() + _, err := avro.SetupEncoderAndSchemaRegistry4Testing( + ctx, + common.NewConfig(config.ProtocolAvro), + ) + require.NoError(t, err) + defer avro.TeardownEncoderAndSchemaRegistry4Testing() + + helper := NewSQLTestHelper(t, "tp_account", ` + create table tp_account( + id int primary key, + account_id int not null + )`) + defer helper.Close() + + insertDML := helper.helper.DML2Event("test", "tp_account", + "insert into tp_account values (12, 34)") + updateDML, _ := helper.helper.DML2UpdateEvent("test", "tp_account", + "insert into tp_account values (13, 34)", + "update tp_account set account_id = 35 where id = 13") + deleteDML := helper.helper.DML2DeleteEvent("test", "tp_account", + "insert into tp_account values (14, 34)", + "delete from tp_account where id = 14") + + cfg := common.NewConfig(config.ProtocolDebeziumAvro) + cfg.AvroConfluentSchemaRegistry = "http://127.0.0.1:8081" + cfg.EnableTiDBExtension = true + cfg.TimeZone = time.UTC + + encoder, err := NewAvroBatchEncoder(ctx, cfg, "dbserver1") + require.NoError(t, err) + + rows := make([]commonEvent.RowChange, 0, 3) + for _, dml := range []*commonEvent.DMLEvent{insertDML, updateDML, deleteDML} { + row, ok := dml.GetNextRow() + if !ok { + continue + } + rows = append(rows, row) + require.NoError(t, encoder.AppendRowChangedEvent(ctx, "dbserver1.test.tp_account", &commonEvent.RowEvent{ + TableInfo: helper.tableInfo, + CommitTs: 1, + Event: row, + ColumnSelector: columnselector.NewDefaultColumnSelector(), + Callback: func() {}, + })) + } + require.Len(t, rows, 3) + + messages := encoder.Build() + require.Len(t, messages, 3) + for idx, message := range messages { + decoder, err := NewAvroDecoder(ctx, cfg, 0, nil) + require.NoError(t, err) + decoder.AddKeyValue(message.Key, message.Value) + + messageType, hasNext := decoder.HasNext() + require.True(t, hasNext) + require.Equal(t, 
common.MessageTypeRow, messageType) + + decoded := decoder.NextDMLEvent() + require.Equal(t, "test", decoded.TableInfo.GetSchemaName()) + require.Equal(t, "tp_account", decoded.TableInfo.GetTableName()) + + change, ok := decoded.GetNextRow() + require.True(t, ok) + common.CompareRow(t, rows[idx], helper.tableInfo, change, decoded.TableInfo) + } +} + +func TestDebeziumConfluentAvroDecodeShortNamedUnionBranch(t *testing.T) { + valueSchema := map[string]any{ + "type": "record", + "name": "Value", + "namespace": "dbserver1.test.tp_account", + "fields": []any{ + map[string]any{ + "name": "id", + "type": "int", + debeziumAvroConnectFieldKey: "id", + }, + map[string]any{ + "name": "account_id", + "type": "int", + debeziumAvroConnectFieldKey: "account_id", + }, + }, + } + namedSchemas := map[string]any{ + "dbserver1.test.tp_account.Value": valueSchema, + } + + payload, err := avroNativeToConnectPayload( + []any{"null", "dbserver1.test.tp_account.Value"}, + map[string]any{ + "Value": map[string]any{ + "id": int32(12), + "account_id": int32(34), + }, + }, + namedSchemas, + ) + require.NoError(t, err) + require.Equal(t, map[string]any{ + "id": int32(12), + "account_id": int32(34), + }, payload) +} + +func TestDebeziumConfluentAvroDecodeFullNamedWrapperForShortUnionBranch(t *testing.T) { + valueSchema := map[string]any{ + "type": "record", + "name": "Value", + "namespace": "default.test.tp_account", + "fields": []any{ + map[string]any{ + "name": "id", + "type": "int", + debeziumAvroConnectFieldKey: "id", + }, + map[string]any{ + "name": "account_id", + "type": "int", + debeziumAvroConnectFieldKey: "account_id", + }, + }, + } + envelopeSchema := map[string]any{ + "type": "record", + "name": "Envelope", + "namespace": "default.test.tp_account", + "fields": []any{ + map[string]any{ + "name": "after", + "type": []any{"null", "Value"}, + }, + }, + } + namedSchemas := map[string]any{} + collectAvroNamedSchemas(valueSchema, namedSchemas) + collectAvroNamedSchemas(envelopeSchema, 
namedSchemas) + + payload, err := avroNativeToConnectPayload( + []any{"null", "Value"}, + map[string]any{ + "default.test.tp_account.Value": map[string]any{ + "id": int32(12), + "account_id": int32(34), + }, + }, + namedSchemas, + ) + require.NoError(t, err) + require.Equal(t, map[string]any{ + "id": int32(12), + "account_id": int32(34), + }, payload) +} + +func TestDebeziumConfluentAvroDecodeSingleFieldUnionRecord(t *testing.T) { + valueSchema := map[string]any{ + "type": "record", + "name": "Value", + "namespace": "dbserver1.test.only_pk", + "fields": []any{ + map[string]any{ + "name": "id", + "type": "int", + debeziumAvroConnectFieldKey: "id", + }, + }, + } + namedSchemas := map[string]any{ + "dbserver1.test.only_pk.Value": valueSchema, + } + + payload, err := avroNativeToConnectPayload( + []any{"null", "dbserver1.test.only_pk.Value"}, + map[string]any{ + "id": int32(12), + }, + namedSchemas, + ) + require.NoError(t, err) + require.Equal(t, map[string]any{ + "id": int32(12), + }, payload) +} + +func TestDebeziumConfluentAvroDecodeMissingRecordField(t *testing.T) { + valueSchema := map[string]any{ + "type": "record", + "name": "Value", + "namespace": "dbserver1.test.tp_account", + "fields": []any{ + map[string]any{ + "name": "id", + "type": "int", + debeziumAvroConnectFieldKey: "id", + }, + map[string]any{ + "name": "account_id", + "type": "int", + debeziumAvroConnectFieldKey: "account_id", + }, + }, + } + + _, err := avroNativeToConnectPayload( + valueSchema, + map[string]any{ + "id": int32(12), + }, + nil, + ) + require.Error(t, err) + require.Contains(t, err.Error(), "avro record payload is missing field account_id") +} + +func TestDebeziumConfluentAvroDecodeMissingOptionalRecordField(t *testing.T) { + valueSchema := map[string]any{ + "type": "record", + "name": "Table", + "namespace": "io.debezium.connector.schema", + "fields": []any{ + map[string]any{ + "name": "defaultCharsetName", + "type": []any{"null", "string"}, + "default": nil, + }, + map[string]any{ 
+ "name": "columns", + "type": map[string]any{ + "type": "array", + "items": "string", + }, + }, + }, + } + + payload, err := avroNativeToConnectPayload( + valueSchema, + map[string]any{ + "columns": []any{"id"}, + }, + nil, + ) + require.NoError(t, err) + require.Equal(t, map[string]any{ + "defaultCharsetName": nil, + "columns": []any{"id"}, + }, payload) +} + +func TestDebeziumConfluentAvroDecodeMissingArrayRecordField(t *testing.T) { + valueSchema := map[string]any{ + "type": "record", + "name": "Table", + "namespace": "io.debezium.connector.schema", + "fields": []any{ + map[string]any{ + "name": "columns", + "type": map[string]any{ + "type": "array", + "items": "string", + }, + }, + }, + } + + payload, err := avroNativeToConnectPayload( + valueSchema, + map[string]any{}, + nil, + ) + require.NoError(t, err) + require.Equal(t, map[string]any{ + "columns": []any{}, + }, payload) +} + +func TestDebeziumConfluentAvroDoesNotEncodeDDLEvent(t *testing.T) { + ctx := context.Background() + _, err := avro.SetupEncoderAndSchemaRegistry4Testing( + ctx, + common.NewConfig(config.ProtocolAvro), + ) + require.NoError(t, err) + defer avro.TeardownEncoderAndSchemaRegistry4Testing() + + cfg := common.NewConfig(config.ProtocolDebeziumAvro) + cfg.AvroConfluentSchemaRegistry = "http://127.0.0.1:8081" + cfg.EnableTiDBExtension = true + cfg.TimeZone = time.UTC + + encoder, err := NewAvroBatchEncoder(ctx, cfg, "dbserver1") + require.NoError(t, err) + + routedDDL := common.NewRoutedDDLEvent4Test() + message, err := encoder.EncodeDDLEvent(routedDDL) + require.NoError(t, err) + require.Nil(t, message) +} + +func TestDebeziumConfluentAvroEncodeCheckpointEvent(t *testing.T) { + ctx := context.Background() + _, err := avro.SetupEncoderAndSchemaRegistry4Testing( + ctx, + common.NewConfig(config.ProtocolAvro), + ) + require.NoError(t, err) + defer avro.TeardownEncoderAndSchemaRegistry4Testing() + + cfg := common.NewConfig(config.ProtocolDebeziumAvro) + cfg.AvroConfluentSchemaRegistry = 
"http://127.0.0.1:8081" + cfg.EnableTiDBExtension = true + cfg.AvroEnableWatermark = true + cfg.TimeZone = time.UTC + + encoder, err := NewAvroBatchEncoder(ctx, cfg, "dbserver1") + require.NoError(t, err) + + message, err := encoder.EncodeCheckpointEvent(100) + require.NoError(t, err) + require.NotNil(t, message) + + decoder, err := NewAvroDecoder(ctx, cfg, 0, nil) + require.NoError(t, err) + decoder.AddKeyValue(message.Key, message.Value) + + messageType, hasNext := decoder.HasNext() + require.True(t, hasNext) + require.Equal(t, common.MessageTypeResolved, messageType) + require.Equal(t, uint64(100), decoder.NextResolvedEvent()) +} + +func TestDebeziumConfluentAvroDoesNotEncodeCheckpointEventByDefault(t *testing.T) { + ctx := context.Background() + _, err := avro.SetupEncoderAndSchemaRegistry4Testing( + ctx, + common.NewConfig(config.ProtocolAvro), + ) + require.NoError(t, err) + defer avro.TeardownEncoderAndSchemaRegistry4Testing() + + cfg := common.NewConfig(config.ProtocolDebeziumAvro) + cfg.AvroConfluentSchemaRegistry = "http://127.0.0.1:8081" + cfg.EnableTiDBExtension = true + cfg.TimeZone = time.UTC + + encoder, err := NewAvroBatchEncoder(ctx, cfg, "dbserver1") + require.NoError(t, err) + + message, err := encoder.EncodeCheckpointEvent(100) + require.NoError(t, err) + require.Nil(t, message) +} + +func decodeConfluentAvroForTest(t *testing.T, data []byte) map[string]any { + t.Helper() + + schema, binaryData := decodeConfluentAvroEnvelopeForTest(t, data) + codec, err := avro.GenCodec(schema) + require.NoError(t, err) + + native, _, err := codec.NativeFromBinary(binaryData) + require.NoError(t, err) + + result, ok := native.(map[string]any) + require.True(t, ok) + return result +} + +func decodeConfluentAvroSchemaForTest(t *testing.T, data []byte) string { + t.Helper() + + schema, _ := decodeConfluentAvroEnvelopeForTest(t, data) + return schema +} + +func decodeConfluentAvroEnvelopeForTest(t *testing.T, data []byte) (string, []byte) { + t.Helper() + + 
require.GreaterOrEqual(t, len(data), 5) + require.Equal(t, byte(0), data[0]) + schemaID := int(binary.BigEndian.Uint32(data[1:5])) + binaryData := data[5:] + + resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:8081/schemas/ids/%d", schemaID)) + require.NoError(t, err) + defer resp.Body.Close() + require.Equal(t, http.StatusOK, resp.StatusCode) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + var schemaResp struct { + Schema string `json:"schema"` + } + require.NoError(t, json.Unmarshal(body, &schemaResp)) + + return schemaResp.Schema, binaryData +} + +func unwrapAvroUnionForTest(t *testing.T, value any, branch string) any { + t.Helper() + + union, ok := value.(map[string]any) + require.True(t, ok) + result, ok := union[branch] + require.True(t, ok) + return result +} diff --git a/pkg/sink/codec/debezium/codec.go b/pkg/sink/codec/debezium/codec.go index 9f7853bb33..1ceea135e8 100644 --- a/pkg/sink/codec/debezium/codec.go +++ b/pkg/sink/codec/debezium/codec.go @@ -18,6 +18,7 @@ import ( "encoding/hex" "fmt" "io" + "math" "strconv" "strings" "time" @@ -25,6 +26,7 @@ import ( "github.com/pingcap/log" commonType "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/sink/codec/common" "github.com/pingcap/ticdc/pkg/util" @@ -42,6 +44,73 @@ type dbzCodec struct { nowFunc func() time.Time } +func (c *dbzCodec) isDebeziumAvro() bool { + return c.config.Protocol == config.ProtocolDebeziumAvro +} + +func (c *dbzCodec) debeziumAvroNamespace(schema string) string { + return fmt.Sprintf("%s.%s", + common.SanitizeName(c.clusterID), + common.SanitizeName(schema)) +} + +func (c *dbzCodec) debeziumAvroTableName(table string) string { + return common.SanitizeName(table) +} + +func (c *dbzCodec) keySchemaName(schema string, table string) string { + if c.isDebeziumAvro() { + return fmt.Sprintf("%s.%sKey", + 
c.debeziumAvroNamespace(schema), + c.debeziumAvroTableName(table)) + } + return fmt.Sprintf("%s.Key", getSchemaTopicName(c.clusterID, schema, table)) +} + +func (c *dbzCodec) envelopeSchemaName(schema string, table string) string { + if c.isDebeziumAvro() { + return fmt.Sprintf("%s.%sEnvelope", + c.debeziumAvroNamespace(schema), + c.debeziumAvroTableName(table)) + } + return fmt.Sprintf("%s.Envelope", getSchemaTopicName(c.clusterID, schema, table)) +} + +func (c *dbzCodec) valueSchemaName(schema string, table string) string { + if c.isDebeziumAvro() { + return fmt.Sprintf("%s.%s", + c.debeziumAvroNamespace(schema), + c.debeziumAvroTableName(table)) + } + return fmt.Sprintf("%s.Value", getSchemaTopicName(c.clusterID, schema, table)) +} + +func (c *dbzCodec) sourceSchemaName(schema string) string { + if c.isDebeziumAvro() { + return fmt.Sprintf("%s.Source", c.debeziumAvroNamespace(schema)) + } + return "io.debezium.connector.mysql.Source" +} + +func decimalPrecisionAndScale(ft *types.FieldType) (int, int) { + defaultPrecision, defaultScale := mysql.GetDefaultFieldLengthAndDecimal(ft.GetType()) + precision, scale := ft.GetFlen(), ft.GetDecimal() + if precision == -1 { + precision = defaultPrecision + } + if scale == -1 { + scale = defaultScale + } + return precision, scale +} + +func (c *dbzCodec) columnOptional(ft *types.FieldType) bool { + if c.isDebeziumAvro() { + return true + } + return !mysql.HasNotNullFlag(ft.GetFlag()) +} + func (c *dbzCodec) writeDebeziumFieldValues( writer *util.JSONWriter, fieldName string, @@ -119,14 +188,14 @@ func (c *dbzCodec) writeDebeziumFieldSchema( } if n == 1 { writer.WriteStringField("type", "boolean") - writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteBoolField("optional", c.columnOptional(ft)) writer.WriteStringField("field", colName) if col.GetDefaultValue() != nil { writer.WriteBoolField("default", v != 0) // bool } } else { writer.WriteStringField("type", "bytes") - 
writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteBoolField("optional", c.columnOptional(ft)) writer.WriteStringField("name", "io.debezium.data.Bits") writer.WriteIntField("version", 1) writer.WriteObjectField("parameters", func() { @@ -139,15 +208,19 @@ func (c *dbzCodec) writeDebeziumFieldSchema( } case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob: - writer.WriteStringField("type", "string") - writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + if c.isDebeziumAvro() && mysql.HasBinaryFlag(ft.GetFlag()) { + writer.WriteStringField("type", "bytes") + } else { + writer.WriteStringField("type", "string") + } + writer.WriteBoolField("optional", c.columnOptional(ft)) writer.WriteStringField("field", colName) if col.GetDefaultValue() != nil { writer.WriteAnyField("default", col.GetDefaultValue()) } case mysql.TypeEnum: writer.WriteStringField("type", "string") - writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteBoolField("optional", c.columnOptional(ft)) writer.WriteStringField("name", "io.debezium.data.Enum") writer.WriteIntField("version", 1) writer.WriteObjectField("parameters", func() { @@ -164,7 +237,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( } case mysql.TypeSet: writer.WriteStringField("type", "string") - writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteBoolField("optional", c.columnOptional(ft)) writer.WriteStringField("name", "io.debezium.data.EnumSet") writer.WriteIntField("version", 1) writer.WriteObjectField("parameters", func() { @@ -176,7 +249,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( } case mysql.TypeDate, mysql.TypeNewDate: writer.WriteStringField("type", "int32") - writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteBoolField("optional", c.columnOptional(ft)) writer.WriteStringField("name", 
"io.debezium.time.Date") writer.WriteIntField("version", 1) writer.WriteStringField("field", colName) @@ -206,7 +279,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( } case mysql.TypeDatetime: writer.WriteStringField("type", "int64") - writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteBoolField("optional", c.columnOptional(ft)) if ft.GetDecimal() <= 3 { writer.WriteStringField("name", "io.debezium.time.Timestamp") } else { @@ -251,7 +324,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( } case mysql.TypeTimestamp: writer.WriteStringField("type", "string") - writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteBoolField("optional", c.columnOptional(ft)) writer.WriteStringField("name", "io.debezium.time.ZonedTimestamp") writer.WriteIntField("version", 1) writer.WriteStringField("field", colName) @@ -293,7 +366,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( } case mysql.TypeDuration: writer.WriteStringField("type", "int64") - writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteBoolField("optional", c.columnOptional(ft)) writer.WriteStringField("name", "io.debezium.time.MicroTime") writer.WriteIntField("version", 1) writer.WriteStringField("field", colName) @@ -310,7 +383,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( } case mysql.TypeJSON: writer.WriteStringField("type", "string") - writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteBoolField("optional", c.columnOptional(ft)) writer.WriteStringField("name", "io.debezium.data.Json") writer.WriteIntField("version", 1) writer.WriteStringField("field", colName) @@ -319,7 +392,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( } case mysql.TypeTiny: // TINYINT writer.WriteStringField("type", "int16") - writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteBoolField("optional", c.columnOptional(ft)) writer.WriteStringField("field", colName) if 
col.GetDefaultValue() != nil { v, ok := col.GetDefaultValue().(string) @@ -338,7 +411,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( } else { writer.WriteStringField("type", "int16") } - writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteBoolField("optional", c.columnOptional(ft)) writer.WriteStringField("field", colName) if col.GetDefaultValue() != nil { v, ok := col.GetDefaultValue().(string) @@ -353,7 +426,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( } case mysql.TypeInt24: // MEDIUMINT writer.WriteStringField("type", "int32") - writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteBoolField("optional", c.columnOptional(ft)) writer.WriteStringField("field", colName) if col.GetDefaultValue() != nil { v, ok := col.GetDefaultValue().(string) @@ -372,7 +445,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( } else { writer.WriteStringField("type", "int32") } - writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteBoolField("optional", c.columnOptional(ft)) writer.WriteStringField("field", colName) if col.GetDefaultValue() != nil { v, ok := col.GetDefaultValue().(string) @@ -386,8 +459,14 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteFloat64Field("default", floatV) } case mysql.TypeLonglong: // BIGINT - writer.WriteStringField("type", "int64") - writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + if c.isDebeziumAvro() && + mysql.HasUnsignedFlag(ft.GetFlag()) && + c.config.AvroBigintUnsignedHandlingMode == common.BigintUnsignedHandlingModeString { + writer.WriteStringField("type", "string") + } else { + writer.WriteStringField("type", "int64") + } + writer.WriteBoolField("optional", c.columnOptional(ft)) writer.WriteStringField("field", colName) if col.GetDefaultValue() != nil { v, ok := col.GetDefaultValue().(string) @@ -406,7 +485,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( } else { writer.WriteStringField("type", "float") } 
- writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteBoolField("optional", c.columnOptional(ft)) writer.WriteStringField("field", colName) if col.GetDefaultValue() != nil { v, ok := col.GetDefaultValue().(string) @@ -419,11 +498,11 @@ func (c *dbzCodec) writeDebeziumFieldSchema( } writer.WriteFloat64Field("default", floatV) } - case mysql.TypeDouble, mysql.TypeNewDecimal: + case mysql.TypeDouble: // https://dev.mysql.com/doc/refman/8.4/en/numeric-types.html // MySQL also treats REAL as a synonym for DOUBLE PRECISION (a nonstandard variation), unless the REAL_AS_FLOAT SQL mode is enabled. writer.WriteStringField("type", "double") - writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteBoolField("optional", c.columnOptional(ft)) writer.WriteStringField("field", colName) if col.GetDefaultValue() != nil { v, ok := col.GetDefaultValue().(string) @@ -436,9 +515,42 @@ func (c *dbzCodec) writeDebeziumFieldSchema( } writer.WriteFloat64Field("default", floatV) } + case mysql.TypeNewDecimal: + if c.isDebeziumAvro() && + c.config.AvroDecimalHandlingMode == common.DecimalHandlingModePrecise { + precision, scale := decimalPrecisionAndScale(ft) + writer.WriteStringField("type", "bytes") + writer.WriteStringField("name", "org.apache.kafka.connect.data.Decimal") + writer.WriteObjectField("parameters", func() { + writer.WriteStringField("precision", strconv.Itoa(precision)) + writer.WriteStringField("scale", strconv.Itoa(scale)) + }) + } else if c.isDebeziumAvro() && + c.config.AvroDecimalHandlingMode == common.DecimalHandlingModeString { + writer.WriteStringField("type", "string") + } else { + writer.WriteStringField("type", "double") + } + writer.WriteBoolField("optional", c.columnOptional(ft)) + writer.WriteStringField("field", colName) + if col.GetDefaultValue() != nil { + v, ok := col.GetDefaultValue().(string) + if !ok { + return + } + if c.isDebeziumAvro() { + writer.WriteStringField("default", v) + return + } + 
floatV, err := strconv.ParseFloat(v, 64) + if err != nil { + return + } + writer.WriteFloat64Field("default", floatV) + } case mysql.TypeYear: writer.WriteStringField("type", "int32") - writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteBoolField("optional", c.columnOptional(ft)) writer.WriteStringField("name", "io.debezium.time.Year") writer.WriteIntField("version", 1) writer.WriteStringField("field", colName) @@ -462,7 +574,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( } case mysql.TypeTiDBVectorFloat32: writer.WriteStringField("type", "string") - writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteBoolField("optional", c.columnOptional(ft)) writer.WriteStringField("name", "io.debezium.data.TiDBVectorFloat32") writer.WriteStringField("field", colName) if col.GetDefaultValue() != nil { @@ -554,6 +666,10 @@ func (c *dbzCodec) writeDebeziumFieldValue( return nil case mysql.TypeNewDecimal: + if c.isDebeziumAvro() { + writer.WriteStringField(colName, datum.GetMysqlDecimal().String()) + return nil + } v, err := datum.GetMysqlDecimal().ToFloat64() if err != nil { return errors.WrapError( @@ -711,6 +827,18 @@ func (c *dbzCodec) writeDebeziumFieldValue( isUnsigned := mysql.HasUnsignedFlag(colInfo.GetFlag()) if isUnsigned { v := datum.GetUint64() + if c.isDebeziumAvro() && ft.GetType() == mysql.TypeLonglong { + if c.config.AvroBigintUnsignedHandlingMode == common.BigintUnsignedHandlingModeString { + writer.WriteStringField(colName, strconv.FormatUint(v, 10)) + } else { + if v > math.MaxInt64 { + return errors.ErrDebeziumEncodeFailed.GenWithStackByArgs( + fmt.Sprintf("unsigned bigint value %d overflows avro long", v)) + } + writer.WriteInt64Field(colName, int64(v)) + } + return nil + } if ft.GetType() == mysql.TypeLonglong && v == maxValue.GetUint64() || v > maxValue.GetUint64() { writer.WriteAnyField(colName, -1) } else { @@ -758,7 +886,7 @@ func (c *dbzCodec) writeBinaryField(writer *util.JSONWriter, 
fieldName string, v writer.WriteBase64StringField(fieldName, value) } -func (c *dbzCodec) writeSourceSchema(writer *util.JSONWriter) { +func (c *dbzCodec) writeSourceSchema(writer *util.JSONWriter, schemaName string) { writer.WriteObjectElement(func() { writer.WriteStringField("type", "struct") writer.WriteArrayField("fields", func() { @@ -785,12 +913,14 @@ func (c *dbzCodec) writeSourceSchema(writer *util.JSONWriter) { writer.WriteObjectElement(func() { writer.WriteStringField("type", "string") writer.WriteBoolField("optional", true) - writer.WriteStringField("name", "io.debezium.data.Enum") - writer.WriteIntField("version", 1) - writer.WriteObjectField("parameters", func() { - writer.WriteStringField("allowed", "true,last,false,incremental") - }) - writer.WriteStringField("default", "false") + if !c.isDebeziumAvro() { + writer.WriteStringField("name", "io.debezium.data.Enum") + writer.WriteIntField("version", 1) + writer.WriteObjectField("parameters", func() { + writer.WriteStringField("allowed", "true,last,false,incremental") + }) + writer.WriteStringField("default", "false") + } writer.WriteStringField("field", "snapshot") }) writer.WriteObjectElement(func() { @@ -798,14 +928,16 @@ func (c *dbzCodec) writeSourceSchema(writer *util.JSONWriter) { writer.WriteBoolField("optional", false) writer.WriteStringField("field", "db") }) + if !c.isDebeziumAvro() { + writer.WriteObjectElement(func() { + writer.WriteStringField("type", "string") + writer.WriteBoolField("optional", true) + writer.WriteStringField("field", "sequence") + }) + } writer.WriteObjectElement(func() { writer.WriteStringField("type", "string") - writer.WriteBoolField("optional", true) - writer.WriteStringField("field", "sequence") - }) - writer.WriteObjectElement(func() { - writer.WriteStringField("type", "string") - writer.WriteBoolField("optional", true) + writer.WriteBoolField("optional", !c.isDebeziumAvro()) writer.WriteStringField("field", "table") }) writer.WriteObjectElement(func() { @@ -843,9 
+975,21 @@ func (c *dbzCodec) writeSourceSchema(writer *util.JSONWriter) { writer.WriteBoolField("optional", true) writer.WriteStringField("field", "query") }) + if c.config.EnableTiDBExtension || c.isDebeziumAvro() { + writer.WriteObjectElement(func() { + writer.WriteStringField("type", "int64") + writer.WriteBoolField("optional", false) + writer.WriteStringField("field", "commit_ts") + }) + writer.WriteObjectElement(func() { + writer.WriteStringField("type", "string") + writer.WriteBoolField("optional", false) + writer.WriteStringField("field", "cluster_id") + }) + } }) writer.WriteBoolField("optional", false) - writer.WriteStringField("name", "io.debezium.connector.mysql.Source") + writer.WriteStringField("name", c.sourceSchemaName(schemaName)) writer.WriteStringField("field", "source") }) } @@ -879,8 +1023,7 @@ func (c *dbzCodec) EncodeKey( if !c.config.DebeziumDisableSchema { jWriter.WriteObjectField("schema", func() { jWriter.WriteStringField("type", "struct") - jWriter.WriteStringField("name", - fmt.Sprintf("%s.Key", getSchemaTopicName(c.clusterID, schemaName, tableName))) + jWriter.WriteStringField("name", c.keySchemaName(schemaName, tableName)) jWriter.WriteBoolField("optional", false) jWriter.WriteArrayField("fields", func() { columns := e.TableInfo.GetColumns() @@ -920,7 +1063,11 @@ func (c *dbzCodec) EncodeValue( // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-create-events jWriter.WriteInt64Field("ts_ms", commitTime.UnixMilli()) // snapshot field is a string of true,last,false,incremental - jWriter.WriteStringField("snapshot", "false") + if c.isDebeziumAvro() { + jWriter.WriteNullField("snapshot") + } else { + jWriter.WriteStringField("snapshot", "false") + } jWriter.WriteStringField("db", schemaName) jWriter.WriteStringField("table", tableName) jWriter.WriteInt64Field("server_id", 0) @@ -928,7 +1075,11 @@ func (c *dbzCodec) EncodeValue( jWriter.WriteStringField("file", "") jWriter.WriteInt64Field("pos", 0) 
jWriter.WriteInt64Field("row", 0) - jWriter.WriteInt64Field("thread", 0) + if c.isDebeziumAvro() { + jWriter.WriteNullField("thread") + } else { + jWriter.WriteInt64Field("thread", 0) + } jWriter.WriteNullField("query") // The followings are TiDB extended fields @@ -939,7 +1090,9 @@ func (c *dbzCodec) EncodeValue( // ts_ms: displays the time at which the connector processed the event // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-create-events jWriter.WriteInt64Field("ts_ms", c.nowFunc().UnixMilli()) - jWriter.WriteNullField("transaction") + if !c.isDebeziumAvro() { + jWriter.WriteNullField("transaction") + } if e.IsInsert() { // op: Mandatory string that describes the type of operation that caused the connector to generate the event. // Valid values are: @@ -979,8 +1132,7 @@ func (c *dbzCodec) EncodeValue( jWriter.WriteObjectField("schema", func() { jWriter.WriteStringField("type", "struct") jWriter.WriteBoolField("optional", false) - jWriter.WriteStringField("name", - fmt.Sprintf("%s.Envelope", getSchemaTopicName(c.clusterID, schemaName, tableName))) + jWriter.WriteStringField("name", c.envelopeSchemaName(schemaName, tableName)) jWriter.WriteIntField("version", 1) jWriter.WriteArrayField("fields", func() { // schema is the same for `before` and `after`. 
So we build a new buffer to @@ -1009,8 +1161,7 @@ func (c *dbzCodec) EncodeValue( jWriter.WriteObjectElement(func() { jWriter.WriteStringField("type", "struct") jWriter.WriteBoolField("optional", true) - jWriter.WriteStringField("name", - fmt.Sprintf("%s.Value", getSchemaTopicName(c.clusterID, schemaName, tableName))) + jWriter.WriteStringField("name", c.valueSchemaName(schemaName, tableName)) jWriter.WriteStringField("field", "before") jWriter.WriteArrayField("fields", func() { jWriter.WriteRaw(fieldsJSON) @@ -1019,14 +1170,13 @@ func (c *dbzCodec) EncodeValue( jWriter.WriteObjectElement(func() { jWriter.WriteStringField("type", "struct") jWriter.WriteBoolField("optional", true) - jWriter.WriteStringField("name", - fmt.Sprintf("%s.Value", getSchemaTopicName(c.clusterID, schemaName, tableName))) + jWriter.WriteStringField("name", c.valueSchemaName(schemaName, tableName)) jWriter.WriteStringField("field", "after") jWriter.WriteArrayField("fields", func() { jWriter.WriteRaw(fieldsJSON) }) }) - c.writeSourceSchema(jWriter) + c.writeSourceSchema(jWriter, schemaName) jWriter.WriteObjectElement(func() { jWriter.WriteStringField("type", "string") jWriter.WriteBoolField("optional", false) @@ -1034,33 +1184,35 @@ func (c *dbzCodec) EncodeValue( }) jWriter.WriteObjectElement(func() { jWriter.WriteStringField("type", "int64") - jWriter.WriteBoolField("optional", true) + jWriter.WriteBoolField("optional", !c.isDebeziumAvro()) jWriter.WriteStringField("field", "ts_ms") }) - jWriter.WriteObjectElement(func() { - jWriter.WriteStringField("type", "struct") - jWriter.WriteArrayField("fields", func() { - jWriter.WriteObjectElement(func() { - jWriter.WriteStringField("type", "string") - jWriter.WriteBoolField("optional", false) - jWriter.WriteStringField("field", "id") - }) - jWriter.WriteObjectElement(func() { - jWriter.WriteStringField("type", "int64") - jWriter.WriteBoolField("optional", false) - jWriter.WriteStringField("field", "total_order") - }) - 
jWriter.WriteObjectElement(func() { - jWriter.WriteStringField("type", "int64") - jWriter.WriteBoolField("optional", false) - jWriter.WriteStringField("field", "data_collection_order") + if !c.isDebeziumAvro() { + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "struct") + jWriter.WriteArrayField("fields", func() { + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "string") + jWriter.WriteBoolField("optional", false) + jWriter.WriteStringField("field", "id") + }) + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "int64") + jWriter.WriteBoolField("optional", false) + jWriter.WriteStringField("field", "total_order") + }) + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "int64") + jWriter.WriteBoolField("optional", false) + jWriter.WriteStringField("field", "data_collection_order") + }) }) + jWriter.WriteBoolField("optional", true) + jWriter.WriteStringField("name", "event.block") + jWriter.WriteIntField("version", 1) + jWriter.WriteStringField("field", "transaction") }) - jWriter.WriteBoolField("optional", true) - jWriter.WriteStringField("name", "event.block") - jWriter.WriteIntField("version", 1) - jWriter.WriteStringField("field", "transaction") - }) + } }) }) } @@ -1312,7 +1464,7 @@ func (c *dbzCodec) EncodeDDLEvent( jWriter.WriteIntField("version", 1) jWriter.WriteStringField("name", "io.debezium.connector.mysql.SchemaChangeValue") jWriter.WriteArrayField("fields", func() { - c.writeSourceSchema(jWriter) + c.writeSourceSchema(jWriter, dbName) jWriter.WriteObjectElement(func() { jWriter.WriteStringField("field", "ts_ms") jWriter.WriteBoolField("optional", false) @@ -1551,7 +1703,7 @@ func (c *dbzCodec) EncodeCheckpointEvent( fmt.Sprintf("%s.%s.Envelope", common.SanitizeName(c.clusterID), "watermark")) jWriter.WriteIntField("version", 1) jWriter.WriteArrayField("fields", func() { - c.writeSourceSchema(jWriter) + c.writeSourceSchema(jWriter, "watermark") 
jWriter.WriteObjectElement(func() { jWriter.WriteStringField("type", "string") jWriter.WriteBoolField("optional", false) diff --git a/pkg/sink/codec/debezium/decoder.go b/pkg/sink/codec/debezium/decoder.go index d9ddab35c6..7dc91f377a 100644 --- a/pkg/sink/codec/debezium/decoder.go +++ b/pkg/sink/codec/debezium/decoder.go @@ -20,6 +20,7 @@ import ( "encoding/binary" "encoding/json" "fmt" + "strconv" "strings" "time" @@ -346,6 +347,13 @@ func decodeColumn(value interface{}, colInfo *timodel.ColumnInfo, timeZone *time } value = types.NewDuration(0, 0, 0, int(val), types.MaxFsp) case mysql.TypeLonglong, mysql.TypeLong, mysql.TypeInt24, mysql.TypeShort, mysql.TypeTiny: + if strVal, ok := value.(string); ok && mysql.HasUnsignedFlag(colInfo.GetFlag()) { + uintVal, err := strconv.ParseUint(strVal, 10, 64) + if err != nil { + log.Panic("decode value failed", zap.Error(err), zap.String("value", util.RedactAny(value))) + } + return uintVal + } var intVal int64 intVal, err = value.(json.Number).Int64() if err != nil { @@ -371,6 +379,14 @@ func decodeColumn(value interface{}, colInfo *timodel.ColumnInfo, timeZone *time return types.NewBinaryLiteralFromUint(uint64(0), -1) } case mysql.TypeNewDecimal: + if strVal, ok := value.(string); ok { + dec := new(types.MyDecimal) + err = dec.FromString([]byte(strVal)) + if err != nil { + log.Panic("decode value failed", zap.Error(err), zap.String("value", util.RedactAny(value))) + } + return dec + } var f64 float64 f64, err = value.(json.Number).Float64() if err != nil { diff --git a/pkg/sink/codec/debezium/encoder.go b/pkg/sink/codec/debezium/encoder.go index c0f8c3d07a..2f49412e59 100644 --- a/pkg/sink/codec/debezium/encoder.go +++ b/pkg/sink/codec/debezium/encoder.go @@ -20,7 +20,9 @@ import ( "github.com/pingcap/log" commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/sink/codec/avro" 
"github.com/pingcap/ticdc/pkg/sink/codec/common" "go.uber.org/zap" ) @@ -31,10 +33,15 @@ type BatchEncoder struct { config *common.Config codec *dbzCodec + + schemaM avro.SchemaManager } // EncodeCheckpointEvent implements the RowEventEncoder interface func (d *BatchEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error) { + if d.config.Protocol == config.ProtocolDebeziumAvro && !d.config.AvroEnableWatermark { + return nil, nil + } if !d.config.EnableTiDBExtension { return nil, nil } @@ -44,6 +51,15 @@ func (d *BatchEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error) if err != nil { return nil, errors.Trace(err) } + if d.schemaM != nil { + return d.encodeAvroMessage( + context.Background(), + "", + keyMap.Bytes(), + valueBuf.Bytes(), + 0, + ) + } key, err := common.Compress( d.config.ChangefeedID, d.config.LargeMessageHandle.LargeMessageHandleCompression, @@ -66,10 +82,14 @@ func (d *BatchEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error) // AppendRowChangedEvent implements the RowEventEncoder interface func (d *BatchEncoder) AppendRowChangedEvent( - _ context.Context, - _ string, + ctx context.Context, + topic string, e *commonEvent.RowEvent, ) error { + if d.schemaM != nil { + return d.appendAvroRowChangedEvent(ctx, topic, e) + } + var key []byte var value []byte var err error @@ -93,6 +113,9 @@ func (d *BatchEncoder) AppendRowChangedEvent( // EncodeDDLEvent implements the RowEventEncoder interface // DDL message unresolved tso func (d *BatchEncoder) EncodeDDLEvent(e *commonEvent.DDLEvent) (*common.Message, error) { + if d.config.Protocol == config.ProtocolDebeziumAvro { + return nil, nil + } valueBuf := bytes.Buffer{} keyMap := bytes.Buffer{} err := d.codec.EncodeDDLEvent(e, &keyMap, &valueBuf) @@ -103,6 +126,15 @@ func (d *BatchEncoder) EncodeDDLEvent(e *commonEvent.DDLEvent) (*common.Message, } return nil, errors.Trace(err) } + if d.schemaM != nil { + return d.encodeAvroMessage( + context.Background(), + "", + 
keyMap.Bytes(), + valueBuf.Bytes(), + 0, + ) + } key, err := common.Compress( d.config.ChangefeedID, d.config.LargeMessageHandle.LargeMessageHandleCompression, @@ -180,3 +212,28 @@ func NewBatchEncoder(c *common.Config, clusterID string) common.EventEncoder { } return batch } + +func NewAvroBatchEncoder( + ctx context.Context, + c *common.Config, + clusterID string, +) (common.EventEncoder, error) { + schemaM, err := avro.NewConfluentSchemaManager(ctx, c.AvroConfluentSchemaRegistry, nil) + if err != nil { + return nil, errors.Trace(err) + } + + codecConfig := *c + codecConfig.DebeziumDisableSchema = false + batch := &BatchEncoder{ + messages: nil, + config: c, + codec: &dbzCodec{ + config: &codecConfig, + clusterID: clusterID, + nowFunc: time.Now, + }, + schemaM: schemaM, + } + return batch, nil +} diff --git a/tests/integration_tests/debezium_avro/conf/diff_config.toml b/tests/integration_tests/debezium_avro/conf/diff_config.toml new file mode 100644 index 0000000000..c241f97f87 --- /dev/null +++ b/tests/integration_tests/debezium_avro/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. 
+ +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] +output-dir = "/tmp/tidb_cdc_test/debezium_avro/output" + +source-instances = ["mysql1"] + +target-instance = "tidb0" + +target-check-tables = ["test.?*"] + +[data-sources] +[data-sources.mysql1] +host = "127.0.0.1" +port = 4000 +user = "root" +password = "" + +[data-sources.tidb0] +host = "127.0.0.1" +port = 3306 +user = "root" +password = "" diff --git a/tests/integration_tests/debezium_avro/data/ddl.sql b/tests/integration_tests/debezium_avro/data/ddl.sql new file mode 100644 index 0000000000..fce820e74f --- /dev/null +++ b/tests/integration_tests/debezium_avro/data/ddl.sql @@ -0,0 +1,3 @@ +USE test; + +ALTER TABLE tp_account ADD COLUMN note VARCHAR(32) NULL; diff --git a/tests/integration_tests/debezium_avro/data/post_ddl_workload.sql b/tests/integration_tests/debezium_avro/data/post_ddl_workload.sql new file mode 100644 index 0000000000..9a89bbb3be --- /dev/null +++ b/tests/integration_tests/debezium_avro/data/post_ddl_workload.sql @@ -0,0 +1,8 @@ +USE test; + +UPDATE tp_account SET note = 'after ddl' WHERE id = 12; +INSERT INTO tp_account(id, account_id, name, balance, payload, note) +VALUES (15, 45, 'carol', 90.1200, x'070809', 'deleted'); +DELETE FROM tp_account WHERE id = 15; +INSERT INTO tp_account(id, account_id, name, balance, payload, note) +VALUES (16, 46, 'dave', NULL, NULL, 'final'); diff --git a/tests/integration_tests/debezium_avro/data/prepare.sql b/tests/integration_tests/debezium_avro/data/prepare.sql new file mode 100644 index 0000000000..deee75e17a --- /dev/null +++ b/tests/integration_tests/debezium_avro/data/prepare.sql @@ -0,0 +1,11 @@ +DROP DATABASE IF EXISTS test; +CREATE DATABASE test; +USE test; + +CREATE TABLE tp_account ( + id BIGINT UNSIGNED PRIMARY KEY, + account_id INT NOT NULL, + name VARCHAR(64) NULL, + balance DECIMAL(20, 4) NULL, + payload VARBINARY(16) NULL +); diff --git a/tests/integration_tests/debezium_avro/data/workload.sql 
b/tests/integration_tests/debezium_avro/data/workload.sql new file mode 100644 index 0000000000..a6e3ad3bc4 --- /dev/null +++ b/tests/integration_tests/debezium_avro/data/workload.sql @@ -0,0 +1,9 @@ +USE test; + +INSERT INTO tp_account VALUES (12, 34, 'alice', 12.3400, x'010203'); +UPDATE tp_account +SET account_id = 35, name = 'bob', balance = 56.7800, payload = x'040506' +WHERE id = 12; +INSERT INTO tp_account +VALUES (18446744073709551615, 99, 'max', 1.2300, x'ff'); +DELETE FROM tp_account WHERE id = 18446744073709551615; diff --git a/tests/integration_tests/debezium_avro/run.sh b/tests/integration_tests/debezium_avro/run.sh new file mode 100644 index 0000000000..31c96d73ce --- /dev/null +++ b/tests/integration_tests/debezium_avro/run.sh @@ -0,0 +1,77 @@ +#!/bin/bash + +set -e + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function start_schema_registry() { + if ! curl -o /dev/null -s "http://127.0.0.1:8088"; then + echo 'Starting schema registry...' + ./bin/bin/schema-registry-start -daemon ./bin/etc/schema-registry/schema-registry.properties + local i=0 + while ! 
curl -o /dev/null -s "http://127.0.0.1:8088"; do + i=$((i + 1)) + if [ "$i" -gt 30 ]; then + echo 'Failed to start schema registry' + exit 1 + fi + sleep 2 + done + fi + + curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"compatibility": "NONE"}' http://127.0.0.1:8088/config +} + +function check_schema_registry_subject() { + local subject=$1 + local expected=$2 + + curl -fsS "http://127.0.0.1:8088/subjects/${subject}/versions/latest" | grep -q "$expected" +} + +function run() { + if [ "$SINK_TYPE" != "kafka" ]; then + return + fi + + rm -rf "$WORK_DIR" && mkdir -p "$WORK_DIR" + + start_schema_registry + start_tidb_cluster --workdir "$WORK_DIR" + + run_sql_file "$CUR/data/prepare.sql" "$UP_TIDB_HOST" "$UP_TIDB_PORT" + run_sql_file "$CUR/data/prepare.sql" "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" + + start_ts=$(run_cdc_cli_tso_query "$UP_PD_HOST_1" "$UP_PD_PORT_1") + + run_cdc_server --workdir "$WORK_DIR" --binary "$CDC_BINARY" + + TOPIC_NAME="ticdc-debezium-avro-$RANDOM" + SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=debezium-avro&enable-tidb-extension=true&partition-num=1&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760&avro-decimal-handling-mode=precise&avro-bigint-unsigned-handling-mode=string" + schema_registry_uri="http://127.0.0.1:8088" + changefeed_id="debezium-avro-$RANDOM" + + cdc_cli_changefeed create --start-ts="$start_ts" --sink-uri="$SINK_URI" -c "$changefeed_id" --schema-registry="$schema_registry_uri" + sleep 5 # wait for changefeed to start + run_kafka_consumer "$WORK_DIR" "$SINK_URI" "" "$schema_registry_uri" + + run_sql_file "$CUR/data/workload.sql" "$UP_TIDB_HOST" "$UP_TIDB_PORT" + run_sql_file "$CUR/data/ddl.sql" "$UP_TIDB_HOST" "$UP_TIDB_PORT" + run_sql_file "$CUR/data/ddl.sql" "$DOWN_TIDB_HOST" "$DOWN_TIDB_PORT" + run_sql_file "$CUR/data/post_ddl_workload.sql" "$UP_TIDB_HOST" "$UP_TIDB_PORT" + + check_sync_diff "$WORK_DIR" "$CUR/conf/diff_config.toml" 120 + check_schema_registry_subject 
"$TOPIC_NAME-key" "tp_accountKey" + check_schema_registry_subject "$TOPIC_NAME-value" "tp_accountEnvelope" + + cleanup_process "$CDC_BINARY" +} + +trap 'stop_test $WORK_DIR' EXIT +run "$@" +check_logs "$WORK_DIR" +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/run_light_it_in_ci.sh b/tests/integration_tests/run_light_it_in_ci.sh index 1dd75a780f..c16268cdea 100755 --- a/tests/integration_tests/run_light_it_in_ci.sh +++ b/tests/integration_tests/run_light_it_in_ci.sh @@ -99,7 +99,7 @@ kafka_groups=( # G13 'cli_with_auth fail_over_ddl_N maintainer_failover_when_operator' # G14 - 'kafka_simple_basic avro_basic fail_over_ddl_O update_changefeed_check_config' + 'kafka_simple_basic avro_basic debezium_avro fail_over_ddl_O update_changefeed_check_config' # G15 'kafka_simple_basic_avro split_region autorandom gc_safepoint kafka_log_info' )