0429 maintainer skip syncpoint#4996
Conversation
Signed-off-by: dongmen <414110582@qq.com>
Signed-off-by: dongmen <414110582@qq.com>
|
Skipping CI for Draft Pull Request. |
|
[FORMAT CHECKER NOTIFICATION] Notice: To remove the 📖 For more info, you can check the "Contribute Code" section in the development guide. |
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces a mechanism to control Sync Point events in TiCDC, primarily to mitigate performance overhead during periods of high replication lag. It adds SyncPointControl and SyncPointControlMessage to the heartbeat protocol, enabling the maintainer to broadcast skip windows to dispatchers. Corresponding logic is implemented in the DispatcherManager and BasicDispatcher to skip events within these windows, and the maintainer's Barrier is updated to support direct passing of Sync Points when lag exceeds defined thresholds. Feedback on the implementation highlighted a critical safety issue in the SyncPointControlMessageHandler, where the code would panic if multiple messages were batched by the dynstream framework; a suggestion was provided to process the most recent message instead.
| func (h *SyncPointControlMessageHandler) Handle(dispatcherManager *DispatcherManager, messages ...SyncPointControlMessage) bool { | ||
| if len(messages) != 1 { | ||
| panic("invalid message count") | ||
| } | ||
| dispatcherManager.SetSyncPointControl(messages[0].Control) | ||
| return false | ||
| } |
There was a problem hiding this comment.
The Handle method for SyncPointControlMessageHandler should not panic if the number of messages is not exactly one. The dynstream framework can batch multiple events for the same path into a single Handle call if they are queued. Panicking in production code due to unexpected batching is dangerous. Instead, you should process all messages in the batch or at least the most recent one (the last one in the slice) to ensure the dispatcher manager stays in sync with the maintainer's desired state.
func (h *SyncPointControlMessageHandler) Handle(dispatcherManager *DispatcherManager, messages ...SyncPointControlMessage) bool {
if len(messages) > 0 {
dispatcherManager.SetSyncPointControl(messages[len(messages)-1].Control)
}
return false
}
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note