diff --git a/downstreamadapter/sink/cloudstorage/encoder_group_test.go b/downstreamadapter/sink/cloudstorage/encoder_group_test.go index acde673e5a..bd681936ad 100644 --- a/downstreamadapter/sink/cloudstorage/encoder_group_test.go +++ b/downstreamadapter/sink/cloudstorage/encoder_group_test.go @@ -230,6 +230,7 @@ func newTestTxnEncoderConfig(t *testing.T) *common.Config { config.ProtocolCsv, replicaConfig.Sink, config.DefaultMaxMessageBytes, + config.DefaultMaxMessageBytes, ) require.NoError(t, err) return encoderConfig diff --git a/downstreamadapter/sink/cloudstorage/sink.go b/downstreamadapter/sink/cloudstorage/sink.go index 8a64f45396..d93588b97b 100644 --- a/downstreamadapter/sink/cloudstorage/sink.go +++ b/downstreamadapter/sink/cloudstorage/sink.go @@ -87,7 +87,7 @@ func Verify(ctx context.Context, changefeedID common.ChangeFeedID, sinkURI *url. if err != nil { return err } - _, err = helper.GetEncoderConfig(changefeedID, sinkURI, protocol, sinkConfig, math.MaxInt) + _, err = helper.GetEncoderConfig(changefeedID, sinkURI, protocol, sinkConfig, math.MaxInt, math.MaxInt) if err != nil { return err } @@ -117,9 +117,9 @@ func New( } // get cloud storage file extension according to the specific protocol. ext := helper.GetFileExtension(protocol) - // the last param maxMsgBytes is mainly to limit the size of a single message for - // batch protocols in mq scenario. In cloud storage sink, we just set it to max int. - encoderConfig, err := helper.GetEncoderConfig(changefeedID, sinkURI, protocol, sinkConfig, math.MaxInt) + // Message size limits are mainly for MQ batch protocols. Cloud storage uses + // max int for both the final message limit and the batch threshold. + encoderConfig, err := helper.GetEncoderConfig(changefeedID, sinkURI, protocol, sinkConfig, math.MaxInt, math.MaxInt) if err != nil { return nil, err } diff --git a/downstreamadapter/sink/helper/helper.go b/downstreamadapter/sink/helper/helper.go index 47cd949220..d5621efc39 100644 --- a/downstreamadapter/sink/helper/helper.go +++ b/downstreamadapter/sink/helper/helper.go @@ -50,17 +50,16 @@ func GetEncoderConfig( sinkURI *url.URL, protocol config.Protocol, sinkConfig *config.SinkConfig, - maxMsgBytes int, + maxMessageBytes int, + batchMaxMessageBytes int, ) (*common.Config, error) { encoderConfig := common.NewConfig(protocol) if err := encoderConfig.Apply(sinkURI, sinkConfig); err != nil { return nil, errors.WrapError(errors.ErrSinkInvalidConfig, err) } - // Always set encoder's `MaxMessageBytes` equal to producer's `MaxMessageBytes` - // to prevent that the encoder generate batched message too large - // then cause producer meet `message too large`. encoderConfig = encoderConfig. - WithMaxMessageBytes(maxMsgBytes). + WithMaxMessageBytes(maxMessageBytes). + WithMaxBatchMessageBytes(batchMaxMessageBytes). WithChangefeedID(changefeedID) tz, err := util.GetTimezone(config.GetGlobalServerConfig().TZ) diff --git a/downstreamadapter/sink/kafka/helper.go b/downstreamadapter/sink/kafka/helper.go index de6ce2fb71..cbc5b5fb28 100644 --- a/downstreamadapter/sink/kafka/helper.go +++ b/downstreamadapter/sink/kafka/helper.go @@ -88,7 +88,10 @@ func newKafkaSinkComponentWithFactory(ctx context.Context, return kafkaComponent, protocol, errors.Trace(err) } - encoderConfig, err := helper.GetEncoderConfig(changefeedID, sinkURI, protocol, sinkConfig, options.MaxMessageBytes) + encoderConfig, err := helper.GetEncoderConfig( + changefeedID, sinkURI, protocol, sinkConfig, + options.MaxMessageBytes, options.BatchMaxMessageBytes, + ) if err != nil { return kafkaComponent, protocol, errors.Trace(err) } diff --git a/downstreamadapter/sink/kafka/sink_test.go b/downstreamadapter/sink/kafka/sink_test.go index 7d46286e27..7309acfcb8 100644 --- a/downstreamadapter/sink/kafka/sink_test.go +++ b/downstreamadapter/sink/kafka/sink_test.go @@ -97,7 +97,10 @@ func newKafkaSinkForTestWithProducers(ctx context.Context, if err != nil { return nil, err } - encoderConfig, err := helper.GetEncoderConfig(changefeedID, sinkURI, protocol, sinkConfig, options.MaxMessageBytes) + encoderConfig, err := helper.GetEncoderConfig( + changefeedID, sinkURI, protocol, sinkConfig, + options.MaxMessageBytes, options.MaxMessageBytes, + ) if err != nil { return nil, err } diff --git a/downstreamadapter/sink/pulsar/helper.go b/downstreamadapter/sink/pulsar/helper.go index d2c33430b5..3ccb299767 100644 --- a/downstreamadapter/sink/pulsar/helper.go +++ b/downstreamadapter/sink/pulsar/helper.go @@ -122,7 +122,10 @@ func newPulsarSinkComponentWithFactory(ctx context.Context, return pulsarComponent, protocol, errors.Trace(err) } - encoderConfig, err := helper.GetEncoderConfig(changefeedID, sinkURI, protocol, sinkConfig, config.DefaultMaxMessageBytes) + encoderConfig, err := helper.GetEncoderConfig( + changefeedID, sinkURI, protocol, sinkConfig, + config.DefaultMaxMessageBytes, config.DefaultMaxMessageBytes, + ) if err != nil { return pulsarComponent, protocol, errors.Trace(err) } diff --git a/pkg/sink/codec/avro/encoder.go b/pkg/sink/codec/avro/encoder.go index fdad6c7dbf..bcb0d1f5f6 100644 --- a/pkg/sink/codec/avro/encoder.go +++ b/pkg/sink/codec/avro/encoder.go @@ -93,7 +93,8 @@ func (a *BatchEncoder) AppendRowChangedEvent( zap.Int("maxMessageBytes", a.config.MaxMessageBytes), zap.Int("length", message.Length()), zap.Any("table", e.TableInfo.TableName)) - return errors.ErrMessageTooLarge.GenWithStackByArgs(e.TableInfo.GetTargetTableName(), message.Length(), a.config.MaxMessageBytes) + return errors.ErrMessageTooLarge.GenWithStackByArgs( + e.TableInfo.GetTargetTableName(), message.Length(), a.config.MaxMessageBytes) } a.result = append(a.result, message) @@ -114,7 +115,12 @@ func (a *BatchEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error) } value := buf.Bytes() - return common.NewMsg(nil, value), nil + message := common.NewMsg(nil, value) + if message.Length() > a.config.MaxMessageBytes { + return nil, errors.ErrMessageTooLarge.GenWithStackByArgs( + "checkpoint", message.Length(), a.config.MaxMessageBytes) + } + return message, nil } return nil, nil } @@ -140,7 +146,12 @@ func (a *BatchEncoder) EncodeDDLEvent(e *commonEvent.DDLEvent) (*common.Message, buf.Write(data) value := buf.Bytes() - return common.NewMsg(nil, value), nil + message := common.NewMsg(nil, value) + if message.Length() > a.config.MaxMessageBytes { + return nil, errors.ErrMessageTooLarge.GenWithStackByArgs( + e.GetTargetTableName(), message.Length(), a.config.MaxMessageBytes) + } + return message, nil } return nil, nil diff --git a/pkg/sink/codec/canal/canal_json_encoder.go b/pkg/sink/codec/canal/canal_json_encoder.go index 7425a666ef..215b0e1936 100644 --- a/pkg/sink/codec/canal/canal_json_encoder.go +++ b/pkg/sink/codec/canal/canal_json_encoder.go @@ -441,7 +441,12 @@ func (c *JSONRowEventEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message, return nil, errors.WrapError(errors.ErrCanalEncodeFailed, err) } - return common.NewMsg(nil, value), nil + message := common.NewMsg(nil, value) + if message.Length() > c.config.MaxMessageBytes { + return nil, errors.ErrMessageTooLarge.GenWithStackByArgs( + "checkpoint", message.Length(), c.config.MaxMessageBytes) + } + return message, nil } // AppendRowChangedEvent implements the interface EventJSONBatchEncoder @@ -468,16 +473,7 @@ func (c *JSONRowEventEncoder) AppendRowChangedEvent( targetTable := e.TableInfo.GetTargetTableName() originLength := m.Length() - if m.Length() > c.config.MaxMessageBytes { - // for single message that is longer than max-message-bytes, do not send it. - if c.config.LargeMessageHandle.Disabled() { - log.Error("Single message is too large for canal-json", - zap.Int("maxMessageBytes", c.config.MaxMessageBytes), - zap.Int("length", originLength), - zap.Any("table", e.TableInfo.TableName)) - return errors.ErrMessageTooLarge.GenWithStackByArgs(targetTable, originLength, c.config.MaxMessageBytes) - } - + if m.Length() > c.config.MaxMessageBytes && !c.config.LargeMessageHandle.Disabled() { if c.config.LargeMessageHandle.HandleKeyOnly() { value, err = newJSONMessageForDML(e, c.config, true, "") if err != nil { @@ -520,6 +516,14 @@ func (c *JSONRowEventEncoder) AppendRowChangedEvent( } } + if m.Length() > c.config.MaxMessageBytes { + log.Error("Single message is too large for canal-json", + zap.Int("maxMessageBytes", c.config.MaxMessageBytes), + zap.Int("length", m.Length()), + zap.Any("table", e.TableInfo.TableName)) + return errors.ErrMessageTooLarge.GenWithStackByArgs(targetTable, m.Length(), c.config.MaxMessageBytes) + } + c.messages = append(c.messages, m) return nil } @@ -580,7 +584,12 @@ func (c *JSONRowEventEncoder) EncodeDDLEvent(e *commonEvent.DDLEvent) (*common.M return nil, errors.WrapError(errors.ErrCanalEncodeFailed, err) } - return common.NewMsg(nil, value), nil + result := common.NewMsg(nil, value) + if result.Length() > c.config.MaxMessageBytes { + return nil, errors.ErrMessageTooLarge.GenWithStackByArgs( + e.GetTargetTableName(), result.Length(), c.config.MaxMessageBytes) + } + return result, nil } func (c *JSONRowEventEncoder) Clean() { diff --git a/pkg/sink/codec/canal/canal_json_encoder_test.go b/pkg/sink/codec/canal/canal_json_encoder_test.go index 2953642f89..ac68162050 100644 --- a/pkg/sink/codec/canal/canal_json_encoder_test.go +++ b/pkg/sink/codec/canal/canal_json_encoder_test.go @@ -666,6 +666,22 @@ func TestMaxMessageBytes(t *testing.T) { }) require.NoError(t, err) + codecConfig = common.NewConfig(config.ProtocolCanalJSON). + WithMaxMessageBytes(maxMessageBytes). + WithMaxBatchMessageBytes(100) + + encIface, err = NewJSONRowEventEncoder(ctx, codecConfig) + require.NoError(t, err) + + encoder = encIface.(*JSONRowEventEncoder) + err = encoder.AppendRowChangedEvent(ctx, topic, &commonEvent.RowEvent{ + TableInfo: dml.TableInfo, + CommitTs: dml.CommitTs, + Event: rc, + ColumnSelector: columnselector.NewDefaultColumnSelector(), + }) + require.NoError(t, err) + // the test message length is larger than max-message-bytes codecConfig = codecConfig.WithMaxMessageBytes(100) diff --git a/pkg/sink/codec/canal/canal_json_txn_encoder.go b/pkg/sink/codec/canal/canal_json_txn_encoder.go index 0af6f4f3f2..4998378ef7 100644 --- a/pkg/sink/codec/canal/canal_json_txn_encoder.go +++ b/pkg/sink/codec/canal/canal_json_txn_encoder.go @@ -65,13 +65,13 @@ func (j *JSONTxnEventEncoder) AppendTxnEvent(event *commonEvent.DMLEvent) error return err } length := len(value) + common.MaxRecordOverhead - // For single message that is longer than max-message-bytes, do not send it. if length > j.config.MaxMessageBytes { log.Warn("Single message is too large for canal-json", zap.Int("maxMessageBytes", j.config.MaxMessageBytes), zap.Int("length", length), zap.Any("table", event.TableInfo.TableName)) - return errors.ErrMessageTooLarge.GenWithStackByArgs(event.TableInfo.GetTargetTableName(), length, j.config.MaxMessageBytes) + return errors.ErrMessageTooLarge.GenWithStackByArgs( + event.TableInfo.GetTargetTableName(), length, j.config.MaxMessageBytes) } j.valueBuf.Write(value) j.valueBuf.Write(j.terminator) diff --git a/pkg/sink/codec/common/config.go b/pkg/sink/codec/common/config.go index 83486f3da3..ab448adeda 100644 --- a/pkg/sink/codec/common/config.go +++ b/pkg/sink/codec/common/config.go @@ -41,9 +41,12 @@ type Config struct { Protocol config.Protocol - // control batch behavior, only for `open-protocol` and `craft` at the moment. + // MaxMessageBytes is the final encoded message size limit. MaxMessageBytes int - MaxBatchSize int + // MaxBatchMessageBytes controls batch splitting. + // If it is not set, codecs use MaxMessageBytes to keep the old behavior. + MaxBatchMessageBytes int + MaxBatchSize int // DeleteOnlyHandleKeyColumns is true, for the delete event only output the handle key columns. DeleteOnlyHandleKeyColumns bool @@ -344,6 +347,20 @@ func (c *Config) WithMaxMessageBytes(bytes int) *Config { return c } +// WithMaxBatchMessageBytes sets the batch splitting threshold. +func (c *Config) WithMaxBatchMessageBytes(bytes int) *Config { + c.MaxBatchMessageBytes = bytes + return c +} + +// BatchMaxMessageBytes returns the batch splitting threshold. +func (c *Config) BatchMaxMessageBytes() int { + if c.MaxBatchMessageBytes > 0 { + return c.MaxBatchMessageBytes + } + return c.MaxMessageBytes +} + // WithChangefeedID set the `changefeedID` func (c *Config) WithChangefeedID(id common.ChangeFeedID) *Config { c.ChangefeedID = id @@ -415,6 +432,20 @@ func (c *Config) Validate() error { errors.Errorf("invalid max-message-bytes %d", c.MaxMessageBytes), ) } + if c.MaxBatchMessageBytes < 0 { + return errors.ErrCodecInvalidConfig.Wrap( + errors.Errorf("invalid max-batch-message-bytes %d", c.MaxBatchMessageBytes), + ) + } + if c.MaxBatchMessageBytes > c.MaxMessageBytes { + return errors.ErrCodecInvalidConfig.Wrap( + errors.Errorf( + "max-batch-message-bytes %d cannot be greater than max-message-bytes %d", + c.MaxBatchMessageBytes, + c.MaxMessageBytes, + ), + ) + } if c.MaxBatchSize <= 0 { return errors.ErrCodecInvalidConfig.Wrap( diff --git a/pkg/sink/codec/common/config_test.go b/pkg/sink/codec/common/config_test.go new file mode 100644 index 0000000000..5f8ecd4005 --- /dev/null +++ b/pkg/sink/codec/common/config_test.go @@ -0,0 +1,35 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "testing" + + "github.com/pingcap/ticdc/pkg/config" + "github.com/stretchr/testify/require" +) + +func TestValidateMaxBatchMessageBytes(t *testing.T) { + cfg := NewConfig(config.ProtocolOpen) + cfg.MaxMessageBytes = 100 + cfg.MaxBatchMessageBytes = 101 + + err := cfg.Validate() + require.Error(t, err) + require.ErrorContains( + t, + err, + "max-batch-message-bytes 101 cannot be greater than max-message-bytes 100", + ) +} diff --git a/pkg/sink/codec/open/encoder.go b/pkg/sink/codec/open/encoder.go index 75f82e9d03..3212fb25a8 100644 --- a/pkg/sink/codec/open/encoder.go +++ b/pkg/sink/codec/open/encoder.go @@ -38,7 +38,7 @@ var ( ) // batchEncoder for open protocol will batch multiple row changed events into a single message. -// One message can contain at most MaxBatchSize events, and the total size of the message cannot exceed MaxMessageBytes. +// One message can contain at most MaxBatchSize events, and the total size of the message cannot exceed BatchMaxMessageBytes. type batchEncoder struct { messages []*common.Message // buff the callback of the latest message @@ -95,17 +95,7 @@ func (d *batchEncoder) AppendRowChangedEvent( return errors.Trace(err) } - if length > d.config.MaxMessageBytes { - // message len is larger than max-message-bytes - if d.config.LargeMessageHandle.Disabled() { - log.Warn("Single message is too large for open-protocol", - zap.Int("maxMessageBytes", d.config.MaxMessageBytes), - zap.Int("length", length), - zap.Any("table", e.TableInfo.TableName), - zap.Any("key", key)) - return errors.ErrMessageTooLarge.GenWithStackByArgs(e.TableInfo.GetTargetTableName(), length, d.config.MaxMessageBytes) - } - + if length > d.config.MaxMessageBytes && !d.config.LargeMessageHandle.Disabled() { if d.config.LargeMessageHandle.EnableClaimCheck() { // send the large message to the external storage first, then // create a new message contains the reference of the large message. @@ -149,6 +139,15 @@ func (d *batchEncoder) AppendRowChangedEvent( } } + if length > d.config.MaxMessageBytes { + log.Warn("Single message is too large for open-protocol", + zap.Int("maxMessageBytes", d.config.MaxMessageBytes), + zap.Int("length", length), + zap.Any("table", e.TableInfo.TableName), + zap.Any("key", key)) + return errors.ErrMessageTooLarge.GenWithStackByArgs(e.TableInfo.GetTargetTableName(), length, d.config.MaxMessageBytes) + } + d.pushMessage(key, value, e.Callback) return nil } @@ -174,7 +173,7 @@ func (d *batchEncoder) pushMessage(key, value []byte, callback func()) { binary.BigEndian.PutUint64(keyLenByte[:], uint64(len(key))) binary.BigEndian.PutUint64(valueLenByte[:], uint64(len(value))) - if len(d.messages) == 0 || d.messages[len(d.messages)-1].Length()+length > d.config.MaxMessageBytes || d.messages[len(d.messages)-1].GetRowsCount() >= d.config.MaxBatchSize { + if len(d.messages) == 0 || d.messages[len(d.messages)-1].Length()+length > d.config.BatchMaxMessageBytes() || d.messages[len(d.messages)-1].GetRowsCount() >= d.config.MaxBatchSize { d.finalizeCallback() // create a new message versionHead := make([]byte, 8) @@ -244,7 +243,12 @@ func (d *batchEncoder) EncodeDDLEvent(e *commonEvent.DDLEvent) (*common.Message, return nil, errors.Trace(err) } - return common.NewMsg(key, value), nil + message := common.NewMsg(key, value) + if message.Length() > d.config.MaxMessageBytes { + return nil, errors.ErrMessageTooLarge.GenWithStackByArgs( + e.GetTargetTableName(), message.Length(), d.config.MaxMessageBytes) + } + return message, nil } // EncodeCheckpointEvent implements the RowEventEncoder interface @@ -279,5 +283,10 @@ func (d *batchEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error) key = keyOutput.Bytes() value := valueOutput.Bytes() - return common.NewMsg(key, value), nil + message := common.NewMsg(key, value) + if message.Length() > d.config.MaxMessageBytes { + return nil, errors.ErrMessageTooLarge.GenWithStackByArgs( + "checkpoint", message.Length(), d.config.MaxMessageBytes) + } + return message, nil } diff --git a/pkg/sink/codec/open/encoder_test.go b/pkg/sink/codec/open/encoder_test.go index 9b02366709..c0aed9ebb2 100644 --- a/pkg/sink/codec/open/encoder_test.go +++ b/pkg/sink/codec/open/encoder_test.go @@ -885,6 +885,46 @@ func TestMessageTooLarge(t *testing.T) { require.Equal(t, count, 0) } +func TestMessageLargerThanBatchLimit(t *testing.T) { + ctx := context.Background() + codecConfig := common.NewConfig(config.ProtocolOpen). + WithMaxMessageBytes(400). + WithMaxBatchMessageBytes(100) + encoder, err := NewBatchEncoder(ctx, codecConfig) + require.NoError(t, err) + + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + helper.Tk().MustExec("use test") + + job := helper.DDL2Job(`create table test.t(a tinyint primary key, b int)`) + tableInfo := helper.GetTableInfo(job) + dmlEvent := helper.DML2Event("test", "t", `insert into test.t values (1, 123)`) + require.NotNil(t, dmlEvent) + insertRow, ok := dmlEvent.GetNextRow() + require.True(t, ok) + + count := 0 + insertRowEvent := &commonEvent.RowEvent{ + TableInfo: tableInfo, + CommitTs: dmlEvent.GetCommitTs(), + Event: insertRow, + ColumnSelector: columnselector.NewDefaultColumnSelector(), + Callback: func() { count += 1 }, + } + + err = encoder.AppendRowChangedEvent(ctx, "", insertRowEvent) + require.NoError(t, err) + + messages := encoder.Build() + require.Len(t, messages, 1) + require.Equal(t, 1, messages[0].GetRowsCount()) + require.Equal(t, 0, count) + + messages[0].Callback() + require.Equal(t, 1, count) +} + func TestLargeMessageWithHandleEnableHandleKeyOnly(t *testing.T) { helper := commonEvent.NewEventTestHelper(t) defer helper.Close() diff --git a/pkg/sink/codec/simple/encoder.go b/pkg/sink/codec/simple/encoder.go index b8ef228561..9648f1b42e 100644 --- a/pkg/sink/codec/simple/encoder.go +++ b/pkg/sink/codec/simple/encoder.go @@ -67,19 +67,19 @@ func (e *Encoder) AppendRowChangedEvent(ctx context.Context, _ string, event *co result.IncRowsCount() length := result.Length() - if length <= e.config.MaxMessageBytes { + if length <= e.config.MaxMessageBytes || e.config.LargeMessageHandle.Disabled() { + if length > e.config.MaxMessageBytes { + log.Error("Single message is too large for simple", + zap.Int("maxMessageBytes", e.config.MaxMessageBytes), + zap.Int("length", length), + zap.Any("table", event.TableInfo.TableName)) + return errors.ErrMessageTooLarge.GenWithStackByArgs( + event.TableInfo.GetTargetTableName(), length, e.config.MaxMessageBytes) + } e.messages = append(e.messages, result) return nil } - if e.config.LargeMessageHandle.Disabled() { - log.Error("Single message is too large for simple", - zap.Int("maxMessageBytes", e.config.MaxMessageBytes), - zap.Int("length", length), - zap.Any("table", event.TableInfo.TableName)) - return errors.ErrMessageTooLarge.GenWithStackByArgs(event.TableInfo.GetTargetTableName(), length, e.config.MaxMessageBytes) - } - var claimCheckLocation string if e.config.LargeMessageHandle.EnableClaimCheck() { fileName := claimcheck.NewFileName() @@ -113,7 +113,8 @@ func (e *Encoder) AppendRowChangedEvent(ctx context.Context, _ string, event *co zap.Int("maxMessageBytes", e.config.MaxMessageBytes), zap.Int("length", result.Length()), zap.Any("table", event.TableInfo.TableName)) - return errors.ErrMessageTooLarge.GenWithStackByArgs(event.TableInfo.GetTargetTableName(), result.Length(), e.config.MaxMessageBytes) + return errors.ErrMessageTooLarge.GenWithStackByArgs( + event.TableInfo.GetTargetTableName(), result.Length(), e.config.MaxMessageBytes) } // Build implement the RowEventEncoder interface @@ -135,7 +136,15 @@ func (e *Encoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error) { value, err = common.Compress(e.config.ChangefeedID, e.config.LargeMessageHandle.LargeMessageHandleCompression, value) - return common.NewMsg(nil, value), err + if err != nil { + return nil, err + } + result := common.NewMsg(nil, value) + if result.Length() > e.config.MaxMessageBytes { + return nil, errors.ErrMessageTooLarge.GenWithStackByArgs( + "checkpoint", result.Length(), e.config.MaxMessageBytes) + } + return result, nil } // EncodeDDLEvent implement the DDLEventBatchEncoder interface @@ -157,7 +166,8 @@ func (e *Encoder) EncodeDDLEvent(event *commonEvent.DDLEvent) (*common.Message, zap.Int("maxMessageBytes", e.config.MaxMessageBytes), zap.Int("length", result.Length()), zap.String("table", event.GetTargetTableName())) - return nil, errors.ErrMessageTooLarge.GenWithStackByArgs(event.GetTargetTableName(), result.Length(), e.config.MaxMessageBytes) + return nil, errors.ErrMessageTooLarge.GenWithStackByArgs( + event.GetTargetTableName(), result.Length(), e.config.MaxMessageBytes) } return result, nil } diff --git a/pkg/sink/codec/simple/encoder_test.go b/pkg/sink/codec/simple/encoder_test.go index 68170f6f05..ae2f9faf94 100644 --- a/pkg/sink/codec/simple/encoder_test.go +++ b/pkg/sink/codec/simple/encoder_test.go @@ -1592,6 +1592,26 @@ func TestDMLMessageTooLarge(t *testing.T) { } } +func TestDMLLargerThanBatchLimit(t *testing.T) { + _, insertEvent, _, _ := common.NewLargeEvent4Test(t) + + codecConfig := common.NewConfig(config.ProtocolSimple) + codecConfig.MaxMessageBytes = config.DefaultMaxMessageBytes + codecConfig.MaxBatchMessageBytes = 50 + + enc, err := NewEncoder(context.Background(), codecConfig) + require.NoError(t, err) + + err = enc.AppendRowChangedEvent(context.Background(), "", insertEvent) + require.NoError(t, err) + + messages := enc.Build() + require.Len(t, messages, 1) + require.Greater(t, messages[0].Length(), codecConfig.MaxBatchMessageBytes) + require.LessOrEqual(t, messages[0].Length(), codecConfig.MaxMessageBytes) + require.Equal(t, 1, messages[0].GetRowsCount()) +} + func TestLargerMessageHandleClaimCheck(t *testing.T) { ddlEvent, _, updateEvent, _ := common.NewLargeEvent4Test(t) diff --git a/pkg/sink/kafka/options.go b/pkg/sink/kafka/options.go index c9b992814e..6bef416711 100644 --- a/pkg/sink/kafka/options.go +++ b/pkg/sink/kafka/options.go @@ -24,6 +24,7 @@ import ( "strings" "time" + "github.com/IBM/sarama" "github.com/gin-gonic/gin/binding" "github.com/imdario/mergo" "github.com/pingcap/errors" @@ -40,13 +41,6 @@ const ( defaultPartitionNum = 3 // defaultMaxRetry is the default retry budget for Kafka producers. defaultMaxRetry = 5 - - // the `max-message-bytes` is set equal to topic's `max.message.bytes`, and is used to check - // whether the message is larger than the max size limit. It's found some message pass the message - // size limit check at the client side and failed at the broker side since message enlarged during - // the network transmission. so we set the `max-message-bytes` to a smaller value to avoid this problem. - // maxMessageBytesOverhead is used to reduce the `max-message-bytes`. - maxMessageBytesOverhead = 128 ) const ( @@ -143,7 +137,7 @@ type urlConfig struct { InsecureSkipVerify *bool `form:"insecure-skip-verify"` } -// options stores user specified configurations +// options stores Kafka sink configurations type options struct { Topic string BrokerEndpoints []string @@ -156,14 +150,17 @@ type options struct { Version string IsAssignedVersion bool RequestVersion int16 - MaxMessageBytes int - MaxRetry int - Compression string - ClientID string - RequiredAcks RequiredAcks - // Only for test. User can not set this value. - // The current prod default value is 0. - MaxMessages int + + // MaxMessageBytes controls the byte size limit of the producer. + MaxMessageBytes int + // BatchMaxMessageBytes controls the byte size limit when batching messages. + // this is not exposed, and is inferred from the `MaxMessageBytes` and kafka related configurations in the `adjustOption`. + BatchMaxMessageBytes int + + MaxRetry int + Compression string + ClientID string + RequiredAcks RequiredAcks // Credential is used to connect to kafka cluster. EnableTLS bool @@ -180,20 +177,20 @@ type options struct { // NewOptions returns a default Kafka configuration func NewOptions() *options { return &options{ - Version: "2.4.0", - // MaxMessageBytes will be used to initialize producer - MaxMessageBytes: config.DefaultMaxMessageBytes, - MaxRetry: defaultMaxRetry, - ReplicationFactor: 1, - Compression: "none", - RequiredAcks: WaitForAll, - Credential: &security.Credential{}, - InsecureSkipVerify: false, - SASL: &security.SASL{}, - AutoCreate: true, - DialTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, - ReadTimeout: 10 * time.Second, + Version: "2.4.0", + MaxMessageBytes: config.DefaultMaxMessageBytes, + BatchMaxMessageBytes: config.DefaultMaxMessageBytes, + MaxRetry: defaultMaxRetry, + ReplicationFactor: 1, + Compression: "none", + RequiredAcks: WaitForAll, + Credential: &security.Credential{}, + InsecureSkipVerify: false, + SASL: &security.SASL{}, + AutoCreate: true, + DialTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + ReadTimeout: 10 * time.Second, } } @@ -260,6 +257,7 @@ func (o *options) Apply(changefeedID common.ChangeFeedID, if urlParameter.MaxMessageBytes != nil { o.MaxMessageBytes = *urlParameter.MaxMessageBytes } + o.BatchMaxMessageBytes = o.MaxMessageBytes if urlParameter.MaxRetry != nil && *urlParameter.MaxRetry >= 0 { o.MaxRetry = *urlParameter.MaxRetry @@ -575,7 +573,9 @@ func NewKafkaClientID(captureAddr string, return } -// adjustOptions adjust the `options` and `sarama.Config` by condition. +// adjustOptions adjusts options with Kafka runtime metadata. +// It overwrites MaxMessageBytes with the final producer message limit derived +// from the topic or broker configuration. func adjustOptions( ctx context.Context, admin ClusterAdminClient, @@ -587,88 +587,94 @@ func adjustOptions( return errors.Trace(err) } - // Only check replicationFactor >= minInsyncReplicas when producer's required acks is -1. - // If we don't check it, the producer probably can not send message to the topic. - // Because it will wait for the ack from all replicas. But we do not have enough replicas. - if options.RequiredAcks == WaitForAll { - err = validateMinInsyncReplicas(ctx, admin, topics, topic, int(options.ReplicationFactor)) - if err != nil { - return errors.Trace(err) - } + if err = validateRequiredAcks(ctx, admin, topics, topic, options); err != nil { + return errors.Trace(err) } + return adjustTopicOptions(ctx, admin, options, topic, topics) +} +func adjustTopicOptions( + ctx context.Context, + admin ClusterAdminClient, + options *options, + topic string, + topics map[string]TopicDetail, +) error { + batchMaxBytes := options.MaxMessageBytes info, exists := topics[topic] // once we have found the topic, no matter `auto-create-topic`, // make sure user input parameters are valid. + var err error if exists { - // make sure that producer's `MaxMessageBytes` smaller than topic's `max.message.bytes` - topicMaxMessageBytesStr, err := getTopicConfig( - ctx, admin, info.Name, - TopicMaxMessageBytesConfigName, - BrokerMessageMaxBytesConfigName, - ) - if err != nil { - return errors.Trace(err) - } - topicMaxMessageBytes, err := strconv.Atoi(topicMaxMessageBytesStr) - if err != nil { - return errors.Trace(err) - } - - maxMessageBytes := topicMaxMessageBytes - maxMessageBytesOverhead - if topicMaxMessageBytes <= options.MaxMessageBytes { - log.Warn("topic's `max.message.bytes` less than the `max-message-bytes`,"+ - "use topic's `max.message.bytes` to initialize the Kafka producer", - zap.Int("max.message.bytes", topicMaxMessageBytes), - zap.Int("max-message-bytes", options.MaxMessageBytes), - zap.Int("real-max-message-bytes", maxMessageBytes)) - options.MaxMessageBytes = maxMessageBytes - } else { - if maxMessageBytes < options.MaxMessageBytes { - options.MaxMessageBytes = maxMessageBytes - } - } - - // no need to create the topic, - // but we would have to log user if they found enter wrong topic name later - if options.AutoCreate { - log.Warn("topic already exist, TiCDC will not create the topic", - zap.String("topic", topic), zap.Any("detail", info)) - } + err = adjustExistingTopicOptions(ctx, admin, options, topic, info) + } else { + err = adjustNewTopicOptions(admin, options, topic) + } + if err != nil { + return err + } - if err = options.setPartitionNum(info.NumPartitions); err != nil { - return errors.Trace(err) - } + options.BatchMaxMessageBytes = min(batchMaxBytes, options.MaxMessageBytes) + return nil +} +func validateRequiredAcks( + ctx context.Context, + admin ClusterAdminClient, + topics map[string]TopicDetail, + topic string, + options *options, +) error { + // Only check replicationFactor >= minInsyncReplicas when producer's required acks is -1. + // If we don't check it, the producer probably can not send message to the topic. + // Because it will wait for the ack from all replicas. But we do not have enough replicas. + if options.RequiredAcks != WaitForAll { return nil } + return validateMinInsyncReplicas(ctx, admin, topics, topic, int(options.ReplicationFactor)) +} - brokerMessageMaxBytesStr, err := admin.GetBrokerConfig(BrokerMessageMaxBytesConfigName) +func adjustExistingTopicOptions( + ctx context.Context, + admin ClusterAdminClient, + options *options, + topic string, + info TopicDetail, +) error { + topicMaxMessageBytes, err := getTopicMaxMessageBytes(ctx, admin, info.Name) if err != nil { - log.Warn("TiCDC cannot find `message.max.bytes` from broker's configuration") - return errors.Trace(err) + return err } - brokerMessageMaxBytes, err := strconv.Atoi(brokerMessageMaxBytesStr) - if err != nil { + if err = options.setMaxMessageBytes(topicMaxMessageBytes); err != nil { + return err + } + + // no need to create the topic, + // but we would have to log user if they found enter wrong topic name later + if options.AutoCreate { + log.Warn("topic already exist, TiCDC will not create the topic", + zap.String("topic", topic), zap.Any("detail", info)) + } + + if err = options.setPartitionNum(info.NumPartitions); err != nil { return errors.Trace(err) } + return nil +} +func adjustNewTopicOptions( + admin ClusterAdminClient, + options *options, + topic string, +) error { // when create the topic, `max.message.bytes` is decided by the broker, // it would use broker's `message.max.bytes` to set topic's `max.message.bytes`. - // TiCDC need to make sure that the producer's `MaxMessageBytes` won't larger than - // broker's `message.max.bytes`. - maxMessageBytes := brokerMessageMaxBytes - maxMessageBytesOverhead - if brokerMessageMaxBytes <= options.MaxMessageBytes { - log.Warn("broker's `message.max.bytes` less than the `max-message-bytes`,"+ - "use broker's `message.max.bytes` to initialize the Kafka producer", - zap.Int("message.max.bytes", brokerMessageMaxBytes), - zap.Int("max-message-bytes", options.MaxMessageBytes), - zap.Int("real-max-message-bytes", maxMessageBytes)) - options.MaxMessageBytes = maxMessageBytes - } else { - if maxMessageBytes < options.MaxMessageBytes { - options.MaxMessageBytes = maxMessageBytes - } + brokerMessageMaxBytes, err := getBrokerMaxMessageBytes(admin) + if err != nil { + return err + } + if err = options.setMaxMessageBytes(brokerMessageMaxBytes); err != nil { + return err } // topic not exists yet, and user does not specify the `partition-num` in the sink uri. @@ -680,6 +686,49 @@ func adjustOptions( return nil } +func getTopicMaxMessageBytes( + ctx context.Context, + admin ClusterAdminClient, + topic string, +) (int, error) { + maxMessageBytesStr, err := getTopicConfig( + ctx, admin, topic, + TopicMaxMessageBytesConfigName, + BrokerMessageMaxBytesConfigName, + ) + if err != nil { + return 0, errors.Trace(err) + } + maxMessageBytes, err := strconv.Atoi(maxMessageBytesStr) + if err != nil { + return 0, errors.Trace(err) + } + return maxMessageBytes, nil +} + +func getBrokerMaxMessageBytes(admin ClusterAdminClient) (int, error) { + maxMessageBytesStr, err := admin.GetBrokerConfig(BrokerMessageMaxBytesConfigName) + if err != nil { + log.Warn("TiCDC cannot find `message.max.bytes` from broker's configuration") + return 0, errors.Trace(err) + } + maxMessageBytes, err := strconv.Atoi(maxMessageBytesStr) + if err != nil { + return 0, errors.Trace(err) + } + return maxMessageBytes, nil +} + +func (o *options) setMaxMessageBytes(kafkaMaxMessageBytes int) error { + // Sarama ignores Producer.MaxMessageBytes when it is not smaller than MaxRequestSize. + o.MaxMessageBytes = min(kafkaMaxMessageBytes, int(sarama.MaxRequestSize)-1) + if o.MaxMessageBytes <= 0 { + return cerror.ErrKafkaInvalidConfig.GenWithStack( + "invalid Kafka max message bytes %d", kafkaMaxMessageBytes) + } + return nil +} + func validateMinInsyncReplicas( ctx context.Context, admin ClusterAdminClient, diff --git a/pkg/sink/kafka/options_test.go b/pkg/sink/kafka/options_test.go index 0650d3baf2..b4b8058aac 100644 --- a/pkg/sink/kafka/options_test.go +++ b/pkg/sink/kafka/options_test.go @@ -158,12 +158,8 @@ func (f *kafkaAdminFixture) setMessageMaxBytes(brokerValue, topicValue string) { f.topicConfig[defaultMockTopicName][TopicMaxMessageBytesConfigName] = topicValue } -func expectedAdjustedMaxMessageBytes(configuredMaxMessageBytes, sourceMaxMessageBytes int) int { - sourceMaxMessageBytes -= maxMessageBytesOverhead - if configuredMaxMessageBytes < sourceMaxMessageBytes { - return configuredMaxMessageBytes - } - return sourceMaxMessageBytes +func expectedAdjustedMaxMessageBytes(sourceMaxMessageBytes int) int { + return min(sourceMaxMessageBytes, int(sarama.MaxRequestSize)-1) } func (f *kafkaAdminFixture) setMinInsyncReplicas(minInsyncReplicas string) { @@ -193,6 +189,7 @@ func TestCompleteOptions(t *testing.T) { require.Equal(t, int16(3), options.ReplicationFactor) require.Equal(t, "2.6.0", options.Version) require.Equal(t, 4096, options.MaxMessageBytes) + require.Equal(t, 4096, options.BatchMaxMessageBytes) require.Equal(t, WaitForLocal, options.RequiredAcks) require.Equal(t, defaultMaxRetry, options.MaxRetry) @@ -375,13 +372,13 @@ func TestAdjustConfigFallsBackToBrokerMessageMaxBytesWhenTopicConfigMissing(t *t configuredMaxMessageBytes func(*kafkaAdminFixture) int }{ { - name: "keeps configured value below broker limit", + name: "uses broker limit when configured value is below broker", configuredMaxMessageBytes: func(*kafkaAdminFixture) int { return 1024 }, }, { - name: "uses broker limit when configured value is within overhead", + name: "uses broker limit when configured value is below broker by one byte", configuredMaxMessageBytes: func(f *kafkaAdminFixture) int { return f.brokerMessageMaxBytes() - 1 }, @@ -410,11 +407,10 @@ func TestAdjustConfigFallsBackToBrokerMessageMaxBytesWhenTopicConfigMissing(t *t options := NewOptions() options.BrokerEndpoints = []string{"127.0.0.1:9092"} - options.MaxMessageBytes = test.configuredMaxMessageBytes(adminFixture) - expectedMaxMessageBytes := expectedAdjustedMaxMessageBytes( - options.MaxMessageBytes, - adminFixture.brokerMessageMaxBytes(), - ) + configuredMaxMessageBytes := test.configuredMaxMessageBytes(adminFixture) + options.MaxMessageBytes = configuredMaxMessageBytes + expectedProducerLimit := expectedAdjustedMaxMessageBytes( + adminFixture.brokerMessageMaxBytes()) ctx := context.Background() err = adjustOptions(ctx, adminClient, options, topicName) @@ -423,8 +419,14 @@ func TestAdjustConfigFallsBackToBrokerMessageMaxBytesWhenTopicConfigMissing(t *t saramaConfig, err := newSaramaConfig(ctx, options) require.NoError(t, err) - require.Equal(t, expectedMaxMessageBytes, options.MaxMessageBytes) - require.Equal(t, expectedMaxMessageBytes, saramaConfig.Producer.MaxMessageBytes) + require.NotEqual(t, configuredMaxMessageBytes, options.MaxMessageBytes) + require.Equal(t, expectedProducerLimit, options.MaxMessageBytes) + require.Equal( + t, + min(configuredMaxMessageBytes, expectedProducerLimit), + options.BatchMaxMessageBytes, + ) + require.Equal(t, expectedProducerLimit, saramaConfig.Producer.MaxMessageBytes) }) } } @@ -559,7 +561,7 @@ func TestConfigurationCombinations(t *testing.T) { mockTopicMessageMaxBytes, }, { - "new topic broker overhead below user", + "new topic broker below user", "kafka://127.0.0.1:9092/%s?max-message-bytes=%s", []any{"not-created-topic", strconv.Itoa(1024*1024 + 1)}, mockBrokerMessageMaxBytes, @@ -625,12 +627,19 @@ func TestConfigurationCombinations(t *testing.T) { strconv.Itoa(config.DefaultMaxMessageBytes + 1), }, { - "existing topic topic overhead below user", + "existing topic topic below user", "kafka://127.0.0.1:9092/%s?max-message-bytes=%s", []any{defaultMockTopicName, strconv.Itoa(1024*1024 + 1)}, mockBrokerMessageMaxBytes, mockTopicMessageMaxBytes, }, + { + "existing topic topic above sarama request limit", + "kafka://127.0.0.1:9092/%s", + []any{defaultMockTopicName}, + mockBrokerMessageMaxBytes, + strconv.Itoa(int(sarama.MaxRequestSize) + 4096), + }, { "existing topic topic below default and user", "kafka://127.0.0.1:9092/%s?max-message-bytes=%s", @@ -677,6 +686,7 @@ func TestConfigurationCombinations(t *testing.T) { options := NewOptions() err = options.Apply(commonType.NewChangefeedID4Test(commonType.DefaultKeyspaceName, "test"), sinkURI, config.GetDefaultReplicaConfig().Sink) require.Nil(t, err) + configuredMaxMessageBytes := options.MaxMessageBytes topic, ok := a.uriParams[0].(string) require.True(t, ok) @@ -686,15 +696,20 @@ func TestConfigurationCombinations(t *testing.T) { if _, exists := adminFixture.topics[topic]; exists { sourceMaxMessageBytes = adminFixture.topicMaxMessageBytes(topic) } - expectedMaxMessageBytes := expectedAdjustedMaxMessageBytes(options.MaxMessageBytes, sourceMaxMessageBytes) + expectedProducerLimit := expectedAdjustedMaxMessageBytes(sourceMaxMessageBytes) err = adjustOptions(ctx, adminClient, options, topic) require.Nil(t, err) - require.Equal(t, expectedMaxMessageBytes, options.MaxMessageBytes) + require.Equal(t, expectedProducerLimit, options.MaxMessageBytes) + require.Equal( + t, + min(configuredMaxMessageBytes, expectedProducerLimit), + options.BatchMaxMessageBytes, + ) saramaConfig, err := newSaramaConfig(ctx, options) require.Nil(t, err) - require.Equal(t, expectedMaxMessageBytes, saramaConfig.Producer.MaxMessageBytes) + require.Equal(t, expectedProducerLimit, saramaConfig.Producer.MaxMessageBytes) encoderConfig := common.NewConfig(config.ProtocolOpen) err = encoderConfig.Apply(sinkURI, &config.SinkConfig{ @@ -703,13 +718,19 @@ func TestConfigurationCombinations(t *testing.T) { }, }) require.Nil(t, err) - encoderConfig.WithMaxMessageBytes(options.MaxMessageBytes) + encoderConfig. + WithMaxMessageBytes(options.MaxMessageBytes). + WithMaxBatchMessageBytes(options.BatchMaxMessageBytes) err = encoderConfig.Validate() require.Nil(t, err) - // producer's `MaxMessageBytes` = encoder's `MaxMessageBytes`. - require.Equal(t, expectedMaxMessageBytes, encoderConfig.MaxMessageBytes) + require.Equal(t, expectedProducerLimit, encoderConfig.MaxMessageBytes) + require.Equal( + t, + min(configuredMaxMessageBytes, expectedProducerLimit), + encoderConfig.BatchMaxMessageBytes(), + ) adminClient.Close() }) @@ -754,6 +775,7 @@ func TestMerge(t *testing.T) { require.Equal(t, int16(5), c.ReplicationFactor) require.Equal(t, "3.1.2", c.Version) require.Equal(t, 1024*1024, c.MaxMessageBytes) + require.Equal(t, 1024*1024, c.BatchMaxMessageBytes) require.Equal(t, "gzip", c.Compression) require.Equal(t, "test-id", c.ClientID) require.Equal(t, true, c.AutoCreate) @@ -835,6 +857,7 @@ func TestMerge(t *testing.T) { require.Equal(t, int16(5), c.ReplicationFactor) require.Equal(t, "3.1.2", c.Version) require.Equal(t, 1024*1024, c.MaxMessageBytes) + require.Equal(t, 1024*1024, c.BatchMaxMessageBytes) require.Equal(t, "gzip", c.Compression) require.Equal(t, "test-id", c.ClientID) require.Equal(t, true, c.AutoCreate) diff --git a/pkg/sink/kafka/sarama_config.go b/pkg/sink/kafka/sarama_config.go index b53dc47b37..d18e8b0c56 100644 --- a/pkg/sink/kafka/sarama_config.go +++ b/pkg/sink/kafka/sarama_config.go @@ -62,7 +62,7 @@ func newSaramaConfig(ctx context.Context, o *options) (*sarama.Config, error) { config.Producer.Flush.Bytes = 0 config.Producer.Flush.Messages = 0 config.Producer.Flush.Frequency = time.Duration(0) - config.Producer.Flush.MaxMessages = o.MaxMessages + config.Producer.Flush.MaxMessages = 0 config.Net.MaxOpenRequests = 1 config.Net.DialTimeout = o.DialTimeout