Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 75 additions & 29 deletions src/adapters/acp/acp-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import type {
import type { MessageTracer } from "../../core/messaging/message-tracer.js";
import { AcpSession } from "./acp-session.js";
import { JsonRpcCodec } from "./json-rpc.js";
import { killProcessGroup } from "./kill-process-group.js";
import type { AcpInitializeResult, ErrorClassifier } from "./outbound-translator.js";

const PROTOCOL_VERSION = 1;
Expand Down Expand Up @@ -56,6 +57,11 @@ export class AcpAdapter implements BackendAdapter {
const child = this.spawnFn(command, args, {
stdio: ["pipe", "pipe", "pipe"],
cwd,
// Create a new process group so we can kill all descendant processes
// (e.g. subprocesses spawned by the ACP agent that inherit the stdout
// pipe, which would otherwise keep the pipe alive after the main
// process exits and prevent backend-disconnected detection).
detached: true,
});

if (!child.stdin || !child.stdout) {
Expand All @@ -79,14 +85,15 @@ export class AcpAdapter implements BackendAdapter {
});
child.stdin.write(codec.encode(initReq));

const initResult = await waitForResponse<AcpInitializeResult>(
child.stdout,
codec,
initId,
tracer,
options.sessionId,
initializeTimeoutMs,
);
const { result: initResult, leftover: initLeftover } =
await waitForResponse<AcpInitializeResult>(
child.stdout,
codec,
initId,
tracer,
options.sessionId,
initializeTimeoutMs,
);

// Create or resume session
const sessionMethod = options.resume ? "session/load" : "session/new";
Expand All @@ -102,50 +109,78 @@ export class AcpAdapter implements BackendAdapter {
});
child.stdin.write(codec.encode(sessionReq));

const sessionResult = await waitForResponse<{ sessionId: string }>(
const { result: sessionResult, leftover: sessionLeftover } = await waitForResponse<{
sessionId: string;
}>(
child.stdout,
codec,
sessionReqId,
tracer,
options.sessionId,
initializeTimeoutMs,
initLeftover,
);

const sessionId = sessionResult.sessionId ?? options.sessionId;

return new AcpSession(sessionId, child, codec, initResult, tracer, this.errorClassifier);
return new AcpSession(
sessionId,
child,
codec,
initResult,
tracer,
this.errorClassifier,
sessionLeftover,
);
} catch (err) {
// Kill the child process to prevent zombies when handshake fails or times out
child.kill("SIGTERM");
const killTimer = setTimeout(() => child.kill("SIGKILL"), killGracePeriodMs);
killProcessGroup(child, "SIGTERM");
const killTimer = setTimeout(() => killProcessGroup(child, "SIGKILL"), killGracePeriodMs);
child.once("exit", () => clearTimeout(killTimer));
killTimer.unref();
throw err;
}
}
}

/** Read lines from stdout until we get a JSON-RPC response matching the given ID. */
/**
* Read lines from stdout until we get a JSON-RPC response matching the given ID.
*
* Returns the decoded result and any raw data that arrived after the matched
* response in the same read chunk (leftover), so the caller can replay it
* rather than silently losing messages (e.g. a session/request_permission that
* Gemini sends in the same chunk as the session/new response).
*
* @param initialBuffer - Optional pre-existing data to process before listening
* for new chunks (used to pass leftover from a prior waitForResponse call).
*/
async function waitForResponse<T>(
stdout: NodeJS.ReadableStream,
codec: JsonRpcCodec,
expectedId: number | string,
tracer?: MessageTracer,
sessionId?: string,
timeoutMs?: number,
): Promise<T> {
return new Promise<T>((resolve, reject) => {
let buffer = "";
initialBuffer?: string,
): Promise<{ result: T; leftover: string }> {
return new Promise<{ result: T; leftover: string }>((resolve, reject) => {
let buffer = initialBuffer ?? "";
let settled = false;
let timer: ReturnType<typeof setTimeout> | undefined;

const onData = (chunk: Buffer) => {
buffer += chunk.toString("utf-8");
const settle = (fn: () => void) => {
if (settled) return;
settled = true;
cleanup();
fn();
};

const processBuffer = () => {
const lines = buffer.split("\n");
// Keep the last incomplete line in the buffer
buffer = lines.pop() ?? "";

for (const line of lines) {
for (let i = 0; i < lines.length; i++) {
const line = lines[i];
if (!line.trim()) continue;

try {
Expand All @@ -155,11 +190,14 @@ async function waitForResponse<T>(
phase: "handshake_recv",
});
if ("id" in msg && msg.id === expectedId) {
cleanup();
// Collect remaining lines + incomplete buffer as leftover for the next stage
const remaining = lines.slice(i + 1);
const leftover = remaining.length > 0 ? `${remaining.join("\n")}\n${buffer}` : buffer;
if ("error" in msg && msg.error) {
reject(new Error(`ACP error: ${msg.error.message}`));
const errorMsg = msg.error.message;
settle(() => reject(new Error(`ACP error: ${errorMsg}`)));
} else {
resolve((msg as { result: T }).result);
settle(() => resolve({ result: (msg as { result: T }).result, leftover }));
}
return;
}
Expand All @@ -169,14 +207,17 @@ async function waitForResponse<T>(
}
};

const onData = (chunk: Buffer) => {
buffer += chunk.toString("utf-8");
processBuffer();
};

const onError = (err: Error) => {
cleanup();
reject(err);
settle(() => reject(err));
};

const onClose = () => {
cleanup();
reject(new Error("ACP subprocess closed before responding"));
settle(() => reject(new Error("ACP subprocess closed before responding")));
};

const cleanup = () => {
Expand All @@ -188,11 +229,16 @@ async function waitForResponse<T>(

if (timeoutMs !== undefined) {
timer = setTimeout(() => {
cleanup();
reject(new Error(`ACP handshake timed out after ${timeoutMs}ms`));
settle(() => reject(new Error(`ACP handshake timed out after ${timeoutMs}ms`)));
}, timeoutMs);
}

// Process any pre-existing buffer content before listening for more data
if (buffer) {
processBuffer();
if (settled) return;
}

stdout.on("data", onData);
stdout.on("error", onError);
stdout.on("close", onClose);
Expand Down
43 changes: 30 additions & 13 deletions src/adapters/acp/acp-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
isJsonRpcResponse,
type JsonRpcCodec,
} from "./json-rpc.js";
import { killProcessGroup } from "./kill-process-group.js";
import type { AcpInitializeResult, ErrorClassifier } from "./outbound-translator.js";
import {
translateAuthStatus,
Expand All @@ -44,6 +45,7 @@ export class AcpSession implements BackendSession {
private readonly pendingRequests = new Map<number | string, PendingRequest>();
private pendingPermissionRequestId: number | string | undefined;
private closed = false;
private readonly preBufferedData: string;
/** Accumulated streaming text for synthesizing an assistant message when the prompt completes. */
private streamedText = "";
/** Whether a status_change(running) has been emitted for the current turn. */
Expand All @@ -56,13 +58,15 @@ export class AcpSession implements BackendSession {
initResult: AcpInitializeResult,
tracer?: MessageTracer,
errorClassifier?: ErrorClassifier,
preBufferedData?: string,
) {
this.sessionId = sessionId;
this.child = child;
this.codec = codec;
this.initResult = initResult;
this.tracer = tracer;
this.errorClassifier = errorClassifier;
this.preBufferedData = preBufferedData ?? "";
}

send(message: UnifiedMessage): void {
Expand Down Expand Up @@ -168,28 +172,30 @@ export class AcpSession implements BackendSession {
}
this.pendingRequests.clear();

// Send SIGTERM and wait for exit with timeout
const exitPromise = new Promise<void>((resolve) => {
this.child.once("exit", () => resolve());
});
await new Promise<void>((resolve) => {
let killTimer: ReturnType<typeof setTimeout> | undefined;

this.child.once("exit", () => {
clearTimeout(killTimer);
resolve();
});

this.child.kill("SIGTERM");
killProcessGroup(this.child, "SIGTERM");

const timeout = new Promise<void>((resolve) => {
setTimeout(() => {
this.child.kill("SIGKILL");
killTimer = setTimeout(() => {
killProcessGroup(this.child, "SIGKILL");
resolve();
}, 5000);
killTimer.unref();
});

await Promise.race([exitPromise, timeout]);
}

private createMessageStream(): AsyncIterable<UnifiedMessage> {
const child = this.child;
const codec = this.codec;
const initResult = this.initResult;
const session = this;
const preBufferedData = this.preBufferedData;

return {
[Symbol.asyncIterator]() {
Expand Down Expand Up @@ -260,7 +266,10 @@ export class AcpSession implements BackendSession {
}
};

const onClose = () => {
// Finalize the stream when the child process exits or stdout closes.
// Listening on both ensures we detect disconnection even if grandchild
// processes keep the stdout pipe open after the main process exits.
const finalize = () => {
done = true;
if (resolve) {
const r = resolve;
Expand All @@ -270,7 +279,14 @@ export class AcpSession implements BackendSession {
};

child.stdout?.on("data", onData);
child.stdout?.on("close", onClose);
child.stdout?.on("close", finalize);
child.on("exit", finalize);

// Replay any data buffered during the ACP handshake so messages that
// arrived in the same chunk as the session/new response are not lost.
if (preBufferedData) {
onData(Buffer.from(preBufferedData, "utf-8"));
}

return {
next(): Promise<IteratorResult<UnifiedMessage>> {
Expand All @@ -297,7 +313,8 @@ export class AcpSession implements BackendSession {
},
return(): Promise<IteratorResult<UnifiedMessage>> {
child.stdout?.removeListener("data", onData);
child.stdout?.removeListener("close", onClose);
child.stdout?.removeListener("close", finalize);
child.removeListener("exit", finalize);
done = true;
return Promise.resolve({
value: undefined,
Expand Down
28 changes: 28 additions & 0 deletions src/adapters/acp/kill-process-group.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import type { ChildProcess } from "node:child_process";

/**
* Send a signal to the entire process group (negative PID) so descendant
* processes spawned by the ACP agent are also terminated.
*
* Falls back to `child.kill(signal)` when the PID is unavailable (not yet
* spawned) or when `process.kill` throws (e.g. process already exited).
*/
export function killProcessGroup(child: ChildProcess, signal: NodeJS.Signals): void {
const pid = child.pid;
if (pid !== undefined) {
try {
// Try to kill the whole process group first.
process.kill(-pid, signal);
return; // If successful, we're done.
} catch {
// Fallback to killing just the child process if group kill fails
// (e.g. on Windows, or if the process has already exited).
}
}

try {
child.kill(signal);
} catch {
// Process already exited — nothing to do.
}
}