Skip to content

Commit 3c7864a

Browse files
committed
S2 wave 4: modern entry rewrite, Server.run wrapper, test migrations
Modern HTTP entry (_streamable_http_modern.py): - handle_modern_request: classify_inbound_request -> on rejection write JSONRPCError at ERROR_CODE_HTTP_STATUS-mapped status; on route -> Connection.from_envelope -> serve_one - All status codes via ERROR_CODE_HTTP_STATUS table (PARSE_ERROR added: 400) - id:null preserved on parse-error responses (exclude_none would drop it) - SingleExchangeDispatcher deleted (-73 lines); duplicate shielded-teardown deleted; both reportPrivateUsage ignores deleted; per-request lifespan entry deleted StreamableHTTPSessionManager: - run() enters lifespan once at startup; lifespan_state stored on self - _handle_stateless_request rewired: builds JSONRPCDispatcher (can_send_request=False so the per-request channel raises NoBackChannelError for server->client requests, allows notifications) + Connection.from_envelope(header-seeded version) -> serve_connection. Bypasses Server.run(); stateless kwarg removed from both manager call sites. lowlevel.Server.run(): - Body rewritten: enter lifespan -> JSONRPCDispatcher -> Connection.for_loop -> serve_connection. stateless kwarg deleted (no callers). Net -4 LoC. Test migrations: - test_runner.py: connected_runner uses factories; resolver tests deleted; per-request session tests; serve_one/serve_connection/to_jsonrpc_response/aclose_shielded coverage - test_connection.py: factory tests; check_capability via from_envelope - test_session.py: StubOutbound; two-channel selector tests (4 new) - test_stateless_mode.py: rewritten; 8x pytest.raises(NoBackChannelError) via _NoChannelOutbound - test_streamable_http_modern.py: classifier-path tests; SingleExchangeDispatcher tests deleted - test_lowlevel_exception_handling.py: stateless=True kwarg removed - test_streamable_http.py:1371: protocol_version assertion updated (non-Optional) Part of #2891.
1 parent 2693424 commit 3c7864a

14 files changed

Lines changed: 688 additions & 589 deletions

src/mcp/server/_streamable_http_modern.py

Lines changed: 80 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -12,33 +12,32 @@
1212

1313
from __future__ import annotations
1414

15+
import json
1516
import logging
1617
from collections.abc import Mapping
1718
from dataclasses import dataclass, field
18-
from typing import TYPE_CHECKING, Any
19+
from typing import TYPE_CHECKING, Any, TypeVar
1920

2021
import anyio
21-
import anyio.abc
22-
from pydantic import ValidationError
22+
from pydantic import BaseModel, ValidationError
2323
from starlette.requests import Request
2424
from starlette.responses import Response
2525
from starlette.types import Receive, Scope, Send
2626

27-
from mcp.server.runner import (
28-
_EXIT_STACK_CLOSE_TIMEOUT, # type: ignore[reportPrivateUsage]
29-
ServerRunner,
30-
otel_middleware,
31-
)
27+
from mcp.server.connection import Connection
28+
from mcp.server.runner import serve_one
3229
from mcp.server.transport_security import TransportSecurityMiddleware, TransportSecuritySettings
33-
from mcp.shared.dispatcher import CallOptions, OnNotify, OnRequest
34-
from mcp.shared.exceptions import MCPError, NoBackChannelError
30+
from mcp.shared.dispatcher import CallOptions
31+
from mcp.shared.exceptions import NoBackChannelError
32+
from mcp.shared.inbound import ERROR_CODE_HTTP_STATUS, InboundLadderRejection, classify_inbound_request
3533
from mcp.shared.message import MessageMetadata, ServerMessageMetadata
3634
from mcp.shared.transport_context import TransportContext
3735
from mcp.types import (
38-
INTERNAL_ERROR,
39-
INVALID_PARAMS,
36+
METHOD_NOT_FOUND,
4037
PARSE_ERROR,
38+
ClientCapabilities,
4139
ErrorData,
40+
Implementation,
4241
JSONRPCError,
4342
JSONRPCRequest,
4443
JSONRPCResponse,
@@ -50,6 +49,10 @@
5049

5150
logger = logging.getLogger(__name__)
5251

52+
_ModelT = TypeVar("_ModelT", bound=BaseModel)
53+
54+
_OK_STATUS = 200
55+
5356

5457
@dataclass
5558
class _SingleExchangeDispatchContext:
@@ -75,100 +78,65 @@ async def send_raw_request(
7578
raise NoBackChannelError(method)
7679

7780
async def notify(self, method: str, params: Mapping[str, Any] | None) -> None:
81+
# TODO(D-005a): buffer and stream as SSE once the JSON-vs-SSE response mode lands.
7882
return None
7983

8084
async def progress(self, progress: float, total: float | None = None, message: str | None = None) -> None:
81-
# TODO: no progressToken plumbing yet.
85+
# TODO(D-005a): no progressToken plumbing yet; ships with the SSE response mode.
8286
return None
8387

8488

85-
class SingleExchangeDispatcher:
86-
"""Dispatcher for exactly one inbound JSON-RPC request over a single HTTP POST.
89+
def _typed(model: type[_ModelT], raw: Any) -> _ModelT | None:
90+
"""Validate the classifier's raw envelope value into a typed model.
8791
88-
The exception->wire boundary lives here (mirrors `JSONRPCDispatcher`'s
89-
role). Implements the `Dispatcher` Protocol so `ServerRunner` /
90-
`Connection` / `ServerSession` accept it; `run()` is never driven.
92+
The classifier checks presence only; a value that fails shape validation
93+
is treated as not supplied so the request still routes.
9194
"""
92-
93-
def __init__(self, request: Request) -> None:
94-
self._request = request
95-
self._tctx = TransportContext(
96-
kind="streamable-http",
97-
can_send_request=False,
98-
headers=request.headers,
99-
)
100-
101-
async def send_raw_request(
102-
self,
103-
method: str,
104-
params: Mapping[str, Any] | None,
105-
opts: CallOptions | None = None,
106-
*,
107-
_related_request_id: RequestId | None = None,
108-
) -> dict[str, Any]:
109-
raise NoBackChannelError(method)
110-
111-
async def notify(
112-
self,
113-
method: str,
114-
params: Mapping[str, Any] | None,
115-
*,
116-
_related_request_id: RequestId | None = None,
117-
) -> None:
118-
# TODO: buffer and stream as SSE once the response-mode design lands.
95+
if raw is None:
96+
return None
97+
try:
98+
return model.model_validate(raw, by_name=False)
99+
except ValidationError:
119100
return None
120101

121-
async def run(
122-
self,
123-
on_request: OnRequest,
124-
on_notify: OnNotify,
125-
*,
126-
task_status: anyio.abc.TaskStatus[None] = anyio.TASK_STATUS_IGNORED,
127-
) -> None:
128-
raise RuntimeError("SingleExchangeDispatcher.run() is never driven; use handle()")
129-
130-
async def handle(self, req: JSONRPCRequest, on_request: OnRequest) -> JSONRPCResponse | JSONRPCError:
131-
"""Dispatch one request and map any exception to a `JSONRPCError`."""
132-
dctx = _SingleExchangeDispatchContext(
133-
transport=self._tctx,
134-
request_id=req.id,
135-
message_metadata=ServerMessageMetadata(request_context=self._request),
136-
)
137-
try:
138-
result = await on_request(dctx, req.method, req.params)
139-
return JSONRPCResponse(jsonrpc="2.0", id=req.id, result=result)
140-
except MCPError as e:
141-
return JSONRPCError(jsonrpc="2.0", id=req.id, error=e.error)
142-
except ValidationError:
143-
return JSONRPCError(
144-
jsonrpc="2.0",
145-
id=req.id,
146-
error=ErrorData(code=INVALID_PARAMS, message="Invalid request parameters", data=""),
147-
)
148-
# TODO: consolidate the three exception->ErrorData copies once the
149-
# code=0 compat pin in JSONRPCDispatcher is lifted.
150-
except Exception:
151-
logger.exception("handler for %r raised", req.method)
152-
return JSONRPCError(
153-
jsonrpc="2.0",
154-
id=req.id,
155-
error=ErrorData(code=INTERNAL_ERROR, message="Internal server error"),
156-
)
102+
103+
async def _write(
104+
msg: JSONRPCResponse | JSONRPCError,
105+
scope: Scope,
106+
receive: Receive,
107+
send: Send,
108+
*,
109+
extra_headers: Mapping[str, str] | None = None,
110+
) -> None:
111+
"""Serialise a JSON-RPC reply with the table-mapped HTTP status."""
112+
status = ERROR_CODE_HTTP_STATUS.get(msg.error.code, _OK_STATUS) if isinstance(msg, JSONRPCError) else _OK_STATUS
113+
body = msg.model_dump(mode="json", by_alias=True, exclude_none=True)
114+
if isinstance(msg, JSONRPCError) and msg.id is None:
115+
# JSON-RPC requires `id: null` to appear on the wire when the request
116+
# id couldn't be parsed; `exclude_none` would otherwise drop it.
117+
body["id"] = None
118+
await Response(
119+
json.dumps(body, separators=(",", ":")),
120+
status_code=status,
121+
media_type="application/json",
122+
headers=dict(extra_headers) if extra_headers else None,
123+
)(scope, receive, send)
157124

158125

159126
async def handle_modern_request(
160127
app: Server[Any],
161128
security_settings: TransportSecuritySettings | None,
162-
protocol_version: str,
129+
lifespan_state: Any,
163130
scope: Scope,
164131
receive: Receive,
165132
send: Send,
166133
) -> None:
167134
"""ASGI handler for a single stateless-era POST.
168135
169136
Called from `StreamableHTTPSessionManager.handle_request` when the
170-
`MCP-Protocol-Version` header is in `MODERN_PROTOCOL_VERSIONS`; the header
171-
value is passed as `protocol_version`. Never sets `Mcp-Session-Id`.
137+
`MCP-Protocol-Version` header names a modern revision; the manager enters
138+
`app.lifespan` once at startup and passes the state in. Never sets
139+
`Mcp-Session-Id`.
172140
"""
173141
request = Request(scope, receive)
174142

@@ -178,54 +146,42 @@ async def handle_modern_request(
178146
await err(scope, receive, send)
179147
return
180148

181-
# TODO: validate Accept header once the JSON-vs-SSE response-mode design is settled.
149+
# TODO(D-005a): validate Accept once the JSON-vs-SSE response mode is settled.
182150

183151
if request.method != "POST":
184-
# TODO: GET/DELETE rejection (405 + -32601) lands with the validation ladder.
185-
await Response(status_code=405, headers={"Allow": "POST"})(scope, receive, send)
152+
rej = JSONRPCError(
153+
jsonrpc="2.0",
154+
id=None,
155+
error=ErrorData(code=METHOD_NOT_FOUND, message=f"HTTP {request.method} not supported on this endpoint"),
156+
)
157+
await _write(rej, scope, receive, send, extra_headers={"Allow": "POST"})
186158
return
187159

188160
body = await request.body()
189161
try:
190162
req = JSONRPCRequest.model_validate_json(body)
191163
except ValidationError:
192-
msg = JSONRPCError(jsonrpc="2.0", id=None, error=ErrorData(code=PARSE_ERROR, message="Parse error"))
193-
await Response(
194-
msg.model_dump_json(by_alias=True),
195-
status_code=400,
196-
media_type="application/json",
197-
)(scope, receive, send)
164+
rej = JSONRPCError(jsonrpc="2.0", id=None, error=ErrorData(code=PARSE_ERROR, message="Parse error"))
165+
await _write(rej, scope, receive, send)
198166
return
199167

200-
dispatcher = SingleExchangeDispatcher(request)
201-
# TODO: per-request lifespan re-entry matches stateless_http=True today; revisit in #2893.
202-
async with app.lifespan(app) as lifespan_state:
203-
runner = ServerRunner(
204-
server=app,
205-
dispatcher=dispatcher,
206-
lifespan_state=lifespan_state,
207-
has_standalone_channel=False,
208-
stateless=True,
209-
dispatch_middleware=[otel_middleware],
168+
verdict = classify_inbound_request({"method": req.method, "params": req.params}, headers=dict(request.headers))
169+
if isinstance(verdict, InboundLadderRejection):
170+
rej = JSONRPCError(
171+
jsonrpc="2.0", id=req.id, error=ErrorData(code=verdict.code, message=verdict.message, data=verdict.data)
210172
)
211-
runner.connection.protocol_version = protocol_version
212-
try:
213-
msg = await dispatcher.handle(req, runner._compose_on_request()) # type: ignore[reportPrivateUsage]
214-
finally:
215-
with anyio.move_on_after(_EXIT_STACK_CLOSE_TIMEOUT, shield=True) as cancel_scope:
216-
try:
217-
await runner.connection.exit_stack.aclose()
218-
except Exception:
219-
logger.exception("connection exit_stack cleanup raised")
220-
if cancel_scope.cancelled_caught:
221-
logger.warning(
222-
"connection exit_stack cleanup exceeded %s seconds; abandoning remaining callbacks",
223-
_EXIT_STACK_CLOSE_TIMEOUT,
224-
)
225-
226-
# TODO: error.code -> HTTP status mapping is a follow-up; 200 for all JSONRPCError bodies for now.
227-
await Response(
228-
msg.model_dump_json(by_alias=True, exclude_none=True),
229-
status_code=200,
230-
media_type="application/json",
231-
)(scope, receive, send)
173+
await _write(rej, scope, receive, send)
174+
return
175+
176+
connection = Connection.from_envelope(
177+
verdict.protocol_version,
178+
_typed(Implementation, verdict.client_info),
179+
_typed(ClientCapabilities, verdict.client_capabilities),
180+
)
181+
dctx = _SingleExchangeDispatchContext(
182+
transport=TransportContext(kind="streamable-http", can_send_request=False, headers=request.headers),
183+
request_id=req.id,
184+
message_metadata=ServerMessageMetadata(request_context=request),
185+
)
186+
msg = await serve_one(app, req, connection=connection, dctx=dctx, lifespan_state=lifespan_state)
187+
await _write(msg, scope, receive, send)

src/mcp/server/lowlevel/server.py

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,10 @@ async def main():
5656
from mcp.server.auth.provider import OAuthAuthorizationServerProvider, TokenVerifier
5757
from mcp.server.auth.routes import build_resource_metadata_url, create_auth_routes, create_protected_resource_routes
5858
from mcp.server.auth.settings import AuthSettings
59+
from mcp.server.connection import Connection
5960
from mcp.server.context import HandlerResult, ServerMiddleware, ServerRequestContext
6061
from mcp.server.models import InitializationOptions
61-
from mcp.server.runner import ServerRunner, otel_middleware
62+
from mcp.server.runner import serve_connection
6263
from mcp.server.streamable_http import EventStore
6364
from mcp.server.streamable_http_manager import StreamableHTTPASGIApp, StreamableHTTPSessionManager
6465
from mcp.server.transport_security import TransportSecuritySettings
@@ -438,12 +439,13 @@ async def run(
438439
# but also make tracing exceptions much easier during testing and when using
439440
# in-process servers.
440441
raise_exceptions: bool = False,
441-
# When True, the server is stateless and
442-
# clients can perform initialization with any node. The client must still follow
443-
# the initialization lifecycle, but can do so with any available node
444-
# rather than requiring initialization for each connection.
445-
stateless: bool = False,
446442
) -> None:
443+
"""Serve a single connection over the given streams until the read side closes.
444+
445+
Thin wrapper over `serve_connection` (L28): enters the server lifespan,
446+
builds a `JSONRPCDispatcher` over the streams and a loop-mode
447+
`Connection` on it, then drives the dispatcher to completion.
448+
"""
447449
async with self.lifespan(self) as lifespan_context:
448450
dispatcher: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher(
449451
read_stream,
@@ -454,20 +456,14 @@ async def run(
454456
# the initialized state instead of failing the init-gate.
455457
inline_methods=frozenset({"initialize"}),
456458
)
457-
runner = ServerRunner(
458-
server=self,
459-
dispatcher=dispatcher,
459+
connection = Connection.for_loop(dispatcher)
460+
await serve_connection(
461+
self,
462+
dispatcher,
463+
connection=connection,
460464
lifespan_state=lifespan_context,
461465
init_options=initialization_options,
462-
# Stateless HTTP has no standalone GET stream, so server-initiated
463-
# requests on `runner.connection` must fail fast with
464-
# `NoBackChannelError` rather than write to a channel that will
465-
# never deliver a response.
466-
has_standalone_channel=not stateless,
467-
stateless=stateless,
468-
dispatch_middleware=[otel_middleware],
469466
)
470-
await runner.run()
471467

472468
def streamable_http_app(
473469
self,

0 commit comments

Comments
 (0)