Skip to content
Open
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package io.modelcontextprotocol.kotlin.sdk.integration

import io.modelcontextprotocol.kotlin.sdk.client.Client
import io.modelcontextprotocol.kotlin.sdk.client.ClientOptions
import io.modelcontextprotocol.kotlin.sdk.server.Server
import io.modelcontextprotocol.kotlin.sdk.server.ServerOptions
import io.modelcontextprotocol.kotlin.sdk.server.ServerSession
import io.modelcontextprotocol.kotlin.sdk.shared.AbstractTransport
import io.modelcontextprotocol.kotlin.sdk.shared.TransportSendOptions
import io.modelcontextprotocol.kotlin.sdk.types.CallToolResult
import io.modelcontextprotocol.kotlin.sdk.types.Implementation
import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCMessage
import io.modelcontextprotocol.kotlin.sdk.types.ServerCapabilities
import io.modelcontextprotocol.kotlin.sdk.types.TextContent
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import kotlin.concurrent.atomics.ExperimentalAtomicApi
import kotlin.test.Test
import kotlin.test.assertNotNull
import kotlin.time.Duration.Companion.seconds

/**
* Tests that the Protocol layer handles incoming messages concurrently,
* preventing deadlock when a request handler needs to wait for other messages.
*
* See: https://github.com/modelcontextprotocol/kotlin-sdk/issues/176
*/
class ConcurrencyTest {

/**
* A channel-based transport that delivers messages asynchronously via Kotlin Channels,
* simulating real network transports. This is necessary to reproduce the concurrency
* bug — the synchronous InMemoryTransport masks the issue.
*/
@OptIn(ExperimentalAtomicApi::class)
private class ChannelTransport(
private val scope: CoroutineScope,
private val sendChannel: Channel<JSONRPCMessage>,
private val receiveChannel: Channel<JSONRPCMessage>,
) : AbstractTransport() {
override suspend fun start() {
scope.launch {
for (message in receiveChannel) {
_onMessage.invoke(message)
}
}
}
Comment on lines +49 to +55

override suspend fun send(message: JSONRPCMessage, options: TransportSendOptions?) {
sendChannel.send(message)
}

override suspend fun close() {
sendChannel.close()
receiveChannel.cancel()
invokeOnCloseCallback()
}

companion object {
fun createLinkedPair(scope: CoroutineScope): Pair<ChannelTransport, ChannelTransport> {
val clientToServer = Channel<JSONRPCMessage>(Channel.UNLIMITED)
val serverToClient = Channel<JSONRPCMessage>(Channel.UNLIMITED)
return Pair(
ChannelTransport(scope, serverToClient, clientToServer),
ChannelTransport(scope, clientToServer, serverToClient),
)
}
}
}

/**
* Verifies that concurrent tool calls are handled concurrently, not serially.
*
* Uses deterministic synchronization: the fast tool completes while the slow
* handler is still suspended, proving that handlers run concurrently rather
* than serially. No wall-clock timing thresholds are used.
*/
@OptIn(ExperimentalAtomicApi::class)
@Test
fun `server handles concurrent requests concurrently`() = runBlocking {
Comment thread
MichielDean marked this conversation as resolved.
val serverOptions = ServerOptions(
capabilities = ServerCapabilities(tools = ServerCapabilities.Tools(null)),
)
serverOptions.concurrentMessageHandling = true

val server = Server(
serverInfo = Implementation("test-server", "1.0"),
options = serverOptions,
)

// Latch that signals when the slow handler has started and suspended.
// This ensures the fast request arrives while the slow handler is already
// blocking, proving the test is a true concurrency regression test.
val slowHandlerStarted = CompletableDeferred<Unit>()

// Latch that blocks the slow handler until we signal it to finish.
// This lets us prove the fast handler completed while the slow one
// was still running — impossible under serial dispatch.
val slowHandlerCanFinish = CompletableDeferred<Unit>()

server.addTool("slow_tool", "A tool that blocks until signaled") {
slowHandlerStarted.complete(Unit)
slowHandlerCanFinish.await()
CallToolResult(content = listOf(TextContent("slow_tool_done")))
}

server.addTool("fast_tool", "A tool that completes immediately") {
CallToolResult(content = listOf(TextContent("fast_tool_done")))
}

val client = Client(
clientInfo = Implementation("test-client", "1.0"),
options = ClientOptions(),
)

val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
val (clientTransport, serverTransport) = ChannelTransport.createLinkedPair(scope)
val serverSessionResult = CompletableDeferred<ServerSession>()

try {
listOf(
launch { client.connect(clientTransport) },
launch { serverSessionResult.complete(server.createSession(serverTransport)) },
).joinAll()

// Start the slow request (handler blocks on slowHandlerCanFinish)
val slowResult = CompletableDeferred<CallToolResult>()
launch {
slowResult.complete(client.callTool("slow_tool", mapOf()))
}

// Wait until the slow handler has actually started and suspended,
// so the fast request arrives while the slow handler is blocking.
withTimeout(5.seconds) { slowHandlerStarted.await() }

// Start the fast request
val fastResult = CompletableDeferred<CallToolResult>()
launch {
fastResult.complete(client.callTool("fast_tool", mapOf()))
}
Comment on lines +134 to +148
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in this commit. Added a slowHandlerStarted CompletableDeferred that is completed at the start of the slow handler. The test now awaits this latch before launching the fast request, ensuring the slow handler is already suspended when the fast request arrives.


// The fast request must complete while the slow handler is still suspended.
// Under serial dispatch, both requests would be blocked behind the slow handler,
// so the fast result could never arrive.
val fast = withTimeout(5.seconds) { fastResult.await() }
assertNotNull(fast)
Comment on lines +153 to +154

// Now release the slow handler and verify it completes
slowHandlerCanFinish.complete(Unit)
val slow = withTimeout(5.seconds) { slowResult.await() }
assertNotNull(slow)
Comment on lines +158 to +159
Unit
} finally {
clientTransport.close()
serverTransport.close()
scope.cancel()
}
}
}
6 changes: 5 additions & 1 deletion kotlin-sdk-core/Module.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ designed for Kotlin Multiplatform with explicit API mode enabled.
handling. `WebSocketMcpTransport` adds a shared WebSocket implementation for both client and server sides, and
`ReadBuffer` handles streaming JSON-RPC framing.
- **Protocol engine**: The `Protocol` base class manages request/response correlation, notifications, progress tokens,
and capability assertions. Higher-level modules extend it to become `Client` and `Server`.
and capability assertions. When `concurrentMessageHandling` is enabled on `ProtocolOptions`, incoming requests and
notifications are dispatched concurrently in separate coroutines backed by a `SupervisorJob`, preventing deadlock
when a request handler sends its own request (e.g., `roots/list`) before responding. Defaults to false for backward
compatibility; set to true for transports with independent receive loops (Stdio, WebSocket, Channel).
Higher-level modules extend `Protocol` to become `Client` and `Server`.
- **Errors and safety**: Common exception types (`McpException`, parsing errors) plus capability enforcement hooks
ensure callers cannot use endpoints the peer does not advertise.

Expand Down
2 changes: 2 additions & 0 deletions kotlin-sdk-core/api/kotlin-sdk-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@ public final class io/modelcontextprotocol/kotlin/sdk/shared/ProtocolKt {
public class io/modelcontextprotocol/kotlin/sdk/shared/ProtocolOptions {
public synthetic fun <init> (ZJILkotlin/jvm/internal/DefaultConstructorMarker;)V
public synthetic fun <init> (ZJLkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun getConcurrentMessageHandling ()Z
public final fun getEnforceStrictCapabilities ()Z
public final fun getTimeout-UwyO8pc ()J
public final fun setConcurrentMessageHandling (Z)V
public final fun setEnforceStrictCapabilities (Z)V
public final fun setTimeout-LRDsOJo (J)V
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,14 @@ import kotlinx.atomicfu.update
import kotlinx.collections.immutable.PersistentMap
import kotlinx.collections.immutable.persistentMapOf
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeout
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.JsonPrimitive
Expand Down Expand Up @@ -64,7 +70,17 @@ public typealias ProgressCallback = (Progress) -> Unit
public open class ProtocolOptions(
public var enforceStrictCapabilities: Boolean = false,
public var timeout: Duration = DEFAULT_REQUEST_TIMEOUT,
)
) {
/**
* When true, incoming requests and notifications are handled concurrently
* in separate coroutines, allowing the message receive loop to continue
* processing other messages. This prevents deadlock when a handler sends
* its own request to the peer. Defaults to false for backward compatibility;
* set to true for transports with independent receive loops (Stdio, WebSocket,
* Channel) where a blocking handler would otherwise stall message processing.
*/
public var concurrentMessageHandling: Boolean = false
}

/**
* The default request timeout.
Expand Down Expand Up @@ -148,6 +164,13 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio
public var transport: Transport? = null
private set

/**
* Scope for launching concurrent request and notification handlers.
* Created on [connect] and cancelled on [doClose].
* Using [SupervisorJob] so a failing handler doesn't cancel sibling handlers.
*/
private var handlerScope: CoroutineScope? = null

private val _requestHandlers:
AtomicRef<PersistentMap<String, suspend (JSONRPCRequest, RequestHandlerExtra) -> RequestResult?>> =
atomic(persistentMapOf())
Expand Down Expand Up @@ -227,9 +250,21 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio
* Attaches to the given transport, starts it, and starts listening for messages.
*
* The Protocol object assumes ownership of the Transport, replacing any callbacks that have already been set, and expects that it is the only user of the Transport instance going forward.
*
* When [ProtocolOptions.concurrentMessageHandling] is true, incoming requests and notifications
* are handled concurrently in separate coroutines, allowing the message receive loop to continue processing
* other messages (including responses to outgoing requests). This prevents deadlock when a request
* handler sends its own request to the peer and awaits the response. Defaults to false for backward
* compatibility; set to true for transports with independent receive loops (Stdio, WebSocket,
* Channel) where a blocking handler would otherwise stall message processing.
* @see ProtocolOptions.concurrentMessageHandling
*/
public open suspend fun connect(transport: Transport) {
this.transport = transport
if (options?.concurrentMessageHandling == true) {
handlerScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
}
Comment on lines 262 to +266

transport.onClose {
doClose()
}
Expand All @@ -241,9 +276,35 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio
transport.onMessage { message ->
when (message) {
is JSONRPCResponse -> onResponse(message, null)
is JSONRPCRequest -> onRequest(message)
is JSONRPCNotification -> onNotification(message)

is JSONRPCError -> onResponse(null, message)

is JSONRPCRequest -> {
val scope = handlerScope
if (scope != null) {
// Concurrent handling: launch in a separate coroutine so the message
// receive loop is not blocked while the handler runs.
scope.launch(CoroutineName("MCP-Request-${message.id}")) {
onRequest(message)
}
Comment on lines +285 to +289
} else {
// Synchronous handling: for transports that need responses sent within
// the same context (e.g., HTTP transports responding directly).
onRequest(message)
}
}

is JSONRPCNotification -> {
val scope = handlerScope
if (scope != null) {
scope.launch(CoroutineName("MCP-Notification-${message.method}")) {
onNotification(message)
}
} else {
onNotification(message)
}
}

is JSONRPCEmptyMessage -> Unit
}
}
Expand All @@ -253,6 +314,9 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio
}

private fun doClose() {
handlerScope?.cancel()
handlerScope = null

val handlersToNotify = _responseHandlers.value.values.toList()
_responseHandlers.getAndSet(persistentMapOf())
_progressHandlers.getAndSet(persistentMapOf())
Expand Down