Skip to content

add per-client concurrency limiter interceptor for gRPC clients#552

Open
k4leung4 wants to merge 3 commits into
chainguard-dev:mainfrom
k4leung4:grpc-concurrency-limiter
Open

add per-client concurrency limiter interceptor for gRPC clients#552
k4leung4 wants to merge 3 commits into
chainguard-dev:mainfrom
k4leung4:grpc-concurrency-limiter

Conversation

@k4leung4
Copy link
Copy Markdown
Contributor

@k4leung4 k4leung4 commented Mar 16, 2026

Summary

Add a new pkg/interceptors/concurrency package that provides gRPC
client interceptors to cap the number of concurrent in-flight RPCs per
connection. When the limit is reached, new calls block until a slot
opens or the context is cancelled.

Motivation

Addresses CUS-237 /
CUS-172 /
chainguard-dev/internal-dev#18988.

During INC-44, uncapped concurrency allowed individual API server
instances to exhaust the load balancer's 250 connection limit. A
per-client concurrency limiter throttles each instance's contribution
to downstream load, providing backpressure before the downstream is
overwhelmed.

Package API

limiter := concurrency.NewLimiter(100) // 0 disables

opts := []grpc.DialOption{
    grpc.WithUnaryInterceptor(limiter.UnaryClientInterceptor()),
    grpc.WithStreamInterceptor(limiter.StreamClientInterceptor()),
}

Behavior

Scenario Result
In-flight < limit Call proceeds immediately
In-flight = limit Call blocks until a slot opens
In-flight = limit + context cancelled Returns context error as gRPC status
Limit = 0 Interceptor is a no-op (passthrough)
Stream call Slot held until RecvMsg returns error/EOF or context cancelled

Stream slot lifecycle

The stream interceptor acquires a slot before stream establishment and
releases it on the first of:

  • RecvMsg returns an error (including io.EOF)
  • The stream context is cancelled (safety net for abandoned streams)

Release uses sync.Once for thread-safe exactly-once semantics. If the
stream establishment itself fails, the slot is released immediately.

A background goroutine watches stream.Context().Done() to prevent
permanent slot leaks from abandoned streams where RecvMsg is never
drained (e.g., caller gives up, context cancelled before recv).

Observability

  • limiter.InFlight() — current number of in-flight RPCs (approximate
    under contention)
  • limiter.Capacity() — configured maximum

These can be exported as Prometheus gauges by the caller if needed.

Design decisions

Semaphore over token bucket: A semaphore caps concurrent load (the
direct cause of INC-44), while a token bucket caps request rate. Under
normal conditions with fast responses, a semaphore allows high throughput.
Under degraded conditions with slow responses, it naturally throttles
because slots aren't released — exactly the behavior we want.

Opt-in per client: Not added to the default GRPCDialOptions() since
the right limit varies by service (IAM needs more headroom than the
recommender). Callers enable it explicitly when dialing.

Tests

  • TestLimiter_LimitsConcurrency — 5 concurrent calls with limit 2,
    verifies server-side serving count never exceeds 2
  • TestLimiter_RespectsContextCancellation — slot full + short context
    → DeadlineExceeded/Canceled
  • TestLimiter_DisabledWithZero — limit 0 is a no-op passthrough
  • TestLimiter_StreamReleasesSlotOnRecvError — stream holds slot,
    releases on RecvMsg error, slot is reusable for subsequent calls
  • TestLimiter_StreamReleasesSlotOnContextCancel — stream holds slot,
    context cancelled without draining RecvMsg, safety net goroutine
    releases slot, slot is reusable for subsequent calls
  • TestLimiter_Capacity — verifies Capacity() returns configured value

Follow-up in mono

After bumping go-grpc-kit, enable per-client in
api-impl/cmd/backend/main.go's configureClients():

limiter := concurrency.NewLimiter(100)
opts = append(opts, grpc.WithUnaryInterceptor(limiter.UnaryClientInterceptor()))

Test plan

  • go test ./pkg/interceptors/concurrency/ -v -race — 6/6 pass
  • golangci-lint run ./pkg/interceptors/concurrency/ — 0 issues
  • CI passes

🤖 Generated with Claude Code

Signed-off-by: Kenny Leung <kleung@chainguard.dev>
@k4leung4 k4leung4 force-pushed the grpc-concurrency-limiter branch from 1be93ca to 867dec4 Compare March 16, 2026 19:54
@k4leung4 k4leung4 marked this pull request as ready for review March 16, 2026 19:58
@k4leung4 k4leung4 requested review from cmdpdx and tcnghia March 16, 2026 19:58
Bug fix:
- Replace bool released + manual check with sync.Once in releasingStream
  to prevent double semaphore release under concurrent RecvMsg calls

Tests:
- Fix LimitsConcurrency: use server-side srv.serving counter instead of
  sampling InFlight() before the interceptor is entered (was a race)
- Add TestLimiter_StreamReleasesSlotOnRecvError: verifies stream holds
  slot, releases on RecvMsg error, and slot is reusable afterward

Docs:
- InFlight(): note value is approximate under contention
- StreamClientInterceptor: document that callers must drain RecvMsg to
  release the slot
- releasingStream: document the drain-to-EOF requirement

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@k4leung4 k4leung4 force-pushed the grpc-concurrency-limiter branch from 2ffa3c0 to c617fdb Compare March 22, 2026 21:11
Add a goroutine in newReleasingStream that watches the stream context
and releases the semaphore slot when the context is cancelled. This
prevents permanent slot leaks from abandoned streams where RecvMsg is
never drained (e.g., caller gives up, context cancelled before recv).

The slot is still released via RecvMsg on the normal path; the
goroutine is a safety net using sync.Once to ensure exactly-once
release regardless of which path fires first.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Kenny Leung <kleung@chainguard.dev>
@k4leung4 k4leung4 force-pushed the grpc-concurrency-limiter branch from c617fdb to 3a42f38 Compare March 22, 2026 21:18
Copy link
Copy Markdown

@eslerm eslerm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖: Clean implementation — buffered-channel semaphore is idiomatic, sync.Once handles dual-path release correctly, context errors properly mapped to gRPC status codes.

// StreamClientInterceptor returns a gRPC stream client interceptor that
// acquires a concurrency slot before establishing the stream. The slot is
// released when RecvMsg returns an error (including io.EOF). Callers must
// drain RecvMsg to completion to avoid leaking slots.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖: This comment says callers must drain RecvMsg to completion to avoid leaking slots — but the safety-net goroutine added in the third commit releases the slot on context cancellation too. The godoc should reflect both release paths to avoid misleading callers.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants