diff --git a/apps/server/scripts/acp-mock-agent.ts b/apps/server/scripts/acp-mock-agent.ts index bc7828dd854..9ecdf7d58ba 100644 --- a/apps/server/scripts/acp-mock-agent.ts +++ b/apps/server/scripts/acp-mock-agent.ts @@ -20,6 +20,8 @@ const emitGenericToolPlaceholders = process.env.T3_ACP_EMIT_GENERIC_TOOL_PLACEHO const emitAskQuestion = process.env.T3_ACP_EMIT_ASK_QUESTION === "1"; const emitXAiAskUserQuestion = process.env.T3_ACP_EMIT_XAI_ASK_USER_QUESTION === "1"; const emitXAiPromptCompleteThenHang = process.env.T3_ACP_EMIT_XAI_PROMPT_COMPLETE_THEN_HANG === "1"; +const emitXAiPromptCompleteWithoutPromptIdThenHang = + process.env.T3_ACP_EMIT_XAI_PROMPT_COMPLETE_WITHOUT_PROMPT_ID_THEN_HANG === "1"; const emitForeignSessionUpdates = process.env.T3_ACP_EMIT_FOREIGN_SESSION_UPDATES === "1"; const hangPromptForever = process.env.T3_ACP_HANG_PROMPT_FOREVER === "1"; const hangFirstPromptForever = process.env.T3_ACP_HANG_FIRST_PROMPT_FOREVER === "1"; @@ -30,6 +32,7 @@ const failLoadSession = process.env.T3_ACP_FAIL_LOAD_SESSION === "1"; const emitLoadReplay = process.env.T3_ACP_EMIT_LOAD_REPLAY === "1"; const hangLoadSessionAfterReplay = process.env.T3_ACP_HANG_LOAD_SESSION_AFTER_REPLAY === "1"; const delayLoadSessionAfterReplay = process.env.T3_ACP_DELAY_LOAD_SESSION_AFTER_REPLAY === "1"; +const fastLoadWithDelayedReplayTail = process.env.T3_ACP_FAST_LOAD_WITH_DELAYED_REPLAY_TAIL === "1"; const loadSessionDelayMs = Number(process.env.T3_ACP_LOAD_SESSION_DELAY_MS ?? "5000"); const emitStaleXAiPromptCompleteBeforeSecondHang = process.env.T3_ACP_EMIT_STALE_XAI_PROMPT_COMPLETE_BEFORE_SECOND_HANG === "1"; @@ -362,6 +365,41 @@ const program = Effect.gen(function* () { configOptions: configOptions(), }; } + if (fastLoadWithDelayedReplayTail) { + emitLoadReplayNotifications(requestedSessionId); + yield* agent.client.sessionUpdate({ + sessionId: requestedSessionId, + update: { + sessionUpdate: "user_message_chunk", + content: { type: "text", text: "replay-head" }, + }, + }); + const response = { + modes: modeState(), + models: modelState(), + configOptions: configOptions(), + }; + void Effect.runFork( + Effect.gen(function* () { + yield* Effect.sleep("30 millis"); + writeJsonRpcNotification("session/update", { + _meta: { isReplay: true }, + sessionId: requestedSessionId, + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "delayed replay tail" }, + }, + }); + writeJsonRpcNotification("_x.ai/session/prompt_complete", { + sessionId: requestedSessionId, + promptId: "replay-stale-prompt-id", + stopReason: "end_turn", + agentResult: null, + }); + }).pipe(Effect.provide(NodeServices.layer)), + ); + return response; + } if (emitLoadReplay) { emitLoadReplayNotifications(requestedSessionId); } @@ -522,6 +560,15 @@ const program = Effect.gen(function* () { return yield* Effect.never; } + if (emitXAiPromptCompleteWithoutPromptIdThenHang) { + writeJsonRpcNotification("_x.ai/session/prompt_complete", { + sessionId: requestedSessionId, + stopReason: "end_turn", + agentResult: null, + }); + return yield* Effect.never; + } + if (emitXAiPromptCompleteThenHang) { writeJsonRpcNotification("session/update", { sessionId: requestedSessionId, diff --git a/apps/server/src/provider/Layers/GrokAdapter.ts b/apps/server/src/provider/Layers/GrokAdapter.ts index c22b2180183..d1033da724c 100644 --- a/apps/server/src/provider/Layers/GrokAdapter.ts +++ b/apps/server/src/provider/Layers/GrokAdapter.ts @@ -570,32 +570,45 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte }); const mcpSession = McpProviderSession.readMcpProviderSession(input.threadId); - const acp = yield* makeGrokAcpRuntime({ - grokSettings, - ...(options?.environment ? { environment: options.environment } : {}), - childProcessSpawner, - cwd, - ...(resumeSessionId ? { resumeSessionId } : {}), - clientInfo: { name: "t3-code", version: "0.0.0" }, - ...(mcpSession - ? { - mcpServers: [ - { - type: "http" as const, - name: "t3-code", - url: mcpSession.endpoint, - headers: [ - { - name: "Authorization", - value: mcpSession.authorizationHeader, - }, - ], - }, - ], - } - : {}), - ...acpNativeLoggers, - }).pipe( + const allocateXAiPromptFallbackId = randomUUIDv4.pipe( + Effect.map((uuid) => `t3-xai-prompt-${uuid}`), + Effect.mapError( + (cause) => + new EffectAcpErrors.AcpTransportError({ + detail: "Failed to allocate xAI prompt identifier.", + cause, + }), + ), + ); + const acp = yield* makeGrokAcpRuntime( + { + grokSettings, + ...(options?.environment ? { environment: options.environment } : {}), + childProcessSpawner, + cwd, + ...(resumeSessionId ? { resumeSessionId } : {}), + clientInfo: { name: "t3-code", version: "0.0.0" }, + ...(mcpSession + ? { + mcpServers: [ + { + type: "http" as const, + name: "t3-code", + url: mcpSession.endpoint, + headers: [ + { + name: "Authorization", + value: mcpSession.authorizationHeader, + }, + ], + }, + ], + } + : {}), + ...acpNativeLoggers, + }, + allocateXAiPromptFallbackId, + ).pipe( Effect.provideService(Scope.Scope, sessionScope), Effect.mapError( (cause) => diff --git a/apps/server/src/provider/Layers/ProviderService.test.ts b/apps/server/src/provider/Layers/ProviderService.test.ts index 2b4cc103103..93d91e560e4 100644 --- a/apps/server/src/provider/Layers/ProviderService.test.ts +++ b/apps/server/src/provider/Layers/ProviderService.test.ts @@ -1293,6 +1293,19 @@ routing.layer("ProviderServiceLive routing", (it) => { attachments: [], }); + yield* Stream.runForEach(provider.streamEvents, () => Effect.void).pipe(Effect.forkChild); + yield* advanceTestClock(50); + + routing.codex.emit({ + type: "turn.started", + eventId: asEventId("evt-runtime-status-start"), + provider: ProviderDriverKind.make("codex"), + createdAt: "2026-01-01T00:00:00.000Z", + threadId: session.threadId, + turnId: asTurnId(`turn-${String(session.threadId)}`), + }); + yield* advanceTestClock(50); + const runningRuntime = yield* runtimeRepository.getByThreadId({ threadId: session.threadId, }); @@ -1314,7 +1327,7 @@ routing.layer("ProviderServiceLive routing", (it) => { assert.equal(runtimePayload.model, null); assert.equal(runtimePayload.activeTurnId, `turn-${String(session.threadId)}`); assert.equal(runtimePayload.lastError, null); - assert.equal(runtimePayload.lastRuntimeEvent, "provider.sendTurn"); + assert.equal(runtimePayload.lastRuntimeEvent, "provider.turn.started"); } } }), @@ -1573,6 +1586,137 @@ fanout.layer("ProviderServiceLive fanout", (it) => { }), ); + it.effect("clears persisted activeTurnId when adapter emits turn.completed", () => + Effect.gen(function* () { + const provider = yield* ProviderService.ProviderService; + const runtimeRepository = yield* ProviderSessionRuntime.ProviderSessionRuntimeRepository; + const session = yield* provider.startSession(asThreadId("thread-runtime-turn-complete"), { + provider: ProviderDriverKind.make("codex"), + providerInstanceId: codexInstanceId, + threadId: asThreadId("thread-runtime-turn-complete"), + runtimeMode: "full-access", + }); + yield* provider.sendTurn({ + threadId: session.threadId, + input: "hello", + attachments: [], + }); + + yield* Stream.runForEach(provider.streamEvents, () => Effect.void).pipe(Effect.forkChild); + yield* advanceTestClock(50); + + fanout.codex.emit({ + type: "turn.started", + eventId: asEventId("evt-runtime-turn-start"), + provider: ProviderDriverKind.make("codex"), + createdAt: "2026-01-01T00:00:00.000Z", + threadId: session.threadId, + turnId: asTurnId(`turn-${String(session.threadId)}`), + }); + yield* advanceTestClock(50); + + const runningRuntime = yield* runtimeRepository.getByThreadId({ + threadId: session.threadId, + }); + assert.equal(Option.isSome(runningRuntime), true); + if (Option.isSome(runningRuntime)) { + const payload = runningRuntime.value.runtimePayload; + assert.equal(payload !== null && typeof payload === "object", true); + if (payload !== null && typeof payload === "object" && !Array.isArray(payload)) { + assert.equal( + (payload as { activeTurnId: string | null }).activeTurnId, + `turn-${String(session.threadId)}`, + ); + } + } + + fanout.codex.emit({ + type: "turn.completed", + eventId: asEventId("evt-runtime-turn-complete"), + provider: ProviderDriverKind.make("codex"), + createdAt: "2026-01-01T00:00:00.000Z", + threadId: session.threadId, + turnId: asTurnId(`turn-${String(session.threadId)}`), + status: "completed", + }); + yield* advanceTestClock(50); + + const settledRuntime = yield* runtimeRepository.getByThreadId({ + threadId: session.threadId, + }); + assert.equal(Option.isSome(settledRuntime), true); + if (Option.isSome(settledRuntime)) { + const payload = settledRuntime.value.runtimePayload; + assert.equal(payload !== null && typeof payload === "object", true); + if (payload !== null && typeof payload === "object" && !Array.isArray(payload)) { + const runtimePayload = payload as { + activeTurnId: string | null; + lastRuntimeEvent: string | null; + }; + assert.equal(runtimePayload.activeTurnId, null); + assert.equal(runtimePayload.lastRuntimeEvent, "provider.turn.completed"); + assert.equal(settledRuntime.value.status, "running"); + } + } + }), + ); + + it.effect("does not resurrect activeTurnId after sendTurn returns following turn.completed", () => + Effect.gen(function* () { + const provider = yield* ProviderService.ProviderService; + const runtimeRepository = yield* ProviderSessionRuntime.ProviderSessionRuntimeRepository; + const session = yield* provider.startSession(asThreadId("thread-runtime-sendturn-settle"), { + provider: ProviderDriverKind.make("codex"), + providerInstanceId: codexInstanceId, + threadId: asThreadId("thread-runtime-sendturn-settle"), + runtimeMode: "full-access", + }); + const turnId = asTurnId(`turn-${String(session.threadId)}`); + + yield* Stream.runForEach(provider.streamEvents, () => Effect.void).pipe(Effect.forkChild); + yield* advanceTestClock(50); + + fanout.codex.emit({ + type: "turn.started", + eventId: asEventId("evt-runtime-sendturn-start"), + provider: ProviderDriverKind.make("codex"), + createdAt: "2026-01-01T00:00:00.000Z", + threadId: session.threadId, + turnId, + }); + fanout.codex.emit({ + type: "turn.completed", + eventId: asEventId("evt-runtime-sendturn-complete"), + provider: ProviderDriverKind.make("codex"), + createdAt: "2026-01-01T00:00:00.000Z", + threadId: session.threadId, + turnId, + status: "completed", + }); + yield* advanceTestClock(50); + + yield* provider.sendTurn({ + threadId: session.threadId, + input: "hello", + attachments: [], + }); + yield* advanceTestClock(50); + + const settledRuntime = yield* runtimeRepository.getByThreadId({ + threadId: session.threadId, + }); + assert.equal(Option.isSome(settledRuntime), true); + if (Option.isSome(settledRuntime)) { + const payload = settledRuntime.value.runtimePayload; + assert.equal(payload !== null && typeof payload === "object", true); + if (payload !== null && typeof payload === "object" && !Array.isArray(payload)) { + assert.equal((payload as { activeTurnId: string | null }).activeTurnId, null); + assert.equal(settledRuntime.value.status, "running"); + } + } + }), + ); + it.effect("fans out canonical runtime events in emission order", () => Effect.gen(function* () { const provider = yield* ProviderService.ProviderService; diff --git a/apps/server/src/provider/acp/AcpJsonRpcConnection.test.ts b/apps/server/src/provider/acp/AcpJsonRpcConnection.test.ts index 4e9700dab7d..7422395fc83 100644 --- a/apps/server/src/provider/acp/AcpJsonRpcConnection.test.ts +++ b/apps/server/src/provider/acp/AcpJsonRpcConnection.test.ts @@ -526,6 +526,7 @@ describe("AcpSessionRuntime", () => { ), Effect.scoped, Effect.provide(NodeServices.layer), + TestClock.withLive, ), ); @@ -568,6 +569,44 @@ describe("AcpSessionRuntime", () => { ), ); + it.effect("waits for delayed session/load replay tail after the load RPC returns", () => + Effect.gen(function* () { + const runtime = yield* AcpSessionRuntime.AcpSessionRuntime; + const started = yield* runtime.start().pipe(Effect.timeout("2 seconds")); + + expect(started.sessionId).toBe("mock-session-1"); + expect(started.sessionSetupResult._meta).not.toMatchObject({ + t3SessionLoadReady: "replay_idle", + }); + + const unexpectedReplayEvent = yield* Stream.runHead(runtime.getEvents()).pipe( + Effect.timeoutOption("100 millis"), + ); + expect(Option.isNone(unexpectedReplayEvent)).toBe(true); + }).pipe( + Effect.provide( + AcpSessionRuntime.layer({ + authMethodId: "test", + spawn: { + command: mockAgentCommand, + args: mockAgentArgs, + env: { + T3_ACP_FAST_LOAD_WITH_DELAYED_REPLAY_TAIL: "1", + }, + }, + cwd: process.cwd(), + resumeSessionId: "mock-session-1", + sessionLoadReplayIdleGap: "50 millis", + sessionLoadTimeout: "2 seconds", + clientInfo: { name: "t3-test", version: "0.0.0" }, + }), + ), + Effect.scoped, + Effect.provide(NodeServices.layer), + TestClock.withLive, + ), + ); + it.effect("rejects invalid config option values before sending session/set_config_option", () => { const tempDir = NodeFS.mkdtempSync(NodePath.join(NodeOS.tmpdir(), "acp-runtime-")); const requestLogPath = NodePath.join(tempDir, "requests.ndjson"); diff --git a/apps/server/src/provider/acp/GrokAcpSupport.ts b/apps/server/src/provider/acp/GrokAcpSupport.ts index 3e6d63a5393..da5ad980754 100644 --- a/apps/server/src/provider/acp/GrokAcpSupport.ts +++ b/apps/server/src/provider/acp/GrokAcpSupport.ts @@ -50,8 +50,20 @@ function resolveGrokAuthMethodId(environment: NodeJS.ProcessEnv | undefined): st : GROK_AUTH_METHOD_CACHED_TOKEN; } +export const sequentialXAiPromptFallbackIdAllocator = (): Effect.Effect => { + let next = 0; + return Effect.sync(() => { + next += 1; + return `t3-xai-prompt-${next}`; + }); +}; + export const makeGrokAcpRuntime = ( input: GrokAcpRuntimeInput, + allocatePromptFallbackId: Effect.Effect< + string, + EffectAcpErrors.AcpError + > = sequentialXAiPromptFallbackIdAllocator(), ): Effect.Effect< AcpSessionRuntime.AcpSessionRuntime["Service"], EffectAcpErrors.AcpError, @@ -72,7 +84,7 @@ export const makeGrokAcpRuntime = ( const runtime = yield* Effect.service(AcpSessionRuntime.AcpSessionRuntime).pipe( Effect.provide(acpContext), ); - return yield* makeXAiPromptCompletionRuntime(runtime); + return yield* makeXAiPromptCompletionRuntime(runtime, allocatePromptFallbackId); }); export function resolveGrokAcpBaseModelId(model: string | null | undefined): string { diff --git a/apps/server/src/provider/acp/XAiAcpExtension.test.ts b/apps/server/src/provider/acp/XAiAcpExtension.test.ts index c435269fd76..e2bf8a56bbe 100644 --- a/apps/server/src/provider/acp/XAiAcpExtension.test.ts +++ b/apps/server/src/provider/acp/XAiAcpExtension.test.ts @@ -4,8 +4,12 @@ import * as NodeURL from "node:url"; import * as NodeServices from "@effect/platform-node/NodeServices"; import { it } from "@effect/vitest"; +import * as Crypto from "effect/Crypto"; +import * as Deferred from "effect/Deferred"; import * as Effect from "effect/Effect"; +import * as Ref from "effect/Ref"; import * as Schema from "effect/Schema"; +import * as TestClock from "effect/testing/TestClock"; import { describe, expect } from "vite-plus/test"; import { @@ -13,15 +17,18 @@ import { makeXAiAskUserQuestionCancelledResponse, makeXAiAskUserQuestionResponse, makeXAiPromptCompletionRuntime, + resolveXAiPromptCompletionFallback, XAiAskUserQuestionRequest, } from "./XAiAcpExtension.ts"; import * as AcpSessionRuntime from "./AcpSessionRuntime.ts"; +import * as EffectAcpErrors from "effect-acp/errors"; const __dirname = NodePath.dirname(NodeURL.fileURLToPath(import.meta.url)); const mockAgentPath = NodePath.join(__dirname, "../../../scripts/acp-mock-agent.ts"); -const makePromptCompletionRuntime = (env: NodeJS.ProcessEnv) => +const makePromptCompletionRuntime = (env: NodeJS.ProcessEnv, resumeSessionId?: string) => Effect.gen(function* () { + const crypto = yield* Crypto.Crypto; const runtime = yield* AcpSessionRuntime.make({ spawn: { command: process.execPath, @@ -31,8 +38,21 @@ const makePromptCompletionRuntime = (env: NodeJS.ProcessEnv) => cwd: process.cwd(), clientInfo: { name: "t3-test", version: "0.0.0" }, authMethodId: "test", + ...(resumeSessionId ? { resumeSessionId } : {}), + sessionLoadReplayIdleGap: "50 millis", + sessionLoadTimeout: "2 seconds", }); - return yield* makeXAiPromptCompletionRuntime(runtime); + const allocatePromptFallbackId = crypto.randomUUIDv4.pipe( + Effect.map((uuid) => `t3-xai-prompt-${uuid}`), + Effect.mapError( + (cause) => + new EffectAcpErrors.AcpTransportError({ + detail: "Failed to allocate xAI prompt identifier.", + cause, + }), + ), + ); + return yield* makeXAiPromptCompletionRuntime(runtime, allocatePromptFallbackId); }); const decodeXAiAskUserQuestionRequest = Schema.decodeUnknownSync(XAiAskUserQuestionRequest); @@ -296,7 +316,175 @@ describe("XAiAcpExtension", () => { requestId: promptId, }, }); - }).pipe(Effect.scoped, Effect.provide(NodeServices.layer)), + }).pipe(Effect.scoped, Effect.provide(NodeServices.layer), TestClock.withLive), + ); + + it.effect("ignores xAI prompt completion notifications during session/load replay", () => + Effect.gen(function* () { + const pendingRef = yield* Ref.make< + ReadonlyArray<{ + readonly sessionId: string; + readonly promptId: string; + readonly deferred: Deferred.Deferred; + }> + >([ + { + sessionId: "session-1", + promptId: "t3-xai-prompt-1", + deferred: yield* Deferred.make(), + }, + ]); + const completedPromptIdsRef = yield* Ref.make>([]); + + yield* resolveXAiPromptCompletionFallback({ + pendingRef, + completedPromptIdsRef, + notification: { + sessionId: "session-1", + promptId: "t3-xai-prompt-1", + stopReason: "end_turn", + agentResult: null, + }, + isSessionLoadReplayActive: Effect.succeed(true), + }); + + const pending = yield* Ref.get(pendingRef); + expect(pending).toHaveLength(1); + expect(pending[0]?.promptId).toBe("t3-xai-prompt-1"); + }), + ); + + it.effect("completes a prompt after session/load with a delayed replay tail", () => + Effect.gen(function* () { + const runtime = yield* makePromptCompletionRuntime( + { + T3_ACP_FAST_LOAD_WITH_DELAYED_REPLAY_TAIL: "1", + T3_ACP_EMIT_XAI_PROMPT_COMPLETE_THEN_HANG: "1", + }, + "mock-session-1", + ); + yield* runtime.start(); + + const promptResult = yield* runtime.prompt({ + prompt: [{ type: "text", text: "after resume" }], + }); + const promptId = promptResult._meta?.promptId; + + expect(typeof promptId).toBe("string"); + expect(promptResult).toMatchObject({ + stopReason: "end_turn", + _meta: { + sessionId: "mock-session-1", + promptId, + requestId: promptId, + }, + }); + }).pipe(Effect.scoped, Effect.provide(NodeServices.layer), TestClock.withLive), + ); + + it.effect("ignores xAI prompt completion notifications without a prompt id", () => + Effect.gen(function* () { + const pendingRef = yield* Ref.make< + ReadonlyArray<{ + readonly sessionId: string; + readonly promptId: string; + readonly deferred: Deferred.Deferred; + }> + >([ + { + sessionId: "session-1", + promptId: "t3-xai-prompt-1", + deferred: yield* Deferred.make(), + }, + ]); + const completedPromptIdsRef = yield* Ref.make>([]); + + yield* resolveXAiPromptCompletionFallback({ + pendingRef, + completedPromptIdsRef, + notification: { + sessionId: "session-1", + stopReason: "end_turn", + agentResult: null, + }, + }); + + const pending = yield* Ref.get(pendingRef); + expect(pending).toHaveLength(1); + expect(pending[0]?.promptId).toBe("t3-xai-prompt-1"); + }), + ); + + it.effect("settles deferred after grace without blocking the notification handler", () => + Effect.gen(function* () { + const deferred = yield* Deferred.make(); + const pendingRef = yield* Ref.make< + ReadonlyArray<{ + readonly sessionId: string; + readonly promptId: string; + readonly deferred: Deferred.Deferred; + }> + >([ + { + sessionId: "session-1", + promptId: "t3-xai-prompt-1", + deferred, + }, + ]); + const completedPromptIdsRef = yield* Ref.make>([]); + + yield* resolveXAiPromptCompletionFallback({ + pendingRef, + completedPromptIdsRef, + notification: { + sessionId: "session-1", + promptId: "t3-xai-prompt-1", + stopReason: "end_turn", + agentResult: null, + }, + }); + + expect(yield* Deferred.isDone(deferred)).toBe(false); + yield* Effect.sleep("600 millis"); + expect(yield* Deferred.isDone(deferred)).toBe(true); + }).pipe(TestClock.withLive), + ); + + it.effect( + "ignores replayed prompt completion when prompt id does not match the in-flight prompt", + () => + Effect.gen(function* () { + const deferred = yield* Deferred.make(); + const pendingRef = yield* Ref.make< + ReadonlyArray<{ + readonly sessionId: string; + readonly promptId: string; + readonly deferred: Deferred.Deferred; + }> + >([ + { + sessionId: "session-1", + promptId: "t3-xai-prompt-fresh-uuid", + deferred, + }, + ]); + const completedPromptIdsRef = yield* Ref.make>([]); + + yield* resolveXAiPromptCompletionFallback({ + pendingRef, + completedPromptIdsRef, + notification: { + sessionId: "session-1", + promptId: "t3-xai-prompt-1", + stopReason: "end_turn", + agentResult: null, + }, + }); + + const pending = yield* Ref.get(pendingRef); + expect(pending).toHaveLength(1); + expect(yield* Deferred.isDone(deferred)).toBe(false); + }), ); it.effect("ignores stale xAI completion from an already settled prompt", () => @@ -327,6 +515,6 @@ describe("XAiAcpExtension", () => { requestId: secondPromptId, }, }); - }).pipe(Effect.scoped, Effect.provide(NodeServices.layer)), + }).pipe(Effect.scoped, Effect.provide(NodeServices.layer), TestClock.withLive), ); }); diff --git a/apps/server/src/provider/acp/XAiAcpExtension.ts b/apps/server/src/provider/acp/XAiAcpExtension.ts index d36a5fcfc89..b99e64a65c4 100644 --- a/apps/server/src/provider/acp/XAiAcpExtension.ts +++ b/apps/server/src/provider/acp/XAiAcpExtension.ts @@ -1,9 +1,11 @@ import type { ProviderUserInputAnswers, UserInputQuestion } from "@t3tools/contracts"; import * as Deferred from "effect/Deferred"; +import * as Duration from "effect/Duration"; import * as Effect from "effect/Effect"; import * as Ref from "effect/Ref"; import * as Schema from "effect/Schema"; import type * as EffectAcpSchema from "effect-acp/schema"; +import * as EffectAcpErrors from "effect-acp/errors"; import type * as AcpSessionRuntime from "./AcpSessionRuntime.ts"; @@ -24,6 +26,7 @@ interface PendingXAiPromptCompletion { const completedXAiPromptIdLimit = 128; const xAiStopReasonMissingMetaKey = "xAiStopReasonMissing"; +export const xAiPromptCompletionSettleDelay = Duration.millis(500); const XAiAskUserQuestionOption = Schema.Struct({ label: Schema.String, @@ -201,15 +204,13 @@ export function makeXAiAskUserQuestionCancelledResponse(): XAiAskUserQuestionCan * The underlying runtime remains unaware of xAI methods and metadata. */ export const makeXAiPromptCompletionRuntime = Effect.fn("makeXAiPromptCompletionRuntime")( - function* (runtime: AcpSessionRuntime.AcpSessionRuntime["Service"]) { + function* ( + runtime: AcpSessionRuntime.AcpSessionRuntime["Service"], + allocatePromptFallbackId: Effect.Effect, + ) { const activeSessionIdRef = yield* Ref.make(undefined); const pendingRef = yield* Ref.make>([]); const completedPromptIdsRef = yield* Ref.make>([]); - let nextPromptFallbackId = 0; - const allocatePromptFallbackId = Effect.sync(() => { - nextPromptFallbackId += 1; - return `t3-xai-prompt-${nextPromptFallbackId}`; - }); yield* runtime.handleExtNotification( "_x.ai/session/prompt_complete", @@ -219,6 +220,8 @@ export const makeXAiPromptCompletionRuntime = Effect.fn("makeXAiPromptCompletion pendingRef, completedPromptIdsRef, notification, + isSessionLoadReplayActive: runtime.isSessionLoadReplayActive, + touchSessionLoadReplayActivity: runtime.touchSessionLoadReplayActivity, }), ); @@ -324,45 +327,74 @@ const abortPendingPromptCompletions = ( ] as const; }).pipe(Effect.flatten); -const resolveXAiPromptCompletionFallback = ({ +export const resolveXAiPromptCompletionFallback = ({ pendingRef, completedPromptIdsRef, notification, + isSessionLoadReplayActive, + touchSessionLoadReplayActivity, }: { readonly pendingRef: Ref.Ref>; readonly completedPromptIdsRef: Ref.Ref>; readonly notification: XAiPromptCompleteNotification; + readonly isSessionLoadReplayActive?: Effect.Effect; + readonly touchSessionLoadReplayActivity?: Effect.Effect; }) => - Ref.get(completedPromptIdsRef).pipe( - Effect.flatMap((completedPromptIds) => { - if ( - notification.promptId !== undefined && - completedPromptIds.includes(notification.promptId) - ) { - return Effect.void; - } - return Ref.modify(pendingRef, (pending) => { - const index = - notification.promptId !== undefined - ? pending.findIndex( - (entry) => - entry.sessionId === notification.sessionId && - entry.promptId === notification.promptId, - ) - : pending.findIndex((entry) => entry.sessionId === notification.sessionId); - if (index < 0) { - return [Effect.void, pending] as const; - } - const entry = pending[index]; - if (!entry) { - return [Effect.void, pending] as const; - } - return [ - Deferred.succeed(entry.deferred, promptResponseFromXAi(notification)).pipe(Effect.asVoid), - [...pending.slice(0, index), ...pending.slice(index + 1)], - ] as const; - }).pipe(Effect.flatten); - }), + (isSessionLoadReplayActive ?? Effect.succeed(false)).pipe( + Effect.flatMap((loadReplayActive) => + loadReplayActive + ? (touchSessionLoadReplayActivity ?? Effect.void) + : Ref.get(completedPromptIdsRef).pipe( + Effect.flatMap((completedPromptIds) => { + if ( + notification.promptId !== undefined && + completedPromptIds.includes(notification.promptId) + ) { + return Effect.void; + } + // Require an explicit prompt id. Session-id-only matching would let a + // replayed or stale prompt_complete from session/load settle the wrong + // in-flight prompt and interrupt the real session/prompt RPC. + if (notification.promptId === undefined) { + return Effect.void; + } + return Ref.modify(pendingRef, (pending) => { + const index = pending.findIndex( + (entry) => + entry.sessionId === notification.sessionId && + entry.promptId === notification.promptId, + ); + if (index < 0) { + return [Effect.void, pending] as const; + } + const entry = pending[index]; + if (!entry) { + return [Effect.void, pending] as const; + } + return [ + Effect.gen(function* () { + yield* Effect.sleep(xAiPromptCompletionSettleDelay); + yield* Ref.modify(pendingRef, (current) => { + const stillIndex = current.findIndex( + (candidate) => candidate.deferred === entry.deferred, + ); + if (stillIndex < 0) { + return [Effect.void, current] as const; + } + return [ + Deferred.succeed(entry.deferred, promptResponseFromXAi(notification)).pipe( + Effect.asVoid, + ), + [...current.slice(0, stillIndex), ...current.slice(stillIndex + 1)], + ] as const; + }).pipe(Effect.flatten); + }).pipe(Effect.forkDetach, Effect.asVoid), + pending, + ] as const; + }).pipe(Effect.flatten); + }), + ), + ), ); const rememberCompletedXAiPromptId = ( diff --git a/apps/server/src/textGeneration/GrokTextGeneration.ts b/apps/server/src/textGeneration/GrokTextGeneration.ts index 1bb58216305..b2a81a3e402 100644 --- a/apps/server/src/textGeneration/GrokTextGeneration.ts +++ b/apps/server/src/textGeneration/GrokTextGeneration.ts @@ -1,9 +1,10 @@ import * as Effect from "effect/Effect"; +import * as Crypto from "effect/Crypto"; import * as Option from "effect/Option"; import * as Ref from "effect/Ref"; import * as Schema from "effect/Schema"; import { ChildProcessSpawner } from "effect/unstable/process"; -import type * as EffectAcpErrors from "effect-acp/errors"; +import * as EffectAcpErrors from "effect-acp/errors"; import { type GrokSettings, type ModelSelection } from "@t3tools/contracts"; import { sanitizeBranchFragment, sanitizeFeatureBranchName } from "@t3tools/shared/git"; @@ -38,6 +39,17 @@ export const makeGrokTextGeneration = Effect.fn("makeGrokTextGeneration")(functi environment: NodeJS.ProcessEnv = process.env, ) { const commandSpawner = yield* ChildProcessSpawner.ChildProcessSpawner; + const crypto = yield* Crypto.Crypto; + const allocateXAiPromptFallbackId = crypto.randomUUIDv4.pipe( + Effect.map((uuid) => `t3-xai-prompt-${uuid}`), + Effect.mapError( + (cause) => + new EffectAcpErrors.AcpTransportError({ + detail: "Failed to allocate xAI prompt identifier.", + cause, + }), + ), + ); const runGrokJson = ({ operation, @@ -59,13 +71,16 @@ export const makeGrokTextGeneration = Effect.fn("makeGrokTextGeneration")(functi Effect.gen(function* () { const resolvedModel = resolveGrokAcpBaseModelId(modelSelection.model); const outputRef = yield* Ref.make(""); - const runtime = yield* makeGrokAcpRuntime({ - grokSettings, - environment, - childProcessSpawner: commandSpawner, - cwd, - clientInfo: { name: "t3-code-git-text", version: "0.0.0" }, - }); + const runtime = yield* makeGrokAcpRuntime( + { + grokSettings, + environment, + childProcessSpawner: commandSpawner, + cwd, + clientInfo: { name: "t3-code-git-text", version: "0.0.0" }, + }, + allocateXAiPromptFallbackId, + ); yield* runtime.handleSessionUpdate((notification) => { const update = notification.update;