Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions downstreamadapter/sink/cloudstorage/encoder_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ func newTestTxnEncoderConfig(t *testing.T) *common.Config {
config.ProtocolCsv,
replicaConfig.Sink,
config.DefaultMaxMessageBytes,
config.DefaultMaxMessageBytes,
)
require.NoError(t, err)
return encoderConfig
Expand Down
8 changes: 4 additions & 4 deletions downstreamadapter/sink/cloudstorage/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func Verify(ctx context.Context, changefeedID common.ChangeFeedID, sinkURI *url.
if err != nil {
return err
}
_, err = helper.GetEncoderConfig(changefeedID, sinkURI, protocol, sinkConfig, math.MaxInt)
_, err = helper.GetEncoderConfig(changefeedID, sinkURI, protocol, sinkConfig, math.MaxInt, math.MaxInt)
if err != nil {
return err
}
Expand Down Expand Up @@ -117,9 +117,9 @@ func New(
}
// get cloud storage file extension according to the specific protocol.
ext := helper.GetFileExtension(protocol)
// the last param maxMsgBytes is mainly to limit the size of a single message for
// batch protocols in mq scenario. In cloud storage sink, we just set it to max int.
encoderConfig, err := helper.GetEncoderConfig(changefeedID, sinkURI, protocol, sinkConfig, math.MaxInt)
// Message size limits are mainly for MQ batch protocols. Cloud storage uses
// max int for both the final message limit and the batch threshold.
encoderConfig, err := helper.GetEncoderConfig(changefeedID, sinkURI, protocol, sinkConfig, math.MaxInt, math.MaxInt)
if err != nil {
return nil, err
}
Expand Down
9 changes: 4 additions & 5 deletions downstreamadapter/sink/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,16 @@ func GetEncoderConfig(
sinkURI *url.URL,
protocol config.Protocol,
sinkConfig *config.SinkConfig,
maxMsgBytes int,
maxMessageBytes int,
batchMaxMessageBytes int,
) (*common.Config, error) {
encoderConfig := common.NewConfig(protocol)
if err := encoderConfig.Apply(sinkURI, sinkConfig); err != nil {
return nil, errors.WrapError(errors.ErrSinkInvalidConfig, err)
}
// Always set encoder's `MaxMessageBytes` equal to producer's `MaxMessageBytes`
// to prevent that the encoder generate batched message too large
// then cause producer meet `message too large`.
encoderConfig = encoderConfig.
WithMaxMessageBytes(maxMsgBytes).
WithMaxMessageBytes(maxMessageBytes).
WithMaxBatchMessageBytes(batchMaxMessageBytes).
WithChangefeedID(changefeedID)

tz, err := util.GetTimezone(config.GetGlobalServerConfig().TZ)
Expand Down
5 changes: 4 additions & 1 deletion downstreamadapter/sink/kafka/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ func newKafkaSinkComponentWithFactory(ctx context.Context,
return kafkaComponent, protocol, errors.Trace(err)
}

encoderConfig, err := helper.GetEncoderConfig(changefeedID, sinkURI, protocol, sinkConfig, options.MaxMessageBytes)
encoderConfig, err := helper.GetEncoderConfig(
changefeedID, sinkURI, protocol, sinkConfig,
options.MaxMessageBytes, options.BatchMaxMessageBytes,
)
if err != nil {
return kafkaComponent, protocol, errors.Trace(err)
}
Expand Down
5 changes: 4 additions & 1 deletion downstreamadapter/sink/kafka/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ func newKafkaSinkForTestWithProducers(ctx context.Context,
if err != nil {
return nil, err
}
encoderConfig, err := helper.GetEncoderConfig(changefeedID, sinkURI, protocol, sinkConfig, options.MaxMessageBytes)
encoderConfig, err := helper.GetEncoderConfig(
changefeedID, sinkURI, protocol, sinkConfig,
options.MaxMessageBytes, options.MaxMessageBytes,
)
if err != nil {
return nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion downstreamadapter/sink/pulsar/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@ func newPulsarSinkComponentWithFactory(ctx context.Context,
return pulsarComponent, protocol, errors.Trace(err)
}

encoderConfig, err := helper.GetEncoderConfig(changefeedID, sinkURI, protocol, sinkConfig, config.DefaultMaxMessageBytes)
encoderConfig, err := helper.GetEncoderConfig(
changefeedID, sinkURI, protocol, sinkConfig,
config.DefaultMaxMessageBytes, config.DefaultMaxMessageBytes,
)
if err != nil {
return pulsarComponent, protocol, errors.Trace(err)
}
Expand Down
17 changes: 14 additions & 3 deletions pkg/sink/codec/avro/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func (a *BatchEncoder) AppendRowChangedEvent(
zap.Int("maxMessageBytes", a.config.MaxMessageBytes),
zap.Int("length", message.Length()),
zap.Any("table", e.TableInfo.TableName))
return errors.ErrMessageTooLarge.GenWithStackByArgs(e.TableInfo.GetTargetTableName(), message.Length(), a.config.MaxMessageBytes)
return errors.ErrMessageTooLarge.GenWithStackByArgs(
e.TableInfo.GetTargetTableName(), message.Length(), a.config.MaxMessageBytes)
}

a.result = append(a.result, message)
Expand All @@ -114,7 +115,12 @@ func (a *BatchEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error)
}

value := buf.Bytes()
return common.NewMsg(nil, value), nil
message := common.NewMsg(nil, value)
if message.Length() > a.config.MaxMessageBytes {
return nil, errors.ErrMessageTooLarge.GenWithStackByArgs(
"checkpoint", message.Length(), a.config.MaxMessageBytes)
}
return message, nil
}
return nil, nil
}
Expand All @@ -140,7 +146,12 @@ func (a *BatchEncoder) EncodeDDLEvent(e *commonEvent.DDLEvent) (*common.Message,
buf.Write(data)

value := buf.Bytes()
return common.NewMsg(nil, value), nil
message := common.NewMsg(nil, value)
if message.Length() > a.config.MaxMessageBytes {
return nil, errors.ErrMessageTooLarge.GenWithStackByArgs(
e.GetTargetTableName(), message.Length(), a.config.MaxMessageBytes)
}
return message, nil
}

return nil, nil
Expand Down
33 changes: 21 additions & 12 deletions pkg/sink/codec/canal/canal_json_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,12 @@ func (c *JSONRowEventEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message,
return nil, errors.WrapError(errors.ErrCanalEncodeFailed, err)
}

return common.NewMsg(nil, value), nil
message := common.NewMsg(nil, value)
if message.Length() > c.config.MaxMessageBytes {
return nil, errors.ErrMessageTooLarge.GenWithStackByArgs(
"checkpoint", message.Length(), c.config.MaxMessageBytes)
}
return message, nil
}

// AppendRowChangedEvent implements the interface EventJSONBatchEncoder
Expand All @@ -468,16 +473,7 @@ func (c *JSONRowEventEncoder) AppendRowChangedEvent(

targetTable := e.TableInfo.GetTargetTableName()
originLength := m.Length()
if m.Length() > c.config.MaxMessageBytes {
// for single message that is longer than max-message-bytes, do not send it.
if c.config.LargeMessageHandle.Disabled() {
log.Error("Single message is too large for canal-json",
zap.Int("maxMessageBytes", c.config.MaxMessageBytes),
zap.Int("length", originLength),
zap.Any("table", e.TableInfo.TableName))
return errors.ErrMessageTooLarge.GenWithStackByArgs(targetTable, originLength, c.config.MaxMessageBytes)
}

if m.Length() > c.config.MaxMessageBytes && !c.config.LargeMessageHandle.Disabled() {
if c.config.LargeMessageHandle.HandleKeyOnly() {
value, err = newJSONMessageForDML(e, c.config, true, "")
if err != nil {
Expand Down Expand Up @@ -520,6 +516,14 @@ func (c *JSONRowEventEncoder) AppendRowChangedEvent(
}
}

if m.Length() > c.config.MaxMessageBytes {
log.Error("Single message is too large for canal-json",
zap.Int("maxMessageBytes", c.config.MaxMessageBytes),
zap.Int("length", m.Length()),
zap.Any("table", e.TableInfo.TableName))
return errors.ErrMessageTooLarge.GenWithStackByArgs(targetTable, m.Length(), c.config.MaxMessageBytes)
}

c.messages = append(c.messages, m)
return nil
}
Expand Down Expand Up @@ -580,7 +584,12 @@ func (c *JSONRowEventEncoder) EncodeDDLEvent(e *commonEvent.DDLEvent) (*common.M
return nil, errors.WrapError(errors.ErrCanalEncodeFailed, err)
}

return common.NewMsg(nil, value), nil
result := common.NewMsg(nil, value)
if result.Length() > c.config.MaxMessageBytes {
return nil, errors.ErrMessageTooLarge.GenWithStackByArgs(
e.GetTargetTableName(), result.Length(), c.config.MaxMessageBytes)
}
return result, nil
}

func (c *JSONRowEventEncoder) Clean() {
Expand Down
16 changes: 16 additions & 0 deletions pkg/sink/codec/canal/canal_json_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,22 @@ func TestMaxMessageBytes(t *testing.T) {
})
require.NoError(t, err)

codecConfig = common.NewConfig(config.ProtocolCanalJSON).
WithMaxMessageBytes(maxMessageBytes).
WithMaxBatchMessageBytes(100)

encIface, err = NewJSONRowEventEncoder(ctx, codecConfig)
require.NoError(t, err)

encoder = encIface.(*JSONRowEventEncoder)
err = encoder.AppendRowChangedEvent(ctx, topic, &commonEvent.RowEvent{
TableInfo: dml.TableInfo,
CommitTs: dml.CommitTs,
Event: rc,
ColumnSelector: columnselector.NewDefaultColumnSelector(),
})
require.NoError(t, err)

// the test message length is larger than max-message-bytes
codecConfig = codecConfig.WithMaxMessageBytes(100)

Expand Down
4 changes: 2 additions & 2 deletions pkg/sink/codec/canal/canal_json_txn_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ func (j *JSONTxnEventEncoder) AppendTxnEvent(event *commonEvent.DMLEvent) error
return err
}
length := len(value) + common.MaxRecordOverhead
// For single message that is longer than max-message-bytes, do not send it.
if length > j.config.MaxMessageBytes {
log.Warn("Single message is too large for canal-json",
zap.Int("maxMessageBytes", j.config.MaxMessageBytes),
zap.Int("length", length),
zap.Any("table", event.TableInfo.TableName))
return errors.ErrMessageTooLarge.GenWithStackByArgs(event.TableInfo.GetTargetTableName(), length, j.config.MaxMessageBytes)
return errors.ErrMessageTooLarge.GenWithStackByArgs(
event.TableInfo.GetTargetTableName(), length, j.config.MaxMessageBytes)
}
j.valueBuf.Write(value)
j.valueBuf.Write(j.terminator)
Expand Down
35 changes: 33 additions & 2 deletions pkg/sink/codec/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@ type Config struct {

Protocol config.Protocol

// control batch behavior, only for `open-protocol` and `craft` at the moment.
// MaxMessageBytes is the final encoded message size limit.
MaxMessageBytes int
MaxBatchSize int
// MaxBatchMessageBytes controls batch splitting.
// If it is not set, codecs use MaxMessageBytes to keep the old behavior.
MaxBatchMessageBytes int
MaxBatchSize int

// DeleteOnlyHandleKeyColumns is true, for the delete event only output the handle key columns.
DeleteOnlyHandleKeyColumns bool
Expand Down Expand Up @@ -344,6 +347,20 @@ func (c *Config) WithMaxMessageBytes(bytes int) *Config {
return c
}

// WithMaxBatchMessageBytes stores bytes in MaxBatchMessageBytes, the
// threshold used for batch splitting, and returns the receiver so that
// it can be chained with the other With* setters.
func (c *Config) WithMaxBatchMessageBytes(bytes int) *Config {
	c.MaxBatchMessageBytes = bytes
	return c
}

// BatchMaxMessageBytes reports the effective batch splitting threshold.
// A positive MaxBatchMessageBytes is returned as-is; when it is zero or
// negative (i.e. not configured) the method falls back to MaxMessageBytes.
func (c *Config) BatchMaxMessageBytes() int {
	if c.MaxBatchMessageBytes <= 0 {
		// No dedicated batch threshold: reuse the final message size limit.
		return c.MaxMessageBytes
	}
	return c.MaxBatchMessageBytes
}

// WithChangefeedID set the `changefeedID`
func (c *Config) WithChangefeedID(id common.ChangeFeedID) *Config {
c.ChangefeedID = id
Expand Down Expand Up @@ -415,6 +432,20 @@ func (c *Config) Validate() error {
errors.Errorf("invalid max-message-bytes %d", c.MaxMessageBytes),
)
}
if c.MaxBatchMessageBytes < 0 {
return errors.ErrCodecInvalidConfig.Wrap(
errors.Errorf("invalid max-batch-message-bytes %d", c.MaxBatchMessageBytes),
)
}
Comment on lines +435 to +439

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

We should validate that MaxBatchMessageBytes is not greater than MaxMessageBytes. A batch size threshold larger than the absolute maximum message size is an invalid configuration and could lead to unexpected behavior.

	if c.MaxBatchMessageBytes < 0 {
		return errors.ErrCodecInvalidConfig.Wrap(
			errors.Errorf("invalid max-batch-message-bytes %d", c.MaxBatchMessageBytes),
		)
	}
	if c.MaxBatchMessageBytes > c.MaxMessageBytes {
		return errors.ErrCodecInvalidConfig.Wrap(
			errors.Errorf("max-batch-message-bytes %d cannot be greater than max-message-bytes %d", c.MaxBatchMessageBytes, c.MaxMessageBytes),
		)
	}

if c.MaxBatchMessageBytes > c.MaxMessageBytes {
return errors.ErrCodecInvalidConfig.Wrap(
errors.Errorf(
"max-batch-message-bytes %d cannot be greater than max-message-bytes %d",
c.MaxBatchMessageBytes,
c.MaxMessageBytes,
),
)
}

if c.MaxBatchSize <= 0 {
return errors.ErrCodecInvalidConfig.Wrap(
Expand Down
35 changes: 35 additions & 0 deletions pkg/sink/codec/common/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"testing"

"github.com/pingcap/ticdc/pkg/config"
"github.com/stretchr/testify/require"
)

// TestValidateMaxBatchMessageBytes checks that Validate rejects a config
// whose MaxBatchMessageBytes is larger than its MaxMessageBytes.
func TestValidateMaxBatchMessageBytes(t *testing.T) {
	const wantMessage = "max-batch-message-bytes 101 cannot be greater than max-message-bytes 100"

	codecConfig := NewConfig(config.ProtocolOpen)
	// The batch threshold is one byte above the final message limit.
	codecConfig.MaxMessageBytes, codecConfig.MaxBatchMessageBytes = 100, 101

	validateErr := codecConfig.Validate()
	require.Error(t, validateErr)
	require.ErrorContains(t, validateErr, wantMessage)
}
39 changes: 24 additions & 15 deletions pkg/sink/codec/open/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var (
)

// batchEncoder for open protocol will batch multiple row changed events into a single message.
// One message can contain at most MaxBatchSize events, and the total size of the message cannot exceed MaxMessageBytes.
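// One message can contain at most MaxBatchSize events. A new message is started once appending
// an event would push the current one past BatchMaxMessageBytes, so a message holding a single
// event may exceed that threshold; single events are still limited by MaxMessageBytes.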
type batchEncoder struct {
messages []*common.Message
// buff the callback of the latest message
Expand Down Expand Up @@ -95,17 +95,7 @@ func (d *batchEncoder) AppendRowChangedEvent(
return errors.Trace(err)
}

if length > d.config.MaxMessageBytes {
// message len is larger than max-message-bytes
if d.config.LargeMessageHandle.Disabled() {
log.Warn("Single message is too large for open-protocol",
zap.Int("maxMessageBytes", d.config.MaxMessageBytes),
zap.Int("length", length),
zap.Any("table", e.TableInfo.TableName),
zap.Any("key", key))
return errors.ErrMessageTooLarge.GenWithStackByArgs(e.TableInfo.GetTargetTableName(), length, d.config.MaxMessageBytes)
}

if length > d.config.MaxMessageBytes && !d.config.LargeMessageHandle.Disabled() {
if d.config.LargeMessageHandle.EnableClaimCheck() {
// send the large message to the external storage first, then
// create a new message contains the reference of the large message.
Expand Down Expand Up @@ -149,6 +139,15 @@ func (d *batchEncoder) AppendRowChangedEvent(
}
}

if length > d.config.MaxMessageBytes {
log.Warn("Single message is too large for open-protocol",
zap.Int("maxMessageBytes", d.config.MaxMessageBytes),
zap.Int("length", length),
zap.Any("table", e.TableInfo.TableName),
zap.Any("key", key))
return errors.ErrMessageTooLarge.GenWithStackByArgs(e.TableInfo.GetTargetTableName(), length, d.config.MaxMessageBytes)
}

d.pushMessage(key, value, e.Callback)
return nil
}
Expand All @@ -174,7 +173,7 @@ func (d *batchEncoder) pushMessage(key, value []byte, callback func()) {
binary.BigEndian.PutUint64(keyLenByte[:], uint64(len(key)))
binary.BigEndian.PutUint64(valueLenByte[:], uint64(len(value)))

if len(d.messages) == 0 || d.messages[len(d.messages)-1].Length()+length > d.config.MaxMessageBytes || d.messages[len(d.messages)-1].GetRowsCount() >= d.config.MaxBatchSize {
if len(d.messages) == 0 || d.messages[len(d.messages)-1].Length()+length > d.config.BatchMaxMessageBytes() || d.messages[len(d.messages)-1].GetRowsCount() >= d.config.MaxBatchSize {
d.finalizeCallback()
// create a new message
versionHead := make([]byte, 8)
Expand Down Expand Up @@ -244,7 +243,12 @@ func (d *batchEncoder) EncodeDDLEvent(e *commonEvent.DDLEvent) (*common.Message,
return nil, errors.Trace(err)
}

return common.NewMsg(key, value), nil
message := common.NewMsg(key, value)
if message.Length() > d.config.MaxMessageBytes {
return nil, errors.ErrMessageTooLarge.GenWithStackByArgs(
e.GetTargetTableName(), message.Length(), d.config.MaxMessageBytes)
}
return message, nil
}

// EncodeCheckpointEvent implements the RowEventEncoder interface
Expand Down Expand Up @@ -279,5 +283,10 @@ func (d *batchEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error)

key = keyOutput.Bytes()
value := valueOutput.Bytes()
return common.NewMsg(key, value), nil
message := common.NewMsg(key, value)
if message.Length() > d.config.MaxMessageBytes {
return nil, errors.ErrMessageTooLarge.GenWithStackByArgs(
"checkpoint", message.Length(), d.config.MaxMessageBytes)
}
return message, nil
}
Loading
Loading