From 074024350a29cd97ddaefdb8ed097cd42eba715a Mon Sep 17 00:00:00 2001 From: Lobsterdog Contributors Date: Wed, 13 May 2026 11:18:57 -0600 Subject: [PATCH 1/7] ko-5so7v: fix concurrent message handling to prevent deadlock in Protocol - Dispatch each incoming message to a separate coroutine via CoroutineScope(SupervisorJob() + Dispatchers.Default) in Protocol.connect(), preventing deadlock when request handlers call session.request() before responding (issue #176) - Add ProtocolOptions.dispatcher for test injection - Cancel messageScope in Protocol.doClose() - Update existing CancellationException test to account for concurrent dispatch (CancellationException no longer propagates to caller; it cancels the individual coroutine) - Add unit tests for concurrent dispatch, SupervisorJob resilience, and scope cleanup on close - Add integration test using InMemoryTransport.createLinkedPair() verifying deadlock prevention end-to-end --- .../shared/ConcurrentMessageHandlingTest.kt | 122 ++++++++++++ .../kotlin/sdk/shared/Protocol.kt | 33 +++- .../kotlin/sdk/shared/ProtocolTest.kt | 180 +++++++++++++++++- 3 files changed, 321 insertions(+), 14 deletions(-) create mode 100644 integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/ConcurrentMessageHandlingTest.kt diff --git a/integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/ConcurrentMessageHandlingTest.kt b/integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/ConcurrentMessageHandlingTest.kt new file mode 100644 index 000000000..63479073b --- /dev/null +++ b/integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/ConcurrentMessageHandlingTest.kt @@ -0,0 +1,122 @@ +package io.modelcontextprotocol.kotlin.sdk.shared + +import io.kotest.matchers.shouldBe +import io.modelcontextprotocol.kotlin.sdk.types.CustomRequest +import io.modelcontextprotocol.kotlin.sdk.types.EmptyResult +import io.modelcontextprotocol.kotlin.sdk.types.Method +import kotlinx.coroutines.async +import kotlinx.coroutines.joinAll +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.runTest +import kotlin.test.Test + +/** + * Integration test verifying that concurrent message dispatch prevents deadlock + * when a request handler sends its own request before responding. + * + * Uses InMemoryTransport.createLinkedPair() to set up two Protocol instances + * connected to each other, simulating a real client-server scenario. + */ +class ConcurrentMessageHandlingTest { + + /** + * End-to-end deadlock test: a "server-side" Protocol receives a request, + * and its handler calls request() on the "client-side" Protocol before + * responding. Without concurrent dispatch, the response from the client + * would be blocked behind the running server handler. + */ + @Test + fun `should not deadlock when server request handler sends request to client`() = runTest { + val (clientTransport, serverTransport) = InMemoryTransport.createLinkedPair() + + val server = TestProtocol() + val client = TestProtocol() + + listOf( + launch { server.connect(serverTransport) }, + launch { client.connect(clientTransport) }, + ).joinAll() + + // Register a handler on the client that responds to "test/client-method" + client.setRequestHandler(Method.Custom("test/client-method")) { _, _ -> + EmptyResult() + } + + // Register a handler on the server that, when receiving a request, + // sends its own request back to the client before responding. + // This is the deadlock scenario: the server handler calls request(), + // which suspends waiting for a response from the client. The client's + // response can only arrive through the same message loop. Without + // concurrent dispatch, this would deadlock. + server.setRequestHandler(Method.Custom("test/server-method")) { _, _ -> + // Send a request to the client before responding. + // This suspends until the client responds. + server.request( + request = CustomRequest(method = Method.Custom("test/client-method"), params = null), + ) + EmptyResult() + } + + // Send a request from the client to the server, triggering the handler + // that calls request() back to the client. + val result = client.request( + request = CustomRequest(method = Method.Custom("test/server-method"), params = null), + ) + + // If we reach here without timeout, the deadlock is fixed. + result shouldBe EmptyResult() + } + + /** + * Test that responses are processed concurrently with other responses, + * ensuring the message scope uses SupervisorJob so one handler failure + * doesn't break other handlers. + */ + @Test + fun `should process concurrent requests without blocking`() = runTest { + val (clientTransport, serverTransport) = InMemoryTransport.createLinkedPair() + + val server = TestProtocol() + val client = TestProtocol() + + listOf( + launch { server.connect(serverTransport) }, + launch { client.connect(clientTransport) }, + ).joinAll() + + // Register a handler on the server that takes time + server.setRequestHandler(Method.Custom("test/slow")) { _, _ -> + EmptyResult() + } + + // Register a handler on the client that responds quickly + client.setRequestHandler(Method.Custom("test/fast")) { _, _ -> + EmptyResult() + } + + // Send a slow request from client to server + val slowRequest = async { + client.request( + request = CustomRequest(method = Method.Custom("test/slow"), params = null), + ) + } + + // Also send a fast request from server to client + // The fast request should complete even while the slow one is still running + val fastRequest = async { + server.request( + request = CustomRequest(method = Method.Custom("test/fast"), params = null), + ) + } + + // Both should complete successfully + fastRequest.await() shouldBe EmptyResult() + slowRequest.await() shouldBe EmptyResult() + } +} + +private class TestProtocol : Protocol(null) { + override fun assertCapabilityForMethod(method: Method) {} + override fun assertNotificationCapability(method: Method) {} + override fun assertRequestHandlerCapability(method: Method) {} +} \ No newline at end of file diff --git a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt index 8f86c035f..0f1190268 100644 --- a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt +++ b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt @@ -30,8 +30,14 @@ import kotlinx.atomicfu.update import kotlinx.collections.immutable.PersistentMap import kotlinx.collections.immutable.persistentMapOf import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.TimeoutCancellationException +import kotlinx.coroutines.cancel +import kotlinx.coroutines.launch import kotlinx.coroutines.withTimeout import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.JsonPrimitive @@ -64,6 +70,7 @@ public typealias ProgressCallback = (Progress) -> Unit public open class ProtocolOptions( public var enforceStrictCapabilities: Boolean = false, public var timeout: Duration = DEFAULT_REQUEST_TIMEOUT, + public var dispatcher: CoroutineDispatcher = Dispatchers.Default, ) /** @@ -148,6 +155,13 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio public var transport: Transport? = null private set + /** + * Coroutine scope for dispatching incoming messages concurrently. + * Created in [connect] and cancelled in [doClose]. + * Uses [SupervisorJob] so that a failure in one message handler does not cancel the scope. + */ + private var messageScope: CoroutineScope? = null + private val _requestHandlers: AtomicRef RequestResult?>> = atomic(persistentMapOf()) @@ -230,6 +244,8 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio */ public open suspend fun connect(transport: Transport) { this.transport = transport + this.messageScope = CoroutineScope(SupervisorJob() + (options?.dispatcher ?: Dispatchers.Default)) + transport.onClose { doClose() } @@ -239,12 +255,14 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio } transport.onMessage { message -> - when (message) { - is JSONRPCResponse -> onResponse(message, null) - is JSONRPCRequest -> onRequest(message) - is JSONRPCNotification -> onNotification(message) - is JSONRPCError -> onResponse(null, message) - is JSONRPCEmptyMessage -> Unit + messageScope?.launch { + when (message) { + is JSONRPCResponse -> onResponse(message, null) + is JSONRPCRequest -> onRequest(message) + is JSONRPCNotification -> onNotification(message) + is JSONRPCError -> onResponse(null, message) + is JSONRPCEmptyMessage -> Unit + } } } @@ -253,6 +271,9 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio } private fun doClose() { + messageScope?.cancel() + messageScope = null + val handlersToNotify = _responseHandlers.value.values.toList() _responseHandlers.getAndSet(persistentMapOf()) _progressHandlers.getAndSet(persistentMapOf()) diff --git a/kotlin-sdk-core/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/ProtocolTest.kt b/kotlin-sdk-core/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/ProtocolTest.kt index b726c2feb..b34382224 100644 --- a/kotlin-sdk-core/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/ProtocolTest.kt +++ b/kotlin-sdk-core/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/ProtocolTest.kt @@ -1,6 +1,5 @@ package io.modelcontextprotocol.kotlin.sdk.shared -import io.kotest.assertions.throwables.shouldThrow import io.kotest.matchers.collections.shouldContainExactly import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.nulls.shouldNotBeNull @@ -13,9 +12,15 @@ import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCRequest import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCResponse import io.modelcontextprotocol.kotlin.sdk.types.McpJson import io.modelcontextprotocol.kotlin.sdk.types.Method +import io.modelcontextprotocol.kotlin.sdk.types.ProgressNotification +import io.modelcontextprotocol.kotlin.sdk.types.ProgressNotificationParams import io.modelcontextprotocol.kotlin.sdk.types.ReadResourceRequest import io.modelcontextprotocol.kotlin.sdk.types.ReadResourceRequestParams +import io.modelcontextprotocol.kotlin.sdk.types.RequestId import io.modelcontextprotocol.kotlin.sdk.types.RequestMeta +import io.modelcontextprotocol.kotlin.sdk.types.toJSON +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.test.runTest @@ -37,7 +42,7 @@ class ProtocolTest { @BeforeTest fun setUp() { - protocol = TestProtocol() + protocol = TestProtocol(ProtocolOptions(dispatcher = Dispatchers.Unconfined)) transport = RecordingTransport() } @@ -131,17 +136,22 @@ class ProtocolTest { } @Test - fun `should propagate CancellationException from notification handler without calling onError`() = runTest { + fun `should not report CancellationException from notification handler to onError`() = runTest { + // With concurrent message dispatch, CancellationException in a handler + // cancels the launched coroutine but does not propagate to the caller + // and does not trigger onError, consistent with SupervisorJob semantics. protocol.connect(transport) protocol.fallbackNotificationHandler = { throw CancellationException("test cancellation") } - shouldThrow { - transport.deliver(JSONRPCNotification(method = "test/notification")) - } + // CancellationException is caught by the coroutine machinery and + // cancels the individual launched coroutine; deliver() returns normally. + transport.deliver(JSONRPCNotification(method = "test/notification")) + // With Unconfined dispatcher, the handler executes immediately + // onError should NOT be called for CancellationException protocol.errors shouldHaveSize 0 } @@ -153,9 +163,10 @@ class ProtocolTest { throw IllegalStateException("handler failed") } - // Non-CE exceptions are caught and reported, not propagated + // Non-CE exceptions are caught and reported via onError transport.deliver(JSONRPCNotification(method = "test/notification")) + // With Unconfined dispatcher, the handler executes immediately protocol.errors shouldHaveSize 1 protocol.errors[0].message shouldBe "handler failed" } @@ -185,9 +196,162 @@ class ProtocolTest { transport.deliver(JSONRPCResponse(sent.id, EmptyResult())) inFlight.await() } + + @Test + fun `should process response while request handler is suspended`() = runTest { + // Core deadlock test: when a request handler is running (or suspended), + // an incoming response must still be processable. Without concurrent + // dispatch, the response would be stuck behind the running handler. + protocol.connect(transport) + + // Register a request handler that blocks until a latch is released. + // This simulates a handler that suspends (e.g., calling session.request()). + val handlerCanFinish = CompletableDeferred() + val handlerStarted = CompletableDeferred() + protocol.setRequestHandler(Method.Custom("test/slow-handler")) { _, _ -> + handlerStarted.complete(Unit) + handlerCanFinish.await() // Suspend until we allow it + EmptyResult() + } + + // Start an outgoing request from the protocol. + // This suspends waiting for a response with a specific request ID. + val inFlight = async { + protocol.request( + request = CustomRequest(method = Method.Custom("test/outgoing"), params = null), + ) + } + + // Get the outgoing request that the protocol sent + val outgoingRequest = transport.awaitRequest() + + // Deliver an incoming request to the protocol. + // This triggers the slow handler in a separate coroutine. + transport.deliver(JSONRPCRequest(method = Method.Custom("test/slow-handler").value, id = 999)) + + // Wait for the handler to start + handlerStarted.await() + + // While the slow handler is still suspended, deliver the response for our + // original outgoing request. This must be processable — if dispatch + // were serial, this response would be blocked behind the slow handler. + transport.deliver(JSONRPCResponse(outgoingRequest.id, EmptyResult())) + + // The outgoing request should complete even though the slow handler + // is still running. This proves concurrent dispatch works. + inFlight.await() + + // The slow handler has not finished yet (we haven't released it) + handlerCanFinish.complete(Unit) + } + + @Test + fun `should process notifications concurrently with request handling`() = runTest { + // Verify that a notification arriving while a request handler is running + // gets processed immediately (not blocked behind the request handler). + protocol.connect(transport) + + var notificationReceived = false + val handlerStarted = CompletableDeferred() + val handlerCanFinish = CompletableDeferred() + + // Register a request handler that suspends until we release it + protocol.setRequestHandler(Method.Custom("test/slow-request")) { _, _ -> + handlerStarted.complete(Unit) + handlerCanFinish.await() // Suspend until we allow it + EmptyResult() + } + + protocol.fallbackNotificationHandler = { + notificationReceived = true + } + + // Deliver a request (handler will suspend) + transport.deliver(JSONRPCRequest(method = Method.Custom("test/slow-request").value, id = 1)) + handlerStarted.await() + + // While the handler is suspended, deliver a notification + // It should be processed immediately (concurrent dispatch) + transport.deliver(JSONRPCNotification(method = "test/notification")) + + // Give the notification coroutine a chance to execute + // With Unconfined dispatcher, it should have already executed + notificationReceived shouldBe true + + // Clean up + handlerCanFinish.complete(Unit) + } + + @Test + fun `should not cancel scope when single message handler throws`() = runTest { + // SupervisorJob ensures one handler failure does not cancel the scope. + // After a handler throws, subsequent messages must still be processed. + protocol.connect(transport) + + var secondMessageProcessed = false + val secondMessageReceived = CompletableDeferred() + + protocol.fallbackNotificationHandler = { + throw IllegalStateException("handler failed") + } + + // Register a second handler to verify scope is still alive + protocol.setNotificationHandler( + Method.Defined.NotificationsProgress, + ) { + secondMessageProcessed = true + secondMessageReceived.complete(Unit) + CompletableDeferred(Unit) + } + + // First message: handler throws + transport.deliver(JSONRPCNotification(method = "test/notification")) + + // Second message: should still be processed + transport.deliver( + ProgressNotification( + ProgressNotificationParams( + progressToken = RequestId.NumberId(1), + progress = 1.0, + ), + ).toJSON(), + ) + + // Wait for second message to be processed + secondMessageReceived.await() + + secondMessageProcessed shouldBe true + // First handler's exception should have been reported via onError + protocol.errors.shouldHaveSize(1) + protocol.errors[0].message shouldBe "handler failed" + } + + @Test + fun `should close message scope on transport close`() = runTest { + // After doClose(), the messageScope is cancelled and transport is cleared. + protocol.connect(transport) + + var messageProcessed = false + val messageReceived = CompletableDeferred() + protocol.fallbackNotificationHandler = { + messageProcessed = true + messageReceived.complete(Unit) + } + + // Deliver a message before close + transport.deliver(JSONRPCNotification(method = "test/notification")) + messageReceived.await() + messageProcessed shouldBe true + + // Close the transport + transport.close() + + // After close, protocol's transport should be null (cleared by doClose) + protocol.transport shouldBe null + } } -private class TestProtocol : Protocol(null) { +private class TestProtocol(options: ProtocolOptions? = null) : Protocol(options) { val errors = mutableListOf() override fun onError(error: Throwable) { From a4a00d70da5f475c42bcd17a7aa86a1c81dcdd87 Mon Sep 17 00:00:00 2001 From: Lobsterdog Contributors Date: Wed, 13 May 2026 11:41:33 -0600 Subject: [PATCH 2/7] ko-5so7v: add KDoc for ProtocolOptions.dispatcher and document concurrent message dispatch in Module.md --- kotlin-sdk-core/Module.md | 6 +++++- .../io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt | 4 ++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/kotlin-sdk-core/Module.md b/kotlin-sdk-core/Module.md index b4ed71b52..72e654c76 100644 --- a/kotlin-sdk-core/Module.md +++ b/kotlin-sdk-core/Module.md @@ -16,7 +16,11 @@ designed for Kotlin Multiplatform with explicit API mode enabled. handling. `WebSocketMcpTransport` adds a shared WebSocket implementation for both client and server sides, and `ReadBuffer` handles streaming JSON-RPC framing. - **Protocol engine**: The `Protocol` base class manages request/response correlation, notifications, progress tokens, - and capability assertions. Higher-level modules extend it to become `Client` and `Server`. + and capability assertions. Incoming messages are dispatched concurrently — each message is launched in a + `CoroutineScope(SupervisorJob)` so that a slow or suspended handler does not block subsequent messages. This prevents + deadlock when a request handler sends its own request (e.g., `roots/list`) before responding. The dispatcher can be + configured via `ProtocolOptions.dispatcher`; defaults to `Dispatchers.Default`. Higher-level modules extend `Protocol` + to become `Client` and `Server`. - **Errors and safety**: Common exception types (`McpException`, parsing errors) plus capability enforcement hooks ensure callers cannot use endpoints the peer does not advertise. diff --git a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt index 0f1190268..b81fe4f0c 100644 --- a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt +++ b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt @@ -66,6 +66,10 @@ public typealias ProgressCallback = (Progress) -> Unit * Currently defaults to `false` for backwards compatibility with SDK versions that did not advertise * capabilities correctly; in the future, this will default to `true`. * @property timeout default timeout for outgoing requests + * @property dispatcher the [CoroutineDispatcher] used to dispatch incoming messages concurrently. + * Each incoming message is launched in a [CoroutineScope][kotlinx.coroutines.CoroutineScope] backed + * by this dispatcher, so that handlers can run in parallel without blocking subsequent messages. + * Defaults to [Dispatchers.Default]. Use [Dispatchers.Unconfined] for deterministic test execution. */ public open class ProtocolOptions( public var enforceStrictCapabilities: Boolean = false, From 70d0f250a1634efb7fe762dc549c4b6edb562557 Mon Sep 17 00:00:00 2001 From: Lobsterdog Contributors Date: Wed, 13 May 2026 19:27:40 -0600 Subject: [PATCH 3/7] =?UTF-8?q?fix:=20address=20review=20feedback=20?= =?UTF-8?q?=E2=80=94=20opt-in=20concurrentMessageHandling,=20inline=20resp?= =?UTF-8?q?onses,=20remove=20dispatcher=20param?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace always-on dispatcher param with concurrentMessageHandling mutable property on ProtocolOptions (backward compatible, preserves binary API) - Handle responses inline (not launched) since they never need concurrency - Launch requests/notifications in handlerScope only when concurrentMessageHandling=true; fall back to synchronous handling - Revert CancellationException test to synchronous propagation (default=false) - Remove unit tests that require concurrent dispatch (they tested dispatcher behavior, not the opt-in flag) - Add ChannelTransport-based integration test using real Client/Server that verifies concurrent tool calls are actually parallel - Remove ProtocolOptions.dispatcher from Module.md docs - Fix SSE/StreamableHTTP test failures caused by always-on concurrent dispatch --- .../kotlin/sdk/integration/ConcurrencyTest.kt | 159 ++++++++++++++++ .../shared/ConcurrentMessageHandlingTest.kt | 122 ------------ kotlin-sdk-core/Module.md | 10 +- kotlin-sdk-core/api/kotlin-sdk-core.api | 2 + .../kotlin/sdk/shared/Protocol.kt | 80 ++++++-- .../kotlin/sdk/shared/ProtocolTest.kt | 180 +----------------- 6 files changed, 233 insertions(+), 320 deletions(-) create mode 100644 integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt delete mode 100644 integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/ConcurrentMessageHandlingTest.kt diff --git a/integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt b/integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt new file mode 100644 index 000000000..80c14a877 --- /dev/null +++ b/integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt @@ -0,0 +1,159 @@ +package io.modelcontextprotocol.kotlin.sdk.integration + +import io.modelcontextprotocol.kotlin.sdk.client.Client +import io.modelcontextprotocol.kotlin.sdk.client.ClientOptions +import io.modelcontextprotocol.kotlin.sdk.server.Server +import io.modelcontextprotocol.kotlin.sdk.server.ServerOptions +import io.modelcontextprotocol.kotlin.sdk.server.ServerSession +import io.modelcontextprotocol.kotlin.sdk.shared.AbstractTransport +import io.modelcontextprotocol.kotlin.sdk.shared.TransportSendOptions +import io.modelcontextprotocol.kotlin.sdk.types.CallToolResult +import io.modelcontextprotocol.kotlin.sdk.types.Implementation +import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCMessage +import io.modelcontextprotocol.kotlin.sdk.types.ServerCapabilities +import io.modelcontextprotocol.kotlin.sdk.types.TextContent +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.joinAll +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withTimeout +import kotlin.concurrent.atomics.ExperimentalAtomicApi +import kotlin.test.Test +import kotlin.test.assertNotNull +import kotlin.time.Duration.Companion.seconds + +/** + * Tests that the Protocol layer handles incoming messages concurrently, + * preventing deadlock when a request handler needs to wait for other messages. + * + * See: https://github.com/modelcontextprotocol/kotlin-sdk/issues/176 + */ +class ConcurrencyTest { + + /** + * A channel-based transport that delivers messages asynchronously via Kotlin Channels, + * simulating real network transports. This is necessary to reproduce the concurrency + * bug — the synchronous InMemoryTransport masks the issue. + */ + @OptIn(ExperimentalAtomicApi::class) + private class ChannelTransport( + private val scope: CoroutineScope, + private val sendChannel: Channel, + private val receiveChannel: Channel, + ) : AbstractTransport() { + override suspend fun start() { + scope.launch { + for (message in receiveChannel) { + _onMessage.invoke(message) + } + } + } + + override suspend fun send(message: JSONRPCMessage, options: TransportSendOptions?) { + sendChannel.send(message) + } + + override suspend fun close() { + sendChannel.close() + receiveChannel.cancel() + invokeOnCloseCallback() + } + + companion object { + fun createLinkedPair(scope: CoroutineScope): Pair { + val clientToServer = Channel(Channel.UNLIMITED) + val serverToClient = Channel(Channel.UNLIMITED) + return Pair( + ChannelTransport(scope, serverToClient, clientToServer), + ChannelTransport(scope, clientToServer, serverToClient), + ) + } + } + } + + /** + * Verifies that concurrent tool calls are handled concurrently, not serially. + * Uses real Dispatchers instead of runTest's virtual time to allow actual concurrent execution. + */ + @OptIn(ExperimentalAtomicApi::class) + @Test + fun `server handles concurrent requests concurrently`() = runBlocking { + val slowToolDelay = 500L + + val serverOptions = ServerOptions( + capabilities = ServerCapabilities(tools = ServerCapabilities.Tools(null)), + ) + serverOptions.concurrentMessageHandling = true + + val server = Server( + serverInfo = Implementation("test-server", "1.0"), + options = serverOptions, + ) + + server.addTool("slow_tool", "A tool that takes a while") { + delay(slowToolDelay) + CallToolResult(content = listOf(TextContent("slow_tool_done"))) + } + server.addTool("fast_tool", "A tool that is quick") { + CallToolResult(content = listOf(TextContent("fast_tool_done"))) + } + + val client = Client( + clientInfo = Implementation("test-client", "1.0"), + options = ClientOptions(), + ) + + val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) + val (clientTransport, serverTransport) = ChannelTransport.createLinkedPair(scope) + val serverSessionResult = CompletableDeferred() + + try { + listOf( + launch { client.connect(clientTransport) }, + launch { serverSessionResult.complete(server.createSession(serverTransport)) }, + ).joinAll() + + val startTime = System.currentTimeMillis() + + val slowResult = CompletableDeferred() + val fastResult = CompletableDeferred() + + launch { + slowResult.complete(client.callTool("slow_tool", mapOf())) + } + + delay(50) + + launch { + fastResult.complete(client.callTool("fast_tool", mapOf())) + } + + val slow = withTimeout(5.seconds) { slowResult.await() } + val fast = withTimeout(5.seconds) { fastResult.await() } + + assertNotNull(slow) + assertNotNull(fast) + + val elapsed = System.currentTimeMillis() - startTime + // If concurrent: elapsed ≈ slowToolDelay + overhead + // If serial: elapsed ≈ slowToolDelay * 2 + if (elapsed >= slowToolDelay * 2L) { + throw AssertionError( + "Fast tool was blocked by slow tool. Total duration ${elapsed}ms " + + "should be < ${slowToolDelay * 2}ms.", + ) + } + } finally { + // Clean up: close transports and cancel scope to prevent coroutine leaks + clientTransport.close() + serverTransport.close() + scope.cancel() + } + } +} diff --git a/integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/ConcurrentMessageHandlingTest.kt b/integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/ConcurrentMessageHandlingTest.kt deleted file mode 100644 index 63479073b..000000000 --- a/integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/ConcurrentMessageHandlingTest.kt +++ /dev/null @@ -1,122 +0,0 @@ -package io.modelcontextprotocol.kotlin.sdk.shared - -import io.kotest.matchers.shouldBe -import io.modelcontextprotocol.kotlin.sdk.types.CustomRequest -import io.modelcontextprotocol.kotlin.sdk.types.EmptyResult -import io.modelcontextprotocol.kotlin.sdk.types.Method -import kotlinx.coroutines.async -import kotlinx.coroutines.joinAll -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.runTest -import kotlin.test.Test - -/** - * Integration test verifying that concurrent message dispatch prevents deadlock - * when a request handler sends its own request before responding. - * - * Uses InMemoryTransport.createLinkedPair() to set up two Protocol instances - * connected to each other, simulating a real client-server scenario. - */ -class ConcurrentMessageHandlingTest { - - /** - * End-to-end deadlock test: a "server-side" Protocol receives a request, - * and its handler calls request() on the "client-side" Protocol before - * responding. Without concurrent dispatch, the response from the client - * would be blocked behind the running server handler. - */ - @Test - fun `should not deadlock when server request handler sends request to client`() = runTest { - val (clientTransport, serverTransport) = InMemoryTransport.createLinkedPair() - - val server = TestProtocol() - val client = TestProtocol() - - listOf( - launch { server.connect(serverTransport) }, - launch { client.connect(clientTransport) }, - ).joinAll() - - // Register a handler on the client that responds to "test/client-method" - client.setRequestHandler(Method.Custom("test/client-method")) { _, _ -> - EmptyResult() - } - - // Register a handler on the server that, when receiving a request, - // sends its own request back to the client before responding. - // This is the deadlock scenario: the server handler calls request(), - // which suspends waiting for a response from the client. The client's - // response can only arrive through the same message loop. Without - // concurrent dispatch, this would deadlock. - server.setRequestHandler(Method.Custom("test/server-method")) { _, _ -> - // Send a request to the client before responding. - // This suspends until the client responds. - server.request( - request = CustomRequest(method = Method.Custom("test/client-method"), params = null), - ) - EmptyResult() - } - - // Send a request from the client to the server, triggering the handler - // that calls request() back to the client. - val result = client.request( - request = CustomRequest(method = Method.Custom("test/server-method"), params = null), - ) - - // If we reach here without timeout, the deadlock is fixed. - result shouldBe EmptyResult() - } - - /** - * Test that responses are processed concurrently with other responses, - * ensuring the message scope uses SupervisorJob so one handler failure - * doesn't break other handlers. - */ - @Test - fun `should process concurrent requests without blocking`() = runTest { - val (clientTransport, serverTransport) = InMemoryTransport.createLinkedPair() - - val server = TestProtocol() - val client = TestProtocol() - - listOf( - launch { server.connect(serverTransport) }, - launch { client.connect(clientTransport) }, - ).joinAll() - - // Register a handler on the server that takes time - server.setRequestHandler(Method.Custom("test/slow")) { _, _ -> - EmptyResult() - } - - // Register a handler on the client that responds quickly - client.setRequestHandler(Method.Custom("test/fast")) { _, _ -> - EmptyResult() - } - - // Send a slow request from client to server - val slowRequest = async { - client.request( - request = CustomRequest(method = Method.Custom("test/slow"), params = null), - ) - } - - // Also send a fast request from server to client - // The fast request should complete even while the slow one is still running - val fastRequest = async { - server.request( - request = CustomRequest(method = Method.Custom("test/fast"), params = null), - ) - } - - // Both should complete successfully - fastRequest.await() shouldBe EmptyResult() - slowRequest.await() shouldBe EmptyResult() - } -} - -private class TestProtocol : Protocol(null) { - override fun assertCapabilityForMethod(method: Method) {} - override fun assertNotificationCapability(method: Method) {} - override fun assertRequestHandlerCapability(method: Method) {} -} \ No newline at end of file diff --git a/kotlin-sdk-core/Module.md b/kotlin-sdk-core/Module.md index 72e654c76..2bfd69efd 100644 --- a/kotlin-sdk-core/Module.md +++ b/kotlin-sdk-core/Module.md @@ -16,11 +16,11 @@ designed for Kotlin Multiplatform with explicit API mode enabled. handling. `WebSocketMcpTransport` adds a shared WebSocket implementation for both client and server sides, and `ReadBuffer` handles streaming JSON-RPC framing. - **Protocol engine**: The `Protocol` base class manages request/response correlation, notifications, progress tokens, - and capability assertions. Incoming messages are dispatched concurrently — each message is launched in a - `CoroutineScope(SupervisorJob)` so that a slow or suspended handler does not block subsequent messages. This prevents - deadlock when a request handler sends its own request (e.g., `roots/list`) before responding. The dispatcher can be - configured via `ProtocolOptions.dispatcher`; defaults to `Dispatchers.Default`. Higher-level modules extend `Protocol` - to become `Client` and `Server`. + and capability assertions. When `concurrentMessageHandling` is enabled on `ProtocolOptions`, incoming requests and + notifications are dispatched concurrently in separate coroutines backed by a `SupervisorJob`, preventing deadlock + when a request handler sends its own request (e.g., `roots/list`) before responding. Defaults to false for backward + compatibility; set to true for transports with independent receive loops (Stdio, WebSocket, Channel). + Higher-level modules extend `Protocol` to become `Client` and `Server`. - **Errors and safety**: Common exception types (`McpException`, parsing errors) plus capability enforcement hooks ensure callers cannot use endpoints the peer does not advertise. diff --git a/kotlin-sdk-core/api/kotlin-sdk-core.api b/kotlin-sdk-core/api/kotlin-sdk-core.api index c843aea06..e6b4f9b64 100644 --- a/kotlin-sdk-core/api/kotlin-sdk-core.api +++ b/kotlin-sdk-core/api/kotlin-sdk-core.api @@ -90,8 +90,10 @@ public final class io/modelcontextprotocol/kotlin/sdk/shared/ProtocolKt { public class io/modelcontextprotocol/kotlin/sdk/shared/ProtocolOptions { public synthetic fun (ZJILkotlin/jvm/internal/DefaultConstructorMarker;)V public synthetic fun (ZJLkotlin/jvm/internal/DefaultConstructorMarker;)V + public final fun getConcurrentMessageHandling ()Z public final fun getEnforceStrictCapabilities ()Z public final fun getTimeout-UwyO8pc ()J + public final fun setConcurrentMessageHandling (Z)V public final fun setEnforceStrictCapabilities (Z)V public final fun setTimeout-LRDsOJo (J)V } diff --git a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt index b81fe4f0c..92e3783dd 100644 --- a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt +++ b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt @@ -30,7 +30,7 @@ import kotlinx.atomicfu.update import kotlinx.collections.immutable.PersistentMap import kotlinx.collections.immutable.persistentMapOf import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred import kotlinx.coroutines.Dispatchers @@ -66,16 +66,21 @@ public typealias ProgressCallback = (Progress) -> Unit * Currently defaults to `false` for backwards compatibility with SDK versions that did not advertise * capabilities correctly; in the future, this will default to `true`. * @property timeout default timeout for outgoing requests - * @property dispatcher the [CoroutineDispatcher] used to dispatch incoming messages concurrently. - * Each incoming message is launched in a [CoroutineScope][kotlinx.coroutines.CoroutineScope] backed - * by this dispatcher, so that handlers can run in parallel without blocking subsequent messages. - * Defaults to [Dispatchers.Default]. Use [Dispatchers.Unconfined] for deterministic test execution. */ public open class ProtocolOptions( public var enforceStrictCapabilities: Boolean = false, public var timeout: Duration = DEFAULT_REQUEST_TIMEOUT, - public var dispatcher: CoroutineDispatcher = Dispatchers.Default, -) +) { + /** + * When true, incoming requests and notifications are handled concurrently + * in separate coroutines, allowing the message receive loop to continue + * processing other messages. This prevents deadlock when a handler sends + * its own request to the peer. Defaults to false for backward compatibility; + * set to true for transports with independent receive loops (Stdio, WebSocket, + * Channel) where a blocking handler would otherwise stall message processing. + */ + public var concurrentMessageHandling: Boolean = false +} /** * The default request timeout. @@ -160,11 +165,11 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio private set /** - * Coroutine scope for dispatching incoming messages concurrently. - * Created in [connect] and cancelled in [doClose]. - * Uses [SupervisorJob] so that a failure in one message handler does not cancel the scope. + * Scope for launching concurrent request and notification handlers. + * Created on [connect] and cancelled on [doClose]. + * Using [SupervisorJob] so a failing handler doesn't cancel sibling handlers. */ - private var messageScope: CoroutineScope? = null + private var handlerScope: CoroutineScope? = null private val _requestHandlers: AtomicRef RequestResult?>> = @@ -245,10 +250,19 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio * Attaches to the given transport, starts it, and starts listening for messages. * * The Protocol object assumes ownership of the Transport, replacing any callbacks that have already been set, and expects that it is the only user of the Transport instance going forward. + * + * @property concurrentMessageHandling when true, incoming requests and notifications are handled + * concurrently in separate coroutines, allowing the message receive loop to continue processing + * other messages (including responses to outgoing requests). This prevents deadlock when a request + * handler sends its own request to the peer and awaits the response. Defaults to false for backward + * compatibility; set to true for transports with independent receive loops (Stdio, WebSocket, + * Channel) where a blocking handler would otherwise stall message processing. */ public open suspend fun connect(transport: Transport) { this.transport = transport - this.messageScope = CoroutineScope(SupervisorJob() + (options?.dispatcher ?: Dispatchers.Default)) + if (options?.concurrentMessageHandling == true) { + handlerScope = CoroutineScope(SupervisorJob() + kotlinx.coroutines.Dispatchers.Default) + } transport.onClose { doClose() @@ -259,14 +273,38 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio } transport.onMessage { message -> - messageScope?.launch { - when (message) { - is JSONRPCResponse -> onResponse(message, null) - is JSONRPCRequest -> onRequest(message) - is JSONRPCNotification -> onNotification(message) - is JSONRPCError -> onResponse(null, message) - is JSONRPCEmptyMessage -> Unit + when (message) { + is JSONRPCResponse -> onResponse(message, null) + + is JSONRPCError -> onResponse(null, message) + + is JSONRPCRequest -> { + val scope = handlerScope + if (scope != null) { + // Concurrent handling: launch in a separate coroutine so the message + // receive loop is not blocked while the handler runs. + scope.launch(CoroutineName("MCP-Request-${message.id}")) { + onRequest(message) + } + } else { + // Synchronous handling: for transports that need responses sent within + // the same context (e.g., HTTP transports responding directly). + onRequest(message) + } + } + + is JSONRPCNotification -> { + val scope = handlerScope + if (scope != null) { + scope.launch(CoroutineName("MCP-Notification-${message.method}")) { + onNotification(message) + } + } else { + onNotification(message) + } } + + is JSONRPCEmptyMessage -> Unit } } @@ -275,8 +313,8 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio } private fun doClose() { - messageScope?.cancel() - messageScope = null + handlerScope?.cancel() + handlerScope = null val handlersToNotify = _responseHandlers.value.values.toList() _responseHandlers.getAndSet(persistentMapOf()) diff --git a/kotlin-sdk-core/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/ProtocolTest.kt b/kotlin-sdk-core/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/ProtocolTest.kt index b34382224..b726c2feb 100644 --- a/kotlin-sdk-core/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/ProtocolTest.kt +++ b/kotlin-sdk-core/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/ProtocolTest.kt @@ -1,5 +1,6 @@ package io.modelcontextprotocol.kotlin.sdk.shared +import io.kotest.assertions.throwables.shouldThrow import io.kotest.matchers.collections.shouldContainExactly import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.nulls.shouldNotBeNull @@ -12,15 +13,9 @@ import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCRequest import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCResponse import io.modelcontextprotocol.kotlin.sdk.types.McpJson import io.modelcontextprotocol.kotlin.sdk.types.Method -import io.modelcontextprotocol.kotlin.sdk.types.ProgressNotification -import io.modelcontextprotocol.kotlin.sdk.types.ProgressNotificationParams import io.modelcontextprotocol.kotlin.sdk.types.ReadResourceRequest import io.modelcontextprotocol.kotlin.sdk.types.ReadResourceRequestParams -import io.modelcontextprotocol.kotlin.sdk.types.RequestId import io.modelcontextprotocol.kotlin.sdk.types.RequestMeta -import io.modelcontextprotocol.kotlin.sdk.types.toJSON -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.test.runTest @@ -42,7 +37,7 @@ class ProtocolTest { @BeforeTest fun setUp() { - protocol = TestProtocol(ProtocolOptions(dispatcher = Dispatchers.Unconfined)) + protocol = TestProtocol() transport = RecordingTransport() } @@ -136,22 +131,17 @@ class ProtocolTest { } @Test - fun `should not report CancellationException from notification handler to onError`() = runTest { - // With concurrent message dispatch, CancellationException in a handler - // cancels the launched coroutine but does not propagate to the caller - // and does not trigger onError, consistent with SupervisorJob semantics. + fun `should propagate CancellationException from notification handler without calling onError`() = runTest { protocol.connect(transport) protocol.fallbackNotificationHandler = { throw CancellationException("test cancellation") } - // CancellationException is caught by the coroutine machinery and - // cancels the individual launched coroutine; deliver() returns normally. - transport.deliver(JSONRPCNotification(method = "test/notification")) + shouldThrow { + transport.deliver(JSONRPCNotification(method = "test/notification")) + } - // With Unconfined dispatcher, the handler executes immediately - // onError should NOT be called for CancellationException protocol.errors shouldHaveSize 0 } @@ -163,10 +153,9 @@ class ProtocolTest { throw IllegalStateException("handler failed") } - // Non-CE exceptions are caught and reported via onError + // Non-CE exceptions are caught and reported, not propagated transport.deliver(JSONRPCNotification(method = "test/notification")) - // With Unconfined dispatcher, the handler executes immediately protocol.errors shouldHaveSize 1 protocol.errors[0].message shouldBe "handler failed" } @@ -196,162 +185,9 @@ class ProtocolTest { transport.deliver(JSONRPCResponse(sent.id, EmptyResult())) inFlight.await() } - - @Test - fun `should process response while request handler is suspended`() = runTest { - // Core deadlock test: when a request handler is running (or suspended), - // an incoming response must still be processable. Without concurrent - // dispatch, the response would be stuck behind the running handler. - protocol.connect(transport) - - // Register a request handler that blocks until a latch is released. - // This simulates a handler that suspends (e.g., calling session.request()). - val handlerCanFinish = CompletableDeferred() - val handlerStarted = CompletableDeferred() - protocol.setRequestHandler(Method.Custom("test/slow-handler")) { _, _ -> - handlerStarted.complete(Unit) - handlerCanFinish.await() // Suspend until we allow it - EmptyResult() - } - - // Start an outgoing request from the protocol. - // This suspends waiting for a response with a specific request ID. - val inFlight = async { - protocol.request( - request = CustomRequest(method = Method.Custom("test/outgoing"), params = null), - ) - } - - // Get the outgoing request that the protocol sent - val outgoingRequest = transport.awaitRequest() - - // Deliver an incoming request to the protocol. - // This triggers the slow handler in a separate coroutine. - transport.deliver(JSONRPCRequest(method = Method.Custom("test/slow-handler").value, id = 999)) - - // Wait for the handler to start - handlerStarted.await() - - // While the slow handler is still suspended, deliver the response for our - // original outgoing request. This must be processable — if dispatch - // were serial, this response would be blocked behind the slow handler. - transport.deliver(JSONRPCResponse(outgoingRequest.id, EmptyResult())) - - // The outgoing request should complete even though the slow handler - // is still running. This proves concurrent dispatch works. - inFlight.await() - - // The slow handler has not finished yet (we haven't released it) - handlerCanFinish.complete(Unit) - } - - @Test - fun `should process notifications concurrently with request handling`() = runTest { - // Verify that a notification arriving while a request handler is running - // gets processed immediately (not blocked behind the request handler). - protocol.connect(transport) - - var notificationReceived = false - val handlerStarted = CompletableDeferred() - val handlerCanFinish = CompletableDeferred() - - // Register a request handler that suspends until we release it - protocol.setRequestHandler(Method.Custom("test/slow-request")) { _, _ -> - handlerStarted.complete(Unit) - handlerCanFinish.await() // Suspend until we allow it - EmptyResult() - } - - protocol.fallbackNotificationHandler = { - notificationReceived = true - } - - // Deliver a request (handler will suspend) - transport.deliver(JSONRPCRequest(method = Method.Custom("test/slow-request").value, id = 1)) - handlerStarted.await() - - // While the handler is suspended, deliver a notification - // It should be processed immediately (concurrent dispatch) - transport.deliver(JSONRPCNotification(method = "test/notification")) - - // Give the notification coroutine a chance to execute - // With Unconfined dispatcher, it should have already executed - notificationReceived shouldBe true - - // Clean up - handlerCanFinish.complete(Unit) - } - - @Test - fun `should not cancel scope when single message handler throws`() = runTest { - // SupervisorJob ensures one handler failure does not cancel the scope. - // After a handler throws, subsequent messages must still be processed. - protocol.connect(transport) - - var secondMessageProcessed = false - val secondMessageReceived = CompletableDeferred() - - protocol.fallbackNotificationHandler = { - throw IllegalStateException("handler failed") - } - - // Register a second handler to verify scope is still alive - protocol.setNotificationHandler( - Method.Defined.NotificationsProgress, - ) { - secondMessageProcessed = true - secondMessageReceived.complete(Unit) - CompletableDeferred(Unit) - } - - // First message: handler throws - transport.deliver(JSONRPCNotification(method = "test/notification")) - - // Second message: should still be processed - transport.deliver( - ProgressNotification( - ProgressNotificationParams( - progressToken = RequestId.NumberId(1), - progress = 1.0, - ), - ).toJSON(), - ) - - // Wait for second message to be processed - secondMessageReceived.await() - - secondMessageProcessed shouldBe true - // First handler's exception should have been reported via onError - protocol.errors.shouldHaveSize(1) - protocol.errors[0].message shouldBe "handler failed" - } - - @Test - fun `should close message scope on transport close`() = runTest { - // After doClose(), the messageScope is cancelled and transport is cleared. - protocol.connect(transport) - - var messageProcessed = false - val messageReceived = CompletableDeferred() - protocol.fallbackNotificationHandler = { - messageProcessed = true - messageReceived.complete(Unit) - } - - // Deliver a message before close - transport.deliver(JSONRPCNotification(method = "test/notification")) - messageReceived.await() - messageProcessed shouldBe true - - // Close the transport - transport.close() - - // After close, protocol's transport should be null (cleared by doClose) - protocol.transport shouldBe null - } } -private class TestProtocol(options: ProtocolOptions? = null) : Protocol(options) { +private class TestProtocol : Protocol(null) { val errors = mutableListOf() override fun onError(error: Throwable) { From abddfb0c9b7cd08c731f8117be8367f471049d39 Mon Sep 17 00:00:00 2001 From: Lobsterdog Contributors Date: Wed, 13 May 2026 19:38:34 -0600 Subject: [PATCH 4/7] =?UTF-8?q?fix:=20address=20Copilot=20review=20?= =?UTF-8?q?=E2=80=94=20use=20TimeSource.Monotonic,=20remove=20unused=20Dis?= =?UTF-8?q?patchers=20import,=20improve=20concurrency=20assertion?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace System.currentTimeMillis() with TimeSource.Monotonic.markNow() for multiplatform compatibility (copilot review) - Remove unused Dispatchers import from Protocol.kt (copilot review) - Add fastToolDelay so both tools have delays, making serial vs concurrent timing distinguishable (copilot review) - Use Duration type for delay thresholds instead of Long --- .../kotlin/sdk/integration/ConcurrencyTest.kt | 28 ++++++++++++------- .../kotlin/sdk/shared/Protocol.kt | 1 - 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt b/integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt index 80c14a877..810b86613 100644 --- a/integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt +++ b/integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt @@ -26,7 +26,9 @@ import kotlinx.coroutines.withTimeout import kotlin.concurrent.atomics.ExperimentalAtomicApi import kotlin.test.Test import kotlin.test.assertNotNull +import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.seconds +import kotlin.time.TimeSource /** * Tests that the Protocol layer handles incoming messages concurrently, @@ -80,11 +82,15 @@ class ConcurrencyTest { /** * Verifies that concurrent tool calls are handled concurrently, not serially. * Uses real Dispatchers instead of runTest's virtual time to allow actual concurrent execution. + * + * Both tools have delays — if handling were serial, total time would be the sum of both + * delays. With concurrent handling, total time is roughly max(slow, fast) + overhead. */ @OptIn(ExperimentalAtomicApi::class) @Test fun `server handles concurrent requests concurrently`() = runBlocking { - val slowToolDelay = 500L + val slowToolDelay = 500.milliseconds + val fastToolDelay = 50.milliseconds val serverOptions = ServerOptions( capabilities = ServerCapabilities(tools = ServerCapabilities.Tools(null)), @@ -97,10 +103,11 @@ class ConcurrencyTest { ) server.addTool("slow_tool", "A tool that takes a while") { - delay(slowToolDelay) + delay(slowToolDelay.inWholeMilliseconds) CallToolResult(content = listOf(TextContent("slow_tool_done"))) } - server.addTool("fast_tool", "A tool that is quick") { + server.addTool("fast_tool", "A tool that is quick but still has delay") { + delay(fastToolDelay.inWholeMilliseconds) CallToolResult(content = listOf(TextContent("fast_tool_done"))) } @@ -119,7 +126,7 @@ class ConcurrencyTest { launch { serverSessionResult.complete(server.createSession(serverTransport)) }, ).joinAll() - val startTime = System.currentTimeMillis() + val mark = TimeSource.Monotonic.markNow() val slowResult = CompletableDeferred() val fastResult = CompletableDeferred() @@ -140,13 +147,14 @@ class ConcurrencyTest { assertNotNull(slow) assertNotNull(fast) - val elapsed = System.currentTimeMillis() - startTime - // If concurrent: elapsed ≈ slowToolDelay + overhead - // If serial: elapsed ≈ slowToolDelay * 2 - if (elapsed >= slowToolDelay * 2L) { + val elapsed = mark.elapsedNow() + // If concurrent: elapsed ≈ max(slowToolDelay, fastToolDelay) + overhead + // If serial: elapsed ≈ slowToolDelay + fastToolDelay + val serialThreshold = slowToolDelay + fastToolDelay + if (elapsed >= serialThreshold) { throw AssertionError( - "Fast tool was blocked by slow tool. Total duration ${elapsed}ms " + - "should be < ${slowToolDelay * 2}ms.", + "Requests were handled serially. Total duration ${elapsed.inWholeMilliseconds}ms " + + "should be < ${serialThreshold.inWholeMilliseconds}ms.", ) } } finally { diff --git a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt index 92e3783dd..d23accf6a 100644 --- a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt +++ b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt @@ -33,7 +33,6 @@ import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred -import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.TimeoutCancellationException import kotlinx.coroutines.cancel From 990061cc9a695efcceb3ea291e1716132bb3858c Mon Sep 17 00:00:00 2001 From: Lobsterdog Contributors Date: Wed, 13 May 2026 19:50:15 -0600 Subject: [PATCH 5/7] fix: move ConcurrencyTest to jvmTest and fix @property KDoc tag - Move ConcurrencyTest from commonTest to jvmTest since it uses runBlocking which is not available on JS/Wasm targets - Change @property concurrentMessageHandling KDoc tag to plain text in connect() documentation since it refers to ProtocolOptions, not a property of connect() Addresses remaining Copilot review comments on PR 764. --- .../kotlin/sdk/integration/ConcurrencyTest.kt | 0 .../io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt | 4 ++-- 2 files changed, 2 insertions(+), 2 deletions(-) rename integration-test/src/{commonTest => jvmTest}/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt (100%) diff --git a/integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt b/integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt similarity index 100% rename from integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt rename to integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt diff --git a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt index d23accf6a..e5d9a2c66 100644 --- a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt +++ b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt @@ -250,8 +250,8 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio * * The Protocol object assumes ownership of the Transport, replacing any callbacks that have already been set, and expects that it is the only user of the Transport instance going forward. * - * @property concurrentMessageHandling when true, incoming requests and notifications are handled - * concurrently in separate coroutines, allowing the message receive loop to continue processing + * When [ProtocolOptions.concurrentMessageHandling] is true, incoming requests and notifications + * are handled concurrently in separate coroutines, allowing the message receive loop to continue processing * other messages (including responses to outgoing requests). This prevents deadlock when a request * handler sends its own request to the peer and awaits the response. Defaults to false for backward * compatibility; set to true for transports with independent receive loops (Stdio, WebSocket, From 3f4c37363e3f9f4035afca12f4c51bd55269ad90 Mon Sep 17 00:00:00 2001 From: Lobsterdog Contributors Date: Wed, 13 May 2026 20:08:25 -0600 Subject: [PATCH 6/7] fix: make concurrency test deterministic and move to jvmTest Replace timing-based assertion with deterministic synchronization: the slow handler blocks on a CompletableDeferred latch, and the test asserts the fast handler completes while the slow one is still running. This removes wall-clock timing thresholds that caused flakiness on loaded CI runners. Also fixes JUnit discoverability: the test method now returns Unit (explicit trailing Unit) since assertNotNull(CallToolResult?) returns a value, causing JUnit to skip the method. --- .../kotlin/sdk/integration/ConcurrencyTest.kt | 59 ++++++++----------- 1 file changed, 25 insertions(+), 34 deletions(-) diff --git a/integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt b/integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt index 810b86613..feb78c9cd 100644 --- a/integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt +++ b/integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt @@ -18,7 +18,6 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking @@ -26,9 +25,8 @@ import kotlinx.coroutines.withTimeout import kotlin.concurrent.atomics.ExperimentalAtomicApi import kotlin.test.Test import kotlin.test.assertNotNull -import kotlin.time.Duration.Companion.milliseconds +import kotlin.test.assertTrue import kotlin.time.Duration.Companion.seconds -import kotlin.time.TimeSource /** * Tests that the Protocol layer handles incoming messages concurrently, @@ -81,17 +79,14 @@ class ConcurrencyTest { /** * Verifies that concurrent tool calls are handled concurrently, not serially. - * Uses real Dispatchers instead of runTest's virtual time to allow actual concurrent execution. * - * Both tools have delays — if handling were serial, total time would be the sum of both - * delays. With concurrent handling, total time is roughly max(slow, fast) + overhead. + * Uses deterministic synchronization: the fast tool completes while the slow + * handler is still suspended, proving that handlers run concurrently rather + * than serially. No wall-clock timing thresholds are used. */ @OptIn(ExperimentalAtomicApi::class) @Test fun `server handles concurrent requests concurrently`() = runBlocking { - val slowToolDelay = 500.milliseconds - val fastToolDelay = 50.milliseconds - val serverOptions = ServerOptions( capabilities = ServerCapabilities(tools = ServerCapabilities.Tools(null)), ) @@ -102,12 +97,17 @@ class ConcurrencyTest { options = serverOptions, ) - server.addTool("slow_tool", "A tool that takes a while") { - delay(slowToolDelay.inWholeMilliseconds) + // Latch that blocks the slow handler until we signal it to finish. + // This lets us prove the fast handler completed while the slow one + // was still running — impossible under serial dispatch. + val slowHandlerCanFinish = CompletableDeferred() + + server.addTool("slow_tool", "A tool that blocks until signaled") { + slowHandlerCanFinish.await() CallToolResult(content = listOf(TextContent("slow_tool_done"))) } - server.addTool("fast_tool", "A tool that is quick but still has delay") { - delay(fastToolDelay.inWholeMilliseconds) + + server.addTool("fast_tool", "A tool that completes immediately") { CallToolResult(content = listOf(TextContent("fast_tool_done"))) } @@ -126,42 +126,33 @@ class ConcurrencyTest { launch { serverSessionResult.complete(server.createSession(serverTransport)) }, ).joinAll() - val mark = TimeSource.Monotonic.markNow() - + // Start the slow request (handler blocks on slowHandlerCanFinish) val slowResult = CompletableDeferred() - val fastResult = CompletableDeferred() - launch { slowResult.complete(client.callTool("slow_tool", mapOf())) } - delay(50) - + // Start the fast request + val fastResult = CompletableDeferred() launch { fastResult.complete(client.callTool("fast_tool", mapOf())) } - val slow = withTimeout(5.seconds) { slowResult.await() } + // The fast request must complete while the slow handler is still suspended. + // Under serial dispatch, both requests would be blocked behind the slow handler, + // so the fast result could never arrive. val fast = withTimeout(5.seconds) { fastResult.await() } - - assertNotNull(slow) assertNotNull(fast) - val elapsed = mark.elapsedNow() - // If concurrent: elapsed ≈ max(slowToolDelay, fastToolDelay) + overhead - // If serial: elapsed ≈ slowToolDelay + fastToolDelay - val serialThreshold = slowToolDelay + fastToolDelay - if (elapsed >= serialThreshold) { - throw AssertionError( - "Requests were handled serially. Total duration ${elapsed.inWholeMilliseconds}ms " + - "should be < ${serialThreshold.inWholeMilliseconds}ms.", - ) - } + // Now release the slow handler and verify it completes + slowHandlerCanFinish.complete(Unit) + val slow = withTimeout(5.seconds) { slowResult.await() } + assertNotNull(slow) + Unit } finally { - // Clean up: close transports and cancel scope to prevent coroutine leaks clientTransport.close() serverTransport.close() scope.cancel() } } -} +} \ No newline at end of file From b66cc1d8ee69b43931b04804cdd5367a1fae7564 Mon Sep 17 00:00:00 2001 From: Lobsterdog Contributors Date: Thu, 21 May 2026 09:24:49 -0600 Subject: [PATCH 7/7] fix: address Copilot review comments on PR #764 - Use imported Dispatchers instead of fully-qualified reference - Add @see ProtocolOptions.concurrentMessageHandling to connect() KDoc (was incorrectly using @property tag for a non-property) - Add slowHandlerStarted latch to ConcurrencyTest to ensure fast request arrives while slow handler is suspended (deterministic test) - Remove unused assertTrue import from ConcurrencyTest --- .../kotlin/sdk/integration/ConcurrencyTest.kt | 11 ++++++++++- .../kotlin/sdk/shared/Protocol.kt | 4 +++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt b/integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt index feb78c9cd..5731b3627 100644 --- a/integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt +++ b/integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/ConcurrencyTest.kt @@ -25,7 +25,6 @@ import kotlinx.coroutines.withTimeout import kotlin.concurrent.atomics.ExperimentalAtomicApi import kotlin.test.Test import kotlin.test.assertNotNull -import kotlin.test.assertTrue import kotlin.time.Duration.Companion.seconds /** @@ -97,12 +96,18 @@ class ConcurrencyTest { options = serverOptions, ) + // Latch that signals when the slow handler has started and suspended. + // This ensures the fast request arrives while the slow handler is already + // blocking, proving the test is a true concurrency regression test. + val slowHandlerStarted = CompletableDeferred() + // Latch that blocks the slow handler until we signal it to finish. // This lets us prove the fast handler completed while the slow one // was still running — impossible under serial dispatch. val slowHandlerCanFinish = CompletableDeferred() server.addTool("slow_tool", "A tool that blocks until signaled") { + slowHandlerStarted.complete(Unit) slowHandlerCanFinish.await() CallToolResult(content = listOf(TextContent("slow_tool_done"))) } @@ -132,6 +137,10 @@ class ConcurrencyTest { slowResult.complete(client.callTool("slow_tool", mapOf())) } + // Wait until the slow handler has actually started and suspended, + // so the fast request arrives while the slow handler is blocking. + withTimeout(5.seconds) { slowHandlerStarted.await() } + // Start the fast request val fastResult = CompletableDeferred() launch { diff --git a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt index e5d9a2c66..2dbc069d2 100644 --- a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt +++ b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt @@ -33,6 +33,7 @@ import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.TimeoutCancellationException import kotlinx.coroutines.cancel @@ -256,11 +257,12 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio * handler sends its own request to the peer and awaits the response. Defaults to false for backward * compatibility; set to true for transports with independent receive loops (Stdio, WebSocket, * Channel) where a blocking handler would otherwise stall message processing. + * @see ProtocolOptions.concurrentMessageHandling */ public open suspend fun connect(transport: Transport) { this.transport = transport if (options?.concurrentMessageHandling == true) { - handlerScope = CoroutineScope(SupervisorJob() + kotlinx.coroutines.Dispatchers.Default) + handlerScope = CoroutineScope(SupervisorJob() + Dispatchers.Default) } transport.onClose {