diff --git a/conformance/client.rb b/conformance/client.rb index c9f3ec5f..7b7ddea7 100644 --- a/conformance/client.rb +++ b/conformance/client.rb @@ -135,6 +135,15 @@ def build_provider_for(scenario, context) add_numbers = tools.find { |t| t.name == "add_numbers" } abort("Tool add_numbers not found") unless add_numbers client.call_tool(tool: add_numbers, arguments: { a: 1, b: 2 }) +when "sse-retry" + # SEP-1699: the server closes the tools/call SSE stream right after a priming event. + # The transport waits the server's `retry:` interval, reconnects with a GET carrying `Last-Event-ID`, + # and receives the tool result on the resumed stream; the harness verifies the reconnect, + # its timing, and the header. + tools = client.tools + test_reconnection = tools.find { |t| t.name == "test_reconnection" } + abort("Tool test_reconnection not found") unless test_reconnection + client.call_tool(tool: test_reconnection, arguments: {}) when %r|\Aauth/| # Auth-only scenarios: the protocol-level checks (PRM/AS metadata, DCR, PKCE, token usage) # are observed by the conformance server during `connect` and the subsequent request below. diff --git a/conformance/expected_failures.yml b/conformance/expected_failures.yml index 1ab4cc45..cb397ba1 100644 --- a/conformance/expected_failures.yml +++ b/conformance/expected_failures.yml @@ -1,7 +1,5 @@ server: [] client: - # TODO: SSE reconnection not implemented in Ruby client. - - sse-retry # TODO: Elicitation not implemented in Ruby client. - elicitation-sep1034-client-defaults # TODO: Remaining OAuth/auth scenarios not yet implemented in Ruby client. diff --git a/lib/mcp/client/http.rb b/lib/mcp/client/http.rb index e23c154f..c693073f 100644 --- a/lib/mcp/client/http.rb +++ b/lib/mcp/client/http.rb @@ -8,14 +8,22 @@ module MCP class Client - # TODO: HTTP GET for SSE streaming is not yet implemented. + # TODO: A standalone HTTP GET listening stream for server-initiated messages is not yet implemented; + # GET is currently used only to resume a request's SSE stream after a disconnect. # https://modelcontextprotocol.io/specification/2025-11-25/basic/transports#listening-for-messages-from-the-server - # TODO: Resumability and redelivery with Last-Event-ID is not yet implemented. - # https://modelcontextprotocol.io/specification/2025-11-25/basic/transports#resumability-and-redelivery class HTTP ACCEPT_HEADER = "application/json, text/event-stream" + SSE_ACCEPT_HEADER = "text/event-stream" SESSION_ID_HEADER = "Mcp-Session-Id" PROTOCOL_VERSION_HEADER = "MCP-Protocol-Version" + LAST_EVENT_ID_HEADER = "Last-Event-ID" + + # SEP-1699 reconnection tuning: the SSE `retry:` field from the server takes precedence; + # this default applies when the server sent none. Both values match the Python SDK + # (`DEFAULT_RECONNECTION_DELAY_MS`, `MAX_RECONNECTION_ATTEMPTS`); the TypeScript SDK uses + # an exponential backoff that the `retry:` field likewise overrides. + DEFAULT_RECONNECTION_DELAY_MS = 1000 + MAX_RECONNECTION_ATTEMPTS = 2 # Raised when an `oauth:` provider is paired with an MCP URL that is neither HTTPS nor # a loopback `http://` URL, since a bearer token sent over plain HTTP to a remote host @@ -52,6 +60,101 @@ def call(env) end end + # Internal control-flow signal raised inside the streaming callback to stop reading + # an SSE stream once the awaited JSON-RPC response has arrived; servers may hold + # a stream open indefinitely (notably the standalone GET stream), so EOF cannot be relied on. + class StreamAbort < StandardError; end + private_constant :StreamAbort + + # Per-exchange SSE state shared between the initial POST stream and any SEP-1699 reconnection GET streams: + # the incrementally parsed JSON-RPC response, the last received SSE event id (for `Last-Event-ID`), and + # the server's `retry:` reconnection delay. Non-SSE bodies accumulate in `buffer` for the JSON path. + class SSEStream + attr_reader :buffer, :response, :last_event_id, :retry_ms + attr_accessor :abortable + + def initialize(abortable:) + @abortable = abortable + @buffer = +"" + @parser = nil + @response = nil + @last_event_id = nil + @retry_ms = nil + end + + # Whether the server sent a SEP-1699 priming event (any event carrying an id), + # which marks the stream as resumable after a graceful close. + def primed? + !@last_event_id.nil? + end + + # Faraday `on_data` streaming callback. SSE chunks are parsed incrementally; + # anything else (JSON bodies) accumulates in `buffer`. + def on_data + proc do |chunk, _received_bytes, env| + if event_stream?(env) + feed(chunk) + raise StreamAbort if @abortable && @response + else + @buffer << chunk + end + end + end + + # A fresh parser for a new HTTP connection (reconnection GET), + # so a partial line from the previous stream cannot corrupt the next one. + def reset_parser! + @parser = nil + end + + # Parses an SSE body that was delivered outside the streaming callback: + # Faraday versions that do not pass `env` to `on_data` leave SSE chunks in `buffer` + # (consumed here so they are not parsed twice), and adapters without `on_data` support + # yield the whole body only via `fallback_body` (the Faraday `response.body`). + def ingest_pending!(fallback_body) + text = @buffer.empty? ? fallback_body : @buffer.slice!(0..-1) + feed(text) if text.is_a?(String) && !text.empty? + end + + private + + def event_stream?(env) + headers = env&.response_headers + content_type = headers && (headers["content-type"] || headers["Content-Type"]) + !!content_type&.include?("text/event-stream") + end + + def feed(chunk) + parser.feed(chunk) do |_type, data, id, reconnection_time| + @last_event_id = id if id && !id.empty? + @retry_ms = reconnection_time if reconnection_time + next if data.nil? || data.empty? + + begin + parsed = JSON.parse(data) + rescue JSON::ParserError + next + end + + if parsed.is_a?(Hash) && (parsed.key?("result") || parsed.key?("error")) + @response ||= parsed + end + end + end + + def parser + @parser ||= begin + require "event_stream_parser" + EventStreamParser::Parser.new + rescue LoadError + raise LoadError, "The 'event_stream_parser' gem is required to parse SSE responses. " \ + "Add it to your Gemfile: gem 'event_stream_parser', '>= 1.0'. " \ + "See https://rubygems.org/gems/event_stream_parser for more details." + end + end + end + private_constant :SSEStream + attr_reader :url, :session_id, :protocol_version, :server_info, :oauth def initialize(url:, headers: {}, oauth: nil, &block) @@ -197,11 +300,22 @@ def send_request(request:) step_up_retried = false begin + # The response is consumed incrementally so that an SSE stream the server holds open + # (or closes early per SEP-1699) can be handled; `initialize` streams are read to EOF + # so the response object (and its `Mcp-Session-Id` header) is always available for capture. + stream = SSEStream.new(abortable: method.to_s != MCP::Methods::INITIALIZE) yield if block_given? - response = client.post("", request, session_headers) - body = parse_response_body(response, method, params) + response = begin + client.post("", request, session_headers) do |req| + req.options.on_data = stream.on_data + end + rescue StreamAbort + nil + end + + body = resolve_response_body(stream, response, method, params) - capture_session_info(method, response, body) + capture_session_info(method, response, body) if response body rescue Faraday::BadRequestError => e @@ -501,15 +615,11 @@ def require_faraday! "See https://rubygems.org/gems/faraday for more details." end - def require_event_stream_parser! - require "event_stream_parser" - rescue LoadError - raise LoadError, "The 'event_stream_parser' gem is required to parse SSE responses. " \ - "Add it to your Gemfile: gem 'event_stream_parser', '>= 1.0'. " \ - "See https://rubygems.org/gems/event_stream_parser for more details." - end + # Determines the logical JSON-RPC body of the exchange after the POST stream finished + # (or was aborted because the response already arrived). + def resolve_response_body(stream, response, method, params) + return stream.response if stream.response - def parse_response_body(response, method, params) # 202 Accepted is the server's ACK for a JSON-RPC notification or response; no body is expected. # https://modelcontextprotocol.io/specification/2025-11-25/basic/transports#sending-messages-to-the-server return if response.status == 202 @@ -517,9 +627,29 @@ def parse_response_body(response, method, params) content_type = response.headers["Content-Type"] if content_type&.include?("text/event-stream") - parse_sse_response(response.body, method, params) + # Nothing was parsed during streaming: either the adapter does not support `on_data` + # (the whole body sits in `response.body`) or Faraday did not pass `env` to `on_data` + # (Faraday < 2.1; the SSE chunks sit in `buffer`). Parse whichever holds the body. + unless stream.primed? + stream.ingest_pending!(response.body) + return stream.response if stream.response + end + + # SEP-1699: a graceful close after a priming event (an event with an id) but before + # the response means "reconnect via GET to resume". + return await_response_after_disconnect(stream, method, params) if stream.primed? + + raise RequestHandlerError.new( + "No valid JSON-RPC response found in SSE stream", + { method: method, params: params }, + error_type: :parse_error, + ) elsif content_type&.include?("application/json") - response.body + return parse_json_buffer(stream.buffer, method, params) unless stream.buffer.empty? + + # Adapters without `on_data` support deliver the body via `response.body`, + # already parsed by the json response middleware. + response.body.is_a?(String) ? parse_json_buffer(response.body, method, params) : response.body else raise RequestHandlerError.new( "Unsupported Content-Type: #{content_type.inspect}. Expected application/json or text/event-stream.", @@ -529,28 +659,57 @@ def parse_response_body(response, method, params) end end - def parse_sse_response(body, method, params) - require_event_stream_parser! + def parse_json_buffer(buffer, method, params) + return if buffer.empty? - json_rpc_response = nil - parser = EventStreamParser::Parser.new - parser.feed(body.to_s) do |_type, data, _id| - next if data.empty? + JSON.parse(buffer) + rescue JSON::ParserError => e + raise RequestHandlerError.new( + "Failed to parse JSON response: #{e.message}", + { method: method, params: params }, + error_type: :parse_error, + ) + end - begin - parsed = JSON.parse(data) - json_rpc_response = parsed if parsed.is_a?(Hash) && (parsed.key?("result") || parsed.key?("error")) - rescue JSON::ParserError - next + # SEP-1699 resumability: the server closed the SSE stream after a priming event + # without delivering the response. Treat the graceful close like a network failure: + # wait the server-specified `retry:` interval (default 1000ms), then reconnect with + # a GET carrying `Last-Event-ID` so the server can replay the pending response on + # the standalone stream. Mirrors the TypeScript SDK's `StreamableHTTPClientTransport` + # reconnection and the Python SDK's `_handle_reconnection` (including its 2-attempt cap). + # https://github.com/modelcontextprotocol/modelcontextprotocol/issues/1699 + def await_response_after_disconnect(stream, method, params) + stream.abortable = true + + MAX_RECONNECTION_ATTEMPTS.times do + sleep((stream.retry_ms || DEFAULT_RECONNECTION_DELAY_MS) / 1000.0) + stream.reset_parser! + + reconnect_response = begin + client.get("") do |req| + req.headers.update(session_headers) + req.headers["Accept"] = SSE_ACCEPT_HEADER + req.headers[LAST_EVENT_ID_HEADER] = stream.last_event_id if stream.last_event_id + req.options.on_data = stream.on_data + end + rescue StreamAbort + # The awaited response arrived on the reconnected stream. + nil end - end - return json_rpc_response if json_rpc_response + # Same fallback as `resolve_response_body` for adapters that do not stream via `on_data`. + if reconnect_response && stream.response.nil? + stream.ingest_pending!(reconnect_response.body) + end + + return stream.response if stream.response + end raise RequestHandlerError.new( - "No valid JSON-RPC response found in SSE stream", + "Server closed the SSE stream without a response for #{method} " \ + "after #{MAX_RECONNECTION_ATTEMPTS} reconnection attempts", { method: method, params: params }, - error_type: :parse_error, + error_type: :internal_error, ) end end diff --git a/test/mcp/client/http_test.rb b/test/mcp/client/http_test.rb index c1d6c4a3..130c5b49 100644 --- a/test/mcp/client/http_test.rb +++ b/test/mcp/client/http_test.rb @@ -35,7 +35,7 @@ def test_raises_load_error_when_event_stream_parser_not_available ) HTTP.any_instance.stubs(:require).with("faraday").returns(true) - HTTP.any_instance.stubs(:require).with("event_stream_parser") + HTTP.const_get(:SSEStream).any_instance.stubs(:require).with("event_stream_parser") .raises(LoadError, "cannot load such file -- event_stream_parser") error = assert_raises(LoadError) do @@ -456,6 +456,169 @@ def test_send_request_raises_error_for_sse_without_response assert_includes(error.message, "No valid JSON-RPC response found in SSE stream") assert_equal(:parse_error, error.error_type) + assert_not_requested(:get, url) + end + + def test_send_request_reconnects_with_last_event_id_after_primed_graceful_close + request = { + jsonrpc: "2.0", + id: "test_id", + method: "tools/call", + params: { name: "test_reconnection", arguments: {} }, + } + + stub_request(:post, url).with( + body: request.to_json, + ).to_return( + status: 200, + headers: { "Content-Type" => "text/event-stream" }, + body: "id: event-1\nretry: 100\ndata:\n\n", + ) + + get_body = "id: event-2\nretry: 100\ndata:\n\n" \ + "event: message\nid: event-3\n" \ + 'data: {"jsonrpc":"2.0","id":"test_id","result":{"content":[]}}' \ + "\n\n" + get_stub = stub_request(:get, url).with( + headers: { "Accept" => "text/event-stream", "Last-Event-ID" => "event-1" }, + ).to_return( + status: 200, + headers: { "Content-Type" => "text/event-stream" }, + body: get_body, + ) + + started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + response = client.send_request(request: request) + elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at + + assert_equal({ "content" => [] }, response["result"]) + assert_requested(get_stub) + + # The server-specified `retry:` interval must elapse before the reconnection GET. + assert_operator(elapsed, :>=, 0.1) + end + + def test_send_request_uses_default_reconnection_delay_when_retry_field_absent + request = { + jsonrpc: "2.0", + id: "test_id", + method: "tools/call", + params: { name: "test_reconnection", arguments: {} }, + } + + stub_request(:post, url).with( + body: request.to_json, + ).to_return( + status: 200, + headers: { "Content-Type" => "text/event-stream" }, + body: "id: event-1\ndata:\n\n", + ) + + get_body = 'data: {"jsonrpc":"2.0","id":"test_id","result":{"content":[]}}' \ + "\n\n" + stub_request(:get, url).with( + headers: { "Last-Event-ID" => "event-1" }, + ).to_return( + status: 200, + headers: { "Content-Type" => "text/event-stream" }, + body: get_body, + ) + + client.expects(:sleep).with(HTTP::DEFAULT_RECONNECTION_DELAY_MS / 1000.0) + + response = client.send_request(request: request) + + assert_equal({ "content" => [] }, response["result"]) + end + + def test_send_request_raises_after_reconnection_attempts_are_exhausted + request = { + jsonrpc: "2.0", + id: "test_id", + method: "tools/call", + params: { name: "test_reconnection", arguments: {} }, + } + + stub_request(:post, url).with( + body: request.to_json, + ).to_return( + status: 200, + headers: { "Content-Type" => "text/event-stream" }, + body: "id: event-1\nretry: 10\ndata:\n\n", + ) + + first_get = stub_request(:get, url) + .with(headers: { "Last-Event-ID" => "event-1" }) + .to_return( + status: 200, + headers: { "Content-Type" => "text/event-stream" }, + body: "id: event-2\nretry: 10\ndata:\n\n", + ) + second_get = stub_request(:get, url).with( + headers: { "Last-Event-ID" => "event-2" }, + ).to_return( + status: 200, + headers: { "Content-Type" => "text/event-stream" }, + body: "id: event-3\nretry: 10\ndata:\n\n", + ) + + error = assert_raises(RequestHandlerError) do + client.send_request(request: request) + end + + assert_includes(error.message, "after 2 reconnection attempts") + assert_equal(:internal_error, error.error_type) + assert_requested(first_get) + assert_requested(second_get) + end + + def test_send_request_parses_json_response_when_adapter_does_not_stream + # The Faraday test adapter ignores `on_data`, like adapters without + # streaming support; the body must be read from `response.body`. + stubs = Faraday::Adapter::Test::Stubs.new do |stub| + stub.post("/") do + [200, { "Content-Type" => "application/json" }, { result: { tools: [] } }.to_json] + end + end + client = HTTP.new(url: url) { |faraday| faraday.adapter(:test, stubs) } + + response = client.send_request(request: { jsonrpc: "2.0", id: "test_id", method: "tools/list" }) + + assert_equal({ "result" => { "tools" => [] } }, response) + end + + def test_send_request_parses_sse_response_when_adapter_does_not_stream + sse_body = "event: message\n" \ + 'data: {"jsonrpc":"2.0","id":"test_id","result":{"tools":[]}}' \ + "\n\n" + stubs = Faraday::Adapter::Test::Stubs.new do |stub| + stub.post("/") do + [200, { "Content-Type" => "text/event-stream" }, sse_body] + end + end + client = HTTP.new(url: url) { |faraday| faraday.adapter(:test, stubs) } + + response = client.send_request(request: { jsonrpc: "2.0", id: "test_id", method: "tools/list" }) + + assert_equal({ "tools" => [] }, response["result"]) + end + + def test_sse_stream_parses_buffered_chunks_when_env_is_unavailable + # Faraday < 2.1 invokes `on_data` without `env`; the content type + # cannot be detected, so SSE chunks accumulate in the buffer and are + # parsed by the `ingest_pending!` fallback. + stream = HTTP.const_get(:SSEStream).new(abortable: true) + chunk = "data: {\"jsonrpc\":\"2.0\",\"id\":\"test_id\",\"result\":{}}\n\n" + + stream.on_data.call(chunk, chunk.bytesize) + + assert_nil(stream.response) + assert_equal(chunk, stream.buffer) + + stream.ingest_pending!(nil) + + assert_equal({ "jsonrpc" => "2.0", "id" => "test_id", "result" => {} }, stream.response) + assert_empty(stream.buffer) end def test_captures_session_id_and_protocol_version_on_initialize