From 25900f923547d4c943569fce85aaf89fce3c3f41 Mon Sep 17 00:00:00 2001
From: Koichi ITO
Date: Sat, 13 Jun 2026 00:41:05 +0900
Subject: [PATCH] Support SSE Reconnection Per SEP-1699 in the HTTP Client Transport

## Motivation and Context

Resolves the `sse-retry` client conformance scenario (SEP-1699, https://github.com/modelcontextprotocol/modelcontextprotocol/issues/1699).

Per SEP-1699, a server may close a request's SSE stream right after a priming event (an event carrying an `id:`) without delivering the response, expecting the client to treat the graceful close like a network failure: wait the server-specified `retry:` interval, then reconnect with an HTTP GET carrying `Last-Event-ID` so the server can replay the pending response on the resumed stream. The Ruby client previously read the entire SSE body to EOF and parsed it afterward, so it had no notion of per-event state (`id:`, `retry:`) or reconnection, and the scenario failed all three checks (graceful reconnect, retry timing, `Last-Event-ID`).

`MCP::Client::HTTP#send_request` now consumes responses incrementally via Faraday's `on_data` streaming callback, feeding SSE chunks to `event_stream_parser` as they arrive while buffering plain JSON bodies as before. A new internal `SSEStream` tracks the last received event id, the `retry:` reconnection delay, and the awaited JSON-RPC response; once the response arrives, a held-open stream is abandoned via an internal control-flow exception since servers may never close it. When a stream closes gracefully after a priming event but before the response, the client sleeps for the `retry:` interval (default 1000ms when the server sent none), then issues a GET with `Accept: text/event-stream`, the session headers, and `Last-Event-ID`, for up to 2 attempts.
The defaults match the Python SDK's `DEFAULT_RECONNECTION_DELAY_MS` and `MAX_RECONNECTION_ATTEMPTS` (`streamable_http.py`); the TypeScript SDK's `StreamableHTTPClientTransport` (`streamableHttp.ts`) likewise lets the `retry:` field override its backoff and reconnects only when a priming event was received and no response has arrived.

The public behavior of `send_request` is unchanged: same return values, same error mapping, and a stream that closes without a priming event still raises the "No valid JSON-RPC response found in SSE stream" error without reconnecting. Because the previous implementation read `response.body` and therefore worked with any Faraday adapter, a fallback keeps that compatibility: when nothing was parsed during streaming, the body is read from `response.body` (adapters without `on_data` support, e.g. the Faraday test adapter) or from the buffered chunks (Faraday < 2.1, which invokes `on_data` without `env`).

The conformance client gains an `sse-retry` branch that calls the harness's `test_reconnection` tool, and the scenario is removed from the expected failures baseline.

## How Has This Been Tested?

New unit tests covering: reconnection with `Last-Event-ID` after a primed graceful close including the `retry:` wait, the default 1000ms delay when `retry:` is absent, raising after the reconnection attempt cap with `Last-Event-ID` advancing across attempts, no reconnection for unprimed streams, and the non-streaming fallbacks (JSON and SSE responses through the Faraday test adapter, which ignores `on_data`, plus buffered SSE chunks when `on_data` receives no `env`).

## Breaking Changes

None. The `send_request` contract is unchanged; SSE bodies are now parsed incrementally instead of after EOF, and reconnection only activates when the server opts in by sending a priming event.
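
## Illustrative Sketch

To make the decision described under Motivation and Context concrete, here is a minimal, self-contained sketch. It is not the code in this patch: `reconnection_plan` and `DEFAULT_DELAY_MS` are hypothetical names, the hand-rolled line parser only stands in for `event_stream_parser`, and the whole body is scanned at once rather than streamed. It assumes events are separated by blank lines and handles only the `id`, `retry`, and `data` fields.

```ruby
require "json"

# Assumption: mirrors the patch's DEFAULT_RECONNECTION_DELAY_MS; the name is illustrative.
DEFAULT_DELAY_MS = 1000

# Hypothetical helper: scans a complete SSE body and reports what a client
# following SEP-1699 would do once the stream has closed.
def reconnection_plan(sse_body)
  last_event_id = nil
  retry_ms = nil
  response = nil

  # Events are separated by a blank line.
  sse_body.split(/\r?\n\r?\n/).each do |event_block|
    data_lines = []
    event_block.each_line(chomp: true) do |line|
      field, _separator, value = line.partition(":")
      value = value.delete_prefix(" ")
      case field
      when "id" then last_event_id = value unless value.empty?
      when "retry" then retry_ms = value.to_i if value.match?(/\A\d+\z/)
      when "data" then data_lines << value
      end
    end

    # Priming events carry empty data; anything that is not JSON is skipped.
    parsed = begin
      JSON.parse(data_lines.join("\n"))
    rescue JSON::ParserError
      nil
    end
    if parsed.is_a?(Hash) && (parsed.key?("result") || parsed.key?("error"))
      response ||= parsed
    end
  end

  # Response delivered: nothing to resume.
  return { action: :done, response: response } if response
  # No priming event: the stream is not resumable, so the request fails.
  return { action: :fail } unless last_event_id

  # Primed but unanswered: wait, then GET with Last-Event-ID.
  { action: :reconnect, last_event_id: last_event_id, delay_s: (retry_ms || DEFAULT_DELAY_MS) / 1000.0 }
end
```

Under these assumptions, a body of `"id: event-1\nretry: 100\ndata:\n\n"` yields a `:reconnect` plan with `last_event_id` set to `"event-1"`; the real transport additionally caps the number of reconnection attempts and aborts held-open streams once the response arrives.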
--- conformance/client.rb | 9 ++ conformance/expected_failures.yml | 2 - lib/mcp/client/http.rb | 221 +++++++++++++++++++++++++----- test/mcp/client/http_test.rb | 165 +++++++++++++++++++++- 4 files changed, 363 insertions(+), 34 deletions(-) diff --git a/conformance/client.rb b/conformance/client.rb index c9f3ec5f..7b7ddea7 100644 --- a/conformance/client.rb +++ b/conformance/client.rb @@ -135,6 +135,15 @@ def build_provider_for(scenario, context) add_numbers = tools.find { |t| t.name == "add_numbers" } abort("Tool add_numbers not found") unless add_numbers client.call_tool(tool: add_numbers, arguments: { a: 1, b: 2 }) +when "sse-retry" + # SEP-1699: the server closes the tools/call SSE stream right after a priming event. + # The transport waits the server's `retry:` interval, reconnects with a GET carrying `Last-Event-ID`, + # and receives the tool result on the resumed stream; the harness verifies the reconnect, + # its timing, and the header. + tools = client.tools + test_reconnection = tools.find { |t| t.name == "test_reconnection" } + abort("Tool test_reconnection not found") unless test_reconnection + client.call_tool(tool: test_reconnection, arguments: {}) when %r|\Aauth/| # Auth-only scenarios: the protocol-level checks (PRM/AS metadata, DCR, PKCE, token usage) # are observed by the conformance server during `connect` and the subsequent request below. diff --git a/conformance/expected_failures.yml b/conformance/expected_failures.yml index 1ab4cc45..cb397ba1 100644 --- a/conformance/expected_failures.yml +++ b/conformance/expected_failures.yml @@ -1,7 +1,5 @@ server: [] client: - # TODO: SSE reconnection not implemented in Ruby client. - - sse-retry # TODO: Elicitation not implemented in Ruby client. - elicitation-sep1034-client-defaults # TODO: Remaining OAuth/auth scenarios not yet implemented in Ruby client. 
diff --git a/lib/mcp/client/http.rb b/lib/mcp/client/http.rb index e23c154f..c693073f 100644 --- a/lib/mcp/client/http.rb +++ b/lib/mcp/client/http.rb @@ -8,14 +8,22 @@ module MCP class Client - # TODO: HTTP GET for SSE streaming is not yet implemented. + # TODO: A standalone HTTP GET listening stream for server-initiated messages is not yet implemented; + # GET is currently used only to resume a request's SSE stream after a disconnect. # https://modelcontextprotocol.io/specification/2025-11-25/basic/transports#listening-for-messages-from-the-server - # TODO: Resumability and redelivery with Last-Event-ID is not yet implemented. - # https://modelcontextprotocol.io/specification/2025-11-25/basic/transports#resumability-and-redelivery class HTTP ACCEPT_HEADER = "application/json, text/event-stream" + SSE_ACCEPT_HEADER = "text/event-stream" SESSION_ID_HEADER = "Mcp-Session-Id" PROTOCOL_VERSION_HEADER = "MCP-Protocol-Version" + LAST_EVENT_ID_HEADER = "Last-Event-ID" + + # SEP-1699 reconnection tuning: the SSE `retry:` field from the server takes precedence; + # this default applies when the server sent none. Both values match the Python SDK + # (`DEFAULT_RECONNECTION_DELAY_MS`, `MAX_RECONNECTION_ATTEMPTS`); the TypeScript SDK uses + # an exponential backoff that the `retry:` field likewise overrides. + DEFAULT_RECONNECTION_DELAY_MS = 1000 + MAX_RECONNECTION_ATTEMPTS = 2 # Raised when an `oauth:` provider is paired with an MCP URL that is neither HTTPS nor # a loopback `http://` URL, since a bearer token sent over plain HTTP to a remote host @@ -52,6 +60,101 @@ def call(env) end end + # Internal control-flow signal raised inside the streaming callback to stop reading + # an SSE stream once the awaited JSON-RPC response has arrived; servers may hold + # a stream open indefinitely (notably the standalone GET stream), so EOF cannot be relied on. 
+ class StreamAbort < StandardError; end + private_constant :StreamAbort + + # Per-exchange SSE state shared between the initial POST stream and any SEP-1699 reconnection GET streams: + # the incrementally parsed JSON-RPC response, the last received SSE event id (for `Last-Event-ID`), and + # the server's `retry:` reconnection delay. Non-SSE bodies accumulate in `buffer` for the JSON path. + class SSEStream + attr_reader :buffer, :response, :last_event_id, :retry_ms + attr_accessor :abortable + + def initialize(abortable:) + @abortable = abortable + @buffer = +"" + @parser = nil + @response = nil + @last_event_id = nil + @retry_ms = nil + end + + # Whether the server sent a SEP-1699 priming event (any event carrying an id), + # which marks the stream as resumable after a graceful close. + def primed? + !@last_event_id.nil? + end + + # Faraday `on_data` streaming callback. SSE chunks are parsed incrementally; + # anything else (JSON bodies) accumulates in `buffer`. + def on_data + proc do |chunk, _received_bytes, env| + if event_stream?(env) + feed(chunk) + raise StreamAbort if @abortable && @response + else + @buffer << chunk + end + end + end + + # A fresh parser for a new HTTP connection (reconnection GET), + # so a partial line from the previous stream cannot corrupt the next one. + def reset_parser! + @parser = nil + end + + # Parses an SSE body that was delivered outside the streaming callback: + # Faraday versions that do not pass `env` to `on_data` leave SSE chunks in `buffer` + # (consumed here so they are not parsed twice), and adapters without `on_data` support + # yield the whole body only via `fallback_body` (the Faraday `response.body`). + def ingest_pending!(fallback_body) + text = @buffer.empty? ? fallback_body : @buffer.slice!(0..-1) + feed(text) if text.is_a?(String) && !text.empty? 
+ end + + private + + def event_stream?(env) + headers = env&.response_headers + content_type = headers && (headers["content-type"] || headers["Content-Type"]) + !!content_type&.include?("text/event-stream") + end + + def feed(chunk) + parser.feed(chunk) do |_type, data, id, reconnection_time| + @last_event_id = id if id && !id.empty? + @retry_ms = reconnection_time if reconnection_time + next if data.nil? || data.empty? + + begin + parsed = JSON.parse(data) + rescue JSON::ParserError + next + end + + if parsed.is_a?(Hash) && (parsed.key?("result") || parsed.key?("error")) + @response ||= parsed + end + end + end + + def parser + @parser ||= begin + require "event_stream_parser" + EventStreamParser::Parser.new + rescue LoadError + raise LoadError, "The 'event_stream_parser' gem is required to parse SSE responses. " \ + "Add it to your Gemfile: gem 'event_stream_parser', '>= 1.0'. " \ + "See https://rubygems.org/gems/event_stream_parser for more details." + end + end + end + private_constant :SSEStream + attr_reader :url, :session_id, :protocol_version, :server_info, :oauth def initialize(url:, headers: {}, oauth: nil, &block) @@ -197,11 +300,22 @@ def send_request(request:) step_up_retried = false begin + # The response is consumed incrementally so that an SSE stream the server holds open + # (or closes early per SEP-1699) can be handled; `initialize` streams are read to EOF + # so the response object (and its `Mcp-Session-Id` header) is always available for capture. + stream = SSEStream.new(abortable: method.to_s != MCP::Methods::INITIALIZE) yield if block_given? 
- response = client.post("", request, session_headers) - body = parse_response_body(response, method, params) + response = begin + client.post("", request, session_headers) do |req| + req.options.on_data = stream.on_data + end + rescue StreamAbort + nil + end + + body = resolve_response_body(stream, response, method, params) - capture_session_info(method, response, body) + capture_session_info(method, response, body) if response body rescue Faraday::BadRequestError => e @@ -501,15 +615,11 @@ def require_faraday! "See https://rubygems.org/gems/faraday for more details." end - def require_event_stream_parser! - require "event_stream_parser" - rescue LoadError - raise LoadError, "The 'event_stream_parser' gem is required to parse SSE responses. " \ - "Add it to your Gemfile: gem 'event_stream_parser', '>= 1.0'. " \ - "See https://rubygems.org/gems/event_stream_parser for more details." - end + # Determines the logical JSON-RPC body of the exchange after the POST stream finished + # (or was aborted because the response already arrived). + def resolve_response_body(stream, response, method, params) + return stream.response if stream.response - def parse_response_body(response, method, params) # 202 Accepted is the server's ACK for a JSON-RPC notification or response; no body is expected. # https://modelcontextprotocol.io/specification/2025-11-25/basic/transports#sending-messages-to-the-server return if response.status == 202 @@ -517,9 +627,29 @@ def parse_response_body(response, method, params) content_type = response.headers["Content-Type"] if content_type&.include?("text/event-stream") - parse_sse_response(response.body, method, params) + # Nothing was parsed during streaming: either the adapter does not support `on_data` + # (the whole body sits in `response.body`) or Faraday did not pass `env` to `on_data` + # (Faraday < 2.1; the SSE chunks sit in `buffer`). Parse whichever holds the body. + unless stream.primed? 
+ stream.ingest_pending!(response.body) + return stream.response if stream.response + end + + # SEP-1699: a graceful close after a priming event (an event with an id) but before + # the response means "reconnect via GET to resume". + return await_response_after_disconnect(stream, method, params) if stream.primed? + + raise RequestHandlerError.new( + "No valid JSON-RPC response found in SSE stream", + { method: method, params: params }, + error_type: :parse_error, + ) elsif content_type&.include?("application/json") - response.body + return parse_json_buffer(stream.buffer, method, params) unless stream.buffer.empty? + + # Adapters without `on_data` support deliver the body via `response.body`, + # already parsed by the json response middleware. + response.body.is_a?(String) ? parse_json_buffer(response.body, method, params) : response.body else raise RequestHandlerError.new( "Unsupported Content-Type: #{content_type.inspect}. Expected application/json or text/event-stream.", @@ -529,28 +659,57 @@ def parse_response_body(response, method, params) end end - def parse_sse_response(body, method, params) - require_event_stream_parser! + def parse_json_buffer(buffer, method, params) + return if buffer.empty? - json_rpc_response = nil - parser = EventStreamParser::Parser.new - parser.feed(body.to_s) do |_type, data, _id| - next if data.empty? + JSON.parse(buffer) + rescue JSON::ParserError => e + raise RequestHandlerError.new( + "Failed to parse JSON response: #{e.message}", + { method: method, params: params }, + error_type: :parse_error, + ) + end - begin - parsed = JSON.parse(data) - json_rpc_response = parsed if parsed.is_a?(Hash) && (parsed.key?("result") || parsed.key?("error")) - rescue JSON::ParserError - next + # SEP-1699 resumability: the server closed the SSE stream after a priming event + # without delivering the response. 
Treat the graceful close like a network failure: + # wait the server-specified `retry:` interval (default 1000ms), then reconnect with + # a GET carrying `Last-Event-ID` so the server can replay the pending response on + # the resumed stream. Mirrors the TypeScript SDK's `StreamableHTTPClientTransport` + # reconnection and the Python SDK's `_handle_reconnection` (including its 2-attempt cap). + # https://github.com/modelcontextprotocol/modelcontextprotocol/issues/1699 + def await_response_after_disconnect(stream, method, params) + stream.abortable = true + + MAX_RECONNECTION_ATTEMPTS.times do + sleep((stream.retry_ms || DEFAULT_RECONNECTION_DELAY_MS) / 1000.0) + stream.reset_parser! + + reconnect_response = begin + client.get("") do |req| + req.headers.update(session_headers) + req.headers["Accept"] = SSE_ACCEPT_HEADER + req.headers[LAST_EVENT_ID_HEADER] = stream.last_event_id if stream.last_event_id + req.options.on_data = stream.on_data + end + rescue StreamAbort + # The awaited response arrived on the reconnected stream. + nil end - end - return json_rpc_response if json_rpc_response + # Same fallback as `resolve_response_body` for adapters that do not stream via `on_data`. + if reconnect_response && stream.response.nil? 
+ stream.ingest_pending!(reconnect_response.body) + end + + return stream.response if stream.response + end raise RequestHandlerError.new( - "No valid JSON-RPC response found in SSE stream", + "Server closed the SSE stream without a response for #{method} " \ + "after #{MAX_RECONNECTION_ATTEMPTS} reconnection attempts", { method: method, params: params }, - error_type: :parse_error, + error_type: :internal_error, ) end end diff --git a/test/mcp/client/http_test.rb b/test/mcp/client/http_test.rb index c1d6c4a3..130c5b49 100644 --- a/test/mcp/client/http_test.rb +++ b/test/mcp/client/http_test.rb @@ -35,7 +35,7 @@ def test_raises_load_error_when_event_stream_parser_not_available ) HTTP.any_instance.stubs(:require).with("faraday").returns(true) - HTTP.any_instance.stubs(:require).with("event_stream_parser") + HTTP.const_get(:SSEStream).any_instance.stubs(:require).with("event_stream_parser") .raises(LoadError, "cannot load such file -- event_stream_parser") error = assert_raises(LoadError) do @@ -456,6 +456,169 @@ def test_send_request_raises_error_for_sse_without_response assert_includes(error.message, "No valid JSON-RPC response found in SSE stream") assert_equal(:parse_error, error.error_type) + assert_not_requested(:get, url) + end + + def test_send_request_reconnects_with_last_event_id_after_primed_graceful_close + request = { + jsonrpc: "2.0", + id: "test_id", + method: "tools/call", + params: { name: "test_reconnection", arguments: {} }, + } + + stub_request(:post, url).with( + body: request.to_json, + ).to_return( + status: 200, + headers: { "Content-Type" => "text/event-stream" }, + body: "id: event-1\nretry: 100\ndata:\n\n", + ) + + get_body = "id: event-2\nretry: 100\ndata:\n\n" \ + "event: message\nid: event-3\n" \ + 'data: {"jsonrpc":"2.0","id":"test_id","result":{"content":[]}}' \ + "\n\n" + get_stub = stub_request(:get, url).with( + headers: { "Accept" => "text/event-stream", "Last-Event-ID" => "event-1" }, + ).to_return( + status: 200, + headers: { 
"Content-Type" => "text/event-stream" }, + body: get_body, + ) + + started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + response = client.send_request(request: request) + elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at + + assert_equal({ "content" => [] }, response["result"]) + assert_requested(get_stub) + + # The server-specified `retry:` interval must elapse before the reconnection GET. + assert_operator(elapsed, :>=, 0.1) + end + + def test_send_request_uses_default_reconnection_delay_when_retry_field_absent + request = { + jsonrpc: "2.0", + id: "test_id", + method: "tools/call", + params: { name: "test_reconnection", arguments: {} }, + } + + stub_request(:post, url).with( + body: request.to_json, + ).to_return( + status: 200, + headers: { "Content-Type" => "text/event-stream" }, + body: "id: event-1\ndata:\n\n", + ) + + get_body = 'data: {"jsonrpc":"2.0","id":"test_id","result":{"content":[]}}' \ + "\n\n" + stub_request(:get, url).with( + headers: { "Last-Event-ID" => "event-1" }, + ).to_return( + status: 200, + headers: { "Content-Type" => "text/event-stream" }, + body: get_body, + ) + + client.expects(:sleep).with(HTTP::DEFAULT_RECONNECTION_DELAY_MS / 1000.0) + + response = client.send_request(request: request) + + assert_equal({ "content" => [] }, response["result"]) + end + + def test_send_request_raises_after_reconnection_attempts_are_exhausted + request = { + jsonrpc: "2.0", + id: "test_id", + method: "tools/call", + params: { name: "test_reconnection", arguments: {} }, + } + + stub_request(:post, url).with( + body: request.to_json, + ).to_return( + status: 200, + headers: { "Content-Type" => "text/event-stream" }, + body: "id: event-1\nretry: 10\ndata:\n\n", + ) + + first_get = stub_request(:get, url) + .with(headers: { "Last-Event-ID" => "event-1" }) + .to_return( + status: 200, + headers: { "Content-Type" => "text/event-stream" }, + body: "id: event-2\nretry: 10\ndata:\n\n", + ) + second_get = stub_request(:get, 
url).with( + headers: { "Last-Event-ID" => "event-2" }, + ).to_return( + status: 200, + headers: { "Content-Type" => "text/event-stream" }, + body: "id: event-3\nretry: 10\ndata:\n\n", + ) + + error = assert_raises(RequestHandlerError) do + client.send_request(request: request) + end + + assert_includes(error.message, "after 2 reconnection attempts") + assert_equal(:internal_error, error.error_type) + assert_requested(first_get) + assert_requested(second_get) + end + + def test_send_request_parses_json_response_when_adapter_does_not_stream + # The Faraday test adapter ignores `on_data`, like adapters without + # streaming support; the body must be read from `response.body`. + stubs = Faraday::Adapter::Test::Stubs.new do |stub| + stub.post("/") do + [200, { "Content-Type" => "application/json" }, { result: { tools: [] } }.to_json] + end + end + client = HTTP.new(url: url) { |faraday| faraday.adapter(:test, stubs) } + + response = client.send_request(request: { jsonrpc: "2.0", id: "test_id", method: "tools/list" }) + + assert_equal({ "result" => { "tools" => [] } }, response) + end + + def test_send_request_parses_sse_response_when_adapter_does_not_stream + sse_body = "event: message\n" \ + 'data: {"jsonrpc":"2.0","id":"test_id","result":{"tools":[]}}' \ + "\n\n" + stubs = Faraday::Adapter::Test::Stubs.new do |stub| + stub.post("/") do + [200, { "Content-Type" => "text/event-stream" }, sse_body] + end + end + client = HTTP.new(url: url) { |faraday| faraday.adapter(:test, stubs) } + + response = client.send_request(request: { jsonrpc: "2.0", id: "test_id", method: "tools/list" }) + + assert_equal({ "tools" => [] }, response["result"]) + end + + def test_sse_stream_parses_buffered_chunks_when_env_is_unavailable + # Faraday < 2.1 invokes `on_data` without `env`; the content type + # cannot be detected, so SSE chunks accumulate in the buffer and are + # parsed by the `ingest_pending!` fallback. 
+ stream = HTTP.const_get(:SSEStream).new(abortable: true) + chunk = "data: {\"jsonrpc\":\"2.0\",\"id\":\"test_id\",\"result\":{}}\n\n" + + stream.on_data.call(chunk, chunk.bytesize) + + assert_nil(stream.response) + assert_equal(chunk, stream.buffer) + + stream.ingest_pending!(nil) + + assert_equal({ "jsonrpc" => "2.0", "id" => "test_id", "result" => {} }, stream.response) + assert_empty(stream.buffer) end def test_captures_session_id_and_protocol_version_on_initialize