diff --git a/src/adapters/acp/acp-adapter.ts b/src/adapters/acp/acp-adapter.ts index f2de0ae..bc0cb0f 100644 --- a/src/adapters/acp/acp-adapter.ts +++ b/src/adapters/acp/acp-adapter.ts @@ -19,6 +19,7 @@ import type { import type { MessageTracer } from "../../core/messaging/message-tracer.js"; import { AcpSession } from "./acp-session.js"; import { JsonRpcCodec } from "./json-rpc.js"; +import { killProcessGroup } from "./kill-process-group.js"; import type { AcpInitializeResult, ErrorClassifier } from "./outbound-translator.js"; const PROTOCOL_VERSION = 1; @@ -56,6 +57,11 @@ export class AcpAdapter implements BackendAdapter { const child = this.spawnFn(command, args, { stdio: ["pipe", "pipe", "pipe"], cwd, + // Create a new process group so we can kill all descendant processes + // (e.g. subprocesses spawned by the ACP agent that inherit the stdout + // pipe, which would otherwise keep the pipe alive after the main + // process exits and prevent backend-disconnected detection). + detached: true, }); if (!child.stdin || !child.stdout) { @@ -79,14 +85,15 @@ export class AcpAdapter implements BackendAdapter { }); child.stdin.write(codec.encode(initReq)); - const initResult = await waitForResponse( - child.stdout, - codec, - initId, - tracer, - options.sessionId, - initializeTimeoutMs, - ); + const { result: initResult, leftover: initLeftover } = + await waitForResponse( + child.stdout, + codec, + initId, + tracer, + options.sessionId, + initializeTimeoutMs, + ); // Create or resume session const sessionMethod = options.resume ? "session/load" : "session/new"; @@ -102,22 +109,32 @@ export class AcpAdapter implements BackendAdapter { }); child.stdin.write(codec.encode(sessionReq)); - const sessionResult = await waitForResponse<{ sessionId: string }>( + const { result: sessionResult, leftover: sessionLeftover } = await waitForResponse<{ + sessionId: string; + }>( child.stdout, codec, sessionReqId, tracer, options.sessionId, initializeTimeoutMs, + initLeftover, ); const sessionId = sessionResult.sessionId ?? options.sessionId; - return new AcpSession(sessionId, child, codec, initResult, tracer, this.errorClassifier); + return new AcpSession( + sessionId, + child, + codec, + initResult, + tracer, + this.errorClassifier, + sessionLeftover, + ); } catch (err) { - // Kill the child process to prevent zombies when handshake fails or times out - child.kill("SIGTERM"); - const killTimer = setTimeout(() => child.kill("SIGKILL"), killGracePeriodMs); + killProcessGroup(child, "SIGTERM"); + const killTimer = setTimeout(() => killProcessGroup(child, "SIGKILL"), killGracePeriodMs); child.once("exit", () => clearTimeout(killTimer)); killTimer.unref(); throw err; @@ -125,7 +142,17 @@ export class AcpAdapter implements BackendAdapter { } } -/** Read lines from stdout until we get a JSON-RPC response matching the given ID. */ +/** + * Read lines from stdout until we get a JSON-RPC response matching the given ID. + * + * Returns the decoded result and any raw data that arrived after the matched + * response in the same read chunk (leftover), so the caller can replay it + * rather than silently losing messages (e.g. a session/request_permission that + * Gemini sends in the same chunk as the session/new response). + * + * @param initialBuffer - Optional pre-existing data to process before listening + * for new chunks (used to pass leftover from a prior waitForResponse call). + */ async function waitForResponse( stdout: NodeJS.ReadableStream, codec: JsonRpcCodec, @@ -133,19 +160,27 @@ async function waitForResponse( tracer?: MessageTracer, sessionId?: string, timeoutMs?: number, -): Promise { - return new Promise((resolve, reject) => { - let buffer = ""; + initialBuffer?: string, +): Promise<{ result: T; leftover: string }> { + return new Promise<{ result: T; leftover: string }>((resolve, reject) => { + let buffer = initialBuffer ?? ""; + let settled = false; let timer: ReturnType | undefined; - const onData = (chunk: Buffer) => { - buffer += chunk.toString("utf-8"); + const settle = (fn: () => void) => { + if (settled) return; + settled = true; + cleanup(); + fn(); + }; + const processBuffer = () => { const lines = buffer.split("\n"); // Keep the last incomplete line in the buffer buffer = lines.pop() ?? ""; - for (const line of lines) { + for (let i = 0; i < lines.length; i++) { + const line = lines[i]; if (!line.trim()) continue; try { @@ -155,11 +190,14 @@ async function waitForResponse( phase: "handshake_recv", }); if ("id" in msg && msg.id === expectedId) { - cleanup(); + // Collect remaining lines + incomplete buffer as leftover for the next stage + const remaining = lines.slice(i + 1); + const leftover = remaining.length > 0 ? `${remaining.join("\n")}\n${buffer}` : buffer; if ("error" in msg && msg.error) { - reject(new Error(`ACP error: ${msg.error.message}`)); + const errorMsg = msg.error.message; + settle(() => reject(new Error(`ACP error: ${errorMsg}`))); } else { - resolve((msg as { result: T }).result); + settle(() => resolve({ result: (msg as { result: T }).result, leftover })); } return; } @@ -169,14 +207,17 @@ async function waitForResponse( } }; + const onData = (chunk: Buffer) => { + buffer += chunk.toString("utf-8"); + processBuffer(); + }; + const onError = (err: Error) => { - cleanup(); - reject(err); + settle(() => reject(err)); }; const onClose = () => { - cleanup(); - reject(new Error("ACP subprocess closed before responding")); + settle(() => reject(new Error("ACP subprocess closed before responding"))); }; const cleanup = () => { @@ -188,11 +229,16 @@ async function waitForResponse( if (timeoutMs !== undefined) { timer = setTimeout(() => { - cleanup(); - reject(new Error(`ACP handshake timed out after ${timeoutMs}ms`)); + settle(() => reject(new Error(`ACP handshake timed out after ${timeoutMs}ms`))); }, timeoutMs); } + // Process any pre-existing buffer content before listening for more data + if (buffer) { + processBuffer(); + if (settled) return; + } + stdout.on("data", onData); stdout.on("error", onError); stdout.on("close", onClose); diff --git a/src/adapters/acp/acp-session.ts b/src/adapters/acp/acp-session.ts index 130294c..67aeae6 100644 --- a/src/adapters/acp/acp-session.ts +++ b/src/adapters/acp/acp-session.ts @@ -18,6 +18,7 @@ import { isJsonRpcResponse, type JsonRpcCodec, } from "./json-rpc.js"; +import { killProcessGroup } from "./kill-process-group.js"; import type { AcpInitializeResult, ErrorClassifier } from "./outbound-translator.js"; import { translateAuthStatus, @@ -44,6 +45,7 @@ export class AcpSession implements BackendSession { private readonly pendingRequests = new Map(); private pendingPermissionRequestId: number | string | undefined; private closed = false; + private readonly preBufferedData: string; /** Accumulated streaming text for synthesizing an assistant message when the prompt completes. */ private streamedText = ""; /** Whether a status_change(running) has been emitted for the current turn. */ @@ -56,6 +58,7 @@ export class AcpSession implements BackendSession { initResult: AcpInitializeResult, tracer?: MessageTracer, errorClassifier?: ErrorClassifier, + preBufferedData?: string, ) { this.sessionId = sessionId; this.child = child; @@ -63,6 +66,7 @@ export class AcpSession implements BackendSession { this.initResult = initResult; this.tracer = tracer; this.errorClassifier = errorClassifier; + this.preBufferedData = preBufferedData ?? ""; } send(message: UnifiedMessage): void { @@ -168,21 +172,22 @@ export class AcpSession implements BackendSession { } this.pendingRequests.clear(); - // Send SIGTERM and wait for exit with timeout - const exitPromise = new Promise((resolve) => { - this.child.once("exit", () => resolve()); - }); + await new Promise((resolve) => { + let killTimer: ReturnType | undefined; + + this.child.once("exit", () => { + clearTimeout(killTimer); + resolve(); + }); - this.child.kill("SIGTERM"); + killProcessGroup(this.child, "SIGTERM"); - const timeout = new Promise((resolve) => { - setTimeout(() => { - this.child.kill("SIGKILL"); + killTimer = setTimeout(() => { + killProcessGroup(this.child, "SIGKILL"); resolve(); }, 5000); + killTimer.unref(); }); - - await Promise.race([exitPromise, timeout]); } private createMessageStream(): AsyncIterable { @@ -190,6 +195,7 @@ export class AcpSession implements BackendSession { const codec = this.codec; const initResult = this.initResult; const session = this; + const preBufferedData = this.preBufferedData; return { [Symbol.asyncIterator]() { @@ -260,7 +266,10 @@ export class AcpSession implements BackendSession { } }; - const onClose = () => { + // Finalize the stream when the child process exits or stdout closes. + // Listening on both ensures we detect disconnection even if grandchild + // processes keep the stdout pipe open after the main process exits. + const finalize = () => { done = true; if (resolve) { const r = resolve; @@ -270,7 +279,14 @@ export class AcpSession implements BackendSession { }; child.stdout?.on("data", onData); - child.stdout?.on("close", onClose); + child.stdout?.on("close", finalize); + child.on("exit", finalize); + + // Replay any data buffered during the ACP handshake so messages that + // arrived in the same chunk as the session/new response are not lost. + if (preBufferedData) { + onData(Buffer.from(preBufferedData, "utf-8")); + } return { next(): Promise> { @@ -297,7 +313,8 @@ export class AcpSession implements BackendSession { }, return(): Promise> { child.stdout?.removeListener("data", onData); - child.stdout?.removeListener("close", onClose); + child.stdout?.removeListener("close", finalize); + child.removeListener("exit", finalize); done = true; return Promise.resolve({ value: undefined, diff --git a/src/adapters/acp/kill-process-group.ts b/src/adapters/acp/kill-process-group.ts new file mode 100644 index 0000000..cae5685 --- /dev/null +++ b/src/adapters/acp/kill-process-group.ts @@ -0,0 +1,28 @@ +import type { ChildProcess } from "node:child_process"; + +/** + * Send a signal to the entire process group (negative PID) so descendant + * processes spawned by the ACP agent are also terminated. + * + * Falls back to `child.kill(signal)` when the PID is unavailable (not yet + * spawned) or when `process.kill` throws (e.g. process already exited). + */ +export function killProcessGroup(child: ChildProcess, signal: NodeJS.Signals): void { + const pid = child.pid; + if (pid !== undefined) { + try { + // Try to kill the whole process group first. + process.kill(-pid, signal); + return; // If successful, we're done. + } catch { + // Fallback to killing just the child process if group kill fails + // (e.g. on Windows, or if the process has already exited). + } + } + + try { + child.kill(signal); + } catch { + // Process already exited — nothing to do. + } +}