From ee2429f0be7bf87d4af9df89d8836b63ec267402 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=97=B0=E6=98=8E?= Date: Thu, 2 Apr 2026 01:47:05 +0800 Subject: [PATCH 1/3] fix(agent-runtime): whitelist-based normalizeContentBlocks with thinking support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Restructure normalizeContentBlocks with two-phase approach: 1. Unwrap streaming protocol events to extract actual content 2. Whitelist filter — only keep known content block types - Whitelist: text, tool_use, tool_result, thinking - content_block_start: only extract tool_use, discard others - content_block_delta: extract text_delta, input_json_delta, thinking_delta; discard others (signature_delta, etc.) - Unknown block types (ping, future events) discarded by whitelist Co-Authored-By: Claude Opus 4.6 --- core/agent-runtime/src/MessageConverter.ts | 74 ++++++++++++------- .../test/MessageConverter.test.ts | 54 +++++++++++--- 2 files changed, 92 insertions(+), 36 deletions(-) diff --git a/core/agent-runtime/src/MessageConverter.ts b/core/agent-runtime/src/MessageConverter.ts index 2bfc768d..5b78e672 100644 --- a/core/agent-runtime/src/MessageConverter.ts +++ b/core/agent-runtime/src/MessageConverter.ts @@ -118,49 +118,71 @@ export class MessageConverter { return { output, usage }; } + /** Content block types allowed in the final assembled message. */ + private static readonly ALLOWED_BLOCK_TYPES = new Set([ + ContentBlockType.Text, // text + ContentBlockType.ToolUse, // tool_use + ContentBlockType.ToolResult, // tool_result + 'thinking', // extended thinking + ]); + /** * Normalize raw SDK streaming event blocks (e.g. Anthropic content_block_start/delta/stop) * into standard content blocks that mergeContentBlocks can process. - * This allows upstream code to transparently pass through SDK events without format conversion. + * + * Two-phase approach: + * 1. Unwrap streaming protocol events to extract actual content + * 2. Whitelist filter — only keep known content block types */ static normalizeContentBlocks(blocks: MessageContentBlock[]): MessageContentBlock[] { - const result: MessageContentBlock[] = []; + const unwrapped: MessageContentBlock[] = []; for (const block of blocks) { const b = block as Record; - // content_block_start[tool_use] → ToolUseContentBlock - if (b.type === 'content_block_start' && b.content_block?.type === ContentBlockType.ToolUse) { - const cb = b.content_block; - result.push({ type: ContentBlockType.ToolUse, id: cb.id, name: cb.name, input: cb.input ?? {} } as ToolUseContentBlock); - continue; - } - // content_block_delta[input_json_delta] → TextContentBlock (merged into tool_use.input later) - if (b.type === 'content_block_delta' && b.delta?.type === 'input_json_delta') { - const partial: string = b.delta.partial_json || ''; - if (partial) { - result.push({ type: ContentBlockType.Text, text: { value: partial, annotations: [] } } as TextContentBlock); + + // --- Phase 1: Unwrap streaming protocol events --- + + // content_block_start → extract tool_use; discard others (text/thinking start are just markers) + if (b.type === 'content_block_start') { + if (b.content_block?.type === ContentBlockType.ToolUse) { + const cb = b.content_block; + unwrapped.push({ type: ContentBlockType.ToolUse, id: cb.id, name: cb.name, input: cb.input ?? {} } as ToolUseContentBlock); } continue; } - // content_block_delta[text_delta] → TextContentBlock - if (b.type === 'content_block_delta' && b.delta?.type === 'text_delta') { - const text: string = b.delta.text || ''; - if (text) { - result.push({ type: ContentBlockType.Text, text: { value: text, annotations: [] } } as TextContentBlock); + + // content_block_delta → extract content from known delta subtypes + if (b.type === 'content_block_delta') { + if (b.delta?.type === 'text_delta') { + const text: string = b.delta.text || ''; + if (text) { + unwrapped.push({ type: ContentBlockType.Text, text: { value: text, annotations: [] } } as TextContentBlock); + } + } else if (b.delta?.type === 'input_json_delta') { + const partial: string = b.delta.partial_json || ''; + if (partial) { + unwrapped.push({ type: ContentBlockType.Text, text: { value: partial, annotations: [] } } as TextContentBlock); + } + } else if (b.delta?.type === 'thinking_delta') { + const thinking: string = b.delta.thinking || ''; + if (thinking) { + unwrapped.push({ type: 'thinking', thinking } as unknown as MessageContentBlock); + } } + // Other deltas (signature_delta, etc.) → discard continue; } - // thinking_delta → discard (not part of final message) - if (b.type === 'content_block_delta' && b.delta?.type === 'thinking_delta') { - continue; - } - // content_block_stop / message_stop / message_delta → discard + + // Streaming control signals → discard if (b.type === 'content_block_stop' || b.type === 'message_stop' || b.type === 'message_delta') { continue; } - // Standard blocks (text, tool_use, tool_result) and other generic blocks → keep as-is - result.push(block); + + // Non-streaming blocks (already standard or generic) → pass to phase 2 + unwrapped.push(block); } - return result; + + // --- Phase 2: Whitelist filter --- + return unwrapped.filter(b => MessageConverter.ALLOWED_BLOCK_TYPES.has(b.type)); } /** diff --git a/core/agent-runtime/test/MessageConverter.test.ts b/core/agent-runtime/test/MessageConverter.test.ts index 3b466608..cb352fed 100644 --- a/core/agent-runtime/test/MessageConverter.test.ts +++ b/core/agent-runtime/test/MessageConverter.test.ts @@ -381,11 +381,40 @@ describe('test/MessageConverter.test.ts', () => { assert.equal(result[0].text.value, 'Hello'); }); - it('should discard thinking_delta blocks', () => { + it('should discard content_block_start for non-tool_use types (thinking, text)', () => { + const blocks = [ + { type: 'content_block_start', index: 0, content_block: { type: 'thinking', thinking: '' } }, + { type: 'content_block_start', index: 1, content_block: { type: 'text', text: '' } }, + ] as any; + const result = MessageConverter.normalizeContentBlocks(blocks); + assert.equal(result.length, 0); + }); + + it('should extract thinking_delta as thinking block', () => { const blocks = [ { type: 'content_block_delta', delta: { type: 'thinking_delta', thinking: 'let me think...' } }, ] as any; const result = MessageConverter.normalizeContentBlocks(blocks); + assert.equal(result.length, 1); + assert.equal(result[0].type, 'thinking'); + assert.equal((result[0] as any).thinking, 'let me think...'); + }); + + it('should discard signature_delta and other unknown delta subtypes', () => { + const blocks = [ + { type: 'content_block_delta', delta: { type: 'signature_delta', signature: 'sig-theta' } }, + { type: 'content_block_delta', delta: { type: 'some_future_delta', data: 'xxx' } }, + ] as any; + const result = MessageConverter.normalizeContentBlocks(blocks); + assert.equal(result.length, 0); + }); + + it('should discard unknown block types via whitelist', () => { + const blocks = [ + { type: 'ping' }, + { type: 'some_unknown_event', data: 'xxx' }, + ] as any; + const result = MessageConverter.normalizeContentBlocks(blocks); assert.equal(result.length, 0); }); @@ -547,8 +576,10 @@ describe('test/MessageConverter.test.ts', () => { { type: 'content_block_delta', delta: { type: 'text_delta', text: 'I will ' } }, { type: 'content_block_delta', delta: { type: 'text_delta', text: 'help you.' } }, { type: 'content_block_stop' }, - // thinking (should be discarded) + // thinking (should be kept) { type: 'content_block_delta', delta: { type: 'thinking_delta', thinking: 'let me think...' } }, + // signature (should be discarded) + { type: 'content_block_delta', delta: { type: 'signature_delta', signature: 'sig-theta' } }, // tool_use start + input deltas + stop { type: 'content_block_start', content_block: { type: 'tool_use', id: 'fc-1', name: 'Bash', input: {} } }, { type: 'content_block_delta', delta: { type: 'input_json_delta', partial_json: '{"command":' } }, @@ -563,20 +594,23 @@ describe('test/MessageConverter.test.ts', () => { { type: 'message_stop' }, ] as any; const result = MessageConverter.mergeContentBlocks(blocks); - assert.equal(result.length, 4); + assert.equal(result.length, 5); // merged text assert(isTextBlock(result[0])); assert.equal(result[0].text.value, 'I will help you.'); + // thinking block + assert.equal(result[1].type, 'thinking'); + assert.equal((result[1] as any).thinking, 'let me think...'); // tool_use with parsed input - assert(isToolUseBlock(result[1])); - assert.equal(result[1].name, 'Bash'); - assert.deepStrictEqual(result[1].input, { command: 'ls -la' }); + assert(isToolUseBlock(result[2])); + assert.equal(result[2].name, 'Bash'); + assert.deepStrictEqual(result[2].input, { command: 'ls -la' }); // tool_result - assert(isToolResultBlock(result[2])); - assert.equal(result[2].tool_use_id, 'fc-1'); + assert(isToolResultBlock(result[3])); + assert.equal(result[3].tool_use_id, 'fc-1'); // merged trailing text - assert(isTextBlock(result[3])); - assert.equal(result[3].text.value, 'Here are the results.'); + assert(isTextBlock(result[4])); + assert.equal(result[4].text.value, 'Here are the results.'); }); }); From 26bc42597dba8de70cd96ba0e76a495ce74c0ed0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=97=B0=E6=98=8E?= Date: Thu, 2 Apr 2026 02:11:14 +0800 Subject: [PATCH 2/3] fix(agent-runtime): merge consecutive thinking blocks in mergeContentBlocks Previously each thinking_delta from Anthropic streaming became a separate thinking block in the final message. Now consecutive thinking blocks are merged into one, matching the same pattern used for text block merging. Co-Authored-By: Claude Opus 4.6 --- core/agent-runtime/src/MessageConverter.ts | 19 +++++++++++ .../test/MessageConverter.test.ts | 34 ++++++++++++++++--- 2 files changed, 48 insertions(+), 5 deletions(-) diff --git a/core/agent-runtime/src/MessageConverter.ts b/core/agent-runtime/src/MessageConverter.ts index 5b78e672..e66ebff6 100644 --- a/core/agent-runtime/src/MessageConverter.ts +++ b/core/agent-runtime/src/MessageConverter.ts @@ -23,6 +23,15 @@ export function isToolResultBlock(block: MessageContentBlock): block is ToolResu return block.type === ContentBlockType.ToolResult; } +interface ThinkingBlock { + type: 'thinking'; + thinking: string; +} + +export function isThinkingBlock(block: MessageContentBlock): block is ThinkingBlock & MessageContentBlock { + return block.type === 'thinking'; +} + import { nowUnix, newMsgId } from './AgentStoreUtils'; import type { RunUsage } from './RunBuilder'; @@ -235,6 +244,16 @@ export class MessageConverter { type: ContentBlockType.Text, text: { value: parts.join(''), annotations: [] }, }); + } else if (isThinkingBlock(block)) { + // Merge consecutive thinking blocks + const parts: string[] = [ (block as unknown as ThinkingBlock).thinking ]; + let next = blocks[i + 1]; + while (next && isThinkingBlock(next)) { + i++; + parts.push((next as unknown as ThinkingBlock).thinking); + next = blocks[i + 1]; + } + merged.push({ type: 'thinking', thinking: parts.join('') } as unknown as MessageContentBlock); } else { merged.push(block); } diff --git a/core/agent-runtime/test/MessageConverter.test.ts b/core/agent-runtime/test/MessageConverter.test.ts index cb352fed..8283fcca 100644 --- a/core/agent-runtime/test/MessageConverter.test.ts +++ b/core/agent-runtime/test/MessageConverter.test.ts @@ -13,7 +13,7 @@ import { ContentBlockType, } from '@eggjs/tegg-types/agent-runtime'; -import { MessageConverter, isTextBlock, isToolUseBlock, isToolResultBlock } from '../src/MessageConverter'; +import { MessageConverter, isTextBlock, isToolUseBlock, isToolResultBlock, isThinkingBlock } from '../src/MessageConverter'; describe('test/MessageConverter.test.ts', () => { describe('toContentBlocks', () => { @@ -560,9 +560,11 @@ describe('test/MessageConverter.test.ts', () => { assert.equal(result[3].text.value, 'Here are the results.'); }); - it('should pass through generic blocks unchanged', () => { + it('should merge consecutive thinking blocks into one', () => { const blocks = [ - { type: 'thinking', thinking: 'let me think...' }, + { type: 'thinking', thinking: 'let ' }, + { type: 'thinking', thinking: 'me ' }, + { type: 'thinking', thinking: 'think...' }, ] as any; const result = MessageConverter.mergeContentBlocks(blocks); assert.equal(result.length, 1); @@ -570,14 +572,30 @@ describe('test/MessageConverter.test.ts', () => { assert.equal((result[0] as any).thinking, 'let me think...'); }); + it('should not merge thinking blocks separated by other blocks', () => { + const blocks = [ + { type: 'thinking', thinking: 'first thought' }, + { type: ContentBlockType.Text, text: { value: 'hello', annotations: [] } }, + { type: 'thinking', thinking: 'second thought' }, + ] as any; + const result = MessageConverter.mergeContentBlocks(blocks); + assert.equal(result.length, 3); + assert.equal(result[0].type, 'thinking'); + assert.equal((result[0] as any).thinking, 'first thought'); + assert(isTextBlock(result[1])); + assert.equal(result[2].type, 'thinking'); + assert.equal((result[2] as any).thinking, 'second thought'); + }); + it('should handle raw Anthropic SDK stream events end-to-end', () => { const blocks = [ // text deltas { type: 'content_block_delta', delta: { type: 'text_delta', text: 'I will ' } }, { type: 'content_block_delta', delta: { type: 'text_delta', text: 'help you.' } }, { type: 'content_block_stop' }, - // thinking (should be kept) - { type: 'content_block_delta', delta: { type: 'thinking_delta', thinking: 'let me think...' } }, + // thinking deltas (should be merged into one thinking block) + { type: 'content_block_delta', delta: { type: 'thinking_delta', thinking: 'let me ' } }, + { type: 'content_block_delta', delta: { type: 'thinking_delta', thinking: 'think...' } }, // signature (should be discarded) { type: 'content_block_delta', delta: { type: 'signature_delta', signature: 'sig-theta' } }, // tool_use start + input deltas + stop @@ -633,6 +651,12 @@ describe('test/MessageConverter.test.ts', () => { assert.equal(block.tool_use_id, 'toolu_1'); }); + it('isThinkingBlock should identify thinking blocks', () => { + const block = { type: 'thinking', thinking: 'hmm...' }; + assert(isThinkingBlock(block as any)); + assert(!isThinkingBlock({ type: ContentBlockType.Text, text: { value: 'hi', annotations: [] } } as any)); + }); + it('type guards should return false for non-matching blocks', () => { const textBlock = { type: ContentBlockType.Text, text: { value: 'hi', annotations: [] } }; const toolUseBlock = { type: ContentBlockType.ToolUse, id: 'id', name: 'n', input: {} }; From b0ca5b7f3decf7b146f39f75a374a3452a10cfad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=97=B0=E6=98=8E?= Date: Thu, 2 Apr 2026 02:15:48 +0800 Subject: [PATCH 3/3] fix(agent-runtime): remove multi-spaces to pass eslint no-multi-spaces Co-Authored-By: Claude Opus 4.6 --- core/agent-runtime/src/MessageConverter.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/agent-runtime/src/MessageConverter.ts b/core/agent-runtime/src/MessageConverter.ts index e66ebff6..80cbb3fe 100644 --- a/core/agent-runtime/src/MessageConverter.ts +++ b/core/agent-runtime/src/MessageConverter.ts @@ -129,10 +129,10 @@ export class MessageConverter { /** Content block types allowed in the final assembled message. */ private static readonly ALLOWED_BLOCK_TYPES = new Set([ - ContentBlockType.Text, // text - ContentBlockType.ToolUse, // tool_use - ContentBlockType.ToolResult, // tool_result - 'thinking', // extended thinking + ContentBlockType.Text, + ContentBlockType.ToolUse, + ContentBlockType.ToolResult, + 'thinking', ]); /**