@@ -106,7 +106,7 @@ public class WebClientStreamableHttpTransport implements McpClientTransport {
106106 private final AtomicReference <Consumer <Throwable >> exceptionHandler = new AtomicReference <>();
107107
108108 private WebClientStreamableHttpTransport (McpJsonMapper jsonMapper , WebClient .Builder webClientBuilder ,
109- String endpoint , boolean resumableStreams , boolean openConnectionOnStartup ) {
109+ String endpoint , boolean resumableStreams , boolean openConnectionOnStartup ) {
110110 this .jsonMapper = jsonMapper ;
111111 this .webClient = webClientBuilder .build ();
112112 this .endpoint = endpoint ;
@@ -146,16 +146,16 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
146146 private DefaultMcpTransportSession createTransportSession () {
147147 Function <String , Publisher <Void >> onClose = sessionId -> sessionId == null ? Mono .empty ()
148148 : webClient .delete ()
149- .uri (this .endpoint )
150- .header (HttpHeaders .MCP_SESSION_ID , sessionId )
151- .header (HttpHeaders .PROTOCOL_VERSION , MCP_PROTOCOL_VERSION )
152- .retrieve ()
153- .toBodilessEntity ()
154- .onErrorComplete (e -> {
155- logger .warn ("Got error when closing transport" , e );
156- return true ;
157- })
158- .then ();
149+ .uri (this .endpoint )
150+ .header (HttpHeaders .MCP_SESSION_ID , sessionId )
151+ .header (HttpHeaders .PROTOCOL_VERSION , MCP_PROTOCOL_VERSION )
152+ .retrieve ()
153+ .toBodilessEntity ()
154+ .onErrorComplete (e -> {
155+ logger .warn ("Got error when closing transport" , e );
156+ return true ;
157+ })
158+ .then ();
159159 return new DefaultMcpTransportSession (onClose );
160160 }
161161
@@ -206,52 +206,52 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
206206 final McpTransportSession <Disposable > transportSession = this .activeSession .get ();
207207
208208 Disposable connection = webClient .get ()
209- .uri (this .endpoint )
210- .accept (MediaType .TEXT_EVENT_STREAM )
211- .header (HttpHeaders .PROTOCOL_VERSION , MCP_PROTOCOL_VERSION )
212- .headers (httpHeaders -> {
213- transportSession .sessionId ().ifPresent (id -> httpHeaders .add (HttpHeaders .MCP_SESSION_ID , id ));
214- if (stream != null ) {
215- stream .lastId ().ifPresent (id -> httpHeaders .add (HttpHeaders .LAST_EVENT_ID , id ));
216- }
217- })
218- .exchangeToFlux (response -> {
219- if (isEventStream (response )) {
220- logger .debug ("Established SSE stream via GET" );
221- return eventStream (stream , response );
222- }
223- else if (isNotAllowed (response )) {
224- logger .debug ("The server does not support SSE streams, using request-response mode." );
225- return Flux .empty ();
226- }
227- else if (isNotFound (response )) {
228- if (transportSession .sessionId ().isPresent ()) {
229- String sessionIdRepresentation = sessionIdOrPlaceholder (transportSession );
230- return mcpSessionNotFoundError (sessionIdRepresentation );
231- }
232- else {
233- return this .extractError (response , MISSING_SESSION_ID );
234- }
209+ .uri (this .endpoint )
210+ .accept (MediaType .TEXT_EVENT_STREAM )
211+ .header (HttpHeaders .PROTOCOL_VERSION , MCP_PROTOCOL_VERSION )
212+ .headers (httpHeaders -> {
213+ transportSession .sessionId ().ifPresent (id -> httpHeaders .add (HttpHeaders .MCP_SESSION_ID , id ));
214+ if (stream != null ) {
215+ stream .lastId ().ifPresent (id -> httpHeaders .add (HttpHeaders .LAST_EVENT_ID , id ));
216+ }
217+ })
218+ .exchangeToFlux (response -> {
219+ if (isEventStream (response )) {
220+ logger .debug ("Established SSE stream via GET" );
221+ return eventStream (stream , response );
222+ }
223+ else if (isNotAllowed (response )) {
224+ logger .debug ("The server does not support SSE streams, using request-response mode." );
225+ return Flux .empty ();
226+ }
227+ else if (isNotFound (response )) {
228+ if (transportSession .sessionId ().isPresent ()) {
229+ String sessionIdRepresentation = sessionIdOrPlaceholder (transportSession );
230+ return mcpSessionNotFoundError (sessionIdRepresentation );
235231 }
236232 else {
237- return response .<McpSchema .JSONRPCMessage >createError ().doOnError (e -> {
238- logger .info ("Opening an SSE stream failed. This can be safely ignored." , e );
239- }).flux ();
233+ return this .extractError (response , MISSING_SESSION_ID );
240234 }
241- })
242- .flatMap (jsonrpcMessage -> this .handler .get ().apply (Mono .just (jsonrpcMessage )))
243- .onErrorComplete (t -> {
244- this .handleException (t );
245- return true ;
246- })
247- .doFinally (s -> {
248- Disposable ref = disposableRef .getAndSet (null );
249- if (ref != null ) {
250- transportSession .removeConnection (ref );
251- }
252- })
253- .contextWrite (ctx )
254- .subscribe ();
235+ }
236+ else {
237+ return response .<McpSchema .JSONRPCMessage >createError ().doOnError (e -> {
238+ logger .info ("Opening an SSE stream failed. This can be safely ignored." , e );
239+ }).flux ();
240+ }
241+ })
242+ .flatMap (jsonrpcMessage -> this .handler .get ().apply (Mono .just (jsonrpcMessage )))
243+ .onErrorComplete (t -> {
244+ this .handleException (t );
245+ return true ;
246+ })
247+ .doFinally (s -> {
248+ Disposable ref = disposableRef .getAndSet (null );
249+ if (ref != null ) {
250+ transportSession .removeConnection (ref );
251+ }
252+ })
253+ .contextWrite (ctx )
254+ .subscribe ();
255255
256256 disposableRef .set (connection );
257257 transportSession .addConnection (connection );
@@ -272,83 +272,83 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
272272 final McpTransportSession <Disposable > transportSession = this .activeSession .get ();
273273
274274 Disposable connection = webClient .post ()
275- .uri (this .endpoint )
276- .accept (MediaType .APPLICATION_JSON , MediaType .TEXT_EVENT_STREAM )
277- .header (HttpHeaders .PROTOCOL_VERSION , MCP_PROTOCOL_VERSION )
278- .headers (httpHeaders -> {
279- transportSession .sessionId ().ifPresent (id -> httpHeaders .add (HttpHeaders .MCP_SESSION_ID , id ));
280- })
281- .bodyValue (message )
282- .exchangeToFlux (response -> {
283- String mcpSessionId = response .headers ().asHttpHeaders ().getFirst (HttpHeaders .MCP_SESSION_ID );
284- if (StringUtils .hasText (mcpSessionId ) && transportSession .markInitialized (mcpSessionId )) {
285- // Once we have a session, we try to open an async stream for
286- // the server to send notifications and requests out-of-band.
287- reconnect (null ).contextWrite (sink .contextView ()).subscribe ();
275+ .uri (this .endpoint )
276+ .accept (MediaType .APPLICATION_JSON , MediaType .TEXT_EVENT_STREAM )
277+ .header (HttpHeaders .PROTOCOL_VERSION , MCP_PROTOCOL_VERSION )
278+ .headers (httpHeaders -> {
279+ transportSession .sessionId ().ifPresent (id -> httpHeaders .add (HttpHeaders .MCP_SESSION_ID , id ));
280+ })
281+ .bodyValue (message )
282+ .exchangeToFlux (response -> {
283+ String mcpSessionId = response .headers ().asHttpHeaders ().getFirst (HttpHeaders .MCP_SESSION_ID );
284+ if (StringUtils .hasText (mcpSessionId ) && transportSession .markInitialized (mcpSessionId )) {
285+ // Once we have a session, we try to open an async stream for
286+ // the server to send notifications and requests out-of-band.
287+ reconnect (null ).contextWrite (sink .contextView ()).subscribe ();
288+ }
289+
290+ String sessionRepresentation = sessionIdOrPlaceholder (transportSession );
291+
292+ // The spec mentions only ACCEPTED, but the existing SDKs can return
293+ // 200 OK for notifications
294+ if (response .statusCode ().is2xxSuccessful ()) {
295+ Optional <MediaType > contentType = response .headers ().contentType ();
296+ // Existing SDKs consume notifications with no response body nor
297+ // content type
298+ if (contentType .isEmpty ()) {
299+ logger .trace ("Message was successfully sent via POST for session {}" ,
300+ sessionRepresentation );
301+ // signal the caller that the message was successfully
302+ // delivered
303+ sink .success ();
304+ // communicate to downstream there is no streamed data coming
305+ return Flux .empty ();
288306 }
289-
290- String sessionRepresentation = sessionIdOrPlaceholder (transportSession );
291-
292- // The spec mentions only ACCEPTED, but the existing SDKs can return
293- // 200 OK for notifications
294- if (response .statusCode ().is2xxSuccessful ()) {
295- Optional <MediaType > contentType = response .headers ().contentType ();
296- // Existing SDKs consume notifications with no response body nor
297- // content type
298- if (contentType .isEmpty ()) {
299- logger .trace ("Message was successfully sent via POST for session {}" ,
300- sessionRepresentation );
301- // signal the caller that the message was successfully
302- // delivered
307+ else {
308+ MediaType mediaType = contentType .get ();
309+ if (mediaType .isCompatibleWith (MediaType .TEXT_EVENT_STREAM )) {
310+ logger .debug ("Established SSE stream via POST" );
311+ // communicate to caller that the message was delivered
303312 sink .success ();
304- // communicate to downstream there is no streamed data coming
305- return Flux . empty ( );
313+ // starting a stream
314+ return newEventStream ( response , sessionRepresentation );
306315 }
307- else {
308- MediaType mediaType = contentType .get ();
309- if (mediaType .isCompatibleWith (MediaType .TEXT_EVENT_STREAM )) {
310- logger .debug ("Established SSE stream via POST" );
311- // communicate to caller that the message was delivered
312- sink .success ();
313- // starting a stream
314- return newEventStream (response , sessionRepresentation );
315- }
316- else if (mediaType .isCompatibleWith (MediaType .APPLICATION_JSON )) {
317- logger .trace ("Received response to POST for session {}" , sessionRepresentation );
318- // communicate to caller the message was delivered
319- sink .success ();
320- return directResponseFlux (message , response );
321- }
322- else {
323- logger .warn ("Unknown media type {} returned for POST in session {}" , contentType ,
324- sessionRepresentation );
325- return Flux .error (new RuntimeException ("Unknown media type returned: " + contentType ));
326- }
316+ else if (mediaType .isCompatibleWith (MediaType .APPLICATION_JSON )) {
317+ logger .trace ("Received response to POST for session {}" , sessionRepresentation );
318+ // communicate to caller the message was delivered
319+ sink .success ();
320+ return directResponseFlux (message , response );
327321 }
328- }
329- else {
330- if ( isNotFound ( response ) && ! sessionRepresentation . equals ( MISSING_SESSION_ID )) {
331- return mcpSessionNotFoundError ( sessionRepresentation );
322+ else {
323+ logger . warn ( "Unknown media type {} returned for POST in session {}" , contentType ,
324+ sessionRepresentation );
325+ return Flux . error ( new RuntimeException ( "Unknown media type returned: " + contentType ) );
332326 }
333- return this .extractError (response , sessionRepresentation );
334327 }
335- })
336- .flatMap (jsonRpcMessage -> this .handler .get ().apply (Mono .just (jsonRpcMessage )))
337- .onErrorComplete (t -> {
338- // handle the error first
339- this .handleException (t );
340- // inform the caller of sendMessage
341- sink .error (t );
342- return true ;
343- })
344- .doFinally (s -> {
345- Disposable ref = disposableRef .getAndSet (null );
346- if (ref != null ) {
347- transportSession .removeConnection (ref );
328+ }
329+ else {
330+ if (isNotFound (response ) && !sessionRepresentation .equals (MISSING_SESSION_ID )) {
331+ return mcpSessionNotFoundError (sessionRepresentation );
348332 }
349- })
350- .contextWrite (sink .contextView ())
351- .subscribe ();
333+ return this .extractError (response , sessionRepresentation );
334+ }
335+ })
336+ .flatMap (jsonRpcMessage -> this .handler .get ().apply (Mono .just (jsonRpcMessage )))
337+ .onErrorComplete (t -> {
338+ // handle the error first
339+ this .handleException (t );
340+ // inform the caller of sendMessage
341+ sink .error (t );
342+ return true ;
343+ })
344+ .doFinally (s -> {
345+ Disposable ref = disposableRef .getAndSet (null );
346+ if (ref != null ) {
347+ transportSession .removeConnection (ref );
348+ }
349+ })
350+ .contextWrite (sink .contextView ())
351+ .subscribe ();
352352 disposableRef .set (connection );
353353 transportSession .addConnection (connection );
354354 });
@@ -419,7 +419,7 @@ private static String sessionIdOrPlaceholder(McpTransportSession<?> transportSes
419419 }
420420
421421 private Flux <McpSchema .JSONRPCMessage > directResponseFlux (McpSchema .JSONRPCMessage sentMessage ,
422- ClientResponse response ) {
422+ ClientResponse response ) {
423423 return response .bodyToMono (String .class ).<Iterable <McpSchema .JSONRPCMessage >>handle ((responseMessage , s ) -> {
424424 try {
425425 if (sentMessage instanceof McpSchema .JSONRPCNotification && Utils .hasText (responseMessage )) {
0 commit comments