Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 91 additions & 27 deletions pkg/settings/limits/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,23 @@ func (l *resourcePoolLimiter[N]) getLimit(ctx context.Context) (limit N) {
return limit
}

// waiter represents a goroutine waiting for resources in the FIFO queue.
// A waiter is created under the pool mutex in use(), appended to
// resourcePoolUsage.queue, and granted its resources by tryWakeWaiters,
// which closes ready after adding amount to the pool's used counter.
type waiter[N Number] struct {
	amount N             // resource amount this goroutine is waiting for
	ready  chan struct{} // closed when resources are granted
}

type resourcePoolUsage[N Number] struct {
*resourcePoolLimiter[N]
scope settings.Scope // optional
tenant string // optional
mu sync.Mutex
cond sync.Cond
used N
// queue holds waiters in FIFO order; head of slice is first to be serviced
queue []*waiter[N]
// onEnqueue is an optional callback invoked (under lock) when a waiter is added to the queue.
// Used for testing to synchronize without sleeps.
onEnqueue func()

recordUsage func(context.Context, N)
recordLimit func(context.Context, N)
Expand All @@ -165,6 +175,7 @@ type resourcePoolUsage[N Number] struct {
func (l *resourcePoolLimiter[N]) newLimitUsage(opts ...metric.RecordOption) *resourcePoolUsage[N] {
u := resourcePoolUsage[N]{
resourcePoolLimiter: l,
queue: make([]*waiter[N], 0),
stopCh: make(services.StopChan),
done: make(chan struct{}),
recordUsage: func(ctx context.Context, n N) {
Expand Down Expand Up @@ -193,7 +204,6 @@ func (l *resourcePoolLimiter[N]) newLimitUsage(opts ...metric.RecordOption) *res
}
},
}
u.cond.L = &u.mu
return &u
}

Expand All @@ -207,9 +217,25 @@ func (u *resourcePoolUsage[N]) free(amount N) {
defer cancel()
u.recordUsage(ctx, u.used)

u.cond.Broadcast() // notify others blocked on cond.Wait
u.tryWakeWaiters()
}

return
// tryWakeWaiters attempts to wake waiters at the head of the queue
// whose resource requests can now be satisfied.
//
// Waiters are granted strictly in FIFO order: if the head waiter's request
// does not fit, no later waiter is considered, even if a smaller request
// behind it would fit. This prevents starvation of large requests.
//
// Must be called with u.mu held.
func (u *resourcePoolUsage[N]) tryWakeWaiters() {
	for len(u.queue) > 0 {
		head := u.queue[0]
		// Re-read the limit on every iteration: it may change dynamically
		// between grants.
		limit := u.getLimit(context.Background())
		if u.used+head.amount > limit {
			// Not enough resources for the head waiter; stop here to preserve FIFO.
			break
		}
		// Grant resources to the head waiter and signal it.
		u.used += head.amount
		close(head.ready)
		// Clear the slot before advancing: re-slicing alone would keep the
		// granted waiter reachable through the backing array, retaining it
		// (and its channel) for as long as the queue slice lives.
		u.queue[0] = nil
		u.queue = u.queue[1:]
	}
}

func (u *resourcePoolUsage[N]) newErrorLimitReached(limit, amount N) ErrorResourceLimited[N] {
Expand Down Expand Up @@ -241,7 +267,6 @@ func (u *resourcePoolUsage[N]) available(ctx context.Context) (N, error) {
return limit - u.used, nil
}

// opt: queue instead of racing for the [sync.Mutex] & [sync.Cond]
func (u *resourcePoolUsage[N]) use(ctx context.Context, amount N, block bool) error {
limit, err := u.get(ctx)
if err != nil {
Expand All @@ -250,34 +275,73 @@ func (u *resourcePoolUsage[N]) use(ctx context.Context, amount N, block bool) er

start := time.Now()
u.mu.Lock()
defer u.mu.Unlock()

if u.used+amount > limit {
if !block {
// Fast path: resources available immediately and no one else waiting
if len(u.queue) == 0 && u.used+amount <= limit {
u.used += amount
u.recordUsage(ctx, u.used)
u.recordAmount(ctx, amount)
u.recordBlockTime(ctx, time.Since(start).Seconds())
u.mu.Unlock()
return nil
}

// Not enough resources
if !block {
u.recordDenied(ctx, amount)
err := u.newErrorLimitReached(limit, amount)
u.mu.Unlock()
return err
}

// Slow path: need to queue up and wait (FIFO ordering)
w := &waiter[N]{
amount: amount,
ready: make(chan struct{}),
}
u.queue = append(u.queue, w)
if u.onEnqueue != nil {
u.onEnqueue()
}
u.mu.Unlock()

// Wait for our turn or context cancellation
select {
case <-w.ready:
// Resources have been granted to us
u.mu.Lock()
u.recordUsage(ctx, u.used)
u.recordAmount(ctx, amount)
u.recordBlockTime(ctx, time.Since(start).Seconds())
u.mu.Unlock()
return nil
case <-ctx.Done():
// Context cancelled - remove ourselves from queue
u.mu.Lock()
defer u.mu.Unlock()

// Check if we were already granted resources while acquiring the lock
select {
case <-w.ready:
// We got resources just as we were cancelling; return them
u.used -= amount
u.tryWakeWaiters()
u.recordDenied(ctx, amount)
return u.newErrorLimitReached(limit, amount)
return fmt.Errorf("context error (%w) after waiting %s for limit: %w", ctx.Err(), time.Since(start), u.newErrorLimitReached(limit, amount))
default:
}
// Ensure cond.Wait() yields to context expiration
stop := context.AfterFunc(ctx, func() {
u.mu.Lock()
defer u.mu.Unlock()
u.cond.Broadcast()
})
defer stop()
start := time.Now()
for u.used+amount > limit {
u.cond.Wait() // wait until some resources are freed, or context expiration
if err := ctx.Err(); err != nil {
u.recordDenied(ctx, amount)
return fmt.Errorf("context error (%w) after waiting %s for limit: %w", err, time.Since(start), u.newErrorLimitReached(limit, amount))

// Remove from queue. Only needed when the context was cancelled before the element got to the head of the queue.
// Otherwise it is already removed by tryWakeWaiters().
for i, waiter := range u.queue {
if waiter == w {
u.queue = append(u.queue[:i], u.queue[i+1:]...)
break
}
}
u.recordDenied(ctx, amount)
return fmt.Errorf("context error (%w) after waiting %s for limit: %w", ctx.Err(), time.Since(start), u.newErrorLimitReached(limit, amount))
}
u.used += amount
u.recordUsage(ctx, u.used)
u.recordAmount(ctx, amount)
u.recordBlockTime(ctx, time.Since(start).Seconds())
return nil
}

func (u *resourcePoolUsage[N]) wait(ctx context.Context, amount N) (free func(), err error) {
Expand Down
174 changes: 174 additions & 0 deletions pkg/settings/limits/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,3 +286,177 @@ func Test_newScopedResourcePoolLimiterFromFactory(t *testing.T) {
})
require.Error(t, l.Use(ctx2, 1))
}

// TestResourcePoolLimiter_WaitOrderPreserved confirms that ResourcePoolLimiter
// preserves FIFO ordering when multiple goroutines are waiting.
//
// The test pins the enqueue order deterministically: it holds the single
// available unit, then starts waiters one at a time, using the onEnqueue
// hook to block until each waiter is actually in the queue before starting
// the next. Releasing the unit must then wake them in exactly that order.
func TestResourcePoolLimiter_WaitOrderPreserved(t *testing.T) {
	const numWaiters = 10

	ctx := context.Background()
	limiter := newUnscopedResourcePoolLimiter(1)

	// Channel to signal when each waiter has been enqueued.
	// Buffered so the onEnqueue callback (called under the pool mutex)
	// never blocks.
	enqueued := make(chan struct{}, numWaiters)
	limiter.resourcePoolUsage.setOnEnqueue(func() {
		enqueued <- struct{}{}
	})

	// Acquire the single resource first so every subsequent Wait must queue.
	free, err := limiter.Wait(ctx, 1)
	require.NoError(t, err)

	// Track the order in which waiters acquired resources.
	acquiredOrder := make(chan int, numWaiters)

	// Start multiple waiters sequentially, waiting for each to be enqueued before starting the next
	for i := range numWaiters {
		waiterID := i
		go func() {
			f, err := limiter.Wait(ctx, 1)
			if err != nil {
				return
			}
			acquiredOrder <- waiterID
			// Release immediately so the next queued waiter can proceed.
			f()
		}()
		// Wait for this waiter to be enqueued before starting the next
		<-enqueued
	}

	// All waiters are now in the queue in order 0, 1, 2, ... numWaiters-1
	// Release the resource - this should wake up waiters in FIFO order
	free()

	// Collect the order in which waiters acquired the resource
	acquired := make([]int, 0, numWaiters)
	for range numWaiters {
		select {
		case id := <-acquiredOrder:
			acquired = append(acquired, id)
		case <-time.After(time.Second):
			t.Fatalf("timed out waiting for waiter to acquire resource")
		}
	}

	// Verify FIFO order is preserved
	for i, id := range acquired {
		require.Equalf(t, i, id, "expected waiter %d at position %d, got %d (acquired order: %v)", i, i, id, acquired)
	}
}

// TestResourcePoolLimiter_ContextCancellation tests that context cancellation
// properly removes waiters from the queue without breaking FIFO order.
//
// Five waiters are enqueued in a known order (using the onEnqueue hook for
// deterministic sequencing), the middle one (index 2) is cancelled, and the
// remaining four must still acquire the resource in FIFO order 0, 1, 3, 4.
func TestResourcePoolLimiter_ContextCancellation(t *testing.T) {
	ctx := context.Background()
	limiter := newUnscopedResourcePoolLimiter(1)

	// Channel to signal when each waiter has been enqueued.
	// Buffered so the callback (invoked under the pool mutex) never blocks.
	enqueued := make(chan struct{}, 5)
	limiter.resourcePoolUsage.setOnEnqueue(func() {
		enqueued <- struct{}{}
	})

	// Acquire the single resource so every subsequent Wait must queue.
	free, err := limiter.Wait(ctx, 1)
	require.NoError(t, err)

	// Start 5 waiters, but cancel the middle one
	results := make(chan struct {
		id  int
		err error
	}, 5)

	// One independent cancellable context per waiter.
	var ctxs []context.Context
	var cancels []context.CancelFunc
	for range 5 {
		c, cancel := context.WithCancel(ctx)
		ctxs = append(ctxs, c)
		cancels = append(cancels, cancel)
	}

	// Start waiters sequentially, waiting for each to be enqueued
	for i := range 5 {
		waiterID := i
		go func() {
			f, err := limiter.Wait(ctxs[waiterID], 1)
			if err != nil {
				results <- struct {
					id  int
					err error
				}{waiterID, err}
				return
			}
			results <- struct {
				id  int
				err error
			}{waiterID, nil}
			// Release immediately so the next queued waiter can proceed.
			f()
		}()
		// Wait for this waiter to be enqueued
		<-enqueued
	}

	// All 5 waiters are now in the queue in order 0, 1, 2, 3, 4
	// Cancel waiter 2 (middle of the queue) and wait for the cancellation result
	cancels[2]()
	cancelResult := <-results
	require.Equal(t, 2, cancelResult.id)
	require.Error(t, cancelResult.err)

	// Release the resource - remaining waiters should acquire in FIFO order
	free()

	// Collect remaining results
	var acquiredIDs []int
	for range 4 {
		select {
		case r := <-results:
			require.NoError(t, r.err)
			acquiredIDs = append(acquiredIDs, r.id)
		case <-time.After(time.Second):
			t.Fatal("timed out waiting for results")
		}
	}

	// Remaining waiters should acquire in order: 0, 1, 3, 4
	assert.Equal(t, []int{0, 1, 3, 4}, acquiredIDs, "waiters should acquire in FIFO order, skipping cancelled")
}

// TestResourcePoolLimiter_BasicUsage tests basic Use/Free functionality.
// It walks a limiter with capacity 5 through a partial acquisition, an
// over-limit rejection carrying a typed error, and a release that restores
// the full pool.
func TestResourcePoolLimiter_BasicUsage(t *testing.T) {
	ctx := context.Background()
	limiter := GlobalResourcePoolLimiter(5)

	// Consuming part of the pool succeeds.
	require.NoError(t, limiter.Use(ctx, 3))

	// Two of the five units remain.
	remaining, err := limiter.Available(ctx)
	require.NoError(t, err)
	assert.Equal(t, 2, remaining)

	// Requesting more than the remainder is rejected with a typed error
	// describing the current usage, the limit, and the rejected amount.
	err = limiter.Use(ctx, 3)
	require.Error(t, err)
	var resErr ErrorResourceLimited[int]
	require.ErrorAs(t, err, &resErr)
	assert.Equal(t, 3, resErr.Used)
	assert.Equal(t, 5, resErr.Limit)
	assert.Equal(t, 3, resErr.Amount)

	// Returning the units restores the full pool.
	require.NoError(t, limiter.Free(ctx, 3))

	remaining, err = limiter.Available(ctx)
	require.NoError(t, err)
	assert.Equal(t, 5, remaining)
}

// setOnEnqueue sets a callback that is invoked each time a waiter is added to the queue.
// The callback is called with the mutex held. Used for testing to synchronize without sleeps.
func (u *resourcePoolUsage[N]) setOnEnqueue(fn func()) {
	u.mu.Lock()
	u.onEnqueue = fn
	u.mu.Unlock()
}
Loading