From f567fd8156f7eb724fd4dde159d82be9fd4c8efd Mon Sep 17 00:00:00 2001 From: wuyujie <805937849@qq.com> Date: Tue, 5 May 2026 16:18:51 +0800 Subject: [PATCH] fix(transport): add HTTP/2 to HTTP/1.1 automatic fallback for connection errors When HTTP/2 connections are dropped by cloud infrastructure (e.g. Alibaba Cloud SLB GOAWAY, RST_STREAM due to rate limiting),the transport now automatically retries the request with HTTP/1.1 instead of failing immediately. Previously, such connection-level errors propagated directly to the caller with no protocol fallback. Fixes #1287 1.0.12-SNAPSHOT **Background:** Alibaba Cloud SLB/ALB enforces a default limit of concurrent streams per HTTP/2 connection. When this limit is exceeded, the load balancer sends GOAWAY or RST_STREAM frames, causing the JDK HttpClient to throw IOExceptions without an HTTP status code. Other cloud providers (Tencent Cloud CLB, Huawei Cloud ELB) and enterprise proxies exhibit similar behavior. JDK HttpClient's built-in ALPN negotiation cannot handle these mid-connection failures since the HTTP/2 connection was already successfully negotiated. **Changes:** - `JdkHttpTransport`: Added `http1_1Client` field, built eagerly in all three constructors via `buildClient(config, HttpVersion.HTTP_1_1)`. - `execute()`: Catches `HttpTransportException` with no status code when HTTP/2 is configured, then retries via `doSend(http1_1Client, jdkRequest)`. If the fallback also fails, the original error is thrown to preserve the root cause. - `stream()`: Added `onErrorResume` in the reactive chain that catches connection- level errors (IOException or HttpTransportException without status code) and retries with the HTTP/1.1 client via `sendAsyncStream()`. - Extracted `doSend()` and `sendAsyncStream()` helpers that accept a `HttpClient` parameter, making both protocol paths share the same execution logic. - `buildClient()` overloaded to accept an explicit `HttpVersion` parameter. - Added 7 tests in `JdkHttpTransportTest` covering the full decision tree: HTTP/2 fallback success (execute + stream), HTTP/1.1 normal success (execute + stream), HTTP/1.1 no fallback, HTTP/2 no fallback for HTTP errors, and fallback failure propagating original error. - [x] Code has been formatted with `mvn spotless:apply` - [x] All tests are passing (`mvn test`) - [x] Javadoc comments are complete and follow project conventions - [ ] Related documentation has been updated - [x] Code is ready for review --- .../model/transport/JdkHttpTransport.java | 125 ++++++--- .../model/transport/JdkHttpTransportTest.java | 242 +++++++++++++++++- 2 files changed, 335 insertions(+), 32 deletions(-) diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java index 5bb749e64..99e0a8735 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java +++ b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java @@ -75,6 +75,9 @@ public class JdkHttpTransport implements HttpTransport { private final HttpTransportConfig config; private final AtomicBoolean closed = new AtomicBoolean(false); + /** HTTP/1.1 client for protocol fallback. */ + private final HttpClient http1_1Client; + /** * Create a new JdkHttpTransport with default configuration. */ @@ -91,6 +94,7 @@ public class JdkHttpTransport implements HttpTransport { JdkHttpTransport(HttpTransportConfig config) { this.config = Objects.requireNonNull(config, "config must not be null"); this.client = buildClient(config); + this.http1_1Client = buildClient(config, HttpVersion.HTTP_1_1); } /** @@ -106,12 +110,17 @@ public class JdkHttpTransport implements HttpTransport { public JdkHttpTransport(HttpClient client, HttpTransportConfig config) { this.client = Objects.requireNonNull(client, "client must not be null"); this.config = Objects.requireNonNull(config, "config must not be null"); + this.http1_1Client = buildClient(config, HttpVersion.HTTP_1_1); } private static HttpClient buildClient(HttpTransportConfig config) { + return buildClient(config, config.getHttpVersion()); + } + + private static HttpClient buildClient(HttpTransportConfig config, HttpVersion version) { HttpClient.Builder builder = HttpClient.newBuilder() - .version(config.getHttpVersion().toJdkHttpVersion()) + .version(version.toJdkHttpVersion()) .followRedirects(Redirect.NORMAL) .connectTimeout(config.getConnectTimeout()); @@ -177,7 +186,25 @@ public HttpResponse execute(HttpRequest request) throws HttpTransportException { var jdkRequest = buildJdkRequest(request); try { - var response = client.send(jdkRequest, BodyHandlers.ofString()); + return doSend(client, jdkRequest); + } catch (HttpTransportException e) { + if (isConnectionError(e) && shouldFallback()) { + log.warn("HTTP/2 request failed, retrying with HTTP/1.1: {}", e.getMessage()); + try { + return doSend(http1_1Client, jdkRequest); + } catch (HttpTransportException ex) { + // Fallback also failed, throw the original error + throw e; + } + } + throw e; + } + } + + private HttpResponse doSend(HttpClient httpClient, java.net.http.HttpRequest jdkRequest) + throws HttpTransportException { + try { + var response = httpClient.send(jdkRequest, BodyHandlers.ofString()); return buildHttpResponse(response); } catch (IOException e) { throw new HttpTransportException("HTTP request failed: " + e.getMessage(), e); @@ -195,36 +222,25 @@ public Flux stream(HttpRequest request) { var jdkRequest = buildJdkRequest(request); - // Check status code and read error body immediately when CompletableFuture completes - // to avoid stream being closed before we can read it - CompletableFuture> future = - client.sendAsync(jdkRequest, BodyHandlers.ofInputStream()) - .thenApply( - response -> { - int statusCode = response.statusCode(); - if (statusCode < 200 || statusCode >= 300) { - // Read error body immediately while stream is still open - String errorBody = readInputStream(response.body()); - log.warn( - "HTTP request failed. URL: {} | Status: {} | Error:" - + " {}", - request.getUrl(), - statusCode, - errorBody); - throw new CompletionException( - new HttpTransportException( - "HTTP request failed with status " - + statusCode - + " | " - + errorBody, - statusCode, - errorBody)); - } - return response; - }); - - return Mono.fromCompletionStage(future) + return Mono.fromCompletionStage(sendAsyncStream(client, jdkRequest, request.getUrl())) .flatMapMany(response -> processStreamResponse(response, request)) + .onErrorResume( + e -> { + if (isStreamConnectionError(e)) { + log.warn( + "HTTP/2 stream request failed, retrying with HTTP/1.1: {}", + e.getMessage()); + return Mono.fromCompletionStage( + sendAsyncStream( + http1_1Client, + jdkRequest, + request.getUrl())) + .flatMapMany( + response -> + processStreamResponse(response, request)); + } + return Mono.error(e); + }) .publishOn(Schedulers.boundedElastic()) .onErrorMap( e -> !(e instanceof HttpTransportException), @@ -310,6 +326,53 @@ public boolean isClosed() { return closed.get(); } + private static boolean isConnectionError(HttpTransportException e) { + return !e.isHttpError(); + } + + private boolean shouldFallback() { + return config.getHttpVersion() == HttpVersion.HTTP_2; + } + + private CompletableFuture> sendAsyncStream( + HttpClient httpClient, java.net.http.HttpRequest jdkRequest, String url) { + return httpClient + .sendAsync(jdkRequest, BodyHandlers.ofInputStream()) + .thenApply( + response -> { + int statusCode = response.statusCode(); + if (statusCode < 200 || statusCode >= 300) { + // Read error body immediately while stream is still open + String errorBody = readInputStream(response.body()); + log.warn( + "HTTP request failed. URL: {} | Status: {} | Error: {}", + url, + statusCode, + errorBody); + throw new CompletionException( + new HttpTransportException( + "HTTP request failed with status " + + statusCode + + " | " + + errorBody, + statusCode, + errorBody)); + } + return response; + }); + } + + private boolean isStreamConnectionError(Throwable e) { + if (!shouldFallback()) { + return false; + } + Throwable cause = e instanceof CompletionException ? e.getCause() : e; + if (cause instanceof HttpTransportException hte) { + return !hte.isHttpError(); + } + return cause instanceof IOException; + } + private java.net.http.HttpRequest buildJdkRequest(HttpRequest request) { URI uri; try { diff --git a/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java b/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java index 0f0695142..49f26eedc 100644 --- a/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java +++ b/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java @@ -36,6 +36,7 @@ import okhttp3.mockwebserver.SocketPolicy; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import reactor.core.Disposable; import reactor.core.publisher.Flux; @@ -587,7 +588,8 @@ void testStreamCancellation() throws Exception { @Test void testStreamConnectionError() throws Exception { - // Disconnect immediately to simulate connection error + // HTTP/2→HTTP/1.1 fallback retries once, so enqueue two responses + mockServer.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START)); mockServer.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START)); HttpRequest request = @@ -1063,4 +1065,242 @@ void testHttpVersionConfig() { assertNotNull(jdkHttpTransport); assertNotNull(jdkHttpTransport2); } + + // ── HTTP/2 → HTTP/1.1 fallback tests ─────────────────────────────── + // + // Decision tree tested: + // + // Connection error (statusCode == null)? + // ├─ Yes → HTTP_2 configured? + // │ ├─ Yes → fallback to HTTP/1.1, retry once + // │ │ ├─ Success → return response + // │ │ └─ Failure → throw original error + // │ └─ No (already HTTP_1_1) → throw (no lower protocol) + // └─ No (has HTTP status code) → don't fallback (protocol is fine) + // + // Uses SocketPolicy.DISCONNECT_AT_START to simulate connection-level + // errors (no HTTP status code) which match the signature of real + // HTTP/2 GOAWAY / RST_STREAM failures. + + @Test + @DisplayName("HTTP_2 + connection drop → fallback to HTTP/1.1 succeeds (execute)") + void testHttp2ConnectionDropFallbackExecute() { + // Enqueue 1: connection drops (simulates HTTP/2 GOAWAY) + mockServer.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START)); + // Enqueue 2: HTTP/1.1 fallback success + mockServer.enqueue(new MockResponse().setResponseCode(200).setBody("{\"result\": \"ok\"}")); + + HttpTransportConfig http2Config = + HttpTransportConfig.builder() + .httpVersion(HttpVersion.HTTP_2) + .connectTimeout(Duration.ofSeconds(5)) + .readTimeout(Duration.ofSeconds(10)) + .build(); + JdkHttpTransport http2Transport = new JdkHttpTransport(http2Config); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/test").toString()) + .method("POST") + .body("{}") + .build(); + + try { + // After fallback: first request fails (HTTP/2 connection drop), + // second request succeeds via HTTP/1.1. + HttpResponse response = http2Transport.execute(request); + assertEquals(200, response.getStatusCode()); + assertEquals("{\"result\": \"ok\"}", response.getBody()); + } finally { + http2Transport.close(); + } + } + + @Test + @DisplayName("HTTP_2 + stream connection drop → fallback to HTTP/1.1 succeeds (stream)") + void testHttp2ConnectionDropFallbackStream() { + mockServer.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START)); + mockServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setBody("data: {\"id\":\"1\"}\n\ndata: [DONE]\n\n") + .setHeader("Content-Type", "text/event-stream")); + + HttpTransportConfig http2Config = + HttpTransportConfig.builder() + .httpVersion(HttpVersion.HTTP_2) + .connectTimeout(Duration.ofSeconds(5)) + .readTimeout(Duration.ofSeconds(10)) + .build(); + JdkHttpTransport http2Transport = new JdkHttpTransport(http2Config); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/stream").toString()) + .method("POST") + .body("{}") + .build(); + + try { + // After fallback: first stream fails, second succeeds. + StepVerifier.create(http2Transport.stream(request)) + .expectNextMatches(data -> data.contains("\"id\":\"1\"")) + .verifyComplete(); + } finally { + http2Transport.close(); + } + } + + @Test + @DisplayName("HTTP_1_1 + normal request → execute succeeds") + void testHttp11NormalExecute() throws Exception { + mockServer.enqueue(new MockResponse().setResponseCode(200).setBody("{\"result\": \"ok\"}")); + + HttpTransportConfig http11Config = + HttpTransportConfig.builder() + .httpVersion(HttpVersion.HTTP_1_1) + .connectTimeout(Duration.ofSeconds(5)) + .readTimeout(Duration.ofSeconds(10)) + .build(); + JdkHttpTransport http11Transport = new JdkHttpTransport(http11Config); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/test").toString()) + .method("POST") + .body("{}") + .build(); + + try { + HttpResponse response = http11Transport.execute(request); + assertEquals(200, response.getStatusCode()); + assertEquals("{\"result\": \"ok\"}", response.getBody()); + } finally { + http11Transport.close(); + } + } + + @Test + @DisplayName("HTTP_1_1 + normal request → stream succeeds") + void testHttp11NormalStream() { + mockServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setBody("data: {\"id\":\"1\"}\n\ndata: [DONE]\n\n") + .setHeader("Content-Type", "text/event-stream")); + + HttpTransportConfig http11Config = + HttpTransportConfig.builder() + .httpVersion(HttpVersion.HTTP_1_1) + .connectTimeout(Duration.ofSeconds(5)) + .readTimeout(Duration.ofSeconds(10)) + .build(); + JdkHttpTransport http11Transport = new JdkHttpTransport(http11Config); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/stream").toString()) + .method("POST") + .body("{}") + .build(); + + try { + StepVerifier.create(http11Transport.stream(request)) + .expectNextMatches(data -> data.contains("\"id\":\"1\"")) + .verifyComplete(); + } finally { + http11Transport.close(); + } + } + + @Test + @DisplayName("HTTP_1_1 + connection drop → throw directly (already at lowest protocol)") + void testHttp11ConnectionDropNoFallback() { + // HTTP_1_1 already — nothing to fall back to. + mockServer.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START)); + // Second response never reached because there's no fallback. + mockServer.enqueue(new MockResponse().setResponseCode(200).setBody("ok")); + + HttpTransportConfig http11Config = + HttpTransportConfig.builder() + .httpVersion(HttpVersion.HTTP_1_1) + .connectTimeout(Duration.ofSeconds(5)) + .readTimeout(Duration.ofSeconds(10)) + .build(); + JdkHttpTransport http11Transport = new JdkHttpTransport(http11Config); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/test").toString()) + .method("POST") + .body("{}") + .build(); + + try { + assertThrows(HttpTransportException.class, () -> http11Transport.execute(request)); + } finally { + http11Transport.close(); + } + } + + @Test + @DisplayName("HTTP_2 + HTTP 500 error → do NOT fallback (has status code, protocol is fine)") + void testHttp2NoFallbackForHttpError() throws Exception { + // Server returned HTTP status code → protocol connection succeeded. + mockServer.enqueue( + new MockResponse().setResponseCode(500).setBody("{\"error\": \"server error\"}")); + + HttpTransportConfig http2Config = + HttpTransportConfig.builder() + .httpVersion(HttpVersion.HTTP_2) + .connectTimeout(Duration.ofSeconds(5)) + .readTimeout(Duration.ofSeconds(10)) + .build(); + JdkHttpTransport http2Transport = new JdkHttpTransport(http2Config); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/test").toString()) + .method("POST") + .body("{}") + .build(); + + try { + HttpResponse response = http2Transport.execute(request); + assertEquals(500, response.getStatusCode()); + } finally { + http2Transport.close(); + } + } + + @Test + @DisplayName("HTTP_2 + two connection drops → fallback fails, throw original error") + void testHttp2FallbackFailsThrowsOriginalError() { + // First request: HTTP/2 connection drops. + mockServer.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START)); + // Fallback: HTTP/1.1 also drops. + mockServer.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START)); + + HttpTransportConfig http2Config = + HttpTransportConfig.builder() + .httpVersion(HttpVersion.HTTP_2) + .connectTimeout(Duration.ofSeconds(5)) + .readTimeout(Duration.ofSeconds(10)) + .build(); + JdkHttpTransport http2Transport = new JdkHttpTransport(http2Config); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/test").toString()) + .method("POST") + .body("{}") + .build(); + + try { + // Both attempts fail — must still throw, not silently eat the error. + assertThrows(HttpTransportException.class, () -> http2Transport.execute(request)); + } finally { + http2Transport.close(); + } + } }