From 260f59b9512060e9f639cd0899047f95a7bdb860 Mon Sep 17 00:00:00 2001 From: jujn <2087687391@qq.com> Date: Thu, 30 Apr 2026 18:33:26 +0800 Subject: [PATCH] fix(transport): resolve SSE stream timeout in JdkHttpTransport --- .../model/transport/HttpTransportConfig.java | 56 +++++++- .../model/transport/JdkHttpTransport.java | 111 +++++++++------ .../model/transport/JdkHttpTransportTest.java | 134 +++++++++++++++++- 3 files changed, 252 insertions(+), 49 deletions(-) diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java b/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java index a824ed34d..f73f7a171 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java +++ b/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java @@ -28,13 +28,21 @@ public class HttpTransportConfig { /** Default connect timeout: 30 seconds. */ public static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(30); - /** Default read timeout: 5 minutes (for long-running model calls). */ + /** Default response timeout (TTFT): 5 minutes (Time To First Token for streaming). */ + public static final Duration DEFAULT_RESPONSE_TIMEOUT = Duration.ofMinutes(5); + + /** Default stream idle timeout: 30 seconds (Maximum wait time between consecutive data chunks). */ + public static final Duration DEFAULT_STREAM_IDLE_TIMEOUT = Duration.ofSeconds(30); + + /** Default read timeout: 5 minutes (Overall timeout for non-streaming calls). */ public static final Duration DEFAULT_READ_TIMEOUT = Duration.ofMinutes(5); /** Default write timeout: 30 seconds. */ public static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofSeconds(30); private final Duration connectTimeout; + private final Duration responseTimeout; + private final Duration streamIdleTimeout; private final Duration readTimeout; private final Duration writeTimeout; private final int maxIdleConnections; @@ -45,6 +53,8 @@ public class HttpTransportConfig { private HttpTransportConfig(Builder builder) { this.connectTimeout = builder.connectTimeout; + this.responseTimeout = builder.responseTimeout; + this.streamIdleTimeout = builder.streamIdleTimeout; this.readTimeout = builder.readTimeout; this.writeTimeout = builder.writeTimeout; this.maxIdleConnections = builder.maxIdleConnections; @@ -64,7 +74,25 @@ public Duration getConnectTimeout() { } /** - * Get the read timeout. + * Get the response timeout (Time To First Token for streaming). + * + * @return the response timeout duration + */ + public Duration getResponseTimeout() { + return responseTimeout; + } + + /** + * Get the stream idle timeout (maximum time between two consecutive data chunks). + * + * @return the stream idle timeout duration + */ + public Duration getStreamIdleTimeout() { + return streamIdleTimeout; + } + + /** + * Get the read timeout(for non-streaming). * * @return the read timeout duration */ @@ -153,6 +181,8 @@ public static HttpTransportConfig defaults() { */ public static class Builder { private Duration connectTimeout = DEFAULT_CONNECT_TIMEOUT; + private Duration responseTimeout = DEFAULT_RESPONSE_TIMEOUT; + private Duration streamIdleTimeout = DEFAULT_STREAM_IDLE_TIMEOUT; private Duration readTimeout = DEFAULT_READ_TIMEOUT; private Duration writeTimeout = DEFAULT_WRITE_TIMEOUT; private int maxIdleConnections = 5; @@ -172,6 +202,28 @@ public Builder connectTimeout(Duration connectTimeout) { return this; } + /** + * Set the response timeout (Time To First Byte). + * + * @param responseTimeout the response timeout duration + * @return this builder + */ + public Builder responseTimeout(Duration responseTimeout) { + this.responseTimeout = responseTimeout; + return this; + } + + /** + * Set the stream idle timeout. + * + * @param streamIdleTimeout the stream idle timeout duration + * @return this builder + */ + public Builder streamIdleTimeout(Duration streamIdleTimeout) { + this.streamIdleTimeout = streamIdleTimeout; + return this; + } + /** * Set the read timeout. * diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java index 5bb749e64..0ad4da3e1 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java +++ b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java @@ -38,8 +38,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; @@ -174,7 +174,7 @@ public HttpResponse execute(HttpRequest request) throws HttpTransportException { throw new HttpTransportException("Transport has been closed"); } - var jdkRequest = buildJdkRequest(request); + var jdkRequest = buildJdkRequest(request, false); try { var response = client.send(jdkRequest, BodyHandlers.ofString()); @@ -193,50 +193,64 @@ public Flux stream(HttpRequest request) { return Flux.error(new HttpTransportException("Transport has been closed")); } - var jdkRequest = buildJdkRequest(request); - - // Check status code and read error body immediately when CompletableFuture completes - // to avoid stream being closed before we can read it - CompletableFuture> future = - client.sendAsync(jdkRequest, BodyHandlers.ofInputStream()) - .thenApply( - response -> { - int statusCode = response.statusCode(); - if (statusCode < 200 || statusCode >= 300) { - // Read error body immediately while stream is still open - String errorBody = readInputStream(response.body()); - log.warn( - "HTTP request failed. URL: {} | Status: {} | Error:" - + " {}", - request.getUrl(), + var jdkRequest = buildJdkRequest(request, true); + + // Use Mono.fromFuture() to ensure lazy execution and proper cancellation propagation. + // This prevents "ghost connections" from leaking if the downstream cancels or times out + // before headers arrive. + return Mono.fromFuture(() -> client.sendAsync(jdkRequest, BodyHandlers.ofInputStream())) + .flatMapMany( + response -> { + int statusCode = response.statusCode(); + if (statusCode < 200 || statusCode >= 300) { + String errorBody = readInputStream(response.body()); + log.warn( + "HTTP request failed. URL: {} | Status: {} | Error: {}", + request.getUrl(), + statusCode, + errorBody); + return Flux.error( + new HttpTransportException( + "HTTP request failed with status " + + statusCode + + " | " + + errorBody, statusCode, - errorBody); - throw new CompletionException( - new HttpTransportException( - "HTTP request failed with status " - + statusCode - + " | " - + errorBody, - statusCode, - errorBody)); - } - return response; - }); - - return Mono.fromCompletionStage(future) - .flatMapMany(response -> processStreamResponse(response, request)) - .publishOn(Schedulers.boundedElastic()) + errorBody)); + } + return processStreamResponse(response, request); + }) + .timeout( + // Timeout strategy 1: Time To First Token (TTFT). + // The maximum time to wait for the first piece of data. + Mono.delay( + config.getResponseTimeout() != null + ? config.getResponseTimeout() + : HttpTransportConfig.DEFAULT_RESPONSE_TIMEOUT), + + // Timeout strategy 2: Inter-token gap (Stream Idle Timeout). + // The maximum time to wait between receiving two consecutive data chunks. + data -> + Mono.delay( + config.getStreamIdleTimeout() != null + ? config.getStreamIdleTimeout() + : HttpTransportConfig.DEFAULT_STREAM_IDLE_TIMEOUT)) .onErrorMap( - e -> !(e instanceof HttpTransportException), e -> { + if (e instanceof TimeoutException) { + return new HttpTransportException( + "Stream timeout: " + e.getMessage(), e); + } + if (e instanceof HttpTransportException) { + return e; + } Throwable cause = e instanceof CompletionException ? e.getCause() : e; if (cause instanceof HttpTransportException) { - return (HttpTransportException) cause; + return cause; } return new HttpTransportException( "SSE/NDJSON stream failed: " + e.getMessage(), e); - }) - .subscribeOn(Schedulers.boundedElastic()); + }); } private Flux processStreamResponse( @@ -253,11 +267,13 @@ private Flux processStreamResponse( // Use Flux.using to manage resource lifecycle return Flux.using( - () -> - new BufferedReader( - new InputStreamReader(inputStream, StandardCharsets.UTF_8)), - reader -> isNdjson ? readNdJsonLines(reader) : readSseLines(reader), - this::closeQuietly); + () -> + new BufferedReader( + new InputStreamReader(inputStream, StandardCharsets.UTF_8)), + reader -> isNdjson ? readNdJsonLines(reader) : readSseLines(reader), + this::closeQuietly) + // reader.lines() uses blocking I/O internally + .subscribeOn(Schedulers.boundedElastic()); } private Flux readSseLines(BufferedReader reader) { @@ -310,7 +326,7 @@ public boolean isClosed() { return closed.get(); } - private java.net.http.HttpRequest buildJdkRequest(HttpRequest request) { + private java.net.http.HttpRequest buildJdkRequest(HttpRequest request, boolean isStreaming) { URI uri; try { uri = URI.create(request.getUrl()); @@ -318,8 +334,11 @@ private java.net.http.HttpRequest buildJdkRequest(HttpRequest request) { throw new HttpTransportException("Invalid URL: " + request.getUrl(), e); } - var builder = - java.net.http.HttpRequest.newBuilder().uri(uri).timeout(config.getReadTimeout()); + var builder = java.net.http.HttpRequest.newBuilder().uri(uri); + + if (!isStreaming && config.getReadTimeout() != null) { + builder.timeout(config.getReadTimeout()); + } for (Map.Entry header : request.getHeaders().entrySet()) { builder.header(header.getKey(), header.getValue()); diff --git a/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java b/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java index 0f0695142..ed019f8f5 100644 --- a/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java +++ b/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java @@ -57,7 +57,9 @@ void setUp() throws Exception { HttpTransportConfig config = HttpTransportConfig.builder() .connectTimeout(Duration.ofSeconds(5)) - .readTimeout(Duration.ofSeconds(10)) + .readTimeout(Duration.ofSeconds(2)) // Global timeout for sync calls + .responseTimeout(Duration.ofSeconds(2)) // TTFT for streaming + .streamIdleTimeout(Duration.ofSeconds(1)) // Inter-token gap for streaming .build(); transport = new JdkHttpTransport(config); } @@ -304,6 +306,8 @@ void testJdkHttpTransportBuilder() { HttpTransportConfig.builder() .connectTimeout(Duration.ofSeconds(10)) .readTimeout(Duration.ofSeconds(30)) + .responseTimeout(Duration.ofSeconds(45)) + .streamIdleTimeout(Duration.ofSeconds(15)) .build(); JdkHttpTransport builtTransport = JdkHttpTransport.builder().config(config).build(); @@ -311,6 +315,8 @@ void testJdkHttpTransportBuilder() { assertNotNull(builtTransport); assertNotNull(builtTransport.getClient()); assertEquals(config, builtTransport.getConfig()); + assertEquals(Duration.ofSeconds(45), builtTransport.getConfig().getResponseTimeout()); + assertEquals(Duration.ofSeconds(15), builtTransport.getConfig().getStreamIdleTimeout()); assertFalse(builtTransport.isClosed()); builtTransport.close(); assertTrue(builtTransport.isClosed()); @@ -1063,4 +1069,130 @@ void testHttpVersionConfig() { assertNotNull(jdkHttpTransport); assertNotNull(jdkHttpTransport2); } + + @Test + void testStreamColdStartSurvivesGlobalTimeout() throws Exception { + // Reproduces the bug reported in the issue 1302 + + HttpTransportConfig customConfig = + HttpTransportConfig.builder() + .readTimeout(Duration.ofSeconds(1)) // Very tight global timeout + .responseTimeout(Duration.ofSeconds(4)) // Ample Time-To-First-Token timeout + .streamIdleTimeout(Duration.ofSeconds(2)) + .build(); + + JdkHttpTransport customTransport = new JdkHttpTransport(customConfig); + + try { + // Simulate the cold start overhead + LLM thinking time by delaying headers for 2 + // seconds. + mockServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "text/event-stream") + .setBody("data: {\"id\":\"1\"}\n\ndata: [DONE]\n\n") + .setHeadersDelay(2, TimeUnit.SECONDS)); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/cold-start-bug-reproduction").toString()) + .method("POST") + .body("{}") + .build(); + + // The test succeeds ONLY if the stream survives the 2-second initial delay + // without being killed by the 1-second global readTimeout. + StepVerifier.create(customTransport.stream(request)) + .expectNextMatches(data -> data.contains("\"id\":\"1\"")) + .verifyComplete(); + } finally { + customTransport.close(); + } + } + + @Test + void testStreamResponseTimeout() { + // Test Timeout Strategy 1 (TTFT): + // Delay headers by 3 seconds, which exceeds the configured responseTimeout (2 seconds). + mockServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setBody("data: {\"id\":\"1\"}\n\ndata: [DONE]\n\n") + .setHeader("Content-Type", "text/event-stream") + .setHeadersDelay(3, TimeUnit.SECONDS)); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/ttft-timeout").toString()) + .method("POST") + .body("{}") + .build(); + + StepVerifier.create(transport.stream(request)) + .expectErrorMatches( + e -> + e instanceof HttpTransportException + && e.getMessage().contains("Stream timeout")) + .verify(Duration.ofSeconds(5)); + } + + @Test + void testStreamIdleTimeout() { + // Test Timeout Strategy 2 (Inter-token gap): + // Throttle body to emit 1 byte every 2 seconds. + // This exceeds the configured streamIdleTimeout (1 second). + mockServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "text/event-stream") + .setBody("data: {\"id\":\"1\"}\n\ndata: {\"id\":\"2\"}\n\n") + .throttleBody(1, 2, TimeUnit.SECONDS)); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/idle-timeout").toString()) + .method("POST") + .body("{}") + .build(); + + StepVerifier.create(transport.stream(request)) + .expectErrorMatches( + e -> + e instanceof HttpTransportException + && e.getMessage().contains("Stream timeout")) + .verify(Duration.ofSeconds(5)); + } + + @Test + void testStreamSurvivesGlobalReadTimeout() { + // Verify that streaming requests are NOT killed by the global readTimeout. + // readTimeout is 2s, but we will make the stream take roughly 3s overall. + // We throttle 10 bytes every 500ms. Inter-token gap is < 1s, so streamIdleTimeout is + // respected. + String sseBody = + "data: 1\n\n" + + "data: 2\n\n" + + "data: 3\n\n" + + "data: 4\n\n" + + "data: 5\n\n" + + "data: [DONE]\n\n"; + + mockServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "text/event-stream") + .setBody(sseBody) + .throttleBody(10, 500, TimeUnit.MILLISECONDS)); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/survive-timeout").toString()) + .method("POST") + .body("{}") + .build(); + + StepVerifier.create(transport.stream(request)) + .expectNextCount(5) // Should successfully receive all 5 data chunks + .verifyComplete(); + } }