From 3a401bf5b3d4372bb794ba942c9631451f4b19f5 Mon Sep 17 00:00:00 2001 From: David Mashburn Date: Wed, 1 Jul 2026 22:06:13 -0400 Subject: [PATCH] fix(orchestration): show follow-up assistant replies after Cursor session resume When Cursor resumes a session, ACP reuses assistant segment IDs across turns. Scope assistant message IDs by turn and reset projection state on turn change so follow-up replies appear below the latest user message instead of updating stale segments sorted above it. Co-authored-by: Cursor --- .../Layers/ProjectionPipeline.ts | 35 ++-- .../Layers/ProviderRuntimeIngestion.test.ts | 165 ++++++++++++++---- .../Layers/ProviderRuntimeIngestion.ts | 37 ++-- .../orchestration/assistantMessageIds.test.ts | 22 +++ .../src/orchestration/assistantMessageIds.ts | 22 +++ apps/server/src/orchestration/projector.ts | 42 +++-- .../threadMessageProjection.test.ts | 76 ++++++++ .../orchestration/threadMessageProjection.ts | 56 ++++++ 8 files changed, 381 insertions(+), 74 deletions(-) create mode 100644 apps/server/src/orchestration/assistantMessageIds.test.ts create mode 100644 apps/server/src/orchestration/assistantMessageIds.ts create mode 100644 apps/server/src/orchestration/threadMessageProjection.test.ts create mode 100644 apps/server/src/orchestration/threadMessageProjection.ts diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index f12df850941..a578e921de9 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -54,6 +54,7 @@ import { parseThreadSegmentFromAttachmentId, toSafeThreadAttachmentSegment, } from "../../attachmentStore.ts"; +import { mergeThreadMessageProjection } from "../threadMessageProjection.ts"; export const ORCHESTRATION_PROJECTOR_NAMES = { projects: "projection.projects", @@ -816,18 +817,22 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti messageId: event.payload.messageId, }); const previousMessage = Option.getOrUndefined(existingMessage); - const nextText = Option.match(existingMessage, { - onNone: () => event.payload.text, - onSome: (message) => { - if (event.payload.streaming) { - return `${message.text}${event.payload.text}`; - } - if (event.payload.text.length === 0) { - return message.text; - } - return event.payload.text; + const mergedMessage = mergeThreadMessageProjection( + previousMessage + ? { + text: previousMessage.text, + turnId: previousMessage.turnId, + createdAt: previousMessage.createdAt, + } + : undefined, + { + text: event.payload.text, + turnId: event.payload.turnId, + streaming: event.payload.streaming, + createdAt: event.payload.createdAt, + updatedAt: event.payload.updatedAt, }, - }); + ); const nextAttachments = event.payload.attachments !== undefined ? yield* materializeAttachmentsForProjection({ @@ -837,13 +842,13 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti yield* projectionThreadMessageRepository.upsert({ messageId: event.payload.messageId, threadId: event.payload.threadId, - turnId: event.payload.turnId, + turnId: mergedMessage.turnId, role: event.payload.role, - text: nextText, + text: mergedMessage.text, ...(nextAttachments !== undefined ? { attachments: [...nextAttachments] } : {}), isStreaming: event.payload.streaming, - createdAt: previousMessage?.createdAt ?? event.payload.createdAt, - updatedAt: event.payload.updatedAt, + createdAt: mergedMessage.createdAt, + updatedAt: mergedMessage.updatedAt, }); return; } diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index 001ba388949..20a83efae9d 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -47,6 +47,7 @@ import { ProviderRuntimeIngestionLive } from "./ProviderRuntimeIngestion.ts"; import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts"; import { ProviderRuntimeIngestionService } from "../Services/ProviderRuntimeIngestion.ts"; import { ProjectionSnapshotQuery } from "../Services/ProjectionSnapshotQuery.ts"; +import { assistantSegmentMessageId } from "../assistantMessageIds.ts"; import { ServerConfig } from "../../config.ts"; import { ServerSettingsService } from "../../serverSettings.ts"; import * as NodeServices from "@effect/platform-node/NodeServices"; @@ -167,6 +168,14 @@ type ProviderRuntimeTestProposedPlan = ProviderRuntimeTestThread["proposedPlans" type ProviderRuntimeTestActivity = ProviderRuntimeTestThread["activities"][number]; type ProviderRuntimeTestCheckpoint = ProviderRuntimeTestThread["checkpoints"][number]; +function expectedAssistantMessageId( + turnId: string, + itemId: string, + segmentIndex = 0, +): string { + return String(assistantSegmentMessageId(itemId, segmentIndex, TurnId.make(turnId))); +} + async function waitForThread( readModel: () => Promise, predicate: (thread: ProviderRuntimeTestThread) => boolean, @@ -709,16 +718,108 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-1" && !message.streaming, + message.id === expectedAssistantMessageId("turn-2", "item-1") && !message.streaming, ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-1", + (entry: ProviderRuntimeTestMessage) => entry.id === expectedAssistantMessageId("turn-2", "item-1"), ); expect(message?.text).toBe("hello world"); expect(message?.streaming).toBe(false); }); + it("creates distinct assistant messages when a resumed session reuses provider segment ids", async () => { + const harness = await createHarness(); + const firstTurnAt = "2026-07-01T21:06:10.000Z"; + const secondTurnAt = "2026-07-02T01:44:30.000Z"; + const sessionItemId = "assistant:session-1:segment:0"; + + harness.emit({ + type: "content.delta", + eventId: asEventId("evt-resume-segment-turn-1"), + provider: ProviderDriverKind.make("cursor"), + createdAt: firstTurnAt, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-1"), + itemId: asItemId(sessionItemId), + payload: { + streamKind: "assistant_text", + delta: "first turn reply", + }, + }); + harness.emit({ + type: "item.completed", + eventId: asEventId("evt-resume-segment-turn-1-complete"), + provider: ProviderDriverKind.make("cursor"), + createdAt: firstTurnAt, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-1"), + itemId: asItemId(sessionItemId), + payload: { + itemType: "assistant_message", + status: "completed", + }, + }); + + await waitForThread(harness.readModel, (entry) => + entry.messages.some( + (message: ProviderRuntimeTestMessage) => + message.id === expectedAssistantMessageId("turn-1", sessionItemId) && + message.text === "first turn reply", + ), + ); + + harness.emit({ + type: "content.delta", + eventId: asEventId("evt-resume-segment-turn-2"), + provider: ProviderDriverKind.make("cursor"), + createdAt: secondTurnAt, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-2"), + itemId: asItemId(sessionItemId), + payload: { + streamKind: "assistant_text", + delta: "second turn reply", + }, + }); + harness.emit({ + type: "item.completed", + eventId: asEventId("evt-resume-segment-turn-2-complete"), + provider: ProviderDriverKind.make("cursor"), + createdAt: secondTurnAt, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-2"), + itemId: asItemId(sessionItemId), + payload: { + itemType: "assistant_message", + status: "completed", + }, + }); + + const thread = await waitForThread(harness.readModel, (entry) => + entry.messages.some( + (message: ProviderRuntimeTestMessage) => + message.id === expectedAssistantMessageId("turn-2", sessionItemId) && + message.text === "second turn reply", + ), + ); + + const firstTurnMessage = thread.messages.find( + (entry: ProviderRuntimeTestMessage) => + entry.id === expectedAssistantMessageId("turn-1", sessionItemId), + ); + const secondTurnMessage = thread.messages.find( + (entry: ProviderRuntimeTestMessage) => + entry.id === expectedAssistantMessageId("turn-2", sessionItemId), + ); + + expect(firstTurnMessage?.text).toBe("first turn reply"); + expect(secondTurnMessage?.text).toBe("second turn reply"); + expect(firstTurnMessage?.id).not.toBe(secondTurnMessage?.id); + expect(firstTurnMessage?.createdAt).toBe(firstTurnAt); + expect(secondTurnMessage?.createdAt).toBe(secondTurnAt); + }); + it("uses assistant item completion detail when no assistant deltas were streamed", async () => { const harness = await createHarness(); const now = "2026-01-01T00:00:00.000Z"; @@ -741,11 +842,11 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-no-delta" && !message.streaming, + message.id === expectedAssistantMessageId("turn-no-delta", "item-no-delta") && !message.streaming, ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-no-delta", + (entry: ProviderRuntimeTestMessage) => entry.id === expectedAssistantMessageId("turn-no-delta", "item-no-delta"), ); expect(message?.text).toBe("assistant-only final text"); expect(message?.streaming).toBe(false); @@ -1615,7 +1716,7 @@ describe("ProviderRuntimeIngestion", () => { const midThread = midReadModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect( midThread?.messages.some( - (message: ProviderRuntimeTestMessage) => message.id === "assistant:item-buffered", + (message: ProviderRuntimeTestMessage) => message.id === expectedAssistantMessageId("turn-buffered", "item-buffered"), ), ).toBe(false); @@ -1636,11 +1737,11 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-buffered" && !message.streaming, + message.id === expectedAssistantMessageId("turn-buffered", "item-buffered") && !message.streaming, ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-buffered", + (entry: ProviderRuntimeTestMessage) => entry.id === expectedAssistantMessageId("turn-buffered", "item-buffered"), ); expect(message?.text).toBe("buffer me"); expect(message?.streaming).toBe(false); @@ -1695,13 +1796,13 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-buffered-request-flush" && + message.id === expectedAssistantMessageId("turn-buffered-request-flush", "item-buffered-request-flush") && !message.streaming && message.text === "visible before approval", ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-buffered-request-flush", + (entry: ProviderRuntimeTestMessage) => entry.id === expectedAssistantMessageId("turn-buffered-request-flush", "item-buffered-request-flush"), ); expect(message?.streaming).toBe(false); }); @@ -1761,14 +1862,14 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-buffered-user-input-flush" && + message.id === expectedAssistantMessageId("turn-buffered-user-input-flush", "item-buffered-user-input-flush") && !message.streaming && message.text === "visible before user input", ), ); const message = thread.messages.find( (entry: ProviderRuntimeTestMessage) => - entry.id === "assistant:item-buffered-user-input-flush", + entry.id === expectedAssistantMessageId("turn-buffered-user-input-flush", "item-buffered-user-input-flush"), ); expect(message?.streaming).toBe(false); }); @@ -1828,7 +1929,7 @@ describe("ProviderRuntimeIngestion", () => { expect( thread.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-buffered-whitespace-request", + message.id === expectedAssistantMessageId("turn-buffered-whitespace-request", "item-buffered-whitespace-request"), ), ).toBe(false); }); @@ -1885,7 +1986,7 @@ describe("ProviderRuntimeIngestion", () => { await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-buffered-request-append" && + message.id === expectedAssistantMessageId("turn-buffered-request-append", "item-buffered-request-append") && !message.streaming && message.text === "first half", ), @@ -1921,17 +2022,17 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-buffered-request-append:segment:1" && + message.id === expectedAssistantMessageId("turn-buffered-request-append", "item-buffered-request-append", 1) && !message.streaming && message.text === " second half", ), ); const firstMessage = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-buffered-request-append", + (entry: ProviderRuntimeTestMessage) => entry.id === expectedAssistantMessageId("turn-buffered-request-append", "item-buffered-request-append"), ); const resumedMessage = thread.messages.find( (entry: ProviderRuntimeTestMessage) => - entry.id === "assistant:item-buffered-request-append:segment:1", + entry.id === expectedAssistantMessageId("turn-buffered-request-append", "item-buffered-request-append", 1), ); expect(firstMessage?.text).toBe("first half"); expect(firstMessage?.streaming).toBe(false); @@ -1946,7 +2047,7 @@ describe("ProviderRuntimeIngestion", () => { const assistantEvents = events.filter( (event): event is Extract<(typeof events)[number], { type: "thread.message-sent" }> => event.type === "thread.message-sent" && - event.payload.messageId.startsWith("assistant:item-buffered-request-append"), + event.payload.messageId.startsWith(expectedAssistantMessageId("turn-buffered-request-append", "item-buffered-request-append")), ); expect(assistantEvents).toHaveLength(4); expect(assistantEvents[0]?.payload.streaming).toBe(true); @@ -1954,12 +2055,12 @@ describe("ProviderRuntimeIngestion", () => { expect(assistantEvents[1]?.payload.streaming).toBe(false); expect(assistantEvents[1]?.payload.text).toBe(""); expect(assistantEvents[2]?.payload.messageId).toBe( - "assistant:item-buffered-request-append:segment:1", + expectedAssistantMessageId("turn-buffered-request-append", "item-buffered-request-append", 1), ); expect(assistantEvents[2]?.payload.streaming).toBe(true); expect(assistantEvents[2]?.payload.text).toBe(" second half"); expect(assistantEvents[3]?.payload.messageId).toBe( - "assistant:item-buffered-request-append:segment:1", + expectedAssistantMessageId("turn-buffered-request-append", "item-buffered-request-append", 1), ); expect(assistantEvents[3]?.payload.streaming).toBe(false); expect(assistantEvents[3]?.payload.text).toBe(""); @@ -2017,7 +2118,7 @@ describe("ProviderRuntimeIngestion", () => { await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-streaming-request-segment" && + message.id === expectedAssistantMessageId("turn-streaming-request-segment", "item-streaming-request-segment") && !message.streaming && message.text === "before approval", ), @@ -2053,7 +2154,7 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-streaming-request-segment:segment:1" && + message.id === expectedAssistantMessageId("turn-streaming-request-segment", "item-streaming-request-segment", 1) && !message.streaming && message.text === " after approval", ), @@ -2061,13 +2162,13 @@ describe("ProviderRuntimeIngestion", () => { expect( thread.messages.find( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-streaming-request-segment", + message.id === expectedAssistantMessageId("turn-streaming-request-segment", "item-streaming-request-segment"), )?.text, ).toBe("before approval"); expect( thread.messages.find( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-streaming-request-segment:segment:1", + message.id === expectedAssistantMessageId("turn-streaming-request-segment", "item-streaming-request-segment", 1), )?.text, ).toBe(" after approval"); }); @@ -2126,13 +2227,13 @@ describe("ProviderRuntimeIngestion", () => { const liveThread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-streaming-mode" && + message.id === expectedAssistantMessageId("turn-streaming-mode", "item-streaming-mode") && message.streaming && message.text === "hello live", ), ); const liveMessage = liveThread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-streaming-mode", + (entry: ProviderRuntimeTestMessage) => entry.id === expectedAssistantMessageId("turn-streaming-mode", "item-streaming-mode"), ); expect(liveMessage?.streaming).toBe(true); @@ -2154,11 +2255,11 @@ describe("ProviderRuntimeIngestion", () => { const finalThread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-streaming-mode" && !message.streaming, + message.id === expectedAssistantMessageId("turn-streaming-mode", "item-streaming-mode") && !message.streaming, ), ); const finalMessage = finalThread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-streaming-mode", + (entry: ProviderRuntimeTestMessage) => entry.id === expectedAssistantMessageId("turn-streaming-mode", "item-streaming-mode"), ); expect(finalMessage?.text).toBe("hello live"); expect(finalMessage?.streaming).toBe(false); @@ -2214,11 +2315,11 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.readModel, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-buffer-spill" && !message.streaming, + message.id === expectedAssistantMessageId("turn-buffer-spill", "item-buffer-spill") && !message.streaming, ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-buffer-spill", + (entry: ProviderRuntimeTestMessage) => entry.id === expectedAssistantMessageId("turn-buffer-spill", "item-buffer-spill"), ); expect(message?.text.length).toBe(oversizedText.length); expect(message?.text).toBe(oversizedText); @@ -2290,7 +2391,7 @@ describe("ProviderRuntimeIngestion", () => { thread.session?.activeTurnId === null && thread.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-complete-dedup" && !message.streaming, + message.id === expectedAssistantMessageId("turn-complete-dedup", "item-complete-dedup") && !message.streaming, ), ); @@ -2304,7 +2405,7 @@ describe("ProviderRuntimeIngestion", () => { return false; } return ( - event.payload.messageId === "assistant:item-complete-dedup" && + event.payload.messageId === expectedAssistantMessageId("turn-complete-dedup", "item-complete-dedup") && event.payload.streaming === false ); }); @@ -2661,7 +2762,7 @@ describe("ProviderRuntimeIngestion", () => { (entry: ProviderRuntimeTestCheckpoint) => entry.turnId === "turn-p1", ); expect(checkpoint?.status).toBe("missing"); - expect(checkpoint?.assistantMessageId).toBe("assistant:item-p1-assistant"); + expect(checkpoint?.assistantMessageId).toBe(expectedAssistantMessageId("turn-p1", "item-p1-assistant")); expect(checkpoint?.checkpointRef).toBe("provider-diff:evt-turn-diff-updated"); }); diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index 3e5978f4846..068d716d42a 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -38,6 +38,10 @@ import { type ProviderRuntimeIngestionShape, } from "../Services/ProviderRuntimeIngestion.ts"; import { ServerSettingsService } from "../../serverSettings.ts"; +import { + assistantSegmentBaseKeyFromRuntimeItem, + assistantSegmentMessageId, +} from "../assistantMessageIds.ts"; const providerTurnKey = (threadId: ThreadId, turnId: TurnId) => `${threadId}:${turnId}`; @@ -194,14 +198,9 @@ function proposedPlanIdFromEvent(event: ProviderRuntimeEvent, threadId: ThreadId } function assistantSegmentBaseKeyFromEvent(event: ProviderRuntimeEvent): string { - return String(event.itemId ?? event.turnId ?? event.eventId); + return assistantSegmentBaseKeyFromRuntimeItem(event.itemId, event.turnId, event.eventId); } -function assistantSegmentMessageId(baseKey: string, segmentIndex: number): MessageId { - return MessageId.make( - segmentIndex === 0 ? `assistant:${baseKey}` : `assistant:${baseKey}:segment:${segmentIndex}`, - ); -} function buildContextWindowActivityPayload( event: ProviderRuntimeEvent, ): ThreadTokenUsageSnapshot | undefined { @@ -756,11 +755,11 @@ const make = Effect.gen(function* () { onNone: () => ({ baseKey: input.baseKey, nextSegmentIndex: 1, - activeMessageId: assistantSegmentMessageId(input.baseKey, 0), + activeMessageId: assistantSegmentMessageId(input.baseKey, 0, input.turnId), }), onSome: (state) => { const segmentIndex = state.baseKey === input.baseKey ? state.nextSegmentIndex : 0; - const messageId = assistantSegmentMessageId(input.baseKey, segmentIndex); + const messageId = assistantSegmentMessageId(input.baseKey, segmentIndex, input.turnId); return { baseKey: input.baseKey, nextSegmentIndex: state.baseKey === input.baseKey ? state.nextSegmentIndex + 1 : 1, @@ -781,7 +780,7 @@ const make = Effect.gen(function* () { }) => Effect.gen(function* () { if (!input.turnId) { - return assistantSegmentMessageId(assistantSegmentBaseKeyFromEvent(input.event), 0); + return assistantSegmentMessageId(assistantSegmentBaseKeyFromEvent(input.event), 0, undefined); } const activeMessageId = yield* getActiveAssistantMessageIdForTurn( @@ -1459,8 +1458,14 @@ const make = Effect.gen(function* () { const assistantCompletion = event.type === "item.completed" && event.payload.itemType === "assistant_message" ? { - messageId: MessageId.make( - `assistant:${event.itemId ?? event.turnId ?? event.eventId}`, + messageId: assistantSegmentMessageId( + assistantSegmentBaseKeyFromRuntimeItem( + event.itemId, + event.turnId, + event.eventId, + ), + 0, + toTurnId(event.turnId), ), fallbackText: event.payload.detail, } @@ -1634,8 +1639,14 @@ const make = Effect.gen(function* () { if (hasCheckpointForTurn(checkpointContext.checkpoints, turnId)) { // Already tracked; no-op. } else { - const assistantMessageId = MessageId.make( - `assistant:${event.itemId ?? event.turnId ?? event.eventId}`, + const assistantMessageId = assistantSegmentMessageId( + assistantSegmentBaseKeyFromRuntimeItem( + event.itemId, + event.turnId, + event.eventId, + ), + 0, + turnId, ); yield* orchestrationEngine.dispatch({ type: "thread.turn.diff.complete", diff --git a/apps/server/src/orchestration/assistantMessageIds.test.ts b/apps/server/src/orchestration/assistantMessageIds.test.ts new file mode 100644 index 00000000000..6817fb7d39f --- /dev/null +++ b/apps/server/src/orchestration/assistantMessageIds.test.ts @@ -0,0 +1,22 @@ +import { MessageId, TurnId } from "@t3tools/contracts"; +import { describe, expect, it } from "vitest"; + +import { assistantSegmentMessageId } from "./assistantMessageIds.ts"; + +describe("assistantMessageIds", () => { + it("scopes assistant message ids by turn so resumed sessions do not collide", () => { + const baseKey = "assistant:session-1:segment:0"; + const turnOne = TurnId.make("turn-1"); + const turnTwo = TurnId.make("turn-2"); + + expect(assistantSegmentMessageId(baseKey, 0, turnOne)).toBe( + MessageId.make("assistant:turn-1:assistant:session-1:segment:0"), + ); + expect(assistantSegmentMessageId(baseKey, 0, turnTwo)).toBe( + MessageId.make("assistant:turn-2:assistant:session-1:segment:0"), + ); + expect(assistantSegmentMessageId(baseKey, 0, turnOne)).not.toBe( + assistantSegmentMessageId(baseKey, 0, turnTwo), + ); + }); +}); diff --git a/apps/server/src/orchestration/assistantMessageIds.ts b/apps/server/src/orchestration/assistantMessageIds.ts new file mode 100644 index 00000000000..7457f13ebc9 --- /dev/null +++ b/apps/server/src/orchestration/assistantMessageIds.ts @@ -0,0 +1,22 @@ +import { MessageId, type TurnId } from "@t3tools/contracts"; + +export function assistantSegmentBaseKeyFromRuntimeItem( + itemId: string | undefined, + turnId: string | undefined, + eventId: string, +): string { + return String(itemId ?? turnId ?? eventId); +} + +export function assistantSegmentMessageId( + baseKey: string, + segmentIndex: number, + turnId?: TurnId, +): MessageId { + const scopedBase = turnId ? `${turnId}:${baseKey}` : baseKey; + return MessageId.make( + segmentIndex === 0 + ? `assistant:${scopedBase}` + : `assistant:${scopedBase}:segment:${segmentIndex}`, + ); +} diff --git a/apps/server/src/orchestration/projector.ts b/apps/server/src/orchestration/projector.ts index fc6ab8f6fcf..ebb41fe1213 100644 --- a/apps/server/src/orchestration/projector.ts +++ b/apps/server/src/orchestration/projector.ts @@ -9,6 +9,7 @@ import * as Effect from "effect/Effect"; import * as Schema from "effect/Schema"; import { toProjectorDecodeError, type OrchestrationProjectorDecodeError } from "./Errors.ts"; +import { mergeThreadMessageProjection } from "./threadMessageProjection.ts"; import { MessageSentPayloadSchema, ProjectCreatedPayload, @@ -413,20 +414,33 @@ export function projectEvent( const messages = existingMessage ? thread.messages.map((entry) => entry.id === message.id - ? { - ...entry, - text: message.streaming - ? `${entry.text}${message.text}` - : message.text.length > 0 - ? message.text - : entry.text, - streaming: message.streaming, - updatedAt: message.updatedAt, - turnId: message.turnId, - ...(message.attachments !== undefined - ? { attachments: message.attachments } - : {}), - } + ? (() => { + const mergedMessage = mergeThreadMessageProjection( + { + text: entry.text, + turnId: entry.turnId, + createdAt: entry.createdAt, + }, + { + text: message.text, + turnId: message.turnId, + streaming: message.streaming, + createdAt: message.createdAt, + updatedAt: message.updatedAt, + }, + ); + return { + ...entry, + text: mergedMessage.text, + streaming: message.streaming, + createdAt: mergedMessage.createdAt, + updatedAt: mergedMessage.updatedAt, + turnId: mergedMessage.turnId, + ...(message.attachments !== undefined + ? { attachments: message.attachments } + : {}), + }; + })() : entry, ) : [...thread.messages, message]; diff --git a/apps/server/src/orchestration/threadMessageProjection.test.ts b/apps/server/src/orchestration/threadMessageProjection.test.ts new file mode 100644 index 00000000000..7d98f17d52d --- /dev/null +++ b/apps/server/src/orchestration/threadMessageProjection.test.ts @@ -0,0 +1,76 @@ +import { TurnId } from "@t3tools/contracts"; +import { describe, expect, it } from "vitest"; + +import { assistantMessageTurnChanged, mergeThreadMessageProjection } from "./threadMessageProjection.ts"; + +describe("threadMessageProjection", () => { + it("detects turn changes on reused assistant message ids", () => { + expect( + assistantMessageTurnChanged( + { text: "old", turnId: TurnId.make("turn-1"), createdAt: "2026-07-01T21:00:00.000Z" }, + TurnId.make("turn-2"), + ), + ).toBe(true); + }); + + it("resets text and createdAt when a reused message id starts a new turn", () => { + const merged = mergeThreadMessageProjection( + { + text: "first turn reply", + turnId: TurnId.make("turn-1"), + createdAt: "2026-07-01T21:00:00.000Z", + }, + { + text: "second turn reply", + turnId: TurnId.make("turn-2"), + streaming: false, + createdAt: "2026-07-02T01:44:30.000Z", + updatedAt: "2026-07-02T01:44:30.000Z", + }, + ); + + expect(merged.text).toBe("second turn reply"); + expect(merged.createdAt).toBe("2026-07-02T01:44:30.000Z"); + expect(merged.turnId).toBe(TurnId.make("turn-2")); + }); + + it("appends streaming deltas within the same turn", () => { + const merged = mergeThreadMessageProjection( + { + text: "hello", + turnId: TurnId.make("turn-1"), + createdAt: "2026-07-01T21:00:00.000Z", + }, + { + text: " world", + turnId: TurnId.make("turn-1"), + streaming: true, + createdAt: "2026-07-01T21:00:01.000Z", + updatedAt: "2026-07-01T21:00:01.000Z", + }, + ); + + expect(merged.text).toBe("hello world"); + expect(merged.createdAt).toBe("2026-07-01T21:00:00.000Z"); + }); + + it("starts fresh streaming text when the turn changes", () => { + const merged = mergeThreadMessageProjection( + { + text: "first turn reply", + turnId: TurnId.make("turn-1"), + createdAt: "2026-07-01T21:00:00.000Z", + }, + { + text: "Checking", + turnId: TurnId.make("turn-2"), + streaming: true, + createdAt: "2026-07-02T01:44:30.000Z", + updatedAt: "2026-07-02T01:44:30.000Z", + }, + ); + + expect(merged.text).toBe("Checking"); + expect(merged.createdAt).toBe("2026-07-02T01:44:30.000Z"); + }); +}); diff --git a/apps/server/src/orchestration/threadMessageProjection.ts b/apps/server/src/orchestration/threadMessageProjection.ts new file mode 100644 index 00000000000..d1bb882482f --- /dev/null +++ b/apps/server/src/orchestration/threadMessageProjection.ts @@ -0,0 +1,56 @@ +import type { TurnId } from "@t3tools/contracts"; + +export interface ThreadMessageProjectionSlice { + readonly text: string; + readonly turnId: TurnId | null; + readonly createdAt: string; +} + +export interface ThreadMessageProjectionIncoming { + readonly text: string; + readonly turnId: TurnId | null; + readonly streaming: boolean; + readonly createdAt: string; + readonly updatedAt: string; +} + +export function assistantMessageTurnChanged( + previousMessage: ThreadMessageProjectionSlice | undefined, + nextTurnId: TurnId | null, +): boolean { + if (!previousMessage) { + return false; + } + return previousMessage.turnId !== nextTurnId; +} + +export function mergeThreadMessageProjection( + previousMessage: ThreadMessageProjectionSlice | undefined, + incoming: ThreadMessageProjectionIncoming, +): ThreadMessageProjectionSlice & { readonly updatedAt: string } { + const turnChanged = assistantMessageTurnChanged(previousMessage, incoming.turnId); + + const nextText = (() => { + if (!previousMessage || turnChanged) { + if (incoming.streaming || incoming.text.length > 0) { + return incoming.text; + } + return turnChanged ? "" : previousMessage.text; + } + + if (incoming.streaming) { + return `${previousMessage.text}${incoming.text}`; + } + if (incoming.text.length === 0) { + return previousMessage.text; + } + return incoming.text; + })(); + + return { + text: nextText, + turnId: incoming.turnId, + createdAt: turnChanged ? incoming.createdAt : (previousMessage?.createdAt ?? incoming.createdAt), + updatedAt: incoming.updatedAt, + }; +}