From 0d8ad9c97e95c40d16b3697f4735975a56579e75 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 10 Jun 2026 13:27:58 +0000 Subject: [PATCH] fix(go): back off before re-polling failed batch When a batch was received but finalization failed (a Nack failed, or Ack returned an error), the consumer loop re-polled immediately. The unfinished batch is redelivered by pgque.next_batch at once, so a persistent nack/ack failure (e.g. partial grants) produced a tight loop re-running every handler at full speed, and even one transient ack failure re-executed the whole batch with zero delay. Sleep pollInterval (respecting ctx cancellation) before re-polling on both the nack-failure and ack-error paths. The Ack n==0 stale/double ack case stays warning-only with no sleep. Verified red/green: the new tests counted 52k/236k Receive calls in 400 ms on the unfixed code; with the fix the count stays within the pollInterval bound. https://claude.ai/code/session_01KAaEGkQZmey1D1xCsVGmqv --- clients/go/consumer.go | 22 ++++- clients/go/consumer_internal_test.go | 118 +++++++++++++++++++++++++++ 2 files changed, 139 insertions(+), 1 deletion(-) diff --git a/clients/go/consumer.go b/clients/go/consumer.go index a5c0fcdf..35f12db0 100644 --- a/clients/go/consumer.go +++ b/clients/go/consumer.go @@ -100,6 +100,9 @@ func (c *Consumer) dispatchWithRecover(ctx context.Context, fn HandlerFunc, msg // that PgQue redelivers it on the next Receive — losing the Nack // would otherwise mean losing the failure information for that // message. +// - When a Nack or Ack failure leaves the batch unfinished, the loop +// waits the configured poll interval before re-polling, since the +// next Receive returns the same batch and re-runs every handler. func (c *Consumer) Start(ctx context.Context) error { for { select { @@ -181,12 +184,29 @@ func (c *Consumer) Start(ctx context.Context) error { if batchID != 0 { if nackFailed { log.Printf("pgque: skipping ack for batch %d due to prior nack failures; PgQue will redeliver", batchID) + // The batch is unfinished, so the next Receive returns + // the same batch immediately and re-runs every handler. + // Wait pollInterval to avoid a tight re-poll loop. + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(c.pollInterval): + } continue } n, err := c.backend.Ack(ctx, batchID) if err != nil { log.Printf("pgque: ack error: %v", err) - } else if n == 0 { + // Same as the nack-failure case: the batch was never + // finished, so back off before re-receiving it. + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(c.pollInterval): + } + continue + } + if n == 0 { log.Printf("pgque: ack batch %d returned 0 rows — stale or double ack (batch already finished or not found)", batchID) } } diff --git a/clients/go/consumer_internal_test.go b/clients/go/consumer_internal_test.go index 4251f7f2..33699bff 100644 --- a/clients/go/consumer_internal_test.go +++ b/clients/go/consumer_internal_test.go @@ -221,6 +221,124 @@ func TestConsumer_WithRetryAfterPassesNackOption(t *testing.T) { } } +// redeliverStubBackend simulates a backend where the batch is never +// finished: Receive keeps returning the same message (as pgque.next_batch +// does for an unfinished batch), and Nack/Ack fail with the configured +// errors. It counts Receive calls so tests can detect a tight re-poll +// loop that skips the poll-interval backoff. +type redeliverStubBackend struct { + msg Message + nackErr error + ackErr error + + receiveCount int32 + nackCount int32 + ackCount int32 +} + +func (s *redeliverStubBackend) Receive(_ context.Context, _, _ string, _ int) ([]Message, error) { + atomic.AddInt32(&s.receiveCount, 1) + return []Message{s.msg}, nil +} + +func (s *redeliverStubBackend) Ack(_ context.Context, _ int64) (int64, error) { + atomic.AddInt32(&s.ackCount, 1) + if s.ackErr != nil { + return 0, s.ackErr + } + return 1, nil +} + +func (s *redeliverStubBackend) Nack(_ context.Context, _ int64, _ Message, _ NackOptions) error { + atomic.AddInt32(&s.nackCount, 1) + return s.nackErr +} + +// maxPollsWithin returns a generous upper bound on how many Receive +// calls a well-behaved poll loop can make in window when it sleeps +// pollInterval between attempts: window/pollInterval plus slack for +// scheduling jitter and the initial poll. +func maxPollsWithin(window, pollInterval time.Duration) int32 { + return int32(window/pollInterval) + 3 +} + +// TestConsumer_NackFailure_BacksOffBeforeRepoll guards against the +// tight-loop bug: when a Nack fails, the batch is left unfinished and +// the next Receive returns the same batch immediately. Without a +// poll-interval sleep on the nack-failure path the loop re-receives +// and re-runs every handler at full speed. The Consumer must wait +// pollInterval before re-polling. +func TestConsumer_NackFailure_BacksOffBeforeRepoll(t *testing.T) { + client := &Client{} + + stub := &redeliverStubBackend{ + msg: Message{ + MsgID: 1, + BatchID: 42, + Type: "no.handler.registered", + Payload: `{}`, + }, + nackErr: errors.New("simulated persistent nack failure"), + } + + pollInterval := 50 * time.Millisecond + window := 400 * time.Millisecond + + c := client.NewConsumer("dummy_queue", "dummy_consumer", + WithPollInterval(pollInterval)) + c.backend = stub + // No Handle() call — the unknown-type path Nacks under the default + // policy, the Nack fails, and the batch is redelivered forever. + + ctx, cancel := context.WithTimeout(context.Background(), window) + defer cancel() + _ = c.Start(ctx) + + got := atomic.LoadInt32(&stub.receiveCount) + if max := maxPollsWithin(window, pollInterval); got > max { + t.Fatalf("Receive called %d times in %v with pollInterval %v — "+ + "tight re-poll loop on nack failure (want <= %d)", + got, window, pollInterval, max) + } +} + +// TestConsumer_AckFailure_BacksOffBeforeRepoll is the same guard for +// the Ack-error path: an Ack failure leaves the batch unfinished, so +// re-polling without a pollInterval sleep re-executes every handler in +// the batch immediately (duplicate side effects at full speed). +func TestConsumer_AckFailure_BacksOffBeforeRepoll(t *testing.T) { + client := &Client{} + + stub := &redeliverStubBackend{ + msg: Message{ + MsgID: 1, + BatchID: 43, + Type: "ok.type", + Payload: `{}`, + }, + ackErr: errors.New("simulated persistent ack failure"), + } + + pollInterval := 50 * time.Millisecond + window := 400 * time.Millisecond + + c := client.NewConsumer("dummy_queue", "dummy_consumer", + WithPollInterval(pollInterval)) + c.backend = stub + c.Handle("ok.type", func(_ context.Context, _ Message) error { return nil }) + + ctx, cancel := context.WithTimeout(context.Background(), window) + defer cancel() + _ = c.Start(ctx) + + got := atomic.LoadInt32(&stub.receiveCount) + if max := maxPollsWithin(window, pollInterval); got > max { + t.Fatalf("Receive called %d times in %v with pollInterval %v — "+ + "tight re-poll loop on ack failure (want <= %d)", + got, window, pollInterval, max) + } +} + func TestWithRetryAfterPanicsOnNegative(t *testing.T) { defer func() { if recover() == nil {