Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 48 additions & 38 deletions Sources/Containerization/LinuxProcess.swift
Original file line number Diff line number Diff line change
Expand Up @@ -146,44 +146,9 @@ extension LinuxProcess {
}
}

if let stdin = self.ioSetup.stdin {
if let handle = handles[0] {
self.state.withLock {
$0.stdinRelay = Task {
for await data in stdin.reader.stream() {
do {
try handle.write(contentsOf: data)
} catch {
self.logger?.error("failed to write to stdin: \(error)")
break
}
}

do {
self.logger?.debug("stdin relay finished, closing")

// There's two ways we can wind up here:
//
// 1. The stream finished on its own (e.g. we wrote all the
// data) and we will close the underlying stdin in the guest below.
//
// 2. The client explicitly called closeStdin() themselves
// which will cancel this relay task AFTER actually closing
// the fds. If the client did that, then this task will be
// cancelled, and the fds are already gone so there's nothing
// for us to do.
if Task.isCancelled {
return
}

try await self._closeStdin()
} catch {
self.logger?.error("failed to close stdin: \(error)")
}
}
}
}
}
// Note: stdin relay is started separately via startStdinRelay() after
// the process has started, to avoid a deadlock where closeStdin is
// called before the process is consuming from the pipe.

var configuredStreams = 0
let (stream, cc) = AsyncStream<Void>.makeStream()
Expand Down Expand Up @@ -231,6 +196,45 @@ extension LinuxProcess {
return handles
}

func startStdinRelay(handle: FileHandle) {
guard let stdin = self.ioSetup.stdin else { return }

self.state.withLock {
$0.stdinRelay = Task {
for await data in stdin.reader.stream() {
do {
try handle.write(contentsOf: data)
} catch {
self.logger?.error("failed to write to stdin: \(error)")
break
}
}

do {
self.logger?.debug("stdin relay finished, closing")

// There's two ways we can wind up here:
//
// 1. The stream finished on its own (e.g. we wrote all the
// data) and we will close the underlying stdin in the guest below.
//
// 2. The client explicitly called closeStdin() themselves
// which will cancel this relay task AFTER actually closing
// the fds. If the client did that, then this task will be
// cancelled, and the fds are already gone so there's nothing
// for us to do.
if Task.isCancelled {
return
}

try await self._closeStdin()
} catch {
self.logger?.error("failed to close stdin: \(error)")
}
}
}
}

/// Start the process.
public func start() async throws {
do {
Expand Down Expand Up @@ -273,6 +277,12 @@ extension LinuxProcess {
containerID: self.owningContainer
)

// Start stdin relay after process launch to avoid filling the pipe
// buffer before the process is even running.
if let stdinHandle = result[0] {
self.startStdinRelay(handle: stdinHandle)
}

self.state.withLock {
$0.stdio = StdioHandles(
stdin: result[0],
Expand Down
267 changes: 267 additions & 0 deletions Sources/Integration/ContainerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,32 @@ extension IntegrationSuite {
}
}

final class ChunkedStdinBuffer: ReaderStream {
let chunks: [Data]
let delayMs: Int

init(chunks: [Data], delayMs: Int = 0) {
self.chunks = chunks
self.delayMs = delayMs
}

func stream() -> AsyncStream<Data> {
let chunks = self.chunks
let delayMs = self.delayMs
return AsyncStream { cont in
Task {
for chunk in chunks {
if delayMs > 0 {
try? await Task.sleep(for: .milliseconds(delayMs))
}
cont.yield(chunk)
}
cont.finish()
}
}
}
}

func testProcessEchoHi() async throws {
let id = "test-process-echo-hi"
let bs = try await bootstrap(id)
Expand Down Expand Up @@ -1761,4 +1787,245 @@ extension IntegrationSuite {
throw IntegrationError.assert(msg: "expected /etc/resolv.conf to contain DNS servers, got: \(output)")
}
}

func testLargeStdinInput() async throws {
let id = "test-large-stdin-input"

let bs = try await bootstrap(id)

let inputSize = 128 * 1024
let inputData = Data(repeating: 0x41, count: inputSize) // 'A' repeated

let buffer = BufferWriter()
let container = try LinuxContainer(id, rootfs: bs.rootfs, vmm: bs.vmm) { config in
config.process.arguments = ["cat"]
config.process.stdin = StdinBuffer(data: inputData)
config.process.stdout = buffer
config.bootLog = bs.bootLog
}

do {
try await container.create()
try await container.start()

let status = try await container.wait()
try await container.stop()

guard status.exitCode == 0 else {
throw IntegrationError.assert(msg: "process status \(status) != 0")
}

guard buffer.data.count == inputSize else {
throw IntegrationError.assert(
msg: "output size \(buffer.data.count) != input size \(inputSize)")
}

guard buffer.data == inputData else {
throw IntegrationError.assert(msg: "output data does not match input data")
}
} catch {
try? await container.stop()
throw error
}
}

func testExecLargeStdinInput() async throws {
let id = "test-exec-large-stdin-input"
let bs = try await bootstrap(id)

let inputSize = 128 * 1024
let inputData = Data(repeating: 0x42, count: inputSize)

let container = try LinuxContainer(id, rootfs: bs.rootfs, vmm: bs.vmm) { config in
config.process.arguments = ["sleep", "100"]
config.bootLog = bs.bootLog
}

do {
try await container.create()
try await container.start()

let buffer = BufferWriter()
let exec = try await container.exec("large-stdin-exec") { config in
config.arguments = ["cat"]
config.stdin = StdinBuffer(data: inputData)
config.stdout = buffer
}

try await exec.start()
let status = try await exec.wait()
try await exec.delete()

guard status.exitCode == 0 else {
throw IntegrationError.assert(msg: "exec status \(status) != 0")
}

guard buffer.data.count == inputSize else {
throw IntegrationError.assert(msg: "output size \(buffer.data.count) != \(inputSize)")
}

guard buffer.data == inputData else {
throw IntegrationError.assert(msg: "output data mismatch")
}

try await container.kill(SIGKILL)
try await container.wait()
try await container.stop()
} catch {
try? await container.stop()
throw error
}
}

func testStdinExplicitClose() async throws {
let id = "test-stdin-explicit-close"
let bs = try await bootstrap(id)

let inputData = "explicit close test\n".data(using: .utf8)!
let buffer = BufferWriter()

let container = try LinuxContainer(id, rootfs: bs.rootfs, vmm: bs.vmm) { config in
config.process.arguments = ["sleep", "100"]
config.bootLog = bs.bootLog
}

do {
try await container.create()
try await container.start()

let exec = try await container.exec("stdin-close-exec") { config in
config.arguments = ["head", "-n", "1"]
config.stdin = StdinBuffer(data: inputData)
config.stdout = buffer
}

try await exec.start()
let status = try await exec.wait()
try await exec.delete()

guard status.exitCode == 0 else {
throw IntegrationError.assert(msg: "exec status \(status) != 0")
}

guard buffer.data == inputData else {
throw IntegrationError.assert(msg: "output mismatch")
}

try await container.kill(SIGKILL)
try await container.wait()
try await container.stop()
} catch {
try? await container.stop()
throw error
}
}

func testStdinBinaryData() async throws {
let id = "test-stdin-binary-data"
let bs = try await bootstrap(id)

var inputData = Data()
for i: UInt8 in 0...255 {
inputData.append(contentsOf: [UInt8](repeating: i, count: 256))
}

let buffer = BufferWriter()
let container = try LinuxContainer(id, rootfs: bs.rootfs, vmm: bs.vmm) { config in
config.process.arguments = ["cat"]
config.process.stdin = StdinBuffer(data: inputData)
config.process.stdout = buffer
config.bootLog = bs.bootLog
}

do {
try await container.create()
try await container.start()

let status = try await container.wait()
try await container.stop()

guard status.exitCode == 0 else {
throw IntegrationError.assert(msg: "process status \(status) != 0")
}

guard buffer.data == inputData else {
throw IntegrationError.assert(msg: "binary data mismatch")
}
} catch {
try? await container.stop()
throw error
}
}

func testStdinMultipleChunks() async throws {
let id = "test-stdin-multiple-chunks"
let bs = try await bootstrap(id)

let chunks = (0..<10).map { i in
Data(repeating: UInt8(0x30 + i), count: 10 * 1024)
}
let expectedData = chunks.reduce(Data()) { $0 + $1 }

let buffer = BufferWriter()
let container = try LinuxContainer(id, rootfs: bs.rootfs, vmm: bs.vmm) { config in
config.process.arguments = ["cat"]
config.process.stdin = ChunkedStdinBuffer(chunks: chunks, delayMs: 10)
config.process.stdout = buffer
config.bootLog = bs.bootLog
}

do {
try await container.create()
try await container.start()

let status = try await container.wait()
try await container.stop()

guard status.exitCode == 0 else {
throw IntegrationError.assert(msg: "process status \(status) != 0")
}

guard buffer.data == expectedData else {
throw IntegrationError.assert(msg: "chunked data mismatch")
}
} catch {
try? await container.stop()
throw error
}
}

func testStdinVeryLarge() async throws {
let id = "test-stdin-very-large"
let bs = try await bootstrap(id)

let inputSize = 10 * 1024 * 1024
let inputData = Data(repeating: 0x58, count: inputSize)

let stdout = DiscardingWriter()
let container = try LinuxContainer(id, rootfs: bs.rootfs, vmm: bs.vmm) { config in
config.process.arguments = ["wc", "-c"]
config.process.stdin = StdinBuffer(data: inputData)
config.process.stdout = stdout
config.bootLog = bs.bootLog
}

do {
try await container.create()
try await container.start()

let status = try await container.wait()
try await container.stop()

guard status.exitCode == 0 else {
throw IntegrationError.assert(msg: "process status \(status) != 0")
}

guard stdout.count > 0 else {
throw IntegrationError.assert(msg: "no output from wc")
}
} catch {
try? await container.stop()
throw error
}
}
}
6 changes: 6 additions & 0 deletions Sources/Integration/Suite.swift
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,12 @@ struct IntegrationSuite: AsyncParsableCommand {
Test("container read-only rootfs", testReadOnlyRootfs),
Test("container read-only rootfs hosts file", testReadOnlyRootfsHostsFileWritten),
Test("container read-only rootfs DNS", testReadOnlyRootfsDNSConfigured),
Test("large stdin input", testLargeStdinInput),
Test("exec large stdin input", testExecLargeStdinInput),
Test("stdin explicit close", testStdinExplicitClose),
Test("stdin binary data", testStdinBinaryData),
Test("stdin multiple chunks", testStdinMultipleChunks),
Test("stdin very large", testStdinVeryLarge),

// Pods
Test("pod single container", testPodSingleContainer),
Expand Down
Loading