From 06602c96d01962ccd68e28a7b6112bcf475ef7a3 Mon Sep 17 00:00:00 2001 From: everfid-ever <166227111+everfid-ever@users.noreply.github.com> Date: Fri, 12 Dec 2025 18:10:52 +0800 Subject: [PATCH 1/9] fix: add timeout for SSE response waiting to prevent indefinite blocking --- client/transport/sse.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/client/transport/sse.go b/client/transport/sse.go index 66a96816..25645e23 100644 --- a/client/transport/sse.go +++ b/client/transport/sse.go @@ -439,10 +439,26 @@ func (c *SSE) SendRequest( return nil, fmt.Errorf("request failed with status %d: %s", resp.StatusCode, body) } + // Calculate response timeout + responseTimeout := 60 * time.Second + if deadline, ok := ctx.Deadline(); ok { + remaining := time.Until(deadline) + if remaining < responseTimeout { + responseTimeout = remaining + } + } + + timer := time.NewTimer(responseTimeout) + defer timer.Stop() + select { case <-ctx.Done(): deleteResponseChan() return nil, ctx.Err() + case <-timer.C: + // Timeout handling + deleteResponseChan() + return nil, fmt.Errorf("timeout waiting for SSE response after %v", responseTimeout) case response, ok := <-responseChan: if ok { return response, nil From 34b5fb502b9684404f3cba28586fd22f26addc6f Mon Sep 17 00:00:00 2001 From: everfid-ever <166227111+everfid-ever@users.noreply.github.com> Date: Fri, 12 Dec 2025 18:27:38 +0800 Subject: [PATCH 2/9] add test --- client/transport/sse_test.go | 155 +++++++++++++++++++++++++++++++++++ 1 file changed, 155 insertions(+) diff --git a/client/transport/sse_test.go b/client/transport/sse_test.go index 7bbb15e0..6969c492 100644 --- a/client/transport/sse_test.go +++ b/client/transport/sse_test.go @@ -1077,3 +1077,158 @@ func TestSSE_SendNotification_Unauthorized_StaticToken(t *testing.T) { // Clean up transport.Close() } + +func TestSSE_SendRequest_Timeout(t *testing.T) { + t.Run("TimeoutWhenServerNeverResponds", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("Accept") == "text/event-stream" { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + flusher, _ := w.(http.Flusher) + fmt.Fprintf(w, "event: endpoint\ndata: /message\n\n") + flusher.Flush() + <-r.Context().Done() + return + } + + if r.Method == http.MethodPost { + w.WriteHeader(http.StatusAccepted) + return + } + })) + defer server.Close() + + transport, err := NewSSE(server.URL) + require.NoError(t, err) + defer transport.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err = transport.Start(ctx) + require.NoError(t, err) + + requestCtx, requestCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer requestCancel() + + request := JSONRPCRequest{ + JSONRPC: "2.0", + ID: mcp.NewRequestId(int64(1)), + Method: "test/timeout", + } + + startTime := time.Now() + _, err = transport.SendRequest(requestCtx, request) + duration := time.Since(startTime) + + require.Error(t, err, "Expected timeout error") + require.Contains(t, err.Error(), "timeout", "Error should mention timeout") + require.GreaterOrEqual(t, duration, 1500*time.Millisecond) + require.LessOrEqual(t, duration, 2500*time.Millisecond) + }) + + t.Run("ContextDeadlineTakesPrecedence", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("Accept") == "text/event-stream" { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + flusher, _ := w.(http.Flusher) + fmt.Fprintf(w, "event: endpoint\ndata: /message\n\n") + flusher.Flush() + <-r.Context().Done() + return + } + + if r.Method == http.MethodPost { + w.WriteHeader(http.StatusAccepted) + return + } + })) + defer server.Close() + + transport, err := NewSSE(server.URL) + require.NoError(t, err) + defer transport.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err = transport.Start(ctx) + require.NoError(t, err) + + requestCtx, requestCancel := context.WithTimeout(context.Background(), 1*time.Second) + defer requestCancel() + + request := JSONRPCRequest{ + JSONRPC: "2.0", + ID: mcp.NewRequestId(int64(1)), + Method: "test/deadline", + } + + startTime := time.Now() + _, err = transport.SendRequest(requestCtx, request) + duration := time.Since(startTime) + + require.Error(t, err) + errMsg := err.Error() + require.True(t, + strings.Contains(errMsg, "timeout") || strings.Contains(errMsg, "deadline exceeded"), + "Error should mention timeout or deadline, got: %v", err) + require.LessOrEqual(t, duration, 1500*time.Millisecond, "Should respect context deadline of 1s") + }) + + t.Run("TimeoutCleansUpResponseChannel", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("Accept") == "text/event-stream" { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + flusher, _ := w.(http.Flusher) + fmt.Fprintf(w, "event: endpoint\ndata: /message\n\n") + flusher.Flush() + <-r.Context().Done() + return + } + + if r.Method == http.MethodPost { + w.WriteHeader(http.StatusAccepted) + return + } + })) + defer server.Close() + + transport, err := NewSSE(server.URL) + require.NoError(t, err) + defer transport.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err = transport.Start(ctx) + require.NoError(t, err) + + transport.mu.RLock() + initialCount := len(transport.responses) + transport.mu.RUnlock() + require.Equal(t, 0, initialCount) + + requestCtx, requestCancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer requestCancel() + + request := JSONRPCRequest{ + JSONRPC: "2.0", + ID: mcp.NewRequestId(int64(999)), + Method: "test/timeout", + } + + _, err = transport.SendRequest(requestCtx, request) + require.Error(t, err) + + time.Sleep(50 * time.Millisecond) + + transport.mu.RLock() + finalCount := len(transport.responses) + transport.mu.RUnlock() + + require.Equal(t, 0, finalCount) + }) +} From c7341d275cbd128cdd8798c1d9b97866116e2a74 Mon Sep 17 00:00:00 2001 From: everfid-ever <166227111+everfid-ever@users.noreply.github.com> Date: Fri, 12 Dec 2025 18:36:01 +0800 Subject: [PATCH 3/9] improve timeout handling --- client/transport/sse.go | 31 +++++++++++++++++++++---- client/transport/sse_test.go | 44 ++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 5 deletions(-) diff --git a/client/transport/sse.go b/client/transport/sse.go index 25645e23..b59225bb 100644 --- a/client/transport/sse.go +++ b/client/transport/sse.go @@ -177,16 +177,31 @@ func (c *SSE) Start(ctx context.Context) error { go c.readSSE(resp.Body) // Wait for the endpoint to be received - timeout := time.NewTimer(30 * time.Second) - defer timeout.Stop() + endpointTimeout := 30 * time.Second + if deadline, ok := ctx.Deadline(); ok { + remaining := time.Until(deadline) + // If context deadline has already passed, return immediately + if remaining <= 0 { + cancel() + return ctx.Err() + } + // Use the shorter of remaining time or default timeout + if remaining < endpointTimeout { + endpointTimeout = remaining + } + } + + timer := time.NewTimer(endpointTimeout) + defer timer.Stop() + select { case <-c.endpointChan: // Endpoint received, proceed case <-ctx.Done(): - return fmt.Errorf("context cancelled while waiting for endpoint") - case <-timeout.C: // Add a timeout + return fmt.Errorf("context cancelled while waiting for endpoint: %w", ctx.Err()) + case <-timer.C: cancel() - return fmt.Errorf("timeout waiting for endpoint") + return fmt.Errorf("timeout waiting for endpoint after %v", endpointTimeout) } c.started.Store(true) @@ -443,6 +458,12 @@ func (c *SSE) SendRequest( responseTimeout := 60 * time.Second if deadline, ok := ctx.Deadline(); ok { remaining := time.Until(deadline) + // Check if context deadline has already passed + if remaining <= 0 { + deleteResponseChan() + return nil, ctx.Err() + } + // Use the shorter of remaining time or default timeout if remaining < responseTimeout { responseTimeout = remaining } diff --git a/client/transport/sse_test.go b/client/transport/sse_test.go index 6969c492..da1be6d1 100644 --- a/client/transport/sse_test.go +++ b/client/transport/sse_test.go @@ -1230,5 +1230,49 @@ func TestSSE_SendRequest_Timeout(t *testing.T) { transport.mu.RUnlock() require.Equal(t, 0, finalCount) + t.Run("AlreadyExpiredDeadline", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("Accept") == "text/event-stream" { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + flusher, _ := w.(http.Flusher) + fmt.Fprintf(w, "event: endpoint\ndata: /message\n\n") + flusher.Flush() + <-r.Context().Done() + return + } + + if r.Method == http.MethodPost { + w.WriteHeader(http.StatusAccepted) + return + } + })) + defer server.Close() + + transport, err := NewSSE(server.URL) + require.NoError(t, err) + defer transport.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err = transport.Start(ctx) + require.NoError(t, err) + + expiredCtx, expiredCancel := context.WithDeadline(context.Background(), time.Now().Add(-1*time.Second)) + defer expiredCancel() + + request := JSONRPCRequest{ + JSONRPC: "2.0", + ID: mcp.NewRequestId(int64(1)), + Method: "test/expired", + } + + _, err = transport.SendRequest(expiredCtx, request) + + require.Error(t, err) + require.True(t, errors.Is(err, context.DeadlineExceeded), + "Expected context. DeadlineExceeded, got: %v", err) + }) }) } From a20cda27db955dc62634a7d1b97d56d933f1bc2f Mon Sep 17 00:00:00 2001 From: everfid-ever <166227111+everfid-ever@users.noreply.github.com> Date: Fri, 12 Dec 2025 18:39:54 +0800 Subject: [PATCH 4/9] imporve test --- client/transport/sse_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/client/transport/sse_test.go b/client/transport/sse_test.go index da1be6d1..50d3d83e 100644 --- a/client/transport/sse_test.go +++ b/client/transport/sse_test.go @@ -1123,8 +1123,9 @@ func TestSSE_SendRequest_Timeout(t *testing.T) { require.Error(t, err, "Expected timeout error") require.Contains(t, err.Error(), "timeout", "Error should mention timeout") - require.GreaterOrEqual(t, duration, 1500*time.Millisecond) - require.LessOrEqual(t, duration, 2500*time.Millisecond) + expectedTimeout := 2 * time.Second + require.GreaterOrEqual(t, duration, expectedTimeout*7/10) // 70% of expected + require.LessOrEqual(t, duration, expectedTimeout*13/10) // 130% of expected }) t.Run("ContextDeadlineTakesPrecedence", func(t *testing.T) { From 5c22011b4bc3e646a97a2184d290c617e0b6ba86 Mon Sep 17 00:00:00 2001 From: everfid-ever <166227111+everfid-ever@users.noreply.github.com> Date: Wed, 17 Dec 2025 02:34:44 +0800 Subject: [PATCH 5/9] fix --- client/transport/sse.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/transport/sse.go b/client/transport/sse.go index b59225bb..82c10a9f 100644 --- a/client/transport/sse.go +++ b/client/transport/sse.go @@ -198,7 +198,7 @@ func (c *SSE) Start(ctx context.Context) error { case <-c.endpointChan: // Endpoint received, proceed case <-ctx.Done(): - return fmt.Errorf("context cancelled while waiting for endpoint: %w", ctx.Err()) + return fmt.Errorf("context cancelled while waiting for endpoint: %w", ctx.Err()) case <-timer.C: cancel() return fmt.Errorf("timeout waiting for endpoint after %v", endpointTimeout) From 6311134ef6895aaf0f4dc82bc3878f726b72108a Mon Sep 17 00:00:00 2001 From: everfid-ever <166227111+everfid-ever@users.noreply.github.com> Date: Wed, 17 Dec 2025 03:00:27 +0800 Subject: [PATCH 6/9] test --- client/transport/sse_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/transport/sse_test.go b/client/transport/sse_test.go index 50d3d83e..db8da007 100644 --- a/client/transport/sse_test.go +++ b/client/transport/sse_test.go @@ -1273,7 +1273,7 @@ func TestSSE_SendRequest_Timeout(t *testing.T) { require.Error(t, err) require.True(t, errors.Is(err, context.DeadlineExceeded), - "Expected context. DeadlineExceeded, got: %v", err) + "Expected context. DeadlineExceeded, got: %v", err) }) }) } From a5698566add67228034863be1dcb4915f9e3f227 Mon Sep 17 00:00:00 2001 From: everfid-ever <166227111+everfid-ever@users.noreply.github.com> Date: Wed, 17 Dec 2025 03:13:57 +0800 Subject: [PATCH 7/9] fix --- client/transport/sse_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/transport/sse_test.go b/client/transport/sse_test.go index db8da007..975785a5 100644 --- a/client/transport/sse_test.go +++ b/client/transport/sse_test.go @@ -1273,7 +1273,7 @@ func TestSSE_SendRequest_Timeout(t *testing.T) { require.Error(t, err) require.True(t, errors.Is(err, context.DeadlineExceeded), - "Expected context. DeadlineExceeded, got: %v", err) + "Expected context.DeadlineExceeded, got: %v", err) }) }) } From 344053b6f3c79b9cef60de2321a39dbfca3409f8 Mon Sep 17 00:00:00 2001 From: everfid-ever <166227111+everfid-ever@users.noreply.github.com> Date: Wed, 17 Dec 2025 03:24:17 +0800 Subject: [PATCH 8/9] fix test --- client/transport/sse_test.go | 73 ++++++++++++++++++------------------ 1 file changed, 37 insertions(+), 36 deletions(-) diff --git a/client/transport/sse_test.go b/client/transport/sse_test.go index 975785a5..573359b6 100644 --- a/client/transport/sse_test.go +++ b/client/transport/sse_test.go @@ -1231,49 +1231,50 @@ func TestSSE_SendRequest_Timeout(t *testing.T) { transport.mu.RUnlock() require.Equal(t, 0, finalCount) - t.Run("AlreadyExpiredDeadline", func(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Header.Get("Accept") == "text/event-stream" { - w.Header().Set("Content-Type", "text/event-stream") - w.WriteHeader(http.StatusOK) - flusher, _ := w.(http.Flusher) - fmt.Fprintf(w, "event: endpoint\ndata: /message\n\n") - flusher.Flush() - <-r.Context().Done() - return - } + }) - if r.Method == http.MethodPost { - w.WriteHeader(http.StatusAccepted) - return - } - })) - defer server.Close() + t.Run("AlreadyExpiredDeadline", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("Accept") == "text/event-stream" { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + flusher, _ := w.(http.Flusher) + fmt.Fprintf(w, "event: endpoint\ndata: /message\n\n") + flusher.Flush() + <-r.Context().Done() + return + } + + if r.Method == http.MethodPost { + w.WriteHeader(http.StatusAccepted) + return + } + })) + defer server.Close() - transport, err := NewSSE(server.URL) - require.NoError(t, err) - defer transport.Close() + transport, err := NewSSE(server.URL) + require.NoError(t, err) + defer transport.Close() - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() - err = transport.Start(ctx) - require.NoError(t, err) + err = transport.Start(ctx) + require.NoError(t, err) - expiredCtx, expiredCancel := context.WithDeadline(context.Background(), time.Now().Add(-1*time.Second)) - defer expiredCancel() + expiredCtx, expiredCancel := context.WithDeadline(context.Background(), time.Now().Add(-1*time.Second)) + defer expiredCancel() - request := JSONRPCRequest{ - JSONRPC: "2.0", - ID: mcp.NewRequestId(int64(1)), - Method: "test/expired", - } + request := JSONRPCRequest{ + JSONRPC: "2.0", + ID: mcp.NewRequestId(int64(1)), + Method: "test/expired", + } - _, err = transport.SendRequest(expiredCtx, request) + _, err = transport.SendRequest(expiredCtx, request) - require.Error(t, err) - require.True(t, errors.Is(err, context.DeadlineExceeded), - "Expected context.DeadlineExceeded, got: %v", err) - }) + require.Error(t, err) + require.True(t, errors.Is(err, context.DeadlineExceeded), + "Expected context.DeadlineExceeded, got: %v", err) }) } From 3360327d32c50c4dfc703ef0383b123c76733b1d Mon Sep 17 00:00:00 2001 From: everfid-ever <166227111+everfid-ever@users.noreply.github.com> Date: Fri, 19 Dec 2025 16:36:13 +0800 Subject: [PATCH 9/9] fix resource leak --- client/transport/sse.go | 1 + 1 file changed, 1 insertion(+) diff --git a/client/transport/sse.go b/client/transport/sse.go index 82c10a9f..38425324 100644 --- a/client/transport/sse.go +++ b/client/transport/sse.go @@ -434,6 +434,7 @@ func (c *SSE) SendRequest( resp.Body.Close() if err != nil { + deleteResponseChan() return nil, fmt.Errorf("failed to read response body: %w", err) }