Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 20 additions & 15 deletions apps/server/src/orchestration/Layers/ProjectionPipeline.ts

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 High

When the first thread.message-sent event for an attachments-only message arrives (empty text, streaming: false, no existing row), mergeThreadMessageProjection evaluates previousMessage.text on the !previousMessage branch because the condition incoming.streaming || incoming.text.length > 0 is false, so nextText falls through to turnChanged ? "" : previousMessage.text, which dereferences undefined. This throws during projection instead of persisting the message. The guard should also cover the case where there is no previous message and the incoming text is empty.

🤖 Copy this AI Prompt to have your agent fix this:
In file @apps/server/src/orchestration/Layers/ProjectionPipeline.ts around line 27:

When the first `thread.message-sent` event for an attachments-only message arrives (empty text, `streaming: false`, no existing row), `mergeThreadMessageProjection` evaluates `previousMessage.text` on the `!previousMessage` branch because the condition `incoming.streaming || incoming.text.length > 0` is false, so `nextText` falls through to `turnChanged ? "" : previousMessage.text`, which dereferences `undefined`. This throws during projection instead of persisting the message. The guard should also cover the case where there is no previous message and the incoming text is empty.

Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import {
parseThreadSegmentFromAttachmentId,
toSafeThreadAttachmentSegment,
} from "../../attachmentStore.ts";
import { mergeThreadMessageProjection } from "../threadMessageProjection.ts";

export const ORCHESTRATION_PROJECTOR_NAMES = {
projects: "projection.projects",
Expand Down Expand Up @@ -816,18 +817,22 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti
messageId: event.payload.messageId,
});
const previousMessage = Option.getOrUndefined(existingMessage);
const nextText = Option.match(existingMessage, {
onNone: () => event.payload.text,
onSome: (message) => {
if (event.payload.streaming) {
return `${message.text}${event.payload.text}`;
}
if (event.payload.text.length === 0) {
return message.text;
}
return event.payload.text;
const mergedMessage = mergeThreadMessageProjection(
previousMessage
? {
text: previousMessage.text,
turnId: previousMessage.turnId,
createdAt: previousMessage.createdAt,
}
: undefined,
{
text: event.payload.text,
turnId: event.payload.turnId,
streaming: event.payload.streaming,
createdAt: event.payload.createdAt,
updatedAt: event.payload.updatedAt,
},
});
);
const nextAttachments =
event.payload.attachments !== undefined
? yield* materializeAttachmentsForProjection({
Expand All @@ -837,13 +842,13 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti
yield* projectionThreadMessageRepository.upsert({
messageId: event.payload.messageId,
threadId: event.payload.threadId,
turnId: event.payload.turnId,
turnId: mergedMessage.turnId,
role: event.payload.role,
text: nextText,
text: mergedMessage.text,
...(nextAttachments !== undefined ? { attachments: [...nextAttachments] } : {}),
isStreaming: event.payload.streaming,
createdAt: previousMessage?.createdAt ?? event.payload.createdAt,
updatedAt: event.payload.updatedAt,
createdAt: mergedMessage.createdAt,
updatedAt: mergedMessage.updatedAt,
});
return;
}
Expand Down
165 changes: 133 additions & 32 deletions apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts

Large diffs are not rendered by default.

37 changes: 24 additions & 13 deletions apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ import {
type ProviderRuntimeIngestionShape,
} from "../Services/ProviderRuntimeIngestion.ts";
import { ServerSettingsService } from "../../serverSettings.ts";
import {
assistantSegmentBaseKeyFromRuntimeItem,
assistantSegmentMessageId,
} from "../assistantMessageIds.ts";

const providerTurnKey = (threadId: ThreadId, turnId: TurnId) => `${threadId}:${turnId}`;

Expand Down Expand Up @@ -194,14 +198,9 @@ function proposedPlanIdFromEvent(event: ProviderRuntimeEvent, threadId: ThreadId
}

function assistantSegmentBaseKeyFromEvent(event: ProviderRuntimeEvent): string {
return String(event.itemId ?? event.turnId ?? event.eventId);
return assistantSegmentBaseKeyFromRuntimeItem(event.itemId, event.turnId, event.eventId);
}

function assistantSegmentMessageId(baseKey: string, segmentIndex: number): MessageId {
return MessageId.make(
segmentIndex === 0 ? `assistant:${baseKey}` : `assistant:${baseKey}:segment:${segmentIndex}`,
);
}
function buildContextWindowActivityPayload(
event: ProviderRuntimeEvent,
): ThreadTokenUsageSnapshot | undefined {
Expand Down Expand Up @@ -756,11 +755,11 @@ const make = Effect.gen(function* () {
onNone: () => ({
baseKey: input.baseKey,
nextSegmentIndex: 1,
activeMessageId: assistantSegmentMessageId(input.baseKey, 0),
activeMessageId: assistantSegmentMessageId(input.baseKey, 0, input.turnId),
}),
onSome: (state) => {
const segmentIndex = state.baseKey === input.baseKey ? state.nextSegmentIndex : 0;
const messageId = assistantSegmentMessageId(input.baseKey, segmentIndex);
const messageId = assistantSegmentMessageId(input.baseKey, segmentIndex, input.turnId);
return {
baseKey: input.baseKey,
nextSegmentIndex: state.baseKey === input.baseKey ? state.nextSegmentIndex + 1 : 1,
Expand All @@ -781,7 +780,7 @@ const make = Effect.gen(function* () {
}) =>
Effect.gen(function* () {
if (!input.turnId) {
return assistantSegmentMessageId(assistantSegmentBaseKeyFromEvent(input.event), 0);
return assistantSegmentMessageId(assistantSegmentBaseKeyFromEvent(input.event), 0, undefined);
}

const activeMessageId = yield* getActiveAssistantMessageIdForTurn(
Expand Down Expand Up @@ -1459,8 +1458,14 @@ const make = Effect.gen(function* () {
const assistantCompletion =
event.type === "item.completed" && event.payload.itemType === "assistant_message"
? {
messageId: MessageId.make(
`assistant:${event.itemId ?? event.turnId ?? event.eventId}`,
messageId: assistantSegmentMessageId(
assistantSegmentBaseKeyFromRuntimeItem(
event.itemId,
event.turnId,
event.eventId,
),
0,
toTurnId(event.turnId),
),
fallbackText: event.payload.detail,
}
Expand Down Expand Up @@ -1634,8 +1639,14 @@ const make = Effect.gen(function* () {
if (hasCheckpointForTurn(checkpointContext.checkpoints, turnId)) {
// Already tracked; no-op.
} else {
const assistantMessageId = MessageId.make(
`assistant:${event.itemId ?? event.turnId ?? event.eventId}`,
const assistantMessageId = assistantSegmentMessageId(
assistantSegmentBaseKeyFromRuntimeItem(
event.itemId,
event.turnId,
event.eventId,
),
0,
turnId,
);
yield* orchestrationEngine.dispatch({
type: "thread.turn.diff.complete",
Expand Down
22 changes: 22 additions & 0 deletions apps/server/src/orchestration/assistantMessageIds.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { MessageId, TurnId } from "@t3tools/contracts";
import { describe, expect, it } from "vitest";

import { assistantSegmentMessageId } from "./assistantMessageIds.ts";

describe("assistantMessageIds", () => {
it("scopes assistant message ids by turn so resumed sessions do not collide", () => {
const baseKey = "assistant:session-1:segment:0";
const turnOne = TurnId.make("turn-1");
const turnTwo = TurnId.make("turn-2");

expect(assistantSegmentMessageId(baseKey, 0, turnOne)).toBe(
MessageId.make("assistant:turn-1:assistant:session-1:segment:0"),
);
expect(assistantSegmentMessageId(baseKey, 0, turnTwo)).toBe(
MessageId.make("assistant:turn-2:assistant:session-1:segment:0"),
);
expect(assistantSegmentMessageId(baseKey, 0, turnOne)).not.toBe(
assistantSegmentMessageId(baseKey, 0, turnTwo),
);
});
});
22 changes: 22 additions & 0 deletions apps/server/src/orchestration/assistantMessageIds.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { MessageId, type TurnId } from "@t3tools/contracts";

export function assistantSegmentBaseKeyFromRuntimeItem(
itemId: string | undefined,
turnId: string | undefined,
eventId: string,
): string {
return String(itemId ?? turnId ?? eventId);
}

export function assistantSegmentMessageId(
baseKey: string,
segmentIndex: number,
turnId?: TurnId,
): MessageId {
const scopedBase = turnId ? `${turnId}:${baseKey}` : baseKey;
return MessageId.make(
segmentIndex === 0
? `assistant:${scopedBase}`
: `assistant:${scopedBase}:segment:${segmentIndex}`,
);
}
42 changes: 28 additions & 14 deletions apps/server/src/orchestration/projector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import * as Effect from "effect/Effect";
import * as Schema from "effect/Schema";

import { toProjectorDecodeError, type OrchestrationProjectorDecodeError } from "./Errors.ts";
import { mergeThreadMessageProjection } from "./threadMessageProjection.ts";
import {
MessageSentPayloadSchema,
ProjectCreatedPayload,
Expand Down Expand Up @@ -413,20 +414,33 @@ export function projectEvent(
const messages = existingMessage
? thread.messages.map((entry) =>
entry.id === message.id
? {
...entry,
text: message.streaming
? `${entry.text}${message.text}`
: message.text.length > 0
? message.text
: entry.text,
streaming: message.streaming,
updatedAt: message.updatedAt,
turnId: message.turnId,
...(message.attachments !== undefined
? { attachments: message.attachments }
: {}),
}
? (() => {
const mergedMessage = mergeThreadMessageProjection(
{
text: entry.text,
turnId: entry.turnId,
createdAt: entry.createdAt,
},
{
text: message.text,
turnId: message.turnId,
streaming: message.streaming,
createdAt: message.createdAt,
updatedAt: message.updatedAt,
},
);
return {
...entry,
text: mergedMessage.text,
streaming: message.streaming,
createdAt: mergedMessage.createdAt,
updatedAt: mergedMessage.updatedAt,
turnId: mergedMessage.turnId,
...(message.attachments !== undefined
? { attachments: message.attachments }
: {}),
};
})()
: entry,
)
: [...thread.messages, message];
Expand Down
76 changes: 76 additions & 0 deletions apps/server/src/orchestration/threadMessageProjection.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { TurnId } from "@t3tools/contracts";
import { describe, expect, it } from "vitest";

import { assistantMessageTurnChanged, mergeThreadMessageProjection } from "./threadMessageProjection.ts";

describe("threadMessageProjection", () => {
it("detects turn changes on reused assistant message ids", () => {
expect(
assistantMessageTurnChanged(
{ text: "old", turnId: TurnId.make("turn-1"), createdAt: "2026-07-01T21:00:00.000Z" },
TurnId.make("turn-2"),
),
).toBe(true);
});

it("resets text and createdAt when a reused message id starts a new turn", () => {
const merged = mergeThreadMessageProjection(
{
text: "first turn reply",
turnId: TurnId.make("turn-1"),
createdAt: "2026-07-01T21:00:00.000Z",
},
{
text: "second turn reply",
turnId: TurnId.make("turn-2"),
streaming: false,
createdAt: "2026-07-02T01:44:30.000Z",
updatedAt: "2026-07-02T01:44:30.000Z",
},
);

expect(merged.text).toBe("second turn reply");
expect(merged.createdAt).toBe("2026-07-02T01:44:30.000Z");
expect(merged.turnId).toBe(TurnId.make("turn-2"));
});

it("appends streaming deltas within the same turn", () => {
const merged = mergeThreadMessageProjection(
{
text: "hello",
turnId: TurnId.make("turn-1"),
createdAt: "2026-07-01T21:00:00.000Z",
},
{
text: " world",
turnId: TurnId.make("turn-1"),
streaming: true,
createdAt: "2026-07-01T21:00:01.000Z",
updatedAt: "2026-07-01T21:00:01.000Z",
},
);

expect(merged.text).toBe("hello world");
expect(merged.createdAt).toBe("2026-07-01T21:00:00.000Z");
});

it("starts fresh streaming text when the turn changes", () => {
const merged = mergeThreadMessageProjection(
{
text: "first turn reply",
turnId: TurnId.make("turn-1"),
createdAt: "2026-07-01T21:00:00.000Z",
},
{
text: "Checking",
turnId: TurnId.make("turn-2"),
streaming: true,
createdAt: "2026-07-02T01:44:30.000Z",
updatedAt: "2026-07-02T01:44:30.000Z",
},
);

expect(merged.text).toBe("Checking");
expect(merged.createdAt).toBe("2026-07-02T01:44:30.000Z");
});
});
56 changes: 56 additions & 0 deletions apps/server/src/orchestration/threadMessageProjection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import type { TurnId } from "@t3tools/contracts";

export interface ThreadMessageProjectionSlice {
readonly text: string;
readonly turnId: TurnId | null;
readonly createdAt: string;
}

export interface ThreadMessageProjectionIncoming {
readonly text: string;
readonly turnId: TurnId | null;
readonly streaming: boolean;
readonly createdAt: string;
readonly updatedAt: string;
}

export function assistantMessageTurnChanged(
previousMessage: ThreadMessageProjectionSlice | undefined,
nextTurnId: TurnId | null,
): boolean {
if (!previousMessage) {
return false;
}
return previousMessage.turnId !== nextTurnId;
}

export function mergeThreadMessageProjection(
previousMessage: ThreadMessageProjectionSlice | undefined,
incoming: ThreadMessageProjectionIncoming,
): ThreadMessageProjectionSlice & { readonly updatedAt: string } {
const turnChanged = assistantMessageTurnChanged(previousMessage, incoming.turnId);

const nextText = (() => {
if (!previousMessage || turnChanged) {
if (incoming.streaming || incoming.text.length > 0) {
return incoming.text;
}
return turnChanged ? "" : previousMessage.text;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merge crashes first empty upsert

Medium Severity

When mergeThreadMessageProjection processes a first message that is non-streaming and has empty text, it attempts to read previousMessage.text even though previousMessage is undefined. This causes a runtime error during DB projection upserts, where it should instead return the empty incoming text.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 3a401bf. Configure here.

}

if (incoming.streaming) {
return `${previousMessage.text}${incoming.text}`;
}
if (incoming.text.length === 0) {
return previousMessage.text;
}
return incoming.text;
})();

return {
text: nextText,
turnId: incoming.turnId,
createdAt: turnChanged ? incoming.createdAt : (previousMessage?.createdAt ?? incoming.createdAt),
updatedAt: incoming.updatedAt,
};
}
Loading