Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/agent-runtime/src/AgentRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,8 @@ export class AgentRuntime {
const contentBlocks = msg.message
? MessageConverter.toContentBlocks(msg.message)
: [];
// Only accumulate content blocks for text/tool content (for storage)
if (contentBlocks.length > 0) {
// Only accumulate when accumulate !== false (defaults to true)
if (contentBlocks.length > 0 && msg.accumulate !== false) {
content.push(...contentBlocks);
}
writer.writeEvent(msg.type, {
Expand All @@ -389,7 +389,7 @@ export class AgentRuntime {
}

return {
content,
content: MessageConverter.mergeContentBlocks(content),
usage: hasUsage ? { promptTokens, completionTokens, totalTokens: promptTokens + completionTokens } : undefined,
aborted: false as const,
};
Expand Down
76 changes: 73 additions & 3 deletions core/agent-runtime/src/MessageConverter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ export class MessageConverter {
output: MessageObject[];
usage?: RunUsage;
} {
const output: MessageObject[] = [];
const contentBlocks: MessageContentBlock[] = [];
let promptTokens = 0;
let completionTokens = 0;
let hasUsage = false;

for (const msg of messages) {
if (msg.message) {
output.push(MessageConverter.toMessageObject(msg.message, runId));
if (msg.message && msg.accumulate !== false) {
contentBlocks.push(...MessageConverter.toContentBlocks(msg.message));
}
if (msg.usage) {
hasUsage = true;
Expand All @@ -93,6 +93,19 @@ export class MessageConverter {
}
}

const mergedContent = MessageConverter.mergeContentBlocks(contentBlocks);
const output: MessageObject[] = mergedContent.length > 0
? [{
id: newMsgId(),
object: AgentObjectType.ThreadMessage,
createdAt: nowUnix(),
runId,
role: MessageRole.Assistant,
status: MessageStatus.Completed,
content: mergedContent,
}]
: [];

let usage: RunUsage | undefined;
if (hasUsage) {
usage = {
Expand All @@ -105,6 +118,63 @@ export class MessageConverter {
return { output, usage };
}

/**
* Merge accumulated content blocks into a clean final form:
* 1. Consecutive text blocks are merged into a single text block.
* 2. Text blocks immediately following a tool_use block (before the next
* tool_result) are treated as input_json_delta fragments — they are
* concatenated, JSON-parsed, and written into the tool_use block's input.
*/
static mergeContentBlocks(blocks: MessageContentBlock[]): MessageContentBlock[] {
if (blocks.length === 0) return blocks;

const merged: MessageContentBlock[] = [];

for (let i = 0; i < blocks.length; i++) {
const block = blocks[i];

if (isToolUseBlock(block)) {
// Collect subsequent text blocks as input_json_delta fragments
const inputFragments: string[] = [];
let next = blocks[i + 1];
while (next && isTextBlock(next)) {
i++;
inputFragments.push(next.text.value);
next = blocks[i + 1];
}
if (inputFragments.length > 0) {
const raw = inputFragments.join('');
let parsedInput: Record<string, unknown>;
try {
parsedInput = JSON.parse(raw);
} catch {
parsedInput = {};
}
merged.push({ ...block, input: { ...block.input, ...parsedInput } });
} else {
merged.push(block);
}
} else if (isTextBlock(block)) {
// Merge consecutive text blocks
const parts: string[] = [ block.text.value ];
let next = blocks[i + 1];
while (next && isTextBlock(next)) {
i++;
parts.push(next.text.value);
next = blocks[i + 1];
}
merged.push({
type: ContentBlockType.Text,
text: { value: parts.join(''), annotations: [] },
});
} else {
merged.push(block);
}
}

return merged;
}

/**
* Produce a completed copy of a streaming MessageObject with final content.
*/
Expand Down
62 changes: 62 additions & 0 deletions core/agent-runtime/test/AgentRuntime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,68 @@ describe('test/AgentRuntime.test.ts', () => {
assert(deltaEvents.length > 0);
});

it('should respect accumulate=false to exclude content from completed message', async () => {
executor.execRun = async function* (): AsyncGenerator<AgentStreamMessage> {
yield { type: 'assistant.thinking.delta', message: { content: 'thinking...' }, accumulate: false };
yield { type: 'assistant.text.delta', message: { content: 'Hello' }, accumulate: true };
yield { type: 'tool.delta', message: { content: '{"partial":true}' }, accumulate: false };
yield { type: 'assistant.text.delta', message: { content: ' world' }, accumulate: true };
};

const writer = new MockSSEWriter();
await runtime.streamRun({ input: { messages: [{ role: 'user', content: 'Hi' }] } }, writer);

// All custom events are still forwarded
const customEvents = writer.events.filter(e =>
!e.event.startsWith('thread.') && e.event !== 'done',
);
assert.equal(customEvents.length, 4);

// But completed message only has accumulated content (merged text)
const completedEvent = writer.events.find(e => e.event === AgentSSEEvent.ThreadMessageCompleted);
assert.ok(completedEvent);
const content = (completedEvent.data as any).content;
assert.equal(content.length, 1);
assert(isTextBlock(content[0]));
assert.equal(content[0].text.value, 'Hello world');
});

it('should merge text fragments and backfill tool_use input in completed message', async () => {
executor.execRun = async function* (): AsyncGenerator<AgentStreamMessage> {
yield { type: 'assistant.text.delta', message: { content: 'I will ' }, accumulate: true };
yield { type: 'assistant.text.delta', message: { content: 'help you.' }, accumulate: true };
yield {
type: 'tool.started',
message: { content: [{ type: 'tool_use', id: 'fc-1', name: 'Bash', input: {} }] as any },
accumulate: true,
};
yield { type: 'tool.delta', message: { content: '{"command":"ls"}' }, accumulate: true };
yield {
type: 'tool.completed',
message: { content: [{ type: 'tool_result', tool_use_id: 'fc-1', content: 'file1' }] as any },
accumulate: true,
};
};

const writer = new MockSSEWriter();
await runtime.streamRun({ input: { messages: [{ role: 'user', content: 'Hi' }] } }, writer);

const completedEvent = writer.events.find(e => e.event === AgentSSEEvent.ThreadMessageCompleted);
assert.ok(completedEvent);
const content = (completedEvent.data as any).content;

// Text fragments merged into one
assert(isTextBlock(content[0]));
assert.equal(content[0].text.value, 'I will help you.');

// tool_use input backfilled from text delta
assert.equal(content[1].type, 'tool_use');
assert.deepStrictEqual(content[1].input, { command: 'ls' });

// tool_result preserved
assert.equal(content[2].type, 'tool_result');
});

it('should emit failed event when execRun throws', async () => {
executor.execRun = async function* (): AsyncGenerator<AgentStreamMessage> {
throw new Error('model unavailable');
Expand Down
124 changes: 119 additions & 5 deletions core/agent-runtime/test/MessageConverter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,18 +186,18 @@ describe('test/MessageConverter.test.ts', () => {
});

describe('extractFromStreamMessages', () => {
it('should extract messages and accumulate usage', () => {
it('should merge chunks into single MessageObject and accumulate usage', () => {
const messages: AgentStreamMessage[] = [
{ message: { content: 'chunk1' }, usage: { promptTokens: 10, completionTokens: 5 } },
{ message: { content: 'chunk2' }, usage: { promptTokens: 0, completionTokens: 8 } },
];
const { output, usage } = MessageConverter.extractFromStreamMessages(messages, 'run_1');

assert.equal(output.length, 2);
assert.equal(output.length, 1);
assert.equal(output[0].content.length, 1);
assert(isTextBlock(output[0].content[0]));
assert.equal(output[0].content[0].text.value, 'chunk1');
assert(isTextBlock(output[1].content[0]));
assert.equal(output[1].content[0].text.value, 'chunk2');
assert.equal(output[0].content[0].text.value, 'chunk1chunk2');
assert.equal(output[0].runId, 'run_1');
assert.ok(usage);
assert.equal(usage.promptTokens, 10);
assert.equal(usage.completionTokens, 13);
Expand All @@ -222,6 +222,19 @@ describe('test/MessageConverter.test.ts', () => {
assert(isToolUseBlock(output[0].content[1]));
});

it('should skip messages with accumulate=false', () => {
const messages: AgentStreamMessage[] = [
{ message: { content: 'thinking...' }, accumulate: false },
{ message: { content: 'visible text' }, accumulate: true },
{ message: { content: 'tool delta' }, accumulate: false },
];
const { output } = MessageConverter.extractFromStreamMessages(messages, 'run_1');
assert.equal(output.length, 1);
assert.equal(output[0].content.length, 1);
assert(isTextBlock(output[0].content[0]));
assert.equal(output[0].content[0].text.value, 'visible text');
});

it('should return undefined usage when no usage info', () => {
const messages: AgentStreamMessage[] = [{ message: { content: 'data' } }];
const { output, usage } = MessageConverter.extractFromStreamMessages(messages);
Expand Down Expand Up @@ -332,6 +345,107 @@ describe('test/MessageConverter.test.ts', () => {
});
});

describe('mergeContentBlocks', () => {
it('should return empty array for empty input', () => {
assert.deepStrictEqual(MessageConverter.mergeContentBlocks([]), []);
});

it('should merge consecutive text blocks into one', () => {
const blocks = [
{ type: ContentBlockType.Text, text: { value: 'Hello', annotations: [] } },
{ type: ContentBlockType.Text, text: { value: ' ', annotations: [] } },
{ type: ContentBlockType.Text, text: { value: 'world', annotations: [] } },
] as any;
const result = MessageConverter.mergeContentBlocks(blocks);
assert.equal(result.length, 1);
assert(isTextBlock(result[0]));
assert.equal(result[0].text.value, 'Hello world');
});

it('should not merge text blocks separated by non-text blocks', () => {
const blocks = [
{ type: ContentBlockType.Text, text: { value: 'before', annotations: [] } },
{ type: ContentBlockType.ToolResult, tool_use_id: 'toolu_1', content: 'result' },
{ type: ContentBlockType.Text, text: { value: 'after', annotations: [] } },
] as any;
const result = MessageConverter.mergeContentBlocks(blocks);
assert.equal(result.length, 3);
assert(isTextBlock(result[0]));
assert.equal(result[0].text.value, 'before');
assert(isToolResultBlock(result[1]));
assert(isTextBlock(result[2]));
assert.equal(result[2].text.value, 'after');
});

it('should backfill tool_use input from subsequent text blocks', () => {
const blocks = [
{ type: ContentBlockType.ToolUse, id: 'toolu_1', name: 'search', input: {} },
{ type: ContentBlockType.Text, text: { value: '{"command":', annotations: [] } },
{ type: ContentBlockType.Text, text: { value: ' "curl -s"}', annotations: [] } },
{ type: ContentBlockType.ToolResult, tool_use_id: 'toolu_1', content: 'ok' },
] as any;
const result = MessageConverter.mergeContentBlocks(blocks);
assert.equal(result.length, 2);
assert(isToolUseBlock(result[0]));
assert.deepStrictEqual(result[0].input, { command: 'curl -s' });
assert(isToolResultBlock(result[1]));
});

it('should keep tool_use as-is when no text blocks follow', () => {
const blocks = [
{ type: ContentBlockType.ToolUse, id: 'toolu_1', name: 'search', input: { q: 'test' } },
{ type: ContentBlockType.ToolResult, tool_use_id: 'toolu_1', content: 'result' },
] as any;
const result = MessageConverter.mergeContentBlocks(blocks);
assert.equal(result.length, 2);
assert(isToolUseBlock(result[0]));
assert.deepStrictEqual(result[0].input, { q: 'test' });
});

it('should fallback to empty object when JSON parse fails', () => {
const blocks = [
{ type: ContentBlockType.ToolUse, id: 'toolu_1', name: 'search', input: {} },
{ type: ContentBlockType.Text, text: { value: 'not valid json', annotations: [] } },
{ type: ContentBlockType.ToolResult, tool_use_id: 'toolu_1', content: 'result' },
] as any;
const result = MessageConverter.mergeContentBlocks(blocks);
assert.equal(result.length, 2);
assert(isToolUseBlock(result[0]));
assert.deepStrictEqual(result[0].input, {});
});

it('should handle realistic stream scenario: text + tool_use + input deltas + tool_result + text', () => {
const blocks = [
{ type: ContentBlockType.Text, text: { value: 'I will ', annotations: [] } },
{ type: ContentBlockType.Text, text: { value: 'help you.', annotations: [] } },
{ type: ContentBlockType.ToolUse, id: 'fc-1', name: 'Bash', input: {} },
{ type: ContentBlockType.Text, text: { value: '{"command":"ls -la"}', annotations: [] } },
{ type: ContentBlockType.ToolResult, tool_use_id: 'fc-1', content: 'file1\nfile2' },
{ type: ContentBlockType.Text, text: { value: 'Here are', annotations: [] } },
{ type: ContentBlockType.Text, text: { value: ' the results.', annotations: [] } },
] as any;
const result = MessageConverter.mergeContentBlocks(blocks);
assert.equal(result.length, 4);
assert(isTextBlock(result[0]));
assert.equal(result[0].text.value, 'I will help you.');
assert(isToolUseBlock(result[1]));
assert.deepStrictEqual(result[1].input, { command: 'ls -la' });
assert(isToolResultBlock(result[2]));
assert(isTextBlock(result[3]));
assert.equal(result[3].text.value, 'Here are the results.');
});

it('should pass through generic blocks unchanged', () => {
const blocks = [
{ type: 'thinking', thinking: 'let me think...' },
] as any;
const result = MessageConverter.mergeContentBlocks(blocks);
assert.equal(result.length, 1);
assert.equal(result[0].type, 'thinking');
assert.equal((result[0] as any).thinking, 'let me think...');
});
});

describe('type guards', () => {
it('isTextBlock should narrow to TextContentBlock', () => {
const block = { type: ContentBlockType.Text, text: { value: 'hello', annotations: [] } };
Expand Down
2 changes: 1 addition & 1 deletion core/mcp-client/src/HeaderUtil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ export function mergeHeaders(...headersInits: Array<HeadersInit | undefined>): H
const res = {};
for (const headersInit of headersInits) {
if (!headersInit) continue;
const headers = new Headers(headersInit);
const headers = new Headers(headersInit as Record<string, string>);
for (const key of headers.keys()) {
res[key] = headers.get(key);
}
Expand Down
2 changes: 2 additions & 0 deletions core/types/agent-runtime/AgentRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,6 @@ export interface AgentStreamMessage {
type?: string;
message?: AgentStreamMessagePayload;
usage?: AgentRunUsage;
/** Whether to accumulate message content into the final completed message. Defaults to true. */
accumulate?: boolean;
}
Loading