Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions src/mcp/__tests__/dynamic-handlers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,115 @@ describe("executeDynamicHandler", () => {
}
});

test("concurrent drain: large stderr before stdout does not deadlock", async () => {
  // Regression guard for the pipe-buffer deadlock: flood stderr with 200 KB
  // (far past the 64 KB pipe buffer) before anything is written to stdout.
  // The old sequential-drain implementation blocks here forever.
  const toolDef: DynamicToolDef = {
    name: "test_pipe_deadlock",
    description: "test",
    inputSchema: {},
    handlerType: "shell",
    handlerCode: 'head -c 204800 /dev/urandom | base64 >&2; echo "done"',
  };

  const startedAt = Date.now();
  const result = await executeDynamicHandler(toolDef, {});
  const durationMs = Date.now() - startedAt;

  expect(result.isError).toBeFalsy();
  const output = (result.content[0] as { type: string; text: string }).text;
  expect(output).toBe("done");
  // Completion well under 5 s is expected; an actual deadlock would instead
  // trip the test runner's own timeout.
  expect(durationMs).toBeLessThan(5000);
});

test("timeout kills a hung handler", async () => {
  const savedTimeout = process.env.PHANTOM_DYNAMIC_HANDLER_TIMEOUT_MS;
  process.env.PHANTOM_DYNAMIC_HANDLER_TIMEOUT_MS = "500";
  try {
    const toolDef: DynamicToolDef = {
      name: "test_hang",
      description: "test",
      inputSchema: {},
      handlerType: "shell",
      handlerCode: "sleep 10",
    };

    const startedAt = Date.now();
    const result = await executeDynamicHandler(toolDef, {});
    const durationMs = Date.now() - startedAt;

    expect(result.isError).toBe(true);
    const text = (result.content[0] as { type: string; text: string }).text;
    expect(text).toContain("timed out");
    // Budget: 500 ms timeout + 2 s SIGKILL grace + slack — still far below
    // the 10 s the handler would sleep if the timeout never fired.
    expect(durationMs).toBeLessThan(5000);
  } finally {
    if (savedTimeout === undefined) {
      // Assigning `undefined` to a process.env key coerces it to the string
      // "undefined"; deleting the key is the only way to restore "unset" so
      // later tests see the variable as absent. (Pattern acknowledged by the
      // maintainer in #5.)
      Reflect.deleteProperty(process.env, "PHANTOM_DYNAMIC_HANDLER_TIMEOUT_MS");
    } else {
      process.env.PHANTOM_DYNAMIC_HANDLER_TIMEOUT_MS = savedTimeout;
    }
  }
});

test("output cap truncates runaway stdout", async () => {
  const savedCap = process.env.PHANTOM_DYNAMIC_HANDLER_MAX_OUTPUT_BYTES;
  process.env.PHANTOM_DYNAMIC_HANDLER_MAX_OUTPUT_BYTES = "10000";
  try {
    const toolDef: DynamicToolDef = {
      name: "test_runaway",
      description: "test",
      inputSchema: {},
      handlerType: "shell",
      // ~270 KB of base64 on stdout — way past the 10 KB cap set above.
      handlerCode: "head -c 200000 /dev/urandom | base64",
    };

    const result = await executeDynamicHandler(toolDef, {});

    expect(result.isError).toBeFalsy();
    const text = (result.content[0] as { type: string; text: string }).text;
    expect(text).toContain("Output truncated");
    // Captured size is bounded by the cap plus the ~50-char truncation
    // notice (trimming happens after the notice is appended), so anything
    // under 11 KB proves the cap held against the full output size.
    expect(text.length).toBeLessThan(11000);
  } finally {
    if (savedCap === undefined) {
      // Delete rather than assign undefined — see the matching restore note
      // for PHANTOM_DYNAMIC_HANDLER_TIMEOUT_MS.
      Reflect.deleteProperty(process.env, "PHANTOM_DYNAMIC_HANDLER_MAX_OUTPUT_BYTES");
    } else {
      process.env.PHANTOM_DYNAMIC_HANDLER_MAX_OUTPUT_BYTES = savedCap;
    }
  }
});

test("non-zero exit surfaces stderr and exit code in error message", async () => {
  // Concurrent draining itself is covered by the pipe-deadlock regression
  // test above; this one pins the failure path: a non-zero exit must surface
  // both the captured stderr and the exit code so the agent gets actionable
  // signal.
  const toolDef: DynamicToolDef = {
    name: "test_mixed_streams",
    description: "test",
    inputSchema: {},
    handlerType: "shell",
    handlerCode: 'echo "stdout-marker"; echo "stderr-marker" >&2; exit 2',
  };

  const result = await executeDynamicHandler(toolDef, {});
  expect(result.isError).toBe(true);
  const message = (result.content[0] as { type: string; text: string }).text;
  expect(message).toContain("stderr-marker");
  expect(message).toContain("exit 2");
});

test("script handler receives TOOL_INPUT via env", async () => {
const tmpFile = "/tmp/phantom-test-tool-input.ts";
await Bun.write(tmpFile, "console.log(process.env.TOOL_INPUT)");
Expand Down
51 changes: 51 additions & 0 deletions src/mcp/__tests__/dynamic-tools.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,57 @@ describe("DynamicToolRegistry", () => {
freshDb.close();
});

test("registerAllOnServer tolerates a failing tool registration", () => {
  const db = new Database(":memory:");
  runMigrations(db);

  // Two tools: the mock server below will reject the second one.
  const registry = new DynamicToolRegistry(db);
  registry.register({
    name: "good_tool",
    description: "Good",
    input_schema: {},
    handler_type: "shell",
    handler_code: "echo good",
  });
  registry.register({
    name: "bad_tool",
    description: "Bad",
    input_schema: {},
    handler_type: "shell",
    handler_code: "echo bad",
  });

  const attempted: string[] = [];
  const succeeded: string[] = [];
  const mockServer = {
    registerTool(name: string, _meta: unknown, _handler: unknown) {
      attempted.push(name);
      if (name === "bad_tool") {
        throw new Error("simulated schema failure");
      }
      succeeded.push(name);
    },
  };

  // Capture console.warn so we can assert the failure was logged, and
  // restore it even if the call under test throws.
  const warnings: string[] = [];
  const realWarn = console.warn;
  console.warn = (...args: unknown[]) => {
    warnings.push(args.map((a) => String(a)).join(" "));
  };
  try {
    expect(() => registry.registerAllOnServer(mockServer as never)).not.toThrow();
  } finally {
    console.warn = realWarn;
  }

  // Both tools were attempted, only the good one stuck, and the bad one was
  // warned about with the underlying error message.
  expect(attempted).toEqual(["good_tool", "bad_tool"]);
  expect(succeeded).toEqual(["good_tool"]);
  expect(warnings.some((w) => w.includes("bad_tool") && w.includes("simulated schema failure"))).toBe(true);

  db.close();
});

test("upserts on duplicate name", () => {
const freshDb = new Database(":memory:");
runMigrations(freshDb);
Expand Down
156 changes: 142 additions & 14 deletions src/mcp/dynamic-handlers.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
import type { CallToolResult } from "@modelcontextprotocol/sdk/types.js";
import type { Subprocess } from "bun";
import type { DynamicToolDef } from "./dynamic-tools.ts";

const DEFAULT_HANDLER_TIMEOUT_MS = 60_000;
const DEFAULT_MAX_OUTPUT_BYTES = 1_000_000;
const HANDLER_GRACE_MS = 2_000;

type HandlerLimits = { timeoutMs: number; maxOutputBytes: number };

/**
 * Parse a positive-number env override, falling back to `fallback` when the
 * variable is unset, non-numeric, or not strictly positive. Without this
 * guard, a garbage value like "abc" yields NaN, and `setTimeout(fn, NaN)`
 * fires immediately — every handler would be killed at once.
 */
function envPositiveNumber(name: string, fallback: number): number {
  const raw = process.env[name];
  if (raw === undefined) return fallback;
  const parsed = Number(raw);
  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
}

/**
 * Resolve the per-invocation handler limits. Read lazily (not at module load)
 * so tests and operators can adjust the env vars at runtime.
 */
function getHandlerLimits(): HandlerLimits {
  return {
    timeoutMs: envPositiveNumber("PHANTOM_DYNAMIC_HANDLER_TIMEOUT_MS", DEFAULT_HANDLER_TIMEOUT_MS),
    maxOutputBytes: envPositiveNumber("PHANTOM_DYNAMIC_HANDLER_MAX_OUTPUT_BYTES", DEFAULT_MAX_OUTPUT_BYTES),
  };
}

/**
* Safe environment for subprocess execution.
* Only expose what dynamic tools legitimately need.
Expand All @@ -16,16 +30,120 @@ export function buildSafeEnv(input: Record<string, unknown>): Record<string, str
};
}

/**
 * Drain a ReadableStream, keeping at most `maxBytes` of it.
 *
 * Even after the cap is hit we keep consuming (and discarding) chunks so the
 * child process can never wedge on a full 64 KB pipe buffer. Cancelling the
 * reader would be shorter, but risks leaving the child blocked on its next
 * write.
 */
async function readStreamWithCap(
  stream: ReadableStream<Uint8Array>,
  maxBytes: number,
): Promise<{ text: string; truncated: boolean }> {
  const reader = stream.getReader();
  const kept: Uint8Array[] = [];
  let keptBytes = 0;
  let truncated = false;
  try {
    for (;;) {
      const { done, value } = await reader.read();
      if (done) break;
      if (truncated) continue; // drain-and-drop mode: read but keep nothing
      const room = maxBytes - keptBytes;
      if (value.byteLength <= room) {
        kept.push(value);
        keptBytes += value.byteLength;
      } else {
        // Chunk straddles the cap: keep only the part that fits.
        if (room > 0) kept.push(value.subarray(0, room));
        keptBytes = maxBytes;
        truncated = true;
      }
    }
  } finally {
    reader.releaseLock();
  }

  // Stitch the retained chunks into one buffer before decoding so multi-byte
  // sequences split across chunk boundaries decode correctly.
  const buffer = new Uint8Array(keptBytes);
  let cursor = 0;
  for (const part of kept) {
    buffer.set(part, cursor);
    cursor += part.byteLength;
  }
  const text = new TextDecoder().decode(buffer);
  return truncated
    ? { text: `${text}\n\n_(Output truncated at ${maxBytes} bytes.)_`, truncated: true }
    : { text, truncated: false };
}

type DrainResult = {
  stdout: string;
  stderr: string;
  exitCode: number | null;
  timedOut: boolean;
};

/**
 * Supervise a spawned child: drain stdout and stderr at the same time (a
 * sequential drain deadlocks once the child blocks writing one pipe while we
 * wait for EOF on the other), enforce a wall-clock timeout, and cap captured
 * output. On timeout the child gets SIGTERM, then SIGKILL once the grace
 * period expires.
 */
async function drainProcessWithLimits(
  proc: Subprocess<"pipe" | "ignore" | "inherit", "pipe", "pipe">,
  limits: HandlerLimits,
): Promise<DrainResult> {
  let timedOut = false;
  const sigtermTimer = setTimeout(() => {
    timedOut = true;
    proc.kill("SIGTERM");
  }, limits.timeoutMs);
  const sigkillTimer = setTimeout(
    () => proc.kill("SIGKILL"),
    limits.timeoutMs + HANDLER_GRACE_MS,
  );

  try {
    // Both pipes are drained concurrently; each is capped independently.
    const [out, err] = await Promise.all([
      readStreamWithCap(proc.stdout, limits.maxOutputBytes),
      readStreamWithCap(proc.stderr, limits.maxOutputBytes),
    ]);
    await proc.exited;
    return {
      stdout: out.text,
      stderr: err.text,
      exitCode: proc.exitCode,
      timedOut,
    };
  } finally {
    // Always disarm both timers — a kill after normal exit would be a no-op
    // at best and a stray signal at worst.
    clearTimeout(sigtermTimer);
    clearTimeout(sigkillTimer);
  }
}

/**
 * Build the error CallToolResult for a handler that hit its timeout. The
 * partial-output snippet is capped at 500 chars to keep the message readable.
 */
function timeoutResult(toolName: string, timeoutMs: number, partial: string): CallToolResult {
  const preview = partial.slice(0, 500);
  const message = `Tool '${toolName}' timed out after ${timeoutMs}ms and was killed. Partial output: ${preview}`;
  return {
    content: [{ type: "text", text: message }],
    isError: true,
  };
}

export async function executeDynamicHandler(
tool: DynamicToolDef,
input: Record<string, unknown>,
): Promise<CallToolResult> {
try {
switch (tool.handlerType) {
case "script":
return executeScriptHandler(tool.handlerPath ?? "", input);
return executeScriptHandler(tool, input);
case "shell":
return executeShellHandler(tool.handlerCode ?? "", input);
return executeShellHandler(tool, input);
default:
return {
content: [
Expand All @@ -46,7 +164,8 @@ export async function executeDynamicHandler(
}
}

async function executeScriptHandler(path: string, input: Record<string, unknown>): Promise<CallToolResult> {
async function executeScriptHandler(tool: DynamicToolDef, input: Record<string, unknown>): Promise<CallToolResult> {
const path = tool.handlerPath ?? "";
const { existsSync } = await import("node:fs");
if (!existsSync(path)) {
return {
Expand All @@ -55,6 +174,8 @@ async function executeScriptHandler(path: string, input: Record<string, unknown>
};
}

const limits = getHandlerLimits();

// --env-file= prevents bun from auto-loading .env/.env.local files,
// which would leak secrets into the subprocess despite buildSafeEnv.
const proc = Bun.spawn(["bun", "--env-file=", "run", path], {
Expand All @@ -67,34 +188,41 @@ async function executeScriptHandler(path: string, input: Record<string, unknown>
proc.stdin.write(JSON.stringify(input));
proc.stdin.end();

const stdout = await new Response(proc.stdout).text();
const stderr = await new Response(proc.stderr).text();
await proc.exited;
const { stdout, stderr, exitCode, timedOut } = await drainProcessWithLimits(proc, limits);

if (proc.exitCode !== 0) {
if (timedOut) {
return timeoutResult(tool.name, limits.timeoutMs, stderr || stdout);
}

if (exitCode !== 0) {
return {
content: [{ type: "text", text: `Script error (exit ${proc.exitCode}): ${stderr || stdout}` }],
content: [{ type: "text", text: `Script error (exit ${exitCode}): ${stderr || stdout}` }],
isError: true,
};
}

return { content: [{ type: "text", text: stdout.trim() }] };
}

async function executeShellHandler(command: string, input: Record<string, unknown>): Promise<CallToolResult> {
async function executeShellHandler(tool: DynamicToolDef, input: Record<string, unknown>): Promise<CallToolResult> {
const command = tool.handlerCode ?? "";
const limits = getHandlerLimits();

const proc = Bun.spawn(["bash", "-c", command], {
stdout: "pipe",
stderr: "pipe",
env: buildSafeEnv(input),
});

const stdout = await new Response(proc.stdout).text();
const stderr = await new Response(proc.stderr).text();
await proc.exited;
const { stdout, stderr, exitCode, timedOut } = await drainProcessWithLimits(proc, limits);

if (timedOut) {
return timeoutResult(tool.name, limits.timeoutMs, stderr || stdout);
}

if (proc.exitCode !== 0) {
if (exitCode !== 0) {
return {
content: [{ type: "text", text: `Shell error (exit ${proc.exitCode}): ${stderr || stdout}` }],
content: [{ type: "text", text: `Shell error (exit ${exitCode}): ${stderr || stdout}` }],
isError: true,
};
}
Expand Down
Loading