From 8c9acea596668f58199ed9330d6c02df545d025b Mon Sep 17 00:00:00 2001 From: Justin Gray Date: Mon, 29 Jun 2026 21:33:07 +0000 Subject: [PATCH] fix(cursor): scope assistant message ids by turn after resume When Cursor reuses provider item ids across turns after restart, append assistant output to a new turn-scoped message at the end of the thread instead of mutating an earlier completed message. Co-authored-by: Cursor --- .../OrchestrationEngineHarness.integration.ts | 53 +++- .../cursorResume.integration.test.ts | 218 ++++++++++++++ .../Layers/ProviderRuntimeIngestion.test.ts | 278 +++++++++++++++--- .../Layers/ProviderRuntimeIngestion.ts | 35 ++- 4 files changed, 514 insertions(+), 70 deletions(-) create mode 100644 apps/server/integration/cursorResume.integration.test.ts diff --git a/apps/server/integration/OrchestrationEngineHarness.integration.ts b/apps/server/integration/OrchestrationEngineHarness.integration.ts index ebc4f984b86..d4351f34452 100644 --- a/apps/server/integration/OrchestrationEngineHarness.integration.ts +++ b/apps/server/integration/OrchestrationEngineHarness.integration.ts @@ -5,6 +5,7 @@ import * as NodeServices from "@effect/platform-node/NodeServices"; import { ApprovalRequestId, CodexSettings, + CursorSettings, ProviderDriverKind, type OrchestrationEvent, type OrchestrationThread, @@ -39,6 +40,7 @@ import { ProviderSessionDirectoryLive } from "../src/provider/Layers/ProviderSes import { ServerSettingsService } from "../src/serverSettings.ts"; import { makeProviderServiceLive } from "../src/provider/Layers/ProviderService.ts"; import { makeCodexAdapter } from "../src/provider/Layers/CodexAdapter.ts"; +import { makeCursorAdapter } from "../src/provider/Layers/CursorAdapter.ts"; import { NoOpProviderEventLoggers, ProviderEventLoggers, @@ -80,6 +82,7 @@ import * as VcsProcess from "../src/vcs/VcsProcess.ts"; import * as AgentAwarenessRelay from "../src/relay/AgentAwarenessRelay.ts"; const decodeCodexSettings = Schema.decodeEffect(CodexSettings); +const decodeCursorSettings = Schema.decodeEffect(CursorSettings); function runGit(cwd: string, args: ReadonlyArray) { return NodeChildProcess.execFileSync("git", args, { @@ -223,6 +226,7 @@ export interface OrchestrationIntegrationHarness { interface MakeOrchestrationIntegrationHarnessOptions { readonly provider?: ProviderDriverKind; readonly realCodex?: boolean; + readonly realCursor?: boolean; } export const makeOrchestrationIntegrationHarness = ( @@ -234,7 +238,14 @@ export const makeOrchestrationIntegrationHarness = ( const provider = options?.provider ?? ProviderDriverKind.make("codex"); const useRealCodex = options?.realCodex === true; - const adapterHarness = useRealCodex + const useRealCursor = options?.realCursor === true; + if (useRealCodex && useRealCursor) { + return yield* Effect.die( + "makeOrchestrationIntegrationHarness: realCodex and realCursor are mutually exclusive", + ); + } + const useRealAdapter = useRealCodex || useRealCursor; + const adapterHarness = useRealAdapter ? null : yield* makeTestProviderAdapterHarness({ provider, @@ -265,25 +276,41 @@ export const makeOrchestrationIntegrationHarness = ( const providerSessionDirectoryLayer = ProviderSessionDirectoryLive.pipe( Layer.provide(ProviderSessionRuntimeRepositoryLive), ); - const realCodexRegistry = Layer.effect( - ProviderAdapterRegistry, - Effect.gen(function* () { - const codexSettings = yield* decodeCodexSettings({}); - const codexAdapter = yield* makeCodexAdapter(codexSettings); - return makeAdapterRegistryMock({ - [ProviderDriverKind.make("codex")]: codexAdapter, - }); - }), - ).pipe( + const realAdapterRegistryLayer = useRealCodex + ? Layer.effect( + ProviderAdapterRegistry, + Effect.gen(function* () { + const codexSettings = yield* decodeCodexSettings({}); + const codexAdapter = yield* makeCodexAdapter(codexSettings); + return makeAdapterRegistryMock({ + [ProviderDriverKind.make("codex")]: codexAdapter, + }); + }), + ) + : useRealCursor + ? Layer.effect( + ProviderAdapterRegistry, + Effect.gen(function* () { + const cursorSettings = yield* decodeCursorSettings({ + binaryPath: process.env.CURSOR_AGENT_BIN ?? "agent", + }); + const cursorAdapter = yield* makeCursorAdapter(cursorSettings); + return makeAdapterRegistryMock({ + [ProviderDriverKind.make("cursor")]: cursorAdapter, + }); + }), + ) + : null; + const realAdapterRegistry = realAdapterRegistryLayer?.pipe( Layer.provideMerge(ServerConfig.layerTest(workspaceDir, rootDir)), Layer.provideMerge(NodeServices.layer), Layer.provideMerge(providerSessionDirectoryLayer), ); const providerEventLoggersLayer = Layer.succeed(ProviderEventLoggers, NoOpProviderEventLoggers); - const providerLayer = useRealCodex + const providerLayer = useRealAdapter ? makeProviderServiceLive().pipe( Layer.provide(providerSessionDirectoryLayer), - Layer.provide(realCodexRegistry), + Layer.provide(realAdapterRegistry!), Layer.provide(AnalyticsService.layerTest), Layer.provide(providerEventLoggersLayer), ) diff --git a/apps/server/integration/cursorResume.integration.test.ts b/apps/server/integration/cursorResume.integration.test.ts new file mode 100644 index 00000000000..2b917b993ee --- /dev/null +++ b/apps/server/integration/cursorResume.integration.test.ts @@ -0,0 +1,218 @@ +/** + * End-to-end Cursor resume tests against a real `agent acp` install. + * + * Enable with: + * T3_CURSOR_RESUME_E2E=1 vp test cursorResume.integration + * + * Uses composer-2.5 with fast mode disabled. The second turn intentionally + * asks the agent to run a shell command so we exercise tool calls after + * session/load resume, not just plain text replies. + */ +import { + CommandId, + DEFAULT_PROVIDER_INTERACTION_MODE, + MessageId, + ProjectId, + ProviderDriverKind, + ProviderInstanceId, + ThreadId, +} from "@t3tools/contracts"; +import { createModelSelection } from "@t3tools/shared/model"; +import * as NodeServices from "@effect/platform-node/NodeServices"; +import { assert, it } from "@effect/vitest"; +import * as Effect from "effect/Effect"; + +import { + makeOrchestrationIntegrationHarness, + type OrchestrationIntegrationHarness, +} from "./OrchestrationEngineHarness.integration.ts"; + +const PROJECT_ID = ProjectId.make("project-cursor-resume"); +const THREAD_ID = ThreadId.make("thread-cursor-resume"); +const CURSOR_PROVIDER = ProviderDriverKind.make("cursor"); +const CURSOR_INSTANCE = ProviderInstanceId.make("cursor"); +const CURSOR_MODEL = createModelSelection(CURSOR_INSTANCE, "composer-2.5", [ + { id: "fastMode", value: false }, +]); + +const MARKER_ONE = "CURSOR_RESUME_MARKER_ONE"; +const MARKER_TWO = "CURSOR_RESUME_MARKER_TWO"; +const TURN_TIMEOUT_MS = 300_000; + +const asMessageId = (value: string): MessageId => MessageId.make(value); + +function completedAssistantMessages(thread: { + readonly messages: ReadonlyArray<{ + readonly role: string; + readonly text: string; + readonly streaming: boolean; + }>; +}) { + return thread.messages.filter( + (message) => + message.role === "assistant" && !message.streaming && message.text.trim().length > 0, + ); +} + +function lastCompletedAssistantText(thread: { + readonly messages: ReadonlyArray<{ + readonly role: string; + readonly text: string; + readonly streaming: boolean; + }>; +}): string | undefined { + for (let index = thread.messages.length - 1; index >= 0; index -= 1) { + const message = thread.messages[index]; + if (message?.role === "assistant" && !message.streaming && message.text.trim().length > 0) { + return message.text; + } + } + return undefined; +} + +function hasToolActivity(thread: { + readonly activities: ReadonlyArray<{ readonly kind: string }>; +}): boolean { + return thread.activities.some( + (activity) => activity.kind === "tool.started" || activity.kind === "tool.completed", + ); +} + +const seedCursorProjectAndThread = (harness: OrchestrationIntegrationHarness) => + Effect.gen(function* () { + const createdAt = "2026-06-29T12:00:00.000Z"; + + yield* harness.engine.dispatch({ + type: "project.create", + commandId: CommandId.make("cmd-project-create-cursor-resume"), + projectId: PROJECT_ID, + title: "Cursor Resume Project", + workspaceRoot: harness.workspaceDir, + defaultModelSelection: CURSOR_MODEL, + createdAt, + }); + + yield* harness.engine.dispatch({ + type: "thread.create", + commandId: CommandId.make("cmd-thread-create-cursor-resume"), + threadId: THREAD_ID, + projectId: PROJECT_ID, + title: "Cursor Resume Thread", + modelSelection: CURSOR_MODEL, + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "full-access", + branch: null, + worktreePath: harness.workspaceDir, + createdAt, + }); + }); + +const startCursorTurn = (input: { + readonly harness: OrchestrationIntegrationHarness; + readonly commandId: string; + readonly messageId: string; + readonly text: string; +}) => + input.harness.engine.dispatch({ + type: "thread.turn.start", + commandId: CommandId.make(input.commandId), + threadId: THREAD_ID, + message: { + messageId: asMessageId(input.messageId), + role: "user", + text: input.text, + attachments: [], + }, + modelSelection: CURSOR_MODEL, + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "full-access", + createdAt: "2026-06-29T12:00:01.000Z", + }); + +function withRealCursorHarness( + use: (harness: OrchestrationIntegrationHarness) => Effect.Effect, +) { + return Effect.acquireUseRelease( + makeOrchestrationIntegrationHarness({ + provider: CURSOR_PROVIDER, + realCursor: true, + }), + use, + (harness) => harness.dispose, + ).pipe(Effect.provide(NodeServices.layer)); +} + +it.live.skipIf(process.env.T3_CURSOR_RESUME_E2E !== "1")( + "resumes a cursor session after stopSession and completes a tool-using turn with a final assistant message", + () => + withRealCursorHarness((harness) => + Effect.gen(function* () { + yield* seedCursorProjectAndThread(harness); + + yield* startCursorTurn({ + harness, + commandId: "cmd-cursor-resume-turn-1", + messageId: "msg-cursor-resume-1", + text: `Run the shell command \`ls\` in the workspace, then reply with one line that contains exactly ${MARKER_ONE}.`, + }); + + const firstThread = yield* harness.waitForThread( + THREAD_ID, + (entry) => + entry.session?.status === "ready" && + entry.session.providerName === "cursor" && + lastCompletedAssistantText(entry)?.includes(MARKER_ONE) === true, + TURN_TIMEOUT_MS, + ); + assert.equal(firstThread.session?.providerName, "cursor"); + const firstAssistants = completedAssistantMessages(firstThread); + assert.equal( + firstAssistants.some((message) => message.text.includes(MARKER_ONE)), + true, + ); + + yield* harness.providerService.stopSession({ threadId: THREAD_ID }); + + yield* startCursorTurn({ + harness, + commandId: "cmd-cursor-resume-turn-2", + messageId: "msg-cursor-resume-2", + text: `Run the shell command \`pwd\` in the workspace, then reply with one line that contains exactly ${MARKER_TWO}.`, + }); + + const resumedThread = yield* harness.waitForThread( + THREAD_ID, + (entry) => { + if (entry.session?.status !== "ready" || entry.session.providerName !== "cursor") { + return false; + } + const assistantText = lastCompletedAssistantText(entry); + return ( + assistantText?.includes(MARKER_TWO) === true && + !entry.activities.some((activity) => activity.kind === "provider.turn.start.failed") + ); + }, + TURN_TIMEOUT_MS, + ); + + const finalAssistantText = lastCompletedAssistantText(resumedThread); + const markerOneMessage = completedAssistantMessages(resumedThread).find((message) => + message.text.includes(MARKER_ONE), + ); + const lastAssistant = completedAssistantMessages(resumedThread).at(-1); + assert.equal(finalAssistantText?.includes(MARKER_TWO), true); + assert.equal(markerOneMessage?.text.includes(MARKER_TWO), false); + assert.equal(lastAssistant?.text.includes(MARKER_TWO), true); + assert.equal(lastAssistant?.text.includes(MARKER_ONE), false); + assert.equal( + resumedThread.activities.some( + (activity) => activity.kind === "provider.turn.start.failed", + ), + false, + ); + // The regression we are guarding: tool calls happen but the final assistant + // segment never lands in the projection after session/load resume. + assert.equal(hasToolActivity(resumedThread), true); + }), + ), +); diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index 88ee2c5d243..377a030ca20 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -62,6 +62,14 @@ const asMessageId = (value: string): MessageId => MessageId.make(value); const asThreadId = (value: string): ThreadId => ThreadId.make(value); const asTurnId = (value: string): TurnId => TurnId.make(value); +/** Mirrors ProviderRuntimeIngestion.assistantSegmentMessageId for test expectations. */ +function expectedAssistantMessageId(itemId: string, turnId?: string, segmentIndex = 0): string { + const turnScope = turnId ? `:turn:${turnId}` : ""; + return segmentIndex === 0 + ? `assistant:${itemId}${turnScope}` + : `assistant:${itemId}${turnScope}:segment:${segmentIndex}`; +} + type LegacyProviderRuntimeEvent = { readonly type: string; readonly eventId: EventId; @@ -706,14 +714,15 @@ describe("ProviderRuntimeIngestion", () => { }, }); + const expectedMessageId = expectedAssistantMessageId("item-1", "turn-2"); const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-1" && !message.streaming, + message.id === expectedMessageId && !message.streaming, ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-1", + (entry: ProviderRuntimeTestMessage) => entry.id === expectedMessageId, ); expect(message?.text).toBe("hello world"); expect(message?.streaming).toBe(false); @@ -738,14 +747,15 @@ describe("ProviderRuntimeIngestion", () => { }, }); + const expectedMessageId = expectedAssistantMessageId("item-no-delta", "turn-no-delta"); const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-no-delta" && !message.streaming, + message.id === expectedMessageId && !message.streaming, ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-no-delta", + (entry: ProviderRuntimeTestMessage) => entry.id === expectedMessageId, ); expect(message?.text).toBe("assistant-only final text"); expect(message?.streaming).toBe(false); @@ -1613,9 +1623,10 @@ describe("ProviderRuntimeIngestion", () => { await harness.drain(); const midReadModel = await harness.readModel(); const midThread = midReadModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); + const bufferedMessageId = expectedAssistantMessageId("item-buffered", "turn-buffered"); expect( midThread?.messages.some( - (message: ProviderRuntimeTestMessage) => message.id === "assistant:item-buffered", + (message: ProviderRuntimeTestMessage) => message.id === bufferedMessageId, ), ).toBe(false); @@ -1636,11 +1647,11 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-buffered" && !message.streaming, + message.id === bufferedMessageId && !message.streaming, ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-buffered", + (entry: ProviderRuntimeTestMessage) => entry.id === bufferedMessageId, ); expect(message?.text).toBe("buffer me"); expect(message?.streaming).toBe(false); @@ -1692,16 +1703,20 @@ describe("ProviderRuntimeIngestion", () => { }, }); + const requestFlushMessageId = expectedAssistantMessageId( + "item-buffered-request-flush", + "turn-buffered-request-flush", + ); const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-buffered-request-flush" && + message.id === requestFlushMessageId && !message.streaming && message.text === "visible before approval", ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-buffered-request-flush", + (entry: ProviderRuntimeTestMessage) => entry.id === requestFlushMessageId, ); expect(message?.streaming).toBe(false); }); @@ -1758,17 +1773,20 @@ describe("ProviderRuntimeIngestion", () => { }, }); + const userInputFlushMessageId = expectedAssistantMessageId( + "item-buffered-user-input-flush", + "turn-buffered-user-input-flush", + ); const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-buffered-user-input-flush" && + message.id === userInputFlushMessageId && !message.streaming && message.text === "visible before user input", ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => - entry.id === "assistant:item-buffered-user-input-flush", + (entry: ProviderRuntimeTestMessage) => entry.id === userInputFlushMessageId, ); expect(message?.streaming).toBe(false); }); @@ -1840,13 +1858,24 @@ describe("ProviderRuntimeIngestion", () => { const resumedAt = "2026-03-28T06:07:02.000Z"; const completedAt = "2026-03-28T06:07:03.000Z"; + const appendTurnId = "turn-buffered-request-append"; + const appendFirstMessageId = expectedAssistantMessageId( + "item-buffered-request-append", + appendTurnId, + ); + const appendSegmentMessageId = expectedAssistantMessageId( + "item-buffered-request-append", + appendTurnId, + 1, + ); + harness.emit({ type: "turn.started", eventId: asEventId("evt-turn-started-buffered-request-append"), provider: ProviderDriverKind.make("codex"), createdAt: startedAt, threadId: asThreadId("thread-1"), - turnId: asTurnId("turn-buffered-request-append"), + turnId: asTurnId(appendTurnId), }); await waitForThread( harness.readModel, @@ -1885,7 +1914,7 @@ describe("ProviderRuntimeIngestion", () => { await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-buffered-request-append" && + message.id === appendFirstMessageId && !message.streaming && message.text === "first half", ), @@ -1921,17 +1950,16 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-buffered-request-append:segment:1" && + message.id === appendSegmentMessageId && !message.streaming && message.text === " second half", ), ); const firstMessage = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-buffered-request-append", + (entry: ProviderRuntimeTestMessage) => entry.id === appendFirstMessageId, ); const resumedMessage = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => - entry.id === "assistant:item-buffered-request-append:segment:1", + (entry: ProviderRuntimeTestMessage) => entry.id === appendSegmentMessageId, ); expect(firstMessage?.text).toBe("first half"); expect(firstMessage?.streaming).toBe(false); @@ -1946,21 +1974,17 @@ describe("ProviderRuntimeIngestion", () => { const assistantEvents = events.filter( (event): event is Extract<(typeof events)[number], { type: "thread.message-sent" }> => event.type === "thread.message-sent" && - event.payload.messageId.startsWith("assistant:item-buffered-request-append"), + event.payload.messageId.startsWith(appendFirstMessageId), ); expect(assistantEvents).toHaveLength(4); expect(assistantEvents[0]?.payload.streaming).toBe(true); expect(assistantEvents[0]?.payload.text).toBe("first half"); expect(assistantEvents[1]?.payload.streaming).toBe(false); expect(assistantEvents[1]?.payload.text).toBe(""); - expect(assistantEvents[2]?.payload.messageId).toBe( - "assistant:item-buffered-request-append:segment:1", - ); + expect(assistantEvents[2]?.payload.messageId).toBe(appendSegmentMessageId); expect(assistantEvents[2]?.payload.streaming).toBe(true); expect(assistantEvents[2]?.payload.text).toBe(" second half"); - expect(assistantEvents[3]?.payload.messageId).toBe( - "assistant:item-buffered-request-append:segment:1", - ); + expect(assistantEvents[3]?.payload.messageId).toBe(appendSegmentMessageId); expect(assistantEvents[3]?.payload.streaming).toBe(false); expect(assistantEvents[3]?.payload.text).toBe(""); }); @@ -1972,13 +1996,24 @@ describe("ProviderRuntimeIngestion", () => { const resumedAt = "2026-03-28T07:00:02.000Z"; const completedAt = "2026-03-28T07:00:03.000Z"; + const streamingSegmentTurnId = "turn-streaming-request-segment"; + const streamingFirstMessageId = expectedAssistantMessageId( + "item-streaming-request-segment", + streamingSegmentTurnId, + ); + const streamingSegmentMessageId = expectedAssistantMessageId( + "item-streaming-request-segment", + streamingSegmentTurnId, + 1, + ); + harness.emit({ type: "turn.started", eventId: asEventId("evt-turn-started-streaming-request-segment"), provider: ProviderDriverKind.make("codex"), createdAt: startedAt, threadId: asThreadId("thread-1"), - turnId: asTurnId("turn-streaming-request-segment"), + turnId: asTurnId(streamingSegmentTurnId), }); await waitForThread( harness.readModel, @@ -2017,7 +2052,7 @@ describe("ProviderRuntimeIngestion", () => { await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-streaming-request-segment" && + message.id === streamingFirstMessageId && !message.streaming && message.text === "before approval", ), @@ -2053,21 +2088,19 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-streaming-request-segment:segment:1" && + message.id === streamingSegmentMessageId && !message.streaming && message.text === " after approval", ), ); expect( thread.messages.find( - (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-streaming-request-segment", + (message: ProviderRuntimeTestMessage) => message.id === streamingFirstMessageId, )?.text, ).toBe("before approval"); expect( thread.messages.find( - (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-streaming-request-segment:segment:1", + (message: ProviderRuntimeTestMessage) => message.id === streamingSegmentMessageId, )?.text, ).toBe(" after approval"); }); @@ -2093,7 +2126,8 @@ describe("ProviderRuntimeIngestion", () => { ); const resumedItemId = asItemId("assistant:session-1:segment:3"); - const resumedMessageId = "assistant:assistant:session-1:segment:3"; + const resumeTurnId = "turn-resume-assistant-segment"; + const resumedMessageId = expectedAssistantMessageId(String(resumedItemId), resumeTurnId); harness.emit({ type: "content.delta", @@ -2185,6 +2219,152 @@ describe("ProviderRuntimeIngestion", () => { ).toBe("fresh answer after resume"); }); + it("creates a new assistant message when a later turn reuses the same provider item id", async () => { + const harness = await createHarness({ serverSettings: { enableAssistantStreaming: true } }); + const now = "2026-06-29T00:00:00.000Z"; + const sharedItemId = asItemId("assistant:session-1:segment:0"); + const turnOneId = "turn-one"; + const turnTwoId = "turn-two"; + const turnOneMessageId = expectedAssistantMessageId(String(sharedItemId), turnOneId); + const turnTwoMessageId = expectedAssistantMessageId(String(sharedItemId), turnTwoId); + + harness.emit({ + type: "turn.started", + eventId: asEventId("evt-turn-started-cross-turn-item-reuse-one"), + provider: ProviderDriverKind.make("cursor"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId(turnOneId), + }); + await waitForThread( + harness.readModel, + (thread) => + thread.session?.status === "running" && thread.session?.activeTurnId === turnOneId, + ); + + harness.emit({ + type: "content.delta", + eventId: asEventId("evt-message-delta-cross-turn-item-reuse-one"), + provider: ProviderDriverKind.make("cursor"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId(turnOneId), + itemId: sharedItemId, + payload: { + streamKind: "assistant_text", + delta: "turn one answer", + }, + }); + harness.emit({ + type: "item.completed", + eventId: asEventId("evt-message-completed-cross-turn-item-reuse-one"), + provider: ProviderDriverKind.make("cursor"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId(turnOneId), + itemId: sharedItemId, + payload: { + itemType: "assistant_message", + status: "completed", + }, + }); + harness.emit({ + type: "turn.completed", + eventId: asEventId("evt-turn-completed-cross-turn-item-reuse-one"), + provider: ProviderDriverKind.make("cursor"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId(turnOneId), + status: "completed", + }); + + await waitForThread(harness.readModel, (entry) => + entry.messages.some( + (message: ProviderRuntimeTestMessage) => + message.id === turnOneMessageId && + !message.streaming && + message.text === "turn one answer", + ), + ); + + harness.emit({ + type: "turn.started", + eventId: asEventId("evt-turn-started-cross-turn-item-reuse-two"), + provider: ProviderDriverKind.make("cursor"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId(turnTwoId), + }); + await waitForThread( + harness.readModel, + (thread) => + thread.session?.status === "running" && thread.session?.activeTurnId === turnTwoId, + ); + + harness.emit({ + type: "content.delta", + eventId: asEventId("evt-message-delta-cross-turn-item-reuse-two"), + provider: ProviderDriverKind.make("cursor"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId(turnTwoId), + itemId: sharedItemId, + payload: { + streamKind: "assistant_text", + delta: "turn two answer", + }, + }); + harness.emit({ + type: "item.completed", + eventId: asEventId("evt-message-completed-cross-turn-item-reuse-two"), + provider: ProviderDriverKind.make("cursor"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId(turnTwoId), + itemId: sharedItemId, + payload: { + itemType: "assistant_message", + status: "completed", + }, + }); + harness.emit({ + type: "turn.completed", + eventId: asEventId("evt-turn-completed-cross-turn-item-reuse-two"), + provider: ProviderDriverKind.make("cursor"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId(turnTwoId), + status: "completed", + }); + + const thread = await waitForThread(harness.readModel, (entry) => + entry.messages.some( + (message: ProviderRuntimeTestMessage) => + message.id === turnTwoMessageId && + !message.streaming && + message.text === "turn two answer", + ), + ); + + const turnOneMessage = thread.messages.find( + (message: ProviderRuntimeTestMessage) => message.id === turnOneMessageId, + ); + const turnTwoMessage = thread.messages.find( + (message: ProviderRuntimeTestMessage) => message.id === turnTwoMessageId, + ); + expect(turnOneMessage?.text).toBe("turn one answer"); + expect(turnTwoMessage?.text).toBe("turn two answer"); + expect(turnOneMessage?.text).not.toContain("turn two answer"); + + const turnOneIndex = thread.messages.findIndex( + (message: ProviderRuntimeTestMessage) => message.id === turnOneMessageId, + ); + const turnTwoIndex = thread.messages.findIndex( + (message: ProviderRuntimeTestMessage) => message.id === turnTwoMessageId, + ); + expect(turnTwoIndex).toBeGreaterThan(turnOneIndex); + }); + it("streams assistant deltas when thread.turn.start requests streaming mode", async () => { const harness = await createHarness({ serverSettings: { enableAssistantStreaming: true } }); const now = "2026-01-01T00:00:00.000Z"; @@ -2222,6 +2402,11 @@ describe("ProviderRuntimeIngestion", () => { thread.session?.activeTurnId === "turn-streaming-mode", ); + const streamingModeMessageId = expectedAssistantMessageId( + "item-streaming-mode", + "turn-streaming-mode", + ); + harness.emit({ type: "content.delta", eventId: asEventId("evt-message-delta-streaming-mode"), @@ -2239,13 +2424,13 @@ describe("ProviderRuntimeIngestion", () => { const liveThread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-streaming-mode" && + message.id === streamingModeMessageId && message.streaming && message.text === "hello live", ), ); const liveMessage = liveThread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-streaming-mode", + (entry: ProviderRuntimeTestMessage) => entry.id === streamingModeMessageId, ); expect(liveMessage?.streaming).toBe(true); @@ -2267,11 +2452,11 @@ describe("ProviderRuntimeIngestion", () => { const finalThread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-streaming-mode" && !message.streaming, + message.id === streamingModeMessageId && !message.streaming, ), ); const finalMessage = finalThread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-streaming-mode", + (entry: ProviderRuntimeTestMessage) => entry.id === streamingModeMessageId, ); expect(finalMessage?.text).toBe("hello live"); expect(finalMessage?.streaming).toBe(false); @@ -2297,6 +2482,8 @@ describe("ProviderRuntimeIngestion", () => { thread.session?.activeTurnId === "turn-buffer-spill", ); + const spillMessageId = expectedAssistantMessageId("item-buffer-spill", "turn-buffer-spill"); + harness.emit({ type: "content.delta", eventId: asEventId("evt-message-delta-buffer-spill"), @@ -2327,11 +2514,11 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-buffer-spill" && !message.streaming, + message.id === spillMessageId && !message.streaming, ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-buffer-spill", + (entry: ProviderRuntimeTestMessage) => entry.id === spillMessageId, ); expect(message?.text.length).toBe(oversizedText.length); expect(message?.text).toBe(oversizedText); @@ -2358,6 +2545,8 @@ describe("ProviderRuntimeIngestion", () => { thread.session?.activeTurnId === "turn-complete-dedup", ); + const dedupMessageId = expectedAssistantMessageId("item-complete-dedup", "turn-complete-dedup"); + harness.emit({ type: "content.delta", eventId: asEventId("evt-message-delta-for-complete-dedup"), @@ -2403,7 +2592,7 @@ describe("ProviderRuntimeIngestion", () => { thread.session?.activeTurnId === null && thread.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-complete-dedup" && !message.streaming, + message.id === dedupMessageId && !message.streaming, ), ); @@ -2416,10 +2605,7 @@ describe("ProviderRuntimeIngestion", () => { if (event.type !== "thread.message-sent") { return false; } - return ( - event.payload.messageId === "assistant:item-complete-dedup" && - event.payload.streaming === false - ); + return event.payload.messageId === dedupMessageId && event.payload.streaming === false; }); expect(completionEvents).toHaveLength(1); }); @@ -2774,7 +2960,9 @@ describe("ProviderRuntimeIngestion", () => { (entry: ProviderRuntimeTestCheckpoint) => entry.turnId === "turn-p1", ); expect(checkpoint?.status).toBe("missing"); - expect(checkpoint?.assistantMessageId).toBe("assistant:item-p1-assistant"); + expect(checkpoint?.assistantMessageId).toBe( + expectedAssistantMessageId("item-p1-assistant", "turn-p1"), + ); expect(checkpoint?.checkpointRef).toBe("provider-diff:evt-turn-diff-updated"); }); diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index a94c3e1142d..3038d60f8a2 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -197,14 +197,25 @@ function assistantSegmentBaseKeyFromEvent(event: ProviderRuntimeEvent): string { return String(event.itemId ?? event.turnId ?? event.eventId); } -function assistantSegmentMessageId(baseKey: string, segmentIndex: number): MessageId { +function assistantSegmentMessageId( + baseKey: string, + segmentIndex: number, + turnId?: string | null, +): MessageId { + const turnScope = turnId ? `:turn:${turnId}` : ""; return MessageId.make( - segmentIndex === 0 ? `assistant:${baseKey}` : `assistant:${baseKey}:segment:${segmentIndex}`, + segmentIndex === 0 + ? `assistant:${baseKey}${turnScope}` + : `assistant:${baseKey}${turnScope}:segment:${segmentIndex}`, ); } function assistantMessageIdFromEvent(event: ProviderRuntimeEvent): MessageId { - return assistantSegmentMessageId(assistantSegmentBaseKeyFromEvent(event), 0); + return assistantSegmentMessageId( + assistantSegmentBaseKeyFromEvent(event), + 0, + event.turnId ?? null, + ); } function nextAssistantSegmentIndexFromItemId(itemId: string | undefined): number { @@ -769,11 +780,15 @@ const make = Effect.gen(function* () { onNone: () => ({ baseKey: input.baseKey, nextSegmentIndex: 1, - activeMessageId: assistantSegmentMessageId(input.baseKey, 0), + activeMessageId: assistantSegmentMessageId(input.baseKey, 0, input.turnId), }), onSome: (state) => { const segmentIndex = state.baseKey === input.baseKey ? state.nextSegmentIndex : 0; - const messageId = assistantSegmentMessageId(input.baseKey, segmentIndex); + const messageId = assistantSegmentMessageId( + input.baseKey, + segmentIndex, + input.turnId, + ); return { baseKey: input.baseKey, nextSegmentIndex: state.baseKey === input.baseKey ? state.nextSegmentIndex + 1 : 1, @@ -794,7 +809,7 @@ const make = Effect.gen(function* () { }) => Effect.gen(function* () { if (!input.turnId) { - return assistantSegmentMessageId(assistantSegmentBaseKeyFromEvent(input.event), 0); + return assistantSegmentMessageId(assistantSegmentBaseKeyFromEvent(input.event), 0, null); } const activeMessageId = yield* getActiveAssistantMessageIdForTurn( @@ -1502,9 +1517,7 @@ const make = Effect.gen(function* () { const assistantCompletion = event.type === "item.completed" && event.payload.itemType === "assistant_message" ? { - messageId: MessageId.make( - `assistant:${event.itemId ?? event.turnId ?? event.eventId}`, - ), + messageId: assistantMessageIdFromEvent(event), fallbackText: event.payload.detail, } : undefined; @@ -1677,9 +1690,7 @@ const make = Effect.gen(function* () { if (hasCheckpointForTurn(checkpointContext.checkpoints, turnId)) { // Already tracked; no-op. } else { - const assistantMessageId = MessageId.make( - `assistant:${event.itemId ?? event.turnId ?? event.eventId}`, - ); + const assistantMessageId = assistantMessageIdFromEvent(event); yield* orchestrationEngine.dispatch({ type: "thread.turn.diff.complete", commandId: yield* providerCommandId(event, "thread-turn-diff-complete"),