From 886fdf5b58518b4cd21458f9cf9f4f39fb1735a3 Mon Sep 17 00:00:00 2001 From: Steffen Deusch Date: Fri, 27 Feb 2026 17:22:53 +0100 Subject: [PATCH] Ensure concurrent prompts are handled This basically implements the pending prompt queueing RFD. When a prompt request is received while the agent is already generating, we immediately forward the message to Claude and then wait until it replays the message back to us. When it replays it, we respond to the previous prompt request with "end_turn", which also singifies to the client that the message is now being processed by the agent. See agentclientprotocol/agent-client-protocol#484. --- src/acp-agent.ts | 504 +++++++++++++++++++++++++++-------------------- 1 file changed, 288 insertions(+), 216 deletions(-) diff --git a/src/acp-agent.ts b/src/acp-agent.ts index 65b5afb1..5db2e6a6 100644 --- a/src/acp-agent.ts +++ b/src/acp-agent.ts @@ -149,6 +149,9 @@ type Session = { settingsManager: SettingsManager; accumulatedUsage: AccumulatedUsage; configOptions: SessionConfigOption[]; + promptRunning: boolean; + pendingMessages: Map void; order: number }>; + nextPendingOrder: number; }; type BackgroundTerminal = @@ -281,6 +284,11 @@ export class ClaudeAcpAgent implements Agent { return { protocolVersion: 1, agentCapabilities: { + _meta: { + claudeCode: { + promptQueueing: true, + }, + }, promptCapabilities: { image: true, embeddedContext: true, @@ -410,12 +418,13 @@ export class ClaudeAcpAgent implements Agent { } async prompt(params: PromptRequest): Promise { - if (!this.sessions[params.sessionId]) { + const session = this.sessions[params.sessionId]; + if (!session) { throw new Error("Session not found"); } - this.sessions[params.sessionId].cancelled = false; - this.sessions[params.sessionId].accumulatedUsage = { + session.cancelled = false; + session.accumulatedUsage = { inputTokens: 0, outputTokens: 0, cachedReadTokens: 0, @@ -424,250 +433,306 @@ export class ClaudeAcpAgent implements Agent { let lastAssistantTotalUsage: number | null = null; - const { query, input } = this.sessions[params.sessionId]; + const userMessage = promptToClaude(params); - input.push(promptToClaude(params)); - while (true) { - const { value: message, done } = await (query as AsyncGenerator).next(); - - if (done || !message) { - if (this.sessions[params.sessionId].cancelled) { - return { stopReason: "cancelled" }; - } - break; + if (session.promptRunning) { + const uuid = randomUUID(); + userMessage.uuid = uuid; + session.input.push(userMessage); + const order = session.nextPendingOrder++; + const cancelled = await new Promise((resolve) => { + session.pendingMessages.set(uuid, { resolve, order }); + }); + if (cancelled) { + return { stopReason: "cancelled" }; } + } else { + session.input.push(userMessage); + } - switch (message.type) { - case "system": - if (message.subtype === "compact_boundary") { - // We don't know the exact size, but since we compacted, - // we set it to zero. The client gets the exact size on the next message. - lastAssistantTotalUsage = 0; - } - switch (message.subtype) { - case "init": - break; - case "compact_boundary": - case "hook_started": - case "task_notification": - case "hook_progress": - case "hook_response": - case "status": - case "files_persisted": - case "task_started": - case "task_progress": - // Todo: process via status api: https://docs.claude.com/en/docs/claude-code/hooks#hook-output - break; - default: - unreachable(message, this.logger); - break; - } - break; - case "result": { - if (this.sessions[params.sessionId].cancelled) { + session.promptRunning = true; + let handedOff = false; + + try { + while (true) { + const { value: message, done } = await ( + session.query as AsyncGenerator + ).next(); + + if (done || !message) { + if (session.cancelled) { return { stopReason: "cancelled" }; } + break; + } - // Accumulate usage from this result - const session = this.sessions[params.sessionId]; - session.accumulatedUsage.inputTokens += message.usage.input_tokens; - session.accumulatedUsage.outputTokens += message.usage.output_tokens; - session.accumulatedUsage.cachedReadTokens += message.usage.cache_read_input_tokens; - session.accumulatedUsage.cachedWriteTokens += message.usage.cache_creation_input_tokens; - - // Calculate context window size from modelUsage (minimum across all models used) - const contextWindows = Object.values(message.modelUsage).map((m) => m.contextWindow); - const contextWindowSize = - contextWindows.length > 0 ? Math.min(...contextWindows) : 200000; - - // Send usage_update notification - if (lastAssistantTotalUsage !== null) { - await this.client.sessionUpdate({ - sessionId: params.sessionId, - update: { - sessionUpdate: "usage_update", - used: lastAssistantTotalUsage, - size: contextWindowSize, - cost: { - amount: message.total_cost_usd, - currency: "USD", + switch (message.type) { + case "system": + if (message.subtype === "compact_boundary") { + // We don't know the exact size, but since we compacted, + // we set it to zero. The client gets the exact size on the next message. + lastAssistantTotalUsage = 0; + } + switch (message.subtype) { + case "init": + break; + case "compact_boundary": + case "hook_started": + case "task_notification": + case "hook_progress": + case "hook_response": + case "status": + case "files_persisted": + case "task_started": + case "task_progress": + // Todo: process via status api: https://docs.claude.com/en/docs/claude-code/hooks#hook-output + break; + default: + unreachable(message, this.logger); + break; + } + break; + case "result": { + if (session.cancelled) { + return { stopReason: "cancelled" }; + } + + // Accumulate usage from this result + session.accumulatedUsage.inputTokens += message.usage.input_tokens; + session.accumulatedUsage.outputTokens += message.usage.output_tokens; + session.accumulatedUsage.cachedReadTokens += message.usage.cache_read_input_tokens; + session.accumulatedUsage.cachedWriteTokens += message.usage.cache_creation_input_tokens; + + // Calculate context window size from modelUsage (minimum across all models used) + const contextWindows = Object.values(message.modelUsage).map((m) => m.contextWindow); + const contextWindowSize = + contextWindows.length > 0 ? Math.min(...contextWindows) : 200000; + + // Send usage_update notification + if (lastAssistantTotalUsage !== null) { + await this.client.sessionUpdate({ + sessionId: params.sessionId, + update: { + sessionUpdate: "usage_update", + used: lastAssistantTotalUsage, + size: contextWindowSize, + cost: { + amount: message.total_cost_usd, + currency: "USD", + }, }, - }, - }); - } + }); + } - // Build the usage response - const usage: PromptResponse["usage"] = { - inputTokens: session.accumulatedUsage.inputTokens, - outputTokens: session.accumulatedUsage.outputTokens, - cachedReadTokens: session.accumulatedUsage.cachedReadTokens, - cachedWriteTokens: session.accumulatedUsage.cachedWriteTokens, - totalTokens: - session.accumulatedUsage.inputTokens + - session.accumulatedUsage.outputTokens + - session.accumulatedUsage.cachedReadTokens + - session.accumulatedUsage.cachedWriteTokens, - }; + // Build the usage response + const usage: PromptResponse["usage"] = { + inputTokens: session.accumulatedUsage.inputTokens, + outputTokens: session.accumulatedUsage.outputTokens, + cachedReadTokens: session.accumulatedUsage.cachedReadTokens, + cachedWriteTokens: session.accumulatedUsage.cachedWriteTokens, + totalTokens: + session.accumulatedUsage.inputTokens + + session.accumulatedUsage.outputTokens + + session.accumulatedUsage.cachedReadTokens + + session.accumulatedUsage.cachedWriteTokens, + }; - switch (message.subtype) { - case "success": { - if (message.result.includes("Please run /login")) { - throw RequestError.authRequired(); - } - if (message.is_error) { - throw RequestError.internalError(undefined, message.result); + switch (message.subtype) { + case "success": { + if (message.result.includes("Please run /login")) { + throw RequestError.authRequired(); + } + if (message.is_error) { + throw RequestError.internalError(undefined, message.result); + } + return { stopReason: "end_turn", usage }; } - return { stopReason: "end_turn", usage }; + case "error_during_execution": + if (message.is_error) { + throw RequestError.internalError( + undefined, + message.errors.join(", ") || message.subtype, + ); + } + return { stopReason: "end_turn", usage }; + case "error_max_budget_usd": + case "error_max_turns": + case "error_max_structured_output_retries": + if (message.is_error) { + throw RequestError.internalError( + undefined, + message.errors.join(", ") || message.subtype, + ); + } + return { stopReason: "max_turn_requests", usage }; + default: + unreachable(message, this.logger); + break; } - case "error_during_execution": - if (message.is_error) { - throw RequestError.internalError( - undefined, - message.errors.join(", ") || message.subtype, - ); - } - return { stopReason: "end_turn", usage }; - case "error_max_budget_usd": - case "error_max_turns": - case "error_max_structured_output_retries": - if (message.is_error) { - throw RequestError.internalError( - undefined, - message.errors.join(", ") || message.subtype, - ); - } - return { stopReason: "max_turn_requests", usage }; - default: - unreachable(message, this.logger); - break; - } - break; - } - case "stream_event": { - for (const notification of streamEventToAcpNotifications( - message, - params.sessionId, - this.toolUseCache, - this.client, - this.logger, - { clientCapabilities: this.clientCapabilities }, - )) { - await this.client.sessionUpdate(notification); + break; } - break; - } - case "user": - case "assistant": { - if (this.sessions[params.sessionId].cancelled) { + case "stream_event": { + for (const notification of streamEventToAcpNotifications( + message, + params.sessionId, + this.toolUseCache, + this.client, + this.logger, + { clientCapabilities: this.clientCapabilities }, + )) { + await this.client.sessionUpdate(notification); + } break; } + case "user": + case "assistant": { + if (session.cancelled) { + break; + } - // Store latest assistant usage (excluding subagents) - if ((message.message as any).usage && message.parent_tool_use_id === null) { - const messageWithUsage = message.message as unknown as SDKResultMessage; - lastAssistantTotalUsage = - messageWithUsage.usage.input_tokens + - messageWithUsage.usage.output_tokens + - messageWithUsage.usage.cache_read_input_tokens + - messageWithUsage.usage.cache_creation_input_tokens; - } + // Check for queued prompt replay + if (message.type === "user" && "uuid" in message && message.uuid) { + const pending = session.pendingMessages.get(message.uuid as string); + if (pending) { + pending.resolve(false); + session.pendingMessages.delete(message.uuid as string); + handedOff = true; + // the current loop stops with end_turn, + // the loop of the next prompt continues running + return { stopReason: "end_turn" }; + } + } - // Slash commands like /compact can generate invalid output... doesn't match - // their own docs: https://docs.anthropic.com/en/docs/claude-code/sdk/sdk-slash-commands#%2Fcompact-compact-conversation-history - if ( - typeof message.message.content === "string" && - message.message.content.includes("") - ) { - // Handle /context by sending its reply as regular agent message. - if (message.message.content.includes("Context Usage")) { - for (const notification of toAcpNotifications( - message.message.content - .replace("", "") - .replace("", ""), - "assistant", - params.sessionId, - this.toolUseCache, - this.client, - this.logger, - { clientCapabilities: this.clientCapabilities }, - )) { - await this.client.sessionUpdate(notification); + // Store latest assistant usage (excluding subagents) + if ((message.message as any).usage && message.parent_tool_use_id === null) { + const messageWithUsage = message.message as unknown as SDKResultMessage; + lastAssistantTotalUsage = + messageWithUsage.usage.input_tokens + + messageWithUsage.usage.output_tokens + + messageWithUsage.usage.cache_read_input_tokens + + messageWithUsage.usage.cache_creation_input_tokens; + } + + // Slash commands like /compact can generate invalid output... doesn't match + // their own docs: https://docs.anthropic.com/en/docs/claude-code/sdk/sdk-slash-commands#%2Fcompact-compact-conversation-history + if ( + typeof message.message.content === "string" && + message.message.content.includes("") + ) { + // Handle /context by sending its reply as regular agent message. + if (message.message.content.includes("Context Usage")) { + for (const notification of toAcpNotifications( + message.message.content + .replace("", "") + .replace("", ""), + "assistant", + params.sessionId, + this.toolUseCache, + this.client, + this.logger, + { clientCapabilities: this.clientCapabilities }, + )) { + await this.client.sessionUpdate(notification); + } } + this.logger.log(message.message.content); + break; + } + + if ( + typeof message.message.content === "string" && + message.message.content.includes("") + ) { + this.logger.error(message.message.content); + break; + } + // Skip these user messages for now, since they seem to just be messages we don't want in the feed + if ( + message.type === "user" && + (typeof message.message.content === "string" || + (Array.isArray(message.message.content) && + message.message.content.length === 1 && + message.message.content[0].type === "text")) + ) { + break; + } + + if ( + message.type === "assistant" && + message.message.model === "" && + Array.isArray(message.message.content) && + message.message.content.length === 1 && + message.message.content[0].type === "text" && + message.message.content[0].text.includes("Please run /login") + ) { + throw RequestError.authRequired(); } - this.logger.log(message.message.content); - break; - } - if ( - typeof message.message.content === "string" && - message.message.content.includes("") - ) { - this.logger.error(message.message.content); + const content = + message.type === "assistant" + ? // Handled by stream events above + message.message.content.filter( + (item) => !["text", "thinking"].includes(item.type), + ) + : message.message.content; + + for (const notification of toAcpNotifications( + content, + message.message.role, + params.sessionId, + this.toolUseCache, + this.client, + this.logger, + { + clientCapabilities: this.clientCapabilities, + parentToolUseId: message.parent_tool_use_id, + }, + )) { + await this.client.sessionUpdate(notification); + } break; } - // Skip these user messages for now, since they seem to just be messages we don't want in the feed - if ( - message.type === "user" && - (typeof message.message.content === "string" || - (Array.isArray(message.message.content) && - message.message.content.length === 1 && - message.message.content[0].type === "text")) - ) { + case "tool_progress": + case "tool_use_summary": break; + case "auth_status": + break; + default: + unreachable(message); + break; + } + } + throw new Error("Session did not end in result"); + } finally { + if (!handedOff) { + session.promptRunning = false; + // This usually should not happen, but in case the loop finishes + // without claude sending all message replays, we resolve the + // next pending prompt call to ensure no prompts get stuck. + if (session.pendingMessages.size > 0) { + const next = [...session.pendingMessages.entries()].sort( + (a, b) => a[1].order - b[1].order, + )[0]; + if (next) { + next[1].resolve(false); + session.pendingMessages.delete(next[0]); } - - if ( - message.type === "assistant" && - message.message.model === "" && - Array.isArray(message.message.content) && - message.message.content.length === 1 && - message.message.content[0].type === "text" && - message.message.content[0].text.includes("Please run /login") - ) { - throw RequestError.authRequired(); - } - - const content = - message.type === "assistant" - ? // Handled by stream events above - message.message.content.filter((item) => !["text", "thinking"].includes(item.type)) - : message.message.content; - - for (const notification of toAcpNotifications( - content, - message.message.role, - params.sessionId, - this.toolUseCache, - this.client, - this.logger, - { - clientCapabilities: this.clientCapabilities, - parentToolUseId: message.parent_tool_use_id, - }, - )) { - await this.client.sessionUpdate(notification); - } - break; } - case "tool_progress": - case "tool_use_summary": - break; - case "auth_status": - break; - default: - unreachable(message); - break; } } - throw new Error("Session did not end in result"); } async cancel(params: CancelNotification): Promise { - if (!this.sessions[params.sessionId]) { + const session = this.sessions[params.sessionId]; + if (!session) { throw new Error("Session not found"); } - this.sessions[params.sessionId].cancelled = true; - await this.sessions[params.sessionId].query.interrupt(); + session.cancelled = true; + for (const [, pending] of session.pendingMessages) { + pending.resolve(true); + } + session.pendingMessages.clear(); + await session.query.interrupt(); } async unstable_setSessionModel( @@ -1074,6 +1139,10 @@ export class ClaudeAcpAgent implements Agent { : isStaticBinary() ? { pathToClaudeCodeExecutable: process.execPath } : {}), + extraArgs: { + ...userProvidedOptions?.extraArgs, + "replay-user-messages": "", + }, disallowedTools: [...(userProvidedOptions?.disallowedTools || []), ...disallowedTools], tools: { type: "preset", preset: "claude_code" }, hooks: { @@ -1134,6 +1203,9 @@ export class ClaudeAcpAgent implements Agent { cachedWriteTokens: 0, }, configOptions: [], + promptRunning: false, + pendingMessages: new Map(), + nextPendingOrder: 0, }; const initializationResult = await q.initializationResult();