Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions apps/server/scripts/acp-mock-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ const emitGenericToolPlaceholders = process.env.T3_ACP_EMIT_GENERIC_TOOL_PLACEHO
const emitAskQuestion = process.env.T3_ACP_EMIT_ASK_QUESTION === "1";
const emitXAiAskUserQuestion = process.env.T3_ACP_EMIT_XAI_ASK_USER_QUESTION === "1";
const emitXAiPromptCompleteThenHang = process.env.T3_ACP_EMIT_XAI_PROMPT_COMPLETE_THEN_HANG === "1";
const emitXAiPromptCompleteWithoutPromptIdThenHang =
process.env.T3_ACP_EMIT_XAI_PROMPT_COMPLETE_WITHOUT_PROMPT_ID_THEN_HANG === "1";
const emitForeignSessionUpdates = process.env.T3_ACP_EMIT_FOREIGN_SESSION_UPDATES === "1";
const hangPromptForever = process.env.T3_ACP_HANG_PROMPT_FOREVER === "1";
const hangFirstPromptForever = process.env.T3_ACP_HANG_FIRST_PROMPT_FOREVER === "1";
Expand All @@ -30,6 +32,7 @@ const failLoadSession = process.env.T3_ACP_FAIL_LOAD_SESSION === "1";
const emitLoadReplay = process.env.T3_ACP_EMIT_LOAD_REPLAY === "1";
const hangLoadSessionAfterReplay = process.env.T3_ACP_HANG_LOAD_SESSION_AFTER_REPLAY === "1";
const delayLoadSessionAfterReplay = process.env.T3_ACP_DELAY_LOAD_SESSION_AFTER_REPLAY === "1";
const fastLoadWithDelayedReplayTail = process.env.T3_ACP_FAST_LOAD_WITH_DELAYED_REPLAY_TAIL === "1";
const loadSessionDelayMs = Number(process.env.T3_ACP_LOAD_SESSION_DELAY_MS ?? "5000");
const emitStaleXAiPromptCompleteBeforeSecondHang =
process.env.T3_ACP_EMIT_STALE_XAI_PROMPT_COMPLETE_BEFORE_SECOND_HANG === "1";
Expand Down Expand Up @@ -362,6 +365,41 @@ const program = Effect.gen(function* () {
configOptions: configOptions(),
};
}
if (fastLoadWithDelayedReplayTail) {
emitLoadReplayNotifications(requestedSessionId);
yield* agent.client.sessionUpdate({
sessionId: requestedSessionId,
update: {
sessionUpdate: "user_message_chunk",
content: { type: "text", text: "replay-head" },
},
});
const response = {
modes: modeState(),
models: modelState(),
configOptions: configOptions(),
};
void Effect.runFork(
Effect.gen(function* () {
yield* Effect.sleep("30 millis");
writeJsonRpcNotification("session/update", {
_meta: { isReplay: true },
sessionId: requestedSessionId,
update: {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "delayed replay tail" },
},
});
writeJsonRpcNotification("_x.ai/session/prompt_complete", {
sessionId: requestedSessionId,
promptId: "replay-stale-prompt-id",
stopReason: "end_turn",
agentResult: null,
});
}).pipe(Effect.provide(NodeServices.layer)),
);
return response;
}
if (emitLoadReplay) {
emitLoadReplayNotifications(requestedSessionId);
}
Expand Down Expand Up @@ -522,6 +560,15 @@ const program = Effect.gen(function* () {
return yield* Effect.never;
}

if (emitXAiPromptCompleteWithoutPromptIdThenHang) {
writeJsonRpcNotification("_x.ai/session/prompt_complete", {
sessionId: requestedSessionId,
stopReason: "end_turn",
agentResult: null,
});
return yield* Effect.never;
}

if (emitXAiPromptCompleteThenHang) {
writeJsonRpcNotification("session/update", {
sessionId: requestedSessionId,
Expand Down
65 changes: 39 additions & 26 deletions apps/server/src/provider/Layers/GrokAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -570,32 +570,45 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte
});

const mcpSession = McpProviderSession.readMcpProviderSession(input.threadId);
const acp = yield* makeGrokAcpRuntime({
grokSettings,
...(options?.environment ? { environment: options.environment } : {}),
childProcessSpawner,
cwd,
...(resumeSessionId ? { resumeSessionId } : {}),
clientInfo: { name: "t3-code", version: "0.0.0" },
...(mcpSession
? {
mcpServers: [
{
type: "http" as const,
name: "t3-code",
url: mcpSession.endpoint,
headers: [
{
name: "Authorization",
value: mcpSession.authorizationHeader,
},
],
},
],
}
: {}),
...acpNativeLoggers,
}).pipe(
const allocateXAiPromptFallbackId = randomUUIDv4.pipe(
Effect.map((uuid) => `t3-xai-prompt-${uuid}`),
Effect.mapError(
(cause) =>
new EffectAcpErrors.AcpTransportError({
detail: "Failed to allocate xAI prompt identifier.",
cause,
}),
),
);
const acp = yield* makeGrokAcpRuntime(
{
grokSettings,
...(options?.environment ? { environment: options.environment } : {}),
childProcessSpawner,
cwd,
...(resumeSessionId ? { resumeSessionId } : {}),
clientInfo: { name: "t3-code", version: "0.0.0" },
...(mcpSession
? {
mcpServers: [
{
type: "http" as const,
name: "t3-code",
url: mcpSession.endpoint,
headers: [
{
name: "Authorization",
value: mcpSession.authorizationHeader,
},
],
},
],
}
: {}),
...acpNativeLoggers,
},
allocateXAiPromptFallbackId,
).pipe(
Effect.provideService(Scope.Scope, sessionScope),
Effect.mapError(
(cause) =>
Expand Down
146 changes: 145 additions & 1 deletion apps/server/src/provider/Layers/ProviderService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1293,6 +1293,19 @@ routing.layer("ProviderServiceLive routing", (it) => {
attachments: [],
});

yield* Stream.runForEach(provider.streamEvents, () => Effect.void).pipe(Effect.forkChild);
yield* advanceTestClock(50);

routing.codex.emit({
type: "turn.started",
eventId: asEventId("evt-runtime-status-start"),
provider: ProviderDriverKind.make("codex"),
createdAt: "2026-01-01T00:00:00.000Z",
threadId: session.threadId,
turnId: asTurnId(`turn-${String(session.threadId)}`),
});
yield* advanceTestClock(50);

const runningRuntime = yield* runtimeRepository.getByThreadId({
threadId: session.threadId,
});
Expand All @@ -1314,7 +1327,7 @@ routing.layer("ProviderServiceLive routing", (it) => {
assert.equal(runtimePayload.model, null);
assert.equal(runtimePayload.activeTurnId, `turn-${String(session.threadId)}`);
assert.equal(runtimePayload.lastError, null);
assert.equal(runtimePayload.lastRuntimeEvent, "provider.sendTurn");
assert.equal(runtimePayload.lastRuntimeEvent, "provider.turn.started");
}
}
}),
Expand Down Expand Up @@ -1573,6 +1586,137 @@ fanout.layer("ProviderServiceLive fanout", (it) => {
}),
);

it.effect("clears persisted activeTurnId when adapter emits turn.completed", () =>
Effect.gen(function* () {
const provider = yield* ProviderService.ProviderService;
const runtimeRepository = yield* ProviderSessionRuntime.ProviderSessionRuntimeRepository;
const session = yield* provider.startSession(asThreadId("thread-runtime-turn-complete"), {
provider: ProviderDriverKind.make("codex"),
providerInstanceId: codexInstanceId,
threadId: asThreadId("thread-runtime-turn-complete"),
runtimeMode: "full-access",
});
yield* provider.sendTurn({
threadId: session.threadId,
input: "hello",
attachments: [],
});

yield* Stream.runForEach(provider.streamEvents, () => Effect.void).pipe(Effect.forkChild);
yield* advanceTestClock(50);

fanout.codex.emit({
type: "turn.started",
eventId: asEventId("evt-runtime-turn-start"),
provider: ProviderDriverKind.make("codex"),
createdAt: "2026-01-01T00:00:00.000Z",
threadId: session.threadId,
turnId: asTurnId(`turn-${String(session.threadId)}`),
});
yield* advanceTestClock(50);

const runningRuntime = yield* runtimeRepository.getByThreadId({
threadId: session.threadId,
});
assert.equal(Option.isSome(runningRuntime), true);
if (Option.isSome(runningRuntime)) {
const payload = runningRuntime.value.runtimePayload;
assert.equal(payload !== null && typeof payload === "object", true);
if (payload !== null && typeof payload === "object" && !Array.isArray(payload)) {
assert.equal(
(payload as { activeTurnId: string | null }).activeTurnId,
`turn-${String(session.threadId)}`,
);
}
}

fanout.codex.emit({
type: "turn.completed",
eventId: asEventId("evt-runtime-turn-complete"),
provider: ProviderDriverKind.make("codex"),
createdAt: "2026-01-01T00:00:00.000Z",
threadId: session.threadId,
turnId: asTurnId(`turn-${String(session.threadId)}`),
status: "completed",
});
yield* advanceTestClock(50);

const settledRuntime = yield* runtimeRepository.getByThreadId({
threadId: session.threadId,
});
assert.equal(Option.isSome(settledRuntime), true);
if (Option.isSome(settledRuntime)) {
const payload = settledRuntime.value.runtimePayload;
assert.equal(payload !== null && typeof payload === "object", true);
if (payload !== null && typeof payload === "object" && !Array.isArray(payload)) {
const runtimePayload = payload as {
activeTurnId: string | null;
lastRuntimeEvent: string | null;
};
assert.equal(runtimePayload.activeTurnId, null);
assert.equal(runtimePayload.lastRuntimeEvent, "provider.turn.completed");
assert.equal(settledRuntime.value.status, "running");
}
}
}),
);

it.effect("does not resurrect activeTurnId after sendTurn returns following turn.completed", () =>
Effect.gen(function* () {
const provider = yield* ProviderService.ProviderService;
const runtimeRepository = yield* ProviderSessionRuntime.ProviderSessionRuntimeRepository;
const session = yield* provider.startSession(asThreadId("thread-runtime-sendturn-settle"), {
provider: ProviderDriverKind.make("codex"),
providerInstanceId: codexInstanceId,
threadId: asThreadId("thread-runtime-sendturn-settle"),
runtimeMode: "full-access",
});
const turnId = asTurnId(`turn-${String(session.threadId)}`);

yield* Stream.runForEach(provider.streamEvents, () => Effect.void).pipe(Effect.forkChild);
yield* advanceTestClock(50);

fanout.codex.emit({
type: "turn.started",
eventId: asEventId("evt-runtime-sendturn-start"),
provider: ProviderDriverKind.make("codex"),
createdAt: "2026-01-01T00:00:00.000Z",
threadId: session.threadId,
turnId,
});
fanout.codex.emit({
type: "turn.completed",
eventId: asEventId("evt-runtime-sendturn-complete"),
provider: ProviderDriverKind.make("codex"),
createdAt: "2026-01-01T00:00:00.000Z",
threadId: session.threadId,
turnId,
status: "completed",
});
yield* advanceTestClock(50);

yield* provider.sendTurn({
threadId: session.threadId,
input: "hello",
attachments: [],
});
yield* advanceTestClock(50);

const settledRuntime = yield* runtimeRepository.getByThreadId({
threadId: session.threadId,
});
assert.equal(Option.isSome(settledRuntime), true);
if (Option.isSome(settledRuntime)) {
const payload = settledRuntime.value.runtimePayload;
assert.equal(payload !== null && typeof payload === "object", true);
if (payload !== null && typeof payload === "object" && !Array.isArray(payload)) {
assert.equal((payload as { activeTurnId: string | null }).activeTurnId, null);
assert.equal(settledRuntime.value.status, "running");
}
}
}),
);

it.effect("fans out canonical runtime events in emission order", () =>
Effect.gen(function* () {
const provider = yield* ProviderService.ProviderService;
Expand Down
39 changes: 39 additions & 0 deletions apps/server/src/provider/acp/AcpJsonRpcConnection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ describe("AcpSessionRuntime", () => {
),
Effect.scoped,
Effect.provide(NodeServices.layer),
TestClock.withLive,
),
);

Expand Down Expand Up @@ -568,6 +569,44 @@ describe("AcpSessionRuntime", () => {
),
);

it.effect("waits for delayed session/load replay tail after the load RPC returns", () =>
Effect.gen(function* () {
const runtime = yield* AcpSessionRuntime.AcpSessionRuntime;
const started = yield* runtime.start().pipe(Effect.timeout("2 seconds"));

expect(started.sessionId).toBe("mock-session-1");
expect(started.sessionSetupResult._meta).not.toMatchObject({
t3SessionLoadReady: "replay_idle",
});

const unexpectedReplayEvent = yield* Stream.runHead(runtime.getEvents()).pipe(
Effect.timeoutOption("100 millis"),
);
expect(Option.isNone(unexpectedReplayEvent)).toBe(true);
}).pipe(
Effect.provide(
AcpSessionRuntime.layer({
authMethodId: "test",
spawn: {
command: mockAgentCommand,
args: mockAgentArgs,
env: {
T3_ACP_FAST_LOAD_WITH_DELAYED_REPLAY_TAIL: "1",
},
},
cwd: process.cwd(),
resumeSessionId: "mock-session-1",
sessionLoadReplayIdleGap: "50 millis",
sessionLoadTimeout: "2 seconds",
clientInfo: { name: "t3-test", version: "0.0.0" },
}),
),
Effect.scoped,
Effect.provide(NodeServices.layer),
TestClock.withLive,
),
);

it.effect("rejects invalid config option values before sending session/set_config_option", () => {
const tempDir = NodeFS.mkdtempSync(NodePath.join(NodeOS.tmpdir(), "acp-runtime-"));
const requestLogPath = NodePath.join(tempDir, "requests.ndjson");
Expand Down
14 changes: 13 additions & 1 deletion apps/server/src/provider/acp/GrokAcpSupport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,20 @@ function resolveGrokAuthMethodId(environment: NodeJS.ProcessEnv | undefined): st
: GROK_AUTH_METHOD_CACHED_TOKEN;
}

export const sequentialXAiPromptFallbackIdAllocator = (): Effect.Effect<string, never> => {
let next = 0;
return Effect.sync(() => {
next += 1;
return `t3-xai-prompt-${next}`;
});
};

export const makeGrokAcpRuntime = (
input: GrokAcpRuntimeInput,
allocatePromptFallbackId: Effect.Effect<
string,
EffectAcpErrors.AcpError
> = sequentialXAiPromptFallbackIdAllocator(),
): Effect.Effect<
AcpSessionRuntime.AcpSessionRuntime["Service"],
EffectAcpErrors.AcpError,
Expand All @@ -72,7 +84,7 @@ export const makeGrokAcpRuntime = (
const runtime = yield* Effect.service(AcpSessionRuntime.AcpSessionRuntime).pipe(
Effect.provide(acpContext),
);
return yield* makeXAiPromptCompletionRuntime(runtime);
return yield* makeXAiPromptCompletionRuntime(runtime, allocatePromptFallbackId);
});

export function resolveGrokAcpBaseModelId(model: string | null | undefined): string {
Expand Down
Loading