From 71a95588115e0cf341d8c3e539604acc63747507 Mon Sep 17 00:00:00 2001 From: Justin Gray Date: Mon, 29 Jun 2026 22:47:17 +0000 Subject: [PATCH] fix(grok): harden ACP resume and session lifecycle handling Improve Grok adapter resume semantics, xAI ACP extension replay behavior, and provider runtime turn tracking tests so resumed Grok sessions stay consistent across restarts. Co-authored-by: Cursor --- apps/server/scripts/acp-mock-agent.ts | 47 +++++ .../server/src/provider/Layers/GrokAdapter.ts | 65 +++--- .../provider/Layers/ProviderService.test.ts | 146 ++++++++++++- .../provider/acp/AcpJsonRpcConnection.test.ts | 39 ++++ .../server/src/provider/acp/GrokAcpSupport.ts | 14 +- .../src/provider/acp/XAiAcpExtension.test.ts | 196 +++++++++++++++++- .../src/provider/acp/XAiAcpExtension.ts | 106 ++++++---- .../src/textGeneration/GrokTextGeneration.ts | 31 ++- 8 files changed, 567 insertions(+), 77 deletions(-) diff --git a/apps/server/scripts/acp-mock-agent.ts b/apps/server/scripts/acp-mock-agent.ts index bc7828dd854..9ecdf7d58ba 100644 --- a/apps/server/scripts/acp-mock-agent.ts +++ b/apps/server/scripts/acp-mock-agent.ts @@ -20,6 +20,8 @@ const emitGenericToolPlaceholders = process.env.T3_ACP_EMIT_GENERIC_TOOL_PLACEHO const emitAskQuestion = process.env.T3_ACP_EMIT_ASK_QUESTION === "1"; const emitXAiAskUserQuestion = process.env.T3_ACP_EMIT_XAI_ASK_USER_QUESTION === "1"; const emitXAiPromptCompleteThenHang = process.env.T3_ACP_EMIT_XAI_PROMPT_COMPLETE_THEN_HANG === "1"; +const emitXAiPromptCompleteWithoutPromptIdThenHang = + process.env.T3_ACP_EMIT_XAI_PROMPT_COMPLETE_WITHOUT_PROMPT_ID_THEN_HANG === "1"; const emitForeignSessionUpdates = process.env.T3_ACP_EMIT_FOREIGN_SESSION_UPDATES === "1"; const hangPromptForever = process.env.T3_ACP_HANG_PROMPT_FOREVER === "1"; const hangFirstPromptForever = process.env.T3_ACP_HANG_FIRST_PROMPT_FOREVER === "1"; @@ -30,6 +32,7 @@ const failLoadSession = process.env.T3_ACP_FAIL_LOAD_SESSION === "1"; const emitLoadReplay = process.env.T3_ACP_EMIT_LOAD_REPLAY === "1"; const hangLoadSessionAfterReplay = process.env.T3_ACP_HANG_LOAD_SESSION_AFTER_REPLAY === "1"; const delayLoadSessionAfterReplay = process.env.T3_ACP_DELAY_LOAD_SESSION_AFTER_REPLAY === "1"; +const fastLoadWithDelayedReplayTail = process.env.T3_ACP_FAST_LOAD_WITH_DELAYED_REPLAY_TAIL === "1"; const loadSessionDelayMs = Number(process.env.T3_ACP_LOAD_SESSION_DELAY_MS ?? "5000"); const emitStaleXAiPromptCompleteBeforeSecondHang = process.env.T3_ACP_EMIT_STALE_XAI_PROMPT_COMPLETE_BEFORE_SECOND_HANG === "1"; @@ -362,6 +365,41 @@ const program = Effect.gen(function* () { configOptions: configOptions(), }; } + if (fastLoadWithDelayedReplayTail) { + emitLoadReplayNotifications(requestedSessionId); + yield* agent.client.sessionUpdate({ + sessionId: requestedSessionId, + update: { + sessionUpdate: "user_message_chunk", + content: { type: "text", text: "replay-head" }, + }, + }); + const response = { + modes: modeState(), + models: modelState(), + configOptions: configOptions(), + }; + void Effect.runFork( + Effect.gen(function* () { + yield* Effect.sleep("30 millis"); + writeJsonRpcNotification("session/update", { + _meta: { isReplay: true }, + sessionId: requestedSessionId, + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "delayed replay tail" }, + }, + }); + writeJsonRpcNotification("_x.ai/session/prompt_complete", { + sessionId: requestedSessionId, + promptId: "replay-stale-prompt-id", + stopReason: "end_turn", + agentResult: null, + }); + }).pipe(Effect.provide(NodeServices.layer)), + ); + return response; + } if (emitLoadReplay) { emitLoadReplayNotifications(requestedSessionId); } @@ -522,6 +560,15 @@ const program = Effect.gen(function* () { return yield* Effect.never; } + if (emitXAiPromptCompleteWithoutPromptIdThenHang) { + writeJsonRpcNotification("_x.ai/session/prompt_complete", { + sessionId: requestedSessionId, + stopReason: "end_turn", + agentResult: null, + }); + return yield* Effect.never; + } + if (emitXAiPromptCompleteThenHang) { writeJsonRpcNotification("session/update", { sessionId: requestedSessionId, diff --git a/apps/server/src/provider/Layers/GrokAdapter.ts b/apps/server/src/provider/Layers/GrokAdapter.ts index c22b2180183..d1033da724c 100644 --- a/apps/server/src/provider/Layers/GrokAdapter.ts +++ b/apps/server/src/provider/Layers/GrokAdapter.ts @@ -570,32 +570,45 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte }); const mcpSession = McpProviderSession.readMcpProviderSession(input.threadId); - const acp = yield* makeGrokAcpRuntime({ - grokSettings, - ...(options?.environment ? { environment: options.environment } : {}), - childProcessSpawner, - cwd, - ...(resumeSessionId ? { resumeSessionId } : {}), - clientInfo: { name: "t3-code", version: "0.0.0" }, - ...(mcpSession - ? { - mcpServers: [ - { - type: "http" as const, - name: "t3-code", - url: mcpSession.endpoint, - headers: [ - { - name: "Authorization", - value: mcpSession.authorizationHeader, - }, - ], - }, - ], - } - : {}), - ...acpNativeLoggers, - }).pipe( + const allocateXAiPromptFallbackId = randomUUIDv4.pipe( + Effect.map((uuid) => `t3-xai-prompt-${uuid}`), + Effect.mapError( + (cause) => + new EffectAcpErrors.AcpTransportError({ + detail: "Failed to allocate xAI prompt identifier.", + cause, + }), + ), + ); + const acp = yield* makeGrokAcpRuntime( + { + grokSettings, + ...(options?.environment ? { environment: options.environment } : {}), + childProcessSpawner, + cwd, + ...(resumeSessionId ? { resumeSessionId } : {}), + clientInfo: { name: "t3-code", version: "0.0.0" }, + ...(mcpSession + ? { + mcpServers: [ + { + type: "http" as const, + name: "t3-code", + url: mcpSession.endpoint, + headers: [ + { + name: "Authorization", + value: mcpSession.authorizationHeader, + }, + ], + }, + ], + } + : {}), + ...acpNativeLoggers, + }, + allocateXAiPromptFallbackId, + ).pipe( Effect.provideService(Scope.Scope, sessionScope), Effect.mapError( (cause) => diff --git a/apps/server/src/provider/Layers/ProviderService.test.ts b/apps/server/src/provider/Layers/ProviderService.test.ts index 2b4cc103103..93d91e560e4 100644 --- a/apps/server/src/provider/Layers/ProviderService.test.ts +++ b/apps/server/src/provider/Layers/ProviderService.test.ts @@ -1293,6 +1293,19 @@ routing.layer("ProviderServiceLive routing", (it) => { attachments: [], }); + yield* Stream.runForEach(provider.streamEvents, () => Effect.void).pipe(Effect.forkChild); + yield* advanceTestClock(50); + + routing.codex.emit({ + type: "turn.started", + eventId: asEventId("evt-runtime-status-start"), + provider: ProviderDriverKind.make("codex"), + createdAt: "2026-01-01T00:00:00.000Z", + threadId: session.threadId, + turnId: asTurnId(`turn-${String(session.threadId)}`), + }); + yield* advanceTestClock(50); + const runningRuntime = yield* runtimeRepository.getByThreadId({ threadId: session.threadId, }); @@ -1314,7 +1327,7 @@ routing.layer("ProviderServiceLive routing", (it) => { assert.equal(runtimePayload.model, null); assert.equal(runtimePayload.activeTurnId, `turn-${String(session.threadId)}`); assert.equal(runtimePayload.lastError, null); - assert.equal(runtimePayload.lastRuntimeEvent, "provider.sendTurn"); + assert.equal(runtimePayload.lastRuntimeEvent, "provider.turn.started"); } } }), @@ -1573,6 +1586,137 @@ fanout.layer("ProviderServiceLive fanout", (it) => { }), ); + it.effect("clears persisted activeTurnId when adapter emits turn.completed", () => + Effect.gen(function* () { + const provider = yield* ProviderService.ProviderService; + const runtimeRepository = yield* ProviderSessionRuntime.ProviderSessionRuntimeRepository; + const session = yield* provider.startSession(asThreadId("thread-runtime-turn-complete"), { + provider: ProviderDriverKind.make("codex"), + providerInstanceId: codexInstanceId, + threadId: asThreadId("thread-runtime-turn-complete"), + runtimeMode: "full-access", + }); + yield* provider.sendTurn({ + threadId: session.threadId, + input: "hello", + attachments: [], + }); + + yield* Stream.runForEach(provider.streamEvents, () => Effect.void).pipe(Effect.forkChild); + yield* advanceTestClock(50); + + fanout.codex.emit({ + type: "turn.started", + eventId: asEventId("evt-runtime-turn-start"), + provider: ProviderDriverKind.make("codex"), + createdAt: "2026-01-01T00:00:00.000Z", + threadId: session.threadId, + turnId: asTurnId(`turn-${String(session.threadId)}`), + }); + yield* advanceTestClock(50); + + const runningRuntime = yield* runtimeRepository.getByThreadId({ + threadId: session.threadId, + }); + assert.equal(Option.isSome(runningRuntime), true); + if (Option.isSome(runningRuntime)) { + const payload = runningRuntime.value.runtimePayload; + assert.equal(payload !== null && typeof payload === "object", true); + if (payload !== null && typeof payload === "object" && !Array.isArray(payload)) { + assert.equal( + (payload as { activeTurnId: string | null }).activeTurnId, + `turn-${String(session.threadId)}`, + ); + } + } + + fanout.codex.emit({ + type: "turn.completed", + eventId: asEventId("evt-runtime-turn-complete"), + provider: ProviderDriverKind.make("codex"), + createdAt: "2026-01-01T00:00:00.000Z", + threadId: session.threadId, + turnId: asTurnId(`turn-${String(session.threadId)}`), + status: "completed", + }); + yield* advanceTestClock(50); + + const settledRuntime = yield* runtimeRepository.getByThreadId({ + threadId: session.threadId, + }); + assert.equal(Option.isSome(settledRuntime), true); + if (Option.isSome(settledRuntime)) { + const payload = settledRuntime.value.runtimePayload; + assert.equal(payload !== null && typeof payload === "object", true); + if (payload !== null && typeof payload === "object" && !Array.isArray(payload)) { + const runtimePayload = payload as { + activeTurnId: string | null; + lastRuntimeEvent: string | null; + }; + assert.equal(runtimePayload.activeTurnId, null); + assert.equal(runtimePayload.lastRuntimeEvent, "provider.turn.completed"); + assert.equal(settledRuntime.value.status, "running"); + } + } + }), + ); + + it.effect("does not resurrect activeTurnId after sendTurn returns following turn.completed", () => + Effect.gen(function* () { + const provider = yield* ProviderService.ProviderService; + const runtimeRepository = yield* ProviderSessionRuntime.ProviderSessionRuntimeRepository; + const session = yield* provider.startSession(asThreadId("thread-runtime-sendturn-settle"), { + provider: ProviderDriverKind.make("codex"), + providerInstanceId: codexInstanceId, + threadId: asThreadId("thread-runtime-sendturn-settle"), + runtimeMode: "full-access", + }); + const turnId = asTurnId(`turn-${String(session.threadId)}`); + + yield* Stream.runForEach(provider.streamEvents, () => Effect.void).pipe(Effect.forkChild); + yield* advanceTestClock(50); + + fanout.codex.emit({ + type: "turn.started", + eventId: asEventId("evt-runtime-sendturn-start"), + provider: ProviderDriverKind.make("codex"), + createdAt: "2026-01-01T00:00:00.000Z", + threadId: session.threadId, + turnId, + }); + fanout.codex.emit({ + type: "turn.completed", + eventId: asEventId("evt-runtime-sendturn-complete"), + provider: ProviderDriverKind.make("codex"), + createdAt: "2026-01-01T00:00:00.000Z", + threadId: session.threadId, + turnId, + status: "completed", + }); + yield* advanceTestClock(50); + + yield* provider.sendTurn({ + threadId: session.threadId, + input: "hello", + attachments: [], + }); + yield* advanceTestClock(50); + + const settledRuntime = yield* runtimeRepository.getByThreadId({ + threadId: session.threadId, + }); + assert.equal(Option.isSome(settledRuntime), true); + if (Option.isSome(settledRuntime)) { + const payload = settledRuntime.value.runtimePayload; + assert.equal(payload !== null && typeof payload === "object", true); + if (payload !== null && typeof payload === "object" && !Array.isArray(payload)) { + assert.equal((payload as { activeTurnId: string | null }).activeTurnId, null); + assert.equal(settledRuntime.value.status, "running"); + } + } + }), + ); + it.effect("fans out canonical runtime events in emission order", () => Effect.gen(function* () { const provider = yield* ProviderService.ProviderService; diff --git a/apps/server/src/provider/acp/AcpJsonRpcConnection.test.ts b/apps/server/src/provider/acp/AcpJsonRpcConnection.test.ts index 4e9700dab7d..7422395fc83 100644 --- a/apps/server/src/provider/acp/AcpJsonRpcConnection.test.ts +++ b/apps/server/src/provider/acp/AcpJsonRpcConnection.test.ts @@ -526,6 +526,7 @@ describe("AcpSessionRuntime", () => { ), Effect.scoped, Effect.provide(NodeServices.layer), + TestClock.withLive, ), ); @@ -568,6 +569,44 @@ describe("AcpSessionRuntime", () => { ), ); + it.effect("waits for delayed session/load replay tail after the load RPC returns", () => + Effect.gen(function* () { + const runtime = yield* AcpSessionRuntime.AcpSessionRuntime; + const started = yield* runtime.start().pipe(Effect.timeout("2 seconds")); + + expect(started.sessionId).toBe("mock-session-1"); + expect(started.sessionSetupResult._meta).not.toMatchObject({ + t3SessionLoadReady: "replay_idle", + }); + + const unexpectedReplayEvent = yield* Stream.runHead(runtime.getEvents()).pipe( + Effect.timeoutOption("100 millis"), + ); + expect(Option.isNone(unexpectedReplayEvent)).toBe(true); + }).pipe( + Effect.provide( + AcpSessionRuntime.layer({ + authMethodId: "test", + spawn: { + command: mockAgentCommand, + args: mockAgentArgs, + env: { + T3_ACP_FAST_LOAD_WITH_DELAYED_REPLAY_TAIL: "1", + }, + }, + cwd: process.cwd(), + resumeSessionId: "mock-session-1", + sessionLoadReplayIdleGap: "50 millis", + sessionLoadTimeout: "2 seconds", + clientInfo: { name: "t3-test", version: "0.0.0" }, + }), + ), + Effect.scoped, + Effect.provide(NodeServices.layer), + TestClock.withLive, + ), + ); + it.effect("rejects invalid config option values before sending session/set_config_option", () => { const tempDir = NodeFS.mkdtempSync(NodePath.join(NodeOS.tmpdir(), "acp-runtime-")); const requestLogPath = NodePath.join(tempDir, "requests.ndjson"); diff --git a/apps/server/src/provider/acp/GrokAcpSupport.ts b/apps/server/src/provider/acp/GrokAcpSupport.ts index 3e6d63a5393..da5ad980754 100644 --- a/apps/server/src/provider/acp/GrokAcpSupport.ts +++ b/apps/server/src/provider/acp/GrokAcpSupport.ts @@ -50,8 +50,20 @@ function resolveGrokAuthMethodId(environment: NodeJS.ProcessEnv | undefined): st : GROK_AUTH_METHOD_CACHED_TOKEN; } +export const sequentialXAiPromptFallbackIdAllocator = (): Effect.Effect => { + let next = 0; + return Effect.sync(() => { + next += 1; + return `t3-xai-prompt-${next}`; + }); +}; + export const makeGrokAcpRuntime = ( input: GrokAcpRuntimeInput, + allocatePromptFallbackId: Effect.Effect< + string, + EffectAcpErrors.AcpError + > = sequentialXAiPromptFallbackIdAllocator(), ): Effect.Effect< AcpSessionRuntime.AcpSessionRuntime["Service"], EffectAcpErrors.AcpError, @@ -72,7 +84,7 @@ export const makeGrokAcpRuntime = ( const runtime = yield* Effect.service(AcpSessionRuntime.AcpSessionRuntime).pipe( Effect.provide(acpContext), ); - return yield* makeXAiPromptCompletionRuntime(runtime); + return yield* makeXAiPromptCompletionRuntime(runtime, allocatePromptFallbackId); }); export function resolveGrokAcpBaseModelId(model: string | null | undefined): string { diff --git a/apps/server/src/provider/acp/XAiAcpExtension.test.ts b/apps/server/src/provider/acp/XAiAcpExtension.test.ts index c435269fd76..e2bf8a56bbe 100644 --- a/apps/server/src/provider/acp/XAiAcpExtension.test.ts +++ b/apps/server/src/provider/acp/XAiAcpExtension.test.ts @@ -4,8 +4,12 @@ import * as NodeURL from "node:url"; import * as NodeServices from "@effect/platform-node/NodeServices"; import { it } from "@effect/vitest"; +import * as Crypto from "effect/Crypto"; +import * as Deferred from "effect/Deferred"; import * as Effect from "effect/Effect"; +import * as Ref from "effect/Ref"; import * as Schema from "effect/Schema"; +import * as TestClock from "effect/testing/TestClock"; import { describe, expect } from "vite-plus/test"; import { @@ -13,15 +17,18 @@ import { makeXAiAskUserQuestionCancelledResponse, makeXAiAskUserQuestionResponse, makeXAiPromptCompletionRuntime, + resolveXAiPromptCompletionFallback, XAiAskUserQuestionRequest, } from "./XAiAcpExtension.ts"; import * as AcpSessionRuntime from "./AcpSessionRuntime.ts"; +import * as EffectAcpErrors from "effect-acp/errors"; const __dirname = NodePath.dirname(NodeURL.fileURLToPath(import.meta.url)); const mockAgentPath = NodePath.join(__dirname, "../../../scripts/acp-mock-agent.ts"); -const makePromptCompletionRuntime = (env: NodeJS.ProcessEnv) => +const makePromptCompletionRuntime = (env: NodeJS.ProcessEnv, resumeSessionId?: string) => Effect.gen(function* () { + const crypto = yield* Crypto.Crypto; const runtime = yield* AcpSessionRuntime.make({ spawn: { command: process.execPath, @@ -31,8 +38,21 @@ const makePromptCompletionRuntime = (env: NodeJS.ProcessEnv) => cwd: process.cwd(), clientInfo: { name: "t3-test", version: "0.0.0" }, authMethodId: "test", + ...(resumeSessionId ? { resumeSessionId } : {}), + sessionLoadReplayIdleGap: "50 millis", + sessionLoadTimeout: "2 seconds", }); - return yield* makeXAiPromptCompletionRuntime(runtime); + const allocatePromptFallbackId = crypto.randomUUIDv4.pipe( + Effect.map((uuid) => `t3-xai-prompt-${uuid}`), + Effect.mapError( + (cause) => + new EffectAcpErrors.AcpTransportError({ + detail: "Failed to allocate xAI prompt identifier.", + cause, + }), + ), + ); + return yield* makeXAiPromptCompletionRuntime(runtime, allocatePromptFallbackId); }); const decodeXAiAskUserQuestionRequest = Schema.decodeUnknownSync(XAiAskUserQuestionRequest); @@ -296,7 +316,175 @@ describe("XAiAcpExtension", () => { requestId: promptId, }, }); - }).pipe(Effect.scoped, Effect.provide(NodeServices.layer)), + }).pipe(Effect.scoped, Effect.provide(NodeServices.layer), TestClock.withLive), + ); + + it.effect("ignores xAI prompt completion notifications during session/load replay", () => + Effect.gen(function* () { + const pendingRef = yield* Ref.make< + ReadonlyArray<{ + readonly sessionId: string; + readonly promptId: string; + readonly deferred: Deferred.Deferred; + }> + >([ + { + sessionId: "session-1", + promptId: "t3-xai-prompt-1", + deferred: yield* Deferred.make(), + }, + ]); + const completedPromptIdsRef = yield* Ref.make>([]); + + yield* resolveXAiPromptCompletionFallback({ + pendingRef, + completedPromptIdsRef, + notification: { + sessionId: "session-1", + promptId: "t3-xai-prompt-1", + stopReason: "end_turn", + agentResult: null, + }, + isSessionLoadReplayActive: Effect.succeed(true), + }); + + const pending = yield* Ref.get(pendingRef); + expect(pending).toHaveLength(1); + expect(pending[0]?.promptId).toBe("t3-xai-prompt-1"); + }), + ); + + it.effect("completes a prompt after session/load with a delayed replay tail", () => + Effect.gen(function* () { + const runtime = yield* makePromptCompletionRuntime( + { + T3_ACP_FAST_LOAD_WITH_DELAYED_REPLAY_TAIL: "1", + T3_ACP_EMIT_XAI_PROMPT_COMPLETE_THEN_HANG: "1", + }, + "mock-session-1", + ); + yield* runtime.start(); + + const promptResult = yield* runtime.prompt({ + prompt: [{ type: "text", text: "after resume" }], + }); + const promptId = promptResult._meta?.promptId; + + expect(typeof promptId).toBe("string"); + expect(promptResult).toMatchObject({ + stopReason: "end_turn", + _meta: { + sessionId: "mock-session-1", + promptId, + requestId: promptId, + }, + }); + }).pipe(Effect.scoped, Effect.provide(NodeServices.layer), TestClock.withLive), + ); + + it.effect("ignores xAI prompt completion notifications without a prompt id", () => + Effect.gen(function* () { + const pendingRef = yield* Ref.make< + ReadonlyArray<{ + readonly sessionId: string; + readonly promptId: string; + readonly deferred: Deferred.Deferred; + }> + >([ + { + sessionId: "session-1", + promptId: "t3-xai-prompt-1", + deferred: yield* Deferred.make(), + }, + ]); + const completedPromptIdsRef = yield* Ref.make>([]); + + yield* resolveXAiPromptCompletionFallback({ + pendingRef, + completedPromptIdsRef, + notification: { + sessionId: "session-1", + stopReason: "end_turn", + agentResult: null, + }, + }); + + const pending = yield* Ref.get(pendingRef); + expect(pending).toHaveLength(1); + expect(pending[0]?.promptId).toBe("t3-xai-prompt-1"); + }), + ); + + it.effect("settles deferred after grace without blocking the notification handler", () => + Effect.gen(function* () { + const deferred = yield* Deferred.make(); + const pendingRef = yield* Ref.make< + ReadonlyArray<{ + readonly sessionId: string; + readonly promptId: string; + readonly deferred: Deferred.Deferred; + }> + >([ + { + sessionId: "session-1", + promptId: "t3-xai-prompt-1", + deferred, + }, + ]); + const completedPromptIdsRef = yield* Ref.make>([]); + + yield* resolveXAiPromptCompletionFallback({ + pendingRef, + completedPromptIdsRef, + notification: { + sessionId: "session-1", + promptId: "t3-xai-prompt-1", + stopReason: "end_turn", + agentResult: null, + }, + }); + + expect(yield* Deferred.isDone(deferred)).toBe(false); + yield* Effect.sleep("600 millis"); + expect(yield* Deferred.isDone(deferred)).toBe(true); + }).pipe(TestClock.withLive), + ); + + it.effect( + "ignores replayed prompt completion when prompt id does not match the in-flight prompt", + () => + Effect.gen(function* () { + const deferred = yield* Deferred.make(); + const pendingRef = yield* Ref.make< + ReadonlyArray<{ + readonly sessionId: string; + readonly promptId: string; + readonly deferred: Deferred.Deferred; + }> + >([ + { + sessionId: "session-1", + promptId: "t3-xai-prompt-fresh-uuid", + deferred, + }, + ]); + const completedPromptIdsRef = yield* Ref.make>([]); + + yield* resolveXAiPromptCompletionFallback({ + pendingRef, + completedPromptIdsRef, + notification: { + sessionId: "session-1", + promptId: "t3-xai-prompt-1", + stopReason: "end_turn", + agentResult: null, + }, + }); + + const pending = yield* Ref.get(pendingRef); + expect(pending).toHaveLength(1); + expect(yield* Deferred.isDone(deferred)).toBe(false); + }), ); it.effect("ignores stale xAI completion from an already settled prompt", () => @@ -327,6 +515,6 @@ describe("XAiAcpExtension", () => { requestId: secondPromptId, }, }); - }).pipe(Effect.scoped, Effect.provide(NodeServices.layer)), + }).pipe(Effect.scoped, Effect.provide(NodeServices.layer), TestClock.withLive), ); }); diff --git a/apps/server/src/provider/acp/XAiAcpExtension.ts b/apps/server/src/provider/acp/XAiAcpExtension.ts index d36a5fcfc89..b99e64a65c4 100644 --- a/apps/server/src/provider/acp/XAiAcpExtension.ts +++ b/apps/server/src/provider/acp/XAiAcpExtension.ts @@ -1,9 +1,11 @@ import type { ProviderUserInputAnswers, UserInputQuestion } from "@t3tools/contracts"; import * as Deferred from "effect/Deferred"; +import * as Duration from "effect/Duration"; import * as Effect from "effect/Effect"; import * as Ref from "effect/Ref"; import * as Schema from "effect/Schema"; import type * as EffectAcpSchema from "effect-acp/schema"; +import * as EffectAcpErrors from "effect-acp/errors"; import type * as AcpSessionRuntime from "./AcpSessionRuntime.ts"; @@ -24,6 +26,7 @@ interface PendingXAiPromptCompletion { const completedXAiPromptIdLimit = 128; const xAiStopReasonMissingMetaKey = "xAiStopReasonMissing"; +export const xAiPromptCompletionSettleDelay = Duration.millis(500); const XAiAskUserQuestionOption = Schema.Struct({ label: Schema.String, @@ -201,15 +204,13 @@ export function makeXAiAskUserQuestionCancelledResponse(): XAiAskUserQuestionCan * The underlying runtime remains unaware of xAI methods and metadata. */ export const makeXAiPromptCompletionRuntime = Effect.fn("makeXAiPromptCompletionRuntime")( - function* (runtime: AcpSessionRuntime.AcpSessionRuntime["Service"]) { + function* ( + runtime: AcpSessionRuntime.AcpSessionRuntime["Service"], + allocatePromptFallbackId: Effect.Effect, + ) { const activeSessionIdRef = yield* Ref.make(undefined); const pendingRef = yield* Ref.make>([]); const completedPromptIdsRef = yield* Ref.make>([]); - let nextPromptFallbackId = 0; - const allocatePromptFallbackId = Effect.sync(() => { - nextPromptFallbackId += 1; - return `t3-xai-prompt-${nextPromptFallbackId}`; - }); yield* runtime.handleExtNotification( "_x.ai/session/prompt_complete", @@ -219,6 +220,8 @@ export const makeXAiPromptCompletionRuntime = Effect.fn("makeXAiPromptCompletion pendingRef, completedPromptIdsRef, notification, + isSessionLoadReplayActive: runtime.isSessionLoadReplayActive, + touchSessionLoadReplayActivity: runtime.touchSessionLoadReplayActivity, }), ); @@ -324,45 +327,74 @@ const abortPendingPromptCompletions = ( ] as const; }).pipe(Effect.flatten); -const resolveXAiPromptCompletionFallback = ({ +export const resolveXAiPromptCompletionFallback = ({ pendingRef, completedPromptIdsRef, notification, + isSessionLoadReplayActive, + touchSessionLoadReplayActivity, }: { readonly pendingRef: Ref.Ref>; readonly completedPromptIdsRef: Ref.Ref>; readonly notification: XAiPromptCompleteNotification; + readonly isSessionLoadReplayActive?: Effect.Effect; + readonly touchSessionLoadReplayActivity?: Effect.Effect; }) => - Ref.get(completedPromptIdsRef).pipe( - Effect.flatMap((completedPromptIds) => { - if ( - notification.promptId !== undefined && - completedPromptIds.includes(notification.promptId) - ) { - return Effect.void; - } - return Ref.modify(pendingRef, (pending) => { - const index = - notification.promptId !== undefined - ? pending.findIndex( - (entry) => - entry.sessionId === notification.sessionId && - entry.promptId === notification.promptId, - ) - : pending.findIndex((entry) => entry.sessionId === notification.sessionId); - if (index < 0) { - return [Effect.void, pending] as const; - } - const entry = pending[index]; - if (!entry) { - return [Effect.void, pending] as const; - } - return [ - Deferred.succeed(entry.deferred, promptResponseFromXAi(notification)).pipe(Effect.asVoid), - [...pending.slice(0, index), ...pending.slice(index + 1)], - ] as const; - }).pipe(Effect.flatten); - }), + (isSessionLoadReplayActive ?? Effect.succeed(false)).pipe( + Effect.flatMap((loadReplayActive) => + loadReplayActive + ? (touchSessionLoadReplayActivity ?? Effect.void) + : Ref.get(completedPromptIdsRef).pipe( + Effect.flatMap((completedPromptIds) => { + if ( + notification.promptId !== undefined && + completedPromptIds.includes(notification.promptId) + ) { + return Effect.void; + } + // Require an explicit prompt id. Session-id-only matching would let a + // replayed or stale prompt_complete from session/load settle the wrong + // in-flight prompt and interrupt the real session/prompt RPC. + if (notification.promptId === undefined) { + return Effect.void; + } + return Ref.modify(pendingRef, (pending) => { + const index = pending.findIndex( + (entry) => + entry.sessionId === notification.sessionId && + entry.promptId === notification.promptId, + ); + if (index < 0) { + return [Effect.void, pending] as const; + } + const entry = pending[index]; + if (!entry) { + return [Effect.void, pending] as const; + } + return [ + Effect.gen(function* () { + yield* Effect.sleep(xAiPromptCompletionSettleDelay); + yield* Ref.modify(pendingRef, (current) => { + const stillIndex = current.findIndex( + (candidate) => candidate.deferred === entry.deferred, + ); + if (stillIndex < 0) { + return [Effect.void, current] as const; + } + return [ + Deferred.succeed(entry.deferred, promptResponseFromXAi(notification)).pipe( + Effect.asVoid, + ), + [...current.slice(0, stillIndex), ...current.slice(stillIndex + 1)], + ] as const; + }).pipe(Effect.flatten); + }).pipe(Effect.forkDetach, Effect.asVoid), + pending, + ] as const; + }).pipe(Effect.flatten); + }), + ), + ), ); const rememberCompletedXAiPromptId = ( diff --git a/apps/server/src/textGeneration/GrokTextGeneration.ts b/apps/server/src/textGeneration/GrokTextGeneration.ts index 1bb58216305..b2a81a3e402 100644 --- a/apps/server/src/textGeneration/GrokTextGeneration.ts +++ b/apps/server/src/textGeneration/GrokTextGeneration.ts @@ -1,9 +1,10 @@ import * as Effect from "effect/Effect"; +import * as Crypto from "effect/Crypto"; import * as Option from "effect/Option"; import * as Ref from "effect/Ref"; import * as Schema from "effect/Schema"; import { ChildProcessSpawner } from "effect/unstable/process"; -import type * as EffectAcpErrors from "effect-acp/errors"; +import * as EffectAcpErrors from "effect-acp/errors"; import { type GrokSettings, type ModelSelection } from "@t3tools/contracts"; import { sanitizeBranchFragment, sanitizeFeatureBranchName } from "@t3tools/shared/git"; @@ -38,6 +39,17 @@ export const makeGrokTextGeneration = Effect.fn("makeGrokTextGeneration")(functi environment: NodeJS.ProcessEnv = process.env, ) { const commandSpawner = yield* ChildProcessSpawner.ChildProcessSpawner; + const crypto = yield* Crypto.Crypto; + const allocateXAiPromptFallbackId = crypto.randomUUIDv4.pipe( + Effect.map((uuid) => `t3-xai-prompt-${uuid}`), + Effect.mapError( + (cause) => + new EffectAcpErrors.AcpTransportError({ + detail: "Failed to allocate xAI prompt identifier.", + cause, + }), + ), + ); const runGrokJson = ({ operation, @@ -59,13 +71,16 @@ export const makeGrokTextGeneration = Effect.fn("makeGrokTextGeneration")(functi Effect.gen(function* () { const resolvedModel = resolveGrokAcpBaseModelId(modelSelection.model); const outputRef = yield* Ref.make(""); - const runtime = yield* makeGrokAcpRuntime({ - grokSettings, - environment, - childProcessSpawner: commandSpawner, - cwd, - clientInfo: { name: "t3-code-git-text", version: "0.0.0" }, - }); + const runtime = yield* makeGrokAcpRuntime( + { + grokSettings, + environment, + childProcessSpawner: commandSpawner, + cwd, + clientInfo: { name: "t3-code-git-text", version: "0.0.0" }, + }, + allocateXAiPromptFallbackId, + ); yield* runtime.handleSessionUpdate((notification) => { const update = notification.update;