diff --git a/core/agent-runtime/src/MessageConverter.ts b/core/agent-runtime/src/MessageConverter.ts index 2bfc768d..80cbb3fe 100644 --- a/core/agent-runtime/src/MessageConverter.ts +++ b/core/agent-runtime/src/MessageConverter.ts @@ -23,6 +23,15 @@ export function isToolResultBlock(block: MessageContentBlock): block is ToolResu return block.type === ContentBlockType.ToolResult; } +interface ThinkingBlock { + type: 'thinking'; + thinking: string; +} + +export function isThinkingBlock(block: MessageContentBlock): block is ThinkingBlock & MessageContentBlock { + return block.type === 'thinking'; +} + import { nowUnix, newMsgId } from './AgentStoreUtils'; import type { RunUsage } from './RunBuilder'; @@ -118,49 +127,71 @@ export class MessageConverter { return { output, usage }; } + /** Content block types allowed in the final assembled message. */ + private static readonly ALLOWED_BLOCK_TYPES = new Set([ + ContentBlockType.Text, + ContentBlockType.ToolUse, + ContentBlockType.ToolResult, + 'thinking', + ]); + /** * Normalize raw SDK streaming event blocks (e.g. Anthropic content_block_start/delta/stop) * into standard content blocks that mergeContentBlocks can process. - * This allows upstream code to transparently pass through SDK events without format conversion. + * + * Two-phase approach: + * 1. Unwrap streaming protocol events to extract actual content + * 2. Whitelist filter — only keep known content block types */ static normalizeContentBlocks(blocks: MessageContentBlock[]): MessageContentBlock[] { - const result: MessageContentBlock[] = []; + const unwrapped: MessageContentBlock[] = []; for (const block of blocks) { const b = block as Record; - // content_block_start[tool_use] → ToolUseContentBlock - if (b.type === 'content_block_start' && b.content_block?.type === ContentBlockType.ToolUse) { - const cb = b.content_block; - result.push({ type: ContentBlockType.ToolUse, id: cb.id, name: cb.name, input: cb.input ?? {} } as ToolUseContentBlock); - continue; - } - // content_block_delta[input_json_delta] → TextContentBlock (merged into tool_use.input later) - if (b.type === 'content_block_delta' && b.delta?.type === 'input_json_delta') { - const partial: string = b.delta.partial_json || ''; - if (partial) { - result.push({ type: ContentBlockType.Text, text: { value: partial, annotations: [] } } as TextContentBlock); + + // --- Phase 1: Unwrap streaming protocol events --- + + // content_block_start → extract tool_use; discard others (text/thinking start are just markers) + if (b.type === 'content_block_start') { + if (b.content_block?.type === ContentBlockType.ToolUse) { + const cb = b.content_block; + unwrapped.push({ type: ContentBlockType.ToolUse, id: cb.id, name: cb.name, input: cb.input ?? {} } as ToolUseContentBlock); } continue; } - // content_block_delta[text_delta] → TextContentBlock - if (b.type === 'content_block_delta' && b.delta?.type === 'text_delta') { - const text: string = b.delta.text || ''; - if (text) { - result.push({ type: ContentBlockType.Text, text: { value: text, annotations: [] } } as TextContentBlock); + + // content_block_delta → extract content from known delta subtypes + if (b.type === 'content_block_delta') { + if (b.delta?.type === 'text_delta') { + const text: string = b.delta.text || ''; + if (text) { + unwrapped.push({ type: ContentBlockType.Text, text: { value: text, annotations: [] } } as TextContentBlock); + } + } else if (b.delta?.type === 'input_json_delta') { + const partial: string = b.delta.partial_json || ''; + if (partial) { + unwrapped.push({ type: ContentBlockType.Text, text: { value: partial, annotations: [] } } as TextContentBlock); + } + } else if (b.delta?.type === 'thinking_delta') { + const thinking: string = b.delta.thinking || ''; + if (thinking) { + unwrapped.push({ type: 'thinking', thinking } as unknown as MessageContentBlock); + } } + // Other deltas (signature_delta, etc.) → discard continue; } - // thinking_delta → discard (not part of final message) - if (b.type === 'content_block_delta' && b.delta?.type === 'thinking_delta') { - continue; - } - // content_block_stop / message_stop / message_delta → discard + + // Streaming control signals → discard if (b.type === 'content_block_stop' || b.type === 'message_stop' || b.type === 'message_delta') { continue; } - // Standard blocks (text, tool_use, tool_result) and other generic blocks → keep as-is - result.push(block); + + // Non-streaming blocks (already standard or generic) → pass to phase 2 + unwrapped.push(block); } - return result; + + // --- Phase 2: Whitelist filter --- + return unwrapped.filter(b => MessageConverter.ALLOWED_BLOCK_TYPES.has(b.type)); } /** @@ -213,6 +244,16 @@ export class MessageConverter { type: ContentBlockType.Text, text: { value: parts.join(''), annotations: [] }, }); + } else if (isThinkingBlock(block)) { + // Merge consecutive thinking blocks + const parts: string[] = [ (block as unknown as ThinkingBlock).thinking ]; + let next = blocks[i + 1]; + while (next && isThinkingBlock(next)) { + i++; + parts.push((next as unknown as ThinkingBlock).thinking); + next = blocks[i + 1]; + } + merged.push({ type: 'thinking', thinking: parts.join('') } as unknown as MessageContentBlock); } else { merged.push(block); } diff --git a/core/agent-runtime/test/MessageConverter.test.ts b/core/agent-runtime/test/MessageConverter.test.ts index 3b466608..8283fcca 100644 --- a/core/agent-runtime/test/MessageConverter.test.ts +++ b/core/agent-runtime/test/MessageConverter.test.ts @@ -13,7 +13,7 @@ import { ContentBlockType, } from '@eggjs/tegg-types/agent-runtime'; -import { MessageConverter, isTextBlock, isToolUseBlock, isToolResultBlock } from '../src/MessageConverter'; +import { MessageConverter, isTextBlock, isToolUseBlock, isToolResultBlock, isThinkingBlock } from '../src/MessageConverter'; describe('test/MessageConverter.test.ts', () => { describe('toContentBlocks', () => { @@ -381,11 +381,40 @@ describe('test/MessageConverter.test.ts', () => { assert.equal(result[0].text.value, 'Hello'); }); - it('should discard thinking_delta blocks', () => { + it('should discard content_block_start for non-tool_use types (thinking, text)', () => { + const blocks = [ + { type: 'content_block_start', index: 0, content_block: { type: 'thinking', thinking: '' } }, + { type: 'content_block_start', index: 1, content_block: { type: 'text', text: '' } }, + ] as any; + const result = MessageConverter.normalizeContentBlocks(blocks); + assert.equal(result.length, 0); + }); + + it('should extract thinking_delta as thinking block', () => { const blocks = [ { type: 'content_block_delta', delta: { type: 'thinking_delta', thinking: 'let me think...' } }, ] as any; const result = MessageConverter.normalizeContentBlocks(blocks); + assert.equal(result.length, 1); + assert.equal(result[0].type, 'thinking'); + assert.equal((result[0] as any).thinking, 'let me think...'); + }); + + it('should discard signature_delta and other unknown delta subtypes', () => { + const blocks = [ + { type: 'content_block_delta', delta: { type: 'signature_delta', signature: 'sig-theta' } }, + { type: 'content_block_delta', delta: { type: 'some_future_delta', data: 'xxx' } }, + ] as any; + const result = MessageConverter.normalizeContentBlocks(blocks); + assert.equal(result.length, 0); + }); + + it('should discard unknown block types via whitelist', () => { + const blocks = [ + { type: 'ping' }, + { type: 'some_unknown_event', data: 'xxx' }, + ] as any; + const result = MessageConverter.normalizeContentBlocks(blocks); assert.equal(result.length, 0); }); @@ -531,9 +560,11 @@ describe('test/MessageConverter.test.ts', () => { assert.equal(result[3].text.value, 'Here are the results.'); }); - it('should pass through generic blocks unchanged', () => { + it('should merge consecutive thinking blocks into one', () => { const blocks = [ - { type: 'thinking', thinking: 'let me think...' }, + { type: 'thinking', thinking: 'let ' }, + { type: 'thinking', thinking: 'me ' }, + { type: 'thinking', thinking: 'think...' }, ] as any; const result = MessageConverter.mergeContentBlocks(blocks); assert.equal(result.length, 1); @@ -541,14 +572,32 @@ describe('test/MessageConverter.test.ts', () => { assert.equal((result[0] as any).thinking, 'let me think...'); }); + it('should not merge thinking blocks separated by other blocks', () => { + const blocks = [ + { type: 'thinking', thinking: 'first thought' }, + { type: ContentBlockType.Text, text: { value: 'hello', annotations: [] } }, + { type: 'thinking', thinking: 'second thought' }, + ] as any; + const result = MessageConverter.mergeContentBlocks(blocks); + assert.equal(result.length, 3); + assert.equal(result[0].type, 'thinking'); + assert.equal((result[0] as any).thinking, 'first thought'); + assert(isTextBlock(result[1])); + assert.equal(result[2].type, 'thinking'); + assert.equal((result[2] as any).thinking, 'second thought'); + }); + it('should handle raw Anthropic SDK stream events end-to-end', () => { const blocks = [ // text deltas { type: 'content_block_delta', delta: { type: 'text_delta', text: 'I will ' } }, { type: 'content_block_delta', delta: { type: 'text_delta', text: 'help you.' } }, { type: 'content_block_stop' }, - // thinking (should be discarded) - { type: 'content_block_delta', delta: { type: 'thinking_delta', thinking: 'let me think...' } }, + // thinking deltas (should be merged into one thinking block) + { type: 'content_block_delta', delta: { type: 'thinking_delta', thinking: 'let me ' } }, + { type: 'content_block_delta', delta: { type: 'thinking_delta', thinking: 'think...' } }, + // signature (should be discarded) + { type: 'content_block_delta', delta: { type: 'signature_delta', signature: 'sig-theta' } }, // tool_use start + input deltas + stop { type: 'content_block_start', content_block: { type: 'tool_use', id: 'fc-1', name: 'Bash', input: {} } }, { type: 'content_block_delta', delta: { type: 'input_json_delta', partial_json: '{"command":' } }, @@ -563,20 +612,23 @@ describe('test/MessageConverter.test.ts', () => { { type: 'message_stop' }, ] as any; const result = MessageConverter.mergeContentBlocks(blocks); - assert.equal(result.length, 4); + assert.equal(result.length, 5); // merged text assert(isTextBlock(result[0])); assert.equal(result[0].text.value, 'I will help you.'); + // thinking block + assert.equal(result[1].type, 'thinking'); + assert.equal((result[1] as any).thinking, 'let me think...'); // tool_use with parsed input - assert(isToolUseBlock(result[1])); - assert.equal(result[1].name, 'Bash'); - assert.deepStrictEqual(result[1].input, { command: 'ls -la' }); + assert(isToolUseBlock(result[2])); + assert.equal(result[2].name, 'Bash'); + assert.deepStrictEqual(result[2].input, { command: 'ls -la' }); // tool_result - assert(isToolResultBlock(result[2])); - assert.equal(result[2].tool_use_id, 'fc-1'); + assert(isToolResultBlock(result[3])); + assert.equal(result[3].tool_use_id, 'fc-1'); // merged trailing text - assert(isTextBlock(result[3])); - assert.equal(result[3].text.value, 'Here are the results.'); + assert(isTextBlock(result[4])); + assert.equal(result[4].text.value, 'Here are the results.'); }); }); @@ -599,6 +651,12 @@ describe('test/MessageConverter.test.ts', () => { assert.equal(block.tool_use_id, 'toolu_1'); }); + it('isThinkingBlock should identify thinking blocks', () => { + const block = { type: 'thinking', thinking: 'hmm...' }; + assert(isThinkingBlock(block as any)); + assert(!isThinkingBlock({ type: ContentBlockType.Text, text: { value: 'hi', annotations: [] } } as any)); + }); + it('type guards should return false for non-matching blocks', () => { const textBlock = { type: ContentBlockType.Text, text: { value: 'hi', annotations: [] } }; const toolUseBlock = { type: ContentBlockType.ToolUse, id: 'id', name: 'n', input: {} };