chore: simplify, modernize (Go 1.26), update deps#202
Conversation
- connection.go: close first conn on second-dial failure in NewConnPool and redial(); close old connections on Swap (prevent conn leaks); use defer Unlock() in redial(); replace string EOF comparison with errors.Is(et.Err, io.EOF); use stderr.Join for multi-error returns - driver.go: increment listeners before launching goroutine in Resume so concurrent Stop cannot miss the stopCh signal; remove dead reconnectCh, addr and network fields; switch listeners from atomic.Uint32 to atomic.Int32 and use Add(-1) instead of the Add(^uint32(0)) trick; inline ready() helper; use var bb bytes.Buffer (stack) instead of new(bytes.Buffer); remove per-construction otel.SetTextMapPropagator global side-effect - listen.go: skip pq.Insert after AutoAck delete to prevent double-processing - item.go: use bytes.NewReader (zero-copy) instead of bytes.NewBuffer - config.go: time.Second * 1 → time.Second
|
Warning Review limit reached
More reviews will be available in 16 minutes and 20 seconds. Learn how PR review limits work. Your organization has run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (4)
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Pull request overview
This PR is a chore-style cleanup of the beanstalk jobs driver: it bumps the module toolchain to Go 1.26, fixes several latent bugs (TCP/conn leaks in NewConnPool and redial, AutoAck double-processing, a Resume/Stop race, and EOF detection via string match), removes dead fields/helpers, and applies small modernizations (atomic.Int32, stack-allocated bytes.Buffer, bytes.NewReader, errors.Join, defer Unlock).
Changes:
- Close leaked beanstalk connections in
NewConnPoolsecond-dial failure path and inredial's failure / swap paths; useerrors.Is(..., io.EOF)instead of string compare. - Skip
pq.Insertafter AutoAck delete in the listener (avoids double-processing) and reorderResumeto incrementlistenersbefore launching the goroutine. - Cleanups: drop dead
addr/network/reconnectChfields andready()helper, switchlistenerstoatomic.Int32, drop globalotel.SetTextMapPropagatorside-effect, replacebytes.NewBufferwithbytes.NewReaderfor decode,new(bytes.Buffer)→var bb bytes.Buffer,time.Second * 1→time.Second.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| beanstalkjobs/connection.go | Fix conn leaks on dial failures, close old conns on Swap, use errors.Is(io.EOF), errors.Join, defer Unlock. |
| beanstalkjobs/driver.go | Remove dead fields/helper, switch to atomic.Int32, reorder Resume increment, drop global propagator side-effect, stack-allocate encode buffer. |
| beanstalkjobs/listen.go | After AutoAck delete, end span and continue to avoid inserting the already-acked job into the priority queue. |
| beanstalkjobs/item.go | Use bytes.NewReader instead of bytes.NewBuffer for gob decode. |
| beanstalkjobs/config.go | Simplify time.Second * 1 to time.Second. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
The previous change added a continue after the AutoAck delete branch, which skipped d.pq.Insert(item). AutoAck means acknowledge/delete the job on the broker immediately (so it is not redelivered), but the job must still be handed to a worker via the priority queue. Dropping the insert silently discarded every auto-ack job. Restore the fall-through to match the canonical AMQP and SQS drivers, where AutoAck acks the message and then still inserts it into the PQ.
Applied fixes
R (bugs) — 7 applied
connection.go: close first conn when secondDialTimeoutfails inNewConnPool— fixes TCP leak (High)connection.go: closeconnTwhen secondDialTimeoutfails inredial()— fixes TCP leak (High)connection.go: close old connections whenSwapping on redial — fixes conn leak on every reconnect (Med)listen.go: skippq.Insertafter AutoAck delete — fixes double-processing of auto-acked jobs (Med)driver.go: incrementlistenersbeforego d.listeninResume— fixes race where concurrentStopmissesstopCh(Med)driver.go: remove deadreconnectChfield (allocated, never read/written) (Med)connection.go:errors.Is(et.Err, io.EOF)instead ofet.Err.Error() == "EOF"(Low)M (modern Go) — 2 applied
driver.go:atomic.Uint32→atomic.Int32forlisteners;Add(^uint32(0))→Add(-1)(Low)config.go:time.Second * 1→time.Second(Low)S (simplify) — 5 applied
driver.go: remove deadaddr/networkfields (Low)driver.go:new(bytes.Buffer)→var bb bytes.Buffer(stack allocation) (Low)driver.go: inline trivialready()helper (Low)driver.go: remove per-constructionotel.SetTextMapPropagatorglobal side-effect (Low)item.go:bytes.NewBuffer(data)→bytes.NewReader(data)(zero-copy read) (Low)connection.go: usestderr.Joinfor multi-error returns;defer Unlock()inredial()(Low)