Add HTTP streaming demo, integration tests, and fix streaming bugs #232

Open
sethconvex wants to merge 4 commits into main from fix/http-streaming-duplication

Conversation

@sethconvex
Contributor

@sethconvex sethconvex commented Feb 24, 2026

Summary

Add a streaming demo example showcasing all three streaming patterns (delta, one-shot, HTTP) side-by-side, along with comprehensive integration tests for DeltaStreamer and delta consumption.

New files

  • example/convex/chat/streamingDemo.ts — Backend for the streaming demo (delta, one-shot, and HTTP modes)
  • example/ui/chat/StreamingDemo.tsx — UI with mode switcher showing all three streaming patterns
  • src/client/streaming.integration.test.ts — Integration tests for DeltaStreamer, consumeStream, compression, and delta materialization
  • src/backwards-compat.test.ts — Backwards compatibility tests for v5→v6 migration paths
  • docs/http-streaming-requirements.md — Technical requirements doc for HTTP streaming support

Bug fixes

  • HTTP streaming duplication — Clear httpText when streaming ends, hide pending DB message during HTTP stream, render tool call parts inline in the HTTP bubble
  • One-shot mode broken — Use useAction instead of useMutation for the action endpoint, fire-and-forget so the UI isn't blocked
  • useSmoothText animating non-streaming messages — Track hasStreamed flag; snap cursor to full text immediately when streaming was never activated
  • DeltaStreamer.finish()/fail() not fully draining writes — Use while loop instead of single await to drain the self-chaining #sendDelta queue
  • Unhandled rejection in #sendDelta — Return instead of re-throwing after onAsyncAbort, since the fire-and-forget caller cannot catch it
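
The finish()/fail() drain fix is easiest to see against a simplified model of the self-chaining write queue. The sketch below is illustrative only — the class name and internals are assumptions based on the description above, not the actual DeltaStreamer code:

```typescript
// Simplified model of a self-chaining write queue. Each enqueued write
// chains onto the previous tail, so a single `await` of the tail only
// covers writes that existed at that moment; writes enqueued mid-drain
// would be missed. The loop-style drain keeps awaiting until the tail
// stops moving, mirroring the finish()/fail() fix described above.
class WriteQueueModel {
  #tail: Promise<void> = Promise.resolve();
  #pending = 0;

  enqueue(write: () => Promise<void>): void {
    this.#pending++;
    this.#tail = this.#tail.then(async () => {
      await write();
      this.#pending--;
    });
  }

  // The fix: keep awaiting until no new writes were chained mid-drain.
  async drain(): Promise<void> {
    let current: Promise<void>;
    do {
      current = this.#tail;
      await current;
    } while (current !== this.#tail);
  }

  get pending(): number {
    return this.#pending;
  }
}

// Demo: a write that enqueues another write mid-flight is still drained.
(async () => {
  const q = new WriteQueueModel();
  q.enqueue(async () => q.enqueue(async () => {}));
  await q.drain();
  console.log("pending after drain:", q.pending); // pending after drain: 0
})();
```

A single `await this.#tail` in place of the loop would report the queue as drained while the nested write was still pending, which matches the symptom the fix addresses.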

Test plan

  • npm test — 292 tests pass, no type errors, no unhandled rejections
  • npm run typecheck — clean
  • npm run lint — clean
  • Manual: verify HTTP streaming demo shows no duplicate content
  • Manual: verify one-shot mode delivers response
  • Manual: verify tool calls appear inline during HTTP streaming

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Full interactive streaming demo showcasing delta, HTTP, and one-shot modes with inspector, abort controls, and tool-approval flows; new route added to the example app.
    • New HTTP streaming UI hook and utilities exposing live text, stream/message IDs, start/abort controls, and server endpoints for text/UI streams.
  • Documentation

    • Added an HTTP streaming requirements/specification guide.
  • Bug Fixes

    • Improved streaming error handling and write-queue drainage.
  • Tests

    • Extensive integration and unit tests covering HTTP and delta streaming, formats, compression, and lifecycle.

@coderabbitai

coderabbitai bot commented Feb 24, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Adds a full streaming demo (UI + Convex backend) supporting delta, HTTP, and one-shot modes; introduces HTTP streaming client hooks/utilities and server actions; large test suites for backwards compatibility, HTTP and delta streaming; new demo agent/tools; small runtime fixes and HTTP streaming documentation.

Changes

  • Streaming Demo UI & Routes (example/ui/chat/StreamingDemo.tsx, example/ui/main.tsx) — New StreamingDemo component and route (/streaming-demo) implementing delta, HTTP, and one-shot modes with inspector, abort/continue controls, and UI integration.
  • Example Convex Backend & HTTP route (example/convex/chat/streamingDemo.ts, example/convex/agents/streamingDemo.ts, example/convex/http.ts) — New Convex actions for streaming (streamResponse, streamOverHttp, streamOneShot), message/stream list and abort APIs, approval/continuation flows, demo Agent/tools, and HTTP route /streamTextDemo.
  • Client HTTP streaming utilities & hook (src/client/http.ts, src/react/useHttpStream.ts, src/react/httpStreamUtils.ts, src/react/httpStreamUtils.test.ts, src/react/index.ts) — Adds httpStreamText/httpStreamUIMessages, consumeTextStream/supportsStreaming utilities, and the useHttpStream hook (streaming fetch, header extraction, chunk decoding, abort). Adds tests and re-exports.
  • Client public surface & generateText (src/client/index.ts, src/client/generateText.ts, src/client/streamText.ts, src/client/types.ts) — Exports new generateText; surfaces HTTP streaming APIs; streamText now returns streamId in metadata; GenerationOutputMetadata gains optional streamId. Adds Agent.asHttpAction helper.
  • Streaming runtime & write-queue fixes (src/client/streaming.ts) — Adjusts error handling to return after abort and changes finish()/fail() to drain the self-chaining write queue before completing.
  • React smooth text hook tweak (src/react/useSmoothText.ts) — Adds a hasStreamed flag and snaps the cursor to the end if streaming never started, to avoid animation artifacts.
  • Extensive tests: backwards compatibility & integration (src/backwards-compat.test.ts, src/client/streaming.integration.test.ts, src/client/http.test.ts) — Large new test suites covering legacy formats, delta semantics, compression/merge, cursor/gap handling, multi-stream ordering, HTTP action behaviors, UI message derivation, and end-to-end streaming lifecycles. Adds Convex test actions for HTTP tests.
  • Docs: HTTP streaming requirements (docs/http-streaming-requirements.md) — New specification describing HTTP streaming design, transport formats, resumption, headers/metadata, delta filtering/compression, lifecycle, and open questions.
  • React HTTP streaming utilities export (src/react/index.ts) — Re-exports useHttpStream, consumeTextStream, and supportsStreaming.
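
The summary above names a consumeTextStream utility without showing its shape. Below is a rough sketch of the kind of consumption loop such a helper implies; the function name, signature, and onChunk callback are assumptions for illustration, not the PR's actual API:

```typescript
// Hypothetical sketch of a text-stream consumption loop over web streams.
// The real consumeTextStream exported by this PR may differ.
async function consumeTextStreamSketch(
  body: ReadableStream<Uint8Array>,
  onChunk: (text: string) => void,
): Promise<string> {
  const reader = body.getReader();
  // { stream: true } buffers partial multi-byte UTF-8 sequences across chunks.
  const decoder = new TextDecoder();
  let full = "";
  try {
    for (;;) {
      const { done, value } = await reader.read();
      if (done) break;
      const text = decoder.decode(value, { stream: true });
      if (text) {
        full += text;
        onChunk(text);
      }
    }
    full += decoder.decode(); // flush any buffered bytes
  } finally {
    reader.releaseLock();
  }
  return full;
}

// Demo: decode a two-chunk stream.
(async () => {
  const enc = new TextEncoder();
  const stream = new ReadableStream<Uint8Array>({
    start(controller) {
      controller.enqueue(enc.encode("Hello, "));
      controller.enqueue(enc.encode("world"));
      controller.close();
    },
  });
  const full = await consumeTextStreamSketch(stream, () => {});
  console.log(full); // Hello, world
})();
```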

Sequence Diagram(s)

sequenceDiagram
  participant UI as Client UI
  participant Hook as useHttpStream / ChatPanel
  participant Action as Convex Action (streamOverHttp / streamOneShot)
  participant HTTP as Remote HTTP Stream
  participant DB as Database / Convex persistence

  UI->>Hook: send(body) / select mode
  Hook->>Action: start streaming (POST / action or useAction)
  Action->>HTTP: initiate remote HTTP stream
  HTTP-->>Action: stream chunks (text / tool outputs)
  Action->>DB: persist deltas/parts & emit headers (X-Stream-Id/X-Message-Id)
  Action-->>Hook: push updates (response chunks or DB subscription)
  Hook->>UI: update text, streamId, messageId, isStreaming
  UI->>Action: abort / approve / continue
  Action->>DB: update stream status (finished/aborted)
  DB-->>Hook: final persisted message -> render final bubble

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

  • AI SDK v6 Support #216 — closely related work on HTTP streaming, tool-approval workflows, and streaming/react utilities that overlap with these changes.

Poem

🐰 I hopped through chunks and headers bright,

Deltas danced and HTTP streamed all night,
Tools asked kindly, approvals took flight,
Tests kept watch by soft moonlight,
A rabbit nibbles on each streamed byte 🥕

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage — ⚠️ Warning: docstring coverage is 59.09%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.

✅ Passed checks (2 passed)

  • Title check — ✅ Passed: the title clearly and concisely summarizes the main changes (adding the HTTP streaming demo, integration tests, and fixing streaming bugs) and accurately reflects the primary objective of the PR.
  • Description check — ✅ Passed: check skipped because CodeRabbit's high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


@pkg-pr-new

pkg-pr-new bot commented Feb 24, 2026

Open in StackBlitz

npm i https://pkg.pr.new/get-convex/agent/@convex-dev/agent@232

commit: 0e4d9f0

@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (5)
src/client/streaming.integration.test.ts (1)

1195-1196: Assertion relies on short-circuit evaluation — fragile error message on failure.

expect(first.type === "reasoning-delta" && first.delta).toBe("Thinking") works but if first.type is wrong, the failure message will be expected false to be "Thinking" with no indication of what the actual type was. Consider splitting into two assertions for clearer diagnostics:

🔧 Suggested improvement
     const first = compressed[0];
-    expect(first.type === "reasoning-delta" && first.delta).toBe("Thinking");
+    expect(first.type).toBe("reasoning-delta");
+    if (first.type === "reasoning-delta") {
+      expect(first.delta).toBe("Thinking");
+    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/streaming.integration.test.ts` around lines 1195 - 1196, The
assertion combines a type check and value check into one expression which yields
poor diagnostics; change the single assertion into two explicit ones: assert the
object's type with expect(first.type).toBe("reasoning-delta") and then assert
the delta value with expect(first.delta).toBe("Thinking"), referencing the
variable first in the failing test to get clear error messages.
src/backwards-compat.test.ts (4)

289-290: Same short-circuit assertion pattern as in streaming.integration.test.ts.

expect(first.type === "text-delta" && first.delta).toBe(chars) will yield an unhelpful expected false to be "..." if the type is wrong. Consider splitting into expect(first.type).toBe("text-delta") followed by a narrowed delta check.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backwards-compat.test.ts` around lines 289 - 290, The assertion
`expect(first.type === "text-delta" && first.delta).toBe(chars)` masks which
part failed; change it to two assertions: first assert the packet type with
`expect(first.type).toBe("text-delta")` to ensure correct narrowing, then assert
the delta value with `expect(first.delta).toBe(chars)` (or a suitable
deep-equality matcher) using the same `first` and `compressed` variables so
failures clearly show whether type or delta is wrong.

435-449: Performance timing assertion may be flaky in resource-constrained CI.

expect(elapsed).toBeLessThan(100) is reasonable for 50 deltas under normal conditions, but CI runners (especially shared/containerized environments) can have variable performance. If this test becomes flaky, consider either increasing the threshold or replacing with a relative check (e.g., materializing 50 deltas shouldn't be orders of magnitude slower than 5).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backwards-compat.test.ts` around lines 435 - 449, The timing assertion
using elapsed in the test around deriveUIMessagesFromDeltas is flaky in CI;
update the test to avoid a brittle absolute threshold by either raising the
limit (e.g., from 100ms to a more generous value) or implementing a relative
check (compare elapsed for 50 deltas against a baseline run for 5 deltas or
assert elapsed is within N× baseline) and keep assertions about messages
(length/role/text) unchanged; modify the test that measures performance (the
elapsed variable and the expect(elapsed) assertion) accordingly to use the new
threshold or relative comparison.

252-267: Duplicated test constants across this file and streaming.integration.test.ts.

defaultTestOptions and testMetadata are defined identically in three places: lines 252-267, lines 890-905 in this file, and lines 22-40 in streaming.integration.test.ts. Consider extracting these into a shared test-utilities module to reduce duplication.

Also applies to: 890-905

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backwards-compat.test.ts` around lines 252 - 267, Extract the duplicated
constants defaultTestOptions and testMetadata into a shared test utilities
module (e.g., export from a new test-utils file) and replace the inline
definitions in this file (occurrences at the shown block and the second block
around lines 890-905) and in streaming.integration.test.ts with imports from
that module; ensure the exported names match exactly (defaultTestOptions,
testMetadata, including the onAsyncAbort function shape and the format literal
"UIMessageChunk") and update any import paths in the three test files
accordingly so they consume the single shared definition.

180-198: Weak tests: these only verify plain-object creation, not actual runtime acceptance.

These tests create local objects with textEmbeddingModel and maxSteps properties but never pass them to any Agent or configuration constructor. They validate that a JS object can have these keys (which is trivially true), not that the actual system still accepts them at runtime. If the goal is backwards-compatibility coverage, these tests should exercise the real Agent config path or at minimum validate the TypeScript types compile.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backwards-compat.test.ts` around lines 180 - 198, The two tests
("textEmbeddingModel config is accepted and used" and "maxSteps config is still
supported in Config type") only construct plain objects; instead, pass these
configs through the real config path to assert runtime acceptance: create a
Config/Agent instance (use the same constructor or factory your code exposes,
e.g., Agent or createAgent) with the test config and assert the instance reads
or preserves the fields (or call the code path that picks
textEmbeddingModel/embeddingModel and assert behavior), or alternatively make
the test a TypeScript compile-time check by importing the Config type and using
a typed const to ensure the properties are accepted; update the tests to use the
actual Agent/Config symbol rather than only asserting plain-object keys.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 253504b and c2a0b12.

⛔ Files ignored due to path filters (1)
  • example/convex/_generated/api.d.ts is excluded by !**/_generated/**
📒 Files selected for processing (4)
  • example/ui/chat/StreamingDemo.tsx
  • src/backwards-compat.test.ts
  • src/client/streaming.integration.test.ts
  • src/react/useSmoothText.ts

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@example/ui/chat/StreamingDemo.tsx`:
- Around line 172-179: The effect clears httpText as soon as httpStreaming
becomes false, which can create a visible gap; change the guard to only clear
when a matching finalized assistant message exists in useUIMessages (e.g., find
the last message with role "assistant" and a finished/success flag or matching
message id produced by onStepFinish). In the useEffect referencing httpText,
httpStreaming, and useUIMessages, replace the simple (!httpStreaming) check with
a validation that the latest assistant message is present and marked
finished/success before calling setHttpText("") so the streamed text remains
visible until the persisted message is available.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c2a0b12 and 75945d8.

📒 Files selected for processing (1)
  • example/ui/chat/StreamingDemo.tsx

@sethconvex sethconvex force-pushed the fix/http-streaming-duplication branch from 75945d8 to d71ed1f on February 24, 2026 at 05:17
@sethconvex sethconvex changed the title from "Fix HTTP streaming duplication and typecheck errors" to "Fix HTTP streaming duplication, useSmoothText snapping, and DeltaStreamer drain" on Feb 24, 2026
Add a streaming demo example that showcases all three streaming
patterns (delta, one-shot, HTTP) side-by-side, along with
comprehensive integration tests for DeltaStreamer and delta
consumption.

Fixes discovered during testing:
- HTTP streaming showing duplicate content: clear httpText when
  streaming ends, hide pending DB message during HTTP stream, render
  tool call parts inline in the HTTP bubble
- One-shot mode using useMutation instead of useAction
- useSmoothText animating non-streaming messages: track hasStreamed
  and snap cursor to full text when streaming was never activated
- DeltaStreamer.finish()/fail() not fully draining the self-chaining
  write queue (use while loop instead of single await)
- Unhandled rejection in #sendDelta: return instead of re-throw after
  onAsyncAbort since the fire-and-forget caller cannot catch it

Also adds backwards-compat test suite and HTTP streaming requirements
doc.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@sethconvex sethconvex force-pushed the fix/http-streaming-duplication branch from d71ed1f to ca3259a on February 24, 2026 at 05:24
@sethconvex sethconvex changed the title from "Fix HTTP streaming duplication, useSmoothText snapping, and DeltaStreamer drain" to "Add HTTP streaming demo, integration tests, and fix streaming bugs" on Feb 24, 2026
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/client/streaming.ts (1)

441-452: ⚠️ Potential issue | 🟠 Major

File parts are pushed twice in the compressed output.

When part.type === "file", the code pushes a modified copy with uint8Array: undefined (lines 443–449), then unconditionally pushes the original part at line 452. This duplicates every file part in the compressed output.

🐛 Proposed fix
     } else {
       if (part.type === "file") {
         compressed.push({
           type: "file",
           file: {
             ...part.file,
             uint8Array: undefined as unknown as Uint8Array,
           },
         });
+      } else {
+        compressed.push(part);
       }
-      compressed.push(part);
     }

The test at backwards-compat.test.ts:340-354 ("TextStreamPart compression strips Uint8Array from file parts") only verifies that file parts exist in the output, not that they appear exactly once. The fix ensures file parts use the stripped version exclusively while non-file parts are pushed as-is.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/streaming.ts` around lines 441 - 452, The loop is pushing file
parts twice: it first pushes a stripped clone (setting part.file.uint8Array to
undefined) then unconditionally pushes the original part; update the logic so
that when part.type === "file" you only push the stripped version (e.g., push
the modified object and skip the subsequent unconditional push by using an else
branch or continue). Keep the existing behavior for non-file parts (push part
as-is); look for the variables 'part' and 'compressed' in this function to make
the change.
♻️ Duplicate comments (1)
example/ui/chat/StreamingDemo.tsx (1)

172-179: Guard httpText clearing until the finalized DB message is actually present.

This was flagged in a previous review: clearing httpText immediately when httpStreaming turns false can briefly flash an empty gap before the stored message appears in useUIMessages. Consider checking that the latest assistant message has status === "success" before clearing.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@example/ui/chat/StreamingDemo.tsx` around lines 172 - 179, The current
useEffect clears httpText as soon as httpStreaming becomes false, causing a
flash before the finalized DB message is present; update the effect that watches
httpText and httpStreaming to also inspect useUIMessages (or its selector) and
only call setHttpText("") when the latest assistant message in useUIMessages has
status === "success" (i.e., confirm the persisted assistant message exists and
is successful) — modify the effect referencing useEffect, httpText,
httpStreaming, setHttpText and useUIMessages to perform that guard before
clearing.
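
The guard described in this comment reduces to a small pure predicate. The message shape below is a hypothetical stand-in for illustration, not the package's actual UIMessage type:

```typescript
// Hypothetical message shape; the package's real UIMessage type is richer.
type DemoMessage = {
  role: "user" | "assistant";
  status?: "pending" | "success" | "failed";
};

// Only clear the locally buffered HTTP text once streaming has ended AND
// the persisted assistant message has landed, so the bubble never flashes
// empty between stream end and the DB subscription catching up.
function shouldClearHttpText(
  httpStreaming: boolean,
  messages: DemoMessage[],
): boolean {
  if (httpStreaming) return false;
  const lastAssistant = [...messages]
    .reverse()
    .find((m) => m.role === "assistant");
  return lastAssistant?.status === "success";
}

console.log(shouldClearHttpText(false, [{ role: "assistant", status: "success" }])); // true
console.log(shouldClearHttpText(false, [{ role: "assistant", status: "pending" }])); // false
```

The effect would then call setHttpText("") only when this predicate returns true.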
🧹 Nitpick comments (5)
src/backwards-compat.test.ts (2)

246-271: Duplicated test scaffolding between test sections.

The defaultTestOptions, testMetadata, and beforeEach setup are defined identically in sections 3 (lines 249-271) and 8 (lines 890-912). Consider hoisting these to a shared describe block or a module-level helper to reduce duplication.

Also applies to: 886-912

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backwards-compat.test.ts` around lines 246 - 271, Hoist the duplicated
test scaffolding by moving the shared constants and setup into a single shared
scope: extract defaultTestOptions, testMetadata, and the beforeEach that
initializes t via initConvexTest() and creates threadId with createThread(ctx,
components.agent, {}) into a module-level helper or into an outer describe block
so both test sections reuse the same definitions; update the individual test
sections to remove their local copies and reference the shared
defaultTestOptions, testMetadata, and threadId (and keep TestConvex/
initConvexTest usage unchanged) to eliminate the duplicated code.

197-222: Console spy is created but never asserted — test name promises "shows deprecation".

The test is titled "shows deprecation but works at runtime" but consoleSpy is never checked with expect(consoleSpy).toHaveBeenCalled(). If the intent is to verify that a deprecation warning is emitted, an assertion is missing. If the intent is only to verify no crash, consider renaming the test.

💡 Add assertion or rename
     expect(legacyTool).toBeDefined();
 
+    // If a deprecation warning is expected:
+    // expect(consoleSpy).toHaveBeenCalled();
+
     consoleSpy.mockRestore();

Or rename:

-  test("createTool with deprecated 'args' shows deprecation but works at runtime", async () => {
+  test("createTool with deprecated 'args' works at runtime without throwing", async () => {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backwards-compat.test.ts` around lines 197 - 222, The test "createTool
with deprecated 'args' shows deprecation but works at runtime" creates a
consoleSpy but never asserts it; either add an assertion to verify the
deprecation warning (e.g., expect(consoleSpy).toHaveBeenCalled() or
toHaveBeenCalledWith(...) after creating legacyTool) or rename the test to
remove "shows deprecation" from the title if you only intend to verify runtime
compatibility; locate the consoleSpy, createTool/createToolCompat and legacyTool
usage in the test and update accordingly.
docs/http-streaming-requirements.md (2)

85-91: Consider using a stricter type than any[] for parts in the response format spec.

The parts: any[] in REQ-HTTP-6 is intentionally flexible, but the spec could reference the actual union (UIMessageChunk[] | TextStreamPart[]) to be more precise, since the format is known per-stream.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/http-streaming-requirements.md` around lines 85 - 91, The spec currently
types the response object field `parts` as `any[]`; update REQ-HTTP-6 to use the
known union type instead by replacing `parts: any[]` with the more precise
`parts: UIMessageChunk[] | TextStreamPart[]` (or equivalent union type name used
in the doc) so consumers can rely on the concrete shape per stream; ensure this
change is applied where the response format object with `streamId`, `start`, and
`end` is defined and update any nearby examples or descriptions to reference
`UIMessageChunk` and `TextStreamPart`.

35-52: Add language identifiers to fenced code blocks.

The markdown linter flags these blocks (lines 35 and 120) for missing language specifiers (MD040). Use ```text for the ASCII state diagram and ```text or similar for the index query snippet at line 120.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/http-streaming-requirements.md` around lines 35 - 52, The markdown
linter error MD040 is caused by missing language identifiers on the fenced code
blocks showing the ASCII state diagram and the index query snippet; update those
fenced blocks (the one containing the diagram starting with "create() addDelta()
(with heartbeat)" and the index query snippet later in the file) to include a
language tag such as ```text (or another appropriate language) so the blocks
become ```text ... ``` and satisfy the linter.
example/ui/chat/StreamingDemo.tsx (1)

224-267: Consider extracting the HTTP bubble IIFE into a named component.

The inline IIFE {httpText && (() => { ... })()} is functional but hurts readability. A small HttpStreamBubble component would make the JSX tree cleaner and the logic easier to follow.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@example/ui/chat/StreamingDemo.tsx` around lines 224 - 267, Extract the inline
IIFE that renders the HTTP bubble into a new named React component (e.g.,
HttpStreamBubble) to improve readability: create a component that accepts props
{httpText, httpStreaming, messages} (or the minimal values like
pending/toolParts/httpStreaming/httpText), move the logic that finds pending =
messages.find(...) and computes toolParts into that component, and render the
same JSX (including classes, spans, and mapping over toolParts) inside it; then
replace the original `{httpText && (() => { ... })()}` with a simple conditional
render like `{httpText && <HttpStreamBubble httpText={httpText}
httpStreaming={httpStreaming} messages={messages} />}` ensuring keys still use
p.toolCallId and behavior remains identical.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@example/convex/chat/streamingDemo.ts`:
- Around line 85-96: streamOverHttp currently skips authorization and runtime
input validation; update it to call authorizeThreadAccess(ctx, threadId) after
resolving or creating the thread (use createThread and components.agent as
already done), validate the parsed body at runtime to ensure body.prompt is a
non-empty string and return a 400 response on invalid input, and avoid unguarded
non-null assertions on result.promptMessageId by only setting the "X-Message-Id"
header when result.promptMessageId is present (or include a fallback/omit header
and return a clear error if promptMessageId is required); keep the call to
agent.streamText but pass a validated body to it and return appropriate 401/400
responses for auth/validation failures.

In `@example/ui/chat/StreamingDemo.tsx`:
- Around line 352-354: The current derivation of httpUrl from convexUrl (const
convexUrl and const httpUrl in StreamingDemo.tsx) assumes convexUrl always ends
with ".cloud" and blindly does convexUrl.replace(/\.cloud$/, ".site"); make this
defensive: check convexUrl.endsWith(".cloud") before replacing and either (a)
replace only when that check passes, (b) provide a clear fallback (e.g., use
convexUrl as-is or derive via known mapping for custom/local hosts), or (c)
throw/log a descriptive error so the failure is explicit; update the logic that
sets httpUrl and add a short comment documenting the assumption and chosen
fallback behavior.

In `@src/client/streaming.integration.test.ts`:
- Around line 596-601: The test currently sets onAsyncAbort to an async function
that throws, which can create an unhandled rejection because `#sendDelta` returns
after awaiting this.config.onAsyncAbort(...) and the call is effectively
fire-and-forget; change the test's onAsyncAbort to the safe no-op pattern used
in defaultTestOptions (e.g., onAsyncAbort: async () => {}), or otherwise ensure
the handler does not throw (or is wrapped to swallow errors) so that aborted
delta writes cannot produce unhandled promise rejections.
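
For illustration, the non-throwing pattern the prompt asks for might look like this (a sketch only; the `onAsyncAbort` signature and the `defaultTestOptions` shape are assumptions based on the surrounding context):

```typescript
// Sketch: abort handlers that never reject, so fire-and-forget delta writes
// cannot surface as unhandled promise rejections.
const defaultTestOptions = {
  // No-op: aborted delta writes resolve instead of rejecting.
  onAsyncAbort: async (_reason?: unknown) => {},
};

// Alternative: wrap a handler that does real cleanup so thrown errors are
// swallowed rather than escaping to the fire-and-forget caller.
const safeAbort =
  (handler: (reason?: unknown) => Promise<void>) =>
  async (reason?: unknown) => {
    try {
      await handler(reason);
    } catch {
      // Intentionally swallowed: the caller cannot catch this rejection.
    }
  };
```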

---

Outside diff comments:
In `@src/client/streaming.ts`:
- Around line 441-452: The loop is pushing file parts twice: it first pushes a
stripped clone (setting part.file.uint8Array to undefined) then unconditionally
pushes the original part; update the logic so that when part.type === "file" you
only push the stripped version (e.g., push the modified object and skip the
subsequent unconditional push by using an else branch or continue). Keep the
existing behavior for non-file parts (push part as-is); look for the variables
'part' and 'compressed' in this function to make the change.
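
The fix described above can be sketched as follows (the part shapes and names are simplified assumptions, not the actual `streaming.ts` types):

```typescript
// Sketch of the corrected compression loop: a file part is pushed exactly
// once, stripped of its binary payload; all other parts pass through as-is.
type Part =
  | { type: "file"; file: { uint8Array?: Uint8Array; url?: string } }
  | { type: "text"; text: string };

function compressParts(parts: Part[]): Part[] {
  const compressed: Part[] = [];
  for (const part of parts) {
    if (part.type === "file") {
      // Push only the stripped clone; the else branch prevents the
      // unconditional second push that caused the duplication.
      compressed.push({
        ...part,
        file: { ...part.file, uint8Array: undefined },
      });
    } else {
      compressed.push(part);
    }
  }
  return compressed;
}
```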

---

Duplicate comments:
In `@example/ui/chat/StreamingDemo.tsx`:
- Around line 172-179: The current useEffect clears httpText as soon as
httpStreaming becomes false, causing a flash before the finalized DB message is
present; update the effect that watches httpText and httpStreaming to also
inspect useUIMessages (or its selector) and only call setHttpText("") when the
latest assistant message in useUIMessages has status === "success" (i.e.,
confirm the persisted assistant message exists and is successful) — modify the
effect referencing useEffect, httpText, httpStreaming, setHttpText and
useUIMessages to perform that guard before clearing.
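
One way to express the guard is as a pure predicate the effect can call (a sketch; the UI message shape here is a simplified assumption):

```typescript
// Sketch: decide whether the effect may clear httpText. Clearing is deferred
// until the persisted assistant message has landed with status "success",
// avoiding the flash between stream end and DB message arrival.
type DemoUIMessage = {
  role: "user" | "assistant";
  status?: "pending" | "success" | "failed";
};

function shouldClearHttpText(
  httpText: string,
  httpStreaming: boolean,
  messages: DemoUIMessage[],
): boolean {
  if (!httpText || httpStreaming) return false;
  const lastAssistant = [...messages]
    .reverse()
    .find((m) => m.role === "assistant");
  return lastAssistant?.status === "success";
}
```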

---

Nitpick comments:
In `@docs/http-streaming-requirements.md`:
- Around line 85-91: The spec currently types the response object field `parts`
as `any[]`; update REQ-HTTP-6 to use the known union type instead by replacing
`parts: any[]` with the more precise `parts: UIMessageChunk[] |
TextStreamPart[]` (or equivalent union type name used in the doc) so consumers
can rely on the concrete shape per stream; ensure this change is applied where
the response format object with `streamId`, `start`, and `end` is defined and
update any nearby examples or descriptions to reference `UIMessageChunk` and
`TextStreamPart`.
- Around line 35-52: The markdown linter error MD040 is caused by missing
language identifiers on the fenced code blocks showing the ASCII state diagram
and the index query snippet; update those fenced blocks (the one containing the
diagram starting with "create() addDelta() (with heartbeat)" and the index query
snippet later in the file) to include a language tag such as ```text (or another
appropriate language) so the blocks become ```text ... ``` and satisfy the
linter.

In `@example/ui/chat/StreamingDemo.tsx`:
- Around line 224-267: Extract the inline IIFE that renders the HTTP bubble into
a new named React component (e.g., HttpStreamBubble) to improve readability:
create a component that accepts props {httpText, httpStreaming, messages} (or
the minimal values like pending/toolParts/httpStreaming/httpText), move the
logic that finds pending = messages.find(...) and computes toolParts into that
component, and render the same JSX (including classes, spans, and mapping over
toolParts) inside it; then replace the original `{httpText && (() => { ...
})()}` with a simple conditional render like `{httpText && <HttpStreamBubble
httpText={httpText} httpStreaming={httpStreaming} messages={messages} />}`
ensuring keys still use p.toolCallId and behavior remains identical.

In `@src/backwards-compat.test.ts`:
- Around line 246-271: Hoist the duplicated test scaffolding by moving the
shared constants and setup into a single shared scope: extract
defaultTestOptions, testMetadata, and the beforeEach that initializes t via
initConvexTest() and creates threadId with createThread(ctx, components.agent,
{}) into a module-level helper or into an outer describe block so both test
sections reuse the same definitions; update the individual test sections to
remove their local copies and reference the shared defaultTestOptions,
testMetadata, and threadId (and keep TestConvex/ initConvexTest usage unchanged)
to eliminate the duplicated code.
- Around line 197-222: The test "createTool with deprecated 'args' shows
deprecation but works at runtime" creates a consoleSpy but never asserts it;
either add an assertion to verify the deprecation warning (e.g.,
expect(consoleSpy).toHaveBeenCalled() or toHaveBeenCalledWith(...) after
creating legacyTool) or rename the test to remove "shows deprecation" from the
title if you only intend to verify runtime compatibility; locate the consoleSpy,
createTool/createToolCompat and legacyTool usage in the test and update
accordingly.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 75945d8 and ca3259a.

⛔ Files ignored due to path filters (1)
  • example/convex/_generated/api.d.ts is excluded by !**/_generated/**
📒 Files selected for processing (8)
  • docs/http-streaming-requirements.md
  • example/convex/chat/streamingDemo.ts
  • example/ui/chat/StreamingDemo.tsx
  • example/ui/main.tsx
  • src/backwards-compat.test.ts
  • src/client/streaming.integration.test.ts
  • src/client/streaming.ts
  • src/react/useSmoothText.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/react/useSmoothText.ts

Comment on lines 85 to 96
export const streamOverHttp = httpAction(async (ctx, request) => {
  const body = (await request.json()) as {
    threadId?: string;
    prompt: string;
  };
  const threadId =
    body.threadId ?? (await createThread(ctx, components.agent));
  const result = await agent.streamText(ctx, { threadId }, body);
  const response = result.toTextStreamResponse();
  response.headers.set("X-Message-Id", result.promptMessageId!);
  return response;
});

⚠️ Potential issue | 🟡 Minor

HTTP endpoint lacks authentication and input validation.

streamOverHttp doesn't call authorizeThreadAccess (unlike every other handler), and the request body is cast without runtime validation. A missing prompt would propagate undefined into agent.streamText. While this is a demo, it's worth noting:

  1. No auth — any caller can stream from any thread.
  2. No body validation — malformed requests will produce opaque errors.
  3. result.promptMessageId! — will set the header to "undefined" if promptMessageId is absent.
🛡️ Minimal validation sketch
 export const streamOverHttp = httpAction(async (ctx, request) => {
-  const body = (await request.json()) as {
-    threadId?: string;
-    prompt: string;
-  };
+  const body = await request.json();
+  if (!body || typeof body.prompt !== "string") {
+    return new Response("Missing required field: prompt", { status: 400 });
+  }
   const threadId =
     body.threadId ?? (await createThread(ctx, components.agent));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@example/convex/chat/streamingDemo.ts` around lines 85 - 96, streamOverHttp
currently skips authorization and runtime input validation; update it to call
authorizeThreadAccess(ctx, threadId) after resolving or creating the thread (use
createThread and components.agent as already done), validate the parsed body at
runtime to ensure body.prompt is a non-empty string and return a 400 response on
invalid input, and avoid unguarded non-null assertions on result.promptMessageId
by only setting the "X-Message-Id" header when result.promptMessageId is present
(or include a fallback/omit header and return a clear error if promptMessageId
is required); keep the call to agent.streamText but pass a validated body to it
and return appropriate 401/400 responses for auth/validation failures.

Comment on lines 352 to 354
const convexUrl = import.meta.env.VITE_CONVEX_URL as string;
// Derive the HTTP actions URL from the Convex deployment URL
const httpUrl = convexUrl.replace(/\.cloud$/, ".site");

⚠️ Potential issue | 🟡 Minor

Fragile URL derivation: `.cloud` → `.site` replacement.

convexUrl.replace(/\.cloud$/, ".site") silently produces a wrong URL if the deployment URL doesn't end with .cloud (e.g., local dev, custom domains, or future URL scheme changes). Consider a more defensive approach, or document this assumption.

💡 Example: fail-fast guard
   const convexUrl = import.meta.env.VITE_CONVEX_URL as string;
-  const httpUrl = convexUrl.replace(/\.cloud$/, ".site");
+  if (!convexUrl.endsWith(".cloud")) {
+    console.warn("Unexpected Convex URL format; HTTP streaming may not work:", convexUrl);
+  }
+  const httpUrl = convexUrl.replace(/\.cloud$/, ".site");
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@example/ui/chat/StreamingDemo.tsx` around lines 352 - 354, The current
derivation of httpUrl from convexUrl (const convexUrl and const httpUrl in
StreamingDemo.tsx) assumes convexUrl always ends with ".cloud" and blindly does
convexUrl.replace(/\.cloud$/, ".site"); make this defensive: check
convexUrl.endsWith(".cloud") before replacing and either (a) replace only when
that check passes, (b) provide a clear fallback (e.g., use convexUrl as-is or
derive via known mapping for custom/local hosts), or (c) throw/log a descriptive
error so the failure is explicit; update the logic that sets httpUrl and add a
short comment documenting the assumption and chosen fallback behavior.

sethconvex and others added 3 commits February 23, 2026 22:30
- Add `streamId` to `GenerationOutputMetadata` for deduplication
- Add `httpStreamText()` and `httpStreamUIMessages()` standalone helpers
- Add `agent.asHttpAction()` factory method for HTTP streaming endpoints
- Add `useHttpStream()` React hook with `streamId`/`messageId`/`abort()`
- Extract standalone `generateText()` (Agent.generateText now delegates)
- Export standalone `streamText()` and `generateText()` from barrel
- Add `consumeTextStream()` and `supportsStreaming()` React utilities
- Update example to use `asHttpAction()`, `useHttpStream()`, `skipStreamIds`
- Fix HTTP streaming duplicate bubble regression (gate on `httpStreaming`)
- Add tests for all new code paths (16 new tests)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- New streaming demo agent with approval tools (deleteFile, transferMoney, checkBalance)
- Backend: submitApproval, triggerContinuation, continueAfterApprovals endpoints
- UI: approval buttons, denial reason input, auto-continuation, disabled input while pending
- asHttpAction() now accepts promptMessageId for HTTP streaming continuations
- HTTP mode continuations stay in HTTP mode instead of falling back to delta streaming

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…rejection

- Add console.warn when Convex URL doesn't end with .cloud (HTTP streaming)
- Replace throwing onAsyncAbort in integration test with no-op to prevent
  unhandled promise rejections from fire-and-forget delta writes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 9

♻️ Duplicate comments (2)
example/convex/chat/streamingDemo.ts (1)

88-90: ⚠️ Potential issue | 🟡 Minor

HTTP demo endpoint still lacks auth/validation.

streamOverHttp exposes an unauthenticated, unvalidated entry point. Consider wiring authorize in asHttpAction() and validating the body before streaming.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@example/convex/chat/streamingDemo.ts` around lines 88 - 90, streamOverHttp
currently exposes streamingDemoAgent.asHttpAction() without authentication or
input validation; update the handler to enforce auth and validate the request
body before streaming. Wire the authorize check into
streamingDemoAgent.asHttpAction() (or wrap the returned action from
asHttpAction() with an authorization middleware) and add body validation logic
to ensure required fields/types are present and sanitized before calling
httpAction(...). Make sure the symbols referenced are
streamingDemoAgent.asHttpAction, streamOverHttp, and httpAction so reviewers can
find and test the auth and validation changes.
example/ui/chat/StreamingDemo.tsx (1)

142-144: ⚠️ Potential issue | 🟡 Minor

Fragile `.cloud` → `.site` URL derivation.

This assumes a .cloud deployment URL and can break on custom/local URLs. Consider guarding or documenting.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@example/ui/chat/StreamingDemo.tsx` around lines 142 - 144, The code naively
derives httpUrl by replacing ".cloud" with ".site" on convexUrl (variables
convexUrl and httpUrl in StreamingDemo.tsx), which breaks for custom/local URLs;
update the logic to first validate/parse convexUrl (e.g., check hostname
endsWith(".cloud") or parse origin) and only perform the suffix replacement when
it truly is a cloud deployment, otherwise leave convexUrl unchanged or compute a
documented fallback, and add a short comment explaining the behavior for
custom/local URLs so callers know when replacement will occur.
🧹 Nitpick comments (1)
src/client/generateText.ts (1)

66-70: options parameter in prepareStep shadows the outer function's options.

The callback parameter options (AI SDK's PrepareStepOptions) shadows the outer options argument. Functionally correct and consistent with the analogous pattern in streamText.ts, but renaming it (e.g., stepOptions) would improve readability.

♻️ Proposed rename
-      prepareStep: async (options) => {
-        const result = await generateTextArgs.prepareStep?.(options);
-        call.updateModel(result?.model ?? options.model);
-        return result;
+      prepareStep: async (stepOptions) => {
+        const result = await generateTextArgs.prepareStep?.(stepOptions);
+        call.updateModel(result?.model ?? stepOptions.model);
+        return result;
       },
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/generateText.ts` around lines 66 - 70, The prepareStep callback
parameter shadows the outer options variable; rename the inner parameter in
prepareStep to stepOptions (or similar) and update all uses inside the callback
(e.g., change result?.model ?? options.model to result?.model ??
stepOptions.model) and keep the call to call.updateModel(result?.model ??
stepOptions.model) so it matches the outer generateText function's options
naming and mirrors streamText.ts's pattern; update the function signature at
generateTextArgs.prepareStep and any references to the old parameter name within
that callback.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@example/convex/agents/streamingDemo.ts`:
- Around line 59-60: The override for callSettings currently replaces
defaultConfig.callSettings entirely; instead merge the existing
defaultConfig.callSettings with the new temperature so other settings (e.g.,
maxTokens) are preserved: update the object where defaultConfig and callSettings
are combined to construct callSettings by shallow-merging
defaultConfig.callSettings with temperature set to 0 (ensure you handle the case
where defaultConfig.callSettings may be undefined), leaving other defaultConfig
properties unchanged.

In `@src/client/http.test.ts`:
- Around line 33-269: This file exports many test actions (e.g.,
testStreamTextStandalone, testStreamTextWithoutDeltas,
testGenerateTextStandalone, testAgentGenerateTextDelegation,
testAsHttpActionParsesBody, etc.), which triggers the Biome noExportsInTest
rule; fix by either (A) turning these exported action declarations into
non-exported/local declarations and exporting a single test registry/importing
them elsewhere, or (B) move all action exports into a separate helper module
(create e.g., testActions.ts that exports the actions) and import them here, or
(C) add a file-level lint suppression comment to disable noExportsInTest for
this file if intentional—update the declarations that reference createThread,
agent.asHttpAction, streamText, generateText, etc., accordingly.

In `@src/client/http.ts`:
- Around line 77-79: When auto-creating a thread you must propagate
options.userId into the createThread call so newly created threads are
associated with the caller; update the expression that sets threadId (which
currently uses options.threadId ?? (await createThread(ctx, component))) to pass
options.userId to createThread (e.g., await createThread(ctx, component,
options.userId) or the appropriate param/object shape), and apply the same
change for the other auto-create site that mirrors lines 123-125; keep
references to threadId, options.threadId, options.userId, createThread, ctx, and
component so the intended parameters are clear.
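
A sketch of the propagation (the `createThread` signature is an assumption based on the comment; it is injected as a parameter here so the sketch stays self-contained and testable):

```typescript
// Sketch: thread the caller's userId through auto-creation so newly created
// threads are associated with the caller instead of being orphaned.
type CreateThread = (
  ctx: unknown,
  component: unknown,
  args: { userId: string | null },
) => Promise<string>;

async function resolveThreadId(
  ctx: unknown,
  component: unknown,
  options: { threadId?: string; userId?: string },
  createThread: CreateThread,
): Promise<string> {
  return (
    options.threadId ??
    (await createThread(ctx, component, { userId: options.userId ?? null }))
  );
}
```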
- Around line 80-88: The current calls to streamText (the ones passing
streamTextArgs and options plus threadId) set saveStreamDeltas:true elsewhere
which causes the HTTP response to buffer until completion; for HTTP flows change
these call sites (the streamText invocations around the
streamTextArgs/options/threadId usage) to force returnImmediately: true when
using the HTTP client (or explicitly set saveStreamDeltas:false) so the response
is streamed immediately; update the three call sites (the streamText(...) calls
shown and the similar calls at the other two locations) to merge in
returnImmediately: true into the options object you pass to streamText.
- Around line 16-27: HttpStreamOptions.threadId should be a Convex Id<"threads">
but you must update the shape that produces it: change the vThreadDoc
validator's _id to Id<"threads"> so ThreadDoc uses Id<"threads">, update the
createThread mutation in src/component/threads.ts to return that ThreadDoc type
(with _id: Id<"threads">), and then update all client-side function signatures
that accept threadId (e.g. HttpStreamOptions.threadId, any functions in
src/client/threads.ts and src/client/index.ts) to use Id<"threads">; fix any
places that currently treat _id as string by converting or adjusting types/casts
so types align end-to-end.

In `@src/client/index.ts`:
- Around line 1655-1724: The handler currently casts request.json() without
validation which can lead to runtime errors; validate the parsed body inside
asHttpAction (before using threadId/prompt/promptMessageId/messages) by checking
JSON parse success, ensuring types (prompt is string if present, messages is
array if present), and enforcing that at least one of prompt, messages, or
promptMessageId is provided—if validation fails return a 400 response. Also
sanitize/marshal messages via the existing toModelMessage before passing into
streamText, and only call createThread/createThread(ctx, this.component, {
userId: userId ?? null }) after validation succeeds. Use the symbols
request.json(), toModelMessage, createThread, and streamText to locate the
relevant code to modify.
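
The validation step could be factored into a small helper like this (a sketch; the field names follow the body shape described above, and the exact error messages are illustrative):

```typescript
// Sketch: validate the parsed request body before streaming. Returns an
// error string when the handler should respond with 400, else null.
type ChatBody = {
  threadId?: string;
  prompt?: string;
  promptMessageId?: string;
  messages?: unknown[];
};

function validateChatBody(body: unknown): string | null {
  if (typeof body !== "object" || body === null) {
    return "Body must be a JSON object";
  }
  const b = body as ChatBody;
  if (b.prompt !== undefined && typeof b.prompt !== "string") {
    return "prompt must be a string";
  }
  if (b.messages !== undefined && !Array.isArray(b.messages)) {
    return "messages must be an array";
  }
  // At least one source of input is required to stream a response.
  if (!b.prompt && !b.messages && !b.promptMessageId) {
    return "Provide one of: prompt, messages, promptMessageId";
  }
  return null;
}
```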

In `@src/react/httpStreamUtils.ts`:
- Around line 1-6: Update the JSDoc to correct the inaccuracy: don't show
TextDecoder({ stream: true }) as a constructor call—explain that a TextDecoder
is constructed (new TextDecoder()) and the option { stream: true } is passed to
decoder.decode(value, { stream: true }) when decoding chunks; update the comment
in the top-of-file docblock that describes the behavior of the TextDecoder and
reference the decoder.decode usage so the doc matches the actual implementation
in this module.
- Around line 16-31: The loop currently only checks options.signal?.aborted
before each reader.read(), so a pending read isn’t interrupted; add an 'abort'
listener on options.signal (when present) that calls reader.cancel() to unblock
reader.read(), attach it before entering the while loop, and remove it in the
finally block (alongside reader.releaseLock()); also ensure you handle/ignore
the abort-specific error from reader.read() (e.g., DOMException named
"AbortError") so it doesn’t propagate, keeping the existing decoder.flush
(decoder.decode()) and options.onChunk usage intact.
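
A self-contained sketch of the abort-aware consumption loop (the function and option names mirror the comment but are assumptions about the actual module):

```typescript
// Sketch: consume a text stream, cancelling the reader on abort so a pending
// read() unblocks, and flushing the TextDecoder on exit.
async function consumeTextStream(
  stream: ReadableStream<Uint8Array>,
  options: { signal?: AbortSignal; onChunk?: (text: string) => void } = {},
): Promise<string> {
  const reader = stream.getReader();
  const decoder = new TextDecoder();
  // Cancelling the reader makes a pending read() resolve with done: true.
  const onAbort = () => void reader.cancel();
  options.signal?.addEventListener("abort", onAbort);
  let full = "";
  try {
    while (true) {
      if (options.signal?.aborted) break;
      const { done, value } = await reader.read();
      if (done) break;
      // { stream: true } buffers partial multi-byte sequences across chunks.
      const text = decoder.decode(value, { stream: true });
      full += text;
      options.onChunk?.(text);
    }
    full += decoder.decode(); // flush any buffered bytes
  } finally {
    options.signal?.removeEventListener("abort", onAbort);
    reader.releaseLock();
  }
  return full;
}
```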

In `@src/react/useHttpStream.ts`:
- Around line 65-139: The finally block in send() can set isStreaming false for
an earlier request when multiple sends overlap; fix by introducing a per-request
identifier (e.g., incrementing requestIdRef) that you set when creating
controller and check in the finally before calling setIsStreaming(false) and
clearing abortControllerRef: generate a new id at start of send(), store it on a
ref (requestIdRef.current = id), and in the finally only run
setIsStreaming(false) and null out abortControllerRef if requestIdRef.current
=== id (and optionally clear requestIdRef when matched) so only the latest
request can flip isStreaming or clear the controller.
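
The per-request identifier guard can be illustrated outside React like this (a sketch; `requestIdRef` stands in for a `useRef`, and the `isStreaming` variable for the state setter):

```typescript
// Sketch: only the latest send() may clear the streaming flag; a stale
// request finishing later must not clobber a newer request's state.
const requestIdRef = { current: 0 };
let isStreaming = false;

async function send(work: () => Promise<void>): Promise<void> {
  const id = ++requestIdRef.current;
  isStreaming = true;
  try {
    await work();
  } finally {
    // A newer send() owns the state now; skip the reset if we're stale.
    if (requestIdRef.current === id) {
      isStreaming = false;
    }
  }
}
```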

---

Duplicate comments:
In `@example/convex/chat/streamingDemo.ts`:
- Around line 88-90: streamOverHttp currently exposes
streamingDemoAgent.asHttpAction() without authentication or input validation;
update the handler to enforce auth and validate the request body before
streaming. Wire the authorize check into streamingDemoAgent.asHttpAction() (or
wrap the returned action from asHttpAction() with an authorization middleware)
and add body validation logic to ensure required fields/types are present and
sanitized before calling httpAction(...). Make sure the symbols referenced are
streamingDemoAgent.asHttpAction, streamOverHttp, and httpAction so reviewers can
find and test the auth and validation changes.

In `@example/ui/chat/StreamingDemo.tsx`:
- Around line 142-144: The code naively derives httpUrl by replacing ".cloud"
with ".site" on convexUrl (variables convexUrl and httpUrl in
StreamingDemo.tsx), which breaks for custom/local URLs; update the logic to
first validate/parse convexUrl (e.g., check hostname endsWith(".cloud") or parse
origin) and only perform the suffix replacement when it truly is a cloud
deployment, otherwise leave convexUrl unchanged or compute a documented
fallback, and add a short comment explaining the behavior for custom/local URLs
so callers know when replacement will occur.

---

Nitpick comments:
In `@src/client/generateText.ts`:
- Around line 66-70: The prepareStep callback parameter shadows the outer
options variable; rename the inner parameter in prepareStep to stepOptions (or
similar) and update all uses inside the callback (e.g., change result?.model ??
options.model to result?.model ?? stepOptions.model) and keep the call to
call.updateModel(result?.model ?? stepOptions.model) so it matches the outer
generateText function's options naming and mirrors streamText.ts's pattern;
update the function signature at generateTextArgs.prepareStep and any references
to the old parameter name within that callback.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ca3259a and 5518ef9.

⛔ Files ignored due to path filters (1)
  • example/convex/_generated/api.d.ts is excluded by !**/_generated/**
📒 Files selected for processing (14)
  • example/convex/agents/streamingDemo.ts
  • example/convex/chat/streamingDemo.ts
  • example/convex/http.ts
  • example/ui/chat/StreamingDemo.tsx
  • src/client/generateText.ts
  • src/client/http.test.ts
  • src/client/http.ts
  • src/client/index.ts
  • src/client/streamText.ts
  • src/client/types.ts
  • src/react/httpStreamUtils.test.ts
  • src/react/httpStreamUtils.ts
  • src/react/index.ts
  • src/react/useHttpStream.ts

Comment on lines +59 to +60
...defaultConfig,
callSettings: { temperature: 0 },

⚠️ Potential issue | 🟡 Minor

callSettings override replaces the entire object from defaultConfig, silently dropping any other settings.

If defaultConfig.callSettings contains properties beyond temperature (e.g., maxTokens), callSettings: { temperature: 0 } will silently discard them.

♻️ Proposed fix
-  ...defaultConfig,
-  callSettings: { temperature: 0 },
+  ...defaultConfig,
+  callSettings: { ...defaultConfig.callSettings, temperature: 0 },
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-  ...defaultConfig,
-  callSettings: { temperature: 0 },
+  ...defaultConfig,
+  callSettings: { ...defaultConfig.callSettings, temperature: 0 },
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@example/convex/agents/streamingDemo.ts` around lines 59 - 60, The override
for callSettings currently replaces defaultConfig.callSettings entirely; instead
merge the existing defaultConfig.callSettings with the new temperature so other
settings (e.g., maxTokens) are preserved: update the object where defaultConfig
and callSettings are combined to construct callSettings by shallow-merging
defaultConfig.callSettings with temperature set to 0 (ensure you handle the case
where defaultConfig.callSettings may be undefined), leaving other defaultConfig
properties unchanged.

Comment on lines +33 to +269
export const testStreamTextStandalone = action({
  args: {},
  handler: async (ctx) => {
    const threadId = await createThread(ctx, components.agent, {});
    const result = await streamText(
      ctx,
      components.agent,
      {
        model: mockModel({
          content: [{ type: "text", text: "standalone stream text" }],
        }),
        prompt: "test prompt",
      },
      {
        agentName: "standalone-test",
        threadId,
        saveStreamDeltas: true,
      },
    );
    await result.consumeStream();
    return {
      text: await result.text,
      promptMessageId: result.promptMessageId,
      streamId: result.streamId,
      order: result.order,
    };
  },
});

export const testStreamTextWithoutDeltas = action({
  args: {},
  handler: async (ctx) => {
    const threadId = await createThread(ctx, components.agent, {});
    const result = await streamText(
      ctx,
      components.agent,
      {
        model: mockModel({
          content: [{ type: "text", text: "no deltas stream" }],
        }),
        prompt: "test prompt",
      },
      {
        agentName: "standalone-test",
        threadId,
      },
    );
    await result.consumeStream();
    return {
      text: await result.text,
      promptMessageId: result.promptMessageId,
      streamId: result.streamId,
      order: result.order,
    };
  },
});

export const testGenerateTextStandalone = action({
  args: {},
  handler: async (ctx) => {
    const threadId = await createThread(ctx, components.agent, {});
    const result = await generateText(
      ctx,
      components.agent,
      {
        model: mockModel({
          content: [{ type: "text", text: "standalone generate text" }],
        }),
        prompt: "test prompt",
      },
      {
        agentName: "standalone-test",
        threadId,
      },
    );
    return {
      text: result.text,
      promptMessageId: result.promptMessageId,
      order: result.order,
    };
  },
});

export const testAgentGenerateTextDelegation = action({
  args: {},
  handler: async (ctx) => {
    const { thread } = await agent.createThread(ctx, { userId: "user1" });
    const result = await thread.generateText({
      prompt: "test prompt",
    });
    return {
      text: result.text,
      promptMessageId: result.promptMessageId,
      order: result.order,
      savedMessages: result.savedMessages?.map((m) => m._id),
    };
  },
});

export const testAsHttpActionParsesBody = action({
  args: {},
  handler: async (ctx) => {
    const threadId = await createThread(ctx, components.agent, {});
    const handler = agent.asHttpAction();
    const request = new Request("https://example.com/chat", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ threadId, prompt: "Hello" }),
    });
    const response = await handler(ctx as any, request);
    const text = await response.text();
    return {
      status: response.status,
      hasText: text.length > 0,
      hasMessageId: response.headers.has("X-Message-Id"),
    };
  },
});

export const testAsHttpActionCreatesThread = action({
  args: {},
  handler: async (ctx) => {
    const handler = agent.asHttpAction();
    const request = new Request("https://example.com/chat", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ prompt: "Hello" }),
    });
    const response = await handler(ctx as any, request);
    const text = await response.text();
    return {
      status: response.status,
      hasText: text.length > 0,
      hasMessageId: response.headers.has("X-Message-Id"),
    };
  },
});

export const testAsHttpActionWithCorsHeaders = action({
  args: {},
  handler: async (ctx) => {
    const threadId = await createThread(ctx, components.agent, {});
    const handler = agent.asHttpAction({
      corsHeaders: {
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Expose-Headers": "X-Message-Id, X-Stream-Id",
      },
    });
    const request = new Request("https://example.com/chat", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ threadId, prompt: "Hello" }),
    });
    const response = await handler(ctx as any, request);
    await response.text();
    return {
      status: response.status,
      corsOrigin: response.headers.get("Access-Control-Allow-Origin"),
      corsExpose: response.headers.get("Access-Control-Expose-Headers"),
      hasMessageId: response.headers.has("X-Message-Id"),
    };
  },
});

export const testAsHttpActionWithSaveDeltas = action({
  args: {},
  handler: async (ctx) => {
    const threadId = await createThread(ctx, components.agent, {});
    const handler = agent.asHttpAction({
      saveStreamDeltas: true,
    });
    const request = new Request("https://example.com/chat", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ threadId, prompt: "Hello" }),
    });
    const response = await handler(ctx as any, request);
    await response.text();
    return {
      status: response.status,
      hasStreamId: response.headers.has("X-Stream-Id"),
      hasMessageId: response.headers.has("X-Message-Id"),
    };
  },
});

export const testAsHttpActionUIMessages = action({
  args: {},
  handler: async (ctx) => {
    const threadId = await createThread(ctx, components.agent, {});
    const handler = agent.asHttpAction({
      format: "ui-messages",
    });
    const request = new Request("https://example.com/chat", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ threadId, prompt: "Hello" }),
    });
    const response = await handler(ctx as any, request);
    const text = await response.text();
    return {
      status: response.status,
      hasText: text.length > 0,
      hasMessageId: response.headers.has("X-Message-Id"),
      // UI message stream format is different from plain text (it contains
      // structured data, not just the raw text)
      textDiffers: !text.includes("Hello from mock") || text.length > "Hello from mock".length,
    };
  },
});

export const testAsHttpActionWithPromptMessageId = action({
  args: {},
  handler: async (ctx) => {
    const threadId = await createThread(ctx, components.agent, {});
    // First, save a user message to get a promptMessageId
    const { messageId } = await agent.saveMessage(ctx, {
      threadId,
      prompt: "Hello",
      skipEmbeddings: true,
    });
    // Then use the HTTP action with promptMessageId (continuation pattern)
    const handler = agent.asHttpAction();
    const request = new Request("https://example.com/chat", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ threadId, promptMessageId: messageId }),
    });
    const response = await handler(ctx as any, request);
    const text = await response.text();
    return {
      status: response.status,
      hasText: text.length > 0,
      hasMessageId: response.headers.has("X-Message-Id"),
    };
  },
});

⚠️ Potential issue | 🟡 Minor

Biome noExportsInTest is failing here.

The test file exports many actions, which violates the lint rule. Consider moving the exported actions to a helper module (and importing them) or suppressing the rule for this file.

🧰 Tools
🪛 Biome (2.4.4)

[error] lint/suspicious/noExportsInTest: "Do not export from a test file." Reported for lines 33-60, 62-88, 90-114, 116-130, 132-150, 152-169, 171-195, 197-217, 219-242, and 244-269.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/http.test.ts` around lines 33 - 269, This file exports many test
actions (e.g., testStreamTextStandalone, testStreamTextWithoutDeltas,
testGenerateTextStandalone, testAgentGenerateTextDelegation,
testAsHttpActionParsesBody, etc.), which triggers the Biome noExportsInTest
rule; fix by either (A) turning these exported action declarations into
non-exported/local declarations and exporting a single test registry/importing
them elsewhere, or (B) move all action exports into a separate helper module
(create e.g., testActions.ts that exports the actions) and import them here, or
(C) add a file-level lint suppression comment to disable noExportsInTest for
this file if intentional—update the declarations that reference createThread,
agent.asHttpAction, streamText, generateText, etc., accordingly.

Comment on lines +16 to +27
export type HttpStreamOptions = Options & {
/** The agent name attributed to messages. */
agentName: string;
/** The user to associate with the thread / messages. */
userId?: string | null;
/** The thread to continue. If omitted, a new thread is created. */
threadId?: string;
/**
* Whether to save incremental data (deltas) from streaming responses
* to the database alongside the HTTP stream. Defaults to false.
*/
saveStreamDeltas?: boolean | StreamingOptions;

🛠️ Refactor suggestion | 🟠 Major

Use Id<"threads"> for threadId throughout — this requires broader updates to the type system.

threadId should use Convex's Id<"threads"> type for strict typing as required by src/**/*.ts guidelines. However, changing only HttpStreamOptions.threadId is incomplete. The component's createThread mutation returns ThreadDoc with _id: string, and all dependent code expects string. The proper fix requires updating the validator vThreadDoc to use Id<"threads"> for _id and cascading this through:

  1. The vThreadDoc validator in src/validators.ts
  2. The createThread mutation return type in src/component/threads.ts
  3. All function signatures in src/client/ that accept threadId parameters (threads.ts, index.ts, etc.)

Otherwise, changing only HttpStreamOptions.threadId to Id<"threads"> will create type mismatches between the value returned by createThread() (string) and what the HTTP functions accept (Id<"threads">).
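Convex `Id`s are branded strings, which is why a partial migration produces assignability errors while runtime behavior stays identical. A simplified model (the real `Id` type lives in `convex/values`; this sketch is for illustration only):

```typescript
// Simplified model of Convex's branded Id type (illustrative, not the real definition).
type Id<TableName extends string> = string & { __tableName: TableName };

// If createThread is typed to return a plain string…
declare function createThreadLoose(): string;
// …its result is not assignable where Id<"threads"> is required:
declare function requireStrict(threadId: Id<"threads">): void;

// requireStrict(createThreadLoose()); // type error until both sides migrate

// The brand exists only at the type level; at runtime it is still a string.
const raw = "thread_123";
const branded = raw as Id<"threads">;
```

This is why the review recommends cascading the change through the validator, the mutation return type, and the client signatures together rather than changing one field.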

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/http.ts` around lines 16 - 27, HttpStreamOptions.threadId should
be a Convex Id<"threads"> but you must update the shape that produces it: change
the vThreadDoc validator's _id to Id<"threads"> so ThreadDoc uses Id<"threads">,
update the createThread mutation in src/component/threads.ts to return that
ThreadDoc type (with _id: Id<"threads">), and then update all client-side
function signatures that accept threadId (e.g. HttpStreamOptions.threadId, any
functions in src/client/threads.ts and src/client/index.ts) to use
Id<"threads">; fix any places that currently treat _id as string by converting
or adjusting types/casts so types align end-to-end.

Comment on lines +77 to +79
const threadId =
options.threadId ?? (await createThread(ctx, component));


⚠️ Potential issue | 🟠 Major

Propagate userId when auto-creating threads.

If callers supply options.userId, the new thread is created without it, which breaks user association and context search.

Suggested fix
-  const threadId =
-    options.threadId ?? (await createThread(ctx, component));
+  const threadId =
+    options.threadId ??
+    (await createThread(ctx, component, {
+      userId: options.userId ?? null,
+    }));

Also applies to: 123-125

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/http.ts` around lines 77 - 79, When auto-creating a thread you
must propagate options.userId into the createThread call so newly created
threads are associated with the caller; update the expression that sets threadId
(which currently uses options.threadId ?? (await createThread(ctx, component)))
to pass options.userId to createThread (e.g., await createThread(ctx, component,
options.userId) or the appropriate param/object shape), and apply the same
change for the other auto-create site that mirrors lines 123-125; keep
references to threadId, options.threadId, options.userId, createThread, ctx, and
component so the intended parameters are clear.

Comment on lines +80 to +88
const result = await streamText<TOOLS, OUTPUT>(
ctx,
component,
streamTextArgs,
{
...options,
threadId,
},
);

⚠️ Potential issue | 🟠 Major

saveStreamDeltas: true will buffer the HTTP response.

streamText awaits completion when saveStreamDeltas is true, so these helpers won’t actually stream until the generation finishes. Consider forcing returnImmediately: true for HTTP flows (or documenting the behavior).

Possible fix
+function normalizeHttpSaveStreamDeltas(
+  saveStreamDeltas?: boolean | StreamingOptions,
+): boolean | StreamingOptions | undefined {
+  if (saveStreamDeltas === true) return { returnImmediately: true };
+  if (saveStreamDeltas && typeof saveStreamDeltas === "object") {
+    return { returnImmediately: true, ...saveStreamDeltas };
+  }
+  return saveStreamDeltas;
+}

   const result = await streamText<TOOLS, OUTPUT>(
     ctx,
     component,
     streamTextArgs,
     {
       ...options,
+      saveStreamDeltas: normalizeHttpSaveStreamDeltas(options.saveStreamDeltas),
       threadId,
     },
   );

Also applies to: 126-134, 141-157

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/http.ts` around lines 80 - 88, The current calls to streamText
(the ones passing streamTextArgs and options plus threadId) set
saveStreamDeltas:true elsewhere which causes the HTTP response to buffer until
completion; for HTTP flows change these call sites (the streamText invocations
around the streamTextArgs/options/threadId usage) to force returnImmediately:
true when using the HTTP client (or explicitly set saveStreamDeltas:false) so
the response is streamed immediately; update the three call sites (the
streamText(...) calls shown and the similar calls at the other two locations) to
merge in returnImmediately: true into the options object you pass to streamText.
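The proposed normalization helper is plain TypeScript and can be sketched standalone. `StreamingOptions` is reduced to a minimal shape here for illustration; the library's real type has more fields:

```typescript
// Minimal stand-in for the library's StreamingOptions (illustrative shape).
type StreamingOptions = { returnImmediately?: boolean; throttleMs?: number };

// Force immediate return for HTTP flows unless the caller explicitly opted out.
function normalizeHttpSaveStreamDeltas(
  saveStreamDeltas?: boolean | StreamingOptions,
): boolean | StreamingOptions | undefined {
  if (saveStreamDeltas === true) return { returnImmediately: true };
  if (saveStreamDeltas && typeof saveStreamDeltas === "object") {
    // Spread last so an explicit returnImmediately from the caller wins.
    return { returnImmediately: true, ...saveStreamDeltas };
  }
  return saveStreamDeltas; // false or undefined pass through unchanged
}
```

Spreading the caller's options last preserves an explicit `returnImmediately: false`, so the helper only changes the default for HTTP flows rather than overriding a deliberate choice.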

Comment on lines +1655 to +1724
const body = (await request.json()) as {
threadId?: string;
prompt?: string;
promptMessageId?: string;
messages?: any[];
[key: string]: unknown;
};

let userId: string | undefined;
let threadId: string | undefined = body.threadId;

if (spec?.authorize) {
const authResult = await spec.authorize(ctx_, request);
if (authResult?.userId) userId = authResult.userId;
if (authResult?.threadId) threadId = authResult.threadId;
}

const ctx = (
spec?.customCtx
? {
...ctx_,
...spec.customCtx(
ctx_,
{ userId, threadId },
{ prompt: body.prompt } as any,
),
}
: ctx_
) as GenericActionCtx<GenericDataModel> & CustomCtx;

if (!threadId) {
threadId = await createThread(ctx, this.component, {
userId: userId ?? null,
});
}

const result = await this.streamText(
ctx as ActionCtx & CustomCtx,
{ threadId, userId },
{
prompt: body.prompt,
promptMessageId: body.promptMessageId,
messages: body.messages?.map(toModelMessage),
stopWhen: spec?.stopWhen,
},
{
contextOptions: spec?.contextOptions,
storageOptions: spec?.storageOptions,
saveStreamDeltas: spec?.saveStreamDeltas,
},
);

const response =
spec?.format === "ui-messages"
? result.toUIMessageStreamResponse()
: result.toTextStreamResponse();

if (result.promptMessageId) {
response.headers.set("X-Message-Id", result.promptMessageId);
}
if (result.streamId) {
response.headers.set("X-Stream-Id", result.streamId);
}
if (spec?.corsHeaders) {
for (const [key, value] of Object.entries(spec.corsHeaders)) {
response.headers.set(key, value);
}
}

return response;

⚠️ Potential issue | 🟠 Major

Add request body validation in asHttpAction.

The JSON body is cast without validation; invalid or missing fields will cause runtime errors downstream. Additionally, add validation to ensure at least one of prompt, messages, or promptMessageId is provided:

Suggested fix
       const body = (await request.json()) as {
         threadId?: string;
         prompt?: string;
         promptMessageId?: string;
         messages?: any[];
         [key: string]: unknown;
       };
+
+      if (
+        !body ||
+        (typeof body.prompt !== "string" &&
+          !Array.isArray(body.messages) &&
+          typeof body.promptMessageId !== "string")
+      ) {
+        return new Response("Invalid request body", { status: 400 });
+      }

Regarding ID typing: the current public API methods (streamText, generateText, etc.) accept threadId and promptMessageId as string | undefined rather than strongly-typed IDs. This is a broader API design concern that would require updating method signatures; the immediate validation above addresses the more critical issue of missing request validation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/index.ts` around lines 1655 - 1724, The handler currently casts
request.json() without validation which can lead to runtime errors; validate the
parsed body inside asHttpAction (before using
threadId/prompt/promptMessageId/messages) by checking JSON parse success,
ensuring types (prompt is string if present, messages is array if present), and
enforcing that at least one of prompt, messages, or promptMessageId is
provided—if validation fails return a 400 response. Also sanitize/marshal
messages via the existing toModelMessage before passing into streamText, and
only call createThread/createThread(ctx, this.component, { userId: userId ??
null }) after validation succeeds. Use the symbols request.json(),
toModelMessage, createThread, and streamText to locate the relevant code to
modify.
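The shape check in the suggested fix factors naturally into a small type-guard predicate. This is a sketch; the field names mirror the body type quoted above, not a published API:

```typescript
// Mirrors the ad-hoc body type from the handler under review.
type ChatRequestBody = {
  threadId?: string;
  prompt?: string;
  promptMessageId?: string;
  messages?: unknown[];
  [key: string]: unknown;
};

// True only when the body is an object carrying at least one way to produce
// a response: a prompt string, a message list, or a saved prompt message id.
function isValidChatBody(body: unknown): body is ChatRequestBody {
  if (typeof body !== "object" || body === null) return false;
  const b = body as Record<string, unknown>;
  return (
    typeof b.prompt === "string" ||
    Array.isArray(b.messages) ||
    typeof b.promptMessageId === "string"
  );
}
```

In the handler this would run immediately after `request.json()`, returning a 400 `Response` when the predicate fails, before any thread is created.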

Comment on lines +1 to +6
/**
* Consume a ReadableStream of Uint8Array chunks, decoding them as text
* and calling `onChunk` for each decoded segment.
*
* Handles multi-byte characters correctly using `TextDecoder({ stream: true })`.
*/

⚠️ Potential issue | 🟡 Minor

Minor JSDoc inaccuracy: { stream: true } is a decode() option, not a constructor argument.

The comment on line 5 reads TextDecoder({ stream: true }), which implies the option is passed to the TextDecoder constructor. The actual code correctly passes { stream: true } to decoder.decode(value, { stream: true }).

📝 Suggested fix
- * Handles multi-byte characters correctly using `TextDecoder({ stream: true })`.
+ * Handles multi-byte characters correctly using `decoder.decode(value, { stream: true })`.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/react/httpStreamUtils.ts` around lines 1 - 6, Update the JSDoc to correct
the inaccuracy: don't show TextDecoder({ stream: true }) as a constructor
call—explain that a TextDecoder is constructed (new TextDecoder()) and the
option { stream: true } is passed to decoder.decode(value, { stream: true })
when decoding chunks; update the comment in the top-of-file docblock that
describes the behavior of the TextDecoder and reference the decoder.decode usage
so the doc matches the actual implementation in this module.
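The distinction matters in practice: without `{ stream: true }` on `decode()`, a multi-byte character split across chunk boundaries is mangled into replacement characters. A minimal standalone demonstration:

```typescript
const encoder = new TextEncoder();
const bytes = encoder.encode("héllo"); // "é" encodes to two bytes in UTF-8

// Split in the middle of the two-byte "é" to simulate an unlucky chunk boundary.
const chunk1 = bytes.slice(0, 2); // "h" plus the first byte of "é"
const chunk2 = bytes.slice(2);

const streaming = new TextDecoder();
let out = "";
out += streaming.decode(chunk1, { stream: true }); // buffers the dangling byte
out += streaming.decode(chunk2, { stream: true });
out += streaming.decode(); // final call flushes any remaining buffered bytes
// out is "héllo"

// Without { stream: true }, each call decodes independently, so the split
// character degrades to U+FFFD replacement characters.
const naive = new TextDecoder();
const broken = naive.decode(chunk1) + naive.decode(chunk2);
// broken contains U+FFFD instead of "é"
```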

Comment on lines +16 to +31
try {
while (true) {
if (options.signal?.aborted) break;
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value, { stream: true });
options.onChunk(text);
}
// Flush any remaining bytes in the decoder
const remaining = decoder.decode();
if (remaining) {
options.onChunk(remaining);
}
} finally {
reader.releaseLock();
}

⚠️ Potential issue | 🟠 Major

AbortSignal is not respected during an in-progress reader.read().

The abort check (options.signal?.aborted) runs only at the top of each iteration. If the stream is slow or paused, a pending reader.read() will not be interrupted when the signal fires — the caller blocks until the next chunk arrives. Attach an 'abort' listener that proactively calls reader.cancel() to unblock the read.

🛡️ Proposed fix
 export async function consumeTextStream(
   reader: ReadableStreamDefaultReader<Uint8Array>,
   options: {
     onChunk: (text: string) => void;
     signal?: AbortSignal;
   },
 ): Promise<void> {
   const decoder = new TextDecoder();
+  const abortHandler = () => { reader.cancel().catch(() => {}); };
+  options.signal?.addEventListener('abort', abortHandler);

   try {
     while (true) {
       if (options.signal?.aborted) break;
       const { done, value } = await reader.read();
       if (done) break;
       const text = decoder.decode(value, { stream: true });
       options.onChunk(text);
     }
     // Flush any remaining bytes in the decoder
     const remaining = decoder.decode();
     if (remaining) {
       options.onChunk(remaining);
     }
   } finally {
+    options.signal?.removeEventListener('abort', abortHandler);
     reader.releaseLock();
   }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/react/httpStreamUtils.ts` around lines 16 - 31, The loop currently only
checks options.signal?.aborted before each reader.read(), so a pending read
isn’t interrupted; add an 'abort' listener on options.signal (when present) that
calls reader.cancel() to unblock reader.read(), attach it before entering the
while loop, and remove it in the finally block (alongside reader.releaseLock());
also ensure you handle/ignore the abort-specific error from reader.read() (e.g.,
DOMException named "AbortError") so it doesn’t propagate, keeping the existing
decoder.flush (decoder.decode()) and options.onChunk usage intact.
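
The mechanism behind the proposed fix can be demonstrated framework-free (hypothetical names, not the module's code): cancelling the reader from an `'abort'` listener resolves a `read()` that is already pending.

```typescript
// Sketch: reader.cancel() from an 'abort' listener unblocks a pending read.
async function demo(): Promise<boolean> {
  const stream = new ReadableStream<Uint8Array>({ start() {} }); // never emits
  const reader = stream.getReader();
  const ac = new AbortController();
  ac.signal.addEventListener("abort", () => {
    reader.cancel().catch(() => {});
  });
  const pending = reader.read(); // would stay pending forever without the listener
  ac.abort();
  const { done } = await pending; // cancel resolves the read with done: true
  return done;
}
```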

Comment on lines +65 to +139
const abortControllerRef = useRef<AbortController | null>(null);

const abort = useCallback(() => {
  abortControllerRef.current?.abort();
  abortControllerRef.current = null;
}, []);

const send = useCallback(
  async (body: {
    threadId?: string;
    prompt?: string;
    [key: string]: unknown;
  }) => {
    // Abort any existing stream
    abort();

    const controller = new AbortController();
    abortControllerRef.current = controller;

    setText("");
    setError(null);
    setStreamId(null);
    setMessageId(null);
    setIsStreaming(true);

    try {
      const response = await fetch(options.url, {
        method: "POST",
        headers: {
          "Content-Type": "application/json",
          ...(options.token
            ? { Authorization: `Bearer ${options.token}` }
            : {}),
          ...options.headers,
        },
        body: JSON.stringify(body),
        signal: controller.signal,
      });

      if (!response.ok) {
        throw new Error(`HTTP error! status: ${response.status}`);
      }

      // Extract metadata headers
      const responseStreamId = response.headers.get("X-Stream-Id");
      const responseMessageId = response.headers.get("X-Message-Id");
      if (responseStreamId) setStreamId(responseStreamId);
      if (responseMessageId) setMessageId(responseMessageId);

      if (!response.body) {
        throw new Error("Response body is not readable");
      }

      const reader = response.body.getReader();
      let accumulated = "";

      await consumeTextStream(reader, {
        onChunk: (chunk) => {
          accumulated += chunk;
          setText(accumulated);
        },
        signal: controller.signal,
      });
    } catch (e) {
      if (e instanceof Error && e.name === "AbortError") {
        // Intentional abort — not an error
        return;
      }
      const err = e instanceof Error ? e : new Error(String(e));
      setError(err);
    } finally {
      setIsStreaming(false);
      if (abortControllerRef.current === controller) {
        abortControllerRef.current = null;
      }

⚠️ Potential issue | 🟠 Major

Guard isStreaming against stale-finally races.

If send() is called twice quickly, the first request’s finally can flip isStreaming to false while the newer stream is active. Track a request id to ensure only the latest request updates state.

Suggested fix
   const [messageId, setMessageId] = useState<string | null>(null);
   const abortControllerRef = useRef<AbortController | null>(null);
+  const requestIdRef = useRef(0);

@@
     async (body: {
       threadId?: string;
       prompt?: string;
       [key: string]: unknown;
     }) => {
+      const requestId = ++requestIdRef.current;
       // Abort any existing stream
       abort();

@@
       } finally {
-        setIsStreaming(false);
-        if (abortControllerRef.current === controller) {
-          abortControllerRef.current = null;
-        }
+        if (requestIdRef.current === requestId) {
+          setIsStreaming(false);
+          abortControllerRef.current = null;
+        }
       }
     },
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/react/useHttpStream.ts` around lines 65 - 139, The finally block in
send() can set isStreaming false for an earlier request when multiple sends
overlap; fix by introducing a per-request identifier (e.g., incrementing
requestIdRef) that you set when creating controller and check in the finally
before calling setIsStreaming(false) and clearing abortControllerRef: generate a
new id at start of send(), store it on a ref (requestIdRef.current = id), and in
the finally only run setIsStreaming(false) and null out abortControllerRef if
requestIdRef.current === id (and optionally clear requestIdRef when matched) so
only the latest request can flip isStreaming or clear the controller.
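
The guard can be sketched outside React in plain TypeScript (hypothetical names mirroring the hook's refs): only the request whose id still matches the latest counter may clear the flag.

```typescript
// Latest-request-wins sketch: a stale finally must not clear isStreaming.
let latestId = 0;
let isStreaming = false;

async function send(work: () => Promise<void>): Promise<void> {
  const id = ++latestId;
  isStreaming = true;
  try {
    await work();
  } finally {
    if (latestId === id) isStreaming = false; // stale requests skip this
  }
}

async function main(): Promise<[boolean, boolean]> {
  const first = send(() => new Promise((r) => setTimeout(r, 10)));
  const second = send(() => new Promise((r) => setTimeout(r, 50)));
  await first; // first's finally runs while second is still in flight
  const midFlight = isStreaming; // stays true: the stale finally was skipped
  await second;
  return [midFlight, isStreaming]; // [true, false]
}
```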

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (4)
src/client/streaming.integration.test.ts (1)

1068-1113: Nit: test name says "concurrent" but streams are created sequentially.

The streams are awaited one at a time in a for-loop (Line 1084). Consider renaming to "multiple sequential streams in same thread" to avoid confusion with truly concurrent producers, or actually parallelize the consumeStream calls.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/streaming.integration.test.ts` around lines 1068 - 1113, The test
name claims "concurrent" but the loop awaits each DeltaStreamer.consumeStream
sequentially; either rename the test to "multiple sequential streams in same
thread" (update the test description string) or actually run the streams
concurrently by starting all consumeStream calls without awaiting inside the
for-loop (collect the Promises from streamer.consumeStream(...) for each
DeltaStreamer instance created via new DeltaStreamer(...), then await
Promise.all on that array). Update references to DeltaStreamer, consumeStream,
streamText, and streamers accordingly so the test behavior matches the test
name.
example/ui/chat/StreamingDemo.tsx (3)

436-442: Redundant ?? "" since Array.join() always returns a string.

.join("\n") never returns null or undefined, so the nullish coalescing operator is a no-op.

-    message.parts
-      .filter((p) => p.type === "reasoning")
-      .map((p) => p.text)
-      .join("\n") ?? "",
+    message.parts
+      .filter((p) => p.type === "reasoning")
+      .map((p) => p.text)
+      .join("\n"),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@example/ui/chat/StreamingDemo.tsx` around lines 436 - 442, The nullish
coalescing "?? ''" after .join("\n") is redundant because Array.prototype.join
always returns a string; remove the "?? ''" so the useSmoothText call becomes
useSmoothText(message.parts.filter((p) => p.type === "reasoning").map((p) =>
p.text).join("\n"), { startStreaming: message.status === "streaming" }); —
update the expression around useSmoothText and reasoningText to drop the
unnecessary ?? "", leaving the rest (filter/map/join and startStreaming
behavior) unchanged.

142-146: console.warn fires on every render — move to module scope or a one-time effect.

The guard runs inside the component body, so every re-render logs the warning. In Strict Mode this doubles.

🔧 Suggested fix
+const convexUrl = import.meta.env.VITE_CONVEX_URL as string;
+if (!convexUrl.endsWith(".cloud")) {
+  console.warn(
+    "Unexpected Convex URL format; HTTP streaming may not work:",
+    convexUrl,
+  );
+}
+const httpUrl = convexUrl.replace(/\.cloud$/, ".site");
+
 function ChatPanel({
   threadId,
   mode,
   reset,
 }: {
   threadId: string;
   mode: StreamMode;
   reset: () => void;
 }) {
-  const convexUrl = import.meta.env.VITE_CONVEX_URL as string;
-  if (!convexUrl.endsWith(".cloud")) {
-    console.warn("Unexpected Convex URL format; HTTP streaming may not work:", convexUrl);
-  }
-  const httpUrl = convexUrl.replace(/\.cloud$/, ".site");
-
   const httpStream = useHttpStream({ url: `${httpUrl}/streamTextDemo` });
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@example/ui/chat/StreamingDemo.tsx` around lines 142 - 146, The console.warn
that checks convexUrl is running on every render inside the component; move the
check out of the render path by performing it once — either compute convexUrl
and httpUrl at module scope (using const convexUrl =
import.meta.env.VITE_CONVEX_URL as string; const httpUrl =
convexUrl.replace(/\.cloud$/, ".site"); and run the warn there) or keep the
values in the component but wrap the guard in a one-time effect (useEffect with
an empty dependency array) so the warning only fires once; refer to the
convexUrl and httpUrl identifiers and the StreamingDemo component when making
the change.

290-333: Dead ternary branch and IIFE in JSX.

This block is only entered when httpStreaming is truthy (Line 290), so the ternary on Line 303 always takes the truthy branch — the "bg-gray-200 text-gray-800" path is unreachable. Similarly, the status label on Line 309 always shows " - live".

Also, the IIFE (() => { ... })() is an unusual pattern in JSX. Extracting this into a small <HttpBubble> component (or at least a local variable above the return) would improve readability and avoid re-creating the closure every render.

🔧 Suggested simplification for the ternary
                    className={cn(
                      "rounded-lg px-4 py-2 max-w-lg whitespace-pre-wrap shadow-sm",
-                     httpStreaming
-                       ? "bg-green-100 text-gray-800"
-                       : "bg-gray-200 text-gray-800",
+                     "bg-green-100 text-gray-800",
                    )}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@example/ui/chat/StreamingDemo.tsx` around lines 290 - 333, The JSX uses an
IIFE and a dead ternary because this block only renders when httpStreaming is
truthy; extract the JSX into a small component or a local const (e.g.,
HttpBubble) that receives httpText and toolParts (derived from messages/pending)
and remove the IIFE, then simplify the className ternary in that component (use
the live styling "bg-green-100 text-gray-800" directly) and hard-code the status
label to "[HTTP stream - live]"; keep rendering of toolParts and httpText the
same and reference the existing symbols httpStreaming, httpText, messages,
pending, toolParts, and the new HttpBubble component/variable.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@example/ui/chat/StreamingDemo.tsx`:
- Around line 243-260: handleSend currently calls sendDelta and httpStream.send
without handling rejections; wrap the await calls to sendDelta({ threadId,
prompt: text }) and httpStream.send({ threadId, prompt: text }) in try/catch (or
append .catch(...)) so any rejection is logged/handled similarly to sendOneShot;
update the branches in handleSend to catch errors and call console.error or
route to the existing error handling pathway using requestModeRef/current state
to preserve behavior.

In `@src/client/streaming.integration.test.ts`:
- Around line 478-487: The mid-cursor selection Math.floor(endCursor / 2) can
fall inside a delta and produce zero parts, making the check a no-op; change
midCursor to snap to an actual delta boundary (use a boundary from the fetched
full set such as a part's start or end from allParts, e.g., pick the middle
part's start/end) before calling getParts/listDeltas, then assert a stricter
relationship (e.g., laterParts.length > 0 and laterParts.length <=
allParts.length) to ensure the partial fetch returns a non-empty subset; update
references to midCursor, endCursor, getParts, listDeltas, laterParts, and
allParts accordingly.

---

Nitpick comments:
In `@example/ui/chat/StreamingDemo.tsx`:
- Around line 436-442: The nullish coalescing "?? ''" after .join("\n") is
redundant because Array.prototype.join always returns a string; remove the "??
''" so the useSmoothText call becomes useSmoothText(message.parts.filter((p) =>
p.type === "reasoning").map((p) => p.text).join("\n"), { startStreaming:
message.status === "streaming" }); — update the expression around useSmoothText
and reasoningText to drop the unnecessary ?? "", leaving the rest
(filter/map/join and startStreaming behavior) unchanged.
- Around line 142-146: The console.warn that checks convexUrl is running on
every render inside the component; move the check out of the render path by
performing it once — either compute convexUrl and httpUrl at module scope (using
const convexUrl = import.meta.env.VITE_CONVEX_URL as string; const httpUrl =
convexUrl.replace(/\.cloud$/, ".site"); and run the warn there) or keep the
values in the component but wrap the guard in a one-time effect (useEffect with
an empty dependency array) so the warning only fires once; refer to the
convexUrl and httpUrl identifiers and the StreamingDemo component when making
the change.
- Around line 290-333: The JSX uses an IIFE and a dead ternary because this
block only renders when httpStreaming is truthy; extract the JSX into a small
component or a local const (e.g., HttpBubble) that receives httpText and
toolParts (derived from messages/pending) and remove the IIFE, then simplify the
className ternary in that component (use the live styling "bg-green-100
text-gray-800" directly) and hard-code the status label to "[HTTP stream -
live]"; keep rendering of toolParts and httpText the same and reference the
existing symbols httpStreaming, httpText, messages, pending, toolParts, and the
new HttpBubble component/variable.

In `@src/client/streaming.integration.test.ts`:
- Around line 1068-1113: The test name claims "concurrent" but the loop awaits
each DeltaStreamer.consumeStream sequentially; either rename the test to
"multiple sequential streams in same thread" (update the test description
string) or actually run the streams concurrently by starting all consumeStream
calls without awaiting inside the for-loop (collect the Promises from
streamer.consumeStream(...) for each DeltaStreamer instance created via new
DeltaStreamer(...), then await Promise.all on that array). Update references to
DeltaStreamer, consumeStream, streamText, and streamers accordingly so the test
behavior matches the test name.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5518ef9 and 0e4d9f0.

📒 Files selected for processing (2)
  • example/ui/chat/StreamingDemo.tsx
  • src/client/streaming.integration.test.ts

Comment on lines +243 to +260
async function handleSend() {
  const text = prompt.trim();
  if (!text) return;
  setPrompt("");
  requestModeRef.current = mode;

  if (mode === "delta") {
    await sendDelta({ threadId, prompt: text });
  } else if (mode === "oneshot") {
    // Don't await — the action runs server-side while deltas stream
    // to the client via reactive queries.
    sendOneShot({ threadId, prompt: text }).catch((e) =>
      console.error("oneshot error:", e),
    );
  } else if (mode === "http") {
    await httpStream.send({ threadId, prompt: text });
  }
}

⚠️ Potential issue | 🟡 Minor

sendDelta and httpStream.send rejections are unhandled.

handleSend is invoked via void handleSend() (Line 348), which discards the returned promise. If sendDelta (Line 250) or httpStream.send (Line 258) rejects, the error is silently swallowed or becomes an unhandled rejection, unlike sendOneShot which has an explicit .catch() (Line 254).

🔧 Suggested fix — wrap with try/catch or add .catch()
   async function handleSend() {
     const text = prompt.trim();
     if (!text) return;
     setPrompt("");
     requestModeRef.current = mode;
 
-    if (mode === "delta") {
-      await sendDelta({ threadId, prompt: text });
-    } else if (mode === "oneshot") {
-      // Don't await — the action runs server-side while deltas stream
-      // to the client via reactive queries.
-      sendOneShot({ threadId, prompt: text }).catch((e) =>
-        console.error("oneshot error:", e),
-      );
-    } else if (mode === "http") {
-      await httpStream.send({ threadId, prompt: text });
+    try {
+      if (mode === "delta") {
+        await sendDelta({ threadId, prompt: text });
+      } else if (mode === "oneshot") {
+        // Don't await — the action runs server-side while deltas stream
+        // to the client via reactive queries.
+        sendOneShot({ threadId, prompt: text }).catch((e) =>
+          console.error("oneshot error:", e),
+        );
+      } else if (mode === "http") {
+        await httpStream.send({ threadId, prompt: text });
+      }
+    } catch (e) {
+      console.error(`${mode} send error:`, e);
     }
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async function handleSend() {
  const text = prompt.trim();
  if (!text) return;
  setPrompt("");
  requestModeRef.current = mode;

  if (mode === "delta") {
    await sendDelta({ threadId, prompt: text });
  } else if (mode === "oneshot") {
    // Don't await — the action runs server-side while deltas stream
    // to the client via reactive queries.
    sendOneShot({ threadId, prompt: text }).catch((e) =>
      console.error("oneshot error:", e),
    );
  } else if (mode === "http") {
    await httpStream.send({ threadId, prompt: text });
  }
}
async function handleSend() {
  const text = prompt.trim();
  if (!text) return;
  setPrompt("");
  requestModeRef.current = mode;
  try {
    if (mode === "delta") {
      await sendDelta({ threadId, prompt: text });
    } else if (mode === "oneshot") {
      // Don't await — the action runs server-side while deltas stream
      // to the client via reactive queries.
      sendOneShot({ threadId, prompt: text }).catch((e) =>
        console.error("oneshot error:", e),
      );
    } else if (mode === "http") {
      await httpStream.send({ threadId, prompt: text });
    }
  } catch (e) {
    console.error(`${mode} send error:`, e);
  }
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@example/ui/chat/StreamingDemo.tsx` around lines 243 - 260, handleSend
currently calls sendDelta and httpStream.send without handling rejections; wrap
the await calls to sendDelta({ threadId, prompt: text }) and httpStream.send({
threadId, prompt: text }) in try/catch (or append .catch(...)) so any rejection
is logged/handled similarly to sendOneShot; update the branches in handleSend to
catch errors and call console.error or route to the existing error handling
pathway using requestModeRef/current state to preserve behavior.

Comment on lines +478 to +487
// Fetch from midpoint cursor - should only get remaining deltas
const midCursor = Math.floor(endCursor / 2);
const laterDeltas = await ctx.runQuery(
  components.agent.streams.listDeltas,
  { threadId, cursors: [{ cursor: midCursor, streamId }] },
);
const { parts: laterParts } = getParts(laterDeltas, midCursor);

// Later parts should be a subset of all parts
expect(laterParts.length).toBeLessThanOrEqual(allParts.length);

⚠️ Potential issue | 🟡 Minor

Fragile mid-cursor assertion — the test passes even if partial fetch returns nothing.

Math.floor(endCursor / 2) can land inside a delta's [start, end) range, causing getParts to either hit a gap-warning (and return zero parts) or throw. Because the assertion is only laterParts.length <= allParts.length, a result of 0 parts still passes, making this a no-op check.

Consider snapping midCursor to an actual delta boundary and asserting a strict relationship:

🔧 Suggested improvement
-      // Fetch from midpoint cursor - should only get remaining deltas
-      const midCursor = Math.floor(endCursor / 2);
+      // Snap to an actual delta boundary so getParts won't hit a gap
+      const midDelta = allDeltas[Math.floor(allDeltas.length / 2)];
+      const midCursor = midDelta ? midDelta.start : 0;
       const laterDeltas = await ctx.runQuery(
         components.agent.streams.listDeltas,
         { threadId, cursors: [{ cursor: midCursor, streamId }] },
       );
       const { parts: laterParts } = getParts(laterDeltas, midCursor);
 
-      // Later parts should be a subset of all parts
-      expect(laterParts.length).toBeLessThanOrEqual(allParts.length);
+      // Later parts should be strictly fewer (or equal when midCursor === 0)
+      expect(laterParts.length).toBeLessThanOrEqual(allParts.length);
+      if (midCursor > 0) {
+        expect(laterParts.length).toBeLessThan(allParts.length);
+      }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/streaming.integration.test.ts` around lines 478 - 487, The
mid-cursor selection Math.floor(endCursor / 2) can fall inside a delta and
produce zero parts, making the check a no-op; change midCursor to snap to an
actual delta boundary (use a boundary from the fetched full set such as a part's
start or end from allParts, e.g., pick the middle part's start/end) before
calling getParts/listDeltas, then assert a stricter relationship (e.g.,
laterParts.length > 0 and laterParts.length <= allParts.length) to ensure the
partial fetch returns a non-empty subset; update references to midCursor,
endCursor, getParts, listDeltas, laterParts, and allParts accordingly.
