Skip to content

Fix race condition: Atomic stream finish with message save#224

Merged
sethconvex merged 2 commits intomainfrom
fix-atomic-stream-finish
Feb 20, 2026
Merged

Fix race condition: Atomic stream finish with message save#224
sethconvex merged 2 commits intomainfrom
fix-atomic-stream-finish

Conversation

@sethconvex
Copy link
Contributor

@sethconvex sethconvex commented Feb 11, 2026

Summary

Fixes the intermittent crash "TypeError: The stream is not in a state that permits enqueue" when using saveStreamDeltas with tool calls (issue #181).

The bug occurs when:

  1. Tool executes successfully → onStepFinish callback saves to DB
  2. Stream finishes reading → AI SDK closes stream via attemptClose()
  3. Delta-save mutation still in flight → tries to write to closed stream
  4. Crash: Cannot enqueue chunk on closed stream

The fix implements atomic stream finish by:

  • Deferring final step save when streaming is enabled
  • Saving step atomically with stream finish in same mutation
  • Stream stays open until database confirms all saves complete

Cherry-picked from #217 (commit 5cd4c5e). Resolves #181.

Test plan

  • npm run build passes
  • npm test passes
  • No more "stream is not in a state that permits enqueue" errors during tool calling with streaming

🤖 Generated with Claude Code

@coderabbitai
Copy link

coderabbitai bot commented Feb 11, 2026

Warning

Rate limit exceeded

@sethconvex has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 16 minutes and 6 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch fix-atomic-stream-finish

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@pkg-pr-new
Copy link

pkg-pr-new bot commented Feb 11, 2026

Open in StackBlitz

npm i https://pkg.pr.new/get-convex/agent/@convex-dev/agent@224

commit: 5264d40

messages: serialized.messages,
embeddings,
failPendingSteps: false,
finishStreamId: finishStreamId as any, // optional stream to finish atomically
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the any cast?

Comment on lines 315 to 317
public async getOrCreateStreamId(): Promise<string> {
await this.getStreamId();
return this.streamId!;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's just have getStreamId() return this.streamId


// If we deferred the final step save, do it now with atomic stream finish
if (pendingFinalStep && streamer) {
const finishStreamId = await streamer.getOrCreateStreamId();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure why the streamId wouldn't exist yet - but i guess better than streamer.streamId!?

Comment on lines +312 to +313
if (args.finishStreamId) {
await finishHandler(ctx, { streamId: args.finishStreamId });
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (args.finishStreamId) {
await finishHandler(ctx, { streamId: args.finishStreamId });
if (finishStreamId) {
await finishHandler(ctx, { streamId: finishStreamId });

@sethconvex sethconvex force-pushed the fix-v6-examples-and-warnings branch from a19d166 to c209e89 Compare February 20, 2026 06:34
@sethconvex sethconvex force-pushed the fix-atomic-stream-finish branch 2 times, most recently from 462e158 to cf75fc1 Compare February 20, 2026 06:51
@sethconvex
Copy link
Contributor Author

Addressed review feedback:

  1. as any cast (start.ts:274): Added comment explaining why — finishStreamId is string from getOrCreateStreamId(), but the component mutation expects Id<"streamingMessages">. Runtime validation handles it.

  2. getStreamId() return value (streaming.ts:264): Refactored to always return this.streamId. getOrCreateStreamId() now just delegates to getStreamId() directly.

  3. streamText.ts:169: The stream is created by this point (via addPartsgetStreamId() during streaming). getOrCreateStreamId is just the public accessor.

@sethconvex sethconvex force-pushed the fix-atomic-stream-finish branch from cf75fc1 to b6894b8 Compare February 20, 2026 06:59
@sethconvex sethconvex force-pushed the fix-v6-examples-and-warnings branch from c209e89 to 0eb6f6a Compare February 20, 2026 07:12
@sethconvex sethconvex force-pushed the fix-atomic-stream-finish branch from b6894b8 to 34d83da Compare February 20, 2026 07:12
@sethconvex sethconvex force-pushed the fix-v6-examples-and-warnings branch 2 times, most recently from 17cd6b6 to b25ec92 Compare February 20, 2026 07:21
@sethconvex sethconvex force-pushed the fix-atomic-stream-finish branch from 34d83da to 15c0ddf Compare February 20, 2026 07:21
@sethconvex sethconvex marked this pull request as ready for review February 20, 2026 07:25
Copy link
Contributor Author

sethconvex commented Feb 20, 2026

Merge activity

  • Feb 20, 7:25 AM UTC: A user started a stack merge that includes this pull request via Graphite.
  • Feb 20, 7:26 AM UTC: Graphite rebased this pull request as part of a merge.
  • Feb 20, 7:27 AM UTC: @sethconvex merged this pull request with Graphite.

@sethconvex sethconvex changed the base branch from fix-v6-examples-and-warnings to graphite-base/224 February 20, 2026 07:25
@sethconvex sethconvex changed the base branch from graphite-base/224 to main February 20, 2026 07:25
sethconvex and others added 2 commits February 20, 2026 07:26
Fixes the intermittent crash "TypeError: The stream is not in a state
that permits enqueue" when using saveStreamDeltas with tool calls.

The bug occurs when:
1. Tool executes successfully -> onStepFinish callback saves to DB
2. Stream finishes reading -> AI SDK closes stream via attemptClose()
3. Delta-save mutation still in flight -> tries to write to closed stream
4. Crash: Cannot enqueue chunk on closed stream

This fix (from commit 87e3657 on seth/fix-193 branch) implements atomic
stream finish by:
- Deferring final step save when streaming is enabled
- Saving step atomically with stream finish in same mutation
- Stream stays open until database confirms all saves complete

Changes:
- streamText.ts: Track pendingFinalStep, defer save, atomic finish
- streaming.ts: Add markFinishedExternally() and getOrCreateStreamId()
- start.ts: Add finishStreamId parameter to save()
- messages.ts: Atomically finish stream with message save

Resolves issue #181
Addresses user report: #217

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
@sethconvex sethconvex force-pushed the fix-atomic-stream-finish branch from 15c0ddf to 5264d40 Compare February 20, 2026 07:26
@sethconvex sethconvex merged commit 6998f79 into main Feb 20, 2026
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Sub-messages flow problem

2 participants