diff --git a/apps/server/integration/OrchestrationEngineHarness.integration.ts b/apps/server/integration/OrchestrationEngineHarness.integration.ts index d4351f34452..ebc4f984b86 100644 --- a/apps/server/integration/OrchestrationEngineHarness.integration.ts +++ b/apps/server/integration/OrchestrationEngineHarness.integration.ts @@ -5,7 +5,6 @@ import * as NodeServices from "@effect/platform-node/NodeServices"; import { ApprovalRequestId, CodexSettings, - CursorSettings, ProviderDriverKind, type OrchestrationEvent, type OrchestrationThread, @@ -40,7 +39,6 @@ import { ProviderSessionDirectoryLive } from "../src/provider/Layers/ProviderSes import { ServerSettingsService } from "../src/serverSettings.ts"; import { makeProviderServiceLive } from "../src/provider/Layers/ProviderService.ts"; import { makeCodexAdapter } from "../src/provider/Layers/CodexAdapter.ts"; -import { makeCursorAdapter } from "../src/provider/Layers/CursorAdapter.ts"; import { NoOpProviderEventLoggers, ProviderEventLoggers, @@ -82,7 +80,6 @@ import * as VcsProcess from "../src/vcs/VcsProcess.ts"; import * as AgentAwarenessRelay from "../src/relay/AgentAwarenessRelay.ts"; const decodeCodexSettings = Schema.decodeEffect(CodexSettings); -const decodeCursorSettings = Schema.decodeEffect(CursorSettings); function runGit(cwd: string, args: ReadonlyArray) { return NodeChildProcess.execFileSync("git", args, { @@ -226,7 +223,6 @@ export interface OrchestrationIntegrationHarness { interface MakeOrchestrationIntegrationHarnessOptions { readonly provider?: ProviderDriverKind; readonly realCodex?: boolean; - readonly realCursor?: boolean; } export const makeOrchestrationIntegrationHarness = ( @@ -238,14 +234,7 @@ export const makeOrchestrationIntegrationHarness = ( const provider = options?.provider ?? ProviderDriverKind.make("codex"); const useRealCodex = options?.realCodex === true; - const useRealCursor = options?.realCursor === true; - if (useRealCodex && useRealCursor) { - return yield* Effect.die( - "makeOrchestrationIntegrationHarness: realCodex and realCursor are mutually exclusive", - ); - } - const useRealAdapter = useRealCodex || useRealCursor; - const adapterHarness = useRealAdapter + const adapterHarness = useRealCodex ? null : yield* makeTestProviderAdapterHarness({ provider, @@ -276,41 +265,25 @@ export const makeOrchestrationIntegrationHarness = ( const providerSessionDirectoryLayer = ProviderSessionDirectoryLive.pipe( Layer.provide(ProviderSessionRuntimeRepositoryLive), ); - const realAdapterRegistryLayer = useRealCodex - ? Layer.effect( - ProviderAdapterRegistry, - Effect.gen(function* () { - const codexSettings = yield* decodeCodexSettings({}); - const codexAdapter = yield* makeCodexAdapter(codexSettings); - return makeAdapterRegistryMock({ - [ProviderDriverKind.make("codex")]: codexAdapter, - }); - }), - ) - : useRealCursor - ? Layer.effect( - ProviderAdapterRegistry, - Effect.gen(function* () { - const cursorSettings = yield* decodeCursorSettings({ - binaryPath: process.env.CURSOR_AGENT_BIN ?? "agent", - }); - const cursorAdapter = yield* makeCursorAdapter(cursorSettings); - return makeAdapterRegistryMock({ - [ProviderDriverKind.make("cursor")]: cursorAdapter, - }); - }), - ) - : null; - const realAdapterRegistry = realAdapterRegistryLayer?.pipe( + const realCodexRegistry = Layer.effect( + ProviderAdapterRegistry, + Effect.gen(function* () { + const codexSettings = yield* decodeCodexSettings({}); + const codexAdapter = yield* makeCodexAdapter(codexSettings); + return makeAdapterRegistryMock({ + [ProviderDriverKind.make("codex")]: codexAdapter, + }); + }), + ).pipe( Layer.provideMerge(ServerConfig.layerTest(workspaceDir, rootDir)), Layer.provideMerge(NodeServices.layer), Layer.provideMerge(providerSessionDirectoryLayer), ); const providerEventLoggersLayer = Layer.succeed(ProviderEventLoggers, NoOpProviderEventLoggers); - const providerLayer = useRealAdapter + const providerLayer = useRealCodex ? makeProviderServiceLive().pipe( Layer.provide(providerSessionDirectoryLayer), - Layer.provide(realAdapterRegistry!), + Layer.provide(realCodexRegistry), Layer.provide(AnalyticsService.layerTest), Layer.provide(providerEventLoggersLayer), ) diff --git a/apps/server/integration/cursorResume.integration.test.ts b/apps/server/integration/cursorResume.integration.test.ts deleted file mode 100644 index 2b917b993ee..00000000000 --- a/apps/server/integration/cursorResume.integration.test.ts +++ /dev/null @@ -1,218 +0,0 @@ -/** - * End-to-end Cursor resume tests against a real `agent acp` install. - * - * Enable with: - * T3_CURSOR_RESUME_E2E=1 vp test cursorResume.integration - * - * Uses composer-2.5 with fast mode disabled. The second turn intentionally - * asks the agent to run a shell command so we exercise tool calls after - * session/load resume, not just plain text replies. - */ -import { - CommandId, - DEFAULT_PROVIDER_INTERACTION_MODE, - MessageId, - ProjectId, - ProviderDriverKind, - ProviderInstanceId, - ThreadId, -} from "@t3tools/contracts"; -import { createModelSelection } from "@t3tools/shared/model"; -import * as NodeServices from "@effect/platform-node/NodeServices"; -import { assert, it } from "@effect/vitest"; -import * as Effect from "effect/Effect"; - -import { - makeOrchestrationIntegrationHarness, - type OrchestrationIntegrationHarness, -} from "./OrchestrationEngineHarness.integration.ts"; - -const PROJECT_ID = ProjectId.make("project-cursor-resume"); -const THREAD_ID = ThreadId.make("thread-cursor-resume"); -const CURSOR_PROVIDER = ProviderDriverKind.make("cursor"); -const CURSOR_INSTANCE = ProviderInstanceId.make("cursor"); -const CURSOR_MODEL = createModelSelection(CURSOR_INSTANCE, "composer-2.5", [ - { id: "fastMode", value: false }, -]); - -const MARKER_ONE = "CURSOR_RESUME_MARKER_ONE"; -const MARKER_TWO = "CURSOR_RESUME_MARKER_TWO"; -const TURN_TIMEOUT_MS = 300_000; - -const asMessageId = (value: string): MessageId => MessageId.make(value); - -function completedAssistantMessages(thread: { - readonly messages: ReadonlyArray<{ - readonly role: string; - readonly text: string; - readonly streaming: boolean; - }>; -}) { - return thread.messages.filter( - (message) => - message.role === "assistant" && !message.streaming && message.text.trim().length > 0, - ); -} - -function lastCompletedAssistantText(thread: { - readonly messages: ReadonlyArray<{ - readonly role: string; - readonly text: string; - readonly streaming: boolean; - }>; -}): string | undefined { - for (let index = thread.messages.length - 1; index >= 0; index -= 1) { - const message = thread.messages[index]; - if (message?.role === "assistant" && !message.streaming && message.text.trim().length > 0) { - return message.text; - } - } - return undefined; -} - -function hasToolActivity(thread: { - readonly activities: ReadonlyArray<{ readonly kind: string }>; -}): boolean { - return thread.activities.some( - (activity) => activity.kind === "tool.started" || activity.kind === "tool.completed", - ); -} - -const seedCursorProjectAndThread = (harness: OrchestrationIntegrationHarness) => - Effect.gen(function* () { - const createdAt = "2026-06-29T12:00:00.000Z"; - - yield* harness.engine.dispatch({ - type: "project.create", - commandId: CommandId.make("cmd-project-create-cursor-resume"), - projectId: PROJECT_ID, - title: "Cursor Resume Project", - workspaceRoot: harness.workspaceDir, - defaultModelSelection: CURSOR_MODEL, - createdAt, - }); - - yield* harness.engine.dispatch({ - type: "thread.create", - commandId: CommandId.make("cmd-thread-create-cursor-resume"), - threadId: THREAD_ID, - projectId: PROJECT_ID, - title: "Cursor Resume Thread", - modelSelection: CURSOR_MODEL, - interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, - runtimeMode: "full-access", - branch: null, - worktreePath: harness.workspaceDir, - createdAt, - }); - }); - -const startCursorTurn = (input: { - readonly harness: OrchestrationIntegrationHarness; - readonly commandId: string; - readonly messageId: string; - readonly text: string; -}) => - input.harness.engine.dispatch({ - type: "thread.turn.start", - commandId: CommandId.make(input.commandId), - threadId: THREAD_ID, - message: { - messageId: asMessageId(input.messageId), - role: "user", - text: input.text, - attachments: [], - }, - modelSelection: CURSOR_MODEL, - interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, - runtimeMode: "full-access", - createdAt: "2026-06-29T12:00:01.000Z", - }); - -function withRealCursorHarness( - use: (harness: OrchestrationIntegrationHarness) => Effect.Effect, -) { - return Effect.acquireUseRelease( - makeOrchestrationIntegrationHarness({ - provider: CURSOR_PROVIDER, - realCursor: true, - }), - use, - (harness) => harness.dispose, - ).pipe(Effect.provide(NodeServices.layer)); -} - -it.live.skipIf(process.env.T3_CURSOR_RESUME_E2E !== "1")( - "resumes a cursor session after stopSession and completes a tool-using turn with a final assistant message", - () => - withRealCursorHarness((harness) => - Effect.gen(function* () { - yield* seedCursorProjectAndThread(harness); - - yield* startCursorTurn({ - harness, - commandId: "cmd-cursor-resume-turn-1", - messageId: "msg-cursor-resume-1", - text: `Run the shell command \`ls\` in the workspace, then reply with one line that contains exactly ${MARKER_ONE}.`, - }); - - const firstThread = yield* harness.waitForThread( - THREAD_ID, - (entry) => - entry.session?.status === "ready" && - entry.session.providerName === "cursor" && - lastCompletedAssistantText(entry)?.includes(MARKER_ONE) === true, - TURN_TIMEOUT_MS, - ); - assert.equal(firstThread.session?.providerName, "cursor"); - const firstAssistants = completedAssistantMessages(firstThread); - assert.equal( - firstAssistants.some((message) => message.text.includes(MARKER_ONE)), - true, - ); - - yield* harness.providerService.stopSession({ threadId: THREAD_ID }); - - yield* startCursorTurn({ - harness, - commandId: "cmd-cursor-resume-turn-2", - messageId: "msg-cursor-resume-2", - text: `Run the shell command \`pwd\` in the workspace, then reply with one line that contains exactly ${MARKER_TWO}.`, - }); - - const resumedThread = yield* harness.waitForThread( - THREAD_ID, - (entry) => { - if (entry.session?.status !== "ready" || entry.session.providerName !== "cursor") { - return false; - } - const assistantText = lastCompletedAssistantText(entry); - return ( - assistantText?.includes(MARKER_TWO) === true && - !entry.activities.some((activity) => activity.kind === "provider.turn.start.failed") - ); - }, - TURN_TIMEOUT_MS, - ); - - const finalAssistantText = lastCompletedAssistantText(resumedThread); - const markerOneMessage = completedAssistantMessages(resumedThread).find((message) => - message.text.includes(MARKER_ONE), - ); - const lastAssistant = completedAssistantMessages(resumedThread).at(-1); - assert.equal(finalAssistantText?.includes(MARKER_TWO), true); - assert.equal(markerOneMessage?.text.includes(MARKER_TWO), false); - assert.equal(lastAssistant?.text.includes(MARKER_TWO), true); - assert.equal(lastAssistant?.text.includes(MARKER_ONE), false); - assert.equal( - resumedThread.activities.some( - (activity) => activity.kind === "provider.turn.start.failed", - ), - false, - ); - // The regression we are guarding: tool calls happen but the final assistant - // segment never lands in the projection after session/load resume. - assert.equal(hasToolActivity(resumedThread), true); - }), - ), -); diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index 377a030ca20..88ee2c5d243 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -62,14 +62,6 @@ const asMessageId = (value: string): MessageId => MessageId.make(value); const asThreadId = (value: string): ThreadId => ThreadId.make(value); const asTurnId = (value: string): TurnId => TurnId.make(value); -/** Mirrors ProviderRuntimeIngestion.assistantSegmentMessageId for test expectations. */ -function expectedAssistantMessageId(itemId: string, turnId?: string, segmentIndex = 0): string { - const turnScope = turnId ? `:turn:${turnId}` : ""; - return segmentIndex === 0 - ? `assistant:${itemId}${turnScope}` - : `assistant:${itemId}${turnScope}:segment:${segmentIndex}`; -} - type LegacyProviderRuntimeEvent = { readonly type: string; readonly eventId: EventId; @@ -714,15 +706,14 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const expectedMessageId = expectedAssistantMessageId("item-1", "turn-2"); const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === expectedMessageId && !message.streaming, + message.id === "assistant:item-1" && !message.streaming, ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === expectedMessageId, + (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-1", ); expect(message?.text).toBe("hello world"); expect(message?.streaming).toBe(false); @@ -747,15 +738,14 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const expectedMessageId = expectedAssistantMessageId("item-no-delta", "turn-no-delta"); const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === expectedMessageId && !message.streaming, + message.id === "assistant:item-no-delta" && !message.streaming, ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === expectedMessageId, + (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-no-delta", ); expect(message?.text).toBe("assistant-only final text"); expect(message?.streaming).toBe(false); @@ -1623,10 +1613,9 @@ describe("ProviderRuntimeIngestion", () => { await harness.drain(); const midReadModel = await harness.readModel(); const midThread = midReadModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); - const bufferedMessageId = expectedAssistantMessageId("item-buffered", "turn-buffered"); expect( midThread?.messages.some( - (message: ProviderRuntimeTestMessage) => message.id === bufferedMessageId, + (message: ProviderRuntimeTestMessage) => message.id === "assistant:item-buffered", ), ).toBe(false); @@ -1647,11 +1636,11 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === bufferedMessageId && !message.streaming, + message.id === "assistant:item-buffered" && !message.streaming, ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === bufferedMessageId, + (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-buffered", ); expect(message?.text).toBe("buffer me"); expect(message?.streaming).toBe(false); @@ -1703,20 +1692,16 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const requestFlushMessageId = expectedAssistantMessageId( - "item-buffered-request-flush", - "turn-buffered-request-flush", - ); const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === requestFlushMessageId && + message.id === "assistant:item-buffered-request-flush" && !message.streaming && message.text === "visible before approval", ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === requestFlushMessageId, + (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-buffered-request-flush", ); expect(message?.streaming).toBe(false); }); @@ -1773,20 +1758,17 @@ describe("ProviderRuntimeIngestion", () => { }, }); - const userInputFlushMessageId = expectedAssistantMessageId( - "item-buffered-user-input-flush", - "turn-buffered-user-input-flush", - ); const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === userInputFlushMessageId && + message.id === "assistant:item-buffered-user-input-flush" && !message.streaming && message.text === "visible before user input", ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === userInputFlushMessageId, + (entry: ProviderRuntimeTestMessage) => + entry.id === "assistant:item-buffered-user-input-flush", ); expect(message?.streaming).toBe(false); }); @@ -1858,24 +1840,13 @@ describe("ProviderRuntimeIngestion", () => { const resumedAt = "2026-03-28T06:07:02.000Z"; const completedAt = "2026-03-28T06:07:03.000Z"; - const appendTurnId = "turn-buffered-request-append"; - const appendFirstMessageId = expectedAssistantMessageId( - "item-buffered-request-append", - appendTurnId, - ); - const appendSegmentMessageId = expectedAssistantMessageId( - "item-buffered-request-append", - appendTurnId, - 1, - ); - harness.emit({ type: "turn.started", eventId: asEventId("evt-turn-started-buffered-request-append"), provider: ProviderDriverKind.make("codex"), createdAt: startedAt, threadId: asThreadId("thread-1"), - turnId: asTurnId(appendTurnId), + turnId: asTurnId("turn-buffered-request-append"), }); await waitForThread( harness.readModel, @@ -1914,7 +1885,7 @@ describe("ProviderRuntimeIngestion", () => { await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === appendFirstMessageId && + message.id === "assistant:item-buffered-request-append" && !message.streaming && message.text === "first half", ), @@ -1950,16 +1921,17 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === appendSegmentMessageId && + message.id === "assistant:item-buffered-request-append:segment:1" && !message.streaming && message.text === " second half", ), ); const firstMessage = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === appendFirstMessageId, + (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-buffered-request-append", ); const resumedMessage = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === appendSegmentMessageId, + (entry: ProviderRuntimeTestMessage) => + entry.id === "assistant:item-buffered-request-append:segment:1", ); expect(firstMessage?.text).toBe("first half"); expect(firstMessage?.streaming).toBe(false); @@ -1974,17 +1946,21 @@ describe("ProviderRuntimeIngestion", () => { const assistantEvents = events.filter( (event): event is Extract<(typeof events)[number], { type: "thread.message-sent" }> => event.type === "thread.message-sent" && - event.payload.messageId.startsWith(appendFirstMessageId), + event.payload.messageId.startsWith("assistant:item-buffered-request-append"), ); expect(assistantEvents).toHaveLength(4); expect(assistantEvents[0]?.payload.streaming).toBe(true); expect(assistantEvents[0]?.payload.text).toBe("first half"); expect(assistantEvents[1]?.payload.streaming).toBe(false); expect(assistantEvents[1]?.payload.text).toBe(""); - expect(assistantEvents[2]?.payload.messageId).toBe(appendSegmentMessageId); + expect(assistantEvents[2]?.payload.messageId).toBe( + "assistant:item-buffered-request-append:segment:1", + ); expect(assistantEvents[2]?.payload.streaming).toBe(true); expect(assistantEvents[2]?.payload.text).toBe(" second half"); - expect(assistantEvents[3]?.payload.messageId).toBe(appendSegmentMessageId); + expect(assistantEvents[3]?.payload.messageId).toBe( + "assistant:item-buffered-request-append:segment:1", + ); expect(assistantEvents[3]?.payload.streaming).toBe(false); expect(assistantEvents[3]?.payload.text).toBe(""); }); @@ -1996,24 +1972,13 @@ describe("ProviderRuntimeIngestion", () => { const resumedAt = "2026-03-28T07:00:02.000Z"; const completedAt = "2026-03-28T07:00:03.000Z"; - const streamingSegmentTurnId = "turn-streaming-request-segment"; - const streamingFirstMessageId = expectedAssistantMessageId( - "item-streaming-request-segment", - streamingSegmentTurnId, - ); - const streamingSegmentMessageId = expectedAssistantMessageId( - "item-streaming-request-segment", - streamingSegmentTurnId, - 1, - ); - harness.emit({ type: "turn.started", eventId: asEventId("evt-turn-started-streaming-request-segment"), provider: ProviderDriverKind.make("codex"), createdAt: startedAt, threadId: asThreadId("thread-1"), - turnId: asTurnId(streamingSegmentTurnId), + turnId: asTurnId("turn-streaming-request-segment"), }); await waitForThread( harness.readModel, @@ -2052,7 +2017,7 @@ describe("ProviderRuntimeIngestion", () => { await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === streamingFirstMessageId && + message.id === "assistant:item-streaming-request-segment" && !message.streaming && message.text === "before approval", ), @@ -2088,19 +2053,21 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === streamingSegmentMessageId && + message.id === "assistant:item-streaming-request-segment:segment:1" && !message.streaming && message.text === " after approval", ), ); expect( thread.messages.find( - (message: ProviderRuntimeTestMessage) => message.id === streamingFirstMessageId, + (message: ProviderRuntimeTestMessage) => + message.id === "assistant:item-streaming-request-segment", )?.text, ).toBe("before approval"); expect( thread.messages.find( - (message: ProviderRuntimeTestMessage) => message.id === streamingSegmentMessageId, + (message: ProviderRuntimeTestMessage) => + message.id === "assistant:item-streaming-request-segment:segment:1", )?.text, ).toBe(" after approval"); }); @@ -2126,8 +2093,7 @@ describe("ProviderRuntimeIngestion", () => { ); const resumedItemId = asItemId("assistant:session-1:segment:3"); - const resumeTurnId = "turn-resume-assistant-segment"; - const resumedMessageId = expectedAssistantMessageId(String(resumedItemId), resumeTurnId); + const resumedMessageId = "assistant:assistant:session-1:segment:3"; harness.emit({ type: "content.delta", @@ -2219,152 +2185,6 @@ describe("ProviderRuntimeIngestion", () => { ).toBe("fresh answer after resume"); }); - it("creates a new assistant message when a later turn reuses the same provider item id", async () => { - const harness = await createHarness({ serverSettings: { enableAssistantStreaming: true } }); - const now = "2026-06-29T00:00:00.000Z"; - const sharedItemId = asItemId("assistant:session-1:segment:0"); - const turnOneId = "turn-one"; - const turnTwoId = "turn-two"; - const turnOneMessageId = expectedAssistantMessageId(String(sharedItemId), turnOneId); - const turnTwoMessageId = expectedAssistantMessageId(String(sharedItemId), turnTwoId); - - harness.emit({ - type: "turn.started", - eventId: asEventId("evt-turn-started-cross-turn-item-reuse-one"), - provider: ProviderDriverKind.make("cursor"), - createdAt: now, - threadId: asThreadId("thread-1"), - turnId: asTurnId(turnOneId), - }); - await waitForThread( - harness.readModel, - (thread) => - thread.session?.status === "running" && thread.session?.activeTurnId === turnOneId, - ); - - harness.emit({ - type: "content.delta", - eventId: asEventId("evt-message-delta-cross-turn-item-reuse-one"), - provider: ProviderDriverKind.make("cursor"), - createdAt: now, - threadId: asThreadId("thread-1"), - turnId: asTurnId(turnOneId), - itemId: sharedItemId, - payload: { - streamKind: "assistant_text", - delta: "turn one answer", - }, - }); - harness.emit({ - type: "item.completed", - eventId: asEventId("evt-message-completed-cross-turn-item-reuse-one"), - provider: ProviderDriverKind.make("cursor"), - createdAt: now, - threadId: asThreadId("thread-1"), - turnId: asTurnId(turnOneId), - itemId: sharedItemId, - payload: { - itemType: "assistant_message", - status: "completed", - }, - }); - harness.emit({ - type: "turn.completed", - eventId: asEventId("evt-turn-completed-cross-turn-item-reuse-one"), - provider: ProviderDriverKind.make("cursor"), - createdAt: now, - threadId: asThreadId("thread-1"), - turnId: asTurnId(turnOneId), - status: "completed", - }); - - await waitForThread(harness.readModel, (entry) => - entry.messages.some( - (message: ProviderRuntimeTestMessage) => - message.id === turnOneMessageId && - !message.streaming && - message.text === "turn one answer", - ), - ); - - harness.emit({ - type: "turn.started", - eventId: asEventId("evt-turn-started-cross-turn-item-reuse-two"), - provider: ProviderDriverKind.make("cursor"), - createdAt: now, - threadId: asThreadId("thread-1"), - turnId: asTurnId(turnTwoId), - }); - await waitForThread( - harness.readModel, - (thread) => - thread.session?.status === "running" && thread.session?.activeTurnId === turnTwoId, - ); - - harness.emit({ - type: "content.delta", - eventId: asEventId("evt-message-delta-cross-turn-item-reuse-two"), - provider: ProviderDriverKind.make("cursor"), - createdAt: now, - threadId: asThreadId("thread-1"), - turnId: asTurnId(turnTwoId), - itemId: sharedItemId, - payload: { - streamKind: "assistant_text", - delta: "turn two answer", - }, - }); - harness.emit({ - type: "item.completed", - eventId: asEventId("evt-message-completed-cross-turn-item-reuse-two"), - provider: ProviderDriverKind.make("cursor"), - createdAt: now, - threadId: asThreadId("thread-1"), - turnId: asTurnId(turnTwoId), - itemId: sharedItemId, - payload: { - itemType: "assistant_message", - status: "completed", - }, - }); - harness.emit({ - type: "turn.completed", - eventId: asEventId("evt-turn-completed-cross-turn-item-reuse-two"), - provider: ProviderDriverKind.make("cursor"), - createdAt: now, - threadId: asThreadId("thread-1"), - turnId: asTurnId(turnTwoId), - status: "completed", - }); - - const thread = await waitForThread(harness.readModel, (entry) => - entry.messages.some( - (message: ProviderRuntimeTestMessage) => - message.id === turnTwoMessageId && - !message.streaming && - message.text === "turn two answer", - ), - ); - - const turnOneMessage = thread.messages.find( - (message: ProviderRuntimeTestMessage) => message.id === turnOneMessageId, - ); - const turnTwoMessage = thread.messages.find( - (message: ProviderRuntimeTestMessage) => message.id === turnTwoMessageId, - ); - expect(turnOneMessage?.text).toBe("turn one answer"); - expect(turnTwoMessage?.text).toBe("turn two answer"); - expect(turnOneMessage?.text).not.toContain("turn two answer"); - - const turnOneIndex = thread.messages.findIndex( - (message: ProviderRuntimeTestMessage) => message.id === turnOneMessageId, - ); - const turnTwoIndex = thread.messages.findIndex( - (message: ProviderRuntimeTestMessage) => message.id === turnTwoMessageId, - ); - expect(turnTwoIndex).toBeGreaterThan(turnOneIndex); - }); - it("streams assistant deltas when thread.turn.start requests streaming mode", async () => { const harness = await createHarness({ serverSettings: { enableAssistantStreaming: true } }); const now = "2026-01-01T00:00:00.000Z"; @@ -2402,11 +2222,6 @@ describe("ProviderRuntimeIngestion", () => { thread.session?.activeTurnId === "turn-streaming-mode", ); - const streamingModeMessageId = expectedAssistantMessageId( - "item-streaming-mode", - "turn-streaming-mode", - ); - harness.emit({ type: "content.delta", eventId: asEventId("evt-message-delta-streaming-mode"), @@ -2424,13 +2239,13 @@ describe("ProviderRuntimeIngestion", () => { const liveThread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === streamingModeMessageId && + message.id === "assistant:item-streaming-mode" && message.streaming && message.text === "hello live", ), ); const liveMessage = liveThread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === streamingModeMessageId, + (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-streaming-mode", ); expect(liveMessage?.streaming).toBe(true); @@ -2452,11 +2267,11 @@ describe("ProviderRuntimeIngestion", () => { const finalThread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === streamingModeMessageId && !message.streaming, + message.id === "assistant:item-streaming-mode" && !message.streaming, ), ); const finalMessage = finalThread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === streamingModeMessageId, + (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-streaming-mode", ); expect(finalMessage?.text).toBe("hello live"); expect(finalMessage?.streaming).toBe(false); @@ -2482,8 +2297,6 @@ describe("ProviderRuntimeIngestion", () => { thread.session?.activeTurnId === "turn-buffer-spill", ); - const spillMessageId = expectedAssistantMessageId("item-buffer-spill", "turn-buffer-spill"); - harness.emit({ type: "content.delta", eventId: asEventId("evt-message-delta-buffer-spill"), @@ -2514,11 +2327,11 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === spillMessageId && !message.streaming, + message.id === "assistant:item-buffer-spill" && !message.streaming, ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === spillMessageId, + (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-buffer-spill", ); expect(message?.text.length).toBe(oversizedText.length); expect(message?.text).toBe(oversizedText); @@ -2545,8 +2358,6 @@ describe("ProviderRuntimeIngestion", () => { thread.session?.activeTurnId === "turn-complete-dedup", ); - const dedupMessageId = expectedAssistantMessageId("item-complete-dedup", "turn-complete-dedup"); - harness.emit({ type: "content.delta", eventId: asEventId("evt-message-delta-for-complete-dedup"), @@ -2592,7 +2403,7 @@ describe("ProviderRuntimeIngestion", () => { thread.session?.activeTurnId === null && thread.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === dedupMessageId && !message.streaming, + message.id === "assistant:item-complete-dedup" && !message.streaming, ), ); @@ -2605,7 +2416,10 @@ describe("ProviderRuntimeIngestion", () => { if (event.type !== "thread.message-sent") { return false; } - return event.payload.messageId === dedupMessageId && event.payload.streaming === false; + return ( + event.payload.messageId === "assistant:item-complete-dedup" && + event.payload.streaming === false + ); }); expect(completionEvents).toHaveLength(1); }); @@ -2960,9 +2774,7 @@ describe("ProviderRuntimeIngestion", () => { (entry: ProviderRuntimeTestCheckpoint) => entry.turnId === "turn-p1", ); expect(checkpoint?.status).toBe("missing"); - expect(checkpoint?.assistantMessageId).toBe( - expectedAssistantMessageId("item-p1-assistant", "turn-p1"), - ); + expect(checkpoint?.assistantMessageId).toBe("assistant:item-p1-assistant"); expect(checkpoint?.checkpointRef).toBe("provider-diff:evt-turn-diff-updated"); }); diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index 3038d60f8a2..a94c3e1142d 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -197,25 +197,14 @@ function assistantSegmentBaseKeyFromEvent(event: ProviderRuntimeEvent): string { return String(event.itemId ?? event.turnId ?? event.eventId); } -function assistantSegmentMessageId( - baseKey: string, - segmentIndex: number, - turnId?: string | null, -): MessageId { - const turnScope = turnId ? `:turn:${turnId}` : ""; +function assistantSegmentMessageId(baseKey: string, segmentIndex: number): MessageId { return MessageId.make( - segmentIndex === 0 - ? `assistant:${baseKey}${turnScope}` - : `assistant:${baseKey}${turnScope}:segment:${segmentIndex}`, + segmentIndex === 0 ? `assistant:${baseKey}` : `assistant:${baseKey}:segment:${segmentIndex}`, ); } function assistantMessageIdFromEvent(event: ProviderRuntimeEvent): MessageId { - return assistantSegmentMessageId( - assistantSegmentBaseKeyFromEvent(event), - 0, - event.turnId ?? null, - ); + return assistantSegmentMessageId(assistantSegmentBaseKeyFromEvent(event), 0); } function nextAssistantSegmentIndexFromItemId(itemId: string | undefined): number { @@ -780,15 +769,11 @@ const make = Effect.gen(function* () { onNone: () => ({ baseKey: input.baseKey, nextSegmentIndex: 1, - activeMessageId: assistantSegmentMessageId(input.baseKey, 0, input.turnId), + activeMessageId: assistantSegmentMessageId(input.baseKey, 0), }), onSome: (state) => { const segmentIndex = state.baseKey === input.baseKey ? state.nextSegmentIndex : 0; - const messageId = assistantSegmentMessageId( - input.baseKey, - segmentIndex, - input.turnId, - ); + const messageId = assistantSegmentMessageId(input.baseKey, segmentIndex); return { baseKey: input.baseKey, nextSegmentIndex: state.baseKey === input.baseKey ? state.nextSegmentIndex + 1 : 1, @@ -809,7 +794,7 @@ const make = Effect.gen(function* () { }) => Effect.gen(function* () { if (!input.turnId) { - return assistantSegmentMessageId(assistantSegmentBaseKeyFromEvent(input.event), 0, null); + return assistantSegmentMessageId(assistantSegmentBaseKeyFromEvent(input.event), 0); } const activeMessageId = yield* getActiveAssistantMessageIdForTurn( @@ -1517,7 +1502,9 @@ const make = Effect.gen(function* () { const assistantCompletion = event.type === "item.completed" && event.payload.itemType === "assistant_message" ? { - messageId: assistantMessageIdFromEvent(event), + messageId: MessageId.make( + `assistant:${event.itemId ?? event.turnId ?? event.eventId}`, + ), fallbackText: event.payload.detail, } : undefined; @@ -1690,7 +1677,9 @@ const make = Effect.gen(function* () { if (hasCheckpointForTurn(checkpointContext.checkpoints, turnId)) { // Already tracked; no-op. } else { - const assistantMessageId = assistantMessageIdFromEvent(event); + const assistantMessageId = MessageId.make( + `assistant:${event.itemId ?? event.turnId ?? event.eventId}`, + ); yield* orchestrationEngine.dispatch({ type: "thread.turn.diff.complete", commandId: yield* providerCommandId(event, "thread-turn-diff-complete"),