Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion src/queue-ipc-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,45 @@ import { type QueueOwnerRecord, waitMs } from "./queue-lease-store.js";

const QUEUE_CONNECT_ATTEMPTS = 40;
export const QUEUE_CONNECT_RETRY_MS = 50;
export const SOCKET_CONNECTION_TIMEOUT_MS = 5000;

function shouldRetryQueueConnect(error: unknown): boolean {
const code = (error as NodeJS.ErrnoException).code;
return code === "ENOENT" || code === "ECONNREFUSED";
}

async function connectToSocket(socketPath: string): Promise<net.Socket> {
async function connectToSocket(
socketPath: string,
timeoutMs = SOCKET_CONNECTION_TIMEOUT_MS,
): Promise<net.Socket> {
return await new Promise<net.Socket>((resolve, reject) => {
const socket = net.createConnection(socketPath);
let settled = false;

const timeout = setTimeout(() => {
if (settled) {
return;
}
settled = true;
socket.destroy();
reject(new Error(`Connection to ${socketPath} timed out after ${timeoutMs}ms`));
}, timeoutMs);

const onConnect = () => {
if (settled) {
return;
}
settled = true;
clearTimeout(timeout);
socket.off("error", onError);
resolve(socket);
};
const onError = (error: Error) => {
if (settled) {
return;
}
settled = true;
clearTimeout(timeout);
socket.off("connect", onConnect);
reject(error);
};
Expand Down
7 changes: 7 additions & 0 deletions src/queue-ipc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import type {
} from "./types.js";

export { QUEUE_CONNECT_RETRY_MS } from "./queue-ipc-transport.js";
export const MAX_MESSAGE_BUFFER_SIZE = 10 * 1024 * 1024;
export {
isProcessAlive,
releaseQueueOwnerLease,
Expand Down Expand Up @@ -218,6 +219,12 @@ async function runQueueOwnerRequest<TResult>(options: {
socket.on("data", (chunk: string) => {
buffer += chunk;

if (buffer.length > MAX_MESSAGE_BUFFER_SIZE) {
socket.destroy();
finishReject(new Error(`Message buffer exceeded ${MAX_MESSAGE_BUFFER_SIZE} bytes`));
return;
}

let index = buffer.indexOf("\n");
while (index >= 0) {
const line = buffer.slice(0, index).trim();
Expand Down
50 changes: 50 additions & 0 deletions test/queue-ipc-errors.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import test from "node:test";
import type { SetSessionConfigOptionResponse } from "@agentclientprotocol/sdk";
import { QueueConnectionError, QueueProtocolError } from "../src/errors.js";
import {
MAX_MESSAGE_BUFFER_SIZE,
SessionQueueOwner,
releaseQueueOwnerLease,
tryAcquireQueueOwnerLease,
Expand Down Expand Up @@ -319,6 +320,55 @@ test("trySubmitToRunningOwner surfaces disconnect-before-ack detail code", async
});
});

test("trySubmitToRunningOwner rejects oversized queue messages", async () => {
await withTempHome(async (homeDir) => {
const sessionId = "submit-oversized-message";
const keeper = await startKeeperProcess();
const { lockPath, socketPath } = queuePaths(homeDir, sessionId);
await writeQueueOwnerLock({
lockPath,
pid: keeper.pid,
sessionId,
socketPath,
});

const server = createSingleRequestServer((socket, request) => {
assert.equal(request.type, "submit_prompt");
socket.write(
`${JSON.stringify({
type: "accepted",
requestId: request.requestId,
})}\n`,
);
socket.write(`${"x".repeat(MAX_MESSAGE_BUFFER_SIZE + 1)}\n`);
});

await listenServer(server, socketPath);

try {
await assert.rejects(
async () =>
await trySubmitToRunningOwner({
sessionId,
message: "hello",
permissionMode: "approve-reads",
outputFormatter: NOOP_OUTPUT_FORMATTER,
waitForCompletion: true,
}),
(error: unknown) => {
assert(error instanceof Error);
assert.match(error.message, /Message buffer exceeded/);
return true;
},
);
} finally {
await closeServer(server);
await cleanupOwnerArtifacts({ socketPath, lockPath });
stopProcess(keeper);
}
});
});

test("trySubmitToRunningOwner streams queued lifecycle and returns result", async () => {
await withTempHome(async (homeDir) => {
const sessionId = "queued-lifecycle-session";
Expand Down