From 317395d233c8f90eb4e57f58c25cf80957f419e7 Mon Sep 17 00:00:00 2001 From: Korolev Dmitry Date: Thu, 26 Mar 2026 17:24:24 +0100 Subject: [PATCH 1/5] fix idleDuration on chainedRateLimiter --- .../RateLimiting/ChainedRateLimiter.cs | 14 +++++++++----- .../tests/ChainedLimiterTests.cs | 19 +++++++++++++++---- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedRateLimiter.cs index 1e786cbb70fd62..0715d4dcb42906 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedRateLimiter.cs @@ -64,12 +64,16 @@ public override TimeSpan? IdleDuration foreach (RateLimiter limiter in _limiters) { - if (limiter.IdleDuration is { } idleDuration) + TimeSpan? idleDuration = limiter.IdleDuration; + if (idleDuration is null) { - if (lowestIdleDuration is null || idleDuration < lowestIdleDuration) - { - lowestIdleDuration = idleDuration; - } + // The chain should not be considered idle if any of its children is not idle. + return null; + } + + if (lowestIdleDuration is null || idleDuration < lowestIdleDuration) + { + lowestIdleDuration = idleDuration; } } diff --git a/src/libraries/System.Threading.RateLimiting/tests/ChainedLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/ChainedLimiterTests.cs index 70e90b2ec397c1..fd3f5e426cafac 100644 --- a/src/libraries/System.Threading.RateLimiting/tests/ChainedLimiterTests.cs +++ b/src/libraries/System.Threading.RateLimiting/tests/ChainedLimiterTests.cs @@ -209,16 +209,27 @@ public async Task GetStatisticsHasCorrectValues() } [Fact] - public void IdleDurationReturnsLowestValue() + public void IdleDurationReturnsNullWhenAnyChildReturnsNull() { - using var limiter1 = new CustomizableLimiter(); + using var limiter1 = new CustomizableLimiter { IdleDurationImpl = () => null }; using var limiter2 = new CustomizableLimiter { IdleDurationImpl = () => TimeSpan.FromMilliseconds(2) }; using var limiter3 = new CustomizableLimiter { IdleDurationImpl = () => TimeSpan.FromMilliseconds(3) }; + using var chainedLimiter = RateLimiter.CreateChained(limiter1, limiter2, limiter3); + Assert.Null(chainedLimiter.IdleDuration); + } + + [Fact] + public void IdleDurationReturnsLowestValueWhenAllChildrenAreIdle() + { + using var limiter1 = new CustomizableLimiter { IdleDurationImpl = () => TimeSpan.FromMilliseconds(2) }; + using var limiter2 = new CustomizableLimiter { IdleDurationImpl = () => TimeSpan.FromMilliseconds(1) }; + using var limiter3 = new CustomizableLimiter { IdleDurationImpl = () => TimeSpan.FromMilliseconds(3) }; + using var chainedLimiter = RateLimiter.CreateChained(limiter1, limiter2, limiter3); var idleDuration = chainedLimiter.IdleDuration; - Assert.Equal(2, idleDuration.Value.TotalMilliseconds); + Assert.Equal(1, idleDuration.Value.TotalMilliseconds); } [Fact] @@ -809,7 +820,7 @@ public async Task AcquireAsyncWaitsForResourcesBeforeCallingNextLimiter() QueueProcessingOrder = QueueProcessingOrder.OldestFirst, QueueLimit = 0 }); - + using var chainedLimiter = RateLimiter.CreateChained(limiter1, limiter2); var lease = chainedLimiter.AttemptAcquire(); From 7eccad934b2b277d6dbf97c8fe6db6668f3dc647 Mon Sep 17 00:00:00 2001 From: Korolev Dmitry Date: Thu, 26 Mar 2026 17:42:47 +0100 Subject: [PATCH 2/5] Replenishment on chainedRateLimiter --- .../RateLimiting/ChainedRateLimiter.cs | 25 +++ .../DefaultPartitionedRateLimiter.cs | 15 +- .../tests/PartitionedRateLimiterTests.cs | 143 ++++++++++++++++++ 3 files changed, 182 insertions(+), 1 deletion(-) diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedRateLimiter.cs index 0715d4dcb42906..3fa17b492d5d5c 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedRateLimiter.cs @@ -81,6 +81,31 @@ public override TimeSpan? IdleDuration } } + internal void TryReplenish() + { + List? exceptions = null; + foreach (RateLimiter limiter in _limiters) + { + if (limiter is ReplenishingRateLimiter replenishingRateLimiter) + { + try + { + replenishingRateLimiter.TryReplenish(); + } + catch (Exception ex) + { + exceptions ??= []; + exceptions.Add(ex); + } + } + } + + if (exceptions is not null) + { + throw new AggregateException(exceptions); + } + } + protected override RateLimitLease AttemptAcquireCore(int permitCount) { ThrowIfDisposed(); diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/DefaultPartitionedRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/DefaultPartitionedRateLimiter.cs index 5c58a1de5970dd..64f8b6c090dd98 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/DefaultPartitionedRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/DefaultPartitionedRateLimiter.cs @@ -259,7 +259,20 @@ private async Task Heartbeat() } catch (Exception ex) { - aggregateExceptions ??= new List(); + aggregateExceptions ??= []; + aggregateExceptions.Add(ex); + } + } + // ChainedRateLimiter is a special case: it may call replenish on its children + else if (rateLimiter.Value.Value is ChainedRateLimiter chainedRateLimiter) + { + try + { + chainedRateLimiter.TryReplenish(); + } + catch (Exception ex) + { + aggregateExceptions ??= []; aggregateExceptions.Add(ex); } } diff --git a/src/libraries/System.Threading.RateLimiting/tests/PartitionedRateLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/PartitionedRateLimiterTests.cs index b0380a0dcd6a4f..21dbad67c48a22 100644 --- a/src/libraries/System.Threading.RateLimiting/tests/PartitionedRateLimiterTests.cs +++ b/src/libraries/System.Threading.RateLimiting/tests/PartitionedRateLimiterTests.cs @@ -632,6 +632,149 @@ public async Task ThrowingTryReplenishDoesNotPreventIdleLimiterBeingCleanedUp() await disposeTcs.Task; } + [Fact] + public async Task ChainedLimiter_HeartbeatCallsTryReplenishOnInnerReplenishingLimiters() + { + var replenishCallCount = 0; + CustomizableReplenishingLimiter replenishLimiter = new CustomizableReplenishingLimiter + { + TryReplenishImpl = () => { replenishCallCount++; return true; } + }; + CustomizableLimiter nonReplenishLimiter = new CustomizableLimiter(); + + using var limiter = Utils.CreatePartitionedLimiterWithoutTimer(resource => + { + return RateLimitPartition.Get(1, _ => RateLimiter.CreateChained(nonReplenishLimiter, replenishLimiter)); + }); + + limiter.AttemptAcquire(""); + + await Utils.RunTimerFunc(limiter); + + Assert.Equal(1, replenishCallCount); + } + + [Fact] + public async Task ChainedLimiter_HeartbeatCallsTryReplenishOnAllInnerReplenishingLimiters() + { + var replenishCallCount1 = 0; + var replenishCallCount2 = 0; + CustomizableReplenishingLimiter replenishLimiter1 = new CustomizableReplenishingLimiter + { + TryReplenishImpl = () => { replenishCallCount1++; return true; } + }; + CustomizableReplenishingLimiter replenishLimiter2 = new CustomizableReplenishingLimiter + { + TryReplenishImpl = () => { replenishCallCount2++; return true; } + }; + + using var limiter = Utils.CreatePartitionedLimiterWithoutTimer(resource => + { + return RateLimitPartition.Get(1, _ => RateLimiter.CreateChained(replenishLimiter1, replenishLimiter2)); + }); + + limiter.AttemptAcquire(""); + + await Utils.RunTimerFunc(limiter); + + Assert.Equal(1, replenishCallCount1); + Assert.Equal(1, replenishCallCount2); + } + + [Fact] + public async Task ChainedLimiter_ThrowingTryReplenishStillReplenishesOtherLimiters() + { + var replenishCallCount = 0; + CustomizableReplenishingLimiter throwingLimiter = new CustomizableReplenishingLimiter + { + TryReplenishImpl = () => throw new Exception("replenish failed") + }; + CustomizableReplenishingLimiter goodLimiter = new CustomizableReplenishingLimiter + { + TryReplenishImpl = () => { replenishCallCount++; return true; } + }; + + using var limiter = Utils.CreatePartitionedLimiterWithoutTimer(resource => + { + return RateLimitPartition.Get(1, _ => RateLimiter.CreateChained(throwingLimiter, goodLimiter)); + }); + + limiter.AttemptAcquire(""); + + var ex = await Assert.ThrowsAsync(() => Utils.RunTimerFunc(limiter)); + Assert.Single(ex.InnerExceptions); + Assert.IsType(ex.InnerExceptions[0]); + + // The good limiter was still replenished despite the first one throwing + Assert.Equal(1, replenishCallCount); + } + + [Fact] + public async Task ChainedLimiter_IdleChainedLimiterWithNullChildIdleDurationNotEvicted() + { + var factoryCallCount = 0; + CustomizableReplenishingLimiter replenishLimiter = null; + CustomizableLimiter idleLimiter = null; + + using var limiter = Utils.CreatePartitionedLimiterWithoutTimer(resource => + { + return RateLimitPartition.Get(1, _ => + { + factoryCallCount++; + replenishLimiter = new CustomizableReplenishingLimiter(); + idleLimiter = new CustomizableLimiter(); + return RateLimiter.CreateChained(idleLimiter, replenishLimiter); + }); + }); + + limiter.AttemptAcquire(""); + Assert.Equal(1, factoryCallCount); + + // Idle limiter reports > 10s idle, but replenishing limiter reports null (still active) + idleLimiter.IdleDurationImpl = () => TimeSpan.FromMinutes(1); + replenishLimiter.IdleDurationImpl = () => null; + + await Utils.RunTimerFunc(limiter); + + // Factory should not have been called again — limiter was not evicted + limiter.AttemptAcquire(""); + Assert.Equal(1, factoryCallCount); + } + + [Fact] + public async Task ChainedLimiter_FullyIdleChainedLimiterIsEvicted() + { + var factoryCallCount = 0; + CustomizableLimiter innerLimiter1 = null; + CustomizableLimiter innerLimiter2 = null; + + using var limiter = Utils.CreatePartitionedLimiterWithoutTimer(resource => + { + return RateLimitPartition.Get(1, _ => + { + factoryCallCount++; + innerLimiter1 = new CustomizableLimiter(); + innerLimiter2 = new CustomizableLimiter(); + return RateLimiter.CreateChained(innerLimiter1, innerLimiter2); + }); + }); + + limiter.AttemptAcquire(""); + Assert.Equal(1, factoryCallCount); + + // Both children report idle > 10s, chain should be evicted + innerLimiter1.IdleDurationImpl = () => TimeSpan.FromMinutes(1); + innerLimiter2.IdleDurationImpl = () => TimeSpan.FromMinutes(1); + innerLimiter1.DisposeAsyncCoreImpl = () => default; + innerLimiter2.DisposeAsyncCoreImpl = () => default; + + await Utils.RunTimerFunc(limiter); + + // Factory should be called again on next acquire — limiter was evicted and recreated + limiter.AttemptAcquire(""); + Assert.Equal(2, factoryCallCount); + } + // Translate [Fact] From 7e8bed8afa55936ac0fbbc9f846f182bf1adad99 Mon Sep 17 00:00:00 2001 From: Korolev Dmitry Date: Thu, 26 Mar 2026 17:50:49 +0100 Subject: [PATCH 3/5] nit comment --- .../Threading/RateLimiting/DefaultPartitionedRateLimiter.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/DefaultPartitionedRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/DefaultPartitionedRateLimiter.cs index 64f8b6c090dd98..cb988868516145 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/DefaultPartitionedRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/DefaultPartitionedRateLimiter.cs @@ -263,7 +263,7 @@ private async Task Heartbeat() aggregateExceptions.Add(ex); } } - // ChainedRateLimiter is a special case: it may call replenish on its children + // ChainedRateLimiter is a special case: it has to invoke replenish on its children else if (rateLimiter.Value.Value is ChainedRateLimiter chainedRateLimiter) { try From aada9751646a7ff1c4aa45ebc1aa108c8a5f4241 Mon Sep 17 00:00:00 2001 From: Korolev Dmitry Date: Fri, 27 Mar 2026 10:50:28 +0100 Subject: [PATCH 4/5] separate chainedReplenishingRateLimiter impl --- .../src/System.Threading.RateLimiting.csproj | 1 + .../RateLimiting/ChainedRateLimiter.cs | 125 ++++++------ .../ChainedReplenishingRateLimiter.cs | 112 +++++++++++ .../DefaultPartitionedRateLimiter.cs | 13 -- .../Threading/RateLimiting/RateLimiter.cs | 8 + .../tests/ChainedLimiterTests.cs | 182 ++++++++++++++++++ .../tests/Infrastructure/Utils.cs | 3 +- 7 files changed, 363 insertions(+), 81 deletions(-) create mode 100644 src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedReplenishingRateLimiter.cs diff --git a/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj b/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj index 8a42115792903d..35b31d13cac990 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj +++ b/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj @@ -14,6 +14,7 @@ System.Threading.RateLimiting.RateLimitLease + diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedRateLimiter.cs index 3fa17b492d5d5c..5fa4ea6af4c077 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedRateLimiter.cs @@ -24,13 +24,51 @@ public ChainedRateLimiter(RateLimiter[] limiters) public override RateLimiterStatistics? GetStatistics() { ThrowIfDisposed(); + return GetStatisticsCore(_limiters); + } + + public override TimeSpan? IdleDuration + { + get + { + ThrowIfDisposed(); + return GetIdleDurationCore(_limiters); + } + } + + protected override RateLimitLease AttemptAcquireCore(int permitCount) + { + ThrowIfDisposed(); + return AttemptAcquireChained(_limiters, permitCount); + } + + protected override async ValueTask AcquireAsyncCore(int permitCount, CancellationToken cancellationToken) + { + ThrowIfDisposed(); + return await AcquireAsyncChained(_limiters, permitCount, cancellationToken).ConfigureAwait(false); + } + protected override void Dispose(bool disposing) + { + _disposed = true; + } + + private void ThrowIfDisposed() + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(ChainedRateLimiter)); + } + } + + internal static RateLimiterStatistics GetStatisticsCore(RateLimiter[] limiters) + { long lowestAvailablePermits = long.MaxValue; long currentQueuedCount = 0; long totalFailedLeases = 0; long innerMostSuccessfulLeases = 0; - foreach (RateLimiter limiter in _limiters) + foreach (RateLimiter limiter in limiters) { if (limiter.GetStatistics() is { } statistics) { @@ -54,79 +92,47 @@ public ChainedRateLimiter(RateLimiter[] limiters) }; } - public override TimeSpan? IdleDuration + internal static TimeSpan? GetIdleDurationCore(RateLimiter[] limiters) { - get - { - ThrowIfDisposed(); - - TimeSpan? lowestIdleDuration = null; + TimeSpan? lowestIdleDuration = null; - foreach (RateLimiter limiter in _limiters) + foreach (RateLimiter limiter in limiters) + { + TimeSpan? idleDuration = limiter.IdleDuration; + if (idleDuration is null) { - TimeSpan? idleDuration = limiter.IdleDuration; - if (idleDuration is null) - { - // The chain should not be considered idle if any of its children is not idle. - return null; - } - - if (lowestIdleDuration is null || idleDuration < lowestIdleDuration) - { - lowestIdleDuration = idleDuration; - } + // The chain should not be considered idle if any of its children is not idle. + return null; } - return lowestIdleDuration; - } - } - - internal void TryReplenish() - { - List? exceptions = null; - foreach (RateLimiter limiter in _limiters) - { - if (limiter is ReplenishingRateLimiter replenishingRateLimiter) + if (lowestIdleDuration is null || idleDuration < lowestIdleDuration) { - try - { - replenishingRateLimiter.TryReplenish(); - } - catch (Exception ex) - { - exceptions ??= []; - exceptions.Add(ex); - } + lowestIdleDuration = idleDuration; } } - if (exceptions is not null) - { - throw new AggregateException(exceptions); - } + return lowestIdleDuration; } - protected override RateLimitLease AttemptAcquireCore(int permitCount) + internal static RateLimitLease AttemptAcquireChained(RateLimiter[] limiters, int permitCount) { - ThrowIfDisposed(); - RateLimitLease[]? leases = null; - for (int i = 0; i < _limiters.Length; i++) + for (int i = 0; i < limiters.Length; i++) { RateLimitLease? lease = null; Exception? exception = null; try { - lease = _limiters[i].AttemptAcquire(permitCount); + lease = limiters[i].AttemptAcquire(permitCount); } catch (Exception ex) { exception = ex; } - RateLimitLease? notAcquiredLease = CommonAcquireLogic(exception, lease, ref leases, i, _limiters.Length); + RateLimitLease? notAcquiredLease = CommonAcquireLogic(exception, lease, ref leases, i, limiters.Length); if (notAcquiredLease is not null) { @@ -137,27 +143,25 @@ protected override RateLimitLease AttemptAcquireCore(int permitCount) return new CombinedRateLimitLease(leases!); } - protected override async ValueTask AcquireAsyncCore(int permitCount, CancellationToken cancellationToken) + internal static async ValueTask AcquireAsyncChained(RateLimiter[] limiters, int permitCount, CancellationToken cancellationToken) { - ThrowIfDisposed(); - RateLimitLease[]? leases = null; - for (int i = 0; i < _limiters.Length; i++) + for (int i = 0; i < limiters.Length; i++) { RateLimitLease? lease = null; Exception? exception = null; try { - lease = await _limiters[i].AcquireAsync(permitCount, cancellationToken).ConfigureAwait(false); + lease = await limiters[i].AcquireAsync(permitCount, cancellationToken).ConfigureAwait(false); } catch (Exception ex) { exception = ex; } - RateLimitLease? notAcquiredLease = CommonAcquireLogic(exception, lease, ref leases, i, _limiters.Length); + RateLimitLease? notAcquiredLease = CommonAcquireLogic(exception, lease, ref leases, i, limiters.Length); if (notAcquiredLease is not null) { @@ -168,19 +172,6 @@ protected override async ValueTask AcquireAsyncCore(int permitCo return new CombinedRateLimitLease(leases!); } - protected override void Dispose(bool disposing) - { - _disposed = true; - } - - private void ThrowIfDisposed() - { - if (_disposed) - { - throw new ObjectDisposedException(nameof(ChainedRateLimiter)); - } - } - internal static RateLimitLease? CommonAcquireLogic(Exception? ex, RateLimitLease? lease, ref RateLimitLease[]? leases, int index, int length) { if (ex is not null) diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedReplenishingRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedReplenishingRateLimiter.cs new file mode 100644 index 00000000000000..2aa6e5ff41cdc8 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedReplenishingRateLimiter.cs @@ -0,0 +1,112 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace System.Threading.RateLimiting +{ + /// + /// A chained rate limiter that extends when at least one of the + /// chained limiters is a . + /// + internal sealed class ChainedReplenishingRateLimiter : ReplenishingRateLimiter + { + private readonly RateLimiter[] _limiters; + private readonly ReplenishingRateLimiter[] _replenishingLimiters; + private readonly bool _isAutoReplenishing; + private readonly TimeSpan _replenishmentPeriod; + private bool _disposed; + + public ChainedReplenishingRateLimiter(RateLimiter[] limiters) + { + _limiters = (RateLimiter[])limiters.Clone(); + + var replenishingLimiters = new List(); + bool isAutoReplenishing = true; + TimeSpan lowestPeriod = TimeSpan.MaxValue; + + foreach (RateLimiter limiter in _limiters) + { + if (limiter is ReplenishingRateLimiter replenishing) + { + replenishingLimiters.Add(replenishing); + + if (!replenishing.IsAutoReplenishing) + { + isAutoReplenishing = false; + } + + TimeSpan period = replenishing.ReplenishmentPeriod; + if (period > TimeSpan.Zero && period < lowestPeriod) + { + lowestPeriod = period; + } + } + } + + _replenishingLimiters = replenishingLimiters.ToArray(); + _isAutoReplenishing = isAutoReplenishing; + _replenishmentPeriod = lowestPeriod == TimeSpan.MaxValue ? TimeSpan.Zero : lowestPeriod; + } + + public override bool IsAutoReplenishing => _isAutoReplenishing; + + public override TimeSpan ReplenishmentPeriod => _replenishmentPeriod; + + public override bool TryReplenish() + { + ThrowIfDisposed(); + + bool replenished = false; + foreach (ReplenishingRateLimiter limiter in _replenishingLimiters) + { + if (limiter.TryReplenish()) + { + replenished = true; + } + } + return replenished; + } + + public override RateLimiterStatistics? GetStatistics() + { + ThrowIfDisposed(); + return ChainedRateLimiter.GetStatisticsCore(_limiters); + } + + public override TimeSpan? IdleDuration + { + get + { + ThrowIfDisposed(); + return ChainedRateLimiter.GetIdleDurationCore(_limiters); + } + } + + protected override RateLimitLease AttemptAcquireCore(int permitCount) + { + ThrowIfDisposed(); + return ChainedRateLimiter.AttemptAcquireChained(_limiters, permitCount); + } + + protected override async ValueTask AcquireAsyncCore(int permitCount, CancellationToken cancellationToken) + { + ThrowIfDisposed(); + return await ChainedRateLimiter.AcquireAsyncChained(_limiters, permitCount, cancellationToken).ConfigureAwait(false); + } + + protected override void Dispose(bool disposing) + { + _disposed = true; + } + + private void ThrowIfDisposed() + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(ChainedReplenishingRateLimiter)); + } + } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/DefaultPartitionedRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/DefaultPartitionedRateLimiter.cs index cb988868516145..5988d0b1b59f8e 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/DefaultPartitionedRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/DefaultPartitionedRateLimiter.cs @@ -263,19 +263,6 @@ private async Task Heartbeat() aggregateExceptions.Add(ex); } } - // ChainedRateLimiter is a special case: it has to invoke replenish on its children - else if (rateLimiter.Value.Value is ChainedRateLimiter chainedRateLimiter) - { - try - { - chainedRateLimiter.TryReplenish(); - } - catch (Exception ex) - { - aggregateExceptions ??= []; - aggregateExceptions.Add(ex); - } - } } foreach (RateLimiter limiter in _limitersToDispose) diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimiter.cs index 428c92504f9325..7307430622cb68 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimiter.cs @@ -39,6 +39,14 @@ public static RateLimiter CreateChained(params RateLimiter[] limiters) throw new ArgumentException("Must pass in at least 1 limiter.", nameof(limiters)); } + foreach (RateLimiter limiter in limiters) + { + if (limiter is ReplenishingRateLimiter) + { + return new ChainedReplenishingRateLimiter(limiters); + } + } + return new ChainedRateLimiter(limiters); } diff --git a/src/libraries/System.Threading.RateLimiting/tests/ChainedLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/ChainedLimiterTests.cs index fd3f5e426cafac..6636a24711ae48 100644 --- a/src/libraries/System.Threading.RateLimiting/tests/ChainedLimiterTests.cs +++ b/src/libraries/System.Threading.RateLimiting/tests/ChainedLimiterTests.cs @@ -10,6 +10,188 @@ namespace System.Threading.RateLimiting.Tests { public class ChainedLimiterTests { + [Fact] + public void CreateChainedReturnsRateLimiterWhenNoReplenishingLimiters() + { + using var limiter1 = new ConcurrencyLimiter(new ConcurrencyLimiterOptions + { + PermitLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 0 + }); + using var limiter2 = new ConcurrencyLimiter(new ConcurrencyLimiterOptions + { + PermitLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 0 + }); + + using var chainedLimiter = RateLimiter.CreateChained(limiter1, limiter2); + + Assert.False(chainedLimiter is ReplenishingRateLimiter); + } + + [Fact] + public void CreateChainedReturnsReplenishingRateLimiterWhenAnyReplenishingLimiter() + { + using var limiter1 = new ConcurrencyLimiter(new ConcurrencyLimiterOptions + { + PermitLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 0 + }); + using var limiter2 = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions + { + TokenLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 0, + ReplenishmentPeriod = TimeSpan.FromSeconds(1), + TokensPerPeriod = 1, + AutoReplenishment = false + }); + + using var chainedLimiter = RateLimiter.CreateChained(limiter1, limiter2); + + Assert.IsAssignableFrom(chainedLimiter); + } + + [Fact] + public void ReplenishingChainedLimiter_IsAutoReplenishingTrue_WhenAllReplenishingAreAutoReplenishing() + { + using var limiter1 = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions + { + TokenLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 0, + ReplenishmentPeriod = TimeSpan.FromSeconds(1), + TokensPerPeriod = 1, + AutoReplenishment = true + }); + using var limiter2 = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions + { + PermitLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 0, + Window = TimeSpan.FromSeconds(2), + AutoReplenishment = true + }); + + using var chainedLimiter = RateLimiter.CreateChained(limiter1, limiter2); + + var replenishing = Assert.IsAssignableFrom(chainedLimiter); + Assert.True(replenishing.IsAutoReplenishing); + } + + [Fact] + public void ReplenishingChainedLimiter_IsAutoReplenishingFalse_WhenAnyReplenishingIsNotAutoReplenishing() + { + using var limiter1 = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions + { + TokenLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 0, + ReplenishmentPeriod = TimeSpan.FromSeconds(1), + TokensPerPeriod = 1, + AutoReplenishment = true + }); + using var limiter2 = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions + { + PermitLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 0, + Window = TimeSpan.FromSeconds(2), + AutoReplenishment = false + }); + + using var chainedLimiter = RateLimiter.CreateChained(limiter1, limiter2); + + var replenishing = Assert.IsAssignableFrom(chainedLimiter); + Assert.False(replenishing.IsAutoReplenishing); + } + + [Fact] + public void ReplenishingChainedLimiter_ReplenishmentPeriod_IsLowestPositiveValue() + { + using var limiter1 = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions + { + TokenLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 0, + ReplenishmentPeriod = TimeSpan.FromSeconds(5), + TokensPerPeriod = 1, + AutoReplenishment = false + }); + using var limiter2 = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions + { + PermitLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 0, + Window = TimeSpan.FromSeconds(2), + AutoReplenishment = false + }); + using var nonReplenishing = new ConcurrencyLimiter(new ConcurrencyLimiterOptions + { + PermitLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 0 + }); + + using var chainedLimiter = RateLimiter.CreateChained(limiter1, limiter2, nonReplenishing); + + var replenishing = Assert.IsAssignableFrom(chainedLimiter); + Assert.Equal(TimeSpan.FromSeconds(2), replenishing.ReplenishmentPeriod); + } + + [Fact] + public void ReplenishingChainedLimiter_TryReplenish_CallsAllReplenishingLimiters() + { + var replenishingLimiter1 = new CustomizableReplenishingLimiter(); + bool replenish1Called = false; + replenishingLimiter1.TryReplenishImpl = () => + { + replenish1Called = true; + return true; + }; + + var replenishingLimiter2 = new CustomizableReplenishingLimiter(); + bool replenish2Called = false; + replenishingLimiter2.TryReplenishImpl = () => + { + replenish2Called = true; + return false; + }; + + using var nonReplenishing = new CustomizableLimiter(); + + using var chainedLimiter = RateLimiter.CreateChained(replenishingLimiter1, nonReplenishing, replenishingLimiter2); + + var replenishing = Assert.IsAssignableFrom(chainedLimiter); + bool result = replenishing.TryReplenish(); + + Assert.True(replenish1Called); + Assert.True(replenish2Called); + Assert.True(result); + } + + [Fact] + public void ReplenishingChainedLimiter_TryReplenish_ReturnsFalseWhenNoneReplenished() + { + var replenishingLimiter1 = new CustomizableReplenishingLimiter + { + TryReplenishImpl = () => false + }; + + var replenishingLimiter2 = new CustomizableReplenishingLimiter + { + TryReplenishImpl = () => false + }; + + using var chainedLimiter = RateLimiter.CreateChained(replenishingLimiter1, replenishingLimiter2); + + var replenishing = Assert.IsAssignableFrom(chainedLimiter); + Assert.False(replenishing.TryReplenish()); + } + [Fact] public void ThrowsWhenNoLimitersProvided() { diff --git a/src/libraries/System.Threading.RateLimiting/tests/Infrastructure/Utils.cs b/src/libraries/System.Threading.RateLimiting/tests/Infrastructure/Utils.cs index fc8091b21fdc16..c807c197f1ee0a 100644 --- a/src/libraries/System.Threading.RateLimiting/tests/Infrastructure/Utils.cs +++ b/src/libraries/System.Threading.RateLimiting/tests/Infrastructure/Utils.cs @@ -203,7 +203,8 @@ internal sealed class CustomizableReplenishingLimiter : ReplenishingRateLimiter public override bool IsAutoReplenishing => false; - public override TimeSpan ReplenishmentPeriod => throw new NotImplementedException(); + public Func ReplenishmentPeriodImpl { get; set; } = () => TimeSpan.Zero; + public override TimeSpan ReplenishmentPeriod => ReplenishmentPeriodImpl(); public Func TryReplenishImpl { get; set; } = () => true; public override bool TryReplenish() => TryReplenishImpl(); From 527fb794d46740ca504a62943bf58586a1c5104a Mon Sep 17 00:00:00 2001 From: Korolev Dmitry Date: Fri, 27 Mar 2026 10:54:10 +0100 Subject: [PATCH 5/5] aggregate exceptions on chainedReplenishingRateLimiter --- .../ChainedReplenishingRateLimiter.cs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedReplenishingRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedReplenishingRateLimiter.cs index 2aa6e5ff41cdc8..48a3f0b321073c 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedReplenishingRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedReplenishingRateLimiter.cs @@ -59,13 +59,28 @@ public override bool TryReplenish() ThrowIfDisposed(); bool replenished = false; + List? exceptions = null; foreach (ReplenishingRateLimiter limiter in _replenishingLimiters) { - if (limiter.TryReplenish()) + try { - replenished = true; + if (limiter.TryReplenish()) + { + replenished = true; + } + } + catch (Exception ex) + { + exceptions ??= new List(); + exceptions.Add(ex); } } + + if (exceptions is not null) + { + throw new AggregateException(exceptions); + } + return replenished; }