diff --git a/packages/opencode/src/cli/cmd/run.ts b/packages/opencode/src/cli/cmd/run.ts index dc59c4c86..87ed0bcfd 100644 --- a/packages/opencode/src/cli/cmd/run.ts +++ b/packages/opencode/src/cli/cmd/run.ts @@ -29,6 +29,7 @@ import { TodoWriteTool } from "../../tool/todo" import { Locale } from "../../util/locale" import { Tracer, FileExporter, HttpExporter, type TraceExporter } from "../../altimate/observability/tracing" import { Config } from "../../config/config" +import { readStdinIfAvailable, assembleStdinMessage } from "../../util/stdin" type ToolProps = { input: Tool.InferParameters @@ -430,7 +431,9 @@ export const RunCommand = cmd({ message = [extractedParts.join("\n\n"), message].filter(Boolean).join("\n\n") } - if (!process.stdin.isTTY) message += "\n" + (await Bun.stdin.text()) + // Read piped/redirected stdin without wedging on inherited-but-idle fds. + // See `src/util/stdin.ts` for the failure mode and the first-byte-race fix. + message = assembleStdinMessage(message, await readStdinIfAvailable()) if (message.trim().length === 0 && !args.command) { UI.error("You must provide a message or a command") diff --git a/packages/opencode/src/util/stdin.ts b/packages/opencode/src/util/stdin.ts new file mode 100644 index 000000000..44511e610 --- /dev/null +++ b/packages/opencode/src/util/stdin.ts @@ -0,0 +1,137 @@ +import fs from "fs" +import type { Stats } from "fs" + +const FIRST_BYTE_TIMEOUT_MS = 100 + +type Stat = Pick + +export interface ReadStdinDeps { + isTTY?: boolean + fstat?: () => Stat + // Returns "" if no first byte arrives within timeoutMs; otherwise drains + // stdin to EOF and returns the full content. Default implementation uses + // process.stdin events so that a wedged-after-first-byte case still has a + // well-defined behavior (waits for `end`); callers can inject a faster + // implementation in tests. + readStdin?: (timeoutMs: number) => Promise + timeoutMs?: number +} + +// Read piped/redirected stdin without wedging on an inherited-but-idle fd. +// +// The failure mode this guards against: subprocess callers (Claude Code's +// Bash tool, Python `subprocess.run(..., stdin=None)`, CI, plugin hosts) +// leave stdin attached to a parent pipe that is never written to and never +// closed. A blind `Bun.stdin.text()` waits forever for an EOF that never +// arrives. +// +// Strategy — two gates: +// +// 1. fstat gate: only FIFOs (pipes), regular files (redirects), and +// sockets (process supervisors, socket activation, `nc -l`) can carry +// real input. TTYs and character devices (e.g. `< /dev/null`) skip. +// +// 2. First-byte timeout: instead of bounding the whole-stream drain, we +// wait up to `timeoutMs` for the first readable byte. If no byte +// arrives in that window, we treat stdin as inherited-idle and skip. +// If a byte arrives, we drain to EOF without further deadline — so a +// slow producer that takes >100ms total but flushes its first chunk +// within the window is not truncated. This avoids the two pitfalls of +// a whole-stream race: (a) the orphaned `Bun.stdin.text()` continuing +// to hold fd 0 open after the loser is abandoned, and (b) silent +// mid-stream truncation of legitimate slow / large producers. +export async function readStdinIfAvailable(deps: ReadStdinDeps = {}): Promise { + const isTTY = deps.isTTY ?? Boolean(process.stdin.isTTY) + const fstat = deps.fstat ?? (() => fs.fstatSync(0) as Stat) + const readStdin = deps.readStdin ?? defaultReadStdin + const timeoutMs = deps.timeoutMs ?? FIRST_BYTE_TIMEOUT_MS + + if (isTTY) return "" + + try { + const stat = fstat() + if (!stat.isFIFO() && !stat.isFile() && !stat.isSocket()) return "" + } catch { + return "" + } + + return readStdin(timeoutMs) +} + +// Compose the final prompt from a positional message and stdin input. +// Extracted as a pure function so the regression case from PR #935 +// (`echo ctx | run "prompt"` must concatenate, not silently drop ctx) can +// be unit-tested without spawning the full run command. +export function assembleStdinMessage(positional: string, stdinInput: string): string { + if (stdinInput.trim().length === 0) return positional + if (positional.length === 0) return stdinInput + return positional + "\n" + stdinInput +} + +// Default implementation of the first-byte race over `process.stdin`. +// +// Why not `Bun.stdin.text()`: that reads the entire stream as a single +// uncancellable Promise. If we race it against a timer and the timer wins, +// the read still holds fd 0 open until the producer eventually closes, +// blocking process exit (the original wedge moved to teardown). +// +// Using `process.stdin` event listeners lets us: +// - bind the timeout to "first byte" rather than "full drain", so slow +// producers and large payloads aren't truncated; +// - cleanly remove our listeners and `unref` the stream on the no-data +// path, so an inherited-open fd doesn't pin the event loop. +function defaultReadStdin(timeoutMs: number): Promise { + return new Promise((resolve) => { + const stdin = process.stdin + const chunks: Buffer[] = [] + let firstByteReceived = false + let firstByteTimer: ReturnType | undefined + let settled = false + + const cleanup = () => { + stdin.off("data", onData) + stdin.off("end", onEnd) + stdin.off("error", onError) + if (firstByteTimer) clearTimeout(firstByteTimer) + try { + stdin.pause() + } catch {} + try { + stdin.unref?.() + } catch {} + } + + const settle = (result: string) => { + if (settled) return + settled = true + cleanup() + resolve(result) + } + + const onData = (chunk: Buffer) => { + if (!firstByteReceived) { + firstByteReceived = true + if (firstByteTimer) { + clearTimeout(firstByteTimer) + firstByteTimer = undefined + } + } + chunks.push(chunk) + } + const onEnd = () => settle(Buffer.concat(chunks).toString("utf8")) + const onError = () => settle(Buffer.concat(chunks).toString("utf8")) + + firstByteTimer = setTimeout(() => { + if (!firstByteReceived) settle("") + }, timeoutMs) + + stdin.on("data", onData) + stdin.on("end", onEnd) + stdin.on("error", onError) + try { + stdin.resume() + } catch { + settle("") + } + }) +} diff --git a/packages/opencode/test/util/stdin-e2e.test.ts b/packages/opencode/test/util/stdin-e2e.test.ts new file mode 100644 index 000000000..7512a2128 --- /dev/null +++ b/packages/opencode/test/util/stdin-e2e.test.ts @@ -0,0 +1,129 @@ +import { describe, expect, test } from "bun:test" +import path from "path" + +// The fixture imports the real helper and prints { result, elapsed } as JSON. +// Spawning it lets us exercise the actual `process.stdin` event path against +// real fd 0 conditions — something dependency injection can't cover. +const FIXTURE = path.join(__dirname, "stdin-fixture.ts") + +type FileSink = { write(chunk: string | Uint8Array): number; end(): Promise | number } + +async function runFixture(opts: { + writeStdin?: (sink: FileSink) => Promise + killAfterMs?: number +}): Promise<{ code: number | null; result?: string; elapsed?: number; stdout: string; stderr: string }> { + const proc = Bun.spawn(["bun", "run", FIXTURE], { + stdin: "pipe", + stdout: "pipe", + stderr: "pipe", + }) + + if (opts.writeStdin) { + await opts.writeStdin(proc.stdin as unknown as FileSink) + } + + let killTimer: ReturnType | undefined + if (opts.killAfterMs) { + killTimer = setTimeout(() => proc.kill(), opts.killAfterMs) + } + const code = await proc.exited + if (killTimer) clearTimeout(killTimer) + + const stdout = await new Response(proc.stdout).text() + const stderr = await new Response(proc.stderr).text() + + try { + const parsed = JSON.parse(stdout) + return { code, result: parsed.result, elapsed: parsed.elapsed, stdout, stderr } + } catch { + return { code, stdout, stderr } + } +} + +describe("readStdinIfAvailable (spawned subprocess)", () => { + // M1-regression — the canonical wedge: an inherited pipe that's never + // written to and never closed. Pre-fix this hung forever; with the + // previous Promise.race fix, the await released at 100ms but fd 0 stayed + // open until the parent closed. With the first-byte event race + unref, + // the child exits promptly. + test( + "exits promptly with empty result when stdin is an inherited-but-idle pipe", + async () => { + const { code, result, elapsed } = await runFixture({ + // Don't write — leave the pipe open and silent. Close after 1s so + // the parent's writer isn't garbage-collected; by then the child + // should have already exited via the first-byte timeout. + writeStdin: async (sink) => { + await new Promise((r) => setTimeout(r, 1000)) + try { + await sink.end() + } catch {} + }, + killAfterMs: 5000, + }) + expect(code).toBe(0) + expect(result).toBe("") + expect(elapsed).toBeLessThan(500) + }, + 10000, + ) + + // MAJOR-regression from PR #935: `echo ctx | run "prompt"` must still + // deliver "ctx". Verified here by writing data + closing the pipe. + test( + "returns piped data when producer writes and closes", + async () => { + const { code, result } = await runFixture({ + writeStdin: async (sink) => { + sink.write("context data") + await sink.end() + }, + }) + expect(code).toBe(0) + expect(result).toBe("context data") + }, + 10000, + ) + + // M2-regression: a producer that takes >100ms to flush first byte was + // truncated by the old Promise.race timeout. The first-byte gate must + // accept the byte once it arrives and then drain the rest without a + // deadline. + test( + "preserves data from slow producer (first byte arrives just before timeout)", + async () => { + const { code, result } = await runFixture({ + writeStdin: async (sink) => { + // Sleep close to but under the 100ms first-byte budget, then + // flush. The whole-stream-race fix would have returned "". + await new Promise((r) => setTimeout(r, 60)) + sink.write("slow ctx") + await sink.end() + }, + }) + expect(code).toBe(0) + expect(result).toBe("slow ctx") + }, + 10000, + ) + + // Negative control: a producer slow enough to miss the first-byte window + // should yield "" (intentional cutoff, no truncation of in-flight data). + test( + "returns empty when first byte arrives after the first-byte timeout", + async () => { + const { code, result } = await runFixture({ + writeStdin: async (sink) => { + await new Promise((r) => setTimeout(r, 400)) + try { + sink.write("too late") + await sink.end() + } catch {} + }, + }) + expect(code).toBe(0) + expect(result).toBe("") + }, + 10000, + ) +}) diff --git a/packages/opencode/test/util/stdin-fixture.ts b/packages/opencode/test/util/stdin-fixture.ts new file mode 100644 index 000000000..4be684958 --- /dev/null +++ b/packages/opencode/test/util/stdin-fixture.ts @@ -0,0 +1,9 @@ +// Subprocess fixture used by stdin-e2e.test.ts. +// Imports the real helper, invokes it against real fd 0, and prints the +// result as JSON so the test can assert on it. +import { readStdinIfAvailable } from "../../src/util/stdin" + +const start = Date.now() +const result = await readStdinIfAvailable() +const elapsed = Date.now() - start +process.stdout.write(JSON.stringify({ result, elapsed })) diff --git a/packages/opencode/test/util/stdin.test.ts b/packages/opencode/test/util/stdin.test.ts new file mode 100644 index 000000000..49252d239 --- /dev/null +++ b/packages/opencode/test/util/stdin.test.ts @@ -0,0 +1,145 @@ +import { describe, expect, test } from "bun:test" +import { readStdinIfAvailable, assembleStdinMessage } from "../../src/util/stdin" + +const fifo = { isFIFO: () => true, isFile: () => false, isSocket: () => false } +const file = { isFIFO: () => false, isFile: () => true, isSocket: () => false } +const sock = { isFIFO: () => false, isFile: () => false, isSocket: () => true } +const charDev = { isFIFO: () => false, isFile: () => false, isSocket: () => false } + +describe("readStdinIfAvailable", () => { + test("returns empty when stdin is a TTY (no fstat, no read)", async () => { + let read = false + const out = await readStdinIfAvailable({ + isTTY: true, + fstat: () => { + throw new Error("fstat should not run when isTTY") + }, + readStdin: async () => { + read = true + return "should not happen" + }, + }) + expect(out).toBe("") + expect(read).toBe(false) + }) + + test("returns empty for `< /dev/null` (character device — neither FIFO, file, nor socket)", async () => { + let read = false + const out = await readStdinIfAvailable({ + isTTY: false, + fstat: () => charDev, + readStdin: async () => { + read = true + return "should not happen" + }, + }) + expect(out).toBe("") + expect(read).toBe(false) + }) + + test("returns empty when fstat throws (e.g. EBADF — fd 0 not open)", async () => { + const out = await readStdinIfAvailable({ + isTTY: false, + fstat: () => { + throw new Error("EBADF") + }, + readStdin: async () => "should not happen", + }) + expect(out).toBe("") + }) + + // MAJOR-regression: `echo ctx | run "prompt"` must still deliver "ctx". + test("returns piped data when stdin is a FIFO with available bytes", async () => { + const out = await readStdinIfAvailable({ + isTTY: false, + fstat: () => fifo, + readStdin: async () => "context data", + }) + expect(out).toBe("context data") + }) + + test("returns redirected file contents (run < file.txt)", async () => { + const out = await readStdinIfAvailable({ + isTTY: false, + fstat: () => file, + readStdin: async () => "from-file", + }) + expect(out).toBe("from-file") + }) + + // m1 fix: sockets are now accepted (used by process supervisors, socket + // activation, `nc -l`). Pre-fix the helper silently skipped them. + test("accepts socket-backed stdin (process supervisors, socket activation, nc -l)", async () => { + const out = await readStdinIfAvailable({ + isTTY: false, + fstat: () => sock, + readStdin: async () => "via-socket", + }) + expect(out).toBe("via-socket") + }) + + // M1-regression: timed-out wait must propagate as "" (no orphan, no wedge). + test("returns empty when readStdin times out (idle inherited FIFO)", async () => { + const out = await readStdinIfAvailable({ + isTTY: false, + fstat: () => fifo, + readStdin: async () => "", // simulates first-byte timeout + }) + expect(out).toBe("") + }) + + // M2-regression: a producer that takes >100ms total but flushes its first + // byte inside the window must NOT be truncated. The helper trusts readStdin + // to wait for EOF after first byte; we verify the no-truncation contract by + // injecting a readStdin that resolves slowly with full content. + test("returns full content from slow-but-pre-timeout producer", async () => { + const out = await readStdinIfAvailable({ + isTTY: false, + fstat: () => fifo, + readStdin: () => new Promise((r) => setTimeout(() => r("ctx after 80ms"), 80)), + timeoutMs: 100, + }) + expect(out).toBe("ctx after 80ms") + }) + + test("returns empty for FIFO with immediate EOF (empty pipe)", async () => { + const out = await readStdinIfAvailable({ + isTTY: false, + fstat: () => fifo, + readStdin: async () => "", + }) + expect(out).toBe("") + }) + + test("returns whitespace-only stdin verbatim (caller decides to trim)", async () => { + const out = await readStdinIfAvailable({ + isTTY: false, + fstat: () => fifo, + readStdin: async () => " \n\t ", + }) + expect(out).toBe(" \n\t ") + }) +}) + +describe("assembleStdinMessage", () => { + // MAJOR-regression from PR #935: positional must NOT silently override stdin. + test("concatenates positional + stdin with newline", () => { + expect(assembleStdinMessage("summarize:", "context data")).toBe("summarize:\ncontext data") + }) + + test("returns stdin when positional is empty", () => { + expect(assembleStdinMessage("", "stdin only")).toBe("stdin only") + }) + + test("returns positional when stdin is empty", () => { + expect(assembleStdinMessage("msg", "")).toBe("msg") + }) + + test("ignores whitespace-only stdin", () => { + expect(assembleStdinMessage("msg", " \n ")).toBe("msg") + }) + + test("returns empty string when both are empty", () => { + expect(assembleStdinMessage("", "")).toBe("") + }) +})