diff --git a/apps/server/package.json b/apps/server/package.json index 387c880a9ab..b8e6f482a28 100644 --- a/apps/server/package.json +++ b/apps/server/package.json @@ -23,7 +23,6 @@ "test": "vitest run" }, "dependencies": { - "@anthropic-ai/claude-agent-sdk": "^0.2.111", "@effect/platform-bun": "catalog:", "@effect/platform-node": "catalog:", "@effect/platform-node-shared": "catalog:", @@ -34,6 +33,7 @@ "node-pty": "^1.1.0" }, "devDependencies": { + "@anthropic-ai/claude-agent-sdk": "^0.2.111", "@effect/language-service": "catalog:", "@effect/vitest": "catalog:", "@t3tools/contracts": "workspace:*", diff --git a/apps/server/src/provider/Layers/ClaudeAdapter.ts b/apps/server/src/provider/Layers/ClaudeAdapter.ts index e9bd8fb7a16..cf97b0a87f0 100644 --- a/apps/server/src/provider/Layers/ClaudeAdapter.ts +++ b/apps/server/src/provider/Layers/ClaudeAdapter.ts @@ -6,18 +6,17 @@ * * @module ClaudeAdapterLive */ -import { - type CanUseTool, - query, - type Options as ClaudeQueryOptions, - type PermissionMode, - type PermissionResult, - type PermissionUpdate, - type SDKMessage, - type SDKResultMessage, - type SettingSource, - type SDKUserMessage, - type ModelUsage, +import type { + CanUseTool, + Options as ClaudeQueryOptions, + PermissionMode, + PermissionResult, + PermissionUpdate, + SDKMessage, + SDKResultMessage, + SettingSource, + SDKUserMessage, + ModelUsage, } from "@anthropic-ai/claude-agent-sdk"; import { parseCliArgs } from "@t3tools/shared/cliArgs"; import { @@ -67,6 +66,7 @@ import * as Stream from "effect/Stream"; import { resolveAttachmentPath } from "../../attachmentStore.ts"; import { ServerConfig } from "../../config.ts"; +import { makeClaudeCliQuery } from "./ClaudeCliTransport.ts"; import { makeClaudeEnvironment } from "../Drivers/ClaudeHome.ts"; import { getClaudeModelCapabilities, @@ -1015,7 +1015,7 @@ export const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( readonly prompt: AsyncIterable; readonly options: ClaudeQueryOptions; }) => 
- query({ + makeClaudeCliQuery({ prompt: input.prompt, options: input.options, }) as ClaudeQueryRuntime); diff --git a/apps/server/src/provider/Layers/ClaudeCliTransport.test.ts b/apps/server/src/provider/Layers/ClaudeCliTransport.test.ts new file mode 100644 index 00000000000..8b7ed16e227 --- /dev/null +++ b/apps/server/src/provider/Layers/ClaudeCliTransport.test.ts @@ -0,0 +1,221 @@ +// @effect-diagnostics nodeBuiltinImport:off +import { mkdtempSync, rmSync, writeFileSync } from "node:fs"; +import os from "node:os"; +import path from "node:path"; + +import { assert, describe, it } from "@effect/vitest"; + +import { makeClaudeCliQuery } from "./ClaudeCliTransport.ts"; + +/** + * A fake `claude` binary (Node script) that speaks the stream-json control + * protocol just enough to exercise the transport end to end: + * - answers the `initialize` control request with account + commands, + * - on a user message, emits one assistant message then a result, + * - answers `interrupt`, + * - when FAKE_EMIT_PERMISSION=1, sends a `can_use_tool` control request and + * echoes the decision back as a system message. 
/**
 * Source of a fake `claude` binary (plain Node script) that speaks just enough
 * of the stream-json control protocol to exercise the transport end to end:
 *  - answers the `initialize` control request with account + commands,
 *  - on a user message, emits system/init, one assistant message and a result,
 *  - answers `interrupt`,
 *  - when FAKE_EMIT_PERMISSION=1, sends a `can_use_tool` control request right
 *    after `initialize` and echoes the decision it receives back as a system
 *    message followed by a result.
 */
const FAKE_CLI = `
let buffer = "";
function send(obj) { process.stdout.write(JSON.stringify(obj) + "\\n"); }

process.stdin.setEncoding("utf8");
process.stdin.on("data", (chunk) => {
  buffer += chunk;
  let i;
  while ((i = buffer.indexOf("\\n")) !== -1) {
    const line = buffer.slice(0, i).trim();
    buffer = buffer.slice(i + 1);
    if (!line) continue;
    let msg;
    try { msg = JSON.parse(line); } catch { continue; }
    handle(msg);
  }
});

function handle(msg) {
  if (msg.type === "control_request" && msg.request?.subtype === "initialize") {
    send({
      type: "control_response",
      response: {
        request_id: msg.request_id,
        subtype: "success",
        response: {
          account: { email: "user@example.com", subscriptionType: "claude_pro_subscription", tokenSource: "oauth" },
          commands: [{ name: "compact", description: "Compact" }],
        },
      },
    });
    if (process.env.FAKE_EMIT_PERMISSION === "1") {
      send({
        type: "control_request",
        request_id: "perm-1",
        request: { subtype: "can_use_tool", tool_name: "Bash", input: { command: "ls" } },
      });
    }
    return;
  }
  if (msg.type === "control_request" && msg.request?.subtype === "interrupt") {
    send({ type: "control_response", response: { request_id: msg.request_id, subtype: "success", response: {} } });
    return;
  }
  if (msg.type === "control_response") {
    // Our answer to the can_use_tool request — echo the decision out.
    send({ type: "system", subtype: "permission_decision", decision: msg.response?.response });
    send({ type: "result", subtype: "success", is_error: false, session_id: "s1", uuid: "r-perm" });
    return;
  }
  if (msg.type === "user") {
    send({ type: "system", subtype: "init", session_id: "s1" });
    send({ type: "assistant", message: { id: "a1", role: "assistant", content: [{ type: "text", text: "hi" }] }, session_id: "s1" });
    send({ type: "result", subtype: "success", is_error: false, session_id: "s1", uuid: "r1" });
    return;
  }
}
`;

/**
 * Write the fake CLI into a fresh temp directory.
 *
 * The `.mjs` extension matters: the transport treats JS-looking paths as
 * entrypoints and launches them through node/bun instead of exec'ing them.
 */
function makeFakeCli(): { dir: string; cliPath: string; cleanup: () => void } {
  const dir = mkdtempSync(path.join(os.tmpdir(), "claude-cli-transport-"));
  const cliPath = path.join(dir, "fake-cli.mjs");
  writeFileSync(cliPath, FAKE_CLI, "utf8");
  return { dir, cliPath, cleanup: () => rmSync(dir, { recursive: true, force: true }) };
}

/** Prompt iterable that yields exactly one message and then completes. */
async function* once(message: unknown): AsyncGenerator<unknown> {
  yield message;
}

/**
 * Prompt iterable that never yields; it completes only once `signal` aborts.
 * Keeps the child's stdin open for tests that exercise control requests only.
 */
async function* never(signal: AbortSignal): AsyncGenerator<never> {
  await new Promise<void>((resolve) => {
    if (signal.aborted) resolve();
    else signal.addEventListener("abort", () => resolve(), { once: true });
  });
}

describe("ClaudeCliTransport", () => {
  it("streams assistant + result messages from a user prompt", async () => {
    const fake = makeFakeCli();
    try {
      const query = makeClaudeCliQuery({
        prompt: once({
          type: "user",
          session_id: "",
          message: { role: "user", content: [{ type: "text", text: "hello" }] },
          parent_tool_use_id: null,
        }) as AsyncIterable<never>,
        options: { pathToClaudeCodeExecutable: fake.cliPath } as never,
      });
      try {
        const types: string[] = [];
        for await (const message of query) {
          const { type } = message as { type: string };
          types.push(type);
          if (type === "result") break;
        }
        assert.deepEqual(types, ["system", "assistant", "result"]);
      } finally {
        // Always reap the child, even when an assertion above throws.
        query.close();
      }
    } finally {
      fake.cleanup();
    }
  });

  it("resolves initialize() with the account + commands payload", async () => {
    const fake = makeFakeCli();
    const abort = new AbortController();
    try {
      const query = makeClaudeCliQuery({
        prompt: never(abort.signal) as AsyncIterable<never>,
        options: { pathToClaudeCodeExecutable: fake.cliPath } as never,
      });
      try {
        const init = await query.initialize();

        assert.equal(init.account?.email, "user@example.com");
        assert.equal(init.account?.subscriptionType, "claude_pro_subscription");
        assert.equal(init.account?.tokenSource, "oauth");
        assert.deepEqual(init.commands, [{ name: "compact", description: "Compact" }]);
      } finally {
        abort.abort();
        query.close();
      }
    } finally {
      fake.cleanup();
    }
  });

  it("round-trips an interrupt control request", async () => {
    const fake = makeFakeCli();
    const abort = new AbortController();
    try {
      const query = makeClaudeCliQuery({
        prompt: never(abort.signal) as AsyncIterable<never>,
        options: { pathToClaudeCodeExecutable: fake.cliPath } as never,
      });
      try {
        await query.initialize();
        // Resolves only if the fake answered the control request by request_id;
        // a rejection (or a hang caught by the test timeout) fails the test.
        await query.interrupt();
      } finally {
        abort.abort();
        query.close();
      }
    } finally {
      fake.cleanup();
    }
  });

  it("bridges inbound can_use_tool to the canUseTool callback", async () => {
    const fake = makeFakeCli();
    const abort = new AbortController();
    try {
      let sawToolName: string | undefined;
      const query = makeClaudeCliQuery({
        prompt: never(abort.signal) as AsyncIterable<never>,
        options: {
          pathToClaudeCodeExecutable: fake.cliPath,
          // Scope the flag to this child only; mutating process.env would leak
          // into other tests running in the same worker.
          env: { ...process.env, FAKE_EMIT_PERMISSION: "1" },
          canUseTool: async (toolName: string) => {
            sawToolName = toolName;
            return { behavior: "allow", updatedInput: { command: "ls" } };
          },
        } as never,
      });
      try {
        let decision: unknown;
        for await (const message of query) {
          const m = message as { type: string; subtype?: string; decision?: unknown };
          if (m.type === "system" && m.subtype === "permission_decision") decision = m.decision;
          if (m.type === "result") break;
        }

        assert.equal(sawToolName, "Bash");
        assert.deepEqual(decision, { behavior: "allow", updatedInput: { command: "ls" } });
      } finally {
        abort.abort();
        query.close();
      }
    } finally {
      fake.cleanup();
    }
  });

  it("ends the iterator cleanly when the process exits", async () => {
    const fake = makeFakeCli();
    try {
      const query = makeClaudeCliQuery({
        prompt: once({
          type: "user",
          session_id: "",
          message: { role: "user", content: [{ type: "text", text: "hi" }] },
          parent_tool_use_id: null,
        }) as AsyncIterable<never>,
        options: { pathToClaudeCodeExecutable: fake.cliPath } as never,
      });
      try {
        let count = 0;
        for await (const _message of query) {
          count += 1;
          if (count > 10) break;
        }
        // Iterator returned (process exits after closing stdin) without hanging.
        assert.ok(count >= 3);
      } finally {
        query.close();
      }
    } finally {
      fake.cleanup();
    }
  });
});
+ * - stdin (NDJSON): user messages; our control requests + * `{request_id,type:"control_request",request:{subtype}}`; our permission + * answers `{type:"control_response",response:{subtype:"success", + * request_id,response:}}`. + * - stdout (NDJSON): SDKMessage objects; `control_response`; inbound + * `control_request` (can_use_tool); `control_cancel_request`; plus + * `keep_alive` / `transcript_mirror` which are ignored. + * + * @module ClaudeCliTransport + */ +import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process"; + +import type { + CanUseTool, + Options as ClaudeQueryOptions, + PermissionResult, + SDKMessage, + SDKUserMessage, +} from "@anthropic-ai/claude-agent-sdk"; + +/** + * The exact interface the Claude adapter expects back from `createQuery`. + * Mirrors the SDK's query object. + */ +export interface ClaudeCliQueryRuntime extends AsyncIterable { + readonly interrupt: () => Promise; + readonly setModel: (model?: string) => Promise; + readonly setPermissionMode: (mode: string) => Promise; + readonly setMaxThinkingTokens: (maxThinkingTokens: number | null) => Promise; + readonly close: () => void; + /** + * Resolves with the CLI's `initialize` control-response payload + * (`{ account, commands, models, ... }`). The handshake is sent + * automatically on startup; this returns the (memoised) result. Used by + * the capabilities/auth probe, which never sends a prompt. + */ + readonly initialize: () => Promise; +} + +export interface InitializeResult { + readonly account?: { + readonly email?: string; + readonly subscriptionType?: string; + readonly tokenSource?: string; + }; + readonly commands?: ReadonlyArray; + readonly models?: ReadonlyArray; + readonly [key: string]: unknown; +} + +export interface ClaudeCliQueryInput { + readonly prompt: AsyncIterable; + readonly options: ClaudeQueryOptions; +} + +/** Milliseconds to wait after closing stdin before escalating SIGTERM→SIGKILL. 
/** Milliseconds to wait after closing stdin before sending SIGTERM. */
const GRACEFUL_SHUTDOWN_MS = 2_000;
/** Milliseconds to wait after SIGTERM before escalating to SIGKILL. */
const SIGKILL_ESCALATION_MS = 3_000;

/** Treat a path as a JS entrypoint (needs node/bun) if it has a JS/TS ext. */
function isJsEntrypoint(path: string): boolean {
  return [".js", ".mjs", ".cjs", ".tsx", ".ts", ".jsx"].some((ext) => path.endsWith(ext));
}

/** Runtime used to launch a JS entrypoint: `bun` when running under Bun, else `node`. */
function defaultExecutable(): string {
  return typeof (globalThis as { Bun?: unknown }).Bun !== "undefined" ||
    process.versions.bun !== undefined
    ? "bun"
    : "node";
}

/** Short random base-36 token used to correlate a control request with its response. */
function newRequestId(): string {
  return Math.random().toString(36).substring(2, 15);
}

/**
 * Build the binary argv from `ClaudeQueryOptions`, mirroring the flag builder
 * in the SDK. Only the options the Claude adapter actually sets are mapped;
 * unknown extras flow through `extraArgs`.
 *
 * @param options - Query options; unmapped fields are ignored.
 * @returns The flag list, always starting with the stream-json base flags.
 */
function buildArgs(options: ClaudeQueryOptions): string[] {
  const args: string[] = [
    "--output-format",
    "stream-json",
    "--verbose",
    "--input-format",
    "stream-json",
  ];

  const o = options as ClaudeQueryOptions & {
    readonly settings?: unknown;
    readonly extraArgs?: Record<string, string | null | undefined>;
    readonly additionalDirectories?: ReadonlyArray<string>;
  };

  if (o.effort) args.push("--effort", String(o.effort));
  if (o.model) args.push("--model", String(o.model));
  if (o.fallbackModel) args.push("--fallback-model", String(o.fallbackModel));

  // A canUseTool callback routes permission prompts through stdio control IPC.
  if (typeof o.canUseTool === "function") {
    args.push("--permission-prompt-tool", "stdio");
  }

  if (o.resume) args.push("--resume", String(o.resume));
  if (o.sessionId) args.push("--session-id", String(o.sessionId));
  if (o.resumeSessionAt) args.push("--resume-session-at", String(o.resumeSessionAt));
  if (o.forkSession) args.push("--fork-session");
  if ((o as { persistSession?: boolean }).persistSession === false) {
    args.push("--no-session-persistence");
  }

  const settingSources = o.settingSources;
  if (settingSources !== undefined) {
    args.push(`--setting-sources=${(settingSources as ReadonlyArray<string>).join(",")}`);
  }

  if (o.permissionMode) args.push("--permission-mode", String(o.permissionMode));
  if (o.allowDangerouslySkipPermissions) args.push("--allow-dangerously-skip-permissions");
  if (o.includePartialMessages) args.push("--include-partial-messages");

  for (const dir of o.additionalDirectories ?? []) {
    args.push("--add-dir", dir);
  }

  // `settings` is a JSON blob the CLI accepts via --settings.
  if (o.settings !== undefined) {
    args.push(
      "--settings",
      typeof o.settings === "string" ? o.settings : JSON.stringify(o.settings),
    );
  }

  // Pass-through extra flags: { "flag-name": value | null }. null = boolean flag.
  for (const [flag, value] of Object.entries(o.extraArgs ?? {})) {
    // An explicitly-undefined entry means "not set". Pushing it would put a
    // non-string into argv, which makes child_process.spawn throw.
    if (value === undefined) continue;
    if (value === null) args.push(`--${flag}`);
    else args.push(`--${flag}`, String(value));
  }

  return args;
}
/**
 * Assemble the body of the `initialize` control request from the query
 * options, in the same shape the SDK sends.
 *
 * `hooks`, `sdkMcpServers` and `jsonSchema` are always `undefined` because
 * this transport does not use them. A string `systemPrompt` is wrapped in a
 * one-element array; any other value is forwarded untouched, as are the
 * remaining prompt/agent fields.
 *
 * @param options - Query options to read the prompt-related fields from.
 * @returns The `request` object of the `initialize` control request.
 */
function buildInitializePayload(options: ClaudeQueryOptions): Record<string, unknown> {
  const {
    systemPrompt,
    appendSystemPrompt,
    appendSubagentSystemPrompt,
    excludeDynamicSections,
    agents,
    promptSuggestions,
    agentProgressSummaries,
  } = options as ClaudeQueryOptions & {
    readonly systemPrompt?: unknown;
    readonly appendSystemPrompt?: unknown;
    readonly appendSubagentSystemPrompt?: unknown;
    readonly excludeDynamicSections?: unknown;
    readonly agents?: unknown;
    readonly promptSuggestions?: unknown;
    readonly agentProgressSummaries?: unknown;
  };

  // The wire format carries a plain-string prompt as a single-element list.
  const wireSystemPrompt = typeof systemPrompt === "string" ? [systemPrompt] : systemPrompt;

  return {
    subtype: "initialize",
    hooks: undefined,
    sdkMcpServers: undefined,
    jsonSchema: undefined,
    systemPrompt: wireSystemPrompt,
    appendSystemPrompt,
    appendSubagentSystemPrompt,
    excludeDynamicSections,
    agents,
    promptSuggestions,
    agentProgressSummaries,
  };
}

/** Push/pull bridge: producers push values, a consumer pulls them via an async iterator. */
interface AsyncQueue<T> {
  /** Enqueue a value, or hand it to a consumer that is already waiting. */
  push: (value: T) => void;
  /** Mark the stream as finished. */
  end: () => void;
  /** Mark the stream as finished with an error. */
  fail: (error: unknown) => void;
  /** Create an iterator that drains the queue. */
  iterator: () => AsyncIterator<T>;
}
/**
 * Unbounded async queue bridging push-based stdout framing to the pull-based
 * `for await` the adapter consumes.
 *
 * Ordering contract of the iterator's `next()`:
 *  1. values buffered before `end()`/`fail()` are always delivered first;
 *  2. a recorded failure is then reported by rejecting once;
 *  3. after that (or after `end()`), every call resolves `{ done: true }`.
 *
 * `push`, `end` and `fail` are no-ops once the queue has ended or failed.
 */
function makeAsyncQueue<T>(): AsyncQueue<T> {
  const buffer: T[] = [];
  const waiters: Array<{
    resolve: (r: IteratorResult<T>) => void;
    reject: (e: unknown) => void;
  }> = [];
  let ended = false;
  // Tracked with an explicit flag rather than `failure !== undefined`, so that
  // `fail(undefined)` (e.g. a promise rejected without a reason) is still
  // surfaced as a failure instead of being mistaken for a clean end.
  let failurePending = false;
  let failure: unknown;

  return {
    push(value) {
      if (ended) return;
      const waiter = waiters.shift();
      if (waiter) waiter.resolve({ done: false, value });
      else buffer.push(value);
    },
    end() {
      if (ended) return;
      ended = true;
      for (const w of waiters.splice(0)) w.resolve({ done: true, value: undefined });
    },
    fail(error) {
      if (ended) return;
      ended = true;
      failurePending = true;
      failure = error;
      for (const w of waiters.splice(0)) w.reject(error);
    },
    iterator() {
      return {
        next() {
          if (buffer.length > 0) {
            return Promise.resolve({ done: false, value: buffer.shift() as T });
          }
          if (failurePending) {
            // Report the failure exactly once; later calls see a finished stream.
            const err = failure;
            failurePending = false;
            failure = undefined;
            return Promise.reject(err);
          }
          if (ended) return Promise.resolve({ done: true, value: undefined });
          return new Promise<IteratorResult<T>>((resolve, reject) => {
            waiters.push({ resolve, reject });
          });
        },
      };
    },
  };
}

/** A `control_request` line; this module handles only the `can_use_tool` subtype inbound. */
interface ControlRequestEnvelope {
  readonly type: "control_request";
  readonly request_id: string;
  readonly request: {
    readonly subtype: string;
    readonly tool_name?: string;
    readonly input?: Record<string, unknown>;
    readonly permission_suggestions?: unknown;
    readonly blocked_path?: string;
    readonly decision_reason?: unknown;
    readonly title?: string;
  };
}

/** A `control_response` line, correlated to its request through `response.request_id`. */
interface ControlResponseEnvelope {
  readonly type: "control_response";
  readonly response: {
    readonly request_id: string;
    readonly subtype: "success" | "error";
    readonly error?: string;
    readonly response?: unknown;
  };
}
/**
 * Start a `claude` session over stream-json IPC and return a runtime object
 * shaped like the SDK's query.
 *
 * The child is spawned immediately and the `initialize` control request is
 * written before the first prompt message. Iterating the returned object
 * yields every stdout message that is not part of the control protocol.
 *
 * Lifecycle:
 *  - the iterator ends when the process closes with exit code 0, with no exit
 *    code, or after `close()` was called; any other exit code makes it reject
 *    with an error carrying the tail of stderr;
 *  - `close()` is idempotent: it ends stdin, then sends SIGTERM after
 *    `GRACEFUL_SHUTDOWN_MS` and SIGKILL after a further
 *    `SIGKILL_ESCALATION_MS` if the process is still alive;
 *  - aborting `options.abortController` (when supplied) is equivalent to
 *    calling `close()`.
 *
 * @param input - Prompt stream plus query options. `options.env` replaces
 *   `process.env` for the child when provided.
 * @returns Runtime with `interrupt/setModel/setPermissionMode/
 *   setMaxThinkingTokens/initialize/close`. The control methods reject if the
 *   session is closed, stdin is no longer writable, or the CLI answers with
 *   an error.
 */
export function makeClaudeCliQuery(input: ClaudeCliQueryInput): ClaudeCliQueryRuntime {
  const { options } = input;
  const o = options as ClaudeQueryOptions & {
    readonly pathToClaudeCodeExecutable?: string;
    readonly cwd?: string;
    readonly env?: NodeJS.ProcessEnv;
    readonly executableArgs?: ReadonlyArray<string>;
    readonly canUseTool?: CanUseTool;
    readonly abortController?: AbortController;
  };

  const binaryPath = o.pathToClaudeCodeExecutable ?? "claude";
  const flags = buildArgs(options);
  const executableArgs = [...(o.executableArgs ?? [])];

  // JS entrypoints are launched through node/bun; anything else is exec'd directly.
  const isJs = isJsEntrypoint(binaryPath);
  const command = isJs ? defaultExecutable() : binaryPath;
  const commandArgs = isJs
    ? [...executableArgs, binaryPath, ...flags]
    : [...executableArgs, ...flags];

  const env = { ...(o.env ?? process.env) };
  // Match the SDK: tag the entrypoint and never leak NODE_OPTIONS into the
  // child (it can crash the bundled CLI).
  if (!env.CLAUDE_CODE_ENTRYPOINT) env.CLAUDE_CODE_ENTRYPOINT = "sdk-ts";
  delete env.NODE_OPTIONS;

  const child: ChildProcessWithoutNullStreams = spawn(command, commandArgs, {
    cwd: o.cwd,
    env,
    stdio: ["pipe", "pipe", "pipe"],
    windowsHide: true,
  });

  const messages = makeAsyncQueue<SDKMessage>();
  // Outbound control requests awaiting a `control_response`, keyed by request_id.
  const pendingControl = new Map<
    string,
    { resolve: (r: ControlResponseEnvelope["response"]) => void; reject: (e: unknown) => void }
  >();
  // Abort handles for in-flight canUseTool callbacks, keyed by inbound request_id.
  const cancelControllers = new Map<string, AbortController>();
  let closed = false;
  let exited = false;
  let killTimer: NodeJS.Timeout | undefined;

  // Writes to a pipe whose reader has gone away fail *asynchronously* with an
  // 'error' event (EPIPE) that try/catch around write() cannot see. Without a
  // listener that event is unhandled and would crash the host process; the
  // failure itself is surfaced through the child's 'error'/'close' handlers.
  child.stdin.on("error", () => {});

  /** Write one NDJSON line to the child's stdin; returns whether it was handed to the stream. */
  const writeLine = (obj: unknown): boolean => {
    if (closed || exited || !child.stdin.writable) return false;
    try {
      child.stdin.write(`${JSON.stringify(obj)}\n`);
      return true;
    } catch {
      /* stdin closed underneath us — surfaced via the exit handler */
      return false;
    }
  };

  /** Abort every canUseTool callback that is still waiting for a decision. */
  const abortInFlightPermissions = (): void => {
    for (const controller of cancelControllers.values()) controller.abort();
    cancelControllers.clear();
  };

  /** Reject every outbound control request that has not been answered yet. */
  const rejectPendingControl = (error: unknown): void => {
    for (const pending of pendingControl.values()) pending.reject(error);
    pendingControl.clear();
  };

  // ── inbound can_use_tool → canUseTool callback ───────────────────────
  const handleInboundControlRequest = async (req: ControlRequestEnvelope): Promise<void> => {
    if (req.request.subtype !== "can_use_tool") {
      // Unknown inbound control request: acknowledge with an error so the CLI
      // does not hang waiting for a response.
      writeLine({
        type: "control_response",
        response: {
          subtype: "error",
          request_id: req.request_id,
          error: `Unsupported control request subtype: ${req.request.subtype}`,
        },
      });
      return;
    }

    const callback = o.canUseTool;
    if (typeof callback !== "function") {
      writeLine({
        type: "control_response",
        response: {
          subtype: "error",
          request_id: req.request_id,
          error: "canUseTool callback is not provided.",
        },
      });
      return;
    }

    const controller = new AbortController();
    cancelControllers.set(req.request_id, controller);
    try {
      const result: PermissionResult = await callback(
        req.request.tool_name ?? "",
        req.request.input ?? {},
        {
          signal: controller.signal,
          suggestions: req.request.permission_suggestions,
          blockedPath: req.request.blocked_path,
          decisionReason: req.request.decision_reason,
          title: req.request.title,
        } as unknown as Parameters<CanUseTool>[2],
      );
      writeLine({
        type: "control_response",
        response: { subtype: "success", request_id: req.request_id, response: result },
      });
    } catch (error) {
      writeLine({
        type: "control_response",
        response: {
          subtype: "error",
          request_id: req.request_id,
          error: error instanceof Error ? error.message : String(error),
        },
      });
    } finally {
      cancelControllers.delete(req.request_id);
    }
  };

  // ── stdout NDJSON framing ────────────────────────────────────────────
  let stdoutBuffer = "";

  /** Route one parsed stdout object: control traffic is consumed here, the rest is queued. */
  const handleParsed = (parsed: unknown): void => {
    if (!parsed || typeof parsed !== "object") return;
    const record = parsed as { type?: unknown };

    if (record.type === "control_response") {
      const env_ = parsed as ControlResponseEnvelope;
      const pending = pendingControl.get(env_.response.request_id);
      if (pending) {
        pendingControl.delete(env_.response.request_id);
        if (env_.response.subtype === "success") pending.resolve(env_.response);
        else pending.reject(new Error(env_.response.error ?? "control request failed"));
      }
      return;
    }

    if (record.type === "control_request") {
      void handleInboundControlRequest(parsed as ControlRequestEnvelope);
      return;
    }

    if (record.type === "control_cancel_request") {
      const reqId = (parsed as { request_id?: string }).request_id;
      if (reqId) {
        cancelControllers.get(reqId)?.abort();
        cancelControllers.delete(reqId);
      }
      return;
    }

    if (record.type === "keep_alive" || record.type === "transcript_mirror") {
      return;
    }

    messages.push(parsed as SDKMessage);
  };

  /** Parse and dispatch a single (already trimmed) stdout line. */
  const dispatchLine = (line: string): void => {
    if (line.length === 0) return;
    try {
      handleParsed(JSON.parse(line));
    } catch {
      /* tolerate a malformed line rather than tearing down the stream */
    }
  };

  child.stdout.setEncoding("utf8");
  child.stdout.on("data", (chunk: string) => {
    stdoutBuffer += chunk;
    let newlineIndex = stdoutBuffer.indexOf("\n");
    while (newlineIndex !== -1) {
      const line = stdoutBuffer.slice(0, newlineIndex).trim();
      stdoutBuffer = stdoutBuffer.slice(newlineIndex + 1);
      dispatchLine(line);
      newlineIndex = stdoutBuffer.indexOf("\n");
    }
  });

  // Keep only the last 4,000 characters of stderr for the exit error message.
  let stderrTail = "";
  child.stderr.setEncoding("utf8");
  child.stderr.on("data", (chunk: string) => {
    stderrTail = `${stderrTail}${chunk}`.slice(-4_000);
  });

  // ── outbound control request (interrupt / set_* / etc.) ───────────────
  const sendControlRequest = (request: Record<string, unknown>): Promise<unknown> => {
    if (closed || exited) {
      return Promise.reject(new Error("Claude CLI session is closed."));
    }
    const requestId = newRequestId();
    return new Promise<unknown>((resolve, reject) => {
      pendingControl.set(requestId, {
        resolve: (r) => resolve(r.response),
        reject,
      });
      const written = writeLine({ request_id: requestId, type: "control_request", request });
      if (!written) {
        // Nothing reached the CLI (stdin already ended, e.g. because the
        // prompt iterable finished), so no response can ever arrive. Fail now
        // instead of leaving the caller pending until the process exits.
        pendingControl.delete(requestId);
        reject(new Error("Claude CLI stdin is not writable."));
      }
    });
  };

  // ── initialize handshake ─────────────────────────────────────────────
  // Sent synchronously (before the async prompt pump's first write) so the
  // CLI is configured before any user message is processed, matching the
  // SDK which always runs `this.initialization = this.initialize()`.
  const initialization = sendControlRequest(
    buildInitializePayload(options),
  ) as Promise<InitializeResult>;
  // Prevent an unhandled rejection when no one awaits the handshake (the
  // conversational path ignores it; only the probe consumes the result).
  initialization.catch(() => {});

  // ── prompt pump: user messages → stdin NDJSON ─────────────────────────
  void (async () => {
    try {
      for await (const message of input.prompt) {
        if (closed || exited) break;
        writeLine(message);
      }
    } catch {
      /* prompt iterable ended via interruption — normal shutdown path */
    } finally {
      // End of prompt stream == end of input: closing stdin lets the CLI exit.
      if (!closed && !exited && child.stdin.writable) {
        try {
          child.stdin.end();
        } catch {
          /* already ended */
        }
      }
    }
  })();

  const close = (): void => {
    if (closed) return;
    closed = true;
    // Nobody will read the answers any more; release callbacks that are
    // still waiting on a permission decision.
    abortInFlightPermissions();
    try {
      if (child.stdin.writable) child.stdin.end();
    } catch {
      /* already ended */
    }
    killTimer = setTimeout(() => {
      if (exited) return;
      child.kill("SIGTERM");
      setTimeout(() => {
        if (!exited) child.kill("SIGKILL");
      }, SIGKILL_ESCALATION_MS).unref?.();
    }, GRACEFUL_SHUTDOWN_MS);
    killTimer.unref?.();
  };

  // Honour the SDK's `abortController` option: aborting it tears the session down.
  const abortSignal = o.abortController?.signal;
  if (abortSignal) {
    if (abortSignal.aborted) close();
    else abortSignal.addEventListener("abort", close, { once: true });
  }

  // ── lifecycle ─────────────────────────────────────────────────────────
  child.on("error", (error: Error) => {
    exited = true;
    rejectPendingControl(error);
    abortInFlightPermissions();
    messages.fail(error);
  });

  child.on("close", (code: number | null, signal: NodeJS.Signals | null) => {
    exited = true;
    if (killTimer) clearTimeout(killTimer);
    abortSignal?.removeEventListener("abort", close);

    // Deliver a final line that was not newline-terminated before finishing.
    const trailing = stdoutBuffer.trim();
    stdoutBuffer = "";
    dispatchLine(trailing);

    rejectPendingControl(new Error("Claude CLI session closed."));
    abortInFlightPermissions();

    // NOTE(review): `code === null` (terminated by a signal) is treated as a
    // clean end even when we did not initiate the shutdown, which makes the
    // `signal` part of the message below unreachable — confirm whether an
    // external kill should surface as a failure instead.
    if (closed || code === 0 || code === null) {
      messages.end();
    } else {
      const detail = stderrTail.trim();
      messages.fail(
        new Error(
          `Claude CLI exited with code ${code}${signal ? ` (signal ${signal})` : ""}${
            detail ? `: ${detail}` : ""
          }`,
        ),
      );
    }
  });

  return {
    [Symbol.asyncIterator]: () => messages.iterator(),
    interrupt: async () => {
      await sendControlRequest({ subtype: "interrupt" });
    },
    setModel: async (model?: string) => {
      await sendControlRequest({ subtype: "set_model", model });
    },
    setPermissionMode: async (mode: string) => {
      await sendControlRequest({ subtype: "set_permission_mode", mode });
    },
    setMaxThinkingTokens: async (maxThinkingTokens: number | null) => {
      await sendControlRequest({
        subtype: "set_max_thinking_tokens",
        max_thinking_tokens: maxThinkingTokens,
      });
    },
    initialize: () => initialization,
    close,
  };
}
environment); return yield* Effect.tryPromise(async () => { - const q = claudeQuery({ + const q = makeClaudeCliQuery({ // Never yield — we only need initialization data, not a conversation. // This prevents any prompt from reaching the Anthropic API. // oxlint-disable-next-line require-yield @@ -464,27 +465,25 @@ const probeClaudeCapabilities = ( options: { persistSession: false, pathToClaudeCodeExecutable: claudeSettings.binaryPath, - abortController: abort, settingSources: ["user", "project", "local"], allowedTools: [], env: claudeEnvironment, - stderr: () => {}, - }, + } as unknown as Parameters[0]["options"], }); - const init = await q.initializationResult(); - const account = init.account as - | { - readonly email?: string; - readonly subscriptionType?: string; - readonly tokenSource?: string; - } - | undefined; - return { - email: account?.email, - subscriptionType: account?.subscriptionType, - tokenSource: account?.tokenSource, - slashCommands: parseClaudeInitializationCommands(init.commands), - } satisfies ClaudeCapabilitiesProbe; + try { + const init = await q.initialize(); + const account = init.account; + return { + email: account?.email, + subscriptionType: account?.subscriptionType, + tokenSource: account?.tokenSource, + slashCommands: parseClaudeInitializationCommands( + init.commands as ReadonlyArray | undefined, + ), + } satisfies ClaudeCapabilitiesProbe; + } finally { + q.close(); + } }); }).pipe( Effect.ensuring( diff --git a/bun.lock b/bun.lock index ffc4a5922bd..769bb8c42b8 100644 --- a/bun.lock +++ b/bun.lock @@ -17,7 +17,7 @@ }, "apps/desktop": { "name": "@t3tools/desktop", - "version": "0.0.23", + "version": "0.0.24", "dependencies": { "@effect/platform-node": "catalog:", "@t3tools/contracts": "workspace:*", @@ -50,12 +50,11 @@ }, "apps/server": { "name": "t3", - "version": "0.0.23", + "version": "0.0.24", "bin": { "t3": "./dist/bin.mjs", }, "dependencies": { - "@anthropic-ai/claude-agent-sdk": "^0.2.111", "@effect/platform-bun": "catalog:", 
"@effect/platform-node": "catalog:", "@effect/platform-node-shared": "catalog:", @@ -66,6 +65,7 @@ "node-pty": "^1.1.0", }, "devDependencies": { + "@anthropic-ai/claude-agent-sdk": "^0.2.111", "@effect/language-service": "catalog:", "@effect/vitest": "catalog:", "@t3tools/contracts": "workspace:*", @@ -83,7 +83,7 @@ }, "apps/web": { "name": "@t3tools/web", - "version": "0.0.23", + "version": "0.0.24", "dependencies": { "@base-ui/react": "^1.4.1", "@dnd-kit/core": "^6.3.1", @@ -165,7 +165,7 @@ }, "packages/contracts": { "name": "@t3tools/contracts", - "version": "0.0.23", + "version": "0.0.24", "dependencies": { "effect": "catalog:", },