1+ import { log } from "node:console" ;
12import { Transport } from "../shared/transport.js" ;
23import { isJSONRPCNotification , JSONRPCMessage , JSONRPCMessageSchema } from "../types.js" ;
34import { auth , AuthResult , OAuthClientProvider , UnauthorizedError } from "./auth.js" ;
@@ -12,6 +13,35 @@ export class StreamableHTTPError extends Error {
1213 }
1314}
1415
16+ /**
17+ * Configuration options for reconnection behavior of the StreamableHTTPClientTransport.
18+ */
19+ export interface StreamableHTTPReconnectionOptions {
20+ /**
21+ * Maximum backoff time between reconnection attempts in milliseconds.
22+ * Default is 30000 (30 seconds).
23+ */
24+ maxReconnectionDelay : number ;
25+
26+ /**
27+ * Initial backoff time between reconnection attempts in milliseconds.
28+ * Default is 1000 (1 second).
29+ */
30+ initialReconnectionDelay : number ;
31+
32+ /**
33+ * The factor by which the reconnection delay increases after each attempt.
34+ * Default is 1.5.
35+ */
36+ reconnectionDelayGrowFactor : number ;
37+
38+ /**
39+ * Maximum number of reconnection attempts before giving up.
40+ * Default is 0 (unlimited).
41+ */
42+ maxRetries : number ;
43+ }
44+
1545/**
1646 * Configuration options for the `StreamableHTTPClientTransport`.
1747 */
@@ -36,6 +66,11 @@ export type StreamableHTTPClientTransportOptions = {
3666 * Customizes HTTP requests to the server.
3767 */
3868 requestInit ?: RequestInit ;
69+
70+ /**
71+ * Options to configure the reconnection behavior.
72+ */
73+ reconnectionOptions ?: StreamableHTTPReconnectionOptions ;
3974} ;
4075
4176/**
@@ -49,6 +84,7 @@ export class StreamableHTTPClientTransport implements Transport {
4984 private _requestInit ?: RequestInit ;
5085 private _authProvider ?: OAuthClientProvider ;
5186 private _sessionId ?: string ;
87+ private _reconnectionOptions : StreamableHTTPReconnectionOptions ;
5288
5389 onclose ?: ( ) => void ;
5490 onerror ?: ( error : Error ) => void ;
@@ -61,6 +97,7 @@ export class StreamableHTTPClientTransport implements Transport {
6197 this . _url = url ;
6298 this . _requestInit = opts ?. requestInit ;
6399 this . _authProvider = opts ?. authProvider ;
100+ this . _reconnectionOptions = opts ?. reconnectionOptions || this . _defaultReconnectionOptions ;
64101 }
65102
66103 private async _authThenStart ( ) : Promise < void > {
@@ -136,36 +173,101 @@ export class StreamableHTTPClientTransport implements Transport {
136173 `Failed to open SSE stream: ${ response . statusText } ` ,
137174 ) ;
138175 }
139- // Successful connection, handle the SSE stream as a standalone listener
176+
140177 this . _handleSseStream ( response . body ) ;
141178 } catch ( error ) {
142179 this . onerror ?.( error as Error ) ;
143180 throw error ;
144181 }
145182 }
146183
184+ // Default reconnection options
185+ private readonly _defaultReconnectionOptions : StreamableHTTPReconnectionOptions = {
186+ initialReconnectionDelay : 1000 ,
187+ maxReconnectionDelay : 30000 ,
188+ reconnectionDelayGrowFactor : 1.5 ,
189+ maxRetries : 2 ,
190+ } ;
191+
192+ // We no longer need global reconnection state as it will be maintained per stream
193+
194+ /**
195+ * Calculates the next reconnection delay using exponential backoff algorithm
196+ * with jitter for more effective reconnections in high load scenarios.
197+ *
198+ * @param attempt Current reconnection attempt count for the specific stream
199+ * @returns Time to wait in milliseconds before next reconnection attempt
200+ */
201+ private _getNextReconnectionDelay ( attempt : number ) : number {
202+ // Access default values directly, ensuring they're never undefined
203+ const initialDelay = this . _reconnectionOptions . initialReconnectionDelay ;
204+ const growFactor = this . _reconnectionOptions . reconnectionDelayGrowFactor ;
205+ const maxDelay = this . _reconnectionOptions . maxReconnectionDelay ;
206+
207+ // Cap at maximum delay
208+ return Math . min ( initialDelay * Math . pow ( growFactor , attempt ) , maxDelay ) ;
209+
210+ }
211+
212+ /**
213+ * Schedule a reconnection attempt with exponential backoff
214+ *
215+ * @param lastEventId The ID of the last received event for resumability
216+ * @param attemptCount Current reconnection attempt count for this specific stream
217+ */
218+ private _scheduleReconnection ( lastEventId : string , attemptCount = 0 ) : void {
219+ // Use provided options or default options
220+ const maxRetries = this . _reconnectionOptions . maxRetries ;
221+
222+ // Check if we've exceeded maximum retry attempts
223+ if ( maxRetries > 0 && attemptCount >= maxRetries ) {
224+ this . onerror ?.( new Error ( `Maximum reconnection attempts (${ maxRetries } ) exceeded.` ) ) ;
225+ return ;
226+ }
227+
228+ // Calculate next delay based on current attempt count
229+ const delay = this . _getNextReconnectionDelay ( attemptCount ) ;
230+ log ( `Reconnection attempt ${ attemptCount + 1 } in ${ delay } ms...` ) ;
231+
232+ // Schedule the reconnection
233+ setTimeout ( ( ) => {
234+ // Use the last event ID to resume where we left off
235+ this . _startOrAuthStandaloneSSE ( lastEventId ) . catch ( error => {
236+ this . onerror ?.( new Error ( `Failed to reconnect SSE stream: ${ error instanceof Error ? error . message : String ( error ) } ` ) ) ;
237+ // Schedule another attempt if this one failed, incrementing the attempt counter
238+ this . _scheduleReconnection ( lastEventId , attemptCount + 1 ) ;
239+ } ) ;
240+ } , delay ) ;
241+ }
242+
147243 private _handleSseStream ( stream : ReadableStream < Uint8Array > | null ) : void {
148244 if ( ! stream ) {
149245 return ;
150246 }
151247
152248 let lastEventId : string | undefined ;
153-
154249 const processStream = async ( ) => {
155- // Create a pipeline: binary stream -> text decoder -> SSE parser
156- const eventStream = stream
157- . pipeThrough ( new TextDecoderStream ( ) )
158- . pipeThrough ( new EventSourceParserStream ( ) ) ;
159-
250+ // this is the closest we can get to trying to cath network errors
251+ // if something happens reader will throw
160252 try {
161- for await ( const event of eventStream ) {
253+ // Create a pipeline: binary stream -> text decoder -> SSE parser
254+ const reader = stream
255+ . pipeThrough ( new TextDecoderStream ( ) )
256+ . pipeThrough ( new EventSourceParserStream ( ) )
257+ . getReader ( ) ;
258+
259+
260+ while ( true ) {
261+ const { value : event , done } = await reader . read ( ) ;
262+ if ( done ) {
263+ break ;
264+ }
265+
162266 // Update last event ID if provided
163267 if ( event . id ) {
164268 lastEventId = event . id ;
165269 }
166270
167- // Handle message events (default event type is undefined per docs)
168- // or explicit 'message' event type
169271 if ( ! event . event || event . event === "message" ) {
170272 try {
171273 const message = JSONRPCMessageSchema . parse ( JSON . parse ( event . data ) ) ;
@@ -179,31 +281,22 @@ export class StreamableHTTPClientTransport implements Transport {
179281 // Handle stream errors - likely a network disconnect
180282 this . onerror ?.( new Error ( `SSE stream disconnected: ${ error instanceof Error ? error . message : String ( error ) } ` ) ) ;
181283
182- // Attempt to reconnect if the stream disconnects unexpectedly
183- // Wait a short time before reconnecting to avoid rapid reconnection loops
284+ // Attempt to reconnect if the stream disconnects unexpectedly and we aren't closing
184285 if ( this . _abortController && ! this . _abortController . signal . aborted ) {
185- setTimeout ( ( ) => {
186- // Use the last event ID to resume where we left off
187- this . _startOrAuthStandaloneSSE ( lastEventId ) . catch ( reconnectError => {
188- this . onerror ?.( new Error ( `Failed to reconnect SSE stream: ${ reconnectError instanceof Error ? reconnectError . message : String ( reconnectError ) } ` ) ) ;
189- } ) ;
190- } , 1000 ) ; // 1 second delay before reconnection attempt
286+ // Use the exponential backoff reconnection strategy
287+ if ( lastEventId !== undefined ) {
288+ try {
289+ this . _scheduleReconnection ( lastEventId , 0 ) ;
290+ }
291+ catch ( error ) {
292+ this . onerror ?.( new Error ( `Failed to reconnect: ${ error instanceof Error ? error . message : String ( error ) } ` ) ) ;
293+
294+ }
295+ }
191296 }
192297 }
193298 } ;
194-
195- processStream ( ) . catch ( err => {
196- this . onerror ?.( err ) ;
197-
198- // Try to reconnect on unexpected errors
199- if ( this . _abortController && ! this . _abortController . signal . aborted ) {
200- setTimeout ( ( ) => {
201- this . _startOrAuthStandaloneSSE ( lastEventId ) . catch ( reconnectError => {
202- this . onerror ?.( new Error ( `Failed to reconnect SSE stream: ${ reconnectError instanceof Error ? reconnectError . message : String ( reconnectError ) } ` ) ) ;
203- } ) ;
204- } , 1000 ) ;
205- }
206- } ) ;
299+ processStream ( ) ;
207300 }
208301
209302 async start ( ) {
0 commit comments