1+ /*
2+ * ====================================================================
3+ * Licensed to the Apache Software Foundation (ASF) under one
4+ * or more contributor license agreements. See the NOTICE file
5+ * distributed with this work for additional information
6+ * regarding copyright ownership. The ASF licenses this file
7+ * to you under the Apache License, Version 2.0 (the
8+ * "License"); you may not use this file except in compliance
9+ * with the License. You may obtain a copy of the License at
10+ *
11+ * http://www.apache.org/licenses/LICENSE-2.0
12+ *
13+ * Unless required by applicable law or agreed to in writing,
14+ * software distributed under the License is distributed on an
15+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+ * KIND, either express or implied. See the License for the
17+ * specific language governing permissions and limitations
18+ * under the License.
19+ * ====================================================================
20+ *
21+ * This software consists of voluntary contributions made by many
22+ * individuals on behalf of the Apache Software Foundation. For more
23+ * information on the Apache Software Foundation, please see
24+ * <http://www.apache.org/>.
25+ *
26+ */
27+ package org .apache .hc .core5 .pool ;
28+
29+ import java .io .IOException ;
30+ import java .util .concurrent .CountDownLatch ;
31+ import java .util .concurrent .ExecutionException ;
32+ import java .util .concurrent .ExecutorService ;
33+ import java .util .concurrent .Executors ;
34+ import java .util .concurrent .Future ;
35+ import java .util .concurrent .TimeoutException ;
36+ import java .util .concurrent .atomic .AtomicLong ;
37+ import java .util .concurrent .atomic .AtomicReference ;
38+ import java .util .function .Supplier ;
39+ import java .util .stream .Stream ;
40+
41+ import org .apache .hc .core5 .http .SocketModalCloseable ;
42+ import org .apache .hc .core5 .io .CloseMode ;
43+ import org .apache .hc .core5 .util .TimeValue ;
44+ import org .apache .hc .core5 .util .Timeout ;
45+ import org .junit .jupiter .api .Assertions ;
46+ import org .junit .jupiter .params .ParameterizedTest ;
47+ import org .junit .jupiter .params .provider .MethodSource ;
48+
49+ class TestConnPoolLeaseTimeout {
50+
51+ private static final Timeout TIMEOUT = Timeout .ofSeconds (30 );
52+
53+ static final class DummyConn implements SocketModalCloseable {
54+
55+ private volatile Timeout socketTimeout ;
56+
57+ @ Override
58+ public Timeout getSocketTimeout () {
59+ return socketTimeout ;
60+ }
61+
62+ @ Override
63+ public void setSocketTimeout (final Timeout timeout ) {
64+ this .socketTimeout = timeout ;
65+ }
66+
67+ @ Override
68+ public void close (final CloseMode closeMode ) {
69+ }
70+
71+ @ Override
72+ public void close () throws IOException {
73+ }
74+ }
75+
76+ static final class PoolCase {
77+ final String name ;
78+ final Supplier <ManagedConnPool <String , DummyConn >> supplier ;
79+
80+ PoolCase (final String name , final Supplier <ManagedConnPool <String , DummyConn >> supplier ) {
81+ this .name = name ;
82+ this .supplier = supplier ;
83+ }
84+
85+ @ Override
86+ public String toString () {
87+ return name ;
88+ }
89+ }
90+
91+ static Stream <PoolCase > pools () {
92+ return Stream .of (
93+ new PoolCase ("STRICT" , () -> new StrictConnPool <>(1 , 1 )),
94+ new PoolCase ("LAX" , () -> new LaxConnPool <>(1 )),
95+ new PoolCase ("OFFLOCK" , () -> new RouteSegmentedConnPool <>(
96+ 1 ,
97+ 1 ,
98+ TimeValue .NEG_ONE_MILLISECOND ,
99+ PoolReusePolicy .LIFO ,
100+ new DefaultDisposalCallback <>()))
101+ );
102+ }
103+
104+ @ ParameterizedTest (name = "{0}" )
105+ @ MethodSource ("pools" )
106+ @ org .junit .jupiter .api .Timeout (60 )
107+ void testLeaseTimeoutDoesNotLeakLeasedEntries (final PoolCase poolCase ) throws Exception {
108+ final ManagedConnPool <String , DummyConn > pool = poolCase .supplier .get ();
109+
110+ final String route = "route-1" ;
111+ final Timeout requestTimeout = Timeout .ofMicroseconds (1 );
112+
113+ final int concurrentThreads = 10 ;
114+ final CountDownLatch countDownLatch = new CountDownLatch (concurrentThreads );
115+ final AtomicLong n = new AtomicLong (concurrentThreads * 100 );
116+
117+ final ExecutorService executorService = Executors .newFixedThreadPool (concurrentThreads );
118+ final AtomicReference <Exception > unexpectedException = new AtomicReference <>();
119+ try {
120+ for (int i = 0 ; i < concurrentThreads ; i ++) {
121+ executorService .execute (() -> {
122+ try {
123+ while (n .decrementAndGet () > 0 ) {
124+ final Future <PoolEntry <String , DummyConn >> f = pool .lease (route , null , requestTimeout , null );
125+ try {
126+ final PoolEntry <String , DummyConn > entry =
127+ f .get (requestTimeout .getDuration (), requestTimeout .getTimeUnit ());
128+ pool .release (entry , true );
129+ } catch (final InterruptedException ex ) {
130+ Thread .currentThread ().interrupt ();
131+ unexpectedException .compareAndSet (null , ex );
132+ } catch (final TimeoutException ex ) {
133+ f .cancel (true );
134+ } catch (final ExecutionException ex ) {
135+ f .cancel (true );
136+ if (!(ex .getCause () instanceof TimeoutException )) {
137+ unexpectedException .compareAndSet (null , ex );
138+ }
139+ }
140+ }
141+ } finally {
142+ countDownLatch .countDown ();
143+ }
144+ });
145+ }
146+
147+ Assertions .assertTrue (countDownLatch .await (TIMEOUT .getDuration (), TIMEOUT .getTimeUnit ()));
148+ Assertions .assertTrue (n .get () <= 0 );
149+ Assertions .assertNull (unexpectedException .get ());
150+
151+ final PoolStats stats = pool .getStats (route );
152+ Assertions .assertEquals (0 , stats .getLeased ());
153+
154+ } finally {
155+ executorService .shutdownNow ();
156+ executorService .awaitTermination (TIMEOUT .getDuration (), TIMEOUT .getTimeUnit ());
157+ pool .close (CloseMode .GRACEFUL );
158+ }
159+ }
160+ }
0 commit comments