6 changes: 6 additions & 0 deletions .changeset/stream-lock-polling.md
@@ -0,0 +1,6 @@
---
"@workflow/core": patch
---

Fix stream serialization to resolve when the user releases the lock instead of waiting for the stream to close. This prevents Vercel functions from hanging when users write incrementally to streams within steps (e.g., `await writer.write(data); writer.releaseLock()`). Uses a polling approach to detect when the stream lock has been released and all pending writes have been flushed.
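The failure mode this fixes can be sketched with plain Web Streams (available in Node 18+). The `TransformStream` below stands in for the framework's step stream, so treat this as an illustration of the write-then-release pattern, not the framework API:

```typescript
// A step writes a few chunks and then releases its lock WITHOUT closing the
// stream. Before this fix, serialization waited for close() and so hung here.
const { readable, writable } = new TransformStream<string, string>(
  undefined,
  undefined,
  { highWaterMark: 2 } // let both writes resolve before anything reads
);

const writer = writable.getWriter();
await writer.write("partial-1");
await writer.write("partial-2");
writer.releaseLock(); // lock gone, but the stream is still open

// Unlocked-but-open is exactly the state the new polling detects.
const stillOpen = !writable.locked;

// The written chunks remain readable downstream.
const reader = readable.getReader();
const first = await reader.read();
const second = await reader.read();
reader.releaseLock();

console.log(stillOpen, first.value, second.value);
```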

38 changes: 31 additions & 7 deletions docs/content/docs/foundations/streaming.mdx
@@ -469,27 +469,51 @@ async function uploadResult(stream: ReadableStream<Uint8Array>) {
}
```

## Best Practices
## Stream Lock Contract

**Release locks properly:**
When writing to a stream in a step function, there is an important contract to understand:

<Callout type="warn">
**Once a lock is released, no further writes to that stream from that step are allowed.** The framework uses lock release as the signal that the step is done interacting with the stream. Make sure all writes are complete before releasing the lock.
Review comment (Copilot AI, Dec 23, 2025): The documentation states "Once a lock is released, no further writes to that stream from that step are allowed" and "The framework uses lock release as the signal that the step is done interacting with the stream." However, this contradicts the comment in flushable-stream.ts lines 16-17, which states "The pump continues running even after doneResolved=true to handle any future writes if the user acquires a new lock." Can users re-acquire locks and write more data after releasing, or not? The documentation and the code comments need to be aligned on this contract.

Suggested change:
- **Once a lock is released, no further writes to that stream from that step are allowed.** The framework uses lock release as the signal that the step is done interacting with the stream. Make sure all writes are complete before releasing the lock.
+ **Once a lock is released, no further writes to that stream from that step are allowed.** The framework uses lock release as the signal that the step is done interacting with the stream. Make sure all writes are complete before releasing the lock, and do not rely on re-acquiring a lock on the same stream within the same step after it has been released, even if internal implementation details might technically allow it.
</Callout>

<Callout type="warn">
**The lock MUST be released to prevent the function from hanging.** If you acquire a lock but never release it, the serverless function will remain active until it times out, even after the step returns and the workflow continues.
</Callout>

**Correct pattern - complete all writes before releasing:**

```typescript lineNumbers
async function writeData(items: string[]) {
"use step";

const writable = getWritable<string>();
const writer = writable.getWriter();

// Complete ALL writes before releasing the lock
for (const item of items) {
await writer.write(item);
}

writer.releaseLock(); // Now safe to release
}
```

**Use try/finally to ensure the lock is always released:**

```typescript lineNumbers
const writer = writable.getWriter();
try {
await writer.write(data);
} finally {
writer.releaseLock(); // Always release
writer.releaseLock(); // Always release, even on error
}
```

<Callout type="info">
Stream locks acquired in a step only apply within that step, not across other steps. This enables multiple writers to write to the same stream concurrently.
</Callout>

<Callout type="info">
If a lock is not released, the step process cannot terminate. Even though the step returns and the workflow continues, the underlying process will remain active until it times out.
</Callout>
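The per-step lock scoping described above can be illustrated with plain Web Streams: once one writer releases its lock, a second writer can acquire the same stream. Both writers live in one script here purely to keep the sketch runnable; in the framework they would sit in separate steps:

```typescript
const { readable, writable } = new TransformStream<string, string>(
  undefined,
  undefined,
  { highWaterMark: 4 } // enough headroom that writes resolve without a reader
);

// "Step A" takes the lock, writes, and releases it.
const writerA = writable.getWriter();
await writerA.write("from-step-A");
writerA.releaseLock();

// "Step B" can now take its own lock on the very same stream.
const writerB = writable.getWriter();
await writerB.write("from-step-B");
await writerB.close();

// A single consumer sees both writers' chunks in order.
const reader = readable.getReader();
const received: string[] = [];
while (true) {
  const result = await reader.read();
  if (result.done) break;
  received.push(result.value);
}
console.log(received);
```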

**Close streams when done:** *(remaining lines of this file's diff are collapsed)*
110 changes: 110 additions & 0 deletions packages/core/src/flushable-stream.test.ts
@@ -0,0 +1,110 @@
import { describe, expect, it } from 'vitest';
import {
createFlushableState,
flushablePipe,
LOCK_POLL_INTERVAL_MS,
pollWritableLock,
} from './flushable-stream.js';

describe('flushable stream behavior', () => {
it('promise should resolve when writable stream lock is released (polling)', async () => {
// Test the pattern: user writes, releases lock, polling detects it, promise resolves
const chunks: string[] = [];
let streamClosed = false;

// Create a simple mock for the sink
const mockSink = new WritableStream<string>({
write(chunk) {
chunks.push(chunk);
},
close() {
streamClosed = true;
},
});

// Create a TransformStream like we do in getStepRevivers
const { readable, writable } = new TransformStream<string, string>();
const state = createFlushableState();

// Start piping in background
flushablePipe(readable, mockSink, state).catch(() => {
// Errors handled via state.reject
});

// Start polling for lock release
pollWritableLock(writable, state);

// Simulate user interaction - write and release lock
const userWriter = writable.getWriter();
await userWriter.write('chunk1');
await userWriter.write('chunk2');

// Release lock without closing stream
userWriter.releaseLock();

// Wait for pipe to process + polling interval
await new Promise((r) => setTimeout(r, LOCK_POLL_INTERVAL_MS + 50));

// The promise should resolve
await expect(
Promise.race([
state.promise,
new Promise((_, r) => setTimeout(() => r(new Error('timeout')), 400)),
])
).resolves.toBeUndefined();

// Chunks should have been written
expect(chunks).toContain('chunk1');
expect(chunks).toContain('chunk2');

// Stream should NOT be closed (user only released lock)
expect(streamClosed).toBe(false);
});

it('promise should resolve when writable stream closes naturally', async () => {
const chunks: string[] = [];
let streamClosed = false;

const mockSink = new WritableStream<string>({
write(chunk) {
chunks.push(chunk);
},
close() {
streamClosed = true;
},
});

const { readable, writable } = new TransformStream<string, string>();
const state = createFlushableState();

// Start piping in background
flushablePipe(readable, mockSink, state).catch(() => {
// Errors handled via state.reject
});

// Start polling (won't trigger since stream will close first)
pollWritableLock(writable, state);

// User writes and then closes the stream
const userWriter = writable.getWriter();
await userWriter.write('data');
await userWriter.close();

// Wait a tick for the pipe to process
await new Promise((r) => setTimeout(r, 50));

// The promise should resolve
await expect(
Promise.race([
state.promise,
new Promise((_, r) => setTimeout(() => r(new Error('timeout')), 200)),
])
).resolves.toBeUndefined();

// Chunks should have been written
expect(chunks).toContain('data');

// Stream should be closed (user closed it)
expect(streamClosed).toBe(true);
});
});
Comment on lines +9 to +110 (Copilot AI, Dec 23, 2025):
Test coverage is missing for several critical scenarios:

  1. Error handling during pipe operations (e.g., write failures)
  2. Testing with pollReadableLock (only pollWritableLock is tested)
  3. Concurrent operations (multiple reads/writes in flight)
  4. Edge cases like releasing and re-acquiring locks
  5. Scenarios where the stream ends while pending operations are in flight

These scenarios are important to verify the robustness of the flushable stream mechanism, especially given the complexity around lock polling and state management.
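As a self-contained illustration of the first gap listed above (error handling during pipe operations), the sketch below uses a plain `pipeTo()` in place of `flushablePipe`; an actual test would import `flushablePipe` and assert that `state.promise` rejects the same way:

```typescript
const sinkError = new Error("sink write failed");

// A sink whose every write fails, standing in for a flaky server connection.
const failingSink = new WritableStream<string>({
  write() {
    throw sinkError;
  },
});

const { readable, writable } = new TransformStream<string, string>();
const pipeDone = readable.pipeTo(failingSink);

const writer = writable.getWriter();
// The write itself may or may not reject depending on timing; the pipe
// promise is the reliable place to observe the failure.
await writer.write("boom").catch(() => {});

let caught: unknown;
try {
  await pipeDone;
} catch (err) {
  caught = err;
}
console.log(caught === sinkError);
```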

194 changes: 194 additions & 0 deletions packages/core/src/flushable-stream.ts
@@ -0,0 +1,194 @@
import { type PromiseWithResolvers, withResolvers } from '@workflow/utils';

/** Polling interval for lock release detection */
Comment (Copilot AI, Dec 23, 2025):
The polling interval is set to 100ms (LOCK_POLL_INTERVAL_MS), which means there could be up to a 100ms delay between when a user releases a lock and when the system detects it. In serverless environments where execution time costs money, this polling approach could be wasteful. Consider if there's a more event-driven approach, or at least document why polling was chosen over alternatives. Additionally, with many concurrent streams, having multiple setInterval timers could impact performance.

Suggested change
/** Polling interval for lock release detection */
/**
* Polling interval (in ms) for lock release detection.
*
* The Web Streams API does not expose an event for "lock released but stream
* still open"; we can only distinguish that state by periodically attempting
* to acquire a reader/writer. For that reason we use polling instead of a
* fully event-driven approach here.
*
* 100ms is a compromise between:
* - Latency: how quickly we notice that the user has released their lock, and
* - Cost/CPU usage: how often timers fire, especially with many concurrent
* streams or in serverless environments where billed time matters.
*
* This value should only be changed with care, as decreasing it will
* increase polling frequency (and thus potential cost), while increasing it
* will add worst-case delay before the `done` promise resolves after a lock
* is released.
*/

export const LOCK_POLL_INTERVAL_MS = 100;

/**
* State tracker for flushable stream operations.
* Resolves when either:
* 1. Stream completes (close/error), OR
* 2. Lock is released AND all pending operations are flushed
*
* Note: `doneResolved` and `streamEnded` are separate:
* - `doneResolved`: The `done` promise has been resolved (step can complete)
* - `streamEnded`: The underlying stream has actually closed/errored
*
* The pump continues running even after `doneResolved=true` to handle
* any future writes if the user acquires a new lock.
Comment on lines +16 to +17 (Copilot AI, Dec 23, 2025):
The comment states "The pump continues running even after doneResolved=true to handle any future writes if the user acquires a new lock." However, looking at the flushablePipe implementation, there's no mechanism to actually handle or support re-acquiring locks after the promise resolves. Once doneResolved is true, the polling stops, and if the user were to acquire a new lock and write more data, that data would continue to be pumped through, but there's no way to signal completion again. This comment is misleading - either the implementation should support this pattern, or the comment should be updated to clarify that re-acquiring locks after release is not a supported use case.

Suggested change
* The pump continues running even after `doneResolved=true` to handle
* any future writes if the user acquires a new lock.
* Once `doneResolved` is set to true, the `done` promise will not resolve
* again. Re-acquiring locks after release is not supported as a way to
* trigger additional completion signaling.

*/
export interface FlushableStreamState extends PromiseWithResolvers<void> {
/** Number of write operations currently in flight to the server */
pendingOps: number;
/** Whether the `done` promise has been resolved */
doneResolved: boolean;
/** Whether the underlying stream has actually closed/errored */
streamEnded: boolean;
}

export function createFlushableState(): FlushableStreamState {
return {
...withResolvers<void>(),
pendingOps: 0,
doneResolved: false,
streamEnded: false,
};
}

/**
* Checks if a WritableStream is unlocked (user released lock) vs closed.
* When a stream is closed, .locked is false but getWriter() throws.
* We only want to resolve via polling when the stream is unlocked, not closed.
* If closed, the pump will handle resolution via the stream ending naturally.
*/
function isWritableUnlockedNotClosed(writable: WritableStream): boolean {
if (writable.locked) return false;

try {
// Try to acquire writer - if successful, stream is unlocked (not closed)
const writer = writable.getWriter();
writer.releaseLock();
return true;
} catch {
// getWriter() throws if stream is closed/errored - let pump handle it
return false;
}
Comment on lines +43 to +54 (Copilot AI, Dec 23, 2025):
The function acquires a writer lock temporarily to check if the stream is unlocked vs closed, but if an error occurs during releaseLock() (line 49), the error is silently caught and false is returned. While this is probably the desired behavior, it means the lock might remain acquired if releaseLock throws. Consider being more specific about which errors to catch, or add a comment explaining why errors during releaseLock should be treated as "stream is closed".

}

/**
* Checks if a ReadableStream is unlocked (user released lock) vs closed.
*/
function isReadableUnlockedNotClosed(readable: ReadableStream): boolean {
if (readable.locked) return false;

try {
// Try to acquire reader - if successful, stream is unlocked (not closed)
const reader = readable.getReader();
reader.releaseLock();
return true;
} catch {
// getReader() throws if stream is closed/errored - let pump handle it
return false;
}
Comment on lines +63 to +71 (Copilot AI, Dec 23, 2025):
Similar to isWritableUnlockedNotClosed, this function temporarily acquires a reader lock to check stream state. If an error occurs during releaseLock() (line 66), it's silently caught. While probably intentional, this could leave the lock acquired if releaseLock throws. Consider being more explicit about error handling or documenting why all errors should be treated as "stream is closed".

Suggested change
try {
// Try to acquire reader - if successful, stream is unlocked (not closed)
const reader = readable.getReader();
reader.releaseLock();
return true;
} catch {
// getReader() throws if stream is closed/errored - let pump handle it
return false;
}
let reader: ReadableStreamDefaultReader | undefined;
try {
// Try to acquire reader - if successful, stream is unlocked (not closed)
reader = readable.getReader();
} catch {
// getReader() throws if stream is closed/errored - let pump handle it
return false;
}
try {
reader.releaseLock();
} catch {
// If releaseLock() throws for any reason, conservatively treat the
// stream as closed/errored so callers don't assume it's safe to use.
// The pump will observe the failure via the stream's end state.
return false;
}
return true;

}

/**
* Polls a WritableStream to check if the user has released their lock.
* Resolves the done promise when lock is released and no pending ops remain.
*
* Note: Only resolves if stream is unlocked but NOT closed. If the user closes
* the stream, the pump will handle resolution via the stream ending naturally.
*/
export function pollWritableLock(
writable: WritableStream,
state: FlushableStreamState
): void {
const intervalId = setInterval(() => {
// Stop polling if already resolved or stream ended
if (state.doneResolved || state.streamEnded) {
clearInterval(intervalId);
return;
}

// Check if lock is released (not closed) and no pending ops
if (isWritableUnlockedNotClosed(writable) && state.pendingOps === 0) {
state.doneResolved = true;
state.resolve();
clearInterval(intervalId);
}
}, LOCK_POLL_INTERVAL_MS);
Comment on lines +85 to +98 (Copilot AI, Dec 23, 2025):
The interval created here is never stored or returned, which means there's no way to explicitly clean it up. While the interval does have cleanup logic inside the callback, there's a potential issue if pollWritableLock is called multiple times on the same stream - this would create multiple intervals that could race to resolve the same state. Consider returning the intervalId so callers can clean it up if needed, or add protection against multiple simultaneous polling operations on the same state.

}
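One way to address the cleanup concern in the comment above, sketched here as a hypothetical variant rather than the PR's actual API: have the poller return a stop handle and guard the shared state against concurrent pollers, so two polling loops can never race to resolve the same promise:

```typescript
interface PollState {
  doneResolved: boolean;
  polling: boolean; // guards against starting a second poller on this state
}

function pollWithHandle(
  check: () => boolean,
  state: PollState,
  onDone: () => void,
  intervalMs = 10
): () => void {
  if (state.polling) return () => {}; // already polling: no-op handle
  state.polling = true;

  const id = setInterval(() => {
    if (state.doneResolved) {
      clearInterval(id);
      return;
    }
    if (check()) {
      state.doneResolved = true; // resolve exactly once
      onDone();
      clearInterval(id);
    }
  }, intervalMs);

  return () => clearInterval(id); // explicit cleanup for callers
}

// Usage: the condition flips once, and onDone fires exactly once even
// though pollWithHandle was (incorrectly) called twice.
let released = false;
let resolvedCount = 0;
const state: PollState = { doneResolved: false, polling: false };

const stop = pollWithHandle(() => released, state, () => resolvedCount++);
pollWithHandle(() => released, state, () => resolvedCount++); // guarded no-op

released = true;
await new Promise((r) => setTimeout(r, 60));
stop();
console.log(resolvedCount);
```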

/**
* Polls a ReadableStream to check if the user has released their lock.
* Resolves the done promise when lock is released and no pending ops remain.
*
* Note: Only resolves if stream is unlocked but NOT closed. If the user closes
* the stream, the pump will handle resolution via the stream ending naturally.
*/
export function pollReadableLock(
readable: ReadableStream,
state: FlushableStreamState
): void {
const intervalId = setInterval(() => {
// Stop polling if already resolved or stream ended
if (state.doneResolved || state.streamEnded) {
clearInterval(intervalId);
return;
}

// Check if lock is released (not closed) and no pending ops
if (isReadableUnlockedNotClosed(readable) && state.pendingOps === 0) {
state.doneResolved = true;
state.resolve();
clearInterval(intervalId);
}
}, LOCK_POLL_INTERVAL_MS);
Comment on lines +112 to +125 (Copilot AI, Dec 23, 2025):
Similar to pollWritableLock, this interval is never stored or returned, creating potential issues if this function is called multiple times on the same stream. Multiple simultaneous polling operations could race to resolve the same state. Consider returning the intervalId for explicit cleanup or adding protection against concurrent polling.

}

/**
* Creates a flushable pipe from a ReadableStream to a WritableStream.
* Unlike pipeTo(), this resolves when:
* 1. The source stream completes (close/error), OR
* 2. The user releases their lock on userStream AND all pending writes are flushed
*
* @param source - The readable stream to read from (e.g., transform's readable)
* @param sink - The writable stream to write to (e.g., server writable)
* @param state - The flushable state tracker
* @returns Promise that resolves when stream ends (not when done promise resolves)
*/
export async function flushablePipe(
source: ReadableStream,
sink: WritableStream,
state: FlushableStreamState
): Promise<void> {
const reader = source.getReader();
const writer = sink.getWriter();

try {
while (true) {
// Check if stream has ended
if (state.streamEnded) {
return;
}

// Read from source - don't count as pending op since we're just waiting for data
// The important ops are writes to the sink (server)
const readResult = await reader.read();

if (readResult.done) {
// Source stream completed - close sink and resolve
state.streamEnded = true;
await writer.close();
// Resolve done promise if not already resolved
if (!state.doneResolved) {
state.doneResolved = true;
state.resolve();
}
return;
}

// Count write as a pending op - this is what we need to flush
state.pendingOps++;
try {
await writer.write(readResult.value);
} finally {
state.pendingOps--;
}
Comment on lines +156 to +176 (Copilot AI, Dec 23, 2025):
There's a race condition here: after reading from the source, the stream could be ended (via error or another path) before the write begins, but state.streamEnded is only checked after the write completes. This means we might attempt to write to a stream that should be terminated. Consider checking state.streamEnded immediately after the read, before incrementing pendingOps and writing.


// Check if stream has ended (e.g., due to error in another path)
if (state.streamEnded) {
return;
}
}
} catch (err) {
state.streamEnded = true;
if (!state.doneResolved) {
state.doneResolved = true;
state.reject(err);
}
Comment (Copilot AI, Dec 23, 2025):
When an error occurs during piping, the error is caught, state.reject(err) is called, and then the error is re-thrown. However, the callers in serialization.ts catch this error with .catch(() => {}) and ignore it, relying on state.reject to propagate the error through the promise. This means the thrown error from line 189 is always silently caught and discarded. Consider either not re-throwing the error (since it's handled via state.reject), or documenting why the error is both rejected through state and re-thrown.

Suggested change
}
}
// Propagate error through flushablePipe's own promise as well.
// Callers that rely on the FlushableStreamState should use `state.promise`,
// while other callers may depend on this rejection. Some known callers
// explicitly ignore this rejection (`.catch(() => {})`) and rely solely
// on `state.reject(err)` for error handling.

throw err;
} finally {
reader.releaseLock();
writer.releaseLock();
}
}
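The reordering suggested in the review above could look like the following. This is a self-contained mini-pump written for illustration, not the PR's `flushablePipe`; the early `streamEnded` check between the read and the write is the hypothetical change:

```typescript
interface MiniState {
  streamEnded: boolean;
  pendingOps: number;
}

async function pumpWithEarlyCheck(
  source: ReadableStream<string>,
  sink: WritableStream<string>,
  state: MiniState,
  skipped: string[]
): Promise<void> {
  const reader = source.getReader();
  const writer = sink.getWriter();
  try {
    while (true) {
      const result = await reader.read();
      if (result.done) return;

      // Early check: the stream may have been marked ended while we were
      // blocked on read(); in that case, never start the write.
      if (state.streamEnded) {
        skipped.push(result.value);
        return;
      }

      state.pendingOps++;
      try {
        await writer.write(result.value);
      } finally {
        state.pendingOps--;
      }
    }
  } finally {
    reader.releaseLock();
    writer.releaseLock();
  }
}

// Demo: the stream is marked ended while a read is still in flight, so the
// late chunk is skipped instead of being written to the sink.
const written: string[] = [];
const skipped: string[] = [];
const sink = new WritableStream<string>({
  write(chunk) {
    written.push(chunk);
  },
});

const source = new ReadableStream<string>({
  start(controller) {
    setTimeout(() => {
      controller.enqueue("late-chunk");
      controller.close();
    }, 20);
  },
});

const state: MiniState = { streamEnded: false, pendingOps: 0 };
const pump = pumpWithEarlyCheck(source, sink, state, skipped);
state.streamEnded = true; // ended while read() is pending
await pump;
console.log(written, skipped);
```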