Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 84 additions & 4 deletions src/acp-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,18 @@ type Session = {
/** Accumulated task list for the session, keyed by task ID. Task IDs are
* per-session, so this state must not be shared across sessions. */
taskState: TaskState;
/** Assistant message ids whose text/thinking content was already emitted as
* `agent_message_chunk` / `agent_thought_chunk` via `stream_event` partial
* messages. Used by the top-level `assistant` case to decide whether to
* skip text/thinking blocks (the default, when streaming worked) or fall
* back to emitting the assembled blocks (when the underlying SDK / model
* proxy never produced `content_block_delta` for them). */
textStreamedMessageIds: Set<string>;
/** Id of the assistant message currently being streamed via `stream_event`
* partial messages, captured at `message_start` so subsequent
* `content_block_delta` events can record their parent into
* `textStreamedMessageIds`. */
currentStreamingMessageId?: string;
};

/** Compute a stable fingerprint of the session-defining params so we can
Expand Down Expand Up @@ -743,6 +755,12 @@ export class ClaudeAcpAgent implements Agent {
cachedWriteTokens: 0,
};

// Assistant message ids are turn-scoped — clear before a new turn so the
// set doesn't grow unboundedly over a long-lived session, and so that an
// id reused across turns (uncommon but possible) is re-evaluated.
session.textStreamedMessageIds.clear();
session.currentStreamingMessageId = undefined;

let lastAssistantTotalUsage: number | null = null;
let lastAssistantUsage: UsageSnapshot | null = null;
let lastAssistantModel: string | null = null;
Expand Down Expand Up @@ -1056,6 +1074,23 @@ export class ClaudeAcpAgent implements Agent {
break;
}
case "stream_event": {
// Track which assistant message ids have produced streamed text /
// thinking deltas so the `assistant` case below can fall back to
// emitting the assembled content when streaming didn't (e.g. when
// the SDK is talking to a model proxy that never produces
// `content_block_delta` events for the second model call in a
// tool-use loop).
if (message.event.type === "message_start") {
session.currentStreamingMessageId = message.event.message.id;
} else if (
message.event.type === "content_block_delta" &&
(message.event.delta.type === "text_delta" ||
message.event.delta.type === "thinking_delta") &&
session.currentStreamingMessageId
) {
session.textStreamedMessageIds.add(session.currentStreamingMessageId);
}

if (
message.parent_tool_use_id === null &&
(message.event.type === "message_start" || message.event.type === "message_delta")
Expand Down Expand Up @@ -1227,12 +1262,56 @@ export class ClaudeAcpAgent implements Agent {
throw RequestError.authRequired();
}

// Text and thinking blocks are normally emitted from the
// `stream_event` case above as `content_block_delta` arrives,
// so the assembled message replays them here. We skip them in
// that case to avoid duplication. However, if streaming never
// produced any text/thinking delta for this assistant message
// (e.g. because the underlying model proxy returned the response
// as a single non-streamed block, which is common with OpenAI-
// compatible gateways translating to the Anthropic protocol), the
// delta path emitted nothing — falling through with the filter in
// place would drop the assistant's final text on the floor while
// still returning `stopReason: "end_turn"`. Detect that case via
// `textStreamedMessageIds` and emit the assembled blocks as a
// fallback.
const assistantMessageId =
message.type === "assistant" ? message.message.id : undefined;
const textAlreadyStreamed =
!!assistantMessageId && session.textStreamedMessageIds.has(assistantMessageId);

if (
message.type === "assistant" &&
!textAlreadyStreamed &&
Array.isArray(message.message.content) &&
message.message.content.some(
(item) => item.type === "text" || item.type === "thinking",
)
) {
this.logger.log(
`[claude-agent-acp] No streamed text/thinking deltas seen for ` +
`assistant message ${assistantMessageId ?? "<unknown>"}; ` +
`emitting assembled blocks as fallback.`,
);
// After this fallback emits, mark the id so a duplicate
// `assistant` delivery of the same message doesn't re-emit the
// same text/thinking blocks. Mirrors the dedupe semantics that
// streaming-emitted ids already have.
if (assistantMessageId) {
session.textStreamedMessageIds.add(assistantMessageId);
}
}

const content =
message.type === "assistant"
? // Handled by stream events above
message.message.content.filter(
(item) => !["text", "thinking"].includes(item.type),
)
? message.message.content.filter((item) => {
if (item.type === "text" || item.type === "thinking") {
// Keep only when the stream_event path didn't already
// emit chunks for this message id.
return !textAlreadyStreamed;
}
return true;
})
: message.message.content;

for (const notification of toAcpNotifications(
Expand Down Expand Up @@ -2258,6 +2337,7 @@ export class ClaudeAcpAgent implements Agent {
contextWindowSize:
inferContextWindowFromModel(models.currentModelId) ?? DEFAULT_CONTEXT_WINDOW,
taskState,
textStreamedMessageIds: new Set<string>(),
};

return {
Expand Down
Loading