Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 67 additions & 26 deletions core/agent-runtime/src/MessageConverter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ export function isToolResultBlock(block: MessageContentBlock): block is ToolResu
return block.type === ContentBlockType.ToolResult;
}

interface ThinkingBlock {
type: 'thinking';
thinking: string;
}

export function isThinkingBlock(block: MessageContentBlock): block is ThinkingBlock & MessageContentBlock {
return block.type === 'thinking';
}

import { nowUnix, newMsgId } from './AgentStoreUtils';
import type { RunUsage } from './RunBuilder';

Expand Down Expand Up @@ -118,49 +127,71 @@ export class MessageConverter {
return { output, usage };
}

/** Content block types allowed in the final assembled message. */
private static readonly ALLOWED_BLOCK_TYPES = new Set([
ContentBlockType.Text,
ContentBlockType.ToolUse,
ContentBlockType.ToolResult,
'thinking',
]);

/**
* Normalize raw SDK streaming event blocks (e.g. Anthropic content_block_start/delta/stop)
* into standard content blocks that mergeContentBlocks can process.
* This allows upstream code to transparently pass through SDK events without format conversion.
*
* Two-phase approach:
* 1. Unwrap streaming protocol events to extract actual content
* 2. Whitelist filter — only keep known content block types
*/
static normalizeContentBlocks(blocks: MessageContentBlock[]): MessageContentBlock[] {
const result: MessageContentBlock[] = [];
const unwrapped: MessageContentBlock[] = [];
for (const block of blocks) {
const b = block as Record<string, any>;
// content_block_start[tool_use] → ToolUseContentBlock
if (b.type === 'content_block_start' && b.content_block?.type === ContentBlockType.ToolUse) {
const cb = b.content_block;
result.push({ type: ContentBlockType.ToolUse, id: cb.id, name: cb.name, input: cb.input ?? {} } as ToolUseContentBlock);
continue;
}
// content_block_delta[input_json_delta] → TextContentBlock (merged into tool_use.input later)
if (b.type === 'content_block_delta' && b.delta?.type === 'input_json_delta') {
const partial: string = b.delta.partial_json || '';
if (partial) {
result.push({ type: ContentBlockType.Text, text: { value: partial, annotations: [] } } as TextContentBlock);

// --- Phase 1: Unwrap streaming protocol events ---

// content_block_start → extract tool_use; discard others (text/thinking start are just markers)
if (b.type === 'content_block_start') {
if (b.content_block?.type === ContentBlockType.ToolUse) {
const cb = b.content_block;
unwrapped.push({ type: ContentBlockType.ToolUse, id: cb.id, name: cb.name, input: cb.input ?? {} } as ToolUseContentBlock);
}
continue;
}
// content_block_delta[text_delta] → TextContentBlock
if (b.type === 'content_block_delta' && b.delta?.type === 'text_delta') {
const text: string = b.delta.text || '';
if (text) {
result.push({ type: ContentBlockType.Text, text: { value: text, annotations: [] } } as TextContentBlock);

// content_block_delta → extract content from known delta subtypes
if (b.type === 'content_block_delta') {
if (b.delta?.type === 'text_delta') {
const text: string = b.delta.text || '';
if (text) {
unwrapped.push({ type: ContentBlockType.Text, text: { value: text, annotations: [] } } as TextContentBlock);
}
} else if (b.delta?.type === 'input_json_delta') {
const partial: string = b.delta.partial_json || '';
if (partial) {
unwrapped.push({ type: ContentBlockType.Text, text: { value: partial, annotations: [] } } as TextContentBlock);
}
} else if (b.delta?.type === 'thinking_delta') {
const thinking: string = b.delta.thinking || '';
if (thinking) {
unwrapped.push({ type: 'thinking', thinking } as unknown as MessageContentBlock);
}
}
// Other deltas (signature_delta, etc.) → discard
continue;
}
// thinking_delta → discard (not part of final message)
if (b.type === 'content_block_delta' && b.delta?.type === 'thinking_delta') {
continue;
}
// content_block_stop / message_stop / message_delta → discard

// Streaming control signals → discard
if (b.type === 'content_block_stop' || b.type === 'message_stop' || b.type === 'message_delta') {
continue;
}
// Standard blocks (text, tool_use, tool_result) and other generic blocks → keep as-is
result.push(block);

// Non-streaming blocks (already standard or generic) → pass to phase 2
unwrapped.push(block);
}
return result;

// --- Phase 2: Whitelist filter ---
return unwrapped.filter(b => MessageConverter.ALLOWED_BLOCK_TYPES.has(b.type));
}

/**
Expand Down Expand Up @@ -213,6 +244,16 @@ export class MessageConverter {
type: ContentBlockType.Text,
text: { value: parts.join(''), annotations: [] },
});
} else if (isThinkingBlock(block)) {
// Merge consecutive thinking blocks
const parts: string[] = [ (block as unknown as ThinkingBlock).thinking ];
let next = blocks[i + 1];
while (next && isThinkingBlock(next)) {
i++;
parts.push((next as unknown as ThinkingBlock).thinking);
next = blocks[i + 1];
}
merged.push({ type: 'thinking', thinking: parts.join('') } as unknown as MessageContentBlock);
} else {
merged.push(block);
}
Expand Down
86 changes: 72 additions & 14 deletions core/agent-runtime/test/MessageConverter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
ContentBlockType,
} from '@eggjs/tegg-types/agent-runtime';

import { MessageConverter, isTextBlock, isToolUseBlock, isToolResultBlock } from '../src/MessageConverter';
import { MessageConverter, isTextBlock, isToolUseBlock, isToolResultBlock, isThinkingBlock } from '../src/MessageConverter';

describe('test/MessageConverter.test.ts', () => {
describe('toContentBlocks', () => {
Expand Down Expand Up @@ -381,11 +381,40 @@ describe('test/MessageConverter.test.ts', () => {
assert.equal(result[0].text.value, 'Hello');
});

it('should discard thinking_delta blocks', () => {
it('should discard content_block_start for non-tool_use types (thinking, text)', () => {
const blocks = [
{ type: 'content_block_start', index: 0, content_block: { type: 'thinking', thinking: '' } },
{ type: 'content_block_start', index: 1, content_block: { type: 'text', text: '' } },
] as any;
const result = MessageConverter.normalizeContentBlocks(blocks);
assert.equal(result.length, 0);
});

it('should extract thinking_delta as thinking block', () => {
const blocks = [
{ type: 'content_block_delta', delta: { type: 'thinking_delta', thinking: 'let me think...' } },
] as any;
const result = MessageConverter.normalizeContentBlocks(blocks);
assert.equal(result.length, 1);
assert.equal(result[0].type, 'thinking');
assert.equal((result[0] as any).thinking, 'let me think...');
});

it('should discard signature_delta and other unknown delta subtypes', () => {
const blocks = [
{ type: 'content_block_delta', delta: { type: 'signature_delta', signature: 'sig-theta' } },
{ type: 'content_block_delta', delta: { type: 'some_future_delta', data: 'xxx' } },
] as any;
const result = MessageConverter.normalizeContentBlocks(blocks);
assert.equal(result.length, 0);
});

it('should discard unknown block types via whitelist', () => {
const blocks = [
{ type: 'ping' },
{ type: 'some_unknown_event', data: 'xxx' },
] as any;
const result = MessageConverter.normalizeContentBlocks(blocks);
assert.equal(result.length, 0);
});

Expand Down Expand Up @@ -531,24 +560,44 @@ describe('test/MessageConverter.test.ts', () => {
assert.equal(result[3].text.value, 'Here are the results.');
});

it('should pass through generic blocks unchanged', () => {
it('should merge consecutive thinking blocks into one', () => {
const blocks = [
{ type: 'thinking', thinking: 'let me think...' },
{ type: 'thinking', thinking: 'let ' },
{ type: 'thinking', thinking: 'me ' },
{ type: 'thinking', thinking: 'think...' },
] as any;
const result = MessageConverter.mergeContentBlocks(blocks);
assert.equal(result.length, 1);
assert.equal(result[0].type, 'thinking');
assert.equal((result[0] as any).thinking, 'let me think...');
});

it('should not merge thinking blocks separated by other blocks', () => {
const blocks = [
{ type: 'thinking', thinking: 'first thought' },
{ type: ContentBlockType.Text, text: { value: 'hello', annotations: [] } },
{ type: 'thinking', thinking: 'second thought' },
] as any;
const result = MessageConverter.mergeContentBlocks(blocks);
assert.equal(result.length, 3);
assert.equal(result[0].type, 'thinking');
assert.equal((result[0] as any).thinking, 'first thought');
assert(isTextBlock(result[1]));
assert.equal(result[2].type, 'thinking');
assert.equal((result[2] as any).thinking, 'second thought');
});

it('should handle raw Anthropic SDK stream events end-to-end', () => {
const blocks = [
// text deltas
{ type: 'content_block_delta', delta: { type: 'text_delta', text: 'I will ' } },
{ type: 'content_block_delta', delta: { type: 'text_delta', text: 'help you.' } },
{ type: 'content_block_stop' },
// thinking (should be discarded)
{ type: 'content_block_delta', delta: { type: 'thinking_delta', thinking: 'let me think...' } },
// thinking deltas (should be merged into one thinking block)
{ type: 'content_block_delta', delta: { type: 'thinking_delta', thinking: 'let me ' } },
{ type: 'content_block_delta', delta: { type: 'thinking_delta', thinking: 'think...' } },
// signature (should be discarded)
{ type: 'content_block_delta', delta: { type: 'signature_delta', signature: 'sig-theta' } },
// tool_use start + input deltas + stop
{ type: 'content_block_start', content_block: { type: 'tool_use', id: 'fc-1', name: 'Bash', input: {} } },
{ type: 'content_block_delta', delta: { type: 'input_json_delta', partial_json: '{"command":' } },
Expand All @@ -563,20 +612,23 @@ describe('test/MessageConverter.test.ts', () => {
{ type: 'message_stop' },
] as any;
const result = MessageConverter.mergeContentBlocks(blocks);
assert.equal(result.length, 4);
assert.equal(result.length, 5);
// merged text
assert(isTextBlock(result[0]));
assert.equal(result[0].text.value, 'I will help you.');
// thinking block
assert.equal(result[1].type, 'thinking');
assert.equal((result[1] as any).thinking, 'let me think...');
// tool_use with parsed input
assert(isToolUseBlock(result[1]));
assert.equal(result[1].name, 'Bash');
assert.deepStrictEqual(result[1].input, { command: 'ls -la' });
assert(isToolUseBlock(result[2]));
assert.equal(result[2].name, 'Bash');
assert.deepStrictEqual(result[2].input, { command: 'ls -la' });
// tool_result
assert(isToolResultBlock(result[2]));
assert.equal(result[2].tool_use_id, 'fc-1');
assert(isToolResultBlock(result[3]));
assert.equal(result[3].tool_use_id, 'fc-1');
// merged trailing text
assert(isTextBlock(result[3]));
assert.equal(result[3].text.value, 'Here are the results.');
assert(isTextBlock(result[4]));
assert.equal(result[4].text.value, 'Here are the results.');
});
});

Expand All @@ -599,6 +651,12 @@ describe('test/MessageConverter.test.ts', () => {
assert.equal(block.tool_use_id, 'toolu_1');
});

it('isThinkingBlock should identify thinking blocks', () => {
const block = { type: 'thinking', thinking: 'hmm...' };
assert(isThinkingBlock(block as any));
assert(!isThinkingBlock({ type: ContentBlockType.Text, text: { value: 'hi', annotations: [] } } as any));
});

it('type guards should return false for non-matching blocks', () => {
const textBlock = { type: ContentBlockType.Text, text: { value: 'hi', annotations: [] } };
const toolUseBlock = { type: ContentBlockType.ToolUse, id: 'id', name: 'n', input: {} };
Expand Down
Loading