Skip to content

Commit ec53adc

Browse files
authored
feat: introduce StreamableHttpServerTransport.Configuration (#560)
## Introduce Configuration class for StreamableHttpServerTransport Replace the six-parameter flat constructor of `StreamableHttpServerTransport` with a typed `Configuration` class. This improves API ergonomics, enables structural equality and copy(), and provides a stable extension point for future options without further breaking the constructor signature. ### Changes: - Add `Configuration` as a public class nested directly on `StreamableHttpServerTransport`, with `enableJsonResponse` as the first parameter (most commonly set) - Change the primary constructor to accept `Configuration`. - Rename `retryIntervalMillis: Long?` to `retryInterval: Duration?` in Configuration, aligning with Kotlin's type-safe time API - Deprecate the old flat constructor with a compatibility bridge - Update KotlinTestBase integration test to use the new constructor - Enable test AbstractResourceIntegrationTest.testSubscribeAndUnsubscribe() since #249 is closed ## Motivation and Context The current StreamableHttpServerTransport API cannot be easily extended: adding more parameters would be a breaking change. But this is already needed for #521. This PR is a prerequisite for #535 ## How Has This Been Tested? Regression tests ## Breaking Changes No. Current StreamableHttpServerTransport constructor was deprecated ## Types of changes <!-- What types of changes does your code introduce? Put an `x` in all the boxes that apply: --> - [ ] Bug fix (non-breaking change which fixes an issue) - [x] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) - [ ] Documentation update ## Checklist <!-- Go over all the following points, and put an `x` in all the boxes that apply. --> - [x] I have read the [MCP Documentation](https://modelcontextprotocol.io) - [x] My code follows the repository's style guidelines - [x] New and existing tests pass locally - [ ] I have added appropriate error handling - [ ] I have added or updated documentation as needed
1 parent 43b59c9 commit ec53adc

5 files changed

Lines changed: 128 additions & 51 deletions

File tree

integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/kotlin/AbstractResourceIntegrationTest.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import kotlinx.coroutines.test.runTest
1919
import org.junit.jupiter.api.Test
2020
import org.junit.jupiter.api.assertThrows
2121
import java.util.concurrent.atomic.AtomicBoolean
22-
import kotlin.test.Ignore
2322
import kotlin.test.assertEquals
2423
import kotlin.test.assertNotNull
2524
import kotlin.test.assertTrue
@@ -165,7 +164,6 @@ abstract class AbstractResourceIntegrationTest : KotlinTestBase() {
165164
assertEquals(testResourceContent, content.text, "Resource content should match")
166165
}
167166

168-
@Ignore("Blocked by https://github.com/modelcontextprotocol/kotlin-sdk/issues/249")
169167
@Test
170168
fun testSubscribeAndUnsubscribe() {
171169
runBlocking(Dispatchers.IO) {

integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/kotlin/KotlinTestBase.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,9 @@ abstract class KotlinTestBase {
148148
// Create StreamableHTTP server transport
149149
// Using JSON response mode for simpler testing (no SSE session required)
150150
val transport = StreamableHttpServerTransport(
151-
enableJsonResponse = true, // Use JSON response mode for testing
151+
StreamableHttpServerTransport.Configuration(
152+
enableJsonResponse = true, // Use JSON response mode for testing
153+
),
152154
)
153155
// Use stateless mode to skip session validation for simpler testing
154156
transport.setSessionIdGenerator(null)

kotlin-sdk-server/api/kotlin-sdk-server.api

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ public final class io/modelcontextprotocol/kotlin/sdk/server/StdioServerTranspor
195195
public final class io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport : io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport {
196196
public static final field STANDALONE_SSE_STREAM_ID Ljava/lang/String;
197197
public fun <init> ()V
198+
public fun <init> (Lio/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport$Configuration;)V
198199
public fun <init> (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Ljava/lang/Long;)V
199200
public synthetic fun <init> (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Ljava/lang/Long;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
200201
public fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
@@ -211,6 +212,17 @@ public final class io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServe
211212
public fun start (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
212213
}
213214

215+
public final class io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport$Configuration {
216+
public synthetic fun <init> (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/time/Duration;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
217+
public synthetic fun <init> (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/time/Duration;Lkotlin/jvm/internal/DefaultConstructorMarker;)V
218+
public final fun getAllowedHosts ()Ljava/util/List;
219+
public final fun getAllowedOrigins ()Ljava/util/List;
220+
public final fun getEnableDnsRebindingProtection ()Z
221+
public final fun getEnableJsonResponse ()Z
222+
public final fun getEventStore ()Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;
223+
public final fun getRetryInterval-FghU774 ()Lkotlin/time/Duration;
224+
}
225+
214226
public final class io/modelcontextprotocol/kotlin/sdk/server/WebSocketMcpKtorServerExtensionsKt {
215227
public static final fun mcpWebSocket (Lio/ktor/server/application/Application;Ljava/lang/String;Lkotlin/jvm/functions/Function0;)V
216228
public static final fun mcpWebSocket (Lio/ktor/server/application/Application;Lkotlin/jvm/functions/Function0;)V

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -225,11 +225,13 @@ private suspend fun RoutingContext.mcpStatelessStreamableHttpEndpoint(
225225
block: RoutingContext.() -> Server,
226226
) {
227227
val transport = StreamableHttpServerTransport(
228-
enableDnsRebindingProtection = enableDnsRebindingProtection,
229-
allowedHosts = allowedHosts,
230-
allowedOrigins = allowedOrigins,
231-
eventStore = eventStore,
232-
enableJsonResponse = true,
228+
StreamableHttpServerTransport.Configuration(
229+
enableDnsRebindingProtection = enableDnsRebindingProtection,
230+
allowedHosts = allowedHosts,
231+
allowedOrigins = allowedOrigins,
232+
eventStore = eventStore,
233+
enableJsonResponse = true,
234+
),
233235
).also { it.setSessionIdGenerator(null) }
234236

235237
logger.info { "New stateless StreamableHttp connection established without sessionId" }
@@ -305,11 +307,13 @@ private suspend fun RoutingContext.streamableTransport(
305307
}
306308

307309
val transport = StreamableHttpServerTransport(
308-
enableDnsRebindingProtection = enableDnsRebindingProtection,
309-
allowedHosts = allowedHosts,
310-
allowedOrigins = allowedOrigins,
311-
eventStore = eventStore,
312-
enableJsonResponse = true,
310+
StreamableHttpServerTransport.Configuration(
311+
enableDnsRebindingProtection = enableDnsRebindingProtection,
312+
allowedHosts = allowedHosts,
313+
allowedOrigins = allowedOrigins,
314+
eventStore = eventStore,
315+
enableJsonResponse = true,
316+
),
313317
)
314318

315319
transport.setOnSessionInitialized { initializedSessionId ->

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt

Lines changed: 99 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ import kotlinx.serialization.json.JsonObject
3636
import kotlinx.serialization.json.decodeFromJsonElement
3737
import kotlin.concurrent.atomics.AtomicBoolean
3838
import kotlin.concurrent.atomics.ExperimentalAtomicApi
39+
import kotlin.time.Duration
40+
import kotlin.time.Duration.Companion.milliseconds
3941
import kotlin.uuid.ExperimentalUuidApi
4042
import kotlin.uuid.Uuid
4143

@@ -46,8 +48,8 @@ private const val MAXIMUM_MESSAGE_SIZE = 4 * 1024 * 1024 // 4 MB
4648

4749
/**
4850
* A holder for an active request call.
49-
* If enableJsonResponse is true, session is null.
50-
* Otherwise, session is not null.
51+
* If [StreamableHttpServerTransport.Configuration.enableJsonResponse] is true, the session is null.
52+
* Otherwise, the session is not null.
5153
*/
5254
private data class SessionContext(val session: ServerSSESession?, val call: ApplicationCall)
5355

@@ -66,32 +68,87 @@ private data class SessionContext(val session: ServerSSESession?, val call: Appl
6668
* - No Session ID is included in any responses
6769
* - No session validation is performed
6870
*
69-
* @param enableJsonResponse If true, the server will return JSON responses instead of starting an SSE stream.
70-
* This can be useful for simple request/response scenarios without streaming.
71-
* Default is false (SSE streams are preferred).
72-
* @param enableDnsRebindingProtection Enable DNS rebinding protection
73-
* (requires allowedHosts and/or allowedOrigins to be configured).
74-
* Default is false for backwards compatibility.
75-
* @param allowedHosts List of allowed host header values for DNS rebinding protection.
76-
* If not specified, host validation is disabled.
77-
* @param allowedOrigins List of allowed origin header values for DNS rebinding protection.
78-
* If not specified, origin validation is disabled.
79-
* @param eventStore Event store for resumability support
80-
* If provided, resumability will be enabled, allowing clients to reconnect and resume messages
81-
* @param retryIntervalMillis Retry interval (in milliseconds) advertised via SSE priming events
82-
* to hint the client when to reconnect. Applies only when an [eventStore] is configured.
83-
* Defaults to `null` (no retry hint).
71+
* @param configuration Transport configuration. See [Configuration] for available options.
8472
*/
8573
@OptIn(ExperimentalUuidApi::class, ExperimentalAtomicApi::class)
8674
@Suppress("TooManyFunctions")
87-
public class StreamableHttpServerTransport(
88-
private val enableJsonResponse: Boolean = false,
89-
private val enableDnsRebindingProtection: Boolean = false,
90-
private val allowedHosts: List<String>? = null,
91-
private val allowedOrigins: List<String>? = null,
92-
private val eventStore: EventStore? = null,
93-
private val retryIntervalMillis: Long? = null,
94-
) : AbstractTransport() {
75+
public class StreamableHttpServerTransport(private val configuration: Configuration) : AbstractTransport() {
76+
77+
@Deprecated("Use default constructor with explicit Configuration()")
78+
public constructor() : this(configuration = Configuration())
79+
80+
/**
81+
* Secondary constructor for `StreamableHttpServerTransport` that simplifies initialization by directly taking the
82+
* configurable parameters without requiring a `Configuration` instance.
83+
*
84+
* @param enableJsonResponse Determines whether the server should return JSON responses.
85+
* Defaults to `false`.
86+
* @param enableDnsRebindingProtection Enables DNS rebinding protection.
87+
* Defaults to `false`.
88+
* @param allowedHosts A list of hosts allowed for server communication.
89+
* Defaults to `null`, allowing all hosts.
90+
* @param allowedOrigins A list of allowed origins for CORS (Cross-Origin Resource Sharing).
91+
* Defaults to `null`, allowing all origins.
92+
* @param eventStore The `EventStore` instance for handling resumable events.
93+
* Defaults to `null`, disabling resumability.
94+
* @param retryIntervalMillis Retry interval in milliseconds for event handling or reconnection attempts.
95+
* Defaults to `null`.
96+
*/
97+
@Suppress("MaxLineLength")
98+
@Deprecated(
99+
"Use constructor with Configuration: StreamableHttpServerTransport(Configuration(enableJsonResponse = ...))",
100+
replaceWith = ReplaceWith(
101+
"StreamableHttpServerTransport(Configuration(enableJsonResponse = enableJsonResponse, enableDnsRebindingProtection = enableDnsRebindingProtection, allowedHosts = allowedHosts, allowedOrigins = allowedOrigins, eventStore = eventStore, retryIntervalMillis = retryIntervalMillis))",
102+
),
103+
)
104+
public constructor(
105+
enableJsonResponse: Boolean = false,
106+
enableDnsRebindingProtection: Boolean = false,
107+
allowedHosts: List<String>? = null,
108+
allowedOrigins: List<String>? = null,
109+
eventStore: EventStore? = null,
110+
retryIntervalMillis: Long? = null,
111+
) : this(
112+
Configuration(
113+
enableJsonResponse = enableJsonResponse,
114+
enableDnsRebindingProtection = enableDnsRebindingProtection,
115+
allowedHosts = allowedHosts,
116+
allowedOrigins = allowedOrigins,
117+
eventStore = eventStore,
118+
retryInterval = retryIntervalMillis?.milliseconds,
119+
),
120+
)
121+
122+
/**
123+
* Configuration for managing various aspects of the StreamableHttpServerTransport.
124+
*
125+
* @property enableJsonResponse Determines whether the server should return JSON responses.
126+
* Defaults to `false`.
127+
*
128+
* @property enableDnsRebindingProtection Enables DNS rebinding protection.
129+
* Defaults to `false`.
130+
*
131+
* @property allowedHosts A list of hosts allowed for server communication.
132+
* Defaults to `null`, allowing all hosts.
133+
*
134+
* @property allowedOrigins A list of allowed origins for CORS (Cross-Origin Resource Sharing).
135+
* Defaults to `null`, allowing all origins.
136+
*
137+
* @property eventStore The `EventStore` instance for handling resumable events.
138+
* Defaults to `null`, disabling resumability.
139+
*
140+
* @property retryInterval Retry interval for event handling or reconnection attempts.
141+
* Defaults to `null`.
142+
*/
143+
public class Configuration(
144+
public val enableJsonResponse: Boolean = false,
145+
public val enableDnsRebindingProtection: Boolean = false,
146+
public val allowedHosts: List<String>? = null,
147+
public val allowedOrigins: List<String>? = null,
148+
public val eventStore: EventStore? = null,
149+
public val retryInterval: Duration? = null,
150+
)
151+
95152
public var sessionId: String? = null
96153
private set
97154

@@ -177,7 +234,7 @@ public class StreamableHttpServerTransport(
177234
?: error("No connection established for request id $routingRequestId")
178235
val activeStream = streamsMapping[streamId]
179236

180-
if (!enableJsonResponse) {
237+
if (!configuration.enableJsonResponse) {
181238
activeStream?.let { stream ->
182239
emitOnStream(streamId, stream.session, message)
183240
}
@@ -194,7 +251,7 @@ public class StreamableHttpServerTransport(
194251
streamMutex.withLock {
195252
if (activeStream == null) error("No connection established for request ID: $routingRequestId")
196253

197-
if (enableJsonResponse) {
254+
if (configuration.enableJsonResponse) {
198255
activeStream.call.response.header(HttpHeaders.ContentType, ContentType.Application.Json.toString())
199256
sessionId?.let { activeStream.call.response.header(MCP_SESSION_ID_HEADER, it) }
200257
val responses = relatedIds.mapNotNull { requestToResponseMapping[it] }
@@ -261,7 +318,7 @@ public class StreamableHttpServerTransport(
261318
@Suppress("CyclomaticComplexMethod", "LongMethod", "ReturnCount", "TooGenericExceptionCaught")
262319
public suspend fun handlePostRequest(session: ServerSSESession?, call: ApplicationCall) {
263320
try {
264-
if (!enableJsonResponse && session == null) {
321+
if (!configuration.enableJsonResponse && session == null) {
265322
error("Server session can't be null for SSE responses")
266323
}
267324

@@ -328,7 +385,7 @@ public class StreamableHttpServerTransport(
328385
}
329386

330387
val streamId = Uuid.random().toString()
331-
if (!enableJsonResponse) {
388+
if (!configuration.enableJsonResponse) {
332389
call.appendSseHeaders()
333390
flushSse(session) // flush headers immediately
334391
maybeSendPrimingEvent(streamId, session)
@@ -353,7 +410,7 @@ public class StreamableHttpServerTransport(
353410

354411
@Suppress("ReturnCount")
355412
public suspend fun handleGetRequest(session: ServerSSESession?, call: ApplicationCall) {
356-
if (enableJsonResponse) {
413+
if (configuration.enableJsonResponse) {
357414
call.reject(
358415
HttpStatusCode.MethodNotAllowed,
359416
RPCError.ErrorCode.CONNECTION_CLOSED,
@@ -375,7 +432,7 @@ public class StreamableHttpServerTransport(
375432

376433
if (!validateSession(call) || !validateProtocolVersion(call)) return
377434

378-
eventStore?.let { store ->
435+
configuration.eventStore?.let { store ->
379436
call.request.header(MCP_RESUMPTION_TOKEN_HEADER)?.let { lastEventId ->
380437
replayEvents(store, lastEventId, sseSession)
381438
return
@@ -413,7 +470,7 @@ public class StreamableHttpServerTransport(
413470
*/
414471
@Suppress("ReturnCount", "TooGenericExceptionCaught")
415472
public suspend fun closeSseStream(requestId: RequestId) {
416-
if (enableJsonResponse) return
473+
if (configuration.enableJsonResponse) return
417474
val streamId = requestToStreamMapping[requestId] ?: return
418475
val sessionContext = streamsMapping[streamId] ?: return
419476

@@ -562,9 +619,9 @@ public class StreamableHttpServerTransport(
562619

563620
@Suppress("ReturnCount")
564621
private fun validateHeaders(call: ApplicationCall): String? {
565-
if (!enableDnsRebindingProtection) return null
622+
if (!configuration.enableDnsRebindingProtection) return null
566623

567-
allowedHosts?.let { hosts ->
624+
configuration.allowedHosts?.let { hosts ->
568625
val hostHeader = call.request.headers[HttpHeaders.Host]?.lowercase()
569626
val allowedHostsLowercase = hosts.map { it.lowercase() }
570627

@@ -573,7 +630,7 @@ public class StreamableHttpServerTransport(
573630
}
574631
}
575632

576-
allowedOrigins?.let { origins ->
633+
configuration.allowedOrigins?.let { origins ->
577634
val originHeader = call.request.headers[HttpHeaders.Origin]?.lowercase()
578635
val allowedOriginsLowercase = origins.map { it.lowercase() }
579636

@@ -636,7 +693,7 @@ public class StreamableHttpServerTransport(
636693
this?.lowercase()?.contains(mime.toString().lowercase()) == true
637694

638695
private suspend fun emitOnStream(streamId: String, session: ServerSSESession?, message: JSONRPCMessage) {
639-
val eventId = eventStore?.storeEvent(streamId, message)
696+
val eventId = configuration.eventStore?.storeEvent(streamId, message)
640697
try {
641698
session?.send(event = "message", id = eventId, data = McpJson.encodeToString(message))
642699
} catch (_: Exception) {
@@ -646,11 +703,15 @@ public class StreamableHttpServerTransport(
646703

647704
@Suppress("TooGenericExceptionCaught")
648705
private suspend fun maybeSendPrimingEvent(streamId: String, session: ServerSSESession?) {
649-
val store = eventStore ?: return
706+
val store = configuration.eventStore ?: return
650707
val sseSession = session ?: return
651708
try {
652709
val primingEventId = store.storeEvent(streamId, JSONRPCEmptyMessage)
653-
sseSession.send(id = primingEventId, retry = retryIntervalMillis, data = "")
710+
sseSession.send(
711+
id = primingEventId,
712+
retry = configuration.retryInterval?.inWholeMilliseconds,
713+
data = "",
714+
)
654715
} catch (e: Exception) {
655716
_onError(e)
656717
}

0 commit comments

Comments
 (0)