Skip to content

consumer: revert https://github.com/pingcap/ticdc/pull/4052#5438

Open
wk989898 wants to merge 3 commits into
pingcap:masterfrom
wk989898:consumer-revert
Open

consumer: revert https://github.com/pingcap/ticdc/pull/4052#5438
wk989898 wants to merge 3 commits into
pingcap:masterfrom
wk989898:consumer-revert

Conversation

@wk989898

@wk989898 wk989898 commented Jun 18, 2026

Copy link
Copy Markdown
Collaborator

What problem does this PR solve?

Issue Number: ref #4051

What is changed and how it works?

TiCDC must send commit-ts events with linear growth.
The only possibility for receiving small commi-ts with large offsets is that the event is repeated.

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Summary by CodeRabbit

  • Bug Fixes

    • Improved handling of out-of-order DML rows using partition progress watermarks, reducing cases where fallback rows were dropped too early.
    • Enhanced partition-table recognition for CanalJSON, Open, and Avro protocols (including CREATE TABLE ... LIKE ...) to prevent incorrect routing and missing data.
  • Refactor

    • Simplified DML/DDL flush and watermark progression behavior to make ordering more consistent and reliable.
    • Streamlined row append behavior with protocol-specific fallback/ignore rules.
  • Tests

    • Updated and added coverage for DDL partition-table routing and DML/DDL execution gating behavior.

Signed-off-by: wk989898 <nhsmwk@gmail.com>
@ti-chi-bot ti-chi-bot Bot added do-not-merge/needs-linked-issue release-note Denotes a PR that will be considered when it comes time to generate release notes. labels Jun 18, 2026
@wk989898

Copy link
Copy Markdown
Collaborator Author

/test all

@coderabbitai

coderabbitai Bot commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

📝 Walkthrough

Walkthrough

Removes the AppliedWatermark field from EventsGroup and eliminates all per-group AppliedWatermark update logic from flushDDLEvent and flushDMLEventsByWatermark in both kafka and pulsar consumer writers. Rewrites appendRow2Group to gate on progress.watermark and group.HighWatermark with protocol-specific partition-table handling, centralizes partition-table detection into helpers, and replaces tests that verified the old behavior.

Changes

Remove AppliedWatermark and rework DML fallback routing

Layer / File(s) Summary
EventsGroup: remove AppliedWatermark field and simplify Append
cmd/util/event_group.go, cmd/util/event_group_test.go
Removes the exported AppliedWatermark field from EventsGroup, inlines the equal-CommitTs merge in Append, drops the force-path same-CommitTs conditional merge/skip, removes compareTableInfo helper, and cleans up the pkg/common import.
Flush functions: drop resolvedGroups and AppliedWatermark updates
cmd/kafka-consumer/writer.go, cmd/pulsar-consumer/writer.go
flushDDLEvent and flushDMLEventsByWatermark in both writers remove the resolvedGroups map, per-group maxCommitTs bookkeeping, and the post-flush loop that advanced each group's AppliedWatermark; total is now the delta of resolvedEvents length.
appendRow2Group: rewrite fallback and ignore logic
cmd/kafka-consumer/writer.go, cmd/pulsar-consumer/writer.go
Early ignore when commitTs < progress.watermark; normal append when commitTs >= group.HighWatermark; force-append when enableTableAcrossNodes is set; protocol-specific rules (Simple/Debezium ignore; CanalJSON/Open/Avro append only for partition tables via partitionTableAccessor). Removes prior AppliedWatermark-based replay-ignore and old forceInsert condition.
DDL: centralize partition-table detection and registration
cmd/kafka-consumer/writer.go, cmd/pulsar-consumer/writer.go
Both onDDL functions add markPartitionTableFromDDL helper to mark tables directly from ddl.TableInfo.IsPartitionTable(), and addPartitionTable to validate and register partition tables. CreateTable short-circuits through direct marking before parsing SQL; RenameTable preserves prior extra-table handling and adds direct marking attempt.
Test updates: import alias, intermediate assertions, removed tests
cmd/kafka-consumer/writer_test.go, cmd/pulsar-consumer/writer_test.go
Removes TestAppendRow2Group_DoesNotDropCommitTsFallbackBeforeApplied from both files; kafka tests update import alias from codecCommon to codeccommon and add intermediate assertions checking ddls is empty and ddlList length before watermark advancement; replaced tests validate partition-table marking via onDDL with CREATE TABLE ... LIKE ...; unused imports are removed.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Suggested reviewers

  • asddongmen
  • 3AceShowHand

Poem

🐇 A watermark once applied to each group,
Now swept away — no need for that loop!
The progress.watermark guards the gate,
Protocol rules decide each row's fate.
Simpler paths hop forward, light and free,
Partition tables marked with DDL decree! 🌊

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (2 warnings)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Description check ⚠️ Warning The PR description is incomplete and uses boilerplate template text without substantive information about changes, issue linking, or testing details. Complete the description by: (1) linking the issue with 'Issue Number: close #xxxx' instead of 'ref #4051', (2) providing detailed explanation of what changed and how, (3) specifying which tests were added/modified, (4) addressing performance/compatibility implications, and (5) including a release note.
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and specifically describes the main change: reverting PR #4052 in the consumer component. It is concise and directly related to the changeset.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@ti-chi-bot ti-chi-bot Bot added the size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. label Jun 18, 2026

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request simplifies the DML event handling and watermark tracking in both the Kafka and Pulsar consumers. It removes the AppliedWatermark tracking and resolvedGroups logic from the writers and EventsGroup, and refactors appendRow2Group to handle fallback events based on the protocol type. Additionally, corresponding tests for the removed fallback merging logic have been cleaned up. The review feedback highlights two improvement opportunities in cmd/pulsar-consumer/writer.go: changing a high-frequency log from Info to Debug level to prevent log flooding, and removing a redundant logging entry for progress.watermark.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

}
if commitTs >= group.HighWatermark {
group.Append(dml, false)
log.Info("DML event append to the group",

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Logging every DML event append at Info level can cause severe log flooding and performance issues under high throughput. It should be logged at Debug level, consistent with the Kafka consumer implementation.

Suggested change
log.Info("DML event append to the group",
log.Debug("DML event append to the group",

}
log.Warn("DML event fallback row, since less than the group high watermark, ignore it",
zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", group.HighWatermark),
zap.Any("partitionWatermark", progress.watermark), zap.Any("watermark", progress.watermark),

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The progress.watermark is logged twice redundantly under different keys (partitionWatermark and watermark). This appears to be a copy-paste error from the Kafka consumer where watermarkOffset was logged. We should simplify this to log it only once, and use zap.Uint64 for better type safety.

Suggested change
zap.Any("partitionWatermark", progress.watermark), zap.Any("watermark", progress.watermark),
zap.Uint64("partitionWatermark", progress.watermark),

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
cmd/kafka-consumer/writer.go (1)

560-624: 💤 Low value

Stale comment mentions "open protocol" in Simple/Debezium case block.

The comment on lines 589-592 mentions "open protocol set the partition table id..." but this case block only handles ProtocolSimple and ProtocolDebezium. The open protocol handling is in the next case block (lines 601-620). This is misleading and should be updated.

📝 Suggested comment fix
 	case config.ProtocolSimple, config.ProtocolDebezium:
-		// simple protocol set the table id for all row message, it can be known which table the row message belongs to,
-		// also consider the table partition.
-		// open protocol set the partition table id if the table is partitioned.
-		// for normal table, the table id is generated by the fake table id generator by using schema and table name.
-		// so one event group for one normal table or one table partition, replayed messages can be ignored.
+		// Simple and Debezium protocols include the table ID for all row messages, making it possible
+		// to identify which table the row belongs to. Since each event group corresponds to one table
+		// (or one table partition), replayed messages with commitTs below HighWatermark can be safely ignored.
 		log.Warn("DML event fallback row, since less than the group high watermark, ignore it",
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@cmd/kafka-consumer/writer.go` around lines 560 - 624, The comment block in
the case config.ProtocolSimple, config.ProtocolDebezium branch incorrectly
mentions open protocol behavior, which is actually handled in the separate case
config.ProtocolCanalJSON, config.ProtocolOpen, config.ProtocolAvro branch.
Update the comment to only describe the Simple and Debezium protocol behavior
without referencing open protocol, and remove the part about fake table id
generation and how open protocol assigns partition table ids since those details
belong in the other case block.
cmd/pulsar-consumer/writer.go (1)

490-510: Pulsar sink intentionally supports only CanalJSON protocol, which is enforced by validation before the writer is initialized.

The protocol validation in options.go (lines 88-91) explicitly panics if any protocol other than ProtocolCanalJSON is used, preventing unsupported protocols from reaching the writer. The switch statement at lines 490-510 is unreachable defensive code.

However, line 115 contains unnecessary code: the comparison o.protocol == config.ProtocolAvro always evaluates to false since Avro is not a supported protocol for Pulsar. This dead code should be simplified to pass false directly to NewEventRouter.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@cmd/pulsar-consumer/writer.go` around lines 490 - 510, Since the Pulsar sink
only supports ProtocolCanalJSON and this is enforced by validation in options.go
before the writer is initialized, the switch statement in the writer is
unreachable defensive code that should be simplified. Additionally, remove the
dead code comparison at line 115 where o.protocol is compared to
config.ProtocolAvro (which always evaluates to false since Avro is not a
supported protocol for Pulsar) and instead pass false directly to the
NewEventRouter call. The switch statement and protocol check create unnecessary
complexity that contradicts the enforced protocol restrictions.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@cmd/pulsar-consumer/writer.go`:
- Around line 474-481: The log level for the DML event append operation is
inconsistent with the equivalent Kafka writer implementation. Change the
log.Info call for the "DML event append to the group" message to log.Debug to
match the log level used in the Kafka writer and prevent excessive log volume in
production during high-throughput scenarios.

---

Nitpick comments:
In `@cmd/kafka-consumer/writer.go`:
- Around line 560-624: The comment block in the case config.ProtocolSimple,
config.ProtocolDebezium branch incorrectly mentions open protocol behavior,
which is actually handled in the separate case config.ProtocolCanalJSON,
config.ProtocolOpen, config.ProtocolAvro branch. Update the comment to only
describe the Simple and Debezium protocol behavior without referencing open
protocol, and remove the part about fake table id generation and how open
protocol assigns partition table ids since those details belong in the other
case block.

In `@cmd/pulsar-consumer/writer.go`:
- Around line 490-510: Since the Pulsar sink only supports ProtocolCanalJSON and
this is enforced by validation in options.go before the writer is initialized,
the switch statement in the writer is unreachable defensive code that should be
simplified. Additionally, remove the dead code comparison at line 115 where
o.protocol is compared to config.ProtocolAvro (which always evaluates to false
since Avro is not a supported protocol for Pulsar) and instead pass false
directly to the NewEventRouter call. The switch statement and protocol check
create unnecessary complexity that contradicts the enforced protocol
restrictions.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: e3c9de01-fe8b-49be-ae5f-a28117c48606

📥 Commits

Reviewing files that changed from the base of the PR and between 640d3d3 and e12b1e4.

📒 Files selected for processing (6)
  • cmd/kafka-consumer/writer.go
  • cmd/kafka-consumer/writer_test.go
  • cmd/pulsar-consumer/writer.go
  • cmd/pulsar-consumer/writer_test.go
  • cmd/util/event_group.go
  • cmd/util/event_group_test.go
💤 Files with no reviewable changes (2)
  • cmd/util/event_group_test.go
  • cmd/pulsar-consumer/writer_test.go

Comment on lines +474 to 481
if commitTs >= group.HighWatermark {
group.Append(dml, false)
log.Info("DML event append to the group",
zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", group.HighWatermark),
zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID),
zap.Stringer("eventType", dml.RowTypes[0]))
return
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Log level inconsistency with Kafka writer.

The normal append path uses log.Info here (line 476), while the equivalent code in the Kafka writer uses log.Debug (line 570). This could cause excessive log volume in production for high-throughput scenarios.

📝 Suggested fix
 	if commitTs >= group.HighWatermark {
 		group.Append(dml, false)
-		log.Info("DML event append to the group",
+		log.Debug("DML event append to the group",
 			zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", group.HighWatermark),
 			zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID),
 			zap.Stringer("eventType", dml.RowTypes[0]))
 		return
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if commitTs >= group.HighWatermark {
group.Append(dml, false)
log.Info("DML event append to the group",
zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", group.HighWatermark),
zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID),
zap.Stringer("eventType", dml.RowTypes[0]))
return
}
if commitTs >= group.HighWatermark {
group.Append(dml, false)
log.Debug("DML event append to the group",
zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", group.HighWatermark),
zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID),
zap.Stringer("eventType", dml.RowTypes[0]))
return
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@cmd/pulsar-consumer/writer.go` around lines 474 - 481, The log level for the
DML event append operation is inconsistent with the equivalent Kafka writer
implementation. Change the log.Info call for the "DML event append to the group"
message to log.Debug to match the log level used in the Kafka writer and prevent
excessive log volume in production during high-throughput scenarios.

Signed-off-by: wk989898 <nhsmwk@gmail.com>
@ti-chi-bot ti-chi-bot Bot added size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. and removed size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. labels Jun 18, 2026
@wk989898

Copy link
Copy Markdown
Collaborator Author

/test pull-cdc-kafka-integration-light

@ti-chi-bot ti-chi-bot Bot added needs-1-more-lgtm Indicates a PR needs 1 more LGTM. approved labels Jun 22, 2026
@ti-chi-bot ti-chi-bot Bot added lgtm and removed needs-1-more-lgtm Indicates a PR needs 1 more LGTM. labels Jun 22, 2026
@ti-chi-bot

ti-chi-bot Bot commented Jun 22, 2026

Copy link
Copy Markdown

[LGTM Timeline notifier]

Timeline:

  • 2026-06-22 08:24:34.227230423 +0000 UTC m=+1985175.297547813: ☑️ agreed by 3AceShowHand.
  • 2026-06-22 08:40:48.352067048 +0000 UTC m=+1986149.422384428: ☑️ agreed by wlwilliamx.

@ti-chi-bot

ti-chi-bot Bot commented Jun 23, 2026

Copy link
Copy Markdown

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: 3AceShowHand, lidezhu, wlwilliamx

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Details Needs approval from an approver in each of these files:
  • OWNERS [3AceShowHand,lidezhu,wlwilliamx]

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@wk989898

Copy link
Copy Markdown
Collaborator Author

/retest

1 similar comment
@wk989898

Copy link
Copy Markdown
Collaborator Author

/retest

@ti-chi-bot

ti-chi-bot Bot commented Jun 23, 2026

Copy link
Copy Markdown

@wk989898: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
pull-cdc-kafka-integration-light 3aa7db7 link unknown /test pull-cdc-kafka-integration-light

Full PR test history. Your PR dashboard.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

approved lgtm release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants