kafka: decouple max-message-bytes, allow it to be set by adjusting Kafka configurations #5420

Open
3AceShowHand wants to merge 3 commits into pingcap:master from 3AceShowHand:kafka-decouple-max-message-bytes

@3AceShowHand 3AceShowHand commented Jun 16, 2026

Collaborator

What problem does this PR solve?

Issue Number: close #1405

Kafka sink used max-message-bytes for multiple purposes:

  • the changefeed-configured batch-splitting threshold;
  • the codec large-message handling trigger;
  • the final encoded Kafka message size limit used by the producer.

Because of this coupling, when a single row event is larger than the changefeed's max-message-bytes, operators have to pause the changefeed, increase max-message-bytes, and also raise the downstream Kafka topic or broker message size limit. This is operationally cumbersome. The desired behavior is that once the Kafka topic or broker accepts larger messages, TiCDC uses that larger producer limit without requiring a changefeed configuration update.

What is changed and how it works?

This PR decouples the batch threshold from the final producer message limit.

  • Kafka sink reads the user-configured max-message-bytes first and keeps that value as the batch-splitting threshold.
  • Kafka sink then derives the final producer-side maximum message size from Kafka topic max.message.bytes or broker message.max.bytes, and caps it below Sarama's request size limit.
  • options.MaxMessageBytes is adjusted after Kafka metadata is read, so it represents the final producer hard limit after adjustOptions.
  • Codec Config uses MaxBatchMessageBytes for batch splitting decisions and MaxMessageBytes for the final encoded message hard limit.
  • Large-message handling is triggered by the producer hard limit, not by the batch splitting threshold. A single row event larger than the batch threshold but still within Kafka's producer limit can be sent directly.
  • The Kafka consumer used in tests is adjusted to match the new producer limit behavior.
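
As a rough illustration of the derivation described in the list above, the following sketch computes both limits from the configured value and the downstream Kafka settings. The names `deriveLimits`, `limits`, and `saramaMaxRequestSize` are hypothetical and not taken from the PR. The constant is assumed to match Sarama's default `MaxRequestSize` of 100 MiB and is hard-coded so the sketch needs no third-party import.

```go
package main

import "fmt"

// saramaMaxRequestSize is an assumption: it mirrors Sarama's default
// MaxRequestSize (100 MiB) so this sketch stays free of external imports.
const saramaMaxRequestSize = 100 * 1024 * 1024

// limits holds the two decoupled values (hypothetical type).
type limits struct {
	ProducerMax int // hard limit for one encoded Kafka message
	BatchMax    int // threshold at which the encoder splits batches
}

func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// deriveLimits is a hypothetical helper. topicMax stands for the topic's
// max.message.bytes (0 when unavailable) and brokerMax for the broker's
// message.max.bytes.
func deriveLimits(configured, topicMax, brokerMax int) limits {
	kafkaMax := topicMax
	if kafkaMax <= 0 {
		kafkaMax = brokerMax // fall back to the broker setting
	}
	// Cap the producer limit just below Sarama's request size limit.
	producer := minInt(kafkaMax, saramaMaxRequestSize-1)
	// Never form a batch the producer could not send.
	batch := minInt(configured, producer)
	return limits{ProducerMax: producer, BatchMax: batch}
}

func main() {
	// Topic raised to 20 MiB while the changefeed still says 10 MiB.
	fmt.Printf("%+v\n", deriveLimits(10<<20, 20<<20, 1<<20))
	// Topic limit below the configured value: the batch threshold is clamped.
	fmt.Printf("%+v\n", deriveLimits(10<<20, 1<<20, 4<<20))
}
```

In the first call the producer limit follows the topic setting while the batch threshold keeps the configured value. In the second, both collapse to the smaller topic limit.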

Result

After this change, when Kafka rejects a message because the downstream topic or broker limit is too small, operators only need to increase Kafka's message size configuration. TiCDC can then pick up the downstream limit as the producer hard limit, without requiring a manual changefeed max-message-bytes update.

The original changefeed max-message-bytes behavior for batching remains compatible: existing batch splitting behavior is preserved unless Kafka's producer hard limit is smaller, in which case the batch threshold is clamped to the producer limit to avoid forming unsendable batches.
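
The interplay between the batch threshold and the producer hard limit described above can be pictured as a three-way decision per row event. This is only a simplified sketch: `route` and the `action` values are hypothetical names, and a real encoder would also account for bytes already buffered in the current batch and for per-message overhead, both of which are ignored here.

```go
package main

import "fmt"

// action names the path a row event takes (hypothetical type).
type action string

const (
	actionBatch     action = "append to batch"
	actionSendAlone action = "send as its own message"
	actionOversized action = "large-message handling or error"
)

// route is a hypothetical decision function: it compares the encoded event
// size with the batch threshold and the producer hard limit.
func route(eventSize, batchMax, producerMax int) action {
	switch {
	case eventSize > producerMax:
		// Only the producer hard limit triggers large-message handling.
		return actionOversized
	case eventSize > batchMax:
		// Too big to batch, but Kafka still accepts it.
		return actionSendAlone
	default:
		return actionBatch
	}
}

func main() {
	const batchMax, producerMax = 1000, 5000
	for _, size := range []int{100, 2000, 6000} {
		fmt.Println(size, "->", route(size, batchMax, producerMax))
	}
}
```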

Check List

Tests

  • Unit test

Questions

Will it cause performance regression or break compatibility?

No expected performance regression. The change is configuration plumbing and encoder-side limit selection. It preserves the existing max-message-bytes semantics for batch splitting, while allowing Kafka's actual producer limit to be derived from downstream Kafka configuration.

Do you need to update user documentation, design documentation or monitoring documentation?

No monitoring update is needed. User-facing documentation may be updated separately to clarify that Kafka topic/broker message size limits can be increased without changing the changefeed max-message-bytes setting.

Release note

Fix Kafka sink message size handling so increasing Kafka topic or broker message size limits can unblock large single-row events without changing the TiCDC changefeed `max-message-bytes` configuration.

Summary by CodeRabbit

Release Notes

  • New Features

    • Added support for configurable batch message size limits, enabling independent control over individual message and batched message size constraints.
  • Bug Fixes

    • Enhanced message size validation across codec implementations to prevent oversized messages from being returned.
    • Improved Kafka producer message size enforcement to comply with underlying constraints.
  • Tests

    • Added test coverage for batch message size limit scenarios.

@ti-chi-bot ti-chi-bot Bot added the release-note Denotes a PR that will be considered when it comes time to generate release notes. label Jun 16, 2026
@ti-chi-bot

ti-chi-bot Bot commented Jun 16, 2026


[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign asddongmen for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@coderabbitai

coderabbitai Bot commented Jun 16, 2026

Contributor

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 54590172-2942-4139-901e-91ddc42a3e84

📥 Commits

Reviewing files that changed from the base of the PR and between 56e5b48 and 5d1388b.

📒 Files selected for processing (12)
  • downstreamadapter/sink/helper/helper.go
  • downstreamadapter/sink/kafka/helper.go
  • downstreamadapter/sink/kafka/sink_test.go
  • pkg/sink/codec/canal/canal_json_encoder.go
  • pkg/sink/codec/common/config.go
  • pkg/sink/codec/common/config_test.go
  • pkg/sink/codec/open/encoder.go
  • pkg/sink/codec/simple/encoder.go
  • pkg/sink/codec/simple/encoder_test.go
  • pkg/sink/kafka/options.go
  • pkg/sink/kafka/options_test.go
  • pkg/sink/kafka/sarama_config.go
✅ Files skipped from review due to trivial changes (1)
  • downstreamadapter/sink/kafka/helper.go
🚧 Files skipped from review as they are similar to previous changes (3)
  • pkg/sink/codec/simple/encoder_test.go
  • downstreamadapter/sink/kafka/sink_test.go
  • pkg/sink/codec/common/config.go

📝 Walkthrough

Introduces MaxBatchMessageBytes to codec Config and refactors Kafka's max-message-size handling to clamp against sarama.MaxRequestSize instead of using a fixed overhead subtraction. GetEncoderConfig now accepts both message size limits and computes batchMaxMessageBytes = min(configured, producer). All encoders (open, canal, simple, avro) are updated to use BatchMaxMessageBytes() for batch-split decisions and MaxMessageBytes for per-message hard limits.

Changes

Separate batch and producer message size limits

| Layer | File(s) | Summary |
|---|---|---|
| Codec Config: MaxBatchMessageBytes field, accessor, and validation | pkg/sink/codec/common/config.go, pkg/sink/codec/common/config_test.go | Adds MaxBatchMessageBytes field, WithMaxBatchMessageBytes setter, BatchMaxMessageBytes() accessor (falls back to MaxMessageBytes when unset or non-positive), and Validate() guard rejecting negative values and configurations where MaxBatchMessageBytes exceeds MaxMessageBytes. |
| Kafka options: ProducerMaxMessageBytes clamping and initialization | pkg/sink/kafka/options.go, pkg/sink/kafka/sarama_config.go | Adds Sarama import and removes maxMessageBytesOverhead constant; adds ProducerMaxMessageBytes field to options struct; updates NewOptions() to initialize both MaxMessageBytes and BatchMaxMessageBytes; adds Apply() sync when MaxMessageBytes is overridden; introduces setProducerMaxMessageBytes helper clamping to min(kafkaMaxMessageBytes, sarama.MaxRequestSize-1); updates Sarama producer config to set Flush.MaxMessages to 0. |
| Kafka adjustOptions: topic/broker config derivation and batchMaxMessageBytes computation | pkg/sink/kafka/options.go | Refactors adjustOptions to validate required acks via helper, fetch topic/broker max message bytes, apply producer limit clamping via setProducerMaxMessageBytes, and compute final BatchMaxMessageBytes as min(pre-adjust, final MaxMessageBytes); adds helper functions to retrieve topic and broker max-message-bytes settings. |
| Kafka options tests: producer limit assertions and scenario updates | pkg/sink/kafka/options_test.go | Updates expectedAdjustedMaxMessageBytes helper to cap source max at sarama.MaxRequestSize-1; refactors TestAdjustConfigFallsBackToBrokerMessageMaxBytesWhenTopicConfigMissing to compute expected producer limit separately and assert both MaxMessageBytes and BatchMaxMessageBytes post-adjustment; adds/renames scenario cases for "broker below user" and "topic above sarama"; updates TestConfigurationCombinations with expectedProducerLimit logic and encoder BatchMaxMessageBytes assertions; adds BatchMaxMessageBytes assertions to TestCompleteOptions and TestMerge. |
| GetEncoderConfig: two-parameter signature and batchMaxMessageBytes computation | downstreamadapter/sink/helper/helper.go | GetEncoderConfig now accepts configuredMaxMessageBytes and producerMaxMessageBytes separately; wires MaxMessageBytes from producer value and MaxBatchMessageBytes from configured value; rest of encoder-config flow unchanged. |
| Sink callers: updated GetEncoderConfig invocations | downstreamadapter/sink/kafka/helper.go, downstreamadapter/sink/kafka/sink_test.go, downstreamadapter/sink/pulsar/helper.go, downstreamadapter/sink/cloudstorage/sink.go, downstreamadapter/sink/cloudstorage/encoder_group_test.go | All sink callers pass the new two-argument form with multi-line formatting; Kafka sinks pass options.MaxMessageBytes and options.BatchMaxMessageBytes; cloud-storage and Pulsar pass math.MaxInt and config.DefaultMaxMessageBytes. |
| Open and simple encoders: BatchMaxMessageBytes for batch decisions | pkg/sink/codec/open/encoder.go, pkg/sink/codec/simple/encoder.go | Open encoder: updates batch encoder comment, gates large-message-handle path on both MaxMessageBytes and handle-enabled checks, adds post-processing overflow guard, uses BatchMaxMessageBytes() for batch-split logic, guards checkpoint/DDL events. Simple encoder: restructures AppendRowChangedEvent to append when length ≤ MaxMessageBytes or handle disabled (returning ErrMessageTooLarge when disabled and oversized); guards checkpoint/DDL events with MaxMessageBytes checks. |
| Canal and Avro encoders: explicit MaxMessageBytes guards | pkg/sink/codec/canal/canal_json_encoder.go, pkg/sink/codec/canal/canal_json_txn_encoder.go, pkg/sink/codec/avro/encoder.go | All three: EncodeCheckpointEvent and EncodeDDLEvent construct message first, check MaxMessageBytes, return ErrMessageTooLarge on overflow. Canal JSON: AppendRowChangedEvent gates large-message-handling on both MaxMessageBytes and handle-enabled, adds unified overflow check. Canal txn: reformats error-return formatting. |
| Encoder tests: batch-versus-max-limit scenarios | pkg/sink/codec/open/encoder_test.go, pkg/sink/codec/simple/encoder_test.go, pkg/sink/codec/canal/canal_json_encoder_test.go | Open encoder: adds TestMessageLargerThanBatchLimit for single-row event between batch and max limits. Simple encoder: adds TestDMLLargerThanBatchLimit with MaxBatchMessageBytes=50. Canal encoder: updates TestMaxMessageBytes to set WithMaxBatchMessageBytes(100) and verify batch-specific path. |
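
To make two of the behaviors summarized above concrete, here is a minimal sketch of the accessor fallback and the build-then-check guard. The `config` type, `checkEncoded`, and `errMessageTooLarge` are pared-down stand-ins written for this illustration. They are assumptions about the shape of the change, not the PR's actual code.

```go
package main

import (
	"errors"
	"fmt"
)

// errMessageTooLarge stands in for the codebase's ErrMessageTooLarge.
var errMessageTooLarge = errors.New("message too large")

// config is a simplified stand-in for the codec Config.
type config struct {
	MaxMessageBytes      int // hard limit for one encoded message
	MaxBatchMessageBytes int // batch-splitting threshold, 0 means unset
}

// BatchMaxMessageBytes returns the batch threshold and falls back to
// MaxMessageBytes when the batch value is unset or non-positive.
func (c *config) BatchMaxMessageBytes() int {
	if c.MaxBatchMessageBytes <= 0 {
		return c.MaxMessageBytes
	}
	return c.MaxBatchMessageBytes
}

// checkEncoded is a hypothetical guard applied after a message has been
// built: the payload is compared with the hard limit, not the batch threshold.
func (c *config) checkEncoded(payload []byte) error {
	if len(payload) > c.MaxMessageBytes {
		return fmt.Errorf("%w: %d > %d",
			errMessageTooLarge, len(payload), c.MaxMessageBytes)
	}
	return nil
}

func main() {
	c := &config{MaxMessageBytes: 1024}
	fmt.Println("batch threshold:", c.BatchMaxMessageBytes())

	c.MaxBatchMessageBytes = 100
	fmt.Println("batch threshold:", c.BatchMaxMessageBytes())

	// A 500-byte payload exceeds the batch threshold but not the hard limit.
	fmt.Println("500 bytes:", c.checkEncoded(make([]byte, 500)))
	fmt.Println("2000 bytes:", c.checkEncoded(make([]byte, 2000)))
}
```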

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~70 minutes

Suggested labels

lgtm, approved, size/XXL

Suggested reviewers

  • tenfyzhong
  • wk989898
  • asddongmen

Poem

🐇 Two limits now dance in harmony's way,
Batch and producer—both have their say.
Sarama's request cap, no overhead's plight,
Each message finds the right batching height.
The rabbit hops through—limits aligned tight,
Encoding flows smooth from morning to night! 🚀

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 17.24% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title clearly describes the main change: decoupling max-message-bytes configuration in Kafka sink and allowing it to be set via Kafka configuration adjustments. |
| Linked Issues check | ✅ Passed | The PR successfully decouples max-message-bytes into separate batch and producer limits, uses MaxBatchMessageBytes for batch splitting and MaxMessageBytes for the producer hard limit, derives producer limits from Kafka configuration, and allows single events larger than batch threshold but within Kafka limits to be sent, fully addressing issue #1405's objectives. |
| Out of Scope Changes check | ✅ Passed | All changes are directly related to decoupling max-message-bytes configuration and implementing separate batch/producer size limits. The changes to encoder logic, Kafka options handling, and codec configuration are all necessary to support the core objective without unrelated modifications. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Warning

Tools execution failed with the following error:

Failed to run tools: 13 INTERNAL: Received RST_STREAM with code 2 (Internal server error)



@ti-chi-bot ti-chi-bot Bot added the size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. label Jun 16, 2026
@3AceShowHand

Collaborator Author

/test all

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request decouples the final encoded message size limit (MaxMessageBytes) from the batch splitting and large-message threshold (MaxBatchMessageBytes) across various TiCDC sinks and codecs (including Kafka, Pulsar, Cloud Storage, Avro, Canal JSON, Open Protocol, and Simple). This allows for more granular control over message batching and limits. The feedback suggests simplifying duplicate validation logic in the simple encoder and adding configuration validation to ensure MaxBatchMessageBytes does not exceed MaxMessageBytes.


Comment thread pkg/sink/codec/simple/encoder.go Outdated
Comment on lines +435 to +439
	if c.MaxBatchMessageBytes < 0 {
		return errors.ErrCodecInvalidConfig.Wrap(
			errors.Errorf("invalid max-batch-message-bytes %d", c.MaxBatchMessageBytes),
		)
	}


medium

We should validate that MaxBatchMessageBytes is not greater than MaxMessageBytes. A batch size threshold larger than the absolute maximum message size is an invalid configuration and could lead to unexpected behavior.

	if c.MaxBatchMessageBytes < 0 {
		return errors.ErrCodecInvalidConfig.Wrap(
			errors.Errorf("invalid max-batch-message-bytes %d", c.MaxBatchMessageBytes),
		)
	}
	if c.MaxBatchMessageBytes > c.MaxMessageBytes {
		return errors.ErrCodecInvalidConfig.Wrap(
			errors.Errorf("max-batch-message-bytes %d cannot be greater than max-message-bytes %d", c.MaxBatchMessageBytes, c.MaxMessageBytes),
		)
	}

@coderabbitai coderabbitai Bot left a comment

Contributor


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@pkg/sink/codec/simple/encoder_test.go`:
- Around line 1595-1611: The TestDMLLargerThanBatchLimit test sets
MaxBatchMessageBytes to 50 but never verifies that the actual encoded message
payload exceeds this threshold, making the test non-deterministic and unable to
guarantee it exercises the intended code path. After calling enc.Build() to
obtain the messages, add an assertion that explicitly checks the payload size of
messages[0] is greater than the MaxBatchMessageBytes limit to ensure the test
fixture is sufficiently large and the behavior remains deterministic.
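
One way the suggested assertion could look is sketched below. The `message` type with a `Value` field and the helper `assertExceedsBatchLimit` are hypothetical stand-ins for whatever the encoder's `Build()` returns. In the real test the same check would more likely be a single assertion on the first message's payload length.

```go
package main

import "fmt"

// message is a stand-in for the encoder's output type. Only the payload
// matters here, and the field name Value is an assumption.
type message struct {
	Value []byte
}

// assertExceedsBatchLimit is a hypothetical helper: it fails unless the first
// built message is strictly larger than the batch limit, which proves the
// fixture actually exercises the "larger than batch limit" path.
func assertExceedsBatchLimit(msgs []message, batchLimit int) error {
	if len(msgs) == 0 {
		return fmt.Errorf("no messages built")
	}
	if got := len(msgs[0].Value); got <= batchLimit {
		return fmt.Errorf("payload is %d bytes, want more than %d", got, batchLimit)
	}
	return nil
}

func main() {
	const maxBatchMessageBytes = 50 // the value used by the test under review
	msgs := []message{{Value: make([]byte, 120)}}
	fmt.Println(assertExceedsBatchLimit(msgs, maxBatchMessageBytes))
}
```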

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 5d97615e-8665-4e5f-9988-6942075570a7

📥 Commits

Reviewing files that changed from the base of the PR and between aa0083b and 56e5b48.

📒 Files selected for processing (18)
  • downstreamadapter/sink/cloudstorage/encoder_group_test.go
  • downstreamadapter/sink/cloudstorage/sink.go
  • downstreamadapter/sink/helper/helper.go
  • downstreamadapter/sink/kafka/helper.go
  • downstreamadapter/sink/kafka/sink_test.go
  • downstreamadapter/sink/pulsar/helper.go
  • pkg/sink/codec/avro/encoder.go
  • pkg/sink/codec/canal/canal_json_encoder.go
  • pkg/sink/codec/canal/canal_json_encoder_test.go
  • pkg/sink/codec/canal/canal_json_txn_encoder.go
  • pkg/sink/codec/common/config.go
  • pkg/sink/codec/open/encoder.go
  • pkg/sink/codec/open/encoder_test.go
  • pkg/sink/codec/simple/encoder.go
  • pkg/sink/codec/simple/encoder_test.go
  • pkg/sink/kafka/options.go
  • pkg/sink/kafka/options_test.go
  • pkg/sink/kafka/sarama_config.go

Comment thread pkg/sink/codec/simple/encoder_test.go
@ti-chi-bot

ti-chi-bot Bot commented Jun 16, 2026


@3AceShowHand: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

| Test name | Commit | Details | Required | Rerun command |
|---|---|---|---|---|
| pull-cdc-mysql-integration-heavy | 56e5b48 | link | true | /test pull-cdc-mysql-integration-heavy |

Full PR test history. Your PR dashboard.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

@ti-chi-bot ti-chi-bot Bot added size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. and removed size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. labels Jun 17, 2026

Labels

release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Don't raise errors for the events larger than max-message-bytes

1 participant