Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions conformance/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,15 @@ def build_provider_for(scenario, context)
add_numbers = tools.find { |t| t.name == "add_numbers" }
abort("Tool add_numbers not found") unless add_numbers
client.call_tool(tool: add_numbers, arguments: { a: 1, b: 2 })
when "sse-retry"
# SEP-1699: the server closes the tools/call SSE stream right after a priming event.
# The transport waits the server's `retry:` interval, reconnects with a GET carrying `Last-Event-ID`,
# and receives the tool result on the resumed stream; the harness verifies the reconnect,
# its timing, and the header.
tools = client.tools
test_reconnection = tools.find { |t| t.name == "test_reconnection" }
abort("Tool test_reconnection not found") unless test_reconnection
client.call_tool(tool: test_reconnection, arguments: {})
when %r|\Aauth/|
# Auth-only scenarios: the protocol-level checks (PRM/AS metadata, DCR, PKCE, token usage)
# are observed by the conformance server during `connect` and the subsequent request below.
Expand Down
2 changes: 0 additions & 2 deletions conformance/expected_failures.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
server: []
client:
# TODO: SSE reconnection not implemented in Ruby client.
- sse-retry
# TODO: Elicitation not implemented in Ruby client.
- elicitation-sep1034-client-defaults
# TODO: Remaining OAuth/auth scenarios not yet implemented in Ruby client.
Expand Down
221 changes: 190 additions & 31 deletions lib/mcp/client/http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,22 @@

module MCP
class Client
# TODO: HTTP GET for SSE streaming is not yet implemented.
# TODO: A standalone HTTP GET listening stream for server-initiated messages is not yet implemented;
# GET is currently used only to resume a request's SSE stream after a disconnect.
# https://modelcontextprotocol.io/specification/2025-11-25/basic/transports#listening-for-messages-from-the-server
# TODO: Resumability and redelivery with Last-Event-ID is not yet implemented.
# https://modelcontextprotocol.io/specification/2025-11-25/basic/transports#resumability-and-redelivery
class HTTP
ACCEPT_HEADER = "application/json, text/event-stream"
SSE_ACCEPT_HEADER = "text/event-stream"
SESSION_ID_HEADER = "Mcp-Session-Id"
PROTOCOL_VERSION_HEADER = "MCP-Protocol-Version"
LAST_EVENT_ID_HEADER = "Last-Event-ID"

# SEP-1699 reconnection tuning: the SSE `retry:` field from the server takes precedence;
# this default applies when the server sent none. Both values match the Python SDK
# (`DEFAULT_RECONNECTION_DELAY_MS`, `MAX_RECONNECTION_ATTEMPTS`); the TypeScript SDK uses
# an exponential backoff that the `retry:` field likewise overrides.
DEFAULT_RECONNECTION_DELAY_MS = 1000
MAX_RECONNECTION_ATTEMPTS = 2

# Raised when an `oauth:` provider is paired with an MCP URL that is neither HTTPS nor
# a loopback `http://` URL, since a bearer token sent over plain HTTP to a remote host
Expand Down Expand Up @@ -52,6 +60,101 @@ def call(env)
end
end

# Internal control-flow signal raised inside the streaming callback to stop reading
# an SSE stream once the awaited JSON-RPC response has arrived; servers may hold
# a stream open indefinitely (notably the standalone GET stream), so EOF cannot be relied on.
class StreamAbort < StandardError; end
private_constant :StreamAbort

# Per-exchange SSE state shared between the initial POST stream and any SEP-1699 reconnection GET streams:
# the incrementally parsed JSON-RPC response, the last received SSE event id (for `Last-Event-ID`), and
# the server's `retry:` reconnection delay. Non-SSE bodies accumulate in `buffer` for the JSON path.
class SSEStream
attr_reader :buffer, :response, :last_event_id, :retry_ms
attr_accessor :abortable

def initialize(abortable:)
@abortable = abortable
@buffer = +""
@parser = nil
@response = nil
@last_event_id = nil
@retry_ms = nil
end

# Whether the server sent a SEP-1699 priming event (any event carrying an id),
# which marks the stream as resumable after a graceful close.
def primed?
!@last_event_id.nil?
end

# Faraday `on_data` streaming callback. SSE chunks are parsed incrementally;
# anything else (JSON bodies) accumulates in `buffer`.
def on_data
proc do |chunk, _received_bytes, env|
if event_stream?(env)
feed(chunk)
raise StreamAbort if @abortable && @response
else
@buffer << chunk
end
end
end

# A fresh parser for a new HTTP connection (reconnection GET),
# so a partial line from the previous stream cannot corrupt the next one.
def reset_parser!
@parser = nil
end

# Parses an SSE body that was delivered outside the streaming callback:
# Faraday versions that do not pass `env` to `on_data` leave SSE chunks in `buffer`
# (consumed here so they are not parsed twice), and adapters without `on_data` support
# yield the whole body only via `fallback_body` (the Faraday `response.body`).
def ingest_pending!(fallback_body)
text = @buffer.empty? ? fallback_body : @buffer.slice!(0..-1)
feed(text) if text.is_a?(String) && !text.empty?
end

private

def event_stream?(env)
headers = env&.response_headers
content_type = headers && (headers["content-type"] || headers["Content-Type"])
!!content_type&.include?("text/event-stream")
end

def feed(chunk)
parser.feed(chunk) do |_type, data, id, reconnection_time|
@last_event_id = id if id && !id.empty?
@retry_ms = reconnection_time if reconnection_time
next if data.nil? || data.empty?

begin
parsed = JSON.parse(data)
rescue JSON::ParserError
next
end

if parsed.is_a?(Hash) && (parsed.key?("result") || parsed.key?("error"))
@response ||= parsed
end
end
end

def parser
@parser ||= begin
require "event_stream_parser"
EventStreamParser::Parser.new
rescue LoadError
raise LoadError, "The 'event_stream_parser' gem is required to parse SSE responses. " \
"Add it to your Gemfile: gem 'event_stream_parser', '>= 1.0'. " \
"See https://rubygems.org/gems/event_stream_parser for more details."
end
end
end
private_constant :SSEStream

attr_reader :url, :session_id, :protocol_version, :server_info, :oauth

def initialize(url:, headers: {}, oauth: nil, &block)
Expand Down Expand Up @@ -197,11 +300,22 @@ def send_request(request:)
step_up_retried = false

begin
# The response is consumed incrementally so that an SSE stream the server holds open
# (or closes early per SEP-1699) can be handled; `initialize` streams are read to EOF
# so the response object (and its `Mcp-Session-Id` header) is always available for capture.
stream = SSEStream.new(abortable: method.to_s != MCP::Methods::INITIALIZE)
yield if block_given?
response = client.post("", request, session_headers)
body = parse_response_body(response, method, params)
response = begin
client.post("", request, session_headers) do |req|
req.options.on_data = stream.on_data
end
rescue StreamAbort
nil
end

body = resolve_response_body(stream, response, method, params)

capture_session_info(method, response, body)
capture_session_info(method, response, body) if response

body
rescue Faraday::BadRequestError => e
Expand Down Expand Up @@ -501,25 +615,41 @@ def require_faraday!
"See https://rubygems.org/gems/faraday for more details."
end

def require_event_stream_parser!
require "event_stream_parser"
rescue LoadError
raise LoadError, "The 'event_stream_parser' gem is required to parse SSE responses. " \
"Add it to your Gemfile: gem 'event_stream_parser', '>= 1.0'. " \
"See https://rubygems.org/gems/event_stream_parser for more details."
end
# Determines the logical JSON-RPC body of the exchange after the POST stream finished
# (or was aborted because the response already arrived).
def resolve_response_body(stream, response, method, params)
return stream.response if stream.response

def parse_response_body(response, method, params)
# 202 Accepted is the server's ACK for a JSON-RPC notification or response; no body is expected.
# https://modelcontextprotocol.io/specification/2025-11-25/basic/transports#sending-messages-to-the-server
return if response.status == 202

content_type = response.headers["Content-Type"]

if content_type&.include?("text/event-stream")
parse_sse_response(response.body, method, params)
# Nothing was parsed during streaming: either the adapter does not support `on_data`
# (the whole body sits in `response.body`) or Faraday did not pass `env` to `on_data`
# (Faraday < 2.1; the SSE chunks sit in `buffer`). Parse whichever holds the body.
unless stream.primed?
stream.ingest_pending!(response.body)
return stream.response if stream.response
end

# SEP-1699: a graceful close after a priming event (an event with an id) but before
# the response means "reconnect via GET to resume".
return await_response_after_disconnect(stream, method, params) if stream.primed?

raise RequestHandlerError.new(
"No valid JSON-RPC response found in SSE stream",
{ method: method, params: params },
error_type: :parse_error,
)
elsif content_type&.include?("application/json")
response.body
return parse_json_buffer(stream.buffer, method, params) unless stream.buffer.empty?

# Adapters without `on_data` support deliver the body via `response.body`,
# already parsed by the json response middleware.
response.body.is_a?(String) ? parse_json_buffer(response.body, method, params) : response.body
else
raise RequestHandlerError.new(
"Unsupported Content-Type: #{content_type.inspect}. Expected application/json or text/event-stream.",
Expand All @@ -529,28 +659,57 @@ def parse_response_body(response, method, params)
end
end

def parse_sse_response(body, method, params)
require_event_stream_parser!
def parse_json_buffer(buffer, method, params)
return if buffer.empty?

json_rpc_response = nil
parser = EventStreamParser::Parser.new
parser.feed(body.to_s) do |_type, data, _id|
next if data.empty?
JSON.parse(buffer)
rescue JSON::ParserError => e
raise RequestHandlerError.new(
"Failed to parse JSON response: #{e.message}",
{ method: method, params: params },
error_type: :parse_error,
)
end

begin
parsed = JSON.parse(data)
json_rpc_response = parsed if parsed.is_a?(Hash) && (parsed.key?("result") || parsed.key?("error"))
rescue JSON::ParserError
next
# SEP-1699 resumability: the server closed the SSE stream after a priming event
# without delivering the response. Treat the graceful close like a network failure:
# wait the server-specified `retry:` interval (default 1000ms), then reconnect with
# a GET carrying `Last-Event-ID` so the server can replay the pending response on
# the standalone stream. Mirrors the TypeScript SDK's `StreamableHTTPClientTransport`
# reconnection and the Python SDK's `_handle_reconnection` (including its 2-attempt cap).
# https://github.com/modelcontextprotocol/modelcontextprotocol/issues/1699
def await_response_after_disconnect(stream, method, params)
stream.abortable = true

MAX_RECONNECTION_ATTEMPTS.times do
sleep((stream.retry_ms || DEFAULT_RECONNECTION_DELAY_MS) / 1000.0)
stream.reset_parser!

reconnect_response = begin
client.get("") do |req|
req.headers.update(session_headers)
req.headers["Accept"] = SSE_ACCEPT_HEADER
req.headers[LAST_EVENT_ID_HEADER] = stream.last_event_id if stream.last_event_id
req.options.on_data = stream.on_data
end
rescue StreamAbort
# The awaited response arrived on the reconnected stream.
nil
end
end

return json_rpc_response if json_rpc_response
# Same fallback as `resolve_response_body` for adapters that do not stream via `on_data`.
if reconnect_response && stream.response.nil?
stream.ingest_pending!(reconnect_response.body)
end

return stream.response if stream.response
end

raise RequestHandlerError.new(
"No valid JSON-RPC response found in SSE stream",
"Server closed the SSE stream without a response for #{method} " \
"after #{MAX_RECONNECTION_ATTEMPTS} reconnection attempts",
{ method: method, params: params },
error_type: :parse_error,
error_type: :internal_error,
)
end
end
Expand Down
Loading