From d322291240b312f717faa39d01fc2c65882fe1e0 Mon Sep 17 00:00:00 2001 From: Kripa Dev Date: Sun, 29 Mar 2026 21:10:37 +0530 Subject: [PATCH] client: avoid duplicate abort onerror during close --- packages/client/src/client/streamableHttp.ts | 43 ++++++++++++++----- .../client/test/client/streamableHttp.test.ts | 37 ++++++++++++++++ 2 files changed, 70 insertions(+), 10 deletions(-) diff --git a/packages/client/src/client/streamableHttp.ts b/packages/client/src/client/streamableHttp.ts index 3d45b60e9..ff22e4341 100644 --- a/packages/client/src/client/streamableHttp.ts +++ b/packages/client/src/client/streamableHttp.ts @@ -151,11 +151,32 @@ export class StreamableHTTPClientTransport implements Transport { private _lastUpscopingHeader?: string; // Track last upscoping header to prevent infinite upscoping. private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field private _reconnectionTimeout?: ReturnType; + private _isClosing = false; onclose?: () => void; onerror?: (error: Error) => void; onmessage?: (message: JSONRPCMessage) => void; + private _isAbortError(error: unknown): boolean { + if (!(error instanceof Error)) { + return false; + } + + if (error.name === 'AbortError') { + return true; + } + + return error.message.includes('aborted'); + } + + private _emitError(error: Error): void { + if (this._isClosing && this._isAbortError(error)) { + return; + } + + this.onerror?.(error); + } + constructor(url: URL, opts?: StreamableHTTPClientTransportOptions) { this._url = url; this._resourceMetadataUrl = undefined; @@ -260,7 +281,7 @@ export class StreamableHTTPClientTransport implements Transport { this._handleSseStream(response.body, options, true); } catch (error) { - this.onerror?.(error as Error); + this._emitError(error as Error); throw error; } } @@ -298,7 +319,7 @@ export class StreamableHTTPClientTransport implements Transport { // Check if we've exceeded maximum retry attempts if (attemptCount >= maxRetries) { - this.onerror?.(new Error(`Maximum reconnection attempts (${maxRetries}) exceeded.`)); + this._emitError(new Error(`Maximum reconnection attempts (${maxRetries}) exceeded.`)); return; } @@ -309,7 +330,7 @@ export class StreamableHTTPClientTransport implements Transport { this._reconnectionTimeout = setTimeout(() => { // Use the last event ID to resume where we left off this._startOrAuthSse(options).catch(error => { - this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`)); + this._emitError(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`)); // Schedule another attempt if this one failed, incrementing the attempt counter this._scheduleReconnection(options, attemptCount + 1); }); @@ -377,7 +398,7 @@ export class StreamableHTTPClientTransport implements Transport { } this.onmessage?.(message); } catch (error) { - this.onerror?.(error as Error); + this._emitError(error as Error); } } } @@ -400,7 +421,7 @@ export class StreamableHTTPClientTransport implements Transport { } } catch (error) { // Handle stream errors - likely a network disconnect - this.onerror?.(new Error(`SSE stream disconnected: ${error}`)); + this._emitError(new Error(`SSE stream disconnected: ${error}`)); // Attempt to reconnect if the stream disconnects unexpectedly and we aren't closing // Reconnect if: already reconnectable (GET stream) OR received a priming event (POST stream with event ID) @@ -419,7 +440,7 @@ export class StreamableHTTPClientTransport implements Transport { 0 ); } catch (error) { - this.onerror?.(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`)); + this._emitError(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`)); } } } @@ -434,6 +455,7 @@ export class StreamableHTTPClientTransport implements Transport { ); } + this._isClosing = false; this._abortController = new AbortController(); } @@ -462,6 +484,7 @@ export class StreamableHTTPClientTransport implements Transport { clearTimeout(this._reconnectionTimeout); this._reconnectionTimeout = undefined; } + this._isClosing = true; this._abortController?.abort(); this.onclose?.(); } @@ -484,7 +507,7 @@ export class StreamableHTTPClientTransport implements Transport { if (resumptionToken) { // If we have a last event ID, we need to reconnect the SSE stream this._startOrAuthSse({ resumptionToken, replayMessageId: isJSONRPCRequest(message) ? message.id : undefined }).catch( - error => this.onerror?.(error) + error => this._emitError(error) ); return; } @@ -593,7 +616,7 @@ export class StreamableHTTPClientTransport implements Transport { // if it's supported by the server if (isInitializedNotification(message)) { // Start without a lastEventId since this is a fresh connection - this._startOrAuthSse({ resumptionToken: undefined }).catch(error => this.onerror?.(error)); + this._startOrAuthSse({ resumptionToken: undefined }).catch(error => this._emitError(error)); } return; } @@ -633,7 +656,7 @@ export class StreamableHTTPClientTransport implements Transport { await response.text?.().catch(() => {}); } } catch (error) { - this.onerror?.(error as Error); + this._emitError(error as Error); throw error; } } @@ -682,7 +705,7 @@ export class StreamableHTTPClientTransport implements Transport { this._sessionId = undefined; } catch (error) { - this.onerror?.(error as Error); + this._emitError(error as Error); throw error; } } diff --git a/packages/client/test/client/streamableHttp.test.ts b/packages/client/test/client/streamableHttp.test.ts index 55bf79a50..1b2c89394 100644 --- a/packages/client/test/client/streamableHttp.test.ts +++ b/packages/client/test/client/streamableHttp.test.ts @@ -1715,5 +1715,42 @@ describe('StreamableHTTPClientTransport', () => { refresh_token: 'refresh-token' // Refresh token is preserved }); }); + + it('should suppress abort-driven onerror callbacks during close()', async () => { + const abortError = new Error('The operation was aborted.'); + abortError.name = 'AbortError'; + + const fetchMock = globalThis.fetch as Mock; + fetchMock.mockRejectedValueOnce(abortError); + + const onerror = vi.fn(); + transport.onerror = onerror; + + await transport.start(); + const sendPromise = transport.send({ jsonrpc: '2.0', method: 'test', params: {}, id: '1' } as JSONRPCMessage); + + await transport.close(); + await expect(sendPromise).rejects.toThrow('The operation was aborted.'); + + expect(onerror).not.toHaveBeenCalled(); + }); + + it('should still report abort errors when not closing', async () => { + const abortError = new Error('The operation was aborted.'); + abortError.name = 'AbortError'; + + const fetchMock = globalThis.fetch as Mock; + fetchMock.mockRejectedValueOnce(abortError); + + const onerror = vi.fn(); + transport.onerror = onerror; + + await transport.start(); + await expect(transport.send({ jsonrpc: '2.0', method: 'test', params: {}, id: '1' } as JSONRPCMessage)).rejects.toThrow( + 'The operation was aborted.' + ); + + expect(onerror).toHaveBeenCalledTimes(1); + }); }); });