Skip to content

Commit 2127a41

Browse files
committed
Inject Clock into connection pools to make time-based behavior testable
1 parent 23bb1c8 commit 2127a41

7 files changed

Lines changed: 377 additions & 55 deletions

File tree

httpcore5/src/main/java/org/apache/hc/core5/pool/LaxConnPool.java

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
*/
2727
package org.apache.hc.core5.pool;
2828

29+
import java.time.Clock;
2930
import java.util.Deque;
3031
import java.util.HashSet;
3132
import java.util.Iterator;
@@ -78,6 +79,8 @@ public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool
7879
private final ConcurrentMap<T, PerRoutePool<T, C>> routeToPool;
7980
private final AtomicBoolean isShutDown;
8081

82+
private final Clock clock;
83+
8184
private volatile int defaultMaxPerRoute;
8285

8386
/**
@@ -89,6 +92,16 @@ public LaxConnPool(
8992
final PoolReusePolicy policy,
9093
final DisposalCallback<C> disposalCallback,
9194
final ConnPoolListener<T> connPoolListener) {
95+
this(defaultMaxPerRoute, timeToLive, policy, disposalCallback, connPoolListener, Clock.systemUTC());
96+
}
97+
98+
LaxConnPool(
99+
final int defaultMaxPerRoute,
100+
final TimeValue timeToLive,
101+
final PoolReusePolicy policy,
102+
final DisposalCallback<C> disposalCallback,
103+
final ConnPoolListener<T> connPoolListener,
104+
final Clock clock) {
92105
super();
93106
Args.positive(defaultMaxPerRoute, "Max per route value");
94107
this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
@@ -98,6 +111,7 @@ public LaxConnPool(
98111
this.routeToPool = new ConcurrentHashMap<>();
99112
this.isShutDown = new AtomicBoolean();
100113
this.defaultMaxPerRoute = defaultMaxPerRoute;
114+
this.clock = Args.notNull(clock, "clock");
101115
}
102116

103117
/**
@@ -145,7 +159,8 @@ private PerRoutePool<T, C> getPool(final T route) {
145159
policy,
146160
this,
147161
disposalCallback,
148-
connPoolListener);
162+
connPoolListener,
163+
clock);
149164
routePool = routeToPool.putIfAbsent(route, newRoutePool);
150165
if (routePool == null) {
151166
routePool = newRoutePool;
@@ -266,7 +281,7 @@ public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
266281

267282
@Override
268283
public void closeIdle(final TimeValue idleTime) {
269-
final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
284+
final long deadline = clock.millis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
270285
enumAvailable(entry -> {
271286
if (entry.getUpdated() <= deadline) {
272287
entry.discardConnection(CloseMode.GRACEFUL);
@@ -276,7 +291,7 @@ public void closeIdle(final TimeValue idleTime) {
276291

277292
@Override
278293
public void closeExpired() {
279-
final long now = System.currentTimeMillis();
294+
final long now = clock.millis();
280295
enumAvailable(entry -> {
281296
if (entry.getExpiryDeadline().isBefore(now)) {
282297
entry.discardConnection(CloseMode.GRACEFUL);
@@ -306,11 +321,11 @@ static class LeaseRequest<T, C extends ModalCloseable> implements Cancellable {
306321

307322
LeaseRequest(
308323
final Object state,
309-
final Timeout requestTimeout,
324+
final Deadline deadline,
310325
final BasicFuture<PoolEntry<T, C>> future) {
311326
super();
312327
this.state = state;
313-
this.deadline = Deadline.calculate(requestTimeout);
328+
this.deadline = deadline;
314329
this.future = future;
315330
}
316331

@@ -362,6 +377,8 @@ private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL }
362377
private final AtomicInteger allocated;
363378
private final AtomicLong releaseSeqNum;
364379

380+
private final Clock clock;
381+
365382
private volatile int max;
366383

367384
PerRoutePool(
@@ -371,7 +388,8 @@ private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL }
371388
final PoolReusePolicy policy,
372389
final ConnPoolStats<T> connPoolStats,
373390
final DisposalCallback<C> disposalCallback,
374-
final ConnPoolListener<T> connPoolListener) {
391+
final ConnPoolListener<T> connPoolListener,
392+
final Clock clock) {
375393
super();
376394
this.route = route;
377395
this.timeToLive = timeToLive;
@@ -386,6 +404,7 @@ private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL }
386404
this.allocated = new AtomicInteger(0);
387405
this.releaseSeqNum = new AtomicLong(0);
388406
this.max = max;
407+
this.clock = Args.notNull(clock, "clock");
389408
}
390409

391410
public void shutdown(final CloseMode closeMode) {
@@ -412,7 +431,7 @@ private PoolEntry<T, C> createPoolEntry() {
412431
prev = allocated.get();
413432
next = (prev < poolMax) ? prev + 1 : prev;
414433
} while (!allocated.compareAndSet(prev, next));
415-
return prev < next ? new PoolEntry<>(route, timeToLive, disposalCallback) : null;
434+
return prev < next ? new PoolEntry<>(route, timeToLive, disposalCallback, clock) : null;
416435
}
417436

418437
private void deallocatePoolEntry() {
@@ -437,12 +456,13 @@ private void removeLeased(final PoolEntry<T, C> entry) {
437456
}
438457

439458
private PoolEntry<T, C> getAvailableEntry(final Object state) {
459+
final long now = clock.millis();
440460
for (final Iterator<AtomicMarkableReference<PoolEntry<T, C>>> it = available.iterator(); it.hasNext(); ) {
441461
final AtomicMarkableReference<PoolEntry<T, C>> ref = it.next();
442462
final PoolEntry<T, C> entry = ref.getReference();
443463
if (ref.compareAndSet(entry, entry, false, true)) {
444464
it.remove();
445-
if (entry.getExpiryDeadline().isExpired() || !Objects.equals(entry.getState(), state)) {
465+
if (entry.getExpiryDeadline().isBefore(now) || !Objects.equals(entry.getState(), state)) {
446466
entry.discardConnection(CloseMode.GRACEFUL);
447467
deallocatePoolEntry();
448468
} else {
@@ -484,7 +504,7 @@ public PoolEntry<T, C> get(
484504
addLeased(entry);
485505
future.completed(entry);
486506
} else {
487-
pending.add(new LeaseRequest<>(state, requestTimeout, future));
507+
pending.add(new LeaseRequest<>(state, Deadline.calculate(clock.millis(), requestTimeout), future));
488508
if (releaseState != releaseSeqNum.get()) {
489509
servicePendingRequest();
490510
}
@@ -494,7 +514,8 @@ public PoolEntry<T, C> get(
494514

495515
public void release(final PoolEntry<T, C> releasedEntry, final boolean reusable) {
496516
removeLeased(releasedEntry);
497-
if (!reusable || releasedEntry.getExpiryDeadline().isExpired()) {
517+
final long now = clock.millis();
518+
if (!reusable || releasedEntry.getExpiryDeadline().isBefore(now)) {
498519
releasedEntry.discardConnection(CloseMode.GRACEFUL);
499520
}
500521
if (releasedEntry.hasConnection()) {
@@ -529,8 +550,9 @@ private void servicePendingRequests(final RequestServiceStrategy serviceStrategy
529550
}
530551
final Object state = leaseRequest.getState();
531552
final Deadline deadline = leaseRequest.getDeadline();
553+
final long now = clock.millis();
532554

533-
if (deadline.isExpired()) {
555+
if (deadline.isBefore(now)) {
534556
leaseRequest.failed(DeadlineTimeoutException.from(deadline));
535557
} else {
536558
final long releaseState = releaseSeqNum.get();
@@ -558,6 +580,7 @@ private void servicePendingRequests(final RequestServiceStrategy serviceStrategy
558580
}
559581

560582
public void validatePendingRequests() {
583+
final long now = clock.millis();
561584
final Iterator<LeaseRequest<T, C>> it = pending.iterator();
562585
while (it.hasNext()) {
563586
final LeaseRequest<T, C> request = it.next();
@@ -566,7 +589,7 @@ public void validatePendingRequests() {
566589
it.remove();
567590
} else {
568591
final Deadline deadline = request.getDeadline();
569-
if (deadline.isExpired()) {
592+
if (deadline.isBefore(now)) {
570593
request.failed(DeadlineTimeoutException.from(deadline));
571594
}
572595
if (request.isDone()) {

httpcore5/src/main/java/org/apache/hc/core5/pool/PoolEntry.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@
2626
*/
2727
package org.apache.hc.core5.pool;
2828

29+
import java.time.Clock;
2930
import java.util.concurrent.atomic.AtomicReference;
3031

31-
import org.apache.hc.core5.function.Supplier;
3232
import org.apache.hc.core5.io.CloseMode;
3333
import org.apache.hc.core5.io.ModalCloseable;
3434
import org.apache.hc.core5.util.Args;
@@ -53,7 +53,7 @@ public final class PoolEntry<T, C extends ModalCloseable> {
5353
private final TimeValue timeToLive;
5454
private final AtomicReference<C> connRef;
5555
private final DisposalCallback<C> disposalCallback;
56-
private final Supplier<Long> currentTimeSupplier;
56+
private final Clock clock;
5757

5858
private volatile Object state;
5959
private volatile long created;
@@ -62,17 +62,17 @@ public final class PoolEntry<T, C extends ModalCloseable> {
6262
private volatile Deadline validityDeadline = Deadline.MIN_VALUE;
6363

6464
PoolEntry(final T route, final TimeValue timeToLive, final DisposalCallback<C> disposalCallback,
65-
final Supplier<Long> currentTimeSupplier) {
65+
final Clock clock) {
6666
super();
6767
this.route = Args.notNull(route, "Route");
6868
this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
6969
this.connRef = new AtomicReference<>();
7070
this.disposalCallback = disposalCallback;
71-
this.currentTimeSupplier = currentTimeSupplier;
71+
this.clock = clock;
7272
}
7373

74-
PoolEntry(final T route, final TimeValue timeToLive, final Supplier<Long> currentTimeSupplier) {
75-
this(route, timeToLive, null, currentTimeSupplier);
74+
PoolEntry(final T route, final TimeValue timeToLive, final Clock clock) {
75+
this(route, timeToLive, null, clock);
7676
}
7777

7878
/**
@@ -103,7 +103,7 @@ public PoolEntry(final T route) {
103103
}
104104

105105
long getCurrentTime() {
106-
return currentTimeSupplier != null ? currentTimeSupplier.get() : System.currentTimeMillis();
106+
return clock != null ? clock.millis() : System.currentTimeMillis();
107107
}
108108

109109
public T getRoute() {

httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
package org.apache.hc.core5.pool;
2828

2929
import java.io.IOException;
30+
import java.time.Clock;
3031
import java.util.ArrayDeque;
3132
import java.util.Deque;
3233
import java.util.HashSet;
@@ -104,6 +105,8 @@ public final class RouteSegmentedConnPool<R, C extends ModalCloseable> implement
104105

105106
private final ScheduledThreadPoolExecutor timeouts;
106107

108+
private final Clock clock;
109+
107110
/**
108111
* Dedicated executor for asynchronous, best-effort disposal.
109112
* Bounded queue; on saturation we fall back to IMMEDIATE close on the caller thread.
@@ -131,6 +134,17 @@ public RouteSegmentedConnPool(
131134
final PoolReusePolicy reusePolicy,
132135
final DisposalCallback<C> disposal,
133136
final ConnPoolListener<R> connPoolListener) {
137+
this(defaultMaxPerRoute, maxTotal, timeToLive, reusePolicy, disposal, connPoolListener, Clock.systemUTC());
138+
}
139+
140+
RouteSegmentedConnPool(
141+
final int defaultMaxPerRoute,
142+
final int maxTotal,
143+
final TimeValue timeToLive,
144+
final PoolReusePolicy reusePolicy,
145+
final DisposalCallback<C> disposal,
146+
final ConnPoolListener<R> connPoolListener,
147+
final Clock clock) {
134148

135149
this.defaultMaxPerRoute.set(defaultMaxPerRoute > 0 ? defaultMaxPerRoute : 5);
136150
this.maxTotal.set(maxTotal > 0 ? maxTotal : 25);
@@ -148,6 +162,8 @@ public RouteSegmentedConnPool(
148162
this.disposal = Args.notNull(disposal, "disposal");
149163
this.connPoolListener = connPoolListener;
150164

165+
this.clock = Args.notNull(clock, "clock");
166+
151167
final ThreadFactory tf = r -> {
152168
final Thread t = new Thread(r, "seg-pool-timeouts");
153169
t.setDaemon(true);
@@ -231,7 +247,7 @@ public Future<PoolEntry<R, C>> lease(
231247
if (hit == null) {
232248
break;
233249
}
234-
final long now = System.currentTimeMillis();
250+
final long now = clock.millis();
235251
if (hit.getExpiryDeadline().isBefore(now) || isPastTtl(hit, now)) {
236252
discardAndDecr(hit, CloseMode.GRACEFUL);
237253
continue;
@@ -248,7 +264,7 @@ public Future<PoolEntry<R, C>> lease(
248264

249265
// 2) Try to allocate new within caps
250266
if (tryAllocateOne(route, seg)) {
251-
final PoolEntry<R, C> entry = new PoolEntry<>(route, timeToLive, disposal);
267+
final PoolEntry<R, C> entry = new PoolEntry<>(route, timeToLive, disposal, clock);
252268
fireOnLease(route);
253269
if (callback != null) {
254270
callback.completed(entry);
@@ -326,7 +342,7 @@ public void release(final PoolEntry<R, C> entry, final boolean reusable) {
326342
return;
327343
}
328344

329-
final long now = System.currentTimeMillis();
345+
final long now = clock.millis();
330346
final boolean stillValid = reusable && !isPastTtl(entry, now) && !entry.getExpiryDeadline().isBefore(now);
331347

332348
if (stillValid) {
@@ -391,7 +407,7 @@ public void close(final CloseMode closeMode) {
391407

392408
@Override
393409
public void closeIdle(final TimeValue idleTime) {
394-
final long cutoff = System.currentTimeMillis()
410+
final long cutoff = clock.millis()
395411
- Math.max(0L, idleTime != null ? idleTime.toMilliseconds() : 0L);
396412

397413
for (final Map.Entry<R, Segment> e : segments.entrySet()) {
@@ -417,7 +433,7 @@ public void closeIdle(final TimeValue idleTime) {
417433

418434
@Override
419435
public void closeExpired() {
420-
final long now = System.currentTimeMillis();
436+
final long now = clock.millis();
421437

422438
for (final Map.Entry<R, Segment> e : segments.entrySet()) {
423439
final R route = e.getKey();
@@ -731,7 +747,7 @@ private void serveRoundRobin(final int budget) {
731747
seg.allocated.decrementAndGet();
732748
totalAllocated.decrementAndGet();
733749
} else {
734-
final PoolEntry<R, C> entry = new PoolEntry<>(route, timeToLive, disposal);
750+
final PoolEntry<R, C> entry = new PoolEntry<>(route, timeToLive, disposal, clock);
735751
cancelTimeout(w);
736752
w.complete(entry);
737753
fireOnLease(w.route);

0 commit comments

Comments
 (0)