Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,14 @@ Set `stateless: true` in `MCP::Server::Transports::StreamableHTTPTransport.new`
transport = MCP::Server::Transports::StreamableHTTPTransport.new(server, stateless: true)
```

By default, sessions do not expire. To mitigate session hijacking risks, you can set a `session_idle_timeout` (in seconds).
When configured, sessions that receive no HTTP requests for this duration are automatically expired and cleaned up:

```ruby
# Session timeout of 30 minutes
transport = MCP::Server::Transports::StreamableHTTPTransport.new(server, session_idle_timeout: 1800)
```

### Unsupported Features (to be implemented in future versions)

- Resource subscriptions
Expand Down
104 changes: 95 additions & 9 deletions lib/mcp/server/transports/streamable_http_transport.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,30 @@ module MCP
class Server
module Transports
class StreamableHTTPTransport < Transport
def initialize(server, stateless: false)
def initialize(server, stateless: false, session_idle_timeout: nil)
super(server)
# Maps `session_id` to `{ stream: stream_object, server_session: ServerSession }`.
# Maps `session_id` to `{ stream: stream_object, server_session: ServerSession, last_active_at: float_from_monotonic_clock }`.
@sessions = {}
@mutex = Mutex.new

@stateless = stateless
@session_idle_timeout = session_idle_timeout

if @session_idle_timeout
if @stateless
raise ArgumentError, "session_idle_timeout is not supported in stateless mode."
elsif @session_idle_timeout <= 0
raise ArgumentError, "session_idle_timeout must be a positive number."
end
end

start_reaper_thread if @session_idle_timeout
end

REQUIRED_POST_ACCEPT_TYPES = ["application/json", "text/event-stream"].freeze
REQUIRED_GET_ACCEPT_TYPES = ["text/event-stream"].freeze
STREAM_WRITE_ERRORS = [IOError, Errno::EPIPE, Errno::ECONNRESET].freeze
SESSION_REAP_INTERVAL = 60

def handle_request(request)
case request.env["REQUEST_METHOD"]
Expand All @@ -35,6 +47,9 @@ def handle_request(request)
end

def close
@reaper_thread&.kill
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens to the mutex if the reaper thread had a lock on it and was busy reaping?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I could verify, In MRI Ruby, this should be safe. Ruby runs ensure clauses even when a thread is terminated via Thread#kill, and Mutex#synchronize releases the lock in its ensure path.

So if the reaper is killed while holding the mutex, the thread will unwind and the lock should be released as part of that process. As a result, close's @mutex.synchronize is not expected to deadlock due to the mutex being left locked by Thread#kill.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! LGTM!

@reaper_thread = nil

@mutex.synchronize do
@sessions.each_key { |session_id| cleanup_session_unsafe(session_id) }
end
Expand All @@ -56,6 +71,11 @@ def send_notification(method, params = nil, session_id: nil)
session = @sessions[session_id]
return false unless session && session[:stream]

if session_expired?(session)
cleanup_session_unsafe(session_id)
return false
end

begin
send_to_stream(session[:stream], notification)
true
Expand All @@ -75,6 +95,11 @@ def send_notification(method, params = nil, session_id: nil)
@sessions.each do |sid, session|
next unless session[:stream]

if session_expired?(session)
failed_sessions << sid
next
end

begin
send_to_stream(session[:stream], notification)
sent_count += 1
Expand All @@ -97,6 +122,39 @@ def send_notification(method, params = nil, session_id: nil)

private

def start_reaper_thread
@reaper_thread = Thread.new do
loop do
sleep(SESSION_REAP_INTERVAL)
reap_expired_sessions
rescue StandardError => e
MCP.configuration.exception_reporter.call(e, error: "Session reaper error")
end
end
end

def reap_expired_sessions
return unless @session_idle_timeout

expired_streams = @mutex.synchronize do
@sessions.each_with_object([]) do |(session_id, session), streams|
next unless session_expired?(session)

streams << session[:stream] if session[:stream]
@sessions.delete(session_id)
end
end

expired_streams.each do |stream|
# Closing outside the mutex is safe because expired sessions are already
# removed from `@sessions` above, so other threads will not find them
# and will not attempt to close the same stream.
stream.close
rescue
nil
end
end

def send_to_stream(stream, data)
message = data.is_a?(String) ? data : data.to_json
stream.write("data: #{message}\n\n")
Expand Down Expand Up @@ -145,7 +203,9 @@ def handle_get(request)
session_id = extract_session_id(request)

return missing_session_id_response unless session_id
return session_not_found_response unless session_exists?(session_id)

error_response = validate_and_touch_session(session_id)
return error_response if error_response
return session_already_connected_response if get_session_stream(session_id)

setup_sse_stream(session_id)
Expand Down Expand Up @@ -242,6 +302,7 @@ def handle_initialization(body_string, body)
@sessions[session_id] = {
stream: nil,
server_session: server_session,
last_active_at: Process.clock_gettime(Process::CLOCK_MONOTONIC),
}
end
end
Expand Down Expand Up @@ -269,13 +330,16 @@ def handle_regular_request(body_string, session_id)
server_session = nil
stream = nil

if session_id && !@stateless
@mutex.synchronize do
session = @sessions[session_id]
return session_not_found_response unless session
unless @stateless
if session_id
error_response = validate_and_touch_session(session_id)
return error_response if error_response

server_session = session[:server_session]
stream = session[:stream]
@mutex.synchronize do
session = @sessions[session_id]
server_session = session[:server_session] if session
stream = session[:stream] if session
end
end
end

Expand All @@ -292,6 +356,22 @@ def handle_regular_request(body_string, session_id)
end
end

def validate_and_touch_session(session_id)
@mutex.synchronize do
return session_not_found_response unless (session = @sessions[session_id])
return unless @session_idle_timeout

if session_expired?(session)
cleanup_session_unsafe(session_id)
return session_not_found_response
end

session[:last_active_at] = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

nil
end

def get_session_stream(session_id)
@mutex.synchronize { @sessions[session_id]&.fetch(:stream, nil) }
end
Expand Down Expand Up @@ -397,6 +477,12 @@ def send_keepalive_ping(session_id)
)
raise # Re-raise to exit the keepalive loop
end

def session_expired?(session)
return false unless @session_idle_timeout

Process.clock_gettime(Process::CLOCK_MONOTONIC) - session[:last_active_at] > @session_idle_timeout
end
end
end
end
Expand Down
Loading