From e8c38003138c446b343971fe388d36e6cc311b12 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Sat, 20 Dec 2025 01:04:13 -0800 Subject: [PATCH 1/6] Fix stream serialization to resolve when user releases lock instead of waiting for stream to close This prevents Vercel functions from hanging when users incrementally write to streams within steps (e.g., `await writer.write(data); writer.releaseLock()`). Uses a polling approach to detect when the stream lock is released and all pending writes are flushed. --- .changeset/stream-lock-polling.md | 6 + packages/core/src/serialization.test.ts | 230 ++++++++++++++++++ packages/core/src/serialization.ts | 306 +++++++++++++++++++++++- 3 files changed, 530 insertions(+), 12 deletions(-) create mode 100644 .changeset/stream-lock-polling.md diff --git a/.changeset/stream-lock-polling.md b/.changeset/stream-lock-polling.md new file mode 100644 index 000000000..da060a111 --- /dev/null +++ b/.changeset/stream-lock-polling.md @@ -0,0 +1,6 @@ +--- +"@workflow/core": patch +--- + +Fix stream serialization to resolve when user releases lock instead of waiting for stream to close. This prevents Vercel functions from hanging when users incrementally write to streams within steps (e.g., `await writer.write(data); writer.releaseLock()`). Uses a polling approach to detect when the stream lock is released and all pending writes are flushed. + diff --git a/packages/core/src/serialization.test.ts b/packages/core/src/serialization.test.ts index 2f55fbff3..1e9003421 100644 --- a/packages/core/src/serialization.test.ts +++ b/packages/core/src/serialization.test.ts @@ -997,3 +997,233 @@ describe('step function serialization', () => { expect(result).toEqual({ stepId: stepName }); }); }); + +describe('flushable stream behavior', () => { + const POLL_INTERVAL = 100; // Match the actual implementation + const STABLE_POLL_COUNT = 2; // Match the actual implementation + + it('done promise should resolve when writable stream lock is released (polling)', async () => { + // Test the pattern: user writes, releases lock, polling detects it, done resolves + const chunks: string[] = []; + let streamClosed = false; + + // Create a simple mock for the sink + const mockSink = new WritableStream({ + write(chunk) { + chunks.push(chunk); + }, + close() { + streamClosed = true; + }, + }); + + // Create a TransformStream like we do in getStepRevivers + const { readable, writable } = new TransformStream(); + + // Track flushable state - this mirrors the actual implementation + const state = { + pendingOps: 0, // Only counts writes to server + doneResolved: false, + streamEnded: false, + resolve: () => {}, + reject: (_err: Error) => {}, + }; + + const done = new Promise((res, rej) => { + state.resolve = res; + state.reject = rej; + }); + + // Start piping in background (mirrors flushablePipe implementation) + (async () => { + const reader = readable.getReader(); + const writer = mockSink.getWriter(); + try { + while (!state.streamEnded) { + const result = await reader.read(); + + if (result.done) { + state.streamEnded = true; + await writer.close(); + if (!state.doneResolved) { + state.doneResolved = true; + state.resolve(); + } + return; + } + + // Only writes count as pending ops + state.pendingOps++; + await writer.write(result.value); + state.pendingOps--; + + if (state.streamEnded) { + reader.releaseLock(); + writer.releaseLock(); + return; + } + } + } catch (err) { + state.streamEnded = true; + if (!state.doneResolved) { + state.doneResolved = true; + state.reject(err as Error); + } + } + })(); + + // Start polling (mirrors pollWritableLock implementation) + let stableCount = 0; + const intervalId = setInterval(() => { + if (state.doneResolved || state.streamEnded) { + clearInterval(intervalId); + return; + } + + // Check if lock is released by checking .locked property + if (!writable.locked && state.pendingOps === 0) { + stableCount++; + if (stableCount >= STABLE_POLL_COUNT) { + state.doneResolved = true; + state.resolve(); + clearInterval(intervalId); + } + } else { + stableCount = 0; + } + }, POLL_INTERVAL); + + // Simulate user interaction - write and release lock + const userWriter = writable.getWriter(); + await userWriter.write('chunk1'); + await userWriter.write('chunk2'); + + // Release lock without closing stream + userWriter.releaseLock(); + + // Wait for pipe to process + polling intervals (need STABLE_POLL_COUNT consecutive polls) + await new Promise((r) => setTimeout(r, 250)); + + // The done promise should resolve + await expect( + Promise.race([ + done, + new Promise((_, r) => setTimeout(() => r(new Error('timeout')), 400)), + ]) + ).resolves.toBeUndefined(); + + // Chunks should have been written + expect(chunks).toContain('chunk1'); + expect(chunks).toContain('chunk2'); + + // Stream should NOT be closed (user only released lock) + expect(streamClosed).toBe(false); + }); + + it('done promise should resolve when writable stream closes naturally', async () => { + const chunks: string[] = []; + let streamClosed = false; + + const mockSink = new WritableStream({ + write(chunk) { + chunks.push(chunk); + }, + close() { + streamClosed = true; + }, + }); + + const { readable, writable } = new TransformStream(); + + const state = { + pendingOps: 0, + doneResolved: false, + streamEnded: false, + resolve: () => {}, + reject: (_err: Error) => {}, + }; + + const done = new Promise((res, rej) => { + state.resolve = res; + state.reject = rej; + }); + + // Start piping in background + (async () => { + const reader = readable.getReader(); + const writer = mockSink.getWriter(); + try { + while (!state.streamEnded) { + const result = await reader.read(); + + if (result.done) { + state.streamEnded = true; + await writer.close(); + if (!state.doneResolved) { + state.doneResolved = true; + state.resolve(); + } + return; + } + + state.pendingOps++; + await writer.write(result.value); + state.pendingOps--; + + if (state.streamEnded) { + reader.releaseLock(); + writer.releaseLock(); + return; + } + } + } catch (err) { + state.streamEnded = true; + if (!state.doneResolved) { + state.doneResolved = true; + state.reject(err as Error); + } + } + })(); + + // Start polling (won't trigger since stream will close first) + let stableCount2 = 0; + const intervalId = setInterval(() => { + if (state.doneResolved || state.streamEnded) { + clearInterval(intervalId); + return; + } + if (!writable.locked && state.pendingOps === 0) { + stableCount2++; + if (stableCount2 >= STABLE_POLL_COUNT) { + state.doneResolved = true; + state.resolve(); + clearInterval(intervalId); + } + } else { + stableCount2 = 0; + } + }, POLL_INTERVAL); + + // User writes and then closes the stream + const userWriter = writable.getWriter(); + await userWriter.write('data'); + await userWriter.close(); + + // Wait a tick for the pipe to process + await new Promise((r) => setTimeout(r, 50)); + + // The done promise should resolve + await expect( + Promise.race([ + done, + new Promise((_, r) => setTimeout(() => r(new Error('timeout')), 200)), + ]) + ).resolves.toBeUndefined(); + + // Chunks should have been written + expect(chunks).toContain('data'); + + // Stream should be closed (user closed it) + expect(streamClosed).toBe(true); + }); +}); diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 976f398b0..69c391b6d 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -1,4 +1,5 @@ import { WorkflowRuntimeError } from '@workflow/errors'; +import { withResolvers } from '@workflow/utils'; import { DevalueError, parse, stringify, unflatten } from 'devalue'; import { monotonicFactory } from 'ulid'; import { getStepFunction } from './private.js'; @@ -178,6 +179,211 @@ export class WorkflowServerWritableStream extends WritableStream { } } +/** Polling interval for checking stream lock state (in milliseconds) */ +const LOCK_POLL_INTERVAL_MS = 100; + +/** + * State tracker for flushable stream operations. + * Resolves when either: + * 1. Stream completes (close/error), OR + * 2. Lock is released AND all pending operations are flushed + * + * Note: `doneResolved` and `streamEnded` are separate: + * - `doneResolved`: The `done` promise has been resolved (step can complete) + * - `streamEnded`: The underlying stream has actually closed/errored + * + * The pump continues running even after `doneResolved=true` to handle + * any future writes if the user acquires a new lock. + */ +interface FlushableStreamState { + /** Number of write operations currently in flight to the server */ + pendingOps: number; + /** Whether the `done` promise has been resolved */ + doneResolved: boolean; + /** Whether the underlying stream has actually closed/errored */ + streamEnded: boolean; + resolve: () => void; + reject: (err: any) => void; +} + +function createFlushableState(): { + state: FlushableStreamState; + done: Promise; +} { + const { promise, resolve, reject } = withResolvers(); + + const state: FlushableStreamState = { + pendingOps: 0, + doneResolved: false, + streamEnded: false, + resolve, + reject, + }; + + return { state, done: promise }; +} + +/** + * Checks if a WritableStream is unlocked (user released lock) vs closed. + * When a stream is closed, .locked is false but getWriter() throws. + * We only want to resolve via polling when the stream is unlocked, not closed. + * If closed, the pump will handle resolution via the stream ending naturally. + */ +function isWritableUnlockedNotClosed(writable: WritableStream): boolean { + if (writable.locked) return false; + + try { + // Try to acquire writer - if successful, stream is unlocked (not closed) + const writer = writable.getWriter(); + writer.releaseLock(); + return true; + } catch { + // getWriter() throws if stream is closed/errored - let pump handle it + return false; + } +} + +/** + * Checks if a ReadableStream is unlocked (user released lock) vs closed. + */ +function isReadableUnlockedNotClosed(readable: ReadableStream): boolean { + if (readable.locked) return false; + + try { + // Try to acquire reader - if successful, stream is unlocked (not closed) + const reader = readable.getReader(); + reader.releaseLock(); + return true; + } catch { + // getReader() throws if stream is closed/errored - let pump handle it + return false; + } +} + +/** + * Polls a WritableStream to check if the user has released their lock. + * Resolves the done promise when lock is released and no pending ops remain + * for multiple consecutive polls (to avoid race conditions with the pump). + * + * Note: Only resolves if stream is unlocked but NOT closed. If the user closes + * the stream, the pump will handle resolution via the stream ending naturally. + */ +function pollWritableLock( + writable: WritableStream, + state: FlushableStreamState +): void { + const intervalId = setInterval(() => { + // Stop polling if already resolved or stream ended + if (state.doneResolved || state.streamEnded) { + clearInterval(intervalId); + return; + } + + // Check if lock is released (not closed) and no pending ops + if (isWritableUnlockedNotClosed(writable) && state.pendingOps === 0) { + state.doneResolved = true; + state.resolve(); + clearInterval(intervalId); + } + }, LOCK_POLL_INTERVAL_MS); +} + +/** + * Polls a ReadableStream to check if the user has released their lock. + * Resolves the done promise when lock is released and no pending ops remain. + * + * Note: Only resolves if stream is unlocked but NOT closed. If the user closes + * the stream, the pump will handle resolution via the stream ending naturally. + */ +function pollReadableLock( + readable: ReadableStream, + state: FlushableStreamState +): void { + const intervalId = setInterval(() => { + // Stop polling if already resolved or stream ended + if (state.doneResolved || state.streamEnded) { + clearInterval(intervalId); + return; + } + + // Check if lock is released (not closed) and no pending ops + if (isReadableUnlockedNotClosed(readable) && state.pendingOps === 0) { + state.doneResolved = true; + state.resolve(); + clearInterval(intervalId); + } + }, LOCK_POLL_INTERVAL_MS); +} + +/** + * Creates a flushable pipe from a ReadableStream to a WritableStream. + * Unlike pipeTo(), this resolves when: + * 1. The source stream completes (close/error), OR + * 2. The user releases their lock on userStream AND all pending writes are flushed + * + * @param source - The readable stream to read from (e.g., transform's readable) + * @param sink - The writable stream to write to (e.g., server writable) + * @param state - The flushable state tracker + * @returns Promise that resolves when stream ends (not when done promise resolves) + */ +async function flushablePipe( + source: ReadableStream, + sink: WritableStream, + state: FlushableStreamState +): Promise { + const reader = source.getReader(); + const writer = sink.getWriter(); + + try { + while (true) { + // Check if stream has ended + if (state.streamEnded) { + reader.releaseLock(); + writer.releaseLock(); + return; + } + + // Read from source - don't count as pending op since we're just waiting for data + // The important ops are writes to the sink (server) + const readResult = await reader.read(); + + if (readResult.done) { + // Source stream completed - close sink and resolve + state.streamEnded = true; + await writer.close(); + // Resolve done promise if not already resolved + if (!state.doneResolved) { + state.doneResolved = true; + state.resolve(); + } + return; + } + + // Count write as a pending op - this is what we need to flush + state.pendingOps++; + try { + await writer.write(readResult.value); + } finally { + state.pendingOps--; + } + + // Check if stream has ended (e.g., due to error in another path) + if (state.streamEnded) { + reader.releaseLock(); + writer.releaseLock(); + return; + } + } + } catch (err) { + state.streamEnded = true; + if (!state.doneResolved) { + state.doneResolved = true; + state.reject(err); + } + throw err; + } +} + // Types that need specialized handling when serialized/deserialized // ! If a type is added here, it MUST also be added to the `Serializable` type in `schemas.ts` export interface SerializableSpecial { @@ -736,12 +942,38 @@ export function getExternalRevivers( value.startIndex ); if (value.type === 'bytes') { - return readable; + // For byte streams, use flushable pipe with lock polling + const { state, done } = createFlushableState(); + ops.push(done); + + // Create an identity transform to give the user a readable + const { readable: userReadable, writable } = + new global.TransformStream(); + + // Start the flushable pipe in the background + flushablePipe(readable, writable, state).catch(() => { + // Errors are handled via state.reject + }); + + // Start polling to detect when user releases lock + pollReadableLock(userReadable, state); + + return userReadable; } else { const transform = getDeserializeStream( getExternalRevivers(global, ops, runId) ); - ops.push(readable.pipeTo(transform.writable)); + const { state, done } = createFlushableState(); + ops.push(done); + + // Start the flushable pipe in the background + flushablePipe(readable, transform.writable, state).catch(() => { + // Errors are handled via state.reject + }); + + // Start polling to detect when user releases lock + pollReadableLock(transform.readable, state); + return transform.readable; } }, @@ -749,11 +981,23 @@ export function getExternalRevivers( const serialize = getSerializeStream( getExternalReducers(global, ops, runId) ); - ops.push( - serialize.readable.pipeTo( - new WorkflowServerWritableStream(value.name, runId) - ) + const serverWritable = new WorkflowServerWritableStream( + value.name, + runId ); + + // Create flushable state for this stream + const { state, done } = createFlushableState(); + ops.push(done); + + // Start the flushable pipe in the background + flushablePipe(serialize.readable, serverWritable, state).catch(() => { + // Errors are handled via state.reject + }); + + // Start polling to detect when user releases lock + pollWritableLock(serialize.writable, state); + return serialize.writable; }, }; @@ -880,12 +1124,38 @@ function getStepRevivers( const readable = new WorkflowServerReadableStream(value.name); if (value.type === 'bytes') { - return readable; + // For byte streams, use flushable pipe with lock polling + const { state, done } = createFlushableState(); + ops.push(done); + + // Create an identity transform to give the user a readable + const { readable: userReadable, writable } = + new global.TransformStream(); + + // Start the flushable pipe in the background + flushablePipe(readable, writable, state).catch(() => { + // Errors are handled via state.reject + }); + + // Start polling to detect when user releases lock + pollReadableLock(userReadable, state); + + return userReadable; } else { const transform = getDeserializeStream( getStepRevivers(global, ops, runId) ); - ops.push(readable.pipeTo(transform.writable)); + const { state, done } = createFlushableState(); + ops.push(done); + + // Start the flushable pipe in the background + flushablePipe(readable, transform.writable, state).catch(() => { + // Errors are handled via state.reject + }); + + // Start polling to detect when user releases lock + pollReadableLock(transform.readable, state); + return transform.readable; } }, @@ -897,11 +1167,23 @@ function getStepRevivers( } const serialize = getSerializeStream(getStepReducers(global, ops, runId)); - ops.push( - serialize.readable.pipeTo( - new WorkflowServerWritableStream(value.name, runId) - ) + const serverWritable = new WorkflowServerWritableStream( + value.name, + runId ); + + // Create flushable state for this stream + const { state, done } = createFlushableState(); + ops.push(done); + + // Start the flushable pipe in the background + flushablePipe(serialize.readable, serverWritable, state).catch(() => { + // Errors are handled via state.reject + }); + + // Start polling to detect when user releases lock + pollWritableLock(serialize.writable, state); + return serialize.writable; }, }; From 579162a70fd89f5f9edba8d710e5c79cfa401cdd Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Mon, 22 Dec 2025 11:21:50 -0800 Subject: [PATCH 2/6] docs --- docs/content/docs/foundations/streaming.mdx | 38 +++++++++++++++++---- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/docs/content/docs/foundations/streaming.mdx b/docs/content/docs/foundations/streaming.mdx index fdb684708..23cc74ad8 100644 --- a/docs/content/docs/foundations/streaming.mdx +++ b/docs/content/docs/foundations/streaming.mdx @@ -469,16 +469,44 @@ async function uploadResult(stream: ReadableStream) { } ``` -## Best Practices +## Stream Lock Contract -**Release locks properly:** +When writing to a stream in a step function, there is an important contract to understand: + + +**Once a lock is released, no further writes to that stream from that step are allowed.** The framework uses lock release as the signal that the step is done interacting with the stream. Make sure all writes are complete before releasing the lock. + + + +**The lock MUST be released to prevent the function from hanging.** If you acquire a lock but never release it, the serverless function will remain active until it times out, even after the step returns and the workflow continues. + + +**Correct pattern - complete all writes before releasing:** + +```typescript lineNumbers +async function writeData(items: string[]) { + "use step"; + + const writable = getWritable(); + const writer = writable.getWriter(); + + // Complete ALL writes before releasing the lock + for (const item of items) { + await writer.write(item); + } + + writer.releaseLock(); // Now safe to release +} +``` + +**Use try/finally to ensure the lock is always released:** ```typescript lineNumbers const writer = writable.getWriter(); try { await writer.write(data); } finally { - writer.releaseLock(); // Always release + writer.releaseLock(); // Always release, even on error } ``` @@ -486,10 +514,6 @@ try { Stream locks acquired in a step only apply within that step, not across other steps. This enables multiple writers to write to the same stream concurrently. - -If a lock is not released, the step process cannot terminate. Even though the step returns and the workflow continues, the underlying process will remain active until it times out. - - **Close streams when done:** ```typescript lineNumbers From 5a1fbf343c7cfc7140b9b260e66521c1102ea998 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Mon, 22 Dec 2025 12:40:20 -0800 Subject: [PATCH 3/6] . --- packages/core/src/serialization.test.ts | 157 +++--------------------- packages/core/src/serialization.ts | 16 +-- 2 files changed, 25 insertions(+), 148 deletions(-) diff --git a/packages/core/src/serialization.test.ts b/packages/core/src/serialization.test.ts index 1e9003421..4764406c8 100644 --- a/packages/core/src/serialization.test.ts +++ b/packages/core/src/serialization.test.ts @@ -3,15 +3,19 @@ import type { WorkflowRuntimeError } from '@workflow/errors'; import { describe, expect, it } from 'vitest'; import { getStepFunction, registerStepFunction } from './private.js'; import { + createFlushableState, dehydrateStepArguments, dehydrateStepReturnValue, dehydrateWorkflowArguments, dehydrateWorkflowReturnValue, + flushablePipe, getCommonRevivers, getStreamType, getWorkflowReducers, hydrateStepArguments, hydrateWorkflowArguments, + LOCK_POLL_INTERVAL_MS, + pollWritableLock, } from './serialization.js'; import { STABLE_ULID, STREAM_NAME_SYMBOL } from './symbols.js'; import { createContext } from './vm/index.js'; @@ -999,9 +1003,6 @@ describe('step function serialization', () => { }); describe('flushable stream behavior', () => { - const POLL_INTERVAL = 100; // Match the actual implementation - const STABLE_POLL_COUNT = 2; // Match the actual implementation - it('done promise should resolve when writable stream lock is released (polling)', async () => { // Test the pattern: user writes, releases lock, polling detects it, done resolves const chunks: string[] = []; @@ -1019,79 +1020,15 @@ describe('flushable stream behavior', () => { // Create a TransformStream like we do in getStepRevivers const { readable, writable } = new TransformStream(); + const { state, done } = createFlushableState(); - // Track flushable state - this mirrors the actual implementation - const state = { - pendingOps: 0, // Only counts writes to server - doneResolved: false, - streamEnded: false, - resolve: () => {}, - reject: (_err: Error) => {}, - }; - - const done = new Promise((res, rej) => { - state.resolve = res; - state.reject = rej; + // Start piping in background + flushablePipe(readable, mockSink, state).catch(() => { + // Errors handled via state.reject }); - // Start piping in background (mirrors flushablePipe implementation) - (async () => { - const reader = readable.getReader(); - const writer = mockSink.getWriter(); - try { - while (!state.streamEnded) { - const result = await reader.read(); - - if (result.done) { - state.streamEnded = true; - await writer.close(); - if (!state.doneResolved) { - state.doneResolved = true; - state.resolve(); - } - return; - } - - // Only writes count as pending ops - state.pendingOps++; - await writer.write(result.value); - state.pendingOps--; - - if (state.streamEnded) { - reader.releaseLock(); - writer.releaseLock(); - return; - } - } - } catch (err) { - state.streamEnded = true; - if (!state.doneResolved) { - state.doneResolved = true; - state.reject(err as Error); - } - } - })(); - - // Start polling (mirrors pollWritableLock implementation) - let stableCount = 0; - const intervalId = setInterval(() => { - if (state.doneResolved || state.streamEnded) { - clearInterval(intervalId); - return; - } - - // Check if lock is released by checking .locked property - if (!writable.locked && state.pendingOps === 0) { - stableCount++; - if (stableCount >= STABLE_POLL_COUNT) { - state.doneResolved = true; - state.resolve(); - clearInterval(intervalId); - } - } else { - stableCount = 0; - } - }, POLL_INTERVAL); + // Start polling for lock release + pollWritableLock(writable, state); // Simulate user interaction - write and release lock const userWriter = writable.getWriter(); @@ -1101,8 +1038,8 @@ describe('flushable stream behavior', () => { // Release lock without closing stream userWriter.releaseLock(); - // Wait for pipe to process + polling intervals (need STABLE_POLL_COUNT consecutive polls) - await new Promise((r) => setTimeout(r, 250)); + // Wait for pipe to process + polling interval + await new Promise((r) => setTimeout(r, LOCK_POLL_INTERVAL_MS + 50)); // The done promise should resolve await expect( @@ -1134,75 +1071,15 @@ describe('flushable stream behavior', () => { }); const { readable, writable } = new TransformStream(); - - const state = { - pendingOps: 0, - doneResolved: false, - streamEnded: false, - resolve: () => {}, - reject: (_err: Error) => {}, - }; - - const done = new Promise((res, rej) => { - state.resolve = res; - state.reject = rej; - }); + const { state, done } = createFlushableState(); // Start piping in background - (async () => { - const reader = readable.getReader(); - const writer = mockSink.getWriter(); - try { - while (!state.streamEnded) { - const result = await reader.read(); - - if (result.done) { - state.streamEnded = true; - await writer.close(); - if (!state.doneResolved) { - state.doneResolved = true; - state.resolve(); - } - return; - } - - state.pendingOps++; - await writer.write(result.value); - state.pendingOps--; - - if (state.streamEnded) { - reader.releaseLock(); - writer.releaseLock(); - return; - } - } - } catch (err) { - state.streamEnded = true; - if (!state.doneResolved) { - state.doneResolved = true; - state.reject(err as Error); - } - } - })(); + flushablePipe(readable, mockSink, state).catch(() => { + // Errors handled via state.reject + }); // Start polling (won't trigger since stream will close first) - let stableCount2 = 0; - const intervalId = setInterval(() => { - if (state.doneResolved || state.streamEnded) { - clearInterval(intervalId); - return; - } - if (!writable.locked && state.pendingOps === 0) { - stableCount2++; - if (stableCount2 >= STABLE_POLL_COUNT) { - state.doneResolved = true; - state.resolve(); - clearInterval(intervalId); - } - } else { - stableCount2 = 0; - } - }, POLL_INTERVAL); + pollWritableLock(writable, state); // User writes and then closes the stream const userWriter = writable.getWriter(); diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 69c391b6d..677c77d5e 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -180,7 +180,8 @@ export class WorkflowServerWritableStream extends WritableStream { } /** Polling interval for checking stream lock state (in milliseconds) */ -const LOCK_POLL_INTERVAL_MS = 100; +/** Polling interval for lock release detection */ +export const LOCK_POLL_INTERVAL_MS = 100; /** * State tracker for flushable stream operations. @@ -195,7 +196,7 @@ const LOCK_POLL_INTERVAL_MS = 100; * The pump continues running even after `doneResolved=true` to handle * any future writes if the user acquires a new lock. */ -interface FlushableStreamState { +export interface FlushableStreamState { /** Number of write operations currently in flight to the server */ pendingOps: number; /** Whether the `done` promise has been resolved */ @@ -206,7 +207,7 @@ interface FlushableStreamState { reject: (err: any) => void; } -function createFlushableState(): { +export function createFlushableState(): { state: FlushableStreamState; done: Promise; } { @@ -262,13 +263,12 @@ function isReadableUnlockedNotClosed(readable: ReadableStream): boolean { /** * Polls a WritableStream to check if the user has released their lock. - * Resolves the done promise when lock is released and no pending ops remain - * for multiple consecutive polls (to avoid race conditions with the pump). + * Resolves the done promise when lock is released and no pending ops remain. * * Note: Only resolves if stream is unlocked but NOT closed. If the user closes * the stream, the pump will handle resolution via the stream ending naturally. */ -function pollWritableLock( +export function pollWritableLock( writable: WritableStream, state: FlushableStreamState ): void { @@ -295,7 +295,7 @@ function pollWritableLock( * Note: Only resolves if stream is unlocked but NOT closed. If the user closes * the stream, the pump will handle resolution via the stream ending naturally. */ -function pollReadableLock( +export function pollReadableLock( readable: ReadableStream, state: FlushableStreamState ): void { @@ -326,7 +326,7 @@ function pollReadableLock( * @param state - The flushable state tracker * @returns Promise that resolves when stream ends (not when done promise resolves) */ -async function flushablePipe( +export async function flushablePipe( source: ReadableStream, sink: WritableStream, state: FlushableStreamState From 9f874fea5fb5f76e9d17cb64510fb895c6c36244 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Mon, 22 Dec 2025 13:33:37 -0800 Subject: [PATCH 4/6] . --- packages/core/src/serialization.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 677c77d5e..2aea5d589 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -338,8 +338,6 @@ export async function flushablePipe( while (true) { // Check if stream has ended if (state.streamEnded) { - reader.releaseLock(); - writer.releaseLock(); return; } @@ -369,8 +367,6 @@ export async function flushablePipe( // Check if stream has ended (e.g., due to error in another path) if (state.streamEnded) { - reader.releaseLock(); - writer.releaseLock(); return; } } @@ -381,6 +377,9 @@ export async function flushablePipe( state.reject(err); } throw err; + } finally { + reader.releaseLock(); + writer.releaseLock(); } } From 819ed35f44ea388f1348aded2fce97cc023a2196 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Mon, 22 Dec 2025 13:47:43 -0800 Subject: [PATCH 5/6] . --- packages/core/src/serialization.test.ts | 12 +++---- packages/core/src/serialization.ts | 44 ++++++++++--------------- 2 files changed, 23 insertions(+), 33 deletions(-) diff --git a/packages/core/src/serialization.test.ts b/packages/core/src/serialization.test.ts index 4764406c8..1a984ba5c 100644 --- a/packages/core/src/serialization.test.ts +++ b/packages/core/src/serialization.test.ts @@ -1020,7 +1020,7 @@ describe('flushable stream behavior', () => { // Create a TransformStream like we do in getStepRevivers const { readable, writable } = new TransformStream(); - const { state, done } = createFlushableState(); + const state = createFlushableState(); // Start piping in background flushablePipe(readable, mockSink, state).catch(() => { @@ -1041,10 +1041,10 @@ describe('flushable stream behavior', () => { // Wait for pipe to process + polling interval await new Promise((r) => setTimeout(r, LOCK_POLL_INTERVAL_MS + 50)); - // The done promise should resolve + // The promise should resolve await expect( Promise.race([ - done, + state.promise, new Promise((_, r) => setTimeout(() => r(new Error('timeout')), 400)), ]) ).resolves.toBeUndefined(); @@ -1071,7 +1071,7 @@ describe('flushable stream behavior', () => { }); const { readable, writable } = new TransformStream(); - const { state, done } = createFlushableState(); + const state = createFlushableState(); // Start piping in background flushablePipe(readable, mockSink, state).catch(() => { @@ -1089,10 +1089,10 @@ describe('flushable stream behavior', () => { // Wait a tick for the pipe to process await new Promise((r) => setTimeout(r, 50)); - // The done promise should resolve + // The promise should resolve await expect( Promise.race([ - done, + state.promise, new Promise((_, r) => setTimeout(() => r(new Error('timeout')), 200)), ]) ).resolves.toBeUndefined(); diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 2aea5d589..677b3a34a 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -1,5 +1,5 @@ import { WorkflowRuntimeError } from '@workflow/errors'; -import { withResolvers } from '@workflow/utils'; +import { type PromiseWithResolvers, withResolvers } from '@workflow/utils'; import { DevalueError, parse, stringify, unflatten } from 'devalue'; import { monotonicFactory } from 'ulid'; import { getStepFunction } from './private.js'; @@ -196,32 +196,22 @@ export const LOCK_POLL_INTERVAL_MS = 100; * The pump continues running even after `doneResolved=true` to handle * any future writes if the user acquires a new lock. */ -export interface FlushableStreamState { +export interface FlushableStreamState extends PromiseWithResolvers { /** Number of write operations currently in flight to the server */ pendingOps: number; /** Whether the `done` promise has been resolved */ doneResolved: boolean; /** Whether the underlying stream has actually closed/errored */ streamEnded: boolean; - resolve: () => void; - reject: (err: any) => void; } -export function createFlushableState(): { - state: FlushableStreamState; - done: Promise; -} { - const { promise, resolve, reject } = withResolvers(); - - const state: FlushableStreamState = { +export function createFlushableState(): FlushableStreamState { + return { + ...withResolvers(), pendingOps: 0, doneResolved: false, streamEnded: false, - resolve, - reject, }; - - return { state, done: promise }; } /** @@ -942,8 +932,8 @@ export function getExternalRevivers( ); if (value.type === 'bytes') { // For byte streams, use flushable pipe with lock polling - const { state, done } = createFlushableState(); - ops.push(done); + const state = createFlushableState(); + ops.push(state.promise); // Create an identity transform to give the user a readable const { readable: userReadable, writable } = @@ -962,8 +952,8 @@ export function getExternalRevivers( const transform = getDeserializeStream( getExternalRevivers(global, ops, runId) ); - const { state, done } = createFlushableState(); - ops.push(done); + const state = createFlushableState(); + ops.push(state.promise); // Start the flushable pipe in the background flushablePipe(readable, transform.writable, state).catch(() => { @@ -986,8 +976,8 @@ export function getExternalRevivers( ); // Create flushable state for this stream - const { state, done } = createFlushableState(); - ops.push(done); + const state = createFlushableState(); + ops.push(state.promise); // Start the flushable pipe in the background flushablePipe(serialize.readable, serverWritable, state).catch(() => { @@ -1124,8 +1114,8 @@ function getStepRevivers( const readable = new WorkflowServerReadableStream(value.name); if (value.type === 'bytes') { // For byte streams, use flushable pipe with lock polling - const { state, done } = createFlushableState(); - ops.push(done); + const state = createFlushableState(); + ops.push(state.promise); // Create an identity transform to give the user a readable const { readable: userReadable, writable } = @@ -1144,8 +1134,8 @@ function getStepRevivers( const transform = getDeserializeStream( getStepRevivers(global, ops, runId) ); - const { state, done } = createFlushableState(); - ops.push(done); + const state = createFlushableState(); + ops.push(state.promise); // Start the flushable pipe in the background flushablePipe(readable, transform.writable, state).catch(() => { @@ -1172,8 +1162,8 @@ function getStepRevivers( ); // Create flushable state for this stream - const { state, done } = createFlushableState(); - ops.push(done); + const state = createFlushableState(); + ops.push(state.promise); // Start the flushable pipe in the background flushablePipe(serialize.readable, serverWritable, state).catch(() => { From a298e32b0d8bdc87a571618ab6242a3b545f74be Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Mon, 22 Dec 2025 15:46:29 -0800 Subject: [PATCH 6/6] . --- packages/core/src/flushable-stream.test.ts | 110 +++++++++++ packages/core/src/flushable-stream.ts | 194 ++++++++++++++++++++ packages/core/src/serialization.test.ts | 107 ----------- packages/core/src/serialization.ts | 201 +-------------------- 4 files changed, 310 insertions(+), 302 deletions(-) create mode 100644 packages/core/src/flushable-stream.test.ts create mode 100644 packages/core/src/flushable-stream.ts diff --git a/packages/core/src/flushable-stream.test.ts b/packages/core/src/flushable-stream.test.ts new file mode 100644 index 000000000..141607f5f --- /dev/null +++ b/packages/core/src/flushable-stream.test.ts @@ -0,0 +1,110 @@ +import { describe, expect, it } from 'vitest'; +import { + createFlushableState, + flushablePipe, + LOCK_POLL_INTERVAL_MS, + pollWritableLock, +} from './flushable-stream.js'; + +describe('flushable stream behavior', () => { + it('promise should resolve when writable stream lock is released (polling)', async () => { + // Test the pattern: user writes, releases lock, polling detects it, promise resolves + const chunks: string[] = []; + let streamClosed = false; + + // Create a simple mock for the sink + const mockSink = new WritableStream({ + write(chunk) { + chunks.push(chunk); + }, + close() { + streamClosed = true; + }, + }); + + // Create a TransformStream like we do in getStepRevivers + const { readable, writable } = new TransformStream(); + const state = createFlushableState(); + + // Start piping in background + flushablePipe(readable, mockSink, state).catch(() => { + // Errors handled via state.reject + }); + + // Start polling for lock release + pollWritableLock(writable, state); + + // Simulate user interaction - write and release lock + const userWriter = writable.getWriter(); + await userWriter.write('chunk1'); + await userWriter.write('chunk2'); + + // Release lock without closing stream + userWriter.releaseLock(); + + // Wait for pipe to process + polling interval + await new Promise((r) => setTimeout(r, LOCK_POLL_INTERVAL_MS + 50)); + + // The promise should resolve + await expect( + Promise.race([ + state.promise, + new Promise((_, r) => setTimeout(() => r(new Error('timeout')), 400)), + ]) + ).resolves.toBeUndefined(); + + // Chunks should have been written + expect(chunks).toContain('chunk1'); + expect(chunks).toContain('chunk2'); + + // Stream should NOT be closed (user only released lock) + expect(streamClosed).toBe(false); + }); + + it('promise should resolve when writable stream closes naturally', async () => { + const chunks: string[] = []; + let streamClosed = false; + + const mockSink = new WritableStream({ + write(chunk) { + chunks.push(chunk); + }, + close() { + streamClosed = true; + }, + }); + + const { readable, writable } = new TransformStream(); + const state = createFlushableState(); + + // Start piping in background + flushablePipe(readable, mockSink, state).catch(() => { + // Errors handled via state.reject + }); + + // Start polling (won't trigger since stream will close first) + pollWritableLock(writable, state); + + // User writes and then closes the stream + const userWriter = writable.getWriter(); + await userWriter.write('data'); + await userWriter.close(); + + // Wait a tick for the pipe to process + await new Promise((r) => setTimeout(r, 50)); + + // The promise should resolve + await expect( + Promise.race([ + state.promise, + new Promise((_, r) => setTimeout(() => r(new Error('timeout')), 200)), + ]) + ).resolves.toBeUndefined(); + + // Chunks should have been written + expect(chunks).toContain('data'); + + // Stream should be closed (user closed it) + expect(streamClosed).toBe(true); + }); +}); diff --git a/packages/core/src/flushable-stream.ts b/packages/core/src/flushable-stream.ts new file mode 100644 index 000000000..1c1d82b87 --- /dev/null +++ b/packages/core/src/flushable-stream.ts @@ -0,0 +1,194 @@ +import { type PromiseWithResolvers, withResolvers } from '@workflow/utils'; + +/** Polling interval for lock release detection */ +export const LOCK_POLL_INTERVAL_MS = 100; + +/** + * State tracker for flushable stream operations. + * Resolves when either: + * 1. Stream completes (close/error), OR + * 2. Lock is released AND all pending operations are flushed + * + * Note: `doneResolved` and `streamEnded` are separate: + * - `doneResolved`: The `done` promise has been resolved (step can complete) + * - `streamEnded`: The underlying stream has actually closed/errored + * + * The pump continues running even after `doneResolved=true` to handle + * any future writes if the user acquires a new lock. + */ +export interface FlushableStreamState extends PromiseWithResolvers { + /** Number of write operations currently in flight to the server */ + pendingOps: number; + /** Whether the `done` promise has been resolved */ + doneResolved: boolean; + /** Whether the underlying stream has actually closed/errored */ + streamEnded: boolean; +} + +export function createFlushableState(): FlushableStreamState { + return { + ...withResolvers(), + pendingOps: 0, + doneResolved: false, + streamEnded: false, + }; +} + +/** + * Checks if a WritableStream is unlocked (user released lock) vs closed. + * When a stream is closed, .locked is false but getWriter() throws. + * We only want to resolve via polling when the stream is unlocked, not closed. + * If closed, the pump will handle resolution via the stream ending naturally. + */ +function isWritableUnlockedNotClosed(writable: WritableStream): boolean { + if (writable.locked) return false; + + try { + // Try to acquire writer - if successful, stream is unlocked (not closed) + const writer = writable.getWriter(); + writer.releaseLock(); + return true; + } catch { + // getWriter() throws if stream is closed/errored - let pump handle it + return false; + } +} + +/** + * Checks if a ReadableStream is unlocked (user released lock) vs closed. + */ +function isReadableUnlockedNotClosed(readable: ReadableStream): boolean { + if (readable.locked) return false; + + try { + // Try to acquire reader - if successful, stream is unlocked (not closed) + const reader = readable.getReader(); + reader.releaseLock(); + return true; + } catch { + // getReader() throws if stream is closed/errored - let pump handle it + return false; + } +} + +/** + * Polls a WritableStream to check if the user has released their lock. + * Resolves the done promise when lock is released and no pending ops remain. + * + * Note: Only resolves if stream is unlocked but NOT closed. If the user closes + * the stream, the pump will handle resolution via the stream ending naturally. + */ +export function pollWritableLock( + writable: WritableStream, + state: FlushableStreamState +): void { + const intervalId = setInterval(() => { + // Stop polling if already resolved or stream ended + if (state.doneResolved || state.streamEnded) { + clearInterval(intervalId); + return; + } + + // Check if lock is released (not closed) and no pending ops + if (isWritableUnlockedNotClosed(writable) && state.pendingOps === 0) { + state.doneResolved = true; + state.resolve(); + clearInterval(intervalId); + } + }, LOCK_POLL_INTERVAL_MS); +} + +/** + * Polls a ReadableStream to check if the user has released their lock. + * Resolves the done promise when lock is released and no pending ops remain. + * + * Note: Only resolves if stream is unlocked but NOT closed. If the user closes + * the stream, the pump will handle resolution via the stream ending naturally. + */ +export function pollReadableLock( + readable: ReadableStream, + state: FlushableStreamState +): void { + const intervalId = setInterval(() => { + // Stop polling if already resolved or stream ended + if (state.doneResolved || state.streamEnded) { + clearInterval(intervalId); + return; + } + + // Check if lock is released (not closed) and no pending ops + if (isReadableUnlockedNotClosed(readable) && state.pendingOps === 0) { + state.doneResolved = true; + state.resolve(); + clearInterval(intervalId); + } + }, LOCK_POLL_INTERVAL_MS); +} + +/** + * Creates a flushable pipe from a ReadableStream to a WritableStream. + * Unlike pipeTo(), this resolves when: + * 1. The source stream completes (close/error), OR + * 2. The user releases their lock on userStream AND all pending writes are flushed + * + * @param source - The readable stream to read from (e.g., transform's readable) + * @param sink - The writable stream to write to (e.g., server writable) + * @param state - The flushable state tracker + * @returns Promise that resolves when stream ends (not when done promise resolves) + */ +export async function flushablePipe( + source: ReadableStream, + sink: WritableStream, + state: FlushableStreamState +): Promise { + const reader = source.getReader(); + const writer = sink.getWriter(); + + try { + while (true) { + // Check if stream has ended + if (state.streamEnded) { + return; + } + + // Read from source - don't count as pending op since we're just waiting for data + // The important ops are writes to the sink (server) + const readResult = await reader.read(); + + if (readResult.done) { + // Source stream completed - close sink and resolve + state.streamEnded = true; + await writer.close(); + // Resolve done promise if not already resolved + if (!state.doneResolved) { + state.doneResolved = true; + state.resolve(); + } + return; + } + + // Count write as a pending op - this is what we need to flush + state.pendingOps++; + try { + await writer.write(readResult.value); + } finally { + state.pendingOps--; + } + + // Check if stream has ended (e.g., due to error in another path) + if (state.streamEnded) { + return; + } + } + } catch (err) { + state.streamEnded = true; + if (!state.doneResolved) { + state.doneResolved = true; + state.reject(err); + } + throw err; + } finally { + reader.releaseLock(); + writer.releaseLock(); + } +} diff --git a/packages/core/src/serialization.test.ts b/packages/core/src/serialization.test.ts index 1a984ba5c..2f55fbff3 100644 --- a/packages/core/src/serialization.test.ts +++ b/packages/core/src/serialization.test.ts @@ -3,19 +3,15 @@ import type { WorkflowRuntimeError } from '@workflow/errors'; import { describe, expect, it } from 'vitest'; import { getStepFunction, registerStepFunction } from './private.js'; import { - createFlushableState, dehydrateStepArguments, dehydrateStepReturnValue, dehydrateWorkflowArguments, dehydrateWorkflowReturnValue, - flushablePipe, getCommonRevivers, getStreamType, getWorkflowReducers, hydrateStepArguments, hydrateWorkflowArguments, - LOCK_POLL_INTERVAL_MS, - pollWritableLock, } from './serialization.js'; import { STABLE_ULID, STREAM_NAME_SYMBOL } from './symbols.js'; import { createContext } from './vm/index.js'; @@ -1001,106 +997,3 @@ describe('step function serialization', () => { expect(result).toEqual({ stepId: stepName }); }); }); - -describe('flushable stream behavior', () => { - it('done promise should resolve when writable stream lock is released (polling)', async () => { - // Test the pattern: user writes, releases lock, polling detects it, done resolves - const chunks: string[] = []; - let streamClosed = false; - - // Create a simple mock for the sink - const mockSink = new WritableStream({ - write(chunk) { - chunks.push(chunk); - }, - close() { - streamClosed = true; - }, - }); - - // Create a TransformStream like we do in getStepRevivers - const { readable, writable } = new TransformStream(); - const state = createFlushableState(); - - // Start piping in background - flushablePipe(readable, mockSink, state).catch(() => { - // Errors handled via state.reject - }); - - // Start polling for lock release - pollWritableLock(writable, state); - - // Simulate user interaction - write and release lock - const userWriter = writable.getWriter(); - await userWriter.write('chunk1'); - await userWriter.write('chunk2'); - - // Release lock without closing stream - userWriter.releaseLock(); - - // Wait for pipe to process + polling interval - await new Promise((r) => setTimeout(r, LOCK_POLL_INTERVAL_MS + 50)); - - // The promise should resolve - await expect( - Promise.race([ - state.promise, - new Promise((_, r) => setTimeout(() => r(new Error('timeout')), 400)), - ]) - ).resolves.toBeUndefined(); - - // Chunks should have been written - expect(chunks).toContain('chunk1'); - expect(chunks).toContain('chunk2'); - - // Stream should NOT be closed (user only released lock) - expect(streamClosed).toBe(false); - }); - - it('done promise should resolve when writable stream closes naturally', async () => { - const chunks: string[] = []; - let streamClosed = false; - - const mockSink = new WritableStream({ - write(chunk) { - chunks.push(chunk); - }, - close() { - streamClosed = true; - }, - }); - - const { readable, writable } = new TransformStream(); - const state = createFlushableState(); - - // Start piping in background - flushablePipe(readable, mockSink, state).catch(() => { - // Errors handled via state.reject - }); - - // Start polling (won't trigger since stream will close first) - pollWritableLock(writable, state); - - // User writes and then closes the stream - const userWriter = writable.getWriter(); - await userWriter.write('data'); - await userWriter.close(); - - // Wait a tick for the pipe to process - await new Promise((r) => setTimeout(r, 50)); - - // The promise should resolve - await expect( - Promise.race([ - state.promise, - new Promise((_, r) => setTimeout(() => r(new Error('timeout')), 200)), - ]) - ).resolves.toBeUndefined(); - - // Chunks should have been written - expect(chunks).toContain('data'); - - // Stream should be closed (user closed it) - expect(streamClosed).toBe(true); - }); -}); diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 677b3a34a..8d37b97ac 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -1,7 +1,12 @@ import { WorkflowRuntimeError } from '@workflow/errors'; -import { type PromiseWithResolvers, withResolvers } from '@workflow/utils'; import { DevalueError, parse, stringify, unflatten } from 'devalue'; import { monotonicFactory } from 'ulid'; +import { + createFlushableState, + flushablePipe, + pollReadableLock, + pollWritableLock, +} from './flushable-stream.js'; import { getStepFunction } from './private.js'; import { getWorld } from './runtime/world.js'; import { contextStorage } from './step/context-storage.js'; @@ -179,200 +184,6 @@ export class WorkflowServerWritableStream extends WritableStream { } } -/** Polling interval for checking stream lock state (in milliseconds) */ -/** Polling interval for lock release detection */ -export const LOCK_POLL_INTERVAL_MS = 100; - -/** - * State tracker for flushable stream operations. - * Resolves when either: - * 1. Stream completes (close/error), OR - * 2. Lock is released AND all pending operations are flushed - * - * Note: `doneResolved` and `streamEnded` are separate: - * - `doneResolved`: The `done` promise has been resolved (step can complete) - * - `streamEnded`: The underlying stream has actually closed/errored - * - * The pump continues running even after `doneResolved=true` to handle - * any future writes if the user acquires a new lock. - */ -export interface FlushableStreamState extends PromiseWithResolvers { - /** Number of write operations currently in flight to the server */ - pendingOps: number; - /** Whether the `done` promise has been resolved */ - doneResolved: boolean; - /** Whether the underlying stream has actually closed/errored */ - streamEnded: boolean; -} - -export function createFlushableState(): FlushableStreamState { - return { - ...withResolvers(), - pendingOps: 0, - doneResolved: false, - streamEnded: false, - }; -} - -/** - * Checks if a WritableStream is unlocked (user released lock) vs closed. - * When a stream is closed, .locked is false but getWriter() throws. - * We only want to resolve via polling when the stream is unlocked, not closed. - * If closed, the pump will handle resolution via the stream ending naturally. - */ -function isWritableUnlockedNotClosed(writable: WritableStream): boolean { - if (writable.locked) return false; - - try { - // Try to acquire writer - if successful, stream is unlocked (not closed) - const writer = writable.getWriter(); - writer.releaseLock(); - return true; - } catch { - // getWriter() throws if stream is closed/errored - let pump handle it - return false; - } -} - -/** - * Checks if a ReadableStream is unlocked (user released lock) vs closed. - */ -function isReadableUnlockedNotClosed(readable: ReadableStream): boolean { - if (readable.locked) return false; - - try { - // Try to acquire reader - if successful, stream is unlocked (not closed) - const reader = readable.getReader(); - reader.releaseLock(); - return true; - } catch { - // getReader() throws if stream is closed/errored - let pump handle it - return false; - } -} - -/** - * Polls a WritableStream to check if the user has released their lock. - * Resolves the done promise when lock is released and no pending ops remain. - * - * Note: Only resolves if stream is unlocked but NOT closed. If the user closes - * the stream, the pump will handle resolution via the stream ending naturally. - */ -export function pollWritableLock( - writable: WritableStream, - state: FlushableStreamState -): void { - const intervalId = setInterval(() => { - // Stop polling if already resolved or stream ended - if (state.doneResolved || state.streamEnded) { - clearInterval(intervalId); - return; - } - - // Check if lock is released (not closed) and no pending ops - if (isWritableUnlockedNotClosed(writable) && state.pendingOps === 0) { - state.doneResolved = true; - state.resolve(); - clearInterval(intervalId); - } - }, LOCK_POLL_INTERVAL_MS); -} - -/** - * Polls a ReadableStream to check if the user has released their lock. - * Resolves the done promise when lock is released and no pending ops remain. - * - * Note: Only resolves if stream is unlocked but NOT closed. If the user closes - * the stream, the pump will handle resolution via the stream ending naturally. - */ -export function pollReadableLock( - readable: ReadableStream, - state: FlushableStreamState -): void { - const intervalId = setInterval(() => { - // Stop polling if already resolved or stream ended - if (state.doneResolved || state.streamEnded) { - clearInterval(intervalId); - return; - } - - // Check if lock is released (not closed) and no pending ops - if (isReadableUnlockedNotClosed(readable) && state.pendingOps === 0) { - state.doneResolved = true; - state.resolve(); - clearInterval(intervalId); - } - }, LOCK_POLL_INTERVAL_MS); -} - -/** - * Creates a flushable pipe from a ReadableStream to a WritableStream. - * Unlike pipeTo(), this resolves when: - * 1. The source stream completes (close/error), OR - * 2. The user releases their lock on userStream AND all pending writes are flushed - * - * @param source - The readable stream to read from (e.g., transform's readable) - * @param sink - The writable stream to write to (e.g., server writable) - * @param state - The flushable state tracker - * @returns Promise that resolves when stream ends (not when done promise resolves) - */ -export async function flushablePipe( - source: ReadableStream, - sink: WritableStream, - state: FlushableStreamState -): Promise { - const reader = source.getReader(); - const writer = sink.getWriter(); - - try { - while (true) { - // Check if stream has ended - if (state.streamEnded) { - return; - } - - // Read from source - don't count as pending op since we're just waiting for data - // The important ops are writes to the sink (server) - const readResult = await reader.read(); - - if (readResult.done) { - // Source stream completed - close sink and resolve - state.streamEnded = true; - await writer.close(); - // Resolve done promise if not already resolved - if (!state.doneResolved) { - state.doneResolved = true; - state.resolve(); - } - return; - } - - // Count write as a pending op - this is what we need to flush - state.pendingOps++; - try { - await writer.write(readResult.value); - } finally { - state.pendingOps--; - } - - // Check if stream has ended (e.g., due to error in another path) - if (state.streamEnded) { - return; - } - } - } catch (err) { - state.streamEnded = true; - if (!state.doneResolved) { - state.doneResolved = true; - state.reject(err); - } - throw err; - } finally { - reader.releaseLock(); - writer.releaseLock(); - } -} - // Types that need specialized handling when serialized/deserialized // ! If a type is added here, it MUST also be added to the `Serializable` type in `schemas.ts` export interface SerializableSpecial {