Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public class JdkHttpTransport implements HttpTransport {
private final HttpTransportConfig config;
private final AtomicBoolean closed = new AtomicBoolean(false);

/** HTTP/1.1 client for protocol fallback. */
private final HttpClient http1_1Client;

/**
* Create a new JdkHttpTransport with default configuration.
*/
Expand All @@ -91,6 +94,7 @@ public class JdkHttpTransport implements HttpTransport {
JdkHttpTransport(HttpTransportConfig config) {
this.config = Objects.requireNonNull(config, "config must not be null");
this.client = buildClient(config);
this.http1_1Client = buildClient(config, HttpVersion.HTTP_1_1);
}

/**
Expand All @@ -106,12 +110,17 @@ public class JdkHttpTransport implements HttpTransport {
public JdkHttpTransport(HttpClient client, HttpTransportConfig config) {
this.client = Objects.requireNonNull(client, "client must not be null");
this.config = Objects.requireNonNull(config, "config must not be null");
this.http1_1Client = buildClient(config, HttpVersion.HTTP_1_1);
}

private static HttpClient buildClient(HttpTransportConfig config) {
return buildClient(config, config.getHttpVersion());
}

private static HttpClient buildClient(HttpTransportConfig config, HttpVersion version) {
HttpClient.Builder builder =
HttpClient.newBuilder()
.version(config.getHttpVersion().toJdkHttpVersion())
.version(version.toJdkHttpVersion())
.followRedirects(Redirect.NORMAL)
.connectTimeout(config.getConnectTimeout());

Expand Down Expand Up @@ -177,7 +186,25 @@ public HttpResponse execute(HttpRequest request) throws HttpTransportException {
var jdkRequest = buildJdkRequest(request);

try {
var response = client.send(jdkRequest, BodyHandlers.ofString());
return doSend(client, jdkRequest);
} catch (HttpTransportException e) {
if (isConnectionError(e) && shouldFallback()) {
log.warn("HTTP/2 request failed, retrying with HTTP/1.1: {}", e.getMessage());
try {
return doSend(http1_1Client, jdkRequest);
} catch (HttpTransportException ex) {
// Fallback also failed, throw the original error
throw e;
}
}
throw e;
}
}

private HttpResponse doSend(HttpClient httpClient, java.net.http.HttpRequest jdkRequest)
throws HttpTransportException {
try {
var response = httpClient.send(jdkRequest, BodyHandlers.ofString());
return buildHttpResponse(response);
} catch (IOException e) {
throw new HttpTransportException("HTTP request failed: " + e.getMessage(), e);
Expand All @@ -195,36 +222,25 @@ public Flux<String> stream(HttpRequest request) {

var jdkRequest = buildJdkRequest(request);

// Check status code and read error body immediately when CompletableFuture completes
// to avoid stream being closed before we can read it
CompletableFuture<java.net.http.HttpResponse<InputStream>> future =
client.sendAsync(jdkRequest, BodyHandlers.ofInputStream())
.thenApply(
response -> {
int statusCode = response.statusCode();
if (statusCode < 200 || statusCode >= 300) {
// Read error body immediately while stream is still open
String errorBody = readInputStream(response.body());
log.warn(
"HTTP request failed. URL: {} | Status: {} | Error:"
+ " {}",
request.getUrl(),
statusCode,
errorBody);
throw new CompletionException(
new HttpTransportException(
"HTTP request failed with status "
+ statusCode
+ " | "
+ errorBody,
statusCode,
errorBody));
}
return response;
});

return Mono.fromCompletionStage(future)
return Mono.fromCompletionStage(sendAsyncStream(client, jdkRequest, request.getUrl()))
.flatMapMany(response -> processStreamResponse(response, request))
.onErrorResume(
e -> {
if (isStreamConnectionError(e)) {
log.warn(
"HTTP/2 stream request failed, retrying with HTTP/1.1: {}",
e.getMessage());
return Mono.fromCompletionStage(
sendAsyncStream(
http1_1Client,
jdkRequest,
request.getUrl()))
.flatMapMany(
response ->
processStreamResponse(response, request));
}
return Mono.error(e);
})
.publishOn(Schedulers.boundedElastic())
.onErrorMap(
e -> !(e instanceof HttpTransportException),
Expand Down Expand Up @@ -310,6 +326,53 @@ public boolean isClosed() {
return closed.get();
}

private static boolean isConnectionError(HttpTransportException e) {
return !e.isHttpError();
}

private boolean shouldFallback() {
return config.getHttpVersion() == HttpVersion.HTTP_2;
}

private CompletableFuture<java.net.http.HttpResponse<InputStream>> sendAsyncStream(
HttpClient httpClient, java.net.http.HttpRequest jdkRequest, String url) {
return httpClient
.sendAsync(jdkRequest, BodyHandlers.ofInputStream())
.thenApply(
response -> {
int statusCode = response.statusCode();
if (statusCode < 200 || statusCode >= 300) {
// Read error body immediately while stream is still open
String errorBody = readInputStream(response.body());
log.warn(
"HTTP request failed. URL: {} | Status: {} | Error: {}",
url,
statusCode,
errorBody);
throw new CompletionException(
new HttpTransportException(
"HTTP request failed with status "
+ statusCode
+ " | "
+ errorBody,
statusCode,
errorBody));
}
return response;
});
}

private boolean isStreamConnectionError(Throwable e) {
if (!shouldFallback()) {
return false;
}
Throwable cause = e instanceof CompletionException ? e.getCause() : e;
if (cause instanceof HttpTransportException hte) {
return !hte.isHttpError();
}
return cause instanceof IOException;
}

private java.net.http.HttpRequest buildJdkRequest(HttpRequest request) {
URI uri;
try {
Expand Down
Loading
Loading