diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java index 5bb749e64..99e0a8735 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java +++ b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java @@ -75,6 +75,9 @@ public class JdkHttpTransport implements HttpTransport { private final HttpTransportConfig config; private final AtomicBoolean closed = new AtomicBoolean(false); + /** HTTP/1.1 client for protocol fallback. */ + private final HttpClient http1_1Client; + /** * Create a new JdkHttpTransport with default configuration. */ @@ -91,6 +94,7 @@ public class JdkHttpTransport implements HttpTransport { JdkHttpTransport(HttpTransportConfig config) { this.config = Objects.requireNonNull(config, "config must not be null"); this.client = buildClient(config); + this.http1_1Client = buildClient(config, HttpVersion.HTTP_1_1); } /** @@ -106,12 +110,17 @@ public class JdkHttpTransport implements HttpTransport { public JdkHttpTransport(HttpClient client, HttpTransportConfig config) { this.client = Objects.requireNonNull(client, "client must not be null"); this.config = Objects.requireNonNull(config, "config must not be null"); + this.http1_1Client = buildClient(config, HttpVersion.HTTP_1_1); } private static HttpClient buildClient(HttpTransportConfig config) { + return buildClient(config, config.getHttpVersion()); + } + + private static HttpClient buildClient(HttpTransportConfig config, HttpVersion version) { HttpClient.Builder builder = HttpClient.newBuilder() - .version(config.getHttpVersion().toJdkHttpVersion()) + .version(version.toJdkHttpVersion()) .followRedirects(Redirect.NORMAL) .connectTimeout(config.getConnectTimeout()); @@ -177,7 +186,25 @@ public HttpResponse execute(HttpRequest request) throws HttpTransportException { var jdkRequest = buildJdkRequest(request); try { - var response = client.send(jdkRequest, BodyHandlers.ofString()); + return doSend(client, jdkRequest); + } catch (HttpTransportException e) { + if (isConnectionError(e) && shouldFallback()) { + log.warn("HTTP/2 request failed, retrying with HTTP/1.1: {}", e.getMessage()); + try { + return doSend(http1_1Client, jdkRequest); + } catch (HttpTransportException ex) { + // Fallback also failed, throw the original error + throw e; + } + } + throw e; + } + } + + private HttpResponse doSend(HttpClient httpClient, java.net.http.HttpRequest jdkRequest) + throws HttpTransportException { + try { + var response = httpClient.send(jdkRequest, BodyHandlers.ofString()); return buildHttpResponse(response); } catch (IOException e) { throw new HttpTransportException("HTTP request failed: " + e.getMessage(), e); @@ -195,36 +222,25 @@ public Flux stream(HttpRequest request) { var jdkRequest = buildJdkRequest(request); - // Check status code and read error body immediately when CompletableFuture completes - // to avoid stream being closed before we can read it - CompletableFuture> future = - client.sendAsync(jdkRequest, BodyHandlers.ofInputStream()) - .thenApply( - response -> { - int statusCode = response.statusCode(); - if (statusCode < 200 || statusCode >= 300) { - // Read error body immediately while stream is still open - String errorBody = readInputStream(response.body()); - log.warn( - "HTTP request failed. URL: {} | Status: {} | Error:" - + " {}", - request.getUrl(), - statusCode, - errorBody); - throw new CompletionException( - new HttpTransportException( - "HTTP request failed with status " - + statusCode - + " | " - + errorBody, - statusCode, - errorBody)); - } - return response; - }); - - return Mono.fromCompletionStage(future) + return Mono.fromCompletionStage(sendAsyncStream(client, jdkRequest, request.getUrl())) .flatMapMany(response -> processStreamResponse(response, request)) + .onErrorResume( + e -> { + if (isStreamConnectionError(e)) { + log.warn( + "HTTP/2 stream request failed, retrying with HTTP/1.1: {}", + e.getMessage()); + return Mono.fromCompletionStage( + sendAsyncStream( + http1_1Client, + jdkRequest, + request.getUrl())) + .flatMapMany( + response -> + processStreamResponse(response, request)); + } + return Mono.error(e); + }) .publishOn(Schedulers.boundedElastic()) .onErrorMap( e -> !(e instanceof HttpTransportException), @@ -310,6 +326,53 @@ public boolean isClosed() { return closed.get(); } + private static boolean isConnectionError(HttpTransportException e) { + return !e.isHttpError(); + } + + private boolean shouldFallback() { + return config.getHttpVersion() == HttpVersion.HTTP_2; + } + + private CompletableFuture> sendAsyncStream( + HttpClient httpClient, java.net.http.HttpRequest jdkRequest, String url) { + return httpClient + .sendAsync(jdkRequest, BodyHandlers.ofInputStream()) + .thenApply( + response -> { + int statusCode = response.statusCode(); + if (statusCode < 200 || statusCode >= 300) { + // Read error body immediately while stream is still open + String errorBody = readInputStream(response.body()); + log.warn( + "HTTP request failed. URL: {} | Status: {} | Error: {}", + url, + statusCode, + errorBody); + throw new CompletionException( + new HttpTransportException( + "HTTP request failed with status " + + statusCode + + " | " + + errorBody, + statusCode, + errorBody)); + } + return response; + }); + } + + private boolean isStreamConnectionError(Throwable e) { + if (!shouldFallback()) { + return false; + } + Throwable cause = e instanceof CompletionException ? e.getCause() : e; + if (cause instanceof HttpTransportException hte) { + return !hte.isHttpError(); + } + return cause instanceof IOException; + } + private java.net.http.HttpRequest buildJdkRequest(HttpRequest request) { URI uri; try { diff --git a/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java b/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java index 0f0695142..49f26eedc 100644 --- a/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java +++ b/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java @@ -36,6 +36,7 @@ import okhttp3.mockwebserver.SocketPolicy; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import reactor.core.Disposable; import reactor.core.publisher.Flux; @@ -587,7 +588,8 @@ void testStreamCancellation() throws Exception { @Test void testStreamConnectionError() throws Exception { - // Disconnect immediately to simulate connection error + // HTTP/2→HTTP/1.1 fallback retries once, so enqueue two responses + mockServer.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START)); mockServer.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START)); HttpRequest request = @@ -1063,4 +1065,242 @@ void testHttpVersionConfig() { assertNotNull(jdkHttpTransport); assertNotNull(jdkHttpTransport2); } + + // ── HTTP/2 → HTTP/1.1 fallback tests ─────────────────────────────── + // + // Decision tree tested: + // + // Connection error (statusCode == null)? + // ├─ Yes → HTTP_2 configured? + // │ ├─ Yes → fallback to HTTP/1.1, retry once + // │ │ ├─ Success → return response + // │ │ └─ Failure → throw original error + // │ └─ No (already HTTP_1_1) → throw (no lower protocol) + // └─ No (has HTTP status code) → don't fallback (protocol is fine) + // + // Uses SocketPolicy.DISCONNECT_AT_START to simulate connection-level + // errors (no HTTP status code) which match the signature of real + // HTTP/2 GOAWAY / RST_STREAM failures. + + @Test + @DisplayName("HTTP_2 + connection drop → fallback to HTTP/1.1 succeeds (execute)") + void testHttp2ConnectionDropFallbackExecute() { + // Enqueue 1: connection drops (simulates HTTP/2 GOAWAY) + mockServer.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START)); + // Enqueue 2: HTTP/1.1 fallback success + mockServer.enqueue(new MockResponse().setResponseCode(200).setBody("{\"result\": \"ok\"}")); + + HttpTransportConfig http2Config = + HttpTransportConfig.builder() + .httpVersion(HttpVersion.HTTP_2) + .connectTimeout(Duration.ofSeconds(5)) + .readTimeout(Duration.ofSeconds(10)) + .build(); + JdkHttpTransport http2Transport = new JdkHttpTransport(http2Config); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/test").toString()) + .method("POST") + .body("{}") + .build(); + + try { + // After fallback: first request fails (HTTP/2 connection drop), + // second request succeeds via HTTP/1.1. + HttpResponse response = http2Transport.execute(request); + assertEquals(200, response.getStatusCode()); + assertEquals("{\"result\": \"ok\"}", response.getBody()); + } finally { + http2Transport.close(); + } + } + + @Test + @DisplayName("HTTP_2 + stream connection drop → fallback to HTTP/1.1 succeeds (stream)") + void testHttp2ConnectionDropFallbackStream() { + mockServer.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START)); + mockServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setBody("data: {\"id\":\"1\"}\n\ndata: [DONE]\n\n") + .setHeader("Content-Type", "text/event-stream")); + + HttpTransportConfig http2Config = + HttpTransportConfig.builder() + .httpVersion(HttpVersion.HTTP_2) + .connectTimeout(Duration.ofSeconds(5)) + .readTimeout(Duration.ofSeconds(10)) + .build(); + JdkHttpTransport http2Transport = new JdkHttpTransport(http2Config); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/stream").toString()) + .method("POST") + .body("{}") + .build(); + + try { + // After fallback: first stream fails, second succeeds. + StepVerifier.create(http2Transport.stream(request)) + .expectNextMatches(data -> data.contains("\"id\":\"1\"")) + .verifyComplete(); + } finally { + http2Transport.close(); + } + } + + @Test + @DisplayName("HTTP_1_1 + normal request → execute succeeds") + void testHttp11NormalExecute() throws Exception { + mockServer.enqueue(new MockResponse().setResponseCode(200).setBody("{\"result\": \"ok\"}")); + + HttpTransportConfig http11Config = + HttpTransportConfig.builder() + .httpVersion(HttpVersion.HTTP_1_1) + .connectTimeout(Duration.ofSeconds(5)) + .readTimeout(Duration.ofSeconds(10)) + .build(); + JdkHttpTransport http11Transport = new JdkHttpTransport(http11Config); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/test").toString()) + .method("POST") + .body("{}") + .build(); + + try { + HttpResponse response = http11Transport.execute(request); + assertEquals(200, response.getStatusCode()); + assertEquals("{\"result\": \"ok\"}", response.getBody()); + } finally { + http11Transport.close(); + } + } + + @Test + @DisplayName("HTTP_1_1 + normal request → stream succeeds") + void testHttp11NormalStream() { + mockServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setBody("data: {\"id\":\"1\"}\n\ndata: [DONE]\n\n") + .setHeader("Content-Type", "text/event-stream")); + + HttpTransportConfig http11Config = + HttpTransportConfig.builder() + .httpVersion(HttpVersion.HTTP_1_1) + .connectTimeout(Duration.ofSeconds(5)) + .readTimeout(Duration.ofSeconds(10)) + .build(); + JdkHttpTransport http11Transport = new JdkHttpTransport(http11Config); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/stream").toString()) + .method("POST") + .body("{}") + .build(); + + try { + StepVerifier.create(http11Transport.stream(request)) + .expectNextMatches(data -> data.contains("\"id\":\"1\"")) + .verifyComplete(); + } finally { + http11Transport.close(); + } + } + + @Test + @DisplayName("HTTP_1_1 + connection drop → throw directly (already at lowest protocol)") + void testHttp11ConnectionDropNoFallback() { + // HTTP_1_1 already — nothing to fall back to. + mockServer.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START)); + // Second response never reached because there's no fallback. + mockServer.enqueue(new MockResponse().setResponseCode(200).setBody("ok")); + + HttpTransportConfig http11Config = + HttpTransportConfig.builder() + .httpVersion(HttpVersion.HTTP_1_1) + .connectTimeout(Duration.ofSeconds(5)) + .readTimeout(Duration.ofSeconds(10)) + .build(); + JdkHttpTransport http11Transport = new JdkHttpTransport(http11Config); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/test").toString()) + .method("POST") + .body("{}") + .build(); + + try { + assertThrows(HttpTransportException.class, () -> http11Transport.execute(request)); + } finally { + http11Transport.close(); + } + } + + @Test + @DisplayName("HTTP_2 + HTTP 500 error → do NOT fallback (has status code, protocol is fine)") + void testHttp2NoFallbackForHttpError() throws Exception { + // Server returned HTTP status code → protocol connection succeeded. + mockServer.enqueue( + new MockResponse().setResponseCode(500).setBody("{\"error\": \"server error\"}")); + + HttpTransportConfig http2Config = + HttpTransportConfig.builder() + .httpVersion(HttpVersion.HTTP_2) + .connectTimeout(Duration.ofSeconds(5)) + .readTimeout(Duration.ofSeconds(10)) + .build(); + JdkHttpTransport http2Transport = new JdkHttpTransport(http2Config); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/test").toString()) + .method("POST") + .body("{}") + .build(); + + try { + HttpResponse response = http2Transport.execute(request); + assertEquals(500, response.getStatusCode()); + } finally { + http2Transport.close(); + } + } + + @Test + @DisplayName("HTTP_2 + two connection drops → fallback fails, throw original error") + void testHttp2FallbackFailsThrowsOriginalError() { + // First request: HTTP/2 connection drops. + mockServer.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START)); + // Fallback: HTTP/1.1 also drops. + mockServer.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START)); + + HttpTransportConfig http2Config = + HttpTransportConfig.builder() + .httpVersion(HttpVersion.HTTP_2) + .connectTimeout(Duration.ofSeconds(5)) + .readTimeout(Duration.ofSeconds(10)) + .build(); + JdkHttpTransport http2Transport = new JdkHttpTransport(http2Config); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/test").toString()) + .method("POST") + .body("{}") + .build(); + + try { + // Both attempts fail — must still throw, not silently eat the error. + assertThrows(HttpTransportException.class, () -> http2Transport.execute(request)); + } finally { + http2Transport.close(); + } + } }