diff --git a/.changeset/subscriptions-listen-result.md b/.changeset/subscriptions-listen-result.md new file mode 100644 index 0000000000..4f62eb608b --- /dev/null +++ b/.changeset/subscriptions-listen-result.md @@ -0,0 +1,7 @@ +--- +'@modelcontextprotocol/core': minor +'@modelcontextprotocol/server': minor +'@modelcontextprotocol/client': minor +--- + +`subscriptions/listen` graceful close: per spec PR #2953, a server-side graceful close (`createMcpHandler` / `serveStdio` `close()`) now emits the empty `subscriptions/listen` JSON-RPC result (the new `SubscriptionsListenResult` — `_meta` carries the subscriptionId) before closing the stream, replacing the previous server-originated `notifications/cancelled`. On the client, `McpSubscription.closed` now resolves `'graceful'` for this signal (added alongside `'local'` and `'remote'`); a stream close without a result remains `'remote'` (unexpected disconnect). diff --git a/docs/migration/support-2026-07-28.md b/docs/migration/support-2026-07-28.md index c00fba923d..a094123616 100644 --- a/docs/migration/support-2026-07-28.md +++ b/docs/migration/support-2026-07-28.md @@ -356,6 +356,12 @@ explicitly. `resources/subscribe` is 2025-only — on a 2026-07-28 connection, r `notifications/resources/updated` via the `resourceSubscriptions` field of the listen filter instead. +**Graceful close.** When the server closes the listen stream deliberately (entry +`close()`/shutdown), it sends the empty `subscriptions/listen` JSON-RPC result before +closing the stream; `McpSubscription.closed` resolves `'graceful'`. A stream close +without a result resolves `'remote'` and indicates an unexpected disconnect — re-listen +if you still want events. + --- ## `Mcp-Param-*` and standard headers (SEP-2243) diff --git a/packages/client/src/client/client.ts b/packages/client/src/client/client.ts index 02a240f15a..a77a29b20c 100644 --- a/packages/client/src/client/client.ts +++ b/packages/client/src/client/client.ts @@ -406,10 +406,13 @@ export interface McpSubscription { * * - `'local'` — you called {@linkcode close} (or aborted the * `RequestOptions.signal` you passed to `listen()`). - * - `'remote'` — the server cancelled, the stream ended, or the transport - * dropped. Re-listen if you still want events. + * - `'graceful'` — the server ended the subscription deliberately by + * sending the empty `subscriptions/listen` response (e.g. on shutdown). + * - `'remote'` — the stream ended without a response, or the transport + * dropped — an unexpected disconnect. Re-listen if you still want + * events. */ - readonly closed: Promise<'local' | 'remote'>; + readonly closed: Promise<'local' | 'graceful' | 'remote'>; } /** @internal */ @@ -421,7 +424,7 @@ interface ListenStateEntry { * failure, ack timeout, caller-signal abort, `_resetConnectionState` — * routes through it. */ - settle: (outcome: { ack: SubscriptionFilter } | { cause: 'local' | 'remote'; error?: Error }) => void; + settle: (outcome: { ack: SubscriptionFilter } | { cause: 'local' | 'graceful' | 'remote'; error?: Error }) => void; } /** @@ -1894,12 +1897,12 @@ export class Client extends Protocol { // settle()'s `→ closed` transition; never rejects. When listen() // itself rejects (pre-ack) there is no McpSubscription to observe it // on — settle() resolves it anyway so nothing dangles. - let resolveClosed!: (cause: 'local' | 'remote') => void; - const closed = new Promise<'local' | 'remote'>(resolve => { + let resolveClosed!: (cause: 'local' | 'graceful' | 'remote') => void; + const closed = new Promise<'local' | 'graceful' | 'remote'>(resolve => { resolveClosed = resolve; }); - const settle = (outcome: { ack: SubscriptionFilter } | { cause: 'local' | 'remote'; error?: Error }): void => { + const settle = (outcome: { ack: SubscriptionFilter } | { cause: 'local' | 'graceful' | 'remote'; error?: Error }): void => { if (state === 'closed') return; const wasOpening = state === 'opening'; if (ackTimer !== undefined) { @@ -2103,13 +2106,14 @@ export class Client extends Protocol { } /** - * Transport-level demux for `subscriptions/listen` responses. The spec - * defines listen as never receiving a JSON-RPC result; a JSON-RPC ERROR - * for the listen id is the server's pre-ack capacity/params rejection. A - * string-id response that matches a live `_listenState` entry is consumed - * here (Protocol's `_responseHandlers` map is keyed by NUMBER and never - * holds a listen id, so passing a string-id response through would - * surface as "unknown message ID" via `onerror`). + * Transport-level demux for `subscriptions/listen` responses. A JSON-RPC + * ERROR for the listen id is the server's pre-ack capacity/params + * rejection; a JSON-RPC RESULT for the listen id is the spec's + * `SubscriptionsListenResult` — the server's GRACEFUL-close signal (sent + * on shutdown). A string-id response that matches a live `_listenState` + * entry is consumed here (Protocol's `_responseHandlers` map is keyed by + * NUMBER and never holds a listen id, so passing a string-id response + * through would surface as "unknown message ID" via `onerror`). */ protected override _onresponse(response: JSONRPCResponse): void { const id = response.id; @@ -2121,11 +2125,20 @@ export class Client extends Protocol { error: ProtocolError.fromError(response.error.code, response.error.message, response.error.data) }); } else { + // The empty `SubscriptionsListenResult` — the server ended + // the subscription deliberately. Handles both pre-ack and + // post-ack: while opening, settle rejects the pending listen() + // promise with a ConnectionClosed (a server that answers + // before the ack is shutting down before serving); once open, + // settle transitions to closed and `closed` resolves + // 'graceful'. Per Q8, the result body itself is not validated + // — receipt for the listen id IS the signal (foreign servers + // may omit `_meta`). entry.settle({ - cause: 'remote', + cause: 'graceful', error: new SdkError( - SdkErrorCode.InvalidResult, - 'server answered subscriptions/listen with a result; expected the acknowledged notification' + SdkErrorCode.ConnectionClosed, + 'subscriptions/listen: server closed the subscription gracefully before acknowledging' ) }); } diff --git a/packages/client/test/client/listen.test.ts b/packages/client/test/client/listen.test.ts index 855dbc14b8..86e921e960 100644 --- a/packages/client/test/client/listen.test.ts +++ b/packages/client/test/client/listen.test.ts @@ -667,7 +667,7 @@ describe('Client.listen()', () => { await client.close(); }); - it('server answers listen with a JSON-RPC RESULT during opening: rejects with a typed InvalidResult (not 60s)', async () => { + it('server answers listen with a JSON-RPC RESULT during opening: rejects ConnectionClosed (graceful pre-ack close, not 60s)', async () => { const [clientTx, serverTx] = InMemoryTransport.createLinkedPair(); serverTx.onmessage = m => { const req = m as { id?: number | string; method?: string }; @@ -684,9 +684,9 @@ describe('Client.listen()', () => { }); } if (req.method === 'subscriptions/listen' && req.id !== undefined) { - // Buggy server: answers with a result instead of the - // acknowledged notification. Spec defines listen as never - // receiving a result. + // Server is shutting down: emits the SubscriptionsListenResult + // before ever sending the ack. The client treats receipt of + // any result for the listen id as the graceful-close signal. void serverTx.send({ jsonrpc: '2.0', id: req.id, result: {} }); } }; @@ -696,13 +696,35 @@ describe('Client.listen()', () => { const t0 = Date.now(); const error = await client.listen({ toolsListChanged: true }).catch(e => e as SdkError); expect(error).toBeInstanceOf(SdkError); - expect((error as SdkError).code).toBe(SdkErrorCode.InvalidResult); - expect((error as SdkError).message).toContain('expected the acknowledged notification'); + expect((error as SdkError).code).toBe(SdkErrorCode.ConnectionClosed); + expect((error as SdkError).message).toContain('closed the subscription gracefully before acknowledging'); expect(Date.now() - t0).toBeLessThan(1000); expect((client as unknown as { _listenState: Map })._listenState.size).toBe(0); await client.close(); }); + it("inbound SubscriptionsListenResult post-ack: closed resolves 'graceful'; subscription torn down", async () => { + let listenId!: number | string; + let send!: (m: JSONRPCMessage) => void; + const { clientTx } = await scriptedModern((id, _f, s) => { + listenId = id; + send = s; + }); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + await client.connect(clientTx); + const sub = await client.listen({ toolsListChanged: true }); + // The spec's graceful-close signal: the server emits the empty + // subscriptions/listen response, then closes the stream. + send({ + jsonrpc: '2.0', + id: listenId, + result: { resultType: 'complete', _meta: { [SUBSCRIPTION_ID_META_KEY]: listenId } } + } as JSONRPCMessage); + await expect(sub.closed).resolves.toBe('graceful'); + expect((client as unknown as { _listenState: Map })._listenState.size).toBe(0); + await client.close(); + }); + it('transport closes BEFORE the ack: listen() rejects fast', async () => { const { clientTx, serverTx } = await scriptedModernNoAck(); const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); diff --git a/packages/codemod/src/generated/specSchemaMap.ts b/packages/codemod/src/generated/specSchemaMap.ts index 3c75149a72..4b130b5e6f 100644 --- a/packages/codemod/src/generated/specSchemaMap.ts +++ b/packages/codemod/src/generated/specSchemaMap.ts @@ -147,6 +147,8 @@ export const SPEC_SCHEMA_NAMES: ReadonlySet = new Set([ 'SubscriptionsAcknowledgedNotificationSchema', 'SubscriptionsListenRequestParamsSchema', 'SubscriptionsListenRequestSchema', + 'SubscriptionsListenResultMetaSchema', + 'SubscriptionsListenResultSchema', 'TaskAugmentedRequestParamsSchema', 'TaskCreationParamsSchema', 'TaskMetadataSchema', diff --git a/packages/core/src/types/schemas.ts b/packages/core/src/types/schemas.ts index 2efae28424..2ebf045ded 100644 --- a/packages/core/src/types/schemas.ts +++ b/packages/core/src/types/schemas.ts @@ -1,6 +1,6 @@ import * as z from 'zod/v4'; -import { JSONRPC_VERSION, RELATED_TASK_META_KEY } from './constants.js'; +import { JSONRPC_VERSION, RELATED_TASK_META_KEY, SUBSCRIPTION_ID_META_KEY } from './constants.js'; import type { JSONArray, JSONObject, JSONValue } from './types.js'; export const JSONValueSchema: z.ZodType = z.lazy(() => @@ -959,6 +959,26 @@ export const SubscriptionsAcknowledgedNotificationSchema = NotificationSchema.ex params: SubscriptionsAcknowledgedNotificationParamsSchema }); +/** + * `_meta` for a {@linkcode SubscriptionsListenResult}: the listen request's + * JSON-RPC ID under the canonical subscription-id key (mirroring the same key + * on every notification delivered on the stream). + */ +export const SubscriptionsListenResultMetaSchema = z.looseObject({ + [SUBSCRIPTION_ID_META_KEY]: RequestIdSchema +}); + +/** + * The response to a `subscriptions/listen` request, signalling that the + * subscription has ended gracefully (for example, during server shutdown). + * Because the listen stream is long-lived, this result is sent only when the + * server tears the subscription down; an abrupt transport close carries no + * response. The result body is otherwise empty. + */ +export const SubscriptionsListenResultSchema = ResultSchema.extend({ + _meta: SubscriptionsListenResultMetaSchema +}); + /** * Parameters for a {@linkcode ResourceUpdatedNotification | notifications/resources/updated} notification. */ @@ -2394,5 +2414,6 @@ export const ServerResultSchema = z.union([ ListResourceTemplatesResultSchema, ReadResourceResultSchema, CallToolResultSchema, - ListToolsResultSchema + ListToolsResultSchema, + SubscriptionsListenResultSchema ]); diff --git a/packages/core/src/types/spec.types.2026-07-28.ts b/packages/core/src/types/spec.types.2026-07-28.ts index 9111ccda50..a8ec4f8077 100644 --- a/packages/core/src/types/spec.types.2026-07-28.ts +++ b/packages/core/src/types/spec.types.2026-07-28.ts @@ -3,7 +3,7 @@ * * Source: https://github.com/modelcontextprotocol/modelcontextprotocol * Pulled from: https://raw.githubusercontent.com/modelcontextprotocol/modelcontextprotocol/main/schema/draft/schema.ts - * Last updated from commit: dc105208d6c5737c010ed3b6ff50ca19746317c1 + * Last updated from commit: f68d864a813754e188c6df52dcc5772a12f96c63 * * DO NOT EDIT THIS FILE MANUALLY. Changes will be overwritten by automated updates. * To update this file, run: pnpm run fetch:spec-types 2026-07-28 @@ -1283,6 +1283,40 @@ export interface SubscriptionsListenRequest extends JSONRPCRequest { params: SubscriptionsListenRequestParams; } +/** + * Extends {@link MetaObject} with the subscription-stream identifier carried by a + * {@link SubscriptionsListenResult}. All key naming rules from `MetaObject` apply. + * + * @see {@link MetaObject} for key naming rules and reserved prefixes. + * @category `subscriptions/listen` + */ +export interface SubscriptionsListenResultMeta extends MetaObject { + /** + * Identifies the subscription stream this response closes, so the client can + * correlate it with the originating subscription — mirroring the same key on + * the stream's notifications. The value is the JSON-RPC ID of the + * `subscriptions/listen` request that opened the stream (and equals this + * response's `id`). + */ + 'io.modelcontextprotocol/subscriptionId': RequestId; +} + +/** + * The response to a {@link SubscriptionsListenRequest | subscriptions/listen} + * request, signalling that the subscription has ended gracefully (for example, + * during server shutdown). Because the listen stream is long-lived, this result + * is sent only when the server tears the subscription down; an abrupt transport + * close carries no response. The result body is otherwise empty. + * + * @example Subscription closed gracefully + * {@includeCode ./examples/SubscriptionsListenResult/listen-closed.json} + * + * @category `subscriptions/listen` + */ +export interface SubscriptionsListenResult extends Result { + _meta: SubscriptionsListenResultMeta; +} + /** * Parameters for a {@link SubscriptionsAcknowledgedNotification | notifications/subscriptions/acknowledged} notification. * @@ -3086,6 +3120,7 @@ export type ServerResult = | ListResourceTemplatesResult | ListResourcesResult | ReadResourceResult + | SubscriptionsListenResult | CallToolResult | ListToolsResult | InputRequiredResult; diff --git a/packages/core/src/types/specTypeSchema.ts b/packages/core/src/types/specTypeSchema.ts index a011d1f7f5..c3cc3e0e74 100644 --- a/packages/core/src/types/specTypeSchema.ts +++ b/packages/core/src/types/specTypeSchema.ts @@ -167,6 +167,8 @@ const SPEC_SCHEMA_KEYS = [ 'SubscriptionsAcknowledgedNotificationParamsSchema', 'SubscriptionsListenRequestSchema', 'SubscriptionsListenRequestParamsSchema', + 'SubscriptionsListenResultSchema', + 'SubscriptionsListenResultMetaSchema', 'TaskAugmentedRequestParamsSchema', 'TaskCreationParamsSchema', 'TaskMetadataSchema', diff --git a/packages/core/src/types/types.ts b/packages/core/src/types/types.ts index 8c6cb07814..3c349286d8 100644 --- a/packages/core/src/types/types.ts +++ b/packages/core/src/types/types.ts @@ -149,6 +149,8 @@ import type { SubscriptionsAcknowledgedNotificationSchema, SubscriptionsListenRequestParamsSchema, SubscriptionsListenRequestSchema, + SubscriptionsListenResultMetaSchema, + SubscriptionsListenResultSchema, TaskAugmentedRequestParamsSchema, TaskCreationParamsSchema, TaskMetadataSchema, @@ -376,6 +378,8 @@ export type SubscriptionsListenRequestParams = Infer; export type SubscriptionsAcknowledgedNotificationParams = Infer; export type SubscriptionsAcknowledgedNotification = Infer; +export type SubscriptionsListenResultMeta = Infer; +export type SubscriptionsListenResult = StripWireOnly>; /* Prompts */ export type PromptArgument = Infer; @@ -681,11 +685,11 @@ export type ResultTypeMap = { 'resources/read': ReadResourceResult; 'resources/subscribe': EmptyResult; 'resources/unsubscribe': EmptyResult; - // `subscriptions/listen` never receives a JSON-RPC result on the wire: - // termination is stream close (HTTP) or `notifications/cancelled` (stdio). - // The `EmptyResult` entry exists only to keep the mapped types total — - // see `Client.listen()` and the serving entries' listen routers. - 'subscriptions/listen': EmptyResult; + // `subscriptions/listen` receives a JSON-RPC result only on a server-side + // graceful close (the empty `SubscriptionsListenResult`). Listen requests + // never reach `request()` / the typed result map — `Client.listen()` sends + // directly on the transport and demuxes the response in `_onresponse`. + 'subscriptions/listen': SubscriptionsListenResult; 'tools/call': CallToolResult; 'tools/list': ListToolsResult; 'sampling/createMessage': CreateMessageResult | CreateMessageResultWithTools; diff --git a/packages/core/src/wire/rev2026-07-28/schemas.ts b/packages/core/src/wire/rev2026-07-28/schemas.ts index 370ab10d7e..240a75d655 100644 --- a/packages/core/src/wire/rev2026-07-28/schemas.ts +++ b/packages/core/src/wire/rev2026-07-28/schemas.ts @@ -1014,6 +1014,23 @@ export const SubscriptionFilterSchema = z.object({ const subscriptionsListenParamsShape = { notifications: SubscriptionFilterSchema }; export const SubscriptionsListenRequestSchema = wireRequest('subscriptions/listen', subscriptionsListenParamsShape); +/** Anchor SubscriptionsListenResultMeta — required subscriptionId stamp on the graceful-close result. */ +export const SubscriptionsListenResultMetaSchema = z.looseObject({ + 'io.modelcontextprotocol/subscriptionId': RequestIdSchema +}); + +/** + * Anchor SubscriptionsListenResult (2026-only). The empty `subscriptions/listen` + * response signalling that the subscription has ended gracefully (server + * shutdown). An abrupt transport close carries no response — the client treats + * stream-close-without-result as a disconnect. + */ +export const SubscriptionsListenResultSchema = z.looseObject({ + /** Required `_meta` (the subscriptionId stamp); the result body is otherwise empty. */ + _meta: SubscriptionsListenResultMetaSchema, + resultType: ResultTypeSchema.default('complete') +}); + /** * The 2026-era request-method set — the hand-registry seed (see registry.ts * for the seed decisions). The dispatch maps below are mapped types over this @@ -1114,10 +1131,11 @@ export const dispatchResultSchemas: { readonly [M in Rev2026RequestMethod]: z.Zo serverInfo: ImplementationSchema, instructions: z.string().optional() }), - // `subscriptions/listen` never receives a JSON-RPC result: termination is - // stream close (HTTP) or `notifications/cancelled` (stdio). The empty - // entry keeps the mapped type total; the codec's `decodeResult` would - // never be called for this method in practice. + // `subscriptions/listen` receives a JSON-RPC result only on a server-side + // graceful close (the empty `SubscriptionsListenResult` — `_meta` carries + // the subscriptionId stamp). The dispatch result schema stays the lifted + // empty body so the mapped type is total; the listen-response demux is + // entry-layer (`Client._onresponse`) and never reaches `decodeResult`. 'subscriptions/listen': liftedResult({}) }; diff --git a/packages/core/test/corpus/fixtures/2026-07-28/PromptListChangedNotification/prompts-list-changed.json b/packages/core/test/corpus/fixtures/2026-07-28/PromptListChangedNotification/prompts-list-changed.json index 858cd5d874..1fb5771b28 100644 --- a/packages/core/test/corpus/fixtures/2026-07-28/PromptListChangedNotification/prompts-list-changed.json +++ b/packages/core/test/corpus/fixtures/2026-07-28/PromptListChangedNotification/prompts-list-changed.json @@ -1,4 +1,9 @@ { "jsonrpc": "2.0", - "method": "notifications/prompts/list_changed" + "method": "notifications/prompts/list_changed", + "params": { + "_meta": { + "io.modelcontextprotocol/subscriptionId": "listen-1" + } + } } diff --git a/packages/core/test/corpus/fixtures/2026-07-28/ResourceListChangedNotification/resources-list-changed.json b/packages/core/test/corpus/fixtures/2026-07-28/ResourceListChangedNotification/resources-list-changed.json index 6ba5e168ec..8c894de0ed 100644 --- a/packages/core/test/corpus/fixtures/2026-07-28/ResourceListChangedNotification/resources-list-changed.json +++ b/packages/core/test/corpus/fixtures/2026-07-28/ResourceListChangedNotification/resources-list-changed.json @@ -1,4 +1,9 @@ { "jsonrpc": "2.0", - "method": "notifications/resources/list_changed" + "method": "notifications/resources/list_changed", + "params": { + "_meta": { + "io.modelcontextprotocol/subscriptionId": "listen-1" + } + } } diff --git a/packages/core/test/corpus/fixtures/2026-07-28/ResourceUpdatedNotification/file-resource-updated-notification.json b/packages/core/test/corpus/fixtures/2026-07-28/ResourceUpdatedNotification/file-resource-updated-notification.json index b5f9ef67f7..0b85f54ba7 100644 --- a/packages/core/test/corpus/fixtures/2026-07-28/ResourceUpdatedNotification/file-resource-updated-notification.json +++ b/packages/core/test/corpus/fixtures/2026-07-28/ResourceUpdatedNotification/file-resource-updated-notification.json @@ -2,6 +2,9 @@ "jsonrpc": "2.0", "method": "notifications/resources/updated", "params": { + "_meta": { + "io.modelcontextprotocol/subscriptionId": "listen-1" + }, "uri": "file:///project/src/main.rs" } } diff --git a/packages/core/test/corpus/fixtures/2026-07-28/SubscriptionsListenResult/listen-closed.json b/packages/core/test/corpus/fixtures/2026-07-28/SubscriptionsListenResult/listen-closed.json new file mode 100644 index 0000000000..2e7e507206 --- /dev/null +++ b/packages/core/test/corpus/fixtures/2026-07-28/SubscriptionsListenResult/listen-closed.json @@ -0,0 +1,6 @@ +{ + "resultType": "complete", + "_meta": { + "io.modelcontextprotocol/subscriptionId": "listen-1" + } +} diff --git a/packages/core/test/corpus/fixtures/2026-07-28/ToolListChangedNotification/tools-list-changed.json b/packages/core/test/corpus/fixtures/2026-07-28/ToolListChangedNotification/tools-list-changed.json index a28e846763..89cd4e1a6f 100644 --- a/packages/core/test/corpus/fixtures/2026-07-28/ToolListChangedNotification/tools-list-changed.json +++ b/packages/core/test/corpus/fixtures/2026-07-28/ToolListChangedNotification/tools-list-changed.json @@ -1,4 +1,9 @@ { "jsonrpc": "2.0", - "method": "notifications/tools/list_changed" + "method": "notifications/tools/list_changed", + "params": { + "_meta": { + "io.modelcontextprotocol/subscriptionId": "listen-1" + } + } } diff --git a/packages/core/test/corpus/fixtures/2026-07-28/manifest.json b/packages/core/test/corpus/fixtures/2026-07-28/manifest.json index 4be2503e4f..ef08b0f827 100644 --- a/packages/core/test/corpus/fixtures/2026-07-28/manifest.json +++ b/packages/core/test/corpus/fixtures/2026-07-28/manifest.json @@ -3,11 +3,11 @@ "source": { "repo": "modelcontextprotocol/modelcontextprotocol", "path": "schema/draft/examples", - "commit": "dc105208d6c5737c010ed3b6ff50ca19746317c1" + "commit": "f68d864a813754e188c6df52dcc5772a12f96c63" }, "regenerate": "pnpm fetch:spec-examples --spec-dir # or [sha] to fetch from GitHub", - "directoryCount": 86, - "fileCount": 127, + "directoryCount": 87, + "fileCount": 128, "directories": { "AudioContent": [ "audio-wav-content.json" @@ -270,6 +270,9 @@ "SubscriptionsListenRequest": [ "listen-for-list-changes.json" ], + "SubscriptionsListenResult": [ + "listen-closed.json" + ], "TextContent": [ "text-content.json" ], diff --git a/packages/core/test/corpus/schema-twins/2026-07-28.schema.json b/packages/core/test/corpus/schema-twins/2026-07-28.schema.json index be1ae2f93c..82cac4d58b 100644 --- a/packages/core/test/corpus/schema-twins/2026-07-28.schema.json +++ b/packages/core/test/corpus/schema-twins/2026-07-28.schema.json @@ -3220,6 +3220,9 @@ { "$ref": "#/$defs/ReadResourceResult" }, + { + "$ref": "#/$defs/SubscriptionsListenResult" + }, { "$ref": "#/$defs/ListPromptsResult" }, @@ -3389,6 +3392,36 @@ ], "type": "object" }, + "SubscriptionsListenResult": { + "description": "The response to a {@link SubscriptionsListenRequestsubscriptions/listen}\nrequest, signalling that the subscription has ended gracefully (for example,\nduring server shutdown). Because the listen stream is long-lived, this result\nis sent only when the server tears the subscription down; an abrupt transport\nclose carries no response. The result body is otherwise empty.", + "properties": { + "_meta": { + "$ref": "#/$defs/SubscriptionsListenResultMeta" + }, + "resultType": { + "description": "Indicates the type of the result, which allows the client to determine\nhow to parse the result object.\n\nServers implementing this protocol version MUST include this field.\nFor backward compatibility, when a client receives a result from a\nserver implementing an earlier protocol version (which does not include\n`resultType`), the client MUST treat the absent field as `\"complete\"`.", + "type": "string" + } + }, + "required": [ + "_meta", + "resultType" + ], + "type": "object" + }, + "SubscriptionsListenResultMeta": { + "description": "Extends {@link MetaObject} with the subscription-stream identifier carried by a\n{@link SubscriptionsListenResult}. All key naming rules from `MetaObject` apply.", + "properties": { + "io.modelcontextprotocol/subscriptionId": { + "$ref": "#/$defs/RequestId", + "description": "Identifies the subscription stream this response closes, so the client can\ncorrelate it with the originating subscription — mirroring the same key on\nthe stream's notifications. The value is the JSON-RPC ID of the\n`subscriptions/listen` request that opened the stream (and equals this\nresponse's `id`)." + } + }, + "required": [ + "io.modelcontextprotocol/subscriptionId" + ], + "type": "object" + }, "TextContent": { "description": "Text provided to or from an LLM.", "properties": { diff --git a/packages/core/test/corpus/schema-twins/manifest.json b/packages/core/test/corpus/schema-twins/manifest.json index dd0c396684..d5f56f2134 100644 --- a/packages/core/test/corpus/schema-twins/manifest.json +++ b/packages/core/test/corpus/schema-twins/manifest.json @@ -2,12 +2,12 @@ "comment": "Vendored schema.json twins (TEST-ONLY conformance oracles; never bundled, never runtime). RAW upstream bytes - never reformat: each file is locked to the sha256/bytes below by schemaTwinConformance. Refresh via `pnpm fetch:schema-twins [sha]`, ATOMICALLY with the matching spec.types anchor (see packages/core/src/types/README.md lifecycle rule 4).", "source": { "repository": "modelcontextprotocol/modelcontextprotocol", - "commit": "dc105208d6c5737c010ed3b6ff50ca19746317c1" + "commit": "f68d864a813754e188c6df52dcc5772a12f96c63" }, "files": { "2026-07-28": { - "sha256": "2ee387342f81e9f38a87ece7abeaf29d9fe3769cd7400ccad1fb1f0b80966bb0", - "bytes": 176374, + "sha256": "14398c3dd2c66b9c3f6661fc7a7eaa24174952ed1598d0b7f011b686ba5c4c83", + "bytes": 178613, "upstreamPath": "schema/draft/schema.json" }, "2025-11-25": { diff --git a/packages/core/test/corpus/specCorpus.test.ts b/packages/core/test/corpus/specCorpus.test.ts index b546d5135e..8184af7fca 100644 --- a/packages/core/test/corpus/specCorpus.test.ts +++ b/packages/core/test/corpus/specCorpus.test.ts @@ -186,8 +186,8 @@ describe('corpus inventory pins', () => { // The corpus size at the pinned spec commit. A change here means the // vendored corpus was regenerated — review the delta deliberately. - expect(manifest.directoryCount).toBe(86); - expect(manifest.fileCount).toBe(127); + expect(manifest.directoryCount).toBe(87); + expect(manifest.fileCount).toBe(128); }); test('the frozen 2025-11-25 corpus keeps its inventory', () => { diff --git a/packages/core/test/spec.types.2026-07-28.test.ts b/packages/core/test/spec.types.2026-07-28.test.ts index ada62d07e0..fc58dd0e73 100644 --- a/packages/core/test/spec.types.2026-07-28.test.ts +++ b/packages/core/test/spec.types.2026-07-28.test.ts @@ -89,6 +89,8 @@ type WSubscriptionsListenRequest = z4.infer; type WSubscriptionsAcknowledgedNotificationParams = WSubscriptionsAcknowledgedNotification['params']; +type WSubscriptionsListenResult = z4.infer; +type WSubscriptionsListenResultMeta = z4.infer; // The anchor's ClientRequest union, composed from the era module's wire requests. type WClientRequest = | WCompleteRequest @@ -149,6 +151,7 @@ type WServerResult = | WReadResourceResult | WCallToolResult | WListToolsResult + | WSubscriptionsListenResult | WInputRequiredResult; const sdkTypeChecks = { @@ -790,6 +793,14 @@ const wireParityChecks = { sdk = spec; spec = sdk; }, + SubscriptionsListenResult: (sdk: WSubscriptionsListenResult, spec: SpecTypes.SubscriptionsListenResult) => { + sdk = spec; + spec = sdk; + }, + SubscriptionsListenResultMeta: (sdk: WSubscriptionsListenResultMeta, spec: SpecTypes.SubscriptionsListenResultMeta) => { + sdk = spec; + spec = sdk; + }, ClientRequest: (sdk: WithJSONRPCRequest, spec: SpecTypes.ClientRequest) => { sdk = spec; spec = sdk; @@ -855,7 +866,8 @@ describe('Spec Types (2026-07-28)', () => { expect(specTypes).toContain('DiscoverRequest'); expect(specTypes).toContain('InputRequiredResult'); expect(specTypes).toContain('SubscriptionsListenRequest'); - expect(specTypes).toHaveLength(151); + expect(specTypes).toContain('SubscriptionsListenResult'); + expect(specTypes).toHaveLength(153); }); it('should only allowlist types that exist in the 2026-07-28 schema', () => { diff --git a/packages/server/src/server/listenRouter.ts b/packages/server/src/server/listenRouter.ts index 82a9b9db3e..4bff1c2a70 100644 --- a/packages/server/src/server/listenRouter.ts +++ b/packages/server/src/server/listenRouter.ts @@ -17,7 +17,11 @@ * - Every notification on the stream (including the ack) carries the listen * request's JSON-RPC id under `_meta['io.modelcontextprotocol/subscriptionId']`. * - The server MUST NOT deliver a notification type the client did not request. - * - Termination is stream close (HTTP); no JSON-RPC result is ever emitted. + * - Server-side graceful close (`closeAll()`) emits the empty + * `subscriptions/listen` JSON-RPC result (the `SubscriptionsListenResult` — + * `_meta` carries the subscription id) before closing the stream; an abrupt + * transport close carries no response and the client treats it as a + * disconnect. */ import type { JSONRPCRequest, RequestId, ServerCapabilities, SubscriptionFilter } from '@modelcontextprotocol/core'; import { codecForVersion, MODERN_WIRE_REVISION, SUBSCRIPTION_ID_META_KEY } from '@modelcontextprotocol/core'; @@ -99,8 +103,9 @@ export interface ListenRouter { */ serve(message: JSONRPCRequest, signal: AbortSignal | undefined, capabilities: ServerCapabilities): Response; /** - * Close every open subscription stream (HTTP teardown is stream close — - * no JSON-RPC result is written). + * Gracefully close every open subscription stream: emits the empty + * `subscriptions/listen` JSON-RPC result (the spec's graceful-close + * signal) as the final SSE frame, then closes the stream. */ closeAll(): void; /** The number of currently open subscription streams (for tests / introspection). */ @@ -112,7 +117,7 @@ export function createListenRouter(options: ListenRouterOptions): ListenRouter { const maxSubscriptions = options.maxSubscriptions ?? DEFAULT_MAX_SUBSCRIPTIONS; const keepAliveMs = options.keepAliveMs ?? DEFAULT_LISTEN_KEEPALIVE_MS; - const open = new Set<() => void>(); + const open = new Set<(graceful: boolean) => void>(); function serve(message: JSONRPCRequest, signal: AbortSignal | undefined, capabilities: ServerCapabilities): Response { // Capacity guard, pre-ack: in-band -32603 on HTTP 200. @@ -149,8 +154,22 @@ export function createListenRouter(options: ListenRouterOptions): ListenRouter { writeFrame(`event: message\ndata: ${JSON.stringify({ jsonrpc: '2.0', method, params })}\n\n`); }; - const teardown = () => { + const teardown = (graceful: boolean) => { if (closed) return; + if (graceful) { + // Server-side graceful close: emit the empty + // `subscriptions/listen` JSON-RPC result before closing the + // stream so the client distinguishes graceful end from a + // transport drop. Written before `closed = true` so writeFrame + // still enqueues. + writeFrame( + `event: message\ndata: ${JSON.stringify({ + jsonrpc: '2.0', + id: subscriptionId, + result: { resultType: 'complete', _meta: { [SUBSCRIPTION_ID_META_KEY]: subscriptionId } } + })}\n\n` + ); + } closed = true; unsubscribe?.(); if (keepAliveTimer !== undefined) clearInterval(keepAliveTimer); @@ -194,16 +213,18 @@ export function createListenRouter(options: ListenRouterOptions): ListenRouter { open.add(teardown); }, cancel() { - // The client closed the SSE stream — the spec's HTTP cancel signal. - teardown(); + // The client closed the SSE stream — the spec's HTTP cancel + // signal. Not a server-side graceful close, so no listen + // result is written (and the consumer is gone anyway). + teardown(false); } }); if (signal !== undefined) { if (signal.aborted) { - teardown(); + teardown(false); } else { - const onAbort = () => teardown(); + const onAbort = () => teardown(false); signal.addEventListener('abort', onAbort, { once: true }); abortCleanup = () => signal.removeEventListener('abort', onAbort); } @@ -223,7 +244,7 @@ export function createListenRouter(options: ListenRouterOptions): ListenRouter { return { serve, closeAll() { - for (const teardown of open) teardown(); + for (const teardown of open) teardown(true); }, get openCount() { return open.size; @@ -350,14 +371,23 @@ export class StdioListenRouter { } /** - * Server-side teardown of every active subscription: returns the single - * `notifications/cancelled` per subscription id the entry MUST emit on - * stdio teardown (and clears the set so nothing further is delivered). + * Server-side graceful teardown of every active subscription: returns the + * empty `subscriptions/listen` JSON-RPC result for each subscription id — + * the spec's graceful-close signal — for the entry to emit before closing + * the wire. Clears the set so nothing further is delivered. */ - teardownAll(): { jsonrpc: '2.0'; method: 'notifications/cancelled'; params: { requestId: RequestId } }[] { - const out: { jsonrpc: '2.0'; method: 'notifications/cancelled'; params: { requestId: RequestId } }[] = []; + teardownAll(): { + jsonrpc: '2.0'; + id: RequestId; + result: { resultType: 'complete'; _meta: { [SUBSCRIPTION_ID_META_KEY]: RequestId } }; + }[] { + const out: { + jsonrpc: '2.0'; + id: RequestId; + result: { resultType: 'complete'; _meta: { [SUBSCRIPTION_ID_META_KEY]: RequestId } }; + }[] = []; for (const id of this._subs.keys()) { - out.push({ jsonrpc: '2.0', method: 'notifications/cancelled', params: { requestId: id } }); + out.push({ jsonrpc: '2.0', id, result: { resultType: 'complete', _meta: { [SUBSCRIPTION_ID_META_KEY]: id } } }); } this._subs.clear(); return out; diff --git a/packages/server/src/server/serveStdio.ts b/packages/server/src/server/serveStdio.ts index 496e3d6ac8..a3fed9b4ee 100644 --- a/packages/server/src/server/serveStdio.ts +++ b/packages/server/src/server/serveStdio.ts @@ -774,11 +774,13 @@ export function serveStdio(factory: McpServerFactory, options: ServeStdioOptions closing = true; const current = state; state = { phase: 'closed' }; - // Stdio server-side teardown: emit ONE `notifications/cancelled` per - // active listen subscription referencing the listen request id (the - // spec MUST), before the wire is closed. - for (const cancelled of listenRouter.teardownAll()) { - await wire.send(cancelled).catch(error => reportError(toError(error))); + // Stdio server-side graceful teardown: emit the empty + // `subscriptions/listen` JSON-RPC result for every active subscription + // (the spec's graceful-close signal — `SubscriptionsListenResult`) + // before the wire is closed, so the client distinguishes graceful end + // from a transport drop. + for (const result of listenRouter.teardownAll()) { + await wire.send(result).catch(error => reportError(toError(error))); } if (current.phase === 'probe' || current.phase === 'pinned') { await current.instance.product.close().catch(error => reportError(toError(error))); diff --git a/packages/server/test/server/createMcpHandlerListen.test.ts b/packages/server/test/server/createMcpHandlerListen.test.ts index 980e7e7578..2dd3fa372c 100644 --- a/packages/server/test/server/createMcpHandlerListen.test.ts +++ b/packages/server/test/server/createMcpHandlerListen.test.ts @@ -185,22 +185,31 @@ describe('createMcpHandler — subscriptions/listen', () => { await handler.close(); }); - it('handler.close() tears down every open listen stream (HTTP teardown is stream close)', async () => { + it('handler.close() emits the empty subscriptions/listen result, then closes the stream (graceful-close signal)', async () => { const handler = createMcpHandler(trivialFactory(), { keepAliveMs: 0 }); const response = await handler.fetch(listenRequest(1, { toolsListChanged: true })); const reader = response.body!.getReader(); // First frame is the ack. await reader.read(); await handler.close(); - // Stream-close termination: the read loop ends with no result frame. - let sawResult = false; + // Graceful-close termination: the SubscriptionsListenResult is the + // final SSE frame, then the stream ends. + let resultFrame: unknown; for (;;) { const { done, value } = await reader.read(); if (done) break; const text = new TextDecoder().decode(value); - if (text.includes('"result"')) sawResult = true; + const dataLine = text.split('\n').find(l => l.startsWith('data: ')); + if (dataLine) { + const message = JSON.parse(dataLine.slice(6)) as Record; + if ('result' in message) resultFrame = message; + } } - expect(sawResult).toBe(false); + expect(resultFrame).toEqual({ + jsonrpc: '2.0', + id: 1, + result: { resultType: 'complete', _meta: { 'io.modelcontextprotocol/subscriptionId': 1 } } + }); }); it('legacy-classified listen never reaches the entry listen router (no ack delivered)', async () => { diff --git a/packages/server/test/server/serveStdioListen.test.ts b/packages/server/test/server/serveStdioListen.test.ts index b78ae5a75a..af5e15c6ca 100644 --- a/packages/server/test/server/serveStdioListen.test.ts +++ b/packages/server/test/server/serveStdioListen.test.ts @@ -4,7 +4,8 @@ * Covers ack-first on the single channel, subscription-id stamping, the * pinned instance's send*ListChanged() feeding the connection's listen * router (era-gated; legacy unchanged), inbound cancel hardening, and the - * stdio teardown MUST (one notifications/cancelled per subscription id). + * graceful-close path (one empty `subscriptions/listen` result per + * subscription id on `handle.close()`). */ import type { JSONRPCMessage, JSONRPCNotification, JSONRPCRequest, Transport } from '@modelcontextprotocol/core'; import { @@ -109,17 +110,22 @@ describe('serveStdio — subscriptions/listen', () => { await handle.close(); }); - it('handle.close() emits one notifications/cancelled per active subscription id (stdio teardown MUST)', async () => { + it('handle.close() emits one empty subscriptions/listen result per active subscription id (graceful-close signal)', async () => { const { handle, inbound, send, flush } = await bootModern(); await send(listenReq('s1', { toolsListChanged: true })); await send(listenReq('s2', { promptsListChanged: true })); await flush(); inbound.length = 0; await handle.close(); - const cancelled = inbound.filter(m => (m as JSONRPCNotification).method === 'notifications/cancelled'); - expect(cancelled.map(m => (m as JSONRPCNotification).params)).toEqual([{ requestId: 's1' }, { requestId: 's2' }]); - // No JSON-RPC result for the listen ids — termination is the cancelled notification only. - expect(inbound.some(m => 'result' in m)).toBe(false); + // The spec's SubscriptionsListenResult — one per subscription id, then + // the wire closes. No notifications/cancelled (the pre-#2953 path). + const results = inbound.filter(m => 'result' in m) as { id: unknown; result: unknown }[]; + expect(results.map(m => m.id)).toEqual(['s1', 's2']); + expect(results.map(m => m.result)).toEqual([ + { resultType: 'complete', _meta: { [SUBSCRIPTION_ID_META_KEY]: 's1' } }, + { resultType: 'complete', _meta: { [SUBSCRIPTION_ID_META_KEY]: 's2' } } + ]); + expect(inbound.some(m => (m as JSONRPCNotification).method === 'notifications/cancelled')).toBe(false); }); it('refuses pre-ack with -32603 when at capacity', async () => { diff --git a/test/e2e/requirements.ts b/test/e2e/requirements.ts index e4bb0c9b1e..1615d93997 100644 --- a/test/e2e/requirements.ts +++ b/test/e2e/requirements.ts @@ -3160,6 +3160,14 @@ export const REQUIREMENTS: Record = { transports: ['entryModern'], note: 'Hosted by the test body via createMcpHandler with maxSubscriptions: 1.' }, + 'subscriptions:listen:graceful-close': { + source: 'https://modelcontextprotocol.io/specification/draft/basic/patterns/subscriptions#graceful-closure', + behavior: + "On a server-side graceful close, the server emits the empty subscriptions/listen JSON-RPC result (the SubscriptionsListenResult — _meta carries the subscriptionId stamp) before closing the stream; the client surfaces this on McpSubscription.closed as 'graceful' (distinct from a transport drop, which surfaces as 'remote').", + addedInSpecVersion: '2026-07-28', + transports: ['entryModern'], + note: 'Hosted by the test body via createMcpHandler so it can call handler.close(). The stdio path is covered at unit level in serveStdioListen.test.ts.' + }, 'typescript:subscriptions:listChanged-auto-open-modern': { source: 'sdk', behavior: diff --git a/test/e2e/scenarios/subscriptions.test.ts b/test/e2e/scenarios/subscriptions.test.ts index e60e1bafce..0fb481d724 100644 --- a/test/e2e/scenarios/subscriptions.test.ts +++ b/test/e2e/scenarios/subscriptions.test.ts @@ -97,6 +97,23 @@ verifies('subscriptions:listen:ack-first-stamped', async () => { await handler.close(); }); +verifies('subscriptions:listen:graceful-close', async () => { + // Hosted directly so the test owns handler.close(); `await using` of + // hostListen() would close on dispose and obscure the assertion. + const handler = createMcpHandler(() => makeServer(), { legacy: 'reject', keepAliveMs: 0 }); + const fetch = (u: URL | string, init?: RequestInit) => handler.fetch(new Request(u, init)); + const client = new Client({ name: 'subs-e2e-client', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + await client.connect(new StreamableHTTPClientTransport(new URL('http://in-process/mcp'), { fetch })); + const sub = await client.listen({ toolsListChanged: true }); + // Server-side graceful close: the entry's listen router emits the empty + // SubscriptionsListenResult as the final SSE frame, then closes the + // stream. The client surfaces this as `closed: 'graceful'` (distinct from + // `'remote'`, which is the transport-drop / no-result path). + await handler.close(); + await expect(sub.closed).resolves.toBe('graceful'); + await client.close(); +}); + verifies('subscriptions:listen:per-stream-filter', async () => { await using h = await hostListen(); const seen: string[] = [];