diff --git a/core/agent-runtime/src/AgentRuntime.ts b/core/agent-runtime/src/AgentRuntime.ts index 07791a16b..a2bbbd457 100644 --- a/core/agent-runtime/src/AgentRuntime.ts +++ b/core/agent-runtime/src/AgentRuntime.ts @@ -361,8 +361,8 @@ export class AgentRuntime { const contentBlocks = msg.message ? MessageConverter.toContentBlocks(msg.message) : []; - // Only accumulate content blocks for text/tool content (for storage) - if (contentBlocks.length > 0) { + // Only accumulate when accumulate !== false (defaults to true) + if (contentBlocks.length > 0 && msg.accumulate !== false) { content.push(...contentBlocks); } writer.writeEvent(msg.type, { @@ -389,7 +389,7 @@ export class AgentRuntime { } return { - content, + content: MessageConverter.mergeContentBlocks(content), usage: hasUsage ? { promptTokens, completionTokens, totalTokens: promptTokens + completionTokens } : undefined, aborted: false as const, }; diff --git a/core/agent-runtime/src/MessageConverter.ts b/core/agent-runtime/src/MessageConverter.ts index 67bbe2471..4dbe57f59 100644 --- a/core/agent-runtime/src/MessageConverter.ts +++ b/core/agent-runtime/src/MessageConverter.ts @@ -77,14 +77,14 @@ export class MessageConverter { output: MessageObject[]; usage?: RunUsage; } { - const output: MessageObject[] = []; + const contentBlocks: MessageContentBlock[] = []; let promptTokens = 0; let completionTokens = 0; let hasUsage = false; for (const msg of messages) { - if (msg.message) { - output.push(MessageConverter.toMessageObject(msg.message, runId)); + if (msg.message && msg.accumulate !== false) { + contentBlocks.push(...MessageConverter.toContentBlocks(msg.message)); } if (msg.usage) { hasUsage = true; @@ -93,6 +93,19 @@ export class MessageConverter { } } + const mergedContent = MessageConverter.mergeContentBlocks(contentBlocks); + const output: MessageObject[] = mergedContent.length > 0 + ? [{ + id: newMsgId(), + object: AgentObjectType.ThreadMessage, + createdAt: nowUnix(), + runId, + role: MessageRole.Assistant, + status: MessageStatus.Completed, + content: mergedContent, + }] + : []; + let usage: RunUsage | undefined; if (hasUsage) { usage = { @@ -105,6 +118,63 @@ export class MessageConverter { return { output, usage }; } + /** + * Merge accumulated content blocks into a clean final form: + * 1. Consecutive text blocks are merged into a single text block. + * 2. Text blocks immediately following a tool_use block (before the next + * tool_result) are treated as input_json_delta fragments — they are + * concatenated, JSON-parsed, and written into the tool_use block's input. + */ + static mergeContentBlocks(blocks: MessageContentBlock[]): MessageContentBlock[] { + if (blocks.length === 0) return blocks; + + const merged: MessageContentBlock[] = []; + + for (let i = 0; i < blocks.length; i++) { + const block = blocks[i]; + + if (isToolUseBlock(block)) { + // Collect subsequent text blocks as input_json_delta fragments + const inputFragments: string[] = []; + let next = blocks[i + 1]; + while (next && isTextBlock(next)) { + i++; + inputFragments.push(next.text.value); + next = blocks[i + 1]; + } + if (inputFragments.length > 0) { + const raw = inputFragments.join(''); + let parsedInput: Record; + try { + parsedInput = JSON.parse(raw); + } catch { + parsedInput = {}; + } + merged.push({ ...block, input: { ...block.input, ...parsedInput } }); + } else { + merged.push(block); + } + } else if (isTextBlock(block)) { + // Merge consecutive text blocks + const parts: string[] = [ block.text.value ]; + let next = blocks[i + 1]; + while (next && isTextBlock(next)) { + i++; + parts.push(next.text.value); + next = blocks[i + 1]; + } + merged.push({ + type: ContentBlockType.Text, + text: { value: parts.join(''), annotations: [] }, + }); + } else { + merged.push(block); + } + } + + return merged; + } + /** * Produce a completed copy of a streaming MessageObject with final content. */ diff --git a/core/agent-runtime/test/AgentRuntime.test.ts b/core/agent-runtime/test/AgentRuntime.test.ts index 53cff757b..424c447c0 100644 --- a/core/agent-runtime/test/AgentRuntime.test.ts +++ b/core/agent-runtime/test/AgentRuntime.test.ts @@ -530,6 +530,68 @@ describe('test/AgentRuntime.test.ts', () => { assert(deltaEvents.length > 0); }); + it('should respect accumulate=false to exclude content from completed message', async () => { + executor.execRun = async function* (): AsyncGenerator { + yield { type: 'assistant.thinking.delta', message: { content: 'thinking...' }, accumulate: false }; + yield { type: 'assistant.text.delta', message: { content: 'Hello' }, accumulate: true }; + yield { type: 'tool.delta', message: { content: '{"partial":true}' }, accumulate: false }; + yield { type: 'assistant.text.delta', message: { content: ' world' }, accumulate: true }; + }; + + const writer = new MockSSEWriter(); + await runtime.streamRun({ input: { messages: [{ role: 'user', content: 'Hi' }] } }, writer); + + // All custom events are still forwarded + const customEvents = writer.events.filter(e => + !e.event.startsWith('thread.') && e.event !== 'done', + ); + assert.equal(customEvents.length, 4); + + // But completed message only has accumulated content (merged text) + const completedEvent = writer.events.find(e => e.event === AgentSSEEvent.ThreadMessageCompleted); + assert.ok(completedEvent); + const content = (completedEvent.data as any).content; + assert.equal(content.length, 1); + assert(isTextBlock(content[0])); + assert.equal(content[0].text.value, 'Hello world'); + }); + + it('should merge text fragments and backfill tool_use input in completed message', async () => { + executor.execRun = async function* (): AsyncGenerator { + yield { type: 'assistant.text.delta', message: { content: 'I will ' }, accumulate: true }; + yield { type: 'assistant.text.delta', message: { content: 'help you.' }, accumulate: true }; + yield { + type: 'tool.started', + message: { content: [{ type: 'tool_use', id: 'fc-1', name: 'Bash', input: {} }] as any }, + accumulate: true, + }; + yield { type: 'tool.delta', message: { content: '{"command":"ls"}' }, accumulate: true }; + yield { + type: 'tool.completed', + message: { content: [{ type: 'tool_result', tool_use_id: 'fc-1', content: 'file1' }] as any }, + accumulate: true, + }; + }; + + const writer = new MockSSEWriter(); + await runtime.streamRun({ input: { messages: [{ role: 'user', content: 'Hi' }] } }, writer); + + const completedEvent = writer.events.find(e => e.event === AgentSSEEvent.ThreadMessageCompleted); + assert.ok(completedEvent); + const content = (completedEvent.data as any).content; + + // Text fragments merged into one + assert(isTextBlock(content[0])); + assert.equal(content[0].text.value, 'I will help you.'); + + // tool_use input backfilled from text delta + assert.equal(content[1].type, 'tool_use'); + assert.deepStrictEqual(content[1].input, { command: 'ls' }); + + // tool_result preserved + assert.equal(content[2].type, 'tool_result'); + }); + it('should emit failed event when execRun throws', async () => { executor.execRun = async function* (): AsyncGenerator { throw new Error('model unavailable'); diff --git a/core/agent-runtime/test/MessageConverter.test.ts b/core/agent-runtime/test/MessageConverter.test.ts index a09c8aa0d..de77f694a 100644 --- a/core/agent-runtime/test/MessageConverter.test.ts +++ b/core/agent-runtime/test/MessageConverter.test.ts @@ -186,18 +186,18 @@ describe('test/MessageConverter.test.ts', () => { }); describe('extractFromStreamMessages', () => { - it('should extract messages and accumulate usage', () => { + it('should merge chunks into single MessageObject and accumulate usage', () => { const messages: AgentStreamMessage[] = [ { message: { content: 'chunk1' }, usage: { promptTokens: 10, completionTokens: 5 } }, { message: { content: 'chunk2' }, usage: { promptTokens: 0, completionTokens: 8 } }, ]; const { output, usage } = MessageConverter.extractFromStreamMessages(messages, 'run_1'); - assert.equal(output.length, 2); + assert.equal(output.length, 1); + assert.equal(output[0].content.length, 1); assert(isTextBlock(output[0].content[0])); - assert.equal(output[0].content[0].text.value, 'chunk1'); - assert(isTextBlock(output[1].content[0])); - assert.equal(output[1].content[0].text.value, 'chunk2'); + assert.equal(output[0].content[0].text.value, 'chunk1chunk2'); + assert.equal(output[0].runId, 'run_1'); assert.ok(usage); assert.equal(usage.promptTokens, 10); assert.equal(usage.completionTokens, 13); @@ -222,6 +222,19 @@ describe('test/MessageConverter.test.ts', () => { assert(isToolUseBlock(output[0].content[1])); }); + it('should skip messages with accumulate=false', () => { + const messages: AgentStreamMessage[] = [ + { message: { content: 'thinking...' }, accumulate: false }, + { message: { content: 'visible text' }, accumulate: true }, + { message: { content: 'tool delta' }, accumulate: false }, + ]; + const { output } = MessageConverter.extractFromStreamMessages(messages, 'run_1'); + assert.equal(output.length, 1); + assert.equal(output[0].content.length, 1); + assert(isTextBlock(output[0].content[0])); + assert.equal(output[0].content[0].text.value, 'visible text'); + }); + it('should return undefined usage when no usage info', () => { const messages: AgentStreamMessage[] = [{ message: { content: 'data' } }]; const { output, usage } = MessageConverter.extractFromStreamMessages(messages); @@ -332,6 +345,107 @@ describe('test/MessageConverter.test.ts', () => { }); }); + describe('mergeContentBlocks', () => { + it('should return empty array for empty input', () => { + assert.deepStrictEqual(MessageConverter.mergeContentBlocks([]), []); + }); + + it('should merge consecutive text blocks into one', () => { + const blocks = [ + { type: ContentBlockType.Text, text: { value: 'Hello', annotations: [] } }, + { type: ContentBlockType.Text, text: { value: ' ', annotations: [] } }, + { type: ContentBlockType.Text, text: { value: 'world', annotations: [] } }, + ] as any; + const result = MessageConverter.mergeContentBlocks(blocks); + assert.equal(result.length, 1); + assert(isTextBlock(result[0])); + assert.equal(result[0].text.value, 'Hello world'); + }); + + it('should not merge text blocks separated by non-text blocks', () => { + const blocks = [ + { type: ContentBlockType.Text, text: { value: 'before', annotations: [] } }, + { type: ContentBlockType.ToolResult, tool_use_id: 'toolu_1', content: 'result' }, + { type: ContentBlockType.Text, text: { value: 'after', annotations: [] } }, + ] as any; + const result = MessageConverter.mergeContentBlocks(blocks); + assert.equal(result.length, 3); + assert(isTextBlock(result[0])); + assert.equal(result[0].text.value, 'before'); + assert(isToolResultBlock(result[1])); + assert(isTextBlock(result[2])); + assert.equal(result[2].text.value, 'after'); + }); + + it('should backfill tool_use input from subsequent text blocks', () => { + const blocks = [ + { type: ContentBlockType.ToolUse, id: 'toolu_1', name: 'search', input: {} }, + { type: ContentBlockType.Text, text: { value: '{"command":', annotations: [] } }, + { type: ContentBlockType.Text, text: { value: ' "curl -s"}', annotations: [] } }, + { type: ContentBlockType.ToolResult, tool_use_id: 'toolu_1', content: 'ok' }, + ] as any; + const result = MessageConverter.mergeContentBlocks(blocks); + assert.equal(result.length, 2); + assert(isToolUseBlock(result[0])); + assert.deepStrictEqual(result[0].input, { command: 'curl -s' }); + assert(isToolResultBlock(result[1])); + }); + + it('should keep tool_use as-is when no text blocks follow', () => { + const blocks = [ + { type: ContentBlockType.ToolUse, id: 'toolu_1', name: 'search', input: { q: 'test' } }, + { type: ContentBlockType.ToolResult, tool_use_id: 'toolu_1', content: 'result' }, + ] as any; + const result = MessageConverter.mergeContentBlocks(blocks); + assert.equal(result.length, 2); + assert(isToolUseBlock(result[0])); + assert.deepStrictEqual(result[0].input, { q: 'test' }); + }); + + it('should fallback to empty object when JSON parse fails', () => { + const blocks = [ + { type: ContentBlockType.ToolUse, id: 'toolu_1', name: 'search', input: {} }, + { type: ContentBlockType.Text, text: { value: 'not valid json', annotations: [] } }, + { type: ContentBlockType.ToolResult, tool_use_id: 'toolu_1', content: 'result' }, + ] as any; + const result = MessageConverter.mergeContentBlocks(blocks); + assert.equal(result.length, 2); + assert(isToolUseBlock(result[0])); + assert.deepStrictEqual(result[0].input, {}); + }); + + it('should handle realistic stream scenario: text + tool_use + input deltas + tool_result + text', () => { + const blocks = [ + { type: ContentBlockType.Text, text: { value: 'I will ', annotations: [] } }, + { type: ContentBlockType.Text, text: { value: 'help you.', annotations: [] } }, + { type: ContentBlockType.ToolUse, id: 'fc-1', name: 'Bash', input: {} }, + { type: ContentBlockType.Text, text: { value: '{"command":"ls -la"}', annotations: [] } }, + { type: ContentBlockType.ToolResult, tool_use_id: 'fc-1', content: 'file1\nfile2' }, + { type: ContentBlockType.Text, text: { value: 'Here are', annotations: [] } }, + { type: ContentBlockType.Text, text: { value: ' the results.', annotations: [] } }, + ] as any; + const result = MessageConverter.mergeContentBlocks(blocks); + assert.equal(result.length, 4); + assert(isTextBlock(result[0])); + assert.equal(result[0].text.value, 'I will help you.'); + assert(isToolUseBlock(result[1])); + assert.deepStrictEqual(result[1].input, { command: 'ls -la' }); + assert(isToolResultBlock(result[2])); + assert(isTextBlock(result[3])); + assert.equal(result[3].text.value, 'Here are the results.'); + }); + + it('should pass through generic blocks unchanged', () => { + const blocks = [ + { type: 'thinking', thinking: 'let me think...' }, + ] as any; + const result = MessageConverter.mergeContentBlocks(blocks); + assert.equal(result.length, 1); + assert.equal(result[0].type, 'thinking'); + assert.equal((result[0] as any).thinking, 'let me think...'); + }); + }); + describe('type guards', () => { it('isTextBlock should narrow to TextContentBlock', () => { const block = { type: ContentBlockType.Text, text: { value: 'hello', annotations: [] } }; diff --git a/core/mcp-client/src/HeaderUtil.ts b/core/mcp-client/src/HeaderUtil.ts index 5d2306eb0..36017ad4c 100644 --- a/core/mcp-client/src/HeaderUtil.ts +++ b/core/mcp-client/src/HeaderUtil.ts @@ -4,7 +4,7 @@ export function mergeHeaders(...headersInits: Array): H const res = {}; for (const headersInit of headersInits) { if (!headersInit) continue; - const headers = new Headers(headersInit); + const headers = new Headers(headersInit as Record); for (const key of headers.keys()) { res[key] = headers.get(key); } diff --git a/core/types/agent-runtime/AgentRuntime.ts b/core/types/agent-runtime/AgentRuntime.ts index 779efd772..c1665b983 100644 --- a/core/types/agent-runtime/AgentRuntime.ts +++ b/core/types/agent-runtime/AgentRuntime.ts @@ -116,4 +116,6 @@ export interface AgentStreamMessage { type?: string; message?: AgentStreamMessagePayload; usage?: AgentRunUsage; + /** Whether to accumulate message content into the final completed message. Defaults to true. */ + accumulate?: boolean; }