Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ System.Threading.RateLimiting.RateLimitLease</PackageDescription>

<ItemGroup>
<Compile Include="System\Threading\RateLimiting\ChainedRateLimiter.cs" />
<Compile Include="System\Threading\RateLimiting\ChainedReplenishingRateLimiter.cs" />
<Compile Include="System\Threading\RateLimiting\ChainedPartitionedRateLimiter.cs" />
<Compile Include="System\Threading\RateLimiting\CombinedRateLimitLease.cs" />
<Compile Include="System\Threading\RateLimiting\ConcurrencyLimiter.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,51 @@ public ChainedRateLimiter(RateLimiter[] limiters)
public override RateLimiterStatistics? GetStatistics()
{
ThrowIfDisposed();
return GetStatisticsCore(_limiters);
}

public override TimeSpan? IdleDuration
{
get
{
ThrowIfDisposed();
return GetIdleDurationCore(_limiters);
}
}

protected override RateLimitLease AttemptAcquireCore(int permitCount)
{
ThrowIfDisposed();
return AttemptAcquireChained(_limiters, permitCount);
}

protected override async ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken)
{
ThrowIfDisposed();
return await AcquireAsyncChained(_limiters, permitCount, cancellationToken).ConfigureAwait(false);
}

protected override void Dispose(bool disposing)
{
_disposed = true;
}

private void ThrowIfDisposed()
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(ChainedRateLimiter));
}
}

internal static RateLimiterStatistics GetStatisticsCore(RateLimiter[] limiters)
{
long lowestAvailablePermits = long.MaxValue;
long currentQueuedCount = 0;
long totalFailedLeases = 0;
long innerMostSuccessfulLeases = 0;

foreach (RateLimiter limiter in _limiters)
foreach (RateLimiter limiter in limiters)
{
if (limiter.GetStatistics() is { } statistics)
{
Expand All @@ -54,50 +92,47 @@ public ChainedRateLimiter(RateLimiter[] limiters)
};
}

public override TimeSpan? IdleDuration
internal static TimeSpan? GetIdleDurationCore(RateLimiter[] limiters)
{
get
{
ThrowIfDisposed();

TimeSpan? lowestIdleDuration = null;
TimeSpan? lowestIdleDuration = null;

foreach (RateLimiter limiter in _limiters)
foreach (RateLimiter limiter in limiters)
{
TimeSpan? idleDuration = limiter.IdleDuration;
if (idleDuration is null)
{
if (limiter.IdleDuration is { } idleDuration)
{
if (lowestIdleDuration is null || idleDuration < lowestIdleDuration)
{
lowestIdleDuration = idleDuration;
}
}
// The chain should not be considered idle if any of its children is not idle.
return null;
}

return lowestIdleDuration;
if (lowestIdleDuration is null || idleDuration < lowestIdleDuration)
{
lowestIdleDuration = idleDuration;
}
}

return lowestIdleDuration;
}

protected override RateLimitLease AttemptAcquireCore(int permitCount)
internal static RateLimitLease AttemptAcquireChained(RateLimiter[] limiters, int permitCount)
{
ThrowIfDisposed();

RateLimitLease[]? leases = null;

for (int i = 0; i < _limiters.Length; i++)
for (int i = 0; i < limiters.Length; i++)
{
RateLimitLease? lease = null;
Exception? exception = null;

try
{
lease = _limiters[i].AttemptAcquire(permitCount);
lease = limiters[i].AttemptAcquire(permitCount);
}
catch (Exception ex)
{
exception = ex;
}

RateLimitLease? notAcquiredLease = CommonAcquireLogic(exception, lease, ref leases, i, _limiters.Length);
RateLimitLease? notAcquiredLease = CommonAcquireLogic(exception, lease, ref leases, i, limiters.Length);

if (notAcquiredLease is not null)
{
Expand All @@ -108,27 +143,25 @@ protected override RateLimitLease AttemptAcquireCore(int permitCount)
return new CombinedRateLimitLease(leases!);
}

protected override async ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken)
internal static async ValueTask<RateLimitLease> AcquireAsyncChained(RateLimiter[] limiters, int permitCount, CancellationToken cancellationToken)
{
ThrowIfDisposed();

RateLimitLease[]? leases = null;

for (int i = 0; i < _limiters.Length; i++)
for (int i = 0; i < limiters.Length; i++)
{
RateLimitLease? lease = null;
Exception? exception = null;

try
{
lease = await _limiters[i].AcquireAsync(permitCount, cancellationToken).ConfigureAwait(false);
lease = await limiters[i].AcquireAsync(permitCount, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
exception = ex;
}

RateLimitLease? notAcquiredLease = CommonAcquireLogic(exception, lease, ref leases, i, _limiters.Length);
RateLimitLease? notAcquiredLease = CommonAcquireLogic(exception, lease, ref leases, i, limiters.Length);

if (notAcquiredLease is not null)
{
Expand All @@ -139,19 +172,6 @@ protected override async ValueTask<RateLimitLease> AcquireAsyncCore(int permitCo
return new CombinedRateLimitLease(leases!);
}

protected override void Dispose(bool disposing)
{
_disposed = true;
}

private void ThrowIfDisposed()
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(ChainedRateLimiter));
}
}

internal static RateLimitLease? CommonAcquireLogic(Exception? ex, RateLimitLease? lease, ref RateLimitLease[]? leases, int index, int length)
{
if (ex is not null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections.Generic;
using System.Threading.Tasks;

namespace System.Threading.RateLimiting
{
/// <summary>
/// A chained rate limiter that extends <see cref="ReplenishingRateLimiter"/> when at least one of the
/// chained limiters is a <see cref="ReplenishingRateLimiter"/>.
/// </summary>
internal sealed class ChainedReplenishingRateLimiter : ReplenishingRateLimiter
{
private readonly RateLimiter[] _limiters;
private readonly ReplenishingRateLimiter[] _replenishingLimiters;
private readonly bool _isAutoReplenishing;
private readonly TimeSpan _replenishmentPeriod;
private bool _disposed;

public ChainedReplenishingRateLimiter(RateLimiter[] limiters)
{
_limiters = (RateLimiter[])limiters.Clone();

var replenishingLimiters = new List<ReplenishingRateLimiter>();
bool isAutoReplenishing = true;
TimeSpan lowestPeriod = TimeSpan.MaxValue;

foreach (RateLimiter limiter in _limiters)
{
if (limiter is ReplenishingRateLimiter replenishing)
{
replenishingLimiters.Add(replenishing);

if (!replenishing.IsAutoReplenishing)
{
isAutoReplenishing = false;
}

TimeSpan period = replenishing.ReplenishmentPeriod;
if (period > TimeSpan.Zero && period < lowestPeriod)
{
lowestPeriod = period;
}
}
}

_replenishingLimiters = replenishingLimiters.ToArray();
_isAutoReplenishing = isAutoReplenishing;
_replenishmentPeriod = lowestPeriod == TimeSpan.MaxValue ? TimeSpan.Zero : lowestPeriod;
}

public override bool IsAutoReplenishing => _isAutoReplenishing;

public override TimeSpan ReplenishmentPeriod => _replenishmentPeriod;

public override bool TryReplenish()
{
ThrowIfDisposed();

bool replenished = false;
List<Exception>? exceptions = null;
foreach (ReplenishingRateLimiter limiter in _replenishingLimiters)
{
try
{
if (limiter.TryReplenish())
{
replenished = true;
}
}
catch (Exception ex)
{
exceptions ??= new List<Exception>();
exceptions.Add(ex);
}
}

if (exceptions is not null)
{
throw new AggregateException(exceptions);
}

return replenished;
}

public override RateLimiterStatistics? GetStatistics()
{
ThrowIfDisposed();
return ChainedRateLimiter.GetStatisticsCore(_limiters);
}

public override TimeSpan? IdleDuration
{
get
{
ThrowIfDisposed();
return ChainedRateLimiter.GetIdleDurationCore(_limiters);
}
}

protected override RateLimitLease AttemptAcquireCore(int permitCount)
{
ThrowIfDisposed();
return ChainedRateLimiter.AttemptAcquireChained(_limiters, permitCount);
}

protected override async ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken)
{
ThrowIfDisposed();
return await ChainedRateLimiter.AcquireAsyncChained(_limiters, permitCount, cancellationToken).ConfigureAwait(false);
}

protected override void Dispose(bool disposing)
{
_disposed = true;
}

private void ThrowIfDisposed()
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(ChainedReplenishingRateLimiter));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ private async Task Heartbeat()
}
catch (Exception ex)
{
aggregateExceptions ??= new List<Exception>();
aggregateExceptions ??= [];
aggregateExceptions.Add(ex);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ public static RateLimiter CreateChained(params RateLimiter[] limiters)
throw new ArgumentException("Must pass in at least 1 limiter.", nameof(limiters));
}

foreach (RateLimiter limiter in limiters)
{
if (limiter is ReplenishingRateLimiter)
{
return new ChainedReplenishingRateLimiter(limiters);
}
}

return new ChainedRateLimiter(limiters);
}

Expand Down
Loading
Loading