Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changeset/async-onclose-stdin-eof.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
"@modelcontextprotocol/core": patch
"@modelcontextprotocol/server": patch
"@modelcontextprotocol/client": patch
---

Allow async `onclose` callbacks on Transport and Protocol. The signature changes from `() => void` to `() => void | Promise<void>`, and all call sites now await the callback. This lets MCP servers perform async cleanup (e.g., releasing browser sessions or database connections) when the transport closes.

Close `StdioServerTransport` when stdin reaches EOF, so containerized servers exit cleanly on client disconnect.

Add SIGTERM handlers alongside SIGINT in all examples, since MCP servers run as background processes stopped by SIGTERM, not interactively via Ctrl+C.
4 changes: 4 additions & 0 deletions examples/client/src/elicitationUrlExample.ts
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,10 @@ process.on('SIGINT', async () => {
console.log('\nReceived SIGINT. Cleaning up...');
await cleanup();
});
process.on('SIGTERM', async () => {
console.log('\nReceived SIGINT. Cleaning up...');
await cleanup();
});

// Start the interactive client
try {
Expand Down
5 changes: 5 additions & 0 deletions examples/client/src/simpleOAuthClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,11 @@ async function main(): Promise<void> {
client.close();
process.exit(0);
});
process.on('SIGTERM', () => {
console.log('\n\n👋 Goodbye!');
client.close();
process.exit(0);
});

try {
await client.connect();
Expand Down
4 changes: 4 additions & 0 deletions examples/client/src/simpleStreamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,10 @@ process.on('SIGINT', async () => {
console.log('\nReceived SIGINT. Cleaning up...');
await cleanup();
});
process.on('SIGTERM', async () => {
console.log('\nReceived SIGINT. Cleaning up...');
await cleanup();
});

// Start the interactive client
try {
Expand Down
16 changes: 16 additions & 0 deletions examples/server/src/elicitationFormExample.ts
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,22 @@ async function main() {
process.on('SIGINT', async () => {
console.log('Shutting down server...');

// Close all active transports to properly clean up resources
for (const sessionId in transports) {
try {
console.log(`Closing transport for session ${sessionId}`);
await transports[sessionId]!.close();
delete transports[sessionId];
} catch (error) {
console.error(`Error closing transport for session ${sessionId}:`, error);
}
}
console.log('Server shutdown complete');
process.exit(0);
});
process.on('SIGTERM', async () => {
console.log('Shutting down server...');

// Close all active transports to properly clean up resources
for (const sessionId in transports) {
try {
Expand Down
17 changes: 17 additions & 0 deletions examples/server/src/elicitationUrlExample.ts
Original file line number Diff line number Diff line change
Expand Up @@ -731,3 +731,20 @@ process.on('SIGINT', async () => {
console.log('Server shutdown complete');
process.exit(0);
});
process.on('SIGTERM', async () => {
console.log('Shutting down server...');

// Close all active transports to properly clean up resources
for (const sessionId in transports) {
try {
console.log(`Closing transport for session ${sessionId}`);
await transports[sessionId]!.close();
delete transports[sessionId];
delete sessionsNeedingElicitation[sessionId];
} catch (error) {
console.error(`Error closing transport for session ${sessionId}:`, error);
}
}
console.log('Server shutdown complete');
process.exit(0);
});
4 changes: 4 additions & 0 deletions examples/server/src/jsonResponseStreamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,7 @@ process.on('SIGINT', async () => {
console.log('Shutting down server...');
process.exit(0);
});
process.on('SIGTERM', async () => {
console.log('Shutting down server...');
process.exit(0);
});
5 changes: 5 additions & 0 deletions examples/server/src/simpleStatelessStreamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,8 @@ process.on('SIGINT', async () => {
// eslint-disable-next-line unicorn/no-process-exit
process.exit(0);
});
process.on('SIGTERM', async () => {
console.log('Shutting down server...');
// eslint-disable-next-line unicorn/no-process-exit
process.exit(0);
});
16 changes: 16 additions & 0 deletions examples/server/src/simpleStreamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -815,3 +815,19 @@ process.on('SIGINT', async () => {
console.log('Server shutdown complete');
process.exit(0);
});
process.on('SIGTERM', async () => {
console.log('Shutting down server...');

// Close all active transports to properly clean up resources
for (const sessionId in transports) {
try {
console.log(`Closing transport for session ${sessionId}`);
await transports[sessionId]!.close();
delete transports[sessionId];
} catch (error) {
console.error(`Error closing transport for session ${sessionId}:`, error);
}
}
console.log('Server shutdown complete');
process.exit(0);
});
15 changes: 15 additions & 0 deletions examples/server/src/simpleTaskInteractive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -741,3 +741,18 @@ process.on('SIGINT', async () => {
console.log('Server shutdown complete');
process.exit(0);
});
process.on('SIGTERM', async () => {
console.log('\nShutting down server...');
for (const sessionId of Object.keys(transports)) {
try {
await transports[sessionId]!.close();
delete transports[sessionId];
} catch (error) {
console.error(`Error closing session ${sessionId}:`, error);
}
}
taskStore.cleanup();
messageQueue.cleanup();
console.log('Server shutdown complete');
process.exit(0);
});
18 changes: 18 additions & 0 deletions examples/server/src/standaloneSseWithGetStreamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,21 @@ process.on('SIGINT', async () => {
console.log('Server shutdown complete');
process.exit(0);
});
process.on('SIGTERM', async () => {
console.log('Shutting down server...');
clearInterval(resourceChangeInterval);

// Close all active transports to properly clean up resources
for (const sessionId in transports) {
try {
console.log(`Closing transport for session ${sessionId}`);
await transports[sessionId]!.close();
delete transports[sessionId];
delete servers[sessionId];
} catch (error) {
console.error(`Error closing transport for session ${sessionId}:`, error);
}
}
console.log('Server shutdown complete');
process.exit(0);
});
4 changes: 2 additions & 2 deletions packages/client/src/client/sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ export class SSEClientTransport implements Transport {
private _fetchWithInit: FetchLike;
private _protocolVersion?: string;

onclose?: () => void;
onclose?: () => void | Promise<void>;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage) => void;

Expand Down Expand Up @@ -242,7 +242,7 @@ export class SSEClientTransport implements Transport {
async close(): Promise<void> {
this._abortController?.abort();
this._eventSource?.close();
this.onclose?.();
await this.onclose?.();
}

async send(message: JSONRPCMessage): Promise<void> {
Expand Down
6 changes: 3 additions & 3 deletions packages/client/src/client/stdio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export class StdioClientTransport implements Transport {
private _serverParams: StdioServerParameters;
private _stderrStream: PassThrough | null = null;

onclose?: () => void;
onclose?: () => void | Promise<void>;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage) => void;

Expand Down Expand Up @@ -139,9 +139,9 @@ export class StdioClientTransport implements Transport {
resolve();
});

this._process.on('close', _code => {
this._process.on('close', async _code => {
this._process = undefined;
this.onclose?.();
await this.onclose?.();
});

this._process.stdin?.on('error', error => {
Expand Down
4 changes: 2 additions & 2 deletions packages/client/src/client/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ export class StreamableHTTPClientTransport implements Transport {
private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field
private _reconnectionTimeout?: ReturnType<typeof setTimeout>;

onclose?: () => void;
onclose?: () => void | Promise<void>;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage) => void;

Expand Down Expand Up @@ -463,7 +463,7 @@ export class StreamableHTTPClientTransport implements Transport {
this._reconnectionTimeout = undefined;
}
this._abortController?.abort();
this.onclose?.();
await this.onclose?.();
}

async send(
Expand Down
6 changes: 3 additions & 3 deletions packages/client/src/client/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export class WebSocketClientTransport implements Transport {
private _socket?: WebSocket;
private _url: URL;

onclose?: () => void;
onclose?: () => void | Promise<void>;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage) => void;

Expand Down Expand Up @@ -38,8 +38,8 @@ export class WebSocketClientTransport implements Transport {
resolve();
};

this._socket.onclose = () => {
this.onclose?.();
this._socket.onclose = async () => {
await this.onclose?.();
};

this._socket.onmessage = (event: MessageEvent) => {
Expand Down
29 changes: 13 additions & 16 deletions packages/core/src/shared/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ export abstract class Protocol<ContextT extends BaseContext> {
*
* This is invoked when {@linkcode Protocol.close | close()} is called as well.
*/
onclose?: () => void;
onclose?: () => void | Promise<void>;

/**
* Callback for when an error occurs.
Expand Down Expand Up @@ -456,11 +456,11 @@ export abstract class Protocol<ContextT extends BaseContext> {
async connect(transport: Transport): Promise<void> {
this._transport = transport;
const _onclose = this.transport?.onclose;
this._transport.onclose = () => {
this._transport.onclose = async () => {
try {
_onclose?.();
if (_onclose) await _onclose();
} finally {
this._onclose();
await this._onclose();
}
};

Expand Down Expand Up @@ -490,12 +490,15 @@ export abstract class Protocol<ContextT extends BaseContext> {
await this._transport.start();
}

private _onclose(): void {
private async _onclose(): Promise<void> {
const responseHandlers = this._responseHandlers;
this._responseHandlers = new Map();
this._progressHandlers.clear();
this._taskManager.onClose();
this._pendingDebouncedNotifications.clear();
this._transport = undefined;

await this.onclose?.();

for (const info of this._timeoutInfo.values()) {
clearTimeout(info.timeoutId);
Expand All @@ -507,18 +510,12 @@ export abstract class Protocol<ContextT extends BaseContext> {

const error = new SdkError(SdkErrorCode.ConnectionClosed, 'Connection closed');

this._transport = undefined;

try {
this.onclose?.();
} finally {
for (const handler of responseHandlers.values()) {
handler(error);
}
for (const handler of responseHandlers.values()) {
handler(error);
}

for (const controller of requestHandlerAbortControllers.values()) {
controller.abort(error);
}
for (const controller of requestHandlerAbortControllers.values()) {
controller.abort(error);
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/shared/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ export interface Transport {
*
* This should be invoked when {@linkcode Transport.close | close()} is called as well.
*/
onclose?: () => void;
onclose?: () => void | Promise<void>;

/**
* Callback for when an error occurs.
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/util/inMemory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export class InMemoryTransport implements Transport {
private _messageQueue: QueuedMessage[] = [];
private _closed = false;

onclose?: () => void;
onclose?: () => void | Promise<void>;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage, extra?: { authInfo?: AuthInfo }) => void;
sessionId?: string;
Expand Down Expand Up @@ -48,7 +48,7 @@ export class InMemoryTransport implements Transport {
try {
await other?.close();
} finally {
this.onclose?.();
await this.onclose?.();
}
}

Expand Down
24 changes: 18 additions & 6 deletions packages/core/test/inMemory.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,12 @@ describe('InMemoryTransport', () => {
let clientCloseCount = 0;
let serverCloseCount = 0;

clientTransport.onclose = () => clientCloseCount++;
serverTransport.onclose = () => serverCloseCount++;
clientTransport.onclose = () => {
clientCloseCount++;
};
serverTransport.onclose = () => {
serverCloseCount++;
};

await clientTransport.close();

Expand All @@ -114,7 +118,9 @@ describe('InMemoryTransport', () => {

test('should handle double close idempotently', async () => {
let clientCloseCount = 0;
clientTransport.onclose = () => clientCloseCount++;
clientTransport.onclose = () => {
clientCloseCount++;
};

await clientTransport.close();
await clientTransport.close();
Expand All @@ -126,8 +132,12 @@ describe('InMemoryTransport', () => {
let clientCloseCount = 0;
let serverCloseCount = 0;

clientTransport.onclose = () => clientCloseCount++;
serverTransport.onclose = () => serverCloseCount++;
clientTransport.onclose = () => {
clientCloseCount++;
};
serverTransport.onclose = () => {
serverCloseCount++;
};

await Promise.all([clientTransport.close(), serverTransport.close()]);

Expand All @@ -137,7 +147,9 @@ describe('InMemoryTransport', () => {

test('should fire onclose even if peer onclose throws', async () => {
let clientCloseCount = 0;
clientTransport.onclose = () => clientCloseCount++;
clientTransport.onclose = () => {
clientCloseCount++;
};
serverTransport.onclose = () => {
throw new Error('boom');
};
Expand Down
2 changes: 1 addition & 1 deletion packages/core/test/shared/protocol.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ interface TestProtocolInternals {

// Mock Transport class
class MockTransport implements Transport {
onclose?: () => void;
onclose?: () => void | Promise<void>;
onerror?: (error: Error) => void;
onmessage?: (message: unknown) => void;

Expand Down
Loading
Loading