|
17 | 17 | from mcp.types import JSONRPCMessage, JSONRPCResponse |
18 | 18 |
|
19 | 19 |
|
20 | | -class _OrderTrackingStore(EventStore): |
21 | | - def __init__(self) -> None: |
22 | | - self.stored: list[tuple[StreamId, JSONRPCMessage | None]] = [] |
23 | | - |
24 | | - async def store_event(self, stream_id: StreamId, message: JSONRPCMessage | None) -> EventId: |
25 | | - self.stored.append((stream_id, message)) |
26 | | - return str(len(self.stored)) |
27 | | - |
28 | | - async def replay_events_after(self, last_event_id: EventId, send_callback: EventCallback) -> StreamId | None: |
29 | | - raise NotImplementedError |
30 | | - |
31 | | - |
32 | 20 | class _PrimingFailingStore(EventStore): |
33 | 21 | async def store_event(self, stream_id: StreamId, message: JSONRPCMessage | None) -> EventId: |
34 | 22 | raise RuntimeError("backend unavailable") |
@@ -76,40 +64,6 @@ async def server_writes() -> None: |
76 | 64 | await a_send.aclose() |
77 | 65 |
|
78 | 66 |
|
79 | | -@pytest.mark.anyio |
80 | | -async def test_priming_event_is_stored_before_any_routed_message() -> None: |
81 | | - """`_mint_priming_event` is awaited before the request is dispatched, so the |
82 | | - priming row precedes every `message_router` store for that stream regardless |
83 | | - of when `sse_writer` is scheduled. |
84 | | - """ |
85 | | - store = _OrderTrackingStore() |
86 | | - transport = StreamableHTTPServerTransport(mcp_session_id="sid", is_json_response_enabled=False, event_store=store) |
87 | | - streams = transport._request_streams |
88 | | - |
89 | | - async with transport.connect() as (_read_stream, write_stream): |
90 | | - # POST handler step: mint priming for "A" before dispatch. |
91 | | - priming = await transport._mint_priming_event("A", "2025-11-25") |
92 | | - assert priming is not None |
93 | | - streams["A"] = anyio.create_memory_object_stream[EventMessage](REQUEST_STREAM_BUFFER_SIZE) |
94 | | - a_send, a_recv = streams["A"] |
95 | | - |
96 | | - # Server emits 5 messages for "A" with no sse_writer scheduled. Each |
97 | | - # write_stream.send() rendezvous-hands to message_router, which stores |
98 | | - # then deposits into A's buffer; reading them back proves the router |
99 | | - # has finished storing. |
100 | | - for i in range(5): |
101 | | - await write_stream.send(SessionMessage(JSONRPCResponse(jsonrpc="2.0", id="A", result={"n": i}))) |
102 | | - with anyio.fail_after(5): |
103 | | - for _ in range(5): |
104 | | - await a_recv.receive() |
105 | | - await a_recv.aclose() |
106 | | - await a_send.aclose() |
107 | | - |
108 | | - assert store.stored[0] == ("A", None) |
109 | | - assert [sid for sid, _ in store.stored] == ["A"] * 6 |
110 | | - assert all(msg is not None for _, msg in store.stored[1:]) |
111 | | - |
112 | | - |
113 | 67 | @pytest.mark.anyio |
114 | 68 | async def test_priming_store_failure_leaves_no_per_request_state() -> None: |
115 | 69 | """`EventStore.store_event` raising on the priming row must not leak per-request entries.""" |
|
0 commit comments