Fix stream serialization to resolve when user releases lock instead of waiting for stream to close #678
base: main
Conversation
Fix stream serialization to resolve when user releases lock instead of waiting for stream to close. This prevents Vercel functions from hanging when users incrementally write to streams within steps (e.g., `await writer.write(data); writer.releaseLock()`). Uses a polling approach to detect when the stream lock is released and all pending writes are flushed.
🦋 Changeset detected. Latest commit: a298e32. The changes in this PR will be included in the next version bump. This PR includes changesets to release 12 packages.
🧪 E2E Test Results: ❌ Some tests failed

Failed tests, 🌍 Community Worlds (11 failed):
- mongodb (1 failed)
- redis (1 failed)
- starter (8 failed)
- turso (1 failed)

Details by category:
- ✅ ▲ Vercel Production
- ✅ 💻 Local Development
- ✅ 📦 Local Production
- ✅ 🐘 Local Postgres
- ✅ 🪟 Windows
- ❌ 🌍 Community Worlds
📊 Benchmark Results

Benchmark scenarios (each run in 💻 Local Development and ▲ Production (Vercel), with 🔍 observability traces for Express, Nitro, and Next.js (Turbopack)):
- workflow with no steps
- workflow with 1 step
- workflow with 10 sequential steps
- Promise.all with 10 concurrent steps
- Promise.all with 25 concurrent steps
- Promise.race with 10 concurrent steps
- Promise.race with 25 concurrent steps
- Stream benchmarks (includes TTFB metrics): workflow with stream

Summary: Fastest Framework by World and Fastest World by Framework, with winners determined by most benchmark wins.
Pull request overview
This PR fixes an issue where Vercel serverless functions would hang indefinitely when users write to streams and release locks without explicitly closing the stream. The solution implements a polling mechanism to detect lock releases and resolve step operations early.
Key Changes:
- Introduced a `flushablePipe` function that resolves when either the stream closes naturally or the user releases their lock and all pending writes are flushed
- Implemented polling functions (`pollReadableLock`, `pollWritableLock`) that check every 100ms whether stream locks have been released
- Updated stream serialization in both `getExternalRevivers` and `getStepRevivers` to use the new flushable pipe mechanism instead of the standard `pipeTo()` (a rough wiring sketch follows this list)
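A minimal sketch of how these pieces fit together, modeled on the new test file; the function names come from this PR, but the exact signatures, types, and the reviver wiring shown here are assumptions:

```ts
import {
  createFlushableState,
  flushablePipe,
  pollWritableLock,
} from './flushable-stream';

// Hypothetical reviver helper: expose the writable side of a TransformStream
// to the step, pump its readable side into the real sink, and let
// `state.promise` settle either when the stream closes or when the step
// releases its writer lock with no writes pending.
function reviveWritableStream(sink: WritableStream<string>): {
  writable: WritableStream<string>;
  done: Promise<void>;
} {
  const { readable, writable } = new TransformStream<string, string>();
  const state = createFlushableState();

  // Errors surface through state.reject, so the pipe's own rejection is ignored.
  flushablePipe(readable, sink, state).catch(() => {});
  // Poll the user-facing side so releasing the lock also resolves the step.
  pollWritableLock(writable, state);

  return { writable, done: state.promise };
}
```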
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 10 comments.
Summary per file:

| File | Description |
|---|---|
| packages/core/src/flushable-stream.ts | New module implementing the flushable stream mechanism with state tracking, lock polling, and a custom pipe function |
| packages/core/src/flushable-stream.test.ts | Test suite covering lock release and normal stream closure scenarios |
| packages/core/src/serialization.ts | Updated ReadableStream and WritableStream revivers to use flushable pipe with lock polling |
| docs/content/docs/foundations/streaming.mdx | Enhanced documentation about stream lock contracts and best practices |
| .changeset/stream-lock-polling.md | Changeset describing the patch |
```ts
const intervalId = setInterval(() => {
  // Stop polling if already resolved or stream ended
  if (state.doneResolved || state.streamEnded) {
    clearInterval(intervalId);
    return;
  }

  // Check if lock is released (not closed) and no pending ops
  if (isReadableUnlockedNotClosed(readable) && state.pendingOps === 0) {
    state.doneResolved = true;
    state.resolve();
    clearInterval(intervalId);
  }
}, LOCK_POLL_INTERVAL_MS);
```
Copilot AI (Dec 23, 2025)
Similar to pollWritableLock, this interval is never stored or returned, creating potential issues if this function is called multiple times on the same stream. Multiple simultaneous polling operations could race to resolve the same state. Consider returning the intervalId for explicit cleanup or adding protection against concurrent polling.
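A sketch of the first mitigation, returning the timer so the caller can clear it; the `FlushableStreamState` type name and the changed return type are assumptions, not code from the PR:

```ts
// Hypothetical variant: hand the interval back to the caller so it can be
// cleared explicitly (e.g. when the surrounding step is torn down).
function pollReadableLock(
  readable: ReadableStream,
  state: FlushableStreamState
): ReturnType<typeof setInterval> {
  const intervalId = setInterval(() => {
    if (state.doneResolved || state.streamEnded) {
      clearInterval(intervalId);
      return;
    }
    if (isReadableUnlockedNotClosed(readable) && state.pendingOps === 0) {
      state.doneResolved = true;
      state.resolve();
      clearInterval(intervalId);
    }
  }, LOCK_POLL_INTERVAL_MS);
  return intervalId;
}
```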
```ts
const readResult = await reader.read();

if (readResult.done) {
  // Source stream completed - close sink and resolve
  state.streamEnded = true;
  await writer.close();
  // Resolve done promise if not already resolved
  if (!state.doneResolved) {
    state.doneResolved = true;
    state.resolve();
  }
  return;
}

// Count write as a pending op - this is what we need to flush
state.pendingOps++;
try {
  await writer.write(readResult.value);
} finally {
  state.pendingOps--;
}
```
Copilot AI (Dec 23, 2025)
There's a race condition here: after reading from the source, the stream could be ended (via error or another path) before the write begins, but state.streamEnded is only checked after the write completes. This means we might attempt to write to a stream that should be terminated. Consider checking state.streamEnded immediately after the read, before incrementing pendingOps and writing.
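A hedged sketch of the reordering this comment asks for; the extra `state.streamEnded` check between the read and the write is hypothetical:

```ts
const readResult = await reader.read();

if (readResult.done) {
  state.streamEnded = true;
  await writer.close();
  if (!state.doneResolved) {
    state.doneResolved = true;
    state.resolve();
  }
  return;
}

// Hypothetical extra check: another code path may have ended the stream
// between the read and the write, so bail out before touching the writer.
if (state.streamEnded) {
  return;
}

state.pendingOps++;
try {
  await writer.write(readResult.value);
} finally {
  state.pendingOps--;
}
```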
```ts
 * The pump continues running even after `doneResolved=true` to handle
 * any future writes if the user acquires a new lock.
```
Copilot AI (Dec 23, 2025)
The comment states "The pump continues running even after doneResolved=true to handle any future writes if the user acquires a new lock." However, looking at the flushablePipe implementation, there's no mechanism to actually handle or support re-acquiring locks after the promise resolves. Once doneResolved is true, the polling stops, and if the user were to acquire a new lock and write more data, that data would continue to be pumped through, but there's no way to signal completion again. This comment is misleading - either the implementation should support this pattern, or the comment should be updated to clarify that re-acquiring locks after release is not a supported use case.
Suggested change:

```ts
 * Once `doneResolved` is set to true, the `done` promise will not resolve
 * again. Re-acquiring locks after release is not supported as a way to
 * trigger additional completion signaling.
```
@@ -0,0 +1,194 @@

```ts
import { type PromiseWithResolvers, withResolvers } from '@workflow/utils';

/** Polling interval for lock release detection */
```
Copilot AI (Dec 23, 2025)
The polling interval is set to 100ms (LOCK_POLL_INTERVAL_MS), which means there could be up to a 100ms delay between when a user releases a lock and when the system detects it. In serverless environments where execution time costs money, this polling approach could be wasteful. Consider if there's a more event-driven approach, or at least document why polling was chosen over alternatives. Additionally, with many concurrent streams, having multiple setInterval timers could impact performance.
Suggested change:

```ts
/**
 * Polling interval (in ms) for lock release detection.
 *
 * The Web Streams API does not expose an event for "lock released but stream
 * still open"; we can only distinguish that state by periodically attempting
 * to acquire a reader/writer. For that reason we use polling instead of a
 * fully event-driven approach here.
 *
 * 100ms is a compromise between:
 * - Latency: how quickly we notice that the user has released their lock, and
 * - Cost/CPU usage: how often timers fire, especially with many concurrent
 *   streams or in serverless environments where billed time matters.
 *
 * This value should only be changed with care, as decreasing it will
 * increase polling frequency (and thus potential cost), while increasing it
 * will add worst-case delay before the `done` promise resolves after a lock
 * is released.
 */
```
```ts
function isWritableUnlockedNotClosed(writable: WritableStream): boolean {
  if (writable.locked) return false;

  try {
    // Try to acquire writer - if successful, stream is unlocked (not closed)
    const writer = writable.getWriter();
    writer.releaseLock();
    return true;
  } catch {
    // getWriter() throws if stream is closed/errored - let pump handle it
    return false;
  }
```
Copilot AI (Dec 23, 2025)
The function acquires a writer lock temporarily to check if the stream is unlocked vs closed, but if an error occurs during releaseLock() (line 49), the error is silently caught and false is returned. While this is probably the desired behavior, it means the lock might remain acquired if releaseLock throws. Consider being more specific about which errors to catch, or add a comment explaining why errors during releaseLock should be treated as "stream is closed".
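For comparison with the reader-side suggestion below, a writable-side variant that separates acquisition failures from `releaseLock()` failures might look like this; the restructuring is a sketch, not code from the PR:

```ts
function isWritableUnlockedNotClosed(writable: WritableStream): boolean {
  if (writable.locked) return false;

  let writer: WritableStreamDefaultWriter | undefined;
  try {
    // getWriter() throws if the stream is closed or errored.
    writer = writable.getWriter();
  } catch {
    return false;
  }
  try {
    writer.releaseLock();
  } catch {
    // Treat a failing releaseLock() as "stream unusable" rather than
    // folding it into the acquisition failure above.
    return false;
  }
  return true;
}
```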
```ts
try {
  // Try to acquire reader - if successful, stream is unlocked (not closed)
  const reader = readable.getReader();
  reader.releaseLock();
  return true;
} catch {
  // getReader() throws if stream is closed/errored - let pump handle it
  return false;
}
```
Copilot AI (Dec 23, 2025)
Similar to isWritableUnlockedNotClosed, this function temporarily acquires a reader lock to check stream state. If an error occurs during releaseLock() (line 66), it's silently caught. While probably intentional, this could leave the lock acquired if releaseLock throws. Consider being more explicit about error handling or documenting why all errors should be treated as "stream is closed".
Suggested change:

```ts
let reader: ReadableStreamDefaultReader | undefined;
try {
  // Try to acquire reader - if successful, stream is unlocked (not closed)
  reader = readable.getReader();
} catch {
  // getReader() throws if stream is closed/errored - let pump handle it
  return false;
}
try {
  reader.releaseLock();
} catch {
  // If releaseLock() throws for any reason, conservatively treat the
  // stream as closed/errored so callers don't assume it's safe to use.
  // The pump will observe the failure via the stream's end state.
  return false;
}
return true;
```
```ts
const intervalId = setInterval(() => {
  // Stop polling if already resolved or stream ended
  if (state.doneResolved || state.streamEnded) {
    clearInterval(intervalId);
    return;
  }

  // Check if lock is released (not closed) and no pending ops
  if (isWritableUnlockedNotClosed(writable) && state.pendingOps === 0) {
    state.doneResolved = true;
    state.resolve();
    clearInterval(intervalId);
  }
}, LOCK_POLL_INTERVAL_MS);
```
Copilot AI (Dec 23, 2025)
The interval created here is never stored or returned, which means there's no way to explicitly clean it up. While the interval does have cleanup logic inside the callback, there's a potential issue if pollWritableLock is called multiple times on the same stream - this would create multiple intervals that could race to resolve the same state. Consider returning the intervalId so callers can clean it up if needed, or add protection against multiple simultaneous polling operations on the same state.
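A sketch of the second suggestion, guarding against duplicate pollers on one state; the `pollingActive` field is hypothetical and not part of the PR:

```ts
// Hypothetical guard: track whether a poller is already attached to this
// state so a second call becomes a no-op instead of a racing interval.
function pollWritableLockOnce(
  writable: WritableStream,
  state: FlushableStreamState & { pollingActive?: boolean }
): void {
  if (state.pollingActive) return;
  state.pollingActive = true;

  const intervalId = setInterval(() => {
    if (state.doneResolved || state.streamEnded) {
      clearInterval(intervalId);
      state.pollingActive = false;
      return;
    }
    if (isWritableUnlockedNotClosed(writable) && state.pendingOps === 0) {
      state.doneResolved = true;
      state.resolve();
      clearInterval(intervalId);
      state.pollingActive = false;
    }
  }, LOCK_POLL_INTERVAL_MS);
}
```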
```ts
describe('flushable stream behavior', () => {
  it('promise should resolve when writable stream lock is released (polling)', async () => {
    // Test the pattern: user writes, releases lock, polling detects it, promise resolves
    const chunks: string[] = [];
    let streamClosed = false;

    // Create a simple mock for the sink
    const mockSink = new WritableStream<string>({
      write(chunk) {
        chunks.push(chunk);
      },
      close() {
        streamClosed = true;
      },
    });

    // Create a TransformStream like we do in getStepRevivers
    const { readable, writable } = new TransformStream<string, string>();
    const state = createFlushableState();

    // Start piping in background
    flushablePipe(readable, mockSink, state).catch(() => {
      // Errors handled via state.reject
    });

    // Start polling for lock release
    pollWritableLock(writable, state);

    // Simulate user interaction - write and release lock
    const userWriter = writable.getWriter();
    await userWriter.write('chunk1');
    await userWriter.write('chunk2');

    // Release lock without closing stream
    userWriter.releaseLock();

    // Wait for pipe to process + polling interval
    await new Promise((r) => setTimeout(r, LOCK_POLL_INTERVAL_MS + 50));

    // The promise should resolve
    await expect(
      Promise.race([
        state.promise,
        new Promise((_, r) => setTimeout(() => r(new Error('timeout')), 400)),
      ])
    ).resolves.toBeUndefined();

    // Chunks should have been written
    expect(chunks).toContain('chunk1');
    expect(chunks).toContain('chunk2');

    // Stream should NOT be closed (user only released lock)
    expect(streamClosed).toBe(false);
  });

  it('promise should resolve when writable stream closes naturally', async () => {
    const chunks: string[] = [];
    let streamClosed = false;

    const mockSink = new WritableStream<string>({
      write(chunk) {
        chunks.push(chunk);
      },
      close() {
        streamClosed = true;
      },
    });

    const { readable, writable } = new TransformStream<string, string>();
    const state = createFlushableState();

    // Start piping in background
    flushablePipe(readable, mockSink, state).catch(() => {
      // Errors handled via state.reject
    });

    // Start polling (won't trigger since stream will close first)
    pollWritableLock(writable, state);

    // User writes and then closes the stream
    const userWriter = writable.getWriter();
    await userWriter.write('data');
    await userWriter.close();

    // Wait a tick for the pipe to process
    await new Promise((r) => setTimeout(r, 50));

    // The promise should resolve
    await expect(
      Promise.race([
        state.promise,
        new Promise((_, r) => setTimeout(() => r(new Error('timeout')), 200)),
      ])
    ).resolves.toBeUndefined();

    // Chunks should have been written
    expect(chunks).toContain('data');

    // Stream should be closed (user closed it)
    expect(streamClosed).toBe(true);
  });
});
```
Copilot AI (Dec 23, 2025)
Test coverage is missing for several critical scenarios:
- Error handling during pipe operations (e.g., write failures)
- Testing with pollReadableLock (only pollWritableLock is tested)
- Concurrent operations (multiple reads/writes in flight)
- Edge cases like releasing and re-acquiring locks
- Scenarios where the stream ends while pending operations are in flight
These scenarios are important to verify the robustness of the flushable stream mechanism, especially given the complexity around lock polling and state management.
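For instance, an error-propagation test along these lines might fill the first gap; this is a sketch that reuses the PR's helpers, and the failing sink and assertion message are assumptions:

```ts
it('promise should reject when the sink write fails', async () => {
  // Sink that fails on the first write, to exercise the error path.
  const failingSink = new WritableStream<string>({
    write() {
      throw new Error('write failed');
    },
  });

  const { readable, writable } = new TransformStream<string, string>();
  const state = createFlushableState();

  flushablePipe(readable, failingSink, state).catch(() => {
    // The pipe's own rejection is ignored; errors surface via state.reject.
  });
  pollWritableLock(writable, state);

  const userWriter = writable.getWriter();
  await userWriter.write('boom').catch(() => {});
  userWriter.releaseLock();

  // state.promise should reject with the sink's error.
  await expect(state.promise).rejects.toThrow('write failed');
});
```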
```mdx
When writing to a stream in a step function, there is an important contract to understand:

<Callout type="warn">
  **Once a lock is released, no further writes to that stream from that step are allowed.** The framework uses lock release as the signal that the step is done interacting with the stream. Make sure all writes are complete before releasing the lock.
```
Copilot AI (Dec 23, 2025)
The documentation states "Once a lock is released, no further writes to that stream from that step are allowed" and "The framework uses lock release as the signal that the step is done interacting with the stream." However, this behavior contradicts the comment in flushable-stream.ts line 16-17 which states "The pump continues running even after doneResolved=true to handle any future writes if the user acquires a new lock." This is confusing - can users re-acquire locks and write more data after releasing, or not? The documentation and code comments need to be aligned on this contract.
Suggested change:

> **Once a lock is released, no further writes to that stream from that step are allowed.** The framework uses lock release as the signal that the step is done interacting with the stream. Make sure all writes are complete before releasing the lock, and do not rely on re-acquiring a lock on the same stream within the same step after it has been released, even if internal implementation details might technically allow it.
```ts
if (!state.doneResolved) {
  state.doneResolved = true;
  state.reject(err);
}
```
Copilot AI (Dec 23, 2025)
When an error occurs during piping, the error is caught, state.reject(err) is called, and then the error is re-thrown. However, the callers in serialization.ts catch this error with .catch(() => {}) and ignore it, relying on state.reject to propagate the error through the promise. This means the thrown error from line 189 is always silently caught and discarded. Consider either not re-throwing the error (since it's handled via state.reject), or documenting why the error is both rejected through state and re-thrown.
Suggested change:

```ts
  }
}
// Propagate error through flushablePipe's own promise as well.
// Callers that rely on the FlushableStreamState should use `state.promise`,
// while other callers may depend on this rejection. Some known callers
// explicitly ignore this rejection (`.catch(() => {})`) and rely solely
// on `state.reject(err)` for error handling.
```

Fix stream serialization to resolve when users release locks instead of waiting for streams to close, preventing Vercel functions from hanging.
What changed?
- Added a `flushablePipe` function that resolves in two scenarios: when the stream closes naturally, or when the user releases their lock and all pending writes are flushed.

How to test?
- Create a workflow step that incrementally writes to a stream (a sketch follows this list).
- Verify the step completes immediately after lock release rather than hanging.
- Run the new test cases that verify both lock release and normal stream closure behaviors.
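A minimal sketch of such a step body, assuming the step receives a standard `WritableStream`; the surrounding workflow wiring is omitted and the function name is illustrative:

```ts
// Write a couple of chunks, then release the lock without closing the
// stream. With this PR the step should resolve once these writes are
// flushed instead of hanging until the stream is closed.
async function stepThatStreams(stream: WritableStream<string>) {
  const writer = stream.getWriter();
  await writer.write('first chunk');
  await writer.write('second chunk');
  writer.releaseLock();
}
```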
Why make this change?
This fixes an issue where Vercel functions would hang when users incrementally write to streams within steps. Previously, the system would wait for the stream to fully close before resolving, but many users follow a pattern where they write data and release the lock without explicitly closing the stream. This change allows steps to complete as soon as the user releases the lock and all pending writes are flushed, which is the expected behavior in most streaming scenarios.