Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions apps/server/scripts/acp-mock-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const exitLogPath = process.env.T3_ACP_EXIT_LOG_PATH;
const emitToolCalls = process.env.T3_ACP_EMIT_TOOL_CALLS === "1";
const emitInterleavedAssistantToolCalls =
process.env.T3_ACP_EMIT_INTERLEAVED_ASSISTANT_TOOL_CALLS === "1";
const emitThoughtChunks = process.env.T3_ACP_EMIT_THOUGHT_CHUNKS === "1";
const emitGenericToolPlaceholders = process.env.T3_ACP_EMIT_GENERIC_TOOL_PLACEHOLDERS === "1";
const emitAskQuestion = process.env.T3_ACP_EMIT_ASK_QUESTION === "1";
const emitXAiAskUserQuestion = process.env.T3_ACP_EMIT_XAI_ASK_USER_QUESTION === "1";
Expand Down Expand Up @@ -580,6 +581,34 @@ const program = Effect.gen(function* () {
return yield* Effect.never;
}

if (emitThoughtChunks) {
yield* agent.client.sessionUpdate({
sessionId: requestedSessionId,
update: {
sessionUpdate: "agent_thought_chunk",
content: { type: "text", text: "thinking about " },
},
});

yield* agent.client.sessionUpdate({
sessionId: requestedSessionId,
update: {
sessionUpdate: "agent_thought_chunk",
content: { type: "text", text: "the answer" },
},
});

yield* agent.client.sessionUpdate({
sessionId: requestedSessionId,
update: {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "final answer" },
},
});

return { stopReason: "end_turn" };
}

if (emitInterleavedAssistantToolCalls) {
const toolCallId = "tool-call-1";

Expand Down
35 changes: 35 additions & 0 deletions apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,19 @@ function truncateDetail(value: string, limit = 180): string {
return value.length > limit ? `${value.slice(0, limit - 3)}...` : value;
}

/** Full reasoning text is kept expandable in the work log but capped so a
* single thought segment cannot bloat the persisted activity payload. */
const MAX_REASONING_ACTIVITY_CHARS = 8_000;

function reasoningSummaryFromText(text: string): string {
const firstLine = text
.split("\n")
.find((line) => line.trim().length > 0)
?.trim();
const cleaned = firstLine?.replaceAll("**", "").trim();
return truncateDetail(cleaned && cleaned.length > 0 ? cleaned : "Thinking", 120);
}

function normalizeProposedPlanMarkdown(planMarkdown: string | undefined): string | undefined {
const trimmed = planMarkdown?.trim();
if (!trimmed) {
Expand Down Expand Up @@ -578,6 +591,28 @@ function runtimeEventToActivities(
}

case "item.completed": {
// Reasoning segments (e.g. Cursor agent_thought_chunk) become expandable
// "thinking" rows in the work log instead of disappearing entirely.
if (event.payload.itemType === "reasoning") {
const reasoningText = event.payload.detail?.trim();
if (!reasoningText) {
return [];
}
return [
{
id: event.eventId,
createdAt: event.createdAt,
tone: "info",
kind: "reasoning",
summary: reasoningSummaryFromText(reasoningText),
payload: {
detail: truncateDetail(reasoningText, MAX_REASONING_ACTIVITY_CHARS),
},
turnId: toTurnId(event.turnId) ?? null,
...maybeSequence,
},
];
}
if (!isToolLifecycleItemType(event.payload.itemType)) {
return [];
}
Expand Down
28 changes: 25 additions & 3 deletions apps/server/src/provider/Layers/CursorAdapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,9 @@ cursorAdapterTestLayer("CursorAdapterLive", (it) => {
assert.isDefined(delta);
if (delta?.type === "content.delta") {
assert.equal(delta.payload.delta, "hello from mock");
assert.match(String(delta.itemId), /^assistant:mock-session-1:segment:0$/);
// The middle part is a per-runtime tag that keeps segment ids unique
// across restarts that resume the same ACP session.
assert.match(String(delta.itemId), /^assistant:mock-session-1:[^:]+:segment:0$/);
}

const assistantCompleted = runtimeEvents.find(
Expand Down Expand Up @@ -687,7 +689,7 @@ cursorAdapterTestLayer("CursorAdapterLive", (it) => {
if (contentDelta?.type === "content.delta") {
assert.equal(String(contentDelta.turnId), String(turn.turnId));
assert.equal(contentDelta.payload.delta, "hello from mock");
assert.equal(String(contentDelta.itemId), "assistant:mock-session-1:segment:0");
assert.match(String(contentDelta.itemId), /^assistant:mock-session-1:[^:]+:segment:0$/);
}
});

Expand Down Expand Up @@ -1044,14 +1046,23 @@ cursorAdapterTestLayer("CursorAdapterLive", (it) => {
const serverSettings = yield* ServerSettingsService;
const threadId = ThreadId.make("cursor-stop-pending-approval");
const approvalRequested = yield* Deferred.make<void>();
const sessionExited = yield* Deferred.make<void>();
const observedEvents: Array<ProviderRuntimeEvent> = [];

const wrapperPath = yield* Effect.promise(() =>
makeMockAgentWrapper({ T3_ACP_EMIT_TOOL_CALLS: "1" }),
);
yield* serverSettings.updateSettings({ providers: { cursor: { binaryPath: wrapperPath } } });

yield* Stream.runForEach(adapter.streamEvents, (event) => {
if (String(event.threadId) !== String(threadId) || event.type !== "request.opened") {
if (String(event.threadId) !== String(threadId)) {
return Effect.void;
}
observedEvents.push(event);
if (event.type === "session.exited") {
return Deferred.succeed(sessionExited, undefined).pipe(Effect.ignore);
}
if (event.type !== "request.opened") {
return Effect.void;
}
return Deferred.succeed(approvalRequested, undefined).pipe(Effect.ignore);
Expand All @@ -1078,6 +1089,17 @@ cursorAdapterTestLayer("CursorAdapterLive", (it) => {
yield* Fiber.await(sendTurnFiber);

assert.equal(yield* adapter.hasSession(threadId), false);

// Teardown interrupts the notification fiber, which settles sendTurn's
// drain race. sendTurn must then bail out: emitting turn.completed after
// session.exited would publish out-of-order events for a removed session.
// sendTurn already finished above, so after session.exited arrives we
// only need to let the stream consumer drain any remaining events.
yield* Deferred.await(sessionExited);
for (let i = 0; i < 10; i += 1) {
yield* Effect.yieldNow;
}
assert.isFalse(observedEvents.some((event) => event.type === "turn.completed"));
}),
);

Expand Down
32 changes: 32 additions & 0 deletions apps/server/src/provider/Layers/CursorAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,7 @@ export function makeCursorAdapter(
turnId: ctx.activeTurnId,
itemId: event.itemId,
lifecycle: "item.started",
channel: event.channel,
}),
);
return;
Expand All @@ -811,6 +812,8 @@ export function makeCursorAdapter(
turnId: ctx.activeTurnId,
itemId: event.itemId,
lifecycle: "item.completed",
channel: event.channel,
...(event.text !== undefined ? { detail: event.text } : {}),
}),
);
return;
Expand Down Expand Up @@ -861,6 +864,7 @@ export function makeCursorAdapter(
threadId: ctx.threadId,
turnId: ctx.activeTurnId,
...(event.itemId ? { itemId: event.itemId } : {}),
channel: event.channel,
text: event.text,
rawPayload: event.rawPayload,
}),
Expand Down Expand Up @@ -1013,6 +1017,34 @@ export function makeCursorAdapter(
),
);

// The prompt RPC can resolve while session/update notifications are
// still queued. Wait until the notification fiber has published all
// of them so content deltas and item completions reach consumers
// before turn.completed (otherwise buffered assistant text may be
// finalized against the wrong turn state). Racing against the
// notification fiber keeps this from hanging when the session is
// torn down mid-turn and nobody can acknowledge the drain barrier.
yield* ctx.notificationFiber
Comment thread
macroscopeapp[bot] marked this conversation as resolved.
? Effect.raceFirst(
ctx.acp.drainEvents,
Fiber.await(ctx.notificationFiber).pipe(Effect.asVoid),
)
: Effect.void;

// The notification fiber also terminates when stopSessionInternal
// interrupts it during teardown, which settles the race without the
// drain having happened. At that point the session is already gone
// (session.exited emitted, ctx removed from the map), so mutating
// turn state or emitting turn.completed here would produce
// out-of-order events against a removed session. Bail out instead.
if (ctx.stopped) {
return {
threadId: input.threadId,
turnId,
resumeCursor: ctx.session.resumeCursor,
};
}

const turnRecord = ctx.turns.find((turn) => turn.id === turnId);
if (turnRecord) {
turnRecord.items.push({ prompt: promptParts, result });
Expand Down
4 changes: 4 additions & 0 deletions apps/server/src/provider/Layers/GrokAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,7 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte
turnId: notificationTurnId,
itemId: event.itemId,
lifecycle: "item.started",
channel: event.channel,
}),
);
return;
Expand All @@ -830,6 +831,8 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte
turnId: notificationTurnId,
itemId: event.itemId,
lifecycle: "item.completed",
channel: event.channel,
...(event.text !== undefined ? { detail: event.text } : {}),
}),
);
return;
Expand Down Expand Up @@ -863,6 +866,7 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte
threadId: ctx.threadId,
turnId: notificationTurnId,
...(event.itemId ? { itemId: event.itemId } : {}),
channel: event.channel,
text: event.text,
rawPayload: event.rawPayload,
}),
Expand Down
44 changes: 44 additions & 0 deletions apps/server/src/provider/acp/AcpCoreRuntimeEvents.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,48 @@ describe("AcpCoreRuntimeEvents", () => {
},
});
});

it("maps thought-channel segments to reasoning items and reasoning_text deltas", () => {
const stamp = { eventId: "event-1" as never, createdAt: "2026-03-27T00:00:00.000Z" };
const turnId = TurnId.make("turn-1");

expect(
makeAcpContentDeltaEvent({
stamp,
provider: ProviderDriverKind.make("cursor"),
threadId: "thread-1" as never,
turnId,
itemId: "thought:session-1:tag:segment:0",
channel: "thought",
text: "Checking the failing test first.",
rawPayload: { sessionId: "session-1" },
}),
).toMatchObject({
type: "content.delta",
payload: {
streamKind: "reasoning_text",
delta: "Checking the failing test first.",
},
});

expect(
makeAcpAssistantItemEvent({
stamp,
provider: ProviderDriverKind.make("cursor"),
threadId: "thread-1" as never,
turnId,
itemId: "thought:session-1:tag:segment:0",
lifecycle: "item.completed",
channel: "thought",
detail: "Checking the failing test first.",
}),
).toMatchObject({
type: "item.completed",
payload: {
itemType: "reasoning",
status: "completed",
detail: "Checking the failing test first.",
},
});
});
});
26 changes: 23 additions & 3 deletions apps/server/src/provider/acp/AcpCoreRuntimeEvents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ import {
type TurnId,
} from "@t3tools/contracts";

import type { AcpPermissionRequest, AcpPlanUpdate, AcpToolCallState } from "./AcpRuntimeModel.ts";
import type {
AcpAssistantChannel,
AcpPermissionRequest,
AcpPlanUpdate,
AcpToolCallState,
} from "./AcpRuntimeModel.ts";

type AcpAdapterRawSource = Extract<
RuntimeEventRawSource,
Expand Down Expand Up @@ -198,6 +203,9 @@ export function makeAcpAssistantItemEvent(input: {
readonly turnId: TurnId | undefined;
readonly itemId: string;
readonly lifecycle: "item.started" | "item.completed";
readonly channel?: AcpAssistantChannel;
/** Full segment text for completed thought segments. */
readonly detail?: string;
}): ProviderRuntimeEvent {
return {
type: input.lifecycle,
Expand All @@ -207,8 +215,11 @@ export function makeAcpAssistantItemEvent(input: {
turnId: input.turnId,
itemId: RuntimeItemId.make(input.itemId),
payload: {
itemType: "assistant_message",
itemType: input.channel === "thought" ? "reasoning" : "assistant_message",
status: input.lifecycle === "item.completed" ? "completed" : "inProgress",
...(input.detail !== undefined && input.detail.trim().length > 0
? { detail: input.detail }
: {}),
},
};
}
Expand All @@ -219,6 +230,7 @@ export function makeAcpContentDeltaEvent(input: {
readonly threadId: ThreadId;
readonly turnId: TurnId | undefined;
readonly itemId?: string;
readonly channel?: AcpAssistantChannel;
readonly text: string;
readonly rawPayload: unknown;
}): ProviderRuntimeEvent {
Expand All @@ -230,7 +242,15 @@ export function makeAcpContentDeltaEvent(input: {
turnId: input.turnId,
...(input.itemId ? { itemId: RuntimeItemId.make(input.itemId) } : {}),
payload: {
Comment thread
macroscopeapp[bot] marked this conversation as resolved.
streamKind: "assistant_text",
// reasoning_text deltas are intentionally not appended to the assistant
// message by ProviderRuntimeIngestion (same as the Codex/Claude/OpenCode
// adapters' reasoning deltas). The full thought text is accumulated in
// AcpSessionRuntime's active segment and delivered via item.completed
// (itemType "reasoning"), which ingestion persists as an expandable
// thinking row. Segments are closed on channel switches, tool calls, and
// prompt settlement (including cancellation), so accumulated reasoning
// survives an interrupted turn.
streamKind: input.channel === "thought" ? "reasoning_text" : "assistant_text",
delta: input.text,
},
raw: {
Expand Down
Loading
Loading