Skip to content

Commit f6ce6bd

Browse files
author
Jianke LIN
committed
test(streamable-http): cover disconnect without resumption anchor
1 parent 89d677f commit f6ce6bd

2 files changed

Lines changed: 44 additions & 13 deletions

File tree

src/mcp/client/streamable_http.py

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -435,18 +435,15 @@ async def _handle_reconnection(
435435
"""Reconnect with Last-Event-ID to resume stream after server disconnect."""
436436
# Bail if max retries exceeded
437437
if attempt >= MAX_RECONNECTION_ATTEMPTS:
438-
original_request_id = None
439-
if isinstance(ctx.session_message.message, JSONRPCRequest): # pragma: no branch
440-
original_request_id = ctx.session_message.message.id
441-
442-
if original_request_id is not None:
443-
error_data = ErrorData(
444-
code=CONNECTION_CLOSED,
445-
message="SSE stream disconnected and could not be resumed",
446-
data={"last_event_id": last_event_id},
447-
)
448-
error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=original_request_id, error=error_data))
449-
await ctx.read_stream_writer.send(error_msg)
438+
assert isinstance(ctx.session_message.message, JSONRPCRequest)
439+
original_request_id = ctx.session_message.message.id
440+
error_data = ErrorData(
441+
code=CONNECTION_CLOSED,
442+
message="SSE stream disconnected and could not be resumed",
443+
data={"last_event_id": last_event_id},
444+
)
445+
error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=original_request_id, error=error_data))
446+
await ctx.read_stream_writer.send(error_msg)
450447
logger.debug(f"Max reconnection attempts ({MAX_RECONNECTION_ATTEMPTS}) exceeded")
451448
return
452449

tests/client/test_streamable_http.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,21 @@
1717
from mcp.client import ClientSession
1818
from mcp.client.streamable_http import (
1919
MCP_PROTOCOL_VERSION,
20+
RequestContext,
2021
StreamableHTTPTransport,
2122
_encode_header_value,
2223
streamable_http_client,
2324
)
24-
from mcp.types import JSONRPCMessage, JSONRPCNotification, JSONRPCRequest, JSONRPCResponse
25+
from mcp.shared._context_streams import create_context_streams
26+
from mcp.shared.message import SessionMessage
27+
from mcp.types import (
28+
CONNECTION_CLOSED,
29+
JSONRPCError,
30+
JSONRPCMessage,
31+
JSONRPCNotification,
32+
JSONRPCRequest,
33+
JSONRPCResponse,
34+
)
2535

2636

2737
@pytest.mark.parametrize(
@@ -98,6 +108,30 @@ def test_mcp_name_header_values_are_base64_wrapped_when_unsafe_for_an_http_field
98108
assert encoded == raw
99109

100110

111+
@pytest.mark.anyio
112+
async def test_sse_response_disconnect_before_any_event_id_fails_request() -> None:
113+
transport = StreamableHTTPTransport("http://example.com/mcp")
114+
async with httpx.AsyncClient() as client:
115+
read_stream_writer, read_stream = create_context_streams[SessionMessage | Exception](1)
116+
request = JSONRPCRequest(jsonrpc="2.0", id=1, method="tools/call", params={"name": "noop", "arguments": {}})
117+
ctx = RequestContext(
118+
client=client,
119+
session_id=None,
120+
session_message=SessionMessage(request),
121+
metadata=None,
122+
read_stream_writer=read_stream_writer,
123+
)
124+
response = httpx.Response(200, headers={"content-type": "text/event-stream"}, content=b"")
125+
126+
async with read_stream_writer, read_stream:
127+
await transport._handle_sse_response(response, ctx)
128+
message = await read_stream.receive()
129+
130+
assert isinstance(message.message, JSONRPCError)
131+
assert message.message.id == 1
132+
assert message.message.error.code == CONNECTION_CLOSED
133+
134+
101135
@pytest.mark.anyio
102136
async def test_pinned_transport_ignores_returned_session_id_and_never_opens_get_or_delete() -> None:
103137
"""A server-issued ``Mcp-Session-Id`` never reaches a pinned client's wire: only POSTs are sent.

0 commit comments

Comments
 (0)