Skip to content

Commit ef8ba32

Browse files
committed
feat: introduce Configuration data class for StreamableHttpServerTransport
Replace the six-parameter flat constructor of StreamableHttpServerTransport with a typed Configuration data class. This improves API ergonomics, enables structural equality and copy(), and provides a stable extension point for future options without further breaking the constructor signature. Changes: - Add `Configuration` as a public data class nested directly on `StreamableHttpServerTransport`, with `enableJsonResponse` as the first parameter (most commonly set) - Change the primary constructor to accept `Configuration` - Rename `retryIntervalMillis: Long?` to `retryInterval: Duration?` in Configuration, aligning with Kotlin's type-safe time API - Deprecate the old flat constructor with a compatibility bridge - Update KotlinTestBase integration test to use the new constructor
1 parent fe97b44 commit ef8ba32

File tree

5 files changed

+144
-50
lines changed

5 files changed

+144
-50
lines changed

integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/kotlin/AbstractResourceIntegrationTest.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,6 @@ abstract class AbstractResourceIntegrationTest : KotlinTestBase() {
165165
assertEquals(testResourceContent, content.text, "Resource content should match")
166166
}
167167

168-
@Ignore("Blocked by https://github.com/modelcontextprotocol/kotlin-sdk/issues/249")
169168
@Test
170169
fun testSubscribeAndUnsubscribe() {
171170
runBlocking(Dispatchers.IO) {

integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/kotlin/KotlinTestBase.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,9 @@ abstract class KotlinTestBase {
148148
// Create StreamableHTTP server transport
149149
// Using JSON response mode for simpler testing (no SSE session required)
150150
val transport = StreamableHttpServerTransport(
151-
enableJsonResponse = true, // Use JSON response mode for testing
151+
StreamableHttpServerTransport.Configuration(
152+
enableJsonResponse = true, // Use JSON response mode for testing
153+
),
152154
)
153155
// Use stateless mode to skip session validation for simpler testing
154156
transport.setSessionIdGenerator(null)

kotlin-sdk-server/api/kotlin-sdk-server.api

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,8 @@ public final class io/modelcontextprotocol/kotlin/sdk/server/StdioServerTranspor
164164
public final class io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport : io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport {
165165
public static final field STANDALONE_SSE_STREAM_ID Ljava/lang/String;
166166
public fun <init> ()V
167+
public fun <init> (Lio/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport$Configuration;)V
168+
public synthetic fun <init> (Lio/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport$Configuration;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
167169
public fun <init> (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Ljava/lang/Long;)V
168170
public synthetic fun <init> (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Ljava/lang/Long;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
169171
public fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
@@ -180,6 +182,33 @@ public final class io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServe
180182
public fun start (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
181183
}
182184

185+
public final class io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport$Configuration {
186+
public static final field Companion Lio/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport$Configuration$Companion;
187+
public synthetic fun <init> (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/time/Duration;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
188+
public synthetic fun <init> (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/time/Duration;Lkotlin/jvm/internal/DefaultConstructorMarker;)V
189+
public final fun component1 ()Z
190+
public final fun component2 ()Z
191+
public final fun component3 ()Ljava/util/List;
192+
public final fun component4 ()Ljava/util/List;
193+
public final fun component5 ()Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;
194+
public final fun component6-FghU774 ()Lkotlin/time/Duration;
195+
public final fun copy-BAu0izY (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/time/Duration;)Lio/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport$Configuration;
196+
public static synthetic fun copy-BAu0izY$default (Lio/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport$Configuration;ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/time/Duration;ILjava/lang/Object;)Lio/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport$Configuration;
197+
public fun equals (Ljava/lang/Object;)Z
198+
public final fun getAllowedHosts ()Ljava/util/List;
199+
public final fun getAllowedOrigins ()Ljava/util/List;
200+
public final fun getEnableDnsRebindingProtection ()Z
201+
public final fun getEnableJsonResponse ()Z
202+
public final fun getEventStore ()Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;
203+
public final fun getRetryInterval-FghU774 ()Lkotlin/time/Duration;
204+
public fun hashCode ()I
205+
public fun toString ()Ljava/lang/String;
206+
}
207+
208+
public final class io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport$Configuration$Companion {
209+
public final fun getDefault ()Lio/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport$Configuration;
210+
}
211+
183212
public final class io/modelcontextprotocol/kotlin/sdk/server/WebSocketMcpKtorServerExtensionsKt {
184213
public static final fun mcpWebSocket (Lio/ktor/server/application/Application;Ljava/lang/String;Lkotlin/jvm/functions/Function0;)V
185214
public static final fun mcpWebSocket (Lio/ktor/server/application/Application;Lkotlin/jvm/functions/Function0;)V

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -225,11 +225,13 @@ private suspend fun RoutingContext.mcpStatelessStreamableHttpEndpoint(
225225
block: RoutingContext.() -> Server,
226226
) {
227227
val transport = StreamableHttpServerTransport(
228-
enableDnsRebindingProtection = enableDnsRebindingProtection,
229-
allowedHosts = allowedHosts,
230-
allowedOrigins = allowedOrigins,
231-
eventStore = eventStore,
232-
enableJsonResponse = true,
228+
StreamableHttpServerTransport.Configuration(
229+
enableDnsRebindingProtection = enableDnsRebindingProtection,
230+
allowedHosts = allowedHosts,
231+
allowedOrigins = allowedOrigins,
232+
eventStore = eventStore,
233+
enableJsonResponse = true,
234+
),
233235
).also { it.setSessionIdGenerator(null) }
234236

235237
logger.info { "New stateless StreamableHttp connection established without sessionId" }
@@ -305,11 +307,13 @@ private suspend fun RoutingContext.streamableTransport(
305307
}
306308

307309
val transport = StreamableHttpServerTransport(
308-
enableDnsRebindingProtection = enableDnsRebindingProtection,
309-
allowedHosts = allowedHosts,
310-
allowedOrigins = allowedOrigins,
311-
eventStore = eventStore,
312-
enableJsonResponse = true,
310+
StreamableHttpServerTransport.Configuration(
311+
enableDnsRebindingProtection = enableDnsRebindingProtection,
312+
allowedHosts = allowedHosts,
313+
allowedOrigins = allowedOrigins,
314+
eventStore = eventStore,
315+
enableJsonResponse = true,
316+
),
313317
)
314318

315319
transport.setOnSessionInitialized { initializedSessionId ->

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt

Lines changed: 98 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ import kotlinx.serialization.json.JsonObject
3636
import kotlinx.serialization.json.decodeFromJsonElement
3737
import kotlin.concurrent.atomics.AtomicBoolean
3838
import kotlin.concurrent.atomics.ExperimentalAtomicApi
39+
import kotlin.time.Duration
40+
import kotlin.time.Duration.Companion.milliseconds
3941
import kotlin.uuid.ExperimentalUuidApi
4042
import kotlin.uuid.Uuid
4143

@@ -46,8 +48,8 @@ private const val MAXIMUM_MESSAGE_SIZE = 4 * 1024 * 1024 // 4 MB
4648

4749
/**
4850
* A holder for an active request call.
49-
* If enableJsonResponse is true, session is null.
50-
* Otherwise, session is not null.
51+
* If [StreamableHttpServerTransport.Configuration.enableJsonResponse] is true, the session is null.
52+
* Otherwise, the session is not null.
5153
*/
5254
private data class SessionContext(val session: ServerSSESession?, val call: ApplicationCall)
5355

@@ -66,32 +68,86 @@ private data class SessionContext(val session: ServerSSESession?, val call: Appl
6668
* - No Session ID is included in any responses
6769
* - No session validation is performed
6870
*
69-
* @param enableJsonResponse If true, the server will return JSON responses instead of starting an SSE stream.
70-
* This can be useful for simple request/response scenarios without streaming.
71-
* Default is false (SSE streams are preferred).
72-
* @param enableDnsRebindingProtection Enable DNS rebinding protection
73-
* (requires allowedHosts and/or allowedOrigins to be configured).
74-
* Default is false for backwards compatibility.
75-
* @param allowedHosts List of allowed host header values for DNS rebinding protection.
76-
* If not specified, host validation is disabled.
77-
* @param allowedOrigins List of allowed origin header values for DNS rebinding protection.
78-
* If not specified, origin validation is disabled.
79-
* @param eventStore Event store for resumability support
80-
* If provided, resumability will be enabled, allowing clients to reconnect and resume messages
81-
* @param retryIntervalMillis Retry interval (in milliseconds) advertised via SSE priming events
82-
* to hint the client when to reconnect. Applies only when an [eventStore] is configured.
83-
* Defaults to `null` (no retry hint).
71+
* @param configuration Transport configuration. See [Configuration] for available options.
8472
*/
8573
@OptIn(ExperimentalUuidApi::class, ExperimentalAtomicApi::class)
8674
@Suppress("TooManyFunctions")
87-
public class StreamableHttpServerTransport(
88-
private val enableJsonResponse: Boolean = false,
89-
private val enableDnsRebindingProtection: Boolean = false,
90-
private val allowedHosts: List<String>? = null,
91-
private val allowedOrigins: List<String>? = null,
92-
private val eventStore: EventStore? = null,
93-
private val retryIntervalMillis: Long? = null,
94-
) : AbstractTransport() {
75+
public class StreamableHttpServerTransport(private val configuration: Configuration = Configuration.Default) :
76+
AbstractTransport() {
77+
78+
/**
79+
* Secondary constructor for `StreamableHttpServerTransport` that simplifies initialization by directly taking the
80+
* configurable parameters without requiring a `Configuration` instance.
81+
*
82+
* @param enableJsonResponse Determines whether the server should return JSON responses.
83+
* Defaults to `false`.
84+
* @param enableDnsRebindingProtection Enables DNS rebinding protection.
85+
* Defaults to `false`.
86+
* @param allowedHosts A list of hosts allowed for server communication.
87+
* Defaults to `null`, allowing all hosts.
88+
* @param allowedOrigins A list of allowed origins for CORS (Cross-Origin Resource Sharing).
89+
* Defaults to `null`, allowing all origins.
90+
* @param eventStore The `EventStore` instance for handling resumable events.
91+
* Defaults to `null`, disabling resumability.
92+
* @param retryIntervalMillis Retry interval in milliseconds for event handling or reconnection attempts.
93+
* Defaults to `null`.
94+
*/
95+
@Deprecated(
96+
"Use constructor with Configuration: StreamableHttpServerTransport(Configuration(enableJsonResponse = ...))",
97+
level = DeprecationLevel.WARNING,
98+
)
99+
public constructor(
100+
enableJsonResponse: Boolean = false,
101+
enableDnsRebindingProtection: Boolean = false,
102+
allowedHosts: List<String>? = null,
103+
allowedOrigins: List<String>? = null,
104+
eventStore: EventStore? = null,
105+
retryIntervalMillis: Long? = null,
106+
) : this(
107+
Configuration(
108+
enableJsonResponse = enableJsonResponse,
109+
enableDnsRebindingProtection = enableDnsRebindingProtection,
110+
allowedHosts = allowedHosts,
111+
allowedOrigins = allowedOrigins,
112+
eventStore = eventStore,
113+
retryInterval = retryIntervalMillis?.milliseconds,
114+
),
115+
)
116+
117+
/**
118+
* Configuration for managing various aspects of the StreamableHttpServerTransport.
119+
*
120+
* @property enableJsonResponse Determines whether the server should return JSON responses.
121+
* Defaults to `false`.
122+
*
123+
* @property enableDnsRebindingProtection Enables DNS rebinding protection.
124+
* Defaults to `false`.
125+
*
126+
* @property allowedHosts A list of hosts allowed for server communication.
127+
* Defaults to `null`, allowing all hosts.
128+
*
129+
* @property allowedOrigins A list of allowed origins for CORS (Cross-Origin Resource Sharing).
130+
* Defaults to `null`, allowing all origins.
131+
*
132+
* @property eventStore The `EventStore` instance for handling resumable events.
133+
* Defaults to `null`, disabling resumability.
134+
*
135+
* @property retryInterval Retry interval for event handling or reconnection attempts.
136+
* Defaults to `null`.
137+
*/
138+
public data class Configuration(
139+
public val enableJsonResponse: Boolean = false,
140+
public val enableDnsRebindingProtection: Boolean = false,
141+
public val allowedHosts: List<String>? = null,
142+
public val allowedOrigins: List<String>? = null,
143+
public val eventStore: EventStore? = null,
144+
public val retryInterval: Duration? = null,
145+
) {
146+
public companion object {
147+
public val Default: Configuration = Configuration()
148+
}
149+
}
150+
95151
public var sessionId: String? = null
96152
private set
97153

@@ -177,7 +233,7 @@ public class StreamableHttpServerTransport(
177233
?: error("No connection established for request id $routingRequestId")
178234
val activeStream = streamsMapping[streamId]
179235

180-
if (!enableJsonResponse) {
236+
if (!configuration.enableJsonResponse) {
181237
activeStream?.let { stream ->
182238
emitOnStream(streamId, stream.session, message)
183239
}
@@ -194,7 +250,7 @@ public class StreamableHttpServerTransport(
194250
streamMutex.withLock {
195251
if (activeStream == null) error("No connection established for request ID: $routingRequestId")
196252

197-
if (enableJsonResponse) {
253+
if (configuration.enableJsonResponse) {
198254
activeStream.call.response.header(HttpHeaders.ContentType, ContentType.Application.Json.toString())
199255
sessionId?.let { activeStream.call.response.header(MCP_SESSION_ID_HEADER, it) }
200256
val responses = relatedIds.mapNotNull { requestToResponseMapping[it] }
@@ -261,7 +317,7 @@ public class StreamableHttpServerTransport(
261317
@Suppress("CyclomaticComplexMethod", "LongMethod", "ReturnCount", "TooGenericExceptionCaught")
262318
public suspend fun handlePostRequest(session: ServerSSESession?, call: ApplicationCall) {
263319
try {
264-
if (!enableJsonResponse && session == null) {
320+
if (!configuration.enableJsonResponse && session == null) {
265321
error("Server session can't be null for SSE responses")
266322
}
267323

@@ -328,7 +384,7 @@ public class StreamableHttpServerTransport(
328384
}
329385

330386
val streamId = Uuid.random().toString()
331-
if (!enableJsonResponse) {
387+
if (!configuration.enableJsonResponse) {
332388
call.appendSseHeaders()
333389
flushSse(session) // flush headers immediately
334390
maybeSendPrimingEvent(streamId, session)
@@ -353,7 +409,7 @@ public class StreamableHttpServerTransport(
353409

354410
@Suppress("ReturnCount")
355411
public suspend fun handleGetRequest(session: ServerSSESession?, call: ApplicationCall) {
356-
if (enableJsonResponse) {
412+
if (configuration.enableJsonResponse) {
357413
call.reject(
358414
HttpStatusCode.MethodNotAllowed,
359415
RPCError.ErrorCode.CONNECTION_CLOSED,
@@ -375,7 +431,7 @@ public class StreamableHttpServerTransport(
375431

376432
if (!validateSession(call) || !validateProtocolVersion(call)) return
377433

378-
eventStore?.let { store ->
434+
configuration.eventStore?.let { store ->
379435
call.request.header(MCP_RESUMPTION_TOKEN_HEADER)?.let { lastEventId ->
380436
replayEvents(store, lastEventId, sseSession)
381437
return
@@ -413,7 +469,7 @@ public class StreamableHttpServerTransport(
413469
*/
414470
@Suppress("ReturnCount", "TooGenericExceptionCaught")
415471
public suspend fun closeSseStream(requestId: RequestId) {
416-
if (enableJsonResponse) return
472+
if (configuration.enableJsonResponse) return
417473
val streamId = requestToStreamMapping[requestId] ?: return
418474
val sessionContext = streamsMapping[streamId] ?: return
419475

@@ -562,9 +618,9 @@ public class StreamableHttpServerTransport(
562618

563619
@Suppress("ReturnCount")
564620
private fun validateHeaders(call: ApplicationCall): String? {
565-
if (!enableDnsRebindingProtection) return null
621+
if (!configuration.enableDnsRebindingProtection) return null
566622

567-
allowedHosts?.let { hosts ->
623+
configuration.allowedHosts?.let { hosts ->
568624
val hostHeader = call.request.headers[HttpHeaders.Host]?.lowercase()
569625
val allowedHostsLowercase = hosts.map { it.lowercase() }
570626

@@ -573,7 +629,7 @@ public class StreamableHttpServerTransport(
573629
}
574630
}
575631

576-
allowedOrigins?.let { origins ->
632+
configuration.allowedOrigins?.let { origins ->
577633
val originHeader = call.request.headers[HttpHeaders.Origin]?.lowercase()
578634
val allowedOriginsLowercase = origins.map { it.lowercase() }
579635

@@ -636,7 +692,7 @@ public class StreamableHttpServerTransport(
636692
this?.lowercase()?.contains(mime.toString().lowercase()) == true
637693

638694
private suspend fun emitOnStream(streamId: String, session: ServerSSESession?, message: JSONRPCMessage) {
639-
val eventId = eventStore?.storeEvent(streamId, message)
695+
val eventId = configuration.eventStore?.storeEvent(streamId, message)
640696
try {
641697
session?.send(event = "message", id = eventId, data = McpJson.encodeToString(message))
642698
} catch (_: Exception) {
@@ -646,11 +702,15 @@ public class StreamableHttpServerTransport(
646702

647703
@Suppress("TooGenericExceptionCaught")
648704
private suspend fun maybeSendPrimingEvent(streamId: String, session: ServerSSESession?) {
649-
val store = eventStore ?: return
705+
val store = configuration.eventStore ?: return
650706
val sseSession = session ?: return
651707
try {
652708
val primingEventId = store.storeEvent(streamId, JSONRPCEmptyMessage)
653-
sseSession.send(id = primingEventId, retry = retryIntervalMillis, data = "")
709+
sseSession.send(
710+
id = primingEventId,
711+
retry = configuration.retryInterval?.inWholeMilliseconds,
712+
data = "",
713+
)
654714
} catch (e: Exception) {
655715
_onError(e)
656716
}

0 commit comments

Comments
 (0)