Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions apps/server/src/provider/Layers/CursorAdapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,10 @@ cursorAdapterTestLayer("CursorAdapterLive", (it) => {
assert.isDefined(delta);
if (delta?.type === "content.delta") {
assert.equal(delta.payload.delta, "hello from mock");
assert.match(String(delta.itemId), /^assistant:mock-session-1:segment:0$/);
assert.match(
String(delta.itemId),
/^assistant:mock-session-1:run:[0-9a-f-]{36}:segment:0$/,
);
}

const assistantCompleted = runtimeEvents.find(
Expand Down Expand Up @@ -588,7 +591,10 @@ cursorAdapterTestLayer("CursorAdapterLive", (it) => {
if (contentDelta?.type === "content.delta") {
assert.equal(String(contentDelta.turnId), String(turn.turnId));
assert.equal(contentDelta.payload.delta, "hello from mock");
assert.equal(String(contentDelta.itemId), "assistant:mock-session-1:segment:0");
assert.match(
String(contentDelta.itemId),
/^assistant:mock-session-1:run:[0-9a-f-]{36}:segment:0$/,
);
}
});

Expand Down
42 changes: 42 additions & 0 deletions apps/server/src/provider/acp/AcpJsonRpcConnection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,48 @@ describe("AcpSessionRuntime", () => {
),
);

it.effect(
"namespaces assistant itemIds per runtime instance so reloaded sessions don't collide",
() =>
Effect.gen(function* () {
const runOnce = Effect.gen(function* () {
const runtime = yield* AcpSessionRuntime;
yield* runtime.start();
yield* runtime.prompt({ prompt: [{ type: "text", text: "hi" }] });
const notes = Array.from(yield* Stream.runCollect(Stream.take(runtime.getEvents(), 4)));
const delta = notes.find((note) => note._tag === "ContentDelta");
if (delta?._tag !== "ContentDelta") {
throw new Error("expected ContentDelta event");
}
return delta.itemId;
}).pipe(
Effect.provide(
AcpSessionRuntime.layer({
spawn: { command: bunExe, args: [mockAgentPath] },
cwd: process.cwd(),
clientInfo: { name: "t3-test", version: "0.0.0" },
authMethodId: "test",
}),
),
Effect.scoped,
);

const firstItemId = yield* runOnce;
const secondItemId = yield* runOnce;

// Both runtimes loaded the same mock ACP `sessionId`; without per-runtime
// namespacing the synthesized itemIds collide and the orchestrator
// collapses today's assistant message onto yesterday's.
expect(firstItemId).not.toBe(secondItemId);
expect(firstItemId).toMatch(
/^assistant:mock-session-1:run:[0-9a-f-]{36}:segment:0$/,
);
expect(secondItemId).toMatch(
/^assistant:mock-session-1:run:[0-9a-f-]{36}:segment:0$/,
);
}).pipe(Effect.provide(NodeServices.layer)),
);

it.effect("suppresses generic placeholder tool updates until completion", () =>
Effect.gen(function* () {
const runtime = yield* AcpSessionRuntime;
Expand Down
24 changes: 21 additions & 3 deletions apps/server/src/provider/acp/AcpSessionRuntime.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { randomUUID } from "node:crypto";

import * as Cause from "effect/Cause";
import * as Deferred from "effect/Deferred";
import * as Effect from "effect/Effect";
Expand Down Expand Up @@ -165,6 +167,7 @@ const makeAcpSessionRuntime = (
const assistantSegmentRef = yield* Ref.make<AcpAssistantSegmentState>({ nextSegmentIndex: 0 });
const configOptionsRef = yield* Ref.make(sessionConfigOptionsFromSetup(undefined));
const startStateRef = yield* Ref.make<AcpStartState>({ _tag: "NotStarted" });
const runtimeInstanceId = randomUUID();

const logRequest = (event: AcpSessionRequestLogEvent) =>
options.requestLogger ? options.requestLogger(event) : Effect.void;
Expand Down Expand Up @@ -237,6 +240,7 @@ const makeAcpSessionRuntime = (
toolCallsRef,
assistantSegmentRef,
params: notification,
runtimeInstanceId,
}),
);

Expand Down Expand Up @@ -582,12 +586,14 @@ const handleSessionUpdate = ({
toolCallsRef,
assistantSegmentRef,
params,
runtimeInstanceId,
}: {
readonly queue: Queue.Queue<AcpParsedSessionEvent>;
readonly modeStateRef: Ref.Ref<AcpSessionModeState | undefined>;
readonly toolCallsRef: Ref.Ref<Map<string, AcpToolCallState>>;
readonly assistantSegmentRef: Ref.Ref<AcpAssistantSegmentState>;
readonly params: EffectAcpSchema.SessionNotification;
readonly runtimeInstanceId: string;
}): Effect.Effect<void> =>
Effect.gen(function* () {
const parsed = parseSessionUpdateEvent(params);
Expand Down Expand Up @@ -634,6 +640,7 @@ const handleSessionUpdate = ({
queue,
assistantSegmentRef,
sessionId: params.sessionId,
runtimeInstanceId,
});
yield* Queue.offer(queue, {
...event,
Expand Down Expand Up @@ -671,25 +678,36 @@ function shouldEmitToolCallUpdate(
return previous === undefined || previous.title !== next.title || previous.detail !== next.detail;
}

const assistantItemId = (sessionId: string, segmentIndex: number) =>
`assistant:${sessionId}:segment:${segmentIndex}`;
// `runtimeInstanceId` is minted per AcpSessionRuntime construction so that
// itemIds remain unique across runtime restarts that reuse the same ACP
// `sessionId` (e.g. Cursor's `session/load`). Without it, segment counters
// reset to 0 on reload while the cursor session id stays the same, and the
// orchestrator collapses the new assistant message onto the prior turn's
// message of the same id — making the agent's response invisible to clients.
const assistantItemId = (
sessionId: string,
runtimeInstanceId: string,
segmentIndex: number,
) => `assistant:${sessionId}:run:${runtimeInstanceId}:segment:${segmentIndex}`;

const ensureActiveAssistantSegment = ({
queue,
assistantSegmentRef,
sessionId,
runtimeInstanceId,
}: {
readonly queue: Queue.Queue<AcpParsedSessionEvent>;
readonly assistantSegmentRef: Ref.Ref<AcpAssistantSegmentState>;
readonly sessionId: string;
readonly runtimeInstanceId: string;
}) =>
Ref.modify<AcpAssistantSegmentState, EnsureActiveAssistantSegmentResult>(
assistantSegmentRef,
(current) => {
if (current.activeItemId) {
return [{ itemId: current.activeItemId }, current] as const;
}
const itemId = assistantItemId(sessionId, current.nextSegmentIndex);
const itemId = assistantItemId(sessionId, runtimeInstanceId, current.nextSegmentIndex);
return [
{
itemId,
Expand Down