Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,59 @@ public void testOneSecondBurst() {
"R0.20");
}

public void testCreateWithMaxBurstSeconds_prefillsBucket() {
// Bucket starts full: burst of 5*2=10 permits available immediately (no prior sleep).
RateLimiter limiter = RateLimiter.create(5.0, 2.0, stopwatch);
limiter.acquire(1); // R0.00, from pre-filled bucket
limiter.acquire(1); // R0.00, from pre-filled bucket
limiter.acquire(3); // R0.00, from pre-filled bucket
limiter.acquire(5); // R0.00, drains remaining 5 permits from pre-filled bucket
limiter.acquire(); // R0.20, bucket exhausted
assertEvents("R0.00", "R0.00", "R0.00", "R0.00", "R0.20");
}

public void testCreateWithMaxBurstSeconds_vsCreateWithoutPrefill() {
// create(5.0) starts empty: first acquire is free but burst requires sleeping first.
RateLimiter empty = RateLimiter.create(5.0, stopwatch);
empty.acquire(5); // R0.00, first acquire always free
empty.acquire(1); // R1.00, must wait for 5 permits' debt
assertEvents("R0.00", "R1.00");

// create(5.0, 1.0) starts full: can serve 5-permit burst without any sleep.
RateLimiter prefilled = RateLimiter.create(5.0, 1.0, stopwatch);
prefilled.acquire(5); // R0.00, consumed from pre-filled bucket
prefilled.acquire(1); // R1.00, must wait after draining bucket
assertEvents("R0.00", "R1.00");
}

public void testCreateWithMaxBurstSeconds_rateAfterBurstIsStable() {
RateLimiter limiter = RateLimiter.create(5.0, 1.0, stopwatch);
limiter.acquire(5); // R0.00, drains the pre-filled bucket
limiter.acquire(); // R1.00, now at stable rate
limiter.acquire(); // R0.20
limiter.acquire(); // R0.20
assertEvents("R0.00", "R1.00", "R0.20", "R0.20");
}

public void testCreateWithMaxBurstSeconds_accumulatesAfterIdle() {
RateLimiter limiter = RateLimiter.create(5.0, 2.0, stopwatch);
limiter.acquire(10); // R0.00, drains pre-filled bucket (10 permits)
limiter.acquire(); // R2.00, repays 10-permit debt
stopwatch.sleepMillis(2000); // idle: accumulates up to 10 permits again
limiter.acquire(10); // R0.00, burst from re-accumulated permits
limiter.acquire(); // R2.00
assertEvents("R0.00", "R2.00", "U2.00", "R0.00", "R2.00");
}

public void testCreateWithMaxBurstSeconds_parameterValidation() {
assertThrows(
IllegalArgumentException.class, () -> RateLimiter.create(1.0, 0.0, stopwatch));
assertThrows(
IllegalArgumentException.class, () -> RateLimiter.create(1.0, -1.0, stopwatch));
assertThrows(
IllegalArgumentException.class, () -> RateLimiter.create(0.0, 1.0, stopwatch));
}

public void testCreateWarmupParameterValidation() {
RateLimiter unused;
unused = RateLimiter.create(1.0, 1, NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,37 @@ static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch)
return rateLimiter;
}

/**
* Creates a {@code RateLimiter} with the specified stable throughput and a pre-filled burst
* capacity. Unlike {@link #create(double)}, the returned {@code RateLimiter} starts with a full
* token bucket holding up to {@code maxBurstSeconds} seconds' worth of permits, enabling it to
* absorb an initial traffic surge immediately without throttling.
*
* <p>The returned {@code RateLimiter} ensures that on average no more than {@code
* permitsPerSecond} are issued during any given second. When unused, permits accumulate up to a
* maximum of {@code maxBurstSeconds * permitsPerSecond}.
*
* @param permitsPerSecond the rate of the returned {@code RateLimiter}, measured in how many
* permits become available per second
* @param maxBurstSeconds the maximum number of seconds' worth of permits that can be saved up;
* the token bucket is pre-filled to this capacity on creation
* @throws IllegalArgumentException if {@code permitsPerSecond} is negative or zero, or if
* {@code maxBurstSeconds} is negative or zero
*/
public static RateLimiter create(double permitsPerSecond, double maxBurstSeconds) {
return create(permitsPerSecond, maxBurstSeconds, SleepingStopwatch.createFromSystemTimer());
}

@VisibleForTesting
static RateLimiter create(
double permitsPerSecond, double maxBurstSeconds, SleepingStopwatch stopwatch) {
checkArgument(maxBurstSeconds > 0.0, "maxBurstSeconds must be positive");
SmoothBursty rateLimiter = new SmoothBursty(stopwatch, maxBurstSeconds);
rateLimiter.setRate(permitsPerSecond);
rateLimiter.prefillStoredPermits();
return rateLimiter;
}

/**
* Creates a {@code RateLimiter} with the specified stable throughput, given as "permits per
* second" (commonly referred to as <i>QPS</i>, queries per second), and a <i>warmup period</i>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
}
}

void prefillStoredPermits() {
storedPermits = maxPermits;
}

@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
return 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,59 @@ public void testOneSecondBurst() {
"R0.20");
}

public void testCreateWithMaxBurstSeconds_prefillsBucket() {
// Bucket starts full: burst of 5*2=10 permits available immediately (no prior sleep).
RateLimiter limiter = RateLimiter.create(5.0, 2.0, stopwatch);
limiter.acquire(1); // R0.00, from pre-filled bucket
limiter.acquire(1); // R0.00, from pre-filled bucket
limiter.acquire(3); // R0.00, from pre-filled bucket
limiter.acquire(5); // R0.00, drains remaining 5 permits from pre-filled bucket
limiter.acquire(); // R0.20, bucket exhausted
assertEvents("R0.00", "R0.00", "R0.00", "R0.00", "R0.20");
}

public void testCreateWithMaxBurstSeconds_vsCreateWithoutPrefill() {
// create(5.0) starts empty: first acquire is free but burst requires sleeping first.
RateLimiter empty = RateLimiter.create(5.0, stopwatch);
empty.acquire(5); // R0.00, first acquire always free
empty.acquire(1); // R1.00, must wait for 5 permits' debt
assertEvents("R0.00", "R1.00");

// create(5.0, 1.0) starts full: can serve 5-permit burst without any sleep.
RateLimiter prefilled = RateLimiter.create(5.0, 1.0, stopwatch);
prefilled.acquire(5); // R0.00, consumed from pre-filled bucket
prefilled.acquire(1); // R1.00, must wait after draining bucket
assertEvents("R0.00", "R1.00");
}

public void testCreateWithMaxBurstSeconds_rateAfterBurstIsStable() {
RateLimiter limiter = RateLimiter.create(5.0, 1.0, stopwatch);
limiter.acquire(5); // R0.00, drains the pre-filled bucket
limiter.acquire(); // R1.00, now at stable rate
limiter.acquire(); // R0.20
limiter.acquire(); // R0.20
assertEvents("R0.00", "R1.00", "R0.20", "R0.20");
}

public void testCreateWithMaxBurstSeconds_accumulatesAfterIdle() {
RateLimiter limiter = RateLimiter.create(5.0, 2.0, stopwatch);
limiter.acquire(10); // R0.00, drains pre-filled bucket (10 permits)
limiter.acquire(); // R2.00, repays 10-permit debt
stopwatch.sleepMillis(2000); // idle: accumulates up to 10 permits again
limiter.acquire(10); // R0.00, burst from re-accumulated permits
limiter.acquire(); // R2.00
assertEvents("R0.00", "R2.00", "U2.00", "R0.00", "R2.00");
}

public void testCreateWithMaxBurstSeconds_parameterValidation() {
assertThrows(
IllegalArgumentException.class, () -> RateLimiter.create(1.0, 0.0, stopwatch));
assertThrows(
IllegalArgumentException.class, () -> RateLimiter.create(1.0, -1.0, stopwatch));
assertThrows(
IllegalArgumentException.class, () -> RateLimiter.create(0.0, 1.0, stopwatch));
}

public void testCreateWarmupParameterValidation() {
RateLimiter unused;
unused = RateLimiter.create(1.0, 1, NANOSECONDS);
Expand Down
31 changes: 31 additions & 0 deletions guava/src/com/google/common/util/concurrent/RateLimiter.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,37 @@ static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch)
return rateLimiter;
}

/**
* Creates a {@code RateLimiter} with the specified stable throughput and a pre-filled burst
* capacity. Unlike {@link #create(double)}, the returned {@code RateLimiter} starts with a full
* token bucket holding up to {@code maxBurstSeconds} seconds' worth of permits, enabling it to
* absorb an initial traffic surge immediately without throttling.
*
* <p>The returned {@code RateLimiter} ensures that on average no more than {@code
* permitsPerSecond} are issued during any given second. When unused, permits accumulate up to a
* maximum of {@code maxBurstSeconds * permitsPerSecond}.
*
* @param permitsPerSecond the rate of the returned {@code RateLimiter}, measured in how many
* permits become available per second
* @param maxBurstSeconds the maximum number of seconds' worth of permits that can be saved up;
* the token bucket is pre-filled to this capacity on creation
* @throws IllegalArgumentException if {@code permitsPerSecond} is negative or zero, or if
* {@code maxBurstSeconds} is negative or zero
*/
public static RateLimiter create(double permitsPerSecond, double maxBurstSeconds) {
return create(permitsPerSecond, maxBurstSeconds, SleepingStopwatch.createFromSystemTimer());
}

@VisibleForTesting
static RateLimiter create(
double permitsPerSecond, double maxBurstSeconds, SleepingStopwatch stopwatch) {
checkArgument(maxBurstSeconds > 0.0, "maxBurstSeconds must be positive");
SmoothBursty rateLimiter = new SmoothBursty(stopwatch, maxBurstSeconds);
rateLimiter.setRate(permitsPerSecond);
rateLimiter.prefillStoredPermits();
return rateLimiter;
}

/**
* Creates a {@code RateLimiter} with the specified stable throughput, given as "permits per
* second" (commonly referred to as <i>QPS</i>, queries per second), and a <i>warmup period</i>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
}
}

void prefillStoredPermits() {
storedPermits = maxPermits;
}

@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
return 0L;
Expand Down