From a327b2f172b68d1f55c28daf2bfabda6e556a393 Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Fri, 12 Jun 2026 21:59:56 +0200 Subject: [PATCH 1/9] Make commandStage close stdout streams if pooled stdout setup fails --- pipe/command.go | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/pipe/command.go b/pipe/command.go index 9cfd9c3..5b2cba8 100644 --- a/pipe/command.go +++ b/pipe/command.go @@ -118,6 +118,20 @@ func (s *commandStage) Start( } } + closeEarlyClosers := func() { + for _, closer := range earlyClosers { + _ = closer.Close() + } + } + + // On error, Close any pipes we created and wait for the goroutines to + // exit before propagating the error. + cleanupOnStartFailure := func() { + closeEarlyClosers() + _ = s.wg.Wait() + _ = s.closeLateClosers() + } + if stdout != nil { if f, ok := stdout.(*os.File); ok { s.cmd.Stdout = f @@ -125,36 +139,23 @@ func (s *commandStage) Start( earlyClosers = append(earlyClosers, stdoutCloser) } } else { + if stdoutCloser != nil { + s.lateClosers = append(s.lateClosers, stdoutCloser) + } // Route the copy through our own pipe so we can use a // pooled buffer rather than letting exec.Cmd allocate a // fresh 32KB buffer for its internal io.Copy. ec, err := s.setupPooledStdout(stdout) if err != nil { + cleanupOnStartFailure() return err } earlyClosers = append(earlyClosers, ec) - if stdoutCloser != nil { - s.lateClosers = append(s.lateClosers, stdoutCloser) - } } } else if stdoutCloser != nil { s.lateClosers = append(s.lateClosers, stdoutCloser) } - closeEarlyClosers := func() { - for _, closer := range earlyClosers { - _ = closer.Close() - } - } - - // On error, Close any pipes we created and wait for the goroutines to - // exit before propagating the error. - cleanupOnStartFailure := func() { - closeEarlyClosers() - _ = s.wg.Wait() - _ = s.closeLateClosers() - } - // If the caller hasn't arranged otherwise, read the command's // standard error into our `stderr` field: if s.cmd.Stderr == nil { From f040a53f3a31cd39e7426c33fc83c0811c01e8b7 Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Fri, 12 Jun 2026 22:01:24 +0200 Subject: [PATCH 2/9] Close stdout streams when startup fails before the final stage starts --- pipe/pipeline.go | 14 ++++++++------ pipe/pipeline_test.go | 15 +++++++++++++++ 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 80db20a..08462ca 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -338,13 +338,16 @@ func (p *Pipeline) Start(ctx context.Context) error { // Clean up any processes and pipes that have been created. `i` is // the index of the stage that failed to start (whose output pipe // has already been cleaned up if necessary). - abort := func(i int, err error) error { + abort := func(i int, err error, closeFailedStageStdin bool) error { // Close the pipe that the previous stage was writing to. // That should cause it to exit even if it's not minding // its context. - if stageStarters[i].stdinCloser != nil { + if closeFailedStageStdin && stageStarters[i].stdinCloser != nil { _ = stageStarters[i].stdinCloser.Close() } + if i < len(p.stages)-1 && p.stdoutCloser != nil { + _ = p.stdoutCloser.Close() + } // Kill and wait for any stages that have been started // already to finish: @@ -376,7 +379,7 @@ func (p *Pipeline) Start(ctx context.Context) error { // Use an OS-level pipe for the communication: nextStdin, stdout, err := os.Pipe() if err != nil { - return abort(i, err) + return abort(i, err, true) } nextSS.stdin = nextStdin nextSS.stdinCloser = nextStdin @@ -395,8 +398,7 @@ func (p *Pipeline) Start(ctx context.Context) error { ss.stdout, ss.stdoutCloser != nil, ); err != nil { nextSS.stdinCloser.Close() - ss.stdoutCloser.Close() - return abort(i, err) + return abort(i, err, false) } } @@ -413,7 +415,7 @@ func (p *Pipeline) Start(ctx context.Context) error { ss.stdin, ss.stdinCloser != nil, ss.stdout, ss.stdoutCloser != nil, ); err != nil { - return abort(i, err) + return abort(i, err, false) } } diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index be54b5f..cb7b363 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -93,6 +93,21 @@ func TestPipelineFirstStageFailsToStart(t *testing.T) { assert.ErrorIs(t, p.Run(ctx), startErr) } +func TestPipelineFirstStageFailsToStartClosesStdoutCloser(t *testing.T) { + t.Parallel() + ctx := context.Background() + startErr := errors.New("foo") + stdout := &closeTrackingWriter{} + + p := pipe.New(pipe.WithStdoutCloser(stdout)) + p.Add( + ErrorStartingStage{startErr}, + pipe.Command("this-stage-should-not-start"), + ) + assert.ErrorIs(t, p.Run(ctx), startErr) + assert.True(t, stdout.closed, "WithStdoutCloser destination should be closed") +} + func TestPipelineSecondStageFailsToStart(t *testing.T) { t.Parallel() ctx := context.Background() From 08d5b4b121f362cd35000e7a59f9c2b3e4331df2 Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Fri, 12 Jun 2026 21:50:13 +0200 Subject: [PATCH 3/9] Introduce stream types Replace Stage.Start close flags with InputStream and OutputStream types. The endpoint constructors encode whether a stream is closing while preserving the underlying reader/writer type. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- pipe/close_responsibility_test.go | 54 ++++++++---- pipe/command.go | 9 +- pipe/command_stdout_fastpath_test.go | 7 +- pipe/env_stage.go | 10 +-- pipe/function.go | 12 ++- pipe/pipe_matching_test.go | 15 ++-- pipe/pipeline.go | 39 ++++----- pipe/pipeline_test.go | 22 ++--- pipe/stage.go | 121 ++++++++++++++++++++------- 9 files changed, 172 insertions(+), 117 deletions(-) diff --git a/pipe/close_responsibility_test.go b/pipe/close_responsibility_test.go index 23f6107..61f6d17 100644 --- a/pipe/close_responsibility_test.go +++ b/pipe/close_responsibility_test.go @@ -58,8 +58,8 @@ func TestGoStageHonorsCloseFlags(t *testing.T) { require.NoError(t, s.Start( context.Background(), StageOptions{}, - in, !tc.leaveIn, - out, !tc.leaveOut, + inputForTest(in, !tc.leaveIn), + outputForTest(out, !tc.leaveOut), )) require.NoError(t, s.Wait()) @@ -69,18 +69,22 @@ func TestGoStageHonorsCloseFlags(t *testing.T) { } } -func TestStagePanicsWhenOwnedStreamIsNotCloseable(t *testing.T) { - s := Function("f", func(_ context.Context, _ Env, _ io.Reader, _ io.Writer) error { - return nil - }) - - assert.PanicsWithValue(t, "stage asked to close *strings.Reader, which does not implement io.Closer", func() { - _ = s.Start( - context.Background(), StageOptions{}, - strings.NewReader("not closeable"), true, - nil, false, - ) - }) +func TestStreamConstructorsPreserveOwnershipAndDynamicType(t *testing.T) { + borrowedInput := strings.NewReader("borrowed") + assert.Same(t, borrowedInput, Input(borrowedInput).Reader()) + assert.Nil(t, Input(borrowedInput).Closer()) + + ownedInput := &readCloseSpy{Reader: strings.NewReader("owned")} + assert.Same(t, ownedInput, ClosingInput(ownedInput).Reader()) + assert.Same(t, ownedInput, ClosingInput(ownedInput).Closer()) + + borrowedOutput := &strings.Builder{} + assert.Same(t, borrowedOutput, Output(borrowedOutput).Writer()) + assert.Nil(t, Output(borrowedOutput).Closer()) + + ownedOutput := &writeCloseSpy{Writer: io.Discard} + assert.Same(t, ownedOutput, ClosingOutput(ownedOutput).Writer()) + assert.Same(t, ownedOutput, ClosingOutput(ownedOutput).Closer()) } // TestCommandStageHonorsCloseStdin verifies that a command stage closes a @@ -100,8 +104,8 @@ func TestCommandStageHonorsCloseStdin(t *testing.T) { require.NoError(t, s.Start( context.Background(), StageOptions{}, - in, !leave, - nil, false, + inputForTest(in, !leave), + Output(nil), )) require.NoError(t, s.Wait()) @@ -127,8 +131,8 @@ func TestCommandStageHonorsCloseStdout(t *testing.T) { require.NoError(t, s.Start( context.Background(), StageOptions{}, - nil, false, - out, !leave, + Input(nil), + outputForTest(out, !leave), )) require.NoError(t, s.Wait()) @@ -136,3 +140,17 @@ func TestCommandStageHonorsCloseStdout(t *testing.T) { }) } } + +func inputForTest(r io.ReadCloser, closing bool) InputStream { + if closing { + return ClosingInput(r) + } + return Input(r) +} + +func outputForTest(w io.WriteCloser, closing bool) OutputStream { + if closing { + return ClosingOutput(w) + } + return Output(w) +} diff --git a/pipe/command.go b/pipe/command.go index 5b2cba8..d85314b 100644 --- a/pipe/command.go +++ b/pipe/command.go @@ -88,11 +88,12 @@ func (s *commandStage) Requirements() StageRequirements { func (s *commandStage) Start( ctx context.Context, opts StageOptions, - stdin io.Reader, closeStdin bool, - stdout io.Writer, closeStdout bool, + ins InputStream, outs OutputStream, ) error { - stdinCloser := ownedCloser(stdin, closeStdin) - stdoutCloser := ownedCloser(stdout, closeStdout) + stdin := ins.Reader() + stdinCloser := ins.Closer() + stdout := outs.Writer() + stdoutCloser := outs.Closer() if s.cmd.Dir == "" { s.cmd.Dir = opts.Dir diff --git a/pipe/command_stdout_fastpath_test.go b/pipe/command_stdout_fastpath_test.go index dc1c854..095a2a6 100644 --- a/pipe/command_stdout_fastpath_test.go +++ b/pipe/command_stdout_fastpath_test.go @@ -42,7 +42,12 @@ func TestCommandStageStdoutFastPath(t *testing.T) { cmd := exec.Command("true") s := CommandStage("true", cmd).(*commandStage) - require.NoError(t, s.Start(ctx, StageOptions{}, nil, false, f, tc.closeStdout)) + stdout := OutputStream{writer: f} + if tc.closeStdout { + stdout = ClosingOutput(f) + } + + require.NoError(t, s.Start(ctx, StageOptions{}, Input(nil), stdout)) t.Cleanup(func() { _ = s.Wait() }) gotFile, ok := s.cmd.Stdout.(*os.File) diff --git a/pipe/env_stage.go b/pipe/env_stage.go index 64dab22..7503e5a 100644 --- a/pipe/env_stage.go +++ b/pipe/env_stage.go @@ -1,9 +1,6 @@ package pipe -import ( - "context" - "io" -) +import "context" // WithExtraEnv returns a Stage that adds env to the environment seen by inner. func WithExtraEnv(inner Stage, env []EnvVar) Stage { @@ -40,13 +37,12 @@ func (s *stageWithExtraEnv) Requirements() StageRequirements { func (s *stageWithExtraEnv) Start( ctx context.Context, opts StageOptions, - stdin io.Reader, closeStdin bool, - stdout io.Writer, closeStdout bool, + stdin InputStream, stdout OutputStream, ) error { opts.Vars = append(opts.Vars[:len(opts.Vars):len(opts.Vars)], func(_ context.Context, vars []EnvVar) []EnvVar { return append(vars, s.env...) }) - return s.inner.Start(ctx, opts, stdin, closeStdin, stdout, closeStdout) + return s.inner.Start(ctx, opts, stdin, stdout) } func (s *stageWithExtraEnv) Wait() error { diff --git a/pipe/function.go b/pipe/function.go index e5422f2..3f34cfa 100644 --- a/pipe/function.go +++ b/pipe/function.go @@ -78,19 +78,17 @@ func (s *goStage) Requirements() StageRequirements { func (s *goStage) Start( ctx context.Context, opts StageOptions, - stdin io.Reader, closeStdin bool, - stdout io.Writer, closeStdout bool, + stdin InputStream, stdout OutputStream, ) error { - stdinCloser := ownedCloser(stdin, closeStdin) - stdoutCloser := ownedCloser(stdout, closeStdout) - - r := stdin + r := stdin.Reader() + stdinCloser := stdin.Closer() if r == nil { // treat nil as empty input. r = strings.NewReader("") } - w := stdout + w := stdout.Writer() + stdoutCloser := stdout.Closer() if w == nil { // treat nil output as /dev/null w = io.Discard diff --git a/pipe/pipe_matching_test.go b/pipe/pipe_matching_test.go index 542e857..efd72d5 100644 --- a/pipe/pipe_matching_test.go +++ b/pipe/pipe_matching_test.go @@ -104,17 +104,12 @@ func (s *pipeSniffingStage) Requirements() pipe.StageRequirements { func (s *pipeSniffingStage) Start( _ context.Context, _ pipe.StageOptions, - stdin io.Reader, closeStdin bool, - stdout io.Writer, closeStdout bool, + stdin pipe.InputStream, stdout pipe.OutputStream, ) error { - s.stdin = stdin - if closeStdin { - _ = stdin.(io.Closer).Close() - } - s.stdout = stdout - if closeStdout { - _ = stdout.(io.Closer).Close() - } + s.stdin = stdin.Reader() + stdin.Close() + s.stdout = stdout.Writer() + stdout.Close() return nil } diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 08462ca..62d4b5e 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -219,10 +219,8 @@ func (p *Pipeline) AddWithIgnoredError(em ErrorMatcher, stages ...Stage) { type stageStarter struct { requirements StageRequirements - stdin io.Reader - stdinCloser io.Closer - stdout io.Writer - stdoutCloser io.Closer + stdin InputStream + stdout OutputStream } func (requirement StreamRequirement) validate() error { @@ -324,15 +322,16 @@ func (p *Pipeline) Start(ctx context.Context) error { if p.stdin != nil { // Arrange for the input of the 0th stage to come from // `p.stdin`: - stageStarters[0].stdin = p.stdin - stageStarters[0].stdinCloser = p.stdinCloser + stageStarters[0].stdin = Input(p.stdin) } if p.stdout != nil { i := len(p.stages) - 1 ss := &stageStarters[i] - ss.stdout = p.stdout - ss.stdoutCloser = p.stdoutCloser + ss.stdout = OutputStream{ + writer: p.stdout, + closer: p.stdoutCloser, + } } // Clean up any processes and pipes that have been created. `i` is @@ -342,8 +341,8 @@ func (p *Pipeline) Start(ctx context.Context) error { // Close the pipe that the previous stage was writing to. // That should cause it to exit even if it's not minding // its context. - if closeFailedStageStdin && stageStarters[i].stdinCloser != nil { - _ = stageStarters[i].stdinCloser.Close() + if closeFailedStageStdin { + stageStarters[i].stdin.Close() } if i < len(p.stages)-1 && p.stdoutCloser != nil { _ = p.stdoutCloser.Close() @@ -381,23 +380,18 @@ func (p *Pipeline) Start(ctx context.Context) error { if err != nil { return abort(i, err, true) } - nextSS.stdin = nextStdin - nextSS.stdinCloser = nextStdin - ss.stdout = stdout - ss.stdoutCloser = stdout + nextSS.stdin = ClosingInput(nextStdin) + ss.stdout = ClosingOutput(stdout) } else { nextStdin, stdout := io.Pipe() - nextSS.stdin = nextStdin - nextSS.stdinCloser = nextStdin - ss.stdout = stdout - ss.stdoutCloser = stdout + nextSS.stdin = ClosingInput(nextStdin) + ss.stdout = ClosingOutput(stdout) } if err := s.Start( ctx, p.stageOptions(), - ss.stdin, ss.stdinCloser != nil, - ss.stdout, ss.stdoutCloser != nil, + ss.stdin, ss.stdout, ); err != nil { - nextSS.stdinCloser.Close() + nextSS.stdin.Close() return abort(i, err, false) } } @@ -412,8 +406,7 @@ func (p *Pipeline) Start(ctx context.Context) error { if err := s.Start( ctx, p.stageOptions(), - ss.stdin, ss.stdinCloser != nil, - ss.stdout, ss.stdoutCloser != nil, + ss.stdin, ss.stdout, ); err != nil { return abort(i, err, false) } diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index cb7b363..334ef42 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -615,15 +615,10 @@ func (s ErrorStartingStage) Requirements() pipe.StageRequirements { func (s ErrorStartingStage) Start( _ context.Context, _ pipe.StageOptions, - stdin io.Reader, closeStdin bool, - stdout io.Writer, closeStdout bool, + stdin pipe.InputStream, stdout pipe.OutputStream, ) error { - if closeStdin { - _ = stdin.(io.Closer).Close() - } - if closeStdout { - _ = stdout.(io.Closer).Close() - } + stdin.Close() + stdout.Close() return s.err } @@ -647,18 +642,13 @@ func (s requirementStage) Requirements() pipe.StageRequirements { func (s requirementStage) Start( _ context.Context, _ pipe.StageOptions, - stdin io.Reader, closeStdin bool, - stdout io.Writer, closeStdout bool, + stdin pipe.InputStream, stdout pipe.OutputStream, ) error { if s.started != nil { *s.started = true } - if closeStdin { - _ = stdin.(io.Closer).Close() - } - if closeStdout { - _ = stdout.(io.Closer).Close() - } + stdin.Close() + stdout.Close() return nil } diff --git a/pipe/stage.go b/pipe/stage.go index e35f428..241b705 100644 --- a/pipe/stage.go +++ b/pipe/stage.go @@ -2,7 +2,6 @@ package pipe import ( "context" - "fmt" "io" ) @@ -12,12 +11,22 @@ import ( // Who closes stdin and stdout? // // A `Stage` as a whole is responsible for closing its end of stdin -// and stdout (assuming that `Start()` returns successfully) if the -// corresponding close flag passed to `Start()` is true. Its doing so -// tells the previous/next stage that it is done reading/writing data, -// which can affect their behavior. Therefore, it should close each -// one as soon as it is done with it. If the caller wants to suppress -// the closing of stdin/stdout, it passes a false close flag. +// and stdout if the corresponding stream is closing. That +// responsibility transfers to the stage as soon as `Start()` is called +// and applies even if `Start()` returns an error. Before returning an +// error from `Start()`, the stage must close any closing stream that +// it has not already handed off to something else that will close it +// promptly. The caller must not close a closing stream after passing it +// to `Start()`. +// +// If the caller wants to retain ownership of stdin/stdout, it passes a +// non-closing stream. The stage must not close a non-closing stream, +// even if `Start()` returns an error. +// +// Closing stdin/stdout tells the previous/next stage that this stage is +// done reading/writing data, which can affect their behavior. Therefore, +// after a successful start, a stage should close each one as soon as it +// is done with it. // // How this should be done depends on whether stdin/stdout are of type // `*os.File`. @@ -66,9 +75,9 @@ import ( // From the point of view of the pipeline as a whole, if stdin is // provided by the user (`WithStdin()`), then we don't want the first // stage to close it at all, whether it's an `*os.File` or not. The -// pipeline communicates this by passing closeStdin=false when it -// starts that stage. For stdout, it depends on whether the user -// supplied it using `WithStdout()` or `WithStdoutCloser()`. +// pipeline communicates this by passing a non-closing `InputStream` +// when it starts that stage. For stdout, it depends on whether the +// user supplied it using `WithStdout()` or `WithStdoutCloser()`. // // [1] It's theoretically possible for a command to pass the open file // descriptor to another, longer-lived process, in which case the @@ -76,6 +85,66 @@ import ( // command finishes. But that's ill-behaved in a command that is // being used in a pipeline, so we'll ignore that possibility. +type InputStream struct { + reader io.Reader + closer io.Closer +} + +// The stage may read from r but must not close it. +func Input(r io.Reader) InputStream { + return InputStream{reader: r} +} + +// The stage is responsible for closing r. +func ClosingInput(r io.ReadCloser) InputStream { + return InputStream{reader: r, closer: r} +} + +func (s InputStream) Reader() io.Reader { + return s.reader +} + +// Closer returns the stream closer, or nil if the stream is non-closing. +func (s InputStream) Closer() io.Closer { + return s.closer +} + +func (s InputStream) Close() { + if s.closer != nil { + _ = s.closer.Close() + } +} + +type OutputStream struct { + writer io.Writer + closer io.Closer +} + +// The stage may write to w but must not close it. +func Output(w io.Writer) OutputStream { + return OutputStream{writer: w} +} + +// The stage is responsible for closing w. +func ClosingOutput(w io.WriteCloser) OutputStream { + return OutputStream{writer: w, closer: w} +} + +func (s OutputStream) Writer() io.Writer { + return s.writer +} + +// Closer returns the stream closer, or nil if the stream is non-closing. +func (s OutputStream) Closer() io.Closer { + return s.closer +} + +func (s OutputStream) Close() { + if s.closer != nil { + _ = s.closer.Close() + } +} + type Stage interface { // Name returns the name of the stage. Name() string @@ -86,20 +155,21 @@ type Stage interface { // Start starts the stage in the background, in the environment // described by `opts.Env`, using `stdin` to provide its input and - // `stdout` to collect its output. (`stdin`/`stdout` might be set - // to `nil` if the stage is to receive no input, which might be the - // case for the first/last stage in a pipeline.) If `closeStdin` or - // `closeStdout` is true, the stage is responsible for closing the - // corresponding stream. A stream with a true close flag must - // implement `io.Closer`. See the `Stage` type comment for more - // information about responsibility for closing stdin and stdout. + // `stdout` to collect its output. (`stdin.Reader()` or + // `stdout.Writer()` might be `nil` if the stage is to receive no + // input or produce no output, which might be the case for the + // first/last stage in a pipeline.) If `stdin` or `stdout` is + // closing, the stage is responsible for closing the corresponding + // stream, even if `Start()` returns an error. See the `Stage` type + // comment for more information about responsibility for closing + // stdin and stdout. // // If `Start()` returns without an error, `Wait()` must also be - // called, to allow all resources to be freed. + // called, to allow all resources to be freed. If `Start()` returns + // an error, `Wait()` must not be called. Start( ctx context.Context, opts StageOptions, - stdin io.Reader, closeStdin bool, - stdout io.Writer, closeStdout bool, + stdin InputStream, stdout OutputStream, ) error // Wait waits for the stage to be done, either because it has @@ -108,17 +178,6 @@ type Stage interface { Wait() error } -func ownedCloser(stream any, owned bool) io.Closer { - if !owned { - return nil - } - closer, ok := stream.(io.Closer) - if !ok { - panic(fmt.Sprintf("stage asked to close %T, which does not implement io.Closer", stream)) - } - return closer -} - // StageOptions carries everything (other than `ctx`, `stdin`, and // `stdout`) that a pipeline passes to `Stage.Start`. type StageOptions struct { From c6d12cbbfef16f68837be4f56791ad312dd646ad Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Thu, 11 Jun 2026 10:35:36 +0200 Subject: [PATCH 4/9] windows could have these, we don't know until we try --- pipe/pipeline_test.go | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index 334ef42..d75f5c4 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -8,7 +8,6 @@ import ( "fmt" "io" "os" - "runtime" "strconv" "strings" "testing" @@ -286,10 +285,6 @@ func TestIOPipePipelineReadFromSlowly(t *testing.T) { } func TestPipelineReadFromSlowly2(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("FIXME: test skipped on Windows: 'seq' unavailable") - } - t.Parallel() ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -385,10 +380,6 @@ func TestPipelineStderr(t *testing.T) { } func TestPipelineInterrupted(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("FIXME: test skipped on Windows: 'sleep' unavailable") - } - t.Parallel() stdout := &bytes.Buffer{} @@ -407,10 +398,6 @@ func TestPipelineInterrupted(t *testing.T) { } func TestPipelineCanceled(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("FIXME: test skipped on Windows: 'sleep' unavailable") - } - t.Parallel() stdout := &bytes.Buffer{} @@ -434,10 +421,6 @@ func TestPipelineCanceled(t *testing.T) { // unread output in this case *does fit* within the OS-level pipe // buffer. func TestLittleEPIPE(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("FIXME: test skipped on Windows: 'sleep' unavailable") - } - t.Parallel() p := pipe.New() @@ -457,10 +440,6 @@ func TestLittleEPIPE(t *testing.T) { // amount of unread output in this case *does not fit* within the // OS-level pipe buffer. func TestBigEPIPE(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("FIXME: test skipped on Windows: 'seq' unavailable") - } - t.Parallel() p := pipe.New() @@ -480,10 +459,6 @@ func TestBigEPIPE(t *testing.T) { // amount of unread output in this case *does not fit* within the // OS-level pipe buffer. func TestIgnoredSIGPIPE(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("FIXME: test skipped on Windows: 'seq' unavailable") - } - t.Parallel() p := pipe.New() From 1d7a969cd5f6c1dab919e34cb63ea01e4256b9cb Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Wed, 10 Jun 2026 22:37:55 +0200 Subject: [PATCH 5/9] test Go producer behavior after downstream early exit Add regression coverage for the go-pipe v2 behavior where a Go producer can see a pipe error directly when a downstream command exits without fully reading stdin. This documents the visible semantic change from v1 to v2. Also, we add an example that demonstrates correctly handling downstream early exit in a stateful producer. --- pipe/pipeline_test.go | 65 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index d75f5c4..c7d197a 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -474,6 +474,71 @@ func TestIgnoredSIGPIPE(t *testing.T) { assert.EqualValues(t, "foo\n", out) } +func TestGoProducerSeesPipeErrorWhenCommandStopsReading(t *testing.T) { + t.Parallel() + + p := pipe.New() + p.Add( + pipe.Function( + "write-to-closed-command", + func(_ context.Context, _ pipe.Env, _ io.Reader, stdout io.Writer) error { + w := bufio.NewWriter(stdout) + for i := 0; i < 100000; i++ { + if _, err := fmt.Fprintln(w, i); err != nil { + return err + } + } + return w.Flush() + }, + ), + pipe.Command("true"), + ) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + err := p.Run(ctx) + require.Error(t, err) + assert.True(t, pipe.IsPipeError(err), "expected a pipe error, got %v", err) +} + +func TestIgnoredPipeErrorStillAllowsStatefulProducerToFinish(t *testing.T) { + t.Parallel() + + const total = 100000 + processed := 0 + p := pipe.New() + p.Add( + pipe.IgnoreError( + pipe.Function( + "stateful-producer", + func(_ context.Context, _ pipe.Env, _ io.Reader, stdout io.Writer) error { + w := bufio.NewWriter(stdout) + var writeErr error + for i := 0; i < total; i++ { + processed++ + if writeErr == nil { + if _, err := fmt.Fprintln(w, i); err != nil { + writeErr = err + } + } + } + if writeErr == nil { + writeErr = w.Flush() + } + return writeErr + }, + ), + pipe.IsPipeError, + ), + pipe.Command("true"), + ) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + require.NoError(t, p.Run(ctx)) + assert.Equal(t, total, processed) +} + func TestFunction(t *testing.T) { t.Parallel() ctx := context.Background() From 33a4e28507d8e7f9cbc5794c364b4e81793592a2 Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Wed, 10 Jun 2026 23:09:21 +0200 Subject: [PATCH 6/9] document producer pipe errors in v2 migration Explain that go-pipe v1 could hide producer-side pipe write errors as an implementation detail of the command stdin-copy path, while v2 exposes those errors more directly. Document the migration patterns for stateless and stateful producers. --- README.md | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/README.md b/README.md index 995ee86..aabd484 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,31 @@ A package used to easily build command pipelines in your Go applications # Important We have not thoroughly tested this package on OSs other than Linux, especially Windows. At this time, using this package on Windows based systems is considered experimental and will be supported only on a best effort basis. +# Migrating to v2 + +It's normal for pipelines to stop before all input has been consumed[^1]. If an earlier stage continues writing after that happens, the write side of the pipe can fail with `EPIPE`, `SIGPIPE`, or `io.ErrClosedPipe`. + +In go-pipe v1 it was possible to get away without handling this case, because a command stage's stdin was connected in a way that often (but not necessarily!) drained the write side and hid the error from the previous stage feeding it. That was an implementation detail, not a guarantee. In go-pipe v2, producer stages are more likely to be connected directly to a command's stdin, and thus see the error themselves. + +Fortunately, this is easily handled by wrapping the stage with `pipe.IgnoreError(stage, IsPipeError)`. If the producer only writes output and is otherwise stateless, that's the only thing needed. + +If the producer also updates state, metrics, cursors, or has other side effects, in a way that depends on how much of the output was produced, then in addition to using `pipe.IgnoreError`, you must also ensure producer-owned state is brought to a consistent point before returning the error. + +For example, if a stateful producer function must process its entire input for correctness regardless of whether it was read by the consumer, it should use a pattern like: + +```go +var writeErr error +for _, item := range items { + updateState(item) + if writeErr == nil { + _, writeErr = fmt.Fprintln(stdout, item) + } +} +return writeErr +``` + # Links * [Docs](https://pkg.go.dev/github.com/github/go-pipe/v2) + +[^1]: In `cat foo | head | grep -q`, for example, either `head` or `grep` could exit before its input is fully consumed. From ac585a868aa0a01cd9f481670f98ac1741391ee9 Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Fri, 12 Jun 2026 20:08:24 +0200 Subject: [PATCH 7/9] fully document stream ownership (close responsibility) --- pipe/pipeline.go | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 62d4b5e..7e9637c 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -99,7 +99,9 @@ func WithDir(dir string) Option { } } -// WithStdin assigns stdin to the first command in the pipeline. +// WithStdin assigns stdin to the first command in the pipeline. The +// caller retains ownership of stdin; the pipeline will not close it, +// even if `Start()` returns an error. func WithStdin(stdin io.Reader) Option { return func(p *Pipeline) { p.stdin = stdin @@ -107,7 +109,9 @@ func WithStdin(stdin io.Reader) Option { } } -// WithStdout assigns stdout to the last command in the pipeline. +// WithStdout assigns stdout to the last command in the pipeline. The +// caller retains ownership of stdout; the pipeline will not close it, +// even if `Start()` returns an error. func WithStdout(stdout io.Writer) Option { return func(p *Pipeline) { p.stdout = stdout @@ -116,7 +120,9 @@ func WithStdout(stdout io.Writer) Option { } // WithStdoutCloser assigns stdout to the last command in the -// pipeline, and closes stdout when it's done. +// pipeline, and closes stdout when the pipeline is done with it. The +// pipeline is responsible for closing stdout even if `Start()` returns +// an error. func WithStdoutCloser(stdout io.WriteCloser) Option { return func(p *Pipeline) { p.stdout = stdout @@ -270,6 +276,13 @@ func (p *Pipeline) stageOptions() StageOptions { // Start starts the commands in the pipeline. If `Start()` exits // without an error, `Wait()` must also be called, to allow all // resources to be freed. +// +// If `Start()` returns an error, `Wait()` must not be called. Before +// returning an error, `Start()` cancels and waits for any stages that +// were started, closes any inter-stage pipes that the pipeline owns, +// and closes stdout if it was supplied with `WithStdoutCloser()`. +// Streams supplied with `WithStdin()` or `WithStdout()` remain owned by +// the caller and are not closed by the pipeline. func (p *Pipeline) Start(ctx context.Context) error { if p.hasStarted() { panic("attempt to start a pipeline that has already started") @@ -513,7 +526,9 @@ func (p *Pipeline) Wait() error { return nil } -// Run starts and waits for the commands in the pipeline. +// Run starts and waits for the commands in the pipeline. If startup +// fails, it returns the `Start()` error after `Start()` has performed +// its failure cleanup. func (p *Pipeline) Run(ctx context.Context) error { if err := p.Start(ctx); err != nil { return err From a65fafb8e6a97c3f82d27f78bd347d6b014aeeda Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Fri, 12 Jun 2026 21:40:12 +0200 Subject: [PATCH 8/9] Document early-close producer caveats Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- pipe/filter-error.go | 12 +++++++++++- pipe/function.go | 9 +++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/pipe/filter-error.go b/pipe/filter-error.go index 654796a..9bdee27 100644 --- a/pipe/filter-error.go +++ b/pipe/filter-error.go @@ -48,6 +48,12 @@ type ErrorMatcher func(err error) bool // the functions from the standard library that has the same signature // (e.g., `os.IsTimeout`), or some combination of these (e.g., // `AnyError(IsSIGPIPE, os.IsTimeout)`). +// +// `IgnoreError` only suppresses the error returned by the wrapped +// stage. If a producer ignores pipe errors because a later stage can +// stop reading early, the producer is still responsible for keeping any +// producer-owned state, metrics, cursors, or other side effects +// consistent before returning the ignored error. func IgnoreError(s Stage, em ErrorMatcher) Stage { return FilterError(s, func(err error) error { @@ -128,7 +134,11 @@ var ( // IsPipeError is an `ErrorMatcher` that matches a few different // errors that typically result if a stage writes to a subsequent - // stage that has stopped reading from its stdin. Use like + // stage that has stopped reading from its stdin. This is commonly + // useful with `IgnoreError` for stateless producer stages whose only + // job is writing output. Stateful producers should continue any + // producer-owned state updates needed for consistency before + // returning the pipe error for `IgnoreError` to suppress. Use like // // p.Add(IgnoreError(someStage, IsPipeError)) IsPipeError = AnyError(IsSIGPIPE, IsEPIPE, IsErrClosedPipe) diff --git a/pipe/function.go b/pipe/function.go index 3f34cfa..d8a09af 100644 --- a/pipe/function.go +++ b/pipe/function.go @@ -17,6 +17,15 @@ import ( // Neither `stdin` nor `stdout` are necessarily buffered. If the // `StageFunc` requires buffering, it needs to arrange that itself. // +// A later stage can stop reading before this function has written all +// of its output. In that case, writes to `stdout` can fail with an +// error matched by `IsPipeError`. If the function only writes output +// and is otherwise stateless, callers can usually wrap the stage with +// `IgnoreError(stage, IsPipeError)`. If the function also updates +// producer-owned state, metrics, cursors, or other side effects that +// depend on how much output was produced, it should bring those side +// effects to a consistent point before returning the write error. +// // A `StageFunc` is run in a separate goroutine, so it must be careful // to synchronize any data access aside from reading and writing. type StageFunc func(ctx context.Context, env Env, stdin io.Reader, stdout io.Writer) error From 433e10423d5c26b0310d02a23e34d46bc7b3adf8 Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Fri, 12 Jun 2026 22:37:13 +0200 Subject: [PATCH 9/9] Align start-failure cleanup with stream ownership Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- pipe/close_responsibility_test.go | 20 ++++++++++---------- pipe/command_stdout_fastpath_test.go | 12 ++++++------ pipe/pipeline.go | 15 +++++++++------ pipe/pipeline_test.go | 20 +++++++++++++++++++- pipe/stage.go | 18 ++++++++++++------ 5 files changed, 56 insertions(+), 29 deletions(-) diff --git a/pipe/close_responsibility_test.go b/pipe/close_responsibility_test.go index 61f6d17..162817d 100644 --- a/pipe/close_responsibility_test.go +++ b/pipe/close_responsibility_test.go @@ -34,9 +34,9 @@ func (w *writeCloseSpy) Close() error { return nil } -// TestGoStageHonorsCloseFlags verifies that a Function stage closes -// stdin/stdout iff the corresponding close flag is true. -func TestGoStageHonorsCloseFlags(t *testing.T) { +// TestGoStageHonorsStreamOwnership verifies that a Function stage closes +// stdin/stdout iff the corresponding stream is closing. +func TestGoStageHonorsStreamOwnership(t *testing.T) { cases := []struct { name string leaveIn, leaveOut bool @@ -63,8 +63,8 @@ func TestGoStageHonorsCloseFlags(t *testing.T) { )) require.NoError(t, s.Wait()) - assert.Equal(t, !tc.leaveIn, in.closed.Load(), "closeStdin=%v", !tc.leaveIn) - assert.Equal(t, !tc.leaveOut, out.closed.Load(), "closeStdout=%v", !tc.leaveOut) + assert.Equal(t, !tc.leaveIn, in.closed.Load(), "closing stdin=%v", !tc.leaveIn) + assert.Equal(t, !tc.leaveOut, out.closed.Load(), "closing stdout=%v", !tc.leaveOut) }) } } @@ -88,8 +88,8 @@ func TestStreamConstructorsPreserveOwnershipAndDynamicType(t *testing.T) { } // TestCommandStageHonorsCloseStdin verifies that a command stage closes a -// non-file stdin (a "late" closer) iff closeStdin is true. An empty -// reader is used so exec.Cmd's input-copy goroutine sees EOF promptly. +// non-file stdin (a "late" closer) iff the input stream is closing. An +// empty reader is used so exec.Cmd's input-copy goroutine sees EOF promptly. func TestCommandStageHonorsCloseStdin(t *testing.T) { for _, leave := range []bool{false, true} { name := "owns stdin" @@ -109,14 +109,14 @@ func TestCommandStageHonorsCloseStdin(t *testing.T) { )) require.NoError(t, s.Wait()) - assert.Equal(t, !leave, in.closed.Load(), "closeStdin=%v", !leave) + assert.Equal(t, !leave, in.closed.Load(), "closing stdin=%v", !leave) }) } } // TestCommandStageHonorsCloseStdout verifies the stdout counterpart: a // non-file stdout (routed through the pooled-copy path) is closed iff -// closeStdout is true. +// the output stream is closing. func TestCommandStageHonorsCloseStdout(t *testing.T) { for _, leave := range []bool{false, true} { name := "owns stdout" @@ -136,7 +136,7 @@ func TestCommandStageHonorsCloseStdout(t *testing.T) { )) require.NoError(t, s.Wait()) - assert.Equal(t, !leave, out.closed.Load(), "closeStdout=%v", !leave) + assert.Equal(t, !leave, out.closed.Load(), "closing stdout=%v", !leave) }) } } diff --git a/pipe/command_stdout_fastpath_test.go b/pipe/command_stdout_fastpath_test.go index 095a2a6..4b2240c 100644 --- a/pipe/command_stdout_fastpath_test.go +++ b/pipe/command_stdout_fastpath_test.go @@ -18,15 +18,15 @@ import ( // subprocess can detect when that fd is closed. func TestCommandStageStdoutFastPath(t *testing.T) { cases := []struct { - name string - closeStdout bool + name string + closingStdout bool }{ { - name: "raw *os.File with closeStdout", - closeStdout: true, + name: "raw *os.File with closing stdout", + closingStdout: true, }, { - name: "raw *os.File without closeStdout", + name: "raw *os.File with non-closing stdout", }, } for _, tc := range cases { @@ -43,7 +43,7 @@ func TestCommandStageStdoutFastPath(t *testing.T) { s := CommandStage("true", cmd).(*commandStage) stdout := OutputStream{writer: f} - if tc.closeStdout { + if tc.closingStdout { stdout = ClosingOutput(f) } diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 7e9637c..2ba54cc 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -347,16 +347,19 @@ func (p *Pipeline) Start(ctx context.Context) error { } } - // Clean up any processes and pipes that have been created. `i` is - // the index of the stage that failed to start (whose output pipe - // has already been cleaned up if necessary). + // Clean up any processes and pipes that have been created. `i` is the + // index of the stage that failed to start. If the stage already received + // its streams, it owns any closing stream. abort := func(i int, err error, closeFailedStageStdin bool) error { - // Close the pipe that the previous stage was writing to. - // That should cause it to exit even if it's not minding - // its context. + // If the failing stage never received its stdin, close the pipe that + // the previous stage was writing to. That should cause it to exit + // even if it's not minding its context. if closeFailedStageStdin { stageStarters[i].stdin.Close() } + + // If stdout was supplied with WithStdoutCloser but the final stage + // was never started, then the pipeline still owns that closer. if i < len(p.stages)-1 && p.stdoutCloser != nil { _ = p.stdoutCloser.Close() } diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index c7d197a..fe4b630 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -83,13 +83,15 @@ func TestPipelineFirstStageFailsToStart(t *testing.T) { t.Parallel() ctx := context.Background() startErr := errors.New("foo") + stdout := &closeTrackingWriter{} - p := pipe.New() + p := pipe.New(pipe.WithStdoutCloser(stdout)) p.Add( ErrorStartingStage{startErr}, ErrorStartingStage{errors.New("this error should never happen")}, ) assert.ErrorIs(t, p.Run(ctx), startErr) + assert.True(t, stdout.closed, "WithStdoutCloser destination should be closed") } func TestPipelineFirstStageFailsToStartClosesStdoutCloser(t *testing.T) { @@ -120,6 +122,22 @@ func TestPipelineSecondStageFailsToStart(t *testing.T) { assert.ErrorIs(t, p.Run(ctx), startErr) } +func TestPipelineMiddleStageFailsToStartClosesUnstartedStdoutCloser(t *testing.T) { + t.Parallel() + ctx := context.Background() + startErr := errors.New("foo") + stdout := &closeTrackingWriter{} + + p := pipe.New(pipe.WithStdoutCloser(stdout)) + p.Add( + seqFunction(20000), + ErrorStartingStage{startErr}, + ErrorStartingStage{errors.New("this error should never happen")}, + ) + assert.ErrorIs(t, p.Run(ctx), startErr) + assert.True(t, stdout.closed, "WithStdoutCloser destination should be closed") +} + func TestPipelineSingleCommandOutput(t *testing.T) { t.Parallel() ctx := context.Background() diff --git a/pipe/stage.go b/pipe/stage.go index 241b705..a46c01e 100644 --- a/pipe/stage.go +++ b/pipe/stage.go @@ -50,18 +50,24 @@ import ( // f.Close() // close our copy // cmd.Wait() // -// If the stage is an external command and one of its arguments is not -// an `*os.File`, then `exec.Cmd` will take care of creating an -// `os.Pipe()`, copying from the provided argument in/out of the pipe, -// and eventually closing both ends of the pipe. The stage must close -// the argument itself, but only _after_ the external command has +// If the stage is an external command and its stdin is not an +// `*os.File`, then `exec.Cmd` will take care of creating an +// `os.Pipe()`, copying from the provided reader into the pipe, and +// eventually closing both ends of the pipe. The stage must close the +// provided stdin itself, but only _after_ the external command has // finished, like so: // -// cmd.Stdin = r // Similarly for stdout +// cmd.Stdin = r // cmd.Start(…) // cmd.Wait() // r.Close() // +// If the stage is an external command and its stdout is not an +// `*os.File`, the stage creates a pipe, passes the write end to the +// command, and copies from the read end to the provided writer. The +// stage must close the provided stdout itself, but only _after_ the +// external command and the copy have finished. +// // If the stage is a Go function, then it holds the only copy of // stdin/stdout, so it must wait until the function is done before // closing them (regardless of their underlying type, like so: