From 38cc559bed99baaf9764fe8fb8885bc4756e20da Mon Sep 17 00:00:00 2001 From: Ophir LOJKINE Date: Wed, 1 Apr 2026 15:19:55 +0200 Subject: [PATCH 1/2] rest5client: cancel http request on interruption When the thread is interrupted, cancel the underlying http request instead of letting it run to completion. This avoids runaway tasks running on the ES cluster while their client has been interrupted and will never read the results. fixes #1195 This is an alternative to https://github.com/elastic/elasticsearch-java/pull/1196 . That pr fixed the issue with any http client, including those outside of our control that have the same issue. This PR focuses on our own rest client. The bug will still be present when the library is used with other http clients that do not cancel requests on interruption. --- .../rest5_client/low_level/Rest5Client.java | 7 +- .../low_level/RestClientInterruptTests.java | 126 ++++++++++++++++++ 2 files changed, 130 insertions(+), 3 deletions(-) create mode 100644 rest5-client/src/test/java/co/elastic/clients/transport/rest5_client/low_level/RestClientInterruptTests.java diff --git a/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5Client.java b/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5Client.java index f14834532b..f0b3a7475b 100644 --- a/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5Client.java +++ b/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5Client.java @@ -298,11 +298,12 @@ private Response performRequest(final Iterator nodes, final InternalReques throws IOException { RequestContext context = request.createContextForNextAttempt(nodes.next()); ClassicHttpResponse httpResponse; + Future future = + client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, null); try { - httpResponse = client.execute(context.requestProducer, - context.asyncResponseConsumer, - context.context, null).get(); + httpResponse = future.get(); } catch (Exception e) { + future.cancel(true); RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, e); onFailure(context.node); Exception cause = extractAndWrapCause(e); diff --git a/rest5-client/src/test/java/co/elastic/clients/transport/rest5_client/low_level/RestClientInterruptTests.java b/rest5-client/src/test/java/co/elastic/clients/transport/rest5_client/low_level/RestClientInterruptTests.java new file mode 100644 index 0000000000..f80ecf88a6 --- /dev/null +++ b/rest5-client/src/test/java/co/elastic/clients/transport/rest5_client/low_level/RestClientInterruptTests.java @@ -0,0 +1,126 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport.rest5_client.low_level; + +import com.sun.net.httpserver.HttpServer; +import org.apache.hc.core5.http.HttpHost; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; + +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Verifies that {@link Rest5Client#performRequest(Request)} cancels the underlying HTTP request + * when the calling thread is interrupted. + */ +public class RestClientInterruptTests { + + private HttpServer httpServer; + private Rest5Client restClient; + private CountDownLatch requestArrived; + private CompletableFuture serverException; + + @BeforeEach + public void setUp() throws Exception { + requestArrived = new CountDownLatch(1); + serverException = new CompletableFuture<>(); + + httpServer = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); + httpServer.createContext("/slow", exchange -> { + try (exchange; OutputStream out = exchange.getResponseBody()) { + exchange.sendResponseHeaders(200, 0); + requestArrived.countDown(); + + // Stream data slowly for 30 seconds. Throws IOException on client disconnect. + for (int i = 0; i < 30_000; i++) { + out.write(' '); + out.flush(); + LockSupport.parkNanos(1_000_000L); + } + } catch (IOException e) { + serverException.complete(e); + } finally { + serverException.complete(null); + } + }); + httpServer.start(); + + InetSocketAddress address = httpServer.getAddress(); + HttpHost host = new HttpHost("http", address.getHostName(), address.getPort()); + restClient = Rest5Client.builder(host).build(); + } + + @AfterEach + public void tearDown() throws Exception { + restClient.close(); + httpServer.stop(0); + } + + @Test + @Timeout(10) + public void syncRequestShouldCancelOnThreadInterruption() throws Exception { + Thread clientThread = new Thread(() -> { + try { restClient.performRequest(new Request("GET", "/slow")); } catch (Exception e) { } + }); + clientThread.start(); + + assertTrue(requestArrived.await(5, TimeUnit.SECONDS)); + clientThread.interrupt(); + + // The server should get an IOException from writing to the closed connection. + assertInstanceOf(IOException.class, serverException.join(), + "HTTP connection should have been closed after thread interruption"); + } + + @Test + @Timeout(10) + public void asyncRequestShouldCancelOnFutureCancellation() throws Exception { + CompletableFuture responseFuture = new CompletableFuture<>(); + Cancellable cancellable = restClient.performRequestAsync(new Request("GET", "/slow"), new ResponseListener() { + @Override + public void onSuccess(Response response) { + responseFuture.complete(response); + } + + @Override + public void onFailure(Exception exception) { + responseFuture.completeExceptionally(exception); + } + }); + + assertTrue(requestArrived.await(5, TimeUnit.SECONDS)); + cancellable.cancel(); + + assertInstanceOf(IOException.class, serverException.join(), + "HTTP connection should have been closed after future cancellation"); + } +} From 28de62db0981ce8cd6324f2ab1bebe43c5cdafe4 Mon Sep 17 00:00:00 2001 From: Ophir LOJKINE Date: Tue, 7 Apr 2026 18:12:03 +0200 Subject: [PATCH 2/2] client.execute() stays inside the try block addresses https://github.com/elastic/elasticsearch-java/pull/1197#issuecomment-4199594271 --- .../transport/rest5_client/low_level/Rest5Client.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5Client.java b/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5Client.java index f0b3a7475b..5069005f21 100644 --- a/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5Client.java +++ b/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5Client.java @@ -298,12 +298,12 @@ private Response performRequest(final Iterator nodes, final InternalReques throws IOException { RequestContext context = request.createContextForNextAttempt(nodes.next()); ClassicHttpResponse httpResponse; - Future future = - client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, null); + Future future = null; try { + future = client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, null); httpResponse = future.get(); } catch (Exception e) { - future.cancel(true); + if (future != null) future.cancel(true); RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, e); onFailure(context.node); Exception cause = extractAndWrapCause(e);