From 62ce5d69e93f4f6432df29e6681d55f2b2439940 Mon Sep 17 00:00:00 2001 From: tarik02 Date: Sat, 4 Jul 2026 12:25:28 +0300 Subject: [PATCH] revert fork cursor fixes --- .../Layers/ProviderRuntimeIngestion.test.ts | 400 ++---------------- .../Layers/ProviderRuntimeIngestion.ts | 116 +---- .../provider/acp/AcpSessionRuntime.test.ts | 84 ---- .../src/provider/acp/AcpSessionRuntime.ts | 13 - .../src/provider/acp/CursorAcpSupport.ts | 2 - 5 files changed, 42 insertions(+), 573 deletions(-) delete mode 100644 apps/server/src/provider/acp/AcpSessionRuntime.test.ts diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index f908c2c2292..001ba388949 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -61,10 +61,6 @@ const asEventId = (value: string): EventId => EventId.make(value); const asMessageId = (value: string): MessageId => MessageId.make(value); const asThreadId = (value: string): ThreadId => ThreadId.make(value); const asTurnId = (value: string): TurnId => TurnId.make(value); -const turnScopedAssistantMessageId = (turnId: string, itemId: string, segment?: number): string => - segment === undefined - ? `assistant:turn:${turnId}:${itemId}` - : `assistant:turn:${turnId}:${itemId}:segment:${segment}`; type LegacyProviderRuntimeEvent = { readonly type: string; @@ -713,12 +709,11 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === turnScopedAssistantMessageId("turn-2", "item-1") && !message.streaming, + message.id === "assistant:item-1" && !message.streaming, ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => - entry.id === turnScopedAssistantMessageId("turn-2", "item-1"), + (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-1", ); expect(message?.text).toBe("hello world"); expect(message?.streaming).toBe(false); @@ -746,13 +741,11 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === turnScopedAssistantMessageId("turn-no-delta", "item-no-delta") && - !message.streaming, + message.id === "assistant:item-no-delta" && !message.streaming, ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => - entry.id === turnScopedAssistantMessageId("turn-no-delta", "item-no-delta"), + (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-no-delta", ); expect(message?.text).toBe("assistant-only final text"); expect(message?.streaming).toBe(false); @@ -1622,8 +1615,7 @@ describe("ProviderRuntimeIngestion", () => { const midThread = midReadModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect( midThread?.messages.some( - (message: ProviderRuntimeTestMessage) => - message.id === turnScopedAssistantMessageId("turn-buffered", "item-buffered"), + (message: ProviderRuntimeTestMessage) => message.id === "assistant:item-buffered", ), ).toBe(false); @@ -1644,13 +1636,11 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === turnScopedAssistantMessageId("turn-buffered", "item-buffered") && - !message.streaming, + message.id === "assistant:item-buffered" && !message.streaming, ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => - entry.id === turnScopedAssistantMessageId("turn-buffered", "item-buffered"), + (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-buffered", ); expect(message?.text).toBe("buffer me"); expect(message?.streaming).toBe(false); @@ -1705,19 +1695,13 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === - turnScopedAssistantMessageId( - "turn-buffered-request-flush", - "item-buffered-request-flush", - ) && + message.id === "assistant:item-buffered-request-flush" && !message.streaming && message.text === "visible before approval", ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => - entry.id === - turnScopedAssistantMessageId("turn-buffered-request-flush", "item-buffered-request-flush"), + (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-buffered-request-flush", ); expect(message?.streaming).toBe(false); }); @@ -1777,22 +1761,14 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === - turnScopedAssistantMessageId( - "turn-buffered-user-input-flush", - "item-buffered-user-input-flush", - ) && + message.id === "assistant:item-buffered-user-input-flush" && !message.streaming && message.text === "visible before user input", ), ); const message = thread.messages.find( (entry: ProviderRuntimeTestMessage) => - entry.id === - turnScopedAssistantMessageId( - "turn-buffered-user-input-flush", - "item-buffered-user-input-flush", - ), + entry.id === "assistant:item-buffered-user-input-flush", ); expect(message?.streaming).toBe(false); }); @@ -1852,11 +1828,7 @@ describe("ProviderRuntimeIngestion", () => { expect( thread.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === - turnScopedAssistantMessageId( - "turn-buffered-whitespace-request", - "item-buffered-whitespace-request", - ), + message.id === "assistant:item-buffered-whitespace-request", ), ).toBe(false); }); @@ -1913,11 +1885,7 @@ describe("ProviderRuntimeIngestion", () => { await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === - turnScopedAssistantMessageId( - "turn-buffered-request-append", - "item-buffered-request-append", - ) && + message.id === "assistant:item-buffered-request-append" && !message.streaming && message.text === "first half", ), @@ -1953,32 +1921,17 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === - turnScopedAssistantMessageId( - "turn-buffered-request-append", - "item-buffered-request-append", - 1, - ) && + message.id === "assistant:item-buffered-request-append:segment:1" && !message.streaming && message.text === " second half", ), ); const firstMessage = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => - entry.id === - turnScopedAssistantMessageId( - "turn-buffered-request-append", - "item-buffered-request-append", - ), + (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-buffered-request-append", ); const resumedMessage = thread.messages.find( (entry: ProviderRuntimeTestMessage) => - entry.id === - turnScopedAssistantMessageId( - "turn-buffered-request-append", - "item-buffered-request-append", - 1, - ), + entry.id === "assistant:item-buffered-request-append:segment:1", ); expect(firstMessage?.text).toBe("first half"); expect(firstMessage?.streaming).toBe(false); @@ -1993,12 +1946,7 @@ describe("ProviderRuntimeIngestion", () => { const assistantEvents = events.filter( (event): event is Extract<(typeof events)[number], { type: "thread.message-sent" }> => event.type === "thread.message-sent" && - event.payload.messageId.startsWith( - turnScopedAssistantMessageId( - "turn-buffered-request-append", - "item-buffered-request-append", - ).slice(0, "assistant:turn:turn-buffered-request-append:".length), - ), + event.payload.messageId.startsWith("assistant:item-buffered-request-append"), ); expect(assistantEvents).toHaveLength(4); expect(assistantEvents[0]?.payload.streaming).toBe(true); @@ -2006,20 +1954,12 @@ describe("ProviderRuntimeIngestion", () => { expect(assistantEvents[1]?.payload.streaming).toBe(false); expect(assistantEvents[1]?.payload.text).toBe(""); expect(assistantEvents[2]?.payload.messageId).toBe( - turnScopedAssistantMessageId( - "turn-buffered-request-append", - "item-buffered-request-append", - 1, - ), + "assistant:item-buffered-request-append:segment:1", ); expect(assistantEvents[2]?.payload.streaming).toBe(true); expect(assistantEvents[2]?.payload.text).toBe(" second half"); expect(assistantEvents[3]?.payload.messageId).toBe( - turnScopedAssistantMessageId( - "turn-buffered-request-append", - "item-buffered-request-append", - 1, - ), + "assistant:item-buffered-request-append:segment:1", ); expect(assistantEvents[3]?.payload.streaming).toBe(false); expect(assistantEvents[3]?.payload.text).toBe(""); @@ -2077,11 +2017,7 @@ describe("ProviderRuntimeIngestion", () => { await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === - turnScopedAssistantMessageId( - "turn-streaming-request-segment", - "item-streaming-request-segment", - ) && + message.id === "assistant:item-streaming-request-segment" && !message.streaming && message.text === "before approval", ), @@ -2117,12 +2053,7 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === - turnScopedAssistantMessageId( - "turn-streaming-request-segment", - "item-streaming-request-segment", - 1, - ) && + message.id === "assistant:item-streaming-request-segment:segment:1" && !message.streaming && message.text === " after approval", ), @@ -2130,272 +2061,17 @@ describe("ProviderRuntimeIngestion", () => { expect( thread.messages.find( (message: ProviderRuntimeTestMessage) => - message.id === - turnScopedAssistantMessageId( - "turn-streaming-request-segment", - "item-streaming-request-segment", - ), + message.id === "assistant:item-streaming-request-segment", )?.text, ).toBe("before approval"); expect( thread.messages.find( (message: ProviderRuntimeTestMessage) => - message.id === - turnScopedAssistantMessageId( - "turn-streaming-request-segment", - "item-streaming-request-segment", - 1, - ), + message.id === "assistant:item-streaming-request-segment:segment:1", )?.text, ).toBe(" after approval"); }); - it("keeps assistant message identities isolated across turns when provider item IDs are reused", async () => { - const harness = await createHarness(); - const now = "2026-01-01T00:00:00.000Z"; - const reusedItemId = asItemId("assistant:cursor-session:segment:4"); - - harness.emit({ - type: "content.delta", - eventId: asEventId("evt-reused-item-turn-1"), - provider: ProviderDriverKind.make("cursor"), - createdAt: now, - threadId: asThreadId("thread-1"), - turnId: asTurnId("turn-reused-1"), - itemId: reusedItemId, - payload: { - streamKind: "assistant_text", - delta: "first turn response", - }, - }); - harness.emit({ - type: "item.completed", - eventId: asEventId("evt-reused-item-turn-1-complete"), - provider: ProviderDriverKind.make("cursor"), - createdAt: now, - threadId: asThreadId("thread-1"), - turnId: asTurnId("turn-reused-1"), - itemId: reusedItemId, - payload: { - itemType: "assistant_message", - status: "completed", - }, - }); - - harness.emit({ - type: "content.delta", - eventId: asEventId("evt-reused-item-turn-2"), - provider: ProviderDriverKind.make("cursor"), - createdAt: now, - threadId: asThreadId("thread-1"), - turnId: asTurnId("turn-reused-2"), - itemId: reusedItemId, - payload: { - streamKind: "assistant_text", - delta: "second turn response", - }, - }); - harness.emit({ - type: "item.completed", - eventId: asEventId("evt-reused-item-turn-2-complete"), - provider: ProviderDriverKind.make("cursor"), - createdAt: now, - threadId: asThreadId("thread-1"), - turnId: asTurnId("turn-reused-2"), - itemId: reusedItemId, - payload: { - itemType: "assistant_message", - status: "completed", - }, - }); - - const thread = await waitForThread(harness.readModel, (entry) => - entry.messages.some( - (message: ProviderRuntimeTestMessage) => - message.id === turnScopedAssistantMessageId("turn-reused-2", String(reusedItemId)) && - !message.streaming, - ), - ); - - const firstMessage = thread.messages.find( - (message: ProviderRuntimeTestMessage) => - message.id === turnScopedAssistantMessageId("turn-reused-1", String(reusedItemId)), - ); - const secondMessage = thread.messages.find( - (message: ProviderRuntimeTestMessage) => - message.id === turnScopedAssistantMessageId("turn-reused-2", String(reusedItemId)), - ); - - expect(firstMessage?.text).toBe("first turn response"); - expect(secondMessage?.text).toBe("second turn response"); - }); - - it("ignores cursor assistant item replay without a turn id while a new turn is active", async () => { - const harness = await createHarness({ serverSettings: { enableAssistantStreaming: true } }); - const now = "2026-01-01T00:00:00.000Z"; - const replayedItemId = asItemId("assistant:cursor-session:segment:0"); - const secondItemId = asItemId("assistant:cursor-session:segment:1"); - - harness.emit({ - type: "turn.started", - eventId: asEventId("evt-cursor-replay-turn-1-started"), - provider: ProviderDriverKind.make("cursor"), - createdAt: now, - threadId: asThreadId("thread-1"), - turnId: asTurnId("turn-cursor-replay-1"), - }); - harness.emit({ - type: "content.delta", - eventId: asEventId("evt-cursor-replay-turn-1-delta"), - provider: ProviderDriverKind.make("cursor"), - createdAt: now, - threadId: asThreadId("thread-1"), - turnId: asTurnId("turn-cursor-replay-1"), - itemId: replayedItemId, - payload: { - streamKind: "assistant_text", - delta: "first turn response", - }, - }); - harness.emit({ - type: "item.completed", - eventId: asEventId("evt-cursor-replay-turn-1-complete"), - provider: ProviderDriverKind.make("cursor"), - createdAt: now, - threadId: asThreadId("thread-1"), - turnId: asTurnId("turn-cursor-replay-1"), - itemId: replayedItemId, - payload: { - itemType: "assistant_message", - status: "completed", - }, - }); - harness.emit({ - type: "turn.completed", - eventId: asEventId("evt-cursor-replay-turn-1-turn-complete"), - provider: ProviderDriverKind.make("cursor"), - createdAt: now, - threadId: asThreadId("thread-1"), - turnId: asTurnId("turn-cursor-replay-1"), - status: "completed", - }); - - await waitForThread( - harness.readModel, - (thread) => - thread.session?.status === "ready" && - thread.messages.some( - (message: ProviderRuntimeTestMessage) => message.text === "first turn response", - ), - ); - - harness.emit({ - type: "turn.started", - eventId: asEventId("evt-cursor-replay-turn-2-started"), - provider: ProviderDriverKind.make("cursor"), - createdAt: now, - threadId: asThreadId("thread-1"), - turnId: asTurnId("turn-cursor-replay-2"), - }); - await waitForThread( - harness.readModel, - (thread) => - thread.session?.status === "running" && - thread.session.activeTurnId === "turn-cursor-replay-2", - ); - - // Cursor can replay the previous assistant segment after an ACP session - // resume without attaching a turn id, then report that same item completed - // under the active turn. Neither event belongs to the new turn. - harness.emit({ - type: "item.started", - eventId: asEventId("evt-cursor-replay-stale-started"), - provider: ProviderDriverKind.make("cursor"), - createdAt: now, - threadId: asThreadId("thread-1"), - itemId: replayedItemId, - payload: { - itemType: "assistant_message", - status: "inProgress", - }, - }); - harness.emit({ - type: "content.delta", - eventId: asEventId("evt-cursor-replay-stale-delta"), - provider: ProviderDriverKind.make("cursor"), - createdAt: now, - threadId: asThreadId("thread-1"), - itemId: replayedItemId, - payload: { - streamKind: "assistant_text", - delta: "first turn response", - }, - }); - harness.emit({ - type: "item.completed", - eventId: asEventId("evt-cursor-replay-stale-complete"), - provider: ProviderDriverKind.make("cursor"), - createdAt: now, - threadId: asThreadId("thread-1"), - turnId: asTurnId("turn-cursor-replay-2"), - itemId: replayedItemId, - payload: { - itemType: "assistant_message", - status: "completed", - }, - }); - - harness.emit({ - type: "content.delta", - eventId: asEventId("evt-cursor-replay-turn-2-delta"), - provider: ProviderDriverKind.make("cursor"), - createdAt: now, - threadId: asThreadId("thread-1"), - turnId: asTurnId("turn-cursor-replay-2"), - itemId: secondItemId, - payload: { - streamKind: "assistant_text", - delta: "second turn response", - }, - }); - harness.emit({ - type: "item.completed", - eventId: asEventId("evt-cursor-replay-turn-2-complete"), - provider: ProviderDriverKind.make("cursor"), - createdAt: now, - threadId: asThreadId("thread-1"), - turnId: asTurnId("turn-cursor-replay-2"), - itemId: secondItemId, - payload: { - itemType: "assistant_message", - status: "completed", - }, - }); - - const thread = await waitForThread(harness.readModel, (entry) => - entry.messages.some( - (message: ProviderRuntimeTestMessage) => - message.id === turnScopedAssistantMessageId("turn-cursor-replay-2", String(secondItemId)), - ), - ); - expect(thread.messages.map((message: ProviderRuntimeTestMessage) => message.text)).toEqual([ - "first turn response", - "second turn response", - ]); - expect( - thread.messages.some( - (message: ProviderRuntimeTestMessage) => message.id === `assistant:${replayedItemId}`, - ), - ).toBe(false); - expect( - thread.messages.some( - (message: ProviderRuntimeTestMessage) => - message.id === - turnScopedAssistantMessageId("turn-cursor-replay-2", String(replayedItemId)), - ), - ).toBe(false); - }); - it("streams assistant deltas when thread.turn.start requests streaming mode", async () => { const harness = await createHarness({ serverSettings: { enableAssistantStreaming: true } }); const now = "2026-01-01T00:00:00.000Z"; @@ -2450,15 +2126,13 @@ describe("ProviderRuntimeIngestion", () => { const liveThread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === - turnScopedAssistantMessageId("turn-streaming-mode", "item-streaming-mode") && + message.id === "assistant:item-streaming-mode" && message.streaming && message.text === "hello live", ), ); const liveMessage = liveThread.messages.find( - (entry: ProviderRuntimeTestMessage) => - entry.id === turnScopedAssistantMessageId("turn-streaming-mode", "item-streaming-mode"), + (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-streaming-mode", ); expect(liveMessage?.streaming).toBe(true); @@ -2480,14 +2154,11 @@ describe("ProviderRuntimeIngestion", () => { const finalThread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === - turnScopedAssistantMessageId("turn-streaming-mode", "item-streaming-mode") && - !message.streaming, + message.id === "assistant:item-streaming-mode" && !message.streaming, ), ); const finalMessage = finalThread.messages.find( - (entry: ProviderRuntimeTestMessage) => - entry.id === turnScopedAssistantMessageId("turn-streaming-mode", "item-streaming-mode"), + (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-streaming-mode", ); expect(finalMessage?.text).toBe("hello live"); expect(finalMessage?.streaming).toBe(false); @@ -2543,13 +2214,11 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === turnScopedAssistantMessageId("turn-buffer-spill", "item-buffer-spill") && - !message.streaming, + message.id === "assistant:item-buffer-spill" && !message.streaming, ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => - entry.id === turnScopedAssistantMessageId("turn-buffer-spill", "item-buffer-spill"), + (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-buffer-spill", ); expect(message?.text.length).toBe(oversizedText.length); expect(message?.text).toBe(oversizedText); @@ -2621,9 +2290,7 @@ describe("ProviderRuntimeIngestion", () => { thread.session?.activeTurnId === null && thread.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === - turnScopedAssistantMessageId("turn-complete-dedup", "item-complete-dedup") && - !message.streaming, + message.id === "assistant:item-complete-dedup" && !message.streaming, ), ); @@ -2637,8 +2304,7 @@ describe("ProviderRuntimeIngestion", () => { return false; } return ( - event.payload.messageId === - turnScopedAssistantMessageId("turn-complete-dedup", "item-complete-dedup") && + event.payload.messageId === "assistant:item-complete-dedup" && event.payload.streaming === false ); }); @@ -2995,9 +2661,7 @@ describe("ProviderRuntimeIngestion", () => { (entry: ProviderRuntimeTestCheckpoint) => entry.turnId === "turn-p1", ); expect(checkpoint?.status).toBe("missing"); - expect(checkpoint?.assistantMessageId).toBe( - turnScopedAssistantMessageId("turn-p1", "item-p1-assistant"), - ); + expect(checkpoint?.assistantMessageId).toBe("assistant:item-p1-assistant"); expect(checkpoint?.checkpointRef).toBe("provider-diff:evt-turn-diff-updated"); }); diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index 9ffd67a00fb..bd300ba2ad1 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -53,8 +53,6 @@ type GoalActivityState = Pick; const TURN_MESSAGE_IDS_BY_TURN_CACHE_CAPACITY = 10_000; const TURN_MESSAGE_IDS_BY_TURN_TTL = Duration.minutes(120); -const STALE_REPLAY_ITEM_IDS_BY_TURN_CACHE_CAPACITY = 10_000; -const STALE_REPLAY_ITEM_IDS_BY_TURN_TTL = Duration.minutes(120); const BUFFERED_MESSAGE_TEXT_BY_MESSAGE_ID_CACHE_CAPACITY = 20_000; const BUFFERED_MESSAGE_TEXT_BY_MESSAGE_ID_TTL = Duration.minutes(120); const BUFFERED_PROPOSED_PLAN_BY_ID_CACHE_CAPACITY = 10_000; @@ -243,9 +241,8 @@ function proposedPlanIdFromEvent(event: ProviderRuntimeEvent, threadId: ThreadId return `plan:${threadId}:event:${event.eventId}`; } -function assistantSegmentBaseKeyFromEvent(event: ProviderRuntimeEvent, turnId?: TurnId): string { - const providerKey = String(event.itemId ?? event.turnId ?? event.eventId); - return turnId ? `turn:${turnId}:${providerKey}` : providerKey; +function assistantSegmentBaseKeyFromEvent(event: ProviderRuntimeEvent): string { + return String(event.itemId ?? event.turnId ?? event.eventId); } function assistantSegmentMessageId(baseKey: string, segmentIndex: number): MessageId { @@ -253,41 +250,6 @@ function assistantSegmentMessageId(baseKey: string, segmentIndex: number): Messa segmentIndex === 0 ? `assistant:${baseKey}` : `assistant:${baseKey}:segment:${segmentIndex}`, ); } - -function runtimeAssistantMessageIdFromEvent( - event: ProviderRuntimeEvent, - turnId?: TurnId, -): MessageId { - return assistantSegmentMessageId(assistantSegmentBaseKeyFromEvent(event, turnId), 0); -} - -function isTurnOutputRuntimeEvent(event: ProviderRuntimeEvent): boolean { - switch (event.type) { - case "content.delta": - case "item.started": - case "item.updated": - case "item.completed": - case "request.opened": - case "request.resolved": - case "runtime.warning": - case "task.started": - case "task.progress": - case "task.completed": - case "thread.state.changed": - case "thread.token-usage.updated": - case "tool.denied": - case "turn.completed": - case "turn.diff.updated": - case "turn.plan.updated": - case "turn.proposed.completed": - case "turn.proposed.delta": - case "user-input.requested": - case "user-input.resolved": - return true; - default: - return false; - } -} function buildContextWindowActivityPayload( event: ProviderRuntimeEvent, ): ThreadTokenUsageSnapshot | undefined { @@ -791,12 +753,6 @@ const make = Effect.gen(function* () { ), }); - const staleReplayItemIdsByTurnKey = yield* Cache.make>({ - capacity: STALE_REPLAY_ITEM_IDS_BY_TURN_CACHE_CAPACITY, - timeToLive: STALE_REPLAY_ITEM_IDS_BY_TURN_TTL, - lookup: () => Effect.succeed(new Set()), - }); - const bufferedProposedPlanById = yield* Cache.make({ capacity: BUFFERED_PROPOSED_PLAN_BY_ID_CACHE_CAPACITY, timeToLive: BUFFERED_PROPOSED_PLAN_BY_ID_TTL, @@ -878,37 +834,6 @@ const make = Effect.gen(function* () { const clearAssistantSegmentStateForTurn = (threadId: ThreadId, turnId: TurnId) => Cache.invalidate(assistantSegmentStateByTurnKey, providerTurnKey(threadId, turnId)); - const rememberStaleReplayItemForTurn = (threadId: ThreadId, turnId: TurnId, itemId: string) => - Cache.getOption(staleReplayItemIdsByTurnKey, providerTurnKey(threadId, turnId)).pipe( - Effect.flatMap((existingIds) => - Cache.set( - staleReplayItemIdsByTurnKey, - providerTurnKey(threadId, turnId), - Option.match(existingIds, { - onNone: () => new Set([itemId]), - onSome: (ids) => { - const nextIds = new Set(ids); - nextIds.add(itemId); - return nextIds; - }, - }), - ), - ), - ); - - const hasStaleReplayItemForTurn = (threadId: ThreadId, turnId: TurnId, itemId: string) => - Cache.getOption(staleReplayItemIdsByTurnKey, providerTurnKey(threadId, turnId)).pipe( - Effect.map((existingIds) => - Option.match(existingIds, { - onNone: () => false, - onSome: (ids) => ids.has(itemId), - }), - ), - ); - - const clearStaleReplayItemsForTurn = (threadId: ThreadId, turnId: TurnId) => - Cache.invalidate(staleReplayItemIdsByTurnKey, providerTurnKey(threadId, turnId)); - const getActiveAssistantMessageIdForTurn = (threadId: ThreadId, turnId: TurnId) => getAssistantSegmentStateForTurn(threadId, turnId).pipe( Effect.map((state) => @@ -955,7 +880,7 @@ const make = Effect.gen(function* () { }) => Effect.gen(function* () { if (!input.turnId) { - return runtimeAssistantMessageIdFromEvent(input.event); + return assistantSegmentMessageId(assistantSegmentBaseKeyFromEvent(input.event), 0); } const activeMessageId = yield* getActiveAssistantMessageIdForTurn( @@ -969,7 +894,7 @@ const make = Effect.gen(function* () { return yield* startAssistantSegmentForTurn({ threadId: input.threadId, turnId: input.turnId, - baseKey: assistantSegmentBaseKeyFromEvent(input.event, input.turnId), + baseKey: assistantSegmentBaseKeyFromEvent(input.event), }); }); @@ -1263,7 +1188,6 @@ const make = Effect.gen(function* () { const proposedPlanPrefix = `plan:${threadId}:`; const turnKeys = Array.from(yield* Cache.keys(turnMessageIdsByTurnKey)); const assistantSegmentKeys = Array.from(yield* Cache.keys(assistantSegmentStateByTurnKey)); - const staleReplayKeys = Array.from(yield* Cache.keys(staleReplayItemIdsByTurnKey)); const proposedPlanKeys = Array.from(yield* Cache.keys(bufferedProposedPlanById)); yield* Effect.forEach( turnKeys, @@ -1292,12 +1216,6 @@ const make = Effect.gen(function* () { : Effect.void, { concurrency: 1 }, ).pipe(Effect.asVoid); - yield* Effect.forEach( - staleReplayKeys, - (key) => - key.startsWith(prefix) ? Cache.invalidate(staleReplayItemIdsByTurnKey, key) : Effect.void, - { concurrency: 1 }, - ).pipe(Effect.asVoid); yield* Effect.forEach( proposedPlanKeys, (key) => @@ -1539,23 +1457,6 @@ const make = Effect.gen(function* () { } } - const eventItemId = event.itemId === undefined ? undefined : String(event.itemId); - const staleReplayItemForActiveTurn = - activeTurnId !== null && eventItemId !== undefined - ? yield* hasStaleReplayItemForTurn(thread.id, activeTurnId, eventItemId) - : false; - const shouldSkipRuntimeOutput = - STRICT_PROVIDER_LIFECYCLE_GUARD && - isTurnOutputRuntimeEvent(event) && - (conflictsWithActiveTurn || missingTurnForActiveTurn || staleReplayItemForActiveTurn); - - if (shouldSkipRuntimeOutput) { - if (missingTurnForActiveTurn && activeTurnId !== null && eventItemId !== undefined) { - yield* rememberStaleReplayItemForTurn(thread.id, activeTurnId, eventItemId); - } - return; - } - const assistantDelta = event.type === "content.delta" && event.payload.streamKind === "assistant_text" ? event.payload.delta @@ -1657,7 +1558,9 @@ const make = Effect.gen(function* () { const assistantCompletion = event.type === "item.completed" && event.payload.itemType === "assistant_message" ? { - messageId: runtimeAssistantMessageIdFromEvent(event, toTurnId(event.turnId)), + messageId: MessageId.make( + `assistant:${event.itemId ?? event.turnId ?? event.eventId}`, + ), fallbackText: event.payload.detail, } : undefined; @@ -1759,7 +1662,6 @@ const make = Effect.gen(function* () { ).pipe(Effect.asVoid); yield* clearAssistantMessageIdsForTurn(thread.id, turnId); yield* clearAssistantSegmentStateForTurn(thread.id, turnId); - yield* clearStaleReplayItemsForTurn(thread.id, turnId); yield* finalizeBufferedProposedPlan({ event, @@ -1858,7 +1760,9 @@ const make = Effect.gen(function* () { if (hasCheckpointForTurn(checkpointContext.checkpoints, turnId)) { // Already tracked; no-op. } else { - const assistantMessageId = runtimeAssistantMessageIdFromEvent(event, turnId); + const assistantMessageId = MessageId.make( + `assistant:${event.itemId ?? event.turnId ?? event.eventId}`, + ); yield* orchestrationEngine.dispatch({ type: "thread.turn.diff.complete", commandId: yield* providerCommandId(event, "thread-turn-diff-complete"), diff --git a/apps/server/src/provider/acp/AcpSessionRuntime.test.ts b/apps/server/src/provider/acp/AcpSessionRuntime.test.ts deleted file mode 100644 index 59dc3421983..00000000000 --- a/apps/server/src/provider/acp/AcpSessionRuntime.test.ts +++ /dev/null @@ -1,84 +0,0 @@ -import { assert, it } from "@effect/vitest"; -import * as Effect from "effect/Effect"; -import * as Queue from "effect/Queue"; -import * as Ref from "effect/Ref"; -import type * as EffectAcpSchema from "effect-acp/schema"; - -import { handleSessionUpdateForTest, type AcpSessionRuntimeEvent } from "./AcpSessionRuntime.ts"; -import type { AcpSessionModeState, AcpToolCallState } from "./AcpRuntimeModel.ts"; - -it.effect("suppresses loaded-session replay updates until the first live prompt", () => - Effect.gen(function* () { - const queue = yield* Queue.unbounded(); - const modeStateRef = yield* Ref.make({ - currentModeId: "ask", - availableModes: [ - { id: "ask", name: "Ask" }, - { id: "code", name: "Code" }, - ], - }); - const toolCallsRef = yield* Ref.make(new Map()); - const assistantSegmentRef = yield* Ref.make({ nextSegmentIndex: 0 }); - const suppressSessionUpdatesRef = yield* Ref.make(true); - - const handle = (params: EffectAcpSchema.SessionNotification) => - handleSessionUpdateForTest({ - queue, - modeStateRef, - toolCallsRef, - assistantSegmentRef, - suppressSessionUpdatesRef, - params, - }); - - yield* handle({ - sessionId: "cursor-session", - update: { - sessionUpdate: "current_mode_update", - currentModeId: "code", - }, - }); - yield* handle({ - sessionId: "cursor-session", - update: { - sessionUpdate: "plan", - entries: [{ content: "Old replayed plan", priority: "high", status: "completed" }], - }, - }); - yield* handle({ - sessionId: "cursor-session", - update: { - sessionUpdate: "user_message_chunk", - content: { type: "text", text: "old replayed user prompt" }, - }, - }); - yield* handle({ - sessionId: "cursor-session", - update: { - sessionUpdate: "agent_message_chunk", - content: { type: "text", text: "old replayed assistant text" }, - }, - }); - - assert.equal(yield* Queue.size(queue), 0); - assert.equal((yield* Ref.get(modeStateRef))?.currentModeId, "code"); - - yield* Ref.set(suppressSessionUpdatesRef, false); - yield* handle({ - sessionId: "cursor-session", - update: { - sessionUpdate: "agent_message_chunk", - content: { type: "text", text: "new assistant text" }, - }, - }); - - const started = yield* Queue.take(queue); - const delta = yield* Queue.take(queue); - assert.equal(started._tag, "AssistantItemStarted"); - assert.equal(delta._tag, "ContentDelta"); - if (delta._tag === "ContentDelta") { - assert.equal(delta.text, "new assistant text"); - } - assert.equal(yield* Queue.size(queue), 0); - }), -); diff --git a/apps/server/src/provider/acp/AcpSessionRuntime.ts b/apps/server/src/provider/acp/AcpSessionRuntime.ts index 0c108929c0b..ab1f19b09aa 100644 --- a/apps/server/src/provider/acp/AcpSessionRuntime.ts +++ b/apps/server/src/provider/acp/AcpSessionRuntime.ts @@ -70,7 +70,6 @@ export interface AcpSessionRuntimeOptions { }; readonly authMethodId: string; readonly mcpServers?: ReadonlyArray; - readonly suppressSessionUpdatesUntilPrompt?: boolean; readonly requestLogger?: (event: AcpSessionRequestLogEvent) => Effect.Effect; readonly protocolLogging?: { readonly logIncoming?: boolean; @@ -283,9 +282,6 @@ export const make = ( const assistantSegmentRef = yield* Ref.make({ nextSegmentIndex: 0 }); const configOptionsRef = yield* Ref.make(sessionConfigOptionsFromSetup(undefined)); const startStateRef = yield* Ref.make({ _tag: "NotStarted" }); - const suppressSessionUpdatesRef = yield* Ref.make( - options.suppressSessionUpdatesUntilPrompt === true, - ); const promptSerializationSemaphore = yield* Semaphore.make(1); const activePromptFiberRef = yield* Ref.make< Option.Option> @@ -394,7 +390,6 @@ export const make = ( modeStateRef, toolCallsRef, assistantSegmentRef, - suppressSessionUpdatesRef, params: notification, }); }), @@ -724,7 +719,6 @@ export const make = ( sessionId: started.sessionId, ...payload, } satisfies EffectAcpSchema.PromptRequest; - yield* Ref.set(suppressSessionUpdatesRef, false); const cancelledResponse = { stopReason: "cancelled", } satisfies EffectAcpSchema.PromptResponse; @@ -843,14 +837,12 @@ const handleSessionUpdate = ({ modeStateRef, toolCallsRef, assistantSegmentRef, - suppressSessionUpdatesRef, params, }: { readonly queue: Queue.Queue; readonly modeStateRef: Ref.Ref; readonly toolCallsRef: Ref.Ref>; readonly assistantSegmentRef: Ref.Ref; - readonly suppressSessionUpdatesRef: Ref.Ref; readonly params: EffectAcpSchema.SessionNotification; }): Effect.Effect => Effect.gen(function* () { @@ -860,9 +852,6 @@ const handleSessionUpdate = ({ current === undefined ? current : updateModeState(current, parsed.modeId!), ); } - if (yield* Ref.get(suppressSessionUpdatesRef)) { - return; - } for (const event of parsed.events) { if (event._tag === "ToolCallUpdated") { yield* closeActiveAssistantSegment({ @@ -912,8 +901,6 @@ const handleSessionUpdate = ({ } }); -export const handleSessionUpdateForTest = handleSessionUpdate; - function updateModeState(modeState: AcpSessionModeState, nextModeId: string): AcpSessionModeState { const normalized = nextModeId.trim(); if (!normalized) { diff --git a/apps/server/src/provider/acp/CursorAcpSupport.ts b/apps/server/src/provider/acp/CursorAcpSupport.ts index 07f7c41f6c5..169d7c6206d 100644 --- a/apps/server/src/provider/acp/CursorAcpSupport.ts +++ b/apps/server/src/provider/acp/CursorAcpSupport.ts @@ -59,8 +59,6 @@ export const makeCursorAcpRuntime = ( spawn: buildCursorAcpSpawnInput(input.cursorSettings, input.cwd, input.environment), authMethodId: "cursor_login", clientCapabilities: CURSOR_PARAMETERIZED_MODEL_PICKER_CAPABILITIES, - suppressSessionUpdatesUntilPrompt: - input.suppressSessionUpdatesUntilPrompt ?? input.resumeSessionId !== undefined, }).pipe( Layer.provide( Layer.succeed(ChildProcessSpawner.ChildProcessSpawner, input.childProcessSpawner),