eventcollector: decouple dispatcher session from dispatcher stat #5007
lidezhu wants to merge 4 commits into pingcap:master from
Conversation
📝 Walkthrough
The dispatcher session is refactored to eliminate its dependency on dispatcher stat.
Changes: Dispatcher Session Dependency Injection
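For orientation, a minimal sketch of what the decoupled session looks like, based on the struct fields visible in the diffs further down in this review (the exact layout in the PR may differ): instead of holding a reference back to dispatcherStat, dispatcherSession receives the pieces it needs as injected values and callbacks.

```go
// Illustrative sketch only (package and import boilerplate omitted; the types
// come from the repo's dispatcher, messaging and node packages). The session
// keeps no pointer to dispatcherStat; everything it needs is injected.
type dispatcherSession struct {
	target         dispatcher.DispatcherService   // the dispatcher being served
	localServerID  node.ID                        // identity of this event collector node
	sendMessage    func(*messaging.TargetMessage) // how messages reach the event service
	nextResetEpoch func(resetTs uint64) uint64    // epoch allocation, still owned by dispatcherStat
	readyCallback  func()                         // invoked once the remote event service is ready
}
```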
Code Review
This pull request refactors the dispatcher management and heartbeat mechanism by introducing a dispatcherSession to handle event service interactions and implementing epoch-based progress tracking. Key changes include the addition of a background heartbeat sender in the EventCollector and updates to the wire format to include epoch information, ensuring stale progress updates are ignored. Review feedback identifies a critical missing length validation in the Unmarshal method for DispatcherProgressLegacy that could lead to panics, and notes a design concern regarding stale heartbeat responses in the session management logic.
```go
func (dp *DispatcherProgressLegacy) Unmarshal(data []byte) error {
	buf := bytes.NewBuffer(data)
	dp.DispatcherID.Unmarshal(buf.Next(dp.DispatcherID.GetSize()))
	dp.CheckpointTs = binary.BigEndian.Uint64(buf.Next(8))
```
The Unmarshal method does not validate the length of the input data before calling buf.Next(). If data is shorter than expected, buf.Next() will return a shorter slice, and binary.BigEndian.Uint64 will panic. This is a high-severity issue as it can lead to service crashes with malformed input.
The reviewer's suggested fix validates the input length up front:

```go
func (dp *DispatcherProgressLegacy) Unmarshal(data []byte) error {
	if len(data) < dp.DispatcherID.GetSize()+8 {
		return fmt.Errorf("data too short")
	}
	dp.DispatcherID.Unmarshal(data[:dp.DispatcherID.GetSize()])
	dp.CheckpointTs = binary.BigEndian.Uint64(data[dp.DispatcherID.GetSize() : dp.DispatcherID.GetSize()+8])
	return nil
}
```

The design concern about stale heartbeat responses is captured by this TODO in the session management logic:

```go
// TODO: this design is bad because we may receive stale heartbeat response,
// which make us call clear and register again. But the register may be ignore,
// so we will not receive any ready event.
```
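The epoch mechanism mentioned in the review summary is what makes such stale responses harmless. Below is a self-contained, hypothetical illustration (these names are not the PR's actual types): every reset bumps an epoch that is echoed back by the event service, and any progress stamped with an older epoch is dropped.

```go
// Hypothetical illustration of epoch-gated progress handling; the PR's real
// types and method names differ. A reset allocates a new epoch, and any
// progress update carrying an older epoch is ignored as stale.
type progressTracker struct {
	epoch        uint64 // bumped on every reset
	checkpointTs uint64 // highest progress accepted in the current epoch
}

// reset starts a new epoch from resetTs and returns the new epoch value,
// which would be sent along with the reset request to the event service.
func (t *progressTracker) reset(resetTs uint64) uint64 {
	t.epoch++
	t.checkpointTs = resetTs
	return t.epoch
}

// onProgress accepts a progress update only if it was produced in the
// current epoch; updates stamped with an older epoch are silently dropped.
func (t *progressTracker) onProgress(epoch, ts uint64) bool {
	if epoch != t.epoch {
		return false // stale: produced before the latest reset
	}
	if ts > t.checkpointTs {
		t.checkpointTs = ts
	}
	return true
}
```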
Actionable comments posted: 2
📒 Files selected for processing (3)
- downstreamadapter/eventcollector/dispatcher_session.go
- downstreamadapter/eventcollector/dispatcher_stat.go
- downstreamadapter/eventcollector/dispatcher_stat_test.go
```diff
 // commitReady is used to notify the event service to start sending events.
 func (s *dispatcherSession) commitReady(serverID node.ID) {
-	s.doReset(serverID, s.owner.getResetTs())
+	s.doReset(serverID, s.target.GetCheckpointTs())
 }

 // reset is used to reset the dispatcher to the specified commitTs,
 // it will remove the dispatcher from the dynamic stream and add it back.
 func (s *dispatcherSession) reset(serverID node.ID) {
-	s.doReset(serverID, s.owner.getResetTs())
+	s.doReset(serverID, s.target.GetCheckpointTs())
 }

 func (s *dispatcherSession) doReset(serverID node.ID, resetTs uint64) {
-	var epoch uint64
-	for {
-		currentState := s.owner.loadCurrentEpochState()
-		nextState := newDispatcherEpochState(currentState.epoch+1, 0, resetTs)
-		if s.owner.currentEpoch.CompareAndSwap(currentState, nextState) {
-			epoch = nextState.epoch
-			break
-		}
-	}
-	resetRequest := s.owner.newDispatcherResetRequest(
-		s.owner.eventCollector.getLocalServerID().String(),
+	epoch := s.nextResetEpoch(resetTs)
+	resetRequest := s.newDispatcherResetRequest(
+		s.localServerID.String(),
 		resetTs,
 		epoch,
 	)
 	msg := messaging.NewSingleTargetMessage(serverID, messaging.EventServiceTopic, resetRequest)
-	s.owner.eventCollector.enqueueMessageForSend(msg)
+	s.sendMessage(msg)
```
Clamp reset ts to collector-observed progress, not raw sink checkpoint.
Lines 162 and 168 use s.target.GetCheckpointTs() directly. That breaks the safety invariant documented in dispatcherEpochState.maxEventTs: old in-flight events can advance the sink checkpoint after a reset, even when the collector has not accepted that progress in the new epoch yet. If another commitReady/reset happens in that window, this code will start the next epoch too far ahead and can permanently skip data.
Possible fix shape
```diff
 type dispatcherSession struct {
 	target         dispatcher.DispatcherService
 	localServerID  node.ID
 	sendMessage    func(*messaging.TargetMessage)
+	getSafeResetTs func() uint64
 	nextResetEpoch func(resetTs uint64) uint64
 	readyCallback  func()
 }

-func newDispatcherSession(
+func newDispatcherSession(
 	target dispatcher.DispatcherService,
 	localServerID node.ID,
 	sendMessage func(*messaging.TargetMessage),
+	getSafeResetTs func() uint64,
 	nextResetEpoch func(resetTs uint64) uint64,
 	readyCallback func(),
 ) *dispatcherSession {
 	return &dispatcherSession{
 		target:         target,
 		localServerID:  localServerID,
 		sendMessage:    sendMessage,
+		getSafeResetTs: getSafeResetTs,
 		nextResetEpoch: nextResetEpoch,
 		readyCallback:  readyCallback,
 	}
 }

 func (s *dispatcherSession) commitReady(serverID node.ID) {
-	s.doReset(serverID, s.target.GetCheckpointTs())
+	s.doReset(serverID, s.getSafeResetTs())
 }

 func (s *dispatcherSession) reset(serverID node.ID) {
-	s.doReset(serverID, s.target.GetCheckpointTs())
+	s.doReset(serverID, s.getSafeResetTs())
 }
```

Wire getSafeResetTs from dispatcherStat using the same capped progress used for EventService heartbeats.
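One possible shape for that helper, assuming dispatcherStat already tracks per-epoch progress in dispatcherEpochState.maxEventTs (the exact clamping rule depends on how that field is maintained in this PR, so treat this as a sketch rather than the intended implementation):

```go
// Hypothetical sketch of the capped helper; field and method names beyond
// those mentioned in the comment above are assumptions. The reset ts starts
// from the sink checkpoint but never runs ahead of the progress the
// collector has actually observed in the current epoch.
func (d *dispatcherStat) getSafeResetTs() uint64 {
	resetTs := d.target.GetCheckpointTs()
	epochState := d.loadCurrentEpochState()
	if epochState.maxEventTs != 0 && epochState.maxEventTs < resetTs {
		// The sink checkpoint was advanced by in-flight events from an older
		// epoch; resetting from it could skip data, so clamp to observed progress.
		resetTs = epochState.maxEventTs
	}
	return resetTs
}
```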
```go
func (s *dispatcherSession) handleNotReusableEvent(event dispatcher.DispatcherEvent) {
	if *event.From == s.localServerID {
		log.Panic("should not happen: local event service should not send not reusable event")
	}
	candidate := s.connState.getNextRemoteCandidate()
	if candidate != "" {
		s.registerTo(candidate)
	}
```
Clear the failed remote binding when no replacement candidate exists.
If getNextRemoteCandidate() returns empty here, connState.eventServiceID still points at the rejected remote. After that, trySetRemoteCandidates() will refuse any later candidate list because the session still looks attached, so this dispatcher can get stuck until some unrelated clear path runs.
Minimal fix
```diff
 func (s *dispatcherSession) handleNotReusableEvent(event dispatcher.DispatcherEvent) {
 	if *event.From == s.localServerID {
 		log.Panic("should not happen: local event service should not send not reusable event")
 	}
 	candidate := s.connState.getNextRemoteCandidate()
-	if candidate != "" {
-		s.registerTo(candidate)
-	}
+	if candidate == "" {
+		s.connState.setEventServiceID("")
+		return
+	}
+	s.registerTo(candidate)
 }
```
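For context on why the lingering binding matters, here is a hypothetical sketch of the guard in trySetRemoteCandidates; only the method name and eventServiceID come from the comment above, while the mutex and remoteCandidates field are assumptions for illustration.

```go
import "sync"

// Hypothetical sketch of the connection state guard; not the PR's actual code.
type dispatcherConnState struct {
	mu               sync.Mutex
	eventServiceID   string   // non-empty while (apparently) attached to a remote event service
	remoteCandidates []string // remotes to try next, consumed by getNextRemoteCandidate
}

// trySetRemoteCandidates refuses a new candidate list while the session still
// looks attached, which is why handleNotReusableEvent must clear the failed
// binding when no replacement candidate exists.
func (c *dispatcherConnState) trySetRemoteCandidates(candidates []string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.eventServiceID != "" {
		return false // still bound: ignore the new candidate list
	}
	c.remoteCandidates = candidates
	return true
}
```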
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
- Refactor
- Tests