Pool memqueue producer response channels via sync.Pool #49337
Conversation
This pull request does not have a backport label. To fix this, add the backport labels for the needed branches.
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the settings, or use the review commands to manage reviews.
📝 Walkthrough

Introduces a package-level sync.Pool that provides reusable buffered chan queue.EntryID channels. Producers (forgetfulProducer and ackProducer) obtain response channels from the pool instead of allocating per request. openState.publish and openState.tryPublish defer returning non-nil req.resp to the pool, relying on existing draining behavior. Channel lifecycle and reuse are centralized around the open-state publish paths. No public APIs or exported signatures were changed.

🚥 Pre-merge checks: 2 passed.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `libbeat/publisher/queue/memqueue/produce.go`, around lines 151-152:

The publish function currently defers putRespChan(req.resp) immediately, which can return the response channel to the pool while the queue goroutine may still send to it (a race between `st.events <- req` and handlePendingResponse), causing a pooled channel to receive a stale EntryID. Remove the early defer in openState.publish and instead call putRespChan(req.resp) only on the error/early-return paths where the request is not enqueued (i.e., when you know the queue goroutine will not send). Let the consumer of the request (the queue goroutine/handler) be responsible for returning req.resp to the pool after it has sent or closed the response. Update the paths that call handlePendingResponse, and any return branches, to avoid returning the channel to the pool once `st.events <- req` has succeeded.
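The ownership rule behind this finding can be sketched in a minimal standalone model. Everything here (EntryID, pushRequest, publish, the events channel) is a simplified stand-in, not the actual memqueue code: the invariant is that a channel goes back to the pool only once no other goroutine can still send on it.

```go
package main

import (
	"fmt"
	"sync"
)

// EntryID stands in for queue.EntryID.
type EntryID uint64

// respChanPool mirrors the PR's pooled, buffered response channels.
var respChanPool = sync.Pool{
	New: func() any { return make(chan EntryID, 1) },
}

type pushRequest struct {
	resp chan EntryID
}

// publish models the safe pattern: on a failed enqueue, nobody will
// ever send on req.resp, so it can be recycled immediately. On a
// successful enqueue, the channel is recycled only after the response
// has been received, i.e. after the last potential sender is done.
func publish(events chan pushRequest, req pushRequest) (EntryID, bool) {
	select {
	case events <- req:
		id := <-req.resp // drain the response first...
		respChanPool.Put(req.resp)
		return id, true // ...then recycle
	default:
		// Not enqueued: the queue goroutine will never send, so
		// returning the channel to the pool now is race-free.
		respChanPool.Put(req.resp)
		return 0, false
	}
}

func main() {
	events := make(chan pushRequest, 1)
	// Simulate the queue goroutine answering one request.
	go func() {
		r := <-events
		r.resp <- 42
	}()
	req := pushRequest{resp: respChanPool.Get().(chan EntryID)}
	id, ok := publish(events, req)
	fmt.Println(id, ok) // 42 true
}
```

An early `defer respChanPool.Put(req.resp)` placed before the receive would invert this order and allow a send into an already-recycled channel, which is exactly the stale-EntryID hazard the review describes.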
📒 Files selected for processing (1)
libbeat/publisher/queue/memqueue/produce.go
/test
Profiling and benchmarking confirm that the allocation is gone. In our Filebeat perf benchmark of about 10M events, it decreases total allocations by a little over 1%. https://buildkite.com/elastic/filebeat-benchmark/builds/2282
/test
Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)
Force-pushed e0bb981 to c7de87a.
PR elastic#49337 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This pull request is now in conflicts. Could you fix it? 🙏
Rebase onto current main (post queue-generics refactor #49954) and adapt the respChanPool optimization to the new generic types. No logic change: respChanPool.Get/Put wraps chan queue.EntryID exactly as before; makePushRequest on both forgetfulProducer[T] and ackProducer[T] now calls getRespChan(), and publish/tryPublish defer respChanPool.Put.

Benchmarks confirm: 10009 → 7 allocs/op on BenchmarkProducerThroughput (Apple M2 Pro, arm64).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
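The reason the pool needs no changes after the generics refactor can be shown in a minimal sketch. The types below (EntryID, pushRequest[T], ackProducer[T], getRespChan) are simplified stand-ins for the memqueue code, not the real implementation: the key point is that the pooled channel type is the concrete chan EntryID, so the producer's type parameter never reaches the pool.

```go
package main

import (
	"fmt"
	"sync"
)

// EntryID stands in for queue.EntryID.
type EntryID uint64

// The pool stores concrete chan EntryID values; the event type T never
// appears in the channel type, so the pool is unaffected by generics.
var respChanPool = sync.Pool{
	New: func() any { return make(chan EntryID, 1) },
}

func getRespChan() chan EntryID {
	return respChanPool.Get().(chan EntryID)
}

type pushRequest[T any] struct {
	event T
	resp  chan EntryID
}

type ackProducer[T any] struct{}

// makePushRequest acquires its response channel from the pool instead
// of allocating a fresh one per publish.
func (p *ackProducer[T]) makePushRequest(event T) pushRequest[T] {
	return pushRequest[T]{event: event, resp: getRespChan()}
}

func main() {
	p := &ackProducer[string]{}
	req := p.makePushRequest("hello")
	fmt.Println(cap(req.resp)) // buffered with capacity 1
}
```

The buffer of size 1 lets the queue goroutine send the assigned EntryID without blocking even if the producer has already given up waiting.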
Force-pushed c7de87a to c18400f.
Rebased onto current main to resolve the merge conflict from the queue generics refactor (#49954). The pool optimization required adapting all type references to the new generic producer types.
Two tests and one benchmark addition:

- TestRespChanAlwaysEmptyOnAcquire: verifies the safety invariant that channels returned to the pool by a completed publish are always empty when subsequently acquired. A stale buffered EntryID would corrupt the return value of the next publish that reuses the channel.
- TestRespChanPoolNoAllocs: verifies 0 heap allocs per getRespChan/Put cycle after warmup, confirming channels are genuinely reused. GC is disabled during measurement (debug.SetGCPercent(-1)) to make the result deterministic.
- BenchmarkProducerThroughput: add b.ReportAllocs() to surface the alloc reduction in CI output. With pool: 6-7 allocs/op for 10,000 events; without pool: ~10,009 allocs/op.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Added tests for the pool optimization.
The previous two tests were testing pool mechanics in isolation (raw Get/Put and empty-channel checks) rather than the actual publish path. TestRespChanAlwaysEmptyOnAcquire in particular was unreliable: getRespChan() after a publish might return a freshly allocated channel via New rather than a recycled one, making it trivially pass even with a broken pool.

Replace both with TestPublishPoolNoAllocsInSteadyState, which runs testing.AllocsPerRun on the real p.Publish path (with a live queue and background consumer). This directly validates the PR's claim: with the pool warm and GC disabled, Publish allocates 0 channels per op. Without the pool, this would be 1 alloc/op.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Summary

Pool and reuse chan queue.EntryID response channels in the memqueue producer with sync.Pool, matching the existing batchPool pattern in broker.go. This eliminates the per-publish channel allocation that dominated the allocation profile (~96% of alloc_space on BenchmarkProducerThroughput).

Changes

- Add respChanPool (sync.Pool) producing make(chan queue.EntryID, 1)
- makePushRequest acquires from the pool via respChanPool.Get().(chan queue.EntryID)
- publish and tryPublish return channels via defer respChanPool.Put(req.resp), safe because handlePendingResponse fully drains the channel before returning

Benchmark
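The allocation comparison can be reproduced with testing.Benchmark and ReportAllocs. This is a minimal standalone sketch, not the PR's BenchmarkProducerThroughput: benchPublish is a hypothetical stand-in that models one publish round-trip through a pooled channel.

```go
package main

import (
	"fmt"
	"sync"
	"testing"
)

// EntryID stands in for queue.EntryID.
type EntryID uint64

var respChanPool = sync.Pool{
	New: func() any { return make(chan EntryID, 1) },
}

// benchPublish models a single publish: acquire a pooled response
// channel, receive the assigned ID (sent here directly instead of by a
// queue goroutine), and recycle the channel after draining it.
func benchPublish(i int) EntryID {
	ch := respChanPool.Get().(chan EntryID)
	ch <- EntryID(i) // buffered send, stands in for the queue's reply
	id := <-ch
	respChanPool.Put(ch)
	return id
}

func main() {
	res := testing.Benchmark(func(b *testing.B) {
		b.ReportAllocs() // surfaces B/op and allocs/op in the output
		for i := 0; i < b.N; i++ {
			benchPublish(i)
		}
	})
	fmt.Println(res.String(), res.MemString())
}
```

With the pool warm, allocs/op settles at 0 for this loop; replacing getRespChan with a fresh make(chan EntryID, 1) per call would show 1 alloc/op, the same per-publish allocation the PR removes.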
Test plan
🤖 Generated with Claude Code