44
55package io .modelcontextprotocol .client .transport ;
66
7- import io .modelcontextprotocol .json .McpJsonMapper ;
8- import io .modelcontextprotocol .json .TypeRef ;
9- import io .modelcontextprotocol .spec .*;
10- import io .modelcontextprotocol .util .Assert ;
11- import io .modelcontextprotocol .util .Utils ;
7+ import java .io .IOException ;
8+ import java .util .List ;
9+ import java .util .Optional ;
10+ import java .util .concurrent .atomic .AtomicReference ;
11+ import java .util .function .Consumer ;
12+ import java .util .function .Function ;
13+
1214import org .reactivestreams .Publisher ;
1315import org .slf4j .Logger ;
1416import org .slf4j .LoggerFactory ;
2022import org .springframework .web .reactive .function .client .ClientResponse ;
2123import org .springframework .web .reactive .function .client .WebClient ;
2224import org .springframework .web .reactive .function .client .WebClientResponseException ;
25+
26+ import io .modelcontextprotocol .json .TypeRef ;
27+ import io .modelcontextprotocol .json .McpJsonMapper ;
28+
29+ import io .modelcontextprotocol .spec .DefaultMcpTransportSession ;
30+ import io .modelcontextprotocol .spec .DefaultMcpTransportStream ;
31+ import io .modelcontextprotocol .spec .HttpHeaders ;
32+ import io .modelcontextprotocol .spec .McpClientTransport ;
33+ import io .modelcontextprotocol .spec .McpError ;
34+ import io .modelcontextprotocol .spec .McpSchema ;
35+ import io .modelcontextprotocol .spec .McpTransportException ;
36+ import io .modelcontextprotocol .spec .McpTransportSession ;
37+ import io .modelcontextprotocol .spec .McpTransportSessionNotFoundException ;
38+ import io .modelcontextprotocol .spec .McpTransportStream ;
39+ import io .modelcontextprotocol .spec .ProtocolVersions ;
40+ import io .modelcontextprotocol .util .Assert ;
41+ import io .modelcontextprotocol .util .Utils ;
2342import reactor .core .Disposable ;
2443import reactor .core .publisher .Flux ;
2544import reactor .core .publisher .Mono ;
2645import reactor .util .function .Tuple2 ;
2746import reactor .util .function .Tuples ;
2847
29- import java .io .IOException ;
30- import java .util .List ;
31- import java .util .Optional ;
32- import java .util .concurrent .atomic .AtomicReference ;
33- import java .util .function .Consumer ;
34- import java .util .function .Function ;
35-
3648/**
3749 * An implementation of the Streamable HTTP protocol as defined by the
3850 * <code>2025-03-26</code> version of the MCP specification.
@@ -94,7 +106,7 @@ public class WebClientStreamableHttpTransport implements McpClientTransport {
94106 private final AtomicReference <Consumer <Throwable >> exceptionHandler = new AtomicReference <>();
95107
96108 private WebClientStreamableHttpTransport (McpJsonMapper jsonMapper , WebClient .Builder webClientBuilder ,
97- String endpoint , boolean resumableStreams , boolean openConnectionOnStartup ) {
109+ String endpoint , boolean resumableStreams , boolean openConnectionOnStartup ) {
98110 this .jsonMapper = jsonMapper ;
99111 this .webClient = webClientBuilder .build ();
100112 this .endpoint = endpoint ;
@@ -134,16 +146,16 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
134146 private DefaultMcpTransportSession createTransportSession () {
135147 Function <String , Publisher <Void >> onClose = sessionId -> sessionId == null ? Mono .empty ()
136148 : webClient .delete ()
137- .uri (this .endpoint )
138- .header (HttpHeaders .MCP_SESSION_ID , sessionId )
139- .header (HttpHeaders .PROTOCOL_VERSION , MCP_PROTOCOL_VERSION )
140- .retrieve ()
141- .toBodilessEntity ()
142- .onErrorComplete (e -> {
143- logger .warn ("Got error when closing transport" , e );
144- return true ;
145- })
146- .then ();
149+ .uri (this .endpoint )
150+ .header (HttpHeaders .MCP_SESSION_ID , sessionId )
151+ .header (HttpHeaders .PROTOCOL_VERSION , MCP_PROTOCOL_VERSION )
152+ .retrieve ()
153+ .toBodilessEntity ()
154+ .onErrorComplete (e -> {
155+ logger .warn ("Got error when closing transport" , e );
156+ return true ;
157+ })
158+ .then ();
147159 return new DefaultMcpTransportSession (onClose );
148160 }
149161
@@ -194,52 +206,52 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
194206 final McpTransportSession <Disposable > transportSession = this .activeSession .get ();
195207
196208 Disposable connection = webClient .get ()
197- .uri (this .endpoint )
198- .accept (MediaType .TEXT_EVENT_STREAM )
199- .header (HttpHeaders .PROTOCOL_VERSION , MCP_PROTOCOL_VERSION )
200- .headers (httpHeaders -> {
201- transportSession .sessionId ().ifPresent (id -> httpHeaders .add (HttpHeaders .MCP_SESSION_ID , id ));
202- if (stream != null ) {
203- stream .lastId ().ifPresent (id -> httpHeaders .add (HttpHeaders .LAST_EVENT_ID , id ));
204- }
205- })
206- .exchangeToFlux (response -> {
207- if (isEventStream (response )) {
208- logger .debug ("Established SSE stream via GET" );
209- return eventStream (stream , response );
210- }
211- else if (isNotAllowed (response )) {
212- logger .debug ("The server does not support SSE streams, using request-response mode." );
213- return Flux .empty ();
214- }
215- else if (isNotFound (response )) {
216- if (transportSession .sessionId ().isPresent ()) {
217- String sessionIdRepresentation = sessionIdOrPlaceholder (transportSession );
218- return mcpSessionNotFoundError (sessionIdRepresentation );
209+ .uri (this .endpoint )
210+ .accept (MediaType .TEXT_EVENT_STREAM )
211+ .header (HttpHeaders .PROTOCOL_VERSION , MCP_PROTOCOL_VERSION )
212+ .headers (httpHeaders -> {
213+ transportSession .sessionId ().ifPresent (id -> httpHeaders .add (HttpHeaders .MCP_SESSION_ID , id ));
214+ if (stream != null ) {
215+ stream .lastId ().ifPresent (id -> httpHeaders .add (HttpHeaders .LAST_EVENT_ID , id ));
216+ }
217+ })
218+ .exchangeToFlux (response -> {
219+ if (isEventStream (response )) {
220+ logger .debug ("Established SSE stream via GET" );
221+ return eventStream (stream , response );
222+ }
223+ else if (isNotAllowed (response )) {
224+ logger .debug ("The server does not support SSE streams, using request-response mode." );
225+ return Flux .empty ();
226+ }
227+ else if (isNotFound (response )) {
228+ if (transportSession .sessionId ().isPresent ()) {
229+ String sessionIdRepresentation = sessionIdOrPlaceholder (transportSession );
230+ return mcpSessionNotFoundError (sessionIdRepresentation );
231+ }
232+ else {
233+ return this .extractError (response , MISSING_SESSION_ID );
234+ }
219235 }
220236 else {
221- return this .extractError (response , MISSING_SESSION_ID );
237+ return response .<McpSchema .JSONRPCMessage >createError ().doOnError (e -> {
238+ logger .info ("Opening an SSE stream failed. This can be safely ignored." , e );
239+ }).flux ();
222240 }
223- }
224- else {
225- return response .<McpSchema .JSONRPCMessage >createError ().doOnError (e -> {
226- logger .info ("Opening an SSE stream failed. This can be safely ignored." , e );
227- }).flux ();
228- }
229- })
230- .flatMap (jsonrpcMessage -> this .handler .get ().apply (Mono .just (jsonrpcMessage )))
231- .onErrorComplete (t -> {
232- this .handleException (t );
233- return true ;
234- })
235- .doFinally (s -> {
236- Disposable ref = disposableRef .getAndSet (null );
237- if (ref != null ) {
238- transportSession .removeConnection (ref );
239- }
240- })
241- .contextWrite (ctx )
242- .subscribe ();
241+ })
242+ .flatMap (jsonrpcMessage -> this .handler .get ().apply (Mono .just (jsonrpcMessage )))
243+ .onErrorComplete (t -> {
244+ this .handleException (t );
245+ return true ;
246+ })
247+ .doFinally (s -> {
248+ Disposable ref = disposableRef .getAndSet (null );
249+ if (ref != null ) {
250+ transportSession .removeConnection (ref );
251+ }
252+ })
253+ .contextWrite (ctx )
254+ .subscribe ();
243255
244256 disposableRef .set (connection );
245257 transportSession .addConnection (connection );
@@ -260,83 +272,83 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
260272 final McpTransportSession <Disposable > transportSession = this .activeSession .get ();
261273
262274 Disposable connection = webClient .post ()
263- .uri (this .endpoint )
264- .accept (MediaType .APPLICATION_JSON , MediaType .TEXT_EVENT_STREAM )
265- .header (HttpHeaders .PROTOCOL_VERSION , MCP_PROTOCOL_VERSION )
266- .headers (httpHeaders -> {
267- transportSession .sessionId ().ifPresent (id -> httpHeaders .add (HttpHeaders .MCP_SESSION_ID , id ));
268- })
269- .bodyValue (message )
270- .exchangeToFlux (response -> {
271- String mcpSessionId = response .headers ().asHttpHeaders ().getFirst (HttpHeaders .MCP_SESSION_ID );
272- if (StringUtils .hasText (mcpSessionId ) && transportSession .markInitialized (mcpSessionId )) {
273- // Once we have a session, we try to open an async stream for
274- // the server to send notifications and requests out-of-band.
275- reconnect (null ).contextWrite (sink .contextView ()).subscribe ();
276- }
277-
278- String sessionRepresentation = sessionIdOrPlaceholder (transportSession );
279-
280- // The spec mentions only ACCEPTED, but the existing SDKs can return
281- // 200 OK for notifications
282- if (response .statusCode ().is2xxSuccessful ()) {
283- Optional <MediaType > contentType = response .headers ().contentType ();
284- // Existing SDKs consume notifications with no response body nor
285- // content type
286- if (contentType .isEmpty ()) {
287- logger .trace ("Message was successfully sent via POST for session {}" ,
288- sessionRepresentation );
289- // signal the caller that the message was successfully
290- // delivered
291- sink .success ();
292- // communicate to downstream there is no streamed data coming
293- return Flux .empty ();
275+ .uri (this .endpoint )
276+ .accept (MediaType .APPLICATION_JSON , MediaType .TEXT_EVENT_STREAM )
277+ .header (HttpHeaders .PROTOCOL_VERSION , MCP_PROTOCOL_VERSION )
278+ .headers (httpHeaders -> {
279+ transportSession .sessionId ().ifPresent (id -> httpHeaders .add (HttpHeaders .MCP_SESSION_ID , id ));
280+ })
281+ .bodyValue (message )
282+ .exchangeToFlux (response -> {
283+ String mcpSessionId = response .headers ().asHttpHeaders ().getFirst (HttpHeaders .MCP_SESSION_ID );
284+ if (StringUtils .hasText (mcpSessionId ) && transportSession .markInitialized (mcpSessionId )) {
285+ // Once we have a session, we try to open an async stream for
286+ // the server to send notifications and requests out-of-band.
287+ reconnect (null ).contextWrite (sink .contextView ()).subscribe ();
294288 }
295- else {
296- MediaType mediaType = contentType .get ();
297- if (mediaType .isCompatibleWith (MediaType .TEXT_EVENT_STREAM )) {
298- logger .debug ("Established SSE stream via POST" );
299- // communicate to caller that the message was delivered
300- sink .success ();
301- // starting a stream
302- return newEventStream (response , sessionRepresentation );
303- }
304- else if (mediaType .isCompatibleWith (MediaType .APPLICATION_JSON )) {
305- logger .trace ("Received response to POST for session {}" , sessionRepresentation );
306- // communicate to caller the message was delivered
289+
290+ String sessionRepresentation = sessionIdOrPlaceholder (transportSession );
291+
292+ // The spec mentions only ACCEPTED, but the existing SDKs can return
293+ // 200 OK for notifications
294+ if (response .statusCode ().is2xxSuccessful ()) {
295+ Optional <MediaType > contentType = response .headers ().contentType ();
296+ // Existing SDKs consume notifications with no response body nor
297+ // content type
298+ if (contentType .isEmpty ()) {
299+ logger .trace ("Message was successfully sent via POST for session {}" ,
300+ sessionRepresentation );
301+ // signal the caller that the message was successfully
302+ // delivered
307303 sink .success ();
308- return directResponseFlux (message , response );
304+ // communicate to downstream there is no streamed data coming
305+ return Flux .empty ();
309306 }
310307 else {
311- logger .warn ("Unknown media type {} returned for POST in session {}" , contentType ,
312- sessionRepresentation );
313- return Flux .error (new RuntimeException ("Unknown media type returned: " + contentType ));
308+ MediaType mediaType = contentType .get ();
309+ if (mediaType .isCompatibleWith (MediaType .TEXT_EVENT_STREAM )) {
310+ logger .debug ("Established SSE stream via POST" );
311+ // communicate to caller that the message was delivered
312+ sink .success ();
313+ // starting a stream
314+ return newEventStream (response , sessionRepresentation );
315+ }
316+ else if (mediaType .isCompatibleWith (MediaType .APPLICATION_JSON )) {
317+ logger .trace ("Received response to POST for session {}" , sessionRepresentation );
318+ // communicate to caller the message was delivered
319+ sink .success ();
320+ return directResponseFlux (message , response );
321+ }
322+ else {
323+ logger .warn ("Unknown media type {} returned for POST in session {}" , contentType ,
324+ sessionRepresentation );
325+ return Flux .error (new RuntimeException ("Unknown media type returned: " + contentType ));
326+ }
314327 }
315328 }
316- }
317- else {
318- if (isNotFound (response ) && !sessionRepresentation .equals (MISSING_SESSION_ID )) {
319- return mcpSessionNotFoundError (sessionRepresentation );
329+ else {
330+ if (isNotFound (response ) && !sessionRepresentation .equals (MISSING_SESSION_ID )) {
331+ return mcpSessionNotFoundError (sessionRepresentation );
332+ }
333+ return this .extractError (response , sessionRepresentation );
320334 }
321- return this .extractError (response , sessionRepresentation );
322- }
323- })
324- .flatMap (jsonRpcMessage -> this .handler .get ().apply (Mono .just (jsonRpcMessage )))
325- .onErrorComplete (t -> {
326- // handle the error first
327- this .handleException (t );
328- // inform the caller of sendMessage
329- sink .error (t );
330- return true ;
331- })
332- .doFinally (s -> {
333- Disposable ref = disposableRef .getAndSet (null );
334- if (ref != null ) {
335- transportSession .removeConnection (ref );
336- }
337- })
338- .contextWrite (sink .contextView ())
339- .subscribe ();
335+ })
336+ .flatMap (jsonRpcMessage -> this .handler .get ().apply (Mono .just (jsonRpcMessage )))
337+ .onErrorComplete (t -> {
338+ // handle the error first
339+ this .handleException (t );
340+ // inform the caller of sendMessage
341+ sink .error (t );
342+ return true ;
343+ })
344+ .doFinally (s -> {
345+ Disposable ref = disposableRef .getAndSet (null );
346+ if (ref != null ) {
347+ transportSession .removeConnection (ref );
348+ }
349+ })
350+ .contextWrite (sink .contextView ())
351+ .subscribe ();
340352 disposableRef .set (connection );
341353 transportSession .addConnection (connection );
342354 });
@@ -407,7 +419,7 @@ private static String sessionIdOrPlaceholder(McpTransportSession<?> transportSes
407419 }
408420
409421 private Flux <McpSchema .JSONRPCMessage > directResponseFlux (McpSchema .JSONRPCMessage sentMessage ,
410- ClientResponse response ) {
422+ ClientResponse response ) {
411423 return response .bodyToMono (String .class ).<Iterable <McpSchema .JSONRPCMessage >>handle ((responseMessage , s ) -> {
412424 try {
413425 if (sentMessage instanceof McpSchema .JSONRPCNotification && Utils .hasText (responseMessage )) {
0 commit comments