fix(agent-runtime): merge content blocks and support accumulate control#428
fix(agent-runtime): merge content blocks and support accumulate control#428jerryliang64 merged 2 commits intomasterfrom
Conversation
…ol in stream messages Content blocks accumulated during streaming were stored as raw fragments, causing fragmented text, empty tool_use inputs, and mixed thinking content in thread.message.completed and run output. This fix: 1. Adds `accumulate` field to AgentStreamMessage — executors can set `accumulate: false` to forward SSE events without polluting final content 2. Adds MessageConverter.mergeContentBlocks() — merges consecutive text blocks and backfills tool_use input from subsequent text (input_json_delta) 3. Applies merge logic to all three run paths (streamRun, syncRun, asyncRun) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughStream message accumulation is now controllable via an added Changes
Sequence Diagram(s)sequenceDiagram
participant Client as Client
participant Runtime as AgentRuntime
participant Converter as MessageConverter
participant Writer as Writer/Emitter
Client->>Runtime: send stream chunks (some with accumulate=false)
loop per incoming chunk
Runtime->>Writer: writeEvent(chunk.eventType, chunk)
alt chunk.accumulate !== false
Runtime->>Converter: collect content block(chunk)
end
end
Runtime->>Converter: mergeContentBlocks(collectedBlocks)
Converter-->>Runtime: merged content (text merged, tool_use.input backfilled)
Runtime->>Writer: writeEvent("ThreadMessageCompleted", mergedMessage)
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces an accumulate flag to AgentStreamMessage to control content inclusion in final messages and adds a mergeContentBlocks utility to consolidate text fragments and backfill tool_use inputs. A review comment points out that JSON.parse could return non-object types, which would cause issues when spreading into the tool input object, and suggests adding a type check to handle this safely.
There was a problem hiding this comment.
🧹 Nitpick comments (3)
core/agent-runtime/src/AgentRuntime.ts (1)
364-374: Inconsistentaccumulatehandling between custom and standard events.Custom event messages (line 365) respect
accumulate !== false, but standard messages withouttype(line 374) always accumulate unconditionally. This asymmetry may cause confusion—if an executor yields{ message: {...}, accumulate: false }without atype, the content will still be accumulated.Is this intentional? If standard messages should also respect the flag:
♻️ Proposed fix for consistent accumulate handling
} else if (msg.message) { const contentBlocks = MessageConverter.toContentBlocks(msg.message); - content.push(...contentBlocks); + if (msg.accumulate !== false) { + content.push(...contentBlocks); + } // event: thread.message.delta const delta: MessageDeltaObject = {🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/agent-runtime/src/AgentRuntime.ts` around lines 364 - 374, The handler treats accumulate differently for typed vs standard messages: when msg.message is present we always push contentBlocks but when msg.type is used we check msg.accumulate !== false; make them consistent by honoring msg.accumulate for the standard-path as well—before calling MessageConverter.toContentBlocks and content.push(...) in the branch that handles msg.message, check if msg.accumulate !== false (default true) and only push into content and include content in the writer.writeEvent payload accordingly; update the use of MessageConverter.toContentBlocks, content.push, and the writer.writeEvent call to follow the same accumulate logic used for the custom-event branch (referencing msg.accumulate, contentBlocks, MessageConverter.toContentBlocks, writer.writeEvent, and msg.message).core/agent-runtime/src/MessageConverter.ts (2)
166-169: Annotations are discarded when merging text blocks.The merged text block is created with
annotations: [], discarding any annotations from source blocks. Verify this is intentional for your use case.💡 Alternative: Concatenate annotations from all source blocks
} else if (isTextBlock(block)) { // Merge consecutive text blocks const parts: string[] = [ block.text.value ]; + const allAnnotations: unknown[] = [...block.text.annotations]; let next = blocks[i + 1]; while (next && isTextBlock(next)) { i++; parts.push(next.text.value); + allAnnotations.push(...next.text.annotations); next = blocks[i + 1]; } merged.push({ type: ContentBlockType.Text, - text: { value: parts.join(''), annotations: [] }, + text: { value: parts.join(''), annotations: allAnnotations }, });🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/agent-runtime/src/MessageConverter.ts` around lines 166 - 169, The merge creates a new text ContentBlock with annotations: [] which discards source annotations; update MessageConverter's merging logic (where merged.push({ type: ContentBlockType.Text, text: { value: parts.join(''), annotations: [] } })) to collect and concatenate annotations from the original text blocks (e.g., gather annotations arrays from the source blocks you loop over, dedupe/normalize as needed) and assign that combined annotations array instead of an empty array so annotations are preserved on the merged block.
145-153: Silent fallback to empty object may hide parsing issues.When JSON parsing fails (line 150-152), the code silently falls back to
{}. This could mask malformed tool input deltas in production. Consider logging a warning or preserving the raw text for debugging purposes.♻️ Suggested improvement: Add logging for parse failures
The method would need access to a logger, or this could be handled by the caller. Alternatively, consider returning a diagnostic marker:
try { parsedInput = JSON.parse(raw); } catch { + // Consider logging: `Failed to parse tool_use input JSON: ${raw}` parsedInput = {}; }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/agent-runtime/src/MessageConverter.ts` around lines 145 - 153, The JSON.parse fallback is currently silent in MessageConverter (inside the block handling inputFragments → parsedInput), which can hide malformed tool input; modify the catch to capture the exception (e) and both log a warning (use the class/logger available in MessageConverter or console.warn if none) and preserve diagnostics by setting parsedInput to include the raw text and error (e.g. { _raw: raw, _parseError: String(e) }) so merged.push receives traceable data on parse failures; update the method signature to accept/propagate a logger if needed and reference variables inputFragments, raw, parsedInput, block, and merged when making changes.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@core/agent-runtime/src/AgentRuntime.ts`:
- Around line 364-374: The handler treats accumulate differently for typed vs
standard messages: when msg.message is present we always push contentBlocks but
when msg.type is used we check msg.accumulate !== false; make them consistent by
honoring msg.accumulate for the standard-path as well—before calling
MessageConverter.toContentBlocks and content.push(...) in the branch that
handles msg.message, check if msg.accumulate !== false (default true) and only
push into content and include content in the writer.writeEvent payload
accordingly; update the use of MessageConverter.toContentBlocks, content.push,
and the writer.writeEvent call to follow the same accumulate logic used for the
custom-event branch (referencing msg.accumulate, contentBlocks,
MessageConverter.toContentBlocks, writer.writeEvent, and msg.message).
In `@core/agent-runtime/src/MessageConverter.ts`:
- Around line 166-169: The merge creates a new text ContentBlock with
annotations: [] which discards source annotations; update MessageConverter's
merging logic (where merged.push({ type: ContentBlockType.Text, text: { value:
parts.join(''), annotations: [] } })) to collect and concatenate annotations
from the original text blocks (e.g., gather annotations arrays from the source
blocks you loop over, dedupe/normalize as needed) and assign that combined
annotations array instead of an empty array so annotations are preserved on the
merged block.
- Around line 145-153: The JSON.parse fallback is currently silent in
MessageConverter (inside the block handling inputFragments → parsedInput), which
can hide malformed tool input; modify the catch to capture the exception (e) and
both log a warning (use the class/logger available in MessageConverter or
console.warn if none) and preserve diagnostics by setting parsedInput to include
the raw text and error (e.g. { _raw: raw, _parseError: String(e) }) so
merged.push receives traceable data on parse failures; update the method
signature to accept/propagate a logger if needed and reference variables
inputFragments, raw, parsedInput, block, and merged when making changes.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 0df1107f-23c7-4b37-a85c-1991da3815de
📒 Files selected for processing (5)
core/agent-runtime/src/AgentRuntime.tscore/agent-runtime/src/MessageConverter.tscore/agent-runtime/test/AgentRuntime.test.tscore/agent-runtime/test/MessageConverter.test.tscore/types/agent-runtime/AgentRuntime.ts
Cast headersInit to Record<string, string> to avoid type conflict between global Headers and undici's Headers iterator types. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Summary
accumulatefield toAgentStreamMessage— executors setaccumulate: falseon events likeassistant.thinking.deltaortool.deltato forward them via SSE without polluting the final completed message contentMessageConverter.mergeContentBlocks()— post-processes accumulated content blocks: merges consecutive text fragments into one, backfillstool_useinput from subsequentinput_json_deltatext blocks (with JSON.parse fallback to{})streamRun(viaconsumeStreamMessages),syncRunandasyncRun(viaextractFromStreamMessages) all apply the same accumulate filtering and merge logicBefore
{ "content": [ { "type": "text", "text": { "value": "用户" } }, { "type": "text", "text": { "value": "要查询" } }, { "type": "text", "text": { "value": "我来帮你查询。" } }, { "type": "tool_use", "id": "fc-xxx", "name": "Bash", "input": {} }, { "type": "text", "text": { "value": "{\"command\": \"curl -s" } }, { "type": "text", "text": { "value": " --header ...\"}" } }, { "type": "tool_result", "tool_use_id": "fc-xxx", "content": "..." } ] }After
{ "content": [ { "type": "text", "text": { "value": "我来帮你查询。" } }, { "type": "tool_use", "id": "fc-xxx", "name": "Bash", "input": { "command": "curl -s --header ..." } }, { "type": "tool_result", "tool_use_id": "fc-xxx", "content": "..." } ] }Test plan
npm test --workspace=core/agent-runtime)mergeContentBlocksunit tests: empty input, consecutive text merge, tool_use input backfill, JSON parse fallback, realistic mixed scenario, generic block passthroughextractFromStreamMessagestests updated for merged output + newaccumulate: falsefiltering teststreamRunintegration tests:accumulate=falseexclusion, text merge + tool_use backfill in completed event🤖 Generated with Claude Code
Summary by CodeRabbit
New Features
Bug Fixes
Tests