Skip to content

Commit 97067f4

Browse files
committed
Implement shared FIFO execution queue for H2 client
Introduce one shared per-client queue to cap concurrently executing requests and enqueue overflow. Ensure queued starts release the slot on any terminal path, including synchronous start failures.
1 parent d07ff4d commit 97067f4

11 files changed

Lines changed: 1056 additions & 323 deletions

httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncClient.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.io.Closeable;
3030
import java.util.List;
3131
import java.util.concurrent.ThreadFactory;
32-
import java.util.concurrent.atomic.AtomicInteger;
3332

3433
import org.apache.hc.client5.http.HttpRoute;
3534
import org.apache.hc.client5.http.async.AsyncExecRuntime;
@@ -55,11 +54,6 @@
5554

5655
/**
5756
* Internal implementation of HTTP/2 only {@link CloseableHttpAsyncClient}.
58-
* <p>
59-
* Concurrent message exchanges with the same connection route executed by
60-
* this client will get automatically multiplexed over a single physical HTTP/2
61-
* connection.
62-
* </p>
6357
*
6458
* @since 5.0
6559
*/
@@ -70,8 +64,12 @@ public final class InternalH2AsyncClient extends InternalAbstractHttpAsyncClient
7064
private static final Logger LOG = LoggerFactory.getLogger(InternalH2AsyncClient.class);
7165
private final HttpRoutePlanner routePlanner;
7266
private final InternalH2ConnPool connPool;
73-
private final int maxQueuedRequests;
74-
private final AtomicInteger queuedRequests;
67+
68+
/**
69+
* One shared FIFO queue per client instance (Oleg).
70+
* null means "unlimited" / no throttling.
71+
*/
72+
private final SharedRequestExecutionQueue executionQueue;
7573

7674
InternalH2AsyncClient(
7775
final DefaultConnectingIOReactor ioReactor,
@@ -87,25 +85,22 @@ public final class InternalH2AsyncClient extends InternalAbstractHttpAsyncClient
8785
final RequestConfig defaultConfig,
8886
final List<Closeable> closeables,
8987
final int maxQueuedRequests) {
88+
9089
super(ioReactor, pushConsumerRegistry, threadFactory, execChain,
9190
cookieSpecRegistry, authSchemeRegistry, cookieStore, credentialsProvider, HttpClientContext::castOrCreate,
9291
defaultConfig, closeables);
9392
this.connPool = connPool;
9493
this.routePlanner = routePlanner;
95-
this.maxQueuedRequests = maxQueuedRequests;
96-
this.queuedRequests = maxQueuedRequests > 0 ? new AtomicInteger(0) : null;
94+
this.executionQueue = maxQueuedRequests > 0 ? new SharedRequestExecutionQueue(maxQueuedRequests) : null;
9795
}
9896

9997
@Override
10098
AsyncExecRuntime createAsyncExecRuntime(final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
101-
return new InternalH2AsyncExecRuntime(LOG, connPool, pushHandlerFactory, maxQueuedRequests, queuedRequests);
99+
return new InternalH2AsyncExecRuntime(LOG, connPool, pushHandlerFactory, executionQueue);
102100
}
103101

104102
@Override
105-
HttpRoute determineRoute(
106-
final HttpHost httpHost,
107-
final HttpRequest request,
108-
final HttpClientContext clientContext) throws HttpException {
103+
HttpRoute determineRoute(final HttpHost httpHost, final HttpRequest request, final HttpClientContext clientContext) throws HttpException {
109104
final HttpRoute route = routePlanner.determineRoute(httpHost, request, clientContext);
110105
if (route.isTunnelled()) {
111106
throw new HttpException("HTTP/2 tunneling not supported");

httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntime.java

Lines changed: 44 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
package org.apache.hc.client5.http.impl.async;
2929

3030
import java.io.InterruptedIOException;
31-
import java.util.concurrent.RejectedExecutionException;
32-
import java.util.concurrent.atomic.AtomicInteger;
3331
import java.util.concurrent.atomic.AtomicReference;
3432

3533
import org.apache.hc.client5.http.EndpointInfo;
@@ -63,30 +61,27 @@ class InternalH2AsyncExecRuntime implements AsyncExecRuntime {
6361
private final InternalH2ConnPool connPool;
6462
private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
6563
private final AtomicReference<Endpoint> sessionRef;
66-
private final int maxQueued;
67-
private final AtomicInteger sharedQueued;
64+
private final SharedRequestExecutionQueue executionQueue;
6865
private volatile boolean reusable;
6966

7067
InternalH2AsyncExecRuntime(
7168
final Logger log,
7269
final InternalH2ConnPool connPool,
7370
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
74-
this(log, connPool, pushHandlerFactory, -1, null);
71+
this(log, connPool, pushHandlerFactory, null);
7572
}
7673

7774
InternalH2AsyncExecRuntime(
7875
final Logger log,
7976
final InternalH2ConnPool connPool,
8077
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
81-
final int maxQueued,
82-
final AtomicInteger sharedQueued) {
78+
final SharedRequestExecutionQueue executionQueue) {
8379
super();
8480
this.log = log;
8581
this.connPool = connPool;
8682
this.pushHandlerFactory = pushHandlerFactory;
8783
this.sessionRef = new AtomicReference<>();
88-
this.maxQueued = maxQueued;
89-
this.sharedQueued = sharedQueued;
84+
this.executionQueue = executionQueue;
9085
}
9186

9287
@Override
@@ -179,7 +174,6 @@ public boolean isEndpointConnected() {
179174
return endpoint != null && endpoint.session.isOpen();
180175
}
181176

182-
183177
Endpoint ensureValid() {
184178
final Endpoint endpoint = sessionRef.get();
185179
if (endpoint == null) {
@@ -261,49 +255,55 @@ public EndpointInfo getEndpointInfo() {
261255
return null;
262256
}
263257

264-
private boolean tryAcquireSlot() {
265-
if (sharedQueued == null || maxQueued <= 0) {
266-
return true;
258+
@Override
259+
public Cancellable execute(
260+
final String id,
261+
final AsyncClientExchangeHandler exchangeHandler,
262+
final HttpClientContext context) {
263+
264+
final Endpoint endpoint = ensureValid();
265+
final ComplexCancellable complexCancellable = new ComplexCancellable();
266+
267+
if (executionQueue == null) {
268+
startExecution(id, endpoint, exchangeHandler, context, complexCancellable);
269+
return complexCancellable;
267270
}
268-
for (;;) {
269-
final int q = sharedQueued.get();
270-
if (q >= maxQueued) {
271-
return false;
272-
}
273-
if (sharedQueued.compareAndSet(q, q + 1)) {
271+
272+
final Cancellable queued = executionQueue.enqueue(
273+
() -> {
274+
final AsyncClientExchangeHandler wrapped =
275+
new ReleasingAsyncClientExchangeHandler(exchangeHandler, executionQueue::completed);
276+
try {
277+
startExecution(id, endpoint, wrapped, context, complexCancellable);
278+
} catch (final RuntimeException ex) {
279+
wrapped.failed(ex);
280+
}
281+
},
282+
exchangeHandler::cancel);
283+
284+
return () -> {
285+
if (queued.cancel()) {
274286
return true;
275287
}
276-
}
277-
}
278-
279-
private void releaseSlot() {
280-
if (sharedQueued != null && maxQueued > 0) {
281-
sharedQueued.decrementAndGet();
282-
}
288+
return complexCancellable.cancel();
289+
};
283290
}
284291

285-
@Override
286-
public Cancellable execute(
292+
private void startExecution(
287293
final String id,
288-
final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
289-
final Endpoint endpoint = ensureValid();
290-
if (!tryAcquireSlot()) {
291-
exchangeHandler.failed(new RejectedExecutionException(
292-
"Execution pipeline queue limit reached (max=" + maxQueued + ")"));
293-
return Operations.nonCancellable();
294-
}
295-
final AsyncClientExchangeHandler actual = sharedQueued != null
296-
? new ReleasingAsyncClientExchangeHandler(exchangeHandler, this::releaseSlot)
297-
: exchangeHandler;
298-
final ComplexCancellable complexCancellable = new ComplexCancellable();
294+
final Endpoint endpoint,
295+
final AsyncClientExchangeHandler exchangeHandler,
296+
final HttpClientContext context,
297+
final ComplexCancellable complexCancellable) {
298+
299299
final IOSession session = endpoint.session;
300300
if (session.isOpen()) {
301301
if (log.isDebugEnabled()) {
302302
log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
303303
}
304304
context.setProtocolVersion(HttpVersion.HTTP_2);
305305
session.enqueue(
306-
new RequestExecutionCommand(actual, pushHandlerFactory, complexCancellable, context),
306+
new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
307307
Command.Priority.NORMAL);
308308
} else {
309309
final HttpRoute route = endpoint.route;
@@ -321,23 +321,22 @@ public void completed(final IOSession ioSession) {
321321
}
322322
context.setProtocolVersion(HttpVersion.HTTP_2);
323323
ioSession.enqueue(
324-
new RequestExecutionCommand(actual, pushHandlerFactory, complexCancellable, context),
324+
new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
325325
Command.Priority.NORMAL);
326326
}
327327

328328
@Override
329329
public void failed(final Exception ex) {
330-
actual.failed(ex);
330+
exchangeHandler.failed(ex);
331331
}
332332

333333
@Override
334334
public void cancelled() {
335-
actual.failed(new InterruptedIOException());
335+
exchangeHandler.failed(new InterruptedIOException());
336336
}
337337

338338
});
339339
}
340-
return complexCancellable;
341340
}
342341

343342
@Override
@@ -369,7 +368,7 @@ public String getId() {
369368

370369
@Override
371370
public AsyncExecRuntime fork() {
372-
return new InternalH2AsyncExecRuntime(log, connPool, pushHandlerFactory, maxQueued, sharedQueued);
371+
return new InternalH2AsyncExecRuntime(log, connPool, pushHandlerFactory, executionQueue);
373372
}
374373

375374
}

httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.io.Closeable;
3030
import java.util.List;
3131
import java.util.concurrent.ThreadFactory;
32-
import java.util.concurrent.atomic.AtomicInteger;
3332
import java.util.function.Function;
3433

3534
import org.apache.hc.client5.http.HttpRoute;
@@ -76,8 +75,12 @@ public final class InternalHttpAsyncClient extends InternalAbstractHttpAsyncClie
7675
private final AsyncClientConnectionManager manager;
7776
private final HttpRoutePlanner routePlanner;
7877
private final TlsConfig tlsConfig;
79-
private final int maxQueuedRequests;
80-
private final AtomicInteger queuedCounter;
78+
79+
/**
80+
* One shared FIFO queue per client instance.
81+
* null means "unlimited" / no throttling.
82+
*/
83+
private final SharedRequestExecutionQueue executionQueue;
8184

8285
InternalHttpAsyncClient(
8386
final DefaultConnectingIOReactor ioReactor,
@@ -101,13 +104,12 @@ public final class InternalHttpAsyncClient extends InternalAbstractHttpAsyncClie
101104
this.manager = manager;
102105
this.routePlanner = routePlanner;
103106
this.tlsConfig = tlsConfig;
104-
this.maxQueuedRequests = maxQueuedRequests;
105-
this.queuedCounter = maxQueuedRequests > 0 ? new AtomicInteger(0) : null;
107+
this.executionQueue = maxQueuedRequests > 0 ? new SharedRequestExecutionQueue(maxQueuedRequests) : null;
106108
}
107109

108110
@Override
109111
AsyncExecRuntime createAsyncExecRuntime(final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
110-
return new InternalHttpAsyncExecRuntime(LOG, manager, getConnectionInitiator(), pushHandlerFactory, tlsConfig, maxQueuedRequests, queuedCounter);
112+
return new InternalHttpAsyncExecRuntime(LOG, manager, getConnectionInitiator(), pushHandlerFactory, tlsConfig, executionQueue);
111113
}
112114

113115
@Override

0 commit comments

Comments
 (0)