Skip to content

Commit 38cc559

Browse files
committed
rest5client: cancel http request on interruption
When the thread is interrupted, cancel the underlying http request instead of letting it run to completion. This avoids runaway tasks running on the ES cluster while their client has been interrupted and will never read the results. fixes elastic#1195 This is an alternative to elastic#1196 . That pr fixed the issue with any http client, including those outside of our control that have the same issue. This PR focuses on our own rest client. The bug will still be present when the library is used with other http clients that do not cancel requests on interruption.
1 parent 056ed9c commit 38cc559

File tree

2 files changed

+130
-3
lines changed

2 files changed

+130
-3
lines changed

rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5Client.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -298,11 +298,12 @@ private Response performRequest(final Iterator<Node> nodes, final InternalReques
298298
throws IOException {
299299
RequestContext context = request.createContextForNextAttempt(nodes.next());
300300
ClassicHttpResponse httpResponse;
301+
Future<ClassicHttpResponse> future =
302+
client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, null);
301303
try {
302-
httpResponse = client.execute(context.requestProducer,
303-
context.asyncResponseConsumer,
304-
context.context, null).get();
304+
httpResponse = future.get();
305305
} catch (Exception e) {
306+
future.cancel(true);
306307
RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, e);
307308
onFailure(context.node);
308309
Exception cause = extractAndWrapCause(e);
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Licensed to Elasticsearch B.V. under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package co.elastic.clients.transport.rest5_client.low_level;
21+
22+
import com.sun.net.httpserver.HttpServer;
23+
import org.apache.hc.core5.http.HttpHost;
24+
import org.junit.jupiter.api.AfterEach;
25+
import org.junit.jupiter.api.BeforeEach;
26+
import org.junit.jupiter.api.Test;
27+
import org.junit.jupiter.api.Timeout;
28+
29+
import java.io.IOException;
30+
import java.io.OutputStream;
31+
import java.net.InetAddress;
32+
import java.net.InetSocketAddress;
33+
import java.util.concurrent.CompletableFuture;
34+
import java.util.concurrent.CountDownLatch;
35+
import java.util.concurrent.TimeUnit;
36+
import java.util.concurrent.locks.LockSupport;
37+
38+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
39+
import static org.junit.jupiter.api.Assertions.assertTrue;
40+
41+
/**
42+
* Verifies that {@link Rest5Client#performRequest(Request)} cancels the underlying HTTP request
43+
* when the calling thread is interrupted.
44+
*/
45+
public class RestClientInterruptTests {
46+
47+
private HttpServer httpServer;
48+
private Rest5Client restClient;
49+
private CountDownLatch requestArrived;
50+
private CompletableFuture<Exception> serverException;
51+
52+
@BeforeEach
53+
public void setUp() throws Exception {
54+
requestArrived = new CountDownLatch(1);
55+
serverException = new CompletableFuture<>();
56+
57+
httpServer = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
58+
httpServer.createContext("/slow", exchange -> {
59+
try (exchange; OutputStream out = exchange.getResponseBody()) {
60+
exchange.sendResponseHeaders(200, 0);
61+
requestArrived.countDown();
62+
63+
// Stream data slowly for 30 seconds. Throws IOException on client disconnect.
64+
for (int i = 0; i < 30_000; i++) {
65+
out.write(' ');
66+
out.flush();
67+
LockSupport.parkNanos(1_000_000L);
68+
}
69+
} catch (IOException e) {
70+
serverException.complete(e);
71+
} finally {
72+
serverException.complete(null);
73+
}
74+
});
75+
httpServer.start();
76+
77+
InetSocketAddress address = httpServer.getAddress();
78+
HttpHost host = new HttpHost("http", address.getHostName(), address.getPort());
79+
restClient = Rest5Client.builder(host).build();
80+
}
81+
82+
@AfterEach
83+
public void tearDown() throws Exception {
84+
restClient.close();
85+
httpServer.stop(0);
86+
}
87+
88+
@Test
89+
@Timeout(10)
90+
public void syncRequestShouldCancelOnThreadInterruption() throws Exception {
91+
Thread clientThread = new Thread(() -> {
92+
try { restClient.performRequest(new Request("GET", "/slow")); } catch (Exception e) { }
93+
});
94+
clientThread.start();
95+
96+
assertTrue(requestArrived.await(5, TimeUnit.SECONDS));
97+
clientThread.interrupt();
98+
99+
// The server should get an IOException from writing to the closed connection.
100+
assertInstanceOf(IOException.class, serverException.join(),
101+
"HTTP connection should have been closed after thread interruption");
102+
}
103+
104+
@Test
105+
@Timeout(10)
106+
public void asyncRequestShouldCancelOnFutureCancellation() throws Exception {
107+
CompletableFuture<Response> responseFuture = new CompletableFuture<>();
108+
Cancellable cancellable = restClient.performRequestAsync(new Request("GET", "/slow"), new ResponseListener() {
109+
@Override
110+
public void onSuccess(Response response) {
111+
responseFuture.complete(response);
112+
}
113+
114+
@Override
115+
public void onFailure(Exception exception) {
116+
responseFuture.completeExceptionally(exception);
117+
}
118+
});
119+
120+
assertTrue(requestArrived.await(5, TimeUnit.SECONDS));
121+
cancellable.cancel();
122+
123+
assertInstanceOf(IOException.class, serverException.join(),
124+
"HTTP connection should have been closed after future cancellation");
125+
}
126+
}

0 commit comments

Comments
 (0)