From 19eed286e8229db0a710e1887d09e28fa25bd344 Mon Sep 17 00:00:00 2001 From: Justin Gray Date: Fri, 3 Jul 2026 23:57:56 +0000 Subject: [PATCH] Adopt upstream #3669: Cursor ACP thread rendering, thought output, cancel delivery Applies pingdotgg/t3code#3669 on top of the fork's #9 resume/recovery hardening. Fixes: session/cancel sent as JSON-RPC notification (not request), runtime-unique ACP segment item ids so resumed sessions render below the latest user message, agent_thought_chunk parsed as expandable reasoning rows, and a turn-completion drain race. Supersedes the reverted #10 turn-scoping. Co-authored-by: Cursor Co-Authored-By: Claude Opus 4.8 --- apps/server/scripts/acp-mock-agent.ts | 29 +++++ .../Layers/ProviderRuntimeIngestion.ts | 35 +++++ .../src/provider/Layers/CursorAdapter.test.ts | 28 +++- .../src/provider/Layers/CursorAdapter.ts | 32 +++++ .../server/src/provider/Layers/GrokAdapter.ts | 4 + .../provider/acp/AcpCoreRuntimeEvents.test.ts | 44 +++++++ .../src/provider/acp/AcpCoreRuntimeEvents.ts | 26 +++- .../provider/acp/AcpJsonRpcConnection.test.ts | 58 +++++++++ .../src/provider/acp/AcpRuntimeModel.test.ts | 30 +++++ .../src/provider/acp/AcpRuntimeModel.ts | 26 ++++ .../src/provider/acp/AcpSessionRuntime.ts | 122 ++++++++++++++---- apps/web/src/session-logic.ts | 4 +- packages/effect-acp/src/protocol.test.ts | 10 +- packages/effect-acp/src/protocol.ts | 16 ++- 14 files changed, 433 insertions(+), 31 deletions(-) diff --git a/apps/server/scripts/acp-mock-agent.ts b/apps/server/scripts/acp-mock-agent.ts index bc7828dd854..1add889c6b9 100644 --- a/apps/server/scripts/acp-mock-agent.ts +++ b/apps/server/scripts/acp-mock-agent.ts @@ -16,6 +16,7 @@ const exitLogPath = process.env.T3_ACP_EXIT_LOG_PATH; const emitToolCalls = process.env.T3_ACP_EMIT_TOOL_CALLS === "1"; const emitInterleavedAssistantToolCalls = process.env.T3_ACP_EMIT_INTERLEAVED_ASSISTANT_TOOL_CALLS === "1"; +const emitThoughtChunks = process.env.T3_ACP_EMIT_THOUGHT_CHUNKS === "1"; const emitGenericToolPlaceholders = process.env.T3_ACP_EMIT_GENERIC_TOOL_PLACEHOLDERS === "1"; const emitAskQuestion = process.env.T3_ACP_EMIT_ASK_QUESTION === "1"; const emitXAiAskUserQuestion = process.env.T3_ACP_EMIT_XAI_ASK_USER_QUESTION === "1"; @@ -580,6 +581,34 @@ const program = Effect.gen(function* () { return yield* Effect.never; } + if (emitThoughtChunks) { + yield* agent.client.sessionUpdate({ + sessionId: requestedSessionId, + update: { + sessionUpdate: "agent_thought_chunk", + content: { type: "text", text: "thinking about " }, + }, + }); + + yield* agent.client.sessionUpdate({ + sessionId: requestedSessionId, + update: { + sessionUpdate: "agent_thought_chunk", + content: { type: "text", text: "the answer" }, + }, + }); + + yield* agent.client.sessionUpdate({ + sessionId: requestedSessionId, + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "final answer" }, + }, + }); + + return { stopReason: "end_turn" }; + } + if (emitInterleavedAssistantToolCalls) { const toolCallId = "tool-call-1"; diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index a94c3e1142d..d242ff49c7a 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -166,6 +166,19 @@ function truncateDetail(value: string, limit = 180): string { return value.length > limit ? `${value.slice(0, limit - 3)}...` : value; } +/** Full reasoning text is kept expandable in the work log but capped so a + * single thought segment cannot bloat the persisted activity payload. */ +const MAX_REASONING_ACTIVITY_CHARS = 8_000; + +function reasoningSummaryFromText(text: string): string { + const firstLine = text + .split("\n") + .find((line) => line.trim().length > 0) + ?.trim(); + const cleaned = firstLine?.replaceAll("**", "").trim(); + return truncateDetail(cleaned && cleaned.length > 0 ? cleaned : "Thinking", 120); +} + function normalizeProposedPlanMarkdown(planMarkdown: string | undefined): string | undefined { const trimmed = planMarkdown?.trim(); if (!trimmed) { @@ -591,6 +604,28 @@ function runtimeEventToActivities( } case "item.completed": { + // Reasoning segments (e.g. Cursor agent_thought_chunk) become expandable + // "thinking" rows in the work log instead of disappearing entirely. + if (event.payload.itemType === "reasoning") { + const reasoningText = event.payload.detail?.trim(); + if (!reasoningText) { + return []; + } + return [ + { + id: event.eventId, + createdAt: event.createdAt, + tone: "info", + kind: "reasoning", + summary: reasoningSummaryFromText(reasoningText), + payload: { + detail: truncateDetail(reasoningText, MAX_REASONING_ACTIVITY_CHARS), + }, + turnId: toTurnId(event.turnId) ?? null, + ...maybeSequence, + }, + ]; + } if (!isToolLifecycleItemType(event.payload.itemType)) { return []; } diff --git a/apps/server/src/provider/Layers/CursorAdapter.test.ts b/apps/server/src/provider/Layers/CursorAdapter.test.ts index 89e9c56eb8a..f6510a53b76 100644 --- a/apps/server/src/provider/Layers/CursorAdapter.test.ts +++ b/apps/server/src/provider/Layers/CursorAdapter.test.ts @@ -228,7 +228,9 @@ cursorAdapterTestLayer("CursorAdapterLive", (it) => { assert.isDefined(delta); if (delta?.type === "content.delta") { assert.equal(delta.payload.delta, "hello from mock"); - assert.match(String(delta.itemId), /^assistant:mock-session-1:segment:0$/); + // The middle part is a per-runtime tag that keeps segment ids unique + // across restarts that resume the same ACP session. + assert.match(String(delta.itemId), /^assistant:mock-session-1:[^:]+:segment:0$/); } const assistantCompleted = runtimeEvents.find( @@ -687,7 +689,7 @@ cursorAdapterTestLayer("CursorAdapterLive", (it) => { if (contentDelta?.type === "content.delta") { assert.equal(String(contentDelta.turnId), String(turn.turnId)); assert.equal(contentDelta.payload.delta, "hello from mock"); - assert.equal(String(contentDelta.itemId), "assistant:mock-session-1:segment:0"); + assert.match(String(contentDelta.itemId), /^assistant:mock-session-1:[^:]+:segment:0$/); } }); @@ -1044,6 +1046,8 @@ cursorAdapterTestLayer("CursorAdapterLive", (it) => { const serverSettings = yield* ServerSettingsService; const threadId = ThreadId.make("cursor-stop-pending-approval"); const approvalRequested = yield* Deferred.make(); + const sessionExited = yield* Deferred.make(); + const observedEvents: Array = []; const wrapperPath = yield* Effect.promise(() => makeMockAgentWrapper({ T3_ACP_EMIT_TOOL_CALLS: "1" }), @@ -1051,7 +1055,14 @@ cursorAdapterTestLayer("CursorAdapterLive", (it) => { yield* serverSettings.updateSettings({ providers: { cursor: { binaryPath: wrapperPath } } }); yield* Stream.runForEach(adapter.streamEvents, (event) => { - if (String(event.threadId) !== String(threadId) || event.type !== "request.opened") { + if (String(event.threadId) !== String(threadId)) { + return Effect.void; + } + observedEvents.push(event); + if (event.type === "session.exited") { + return Deferred.succeed(sessionExited, undefined).pipe(Effect.ignore); + } + if (event.type !== "request.opened") { return Effect.void; } return Deferred.succeed(approvalRequested, undefined).pipe(Effect.ignore); @@ -1078,6 +1089,17 @@ cursorAdapterTestLayer("CursorAdapterLive", (it) => { yield* Fiber.await(sendTurnFiber); assert.equal(yield* adapter.hasSession(threadId), false); + + // Teardown interrupts the notification fiber, which settles sendTurn's + // drain race. sendTurn must then bail out: emitting turn.completed after + // session.exited would publish out-of-order events for a removed session. + // sendTurn already finished above, so after session.exited arrives we + // only need to let the stream consumer drain any remaining events. + yield* Deferred.await(sessionExited); + for (let i = 0; i < 10; i += 1) { + yield* Effect.yieldNow; + } + assert.isFalse(observedEvents.some((event) => event.type === "turn.completed")); }), ); diff --git a/apps/server/src/provider/Layers/CursorAdapter.ts b/apps/server/src/provider/Layers/CursorAdapter.ts index 59788a2d225..c4536a73369 100644 --- a/apps/server/src/provider/Layers/CursorAdapter.ts +++ b/apps/server/src/provider/Layers/CursorAdapter.ts @@ -799,6 +799,7 @@ export function makeCursorAdapter( turnId: ctx.activeTurnId, itemId: event.itemId, lifecycle: "item.started", + channel: event.channel, }), ); return; @@ -811,6 +812,8 @@ export function makeCursorAdapter( turnId: ctx.activeTurnId, itemId: event.itemId, lifecycle: "item.completed", + channel: event.channel, + ...(event.text !== undefined ? { detail: event.text } : {}), }), ); return; @@ -861,6 +864,7 @@ export function makeCursorAdapter( threadId: ctx.threadId, turnId: ctx.activeTurnId, ...(event.itemId ? { itemId: event.itemId } : {}), + channel: event.channel, text: event.text, rawPayload: event.rawPayload, }), @@ -1013,6 +1017,34 @@ export function makeCursorAdapter( ), ); + // The prompt RPC can resolve while session/update notifications are + // still queued. Wait until the notification fiber has published all + // of them so content deltas and item completions reach consumers + // before turn.completed (otherwise buffered assistant text may be + // finalized against the wrong turn state). Racing against the + // notification fiber keeps this from hanging when the session is + // torn down mid-turn and nobody can acknowledge the drain barrier. + yield* ctx.notificationFiber + ? Effect.raceFirst( + ctx.acp.drainEvents, + Fiber.await(ctx.notificationFiber).pipe(Effect.asVoid), + ) + : Effect.void; + + // The notification fiber also terminates when stopSessionInternal + // interrupts it during teardown, which settles the race without the + // drain having happened. At that point the session is already gone + // (session.exited emitted, ctx removed from the map), so mutating + // turn state or emitting turn.completed here would produce + // out-of-order events against a removed session. Bail out instead. + if (ctx.stopped) { + return { + threadId: input.threadId, + turnId, + resumeCursor: ctx.session.resumeCursor, + }; + } + const turnRecord = ctx.turns.find((turn) => turn.id === turnId); if (turnRecord) { turnRecord.items.push({ prompt: promptParts, result }); diff --git a/apps/server/src/provider/Layers/GrokAdapter.ts b/apps/server/src/provider/Layers/GrokAdapter.ts index c22b2180183..922c3e6c257 100644 --- a/apps/server/src/provider/Layers/GrokAdapter.ts +++ b/apps/server/src/provider/Layers/GrokAdapter.ts @@ -818,6 +818,7 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte turnId: notificationTurnId, itemId: event.itemId, lifecycle: "item.started", + channel: event.channel, }), ); return; @@ -830,6 +831,8 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte turnId: notificationTurnId, itemId: event.itemId, lifecycle: "item.completed", + channel: event.channel, + ...(event.text !== undefined ? { detail: event.text } : {}), }), ); return; @@ -863,6 +866,7 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte threadId: ctx.threadId, turnId: notificationTurnId, ...(event.itemId ? { itemId: event.itemId } : {}), + channel: event.channel, text: event.text, rawPayload: event.rawPayload, }), diff --git a/apps/server/src/provider/acp/AcpCoreRuntimeEvents.test.ts b/apps/server/src/provider/acp/AcpCoreRuntimeEvents.test.ts index 7fe25699bbc..7c7d6219eda 100644 --- a/apps/server/src/provider/acp/AcpCoreRuntimeEvents.test.ts +++ b/apps/server/src/provider/acp/AcpCoreRuntimeEvents.test.ts @@ -152,4 +152,48 @@ describe("AcpCoreRuntimeEvents", () => { }, }); }); + + it("maps thought-channel segments to reasoning items and reasoning_text deltas", () => { + const stamp = { eventId: "event-1" as never, createdAt: "2026-03-27T00:00:00.000Z" }; + const turnId = TurnId.make("turn-1"); + + expect( + makeAcpContentDeltaEvent({ + stamp, + provider: ProviderDriverKind.make("cursor"), + threadId: "thread-1" as never, + turnId, + itemId: "thought:session-1:tag:segment:0", + channel: "thought", + text: "Checking the failing test first.", + rawPayload: { sessionId: "session-1" }, + }), + ).toMatchObject({ + type: "content.delta", + payload: { + streamKind: "reasoning_text", + delta: "Checking the failing test first.", + }, + }); + + expect( + makeAcpAssistantItemEvent({ + stamp, + provider: ProviderDriverKind.make("cursor"), + threadId: "thread-1" as never, + turnId, + itemId: "thought:session-1:tag:segment:0", + lifecycle: "item.completed", + channel: "thought", + detail: "Checking the failing test first.", + }), + ).toMatchObject({ + type: "item.completed", + payload: { + itemType: "reasoning", + status: "completed", + detail: "Checking the failing test first.", + }, + }); + }); }); diff --git a/apps/server/src/provider/acp/AcpCoreRuntimeEvents.ts b/apps/server/src/provider/acp/AcpCoreRuntimeEvents.ts index c93e61dc37b..20f0e68ff60 100644 --- a/apps/server/src/provider/acp/AcpCoreRuntimeEvents.ts +++ b/apps/server/src/provider/acp/AcpCoreRuntimeEvents.ts @@ -12,7 +12,12 @@ import { type TurnId, } from "@t3tools/contracts"; -import type { AcpPermissionRequest, AcpPlanUpdate, AcpToolCallState } from "./AcpRuntimeModel.ts"; +import type { + AcpAssistantChannel, + AcpPermissionRequest, + AcpPlanUpdate, + AcpToolCallState, +} from "./AcpRuntimeModel.ts"; type AcpAdapterRawSource = Extract< RuntimeEventRawSource, @@ -198,6 +203,9 @@ export function makeAcpAssistantItemEvent(input: { readonly turnId: TurnId | undefined; readonly itemId: string; readonly lifecycle: "item.started" | "item.completed"; + readonly channel?: AcpAssistantChannel; + /** Full segment text for completed thought segments. */ + readonly detail?: string; }): ProviderRuntimeEvent { return { type: input.lifecycle, @@ -207,8 +215,11 @@ export function makeAcpAssistantItemEvent(input: { turnId: input.turnId, itemId: RuntimeItemId.make(input.itemId), payload: { - itemType: "assistant_message", + itemType: input.channel === "thought" ? "reasoning" : "assistant_message", status: input.lifecycle === "item.completed" ? "completed" : "inProgress", + ...(input.detail !== undefined && input.detail.trim().length > 0 + ? { detail: input.detail } + : {}), }, }; } @@ -219,6 +230,7 @@ export function makeAcpContentDeltaEvent(input: { readonly threadId: ThreadId; readonly turnId: TurnId | undefined; readonly itemId?: string; + readonly channel?: AcpAssistantChannel; readonly text: string; readonly rawPayload: unknown; }): ProviderRuntimeEvent { @@ -230,7 +242,15 @@ export function makeAcpContentDeltaEvent(input: { turnId: input.turnId, ...(input.itemId ? { itemId: RuntimeItemId.make(input.itemId) } : {}), payload: { - streamKind: "assistant_text", + // reasoning_text deltas are intentionally not appended to the assistant + // message by ProviderRuntimeIngestion (same as the Codex/Claude/OpenCode + // adapters' reasoning deltas). The full thought text is accumulated in + // AcpSessionRuntime's active segment and delivered via item.completed + // (itemType "reasoning"), which ingestion persists as an expandable + // thinking row. Segments are closed on channel switches, tool calls, and + // prompt settlement (including cancellation), so accumulated reasoning + // survives an interrupted turn. + streamKind: input.channel === "thought" ? "reasoning_text" : "assistant_text", delta: input.text, }, raw: { diff --git a/apps/server/src/provider/acp/AcpJsonRpcConnection.test.ts b/apps/server/src/provider/acp/AcpJsonRpcConnection.test.ts index 4e9700dab7d..db5dd961494 100644 --- a/apps/server/src/provider/acp/AcpJsonRpcConnection.test.ts +++ b/apps/server/src/provider/acp/AcpJsonRpcConnection.test.ts @@ -293,6 +293,64 @@ describe("AcpSessionRuntime", () => { ), ); + it.effect("segments thought chunks separately from assistant text", () => + Effect.gen(function* () { + const runtime = yield* AcpSessionRuntime.AcpSessionRuntime; + yield* runtime.start(); + + const promptResult = yield* runtime.prompt({ + prompt: [{ type: "text", text: "hi" }], + }); + expect(promptResult).toMatchObject({ stopReason: "end_turn" }); + + const notes = Array.from(yield* Stream.runCollect(Stream.take(runtime.getEvents(), 7))); + expect(notes.map((note) => note._tag)).toEqual([ + "AssistantItemStarted", + "ContentDelta", + "ContentDelta", + "AssistantItemCompleted", + "AssistantItemStarted", + "ContentDelta", + "AssistantItemCompleted", + ]); + + const thoughtStarted = notes[0]; + const thoughtCompleted = notes[3]; + const assistantStarted = notes[4]; + if ( + thoughtStarted?._tag === "AssistantItemStarted" && + thoughtCompleted?._tag === "AssistantItemCompleted" && + assistantStarted?._tag === "AssistantItemStarted" + ) { + expect(thoughtStarted.channel).toBe("thought"); + expect(thoughtStarted.itemId).toMatch(/^thought:/); + expect(thoughtCompleted.itemId).toBe(thoughtStarted.itemId); + // The completed thought segment carries the accumulated text. + expect(thoughtCompleted.text).toBe("thinking about the answer"); + expect(assistantStarted.channel).toBe("assistant"); + expect(assistantStarted.itemId).toMatch(/^assistant:/); + expect(assistantStarted.itemId).not.toBe(thoughtStarted.itemId); + } + }).pipe( + Effect.provide( + AcpSessionRuntime.layer({ + spawn: { + command: mockAgentCommand, + args: mockAgentArgs, + env: { + T3_ACP_EMIT_THOUGHT_CHUNKS: "1", + }, + }, + cwd: process.cwd(), + clientInfo: { name: "t3-test", version: "0.0.0" }, + authMethodId: "test", + }), + ), + Effect.scoped, + Effect.provide(NodeServices.layer), + ), + ); + it.effect("suppresses generic placeholder tool updates until completion", () => Effect.gen(function* () { const runtime = yield* AcpSessionRuntime.AcpSessionRuntime; diff --git a/apps/server/src/provider/acp/AcpRuntimeModel.test.ts b/apps/server/src/provider/acp/AcpRuntimeModel.test.ts index 14b36a43501..dea11a09236 100644 --- a/apps/server/src/provider/acp/AcpRuntimeModel.test.ts +++ b/apps/server/src/provider/acp/AcpRuntimeModel.test.ts @@ -322,6 +322,7 @@ describe("AcpRuntimeModel", () => { expect(contentResult.events).toEqual([ { _tag: "ContentDelta", + channel: "assistant", text: "hello from acp", rawPayload: { sessionId: "session-1", @@ -335,6 +336,35 @@ describe("AcpRuntimeModel", () => { }, }, ]); + + const thoughtResult = parseSessionUpdateEvent({ + sessionId: "session-1", + update: { + sessionUpdate: "agent_thought_chunk", + content: { + type: "text", + text: "thinking...", + }, + }, + } satisfies EffectAcpSchema.SessionNotification); + + expect(thoughtResult.events).toEqual([ + { + _tag: "ContentDelta", + channel: "thought", + text: "thinking...", + rawPayload: { + sessionId: "session-1", + update: { + sessionUpdate: "agent_thought_chunk", + content: { + type: "text", + text: "thinking...", + }, + }, + }, + }, + ]); }); it("requires replay idle after load RPC completion even when replay stopped earlier", () => { diff --git a/apps/server/src/provider/acp/AcpRuntimeModel.ts b/apps/server/src/provider/acp/AcpRuntimeModel.ts index df6b07563fc..c6d5b2b9b7e 100644 --- a/apps/server/src/provider/acp/AcpRuntimeModel.ts +++ b/apps/server/src/provider/acp/AcpRuntimeModel.ts @@ -80,6 +80,12 @@ export interface AcpPermissionRequest { readonly toolCall?: AcpToolCallState; } +/** + * Distinguishes the two ACP text streams: `agent_message_chunk` (final + * assistant prose) and `agent_thought_chunk` (intermediate reasoning). + */ +export type AcpAssistantChannel = "assistant" | "thought"; + export type AcpParsedSessionEvent = | { readonly _tag: "ModeChanged"; @@ -88,10 +94,15 @@ export type AcpParsedSessionEvent = | { readonly _tag: "AssistantItemStarted"; readonly itemId: string; + readonly channel: AcpAssistantChannel; } | { readonly _tag: "AssistantItemCompleted"; readonly itemId: string; + readonly channel: AcpAssistantChannel; + /** Accumulated segment text; populated for thought segments so the + * completed event can carry the full reasoning text downstream. */ + readonly text?: string; } | { readonly _tag: "PlanUpdated"; @@ -106,6 +117,7 @@ export type AcpParsedSessionEvent = | { readonly _tag: "ContentDelta"; readonly itemId?: string; + readonly channel: AcpAssistantChannel; readonly text: string; readonly rawPayload: unknown; }; @@ -624,6 +636,20 @@ export function parseSessionUpdateEvent(params: EffectAcpSchema.SessionNotificat if (upd.content.type === "text" && upd.content.text.length > 0) { events.push({ _tag: "ContentDelta", + channel: "assistant", + text: upd.content.text, + rawPayload: params, + }); + } + break; + } + case "agent_thought_chunk": { + // Reasoning text; agents like the Cursor CLI stream their intermediate + // narration here, so dropping it hides most of the turn's text output. + if (upd.content.type === "text" && upd.content.text.length > 0) { + events.push({ + _tag: "ContentDelta", + channel: "thought", text: upd.content.text, rawPayload: params, }); diff --git a/apps/server/src/provider/acp/AcpSessionRuntime.ts b/apps/server/src/provider/acp/AcpSessionRuntime.ts index 120f156944b..eda6f0dec9f 100644 --- a/apps/server/src/provider/acp/AcpSessionRuntime.ts +++ b/apps/server/src/provider/acp/AcpSessionRuntime.ts @@ -33,6 +33,7 @@ import { touchSessionLoadReplayActivity, waitForSessionLoadReplayToSettle, type SessionLoadGate, + type AcpAssistantChannel, type AcpParsedSessionEvent, type AcpSessionModeState, type AcpToolCallState, @@ -262,16 +263,36 @@ type AcpStartState = } | { readonly _tag: "Started"; readonly result: AcpStartedState }; +interface AcpActiveAssistantSegment { + readonly itemId: string; + readonly channel: AcpAssistantChannel; + /** Accumulated thought text so the segment-completed event can carry it. + * Assistant text is not accumulated here — it streams via ContentDelta. */ + readonly text: string; +} + interface AcpAssistantSegmentState { readonly nextSegmentIndex: number; - readonly activeItemId?: string; + readonly active?: AcpActiveAssistantSegment; } interface EnsureActiveAssistantSegmentResult { readonly itemId: string; - readonly startedEvent?: Extract; + readonly events: ReadonlyArray< + Extract< + AcpParsedSessionEvent, + { readonly _tag: "AssistantItemStarted" | "AssistantItemCompleted" } + > + >; } +/** Keeps thought accumulation bounded for very long reasoning segments. */ +const MAX_THOUGHT_SEGMENT_CHARS = 20_000; + +/** Differentiates runtime instances within one process; combined with the + * startup timestamp it makes segment item ids unique across restarts. */ +let runtimeInstanceCounter = 0; + export const make = ( options: AcpSessionRuntimeOptions, ): Effect.Effect< @@ -286,6 +307,12 @@ export const make = ( const modeStateRef = yield* Ref.make(undefined); const toolCallsRef = yield* Ref.make(new Map()); const assistantSegmentRef = yield* Ref.make({ nextSegmentIndex: 0 }); + // Segment item ids must not repeat across runtimes that resume the same + // ACP session (e.g. after a server restart), otherwise downstream + // consumers derive colliding message ids and new output gets appended to + // messages from a previous run. The tag makes ids runtime-unique. + runtimeInstanceCounter += 1; + const runtimeTag = `${(yield* Clock.currentTimeMillis).toString(36)}-${runtimeInstanceCounter.toString(36)}`; const configOptionsRef = yield* Ref.make(sessionConfigOptionsFromSetup(undefined)); const startStateRef = yield* Ref.make({ _tag: "NotStarted" }); const promptSerializationSemaphore = yield* Semaphore.make(1); @@ -387,6 +414,7 @@ export const make = ( modeStateRef, toolCallsRef, assistantSegmentRef, + runtimeTag, params: notification, }); }), @@ -864,12 +892,14 @@ const handleSessionUpdate = ({ modeStateRef, toolCallsRef, assistantSegmentRef, + runtimeTag, params, }: { readonly queue: Queue.Queue; readonly modeStateRef: Ref.Ref; readonly toolCallsRef: Ref.Ref>; readonly assistantSegmentRef: Ref.Ref; + readonly runtimeTag: string; readonly params: EffectAcpSchema.SessionNotification; }): Effect.Effect => Effect.gen(function* () { @@ -909,7 +939,8 @@ const handleSessionUpdate = ({ if (event._tag === "ContentDelta") { if (event.text.trim().length === 0) { const assistantSegmentState = yield* Ref.get(assistantSegmentRef); - if (!assistantSegmentState.activeItemId) { + // Whitespace-only deltas may not open a segment on their own. + if (assistantSegmentState.active?.channel !== event.channel) { continue; } } @@ -917,7 +948,22 @@ const handleSessionUpdate = ({ queue, assistantSegmentRef, sessionId: params.sessionId, + runtimeTag, + channel: event.channel, }); + if (event.channel === "thought") { + yield* Ref.update(assistantSegmentRef, (current) => + current.active?.itemId === itemId + ? { + ...current, + active: { + ...current.active, + text: appendThoughtSegmentText(current.active.text, event.text), + }, + } + : current, + ); + } yield* Queue.offer(queue, { ...event, itemId, @@ -954,44 +1000,79 @@ function shouldEmitToolCallUpdate( return previous === undefined || previous.title !== next.title || previous.detail !== next.detail; } -const assistantItemId = (sessionId: string, segmentIndex: number) => - `assistant:${sessionId}:segment:${segmentIndex}`; +const assistantItemId = (input: { + readonly sessionId: string; + readonly runtimeTag: string; + readonly channel: AcpAssistantChannel; + readonly segmentIndex: number; +}) => + `${input.channel === "thought" ? "thought" : "assistant"}:${input.sessionId}:${input.runtimeTag}:segment:${input.segmentIndex}`; + +function appendThoughtSegmentText(current: string, delta: string): string { + if (current.length >= MAX_THOUGHT_SEGMENT_CHARS) { + return current; + } + return `${current}${delta}`.slice(0, MAX_THOUGHT_SEGMENT_CHARS); +} + +function completedSegmentEvent( + segment: AcpActiveAssistantSegment, +): Extract { + return { + _tag: "AssistantItemCompleted", + itemId: segment.itemId, + channel: segment.channel, + ...(segment.channel === "thought" && segment.text.length > 0 ? { text: segment.text } : {}), + }; +} const ensureActiveAssistantSegment = ({ queue, assistantSegmentRef, sessionId, + runtimeTag, + channel, }: { readonly queue: Queue.Queue; readonly assistantSegmentRef: Ref.Ref; readonly sessionId: string; + readonly runtimeTag: string; + readonly channel: AcpAssistantChannel; }) => Ref.modify( assistantSegmentRef, (current) => { - if (current.activeItemId) { - return [{ itemId: current.activeItemId }, current] as const; + if (current.active?.channel === channel) { + return [{ itemId: current.active.itemId, events: [] }, current] as const; } - const itemId = assistantItemId(sessionId, current.nextSegmentIndex); - return [ + const itemId = assistantItemId({ + sessionId, + runtimeTag, + channel, + segmentIndex: current.nextSegmentIndex, + }); + const events: EnsureActiveAssistantSegmentResult["events"] = [ + // A channel switch (assistant <-> thought) closes the previous segment. + ...(current.active ? [completedSegmentEvent(current.active)] : []), { + _tag: "AssistantItemStarted", itemId, - startedEvent: { - _tag: "AssistantItemStarted", - itemId, - } satisfies Extract, + channel, }, + ]; + return [ + { itemId, events }, { nextSegmentIndex: current.nextSegmentIndex + 1, - activeItemId: itemId, + active: { itemId, channel, text: "" }, } satisfies AcpAssistantSegmentState, ] as const; }, ).pipe( Effect.flatMap((result) => - result.startedEvent - ? Queue.offer(queue, result.startedEvent).pipe(Effect.as(result.itemId)) - : Effect.succeed(result.itemId), + Effect.forEach(result.events, (event) => Queue.offer(queue, event), { + discard: true, + }).pipe(Effect.as(result.itemId)), ), ); @@ -1003,14 +1084,11 @@ const closeActiveAssistantSegment = ({ readonly assistantSegmentRef: Ref.Ref; }) => Ref.modify(assistantSegmentRef, (current) => { - if (!current.activeItemId) { + if (!current.active) { return [undefined, current] as const; } return [ - { - _tag: "AssistantItemCompleted", - itemId: current.activeItemId, - } satisfies AcpParsedSessionEvent, + completedSegmentEvent(current.active), { nextSegmentIndex: current.nextSegmentIndex, } satisfies AcpAssistantSegmentState, diff --git a/apps/web/src/session-logic.ts b/apps/web/src/session-logic.ts index 5d5051f748e..0939f0dd5bb 100644 --- a/apps/web/src/session-logic.ts +++ b/apps/web/src/session-logic.ts @@ -710,7 +710,9 @@ function toDerivedWorkLogEntry(activity: OrchestrationThreadActivity): DerivedWo turnId: activity.turnId, label: taskLabel || activity.summary, tone: - activity.kind === "task.progress" + // Reasoning rows (ACP agent_thought_chunk segments) share the thinking + // affordance with Codex task.progress reasoning updates. + activity.kind === "task.progress" || activity.kind === "reasoning" ? "thinking" : activity.tone === "approval" ? "info" diff --git a/packages/effect-acp/src/protocol.test.ts b/packages/effect-acp/src/protocol.test.ts index ece068dfc88..7f3cec01487 100644 --- a/packages/effect-acp/src/protocol.test.ts +++ b/packages/effect-acp/src/protocol.test.ts @@ -95,6 +95,14 @@ it.layer(NodeServices.layer)("effect-acp protocol", (it) => { sessionId: "session-1", }, }); + // JSON-RPC 2.0 notifications must omit `id` entirely; agents treat a + // present-but-empty id as a malformed request and drop the message. + const outboundText = + typeof outbound === "string" ? outbound : new TextDecoder().decode(outbound); + assert.equal( + outboundText, + '{"jsonrpc":"2.0","method":"session/cancel","params":{"sessionId":"session-1"}}\n', + ); yield* Queue.offer( input, @@ -211,7 +219,7 @@ it.layer(NodeServices.layer)("effect-acp protocol", (it) => { direction: "outgoing", stage: "raw", payload: - '{"jsonrpc":"2.0","method":"session/cancel","params":{"sessionId":"session-1"},"id":"","headers":[]}\n', + '{"jsonrpc":"2.0","method":"session/cancel","params":{"sessionId":"session-1"}}\n', }, ]); }), diff --git a/packages/effect-acp/src/protocol.ts b/packages/effect-acp/src/protocol.ts index ace9fc11b5d..a7aa61f1a69 100644 --- a/packages/effect-acp/src/protocol.ts +++ b/packages/effect-acp/src/protocol.ts @@ -120,8 +120,13 @@ export const makeAcpPatchedProtocol = Effect.fn("makeAcpPatchedProtocol")(functi ? message.requestId : undefined; const requestId = encodedRequestId === "" ? undefined : encodedRequestId; + // JSON-RPC 2.0 notifications must omit the `id` member entirely. The RPC + // serializer would emit `"id": ""` for empty-id requests, which agents + // (e.g. the Cursor CLI) reject as a malformed request instead of handling + // the notification — so notifications are encoded by hand here. + const isNotification = message._tag === "Request" && message.id === ""; const encoded = yield* Effect.try({ - try: () => parser.encode(message), + try: () => (isNotification ? encodeJsonRpcNotificationWire(message) : parser.encode(message)), catch: (cause) => AcpError.AcpProtocolParseError.fromEncodingError(method, requestId, cause), }); @@ -577,6 +582,15 @@ export const makeAcpPatchedProtocol = Effect.fn("makeAcpPatchedProtocol")(functi } satisfies AcpPatchedProtocol; }); +/** Encodes a JSON-RPC 2.0 notification (no `id`) with ndjson framing. */ +function encodeJsonRpcNotificationWire(message: RpcMessage.RequestEncoded): string { + return `${JSON.stringify({ + jsonrpc: "2.0", + method: message.tag, + ...(message.payload === undefined ? {} : { params: message.payload }), + })}\n`; +} + function isProtocolError( value: unknown, ): value is { code: number; message: string; data?: unknown } {