eventstore: fix checkpoint update race#4987
eventstore: fix checkpoint update race#4987lidezhu wants to merge 3 commits intoldz/optimize-event-store0506from
Conversation
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request transitions checkpointTs to an atomic type within the dispatcherStat struct and refactors checkpoint update logic to use CompareAndSwap for better concurrency control. Additionally, it replaces bytes.Compare with common.StartCompare and common.EndCompare for key range checks and updates the event store iterator's boundary conditions. Feedback suggests using a monotonic increase utility for updating the dispatcher's checkpoint to prevent potential regressions from stale updates.
| return | ||
| } | ||
| dispatcherStat.checkpointTs = checkpointTs | ||
| dispatcherStat.checkpointTs.Store(checkpointTs) |
There was a problem hiding this comment.
To ensure that the dispatcher's checkpoint timestamp only moves forward and to handle potential concurrent updates safely, it is better to use util.CompareAndMonotonicIncrease instead of a direct Store. This prevents a stale update from regressing the checkpoint, which could incorrectly affect the minimum checkpoint calculation for the shared subscription.
| dispatcherStat.checkpointTs.Store(checkpointTs) | |
| util.CompareAndMonotonicIncrease(&dispatcherStat.checkpointTs, checkpointTs) |
What problem does this PR solve?
Issue Number: close #4992
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note