1- import { log } from "node:console" ;
21import { Transport } from "../shared/transport.js" ;
32import { isJSONRPCNotification , JSONRPCMessage , JSONRPCMessageSchema } from "../types.js" ;
43import { auth , AuthResult , OAuthClientProvider , UnauthorizedError } from "./auth.js" ;
54import { EventSourceParserStream } from "eventsource-parser/stream" ;
65
6+ // Default reconnection options for StreamableHTTP connections
7+ const DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS : StreamableHTTPReconnectionOptions = {
8+ initialReconnectionDelay : 1000 ,
9+ maxReconnectionDelay : 30000 ,
10+ reconnectionDelayGrowFactor : 1.5 ,
11+ maxRetries : 2 ,
12+ } ;
13+
714export class StreamableHTTPError extends Error {
815 constructor (
916 public readonly code : number | undefined ,
@@ -13,6 +20,16 @@ export class StreamableHTTPError extends Error {
1320 }
1421}
1522
23+ /**
24+ * Options for starting or authenticating an SSE connection
25+ */
26+ export interface StartSSEOptions {
27+ /**
28+ * The ID of the last received event, used for resuming a disconnected stream
29+ */
30+ lastEventId ?: string ;
31+ }
32+
1633/**
1734 * Configuration options for reconnection behavior of the StreamableHTTPClientTransport.
1835 */
@@ -37,7 +54,7 @@ export interface StreamableHTTPReconnectionOptions {
3754
3855 /**
3956 * Maximum number of reconnection attempts before giving up.
40- * Default is 0 (unlimited) .
57+ * Default is 2 .
4158 */
4259 maxRetries : number ;
4360}
@@ -97,7 +114,7 @@ export class StreamableHTTPClientTransport implements Transport {
97114 this . _url = url ;
98115 this . _requestInit = opts ?. requestInit ;
99116 this . _authProvider = opts ?. authProvider ;
100- this . _reconnectionOptions = opts ?. reconnectionOptions || this . _defaultReconnectionOptions ;
117+ this . _reconnectionOptions = opts ?. reconnectionOptions ?? DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS ;
101118 }
102119
103120 private async _authThenStart ( ) : Promise < void > {
@@ -117,7 +134,7 @@ export class StreamableHTTPClientTransport implements Transport {
117134 throw new UnauthorizedError ( ) ;
118135 }
119136
120- return await this . _startOrAuthStandaloneSSE ( ) ;
137+ return await this . _startOrAuthStandaloneSSE ( { lastEventId : undefined } ) ;
121138 }
122139
123140 private async _commonHeaders ( ) : Promise < Headers > {
@@ -138,7 +155,9 @@ export class StreamableHTTPClientTransport implements Transport {
138155 ) ;
139156 }
140157
141- private async _startOrAuthStandaloneSSE ( lastEventId ?: string ) : Promise < void > {
158+
159+ private async _startOrAuthStandaloneSSE ( options : StartSSEOptions ) : Promise < void > {
160+ const { lastEventId } = options ;
142161 try {
143162 // Try to open an initial SSE stream with GET to listen for server messages
144163 // This is optional according to the spec - server may not support it
@@ -181,19 +200,9 @@ export class StreamableHTTPClientTransport implements Transport {
181200 }
182201 }
183202
184- // Default reconnection options
185- private readonly _defaultReconnectionOptions : StreamableHTTPReconnectionOptions = {
186- initialReconnectionDelay : 1000 ,
187- maxReconnectionDelay : 30000 ,
188- reconnectionDelayGrowFactor : 1.5 ,
189- maxRetries : 2 ,
190- } ;
191-
192- // We no longer need global reconnection state as it will be maintained per stream
193203
194204 /**
195- * Calculates the next reconnection delay using exponential backoff algorithm
196- * with jitter for more effective reconnections in high load scenarios.
205+ * Calculates the next reconnection delay using backoff algorithm
197206 *
198207 * @param attempt Current reconnection attempt count for the specific stream
199208 * @returns Time to wait in milliseconds before next reconnection attempt
@@ -227,12 +236,11 @@ export class StreamableHTTPClientTransport implements Transport {
227236
228237 // Calculate next delay based on current attempt count
229238 const delay = this . _getNextReconnectionDelay ( attemptCount ) ;
230- log ( `Reconnection attempt ${ attemptCount + 1 } in ${ delay } ms...` ) ;
231239
232240 // Schedule the reconnection
233241 setTimeout ( ( ) => {
234242 // Use the last event ID to resume where we left off
235- this . _startOrAuthStandaloneSSE ( lastEventId ) . catch ( error => {
243+ this . _startOrAuthStandaloneSSE ( { lastEventId } ) . catch ( error => {
236244 this . onerror ?.( new Error ( `Failed to reconnect SSE stream: ${ error instanceof Error ? error . message : String ( error ) } ` ) ) ;
237245 // Schedule another attempt if this one failed, incrementing the attempt counter
238246 this . _scheduleReconnection ( lastEventId , attemptCount + 1 ) ;
@@ -247,7 +255,7 @@ export class StreamableHTTPClientTransport implements Transport {
247255
248256 let lastEventId : string | undefined ;
249257 const processStream = async ( ) => {
250- // this is the closest we can get to trying to cath network errors
258+ // this is the closest we can get to trying to catch network errors
251259 // if something happens reader will throw
252260 try {
253261 // Create a pipeline: binary stream -> text decoder -> SSE parser
@@ -279,7 +287,7 @@ export class StreamableHTTPClientTransport implements Transport {
279287 }
280288 } catch ( error ) {
281289 // Handle stream errors - likely a network disconnect
282- this . onerror ?.( new Error ( `SSE stream disconnected: ${ error instanceof Error ? error . message : String ( error ) } ` ) ) ;
290+ this . onerror ?.( new Error ( `SSE stream disconnected: ${ error } ` ) ) ;
283291
284292 // Attempt to reconnect if the stream disconnects unexpectedly and we aren't closing
285293 if ( this . _abortController && ! this . _abortController . signal . aborted ) {
@@ -375,7 +383,7 @@ export class StreamableHTTPClientTransport implements Transport {
375383 // if it's supported by the server
376384 if ( isJSONRPCNotification ( message ) && message . method === "notifications/initialized" ) {
377385 // Start without a lastEventId since this is a fresh connection
378- this . _startOrAuthStandaloneSSE ( ) . catch ( err => this . onerror ?.( err ) ) ;
386+ this . _startOrAuthStandaloneSSE ( { lastEventId : undefined } ) . catch ( err => this . onerror ?.( err ) ) ;
379387 }
380388 return ;
381389 }
@@ -390,9 +398,6 @@ export class StreamableHTTPClientTransport implements Transport {
390398
391399 if ( hasRequests ) {
392400 if ( contentType ?. includes ( "text/event-stream" ) ) {
393- // Handle SSE stream responses for requests
394- // We use the same handler as standalone streams, which now supports
395- // reconnection with the last event ID
396401 this . _handleSseStream ( response . body ) ;
397402 } else if ( contentType ?. includes ( "application/json" ) ) {
398403 // For non-streaming servers, we might get direct JSON responses
0 commit comments