diff --git a/.changeset/async-onclose-stdin-eof.md b/.changeset/async-onclose-stdin-eof.md new file mode 100644 index 000000000..c5d3e9e61 --- /dev/null +++ b/.changeset/async-onclose-stdin-eof.md @@ -0,0 +1,11 @@ +--- +"@modelcontextprotocol/core": patch +"@modelcontextprotocol/server": patch +"@modelcontextprotocol/client": patch +--- + +Allow async `onclose` callbacks on Transport and Protocol. The signature changes from `() => void` to `() => void | Promise`, and all call sites now await the callback. This lets MCP servers perform async cleanup (e.g., releasing browser sessions or database connections) when the transport closes. + +Close `StdioServerTransport` when stdin reaches EOF, so containerized servers exit cleanly on client disconnect. + +Add SIGTERM handlers alongside SIGINT in all examples, since MCP servers run as background processes stopped by SIGTERM, not interactively via Ctrl+C. diff --git a/examples/client/src/elicitationUrlExample.ts b/examples/client/src/elicitationUrlExample.ts index 7c5cce2ee..d0d95015d 100644 --- a/examples/client/src/elicitationUrlExample.ts +++ b/examples/client/src/elicitationUrlExample.ts @@ -813,6 +813,10 @@ process.on('SIGINT', async () => { console.log('\nReceived SIGINT. Cleaning up...'); await cleanup(); }); +process.on('SIGTERM', async () => { + console.log('\nReceived SIGINT. Cleaning up...'); + await cleanup(); +}); // Start the interactive client try { diff --git a/examples/client/src/simpleOAuthClient.ts b/examples/client/src/simpleOAuthClient.ts index c75aea948..c59d3716f 100644 --- a/examples/client/src/simpleOAuthClient.ts +++ b/examples/client/src/simpleOAuthClient.ts @@ -448,6 +448,11 @@ async function main(): Promise { client.close(); process.exit(0); }); + process.on('SIGTERM', () => { + console.log('\n\nšŸ‘‹ Goodbye!'); + client.close(); + process.exit(0); + }); try { await client.connect(); diff --git a/examples/client/src/simpleStreamableHttp.ts b/examples/client/src/simpleStreamableHttp.ts index f22d16ba4..af427d906 100644 --- a/examples/client/src/simpleStreamableHttp.ts +++ b/examples/client/src/simpleStreamableHttp.ts @@ -997,6 +997,10 @@ process.on('SIGINT', async () => { console.log('\nReceived SIGINT. Cleaning up...'); await cleanup(); }); +process.on('SIGTERM', async () => { + console.log('\nReceived SIGINT. Cleaning up...'); + await cleanup(); +}); // Start the interactive client try { diff --git a/examples/server/src/elicitationFormExample.ts b/examples/server/src/elicitationFormExample.ts index 9c13b739e..82fb1a7d2 100644 --- a/examples/server/src/elicitationFormExample.ts +++ b/examples/server/src/elicitationFormExample.ts @@ -453,6 +453,22 @@ async function main() { process.on('SIGINT', async () => { console.log('Shutting down server...'); + // Close all active transports to properly clean up resources + for (const sessionId in transports) { + try { + console.log(`Closing transport for session ${sessionId}`); + await transports[sessionId]!.close(); + delete transports[sessionId]; + } catch (error) { + console.error(`Error closing transport for session ${sessionId}:`, error); + } + } + console.log('Server shutdown complete'); + process.exit(0); + }); + process.on('SIGTERM', async () => { + console.log('Shutting down server...'); + // Close all active transports to properly clean up resources for (const sessionId in transports) { try { diff --git a/examples/server/src/elicitationUrlExample.ts b/examples/server/src/elicitationUrlExample.ts index c38dd75e8..db659ddf5 100644 --- a/examples/server/src/elicitationUrlExample.ts +++ b/examples/server/src/elicitationUrlExample.ts @@ -731,3 +731,20 @@ process.on('SIGINT', async () => { console.log('Server shutdown complete'); process.exit(0); }); +process.on('SIGTERM', async () => { + console.log('Shutting down server...'); + + // Close all active transports to properly clean up resources + for (const sessionId in transports) { + try { + console.log(`Closing transport for session ${sessionId}`); + await transports[sessionId]!.close(); + delete transports[sessionId]; + delete sessionsNeedingElicitation[sessionId]; + } catch (error) { + console.error(`Error closing transport for session ${sessionId}:`, error); + } + } + console.log('Server shutdown complete'); + process.exit(0); +}); diff --git a/examples/server/src/jsonResponseStreamableHttp.ts b/examples/server/src/jsonResponseStreamableHttp.ts index 7a3aad67a..81c911efc 100644 --- a/examples/server/src/jsonResponseStreamableHttp.ts +++ b/examples/server/src/jsonResponseStreamableHttp.ts @@ -163,3 +163,7 @@ process.on('SIGINT', async () => { console.log('Shutting down server...'); process.exit(0); }); +process.on('SIGTERM', async () => { + console.log('Shutting down server...'); + process.exit(0); +}); diff --git a/examples/server/src/simpleStatelessStreamableHttp.ts b/examples/server/src/simpleStatelessStreamableHttp.ts index 2b4f0363d..f2dbcb72a 100644 --- a/examples/server/src/simpleStatelessStreamableHttp.ts +++ b/examples/server/src/simpleStatelessStreamableHttp.ts @@ -169,3 +169,8 @@ process.on('SIGINT', async () => { // eslint-disable-next-line unicorn/no-process-exit process.exit(0); }); +process.on('SIGTERM', async () => { + console.log('Shutting down server...'); + // eslint-disable-next-line unicorn/no-process-exit + process.exit(0); +}); diff --git a/examples/server/src/simpleStreamableHttp.ts b/examples/server/src/simpleStreamableHttp.ts index 1263f4bb5..6cf33744d 100644 --- a/examples/server/src/simpleStreamableHttp.ts +++ b/examples/server/src/simpleStreamableHttp.ts @@ -815,3 +815,19 @@ process.on('SIGINT', async () => { console.log('Server shutdown complete'); process.exit(0); }); +process.on('SIGTERM', async () => { + console.log('Shutting down server...'); + + // Close all active transports to properly clean up resources + for (const sessionId in transports) { + try { + console.log(`Closing transport for session ${sessionId}`); + await transports[sessionId]!.close(); + delete transports[sessionId]; + } catch (error) { + console.error(`Error closing transport for session ${sessionId}:`, error); + } + } + console.log('Server shutdown complete'); + process.exit(0); +}); diff --git a/examples/server/src/simpleTaskInteractive.ts b/examples/server/src/simpleTaskInteractive.ts index 9092926d9..708c98774 100644 --- a/examples/server/src/simpleTaskInteractive.ts +++ b/examples/server/src/simpleTaskInteractive.ts @@ -741,3 +741,18 @@ process.on('SIGINT', async () => { console.log('Server shutdown complete'); process.exit(0); }); +process.on('SIGTERM', async () => { + console.log('\nShutting down server...'); + for (const sessionId of Object.keys(transports)) { + try { + await transports[sessionId]!.close(); + delete transports[sessionId]; + } catch (error) { + console.error(`Error closing session ${sessionId}:`, error); + } + } + taskStore.cleanup(); + messageQueue.cleanup(); + console.log('Server shutdown complete'); + process.exit(0); +}); diff --git a/examples/server/src/standaloneSseWithGetStreamableHttp.ts b/examples/server/src/standaloneSseWithGetStreamableHttp.ts index b1b2ccf51..08e78200d 100644 --- a/examples/server/src/standaloneSseWithGetStreamableHttp.ts +++ b/examples/server/src/standaloneSseWithGetStreamableHttp.ts @@ -159,3 +159,21 @@ process.on('SIGINT', async () => { console.log('Server shutdown complete'); process.exit(0); }); +process.on('SIGTERM', async () => { + console.log('Shutting down server...'); + clearInterval(resourceChangeInterval); + + // Close all active transports to properly clean up resources + for (const sessionId in transports) { + try { + console.log(`Closing transport for session ${sessionId}`); + await transports[sessionId]!.close(); + delete transports[sessionId]; + delete servers[sessionId]; + } catch (error) { + console.error(`Error closing transport for session ${sessionId}:`, error); + } + } + console.log('Server shutdown complete'); + process.exit(0); +}); diff --git a/packages/client/src/client/sse.ts b/packages/client/src/client/sse.ts index f441e9cdb..beb061e25 100644 --- a/packages/client/src/client/sse.ts +++ b/packages/client/src/client/sse.ts @@ -78,7 +78,7 @@ export class SSEClientTransport implements Transport { private _fetchWithInit: FetchLike; private _protocolVersion?: string; - onclose?: () => void; + onclose?: () => void | Promise; onerror?: (error: Error) => void; onmessage?: (message: JSONRPCMessage) => void; @@ -242,7 +242,7 @@ export class SSEClientTransport implements Transport { async close(): Promise { this._abortController?.abort(); this._eventSource?.close(); - this.onclose?.(); + await this.onclose?.(); } async send(message: JSONRPCMessage): Promise { diff --git a/packages/client/src/client/stdio.ts b/packages/client/src/client/stdio.ts index 6c1571f11..eab58402b 100644 --- a/packages/client/src/client/stdio.ts +++ b/packages/client/src/client/stdio.ts @@ -96,7 +96,7 @@ export class StdioClientTransport implements Transport { private _serverParams: StdioServerParameters; private _stderrStream: PassThrough | null = null; - onclose?: () => void; + onclose?: () => void | Promise; onerror?: (error: Error) => void; onmessage?: (message: JSONRPCMessage) => void; @@ -139,9 +139,9 @@ export class StdioClientTransport implements Transport { resolve(); }); - this._process.on('close', _code => { + this._process.on('close', async _code => { this._process = undefined; - this.onclose?.(); + await this.onclose?.(); }); this._process.stdin?.on('error', error => { diff --git a/packages/client/src/client/streamableHttp.ts b/packages/client/src/client/streamableHttp.ts index 3d45b60e9..d07d7971c 100644 --- a/packages/client/src/client/streamableHttp.ts +++ b/packages/client/src/client/streamableHttp.ts @@ -152,7 +152,7 @@ export class StreamableHTTPClientTransport implements Transport { private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field private _reconnectionTimeout?: ReturnType; - onclose?: () => void; + onclose?: () => void | Promise; onerror?: (error: Error) => void; onmessage?: (message: JSONRPCMessage) => void; @@ -463,7 +463,7 @@ export class StreamableHTTPClientTransport implements Transport { this._reconnectionTimeout = undefined; } this._abortController?.abort(); - this.onclose?.(); + await this.onclose?.(); } async send( diff --git a/packages/client/src/client/websocket.ts b/packages/client/src/client/websocket.ts index cb0c34687..54a5c3e15 100644 --- a/packages/client/src/client/websocket.ts +++ b/packages/client/src/client/websocket.ts @@ -10,7 +10,7 @@ export class WebSocketClientTransport implements Transport { private _socket?: WebSocket; private _url: URL; - onclose?: () => void; + onclose?: () => void | Promise; onerror?: (error: Error) => void; onmessage?: (message: JSONRPCMessage) => void; @@ -38,8 +38,8 @@ export class WebSocketClientTransport implements Transport { resolve(); }; - this._socket.onclose = () => { - this.onclose?.(); + this._socket.onclose = async () => { + await this.onclose?.(); }; this._socket.onmessage = (event: MessageEvent) => { diff --git a/packages/core/src/shared/protocol.ts b/packages/core/src/shared/protocol.ts index d6daf0172..9dee96eb7 100644 --- a/packages/core/src/shared/protocol.ts +++ b/packages/core/src/shared/protocol.ts @@ -316,7 +316,7 @@ export abstract class Protocol { * * This is invoked when {@linkcode Protocol.close | close()} is called as well. */ - onclose?: () => void; + onclose?: () => void | Promise; /** * Callback for when an error occurs. @@ -456,11 +456,11 @@ export abstract class Protocol { async connect(transport: Transport): Promise { this._transport = transport; const _onclose = this.transport?.onclose; - this._transport.onclose = () => { + this._transport.onclose = async () => { try { - _onclose?.(); + if (_onclose) await _onclose(); } finally { - this._onclose(); + await this._onclose(); } }; @@ -490,12 +490,15 @@ export abstract class Protocol { await this._transport.start(); } - private _onclose(): void { + private async _onclose(): Promise { const responseHandlers = this._responseHandlers; this._responseHandlers = new Map(); this._progressHandlers.clear(); this._taskManager.onClose(); this._pendingDebouncedNotifications.clear(); + this._transport = undefined; + + await this.onclose?.(); for (const info of this._timeoutInfo.values()) { clearTimeout(info.timeoutId); @@ -507,18 +510,12 @@ export abstract class Protocol { const error = new SdkError(SdkErrorCode.ConnectionClosed, 'Connection closed'); - this._transport = undefined; - - try { - this.onclose?.(); - } finally { - for (const handler of responseHandlers.values()) { - handler(error); - } + for (const handler of responseHandlers.values()) { + handler(error); + } - for (const controller of requestHandlerAbortControllers.values()) { - controller.abort(error); - } + for (const controller of requestHandlerAbortControllers.values()) { + controller.abort(error); } } diff --git a/packages/core/src/shared/transport.ts b/packages/core/src/shared/transport.ts index 889b319a9..9fd4c4b20 100644 --- a/packages/core/src/shared/transport.ts +++ b/packages/core/src/shared/transport.ts @@ -98,7 +98,7 @@ export interface Transport { * * This should be invoked when {@linkcode Transport.close | close()} is called as well. */ - onclose?: () => void; + onclose?: () => void | Promise; /** * Callback for when an error occurs. diff --git a/packages/core/src/util/inMemory.ts b/packages/core/src/util/inMemory.ts index 256363c13..d5a8f3ddb 100644 --- a/packages/core/src/util/inMemory.ts +++ b/packages/core/src/util/inMemory.ts @@ -15,7 +15,7 @@ export class InMemoryTransport implements Transport { private _messageQueue: QueuedMessage[] = []; private _closed = false; - onclose?: () => void; + onclose?: () => void | Promise; onerror?: (error: Error) => void; onmessage?: (message: JSONRPCMessage, extra?: { authInfo?: AuthInfo }) => void; sessionId?: string; @@ -48,7 +48,7 @@ export class InMemoryTransport implements Transport { try { await other?.close(); } finally { - this.onclose?.(); + await this.onclose?.(); } } diff --git a/packages/core/test/inMemory.test.ts b/packages/core/test/inMemory.test.ts index 46332eaa2..9b313e22d 100644 --- a/packages/core/test/inMemory.test.ts +++ b/packages/core/test/inMemory.test.ts @@ -103,8 +103,12 @@ describe('InMemoryTransport', () => { let clientCloseCount = 0; let serverCloseCount = 0; - clientTransport.onclose = () => clientCloseCount++; - serverTransport.onclose = () => serverCloseCount++; + clientTransport.onclose = () => { + clientCloseCount++; + }; + serverTransport.onclose = () => { + serverCloseCount++; + }; await clientTransport.close(); @@ -114,7 +118,9 @@ describe('InMemoryTransport', () => { test('should handle double close idempotently', async () => { let clientCloseCount = 0; - clientTransport.onclose = () => clientCloseCount++; + clientTransport.onclose = () => { + clientCloseCount++; + }; await clientTransport.close(); await clientTransport.close(); @@ -126,8 +132,12 @@ describe('InMemoryTransport', () => { let clientCloseCount = 0; let serverCloseCount = 0; - clientTransport.onclose = () => clientCloseCount++; - serverTransport.onclose = () => serverCloseCount++; + clientTransport.onclose = () => { + clientCloseCount++; + }; + serverTransport.onclose = () => { + serverCloseCount++; + }; await Promise.all([clientTransport.close(), serverTransport.close()]); @@ -137,7 +147,9 @@ describe('InMemoryTransport', () => { test('should fire onclose even if peer onclose throws', async () => { let clientCloseCount = 0; - clientTransport.onclose = () => clientCloseCount++; + clientTransport.onclose = () => { + clientCloseCount++; + }; serverTransport.onclose = () => { throw new Error('boom'); }; diff --git a/packages/core/test/shared/protocol.test.ts b/packages/core/test/shared/protocol.test.ts index 69735bc3a..5bfa5b4a8 100644 --- a/packages/core/test/shared/protocol.test.ts +++ b/packages/core/test/shared/protocol.test.ts @@ -69,7 +69,7 @@ interface TestProtocolInternals { // Mock Transport class class MockTransport implements Transport { - onclose?: () => void; + onclose?: () => void | Promise; onerror?: (error: Error) => void; onmessage?: (message: unknown) => void; diff --git a/packages/core/test/shared/protocolTransportHandling.test.ts b/packages/core/test/shared/protocolTransportHandling.test.ts index 4e9c33e67..031b9646b 100644 --- a/packages/core/test/shared/protocolTransportHandling.test.ts +++ b/packages/core/test/shared/protocolTransportHandling.test.ts @@ -8,7 +8,7 @@ import type { EmptyResult, JSONRPCMessage, Notification, Request, Result } from // Mock Transport class class MockTransport implements Transport { id: string; - onclose?: () => void; + onclose?: () => void | Promise; onerror?: (error: Error) => void; onmessage?: (message: unknown) => void; sentMessages: JSONRPCMessage[] = []; diff --git a/packages/server/src/server/stdio.ts b/packages/server/src/server/stdio.ts index ac2dd3f78..d448fa560 100644 --- a/packages/server/src/server/stdio.ts +++ b/packages/server/src/server/stdio.ts @@ -26,7 +26,7 @@ export class StdioServerTransport implements Transport { private _stdout: Writable = process.stdout ) {} - onclose?: () => void; + onclose?: () => void | Promise; onerror?: (error: Error) => void; onmessage?: (message: JSONRPCMessage) => void; @@ -59,6 +59,9 @@ export class StdioServerTransport implements Transport { this._stdin.on('data', this._ondata); this._stdin.on('error', this._onerror); this._stdout.on('error', this._onstdouterror); + this._stdin.on('end', () => { + this.close(); + }); } private processReadBuffer() { @@ -97,7 +100,7 @@ export class StdioServerTransport implements Transport { // Clear the buffer and notify closure this._readBuffer.clear(); - this.onclose?.(); + await this.onclose?.(); } send(message: JSONRPCMessage): Promise { diff --git a/packages/server/src/server/streamableHttp.ts b/packages/server/src/server/streamableHttp.ts index 31053f35c..29146feae 100644 --- a/packages/server/src/server/streamableHttp.ts +++ b/packages/server/src/server/streamableHttp.ts @@ -241,7 +241,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { private _supportedProtocolVersions: string[]; sessionId?: string; - onclose?: () => void; + onclose?: () => void | Promise; onerror?: (error: Error) => void; onmessage?: (message: JSONRPCMessage, extra?: MessageExtraInfo) => void; @@ -908,7 +908,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { // Clear any pending responses this._requestResponseMap.clear(); - this.onclose?.(); + await this.onclose?.(); } /** diff --git a/packages/server/test/server/stdio.test.ts b/packages/server/test/server/stdio.test.ts index 92671cacd..9afa4d017 100644 --- a/packages/server/test/server/stdio.test.ts +++ b/packages/server/test/server/stdio.test.ts @@ -41,6 +41,43 @@ test('should start then close cleanly', async () => { expect(didClose).toBeTruthy(); }); +test('should close when stdin ends', async () => { + const server = new StdioServerTransport(input, output); + server.onerror = error => { + throw error; + }; + + let didClose = false; + server.onclose = () => { + didClose = true; + }; + + await server.start(); + expect(didClose).toBeFalsy(); + + input.push(null); // signal EOF + // Give the event loop a tick to process the end event + await new Promise(resolve => setTimeout(resolve, 50)); + expect(didClose).toBeTruthy(); +}); + +test('should await async onclose callback', async () => { + const server = new StdioServerTransport(input, output); + server.onerror = error => { + throw error; + }; + + let cleanupDone = false; + server.onclose = async () => { + await new Promise(resolve => setTimeout(resolve, 50)); + cleanupDone = true; + }; + + await server.start(); + await server.close(); + expect(cleanupDone).toBeTruthy(); +}); + test('should not read until started', async () => { const server = new StdioServerTransport(input, output); server.onerror = error => { @@ -171,8 +208,12 @@ test('should fire onerror before onclose on stdout error', async () => { const server = new StdioServerTransport(input, output); const events: string[] = []; - server.onerror = () => events.push('error'); - server.onclose = () => events.push('close'); + server.onerror = () => { + events.push('error'); + }; + server.onclose = () => { + events.push('close'); + }; await server.start(); output.emit('error', new Error('EPIPE'));