diff --git a/.changeset/stream-lock-polling.md b/.changeset/stream-lock-polling.md new file mode 100644 index 000000000..da060a111 --- /dev/null +++ b/.changeset/stream-lock-polling.md @@ -0,0 +1,6 @@ +--- +"@workflow/core": patch +--- + +Fix stream serialization to resolve when user releases lock instead of waiting for stream to close. This prevents Vercel functions from hanging when users incrementally write to streams within steps (e.g., `await writer.write(data); writer.releaseLock()`). Uses a polling approach to detect when the stream lock is released and all pending writes are flushed. + diff --git a/docs/content/docs/foundations/streaming.mdx b/docs/content/docs/foundations/streaming.mdx index fdb684708..23cc74ad8 100644 --- a/docs/content/docs/foundations/streaming.mdx +++ b/docs/content/docs/foundations/streaming.mdx @@ -469,16 +469,44 @@ async function uploadResult(stream: ReadableStream) { } ``` -## Best Practices +## Stream Lock Contract -**Release locks properly:** +When writing to a stream in a step function, there is an important contract to understand: + + +**Once a lock is released, no further writes to that stream from that step are allowed.** The framework uses lock release as the signal that the step is done interacting with the stream. Make sure all writes are complete before releasing the lock. + + + +**The lock MUST be released to prevent the function from hanging.** If you acquire a lock but never release it, the serverless function will remain active until it times out, even after the step returns and the workflow continues. + + +**Correct pattern - complete all writes before releasing:** + +```typescript lineNumbers +async function writeData(items: string[]) { + "use step"; + + const writable = getWritable(); + const writer = writable.getWriter(); + + // Complete ALL writes before releasing the lock + for (const item of items) { + await writer.write(item); + } + + writer.releaseLock(); // Now safe to release +} +``` + +**Use try/finally to ensure the lock is always released:** ```typescript lineNumbers const writer = writable.getWriter(); try { await writer.write(data); } finally { - writer.releaseLock(); // Always release + writer.releaseLock(); // Always release, even on error } ``` @@ -486,10 +514,6 @@ try { Stream locks acquired in a step only apply within that step, not across other steps. This enables multiple writers to write to the same stream concurrently. - -If a lock is not released, the step process cannot terminate. Even though the step returns and the workflow continues, the underlying process will remain active until it times out. - - **Close streams when done:** ```typescript lineNumbers diff --git a/packages/core/src/flushable-stream.test.ts b/packages/core/src/flushable-stream.test.ts new file mode 100644 index 000000000..141607f5f --- /dev/null +++ b/packages/core/src/flushable-stream.test.ts @@ -0,0 +1,110 @@ +import { describe, expect, it } from 'vitest'; +import { + createFlushableState, + flushablePipe, + LOCK_POLL_INTERVAL_MS, + pollWritableLock, +} from './flushable-stream.js'; + +describe('flushable stream behavior', () => { + it('promise should resolve when writable stream lock is released (polling)', async () => { + // Test the pattern: user writes, releases lock, polling detects it, promise resolves + const chunks: string[] = []; + let streamClosed = false; + + // Create a simple mock for the sink + const mockSink = new WritableStream({ + write(chunk) { + chunks.push(chunk); + }, + close() { + streamClosed = true; + }, + }); + + // Create a TransformStream like we do in getStepRevivers + const { readable, writable } = new TransformStream(); + const state = createFlushableState(); + + // Start piping in background + flushablePipe(readable, mockSink, state).catch(() => { + // Errors handled via state.reject + }); + + // Start polling for lock release + pollWritableLock(writable, state); + + // Simulate user interaction - write and release lock + const userWriter = writable.getWriter(); + await userWriter.write('chunk1'); + await userWriter.write('chunk2'); + + // Release lock without closing stream + userWriter.releaseLock(); + + // Wait for pipe to process + polling interval + await new Promise((r) => setTimeout(r, LOCK_POLL_INTERVAL_MS + 50)); + + // The promise should resolve + await expect( + Promise.race([ + state.promise, + new Promise((_, r) => setTimeout(() => r(new Error('timeout')), 400)), + ]) + ).resolves.toBeUndefined(); + + // Chunks should have been written + expect(chunks).toContain('chunk1'); + expect(chunks).toContain('chunk2'); + + // Stream should NOT be closed (user only released lock) + expect(streamClosed).toBe(false); + }); + + it('promise should resolve when writable stream closes naturally', async () => { + const chunks: string[] = []; + let streamClosed = false; + + const mockSink = new WritableStream({ + write(chunk) { + chunks.push(chunk); + }, + close() { + streamClosed = true; + }, + }); + + const { readable, writable } = new TransformStream(); + const state = createFlushableState(); + + // Start piping in background + flushablePipe(readable, mockSink, state).catch(() => { + // Errors handled via state.reject + }); + + // Start polling (won't trigger since stream will close first) + pollWritableLock(writable, state); + + // User writes and then closes the stream + const userWriter = writable.getWriter(); + await userWriter.write('data'); + await userWriter.close(); + + // Wait a tick for the pipe to process + await new Promise((r) => setTimeout(r, 50)); + + // The promise should resolve + await expect( + Promise.race([ + state.promise, + new Promise((_, r) => setTimeout(() => r(new Error('timeout')), 200)), + ]) + ).resolves.toBeUndefined(); + + // Chunks should have been written + expect(chunks).toContain('data'); + + // Stream should be closed (user closed it) + expect(streamClosed).toBe(true); + }); +}); diff --git a/packages/core/src/flushable-stream.ts b/packages/core/src/flushable-stream.ts new file mode 100644 index 000000000..1c1d82b87 --- /dev/null +++ b/packages/core/src/flushable-stream.ts @@ -0,0 +1,194 @@ +import { type PromiseWithResolvers, withResolvers } from '@workflow/utils'; + +/** Polling interval for lock release detection */ +export const LOCK_POLL_INTERVAL_MS = 100; + +/** + * State tracker for flushable stream operations. + * Resolves when either: + * 1. Stream completes (close/error), OR + * 2. Lock is released AND all pending operations are flushed + * + * Note: `doneResolved` and `streamEnded` are separate: + * - `doneResolved`: The `done` promise has been resolved (step can complete) + * - `streamEnded`: The underlying stream has actually closed/errored + * + * The pump continues running even after `doneResolved=true` to handle + * any future writes if the user acquires a new lock. + */ +export interface FlushableStreamState extends PromiseWithResolvers { + /** Number of write operations currently in flight to the server */ + pendingOps: number; + /** Whether the `done` promise has been resolved */ + doneResolved: boolean; + /** Whether the underlying stream has actually closed/errored */ + streamEnded: boolean; +} + +export function createFlushableState(): FlushableStreamState { + return { + ...withResolvers(), + pendingOps: 0, + doneResolved: false, + streamEnded: false, + }; +} + +/** + * Checks if a WritableStream is unlocked (user released lock) vs closed. + * When a stream is closed, .locked is false but getWriter() throws. + * We only want to resolve via polling when the stream is unlocked, not closed. + * If closed, the pump will handle resolution via the stream ending naturally. + */ +function isWritableUnlockedNotClosed(writable: WritableStream): boolean { + if (writable.locked) return false; + + try { + // Try to acquire writer - if successful, stream is unlocked (not closed) + const writer = writable.getWriter(); + writer.releaseLock(); + return true; + } catch { + // getWriter() throws if stream is closed/errored - let pump handle it + return false; + } +} + +/** + * Checks if a ReadableStream is unlocked (user released lock) vs closed. + */ +function isReadableUnlockedNotClosed(readable: ReadableStream): boolean { + if (readable.locked) return false; + + try { + // Try to acquire reader - if successful, stream is unlocked (not closed) + const reader = readable.getReader(); + reader.releaseLock(); + return true; + } catch { + // getReader() throws if stream is closed/errored - let pump handle it + return false; + } +} + +/** + * Polls a WritableStream to check if the user has released their lock. + * Resolves the done promise when lock is released and no pending ops remain. + * + * Note: Only resolves if stream is unlocked but NOT closed. If the user closes + * the stream, the pump will handle resolution via the stream ending naturally. + */ +export function pollWritableLock( + writable: WritableStream, + state: FlushableStreamState +): void { + const intervalId = setInterval(() => { + // Stop polling if already resolved or stream ended + if (state.doneResolved || state.streamEnded) { + clearInterval(intervalId); + return; + } + + // Check if lock is released (not closed) and no pending ops + if (isWritableUnlockedNotClosed(writable) && state.pendingOps === 0) { + state.doneResolved = true; + state.resolve(); + clearInterval(intervalId); + } + }, LOCK_POLL_INTERVAL_MS); +} + +/** + * Polls a ReadableStream to check if the user has released their lock. + * Resolves the done promise when lock is released and no pending ops remain. + * + * Note: Only resolves if stream is unlocked but NOT closed. If the user closes + * the stream, the pump will handle resolution via the stream ending naturally. + */ +export function pollReadableLock( + readable: ReadableStream, + state: FlushableStreamState +): void { + const intervalId = setInterval(() => { + // Stop polling if already resolved or stream ended + if (state.doneResolved || state.streamEnded) { + clearInterval(intervalId); + return; + } + + // Check if lock is released (not closed) and no pending ops + if (isReadableUnlockedNotClosed(readable) && state.pendingOps === 0) { + state.doneResolved = true; + state.resolve(); + clearInterval(intervalId); + } + }, LOCK_POLL_INTERVAL_MS); +} + +/** + * Creates a flushable pipe from a ReadableStream to a WritableStream. + * Unlike pipeTo(), this resolves when: + * 1. The source stream completes (close/error), OR + * 2. The user releases their lock on userStream AND all pending writes are flushed + * + * @param source - The readable stream to read from (e.g., transform's readable) + * @param sink - The writable stream to write to (e.g., server writable) + * @param state - The flushable state tracker + * @returns Promise that resolves when stream ends (not when done promise resolves) + */ +export async function flushablePipe( + source: ReadableStream, + sink: WritableStream, + state: FlushableStreamState +): Promise { + const reader = source.getReader(); + const writer = sink.getWriter(); + + try { + while (true) { + // Check if stream has ended + if (state.streamEnded) { + return; + } + + // Read from source - don't count as pending op since we're just waiting for data + // The important ops are writes to the sink (server) + const readResult = await reader.read(); + + if (readResult.done) { + // Source stream completed - close sink and resolve + state.streamEnded = true; + await writer.close(); + // Resolve done promise if not already resolved + if (!state.doneResolved) { + state.doneResolved = true; + state.resolve(); + } + return; + } + + // Count write as a pending op - this is what we need to flush + state.pendingOps++; + try { + await writer.write(readResult.value); + } finally { + state.pendingOps--; + } + + // Check if stream has ended (e.g., due to error in another path) + if (state.streamEnded) { + return; + } + } + } catch (err) { + state.streamEnded = true; + if (!state.doneResolved) { + state.doneResolved = true; + state.reject(err); + } + throw err; + } finally { + reader.releaseLock(); + writer.releaseLock(); + } +} diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 976f398b0..8d37b97ac 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -1,6 +1,12 @@ import { WorkflowRuntimeError } from '@workflow/errors'; import { DevalueError, parse, stringify, unflatten } from 'devalue'; import { monotonicFactory } from 'ulid'; +import { + createFlushableState, + flushablePipe, + pollReadableLock, + pollWritableLock, +} from './flushable-stream.js'; import { getStepFunction } from './private.js'; import { getWorld } from './runtime/world.js'; import { contextStorage } from './step/context-storage.js'; @@ -736,12 +742,38 @@ export function getExternalRevivers( value.startIndex ); if (value.type === 'bytes') { - return readable; + // For byte streams, use flushable pipe with lock polling + const state = createFlushableState(); + ops.push(state.promise); + + // Create an identity transform to give the user a readable + const { readable: userReadable, writable } = + new global.TransformStream(); + + // Start the flushable pipe in the background + flushablePipe(readable, writable, state).catch(() => { + // Errors are handled via state.reject + }); + + // Start polling to detect when user releases lock + pollReadableLock(userReadable, state); + + return userReadable; } else { const transform = getDeserializeStream( getExternalRevivers(global, ops, runId) ); - ops.push(readable.pipeTo(transform.writable)); + const state = createFlushableState(); + ops.push(state.promise); + + // Start the flushable pipe in the background + flushablePipe(readable, transform.writable, state).catch(() => { + // Errors are handled via state.reject + }); + + // Start polling to detect when user releases lock + pollReadableLock(transform.readable, state); + return transform.readable; } }, @@ -749,11 +781,23 @@ export function getExternalRevivers( const serialize = getSerializeStream( getExternalReducers(global, ops, runId) ); - ops.push( - serialize.readable.pipeTo( - new WorkflowServerWritableStream(value.name, runId) - ) + const serverWritable = new WorkflowServerWritableStream( + value.name, + runId ); + + // Create flushable state for this stream + const state = createFlushableState(); + ops.push(state.promise); + + // Start the flushable pipe in the background + flushablePipe(serialize.readable, serverWritable, state).catch(() => { + // Errors are handled via state.reject + }); + + // Start polling to detect when user releases lock + pollWritableLock(serialize.writable, state); + return serialize.writable; }, }; @@ -880,12 +924,38 @@ function getStepRevivers( const readable = new WorkflowServerReadableStream(value.name); if (value.type === 'bytes') { - return readable; + // For byte streams, use flushable pipe with lock polling + const state = createFlushableState(); + ops.push(state.promise); + + // Create an identity transform to give the user a readable + const { readable: userReadable, writable } = + new global.TransformStream(); + + // Start the flushable pipe in the background + flushablePipe(readable, writable, state).catch(() => { + // Errors are handled via state.reject + }); + + // Start polling to detect when user releases lock + pollReadableLock(userReadable, state); + + return userReadable; } else { const transform = getDeserializeStream( getStepRevivers(global, ops, runId) ); - ops.push(readable.pipeTo(transform.writable)); + const state = createFlushableState(); + ops.push(state.promise); + + // Start the flushable pipe in the background + flushablePipe(readable, transform.writable, state).catch(() => { + // Errors are handled via state.reject + }); + + // Start polling to detect when user releases lock + pollReadableLock(transform.readable, state); + return transform.readable; } }, @@ -897,11 +967,23 @@ function getStepRevivers( } const serialize = getSerializeStream(getStepReducers(global, ops, runId)); - ops.push( - serialize.readable.pipeTo( - new WorkflowServerWritableStream(value.name, runId) - ) + const serverWritable = new WorkflowServerWritableStream( + value.name, + runId ); + + // Create flushable state for this stream + const state = createFlushableState(); + ops.push(state.promise); + + // Start the flushable pipe in the background + flushablePipe(serialize.readable, serverWritable, state).catch(() => { + // Errors are handled via state.reject + }); + + // Start polling to detect when user releases lock + pollWritableLock(serialize.writable, state); + return serialize.writable; }, };