Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 33 additions & 10 deletions packages/client/src/client/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,32 @@ export class StreamableHTTPClientTransport implements Transport {
private _lastUpscopingHeader?: string; // Track last upscoping header to prevent infinite upscoping.
private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field
private _reconnectionTimeout?: ReturnType<typeof setTimeout>;
private _isClosing = false;

onclose?: () => void;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage) => void;

private _isAbortError(error: unknown): boolean {
if (!(error instanceof Error)) {
return false;
}

if (error.name === 'AbortError') {
return true;
}

return error.message.includes('aborted');
}

private _emitError(error: Error): void {
if (this._isClosing && this._isAbortError(error)) {
return;
}

this.onerror?.(error);
}

constructor(url: URL, opts?: StreamableHTTPClientTransportOptions) {
this._url = url;
this._resourceMetadataUrl = undefined;
Expand Down Expand Up @@ -260,7 +281,7 @@ export class StreamableHTTPClientTransport implements Transport {

this._handleSseStream(response.body, options, true);
} catch (error) {
this.onerror?.(error as Error);
this._emitError(error as Error);
throw error;
}
}
Expand Down Expand Up @@ -298,7 +319,7 @@ export class StreamableHTTPClientTransport implements Transport {

// Check if we've exceeded maximum retry attempts
if (attemptCount >= maxRetries) {
this.onerror?.(new Error(`Maximum reconnection attempts (${maxRetries}) exceeded.`));
this._emitError(new Error(`Maximum reconnection attempts (${maxRetries}) exceeded.`));
return;
}

Expand All @@ -309,7 +330,7 @@ export class StreamableHTTPClientTransport implements Transport {
this._reconnectionTimeout = setTimeout(() => {
// Use the last event ID to resume where we left off
this._startOrAuthSse(options).catch(error => {
this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`));
this._emitError(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`));
// Schedule another attempt if this one failed, incrementing the attempt counter
this._scheduleReconnection(options, attemptCount + 1);
});
Expand Down Expand Up @@ -377,7 +398,7 @@ export class StreamableHTTPClientTransport implements Transport {
}
this.onmessage?.(message);
} catch (error) {
this.onerror?.(error as Error);
this._emitError(error as Error);
}
}
}
Expand All @@ -400,7 +421,7 @@ export class StreamableHTTPClientTransport implements Transport {
}
} catch (error) {
// Handle stream errors - likely a network disconnect
this.onerror?.(new Error(`SSE stream disconnected: ${error}`));
this._emitError(new Error(`SSE stream disconnected: ${error}`));

// Attempt to reconnect if the stream disconnects unexpectedly and we aren't closing
// Reconnect if: already reconnectable (GET stream) OR received a priming event (POST stream with event ID)
Expand All @@ -419,7 +440,7 @@ export class StreamableHTTPClientTransport implements Transport {
0
);
} catch (error) {
this.onerror?.(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`));
this._emitError(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`));
}
}
}
Expand All @@ -434,6 +455,7 @@ export class StreamableHTTPClientTransport implements Transport {
);
}

this._isClosing = false;
this._abortController = new AbortController();
}

Expand Down Expand Up @@ -462,6 +484,7 @@ export class StreamableHTTPClientTransport implements Transport {
clearTimeout(this._reconnectionTimeout);
this._reconnectionTimeout = undefined;
}
this._isClosing = true;
this._abortController?.abort();
this.onclose?.();
}
Expand All @@ -484,7 +507,7 @@ export class StreamableHTTPClientTransport implements Transport {
if (resumptionToken) {
// If we have a last event ID, we need to reconnect the SSE stream
this._startOrAuthSse({ resumptionToken, replayMessageId: isJSONRPCRequest(message) ? message.id : undefined }).catch(
error => this.onerror?.(error)
error => this._emitError(error)
);
return;
}
Expand Down Expand Up @@ -593,7 +616,7 @@ export class StreamableHTTPClientTransport implements Transport {
// if it's supported by the server
if (isInitializedNotification(message)) {
// Start without a lastEventId since this is a fresh connection
this._startOrAuthSse({ resumptionToken: undefined }).catch(error => this.onerror?.(error));
this._startOrAuthSse({ resumptionToken: undefined }).catch(error => this._emitError(error));
}
return;
}
Expand Down Expand Up @@ -633,7 +656,7 @@ export class StreamableHTTPClientTransport implements Transport {
await response.text?.().catch(() => {});
}
} catch (error) {
this.onerror?.(error as Error);
this._emitError(error as Error);
throw error;
}
}
Expand Down Expand Up @@ -682,7 +705,7 @@ export class StreamableHTTPClientTransport implements Transport {

this._sessionId = undefined;
} catch (error) {
this.onerror?.(error as Error);
this._emitError(error as Error);
throw error;
}
}
Expand Down
37 changes: 37 additions & 0 deletions packages/client/test/client/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1715,5 +1715,42 @@ describe('StreamableHTTPClientTransport', () => {
refresh_token: 'refresh-token' // Refresh token is preserved
});
});

it('should suppress abort-driven onerror callbacks during close()', async () => {
const abortError = new Error('The operation was aborted.');
abortError.name = 'AbortError';

const fetchMock = globalThis.fetch as Mock;
fetchMock.mockRejectedValueOnce(abortError);

const onerror = vi.fn();
transport.onerror = onerror;

await transport.start();
const sendPromise = transport.send({ jsonrpc: '2.0', method: 'test', params: {}, id: '1' } as JSONRPCMessage);

await transport.close();
await expect(sendPromise).rejects.toThrow('The operation was aborted.');

expect(onerror).not.toHaveBeenCalled();
});

it('should still report abort errors when not closing', async () => {
const abortError = new Error('The operation was aborted.');
abortError.name = 'AbortError';

const fetchMock = globalThis.fetch as Mock;
fetchMock.mockRejectedValueOnce(abortError);

const onerror = vi.fn();
transport.onerror = onerror;

await transport.start();
await expect(transport.send({ jsonrpc: '2.0', method: 'test', params: {}, id: '1' } as JSONRPCMessage)).rejects.toThrow(
'The operation was aborted.'
);

expect(onerror).toHaveBeenCalledTimes(1);
});
});
});
Loading