Skip to content

Commit 90770d4

Browse files
committed
Release late-completed lease entries on timeout, interruption, and cancel
1 parent 1865484 commit 90770d4

3 files changed

Lines changed: 155 additions & 63 deletions

File tree

httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestConnectionManagement.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.concurrent.ExecutionException;
3434
import java.util.concurrent.ExecutorService;
3535
import java.util.concurrent.Executors;
36+
import java.util.concurrent.TimeUnit;
3637
import java.util.concurrent.TimeoutException;
3738
import java.util.concurrent.atomic.AtomicLong;
3839
import java.util.function.Consumer;
@@ -442,4 +443,53 @@ void testConnectionRequestTimeout() throws Exception {
442443
connManager.close();
443444
}
444445

446+
@Test
447+
void testConnectionRequestCancelLateLeaseReleased() throws Exception {
448+
configureServer(bootstrap -> bootstrap
449+
.register("/random/*", new RandomHandler()));
450+
final HttpHost target = startServer();
451+
452+
connManager.setMaxTotal(1);
453+
454+
final HttpRoute route = new HttpRoute(target, null, false);
455+
final Timeout t = Timeout.ofSeconds(5);
456+
457+
final LeaseRequest holdRequest = connManager.lease("hold", route, t, null);
458+
final ConnectionEndpoint heldEndpoint = holdRequest.get(t);
459+
460+
final LeaseRequest pendingRequest = connManager.lease("pending", route, t, null);
461+
462+
connManager.release(heldEndpoint, null, null);
463+
464+
PoolStats stats;
465+
final long deadline1 = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
466+
for (;;) {
467+
stats = connManager.getStats(route);
468+
if (stats.getLeased() == 1) {
469+
break;
470+
}
471+
if (System.nanoTime() > deadline1) {
472+
break;
473+
}
474+
Thread.yield();
475+
}
476+
Assertions.assertEquals(1, stats.getLeased(), "Expected pending lease to complete and become leased");
477+
Assertions.assertFalse(pendingRequest.cancel(), "Expected cancel() to lose the race once lease is completed");
478+
479+
final long deadline2 = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
480+
for (;;) {
481+
stats = connManager.getStats(route);
482+
if (stats.getLeased() == 0) {
483+
break;
484+
}
485+
if (System.nanoTime() > deadline2) {
486+
break;
487+
}
488+
Thread.yield();
489+
}
490+
Assertions.assertEquals(0, stats.getLeased(), "Late-completed lease must not remain stranded after cancel()");
491+
492+
connManager.close();
493+
}
494+
445495
}

httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java

Lines changed: 50 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.io.IOException;
3030
import java.nio.file.Path;
3131
import java.util.Set;
32-
import java.util.concurrent.CancellationException;
3332
import java.util.concurrent.ExecutionException;
3433
import java.util.concurrent.Future;
3534
import java.util.concurrent.TimeoutException;
@@ -57,6 +56,7 @@
5756
import org.apache.hc.core5.annotation.Contract;
5857
import org.apache.hc.core5.annotation.Internal;
5958
import org.apache.hc.core5.annotation.ThreadingBehavior;
59+
import org.apache.hc.core5.concurrent.FutureCallback;
6060
import org.apache.hc.core5.function.Resolver;
6161
import org.apache.hc.core5.http.ClassicHttpRequest;
6262
import org.apache.hc.core5.http.ClassicHttpResponse;
@@ -370,7 +370,36 @@ public LeaseRequest lease(
370370
if (LOG.isDebugEnabled()) {
371371
LOG.debug("{} endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
372372
}
373-
final Future<PoolEntry<HttpRoute, ManagedHttpClientConnection>> leaseFuture = this.pool.lease(route, state, requestTimeout, null);
373+
final AtomicBoolean aborted = new AtomicBoolean(false);
374+
final AtomicBoolean resultObtained = new AtomicBoolean(false);
375+
final AtomicReference<PoolEntry<HttpRoute, ManagedHttpClientConnection>> lateEntryRef = new AtomicReference<>();
376+
final Future<PoolEntry<HttpRoute, ManagedHttpClientConnection>> leaseFuture = this.pool.lease(
377+
route,
378+
state,
379+
requestTimeout,
380+
new FutureCallback<PoolEntry<HttpRoute, ManagedHttpClientConnection>>() {
381+
382+
@Override
383+
public void completed(final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry) {
384+
lateEntryRef.set(poolEntry);
385+
if (aborted.get()) {
386+
if (lateEntryRef.compareAndSet(poolEntry, null)) {
387+
pool.release(poolEntry, false);
388+
}
389+
} else if (resultObtained.get()) {
390+
lateEntryRef.compareAndSet(poolEntry, null);
391+
}
392+
}
393+
394+
@Override
395+
public void failed(final Exception ex) {
396+
}
397+
398+
@Override
399+
public void cancelled() {
400+
}
401+
402+
});
374403
return new LeaseRequest() {
375404
// Using a ReentrantLock specific to each LeaseRequest instance to maintain the original
376405
// synchronization semantics. This ensures that each LeaseRequest has its own unique lock.
@@ -389,19 +418,14 @@ public ConnectionEndpoint get(
389418
final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry;
390419
try {
391420
poolEntry = leaseFuture.get(timeout.getDuration(), timeout.getTimeUnit());
421+
resultObtained.set(true);
422+
lateEntryRef.compareAndSet(poolEntry, null);
392423
} catch (final TimeoutException | InterruptedException ex) {
393-
if (!leaseFuture.cancel(true)) {
394-
try {
395-
final PoolEntry<HttpRoute, ManagedHttpClientConnection> latePoolEntry = leaseFuture.get(
396-
Timeout.ZERO_MILLISECONDS.getDuration(),
397-
Timeout.ZERO_MILLISECONDS.getTimeUnit());
398-
if (latePoolEntry != null) {
399-
pool.release(latePoolEntry, false);
400-
}
401-
} catch (final TimeoutException | ExecutionException | CancellationException ignore) {
402-
} catch (final InterruptedException interrupted) {
403-
Thread.currentThread().interrupt();
404-
}
424+
aborted.set(true);
425+
leaseFuture.cancel(true);
426+
final PoolEntry<HttpRoute, ManagedHttpClientConnection> latePoolEntry = lateEntryRef.getAndSet(null);
427+
if (latePoolEntry != null) {
428+
pool.release(latePoolEntry, false);
405429
}
406430
throw ex;
407431
}
@@ -480,7 +504,18 @@ public ConnectionEndpoint get(
480504

481505
@Override
482506
public boolean cancel() {
483-
return leaseFuture.cancel(true);
507+
lock.lock();
508+
try {
509+
aborted.set(true);
510+
final boolean cancelled = leaseFuture.cancel(true);
511+
final PoolEntry<HttpRoute, ManagedHttpClientConnection> latePoolEntry = lateEntryRef.getAndSet(null);
512+
if (latePoolEntry != null) {
513+
pool.release(latePoolEntry, false);
514+
}
515+
return cancelled;
516+
} finally {
517+
lock.unlock();
518+
}
484519
}
485520

486521
};

httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java

Lines changed: 55 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.hc.client5.http.io.ManagedHttpClientConnection;
5151
import org.apache.hc.client5.http.protocol.HttpClientContext;
5252
import org.apache.hc.client5.http.ssl.TlsSocketStrategy;
53+
import org.apache.hc.core5.concurrent.FutureCallback;
5354
import org.apache.hc.core5.http.HttpHost;
5455
import org.apache.hc.core5.http.config.Lookup;
5556
import org.apache.hc.core5.http.io.SocketConfig;
@@ -60,6 +61,7 @@
6061
import org.junit.jupiter.api.Assertions;
6162
import org.junit.jupiter.api.BeforeEach;
6263
import org.junit.jupiter.api.Test;
64+
import org.mockito.ArgumentCaptor;
6365
import org.mockito.Mock;
6466
import org.mockito.Mockito;
6567
import org.mockito.MockitoAnnotations;
@@ -112,10 +114,10 @@ void testLeaseRelease() throws Exception {
112114
Mockito.when(conn.isConsistent()).thenReturn(true);
113115
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
114116
Mockito.when(pool.lease(
115-
Mockito.eq(route),
116-
Mockito.eq(null),
117-
Mockito.any(),
118-
Mockito.eq(null)))
117+
Mockito.eq(route),
118+
Mockito.eq(null),
119+
Mockito.any(),
120+
Mockito.any(FutureCallback.class)))
119121
.thenReturn(future);
120122

121123
final LeaseRequest connRequest1 = mgr.lease("some-id", route, null);
@@ -137,10 +139,10 @@ void testReleaseRouteIncomplete() throws Exception {
137139

138140
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
139141
Mockito.when(pool.lease(
140-
Mockito.eq(route),
141-
Mockito.eq(null),
142-
Mockito.any(),
143-
Mockito.eq(null)))
142+
Mockito.eq(route),
143+
Mockito.eq(null),
144+
Mockito.any(),
145+
Mockito.any(FutureCallback.class)))
144146
.thenReturn(future);
145147

146148
final LeaseRequest connRequest1 = mgr.lease("some-id", route, null);
@@ -160,10 +162,10 @@ void testLeaseFutureTimeout() throws Exception {
160162

161163
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenThrow(new TimeoutException());
162164
Mockito.when(pool.lease(
163-
Mockito.eq(route),
164-
Mockito.eq(null),
165-
Mockito.any(),
166-
Mockito.eq(null)))
165+
Mockito.eq(route),
166+
Mockito.eq(null),
167+
Mockito.any(),
168+
Mockito.any(FutureCallback.class)))
167169
.thenReturn(future);
168170

169171
final LeaseRequest connRequest1 = mgr.lease("some-id", route, null);
@@ -177,23 +179,27 @@ void testLeaseFutureTimeoutLateLeaseReleased() throws Exception {
177179
final HttpRoute route = new HttpRoute(target);
178180
final PoolEntry<HttpRoute, ManagedHttpClientConnection> entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND);
179181

180-
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenThrow(new TimeoutException());
181-
Mockito.when(future.cancel(true)).thenReturn(false);
182-
Mockito.when(future.get(0L, TimeUnit.MILLISECONDS)).thenReturn(entry);
182+
final ArgumentCaptor<FutureCallback> callbackCaptor = ArgumentCaptor.forClass(FutureCallback.class);
183+
183184
Mockito.when(pool.lease(
184-
Mockito.eq(route),
185-
Mockito.eq(null),
186-
Mockito.any(),
187-
Mockito.eq(null)))
185+
Mockito.eq(route),
186+
Mockito.eq(null),
187+
Mockito.any(),
188+
callbackCaptor.capture()))
188189
.thenReturn(future);
190+
Mockito.when(future.cancel(true)).thenReturn(false);
191+
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenAnswer(invocation -> {
192+
callbackCaptor.getValue().completed(entry);
193+
throw new TimeoutException();
194+
});
189195

190196
final LeaseRequest connRequest1 = mgr.lease("some-id", route, null);
191197
Assertions.assertThrows(TimeoutException.class, () ->
192198
connRequest1.get(Timeout.ofSeconds(1)));
193199

194200
Mockito.verify(future).cancel(true);
195-
Mockito.verify(future).get(0L, TimeUnit.MILLISECONDS);
196201
Mockito.verify(pool).release(entry, false);
202+
Mockito.verify(future, Mockito.never()).get(0L, TimeUnit.MILLISECONDS);
197203
}
198204

199205
@Test
@@ -202,23 +208,27 @@ void testLeaseFutureInterruptedLateLeaseReleased() throws Exception {
202208
final HttpRoute route = new HttpRoute(target);
203209
final PoolEntry<HttpRoute, ManagedHttpClientConnection> entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND);
204210

205-
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenThrow(new InterruptedException());
206-
Mockito.when(future.cancel(true)).thenReturn(false);
207-
Mockito.when(future.get(0L, TimeUnit.MILLISECONDS)).thenReturn(entry);
211+
final ArgumentCaptor<FutureCallback> callbackCaptor = ArgumentCaptor.forClass(FutureCallback.class);
212+
208213
Mockito.when(pool.lease(
209-
Mockito.eq(route),
210-
Mockito.eq(null),
211-
Mockito.any(),
212-
Mockito.eq(null)))
214+
Mockito.eq(route),
215+
Mockito.eq(null),
216+
Mockito.any(),
217+
callbackCaptor.capture()))
213218
.thenReturn(future);
219+
Mockito.when(future.cancel(true)).thenReturn(false);
220+
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenAnswer(invocation -> {
221+
callbackCaptor.getValue().completed(entry);
222+
throw new InterruptedException();
223+
});
214224

215225
final LeaseRequest connRequest1 = mgr.lease("some-id", route, null);
216226
Assertions.assertThrows(InterruptedException.class, () ->
217227
connRequest1.get(Timeout.ofSeconds(1)));
218228

219229
Mockito.verify(future).cancel(true);
220-
Mockito.verify(future).get(0L, TimeUnit.MILLISECONDS);
221230
Mockito.verify(pool).release(entry, false);
231+
Mockito.verify(future, Mockito.never()).get(0L, TimeUnit.MILLISECONDS);
222232
}
223233

224234
@Test
@@ -231,10 +241,10 @@ void testReleaseReusable() throws Exception {
231241

232242
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
233243
Mockito.when(pool.lease(
234-
Mockito.eq(route),
235-
Mockito.eq(null),
236-
Mockito.any(),
237-
Mockito.eq(null)))
244+
Mockito.eq(route),
245+
Mockito.eq(null),
246+
Mockito.any(),
247+
Mockito.any(FutureCallback.class)))
238248
.thenReturn(future);
239249
Mockito.when(conn.isOpen()).thenReturn(true);
240250
Mockito.when(conn.isConsistent()).thenReturn(true);
@@ -260,10 +270,10 @@ void testReleaseNonReusable() throws Exception {
260270

261271
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
262272
Mockito.when(pool.lease(
263-
Mockito.eq(route),
264-
Mockito.eq(null),
265-
Mockito.any(),
266-
Mockito.eq(null)))
273+
Mockito.eq(route),
274+
Mockito.eq(null),
275+
Mockito.any(),
276+
Mockito.any(FutureCallback.class)))
267277
.thenReturn(future);
268278
Mockito.when(conn.isOpen()).thenReturn(Boolean.FALSE);
269279

@@ -291,10 +301,10 @@ void testTargetConnect() throws Exception {
291301
Mockito.when(conn.isOpen()).thenReturn(false);
292302
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
293303
Mockito.when(pool.lease(
294-
Mockito.eq(route),
295-
Mockito.eq(null),
296-
Mockito.any(),
297-
Mockito.eq(null)))
304+
Mockito.eq(route),
305+
Mockito.eq(null),
306+
Mockito.any(),
307+
Mockito.any(FutureCallback.class)))
298308
.thenReturn(future);
299309

300310
final LeaseRequest connRequest1 = mgr.lease("some-id", route, null);
@@ -358,10 +368,10 @@ void testProxyConnectAndUpgrade() throws Exception {
358368
Mockito.when(conn.isOpen()).thenReturn(false);
359369
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
360370
Mockito.when(pool.lease(
361-
Mockito.eq(route),
362-
Mockito.eq(null),
363-
Mockito.any(),
364-
Mockito.eq(null)))
371+
Mockito.eq(route),
372+
Mockito.eq(null),
373+
Mockito.any(),
374+
Mockito.any(FutureCallback.class)))
365375
.thenReturn(future);
366376

367377
final LeaseRequest connRequest1 = mgr.lease("some-id", route, null);
@@ -426,7 +436,6 @@ void testLeaseAfterShutdown() {
426436
}, "Attempting to lease a connection after shutdown should throw an exception.");
427437
}
428438

429-
430439
@Test
431440
void testIsShutdown() {
432441
// Setup phase
@@ -442,7 +451,6 @@ void testIsShutdown() {
442451
Assertions.assertTrue(mgr.isClosed(), "Connection manager should be shutdown after close() is called.");
443452
}
444453

445-
446454
@Test
447455
void testConcurrentShutdown() throws InterruptedException {
448456
final ExecutorService executor = Executors.newFixedThreadPool(2);
@@ -455,5 +463,4 @@ void testConcurrentShutdown() throws InterruptedException {
455463
Assertions.assertTrue(mgr.isClosed(), "Connection manager should be shutdown after concurrent calls to shutdown.");
456464
}
457465

458-
459-
}
466+
}

0 commit comments

Comments
 (0)