1010import io .modelcontextprotocol .spec .McpSessionNotFoundException ;
1111import io .modelcontextprotocol .spec .McpTransportSession ;
1212import io .modelcontextprotocol .spec .McpTransportStream ;
13+ import org .reactivestreams .Publisher ;
1314import org .slf4j .Logger ;
1415import org .slf4j .LoggerFactory ;
1516import org .springframework .core .ParameterizedTypeReference ;
3132import java .util .concurrent .atomic .AtomicReference ;
3233import java .util .function .Consumer ;
3334import java .util .function .Function ;
35+ import java .util .function .Supplier ;
3436
3537public class WebClientStreamableHttpTransport implements McpClientTransport {
3638
@@ -69,7 +71,24 @@ public WebClientStreamableHttpTransport(ObjectMapper objectMapper, WebClient.Bui
6971 this .endpoint = endpoint ;
7072 this .resumableStreams = resumableStreams ;
7173 this .openConnectionOnStartup = openConnectionOnStartup ;
72- this .activeSession .set (new DefaultMcpTransportSession ());
74+ this .activeSession .set (createTransportSession ());
75+ }
76+
77+ private DefaultMcpTransportSession createTransportSession () {
78+ Supplier <Publisher <Void >> onClose = () -> {
79+ DefaultMcpTransportSession transportSession = this .activeSession .get ();
80+ return transportSession .sessionId ().isEmpty () ? Mono .empty () : webClient
81+ .delete ()
82+ .uri (this .endpoint )
83+ .headers (httpHeaders -> {
84+ httpHeaders .add ("mcp-session-id" , transportSession .sessionId ().get ());
85+ })
86+ .retrieve ()
87+ .toBodilessEntity ()
88+ .doOnError (e -> logger .info ("Got response {}" , e ))
89+ .then ();
90+ };
91+ return new DefaultMcpTransportSession (onClose );
7392 }
7493
7594 @ Override
@@ -93,7 +112,7 @@ public void setExceptionHandler(Consumer<Throwable> handler) {
93112 private void handleException (Throwable t ) {
94113 logger .debug ("Handling exception for session {}" , sessionIdOrPlaceholder (this .activeSession .get ()), t );
95114 if (t instanceof McpSessionNotFoundException ) {
96- McpTransportSession <?> invalidSession = this .activeSession .getAndSet (new DefaultMcpTransportSession ());
115+ McpTransportSession <?> invalidSession = this .activeSession .getAndSet (createTransportSession ());
97116 logger .warn ("Server does not recognize session {}. Invalidating." , invalidSession .sessionId ());
98117 invalidSession .close ();
99118 }
@@ -107,7 +126,7 @@ private void handleException(Throwable t) {
107126 public Mono <Void > closeGracefully () {
108127 return Mono .defer (() -> {
109128 logger .debug ("Graceful close triggered" );
110- DefaultMcpTransportSession currentSession = this .activeSession .get ( );
129+ DefaultMcpTransportSession currentSession = this .activeSession .getAndSet ( createTransportSession () );
111130 if (currentSession != null ) {
112131 return currentSession .closeGracefully ();
113132 }
0 commit comments