logpuller: extract generic priority queue#5487
Conversation
|
Skipping CI for Draft Pull Request. |
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
|
Warning Review limit reached
More reviews will be available in 44 minutes and 13 seconds. Learn how PR review limits work. Your organization has run out of usage credits. Purchase more credits in the billing tab to continue. ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the To avoid repeated limits, reduce automatic review volume by pausing incremental auto-reviews earlier, using label-based review opt-in, excluding WIP or generated PR titles, or requesting reviews manually when the PR is ready. If your team needs uninterrupted high-volume reviews, an organization admin can enable usage-based credits. 🚦 How do rate limits work?CodeRabbit enforces per-developer PR review limits for each organization. Most developers receive the normal plan refill rate. For paid Pro and Pro+ PR reviews, CodeRabbit uses rolling per-developer review limits. Reviews become available again as older review attempts age out of the rolling limit window. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughRemoves the logpuller-local ChangesGeneric PriorityQueue extraction and migration
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request replaces the specialized, non-generic PriorityQueue in logpuller with a generic, thread-safe blocking priority queue under utils/priorityqueue. The subscriptionClient and related tests have been updated to use this new generic implementation. Regarding the feedback, the reviewer correctly pointed out that calling notifyOneLocked() inside TryPop is unnecessary and can be removed to prevent spurious wakeups and redundant channel operations.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
|
@coderabbitai review |
✅ Action performedReview finished.
|
|
/gemini summary |
✅ Action performedReview finished.
|
|
/gemini summary |
|
Warning Gemini encountered an error creating the summary. You can try again by commenting |
Summary of ChangesThis pull request refactors the priority queue used in the log service into a generic utility package. By moving this logic to Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Activity
|
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@utils/priorityqueue/priority_queue_test.go`:
- Around line 137-158: The issue is that require.NotNil and require.True
assertions are being called directly inside the spawned goroutines created by
wg.Go, which is unsafe because FailNow may be invoked from child goroutines
causing flaky tests. Remove the require assertions from inside both the consumer
goroutine (where require.NotNil is called on task) and the producer goroutine
(where require.True is called on q.Push result). Instead, collect any assertion
failures in shared state such as an error channel, then after wg.Wait()
completes, assert all collected errors from the main test goroutine to ensure
deterministic behavior.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 7e675952-1e67-4f8e-89de-ff5041981e3f
📒 Files selected for processing (7)
logservice/logpuller/priority_queue.gologservice/logpuller/priority_queue_test.gologservice/logpuller/priority_task_test.gologservice/logpuller/subscription_client.gologservice/logpuller/subscription_client_test.goutils/priorityqueue/priority_queue.goutils/priorityqueue/priority_queue_test.go
💤 Files with no reviewable changes (2)
- logservice/logpuller/priority_queue.go
- logservice/logpuller/priority_queue_test.go
| for range numConsumers { | ||
| wg.Go(func() { | ||
| for { | ||
| task, err := q.Pop(ctx) | ||
| if err != nil { | ||
| return | ||
| } | ||
| require.NotNil(t, task) | ||
| if atomic.AddInt64(&consumedCount, 1) >= int64(totalTasks) { | ||
| cancel() | ||
| return | ||
| } | ||
| } | ||
| }) | ||
| } | ||
|
|
||
| for producerID := range numProducers { | ||
| wg.Go(func() { | ||
| for j := range tasksPerProducer { | ||
| priority := producerID*tasksPerProducer + j | ||
| require.True(t, q.Push(newMockItem(priority, "task"))) | ||
| } |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟠 Major
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify whether require/assertions are called inside goroutines in this test file.
ast-grep run --lang go \
--pattern 'go func() { $$$ require.$F($$$) $$$ }()' \
utils/priorityqueue/priority_queue_test.goRepository: pingcap/ticdc
Length of output: 151
🏁 Script executed:
# First, verify the file exists and check its size
ls -la utils/priorityqueue/priority_queue_test.go
# Read the specific lines mentioned in the review (137-158, 174-175)
echo "=== Lines 137-158 ==="
sed -n '137,158p' utils/priorityqueue/priority_queue_test.go
echo ""
echo "=== Lines 174-175 ==="
sed -n '174,175p' utils/priorityqueue/priority_queue_test.go
# Get full test function context around these lines
echo ""
echo "=== Full TestQueueConcurrentOperations function ==="
ast-grep outline utils/priorityqueue/priority_queue_test.go --match TestQueueConcurrentOperations --view expandedRepository: pingcap/ticdc
Length of output: 957
🏁 Script executed:
# Get more context around the function to verify line 174-175 situation
sed -n '120,180p' utils/priorityqueue/priority_queue_test.go | cat -n
# Also check if there's actual `require` usage in goroutines at the claimed lines
echo "=== Checking for require calls in goroutines ==="
grep -n "require\." utils/priorityqueue/priority_queue_test.go | head -20Repository: pingcap/ticdc
Length of output: 2589
Avoid require.* inside spawned goroutines
require calls like NotNil and True may invoke FailNow, which is unsafe from child goroutines and can cause flaky/non-deterministic test failures (lines 144, 157). Record failures in shared state (or an error channel) and assert from the main test goroutine instead.
Proposed fix
func TestQueueConcurrentOperations(t *testing.T) {
q := New[*mockItem]()
@@
var wg sync.WaitGroup
var consumedCount int64
+ var poppedNil atomic.Bool
+ var pushFailed atomic.Bool
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@
for range numConsumers {
wg.Go(func() {
for {
task, err := q.Pop(ctx)
if err != nil {
return
}
- require.NotNil(t, task)
+ if task == nil {
+ poppedNil.Store(true)
+ cancel()
+ return
+ }
if atomic.AddInt64(&consumedCount, 1) >= int64(totalTasks) {
cancel()
return
}
}
})
}
@@
for producerID := range numProducers {
wg.Go(func() {
for j := range tasksPerProducer {
priority := producerID*tasksPerProducer + j
- require.True(t, q.Push(newMockItem(priority, "task")))
+ if !q.Push(newMockItem(priority, "task")) {
+ pushFailed.Store(true)
+ cancel()
+ return
+ }
}
})
}
@@
require.Equal(t, int64(totalTasks), consumedCount)
+ require.False(t, poppedNil.Load())
+ require.False(t, pushFailed.Load())
}Per coding guidelines: **/*_test.go: "Prefer focused deterministic tests; see docs/agents/testing.md before adding or changing tests."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@utils/priorityqueue/priority_queue_test.go` around lines 137 - 158, The issue
is that require.NotNil and require.True assertions are being called directly
inside the spawned goroutines created by wg.Go, which is unsafe because FailNow
may be invoked from child goroutines causing flaky tests. Remove the require
assertions from inside both the consumer goroutine (where require.NotNil is
called on task) and the producer goroutine (where require.True is called on
q.Push result). Instead, collect any assertion failures in shared state such as
an error channel, then after wg.Wait() completes, assert all collected errors
from the main test goroutine to ensure deterministic behavior.
Source: Coding guidelines
|
/retest |
|
/test all |
What problem does this PR solve?
Issue Number: close #5488
What is changed and how it works?
This pull request refactors the priority queue used in the log service into a generic utility package. By moving this logic to
utils/priorityqueue, the codebase reduces duplication and provides a more robust, thread-safe implementation that can be leveraged by other components. The changes include updating existing consumers to use the new generic queue and ensuring proper error handling during queue closure.Highlights
logservice/logpullerinto a generic, reusableutils/priorityqueuepackage.Popmechanism with properErrClosedhandling.PriorityQueueimplementation inlogservice/logpullerand updated all references to use the new utility.Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit