From 642eb3e4f65189d5b33461318d725ee888ac4e12 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 13 Jun 2026 11:32:14 +0200 Subject: [PATCH 01/10] InputStream, OutputStream: move types to a separate file --- pipe/stage.go | 61 ----------------------------------------------- pipe/streams.go | 63 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 61 deletions(-) create mode 100644 pipe/streams.go diff --git a/pipe/stage.go b/pipe/stage.go index a46c01e..8263beb 100644 --- a/pipe/stage.go +++ b/pipe/stage.go @@ -2,7 +2,6 @@ package pipe import ( "context" - "io" ) // Stage is an element of a `Pipeline`. It reads from standard input @@ -91,66 +90,6 @@ import ( // command finishes. But that's ill-behaved in a command that is // being used in a pipeline, so we'll ignore that possibility. -type InputStream struct { - reader io.Reader - closer io.Closer -} - -// The stage may read from r but must not close it. -func Input(r io.Reader) InputStream { - return InputStream{reader: r} -} - -// The stage is responsible for closing r. -func ClosingInput(r io.ReadCloser) InputStream { - return InputStream{reader: r, closer: r} -} - -func (s InputStream) Reader() io.Reader { - return s.reader -} - -// Closer returns the stream closer, or nil if the stream is non-closing. -func (s InputStream) Closer() io.Closer { - return s.closer -} - -func (s InputStream) Close() { - if s.closer != nil { - _ = s.closer.Close() - } -} - -type OutputStream struct { - writer io.Writer - closer io.Closer -} - -// The stage may write to w but must not close it. -func Output(w io.Writer) OutputStream { - return OutputStream{writer: w} -} - -// The stage is responsible for closing w. -func ClosingOutput(w io.WriteCloser) OutputStream { - return OutputStream{writer: w, closer: w} -} - -func (s OutputStream) Writer() io.Writer { - return s.writer -} - -// Closer returns the stream closer, or nil if the stream is non-closing. -func (s OutputStream) Closer() io.Closer { - return s.closer -} - -func (s OutputStream) Close() { - if s.closer != nil { - _ = s.closer.Close() - } -} - type Stage interface { // Name returns the name of the stage. Name() string diff --git a/pipe/streams.go b/pipe/streams.go new file mode 100644 index 0000000..259b8c5 --- /dev/null +++ b/pipe/streams.go @@ -0,0 +1,63 @@ +package pipe + +import "io" + +type InputStream struct { + reader io.Reader + closer io.Closer +} + +// The stage may read from r but must not close it. +func Input(r io.Reader) InputStream { + return InputStream{reader: r} +} + +// The stage is responsible for closing r. +func ClosingInput(r io.ReadCloser) InputStream { + return InputStream{reader: r, closer: r} +} + +func (s InputStream) Reader() io.Reader { + return s.reader +} + +// Closer returns the stream closer, or nil if the stream is non-closing. +func (s InputStream) Closer() io.Closer { + return s.closer +} + +func (s InputStream) Close() { + if s.closer != nil { + _ = s.closer.Close() + } +} + +type OutputStream struct { + writer io.Writer + closer io.Closer +} + +// The stage may write to w but must not close it. +func Output(w io.Writer) OutputStream { + return OutputStream{writer: w} +} + +// The stage is responsible for closing w. +func ClosingOutput(w io.WriteCloser) OutputStream { + return OutputStream{writer: w, closer: w} +} + +func (s OutputStream) Writer() io.Writer { + return s.writer +} + +// Closer returns the stream closer, or nil if the stream is non-closing. +func (s OutputStream) Closer() io.Closer { + return s.closer +} + +func (s OutputStream) Close() { + if s.closer != nil { + _ = s.closer.Close() + } +} From 67ca961aed435e4e24d2e9a34ced8aed70394a8d Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 13 Jun 2026 17:09:36 +0200 Subject: [PATCH 02/10] InputStream, OutputStream: change `Close()` methods to pass errors through --- pipe/pipe_matching_test.go | 4 ++-- pipe/pipeline.go | 4 ++-- pipe/pipeline_test.go | 8 ++++---- pipe/streams.go | 14 ++++++++------ 4 files changed, 16 insertions(+), 14 deletions(-) diff --git a/pipe/pipe_matching_test.go b/pipe/pipe_matching_test.go index efd72d5..00e6df0 100644 --- a/pipe/pipe_matching_test.go +++ b/pipe/pipe_matching_test.go @@ -107,9 +107,9 @@ func (s *pipeSniffingStage) Start( stdin pipe.InputStream, stdout pipe.OutputStream, ) error { s.stdin = stdin.Reader() - stdin.Close() + _ = stdin.Close() s.stdout = stdout.Writer() - stdout.Close() + _ = stdout.Close() return nil } diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 2ba54cc..7424faf 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -355,7 +355,7 @@ func (p *Pipeline) Start(ctx context.Context) error { // the previous stage was writing to. That should cause it to exit // even if it's not minding its context. if closeFailedStageStdin { - stageStarters[i].stdin.Close() + _ = stageStarters[i].stdin.Close() } // If stdout was supplied with WithStdoutCloser but the final stage @@ -407,7 +407,7 @@ func (p *Pipeline) Start(ctx context.Context) error { ctx, p.stageOptions(), ss.stdin, ss.stdout, ); err != nil { - nextSS.stdin.Close() + _ = nextSS.stdin.Close() return abort(i, err, false) } } diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index fe4b630..c04ec1b 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -675,8 +675,8 @@ func (s ErrorStartingStage) Start( _ context.Context, _ pipe.StageOptions, stdin pipe.InputStream, stdout pipe.OutputStream, ) error { - stdin.Close() - stdout.Close() + _ = stdin.Close() + _ = stdout.Close() return s.err } @@ -705,8 +705,8 @@ func (s requirementStage) Start( if s.started != nil { *s.started = true } - stdin.Close() - stdout.Close() + _ = stdin.Close() + _ = stdout.Close() return nil } diff --git a/pipe/streams.go b/pipe/streams.go index 259b8c5..c38f48e 100644 --- a/pipe/streams.go +++ b/pipe/streams.go @@ -26,10 +26,11 @@ func (s InputStream) Closer() io.Closer { return s.closer } -func (s InputStream) Close() { - if s.closer != nil { - _ = s.closer.Close() +func (s InputStream) Close() error { + if s.closer == nil { + return nil } + return s.closer.Close() } type OutputStream struct { @@ -56,8 +57,9 @@ func (s OutputStream) Closer() io.Closer { return s.closer } -func (s OutputStream) Close() { - if s.closer != nil { - _ = s.closer.Close() +func (s OutputStream) Close() error { + if s.closer == nil { + return nil } + return s.closer.Close() } From e3a82235a71e9be4b48b27f6edb01ebff95ff367 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 13 Jun 2026 12:12:27 +0200 Subject: [PATCH 03/10] Stage: update docstring to reflect use of streams --- pipe/stage.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pipe/stage.go b/pipe/stage.go index 8263beb..590a738 100644 --- a/pipe/stage.go +++ b/pipe/stage.go @@ -44,9 +44,9 @@ import ( // external command will keep its own copy open as long as necessary // (and no longer!). It should use roughly the following sequence: // -// cmd.Stdin = f // Similarly for stdout +// cmd.Stdin = stdin.Reader() // Similarly for stdout // cmd.Start(…) -// f.Close() // close our copy +// stdin.Close() // Close our copy // cmd.Wait() // // If the stage is an external command and its stdin is not an @@ -56,10 +56,10 @@ import ( // provided stdin itself, but only _after_ the external command has // finished, like so: // -// cmd.Stdin = r +// cmd.Stdin = stdin.Reader() // Similarly for stdout // cmd.Start(…) // cmd.Wait() -// r.Close() +// stdin.Close() // Close // // If the stage is an external command and its stdout is not an // `*os.File`, the stage creates a pipe, passes the write end to the From aa3db90cc7b034a797f6419ae1706315284f8f5a Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 13 Jun 2026 17:14:13 +0200 Subject: [PATCH 04/10] InputStream, OutputStream: remove the `Closer()` methods This is to force all close calls to come through `InputStream.Close()` and `OutputStream.Close()`, which will soon get some more functionality. --- pipe/close_responsibility_test.go | 38 +++++++++++++++++++------------ pipe/command.go | 28 ++++++++--------------- pipe/function.go | 14 ++++-------- pipe/stage.go | 9 ++++---- pipe/streams.go | 10 -------- 5 files changed, 41 insertions(+), 58 deletions(-) diff --git a/pipe/close_responsibility_test.go b/pipe/close_responsibility_test.go index 162817d..c7a2a12 100644 --- a/pipe/close_responsibility_test.go +++ b/pipe/close_responsibility_test.go @@ -70,21 +70,29 @@ func TestGoStageHonorsStreamOwnership(t *testing.T) { } func TestStreamConstructorsPreserveOwnershipAndDynamicType(t *testing.T) { - borrowedInput := strings.NewReader("borrowed") - assert.Same(t, borrowedInput, Input(borrowedInput).Reader()) - assert.Nil(t, Input(borrowedInput).Closer()) - - ownedInput := &readCloseSpy{Reader: strings.NewReader("owned")} - assert.Same(t, ownedInput, ClosingInput(ownedInput).Reader()) - assert.Same(t, ownedInput, ClosingInput(ownedInput).Closer()) - - borrowedOutput := &strings.Builder{} - assert.Same(t, borrowedOutput, Output(borrowedOutput).Writer()) - assert.Nil(t, Output(borrowedOutput).Closer()) - - ownedOutput := &writeCloseSpy{Writer: io.Discard} - assert.Same(t, ownedOutput, ClosingOutput(ownedOutput).Writer()) - assert.Same(t, ownedOutput, ClosingOutput(ownedOutput).Closer()) + borrowedReader := &readCloseSpy{Reader: strings.NewReader("borrowed")} + borrowedInput := Input(borrowedReader) + assert.Same(t, borrowedReader, borrowedInput.Reader()) + assert.NoError(t, borrowedInput.Close()) + assert.False(t, borrowedReader.closed.Load()) + + ownedReader := &readCloseSpy{Reader: strings.NewReader("owned")} + ownedInput := ClosingInput(ownedReader) + assert.Same(t, ownedReader, ownedInput.Reader()) + assert.NoError(t, ownedInput.Close()) + assert.True(t, ownedReader.closed.Load()) + + borrowedWriter := &writeCloseSpy{Writer: &strings.Builder{}} + borrowedOutput := Output(borrowedWriter) + assert.Same(t, borrowedWriter, borrowedOutput.Writer()) + assert.NoError(t, borrowedOutput.Close()) + assert.False(t, borrowedWriter.closed.Load()) + + ownedWriter := &writeCloseSpy{Writer: &writeCloseSpy{Writer: io.Discard}} + ownedOutput := ClosingOutput(ownedWriter) + assert.Same(t, ownedWriter, ownedOutput.Writer()) + assert.NoError(t, ownedOutput.Close()) + assert.True(t, ownedWriter.closed.Load()) } // TestCommandStageHonorsCloseStdin verifies that a command stage closes a diff --git a/pipe/command.go b/pipe/command.go index d85314b..e1d5c01 100644 --- a/pipe/command.go +++ b/pipe/command.go @@ -91,9 +91,7 @@ func (s *commandStage) Start( ins InputStream, outs OutputStream, ) error { stdin := ins.Reader() - stdinCloser := ins.Closer() stdout := outs.Writer() - stdoutCloser := outs.Closer() if s.cmd.Dir == "" { s.cmd.Dir = opts.Dir @@ -109,14 +107,12 @@ func (s *commandStage) Start( s.cmd.Stdin = stdin } - if stdinCloser != nil { - if _, ok := stdin.(*os.File); ok { - // We can close our copy as soon as the command has started - earlyClosers = append(earlyClosers, stdinCloser) - } else { - // We need to close `stdin`, but only after the command has finished - s.lateClosers = append(s.lateClosers, stdinCloser) - } + if _, ok := stdin.(*os.File); ok { + // We can close our copy as soon as the command has started + earlyClosers = append(earlyClosers, ins) + } else { + // We need to close `stdin`, but only after the command has finished + s.lateClosers = append(s.lateClosers, ins) } closeEarlyClosers := func() { @@ -136,13 +132,9 @@ func (s *commandStage) Start( if stdout != nil { if f, ok := stdout.(*os.File); ok { s.cmd.Stdout = f - if stdoutCloser != nil { - earlyClosers = append(earlyClosers, stdoutCloser) - } + earlyClosers = append(earlyClosers, outs) } else { - if stdoutCloser != nil { - s.lateClosers = append(s.lateClosers, stdoutCloser) - } + s.lateClosers = append(s.lateClosers, outs) // Route the copy through our own pipe so we can use a // pooled buffer rather than letting exec.Cmd allocate a // fresh 32KB buffer for its internal io.Copy. @@ -153,8 +145,8 @@ func (s *commandStage) Start( } earlyClosers = append(earlyClosers, ec) } - } else if stdoutCloser != nil { - s.lateClosers = append(s.lateClosers, stdoutCloser) + } else { + s.lateClosers = append(s.lateClosers, outs) } // If the caller hasn't arranged otherwise, read the command's diff --git a/pipe/function.go b/pipe/function.go index d8a09af..6f45db2 100644 --- a/pipe/function.go +++ b/pipe/function.go @@ -90,14 +90,12 @@ func (s *goStage) Start( stdin InputStream, stdout OutputStream, ) error { r := stdin.Reader() - stdinCloser := stdin.Closer() if r == nil { // treat nil as empty input. r = strings.NewReader("") } w := stdout.Writer() - stdoutCloser := stdout.Closer() if w == nil { // treat nil output as /dev/null w = io.Discard @@ -110,15 +108,11 @@ func (s *goStage) Start( s.err = opts.PanicHandler(p) } } - if stdoutCloser != nil { - if err := stdoutCloser.Close(); err != nil && s.err == nil { - s.err = fmt.Errorf("error closing stdout for stage %q: %w", s.Name(), err) - } + if err := stdout.Close(); err != nil && s.err == nil { + s.err = fmt.Errorf("error closing stdout for stage %q: %w", s.Name(), err) } - if stdinCloser != nil { - if err := stdinCloser.Close(); err != nil && s.err == nil { - s.err = fmt.Errorf("error closing stdin for stage %q: %w", s.Name(), err) - } + if err := stdin.Close(); err != nil && s.err == nil { + s.err = fmt.Errorf("error closing stdin for stage %q: %w", s.Name(), err) } close(s.done) }() diff --git a/pipe/stage.go b/pipe/stage.go index 590a738..7e74f67 100644 --- a/pipe/stage.go +++ b/pipe/stage.go @@ -103,11 +103,10 @@ type Stage interface { // `stdout` to collect its output. (`stdin.Reader()` or // `stdout.Writer()` might be `nil` if the stage is to receive no // input or produce no output, which might be the case for the - // first/last stage in a pipeline.) If `stdin` or `stdout` is - // closing, the stage is responsible for closing the corresponding - // stream, even if `Start()` returns an error. See the `Stage` type - // comment for more information about responsibility for closing - // stdin and stdout. + // first/last stage in a pipeline.) The stage is responsible for + // calling `stdin.Close()` and `stdout.Close()`, even if `Start()` + // returns an error. See the `Stage` type comment for more + // information about responsibility for closing stdin and stdout. // // If `Start()` returns without an error, `Wait()` must also be // called, to allow all resources to be freed. If `Start()` returns diff --git a/pipe/streams.go b/pipe/streams.go index c38f48e..9bf0bd5 100644 --- a/pipe/streams.go +++ b/pipe/streams.go @@ -21,11 +21,6 @@ func (s InputStream) Reader() io.Reader { return s.reader } -// Closer returns the stream closer, or nil if the stream is non-closing. -func (s InputStream) Closer() io.Closer { - return s.closer -} - func (s InputStream) Close() error { if s.closer == nil { return nil @@ -52,11 +47,6 @@ func (s OutputStream) Writer() io.Writer { return s.writer } -// Closer returns the stream closer, or nil if the stream is non-closing. -func (s OutputStream) Closer() io.Closer { - return s.closer -} - func (s OutputStream) Close() error { if s.closer == nil { return nil From 63d8e51c30759555f2af6e76c4d9360ff92ef29d Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 13 Jun 2026 13:54:02 +0200 Subject: [PATCH 05/10] commandStage.Start(): rename some local variables --- pipe/command.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/pipe/command.go b/pipe/command.go index e1d5c01..31ae198 100644 --- a/pipe/command.go +++ b/pipe/command.go @@ -88,10 +88,10 @@ func (s *commandStage) Requirements() StageRequirements { func (s *commandStage) Start( ctx context.Context, opts StageOptions, - ins InputStream, outs OutputStream, + stdin InputStream, stdout OutputStream, ) error { - stdin := ins.Reader() - stdout := outs.Writer() + r := stdin.Reader() + w := stdout.Writer() if s.cmd.Dir == "" { s.cmd.Dir = opts.Dir @@ -103,16 +103,16 @@ func (s *commandStage) Start( var earlyClosers []io.Closer // See the type comment for `Stage` for the explanation of this closing behavior. - if stdin != nil { - s.cmd.Stdin = stdin + if r != nil { + s.cmd.Stdin = r } - if _, ok := stdin.(*os.File); ok { + if _, ok := r.(*os.File); ok { // We can close our copy as soon as the command has started - earlyClosers = append(earlyClosers, ins) + earlyClosers = append(earlyClosers, stdin) } else { // We need to close `stdin`, but only after the command has finished - s.lateClosers = append(s.lateClosers, ins) + s.lateClosers = append(s.lateClosers, stdin) } closeEarlyClosers := func() { @@ -129,16 +129,16 @@ func (s *commandStage) Start( _ = s.closeLateClosers() } - if stdout != nil { - if f, ok := stdout.(*os.File); ok { + if w != nil { + if f, ok := w.(*os.File); ok { s.cmd.Stdout = f - earlyClosers = append(earlyClosers, outs) + earlyClosers = append(earlyClosers, stdout) } else { - s.lateClosers = append(s.lateClosers, outs) + s.lateClosers = append(s.lateClosers, stdout) // Route the copy through our own pipe so we can use a // pooled buffer rather than letting exec.Cmd allocate a // fresh 32KB buffer for its internal io.Copy. - ec, err := s.setupPooledStdout(stdout) + ec, err := s.setupPooledStdout(w) if err != nil { cleanupOnStartFailure() return err @@ -146,7 +146,7 @@ func (s *commandStage) Start( earlyClosers = append(earlyClosers, ec) } } else { - s.lateClosers = append(s.lateClosers, outs) + s.lateClosers = append(s.lateClosers, stdout) } // If the caller hasn't arranged otherwise, read the command's From 6ef82b52dc931929be259988004805e30408cd18 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 13 Jun 2026 17:15:11 +0200 Subject: [PATCH 06/10] InputStream, OutputStream: make these into pointer types This will be needed in a moment. --- pipe/close_responsibility_test.go | 4 +- pipe/command.go | 2 +- pipe/command_stdout_fastpath_test.go | 4 +- pipe/env_stage.go | 2 +- pipe/function.go | 2 +- pipe/pipe_matching_test.go | 2 +- pipe/pipeline.go | 10 ++--- pipe/pipeline_test.go | 4 +- pipe/stage.go | 2 +- pipe/streams.go | 58 +++++++++++++++++++++------- 10 files changed, 61 insertions(+), 29 deletions(-) diff --git a/pipe/close_responsibility_test.go b/pipe/close_responsibility_test.go index c7a2a12..8765d90 100644 --- a/pipe/close_responsibility_test.go +++ b/pipe/close_responsibility_test.go @@ -149,14 +149,14 @@ func TestCommandStageHonorsCloseStdout(t *testing.T) { } } -func inputForTest(r io.ReadCloser, closing bool) InputStream { +func inputForTest(r io.ReadCloser, closing bool) *InputStream { if closing { return ClosingInput(r) } return Input(r) } -func outputForTest(w io.WriteCloser, closing bool) OutputStream { +func outputForTest(w io.WriteCloser, closing bool) *OutputStream { if closing { return ClosingOutput(w) } diff --git a/pipe/command.go b/pipe/command.go index 31ae198..908516b 100644 --- a/pipe/command.go +++ b/pipe/command.go @@ -88,7 +88,7 @@ func (s *commandStage) Requirements() StageRequirements { func (s *commandStage) Start( ctx context.Context, opts StageOptions, - stdin InputStream, stdout OutputStream, + stdin *InputStream, stdout *OutputStream, ) error { r := stdin.Reader() w := stdout.Writer() diff --git a/pipe/command_stdout_fastpath_test.go b/pipe/command_stdout_fastpath_test.go index 4b2240c..0de3f51 100644 --- a/pipe/command_stdout_fastpath_test.go +++ b/pipe/command_stdout_fastpath_test.go @@ -42,9 +42,11 @@ func TestCommandStageStdoutFastPath(t *testing.T) { cmd := exec.Command("true") s := CommandStage("true", cmd).(*commandStage) - stdout := OutputStream{writer: f} + var stdout *OutputStream if tc.closingStdout { stdout = ClosingOutput(f) + } else { + stdout = Output(f) } require.NoError(t, s.Start(ctx, StageOptions{}, Input(nil), stdout)) diff --git a/pipe/env_stage.go b/pipe/env_stage.go index 7503e5a..3db2296 100644 --- a/pipe/env_stage.go +++ b/pipe/env_stage.go @@ -37,7 +37,7 @@ func (s *stageWithExtraEnv) Requirements() StageRequirements { func (s *stageWithExtraEnv) Start( ctx context.Context, opts StageOptions, - stdin InputStream, stdout OutputStream, + stdin *InputStream, stdout *OutputStream, ) error { opts.Vars = append(opts.Vars[:len(opts.Vars):len(opts.Vars)], func(_ context.Context, vars []EnvVar) []EnvVar { return append(vars, s.env...) diff --git a/pipe/function.go b/pipe/function.go index 6f45db2..a80947f 100644 --- a/pipe/function.go +++ b/pipe/function.go @@ -87,7 +87,7 @@ func (s *goStage) Requirements() StageRequirements { func (s *goStage) Start( ctx context.Context, opts StageOptions, - stdin InputStream, stdout OutputStream, + stdin *InputStream, stdout *OutputStream, ) error { r := stdin.Reader() if r == nil { diff --git a/pipe/pipe_matching_test.go b/pipe/pipe_matching_test.go index 00e6df0..badb990 100644 --- a/pipe/pipe_matching_test.go +++ b/pipe/pipe_matching_test.go @@ -104,7 +104,7 @@ func (s *pipeSniffingStage) Requirements() pipe.StageRequirements { func (s *pipeSniffingStage) Start( _ context.Context, _ pipe.StageOptions, - stdin pipe.InputStream, stdout pipe.OutputStream, + stdin *pipe.InputStream, stdout *pipe.OutputStream, ) error { s.stdin = stdin.Reader() _ = stdin.Close() diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 7424faf..bbae72b 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -225,8 +225,8 @@ func (p *Pipeline) AddWithIgnoredError(em ErrorMatcher, stages ...Stage) { type stageStarter struct { requirements StageRequirements - stdin InputStream - stdout OutputStream + stdin *InputStream + stdout *OutputStream } func (requirement StreamRequirement) validate() error { @@ -339,9 +339,9 @@ func (p *Pipeline) Start(ctx context.Context) error { } if p.stdout != nil { - i := len(p.stages) - 1 - ss := &stageStarters[i] - ss.stdout = OutputStream{ + // Arrange for the output of the last stage to go to + // `p.stdout`: + stageStarters[len(p.stages)-1].stdout = &OutputStream{ writer: p.stdout, closer: p.stdoutCloser, } diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index c04ec1b..82dc2cd 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -673,7 +673,7 @@ func (s ErrorStartingStage) Requirements() pipe.StageRequirements { func (s ErrorStartingStage) Start( _ context.Context, _ pipe.StageOptions, - stdin pipe.InputStream, stdout pipe.OutputStream, + stdin *pipe.InputStream, stdout *pipe.OutputStream, ) error { _ = stdin.Close() _ = stdout.Close() @@ -700,7 +700,7 @@ func (s requirementStage) Requirements() pipe.StageRequirements { func (s requirementStage) Start( _ context.Context, _ pipe.StageOptions, - stdin pipe.InputStream, stdout pipe.OutputStream, + stdin *pipe.InputStream, stdout *pipe.OutputStream, ) error { if s.started != nil { *s.started = true diff --git a/pipe/stage.go b/pipe/stage.go index 7e74f67..0a22079 100644 --- a/pipe/stage.go +++ b/pipe/stage.go @@ -113,7 +113,7 @@ type Stage interface { // an error, `Wait()` must not be called. Start( ctx context.Context, opts StageOptions, - stdin InputStream, stdout OutputStream, + stdin *InputStream, stdout *OutputStream, ) error // Wait waits for the stage to be done, either because it has diff --git a/pipe/streams.go b/pipe/streams.go index 9bf0bd5..790f58e 100644 --- a/pipe/streams.go +++ b/pipe/streams.go @@ -2,53 +2,83 @@ package pipe import "io" +// InputStream represents `stdin` for a stage, which might or might +// not need to be closed when the stage is done with it. It usually +// holds an `io.Reader`, which can be retrieved using `Reader()`. Its +// `Close()` method closes the reader if necessary (i.e., if the +// `InputStream` was constructed using `ClosingInput()`. +// +// A nil `*InputStream` is a valid value. Its `Reader()` method +// returns `nil` and `Close()` does nothing successfully. type InputStream struct { reader io.Reader closer io.Closer } // The stage may read from r but must not close it. -func Input(r io.Reader) InputStream { - return InputStream{reader: r} +func Input(r io.Reader) *InputStream { + return &InputStream{reader: r} } // The stage is responsible for closing r. -func ClosingInput(r io.ReadCloser) InputStream { - return InputStream{reader: r, closer: r} +func ClosingInput(r io.ReadCloser) *InputStream { + return &InputStream{reader: r, closer: r} } -func (s InputStream) Reader() io.Reader { +func (s *InputStream) Reader() io.Reader { + if s == nil { + return nil + } return s.reader } -func (s InputStream) Close() error { - if s.closer == nil { +// Close closes the underlying reader if necessary. If `s` was +// constructed using `ClosingInput()`, then close the `io.ReadCloser` +// that was passed to that function. If `s` is `nil` or was +// constructed using `Input()`, then do nothing successfully. +func (s *InputStream) Close() error { + if s == nil || s.closer == nil { return nil } return s.closer.Close() } +// OutputStream represents `stdout` for a stage, which might or might +// not need to be closed when the stage is done with it. It usually +// holds an `io.Writer`, which can be retrieved using `Writer()`. Its +// `Close()` method closes the writer if necessary (i.e., if the +// `OutputStream` was constructed using `ClosingOutput()`. +// +// A nil `*OutputStream` is a valid value. Its `Writer()` method +// returns `nil` and `Close()` does nothing successfully. type OutputStream struct { writer io.Writer closer io.Closer } // The stage may write to w but must not close it. -func Output(w io.Writer) OutputStream { - return OutputStream{writer: w} +func Output(w io.Writer) *OutputStream { + return &OutputStream{writer: w} } // The stage is responsible for closing w. -func ClosingOutput(w io.WriteCloser) OutputStream { - return OutputStream{writer: w, closer: w} +func ClosingOutput(w io.WriteCloser) *OutputStream { + return &OutputStream{writer: w, closer: w} } -func (s OutputStream) Writer() io.Writer { +func (s *OutputStream) Writer() io.Writer { + if s == nil { + return nil + } return s.writer } -func (s OutputStream) Close() error { - if s.closer == nil { +// Close closes the underlying writer if necessary. If `s` was +// constructed using `ClosingOutput()`, then close the +// `io.WriteCloser` that was passed to that function. If `s` is `nil` +// or was constructed using `Output()`, then do nothing successfully. +func (s *OutputStream) Close() error { + if s == nil || s.closer == nil { return nil } return s.closer.Close() From 29edba336aab7f29805f36c07efc88c9fe46e0bc Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 13 Jun 2026 17:18:00 +0200 Subject: [PATCH 07/10] InputStream, OutputStream: make `Close()` idempotent Ignore all but the first call to `Close()`. --- pipe/close_responsibility_test.go | 48 ++++++++++++++++++++------- pipe/streams.go | 55 +++++++++++++++++++++++++++---- 2 files changed, 84 insertions(+), 19 deletions(-) diff --git a/pipe/close_responsibility_test.go b/pipe/close_responsibility_test.go index 8765d90..20a5a28 100644 --- a/pipe/close_responsibility_test.go +++ b/pipe/close_responsibility_test.go @@ -15,22 +15,22 @@ import ( // readCloseSpy records whether Close was called. type readCloseSpy struct { io.Reader - closed atomic.Bool + closeCount atomic.Uint32 } func (r *readCloseSpy) Close() error { - r.closed.Store(true) + r.closeCount.Add(1) return nil } // writeCloseSpy records whether Close was called. type writeCloseSpy struct { io.Writer - closed atomic.Bool + closeCount atomic.Uint32 } func (w *writeCloseSpy) Close() error { - w.closed.Store(true) + w.closeCount.Add(1) return nil } @@ -63,8 +63,16 @@ func TestGoStageHonorsStreamOwnership(t *testing.T) { )) require.NoError(t, s.Wait()) - assert.Equal(t, !tc.leaveIn, in.closed.Load(), "closing stdin=%v", !tc.leaveIn) - assert.Equal(t, !tc.leaveOut, out.closed.Load(), "closing stdout=%v", !tc.leaveOut) + if tc.leaveIn { + assert.EqualValues(t, 0, in.closeCount.Load(), "closing stdin=%v", !tc.leaveIn) + } else { + assert.EqualValues(t, 1, in.closeCount.Load(), "closing stdin=%v", !tc.leaveIn) + } + if tc.leaveOut { + assert.EqualValues(t, 0, out.closeCount.Load(), "closing stdout=%v", !tc.leaveOut) + } else { + assert.EqualValues(t, 1, out.closeCount.Load(), "closing stdout=%v", !tc.leaveOut) + } }) } } @@ -74,25 +82,33 @@ func TestStreamConstructorsPreserveOwnershipAndDynamicType(t *testing.T) { borrowedInput := Input(borrowedReader) assert.Same(t, borrowedReader, borrowedInput.Reader()) assert.NoError(t, borrowedInput.Close()) - assert.False(t, borrowedReader.closed.Load()) + assert.EqualValues(t, 0, borrowedReader.closeCount.Load()) + assert.NoError(t, borrowedInput.Close()) + assert.EqualValues(t, 0, borrowedReader.closeCount.Load()) ownedReader := &readCloseSpy{Reader: strings.NewReader("owned")} ownedInput := ClosingInput(ownedReader) assert.Same(t, ownedReader, ownedInput.Reader()) assert.NoError(t, ownedInput.Close()) - assert.True(t, ownedReader.closed.Load()) + assert.EqualValues(t, 1, ownedReader.closeCount.Load()) + assert.NoError(t, ownedInput.Close()) + assert.EqualValues(t, 1, ownedReader.closeCount.Load()) borrowedWriter := &writeCloseSpy{Writer: &strings.Builder{}} borrowedOutput := Output(borrowedWriter) assert.Same(t, borrowedWriter, borrowedOutput.Writer()) assert.NoError(t, borrowedOutput.Close()) - assert.False(t, borrowedWriter.closed.Load()) + assert.EqualValues(t, 0, borrowedWriter.closeCount.Load()) + assert.NoError(t, borrowedOutput.Close()) + assert.EqualValues(t, 0, borrowedWriter.closeCount.Load()) ownedWriter := &writeCloseSpy{Writer: &writeCloseSpy{Writer: io.Discard}} ownedOutput := ClosingOutput(ownedWriter) assert.Same(t, ownedWriter, ownedOutput.Writer()) assert.NoError(t, ownedOutput.Close()) - assert.True(t, ownedWriter.closed.Load()) + assert.EqualValues(t, 1, ownedWriter.closeCount.Load()) + assert.NoError(t, ownedOutput.Close()) + assert.EqualValues(t, 1, ownedWriter.closeCount.Load()) } // TestCommandStageHonorsCloseStdin verifies that a command stage closes a @@ -117,7 +133,11 @@ func TestCommandStageHonorsCloseStdin(t *testing.T) { )) require.NoError(t, s.Wait()) - assert.Equal(t, !leave, in.closed.Load(), "closing stdin=%v", !leave) + if leave { + assert.EqualValues(t, 0, in.closeCount.Load(), "closing stdin=%v", !leave) + } else { + assert.EqualValues(t, 1, in.closeCount.Load(), "closing stdin=%v", !leave) + } }) } } @@ -144,7 +164,11 @@ func TestCommandStageHonorsCloseStdout(t *testing.T) { )) require.NoError(t, s.Wait()) - assert.Equal(t, !leave, out.closed.Load(), "closing stdout=%v", !leave) + if leave { + assert.EqualValues(t, 0, out.closeCount.Load(), "closing stdout=%v", !leave) + } else { + assert.EqualValues(t, 1, out.closeCount.Load(), "closing stdout=%v", !leave) + } }) } } diff --git a/pipe/streams.go b/pipe/streams.go index 790f58e..ef123dd 100644 --- a/pipe/streams.go +++ b/pipe/streams.go @@ -1,18 +1,32 @@ package pipe -import "io" +import ( + "io" + "sync" +) // InputStream represents `stdin` for a stage, which might or might // not need to be closed when the stage is done with it. It usually // holds an `io.Reader`, which can be retrieved using `Reader()`. Its // `Close()` method closes the reader if necessary (i.e., if the -// `InputStream` was constructed using `ClosingInput()`. +// `InputStream` was constructed using `ClosingInput()`. The +// `Close()` method is idempotent. // // A nil `*InputStream` is a valid value. Its `Reader()` method // returns `nil` and `Close()` does nothing successfully. type InputStream struct { reader io.Reader + + // once is used to ensure that `Close()` is only called once. + once sync.Once + + // closer is set to `nil` after the first call to `Close()`. closer io.Closer + + // closeErr is set to the error returned by the first call to + // `Close()`, and returned from that and any subsequent calls to + // `Close()`. + closeErr error } // The stage may read from r but must not close it. @@ -37,23 +51,42 @@ func (s *InputStream) Reader() io.Reader { // that was passed to that function. If `s` is `nil` or was // constructed using `Input()`, then do nothing successfully. func (s *InputStream) Close() error { - if s == nil || s.closer == nil { + if s == nil { return nil } - return s.closer.Close() + + s.once.Do(func() { + if s.closer != nil { + s.closeErr = s.closer.Close() + s.closer = nil + } + }) + + return s.closeErr } // OutputStream represents `stdout` for a stage, which might or might // not need to be closed when the stage is done with it. It usually // holds an `io.Writer`, which can be retrieved using `Writer()`. Its // `Close()` method closes the writer if necessary (i.e., if the -// `OutputStream` was constructed using `ClosingOutput()`. +// `OutputStream` was constructed using `ClosingOutput()`. The +// `Close()` method is idempotent. // // A nil `*OutputStream` is a valid value. Its `Writer()` method // returns `nil` and `Close()` does nothing successfully. type OutputStream struct { writer io.Writer + + // once is used to ensure that `Close()` is only called once. + once sync.Once + + // closer is set to `nil` after the first call to `Close()`. closer io.Closer + + // closeErr is set to the error returned by the first call to + // `Close()`, and returned from that and any subsequent calls to + // `Close()`. + closeErr error } // The stage may write to w but must not close it. @@ -78,8 +111,16 @@ func (s *OutputStream) Writer() io.Writer { // `io.WriteCloser` that was passed to that function. If `s` is `nil` // or was constructed using `Output()`, then do nothing successfully. func (s *OutputStream) Close() error { - if s == nil || s.closer == nil { + if s == nil { return nil } - return s.closer.Close() + + s.once.Do(func() { + if s.closer != nil { + s.closeErr = s.closer.Close() + s.closer = nil + } + }) + + return s.closeErr } From 84662746af293b073c65d9aafc311c8fd1cb4dbc Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 13 Jun 2026 17:19:02 +0200 Subject: [PATCH 08/10] InputStream, OutputStream: improve docstrings In the type comments, explain why these types don't implement `io.Reader` and `io.Writer`. Otherwise, some helpful person is sure to come along and add `Read()` and `Write()` methods, to the detriment of performance and even changing some semantics. --- pipe/streams.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pipe/streams.go b/pipe/streams.go index ef123dd..8ddd578 100644 --- a/pipe/streams.go +++ b/pipe/streams.go @@ -14,6 +14,14 @@ import ( // // A nil `*InputStream` is a valid value. Its `Reader()` method // returns `nil` and `Close()` does nothing successfully. +// +// It might seem like `InputStream` should implement `io.Reader` +// itself. But we want to avoid hiding the dynamic type of the +// `io.Reader` that is being used as the stdin of a pipeline. That +// object might be of a type that is subject to optimizations that +// aren't available for a generic `io.Reader`. For example, it might +// be an `*os.File` (which can be passed directly to subcommands or to +// `splice(2)`), or it might implement `io.WriterTo`. type InputStream struct { reader io.Reader @@ -74,6 +82,14 @@ func (s *InputStream) Close() error { // // A nil `*OutputStream` is a valid value. Its `Writer()` method // returns `nil` and `Close()` does nothing successfully. +// +// It might seem like `OutputStream` should implement `io.Writer` +// itself. But we want to avoid hiding the dynamic type of the +// `io.Writer` that is being used as the stdout of a pipeline. That +// object might be of a type that is subject to optimizations that +// aren't available for a generic `io.Writer`. For example, it might +// be an `*os.File` (which can be passed directly to subcommands or to +// `splice(2)`), or it might implement `io.ReaderFrom`. type OutputStream struct { writer io.Writer From b30a165ea42e1f83b908b8044f0daacbe71b12cd Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 13 Jun 2026 12:58:58 +0200 Subject: [PATCH 09/10] Pipeline: use `InputStream` and `OutputStream` Change the types of some `Pipeline` fields: * `stdin` to `InputStream` * `stdout` to `OutputStream` That way we don't have to manage their closers separately. --- pipe/pipeline.go | 37 +++++++++++++------------------------ 1 file changed, 13 insertions(+), 24 deletions(-) diff --git a/pipe/pipeline.go b/pipe/pipeline.go index bbae72b..085fdd3 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -55,12 +55,10 @@ type ContextValuesFunc func(context.Context) []EnvVar type Pipeline struct { env Env - stdin io.Reader - stdinCloser io.Closer - stdout io.Writer - stdoutCloser io.Closer - stages []Stage - cancel func() + stdin *InputStream + stdout *OutputStream + stages []Stage + cancel func() // Atomically written and read value, nonzero if the pipeline has // been started. This is only used for lifecycle sanity checks but @@ -104,8 +102,7 @@ func WithDir(dir string) Option { // even if `Start()` returns an error. func WithStdin(stdin io.Reader) Option { return func(p *Pipeline) { - p.stdin = stdin - p.stdinCloser = nil + p.stdin = Input(stdin) } } @@ -114,8 +111,7 @@ func WithStdin(stdin io.Reader) Option { // even if `Start()` returns an error. func WithStdout(stdout io.Writer) Option { return func(p *Pipeline) { - p.stdout = stdout - p.stdoutCloser = nil + p.stdout = Output(stdout) } } @@ -125,8 +121,7 @@ func WithStdout(stdout io.Writer) Option { // an error. func WithStdoutCloser(stdout io.WriteCloser) Option { return func(p *Pipeline) { - p.stdout = stdout - p.stdoutCloser = stdout + p.stdout = ClosingOutput(stdout) } } @@ -255,9 +250,7 @@ func (requirements StageRequirements) validate(s Stage, stdinConnected, stdoutCo } func (p *Pipeline) abortBeforeStart(s Stage, err error) error { - if p.stdoutCloser != nil { - _ = p.stdoutCloser.Close() - } + _ = p.stdout.Close() p.cancel() p.eventHandler(&Event{ Command: s.Name(), @@ -335,16 +328,13 @@ func (p *Pipeline) Start(ctx context.Context) error { if p.stdin != nil { // Arrange for the input of the 0th stage to come from // `p.stdin`: - stageStarters[0].stdin = Input(p.stdin) + stageStarters[0].stdin = p.stdin } if p.stdout != nil { // Arrange for the output of the last stage to go to // `p.stdout`: - stageStarters[len(p.stages)-1].stdout = &OutputStream{ - writer: p.stdout, - closer: p.stdoutCloser, - } + stageStarters[len(p.stages)-1].stdout = p.stdout } // Clean up any processes and pipes that have been created. `i` is the @@ -360,8 +350,8 @@ func (p *Pipeline) Start(ctx context.Context) error { // If stdout was supplied with WithStdoutCloser but the final stage // was never started, then the pipeline still owns that closer. - if i < len(p.stages)-1 && p.stdoutCloser != nil { - _ = p.stdoutCloser.Close() + if i < len(p.stages)-1 { + _ = p.stdout.Close() } // Kill and wait for any stages that have been started @@ -433,8 +423,7 @@ func (p *Pipeline) Start(ctx context.Context) error { func (p *Pipeline) Output(ctx context.Context) ([]byte, error) { var buf bytes.Buffer - p.stdout = &buf - p.stdoutCloser = nil + p.stdout = Output(&buf) err := p.Run(ctx) return buf.Bytes(), err } From 07cbbcf12b895387eb3df8bfb52a58d7ee1ce51b Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sun, 14 Jun 2026 16:16:10 +0200 Subject: [PATCH 10/10] Stage: improve docstring --- pipe/stage.go | 114 ++++++++++++++++++++++++++++---------------------- 1 file changed, 65 insertions(+), 49 deletions(-) diff --git a/pipe/stage.go b/pipe/stage.go index 0a22079..9a35254 100644 --- a/pipe/stage.go +++ b/pipe/stage.go @@ -7,34 +7,59 @@ import ( // Stage is an element of a `Pipeline`. It reads from standard input // and writes to standard output. // -// Who closes stdin and stdout? +// # Who closes stdin and stdout? // // A `Stage` as a whole is responsible for closing its end of stdin // and stdout if the corresponding stream is closing. That -// responsibility transfers to the stage as soon as `Start()` is called -// and applies even if `Start()` returns an error. Before returning an -// error from `Start()`, the stage must close any closing stream that -// it has not already handed off to something else that will close it -// promptly. The caller must not close a closing stream after passing it -// to `Start()`. +// responsibility transfers to the stage as soon as the stage's +// `Start()` method is called and applies even if `Start()` returns an +// error. Before returning an error from `Start()`, the stage must +// close any closing stream that it has not already handed off to +// something else that will close it promptly. The caller must not +// close a closing stream after passing it to `Start()`. // -// If the caller wants to retain ownership of stdin/stdout, it passes a -// non-closing stream. The stage must not close a non-closing stream, -// even if `Start()` returns an error. +// If the caller wants to retain ownership of stdin/stdout, it passes +// a non-closing stream. Calling `Close()` on a non-closing stream is +// a NOP, so it isn't harmful but isn't required. // -// Closing stdin/stdout tells the previous/next stage that this stage is -// done reading/writing data, which can affect their behavior. Therefore, -// after a successful start, a stage should close each one as soon as it -// is done with it. +// Closing stdin/stdout tells the previous/next stage that this stage +// is done reading/writing data, which can affect their behavior. +// Therefore, after a successful start, a stage should close each one +// as soon as it is done with it. Assume that this is done via the +// following local function // -// How this should be done depends on whether stdin/stdout are of type -// `*os.File`. +// closeStreams := func() { +// // Error handling omitted. +// _ = stdin.Close() +// _ = stdout.Close() +// } +// +// From the point of view of the pipeline as a whole, if stdin is +// provided by the user (`WithStdin()`), then we don't want the first +// stage to close it at all, whether it's an `*os.File` or not. The +// pipeline communicates this by passing a non-closing `InputStream` +// when it starts that stage. For stdout, it depends on whether the +// user supplied it using `WithStdout()` or `WithStdoutCloser()`. In +// any case, this function can close the streams anyway, because +// `InputStream.Close()` and `OutputStream.Close()` do nothing in +// those cases. +// +// When these closes should happen depends on what kind of stage it is +// and whether stdin/stdout are of type `*os.File`. +// +// ## A command stage // // If a stage is an external command, then the subprocess ultimately // needs its own copies of `*os.File` file descriptors for its stdin // and stdout. The external command will "always" [1] close those when // it exits. // +// (It's theoretically possible for a command to pass the open file +// descriptor to another, longer-lived process, in which case the file +// descriptor wouldn't necessarily get closed when the command +// finishes. But that's ill-behaved in a command that is being used in +// a pipeline, so we'll ignore that possibility.) +// // If the stage is an external command and one of the arguments is an // `*os.File`, then it can set the corresponding field of `exec.Cmd` // to that argument directly. This has the result that `exec.Cmd` @@ -42,54 +67,45 @@ import ( // subprocess. Therefore, the stage must close its copy of that // argument as soon as the external command has started, because the // external command will keep its own copy open as long as necessary -// (and no longer!). It should use roughly the following sequence: +// (and no longer!). Therefore, it should use roughly the following +// sequence: // -// cmd.Stdin = stdin.Reader() // Similarly for stdout -// cmd.Start(…) -// stdin.Close() // Close our copy -// cmd.Wait() +// cmd.Stdin = stdin.Reader() +// cmd.Stdout = stdout.Writer() +// err := cmd.Start(…) +// // Close our copies as soon as the command has started: +// closeStreams() +// if err != nil { +// return err +// } +// return cmd.Wait() // // If the stage is an external command and its stdin is not an // `*os.File`, then `exec.Cmd` will take care of creating an // `os.Pipe()`, copying from the provided reader into the pipe, and // eventually closing both ends of the pipe. The stage must close the -// provided stdin itself, but only _after_ the external command has -// finished, like so: +// provided stdin itself, but only _after_ the external command and +// the copy have finished, like so: // -// cmd.Stdin = stdin.Reader() // Similarly for stdout -// cmd.Start(…) -// cmd.Wait() -// stdin.Close() // Close +// defer closeStreams() +// cmd.Stdin = stdin.Reader() +// cmd.Stdout = stdout.Writer() +// err := cmd.Start(…) +// if err != nil { +// return err +// } +// return cmd.Wait() // -// If the stage is an external command and its stdout is not an -// `*os.File`, the stage creates a pipe, passes the write end to the -// command, and copies from the read end to the provided writer. The -// stage must close the provided stdout itself, but only _after_ the -// external command and the copy have finished. +// ## A function stage // // If the stage is a Go function, then it holds the only copy of // stdin/stdout, so it must wait until the function is done before // closing them (regardless of their underlying type, like so: // // go func() { -// f(…, stdin, stdout) -// stdin.Close() -// stdout.Close() +// defer closeStreams() +// f(…, stdin.Reader(), stdout.Writer()) // }() -// -// From the point of view of the pipeline as a whole, if stdin is -// provided by the user (`WithStdin()`), then we don't want the first -// stage to close it at all, whether it's an `*os.File` or not. The -// pipeline communicates this by passing a non-closing `InputStream` -// when it starts that stage. For stdout, it depends on whether the -// user supplied it using `WithStdout()` or `WithStdoutCloser()`. -// -// [1] It's theoretically possible for a command to pass the open file -// descriptor to another, longer-lived process, in which case the -// file descriptor wouldn't necessarily get closed when the -// command finishes. But that's ill-behaved in a command that is -// being used in a pipeline, so we'll ignore that possibility. - type Stage interface { // Name returns the name of the stage. Name() string