diff --git a/src/acp-agent.ts b/src/acp-agent.ts index 6466e750..c8321d38 100644 --- a/src/acp-agent.ts +++ b/src/acp-agent.ts @@ -165,6 +165,18 @@ type Session = { /** Accumulated task list for the session, keyed by task ID. Task IDs are * per-session, so this state must not be shared across sessions. */ taskState: TaskState; + /** Assistant message ids whose text/thinking content was already emitted as + * `agent_message_chunk` / `agent_thought_chunk` via `stream_event` partial + * messages. Used by the top-level `assistant` case to decide whether to + * skip text/thinking blocks (the default, when streaming worked) or fall + * back to emitting the assembled blocks (when the underlying SDK / model + * proxy never produced `content_block_delta` for them). */ + textStreamedMessageIds: Set; + /** Id of the assistant message currently being streamed via `stream_event` + * partial messages, captured at `message_start` so subsequent + * `content_block_delta` events can record their parent into + * `textStreamedMessageIds`. */ + currentStreamingMessageId?: string; }; /** Compute a stable fingerprint of the session-defining params so we can @@ -743,6 +755,12 @@ export class ClaudeAcpAgent implements Agent { cachedWriteTokens: 0, }; + // Assistant message ids are turn-scoped — clear before a new turn so the + // set doesn't grow unboundedly over a long-lived session, and so that an + // id reused across turns (uncommon but possible) is re-evaluated. + session.textStreamedMessageIds.clear(); + session.currentStreamingMessageId = undefined; + let lastAssistantTotalUsage: number | null = null; let lastAssistantUsage: UsageSnapshot | null = null; let lastAssistantModel: string | null = null; @@ -1056,6 +1074,23 @@ export class ClaudeAcpAgent implements Agent { break; } case "stream_event": { + // Track which assistant message ids have produced streamed text / + // thinking deltas so the `assistant` case below can fall back to + // emitting the assembled content when streaming didn't (e.g. when + // the SDK is talking to a model proxy that never produces + // `content_block_delta` events for the second model call in a + // tool-use loop). + if (message.event.type === "message_start") { + session.currentStreamingMessageId = message.event.message.id; + } else if ( + message.event.type === "content_block_delta" && + (message.event.delta.type === "text_delta" || + message.event.delta.type === "thinking_delta") && + session.currentStreamingMessageId + ) { + session.textStreamedMessageIds.add(session.currentStreamingMessageId); + } + if ( message.parent_tool_use_id === null && (message.event.type === "message_start" || message.event.type === "message_delta") @@ -1227,12 +1262,56 @@ export class ClaudeAcpAgent implements Agent { throw RequestError.authRequired(); } + // Text and thinking blocks are normally emitted from the + // `stream_event` case above as `content_block_delta` arrives, + // so the assembled message replays them here. We skip them in + // that case to avoid duplication. However, if streaming never + // produced any text/thinking delta for this assistant message + // (e.g. because the underlying model proxy returned the response + // as a single non-streamed block, which is common with OpenAI- + // compatible gateways translating to the Anthropic protocol), the + // delta path emitted nothing — falling through with the filter in + // place would drop the assistant's final text on the floor while + // still returning `stopReason: "end_turn"`. Detect that case via + // `textStreamedMessageIds` and emit the assembled blocks as a + // fallback. + const assistantMessageId = + message.type === "assistant" ? message.message.id : undefined; + const textAlreadyStreamed = + !!assistantMessageId && session.textStreamedMessageIds.has(assistantMessageId); + + if ( + message.type === "assistant" && + !textAlreadyStreamed && + Array.isArray(message.message.content) && + message.message.content.some( + (item) => item.type === "text" || item.type === "thinking", + ) + ) { + this.logger.log( + `[claude-agent-acp] No streamed text/thinking deltas seen for ` + + `assistant message ${assistantMessageId ?? ""}; ` + + `emitting assembled blocks as fallback.`, + ); + // After this fallback emits, mark the id so a duplicate + // `assistant` delivery of the same message doesn't re-emit the + // same text/thinking blocks. Mirrors the dedupe semantics that + // streaming-emitted ids already have. + if (assistantMessageId) { + session.textStreamedMessageIds.add(assistantMessageId); + } + } + const content = message.type === "assistant" - ? // Handled by stream events above - message.message.content.filter( - (item) => !["text", "thinking"].includes(item.type), - ) + ? message.message.content.filter((item) => { + if (item.type === "text" || item.type === "thinking") { + // Keep only when the stream_event path didn't already + // emit chunks for this message id. + return !textAlreadyStreamed; + } + return true; + }) : message.message.content; for (const notification of toAcpNotifications( @@ -2258,6 +2337,7 @@ export class ClaudeAcpAgent implements Agent { contextWindowSize: inferContextWindowFromModel(models.currentModelId) ?? DEFAULT_CONTEXT_WINDOW, taskState, + textStreamedMessageIds: new Set(), }; return { diff --git a/src/tests/acp-agent.test.ts b/src/tests/acp-agent.test.ts index b01dbe77..46ddb5f9 100644 --- a/src/tests/acp-agent.test.ts +++ b/src/tests/acp-agent.test.ts @@ -1544,6 +1544,7 @@ describe("stop reason propagation", () => { emitRawSDKMessages: false, contextWindowSize: 200000, taskState: new Map(), + textStreamedMessageIds: new Set(), }; } @@ -1688,6 +1689,7 @@ describe("stop reason propagation", () => { emitRawSDKMessages: false, contextWindowSize: 200000, taskState: new Map(), + textStreamedMessageIds: new Set(), }; const response = await agent.prompt({ @@ -1806,6 +1808,281 @@ describe("stop reason propagation", () => { }); }); +describe("assistant text fallback when streaming yields no text deltas", () => { + function createCapturingAgent() { + const notifications: SessionNotification[] = []; + const mockClient = { + sessionUpdate: async (n: SessionNotification) => { + notifications.push(n); + }, + extNotification: async () => {}, + } as unknown as AgentSideConnection; + const agent = new ClaudeAcpAgent(mockClient, { log: () => {}, error: () => {} }); + return { agent, notifications }; + } + + function injectSession(agent: ClaudeAcpAgent, messages: any[]) { + const input = new Pushable(); + async function* messageGenerator() { + const iter = input[Symbol.asyncIterator](); + const { value: userMessage, done } = await iter.next(); + if (!done && userMessage) { + yield { + type: "user", + message: userMessage.message, + parent_tool_use_id: null, + uuid: userMessage.uuid, + session_id: "test-session", + isReplay: true, + }; + } + yield* messages; + } + agent.sessions["test-session"] = { + query: messageGenerator() as any, + input, + cancelled: false, + cwd: "/test", + sessionFingerprint: JSON.stringify({ cwd: "/test", mcpServers: [] }), + modes: { currentModeId: "default", availableModes: [] }, + models: { currentModelId: "default", availableModels: [] }, + modelInfos: [], + settingsManager: { dispose: vi.fn() } as any, + accumulatedUsage: { + inputTokens: 0, + outputTokens: 0, + cachedReadTokens: 0, + cachedWriteTokens: 0, + }, + configOptions: [], + promptRunning: false, + pendingMessages: new Map(), + nextPendingOrder: 0, + abortController: new AbortController(), + emitRawSDKMessages: false, + contextWindowSize: 200000, + taskState: new Map(), + textStreamedMessageIds: new Set(), + }; + } + + function makeResult() { + return { + type: "result" as const, + subtype: "success" as const, + stop_reason: null, + is_error: false, + result: "", + errors: [], + duration_ms: 0, + duration_api_ms: 0, + num_turns: 1, + total_cost_usd: 0, + usage: { + input_tokens: 0, + output_tokens: 0, + cache_read_input_tokens: 0, + cache_creation_input_tokens: 0, + }, + modelUsage: {}, + permission_denials: [], + uuid: randomUUID(), + session_id: "test-session", + }; + } + + function makeAssistantMessage(messageId: string, text: string) { + return { + type: "assistant" as const, + parent_tool_use_id: null, + uuid: randomUUID(), + session_id: "test-session", + message: { + id: messageId, + type: "message" as const, + role: "assistant" as const, + model: "claude-test", + content: [{ type: "text", text }], + stop_reason: "end_turn", + stop_sequence: null, + usage: { + input_tokens: 0, + output_tokens: 0, + cache_read_input_tokens: 0, + cache_creation_input_tokens: 0, + }, + }, + }; + } + + it("emits the assembled text when no content_block_delta was streamed", async () => { + const { agent, notifications } = createCapturingAgent(); + const messageId = "msg_no_stream"; + const finalText = "final answer never streamed"; + injectSession(agent, [ + makeAssistantMessage(messageId, finalText), + makeResult(), + { type: "system", subtype: "session_state_changed", state: "idle" }, + ]); + + const response = await agent.prompt({ + sessionId: "test-session", + prompt: [{ type: "text", text: "go" }], + }); + + expect(response.stopReason).toBe("end_turn"); + const textChunks = notifications + .map((n) => n.update) + .filter( + (u) => + u.sessionUpdate === "agent_message_chunk" && + u.content.type === "text" && + u.content.text === finalText, + ); + expect(textChunks.length).toBe(1); + }); + + it("does not re-emit text already streamed via content_block_delta", async () => { + const { agent, notifications } = createCapturingAgent(); + const messageId = "msg_streamed"; + const streamedText = "streamed answer"; + injectSession(agent, [ + { + type: "stream_event", + parent_tool_use_id: null, + uuid: randomUUID(), + session_id: "test-session", + event: { + type: "message_start", + message: { + id: messageId, + type: "message", + role: "assistant", + model: "claude-test", + content: [], + stop_reason: null, + stop_sequence: null, + usage: { + input_tokens: 0, + output_tokens: 0, + cache_read_input_tokens: 0, + cache_creation_input_tokens: 0, + }, + }, + }, + }, + { + type: "stream_event", + parent_tool_use_id: null, + uuid: randomUUID(), + session_id: "test-session", + event: { + type: "content_block_delta", + index: 0, + delta: { type: "text_delta", text: streamedText }, + }, + }, + makeAssistantMessage(messageId, streamedText), + makeResult(), + { type: "system", subtype: "session_state_changed", state: "idle" }, + ]); + + const response = await agent.prompt({ + sessionId: "test-session", + prompt: [{ type: "text", text: "go" }], + }); + + expect(response.stopReason).toBe("end_turn"); + const textChunks = notifications + .map((n) => n.update) + .filter( + (u) => + u.sessionUpdate === "agent_message_chunk" && + u.content.type === "text" && + u.content.text === streamedText, + ); + // Only the streamed content_block_delta should produce a chunk; the + // assembled message must not duplicate it. + expect(textChunks.length).toBe(1); + }); + + it("fallback emits exactly once when the same assistant message is delivered twice", async () => { + // Regression test: the SDK occasionally delivers the same assembled + // assistant message twice (observed in production logs). Without + // recording the id into `textStreamedMessageIds` after the first + // fallback emit, the second delivery would re-fire the fallback and + // ACP clients would see duplicate text chunks. After the dedupe fix + // the second delivery must be a no-op. + const { agent, notifications } = createCapturingAgent(); + const messageId = "msg_delivered_twice"; + const finalText = "final answer delivered twice"; + injectSession(agent, [ + makeAssistantMessage(messageId, finalText), + makeAssistantMessage(messageId, finalText), + makeResult(), + { type: "system", subtype: "session_state_changed", state: "idle" }, + ]); + + const response = await agent.prompt({ + sessionId: "test-session", + prompt: [{ type: "text", text: "go" }], + }); + + expect(response.stopReason).toBe("end_turn"); + const textChunks = notifications + .map((n) => n.update) + .filter( + (u) => + u.sessionUpdate === "agent_message_chunk" && + u.content.type === "text" && + u.content.text === finalText, + ); + expect(textChunks.length).toBe(1); + }); + + it("textStreamedMessageIds is cleared at the start of each new turn", async () => { + // Regression test: assistant message ids are turn-scoped. Without + // per-turn clear, the set would accumulate over the session's lifetime + // and a (rare) reuse of an id across turns would be misclassified as + // "already streamed", silently dropping the fallback emit. The two + // turns below use the same messageId; both should fallback-emit + // exactly once because the set is reset on each `prompt()` entry. + const { agent, notifications } = createCapturingAgent(); + const messageId = "msg_reused_across_turns"; + const finalText = "answer for both turns"; + injectSession(agent, [ + makeAssistantMessage(messageId, finalText), + makeResult(), + { type: "system", subtype: "session_state_changed", state: "idle" }, + makeAssistantMessage(messageId, finalText), + makeResult(), + { type: "system", subtype: "session_state_changed", state: "idle" }, + ]); + + const first = await agent.prompt({ + sessionId: "test-session", + prompt: [{ type: "text", text: "turn 1" }], + }); + const second = await agent.prompt({ + sessionId: "test-session", + prompt: [{ type: "text", text: "turn 2" }], + }); + + expect(first.stopReason).toBe("end_turn"); + expect(second.stopReason).toBe("end_turn"); + const textChunks = notifications + .map((n) => n.update) + .filter( + (u) => + u.sessionUpdate === "agent_message_chunk" && + u.content.type === "text" && + u.content.text === finalText, + ); + // Two turns each fallback-emit once. + expect(textChunks.length).toBe(2); + }); +}); + describe("session/close", () => { function createMockAgent() { const mockClient = { @@ -1847,6 +2124,7 @@ describe("session/close", () => { emitRawSDKMessages: false, contextWindowSize: 200000, taskState: new Map(), + textStreamedMessageIds: new Set(), }; return agent.sessions[sessionId]!; } @@ -1931,6 +2209,7 @@ describe("session/delete", () => { emitRawSDKMessages: false, contextWindowSize: 200000, taskState: new Map(), + textStreamedMessageIds: new Set(), }; return agent.sessions[sessionId]!; } @@ -2032,6 +2311,7 @@ describe("getOrCreateSession param change detection", () => { emitRawSDKMessages: false, contextWindowSize: 200000, taskState: new Map(), + textStreamedMessageIds: new Set(), }; return agent.sessions[sessionId]!; } @@ -2267,6 +2547,7 @@ describe("usage_update computation", () => { emitRawSDKMessages: false, contextWindowSize: 200000, taskState: new Map(), + textStreamedMessageIds: new Set(), }; } @@ -3167,6 +3448,7 @@ describe("emitRawSDKMessages", () => { emitRawSDKMessages, contextWindowSize: 200000, taskState: new Map(), + textStreamedMessageIds: new Set(), }; } @@ -3395,6 +3677,7 @@ describe("result origin handling", () => { emitRawSDKMessages: false, contextWindowSize: 200000, taskState: new Map(), + textStreamedMessageIds: new Set(), }; } @@ -3570,6 +3853,7 @@ describe("memory_recall handling", () => { emitRawSDKMessages: false, contextWindowSize: 200000, taskState: new Map(), + textStreamedMessageIds: new Set(), }; } @@ -3800,6 +4084,7 @@ describe("post-error recovery", () => { emitRawSDKMessages: false, contextWindowSize: 200000, taskState: new Map(), + textStreamedMessageIds: new Set(), }; return { interrupt }; }