From 4bbf3e630d91e7e8ffc7a91d840973a18431cede Mon Sep 17 00:00:00 2001 From: Danny Canter Date: Fri, 16 Jan 2026 00:49:00 -0800 Subject: [PATCH] LinuxProcess: Start stdin relay after process start Because we start piping stdin before process launch, we can fill up the guest pipe buffer before the process even starts. We'd need some backpressure mechanism to handle this (register the other end with epoll, buffer some data, etc.), but we should probably just start piping after the process is up and running. This change does exactly that, and also stops holding the process mutex while draining stdin in the guest for `CloseStdin()`. This fixes the case where more data than fits in the pipe buffer is written through stdin; today that hangs. --- Sources/Containerization/LinuxProcess.swift | 86 +++--- Sources/Integration/ContainerTests.swift | 267 +++++++++++++++++++ Sources/Integration/Suite.swift | 6 + vminitd/Sources/vminitd/ManagedProcess.swift | 5 +- 4 files changed, 323 insertions(+), 41 deletions(-) diff --git a/Sources/Containerization/LinuxProcess.swift b/Sources/Containerization/LinuxProcess.swift index 76239ffd..4ef676d6 100644 --- a/Sources/Containerization/LinuxProcess.swift +++ b/Sources/Containerization/LinuxProcess.swift @@ -146,44 +146,9 @@ extension LinuxProcess { } } - if let stdin = self.ioSetup.stdin { - if let handle = handles[0] { - self.state.withLock { - $0.stdinRelay = Task { - for await data in stdin.reader.stream() { - do { - try handle.write(contentsOf: data) - } catch { - self.logger?.error("failed to write to stdin: \(error)") - break - } - } - - do { - self.logger?.debug("stdin relay finished, closing") - - // There's two ways we can wind up here: - // - // 1. The stream finished on its own (e.g. we wrote all the - // data) and we will close the underlying stdin in the guest below. - // - // 2. The client explicitly called closeStdin() themselves - // which will cancel this relay task AFTER actually closing - // the fds. 
If the client did that, then this task will be - // cancelled, and the fds are already gone so there's nothing - // for us to do. - if Task.isCancelled { - return - } - - try await self._closeStdin() - } catch { - self.logger?.error("failed to close stdin: \(error)") - } - } - } - } - } + // Note: stdin relay is started separately via startStdinRelay() after + // the process has started, to avoid a deadlock where closeStdin is + // called before the process is consuming from the pipe. var configuredStreams = 0 let (stream, cc) = AsyncStream.makeStream() @@ -231,6 +196,45 @@ extension LinuxProcess { return handles } + func startStdinRelay(handle: FileHandle) { + guard let stdin = self.ioSetup.stdin else { return } + + self.state.withLock { + $0.stdinRelay = Task { + for await data in stdin.reader.stream() { + do { + try handle.write(contentsOf: data) + } catch { + self.logger?.error("failed to write to stdin: \(error)") + break + } + } + + do { + self.logger?.debug("stdin relay finished, closing") + + // There's two ways we can wind up here: + // + // 1. The stream finished on its own (e.g. we wrote all the + // data) and we will close the underlying stdin in the guest below. + // + // 2. The client explicitly called closeStdin() themselves + // which will cancel this relay task AFTER actually closing + // the fds. If the client did that, then this task will be + // cancelled, and the fds are already gone so there's nothing + // for us to do. + if Task.isCancelled { + return + } + + try await self._closeStdin() + } catch { + self.logger?.error("failed to close stdin: \(error)") + } + } + } + } + /// Start the process. public func start() async throws { do { @@ -273,6 +277,12 @@ extension LinuxProcess { containerID: self.owningContainer ) + // Start stdin relay after process launch to avoid filling the pipe + // buffer before the process is even running. 
+ if let stdinHandle = result[0] { + self.startStdinRelay(handle: stdinHandle) + } + self.state.withLock { $0.stdio = StdioHandles( stdin: result[0], diff --git a/Sources/Integration/ContainerTests.swift b/Sources/Integration/ContainerTests.swift index 5fcf1568..cc4340d5 100644 --- a/Sources/Integration/ContainerTests.swift +++ b/Sources/Integration/ContainerTests.swift @@ -107,6 +107,32 @@ extension IntegrationSuite { } } + final class ChunkedStdinBuffer: ReaderStream { + let chunks: [Data] + let delayMs: Int + + init(chunks: [Data], delayMs: Int = 0) { + self.chunks = chunks + self.delayMs = delayMs + } + + func stream() -> AsyncStream { + let chunks = self.chunks + let delayMs = self.delayMs + return AsyncStream { cont in + Task { + for chunk in chunks { + if delayMs > 0 { + try? await Task.sleep(for: .milliseconds(delayMs)) + } + cont.yield(chunk) + } + cont.finish() + } + } + } + } + func testProcessEchoHi() async throws { let id = "test-process-echo-hi" let bs = try await bootstrap(id) @@ -1761,4 +1787,245 @@ extension IntegrationSuite { throw IntegrationError.assert(msg: "expected /etc/resolv.conf to contain DNS servers, got: \(output)") } } + + func testLargeStdinInput() async throws { + let id = "test-large-stdin-input" + + let bs = try await bootstrap(id) + + let inputSize = 128 * 1024 + let inputData = Data(repeating: 0x41, count: inputSize) // 'A' repeated + + let buffer = BufferWriter() + let container = try LinuxContainer(id, rootfs: bs.rootfs, vmm: bs.vmm) { config in + config.process.arguments = ["cat"] + config.process.stdin = StdinBuffer(data: inputData) + config.process.stdout = buffer + config.bootLog = bs.bootLog + } + + do { + try await container.create() + try await container.start() + + let status = try await container.wait() + try await container.stop() + + guard status.exitCode == 0 else { + throw IntegrationError.assert(msg: "process status \(status) != 0") + } + + guard buffer.data.count == inputSize else { + throw 
IntegrationError.assert( + msg: "output size \(buffer.data.count) != input size \(inputSize)") + } + + guard buffer.data == inputData else { + throw IntegrationError.assert(msg: "output data does not match input data") + } + } catch { + try? await container.stop() + throw error + } + } + + func testExecLargeStdinInput() async throws { + let id = "test-exec-large-stdin-input" + let bs = try await bootstrap(id) + + let inputSize = 128 * 1024 + let inputData = Data(repeating: 0x42, count: inputSize) + + let container = try LinuxContainer(id, rootfs: bs.rootfs, vmm: bs.vmm) { config in + config.process.arguments = ["sleep", "100"] + config.bootLog = bs.bootLog + } + + do { + try await container.create() + try await container.start() + + let buffer = BufferWriter() + let exec = try await container.exec("large-stdin-exec") { config in + config.arguments = ["cat"] + config.stdin = StdinBuffer(data: inputData) + config.stdout = buffer + } + + try await exec.start() + let status = try await exec.wait() + try await exec.delete() + + guard status.exitCode == 0 else { + throw IntegrationError.assert(msg: "exec status \(status) != 0") + } + + guard buffer.data.count == inputSize else { + throw IntegrationError.assert(msg: "output size \(buffer.data.count) != \(inputSize)") + } + + guard buffer.data == inputData else { + throw IntegrationError.assert(msg: "output data mismatch") + } + + try await container.kill(SIGKILL) + try await container.wait() + try await container.stop() + } catch { + try? await container.stop() + throw error + } + } + + func testStdinExplicitClose() async throws { + let id = "test-stdin-explicit-close" + let bs = try await bootstrap(id) + + let inputData = "explicit close test\n".data(using: .utf8)! 
+ let buffer = BufferWriter() + + let container = try LinuxContainer(id, rootfs: bs.rootfs, vmm: bs.vmm) { config in + config.process.arguments = ["sleep", "100"] + config.bootLog = bs.bootLog + } + + do { + try await container.create() + try await container.start() + + let exec = try await container.exec("stdin-close-exec") { config in + config.arguments = ["head", "-n", "1"] + config.stdin = StdinBuffer(data: inputData) + config.stdout = buffer + } + + try await exec.start() + let status = try await exec.wait() + try await exec.delete() + + guard status.exitCode == 0 else { + throw IntegrationError.assert(msg: "exec status \(status) != 0") + } + + guard buffer.data == inputData else { + throw IntegrationError.assert(msg: "output mismatch") + } + + try await container.kill(SIGKILL) + try await container.wait() + try await container.stop() + } catch { + try? await container.stop() + throw error + } + } + + func testStdinBinaryData() async throws { + let id = "test-stdin-binary-data" + let bs = try await bootstrap(id) + + var inputData = Data() + for i: UInt8 in 0...255 { + inputData.append(contentsOf: [UInt8](repeating: i, count: 256)) + } + + let buffer = BufferWriter() + let container = try LinuxContainer(id, rootfs: bs.rootfs, vmm: bs.vmm) { config in + config.process.arguments = ["cat"] + config.process.stdin = StdinBuffer(data: inputData) + config.process.stdout = buffer + config.bootLog = bs.bootLog + } + + do { + try await container.create() + try await container.start() + + let status = try await container.wait() + try await container.stop() + + guard status.exitCode == 0 else { + throw IntegrationError.assert(msg: "process status \(status) != 0") + } + + guard buffer.data == inputData else { + throw IntegrationError.assert(msg: "binary data mismatch") + } + } catch { + try? 
await container.stop() + throw error + } + } + + func testStdinMultipleChunks() async throws { + let id = "test-stdin-multiple-chunks" + let bs = try await bootstrap(id) + + let chunks = (0..<10).map { i in + Data(repeating: UInt8(0x30 + i), count: 10 * 1024) + } + let expectedData = chunks.reduce(Data()) { $0 + $1 } + + let buffer = BufferWriter() + let container = try LinuxContainer(id, rootfs: bs.rootfs, vmm: bs.vmm) { config in + config.process.arguments = ["cat"] + config.process.stdin = ChunkedStdinBuffer(chunks: chunks, delayMs: 10) + config.process.stdout = buffer + config.bootLog = bs.bootLog + } + + do { + try await container.create() + try await container.start() + + let status = try await container.wait() + try await container.stop() + + guard status.exitCode == 0 else { + throw IntegrationError.assert(msg: "process status \(status) != 0") + } + + guard buffer.data == expectedData else { + throw IntegrationError.assert(msg: "chunked data mismatch") + } + } catch { + try? await container.stop() + throw error + } + } + + func testStdinVeryLarge() async throws { + let id = "test-stdin-very-large" + let bs = try await bootstrap(id) + + let inputSize = 10 * 1024 * 1024 + let inputData = Data(repeating: 0x58, count: inputSize) + + let stdout = DiscardingWriter() + let container = try LinuxContainer(id, rootfs: bs.rootfs, vmm: bs.vmm) { config in + config.process.arguments = ["wc", "-c"] + config.process.stdin = StdinBuffer(data: inputData) + config.process.stdout = stdout + config.bootLog = bs.bootLog + } + + do { + try await container.create() + try await container.start() + + let status = try await container.wait() + try await container.stop() + + guard status.exitCode == 0 else { + throw IntegrationError.assert(msg: "process status \(status) != 0") + } + + guard stdout.count > 0 else { + throw IntegrationError.assert(msg: "no output from wc") + } + } catch { + try? 
await container.stop() + throw error + } + } } diff --git a/Sources/Integration/Suite.swift b/Sources/Integration/Suite.swift index 543c309e..7e1d56e3 100644 --- a/Sources/Integration/Suite.swift +++ b/Sources/Integration/Suite.swift @@ -312,6 +312,12 @@ struct IntegrationSuite: AsyncParsableCommand { Test("container read-only rootfs", testReadOnlyRootfs), Test("container read-only rootfs hosts file", testReadOnlyRootfsHostsFileWritten), Test("container read-only rootfs DNS", testReadOnlyRootfsDNSConfigured), + Test("large stdin input", testLargeStdinInput), + Test("exec large stdin input", testExecLargeStdinInput), + Test("stdin explicit close", testStdinExplicitClose), + Test("stdin binary data", testStdinBinaryData), + Test("stdin multiple chunks", testStdinMultipleChunks), + Test("stdin very large", testStdinVeryLarge), // Pods Test("pod single container", testPodSingleContainer), diff --git a/vminitd/Sources/vminitd/ManagedProcess.swift b/vminitd/Sources/vminitd/ManagedProcess.swift index 111ac6e9..5d516593 100644 --- a/vminitd/Sources/vminitd/ManagedProcess.swift +++ b/vminitd/Sources/vminitd/ManagedProcess.swift @@ -327,9 +327,8 @@ extension ManagedProcess { } func closeStdin() throws { - try self.state.withLock { - try $0.io.closeStdin() - } + let io = self.state.withLock { $0.io } + try io.closeStdin() } func delete() async throws {