diff --git a/pipe/close_responsibility_test.go b/pipe/close_responsibility_test.go index 162817d..20a5a28 100644 --- a/pipe/close_responsibility_test.go +++ b/pipe/close_responsibility_test.go @@ -15,22 +15,22 @@ import ( // readCloseSpy records whether Close was called. type readCloseSpy struct { io.Reader - closed atomic.Bool + closeCount atomic.Uint32 } func (r *readCloseSpy) Close() error { - r.closed.Store(true) + r.closeCount.Add(1) return nil } // writeCloseSpy records whether Close was called. type writeCloseSpy struct { io.Writer - closed atomic.Bool + closeCount atomic.Uint32 } func (w *writeCloseSpy) Close() error { - w.closed.Store(true) + w.closeCount.Add(1) return nil } @@ -63,28 +63,52 @@ func TestGoStageHonorsStreamOwnership(t *testing.T) { )) require.NoError(t, s.Wait()) - assert.Equal(t, !tc.leaveIn, in.closed.Load(), "closing stdin=%v", !tc.leaveIn) - assert.Equal(t, !tc.leaveOut, out.closed.Load(), "closing stdout=%v", !tc.leaveOut) + if tc.leaveIn { + assert.EqualValues(t, 0, in.closeCount.Load(), "closing stdin=%v", !tc.leaveIn) + } else { + assert.EqualValues(t, 1, in.closeCount.Load(), "closing stdin=%v", !tc.leaveIn) + } + if tc.leaveOut { + assert.EqualValues(t, 0, out.closeCount.Load(), "closing stdout=%v", !tc.leaveOut) + } else { + assert.EqualValues(t, 1, out.closeCount.Load(), "closing stdout=%v", !tc.leaveOut) + } }) } } func TestStreamConstructorsPreserveOwnershipAndDynamicType(t *testing.T) { - borrowedInput := strings.NewReader("borrowed") - assert.Same(t, borrowedInput, Input(borrowedInput).Reader()) - assert.Nil(t, Input(borrowedInput).Closer()) - - ownedInput := &readCloseSpy{Reader: strings.NewReader("owned")} - assert.Same(t, ownedInput, ClosingInput(ownedInput).Reader()) - assert.Same(t, ownedInput, ClosingInput(ownedInput).Closer()) - - borrowedOutput := &strings.Builder{} - assert.Same(t, borrowedOutput, Output(borrowedOutput).Writer()) - assert.Nil(t, Output(borrowedOutput).Closer()) - - ownedOutput := &writeCloseSpy{Writer: io.Discard} - assert.Same(t, ownedOutput, ClosingOutput(ownedOutput).Writer()) - assert.Same(t, ownedOutput, ClosingOutput(ownedOutput).Closer()) + borrowedReader := &readCloseSpy{Reader: strings.NewReader("borrowed")} + borrowedInput := Input(borrowedReader) + assert.Same(t, borrowedReader, borrowedInput.Reader()) + assert.NoError(t, borrowedInput.Close()) + assert.EqualValues(t, 0, borrowedReader.closeCount.Load()) + assert.NoError(t, borrowedInput.Close()) + assert.EqualValues(t, 0, borrowedReader.closeCount.Load()) + + ownedReader := &readCloseSpy{Reader: strings.NewReader("owned")} + ownedInput := ClosingInput(ownedReader) + assert.Same(t, ownedReader, ownedInput.Reader()) + assert.NoError(t, ownedInput.Close()) + assert.EqualValues(t, 1, ownedReader.closeCount.Load()) + assert.NoError(t, ownedInput.Close()) + assert.EqualValues(t, 1, ownedReader.closeCount.Load()) + + borrowedWriter := &writeCloseSpy{Writer: &strings.Builder{}} + borrowedOutput := Output(borrowedWriter) + assert.Same(t, borrowedWriter, borrowedOutput.Writer()) + assert.NoError(t, borrowedOutput.Close()) + assert.EqualValues(t, 0, borrowedWriter.closeCount.Load()) + assert.NoError(t, borrowedOutput.Close()) + assert.EqualValues(t, 0, borrowedWriter.closeCount.Load()) + + ownedWriter := &writeCloseSpy{Writer: &writeCloseSpy{Writer: io.Discard}} + ownedOutput := ClosingOutput(ownedWriter) + assert.Same(t, ownedWriter, ownedOutput.Writer()) + assert.NoError(t, ownedOutput.Close()) + assert.EqualValues(t, 1, ownedWriter.closeCount.Load()) + assert.NoError(t, ownedOutput.Close()) + assert.EqualValues(t, 1, ownedWriter.closeCount.Load()) } // TestCommandStageHonorsCloseStdin verifies that a command stage closes a @@ -109,7 +133,11 @@ func TestCommandStageHonorsCloseStdin(t *testing.T) { )) require.NoError(t, s.Wait()) - assert.Equal(t, !leave, in.closed.Load(), "closing stdin=%v", !leave) + if leave { + assert.EqualValues(t, 0, in.closeCount.Load(), "closing stdin=%v", !leave) + } else { + assert.EqualValues(t, 1, in.closeCount.Load(), "closing stdin=%v", !leave) + } }) } } @@ -136,19 +164,23 @@ func TestCommandStageHonorsCloseStdout(t *testing.T) { )) require.NoError(t, s.Wait()) - assert.Equal(t, !leave, out.closed.Load(), "closing stdout=%v", !leave) + if leave { + assert.EqualValues(t, 0, out.closeCount.Load(), "closing stdout=%v", !leave) + } else { + assert.EqualValues(t, 1, out.closeCount.Load(), "closing stdout=%v", !leave) + } }) } } -func inputForTest(r io.ReadCloser, closing bool) InputStream { +func inputForTest(r io.ReadCloser, closing bool) *InputStream { if closing { return ClosingInput(r) } return Input(r) } -func outputForTest(w io.WriteCloser, closing bool) OutputStream { +func outputForTest(w io.WriteCloser, closing bool) *OutputStream { if closing { return ClosingOutput(w) } diff --git a/pipe/command.go b/pipe/command.go index d85314b..908516b 100644 --- a/pipe/command.go +++ b/pipe/command.go @@ -88,12 +88,10 @@ func (s *commandStage) Requirements() StageRequirements { func (s *commandStage) Start( ctx context.Context, opts StageOptions, - ins InputStream, outs OutputStream, + stdin *InputStream, stdout *OutputStream, ) error { - stdin := ins.Reader() - stdinCloser := ins.Closer() - stdout := outs.Writer() - stdoutCloser := outs.Closer() + r := stdin.Reader() + w := stdout.Writer() if s.cmd.Dir == "" { s.cmd.Dir = opts.Dir @@ -105,18 +103,16 @@ func (s *commandStage) Start( var earlyClosers []io.Closer // See the type comment for `Stage` for the explanation of this closing behavior. - if stdin != nil { - s.cmd.Stdin = stdin + if r != nil { + s.cmd.Stdin = r } - if stdinCloser != nil { - if _, ok := stdin.(*os.File); ok { - // We can close our copy as soon as the command has started - earlyClosers = append(earlyClosers, stdinCloser) - } else { - // We need to close `stdin`, but only after the command has finished - s.lateClosers = append(s.lateClosers, stdinCloser) - } + if _, ok := r.(*os.File); ok { + // We can close our copy as soon as the command has started + earlyClosers = append(earlyClosers, stdin) + } else { + // We need to close `stdin`, but only after the command has finished + s.lateClosers = append(s.lateClosers, stdin) } closeEarlyClosers := func() { @@ -133,28 +129,24 @@ func (s *commandStage) Start( _ = s.closeLateClosers() } - if stdout != nil { - if f, ok := stdout.(*os.File); ok { + if w != nil { + if f, ok := w.(*os.File); ok { s.cmd.Stdout = f - if stdoutCloser != nil { - earlyClosers = append(earlyClosers, stdoutCloser) - } + earlyClosers = append(earlyClosers, stdout) } else { - if stdoutCloser != nil { - s.lateClosers = append(s.lateClosers, stdoutCloser) - } + s.lateClosers = append(s.lateClosers, stdout) // Route the copy through our own pipe so we can use a // pooled buffer rather than letting exec.Cmd allocate a // fresh 32KB buffer for its internal io.Copy. - ec, err := s.setupPooledStdout(stdout) + ec, err := s.setupPooledStdout(w) if err != nil { cleanupOnStartFailure() return err } earlyClosers = append(earlyClosers, ec) } - } else if stdoutCloser != nil { - s.lateClosers = append(s.lateClosers, stdoutCloser) + } else { + s.lateClosers = append(s.lateClosers, stdout) } // If the caller hasn't arranged otherwise, read the command's diff --git a/pipe/command_stdout_fastpath_test.go b/pipe/command_stdout_fastpath_test.go index 4b2240c..0de3f51 100644 --- a/pipe/command_stdout_fastpath_test.go +++ b/pipe/command_stdout_fastpath_test.go @@ -42,9 +42,11 @@ func TestCommandStageStdoutFastPath(t *testing.T) { cmd := exec.Command("true") s := CommandStage("true", cmd).(*commandStage) - stdout := OutputStream{writer: f} + var stdout *OutputStream if tc.closingStdout { stdout = ClosingOutput(f) + } else { + stdout = Output(f) } require.NoError(t, s.Start(ctx, StageOptions{}, Input(nil), stdout)) diff --git a/pipe/env_stage.go b/pipe/env_stage.go index 7503e5a..3db2296 100644 --- a/pipe/env_stage.go +++ b/pipe/env_stage.go @@ -37,7 +37,7 @@ func (s *stageWithExtraEnv) Requirements() StageRequirements { func (s *stageWithExtraEnv) Start( ctx context.Context, opts StageOptions, - stdin InputStream, stdout OutputStream, + stdin *InputStream, stdout *OutputStream, ) error { opts.Vars = append(opts.Vars[:len(opts.Vars):len(opts.Vars)], func(_ context.Context, vars []EnvVar) []EnvVar { return append(vars, s.env...) diff --git a/pipe/function.go b/pipe/function.go index d8a09af..a80947f 100644 --- a/pipe/function.go +++ b/pipe/function.go @@ -87,17 +87,15 @@ func (s *goStage) Requirements() StageRequirements { func (s *goStage) Start( ctx context.Context, opts StageOptions, - stdin InputStream, stdout OutputStream, + stdin *InputStream, stdout *OutputStream, ) error { r := stdin.Reader() - stdinCloser := stdin.Closer() if r == nil { // treat nil as empty input. r = strings.NewReader("") } w := stdout.Writer() - stdoutCloser := stdout.Closer() if w == nil { // treat nil output as /dev/null w = io.Discard @@ -110,15 +108,11 @@ func (s *goStage) Start( s.err = opts.PanicHandler(p) } } - if stdoutCloser != nil { - if err := stdoutCloser.Close(); err != nil && s.err == nil { - s.err = fmt.Errorf("error closing stdout for stage %q: %w", s.Name(), err) - } + if err := stdout.Close(); err != nil && s.err == nil { + s.err = fmt.Errorf("error closing stdout for stage %q: %w", s.Name(), err) } - if stdinCloser != nil { - if err := stdinCloser.Close(); err != nil && s.err == nil { - s.err = fmt.Errorf("error closing stdin for stage %q: %w", s.Name(), err) - } + if err := stdin.Close(); err != nil && s.err == nil { + s.err = fmt.Errorf("error closing stdin for stage %q: %w", s.Name(), err) } close(s.done) }() diff --git a/pipe/pipe_matching_test.go b/pipe/pipe_matching_test.go index efd72d5..badb990 100644 --- a/pipe/pipe_matching_test.go +++ b/pipe/pipe_matching_test.go @@ -104,12 +104,12 @@ func (s *pipeSniffingStage) Requirements() pipe.StageRequirements { func (s *pipeSniffingStage) Start( _ context.Context, _ pipe.StageOptions, - stdin pipe.InputStream, stdout pipe.OutputStream, + stdin *pipe.InputStream, stdout *pipe.OutputStream, ) error { s.stdin = stdin.Reader() - stdin.Close() + _ = stdin.Close() s.stdout = stdout.Writer() - stdout.Close() + _ = stdout.Close() return nil } diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 2ba54cc..085fdd3 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -55,12 +55,10 @@ type ContextValuesFunc func(context.Context) []EnvVar type Pipeline struct { env Env - stdin io.Reader - stdinCloser io.Closer - stdout io.Writer - stdoutCloser io.Closer - stages []Stage - cancel func() + stdin *InputStream + stdout *OutputStream + stages []Stage + cancel func() // Atomically written and read value, nonzero if the pipeline has // been started. This is only used for lifecycle sanity checks but @@ -104,8 +102,7 @@ func WithDir(dir string) Option { // even if `Start()` returns an error. func WithStdin(stdin io.Reader) Option { return func(p *Pipeline) { - p.stdin = stdin - p.stdinCloser = nil + p.stdin = Input(stdin) } } @@ -114,8 +111,7 @@ func WithStdin(stdin io.Reader) Option { // even if `Start()` returns an error. func WithStdout(stdout io.Writer) Option { return func(p *Pipeline) { - p.stdout = stdout - p.stdoutCloser = nil + p.stdout = Output(stdout) } } @@ -125,8 +121,7 @@ func WithStdout(stdout io.Writer) Option { // an error. func WithStdoutCloser(stdout io.WriteCloser) Option { return func(p *Pipeline) { - p.stdout = stdout - p.stdoutCloser = stdout + p.stdout = ClosingOutput(stdout) } } @@ -225,8 +220,8 @@ func (p *Pipeline) AddWithIgnoredError(em ErrorMatcher, stages ...Stage) { type stageStarter struct { requirements StageRequirements - stdin InputStream - stdout OutputStream + stdin *InputStream + stdout *OutputStream } func (requirement StreamRequirement) validate() error { @@ -255,9 +250,7 @@ func (requirements StageRequirements) validate(s Stage, stdinConnected, stdoutCo } func (p *Pipeline) abortBeforeStart(s Stage, err error) error { - if p.stdoutCloser != nil { - _ = p.stdoutCloser.Close() - } + _ = p.stdout.Close() p.cancel() p.eventHandler(&Event{ Command: s.Name(), @@ -335,16 +328,13 @@ func (p *Pipeline) Start(ctx context.Context) error { if p.stdin != nil { // Arrange for the input of the 0th stage to come from // `p.stdin`: - stageStarters[0].stdin = Input(p.stdin) + stageStarters[0].stdin = p.stdin } if p.stdout != nil { - i := len(p.stages) - 1 - ss := &stageStarters[i] - ss.stdout = OutputStream{ - writer: p.stdout, - closer: p.stdoutCloser, - } + // Arrange for the output of the last stage to go to + // `p.stdout`: + stageStarters[len(p.stages)-1].stdout = p.stdout } // Clean up any processes and pipes that have been created. `i` is the @@ -355,13 +345,13 @@ func (p *Pipeline) Start(ctx context.Context) error { // the previous stage was writing to. That should cause it to exit // even if it's not minding its context. if closeFailedStageStdin { - stageStarters[i].stdin.Close() + _ = stageStarters[i].stdin.Close() } // If stdout was supplied with WithStdoutCloser but the final stage // was never started, then the pipeline still owns that closer. - if i < len(p.stages)-1 && p.stdoutCloser != nil { - _ = p.stdoutCloser.Close() + if i < len(p.stages)-1 { + _ = p.stdout.Close() } // Kill and wait for any stages that have been started @@ -407,7 +397,7 @@ func (p *Pipeline) Start(ctx context.Context) error { ctx, p.stageOptions(), ss.stdin, ss.stdout, ); err != nil { - nextSS.stdin.Close() + _ = nextSS.stdin.Close() return abort(i, err, false) } } @@ -433,8 +423,7 @@ func (p *Pipeline) Start(ctx context.Context) error { func (p *Pipeline) Output(ctx context.Context) ([]byte, error) { var buf bytes.Buffer - p.stdout = &buf - p.stdoutCloser = nil + p.stdout = Output(&buf) err := p.Run(ctx) return buf.Bytes(), err } diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index fe4b630..82dc2cd 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -673,10 +673,10 @@ func (s ErrorStartingStage) Requirements() pipe.StageRequirements { func (s ErrorStartingStage) Start( _ context.Context, _ pipe.StageOptions, - stdin pipe.InputStream, stdout pipe.OutputStream, + stdin *pipe.InputStream, stdout *pipe.OutputStream, ) error { - stdin.Close() - stdout.Close() + _ = stdin.Close() + _ = stdout.Close() return s.err } @@ -700,13 +700,13 @@ func (s requirementStage) Requirements() pipe.StageRequirements { func (s requirementStage) Start( _ context.Context, _ pipe.StageOptions, - stdin pipe.InputStream, stdout pipe.OutputStream, + stdin *pipe.InputStream, stdout *pipe.OutputStream, ) error { if s.started != nil { *s.started = true } - stdin.Close() - stdout.Close() + _ = stdin.Close() + _ = stdout.Close() return nil } diff --git a/pipe/stage.go b/pipe/stage.go index a46c01e..9a35254 100644 --- a/pipe/stage.go +++ b/pipe/stage.go @@ -2,40 +2,64 @@ package pipe import ( "context" - "io" ) // Stage is an element of a `Pipeline`. It reads from standard input // and writes to standard output. // -// Who closes stdin and stdout? +// # Who closes stdin and stdout? // // A `Stage` as a whole is responsible for closing its end of stdin // and stdout if the corresponding stream is closing. That -// responsibility transfers to the stage as soon as `Start()` is called -// and applies even if `Start()` returns an error. Before returning an -// error from `Start()`, the stage must close any closing stream that -// it has not already handed off to something else that will close it -// promptly. The caller must not close a closing stream after passing it -// to `Start()`. +// responsibility transfers to the stage as soon as the stage's +// `Start()` method is called and applies even if `Start()` returns an +// error. Before returning an error from `Start()`, the stage must +// close any closing stream that it has not already handed off to +// something else that will close it promptly. The caller must not +// close a closing stream after passing it to `Start()`. // -// If the caller wants to retain ownership of stdin/stdout, it passes a -// non-closing stream. The stage must not close a non-closing stream, -// even if `Start()` returns an error. +// If the caller wants to retain ownership of stdin/stdout, it passes +// a non-closing stream. Calling `Close()` on a non-closing stream is +// a NOP, so it isn't harmful but isn't required. // -// Closing stdin/stdout tells the previous/next stage that this stage is -// done reading/writing data, which can affect their behavior. Therefore, -// after a successful start, a stage should close each one as soon as it -// is done with it. +// Closing stdin/stdout tells the previous/next stage that this stage +// is done reading/writing data, which can affect their behavior. +// Therefore, after a successful start, a stage should close each one +// as soon as it is done with it. Assume that this is done via the +// following local function // -// How this should be done depends on whether stdin/stdout are of type -// `*os.File`. +// closeStreams := func() { +// // Error handling omitted. +// _ = stdin.Close() +// _ = stdout.Close() +// } +// +// From the point of view of the pipeline as a whole, if stdin is +// provided by the user (`WithStdin()`), then we don't want the first +// stage to close it at all, whether it's an `*os.File` or not. The +// pipeline communicates this by passing a non-closing `InputStream` +// when it starts that stage. For stdout, it depends on whether the +// user supplied it using `WithStdout()` or `WithStdoutCloser()`. In +// any case, this function can close the streams anyway, because +// `InputStream.Close()` and `OutputStream.Close()` do nothing in +// those cases. +// +// When these closes should happen depends on what kind of stage it is +// and whether stdin/stdout are of type `*os.File`. +// +// ## A command stage // // If a stage is an external command, then the subprocess ultimately // needs its own copies of `*os.File` file descriptors for its stdin // and stdout. The external command will "always" [1] close those when // it exits. // +// (It's theoretically possible for a command to pass the open file +// descriptor to another, longer-lived process, in which case the file +// descriptor wouldn't necessarily get closed when the command +// finishes. But that's ill-behaved in a command that is being used in +// a pipeline, so we'll ignore that possibility.) +// // If the stage is an external command and one of the arguments is an // `*os.File`, then it can set the corresponding field of `exec.Cmd` // to that argument directly. This has the result that `exec.Cmd` @@ -43,114 +67,45 @@ import ( // subprocess. Therefore, the stage must close its copy of that // argument as soon as the external command has started, because the // external command will keep its own copy open as long as necessary -// (and no longer!). It should use roughly the following sequence: +// (and no longer!). Therefore, it should use roughly the following +// sequence: // -// cmd.Stdin = f // Similarly for stdout -// cmd.Start(…) -// f.Close() // close our copy -// cmd.Wait() +// cmd.Stdin = stdin.Reader() +// cmd.Stdout = stdout.Writer() +// err := cmd.Start(…) +// // Close our copies as soon as the command has started: +// closeStreams() +// if err != nil { +// return err +// } +// return cmd.Wait() // // If the stage is an external command and its stdin is not an // `*os.File`, then `exec.Cmd` will take care of creating an // `os.Pipe()`, copying from the provided reader into the pipe, and // eventually closing both ends of the pipe. The stage must close the -// provided stdin itself, but only _after_ the external command has -// finished, like so: +// provided stdin itself, but only _after_ the external command and +// the copy have finished, like so: // -// cmd.Stdin = r -// cmd.Start(…) -// cmd.Wait() -// r.Close() +// defer closeStreams() +// cmd.Stdin = stdin.Reader() +// cmd.Stdout = stdout.Writer() +// err := cmd.Start(…) +// if err != nil { +// return err +// } +// return cmd.Wait() // -// If the stage is an external command and its stdout is not an -// `*os.File`, the stage creates a pipe, passes the write end to the -// command, and copies from the read end to the provided writer. The -// stage must close the provided stdout itself, but only _after_ the -// external command and the copy have finished. +// ## A function stage // // If the stage is a Go function, then it holds the only copy of // stdin/stdout, so it must wait until the function is done before // closing them (regardless of their underlying type, like so: // // go func() { -// f(…, stdin, stdout) -// stdin.Close() -// stdout.Close() +// defer closeStreams() +// f(…, stdin.Reader(), stdout.Writer()) // }() -// -// From the point of view of the pipeline as a whole, if stdin is -// provided by the user (`WithStdin()`), then we don't want the first -// stage to close it at all, whether it's an `*os.File` or not. The -// pipeline communicates this by passing a non-closing `InputStream` -// when it starts that stage. For stdout, it depends on whether the -// user supplied it using `WithStdout()` or `WithStdoutCloser()`. -// -// [1] It's theoretically possible for a command to pass the open file -// descriptor to another, longer-lived process, in which case the -// file descriptor wouldn't necessarily get closed when the -// command finishes. But that's ill-behaved in a command that is -// being used in a pipeline, so we'll ignore that possibility. - -type InputStream struct { - reader io.Reader - closer io.Closer -} - -// The stage may read from r but must not close it. -func Input(r io.Reader) InputStream { - return InputStream{reader: r} -} - -// The stage is responsible for closing r. -func ClosingInput(r io.ReadCloser) InputStream { - return InputStream{reader: r, closer: r} -} - -func (s InputStream) Reader() io.Reader { - return s.reader -} - -// Closer returns the stream closer, or nil if the stream is non-closing. -func (s InputStream) Closer() io.Closer { - return s.closer -} - -func (s InputStream) Close() { - if s.closer != nil { - _ = s.closer.Close() - } -} - -type OutputStream struct { - writer io.Writer - closer io.Closer -} - -// The stage may write to w but must not close it. -func Output(w io.Writer) OutputStream { - return OutputStream{writer: w} -} - -// The stage is responsible for closing w. -func ClosingOutput(w io.WriteCloser) OutputStream { - return OutputStream{writer: w, closer: w} -} - -func (s OutputStream) Writer() io.Writer { - return s.writer -} - -// Closer returns the stream closer, or nil if the stream is non-closing. -func (s OutputStream) Closer() io.Closer { - return s.closer -} - -func (s OutputStream) Close() { - if s.closer != nil { - _ = s.closer.Close() - } -} - type Stage interface { // Name returns the name of the stage. Name() string @@ -164,18 +119,17 @@ type Stage interface { // `stdout` to collect its output. (`stdin.Reader()` or // `stdout.Writer()` might be `nil` if the stage is to receive no // input or produce no output, which might be the case for the - // first/last stage in a pipeline.) If `stdin` or `stdout` is - // closing, the stage is responsible for closing the corresponding - // stream, even if `Start()` returns an error. See the `Stage` type - // comment for more information about responsibility for closing - // stdin and stdout. + // first/last stage in a pipeline.) The stage is responsible for + // calling `stdin.Close()` and `stdout.Close()`, even if `Start()` + // returns an error. See the `Stage` type comment for more + // information about responsibility for closing stdin and stdout. // // If `Start()` returns without an error, `Wait()` must also be // called, to allow all resources to be freed. If `Start()` returns // an error, `Wait()` must not be called. Start( ctx context.Context, opts StageOptions, - stdin InputStream, stdout OutputStream, + stdin *InputStream, stdout *OutputStream, ) error // Wait waits for the stage to be done, either because it has diff --git a/pipe/streams.go b/pipe/streams.go new file mode 100644 index 0000000..8ddd578 --- /dev/null +++ b/pipe/streams.go @@ -0,0 +1,142 @@ +package pipe + +import ( + "io" + "sync" +) + +// InputStream represents `stdin` for a stage, which might or might +// not need to be closed when the stage is done with it. It usually +// holds an `io.Reader`, which can be retrieved using `Reader()`. Its +// `Close()` method closes the reader if necessary (i.e., if the +// `InputStream` was constructed using `ClosingInput()`. The +// `Close()` method is idempotent. +// +// A nil `*InputStream` is a valid value. Its `Reader()` method +// returns `nil` and `Close()` does nothing successfully. +// +// It might seem like `InputStream` should implement `io.Reader` +// itself. But we want to avoid hiding the dynamic type of the +// `io.Reader` that is being used as the stdin of a pipeline. That +// object might be of a type that is subject to optimizations that +// aren't available for a generic `io.Reader`. For example, it might +// be an `*os.File` (which can be passed directly to subcommands or to +// `splice(2)`), or it might implement `io.WriterTo`. +type InputStream struct { + reader io.Reader + + // once is used to ensure that `Close()` is only called once. + once sync.Once + + // closer is set to `nil` after the first call to `Close()`. + closer io.Closer + + // closeErr is set to the error returned by the first call to + // `Close()`, and returned from that and any subsequent calls to + // `Close()`. + closeErr error +} + +// The stage may read from r but must not close it. +func Input(r io.Reader) *InputStream { + return &InputStream{reader: r} +} + +// The stage is responsible for closing r. +func ClosingInput(r io.ReadCloser) *InputStream { + return &InputStream{reader: r, closer: r} +} + +func (s *InputStream) Reader() io.Reader { + if s == nil { + return nil + } + return s.reader +} + +// Close closes the underlying reader if necessary. If `s` was +// constructed using `ClosingInput()`, then close the `io.ReadCloser` +// that was passed to that function. If `s` is `nil` or was +// constructed using `Input()`, then do nothing successfully. +func (s *InputStream) Close() error { + if s == nil { + return nil + } + + s.once.Do(func() { + if s.closer != nil { + s.closeErr = s.closer.Close() + s.closer = nil + } + }) + + return s.closeErr +} + +// OutputStream represents `stdout` for a stage, which might or might +// not need to be closed when the stage is done with it. It usually +// holds an `io.Writer`, which can be retrieved using `Writer()`. Its +// `Close()` method closes the writer if necessary (i.e., if the +// `OutputStream` was constructed using `ClosingOutput()`. The +// `Close()` method is idempotent. +// +// A nil `*OutputStream` is a valid value. Its `Writer()` method +// returns `nil` and `Close()` does nothing successfully. +// +// It might seem like `OutputStream` should implement `io.Writer` +// itself. But we want to avoid hiding the dynamic type of the +// `io.Writer` that is being used as the stdout of a pipeline. That +// object might be of a type that is subject to optimizations that +// aren't available for a generic `io.Writer`. For example, it might +// be an `*os.File` (which can be passed directly to subcommands or to +// `splice(2)`), or it might implement `io.ReaderFrom`. +type OutputStream struct { + writer io.Writer + + // once is used to ensure that `Close()` is only called once. + once sync.Once + + // closer is set to `nil` after the first call to `Close()`. + closer io.Closer + + // closeErr is set to the error returned by the first call to + // `Close()`, and returned from that and any subsequent calls to + // `Close()`. + closeErr error +} + +// The stage may write to w but must not close it. +func Output(w io.Writer) *OutputStream { + return &OutputStream{writer: w} +} + +// The stage is responsible for closing w. +func ClosingOutput(w io.WriteCloser) *OutputStream { + return &OutputStream{writer: w, closer: w} +} + +func (s *OutputStream) Writer() io.Writer { + if s == nil { + return nil + } + return s.writer +} + +// Close closes the underlying writer if necessary. If `s` was +// constructed using `ClosingOutput()`, then close the +// `io.WriteCloser` that was passed to that function. If `s` is `nil` +// or was constructed using `Output()`, then do nothing successfully. +func (s *OutputStream) Close() error { + if s == nil { + return nil + } + + s.once.Do(func() { + if s.closer != nil { + s.closeErr = s.closer.Close() + s.closer = nil + } + }) + + return s.closeErr +}