Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion clients/go/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ func (c *Consumer) dispatchWithRecover(ctx context.Context, fn HandlerFunc, msg
// that PgQue redelivers it on the next Receive — losing the Nack
// would otherwise mean losing the failure information for that
// message.
// - When a Nack or Ack failure leaves the batch unfinished, the loop
// waits the configured poll interval before re-polling, since the
// next Receive returns the same batch and re-runs every handler.
func (c *Consumer) Start(ctx context.Context) error {
for {
select {
Expand Down Expand Up @@ -181,12 +184,29 @@ func (c *Consumer) Start(ctx context.Context) error {
if batchID != 0 {
if nackFailed {
log.Printf("pgque: skipping ack for batch %d due to prior nack failures; PgQue will redeliver", batchID)
// The batch is unfinished, so the next Receive returns
// the same batch immediately and re-runs every handler.
// Wait pollInterval to avoid a tight re-poll loop.
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(c.pollInterval):
}
continue
}
n, err := c.backend.Ack(ctx, batchID)
if err != nil {
log.Printf("pgque: ack error: %v", err)
} else if n == 0 {
// Same as the nack-failure case: the batch was never
// finished, so back off before re-receiving it.
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(c.pollInterval):
}
continue
}
if n == 0 {
log.Printf("pgque: ack batch %d returned 0 rows — stale or double ack (batch already finished or not found)", batchID)
}
}
Expand Down
118 changes: 118 additions & 0 deletions clients/go/consumer_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,124 @@ func TestConsumer_WithRetryAfterPassesNackOption(t *testing.T) {
}
}

// redeliverStubBackend is a test double for a backend whose batch is
// never finished. Every Receive hands back the same single message (the
// behaviour pgque.next_batch has for an unfinished batch), while Nack and
// Ack report whatever errors the test configured. Each method bumps its
// own call counter, so a test can detect a tight re-poll loop that skips
// the poll-interval backoff by inspecting receiveCount.
type redeliverStubBackend struct {
	msg     Message // the one message every Receive returns
	nackErr error   // returned as-is by Nack; nil means Nack succeeds
	ackErr  error   // when non-nil, Ack returns (0, ackErr)

	// Per-method call counters; the stub's methods update them through
	// sync/atomic.
	receiveCount, nackCount, ackCount int32
}

// Receive records the call in receiveCount and returns a one-element
// batch holding the stub's configured message. All arguments are ignored
// and the returned error is always nil.
func (s *redeliverStubBackend) Receive(_ context.Context, _, _ string, _ int) ([]Message, error) {
	atomic.AddInt32(&s.receiveCount, 1)

	batch := make([]Message, 1)
	batch[0] = s.msg
	return batch, nil
}

// Ack records the call in ackCount. When the stub was configured with an
// ack error it returns (0, that error); otherwise it returns (1, nil).
// The context and batch ID arguments are ignored.
func (s *redeliverStubBackend) Ack(_ context.Context, _ int64) (int64, error) {
	atomic.AddInt32(&s.ackCount, 1)

	if failure := s.ackErr; failure != nil {
		return 0, failure
	}
	return 1, nil
}

// Nack records the call in nackCount and returns the stub's configured
// nack error, which is nil when none was set. All arguments are ignored.
func (s *redeliverStubBackend) Nack(_ context.Context, _ int64, _ Message, _ NackOptions) error {
	atomic.AddInt32(&s.nackCount, 1)

	result := s.nackErr
	return result
}

// maxPollsWithin gives a generous ceiling on the number of Receive calls
// a well-behaved poll loop can make during window when it sleeps
// pollInterval between attempts. The ceiling is the whole number of
// intervals that fit in window, plus a fixed slack that absorbs
// scheduling jitter and the initial poll.
//
// pollInterval must be non-zero; the integer division panics otherwise.
func maxPollsWithin(window, pollInterval time.Duration) int32 {
	// Extra polls tolerated on top of the ideal window/pollInterval count.
	const slack = 3

	wholeIntervals := window / pollInterval
	return int32(wholeIntervals) + slack
}

// TestConsumer_NackFailure_BacksOffBeforeRepoll guards against the
// tight-loop bug: when a Nack fails, the batch is left unfinished and
// the next Receive returns the same batch immediately. Without a
// poll-interval sleep on the nack-failure path the loop re-receives
// and re-runs every handler at full speed. The Consumer must wait
// pollInterval before re-polling.
func TestConsumer_NackFailure_BacksOffBeforeRepoll(t *testing.T) {
client := &Client{}

stub := &redeliverStubBackend{
msg: Message{
MsgID: 1,
BatchID: 42,
Type: "no.handler.registered",
Payload: `{}`,
},
nackErr: errors.New("simulated persistent nack failure"),
}

pollInterval := 50 * time.Millisecond
window := 400 * time.Millisecond

c := client.NewConsumer("dummy_queue", "dummy_consumer",
WithPollInterval(pollInterval))
c.backend = stub
// No Handle() call — the unknown-type path Nacks under the default
// policy, the Nack fails, and the batch is redelivered forever.

ctx, cancel := context.WithTimeout(context.Background(), window)
defer cancel()
_ = c.Start(ctx)

got := atomic.LoadInt32(&stub.receiveCount)
if max := maxPollsWithin(window, pollInterval); got > max {
t.Fatalf("Receive called %d times in %v with pollInterval %v — "+
"tight re-poll loop on nack failure (want <= %d)",
got, window, pollInterval, max)
}
}

// TestConsumer_AckFailure_BacksOffBeforeRepoll is the same guard for
// the Ack-error path: an Ack failure leaves the batch unfinished, so
// re-polling without a pollInterval sleep re-executes every handler in
// the batch immediately (duplicate side effects at full speed).
func TestConsumer_AckFailure_BacksOffBeforeRepoll(t *testing.T) {
client := &Client{}

stub := &redeliverStubBackend{
msg: Message{
MsgID: 1,
BatchID: 43,
Type: "ok.type",
Payload: `{}`,
},
ackErr: errors.New("simulated persistent ack failure"),
}

pollInterval := 50 * time.Millisecond
window := 400 * time.Millisecond

c := client.NewConsumer("dummy_queue", "dummy_consumer",
WithPollInterval(pollInterval))
c.backend = stub
c.Handle("ok.type", func(_ context.Context, _ Message) error { return nil })

ctx, cancel := context.WithTimeout(context.Background(), window)
defer cancel()
_ = c.Start(ctx)

got := atomic.LoadInt32(&stub.receiveCount)
if max := maxPollsWithin(window, pollInterval); got > max {
t.Fatalf("Receive called %d times in %v with pollInterval %v — "+
"tight re-poll loop on ack failure (want <= %d)",
got, window, pollInterval, max)
}
}

func TestWithRetryAfterPanicsOnNegative(t *testing.T) {
defer func() {
if recover() == nil {
Expand Down
Loading