Commit ca3259a

sethconvex and claude committed
Add HTTP streaming demo, integration tests, and fix streaming bugs
Add a streaming demo example that showcases all three streaming patterns (delta, one-shot, HTTP) side-by-side, along with comprehensive integration tests for DeltaStreamer and delta consumption.

Fixes discovered during testing:
- HTTP streaming showing duplicate content: clear httpText when streaming ends, hide pending DB message during HTTP stream, render tool call parts inline in the HTTP bubble
- One-shot mode using useMutation instead of useAction
- useSmoothText animating non-streaming messages: track hasStreamed and snap cursor to full text when streaming was never activated
- DeltaStreamer.finish()/fail() not fully draining the self-chaining write queue (use while loop instead of single await)
- Unhandled rejection in #sendDelta: return instead of re-throw after onAsyncAbort since the fire-and-forget caller cannot catch it

Also adds backwards-compat test suite and HTTP streaming requirements doc.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 6fbf088 commit ca3259a

9 files changed

Lines changed: 3345 additions & 3 deletions

Lines changed: 302 additions & 0 deletions
@@ -0,0 +1,302 @@
# Technical Requirements: HTTP Streaming for @convex-dev/agent

## Status: Draft
## Date: 2026-02-23

---

## 1. Executive Summary

The current streaming architecture relies exclusively on Convex's reactive query system (WebSocket-based delta polling). This document specifies requirements for adding HTTP streaming support, including delta filtering logic, stream ID lifecycle management, and backwards compatibility constraints.

---

## 2. Current Architecture

### 2.1 Streaming Transport (WebSocket Delta Polling)

The existing system persists stream data as discrete deltas in the database, which clients poll via Convex reactive queries. There is no HTTP streaming transport.

**Flow:**
1. `DeltaStreamer` (client action) writes compressed parts via `streams.addDelta` mutations
2. React hooks (`useDeltaStreams`) issue two reactive queries per render cycle:
   - `kind: "list"`: discovers active `streamingMessages` for the thread
   - `kind: "deltas"`: fetches new deltas using per-stream cursors
3. `deriveUIMessagesFromDeltas()` materializes `UIMessage[]` from accumulated deltas

**Key files:**
- `src/client/streaming.ts`: `DeltaStreamer` class, compression, `syncStreams()`
- `src/component/streams.ts`: Backend mutations/queries (`create`, `addDelta`, `listDeltas`, `finish`, `abort`)
- `src/react/useDeltaStreams.ts`: Client-side cursor tracking and delta accumulation
- `src/deltas.ts`: Delta-to-UIMessage materialization

### 2.2 Stream State Machine

```
  create()           addDelta() (with heartbeat)
     │                     │
     ▼                     ▼
┌───────────┐      ┌───────────┐
│ streaming │─────▶│ streaming │──── heartbeat every ~2.5 min
└───────────┘      └───────────┘
     │                     │
     │ finish()            │ abort() / timeout (10 min)
     ▼                     ▼
┌──────────┐         ┌─────────┐
│ finished │         │ aborted │
└──────────┘         └─────────┘
     │
     │ cleanup (5 min delay)
     ▼
 [deleted]
```

### 2.3 Data Formats

Two delta formats are supported, declared per-stream:

| Format | Description | Primary Use |
|--------|-------------|-------------|
| `UIMessageChunk` | AI SDK v6 native format (`text-delta`, `tool-input-delta`, `reasoning-delta`, etc.) | Default for new streams |
| `TextStreamPart` | Legacy AI SDK format | Backwards compatibility |

---

## 3. HTTP Streaming Requirements

### 3.1 Transport Layer

**REQ-HTTP-1**: Provide an HTTP streaming endpoint that emits deltas as Server-Sent Events (SSE) or newline-delimited JSON (NDJSON), enabling clients that cannot use Convex WebSocket subscriptions (e.g., non-JS environments, CLI tools, third-party integrations).

**REQ-HTTP-2**: The HTTP endpoint must support resumption. A client that disconnects and reconnects with a cursor value must receive only deltas it hasn't seen, not replay the full stream.

**REQ-HTTP-3**: The HTTP endpoint must respect the same rate-limiting constants as the WebSocket path:
- `MAX_DELTAS_PER_REQUEST = 1000` (total across all streams)
- `MAX_DELTAS_PER_STREAM = 100` (per stream per request)
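
As a rough illustration of how the two limits in REQ-HTTP-3 interact, the sketch below caps each stream first and then the request as a whole. The helper name `capDeltas` and the first-come budget allocation are assumptions made for illustration; the existing query may distribute the request budget differently.

```typescript
// Limits named in REQ-HTTP-3.
const MAX_DELTAS_PER_REQUEST = 1000;
const MAX_DELTAS_PER_STREAM = 100;

interface StreamDelta {
  streamId: string;
  start: number;
  end: number;
  parts: unknown[];
}

// Hypothetical helper: `perStream` holds each stream's pending deltas in cursor order.
// Each stream contributes at most MAX_DELTAS_PER_STREAM deltas, and the combined
// result never exceeds MAX_DELTAS_PER_REQUEST.
function capDeltas(perStream: StreamDelta[][]): StreamDelta[] {
  const out: StreamDelta[] = [];
  for (const deltas of perStream) {
    const remaining = MAX_DELTAS_PER_REQUEST - out.length;
    if (remaining <= 0) break;
    out.push(...deltas.slice(0, Math.min(MAX_DELTAS_PER_STREAM, remaining)));
  }
  return out;
}
```

Deltas dropped by a cap are not lost: the client's cursor only advances past what it received, so the next request picks up where this one stopped.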

**REQ-HTTP-4**: The HTTP endpoint must support filtering by stream status (`streaming`, `finished`, `aborted`) matching the existing `listStreams` query interface.

**REQ-HTTP-5**: The HTTP endpoint must emit a terminal event when the stream reaches `finished` or `aborted` state, so clients know to stop polling/listening.

### 3.2 Response Format

**REQ-HTTP-6**: Each SSE/NDJSON frame must include:
```typescript
{
  streamId: string; // ID of the streaming message
  start: number;    // Inclusive cursor position
  end: number;      // Exclusive cursor position
  parts: any[];     // Delta parts (UIMessageChunk[] or TextStreamPart[])
}
```

This matches the existing `StreamDelta` type (`src/validators.ts:628-634`).
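
If NDJSON is chosen (the transport is still an open question in this document), a frame could be written and read roughly as follows. This is a minimal sketch: `encodeFrame` and `NdjsonParser` are hypothetical names, and the `StreamDelta` interface here only mirrors the fields listed in REQ-HTTP-6.

```typescript
interface StreamDelta {
  streamId: string;
  start: number; // inclusive cursor position
  end: number;   // exclusive cursor position
  parts: unknown[];
}

// Encode one delta as a single NDJSON line. JSON.stringify escapes any
// newlines inside string values, so the frame never spans multiple lines.
function encodeFrame(delta: StreamDelta): string {
  return JSON.stringify(delta) + "\n";
}

// Hypothetical incremental parser: network chunks can split a frame anywhere,
// so the trailing partial line is buffered until its newline arrives.
class NdjsonParser {
  private buffer = "";

  push(chunk: string): StreamDelta[] {
    this.buffer += chunk;
    const lines = this.buffer.split("\n");
    this.buffer = lines.pop() ?? "";
    return lines
      .filter((line) => line.trim() !== "")
      .map((line) => JSON.parse(line) as StreamDelta);
  }
}
```

A client would feed each decoded `fetch` body chunk into `push()` and remember the largest `end` it has seen, which is the cursor it sends when reconnecting (REQ-HTTP-2).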

**REQ-HTTP-7**: Stream metadata must be available either as an initial frame or via a separate endpoint, containing:
```typescript
{
  streamId: string;
  status: "streaming" | "finished" | "aborted";
  format: "UIMessageChunk" | "TextStreamPart" | undefined;
  order: number;
  stepOrder: number;
  userId?: string;
  agentName?: string;
  model?: string;
  provider?: string;
  providerOptions?: ProviderOptions;
}
```

This matches the existing `StreamMessage` type (`src/validators.ts:607-626`).

---

## 4. Delta Stream Filtering Logic

### 4.1 Server-Side Filtering

**REQ-FILT-1**: The `listDeltas` query must continue to filter by stream ID + cursor position using the `streamId_start_end` index:
```
.withIndex("streamId_start_end", (q) =>
  q.eq("streamId", cursor.streamId).gte("start", cursor.cursor))
```
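
The effect of that index range can be pictured with an in-memory stand-in. The function `deltasFromCursor` below is hypothetical and does not touch Convex; it only shows which rows the `eq("streamId")` + `gte("start")` range selects, which is also what makes cursor-based resumption (REQ-HTTP-2) work.

```typescript
interface StreamDelta {
  streamId: string;
  start: number;
  end: number;
  parts: unknown[];
}

// Hypothetical in-memory equivalent of the index range scan:
// same stream, start >= cursor, returned in ascending `start` order.
function deltasFromCursor(
  all: StreamDelta[],
  streamId: string,
  cursor: number,
): StreamDelta[] {
  return all
    .filter((d) => d.streamId === streamId && d.start >= cursor)
    .sort((a, b) => a.start - b.start);
}
```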

**REQ-FILT-2**: Stream discovery (`list` query) must filter by:
- `threadId` (required): scoped to a single thread
- `state.kind` (optional, defaults to `["streaming"]`): which statuses to include
- `startOrder` (optional, defaults to 0): minimum message order position

This uses the compound index `threadId_state_order_stepOrder`.

**REQ-FILT-3**: For HTTP streaming, add support for filtering deltas by a single `streamId` (not requiring `threadId`), for clients that already know which stream they want to follow.

### 4.2 Client-Side Filtering

**REQ-FILT-4**: The `useDeltaStreams` hook's cursor management must be preserved:
- Per-stream cursor tracking via `Record<string, number>`
- Gap detection: assert `previousEnd === delta.start` for consecutive deltas
- Stale delta rejection: skip deltas where `delta.start < oldCursor`
- Cache-friendly `startOrder` rounding (round down to nearest 10)
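
The cursor rules in REQ-FILT-4 can be sketched outside React as plain functions. The names `applyDelta` and `roundStartOrder` are hypothetical, throwing on a gap is one possible reading of "assert", and the sketch assumes each stream's first delta starts at cursor 0.

```typescript
interface StreamDelta {
  streamId: string;
  start: number;
  end: number;
  parts: unknown[];
}

type Cursors = Record<string, number>;

// Apply one delta to the per-stream cursor map.
// - Stale deltas (already seen) are skipped.
// - A delta that does not begin exactly at the cursor indicates a gap.
function applyDelta(cursors: Cursors, delta: StreamDelta): "applied" | "stale" {
  const cursor = cursors[delta.streamId] ?? 0; // assumption: streams start at 0
  if (delta.start < cursor) return "stale";
  if (delta.start !== cursor) {
    throw new Error(
      `Gap in stream ${delta.streamId}: expected start ${cursor}, got ${delta.start}`,
    );
  }
  cursors[delta.streamId] = delta.end;
  return "applied";
}

// Round down to the nearest 10 so nearby callers share the same query arguments.
function roundStartOrder(order: number): number {
  return Math.floor(order / 10) * 10;
}
```

Rounding `startOrder` means a thread at order 21 and one at order 27 both query from 20, so the reactive query result can be shared and cached.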

**REQ-FILT-5**: Support `skipStreamIds` filtering to allow callers to exclude specific streams (used when streams are already materialized from stored messages).

### 4.3 Delta Compression

**REQ-FILT-6**: Delta compression must happen before persistence (in `DeltaStreamer.#createDelta`). Two compression strategies:

1. **UIMessageChunk compression** (`compressUIMessageChunks`):
   - Merge consecutive `text-delta` parts with same `id` by concatenating `.delta`
   - Merge consecutive `reasoning-delta` parts with same `id` by concatenating `.delta`

2. **TextStreamPart compression** (`compressTextStreamParts`):
   - Merge consecutive `text-delta` parts with same `id` by concatenating `.text`
   - Merge consecutive `reasoning-delta` parts with same `id` by concatenating `.text`
   - Strip `Uint8Array` data from `file` parts (not suitable for delta transport)
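
The merge rule for the `UIMessageChunk` case can be illustrated with a simplified chunk type. This is a sketch of the rule as stated in REQ-FILT-6, not the body of `compressUIMessageChunks`; the `Chunk` union below is a reduced stand-in for the AI SDK type and `compressChunks` is a hypothetical name.

```typescript
// Reduced stand-in for UIMessageChunk: only the shapes needed to show merging.
type Chunk =
  | { type: "text-delta" | "reasoning-delta"; id: string; delta: string }
  | { type: "text-start" | "text-end"; id: string };

// Merge runs of consecutive delta chunks that share both `type` and `id`.
// Any other chunk in between ends the run, so ordering is preserved.
function compressChunks(chunks: Chunk[]): Chunk[] {
  const out: Chunk[] = [];
  for (const chunk of chunks) {
    const last = out[out.length - 1];
    if (
      last !== undefined &&
      "delta" in chunk &&
      "delta" in last &&
      last.type === chunk.type &&
      last.id === chunk.id
    ) {
      out[out.length - 1] = { ...last, delta: last.delta + chunk.delta };
    } else {
      out.push(chunk);
    }
  }
  return out;
}
```

The `TextStreamPart` variant follows the same pattern with `.text` in place of `.delta`.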

**REQ-FILT-7**: Throttling must remain configurable per-stream:
- Default: `250ms` between delta writes
- Configurable via `StreamingOptions.throttleMs`
- Chunking granularity: `"word"`, `"line"`, `RegExp`, or custom `ChunkDetector` (default: `/[\p{P}\s]/u`, i.e. punctuation + whitespace)

---

## 5. Stream ID Tracking

### 5.1 Stream ID Lifecycle

**REQ-SID-1**: Stream IDs are Convex document IDs (`Id<"streamingMessages">`) generated lazily on first delta write:
- `DeltaStreamer.getStreamId()` creates the stream via `streams.create` mutation
- Race-condition safe: only one creation promise via `#creatingStreamIdPromise`
- Stream ID is `undefined` until the first `addParts()` call
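
The single-promise pattern behind REQ-SID-1 looks roughly like this. The class `LazyStreamId` and its injected `create` callback are hypothetical stand-ins for `DeltaStreamer` and the `streams.create` mutation; the point is only that concurrent callers share one in-flight creation.

```typescript
// Sketch of lazy, race-safe ID creation. `create` stands in for the
// `streams.create` mutation and is supplied by the caller.
class LazyStreamId {
  streamId: string | undefined;
  private creating: Promise<string> | undefined;

  constructor(private readonly create: () => Promise<string>) {}

  // Not declared `async` on purpose: every caller receives the same promise
  // object, so `create` runs at most once even under concurrent calls.
  getStreamId(): Promise<string> {
    if (this.creating === undefined) {
      this.creating = this.create().then((id) => {
        this.streamId = id;
        return id;
      });
    }
    return this.creating;
  }
}
```

Until the shared promise resolves, `streamId` stays `undefined`, matching the last bullet of the requirement.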

**REQ-SID-2**: The `streams.create` mutation must:
1. Insert a `streamingMessages` document with `state: { kind: "streaming", lastHeartbeat: Date.now() }`
2. Schedule a timeout function at `TIMEOUT_INTERVAL` (10 minutes)
3. Patch the document with the `timeoutFnId`

**REQ-SID-3**: Stream IDs must be passed to `addMessages` via `finishStreamId` for atomic stream finish + message persistence (prevents UI flicker from separate mutations).

### 5.2 Client-Side Stream ID Management

**REQ-SID-4**: React hooks must track multiple concurrent streams per thread:
- `useDeltaStreams` returns `Array<{ streamMessage: StreamMessage; deltas: StreamDelta[] }>`
- Each stream accumulates deltas independently
- Streams are sorted by `[order, stepOrder]` for display
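
The `[order, stepOrder]` ordering is a two-key comparison. A minimal comparator, with the hypothetical name `compareStreams` and a reduced key type, could look like this:

```typescript
// Only the two sort keys from StreamMessage are modeled here.
interface StreamOrderKey {
  order: number;
  stepOrder: number;
}

// Sort by `order` first; fall back to `stepOrder` when orders are equal.
function compareStreams(a: StreamOrderKey, b: StreamOrderKey): number {
  return a.order - b.order || a.stepOrder - b.stepOrder;
}
```

Usage would be along the lines of `streams.sort((a, b) => compareStreams(a.streamMessage, b.streamMessage))`.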

**REQ-SID-5**: When a thread changes (`threadId` differs from previous render):
- Clear all accumulated delta streams (`state.deltaStreams = undefined`)
- Reset all cursors (`setCursors({})`)
- Reset `startOrder`

**REQ-SID-6**: Stream identity in UIMessages uses the convention `id: "stream:{streamId}"` to distinguish streaming messages from persisted messages.
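
The `stream:` prefix convention can be wrapped in a pair of helpers so callers never build or parse the string by hand. Both function names below are hypothetical.

```typescript
const STREAM_ID_PREFIX = "stream:";

// Build the UIMessage id for a streaming message.
function toStreamMessageId(streamId: string): string {
  return STREAM_ID_PREFIX + streamId;
}

// Return the stream ID if the message id follows the convention,
// or undefined for a persisted (non-streaming) message id.
function parseStreamMessageId(messageId: string): string | undefined {
  return messageId.startsWith(STREAM_ID_PREFIX)
    ? messageId.slice(STREAM_ID_PREFIX.length)
    : undefined;
}
```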

### 5.3 Heartbeat & Timeout

**REQ-SID-7**: Heartbeat behavior:
- Triggered on every `addDelta` call
- Debounced: only writes if >2.5 minutes since last heartbeat (`TIMEOUT_INTERVAL / 4`)
- Updates `state.lastHeartbeat` and reschedules the timeout function

**REQ-SID-8**: Timeout behavior:
- After 10 minutes of inactivity, `timeoutStream` internal mutation fires
- Checks if `lastHeartbeat + TIMEOUT_INTERVAL < Date.now()`
- If expired: aborts the stream with reason `"timeout"`
- If not expired: reschedules for the remaining time
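
The heartbeat debounce and timeout check reduce to two small pure functions over timestamps. The sketch below takes `now` as a parameter instead of calling `Date.now()` so it can be tested; the function names and the `TimeoutDecision` type are assumptions, while the constants come from REQ-SID-2 and REQ-SID-7.

```typescript
const TIMEOUT_INTERVAL = 10 * 60 * 1000;          // 10 minutes
const HEARTBEAT_INTERVAL = TIMEOUT_INTERVAL / 4;  // 2.5 minutes

// REQ-SID-7: only write a heartbeat if more than 2.5 minutes have passed.
function shouldWriteHeartbeat(lastHeartbeat: number, now: number): boolean {
  return now - lastHeartbeat > HEARTBEAT_INTERVAL;
}

type TimeoutDecision =
  | { kind: "abort"; reason: "timeout" }
  | { kind: "reschedule"; delayMs: number };

// REQ-SID-8: abort if the deadline has passed, otherwise wait out the remainder.
function checkTimeout(lastHeartbeat: number, now: number): TimeoutDecision {
  const deadline = lastHeartbeat + TIMEOUT_INTERVAL;
  if (deadline < now) return { kind: "abort", reason: "timeout" };
  return { kind: "reschedule", delayMs: deadline - now };
}
```

Because heartbeats are written at most every 2.5 minutes, a stream that is still receiving deltas always refreshes `lastHeartbeat` well inside the 10-minute window.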

**REQ-SID-9**: Cleanup behavior:
- `finish()` schedules `deleteStream` after `DELETE_STREAM_DELAY` (5 minutes)
- `deleteStream` removes the `streamingMessages` document and all associated `streamDeltas`
- 5-minute delay allows clients to fetch final deltas before cleanup

---

## 6. Backwards Compatibility Requirements

### 6.1 Transport Compatibility

**REQ-BC-1**: The existing WebSocket/reactive-query streaming path must remain the default and primary transport. HTTP streaming is additive, not a replacement.

**REQ-BC-2**: All existing public APIs must remain unchanged:
- `syncStreams()` function signature and return type (`SyncStreamsReturnValue`)
- `listStreams()` function signature
- `abortStream()` function signature
- `vStreamMessagesReturnValue` validator

**REQ-BC-3**: The `StreamArgs` union type must be extended (not replaced) to support HTTP streaming parameters:
```typescript
// Existing (preserved):
type StreamArgs =
  | { kind: "list"; startOrder: number }
  | { kind: "deltas"; cursors: Array<{ streamId: string; cursor: number }> }
  // New (additive):
  | { kind: "http"; streamId: string; cursor?: number };
```
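
Because `StreamArgs` is a discriminated union on `kind`, adding a variant is source-compatible for callers that construct the existing variants, while handlers can use an exhaustive `switch` to be told at compile time when a new variant appears. The sketch below is illustrative only: `describeStreamArgs` is a hypothetical function, and treating a missing `cursor` as 0 is an assumption.

```typescript
type StreamArgs =
  | { kind: "list"; startOrder: number }
  | { kind: "deltas"; cursors: Array<{ streamId: string; cursor: number }> }
  | { kind: "http"; streamId: string; cursor?: number };

// Exhaustive handling: if another `kind` is added later, the `never`
// assignment in the default branch stops compiling until it is handled.
function describeStreamArgs(args: StreamArgs): string {
  switch (args.kind) {
    case "list":
      return `list from order ${args.startOrder}`;
    case "deltas":
      return `deltas for ${args.cursors.length} stream(s)`;
    case "http":
      // Assumption: an omitted cursor means "start from the beginning".
      return `http ${args.streamId} from cursor ${args.cursor ?? 0}`;
    default: {
      const unreachable: never = args;
      throw new Error(`Unknown StreamArgs: ${JSON.stringify(unreachable)}`);
    }
  }
}
```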

### 6.2 Data Format Compatibility

**REQ-BC-4**: Both `UIMessageChunk` and `TextStreamPart` delta formats must be supported in perpetuity. The `format` field on `streamingMessages` is `v.optional(...)`, so streams created before format tracking was added (format = `undefined`) must default to `TextStreamPart` behavior.
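
The defaulting rule in REQ-BC-4 is a one-line fallback. The helper name `resolveFormat` is hypothetical; the two format names come from this document.

```typescript
type StreamFormat = "UIMessageChunk" | "TextStreamPart";

// Streams written before `format` existed have no value stored,
// so they are treated as the legacy TextStreamPart format.
function resolveFormat(format: StreamFormat | undefined): StreamFormat {
  return format ?? "TextStreamPart";
}
```

Centralizing the fallback in one place keeps the HTTP path and the reactive-query path from disagreeing about how legacy streams are decoded.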

**REQ-BC-5**: Forward compatibility for new `TextStreamPart` types from future AI SDK versions must be maintained via the `default` case in `updateFromTextStreamParts` (`src/deltas.ts:520-527`):
```typescript
default: {
  console.warn(`Received unexpected part: ${JSON.stringify(part)}`);
  break;
}
```

**REQ-BC-6**: The `readUIMessageStream` error suppression for `"no tool invocation found"` must be preserved (`src/deltas.ts:77-81`). This handles tool approval continuation streams that have `tool-result` without the original `tool-call`.

### 6.3 React Hook Compatibility

**REQ-BC-7**: Existing React hooks must not change behavior:
- `useThreadMessages`: paginated messages + streaming
- `useUIMessages`: UIMessage-first with metadata
- `useSmoothText`: animated text rendering

**REQ-BC-8**: New HTTP-streaming React hooks (if any) must be additive exports from `@convex-dev/agent/react`, not replacements.

### 6.4 Schema Compatibility

**REQ-BC-9**: No breaking changes to the component schema tables:
- `streamingMessages`: no field removals or type changes
- `streamDeltas`: no field removals or type changes
- Indexes must not be dropped (can add new ones)

**REQ-BC-10**: The `vStreamDelta` and `vStreamMessage` validators must remain structurally compatible. New optional fields may be added but existing fields must not change type or be removed.

### 6.5 Export Surface Compatibility

**REQ-BC-11**: All four export surfaces must remain stable:
- `@convex-dev/agent`: main exports
- `@convex-dev/agent/react`: React hooks
- `@convex-dev/agent/validators`: Convex validators
- `@convex-dev/agent/test`: testing utilities

HTTP streaming additions should be exported from the main surface or a new `@convex-dev/agent/http` surface (not mixed into existing surfaces that would break tree-shaking).

---

## 7. Non-Functional Requirements

**REQ-NF-1**: HTTP streaming latency must not exceed the WebSocket path latency by more than 100ms for equivalent payload sizes.

**REQ-NF-2**: HTTP streaming must support concurrent streams per thread (matching current behavior of up to 100 active streams per thread, per the `list` query's `.take(100)`).

**REQ-NF-3**: HTTP streaming must gracefully handle client disconnection without leaving orphaned streams (existing heartbeat/timeout mechanism applies).

**REQ-NF-4**: Delta writes must remain throttled at the configured `throttleMs` regardless of transport, to avoid excessive database writes.

---

## 8. Open Questions

1. **SSE vs NDJSON**: Should the HTTP transport use SSE (native browser support, automatic reconnection) or NDJSON (simpler, works with `fetch` + `ReadableStream`)?

2. **Authentication**: How should HTTP streaming endpoints authenticate? Convex actions have auth context, but raw HTTP endpoints may need token-based auth.

3. **Multi-stream HTTP**: Should a single HTTP connection support multiplexed streams (like the current WebSocket path with multi-cursor queries), or should each HTTP connection follow a single stream?

4. **Convex HTTP actions**: Should HTTP streaming be implemented as Convex HTTP actions (which have a 2-minute timeout and limited streaming support), or as a separate server/proxy?

5. **Atomic finish over HTTP**: The current `finishStreamId` pattern enables atomic stream finish + message save. How should this translate to the HTTP transport where the client may not be the writer?

example/convex/_generated/api.d.ts

Lines changed: 2 additions & 0 deletions
@@ -19,6 +19,7 @@ import type * as chat_basic from "../chat/basic.js";
 import type * as chat_human from "../chat/human.js";
 import type * as chat_streamAbort from "../chat/streamAbort.js";
 import type * as chat_streaming from "../chat/streaming.js";
+import type * as chat_streamingDemo from "../chat/streamingDemo.js";
 import type * as chat_streamingReasoning from "../chat/streamingReasoning.js";
 import type * as chat_withoutAgent from "../chat/withoutAgent.js";
 import type * as crons from "../crons.js";
@@ -68,6 +69,7 @@ declare const fullApi: ApiFromModules<{
   "chat/human": typeof chat_human;
   "chat/streamAbort": typeof chat_streamAbort;
   "chat/streaming": typeof chat_streaming;
+  "chat/streamingDemo": typeof chat_streamingDemo;
   "chat/streamingReasoning": typeof chat_streamingReasoning;
   "chat/withoutAgent": typeof chat_withoutAgent;
   crons: typeof crons;
