[WIP]#5479
Code Review
This pull request introduces a dedicated, configurable memory quota and region scan throttling mechanism for the log puller, separating deregister requests into a control queue to prevent blocking cleanup tasks. The review feedback suggests adding defensive checks against division-by-zero when maxPendingSize is zero, passing the subscription's stoppedCh to popControlOrWait to prevent potential deadlocks or delays when a subscription is stopped while the scan gate is closed, and adding a nil check for c.Puller to avoid a potential nil pointer dereference.
func (q *pullerMemoryQuota) ShouldPausePath(
	_ bool, pathPendingSize int64, _ int64, maxPendingSize uint64, _ int64,
) (bool, bool, float64) {
	return false, false, float64(pathPendingSize) / float64(maxPendingSize)
}
If maxPendingSize is 0, float64(pathPendingSize) / float64(maxPendingSize) does not panic in Go; it evaluates to NaN when pathPendingSize is also 0, and to +Inf or -Inf otherwise. A defensive check for maxPendingSize == 0 keeps these values from reaching callers.
Suggested change:
func (q *pullerMemoryQuota) ShouldPausePath(
	_ bool, pathPendingSize int64, _ int64, maxPendingSize uint64, _ int64,
) (bool, bool, float64) {
	if maxPendingSize == 0 {
		return false, false, 0.0
	}
	return false, false, float64(pathPendingSize) / float64(maxPendingSize)
}

func (q *pullerMemoryQuota) ShouldPauseArea(
	paused bool, pendingSize int64, maxPendingSize uint64,
) (bool, bool, float64) {
	usageRatio := float64(pendingSize) / float64(maxPendingSize)
	q.updateRegionScanState(usageRatio, pendingSize, maxPendingSize)

	if paused {
		return false, usageRatio < 1, usageRatio
	}
	return usageRatio >= 1, false, usageRatio
}
As with ShouldPausePath, if maxPendingSize is 0, usageRatio will be NaN or ±Inf. Every ordered comparison against NaN is false, so neither usageRatio < 1 nor usageRatio >= 1 holds, and with +Inf a paused area never satisfies usageRatio < 1. Guarding on maxPendingSize > 0 keeps the ratio finite and the comparisons meaningful.
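To make the failure mode concrete, here is a minimal standalone sketch of the two comparisons. The helper names `usage` and `decide` are hypothetical, the call to updateRegionScanState is left out, and it assumes the first boolean result means "pause" and the second means "resume", which the snippet under review does not state explicitly.

```go
package main

import "fmt"

// usage computes pending/maxPending. When guarded is true it returns 0 for a
// zero limit, mirroring the suggested guard; otherwise it divides blindly.
// Hypothetical helper, not part of the PR.
func usage(pending int64, maxPending uint64, guarded bool) float64 {
	if guarded && maxPending == 0 {
		return 0
	}
	return float64(pending) / float64(maxPending)
}

// decide reproduces the two comparisons from ShouldPauseArea.
// Assumption: the first result means "pause", the second means "resume".
func decide(paused bool, usageRatio float64) (pause, resume bool) {
	if paused {
		return false, usageRatio < 1
	}
	return usageRatio >= 1, false
}

func main() {
	for _, guarded := range []bool{false, true} {
		for _, pending := range []int64{0, 1} {
			r := usage(pending, 0, guarded)
			pause, _ := decide(false, r) // decision for a running area
			_, resume := decide(true, r) // decision for a paused area
			fmt.Printf("guarded=%v pending=%d ratio=%v pause=%v resume=%v\n",
				guarded, pending, r, pause, resume)
		}
	}
}
```

Without the guard, a zero limit with zero pending bytes yields NaN, so the area neither pauses nor resumes; with any positive pending size it yields +Inf, so the area pauses and can never resume. With the guard, the ratio is 0 and a paused area is allowed to resume.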
Suggested change:
func (q *pullerMemoryQuota) ShouldPauseArea(
	paused bool, pendingSize int64, maxPendingSize uint64,
) (bool, bool, float64) {
	var usageRatio float64
	if maxPendingSize > 0 {
		usageRatio = float64(pendingSize) / float64(maxPendingSize)
	}
	q.updateRegionScanState(usageRatio, pendingSize, maxPendingSize)
	if paused {
		return false, usageRatio < 1, usageRatio
	}
	return usageRatio >= 1, false, usageRatio
}

func (c *requestCache) popControlOrWait(
	ctx context.Context, resume <-chan struct{},
) (regionReq, bool, error) {
	select {
	case req := <-c.controlQueue:
		return req, true, nil
	case <-resume:
		return regionReq{}, false, nil
	case <-ctx.Done():
		return regionReq{}, false, ctx.Err()
	}
}
To prevent potential deadlocks or unnecessary delays when a subscription is stopped while the scan gate is closed, we should pass the subscription's stoppedCh to popControlOrWait and select on it. Additionally, we should prioritize controlQueue over resume using a non-blocking check to ensure deregister requests are processed immediately when both are ready.
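The prioritization described here is the common two-step select idiom in Go: a non-blocking receive on the high-priority channel first, then a blocking select over every channel. The sketch below is standalone and illustrative only; `request` stands in for regionReq, the queue is passed as a parameter instead of being a requestCache field, and `demo` is a hypothetical harness.

```go
package main

import (
	"context"
	"fmt"
)

// request is a hypothetical stand-in for regionReq.
type request struct{ id int }

// popControlOrWait returns a control request if one is already queued.
// Otherwise it blocks until a control request arrives, resume or stopped
// fires, or ctx is cancelled.
func popControlOrWait(
	ctx context.Context, control <-chan request, resume, stopped <-chan struct{},
) (request, bool, error) {
	// Fast path: drain the control queue first so it wins over resume
	// when both are ready at the time of the call.
	select {
	case req := <-control:
		return req, true, nil
	default:
	}
	select {
	case req := <-control:
		return req, true, nil
	case <-resume:
		return request{}, false, nil
	case <-stopped:
		return request{}, false, nil
	case <-ctx.Done():
		return request{}, false, ctx.Err()
	}
}

// demo builds channels in the requested state and calls popControlOrWait once.
// At least one flag must be true, or the call blocks forever.
func demo(controlReady, resumeReady, stoppedReady bool) (request, bool, error) {
	control := make(chan request, 1)
	resume := make(chan struct{})
	stopped := make(chan struct{})
	if controlReady {
		control <- request{id: 7}
	}
	if resumeReady {
		close(resume)
	}
	if stoppedReady {
		close(stopped)
	}
	return popControlOrWait(context.Background(), control, resume, stopped)
}

func main() {
	fmt.Println(demo(true, true, false))  // control and resume both ready
	fmt.Println(demo(false, true, false)) // only resume ready
	fmt.Println(demo(false, false, true)) // only stopped ready
}
```

The fast path only gives priority at the moment of the call. If several channels become ready while the second select is blocked, Go chooses among the ready cases pseudo-randomly, so the caller still has to tolerate either outcome.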
Suggested change:
func (c *requestCache) popControlOrWait(
	ctx context.Context, resume <-chan struct{}, stoppedCh <-chan struct{},
) (regionReq, bool, error) {
	select {
	case req := <-c.controlQueue:
		return req, true, nil
	default:
	}
	select {
	case req := <-c.controlQueue:
		return req, true, nil
	case <-resume:
		return regionReq{}, false, nil
	case <-stoppedCh:
		return regionReq{}, false, nil
	case <-ctx.Done():
		return regionReq{}, false, ctx.Err()
	}
}

if !paused {
	break
}
controlReq, ok, err := s.requestCache.popControlOrWait(ctx, resume)
Pass region.subscribedSpan.stoppedCh to popControlOrWait to ensure the worker immediately unblocks and exits the loop if the subscription is stopped while waiting for the scan gate to resume.
Suggested change:
- controlReq, ok, err := s.requestCache.popControlOrWait(ctx, resume)
+ controlReq, ok, err := s.requestCache.popControlOrWait(ctx, resume, region.subscribedSpan.stoppedCh)

if c.Puller.MemoryQuota == 0 {
	return cerror.ErrInvalidServerOption.GenWithStackByArgs(
		"debug.puller.memory-quota must be greater than 0")
}
If c.Puller is nil (e.g., due to an incomplete configuration file), accessing c.Puller.MemoryQuota will cause a nil pointer dereference panic during startup. Adding a nil check or initializing it with defaults prevents this.
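The nil-then-default pattern can be sketched in isolation as follows. Every type and function name here is a hypothetical stand-in for the real configuration structs, the default quota value is purely illustrative, and a plain error replaces the project's cerror helper.

```go
package main

import (
	"errors"
	"fmt"
)

// pullerConfig and debugConfig are hypothetical stand-ins for the real
// configuration types touched by the PR.
type pullerConfig struct{ MemoryQuota uint64 }

type debugConfig struct{ Puller *pullerConfig }

// defaultPullerConfig is a hypothetical constructor; 1 GiB is an
// illustrative value, not the project's actual default.
func defaultPullerConfig() *pullerConfig {
	return &pullerConfig{MemoryQuota: 1 << 30}
}

// validateAndAdjust fills in a missing puller section with defaults and
// rejects an explicit zero quota, without ever dereferencing a nil pointer.
func (c *debugConfig) validateAndAdjust() error {
	if c.Puller == nil {
		c.Puller = defaultPullerConfig()
		return nil
	}
	if c.Puller.MemoryQuota == 0 {
		return errors.New("debug.puller.memory-quota must be greater than 0")
	}
	return nil
}

func main() {
	missing := &debugConfig{} // puller section absent from the config file
	fmt.Println(missing.validateAndAdjust(), missing.Puller.MemoryQuota)

	zero := &debugConfig{Puller: &pullerConfig{}} // section present, quota 0
	fmt.Println(zero.validateAndAdjust())
}
```

Whether a missing section should silently fall back to defaults or be reported as a configuration error is a design choice; the sketch follows the defaulting variant the comment proposes.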
Suggested change:
- if c.Puller.MemoryQuota == 0 {
- 	return cerror.ErrInvalidServerOption.GenWithStackByArgs(
- 		"debug.puller.memory-quota must be greater than 0")
- }
+ if c.Puller == nil {
+ 	c.Puller = NewDefaultPullerConfig()
+ } else if c.Puller.MemoryQuota == 0 {
+ 	return cerror.ErrInvalidServerOption.GenWithStackByArgs(
+ 		"debug.puller.memory-quota must be greater than 0")
+ }
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note