From 3f70c0058e0dd22662d8b6cfe45c2033bf0d80b5 Mon Sep 17 00:00:00 2001 From: Aaron Paterson Date: Sun, 29 Mar 2026 14:02:21 +0000 Subject: [PATCH] fix: async onclose, stdin EOF detection, SIGTERM in examples Three related improvements to server lifecycle handling: 1. Allow async onclose callbacks on Transport and Protocol. MCP servers that hold external resources (browser sessions, database connections) need to await cleanup before the process exits. The onclose signature changes from `() => void` to `() => void | Promise`, matching the existing pattern used by onsessionclosed in StreamableHTTPServerTransport. All transports and Protocol._onclose now await the callback. 2. Close StdioServerTransport when stdin ends. The transport listened for data and error but not EOF. When the MCP client disconnects, the transport stays open and onclose never fires. This is especially visible with containerized servers using docker run with automatic removal: without onclose the server never exits and the container accumulates. 3. Add SIGTERM handlers alongside SIGINT in all examples. MCP servers run as background processes spawned by clients, not interactively. SIGTERM is what container runtimes and process managers send to stop a process. --- .changeset/async-onclose-stdin-eof.md | 11 +++++ examples/client/src/elicitationUrlExample.ts | 4 ++ examples/client/src/simpleOAuthClient.ts | 5 +++ examples/client/src/simpleStreamableHttp.ts | 4 ++ examples/server/src/elicitationFormExample.ts | 16 +++++++ examples/server/src/elicitationUrlExample.ts | 17 +++++++ .../server/src/jsonResponseStreamableHttp.ts | 4 ++ .../src/simpleStatelessStreamableHttp.ts | 5 +++ examples/server/src/simpleStreamableHttp.ts | 16 +++++++ examples/server/src/simpleTaskInteractive.ts | 15 +++++++ .../src/standaloneSseWithGetStreamableHttp.ts | 18 ++++++++ packages/client/src/client/sse.ts | 4 +- packages/client/src/client/stdio.ts | 6 +-- packages/client/src/client/streamableHttp.ts | 4 +- packages/client/src/client/websocket.ts | 6 +-- packages/core/src/shared/protocol.ts | 29 ++++++------ packages/core/src/shared/transport.ts | 2 +- packages/core/src/util/inMemory.ts | 4 +- packages/core/test/inMemory.test.ts | 24 +++++++--- packages/core/test/shared/protocol.test.ts | 2 +- .../shared/protocolTransportHandling.test.ts | 2 +- packages/server/src/server/stdio.ts | 7 ++- packages/server/src/server/streamableHttp.ts | 4 +- packages/server/test/server/stdio.test.ts | 45 ++++++++++++++++++- 24 files changed, 211 insertions(+), 43 deletions(-) create mode 100644 .changeset/async-onclose-stdin-eof.md diff --git a/.changeset/async-onclose-stdin-eof.md b/.changeset/async-onclose-stdin-eof.md new file mode 100644 index 000000000..c5d3e9e61 --- /dev/null +++ b/.changeset/async-onclose-stdin-eof.md @@ -0,0 +1,11 @@ +--- +"@modelcontextprotocol/core": patch +"@modelcontextprotocol/server": patch +"@modelcontextprotocol/client": patch +--- + +Allow async `onclose` callbacks on Transport and Protocol. The signature changes from `() => void` to `() => void | Promise`, and all call sites now await the callback. This lets MCP servers perform async cleanup (e.g., releasing browser sessions or database connections) when the transport closes. + +Close `StdioServerTransport` when stdin reaches EOF, so containerized servers exit cleanly on client disconnect. + +Add SIGTERM handlers alongside SIGINT in all examples, since MCP servers run as background processes stopped by SIGTERM, not interactively via Ctrl+C. diff --git a/examples/client/src/elicitationUrlExample.ts b/examples/client/src/elicitationUrlExample.ts index 7c5cce2ee..d0d95015d 100644 --- a/examples/client/src/elicitationUrlExample.ts +++ b/examples/client/src/elicitationUrlExample.ts @@ -813,6 +813,10 @@ process.on('SIGINT', async () => { console.log('\nReceived SIGINT. Cleaning up...'); await cleanup(); }); +process.on('SIGTERM', async () => { + console.log('\nReceived SIGINT. Cleaning up...'); + await cleanup(); +}); // Start the interactive client try { diff --git a/examples/client/src/simpleOAuthClient.ts b/examples/client/src/simpleOAuthClient.ts index c75aea948..c59d3716f 100644 --- a/examples/client/src/simpleOAuthClient.ts +++ b/examples/client/src/simpleOAuthClient.ts @@ -448,6 +448,11 @@ async function main(): Promise { client.close(); process.exit(0); }); + process.on('SIGTERM', () => { + console.log('\n\nšŸ‘‹ Goodbye!'); + client.close(); + process.exit(0); + }); try { await client.connect(); diff --git a/examples/client/src/simpleStreamableHttp.ts b/examples/client/src/simpleStreamableHttp.ts index f22d16ba4..af427d906 100644 --- a/examples/client/src/simpleStreamableHttp.ts +++ b/examples/client/src/simpleStreamableHttp.ts @@ -997,6 +997,10 @@ process.on('SIGINT', async () => { console.log('\nReceived SIGINT. Cleaning up...'); await cleanup(); }); +process.on('SIGTERM', async () => { + console.log('\nReceived SIGINT. Cleaning up...'); + await cleanup(); +}); // Start the interactive client try { diff --git a/examples/server/src/elicitationFormExample.ts b/examples/server/src/elicitationFormExample.ts index 9c13b739e..82fb1a7d2 100644 --- a/examples/server/src/elicitationFormExample.ts +++ b/examples/server/src/elicitationFormExample.ts @@ -453,6 +453,22 @@ async function main() { process.on('SIGINT', async () => { console.log('Shutting down server...'); + // Close all active transports to properly clean up resources + for (const sessionId in transports) { + try { + console.log(`Closing transport for session ${sessionId}`); + await transports[sessionId]!.close(); + delete transports[sessionId]; + } catch (error) { + console.error(`Error closing transport for session ${sessionId}:`, error); + } + } + console.log('Server shutdown complete'); + process.exit(0); + }); + process.on('SIGTERM', async () => { + console.log('Shutting down server...'); + // Close all active transports to properly clean up resources for (const sessionId in transports) { try { diff --git a/examples/server/src/elicitationUrlExample.ts b/examples/server/src/elicitationUrlExample.ts index c38dd75e8..db659ddf5 100644 --- a/examples/server/src/elicitationUrlExample.ts +++ b/examples/server/src/elicitationUrlExample.ts @@ -731,3 +731,20 @@ process.on('SIGINT', async () => { console.log('Server shutdown complete'); process.exit(0); }); +process.on('SIGTERM', async () => { + console.log('Shutting down server...'); + + // Close all active transports to properly clean up resources + for (const sessionId in transports) { + try { + console.log(`Closing transport for session ${sessionId}`); + await transports[sessionId]!.close(); + delete transports[sessionId]; + delete sessionsNeedingElicitation[sessionId]; + } catch (error) { + console.error(`Error closing transport for session ${sessionId}:`, error); + } + } + console.log('Server shutdown complete'); + process.exit(0); +}); diff --git a/examples/server/src/jsonResponseStreamableHttp.ts b/examples/server/src/jsonResponseStreamableHttp.ts index 7a3aad67a..81c911efc 100644 --- a/examples/server/src/jsonResponseStreamableHttp.ts +++ b/examples/server/src/jsonResponseStreamableHttp.ts @@ -163,3 +163,7 @@ process.on('SIGINT', async () => { console.log('Shutting down server...'); process.exit(0); }); +process.on('SIGTERM', async () => { + console.log('Shutting down server...'); + process.exit(0); +}); diff --git a/examples/server/src/simpleStatelessStreamableHttp.ts b/examples/server/src/simpleStatelessStreamableHttp.ts index 2b4f0363d..f2dbcb72a 100644 --- a/examples/server/src/simpleStatelessStreamableHttp.ts +++ b/examples/server/src/simpleStatelessStreamableHttp.ts @@ -169,3 +169,8 @@ process.on('SIGINT', async () => { // eslint-disable-next-line unicorn/no-process-exit process.exit(0); }); +process.on('SIGTERM', async () => { + console.log('Shutting down server...'); + // eslint-disable-next-line unicorn/no-process-exit + process.exit(0); +}); diff --git a/examples/server/src/simpleStreamableHttp.ts b/examples/server/src/simpleStreamableHttp.ts index 1263f4bb5..6cf33744d 100644 --- a/examples/server/src/simpleStreamableHttp.ts +++ b/examples/server/src/simpleStreamableHttp.ts @@ -815,3 +815,19 @@ process.on('SIGINT', async () => { console.log('Server shutdown complete'); process.exit(0); }); +process.on('SIGTERM', async () => { + console.log('Shutting down server...'); + + // Close all active transports to properly clean up resources + for (const sessionId in transports) { + try { + console.log(`Closing transport for session ${sessionId}`); + await transports[sessionId]!.close(); + delete transports[sessionId]; + } catch (error) { + console.error(`Error closing transport for session ${sessionId}:`, error); + } + } + console.log('Server shutdown complete'); + process.exit(0); +}); diff --git a/examples/server/src/simpleTaskInteractive.ts b/examples/server/src/simpleTaskInteractive.ts index 9092926d9..708c98774 100644 --- a/examples/server/src/simpleTaskInteractive.ts +++ b/examples/server/src/simpleTaskInteractive.ts @@ -741,3 +741,18 @@ process.on('SIGINT', async () => { console.log('Server shutdown complete'); process.exit(0); }); +process.on('SIGTERM', async () => { + console.log('\nShutting down server...'); + for (const sessionId of Object.keys(transports)) { + try { + await transports[sessionId]!.close(); + delete transports[sessionId]; + } catch (error) { + console.error(`Error closing session ${sessionId}:`, error); + } + } + taskStore.cleanup(); + messageQueue.cleanup(); + console.log('Server shutdown complete'); + process.exit(0); +}); diff --git a/examples/server/src/standaloneSseWithGetStreamableHttp.ts b/examples/server/src/standaloneSseWithGetStreamableHttp.ts index b1b2ccf51..08e78200d 100644 --- a/examples/server/src/standaloneSseWithGetStreamableHttp.ts +++ b/examples/server/src/standaloneSseWithGetStreamableHttp.ts @@ -159,3 +159,21 @@ process.on('SIGINT', async () => { console.log('Server shutdown complete'); process.exit(0); }); +process.on('SIGTERM', async () => { + console.log('Shutting down server...'); + clearInterval(resourceChangeInterval); + + // Close all active transports to properly clean up resources + for (const sessionId in transports) { + try { + console.log(`Closing transport for session ${sessionId}`); + await transports[sessionId]!.close(); + delete transports[sessionId]; + delete servers[sessionId]; + } catch (error) { + console.error(`Error closing transport for session ${sessionId}:`, error); + } + } + console.log('Server shutdown complete'); + process.exit(0); +}); diff --git a/packages/client/src/client/sse.ts b/packages/client/src/client/sse.ts index f441e9cdb..beb061e25 100644 --- a/packages/client/src/client/sse.ts +++ b/packages/client/src/client/sse.ts @@ -78,7 +78,7 @@ export class SSEClientTransport implements Transport { private _fetchWithInit: FetchLike; private _protocolVersion?: string; - onclose?: () => void; + onclose?: () => void | Promise; onerror?: (error: Error) => void; onmessage?: (message: JSONRPCMessage) => void; @@ -242,7 +242,7 @@ export class SSEClientTransport implements Transport { async close(): Promise { this._abortController?.abort(); this._eventSource?.close(); - this.onclose?.(); + await this.onclose?.(); } async send(message: JSONRPCMessage): Promise { diff --git a/packages/client/src/client/stdio.ts b/packages/client/src/client/stdio.ts index 6c1571f11..eab58402b 100644 --- a/packages/client/src/client/stdio.ts +++ b/packages/client/src/client/stdio.ts @@ -96,7 +96,7 @@ export class StdioClientTransport implements Transport { private _serverParams: StdioServerParameters; private _stderrStream: PassThrough | null = null; - onclose?: () => void; + onclose?: () => void | Promise; onerror?: (error: Error) => void; onmessage?: (message: JSONRPCMessage) => void; @@ -139,9 +139,9 @@ export class StdioClientTransport implements Transport { resolve(); }); - this._process.on('close', _code => { + this._process.on('close', async _code => { this._process = undefined; - this.onclose?.(); + await this.onclose?.(); }); this._process.stdin?.on('error', error => { diff --git a/packages/client/src/client/streamableHttp.ts b/packages/client/src/client/streamableHttp.ts index 3d45b60e9..d07d7971c 100644 --- a/packages/client/src/client/streamableHttp.ts +++ b/packages/client/src/client/streamableHttp.ts @@ -152,7 +152,7 @@ export class StreamableHTTPClientTransport implements Transport { private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field private _reconnectionTimeout?: ReturnType; - onclose?: () => void; + onclose?: () => void | Promise; onerror?: (error: Error) => void; onmessage?: (message: JSONRPCMessage) => void; @@ -463,7 +463,7 @@ export class StreamableHTTPClientTransport implements Transport { this._reconnectionTimeout = undefined; } this._abortController?.abort(); - this.onclose?.(); + await this.onclose?.(); } async send( diff --git a/packages/client/src/client/websocket.ts b/packages/client/src/client/websocket.ts index cb0c34687..54a5c3e15 100644 --- a/packages/client/src/client/websocket.ts +++ b/packages/client/src/client/websocket.ts @@ -10,7 +10,7 @@ export class WebSocketClientTransport implements Transport { private _socket?: WebSocket; private _url: URL; - onclose?: () => void; + onclose?: () => void | Promise; onerror?: (error: Error) => void; onmessage?: (message: JSONRPCMessage) => void; @@ -38,8 +38,8 @@ export class WebSocketClientTransport implements Transport { resolve(); }; - this._socket.onclose = () => { - this.onclose?.(); + this._socket.onclose = async () => { + await this.onclose?.(); }; this._socket.onmessage = (event: MessageEvent) => { diff --git a/packages/core/src/shared/protocol.ts b/packages/core/src/shared/protocol.ts index d6daf0172..9dee96eb7 100644 --- a/packages/core/src/shared/protocol.ts +++ b/packages/core/src/shared/protocol.ts @@ -316,7 +316,7 @@ export abstract class Protocol { * * This is invoked when {@linkcode Protocol.close | close()} is called as well. */ - onclose?: () => void; + onclose?: () => void | Promise; /** * Callback for when an error occurs. @@ -456,11 +456,11 @@ export abstract class Protocol { async connect(transport: Transport): Promise { this._transport = transport; const _onclose = this.transport?.onclose; - this._transport.onclose = () => { + this._transport.onclose = async () => { try { - _onclose?.(); + if (_onclose) await _onclose(); } finally { - this._onclose(); + await this._onclose(); } }; @@ -490,12 +490,15 @@ export abstract class Protocol { await this._transport.start(); } - private _onclose(): void { + private async _onclose(): Promise { const responseHandlers = this._responseHandlers; this._responseHandlers = new Map(); this._progressHandlers.clear(); this._taskManager.onClose(); this._pendingDebouncedNotifications.clear(); + this._transport = undefined; + + await this.onclose?.(); for (const info of this._timeoutInfo.values()) { clearTimeout(info.timeoutId); @@ -507,18 +510,12 @@ export abstract class Protocol { const error = new SdkError(SdkErrorCode.ConnectionClosed, 'Connection closed'); - this._transport = undefined; - - try { - this.onclose?.(); - } finally { - for (const handler of responseHandlers.values()) { - handler(error); - } + for (const handler of responseHandlers.values()) { + handler(error); + } - for (const controller of requestHandlerAbortControllers.values()) { - controller.abort(error); - } + for (const controller of requestHandlerAbortControllers.values()) { + controller.abort(error); } } diff --git a/packages/core/src/shared/transport.ts b/packages/core/src/shared/transport.ts index 889b319a9..9fd4c4b20 100644 --- a/packages/core/src/shared/transport.ts +++ b/packages/core/src/shared/transport.ts @@ -98,7 +98,7 @@ export interface Transport { * * This should be invoked when {@linkcode Transport.close | close()} is called as well. */ - onclose?: () => void; + onclose?: () => void | Promise; /** * Callback for when an error occurs. diff --git a/packages/core/src/util/inMemory.ts b/packages/core/src/util/inMemory.ts index 256363c13..d5a8f3ddb 100644 --- a/packages/core/src/util/inMemory.ts +++ b/packages/core/src/util/inMemory.ts @@ -15,7 +15,7 @@ export class InMemoryTransport implements Transport { private _messageQueue: QueuedMessage[] = []; private _closed = false; - onclose?: () => void; + onclose?: () => void | Promise; onerror?: (error: Error) => void; onmessage?: (message: JSONRPCMessage, extra?: { authInfo?: AuthInfo }) => void; sessionId?: string; @@ -48,7 +48,7 @@ export class InMemoryTransport implements Transport { try { await other?.close(); } finally { - this.onclose?.(); + await this.onclose?.(); } } diff --git a/packages/core/test/inMemory.test.ts b/packages/core/test/inMemory.test.ts index 46332eaa2..9b313e22d 100644 --- a/packages/core/test/inMemory.test.ts +++ b/packages/core/test/inMemory.test.ts @@ -103,8 +103,12 @@ describe('InMemoryTransport', () => { let clientCloseCount = 0; let serverCloseCount = 0; - clientTransport.onclose = () => clientCloseCount++; - serverTransport.onclose = () => serverCloseCount++; + clientTransport.onclose = () => { + clientCloseCount++; + }; + serverTransport.onclose = () => { + serverCloseCount++; + }; await clientTransport.close(); @@ -114,7 +118,9 @@ describe('InMemoryTransport', () => { test('should handle double close idempotently', async () => { let clientCloseCount = 0; - clientTransport.onclose = () => clientCloseCount++; + clientTransport.onclose = () => { + clientCloseCount++; + }; await clientTransport.close(); await clientTransport.close(); @@ -126,8 +132,12 @@ describe('InMemoryTransport', () => { let clientCloseCount = 0; let serverCloseCount = 0; - clientTransport.onclose = () => clientCloseCount++; - serverTransport.onclose = () => serverCloseCount++; + clientTransport.onclose = () => { + clientCloseCount++; + }; + serverTransport.onclose = () => { + serverCloseCount++; + }; await Promise.all([clientTransport.close(), serverTransport.close()]); @@ -137,7 +147,9 @@ describe('InMemoryTransport', () => { test('should fire onclose even if peer onclose throws', async () => { let clientCloseCount = 0; - clientTransport.onclose = () => clientCloseCount++; + clientTransport.onclose = () => { + clientCloseCount++; + }; serverTransport.onclose = () => { throw new Error('boom'); }; diff --git a/packages/core/test/shared/protocol.test.ts b/packages/core/test/shared/protocol.test.ts index 69735bc3a..5bfa5b4a8 100644 --- a/packages/core/test/shared/protocol.test.ts +++ b/packages/core/test/shared/protocol.test.ts @@ -69,7 +69,7 @@ interface TestProtocolInternals { // Mock Transport class class MockTransport implements Transport { - onclose?: () => void; + onclose?: () => void | Promise; onerror?: (error: Error) => void; onmessage?: (message: unknown) => void; diff --git a/packages/core/test/shared/protocolTransportHandling.test.ts b/packages/core/test/shared/protocolTransportHandling.test.ts index 4e9c33e67..031b9646b 100644 --- a/packages/core/test/shared/protocolTransportHandling.test.ts +++ b/packages/core/test/shared/protocolTransportHandling.test.ts @@ -8,7 +8,7 @@ import type { EmptyResult, JSONRPCMessage, Notification, Request, Result } from // Mock Transport class class MockTransport implements Transport { id: string; - onclose?: () => void; + onclose?: () => void | Promise; onerror?: (error: Error) => void; onmessage?: (message: unknown) => void; sentMessages: JSONRPCMessage[] = []; diff --git a/packages/server/src/server/stdio.ts b/packages/server/src/server/stdio.ts index ac2dd3f78..d448fa560 100644 --- a/packages/server/src/server/stdio.ts +++ b/packages/server/src/server/stdio.ts @@ -26,7 +26,7 @@ export class StdioServerTransport implements Transport { private _stdout: Writable = process.stdout ) {} - onclose?: () => void; + onclose?: () => void | Promise; onerror?: (error: Error) => void; onmessage?: (message: JSONRPCMessage) => void; @@ -59,6 +59,9 @@ export class StdioServerTransport implements Transport { this._stdin.on('data', this._ondata); this._stdin.on('error', this._onerror); this._stdout.on('error', this._onstdouterror); + this._stdin.on('end', () => { + this.close(); + }); } private processReadBuffer() { @@ -97,7 +100,7 @@ export class StdioServerTransport implements Transport { // Clear the buffer and notify closure this._readBuffer.clear(); - this.onclose?.(); + await this.onclose?.(); } send(message: JSONRPCMessage): Promise { diff --git a/packages/server/src/server/streamableHttp.ts b/packages/server/src/server/streamableHttp.ts index 31053f35c..29146feae 100644 --- a/packages/server/src/server/streamableHttp.ts +++ b/packages/server/src/server/streamableHttp.ts @@ -241,7 +241,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { private _supportedProtocolVersions: string[]; sessionId?: string; - onclose?: () => void; + onclose?: () => void | Promise; onerror?: (error: Error) => void; onmessage?: (message: JSONRPCMessage, extra?: MessageExtraInfo) => void; @@ -908,7 +908,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { // Clear any pending responses this._requestResponseMap.clear(); - this.onclose?.(); + await this.onclose?.(); } /** diff --git a/packages/server/test/server/stdio.test.ts b/packages/server/test/server/stdio.test.ts index 92671cacd..9afa4d017 100644 --- a/packages/server/test/server/stdio.test.ts +++ b/packages/server/test/server/stdio.test.ts @@ -41,6 +41,43 @@ test('should start then close cleanly', async () => { expect(didClose).toBeTruthy(); }); +test('should close when stdin ends', async () => { + const server = new StdioServerTransport(input, output); + server.onerror = error => { + throw error; + }; + + let didClose = false; + server.onclose = () => { + didClose = true; + }; + + await server.start(); + expect(didClose).toBeFalsy(); + + input.push(null); // signal EOF + // Give the event loop a tick to process the end event + await new Promise(resolve => setTimeout(resolve, 50)); + expect(didClose).toBeTruthy(); +}); + +test('should await async onclose callback', async () => { + const server = new StdioServerTransport(input, output); + server.onerror = error => { + throw error; + }; + + let cleanupDone = false; + server.onclose = async () => { + await new Promise(resolve => setTimeout(resolve, 50)); + cleanupDone = true; + }; + + await server.start(); + await server.close(); + expect(cleanupDone).toBeTruthy(); +}); + test('should not read until started', async () => { const server = new StdioServerTransport(input, output); server.onerror = error => { @@ -171,8 +208,12 @@ test('should fire onerror before onclose on stdout error', async () => { const server = new StdioServerTransport(input, output); const events: string[] = []; - server.onerror = () => events.push('error'); - server.onclose = () => events.push('close'); + server.onerror = () => { + events.push('error'); + }; + server.onclose = () => { + events.push('close'); + }; await server.start(); output.emit('error', new Error('EPIPE'));