Skip to content

Commit 46c0742

Browse files
committed
modern_on_request driver + Client in-process modern path via DirectDispatcher peer-pair
- modern_on_request(server, lifespan_state) returns an OnRequest callback that builds Connection.from_envelope per call and drives serve_one — wire it into the server side of a DirectDispatcher peer-pair for an in-process server on the modern per-request path - Client(Server|MCPServer, mode!=legacy) enters lifespan once, creates a peer-pair, runs the server side with modern_on_request, and hands the client side to ClientSession; legacy in-process keeps InMemoryTransport - Interaction-suite in-memory transport unlocked for 2026-07-28: 71 tests now run on [in-memory-2026-07-28], 67 pass; the 5 streamable-http-only notify-drop xfails are scoped to that transport; 4 progress-notification tests still xfail (peer-pair progress wiring tracked separately)
1 parent 1fbd075 commit 46c0742

5 files changed

Lines changed: 106 additions & 17 deletions

File tree

src/mcp/client/client.py

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22

33
from __future__ import annotations
44

5+
from collections.abc import Mapping
56
from contextlib import AsyncExitStack
67
from dataclasses import KW_ONLY, dataclass, field
78
from typing import Any, Literal
89

10+
import anyio
911
from typing_extensions import deprecated
1012

1113
from mcp import types
@@ -15,6 +17,8 @@
1517
from mcp.client.streamable_http import streamable_http_client
1618
from mcp.server import Server
1719
from mcp.server.mcpserver import MCPServer
20+
from mcp.server.runner import modern_on_request
21+
from mcp.shared.direct_dispatcher import create_direct_dispatcher_pair
1822
from mcp.shared.dispatcher import ProgressFnT
1923
from mcp.shared.exceptions import MCPDeprecationWarning
2024
from mcp.types import (
@@ -48,6 +52,14 @@ def _synthesize_discover(protocol_version: str) -> types.DiscoverResult:
4852
)
4953

5054

55+
async def _drop_notify(_dctx: Any, _method: str, _params: Mapping[str, Any] | None) -> None:
56+
"""Server-side ``OnNotify`` for the modern in-process path: client→server notifications are dropped.
57+
58+
The per-request driver (`serve_one`) has no notification dispatch table; progress and
59+
cancellation travel via `CallOptions` on the `DirectDispatcher`, not as JSON-RPC notifies.
60+
"""
61+
62+
5163
@dataclass
5264
class Client:
5365
"""A high-level MCP client for connecting to MCP servers.
@@ -121,11 +133,14 @@ async def main():
121133

122134
_session: ClientSession | None = field(init=False, default=None)
123135
_exit_stack: AsyncExitStack | None = field(init=False, default=None)
124-
_transport: Transport = field(init=False)
136+
_transport: Transport | None = field(init=False, default=None)
137+
_inproc_server: Server[Any] | None = field(init=False, default=None)
125138

126139
def __post_init__(self) -> None:
127-
if isinstance(self.server, Server | MCPServer):
128-
self._transport = InMemoryTransport(self.server, raise_exceptions=self.raise_exceptions)
140+
if isinstance(self.server, MCPServer):
141+
self._inproc_server = self.server._lowlevel_server # pyright: ignore[reportPrivateUsage]
142+
elif isinstance(self.server, Server):
143+
self._inproc_server = self.server
129144
elif isinstance(self.server, str):
130145
self._transport = streamable_http_client(self.server)
131146
else:
@@ -137,10 +152,34 @@ async def __aenter__(self) -> Client:
137152
raise RuntimeError("Client is already entered; cannot reenter")
138153

139154
async with AsyncExitStack() as exit_stack:
140-
read_stream, write_stream = await exit_stack.enter_async_context(self._transport)
141-
142-
self._session = await exit_stack.enter_async_context(
143-
ClientSession(
155+
if self._inproc_server is not None and self.mode != "legacy":
156+
# Modern in-process path: drive the server through a DirectDispatcher peer-pair
157+
# with one `serve_one` per request — no streams, no initialize handshake.
158+
lifespan_state = await exit_stack.enter_async_context(self._inproc_server.lifespan(self._inproc_server))
159+
client_disp, server_disp = create_direct_dispatcher_pair()
160+
tg = await exit_stack.enter_async_context(anyio.create_task_group())
161+
exit_stack.callback(server_disp.close)
162+
await tg.start(server_disp.run, modern_on_request(self._inproc_server, lifespan_state), _drop_notify)
163+
session = ClientSession(
164+
dispatcher=client_disp,
165+
read_timeout_seconds=self.read_timeout_seconds,
166+
sampling_callback=self.sampling_callback,
167+
list_roots_callback=self.list_roots_callback,
168+
logging_callback=self.logging_callback,
169+
message_handler=self.message_handler,
170+
client_info=self.client_info,
171+
elicitation_callback=self.elicitation_callback,
172+
)
173+
else:
174+
if self._inproc_server is not None:
175+
transport: Transport = InMemoryTransport(
176+
self._inproc_server, raise_exceptions=self.raise_exceptions
177+
)
178+
else:
179+
assert self._transport is not None
180+
transport = self._transport
181+
read_stream, write_stream = await exit_stack.enter_async_context(transport)
182+
session = ClientSession(
144183
read_stream=read_stream,
145184
write_stream=write_stream,
146185
read_timeout_seconds=self.read_timeout_seconds,
@@ -151,7 +190,8 @@ async def __aenter__(self) -> Client:
151190
client_info=self.client_info,
152191
elicitation_callback=self.elicitation_callback,
153192
)
154-
)
193+
194+
self._session = await exit_stack.enter_async_context(session)
155195

156196
if self.mode == "legacy":
157197
await self._session.initialize()

src/mcp/server/runner.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,14 @@
3636
from mcp.shared.jsonrpc_dispatcher import JSONRPCDispatcher, handler_exception_to_error_data
3737
from mcp.shared.message import ServerMessageMetadata, SessionMessage
3838
from mcp.shared.transport_context import TransportContext
39-
from mcp.shared.version import HANDSHAKE_PROTOCOL_VERSIONS
39+
from mcp.shared.version import HANDSHAKE_PROTOCOL_VERSIONS, MODERN_PROTOCOL_VERSIONS
4040
from mcp.types import (
41+
CLIENT_CAPABILITIES_META_KEY,
42+
CLIENT_INFO_META_KEY,
4143
INTERNAL_ERROR,
4244
INVALID_PARAMS,
4345
METHOD_NOT_FOUND,
46+
PROTOCOL_VERSION_META_KEY,
4447
ErrorData,
4548
Implementation,
4649
InitializeRequestParams,
@@ -62,6 +65,7 @@
6265
"ServerMiddleware",
6366
"ServerRunner",
6467
"aclose_shielded",
68+
"modern_on_request",
6569
"otel_middleware",
6670
"serve_connection",
6771
"serve_loop",
@@ -512,3 +516,34 @@ async def serve_one(
512516
return await to_jsonrpc_response(request.id, runner.on_request(dctx, request.method, request.params))
513517
finally:
514518
await aclose_shielded(connection)
519+
520+
521+
def modern_on_request(server: Server[LifespanT], lifespan_state: LifespanT) -> OnRequest:
522+
"""Return an `OnRequest` callback that serves each call via `serve_one` with a fresh per-request `Connection`.
523+
524+
Wire this into the server side of a `DirectDispatcher` peer-pair to drive an
525+
in-process server on the modern per-request-envelope path (each request
526+
carries protocol version, client info, and capabilities in `params._meta`;
527+
no `initialize` handshake).
528+
"""
529+
530+
async def handle(
531+
dctx: DispatchContext[TransportContext], method: str, params: Mapping[str, Any] | None
532+
) -> dict[str, Any]:
533+
meta = (params or {}).get("_meta", {})
534+
connection = Connection.from_envelope(
535+
meta.get(PROTOCOL_VERSION_META_KEY, MODERN_PROTOCOL_VERSIONS[-1]),
536+
meta.get(CLIENT_INFO_META_KEY),
537+
meta.get(CLIENT_CAPABILITIES_META_KEY),
538+
)
539+
# `OnRequest` is invoked for requests only, so `request_id` is always set.
540+
assert dctx.request_id is not None
541+
req = JSONRPCRequest(
542+
jsonrpc="2.0", id=dctx.request_id, method=method, params=dict(params) if params is not None else None
543+
)
544+
msg = await serve_one(server, req, connection=connection, dctx=dctx, lifespan_state=lifespan_state)
545+
if isinstance(msg, JSONRPCError):
546+
raise MCPError(code=msg.error.code, message=msg.error.message, data=msg.error.data)
547+
return msg.result
548+
549+
return handle

tests/interaction/_connect.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,15 @@ async def connect_in_memory(
8888
elicitation_callback: ElicitationFnT | None = None,
8989
spec_version: str = LATEST_PROTOCOL_VERSION,
9090
) -> AsyncIterator[Client]:
91-
"""Yield a Client connected to the server over the in-memory transport."""
91+
"""Yield a Client connected to the server over the in-memory transport.
92+
93+
When `spec_version` is a modern (2026-07-28+) revision the Client is opened with
94+
`mode=<version>`, which drives the server through the DirectDispatcher peer-pair
95+
(per-request `serve_one`, no initialize handshake) instead of the legacy stream pair.
96+
"""
9297
async with Client(
9398
server,
99+
mode=spec_version if spec_version in MODERN_PROTOCOL_VERSIONS else "legacy",
94100
read_timeout_seconds=read_timeout_seconds,
95101
sampling_callback=sampling_callback,
96102
list_roots_callback=list_roots_callback,

tests/interaction/_requirements.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,7 @@
6363

6464
TRANSPORT_SPEC_VERSIONS: dict[Transport, tuple[SpecVersion, ...]] = {
6565
"sse": ("2025-11-25",),
66-
# Temporary lock: the in-memory transport has no modern entry point yet, so it cannot
67-
# negotiate the newer revision. Remove once an in-memory factory for the modern path lands.
68-
"in-memory": ("2025-11-25",),
66+
"in-memory": ("2025-11-25", "2026-07-28"),
6967
# At the newer revision the protocol-version header check runs before the stateless branch is
7068
# taken, so a stateless connection at that revision behaves identically to the stateful one.
7169
# Locked to avoid a redundant matrix column; revisit if the header/stateless ordering changes.
@@ -745,7 +743,9 @@ def __post_init__(self) -> None:
745743
"Log notifications emitted by a tool handler during execution reach the client's logging "
746744
"callback before the tool result returns."
747745
),
748-
known_failures=(KnownFailure(spec_version="2026-07-28", note=_MODERN_NOTIFY_DROP, issue=None),),
746+
known_failures=(
747+
KnownFailure(spec_version="2026-07-28", transport="streamable-http", note=_MODERN_NOTIFY_DROP, issue=None),
748+
),
749749
),
750750
"tools:call:progress": Requirement(
751751
source=f"{SPEC_BASE_URL}/basic/utilities/progress#progress-flow",
@@ -974,7 +974,9 @@ def __post_init__(self) -> None:
974974
"The Context logging helpers (debug/info/warning/error) send log message notifications at the "
975975
"corresponding severity."
976976
),
977-
known_failures=(KnownFailure(spec_version="2026-07-28", note=_MODERN_NOTIFY_DROP, issue=None),),
977+
known_failures=(
978+
KnownFailure(spec_version="2026-07-28", transport="streamable-http", note=_MODERN_NOTIFY_DROP, issue=None),
979+
),
978980
),
979981
"mcpserver:context:progress": Requirement(
980982
source="sdk",
@@ -1339,15 +1341,19 @@ def __post_init__(self) -> None:
13391341
"logging:message:all-levels": Requirement(
13401342
source=f"{SPEC_BASE_URL}/server/utilities/logging#log-levels",
13411343
behavior="All eight RFC 5424 severity levels are deliverable as log message notifications.",
1342-
known_failures=(KnownFailure(spec_version="2026-07-28", note=_MODERN_NOTIFY_DROP, issue=None),),
1344+
known_failures=(
1345+
KnownFailure(spec_version="2026-07-28", transport="streamable-http", note=_MODERN_NOTIFY_DROP, issue=None),
1346+
),
13431347
),
13441348
"logging:message:fields": Requirement(
13451349
source=f"{SPEC_BASE_URL}/server/utilities/logging#log-message-notifications",
13461350
behavior=(
13471351
"A log message sent by a server handler is delivered to the client's logging callback with its "
13481352
"severity level, logger name, and data."
13491353
),
1350-
known_failures=(KnownFailure(spec_version="2026-07-28", note=_MODERN_NOTIFY_DROP, issue=None),),
1354+
known_failures=(
1355+
KnownFailure(spec_version="2026-07-28", transport="streamable-http", note=_MODERN_NOTIFY_DROP, issue=None),
1356+
),
13511357
),
13521358
"logging:message:filtered": Requirement(
13531359
source=f"{SPEC_BASE_URL}/server/utilities/logging#setting-log-level",
@@ -1970,6 +1976,7 @@ def __post_init__(self) -> None:
19701976
known_failures=(
19711977
KnownFailure(
19721978
spec_version="2026-07-28",
1979+
transport="streamable-http",
19731980
note=(
19741981
"List-mutation assertions hold; only the sentinel ctx.info() never reaches the client. "
19751982
+ _MODERN_NOTIFY_DROP

tests/interaction/test_coverage.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,7 @@ def test_compute_cells_drops_era_locked_transport_outside_its_versions() -> None
301301
"sse-2025-11-25",
302302
"streamable-http-2025-11-25",
303303
"streamable-http-stateless-2025-11-25",
304+
"in-memory-2026-07-28",
304305
"streamable-http-2026-07-28",
305306
]
306307

0 commit comments

Comments
 (0)