Skip to content

Commit 1f1cb48

Browse files
committed
replay_sender finally + replace _mint_priming_event unit tests with end-to-end
Adds a finally to replay_sender (mirroring _run_sse_writer) so resumed connections clean up _sse_stream_writers[stream_id] and _request_streams[stream_id] on disconnect. Nested inside the stream_id-is-set block so no edge-case None-handling is needed. Drops the four unit tests that poked _mint_priming_event directly and adds an end-to-end test in test_hosting_resume.py asserting the event store records [(S, priming), (S, msg1), ..., (S, response)] for a real POST through a real MCPServer + transport. The dropped tests' branches remain covered by the existing high-level tests in the same file.
1 parent 127b209 commit 1f1cb48

3 files changed

Lines changed: 63 additions & 61 deletions

File tree

src/mcp/server/streamable_http.py

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -889,28 +889,32 @@ async def send_event(event_message: EventMessage) -> None:
889889

890890
# If stream ID not in mapping, create it
891891
if stream_id and stream_id not in self._request_streams: # pragma: no branch
892-
# Register SSE writer so close_sse_stream() can close it
893-
self._sse_stream_writers[stream_id] = sse_stream_writer
894-
895-
# Prime the resumed connection so the client sees the stream
896-
# is re-registered. The replay→live-tail ordering window here
897-
# is pre-existing and tracked separately.
898-
priming_event = await self._mint_priming_event(stream_id, replay_protocol_version)
899-
if priming_event is not None:
900-
await sse_stream_writer.send(priming_event)
901-
902-
# Create new request streams for this connection
903-
self._request_streams[stream_id] = anyio.create_memory_object_stream[EventMessage](
904-
REQUEST_STREAM_BUFFER_SIZE
905-
)
906-
msg_reader = self._request_streams[stream_id][1]
907-
908-
# Forward messages to SSE
909-
async with msg_reader:
910-
async for event_message in msg_reader:
911-
event_data = self._create_event_data(event_message)
912-
913-
await sse_stream_writer.send(event_data)
892+
try:
893+
# Register SSE writer so close_sse_stream() can close it
894+
self._sse_stream_writers[stream_id] = sse_stream_writer
895+
896+
# Prime the resumed connection so the client sees the stream
897+
# is re-registered. The replay→live-tail ordering window here
898+
# is pre-existing and tracked separately.
899+
priming_event = await self._mint_priming_event(stream_id, replay_protocol_version)
900+
if priming_event is not None:
901+
await sse_stream_writer.send(priming_event)
902+
903+
# Create new request streams for this connection
904+
self._request_streams[stream_id] = anyio.create_memory_object_stream[EventMessage](
905+
REQUEST_STREAM_BUFFER_SIZE
906+
)
907+
msg_reader = self._request_streams[stream_id][1]
908+
909+
# Forward messages to SSE
910+
async with msg_reader:
911+
async for event_message in msg_reader:
912+
event_data = self._create_event_data(event_message)
913+
914+
await sse_stream_writer.send(event_data)
915+
finally:
916+
self._sse_stream_writers.pop(stream_id, None)
917+
await self._clean_up_memory_streams(stream_id)
914918
except anyio.ClosedResourceError: # pragma: lax no cover
915919
# Expected when close_sse_stream() is called
916920
logger.debug("Replay SSE stream closed by close_sse_stream()")

tests/interaction/transports/test_hosting_resume.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,43 @@ async def test_a_post_sse_stream_begins_with_a_priming_event_and_stamps_every_ev
113113
)
114114

115115

116+
@requirement("hosting:resume:priming")
117+
async def test_the_priming_row_is_stored_before_any_handler_output_for_that_stream() -> None:
118+
"""The priming cursor is the first row the event store records for a request's stream.
119+
120+
The POST handler stores the priming row before dispatching the request, so by construction
121+
it precedes anything `message_router` can store for that stream id.
122+
"""
123+
store = SequencedEventStore()
124+
mcp = MCPServer("resumable")
125+
126+
@mcp.tool()
127+
async def burst(ctx: Context) -> str:
128+
await ctx.info("a") # pyright: ignore[reportDeprecated]
129+
await ctx.info("b") # pyright: ignore[reportDeprecated]
130+
await ctx.info("c") # pyright: ignore[reportDeprecated]
131+
return "done"
132+
133+
async with mounted_app(mcp, event_store=store, retry_interval=0) as (http, _):
134+
session_id = await initialize_via_http(http)
135+
with anyio.fail_after(5):
136+
async with http.stream( # pragma: no branch
137+
"POST", "/mcp", content=_tools_call(2, "burst", {}), headers=base_headers(session_id=session_id)
138+
) as response:
139+
await _read_events(response, 5)
140+
141+
# initialize wrote two rows (its own priming + response); everything after is this call.
142+
call_rows = store._events[2:]
143+
stream_id = call_rows[0][0]
144+
assert [(s, None if m is None else type(m).__name__) for s, m in call_rows] == [
145+
(stream_id, None),
146+
(stream_id, "JSONRPCNotification"),
147+
(stream_id, "JSONRPCNotification"),
148+
(stream_id, "JSONRPCNotification"),
149+
(stream_id, "JSONRPCResponse"),
150+
]
151+
152+
116153
@requirement("hosting:resume:replay")
117154
@requirement("hosting:resume:stream-scoped")
118155
@requirement("hosting:resume:buffered-replay")

tests/shared/test_streamable_http.py

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1634,34 +1634,6 @@ async def test_handle_sse_event_skips_empty_data() -> None:
16341634
await read_stream.aclose()
16351635

16361636

1637-
@pytest.mark.anyio
1638-
async def test_priming_event_not_sent_for_old_protocol_version() -> None:
1639-
"""`_mint_priming_event` skips for old protocol versions (backwards compat)."""
1640-
transport = StreamableHTTPServerTransport("/mcp", event_store=SimpleEventStore())
1641-
assert await transport._mint_priming_event("test-request-id", "2025-06-18") is None
1642-
assert await transport._mint_priming_event("test-request-id-2", "2025-11-25") is not None
1643-
1644-
1645-
@pytest.mark.anyio
1646-
async def test_priming_event_not_sent_without_event_store() -> None:
1647-
"""`_mint_priming_event` returns `None` when no event_store is configured."""
1648-
transport = StreamableHTTPServerTransport("/mcp")
1649-
assert await transport._mint_priming_event("test-request-id", "2025-11-25") is None
1650-
1651-
1652-
@pytest.mark.anyio
1653-
async def test_priming_event_includes_retry_interval() -> None:
1654-
"""`_mint_priming_event` includes the retry field when `retry_interval` is set."""
1655-
transport = StreamableHTTPServerTransport(
1656-
"/mcp",
1657-
event_store=SimpleEventStore(),
1658-
retry_interval=5000,
1659-
)
1660-
event = await transport._mint_priming_event("test-request-id", "2025-11-25")
1661-
assert event is not None
1662-
assert event["retry"] == 5000
1663-
1664-
16651637
@pytest.mark.anyio
16661638
async def test_close_sse_stream_callback_not_provided_for_old_protocol_version() -> None:
16671639
"""close_sse_stream callbacks are only provided for protocol versions that support polling."""
@@ -1694,17 +1666,6 @@ async def test_close_sse_stream_callback_not_provided_for_old_protocol_version()
16941666
assert session_msg_new.metadata.close_standalone_sse_stream is not None
16951667

16961668

1697-
@pytest.mark.anyio
1698-
async def test_priming_event_not_sent_for_unknown_protocol_version() -> None:
1699-
"""`_mint_priming_event` treats unrecognized version strings conservatively.
1700-
1701-
A garbage version must not be mistaken for a future one (lexicographically
1702-
"zzz" sorts after every date-shaped revision).
1703-
"""
1704-
transport = StreamableHTTPServerTransport("/mcp", event_store=SimpleEventStore())
1705-
assert await transport._mint_priming_event("test-request-id", "zzz") is None
1706-
1707-
17081669
@pytest.mark.anyio
17091670
async def test_close_sse_stream_callback_not_provided_for_unknown_protocol_version() -> None:
17101671
"""close_sse_stream callbacks are withheld when the client's version is unrecognized."""

0 commit comments

Comments
 (0)