Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -1770,7 +1770,8 @@ func verifyTable4MQ(
return nil
}

eventRouter, err := eventrouter.NewEventRouter(replicaConfig.Sink, topic, config.IsPulsarScheme(scheme), protocol == config.ProtocolAvro)
isAvroLike := protocol == config.ProtocolAvro || protocol == config.ProtocolDebeziumAvro
eventRouter, err := eventrouter.NewEventRouter(replicaConfig.Sink, topic, config.IsPulsarScheme(scheme), isAvroLike)
if err != nil {
return err
}
Expand Down
6 changes: 4 additions & 2 deletions cmd/kafka-consumer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ func newWriter(ctx context.Context, o *option) *writer {
w.progresses[i] = newPartitionProgress(int32(i), decoder)
}

eventRouter, err := eventrouter.NewEventRouter(o.sinkConfig, o.topic, false, o.protocol == config.ProtocolAvro)
isAvroLike := o.protocol == config.ProtocolAvro || o.protocol == config.ProtocolDebeziumAvro
eventRouter, err := eventrouter.NewEventRouter(o.sinkConfig, o.topic, false, isAvroLike)
if err != nil {
log.Panic("initialize the event router failed",
zap.Any("protocol", o.protocol), zap.Any("topic", o.topic),
Expand Down Expand Up @@ -493,7 +494,8 @@ func (w *writer) onDDL(ddl *event.DDLEvent) {
return
}
switch w.protocol {
case config.ProtocolCanalJSON, config.ProtocolOpen, config.ProtocolAvro, config.ProtocolSimple, config.ProtocolDebezium:
case config.ProtocolCanalJSON, config.ProtocolOpen, config.ProtocolAvro, config.ProtocolSimple,
config.ProtocolDebezium, config.ProtocolDebeziumAvro:
default:
return
}
Expand Down
2 changes: 1 addition & 1 deletion downstreamadapter/sink/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func GetProtocol(protocolStr string) (config.Protocol, error) {
// GetFileExtension returns the extension for specific protocol
func GetFileExtension(protocol config.Protocol) string {
switch protocol {
case config.ProtocolAvro, config.ProtocolCanalJSON, config.ProtocolMaxwell,
case config.ProtocolAvro, config.ProtocolDebeziumAvro, config.ProtocolCanalJSON, config.ProtocolMaxwell,
config.ProtocolOpen, config.ProtocolSimple:
return ".json"
case config.ProtocolCraft:
Expand Down
3 changes: 2 additions & 1 deletion downstreamadapter/sink/kafka/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ func newKafkaSinkComponentWithFactory(ctx context.Context,
return kafkaComponent, protocol, errors.WrapError(errors.ErrKafkaNewProducer, err)
}

isAvroLike := protocol == config.ProtocolAvro || protocol == config.ProtocolDebeziumAvro
kafkaComponent.eventRouter, err = eventrouter.NewEventRouter(
sinkConfig, topic, false, protocol == config.ProtocolAvro)
sinkConfig, topic, false, isAvroLike)
if err != nil {
return kafkaComponent, protocol, errors.Trace(err)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ type SinkConfig struct {
DispatchRules []*DispatchRule `toml:"dispatchers" json:"dispatchers,omitempty"`

ColumnSelectors []*ColumnSelector `toml:"column-selectors" json:"column-selectors,omitempty"`
// SchemaRegistry is only available when the downstream is MQ using avro protocol.
// SchemaRegistry is only available when the downstream is MQ using avro protocol
// or debezium protocol with Confluent Avro encoding.
SchemaRegistry *string `toml:"schema-registry" json:"schema-registry,omitempty"`
// EncoderConcurrency is only available when the downstream is MQ.
EncoderConcurrency *int `toml:"encoder-concurrency" json:"encoder-concurrency,omitempty"`
Expand Down Expand Up @@ -965,7 +966,7 @@ func (s *SinkConfig) ValidateProtocol(scheme string) error {
if s.OpenProtocol != nil {
outputOldValue = s.OpenProtocol.OutputOldValue
}
case ProtocolDebezium:
case ProtocolDebezium, ProtocolDebeziumAvro:
if s.Debezium != nil {
outputOldValue = s.Debezium.OutputOldValue
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/sink_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
ProtocolCsv
ProtocolDebezium
ProtocolSimple
ProtocolDebeziumAvro
)

// IsBatchEncode returns whether the protocol is a batch encoder.
Expand Down Expand Up @@ -71,6 +72,8 @@ func ParseSinkProtocolFromString(protocol string) (Protocol, error) {
return ProtocolCsv, nil
case "debezium":
return ProtocolDebezium, nil
case "debezium-avro":
return ProtocolDebeziumAvro, nil
case "simple":
return ProtocolSimple, nil
default:
Expand Down Expand Up @@ -101,6 +104,8 @@ func (p Protocol) String() string {
return "debezium"
case ProtocolSimple:
return "simple"
case ProtocolDebeziumAvro:
return "debezium-avro"
default:
panic("unreachable")
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/config/sink_protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func TestParseSinkProtocolFromString(t *testing.T) {
protocol: "open-protocol",
expectedProtocolEnum: ProtocolOpen,
},
{
protocol: "debezium-avro",
expectedProtocolEnum: ProtocolDebeziumAvro,
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -109,6 +113,10 @@ func TestString(t *testing.T) {
protocolEnum: ProtocolOpen,
expectedProtocol: "open-protocol",
},
{
protocolEnum: ProtocolDebeziumAvro,
expectedProtocol: "debezium-avro",
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -151,6 +159,10 @@ func TestIsBatchEncoder(t *testing.T) {
protocolEnum: ProtocolOpen,
expect: true,
},
{
protocolEnum: ProtocolDebeziumAvro,
expect: false,
},
}

for _, tc := range testCases {
Expand Down
4 changes: 4 additions & 0 deletions pkg/sink/codec/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ func NewEventEncoder(ctx context.Context, cfg *common.Config) (common.EventEncod
return canal.NewJSONRowEventEncoder(ctx, cfg)
case config.ProtocolDebezium:
return debezium.NewBatchEncoder(cfg, config.GetGlobalServerConfig().ClusterID), nil
case config.ProtocolDebeziumAvro:
return debezium.NewAvroBatchEncoder(ctx, cfg, config.GetGlobalServerConfig().ClusterID)
case config.ProtocolSimple:
return simple.NewEncoder(ctx, cfg)
default:
Expand All @@ -67,6 +69,8 @@ func NewEventDecoder(
return simple.NewDecoder(ctx, codecConfig, upstreamTiDB)
case config.ProtocolDebezium:
return debezium.NewDecoder(codecConfig, idx, upstreamTiDB), nil
case config.ProtocolDebeziumAvro:
return debezium.NewAvroDecoder(ctx, codecConfig, idx, upstreamTiDB)
default:
}
log.Panic("Protocol not supported", zap.Any("Protocol", codecConfig.Protocol))
Expand Down
67 changes: 59 additions & 8 deletions pkg/sink/codec/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ type Config struct {

OutputRowKey bool

// avro only
// avro and debezium-avro only
// protocol when Confluent Avro encoding is enabled.
AvroConfluentSchemaRegistry string
AvroDecimalHandlingMode string
AvroBigintUnsignedHandlingMode string
Expand Down Expand Up @@ -236,9 +237,10 @@ func (c *Config) Apply(sinkURI *url.URL, sinkConfig *config.SinkConfig) error {
sinkConfig.KafkaConfig.GlueSchemaRegistryConfig != nil {
c.AvroGlueSchemaRegistry = sinkConfig.KafkaConfig.GlueSchemaRegistryConfig
}
if c.Protocol == config.ProtocolAvro && util.GetOrZero(sinkConfig.ForceReplicate) {
if (c.Protocol == config.ProtocolAvro || c.Protocol == config.ProtocolDebeziumAvro) &&
util.GetOrZero(sinkConfig.ForceReplicate) {
return errors.ErrCodecInvalidConfig.GenWithStack(
`force-replicate must be disabled, when using avro protocol`)
`force-replicate must be disabled, when using avro or debezium-avro protocol`)
}

if sinkConfig != nil {
Expand Down Expand Up @@ -353,30 +355,57 @@ func (c *Config) WithChangefeedID(id common.ChangeFeedID) *Config {
// Validate the Config
func (c *Config) Validate() error {
if c.EnableTiDBExtension &&
(c.Protocol != config.ProtocolCanalJSON && c.Protocol != config.ProtocolAvro && c.Protocol != config.ProtocolDebezium) {
(c.Protocol != config.ProtocolCanalJSON && c.Protocol != config.ProtocolAvro &&
c.Protocol != config.ProtocolDebezium && c.Protocol != config.ProtocolDebeziumAvro) {
log.Warn("ignore invalid config, enable-tidb-extension"+
"only supports canal-json/avro/debezium protocol",
"only supports canal-json/avro/debezium/debezium-avro protocol",
zap.Bool("enableTidbExtension", c.EnableTiDBExtension),
zap.String("protocol", c.Protocol.String()))
}

if c.Protocol == config.ProtocolAvro {
if c.Protocol == config.ProtocolDebezium &&
(c.AvroConfluentSchemaRegistry != "" || c.AvroGlueSchemaRegistry != nil) {
return errors.ErrCodecInvalidConfig.GenWithStack(
`Debezium protocol does not support schema registry; use protocol "debezium-avro"`,
)
}

if c.Protocol == config.ProtocolAvro || c.Protocol == config.ProtocolDebeziumAvro {
if c.AvroConfluentSchemaRegistry != "" && c.AvroGlueSchemaRegistry != nil {
protocol := "Avro"
if c.Protocol == config.ProtocolDebeziumAvro {
protocol = "Debezium Avro"
}
return errors.ErrCodecInvalidConfig.GenWithStack(
`Avro protocol requires only one of "%s" or "%s" to specify the schema registry`,
`%s protocol requires only one of "%s" or "%s" to specify the schema registry`,
protocol,
codecOPTAvroSchemaRegistry,
coderOPTAvroGlueSchemaRegistry,
)
}

if c.AvroConfluentSchemaRegistry == "" && c.AvroGlueSchemaRegistry == nil {
protocol := "Avro"
if c.Protocol == config.ProtocolDebeziumAvro {
protocol = "Debezium Avro"
}
return errors.ErrCodecInvalidConfig.GenWithStack(
`Avro protocol requires parameter "%s" or "%s" to specify the schema registry`,
`%s protocol requires parameter "%s" or "%s" to specify the schema registry`,
protocol,
codecOPTAvroSchemaRegistry,
coderOPTAvroGlueSchemaRegistry,
)
}

if c.Protocol == config.ProtocolDebeziumAvro && c.AvroGlueSchemaRegistry != nil {
return errors.ErrCodecInvalidConfig.GenWithStack(
`Debezium Avro protocol only supports "%s" for Confluent Avro Schema Registry`,
codecOPTAvroSchemaRegistry,
)
}
}

if c.Protocol == config.ProtocolAvro {
if c.AvroDecimalHandlingMode != DecimalHandlingModePrecise &&
c.AvroDecimalHandlingMode != DecimalHandlingModeString {
return errors.ErrCodecInvalidConfig.GenWithStack(
Expand Down Expand Up @@ -410,6 +439,28 @@ func (c *Config) Validate() error {
}
}

if c.Protocol == config.ProtocolDebeziumAvro {
if c.AvroDecimalHandlingMode != DecimalHandlingModePrecise &&
c.AvroDecimalHandlingMode != DecimalHandlingModeString {
return errors.ErrCodecInvalidConfig.GenWithStack(
`%s value could only be "%s" or "%s"`,
codecOPTAvroDecimalHandlingMode,
DecimalHandlingModeString,
DecimalHandlingModePrecise,
)
}

if c.AvroBigintUnsignedHandlingMode != BigintUnsignedHandlingModeLong &&
c.AvroBigintUnsignedHandlingMode != BigintUnsignedHandlingModeString {
return errors.ErrCodecInvalidConfig.GenWithStack(
`%s value could only be "%s" or "%s"`,
codecOPTAvroBigintUnsignedHandlingMode,
BigintUnsignedHandlingModeLong,
BigintUnsignedHandlingModeString,
)
}
}

if c.MaxMessageBytes <= 0 {
return errors.ErrCodecInvalidConfig.Wrap(
errors.Errorf("invalid max-message-bytes %d", c.MaxMessageBytes),
Expand Down
40 changes: 40 additions & 0 deletions pkg/sink/codec/common/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"testing"

"github.com/pingcap/ticdc/pkg/config"
"github.com/stretchr/testify/require"
)

func TestDebeziumAvroSchemaRegistryConfig(t *testing.T) {
t.Parallel()

cfg := NewConfig(config.ProtocolDebeziumAvro)
cfg.AvroConfluentSchemaRegistry = "http://127.0.0.1:8081"
require.NoError(t, cfg.Validate())

cfg = NewConfig(config.ProtocolDebeziumAvro)
require.ErrorContains(t, cfg.Validate(), `Debezium Avro protocol requires parameter "schema-registry"`)

cfg = NewConfig(config.ProtocolDebeziumAvro)
cfg.AvroGlueSchemaRegistry = &config.GlueSchemaRegistryConfig{}
require.ErrorContains(t, cfg.Validate(), `Debezium Avro protocol only supports "schema-registry"`)

cfg = NewConfig(config.ProtocolDebezium)
cfg.AvroConfluentSchemaRegistry = "http://127.0.0.1:8081"
require.ErrorContains(t, cfg.Validate(), `Debezium protocol does not support schema registry`)
}
Loading
Loading