fix!: harden lifecycle, dispatchers, back-pressure#778
Open
devcrocod wants to merge 1 commit into
Open
Conversation
…currency. Added support for custom `CoroutineScope` and I/O dispatchers. Updated test cases for extended functionality.
Contributor
There was a problem hiding this comment.
Pull request overview
This PR hardens StdioServerTransport’s coroutine lifecycle and threading model to address handler-dispatcher misuse and scope detachment, while also adding bounded-channel back-pressure and more robust start/close race handling.
Changes:
- Introduces a builder-based constructor that allows configuring
handlerDispatcher,ioDispatcher, and an optional caller-providedCoroutineScope. - Reworks the internal pumps (reader/processor/writer) to use bounded channels, apply back-pressure to
send(), and coordinate shutdown via a 3-state CAS + setup barrier. - Expands JVM integration tests to cover back-pressure, concurrency races, lifecycle error cases, EOF resource closing, and dispatcher/scope configuration.
Reviewed changes
Copilot reviewed 3 out of 4 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransport.kt |
Major lifecycle/dispatcher/scope refactor; introduces bounded channels + builder configuration. |
kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StdioServerTransportTest.kt |
Adds/updates integration tests for new lifecycle semantics, back-pressure, and configurability. |
kotlin-sdk-server/api/kotlin-sdk-server.api |
API surface update reflecting the new builder constructor and nested Builder type. |
.gitignore |
Adjusts ignore patterns related to SWE agent tooling files/directories. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| override suspend fun start() { | ||
| if (!initialized.compareAndSet(expectedValue = false, newValue = true)) { | ||
| if (!state.compareAndSet(State.New, State.Operational)) { | ||
| error("StdioServerTransport already started!") |
| } | ||
| if (previous == State.New) { | ||
| setupComplete.complete(Unit) | ||
| return |
| logger.debug(cause) { "$jobName job completed exceptionally" } | ||
| runCatching { input.close() }.onFailure { logger.warn(it) { "Failed to close stdin" } } | ||
| readerJob?.cancel() | ||
| readChannel.close() |
| if (state.load() == State.Stopped) { | ||
| logger.debug(e) { "Reader interrupted by close()" } | ||
| } else { | ||
| logger.error(e) { "Error reading from stdin" } |
Comment on lines
+239
to
+242
| } catch (e: Throwable) { | ||
| logger.error(e) { "Error writing to stdout" } | ||
| _onError(e) | ||
| } finally { |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
fixes #573
fixes #574
fixes #708
Dispatchers.IO→ nowDispatchers.Default, configurable viahandlerDispatcher.ioDispatcher(defaultIODispatcher.limitedParallelism(2)).CoroutineScopeso the pipeline isn't detached from the caller's tree.CancellationExceptionconsistently rethrown across all pumps.onClose).Sourcenow closed on natural EOF / reader error (was leaking for non-System.insources).start()andclose()fixed via 3-state CAS +setupCompletebarrier replacingAtomicBoolean.How Has This Been Tested?
unit and integration tests
Breaking Changes
Dispatchers.Default(wasDispatchers.IO). Override withhandlerDispatcher = Dispatchers.IOif your handlers perform blocking I/O.send()suspends under back-pressure instead of buffering unbounded.StdioServerTransport(input, output)ctor is@Deprecated(WARNING) in favor ofStdioServerTransport(input, output) { ... }. Still binary-compatible.Types of changes
Checklist