From a327b2f172b68d1f55c28daf2bfabda6e556a393 Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Fri, 12 Jun 2026 21:59:56 +0200 Subject: [PATCH 01/26] Make commandStage close stdout streams if pooled stdout setup fails --- pipe/command.go | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/pipe/command.go b/pipe/command.go index 9cfd9c3..5b2cba8 100644 --- a/pipe/command.go +++ b/pipe/command.go @@ -118,6 +118,20 @@ func (s *commandStage) Start( } } + closeEarlyClosers := func() { + for _, closer := range earlyClosers { + _ = closer.Close() + } + } + + // On error, Close any pipes we created and wait for the goroutines to + // exit before propagating the error. + cleanupOnStartFailure := func() { + closeEarlyClosers() + _ = s.wg.Wait() + _ = s.closeLateClosers() + } + if stdout != nil { if f, ok := stdout.(*os.File); ok { s.cmd.Stdout = f @@ -125,36 +139,23 @@ func (s *commandStage) Start( earlyClosers = append(earlyClosers, stdoutCloser) } } else { + if stdoutCloser != nil { + s.lateClosers = append(s.lateClosers, stdoutCloser) + } // Route the copy through our own pipe so we can use a // pooled buffer rather than letting exec.Cmd allocate a // fresh 32KB buffer for its internal io.Copy. ec, err := s.setupPooledStdout(stdout) if err != nil { + cleanupOnStartFailure() return err } earlyClosers = append(earlyClosers, ec) - if stdoutCloser != nil { - s.lateClosers = append(s.lateClosers, stdoutCloser) - } } } else if stdoutCloser != nil { s.lateClosers = append(s.lateClosers, stdoutCloser) } - closeEarlyClosers := func() { - for _, closer := range earlyClosers { - _ = closer.Close() - } - } - - // On error, Close any pipes we created and wait for the goroutines to - // exit before propagating the error. - cleanupOnStartFailure := func() { - closeEarlyClosers() - _ = s.wg.Wait() - _ = s.closeLateClosers() - } - // If the caller hasn't arranged otherwise, read the command's // standard error into our `stderr` field: if s.cmd.Stderr == nil { From f040a53f3a31cd39e7426c33fc83c0811c01e8b7 Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Fri, 12 Jun 2026 22:01:24 +0200 Subject: [PATCH 02/26] Close stdout streams when startup fails before the final stage starts --- pipe/pipeline.go | 14 ++++++++------ pipe/pipeline_test.go | 15 +++++++++++++++ 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 80db20a..08462ca 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -338,13 +338,16 @@ func (p *Pipeline) Start(ctx context.Context) error { // Clean up any processes and pipes that have been created. `i` is // the index of the stage that failed to start (whose output pipe // has already been cleaned up if necessary). - abort := func(i int, err error) error { + abort := func(i int, err error, closeFailedStageStdin bool) error { // Close the pipe that the previous stage was writing to. // That should cause it to exit even if it's not minding // its context. - if stageStarters[i].stdinCloser != nil { + if closeFailedStageStdin && stageStarters[i].stdinCloser != nil { _ = stageStarters[i].stdinCloser.Close() } + if i < len(p.stages)-1 && p.stdoutCloser != nil { + _ = p.stdoutCloser.Close() + } // Kill and wait for any stages that have been started // already to finish: @@ -376,7 +379,7 @@ func (p *Pipeline) Start(ctx context.Context) error { // Use an OS-level pipe for the communication: nextStdin, stdout, err := os.Pipe() if err != nil { - return abort(i, err) + return abort(i, err, true) } nextSS.stdin = nextStdin nextSS.stdinCloser = nextStdin @@ -395,8 +398,7 @@ func (p *Pipeline) Start(ctx context.Context) error { ss.stdout, ss.stdoutCloser != nil, ); err != nil { nextSS.stdinCloser.Close() - ss.stdoutCloser.Close() - return abort(i, err) + return abort(i, err, false) } } @@ -413,7 +415,7 @@ func (p *Pipeline) Start(ctx context.Context) error { ss.stdin, ss.stdinCloser != nil, ss.stdout, ss.stdoutCloser != nil, ); err != nil { - return abort(i, err) + return abort(i, err, false) } } diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index be54b5f..cb7b363 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -93,6 +93,21 @@ func TestPipelineFirstStageFailsToStart(t *testing.T) { assert.ErrorIs(t, p.Run(ctx), startErr) } +func TestPipelineFirstStageFailsToStartClosesStdoutCloser(t *testing.T) { + t.Parallel() + ctx := context.Background() + startErr := errors.New("foo") + stdout := &closeTrackingWriter{} + + p := pipe.New(pipe.WithStdoutCloser(stdout)) + p.Add( + ErrorStartingStage{startErr}, + pipe.Command("this-stage-should-not-start"), + ) + assert.ErrorIs(t, p.Run(ctx), startErr) + assert.True(t, stdout.closed, "WithStdoutCloser destination should be closed") +} + func TestPipelineSecondStageFailsToStart(t *testing.T) { t.Parallel() ctx := context.Background() From 08d5b4b121f362cd35000e7a59f9c2b3e4331df2 Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Fri, 12 Jun 2026 21:50:13 +0200 Subject: [PATCH 03/26] Introduce stream types Replace Stage.Start close flags with InputStream and OutputStream types. The endpoint constructors encode whether a stream is closing while preserving the underlying reader/writer type. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- pipe/close_responsibility_test.go | 54 ++++++++---- pipe/command.go | 9 +- pipe/command_stdout_fastpath_test.go | 7 +- pipe/env_stage.go | 10 +-- pipe/function.go | 12 ++- pipe/pipe_matching_test.go | 15 ++-- pipe/pipeline.go | 39 ++++----- pipe/pipeline_test.go | 22 ++--- pipe/stage.go | 121 ++++++++++++++++++++------- 9 files changed, 172 insertions(+), 117 deletions(-) diff --git a/pipe/close_responsibility_test.go b/pipe/close_responsibility_test.go index 23f6107..61f6d17 100644 --- a/pipe/close_responsibility_test.go +++ b/pipe/close_responsibility_test.go @@ -58,8 +58,8 @@ func TestGoStageHonorsCloseFlags(t *testing.T) { require.NoError(t, s.Start( context.Background(), StageOptions{}, - in, !tc.leaveIn, - out, !tc.leaveOut, + inputForTest(in, !tc.leaveIn), + outputForTest(out, !tc.leaveOut), )) require.NoError(t, s.Wait()) @@ -69,18 +69,22 @@ func TestGoStageHonorsCloseFlags(t *testing.T) { } } -func TestStagePanicsWhenOwnedStreamIsNotCloseable(t *testing.T) { - s := Function("f", func(_ context.Context, _ Env, _ io.Reader, _ io.Writer) error { - return nil - }) - - assert.PanicsWithValue(t, "stage asked to close *strings.Reader, which does not implement io.Closer", func() { - _ = s.Start( - context.Background(), StageOptions{}, - strings.NewReader("not closeable"), true, - nil, false, - ) - }) +func TestStreamConstructorsPreserveOwnershipAndDynamicType(t *testing.T) { + borrowedInput := strings.NewReader("borrowed") + assert.Same(t, borrowedInput, Input(borrowedInput).Reader()) + assert.Nil(t, Input(borrowedInput).Closer()) + + ownedInput := &readCloseSpy{Reader: strings.NewReader("owned")} + assert.Same(t, ownedInput, ClosingInput(ownedInput).Reader()) + assert.Same(t, ownedInput, ClosingInput(ownedInput).Closer()) + + borrowedOutput := &strings.Builder{} + assert.Same(t, borrowedOutput, Output(borrowedOutput).Writer()) + assert.Nil(t, Output(borrowedOutput).Closer()) + + ownedOutput := &writeCloseSpy{Writer: io.Discard} + assert.Same(t, ownedOutput, ClosingOutput(ownedOutput).Writer()) + assert.Same(t, ownedOutput, ClosingOutput(ownedOutput).Closer()) } // TestCommandStageHonorsCloseStdin verifies that a command stage closes a @@ -100,8 +104,8 @@ func TestCommandStageHonorsCloseStdin(t *testing.T) { require.NoError(t, s.Start( context.Background(), StageOptions{}, - in, !leave, - nil, false, + inputForTest(in, !leave), + Output(nil), )) require.NoError(t, s.Wait()) @@ -127,8 +131,8 @@ func TestCommandStageHonorsCloseStdout(t *testing.T) { require.NoError(t, s.Start( context.Background(), StageOptions{}, - nil, false, - out, !leave, + Input(nil), + outputForTest(out, !leave), )) require.NoError(t, s.Wait()) @@ -136,3 +140,17 @@ func TestCommandStageHonorsCloseStdout(t *testing.T) { }) } } + +func inputForTest(r io.ReadCloser, closing bool) InputStream { + if closing { + return ClosingInput(r) + } + return Input(r) +} + +func outputForTest(w io.WriteCloser, closing bool) OutputStream { + if closing { + return ClosingOutput(w) + } + return Output(w) +} diff --git a/pipe/command.go b/pipe/command.go index 5b2cba8..d85314b 100644 --- a/pipe/command.go +++ b/pipe/command.go @@ -88,11 +88,12 @@ func (s *commandStage) Requirements() StageRequirements { func (s *commandStage) Start( ctx context.Context, opts StageOptions, - stdin io.Reader, closeStdin bool, - stdout io.Writer, closeStdout bool, + ins InputStream, outs OutputStream, ) error { - stdinCloser := ownedCloser(stdin, closeStdin) - stdoutCloser := ownedCloser(stdout, closeStdout) + stdin := ins.Reader() + stdinCloser := ins.Closer() + stdout := outs.Writer() + stdoutCloser := outs.Closer() if s.cmd.Dir == "" { s.cmd.Dir = opts.Dir diff --git a/pipe/command_stdout_fastpath_test.go b/pipe/command_stdout_fastpath_test.go index dc1c854..095a2a6 100644 --- a/pipe/command_stdout_fastpath_test.go +++ b/pipe/command_stdout_fastpath_test.go @@ -42,7 +42,12 @@ func TestCommandStageStdoutFastPath(t *testing.T) { cmd := exec.Command("true") s := CommandStage("true", cmd).(*commandStage) - require.NoError(t, s.Start(ctx, StageOptions{}, nil, false, f, tc.closeStdout)) + stdout := OutputStream{writer: f} + if tc.closeStdout { + stdout = ClosingOutput(f) + } + + require.NoError(t, s.Start(ctx, StageOptions{}, Input(nil), stdout)) t.Cleanup(func() { _ = s.Wait() }) gotFile, ok := s.cmd.Stdout.(*os.File) diff --git a/pipe/env_stage.go b/pipe/env_stage.go index 64dab22..7503e5a 100644 --- a/pipe/env_stage.go +++ b/pipe/env_stage.go @@ -1,9 +1,6 @@ package pipe -import ( - "context" - "io" -) +import "context" // WithExtraEnv returns a Stage that adds env to the environment seen by inner. func WithExtraEnv(inner Stage, env []EnvVar) Stage { @@ -40,13 +37,12 @@ func (s *stageWithExtraEnv) Requirements() StageRequirements { func (s *stageWithExtraEnv) Start( ctx context.Context, opts StageOptions, - stdin io.Reader, closeStdin bool, - stdout io.Writer, closeStdout bool, + stdin InputStream, stdout OutputStream, ) error { opts.Vars = append(opts.Vars[:len(opts.Vars):len(opts.Vars)], func(_ context.Context, vars []EnvVar) []EnvVar { return append(vars, s.env...) }) - return s.inner.Start(ctx, opts, stdin, closeStdin, stdout, closeStdout) + return s.inner.Start(ctx, opts, stdin, stdout) } func (s *stageWithExtraEnv) Wait() error { diff --git a/pipe/function.go b/pipe/function.go index e5422f2..3f34cfa 100644 --- a/pipe/function.go +++ b/pipe/function.go @@ -78,19 +78,17 @@ func (s *goStage) Requirements() StageRequirements { func (s *goStage) Start( ctx context.Context, opts StageOptions, - stdin io.Reader, closeStdin bool, - stdout io.Writer, closeStdout bool, + stdin InputStream, stdout OutputStream, ) error { - stdinCloser := ownedCloser(stdin, closeStdin) - stdoutCloser := ownedCloser(stdout, closeStdout) - - r := stdin + r := stdin.Reader() + stdinCloser := stdin.Closer() if r == nil { // treat nil as empty input. r = strings.NewReader("") } - w := stdout + w := stdout.Writer() + stdoutCloser := stdout.Closer() if w == nil { // treat nil output as /dev/null w = io.Discard diff --git a/pipe/pipe_matching_test.go b/pipe/pipe_matching_test.go index 542e857..efd72d5 100644 --- a/pipe/pipe_matching_test.go +++ b/pipe/pipe_matching_test.go @@ -104,17 +104,12 @@ func (s *pipeSniffingStage) Requirements() pipe.StageRequirements { func (s *pipeSniffingStage) Start( _ context.Context, _ pipe.StageOptions, - stdin io.Reader, closeStdin bool, - stdout io.Writer, closeStdout bool, + stdin pipe.InputStream, stdout pipe.OutputStream, ) error { - s.stdin = stdin - if closeStdin { - _ = stdin.(io.Closer).Close() - } - s.stdout = stdout - if closeStdout { - _ = stdout.(io.Closer).Close() - } + s.stdin = stdin.Reader() + stdin.Close() + s.stdout = stdout.Writer() + stdout.Close() return nil } diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 08462ca..62d4b5e 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -219,10 +219,8 @@ func (p *Pipeline) AddWithIgnoredError(em ErrorMatcher, stages ...Stage) { type stageStarter struct { requirements StageRequirements - stdin io.Reader - stdinCloser io.Closer - stdout io.Writer - stdoutCloser io.Closer + stdin InputStream + stdout OutputStream } func (requirement StreamRequirement) validate() error { @@ -324,15 +322,16 @@ func (p *Pipeline) Start(ctx context.Context) error { if p.stdin != nil { // Arrange for the input of the 0th stage to come from // `p.stdin`: - stageStarters[0].stdin = p.stdin - stageStarters[0].stdinCloser = p.stdinCloser + stageStarters[0].stdin = Input(p.stdin) } if p.stdout != nil { i := len(p.stages) - 1 ss := &stageStarters[i] - ss.stdout = p.stdout - ss.stdoutCloser = p.stdoutCloser + ss.stdout = OutputStream{ + writer: p.stdout, + closer: p.stdoutCloser, + } } // Clean up any processes and pipes that have been created. `i` is @@ -342,8 +341,8 @@ func (p *Pipeline) Start(ctx context.Context) error { // Close the pipe that the previous stage was writing to. // That should cause it to exit even if it's not minding // its context. - if closeFailedStageStdin && stageStarters[i].stdinCloser != nil { - _ = stageStarters[i].stdinCloser.Close() + if closeFailedStageStdin { + stageStarters[i].stdin.Close() } if i < len(p.stages)-1 && p.stdoutCloser != nil { _ = p.stdoutCloser.Close() @@ -381,23 +380,18 @@ func (p *Pipeline) Start(ctx context.Context) error { if err != nil { return abort(i, err, true) } - nextSS.stdin = nextStdin - nextSS.stdinCloser = nextStdin - ss.stdout = stdout - ss.stdoutCloser = stdout + nextSS.stdin = ClosingInput(nextStdin) + ss.stdout = ClosingOutput(stdout) } else { nextStdin, stdout := io.Pipe() - nextSS.stdin = nextStdin - nextSS.stdinCloser = nextStdin - ss.stdout = stdout - ss.stdoutCloser = stdout + nextSS.stdin = ClosingInput(nextStdin) + ss.stdout = ClosingOutput(stdout) } if err := s.Start( ctx, p.stageOptions(), - ss.stdin, ss.stdinCloser != nil, - ss.stdout, ss.stdoutCloser != nil, + ss.stdin, ss.stdout, ); err != nil { - nextSS.stdinCloser.Close() + nextSS.stdin.Close() return abort(i, err, false) } } @@ -412,8 +406,7 @@ func (p *Pipeline) Start(ctx context.Context) error { if err := s.Start( ctx, p.stageOptions(), - ss.stdin, ss.stdinCloser != nil, - ss.stdout, ss.stdoutCloser != nil, + ss.stdin, ss.stdout, ); err != nil { return abort(i, err, false) } diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index cb7b363..334ef42 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -615,15 +615,10 @@ func (s ErrorStartingStage) Requirements() pipe.StageRequirements { func (s ErrorStartingStage) Start( _ context.Context, _ pipe.StageOptions, - stdin io.Reader, closeStdin bool, - stdout io.Writer, closeStdout bool, + stdin pipe.InputStream, stdout pipe.OutputStream, ) error { - if closeStdin { - _ = stdin.(io.Closer).Close() - } - if closeStdout { - _ = stdout.(io.Closer).Close() - } + stdin.Close() + stdout.Close() return s.err } @@ -647,18 +642,13 @@ func (s requirementStage) Requirements() pipe.StageRequirements { func (s requirementStage) Start( _ context.Context, _ pipe.StageOptions, - stdin io.Reader, closeStdin bool, - stdout io.Writer, closeStdout bool, + stdin pipe.InputStream, stdout pipe.OutputStream, ) error { if s.started != nil { *s.started = true } - if closeStdin { - _ = stdin.(io.Closer).Close() - } - if closeStdout { - _ = stdout.(io.Closer).Close() - } + stdin.Close() + stdout.Close() return nil } diff --git a/pipe/stage.go b/pipe/stage.go index e35f428..241b705 100644 --- a/pipe/stage.go +++ b/pipe/stage.go @@ -2,7 +2,6 @@ package pipe import ( "context" - "fmt" "io" ) @@ -12,12 +11,22 @@ import ( // Who closes stdin and stdout? // // A `Stage` as a whole is responsible for closing its end of stdin -// and stdout (assuming that `Start()` returns successfully) if the -// corresponding close flag passed to `Start()` is true. Its doing so -// tells the previous/next stage that it is done reading/writing data, -// which can affect their behavior. Therefore, it should close each -// one as soon as it is done with it. If the caller wants to suppress -// the closing of stdin/stdout, it passes a false close flag. +// and stdout if the corresponding stream is closing. That +// responsibility transfers to the stage as soon as `Start()` is called +// and applies even if `Start()` returns an error. Before returning an +// error from `Start()`, the stage must close any closing stream that +// it has not already handed off to something else that will close it +// promptly. The caller must not close a closing stream after passing it +// to `Start()`. +// +// If the caller wants to retain ownership of stdin/stdout, it passes a +// non-closing stream. The stage must not close a non-closing stream, +// even if `Start()` returns an error. +// +// Closing stdin/stdout tells the previous/next stage that this stage is +// done reading/writing data, which can affect their behavior. Therefore, +// after a successful start, a stage should close each one as soon as it +// is done with it. // // How this should be done depends on whether stdin/stdout are of type // `*os.File`. @@ -66,9 +75,9 @@ import ( // From the point of view of the pipeline as a whole, if stdin is // provided by the user (`WithStdin()`), then we don't want the first // stage to close it at all, whether it's an `*os.File` or not. The -// pipeline communicates this by passing closeStdin=false when it -// starts that stage. For stdout, it depends on whether the user -// supplied it using `WithStdout()` or `WithStdoutCloser()`. +// pipeline communicates this by passing a non-closing `InputStream` +// when it starts that stage. For stdout, it depends on whether the +// user supplied it using `WithStdout()` or `WithStdoutCloser()`. // // [1] It's theoretically possible for a command to pass the open file // descriptor to another, longer-lived process, in which case the @@ -76,6 +85,66 @@ import ( // command finishes. But that's ill-behaved in a command that is // being used in a pipeline, so we'll ignore that possibility. +type InputStream struct { + reader io.Reader + closer io.Closer +} + +// The stage may read from r but must not close it. +func Input(r io.Reader) InputStream { + return InputStream{reader: r} +} + +// The stage is responsible for closing r. +func ClosingInput(r io.ReadCloser) InputStream { + return InputStream{reader: r, closer: r} +} + +func (s InputStream) Reader() io.Reader { + return s.reader +} + +// Closer returns the stream closer, or nil if the stream is non-closing. +func (s InputStream) Closer() io.Closer { + return s.closer +} + +func (s InputStream) Close() { + if s.closer != nil { + _ = s.closer.Close() + } +} + +type OutputStream struct { + writer io.Writer + closer io.Closer +} + +// The stage may write to w but must not close it. +func Output(w io.Writer) OutputStream { + return OutputStream{writer: w} +} + +// The stage is responsible for closing w. +func ClosingOutput(w io.WriteCloser) OutputStream { + return OutputStream{writer: w, closer: w} +} + +func (s OutputStream) Writer() io.Writer { + return s.writer +} + +// Closer returns the stream closer, or nil if the stream is non-closing. +func (s OutputStream) Closer() io.Closer { + return s.closer +} + +func (s OutputStream) Close() { + if s.closer != nil { + _ = s.closer.Close() + } +} + type Stage interface { // Name returns the name of the stage. Name() string @@ -86,20 +155,21 @@ type Stage interface { // Start starts the stage in the background, in the environment // described by `opts.Env`, using `stdin` to provide its input and - // `stdout` to collect its output. (`stdin`/`stdout` might be set - // to `nil` if the stage is to receive no input, which might be the - // case for the first/last stage in a pipeline.) If `closeStdin` or - // `closeStdout` is true, the stage is responsible for closing the - // corresponding stream. A stream with a true close flag must - // implement `io.Closer`. See the `Stage` type comment for more - // information about responsibility for closing stdin and stdout. + // `stdout` to collect its output. (`stdin.Reader()` or + // `stdout.Writer()` might be `nil` if the stage is to receive no + // input or produce no output, which might be the case for the + // first/last stage in a pipeline.) If `stdin` or `stdout` is + // closing, the stage is responsible for closing the corresponding + // stream, even if `Start()` returns an error. See the `Stage` type + // comment for more information about responsibility for closing + // stdin and stdout. // // If `Start()` returns without an error, `Wait()` must also be - // called, to allow all resources to be freed. + // called, to allow all resources to be freed. If `Start()` returns + // an error, `Wait()` must not be called. Start( ctx context.Context, opts StageOptions, - stdin io.Reader, closeStdin bool, - stdout io.Writer, closeStdout bool, + stdin InputStream, stdout OutputStream, ) error // Wait waits for the stage to be done, either because it has @@ -108,17 +178,6 @@ type Stage interface { Wait() error } -func ownedCloser(stream any, owned bool) io.Closer { - if !owned { - return nil - } - closer, ok := stream.(io.Closer) - if !ok { - panic(fmt.Sprintf("stage asked to close %T, which does not implement io.Closer", stream)) - } - return closer -} - // StageOptions carries everything (other than `ctx`, `stdin`, and // `stdout`) that a pipeline passes to `Stage.Start`. type StageOptions struct { From c6d12cbbfef16f68837be4f56791ad312dd646ad Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Thu, 11 Jun 2026 10:35:36 +0200 Subject: [PATCH 04/26] windows could have these, we don't know until we try --- pipe/pipeline_test.go | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index 334ef42..d75f5c4 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -8,7 +8,6 @@ import ( "fmt" "io" "os" - "runtime" "strconv" "strings" "testing" @@ -286,10 +285,6 @@ func TestIOPipePipelineReadFromSlowly(t *testing.T) { } func TestPipelineReadFromSlowly2(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("FIXME: test skipped on Windows: 'seq' unavailable") - } - t.Parallel() ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -385,10 +380,6 @@ func TestPipelineStderr(t *testing.T) { } func TestPipelineInterrupted(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("FIXME: test skipped on Windows: 'sleep' unavailable") - } - t.Parallel() stdout := &bytes.Buffer{} @@ -407,10 +398,6 @@ func TestPipelineInterrupted(t *testing.T) { } func TestPipelineCanceled(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("FIXME: test skipped on Windows: 'sleep' unavailable") - } - t.Parallel() stdout := &bytes.Buffer{} @@ -434,10 +421,6 @@ func TestPipelineCanceled(t *testing.T) { // unread output in this case *does fit* within the OS-level pipe // buffer. func TestLittleEPIPE(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("FIXME: test skipped on Windows: 'sleep' unavailable") - } - t.Parallel() p := pipe.New() @@ -457,10 +440,6 @@ func TestLittleEPIPE(t *testing.T) { // amount of unread output in this case *does not fit* within the // OS-level pipe buffer. func TestBigEPIPE(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("FIXME: test skipped on Windows: 'seq' unavailable") - } - t.Parallel() p := pipe.New() @@ -480,10 +459,6 @@ func TestBigEPIPE(t *testing.T) { // amount of unread output in this case *does not fit* within the // OS-level pipe buffer. func TestIgnoredSIGPIPE(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("FIXME: test skipped on Windows: 'seq' unavailable") - } - t.Parallel() p := pipe.New() From 1d7a969cd5f6c1dab919e34cb63ea01e4256b9cb Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Wed, 10 Jun 2026 22:37:55 +0200 Subject: [PATCH 05/26] test Go producer behavior after downstream early exit Add regression coverage for the go-pipe v2 behavior where a Go producer can see a pipe error directly when a downstream command exits without fully reading stdin. This documents the visible semantic change from v1 to v2. Also, we add an example that demonstrates correctly handling downstream early exit in a stateful producer. --- pipe/pipeline_test.go | 65 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index d75f5c4..c7d197a 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -474,6 +474,71 @@ func TestIgnoredSIGPIPE(t *testing.T) { assert.EqualValues(t, "foo\n", out) } +func TestGoProducerSeesPipeErrorWhenCommandStopsReading(t *testing.T) { + t.Parallel() + + p := pipe.New() + p.Add( + pipe.Function( + "write-to-closed-command", + func(_ context.Context, _ pipe.Env, _ io.Reader, stdout io.Writer) error { + w := bufio.NewWriter(stdout) + for i := 0; i < 100000; i++ { + if _, err := fmt.Fprintln(w, i); err != nil { + return err + } + } + return w.Flush() + }, + ), + pipe.Command("true"), + ) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + err := p.Run(ctx) + require.Error(t, err) + assert.True(t, pipe.IsPipeError(err), "expected a pipe error, got %v", err) +} + +func TestIgnoredPipeErrorStillAllowsStatefulProducerToFinish(t *testing.T) { + t.Parallel() + + const total = 100000 + processed := 0 + p := pipe.New() + p.Add( + pipe.IgnoreError( + pipe.Function( + "stateful-producer", + func(_ context.Context, _ pipe.Env, _ io.Reader, stdout io.Writer) error { + w := bufio.NewWriter(stdout) + var writeErr error + for i := 0; i < total; i++ { + processed++ + if writeErr == nil { + if _, err := fmt.Fprintln(w, i); err != nil { + writeErr = err + } + } + } + if writeErr == nil { + writeErr = w.Flush() + } + return writeErr + }, + ), + pipe.IsPipeError, + ), + pipe.Command("true"), + ) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + require.NoError(t, p.Run(ctx)) + assert.Equal(t, total, processed) +} + func TestFunction(t *testing.T) { t.Parallel() ctx := context.Background() From 33a4e28507d8e7f9cbc5794c364b4e81793592a2 Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Wed, 10 Jun 2026 23:09:21 +0200 Subject: [PATCH 06/26] document producer pipe errors in v2 migration Explain that go-pipe v1 could hide producer-side pipe write errors as an implementation detail of the command stdin-copy path, while v2 exposes those errors more directly. Document the migration patterns for stateless and stateful producers. --- README.md | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/README.md b/README.md index 995ee86..aabd484 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,31 @@ A package used to easily build command pipelines in your Go applications # Important We have not thoroughly tested this package on OSs other than Linux, especially Windows. At this time, using this package on Windows based systems is considered experimental and will be supported only on a best effort basis. +# Migrating to v2 + +It's normal for pipelines to stop before all input has been consumed[^1]. If an earlier stage continues writing after that happens, the write side of the pipe can fail with `EPIPE`, `SIGPIPE`, or `io.ErrClosedPipe`. + +In go-pipe v1 it was possible to get away without handling this case, because a command stage's stdin was connected in a way that often (but not necessarily!) drained the write side and hid the error from the previous stage feeding it. That was an implementation detail, not a guarantee. In go-pipe v2, producer stages are more likely to be connected directly to a command's stdin, and thus see the error themselves. + +Fortunately, this is easily handled by wrapping the stage with `pipe.IgnoreError(stage, IsPipeError)`. If the producer only writes output and is otherwise stateless, that's the only thing needed. + +If the producer also updates state, metrics, cursors, or has other side effects, in a way that depends on how much of the output was produced, then in addition to using `pipe.IgnoreError`, you must also ensure producer-owned state is brought to a consistent point before returning the error. + +For example, if a stateful producer function must process its entire input for correctness regardless of whether it was read by the consumer, it should use a pattern like: + +```go +var writeErr error +for _, item := range items { + updateState(item) + if writeErr == nil { + _, writeErr = fmt.Fprintln(stdout, item) + } +} +return writeErr +``` + # Links * [Docs](https://pkg.go.dev/github.com/github/go-pipe/v2) + +[^1]: In `cat foo | head | grep -q`, for example, either `head` or `grep` could exit before its input is fully consumed. From ac585a868aa0a01cd9f481670f98ac1741391ee9 Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Fri, 12 Jun 2026 20:08:24 +0200 Subject: [PATCH 07/26] fully document stream ownership (close responsibility) --- pipe/pipeline.go | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 62d4b5e..7e9637c 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -99,7 +99,9 @@ func WithDir(dir string) Option { } } -// WithStdin assigns stdin to the first command in the pipeline. +// WithStdin assigns stdin to the first command in the pipeline. The +// caller retains ownership of stdin; the pipeline will not close it, +// even if `Start()` returns an error. func WithStdin(stdin io.Reader) Option { return func(p *Pipeline) { p.stdin = stdin @@ -107,7 +109,9 @@ func WithStdin(stdin io.Reader) Option { } } -// WithStdout assigns stdout to the last command in the pipeline. +// WithStdout assigns stdout to the last command in the pipeline. The +// caller retains ownership of stdout; the pipeline will not close it, +// even if `Start()` returns an error. func WithStdout(stdout io.Writer) Option { return func(p *Pipeline) { p.stdout = stdout @@ -116,7 +120,9 @@ func WithStdout(stdout io.Writer) Option { } // WithStdoutCloser assigns stdout to the last command in the -// pipeline, and closes stdout when it's done. +// pipeline, and closes stdout when the pipeline is done with it. The +// pipeline is responsible for closing stdout even if `Start()` returns +// an error. func WithStdoutCloser(stdout io.WriteCloser) Option { return func(p *Pipeline) { p.stdout = stdout @@ -270,6 +276,13 @@ func (p *Pipeline) stageOptions() StageOptions { // Start starts the commands in the pipeline. If `Start()` exits // without an error, `Wait()` must also be called, to allow all // resources to be freed. +// +// If `Start()` returns an error, `Wait()` must not be called. Before +// returning an error, `Start()` cancels and waits for any stages that +// were started, closes any inter-stage pipes that the pipeline owns, +// and closes stdout if it was supplied with `WithStdoutCloser()`. +// Streams supplied with `WithStdin()` or `WithStdout()` remain owned by +// the caller and are not closed by the pipeline. func (p *Pipeline) Start(ctx context.Context) error { if p.hasStarted() { panic("attempt to start a pipeline that has already started") @@ -513,7 +526,9 @@ func (p *Pipeline) Wait() error { return nil } -// Run starts and waits for the commands in the pipeline. +// Run starts and waits for the commands in the pipeline. If startup +// fails, it returns the `Start()` error after `Start()` has performed +// its failure cleanup. func (p *Pipeline) Run(ctx context.Context) error { if err := p.Start(ctx); err != nil { return err From a65fafb8e6a97c3f82d27f78bd347d6b014aeeda Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Fri, 12 Jun 2026 21:40:12 +0200 Subject: [PATCH 08/26] Document early-close producer caveats Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- pipe/filter-error.go | 12 +++++++++++- pipe/function.go | 9 +++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/pipe/filter-error.go b/pipe/filter-error.go index 654796a..9bdee27 100644 --- a/pipe/filter-error.go +++ b/pipe/filter-error.go @@ -48,6 +48,12 @@ type ErrorMatcher func(err error) bool // the functions from the standard library that has the same signature // (e.g., `os.IsTimeout`), or some combination of these (e.g., // `AnyError(IsSIGPIPE, os.IsTimeout)`). +// +// `IgnoreError` only suppresses the error returned by the wrapped +// stage. If a producer ignores pipe errors because a later stage can +// stop reading early, the producer is still responsible for keeping any +// producer-owned state, metrics, cursors, or other side effects +// consistent before returning the ignored error. func IgnoreError(s Stage, em ErrorMatcher) Stage { return FilterError(s, func(err error) error { @@ -128,7 +134,11 @@ var ( // IsPipeError is an `ErrorMatcher` that matches a few different // errors that typically result if a stage writes to a subsequent - // stage that has stopped reading from its stdin. Use like + // stage that has stopped reading from its stdin. This is commonly + // useful with `IgnoreError` for stateless producer stages whose only + // job is writing output. Stateful producers should continue any + // producer-owned state updates needed for consistency before + // returning the pipe error for `IgnoreError` to suppress. Use like // // p.Add(IgnoreError(someStage, IsPipeError)) IsPipeError = AnyError(IsSIGPIPE, IsEPIPE, IsErrClosedPipe) diff --git a/pipe/function.go b/pipe/function.go index 3f34cfa..d8a09af 100644 --- a/pipe/function.go +++ b/pipe/function.go @@ -17,6 +17,15 @@ import ( // Neither `stdin` nor `stdout` are necessarily buffered. If the // `StageFunc` requires buffering, it needs to arrange that itself. // +// A later stage can stop reading before this function has written all +// of its output. In that case, writes to `stdout` can fail with an +// error matched by `IsPipeError`. If the function only writes output +// and is otherwise stateless, callers can usually wrap the stage with +// `IgnoreError(stage, IsPipeError)`. If the function also updates +// producer-owned state, metrics, cursors, or other side effects that +// depend on how much output was produced, it should bring those side +// effects to a consistent point before returning the write error. +// // A `StageFunc` is run in a separate goroutine, so it must be careful // to synchronize any data access aside from reading and writing. type StageFunc func(ctx context.Context, env Env, stdin io.Reader, stdout io.Writer) error From 433e10423d5c26b0310d02a23e34d46bc7b3adf8 Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Fri, 12 Jun 2026 22:37:13 +0200 Subject: [PATCH 09/26] Align start-failure cleanup with stream ownership Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- pipe/close_responsibility_test.go | 20 ++++++++++---------- pipe/command_stdout_fastpath_test.go | 12 ++++++------ pipe/pipeline.go | 15 +++++++++------ pipe/pipeline_test.go | 20 +++++++++++++++++++- pipe/stage.go | 18 ++++++++++++------ 5 files changed, 56 insertions(+), 29 deletions(-) diff --git a/pipe/close_responsibility_test.go b/pipe/close_responsibility_test.go index 61f6d17..162817d 100644 --- a/pipe/close_responsibility_test.go +++ b/pipe/close_responsibility_test.go @@ -34,9 +34,9 @@ func (w *writeCloseSpy) Close() error { return nil } -// TestGoStageHonorsCloseFlags verifies that a Function stage closes -// stdin/stdout iff the corresponding close flag is true. -func TestGoStageHonorsCloseFlags(t *testing.T) { +// TestGoStageHonorsStreamOwnership verifies that a Function stage closes +// stdin/stdout iff the corresponding stream is closing. +func TestGoStageHonorsStreamOwnership(t *testing.T) { cases := []struct { name string leaveIn, leaveOut bool @@ -63,8 +63,8 @@ func TestGoStageHonorsCloseFlags(t *testing.T) { )) require.NoError(t, s.Wait()) - assert.Equal(t, !tc.leaveIn, in.closed.Load(), "closeStdin=%v", !tc.leaveIn) - assert.Equal(t, !tc.leaveOut, out.closed.Load(), "closeStdout=%v", !tc.leaveOut) + assert.Equal(t, !tc.leaveIn, in.closed.Load(), "closing stdin=%v", !tc.leaveIn) + assert.Equal(t, !tc.leaveOut, out.closed.Load(), "closing stdout=%v", !tc.leaveOut) }) } } @@ -88,8 +88,8 @@ func TestStreamConstructorsPreserveOwnershipAndDynamicType(t *testing.T) { } // TestCommandStageHonorsCloseStdin verifies that a command stage closes a -// non-file stdin (a "late" closer) iff closeStdin is true. An empty -// reader is used so exec.Cmd's input-copy goroutine sees EOF promptly. +// non-file stdin (a "late" closer) iff the input stream is closing. An +// empty reader is used so exec.Cmd's input-copy goroutine sees EOF promptly. func TestCommandStageHonorsCloseStdin(t *testing.T) { for _, leave := range []bool{false, true} { name := "owns stdin" @@ -109,14 +109,14 @@ func TestCommandStageHonorsCloseStdin(t *testing.T) { )) require.NoError(t, s.Wait()) - assert.Equal(t, !leave, in.closed.Load(), "closeStdin=%v", !leave) + assert.Equal(t, !leave, in.closed.Load(), "closing stdin=%v", !leave) }) } } // TestCommandStageHonorsCloseStdout verifies the stdout counterpart: a // non-file stdout (routed through the pooled-copy path) is closed iff -// closeStdout is true. +// the output stream is closing. func TestCommandStageHonorsCloseStdout(t *testing.T) { for _, leave := range []bool{false, true} { name := "owns stdout" @@ -136,7 +136,7 @@ func TestCommandStageHonorsCloseStdout(t *testing.T) { )) require.NoError(t, s.Wait()) - assert.Equal(t, !leave, out.closed.Load(), "closeStdout=%v", !leave) + assert.Equal(t, !leave, out.closed.Load(), "closing stdout=%v", !leave) }) } } diff --git a/pipe/command_stdout_fastpath_test.go b/pipe/command_stdout_fastpath_test.go index 095a2a6..4b2240c 100644 --- a/pipe/command_stdout_fastpath_test.go +++ b/pipe/command_stdout_fastpath_test.go @@ -18,15 +18,15 @@ import ( // subprocess can detect when that fd is closed. func TestCommandStageStdoutFastPath(t *testing.T) { cases := []struct { - name string - closeStdout bool + name string + closingStdout bool }{ { - name: "raw *os.File with closeStdout", - closeStdout: true, + name: "raw *os.File with closing stdout", + closingStdout: true, }, { - name: "raw *os.File without closeStdout", + name: "raw *os.File with non-closing stdout", }, } for _, tc := range cases { @@ -43,7 +43,7 @@ func TestCommandStageStdoutFastPath(t *testing.T) { s := CommandStage("true", cmd).(*commandStage) stdout := OutputStream{writer: f} - if tc.closeStdout { + if tc.closingStdout { stdout = ClosingOutput(f) } diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 7e9637c..2ba54cc 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -347,16 +347,19 @@ func (p *Pipeline) Start(ctx context.Context) error { } } - // Clean up any processes and pipes that have been created. `i` is - // the index of the stage that failed to start (whose output pipe - // has already been cleaned up if necessary). + // Clean up any processes and pipes that have been created. `i` is the + // index of the stage that failed to start. If the stage already received + // its streams, it owns any closing stream. abort := func(i int, err error, closeFailedStageStdin bool) error { - // Close the pipe that the previous stage was writing to. - // That should cause it to exit even if it's not minding - // its context. + // If the failing stage never received its stdin, close the pipe that + // the previous stage was writing to. That should cause it to exit + // even if it's not minding its context. if closeFailedStageStdin { stageStarters[i].stdin.Close() } + + // If stdout was supplied with WithStdoutCloser but the final stage + // was never started, then the pipeline still owns that closer. if i < len(p.stages)-1 && p.stdoutCloser != nil { _ = p.stdoutCloser.Close() } diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index c7d197a..fe4b630 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -83,13 +83,15 @@ func TestPipelineFirstStageFailsToStart(t *testing.T) { t.Parallel() ctx := context.Background() startErr := errors.New("foo") + stdout := &closeTrackingWriter{} - p := pipe.New() + p := pipe.New(pipe.WithStdoutCloser(stdout)) p.Add( ErrorStartingStage{startErr}, ErrorStartingStage{errors.New("this error should never happen")}, ) assert.ErrorIs(t, p.Run(ctx), startErr) + assert.True(t, stdout.closed, "WithStdoutCloser destination should be closed") } func TestPipelineFirstStageFailsToStartClosesStdoutCloser(t *testing.T) { @@ -120,6 +122,22 @@ func TestPipelineSecondStageFailsToStart(t *testing.T) { assert.ErrorIs(t, p.Run(ctx), startErr) } +func TestPipelineMiddleStageFailsToStartClosesUnstartedStdoutCloser(t *testing.T) { + t.Parallel() + ctx := context.Background() + startErr := errors.New("foo") + stdout := &closeTrackingWriter{} + + p := pipe.New(pipe.WithStdoutCloser(stdout)) + p.Add( + seqFunction(20000), + ErrorStartingStage{startErr}, + ErrorStartingStage{errors.New("this error should never happen")}, + ) + assert.ErrorIs(t, p.Run(ctx), startErr) + assert.True(t, stdout.closed, "WithStdoutCloser destination should be closed") +} + func TestPipelineSingleCommandOutput(t *testing.T) { t.Parallel() ctx := context.Background() diff --git a/pipe/stage.go b/pipe/stage.go index 241b705..a46c01e 100644 --- a/pipe/stage.go +++ b/pipe/stage.go @@ -50,18 +50,24 @@ import ( // f.Close() // close our copy // cmd.Wait() // -// If the stage is an external command and one of its arguments is not -// an `*os.File`, then `exec.Cmd` will take care of creating an -// `os.Pipe()`, copying from the provided argument in/out of the pipe, -// and eventually closing both ends of the pipe. The stage must close -// the argument itself, but only _after_ the external command has +// If the stage is an external command and its stdin is not an +// `*os.File`, then `exec.Cmd` will take care of creating an +// `os.Pipe()`, copying from the provided reader into the pipe, and +// eventually closing both ends of the pipe. The stage must close the +// provided stdin itself, but only _after_ the external command has // finished, like so: // -// cmd.Stdin = r // Similarly for stdout +// cmd.Stdin = r // cmd.Start(…) // cmd.Wait() // r.Close() // +// If the stage is an external command and its stdout is not an +// `*os.File`, the stage creates a pipe, passes the write end to the +// command, and copies from the read end to the provided writer. The +// stage must close the provided stdout itself, but only _after_ the +// external command and the copy have finished. +// // If the stage is a Go function, then it holds the only copy of // stdin/stdout, so it must wait until the function is done before // closing them (regardless of their underlying type, like so: From 642eb3e4f65189d5b33461318d725ee888ac4e12 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 13 Jun 2026 11:32:14 +0200 Subject: [PATCH 10/26] InputStream, OutputStream: move types to a separate file --- pipe/stage.go | 61 ----------------------------------------------- pipe/streams.go | 63 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 61 deletions(-) create mode 100644 pipe/streams.go diff --git a/pipe/stage.go b/pipe/stage.go index a46c01e..8263beb 100644 --- a/pipe/stage.go +++ b/pipe/stage.go @@ -2,7 +2,6 @@ package pipe import ( "context" - "io" ) // Stage is an element of a `Pipeline`. It reads from standard input @@ -91,66 +90,6 @@ import ( // command finishes. But that's ill-behaved in a command that is // being used in a pipeline, so we'll ignore that possibility. -type InputStream struct { - reader io.Reader - closer io.Closer -} - -// The stage may read from r but must not close it. -func Input(r io.Reader) InputStream { - return InputStream{reader: r} -} - -// The stage is responsible for closing r. -func ClosingInput(r io.ReadCloser) InputStream { - return InputStream{reader: r, closer: r} -} - -func (s InputStream) Reader() io.Reader { - return s.reader -} - -// Closer returns the stream closer, or nil if the stream is non-closing. -func (s InputStream) Closer() io.Closer { - return s.closer -} - -func (s InputStream) Close() { - if s.closer != nil { - _ = s.closer.Close() - } -} - -type OutputStream struct { - writer io.Writer - closer io.Closer -} - -// The stage may write to w but must not close it. -func Output(w io.Writer) OutputStream { - return OutputStream{writer: w} -} - -// The stage is responsible for closing w. -func ClosingOutput(w io.WriteCloser) OutputStream { - return OutputStream{writer: w, closer: w} -} - -func (s OutputStream) Writer() io.Writer { - return s.writer -} - -// Closer returns the stream closer, or nil if the stream is non-closing. -func (s OutputStream) Closer() io.Closer { - return s.closer -} - -func (s OutputStream) Close() { - if s.closer != nil { - _ = s.closer.Close() - } -} - type Stage interface { // Name returns the name of the stage. Name() string diff --git a/pipe/streams.go b/pipe/streams.go new file mode 100644 index 0000000..259b8c5 --- /dev/null +++ b/pipe/streams.go @@ -0,0 +1,63 @@ +package pipe + +import "io" + +type InputStream struct { + reader io.Reader + closer io.Closer +} + +// The stage may read from r but must not close it. +func Input(r io.Reader) InputStream { + return InputStream{reader: r} +} + +// The stage is responsible for closing r. +func ClosingInput(r io.ReadCloser) InputStream { + return InputStream{reader: r, closer: r} +} + +func (s InputStream) Reader() io.Reader { + return s.reader +} + +// Closer returns the stream closer, or nil if the stream is non-closing. +func (s InputStream) Closer() io.Closer { + return s.closer +} + +func (s InputStream) Close() { + if s.closer != nil { + _ = s.closer.Close() + } +} + +type OutputStream struct { + writer io.Writer + closer io.Closer +} + +// The stage may write to w but must not close it. +func Output(w io.Writer) OutputStream { + return OutputStream{writer: w} +} + +// The stage is responsible for closing w. +func ClosingOutput(w io.WriteCloser) OutputStream { + return OutputStream{writer: w, closer: w} +} + +func (s OutputStream) Writer() io.Writer { + return s.writer +} + +// Closer returns the stream closer, or nil if the stream is non-closing. +func (s OutputStream) Closer() io.Closer { + return s.closer +} + +func (s OutputStream) Close() { + if s.closer != nil { + _ = s.closer.Close() + } +} From 67ca961aed435e4e24d2e9a34ced8aed70394a8d Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 13 Jun 2026 17:09:36 +0200 Subject: [PATCH 11/26] InputStream, OutputStream: change `Close()` methods to pass errors through --- pipe/pipe_matching_test.go | 4 ++-- pipe/pipeline.go | 4 ++-- pipe/pipeline_test.go | 8 ++++---- pipe/streams.go | 14 ++++++++------ 4 files changed, 16 insertions(+), 14 deletions(-) diff --git a/pipe/pipe_matching_test.go b/pipe/pipe_matching_test.go index efd72d5..00e6df0 100644 --- a/pipe/pipe_matching_test.go +++ b/pipe/pipe_matching_test.go @@ -107,9 +107,9 @@ func (s *pipeSniffingStage) Start( stdin pipe.InputStream, stdout pipe.OutputStream, ) error { s.stdin = stdin.Reader() - stdin.Close() + _ = stdin.Close() s.stdout = stdout.Writer() - stdout.Close() + _ = stdout.Close() return nil } diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 2ba54cc..7424faf 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -355,7 +355,7 @@ func (p *Pipeline) Start(ctx context.Context) error { // the previous stage was writing to. That should cause it to exit // even if it's not minding its context. if closeFailedStageStdin { - stageStarters[i].stdin.Close() + _ = stageStarters[i].stdin.Close() } // If stdout was supplied with WithStdoutCloser but the final stage @@ -407,7 +407,7 @@ func (p *Pipeline) Start(ctx context.Context) error { ctx, p.stageOptions(), ss.stdin, ss.stdout, ); err != nil { - nextSS.stdin.Close() + _ = nextSS.stdin.Close() return abort(i, err, false) } } diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index fe4b630..c04ec1b 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -675,8 +675,8 @@ func (s ErrorStartingStage) Start( _ context.Context, _ pipe.StageOptions, stdin pipe.InputStream, stdout pipe.OutputStream, ) error { - stdin.Close() - stdout.Close() + _ = stdin.Close() + _ = stdout.Close() return s.err } @@ -705,8 +705,8 @@ func (s requirementStage) Start( if s.started != nil { *s.started = true } - stdin.Close() - stdout.Close() + _ = stdin.Close() + _ = stdout.Close() return nil } diff --git a/pipe/streams.go b/pipe/streams.go index 259b8c5..c38f48e 100644 --- a/pipe/streams.go +++ b/pipe/streams.go @@ -26,10 +26,11 @@ func (s InputStream) Closer() io.Closer { return s.closer } -func (s InputStream) Close() { - if s.closer != nil { - _ = s.closer.Close() +func (s InputStream) Close() error { + if s.closer == nil { + return nil } + return s.closer.Close() } type OutputStream struct { @@ -56,8 +57,9 @@ func (s OutputStream) Closer() io.Closer { return s.closer } -func (s OutputStream) Close() { - if s.closer != nil { - _ = s.closer.Close() +func (s OutputStream) Close() error { + if s.closer == nil { + return nil } + return s.closer.Close() } From e3a82235a71e9be4b48b27f6edb01ebff95ff367 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 13 Jun 2026 12:12:27 +0200 Subject: [PATCH 12/26] Stage: update docstring to reflect use of streams --- pipe/stage.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pipe/stage.go b/pipe/stage.go index 8263beb..590a738 100644 --- a/pipe/stage.go +++ b/pipe/stage.go @@ -44,9 +44,9 @@ import ( // external command will keep its own copy open as long as necessary // (and no longer!). It should use roughly the following sequence: // -// cmd.Stdin = f // Similarly for stdout +// cmd.Stdin = stdin.Reader() // Similarly for stdout // cmd.Start(…) -// f.Close() // close our copy +// stdin.Close() // Close our copy // cmd.Wait() // // If the stage is an external command and its stdin is not an @@ -56,10 +56,10 @@ import ( // provided stdin itself, but only _after_ the external command has // finished, like so: // -// cmd.Stdin = r +// cmd.Stdin = stdin.Reader() // Similarly for stdout // cmd.Start(…) // cmd.Wait() -// r.Close() +// stdin.Close() // Close // // If the stage is an external command and its stdout is not an // `*os.File`, the stage creates a pipe, passes the write end to the From aa3db90cc7b034a797f6419ae1706315284f8f5a Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 13 Jun 2026 17:14:13 +0200 Subject: [PATCH 13/26] InputStream, OutputStream: remove the `Closer()` methods This is to force all close calls to come through `InputStream.Close()` and `OutputStream.Close()`, which will soon get some more functionality. --- pipe/close_responsibility_test.go | 38 +++++++++++++++++++------------ pipe/command.go | 28 ++++++++--------------- pipe/function.go | 14 ++++-------- pipe/stage.go | 9 ++++---- pipe/streams.go | 10 -------- 5 files changed, 41 insertions(+), 58 deletions(-) diff --git a/pipe/close_responsibility_test.go b/pipe/close_responsibility_test.go index 162817d..c7a2a12 100644 --- a/pipe/close_responsibility_test.go +++ b/pipe/close_responsibility_test.go @@ -70,21 +70,29 @@ func TestGoStageHonorsStreamOwnership(t *testing.T) { } func TestStreamConstructorsPreserveOwnershipAndDynamicType(t *testing.T) { - borrowedInput := strings.NewReader("borrowed") - assert.Same(t, borrowedInput, Input(borrowedInput).Reader()) - assert.Nil(t, Input(borrowedInput).Closer()) - - ownedInput := &readCloseSpy{Reader: strings.NewReader("owned")} - assert.Same(t, ownedInput, ClosingInput(ownedInput).Reader()) - assert.Same(t, ownedInput, ClosingInput(ownedInput).Closer()) - - borrowedOutput := &strings.Builder{} - assert.Same(t, borrowedOutput, Output(borrowedOutput).Writer()) - assert.Nil(t, Output(borrowedOutput).Closer()) - - ownedOutput := &writeCloseSpy{Writer: io.Discard} - assert.Same(t, ownedOutput, ClosingOutput(ownedOutput).Writer()) - assert.Same(t, ownedOutput, ClosingOutput(ownedOutput).Closer()) + borrowedReader := &readCloseSpy{Reader: strings.NewReader("borrowed")} + borrowedInput := Input(borrowedReader) + assert.Same(t, borrowedReader, borrowedInput.Reader()) + assert.NoError(t, borrowedInput.Close()) + assert.False(t, borrowedReader.closed.Load()) + + ownedReader := &readCloseSpy{Reader: strings.NewReader("owned")} + ownedInput := ClosingInput(ownedReader) + assert.Same(t, ownedReader, ownedInput.Reader()) + assert.NoError(t, ownedInput.Close()) + assert.True(t, ownedReader.closed.Load()) + + borrowedWriter := &writeCloseSpy{Writer: &strings.Builder{}} + borrowedOutput := Output(borrowedWriter) + assert.Same(t, borrowedWriter, borrowedOutput.Writer()) + assert.NoError(t, borrowedOutput.Close()) + assert.False(t, borrowedWriter.closed.Load()) + + ownedWriter := &writeCloseSpy{Writer: &writeCloseSpy{Writer: io.Discard}} + ownedOutput := ClosingOutput(ownedWriter) + assert.Same(t, ownedWriter, ownedOutput.Writer()) + assert.NoError(t, ownedOutput.Close()) + assert.True(t, ownedWriter.closed.Load()) } // TestCommandStageHonorsCloseStdin verifies that a command stage closes a diff --git a/pipe/command.go b/pipe/command.go index d85314b..e1d5c01 100644 --- a/pipe/command.go +++ b/pipe/command.go @@ -91,9 +91,7 @@ func (s *commandStage) Start( ins InputStream, outs OutputStream, ) error { stdin := ins.Reader() - stdinCloser := ins.Closer() stdout := outs.Writer() - stdoutCloser := outs.Closer() if s.cmd.Dir == "" { s.cmd.Dir = opts.Dir @@ -109,14 +107,12 @@ func (s *commandStage) Start( s.cmd.Stdin = stdin } - if stdinCloser != nil { - if _, ok := stdin.(*os.File); ok { - // We can close our copy as soon as the command has started - earlyClosers = append(earlyClosers, stdinCloser) - } else { - // We need to close `stdin`, but only after the command has finished - s.lateClosers = append(s.lateClosers, stdinCloser) - } + if _, ok := stdin.(*os.File); ok { + // We can close our copy as soon as the command has started + earlyClosers = append(earlyClosers, ins) + } else { + // We need to close `stdin`, but only after the command has finished + s.lateClosers = append(s.lateClosers, ins) } closeEarlyClosers := func() { @@ -136,13 +132,9 @@ func (s *commandStage) Start( if stdout != nil { if f, ok := stdout.(*os.File); ok { s.cmd.Stdout = f - if stdoutCloser != nil { - earlyClosers = append(earlyClosers, stdoutCloser) - } + earlyClosers = append(earlyClosers, outs) } else { - if stdoutCloser != nil { - s.lateClosers = append(s.lateClosers, stdoutCloser) - } + s.lateClosers = append(s.lateClosers, outs) // Route the copy through our own pipe so we can use a // pooled buffer rather than letting exec.Cmd allocate a // fresh 32KB buffer for its internal io.Copy. @@ -153,8 +145,8 @@ func (s *commandStage) Start( } earlyClosers = append(earlyClosers, ec) } - } else if stdoutCloser != nil { - s.lateClosers = append(s.lateClosers, stdoutCloser) + } else { + s.lateClosers = append(s.lateClosers, outs) } // If the caller hasn't arranged otherwise, read the command's diff --git a/pipe/function.go b/pipe/function.go index d8a09af..6f45db2 100644 --- a/pipe/function.go +++ b/pipe/function.go @@ -90,14 +90,12 @@ func (s *goStage) Start( stdin InputStream, stdout OutputStream, ) error { r := stdin.Reader() - stdinCloser := stdin.Closer() if r == nil { // treat nil as empty input. r = strings.NewReader("") } w := stdout.Writer() - stdoutCloser := stdout.Closer() if w == nil { // treat nil output as /dev/null w = io.Discard @@ -110,15 +108,11 @@ func (s *goStage) Start( s.err = opts.PanicHandler(p) } } - if stdoutCloser != nil { - if err := stdoutCloser.Close(); err != nil && s.err == nil { - s.err = fmt.Errorf("error closing stdout for stage %q: %w", s.Name(), err) - } + if err := stdout.Close(); err != nil && s.err == nil { + s.err = fmt.Errorf("error closing stdout for stage %q: %w", s.Name(), err) } - if stdinCloser != nil { - if err := stdinCloser.Close(); err != nil && s.err == nil { - s.err = fmt.Errorf("error closing stdin for stage %q: %w", s.Name(), err) - } + if err := stdin.Close(); err != nil && s.err == nil { + s.err = fmt.Errorf("error closing stdin for stage %q: %w", s.Name(), err) } close(s.done) }() diff --git a/pipe/stage.go b/pipe/stage.go index 590a738..7e74f67 100644 --- a/pipe/stage.go +++ b/pipe/stage.go @@ -103,11 +103,10 @@ type Stage interface { // `stdout` to collect its output. (`stdin.Reader()` or // `stdout.Writer()` might be `nil` if the stage is to receive no // input or produce no output, which might be the case for the - // first/last stage in a pipeline.) If `stdin` or `stdout` is - // closing, the stage is responsible for closing the corresponding - // stream, even if `Start()` returns an error. See the `Stage` type - // comment for more information about responsibility for closing - // stdin and stdout. + // first/last stage in a pipeline.) The stage is responsible for + // calling `stdin.Close()` and `stdout.Close()`, even if `Start()` + // returns an error. See the `Stage` type comment for more + // information about responsibility for closing stdin and stdout. // // If `Start()` returns without an error, `Wait()` must also be // called, to allow all resources to be freed. If `Start()` returns diff --git a/pipe/streams.go b/pipe/streams.go index c38f48e..9bf0bd5 100644 --- a/pipe/streams.go +++ b/pipe/streams.go @@ -21,11 +21,6 @@ func (s InputStream) Reader() io.Reader { return s.reader } -// Closer returns the stream closer, or nil if the stream is non-closing. -func (s InputStream) Closer() io.Closer { - return s.closer -} - func (s InputStream) Close() error { if s.closer == nil { return nil @@ -52,11 +47,6 @@ func (s OutputStream) Writer() io.Writer { return s.writer } -// Closer returns the stream closer, or nil if the stream is non-closing. -func (s OutputStream) Closer() io.Closer { - return s.closer -} - func (s OutputStream) Close() error { if s.closer == nil { return nil From 63d8e51c30759555f2af6e76c4d9360ff92ef29d Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 13 Jun 2026 13:54:02 +0200 Subject: [PATCH 14/26] commandStage.Start(): rename some local variables --- pipe/command.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/pipe/command.go b/pipe/command.go index e1d5c01..31ae198 100644 --- a/pipe/command.go +++ b/pipe/command.go @@ -88,10 +88,10 @@ func (s *commandStage) Requirements() StageRequirements { func (s *commandStage) Start( ctx context.Context, opts StageOptions, - ins InputStream, outs OutputStream, + stdin InputStream, stdout OutputStream, ) error { - stdin := ins.Reader() - stdout := outs.Writer() + r := stdin.Reader() + w := stdout.Writer() if s.cmd.Dir == "" { s.cmd.Dir = opts.Dir @@ -103,16 +103,16 @@ func (s *commandStage) Start( var earlyClosers []io.Closer // See the type comment for `Stage` for the explanation of this closing behavior. - if stdin != nil { - s.cmd.Stdin = stdin + if r != nil { + s.cmd.Stdin = r } - if _, ok := stdin.(*os.File); ok { + if _, ok := r.(*os.File); ok { // We can close our copy as soon as the command has started - earlyClosers = append(earlyClosers, ins) + earlyClosers = append(earlyClosers, stdin) } else { // We need to close `stdin`, but only after the command has finished - s.lateClosers = append(s.lateClosers, ins) + s.lateClosers = append(s.lateClosers, stdin) } closeEarlyClosers := func() { @@ -129,16 +129,16 @@ func (s *commandStage) Start( _ = s.closeLateClosers() } - if stdout != nil { - if f, ok := stdout.(*os.File); ok { + if w != nil { + if f, ok := w.(*os.File); ok { s.cmd.Stdout = f - earlyClosers = append(earlyClosers, outs) + earlyClosers = append(earlyClosers, stdout) } else { - s.lateClosers = append(s.lateClosers, outs) + s.lateClosers = append(s.lateClosers, stdout) // Route the copy through our own pipe so we can use a // pooled buffer rather than letting exec.Cmd allocate a // fresh 32KB buffer for its internal io.Copy. - ec, err := s.setupPooledStdout(stdout) + ec, err := s.setupPooledStdout(w) if err != nil { cleanupOnStartFailure() return err @@ -146,7 +146,7 @@ func (s *commandStage) Start( earlyClosers = append(earlyClosers, ec) } } else { - s.lateClosers = append(s.lateClosers, outs) + s.lateClosers = append(s.lateClosers, stdout) } // If the caller hasn't arranged otherwise, read the command's From 6ef82b52dc931929be259988004805e30408cd18 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 13 Jun 2026 17:15:11 +0200 Subject: [PATCH 15/26] InputStream, OutputStream: make these into pointer types This will be needed in a moment. --- pipe/close_responsibility_test.go | 4 +- pipe/command.go | 2 +- pipe/command_stdout_fastpath_test.go | 4 +- pipe/env_stage.go | 2 +- pipe/function.go | 2 +- pipe/pipe_matching_test.go | 2 +- pipe/pipeline.go | 10 ++--- pipe/pipeline_test.go | 4 +- pipe/stage.go | 2 +- pipe/streams.go | 58 +++++++++++++++++++++------- 10 files changed, 61 insertions(+), 29 deletions(-) diff --git a/pipe/close_responsibility_test.go b/pipe/close_responsibility_test.go index c7a2a12..8765d90 100644 --- a/pipe/close_responsibility_test.go +++ b/pipe/close_responsibility_test.go @@ -149,14 +149,14 @@ func TestCommandStageHonorsCloseStdout(t *testing.T) { } } -func inputForTest(r io.ReadCloser, closing bool) InputStream { +func inputForTest(r io.ReadCloser, closing bool) *InputStream { if closing { return ClosingInput(r) } return Input(r) } -func outputForTest(w io.WriteCloser, closing bool) OutputStream { +func outputForTest(w io.WriteCloser, closing bool) *OutputStream { if closing { return ClosingOutput(w) } diff --git a/pipe/command.go b/pipe/command.go index 31ae198..908516b 100644 --- a/pipe/command.go +++ b/pipe/command.go @@ -88,7 +88,7 @@ func (s *commandStage) Requirements() StageRequirements { func (s *commandStage) Start( ctx context.Context, opts StageOptions, - stdin InputStream, stdout OutputStream, + stdin *InputStream, stdout *OutputStream, ) error { r := stdin.Reader() w := stdout.Writer() diff --git a/pipe/command_stdout_fastpath_test.go b/pipe/command_stdout_fastpath_test.go index 4b2240c..0de3f51 100644 --- a/pipe/command_stdout_fastpath_test.go +++ b/pipe/command_stdout_fastpath_test.go @@ -42,9 +42,11 @@ func TestCommandStageStdoutFastPath(t *testing.T) { cmd := exec.Command("true") s := CommandStage("true", cmd).(*commandStage) - stdout := OutputStream{writer: f} + var stdout *OutputStream if tc.closingStdout { stdout = ClosingOutput(f) + } else { + stdout = Output(f) } require.NoError(t, s.Start(ctx, StageOptions{}, Input(nil), stdout)) diff --git a/pipe/env_stage.go b/pipe/env_stage.go index 7503e5a..3db2296 100644 --- a/pipe/env_stage.go +++ b/pipe/env_stage.go @@ -37,7 +37,7 @@ func (s *stageWithExtraEnv) Requirements() StageRequirements { func (s *stageWithExtraEnv) Start( ctx context.Context, opts StageOptions, - stdin InputStream, stdout OutputStream, + stdin *InputStream, stdout *OutputStream, ) error { opts.Vars = append(opts.Vars[:len(opts.Vars):len(opts.Vars)], func(_ context.Context, vars []EnvVar) []EnvVar { return append(vars, s.env...) diff --git a/pipe/function.go b/pipe/function.go index 6f45db2..a80947f 100644 --- a/pipe/function.go +++ b/pipe/function.go @@ -87,7 +87,7 @@ func (s *goStage) Requirements() StageRequirements { func (s *goStage) Start( ctx context.Context, opts StageOptions, - stdin InputStream, stdout OutputStream, + stdin *InputStream, stdout *OutputStream, ) error { r := stdin.Reader() if r == nil { diff --git a/pipe/pipe_matching_test.go b/pipe/pipe_matching_test.go index 00e6df0..badb990 100644 --- a/pipe/pipe_matching_test.go +++ b/pipe/pipe_matching_test.go @@ -104,7 +104,7 @@ func (s *pipeSniffingStage) Requirements() pipe.StageRequirements { func (s *pipeSniffingStage) Start( _ context.Context, _ pipe.StageOptions, - stdin pipe.InputStream, stdout pipe.OutputStream, + stdin *pipe.InputStream, stdout *pipe.OutputStream, ) error { s.stdin = stdin.Reader() _ = stdin.Close() diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 7424faf..bbae72b 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -225,8 +225,8 @@ func (p *Pipeline) AddWithIgnoredError(em ErrorMatcher, stages ...Stage) { type stageStarter struct { requirements StageRequirements - stdin InputStream - stdout OutputStream + stdin *InputStream + stdout *OutputStream } func (requirement StreamRequirement) validate() error { @@ -339,9 +339,9 @@ func (p *Pipeline) Start(ctx context.Context) error { } if p.stdout != nil { - i := len(p.stages) - 1 - ss := &stageStarters[i] - ss.stdout = OutputStream{ + // Arrange for the output of the last stage to go to + // `p.stdout`: + stageStarters[len(p.stages)-1].stdout = &OutputStream{ writer: p.stdout, closer: p.stdoutCloser, } diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index c04ec1b..82dc2cd 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -673,7 +673,7 @@ func (s ErrorStartingStage) Requirements() pipe.StageRequirements { func (s ErrorStartingStage) Start( _ context.Context, _ pipe.StageOptions, - stdin pipe.InputStream, stdout pipe.OutputStream, + stdin *pipe.InputStream, stdout *pipe.OutputStream, ) error { _ = stdin.Close() _ = stdout.Close() @@ -700,7 +700,7 @@ func (s requirementStage) Requirements() pipe.StageRequirements { func (s requirementStage) Start( _ context.Context, _ pipe.StageOptions, - stdin pipe.InputStream, stdout pipe.OutputStream, + stdin *pipe.InputStream, stdout *pipe.OutputStream, ) error { if s.started != nil { *s.started = true diff --git a/pipe/stage.go b/pipe/stage.go index 7e74f67..0a22079 100644 --- a/pipe/stage.go +++ b/pipe/stage.go @@ -113,7 +113,7 @@ type Stage interface { // an error, `Wait()` must not be called. Start( ctx context.Context, opts StageOptions, - stdin InputStream, stdout OutputStream, + stdin *InputStream, stdout *OutputStream, ) error // Wait waits for the stage to be done, either because it has diff --git a/pipe/streams.go b/pipe/streams.go index 9bf0bd5..790f58e 100644 --- a/pipe/streams.go +++ b/pipe/streams.go @@ -2,53 +2,83 @@ package pipe import "io" +// InputStream represents `stdin` for a stage, which might or might +// not need to be closed when the stage is done with it. It usually +// holds an `io.Reader`, which can be retrieved using `Reader()`. Its +// `Close()` method closes the reader if necessary (i.e., if the +// `InputStream` was constructed using `ClosingInput()`. +// +// A nil `*InputStream` is a valid value. Its `Reader()` method +// returns `nil` and `Close()` does nothing successfully. type InputStream struct { reader io.Reader closer io.Closer } // The stage may read from r but must not close it. -func Input(r io.Reader) InputStream { - return InputStream{reader: r} +func Input(r io.Reader) *InputStream { + return &InputStream{reader: r} } // The stage is responsible for closing r. -func ClosingInput(r io.ReadCloser) InputStream { - return InputStream{reader: r, closer: r} +func ClosingInput(r io.ReadCloser) *InputStream { + return &InputStream{reader: r, closer: r} } -func (s InputStream) Reader() io.Reader { +func (s *InputStream) Reader() io.Reader { + if s == nil { + return nil + } return s.reader } -func (s InputStream) Close() error { - if s.closer == nil { +// Close closes the underlying reader if necessary. If `s` was +// constructed using `ClosingInput()`, then close the `io.ReadCloser` +// that was passed to that function. If `s` is `nil` or was +// constructed using `Input()`, then do nothing successfully. +func (s *InputStream) Close() error { + if s == nil || s.closer == nil { return nil } return s.closer.Close() } +// OutputStream represents `stdout` for a stage, which might or might +// not need to be closed when the stage is done with it. It usually +// holds an `io.Writer`, which can be retrieved using `Writer()`. Its +// `Close()` method closes the writer if necessary (i.e., if the +// `OutputStream` was constructed using `ClosingOutput()`. +// +// A nil `*OutputStream` is a valid value. Its `Writer()` method +// returns `nil` and `Close()` does nothing successfully. type OutputStream struct { writer io.Writer closer io.Closer } // The stage may write to w but must not close it. -func Output(w io.Writer) OutputStream { - return OutputStream{writer: w} +func Output(w io.Writer) *OutputStream { + return &OutputStream{writer: w} } // The stage is responsible for closing w. -func ClosingOutput(w io.WriteCloser) OutputStream { - return OutputStream{writer: w, closer: w} +func ClosingOutput(w io.WriteCloser) *OutputStream { + return &OutputStream{writer: w, closer: w} } -func (s OutputStream) Writer() io.Writer { +func (s *OutputStream) Writer() io.Writer { + if s == nil { + return nil + } return s.writer } -func (s OutputStream) Close() error { - if s.closer == nil { +// Close closes the underlying writer if necessary. If `s` was +// constructed using `ClosingOutput()`, then close the +// `io.WriteCloser` that was passed to that function. If `s` is `nil` +// or was constructed using `Output()`, then do nothing successfully. +func (s *OutputStream) Close() error { + if s == nil || s.closer == nil { return nil } return s.closer.Close() From 29edba336aab7f29805f36c07efc88c9fe46e0bc Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 13 Jun 2026 17:18:00 +0200 Subject: [PATCH 16/26] InputStream, OutputStream: make `Close()` idempotent Ignore all but the first call to `Close()`. --- pipe/close_responsibility_test.go | 48 ++++++++++++++++++++------- pipe/streams.go | 55 +++++++++++++++++++++++++++---- 2 files changed, 84 insertions(+), 19 deletions(-) diff --git a/pipe/close_responsibility_test.go b/pipe/close_responsibility_test.go index 8765d90..20a5a28 100644 --- a/pipe/close_responsibility_test.go +++ b/pipe/close_responsibility_test.go @@ -15,22 +15,22 @@ import ( // readCloseSpy records whether Close was called. type readCloseSpy struct { io.Reader - closed atomic.Bool + closeCount atomic.Uint32 } func (r *readCloseSpy) Close() error { - r.closed.Store(true) + r.closeCount.Add(1) return nil } // writeCloseSpy records whether Close was called. type writeCloseSpy struct { io.Writer - closed atomic.Bool + closeCount atomic.Uint32 } func (w *writeCloseSpy) Close() error { - w.closed.Store(true) + w.closeCount.Add(1) return nil } @@ -63,8 +63,16 @@ func TestGoStageHonorsStreamOwnership(t *testing.T) { )) require.NoError(t, s.Wait()) - assert.Equal(t, !tc.leaveIn, in.closed.Load(), "closing stdin=%v", !tc.leaveIn) - assert.Equal(t, !tc.leaveOut, out.closed.Load(), "closing stdout=%v", !tc.leaveOut) + if tc.leaveIn { + assert.EqualValues(t, 0, in.closeCount.Load(), "closing stdin=%v", !tc.leaveIn) + } else { + assert.EqualValues(t, 1, in.closeCount.Load(), "closing stdin=%v", !tc.leaveIn) + } + if tc.leaveOut { + assert.EqualValues(t, 0, out.closeCount.Load(), "closing stdout=%v", !tc.leaveOut) + } else { + assert.EqualValues(t, 1, out.closeCount.Load(), "closing stdout=%v", !tc.leaveOut) + } }) } } @@ -74,25 +82,33 @@ func TestStreamConstructorsPreserveOwnershipAndDynamicType(t *testing.T) { borrowedInput := Input(borrowedReader) assert.Same(t, borrowedReader, borrowedInput.Reader()) assert.NoError(t, borrowedInput.Close()) - assert.False(t, borrowedReader.closed.Load()) + assert.EqualValues(t, 0, borrowedReader.closeCount.Load()) + assert.NoError(t, borrowedInput.Close()) + assert.EqualValues(t, 0, borrowedReader.closeCount.Load()) ownedReader := &readCloseSpy{Reader: strings.NewReader("owned")} ownedInput := ClosingInput(ownedReader) assert.Same(t, ownedReader, ownedInput.Reader()) assert.NoError(t, ownedInput.Close()) - assert.True(t, ownedReader.closed.Load()) + assert.EqualValues(t, 1, ownedReader.closeCount.Load()) + assert.NoError(t, ownedInput.Close()) + assert.EqualValues(t, 1, ownedReader.closeCount.Load()) borrowedWriter := &writeCloseSpy{Writer: &strings.Builder{}} borrowedOutput := Output(borrowedWriter) assert.Same(t, borrowedWriter, borrowedOutput.Writer()) assert.NoError(t, borrowedOutput.Close()) - assert.False(t, borrowedWriter.closed.Load()) + assert.EqualValues(t, 0, borrowedWriter.closeCount.Load()) + assert.NoError(t, borrowedOutput.Close()) + assert.EqualValues(t, 0, borrowedWriter.closeCount.Load()) ownedWriter := &writeCloseSpy{Writer: &writeCloseSpy{Writer: io.Discard}} ownedOutput := ClosingOutput(ownedWriter) assert.Same(t, ownedWriter, ownedOutput.Writer()) assert.NoError(t, ownedOutput.Close()) - assert.True(t, ownedWriter.closed.Load()) + assert.EqualValues(t, 1, ownedWriter.closeCount.Load()) + assert.NoError(t, ownedOutput.Close()) + assert.EqualValues(t, 1, ownedWriter.closeCount.Load()) } // TestCommandStageHonorsCloseStdin verifies that a command stage closes a @@ -117,7 +133,11 @@ func TestCommandStageHonorsCloseStdin(t *testing.T) { )) require.NoError(t, s.Wait()) - assert.Equal(t, !leave, in.closed.Load(), "closing stdin=%v", !leave) + if leave { + assert.EqualValues(t, 0, in.closeCount.Load(), "closing stdin=%v", !leave) + } else { + assert.EqualValues(t, 1, in.closeCount.Load(), "closing stdin=%v", !leave) + } }) } } @@ -144,7 +164,11 @@ func TestCommandStageHonorsCloseStdout(t *testing.T) { )) require.NoError(t, s.Wait()) - assert.Equal(t, !leave, out.closed.Load(), "closing stdout=%v", !leave) + if leave { + assert.EqualValues(t, 0, out.closeCount.Load(), "closing stdout=%v", !leave) + } else { + assert.EqualValues(t, 1, out.closeCount.Load(), "closing stdout=%v", !leave) + } }) } } diff --git a/pipe/streams.go b/pipe/streams.go index 790f58e..ef123dd 100644 --- a/pipe/streams.go +++ b/pipe/streams.go @@ -1,18 +1,32 @@ package pipe -import "io" +import ( + "io" + "sync" +) // InputStream represents `stdin` for a stage, which might or might // not need to be closed when the stage is done with it. It usually // holds an `io.Reader`, which can be retrieved using `Reader()`. Its // `Close()` method closes the reader if necessary (i.e., if the -// `InputStream` was constructed using `ClosingInput()`. +// `InputStream` was constructed using `ClosingInput()`. The +// `Close()` method is idempotent. // // A nil `*InputStream` is a valid value. Its `Reader()` method // returns `nil` and `Close()` does nothing successfully. type InputStream struct { reader io.Reader + + // once is used to ensure that `Close()` is only called once. + once sync.Once + + // closer is set to `nil` after the first call to `Close()`. closer io.Closer + + // closeErr is set to the error returned by the first call to + // `Close()`, and returned from that and any subsequent calls to + // `Close()`. + closeErr error } // The stage may read from r but must not close it. @@ -37,23 +51,42 @@ func (s *InputStream) Reader() io.Reader { // that was passed to that function. If `s` is `nil` or was // constructed using `Input()`, then do nothing successfully. func (s *InputStream) Close() error { - if s == nil || s.closer == nil { + if s == nil { return nil } - return s.closer.Close() + + s.once.Do(func() { + if s.closer != nil { + s.closeErr = s.closer.Close() + s.closer = nil + } + }) + + return s.closeErr } // OutputStream represents `stdout` for a stage, which might or might // not need to be closed when the stage is done with it. It usually // holds an `io.Writer`, which can be retrieved using `Writer()`. Its // `Close()` method closes the writer if necessary (i.e., if the -// `OutputStream` was constructed using `ClosingOutput()`. +// `OutputStream` was constructed using `ClosingOutput()`. The +// `Close()` method is idempotent. // // A nil `*OutputStream` is a valid value. Its `Writer()` method // returns `nil` and `Close()` does nothing successfully. type OutputStream struct { writer io.Writer + + // once is used to ensure that `Close()` is only called once. + once sync.Once + + // closer is set to `nil` after the first call to `Close()`. closer io.Closer + + // closeErr is set to the error returned by the first call to + // `Close()`, and returned from that and any subsequent calls to + // `Close()`. + closeErr error } // The stage may write to w but must not close it. @@ -78,8 +111,16 @@ func (s *OutputStream) Writer() io.Writer { // `io.WriteCloser` that was passed to that function. If `s` is `nil` // or was constructed using `Output()`, then do nothing successfully. func (s *OutputStream) Close() error { - if s == nil || s.closer == nil { + if s == nil { return nil } - return s.closer.Close() + + s.once.Do(func() { + if s.closer != nil { + s.closeErr = s.closer.Close() + s.closer = nil + } + }) + + return s.closeErr } From 84662746af293b073c65d9aafc311c8fd1cb4dbc Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 13 Jun 2026 17:19:02 +0200 Subject: [PATCH 17/26] InputStream, OutputStream: improve docstrings In the type comments, explain why these types don't implement `io.Reader` and `io.Writer`. Otherwise, some helpful person is sure to come along and add `Read()` and `Write()` methods, to the detriment of performance and even changing some semantics. --- pipe/streams.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pipe/streams.go b/pipe/streams.go index ef123dd..8ddd578 100644 --- a/pipe/streams.go +++ b/pipe/streams.go @@ -14,6 +14,14 @@ import ( // // A nil `*InputStream` is a valid value. Its `Reader()` method // returns `nil` and `Close()` does nothing successfully. +// +// It might seem like `InputStream` should implement `io.Reader` +// itself. But we want to avoid hiding the dynamic type of the +// `io.Reader` that is being used as the stdin of a pipeline. That +// object might be of a type that is subject to optimizations that +// aren't available for a generic `io.Reader`. For example, it might +// be an `*os.File` (which can be passed directly to subcommands or to +// `splice(2)`), or it might implement `io.WriterTo`. type InputStream struct { reader io.Reader @@ -74,6 +82,14 @@ func (s *InputStream) Close() error { // // A nil `*OutputStream` is a valid value. Its `Writer()` method // returns `nil` and `Close()` does nothing successfully. +// +// It might seem like `OutputStream` should implement `io.Writer` +// itself. But we want to avoid hiding the dynamic type of the +// `io.Writer` that is being used as the stdout of a pipeline. That +// object might be of a type that is subject to optimizations that +// aren't available for a generic `io.Writer`. For example, it might +// be an `*os.File` (which can be passed directly to subcommands or to +// `splice(2)`), or it might implement `io.ReaderFrom`. type OutputStream struct { writer io.Writer From b30a165ea42e1f83b908b8044f0daacbe71b12cd Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 13 Jun 2026 12:58:58 +0200 Subject: [PATCH 18/26] Pipeline: use `InputStream` and `OutputStream` Change the types of some `Pipeline` fields: * `stdin` to `InputStream` * `stdout` to `OutputStream` That way we don't have to manage their closers separately. --- pipe/pipeline.go | 37 +++++++++++++------------------------ 1 file changed, 13 insertions(+), 24 deletions(-) diff --git a/pipe/pipeline.go b/pipe/pipeline.go index bbae72b..085fdd3 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -55,12 +55,10 @@ type ContextValuesFunc func(context.Context) []EnvVar type Pipeline struct { env Env - stdin io.Reader - stdinCloser io.Closer - stdout io.Writer - stdoutCloser io.Closer - stages []Stage - cancel func() + stdin *InputStream + stdout *OutputStream + stages []Stage + cancel func() // Atomically written and read value, nonzero if the pipeline has // been started. This is only used for lifecycle sanity checks but @@ -104,8 +102,7 @@ func WithDir(dir string) Option { // even if `Start()` returns an error. func WithStdin(stdin io.Reader) Option { return func(p *Pipeline) { - p.stdin = stdin - p.stdinCloser = nil + p.stdin = Input(stdin) } } @@ -114,8 +111,7 @@ func WithStdin(stdin io.Reader) Option { // even if `Start()` returns an error. func WithStdout(stdout io.Writer) Option { return func(p *Pipeline) { - p.stdout = stdout - p.stdoutCloser = nil + p.stdout = Output(stdout) } } @@ -125,8 +121,7 @@ func WithStdout(stdout io.Writer) Option { // an error. func WithStdoutCloser(stdout io.WriteCloser) Option { return func(p *Pipeline) { - p.stdout = stdout - p.stdoutCloser = stdout + p.stdout = ClosingOutput(stdout) } } @@ -255,9 +250,7 @@ func (requirements StageRequirements) validate(s Stage, stdinConnected, stdoutCo } func (p *Pipeline) abortBeforeStart(s Stage, err error) error { - if p.stdoutCloser != nil { - _ = p.stdoutCloser.Close() - } + _ = p.stdout.Close() p.cancel() p.eventHandler(&Event{ Command: s.Name(), @@ -335,16 +328,13 @@ func (p *Pipeline) Start(ctx context.Context) error { if p.stdin != nil { // Arrange for the input of the 0th stage to come from // `p.stdin`: - stageStarters[0].stdin = Input(p.stdin) + stageStarters[0].stdin = p.stdin } if p.stdout != nil { // Arrange for the output of the last stage to go to // `p.stdout`: - stageStarters[len(p.stages)-1].stdout = &OutputStream{ - writer: p.stdout, - closer: p.stdoutCloser, - } + stageStarters[len(p.stages)-1].stdout = p.stdout } // Clean up any processes and pipes that have been created. `i` is the @@ -360,8 +350,8 @@ func (p *Pipeline) Start(ctx context.Context) error { // If stdout was supplied with WithStdoutCloser but the final stage // was never started, then the pipeline still owns that closer. - if i < len(p.stages)-1 && p.stdoutCloser != nil { - _ = p.stdoutCloser.Close() + if i < len(p.stages)-1 { + _ = p.stdout.Close() } // Kill and wait for any stages that have been started @@ -433,8 +423,7 @@ func (p *Pipeline) Start(ctx context.Context) error { func (p *Pipeline) Output(ctx context.Context) ([]byte, error) { var buf bytes.Buffer - p.stdout = &buf - p.stdoutCloser = nil + p.stdout = Output(&buf) err := p.Run(ctx) return buf.Bytes(), err } From 07cbbcf12b895387eb3df8bfb52a58d7ee1ce51b Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sun, 14 Jun 2026 16:16:10 +0200 Subject: [PATCH 19/26] Stage: improve docstring --- pipe/stage.go | 114 ++++++++++++++++++++++++++++---------------------- 1 file changed, 65 insertions(+), 49 deletions(-) diff --git a/pipe/stage.go b/pipe/stage.go index 0a22079..9a35254 100644 --- a/pipe/stage.go +++ b/pipe/stage.go @@ -7,34 +7,59 @@ import ( // Stage is an element of a `Pipeline`. It reads from standard input // and writes to standard output. // -// Who closes stdin and stdout? +// # Who closes stdin and stdout? // // A `Stage` as a whole is responsible for closing its end of stdin // and stdout if the corresponding stream is closing. That -// responsibility transfers to the stage as soon as `Start()` is called -// and applies even if `Start()` returns an error. Before returning an -// error from `Start()`, the stage must close any closing stream that -// it has not already handed off to something else that will close it -// promptly. The caller must not close a closing stream after passing it -// to `Start()`. +// responsibility transfers to the stage as soon as the stage's +// `Start()` method is called and applies even if `Start()` returns an +// error. Before returning an error from `Start()`, the stage must +// close any closing stream that it has not already handed off to +// something else that will close it promptly. The caller must not +// close a closing stream after passing it to `Start()`. // -// If the caller wants to retain ownership of stdin/stdout, it passes a -// non-closing stream. The stage must not close a non-closing stream, -// even if `Start()` returns an error. +// If the caller wants to retain ownership of stdin/stdout, it passes +// a non-closing stream. Calling `Close()` on a non-closing stream is +// a NOP, so it isn't harmful but isn't required. // -// Closing stdin/stdout tells the previous/next stage that this stage is -// done reading/writing data, which can affect their behavior. Therefore, -// after a successful start, a stage should close each one as soon as it -// is done with it. +// Closing stdin/stdout tells the previous/next stage that this stage +// is done reading/writing data, which can affect their behavior. +// Therefore, after a successful start, a stage should close each one +// as soon as it is done with it. Assume that this is done via the +// following local function // -// How this should be done depends on whether stdin/stdout are of type -// `*os.File`. +// closeStreams := func() { +// // Error handling omitted. +// _ = stdin.Close() +// _ = stdout.Close() +// } +// +// From the point of view of the pipeline as a whole, if stdin is +// provided by the user (`WithStdin()`), then we don't want the first +// stage to close it at all, whether it's an `*os.File` or not. The +// pipeline communicates this by passing a non-closing `InputStream` +// when it starts that stage. For stdout, it depends on whether the +// user supplied it using `WithStdout()` or `WithStdoutCloser()`. In +// any case, this function can close the streams anyway, because +// `InputStream.Close()` and `OutputStream.Close()` do nothing in +// those cases. +// +// When these closes should happen depends on what kind of stage it is +// and whether stdin/stdout are of type `*os.File`. +// +// ## A command stage // // If a stage is an external command, then the subprocess ultimately // needs its own copies of `*os.File` file descriptors for its stdin // and stdout. The external command will "always" [1] close those when // it exits. // +// (It's theoretically possible for a command to pass the open file +// descriptor to another, longer-lived process, in which case the file +// descriptor wouldn't necessarily get closed when the command +// finishes. But that's ill-behaved in a command that is being used in +// a pipeline, so we'll ignore that possibility.) +// // If the stage is an external command and one of the arguments is an // `*os.File`, then it can set the corresponding field of `exec.Cmd` // to that argument directly. This has the result that `exec.Cmd` @@ -42,54 +67,45 @@ import ( // subprocess. Therefore, the stage must close its copy of that // argument as soon as the external command has started, because the // external command will keep its own copy open as long as necessary -// (and no longer!). It should use roughly the following sequence: +// (and no longer!). Therefore, it should use roughly the following +// sequence: // -// cmd.Stdin = stdin.Reader() // Similarly for stdout -// cmd.Start(…) -// stdin.Close() // Close our copy -// cmd.Wait() +// cmd.Stdin = stdin.Reader() +// cmd.Stdout = stdout.Writer() +// err := cmd.Start(…) +// // Close our copies as soon as the command has started: +// closeStreams() +// if err != nil { +// return err +// } +// return cmd.Wait() // // If the stage is an external command and its stdin is not an // `*os.File`, then `exec.Cmd` will take care of creating an // `os.Pipe()`, copying from the provided reader into the pipe, and // eventually closing both ends of the pipe. The stage must close the -// provided stdin itself, but only _after_ the external command has -// finished, like so: +// provided stdin itself, but only _after_ the external command and +// the copy have finished, like so: // -// cmd.Stdin = stdin.Reader() // Similarly for stdout -// cmd.Start(…) -// cmd.Wait() -// stdin.Close() // Close +// defer closeStreams() +// cmd.Stdin = stdin.Reader() +// cmd.Stdout = stdout.Writer() +// err := cmd.Start(…) +// if err != nil { +// return err +// } +// return cmd.Wait() // -// If the stage is an external command and its stdout is not an -// `*os.File`, the stage creates a pipe, passes the write end to the -// command, and copies from the read end to the provided writer. The -// stage must close the provided stdout itself, but only _after_ the -// external command and the copy have finished. +// ## A function stage // // If the stage is a Go function, then it holds the only copy of // stdin/stdout, so it must wait until the function is done before // closing them (regardless of their underlying type, like so: // // go func() { -// f(…, stdin, stdout) -// stdin.Close() -// stdout.Close() +// defer closeStreams() +// f(…, stdin.Reader(), stdout.Writer()) // }() -// -// From the point of view of the pipeline as a whole, if stdin is -// provided by the user (`WithStdin()`), then we don't want the first -// stage to close it at all, whether it's an `*os.File` or not. The -// pipeline communicates this by passing a non-closing `InputStream` -// when it starts that stage. For stdout, it depends on whether the -// user supplied it using `WithStdout()` or `WithStdoutCloser()`. -// -// [1] It's theoretically possible for a command to pass the open file -// descriptor to another, longer-lived process, in which case the -// file descriptor wouldn't necessarily get closed when the -// command finishes. But that's ill-behaved in a command that is -// being used in a pipeline, so we'll ignore that possibility. - type Stage interface { // Name returns the name of the stage. Name() string From e772ff004f297411af48ca0a97f843cec121d461 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 13 Jun 2026 07:52:22 +0200 Subject: [PATCH 20/26] StreamRequirement: move to separate file --- pipe/pipeline.go | 9 --------- pipe/stage.go | 10 ---------- pipe/stream_requirement.go | 24 ++++++++++++++++++++++++ 3 files changed, 24 insertions(+), 19 deletions(-) create mode 100644 pipe/stream_requirement.go diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 085fdd3..7f6814e 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -224,15 +224,6 @@ type stageStarter struct { stdout *OutputStream } -func (requirement StreamRequirement) validate() error { - switch requirement { - case StreamOptional, StreamForbidden: - return nil - default: - return fmt.Errorf("invalid stream requirement %d", requirement) - } -} - func (requirements StageRequirements) validate(s Stage, stdinConnected, stdoutConnected bool) error { if err := requirements.Stdin.validate(); err != nil { return fmt.Errorf("stdin: %w", err) diff --git a/pipe/stage.go b/pipe/stage.go index 9a35254..9ffce1c 100644 --- a/pipe/stage.go +++ b/pipe/stage.go @@ -155,16 +155,6 @@ type StageOptions struct { // StagePanicHandler is a function that handles panics in a pipeline's stages. type StagePanicHandler func(p any) error -type StreamRequirement int - -const ( - // StreamOptional means the stream may be connected or nil. - StreamOptional StreamRequirement = iota - - // StreamForbidden means the stream must be nil. - StreamForbidden -) - // StageRequirements describes what a Stage needs from the streams connected to // its stdin and stdout. The zero value is correct for stages that are happy // with arbitrary io.Reader/io.Writer streams, such as Function stages. diff --git a/pipe/stream_requirement.go b/pipe/stream_requirement.go new file mode 100644 index 0000000..16e14a0 --- /dev/null +++ b/pipe/stream_requirement.go @@ -0,0 +1,24 @@ +package pipe + +import "fmt" + +type StreamRequirement int + +const ( + // StreamOptional means the stream may be connected or nil. + StreamOptional StreamRequirement = iota + + // StreamForbidden means the stream must be nil. + StreamForbidden +) + +// validate checks that `req` has a valid value and returns an error +// otherwise. +func (req StreamRequirement) validate() error { + switch req { + case StreamOptional, StreamForbidden: + return nil + default: + return fmt.Errorf("invalid stream requirement %d", req) + } +} From fbe470e67ebd0d60430bd231b106b18c29427d67 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 13 Jun 2026 11:09:04 +0200 Subject: [PATCH 21/26] StreamRequirement: encode also whether the stream has to be nil The two aspects of a stage's stream requirements are coupled. For example, of the four possible combinations of `(StreamRequirement, NeedsFile)`, one of them, `(StreamForbidden, true)`, makes no sense. So instead of encoding these two aspects separately, encode the three meaningful combinations into a single `StreamRequirement` type with possible values `StreamAcceptAny`, `StreamPreferFile`, and `StreamForbidden`. --- pipe/command.go | 4 +-- pipe/pipe_matching_test.go | 64 +++++++++++++++++++++++++------------- pipe/pipeline.go | 3 +- pipe/stage.go | 12 +++---- pipe/stream_requirement.go | 28 ++++++++++++----- 5 files changed, 71 insertions(+), 40 deletions(-) diff --git a/pipe/command.go b/pipe/command.go index 908516b..eb2a847 100644 --- a/pipe/command.go +++ b/pipe/command.go @@ -81,8 +81,8 @@ func (s *commandStage) Process() *os.Process { func (s *commandStage) Requirements() StageRequirements { return StageRequirements{ - StdinNeedsFile: true, - StdoutNeedsFile: true, + Stdin: StreamPreferFile, + Stdout: StreamPreferFile, } } diff --git a/pipe/pipe_matching_test.go b/pipe/pipe_matching_test.go index badb990..89c5ae7 100644 --- a/pipe/pipe_matching_test.go +++ b/pipe/pipe_matching_test.go @@ -49,14 +49,10 @@ func writeCloser() io.WriteCloser { } func newPipeSniffingStage( - stdinNeedsFile bool, stdinExpectation ioExpectation, - stdoutNeedsFile bool, stdoutExpectation ioExpectation, + req pipe.StageRequirements, stdinExpectation, stdoutExpectation ioExpectation, ) *pipeSniffingStage { return &pipeSniffingStage{ - requirements: pipe.StageRequirements{ - StdinNeedsFile: stdinNeedsFile, - StdoutNeedsFile: stdoutNeedsFile, - }, + requirements: req, expect: pipeExpectations{ stdin: stdinExpectation, stdout: stdoutExpectation, @@ -68,8 +64,11 @@ func newPipeSniffingFunc( stdinExpectation, stdoutExpectation ioExpectation, ) *pipeSniffingStage { return newPipeSniffingStage( - false, stdinExpectation, - false, stdoutExpectation, + pipe.StageRequirements{ + Stdin: pipe.StreamAcceptAny, + Stdout: pipe.StreamAcceptAny, + }, + stdinExpectation, stdoutExpectation, ) } @@ -77,8 +76,11 @@ func newPipeSniffingCmd( stdinExpectation, stdoutExpectation ioExpectation, ) *pipeSniffingStage { return newPipeSniffingStage( - true, stdinExpectation, - true, stdoutExpectation, + pipe.StageRequirements{ + Stdin: pipe.StreamPreferFile, + Stdout: pipe.StreamPreferFile, + }, + stdinExpectation, stdoutExpectation, ) } @@ -325,16 +327,25 @@ func TestPipeTypes(t *testing.T) { opts: []pipe.Option{}, stages: []pipe.Stage{ newPipeSniffingStage( - false, expectNil, - false, expectOther, + pipe.StageRequirements{ + Stdin: pipe.StreamAcceptAny, + Stdout: pipe.StreamAcceptAny, + }, + expectNil, expectOther, ), newPipeSniffingStage( - false, expectOther, - true, expectFile, + pipe.StageRequirements{ + Stdin: pipe.StreamAcceptAny, + Stdout: pipe.StreamPreferFile, + }, + expectOther, expectFile, ), newPipeSniffingStage( - false, expectFile, - false, expectNil, + pipe.StageRequirements{ + Stdin: pipe.StreamAcceptAny, + Stdout: pipe.StreamAcceptAny, + }, + expectFile, expectNil, ), }, }, @@ -343,16 +354,25 @@ func TestPipeTypes(t *testing.T) { opts: []pipe.Option{}, stages: []pipe.Stage{ newPipeSniffingStage( - false, expectNil, - false, expectFile, + pipe.StageRequirements{ + Stdin: pipe.StreamAcceptAny, + Stdout: pipe.StreamAcceptAny, + }, + expectNil, expectFile, ), newPipeSniffingStage( - true, expectFile, - false, expectOther, + pipe.StageRequirements{ + Stdin: pipe.StreamPreferFile, + Stdout: pipe.StreamAcceptAny, + }, + expectFile, expectOther, ), newPipeSniffingStage( - false, expectOther, - false, expectNil, + pipe.StageRequirements{ + Stdin: pipe.StreamAcceptAny, + Stdout: pipe.StreamAcceptAny, + }, + expectOther, expectNil, ), }, }, diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 7f6814e..4fb4f60 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -371,7 +371,8 @@ func (p *Pipeline) Start(ctx context.Context) error { // We need to generate a pipe pair for this stage to use // to communicate with its successor: - if ss.requirements.StdoutNeedsFile || nextSS.requirements.StdinNeedsFile { + if ss.requirements.Stdout == StreamPreferFile || + nextSS.requirements.Stdin == StreamPreferFile { // Use an OS-level pipe for the communication: nextStdin, stdout, err := os.Pipe() if err != nil { diff --git a/pipe/stage.go b/pipe/stage.go index 9ffce1c..0a086ba 100644 --- a/pipe/stage.go +++ b/pipe/stage.go @@ -155,15 +155,11 @@ type StageOptions struct { // StagePanicHandler is a function that handles panics in a pipeline's stages. type StagePanicHandler func(p any) error -// StageRequirements describes what a Stage needs from the streams connected to -// its stdin and stdout. The zero value is correct for stages that are happy -// with arbitrary io.Reader/io.Writer streams, such as Function stages. +// StageRequirements describes what a Stage needs from the streams +// connected to its stdin and stdout. The zero value is correct for +// stages that are happy with arbitrary io.Reader/io.Writer streams, +// such as Function stages. type StageRequirements struct { Stdin StreamRequirement Stdout StreamRequirement - - // {Stdin,Stdout}NeedsFile indicate that, if stdio is connected, the - // stage requires it to be backed by an *os.File (a real file descriptor) - StdinNeedsFile bool - StdoutNeedsFile bool } diff --git a/pipe/stream_requirement.go b/pipe/stream_requirement.go index 16e14a0..ff2fd47 100644 --- a/pipe/stream_requirement.go +++ b/pipe/stream_requirement.go @@ -2,23 +2,37 @@ package pipe import "fmt" +// StreamRequirement describes a `Stage`'s requirement for its stdin +// or stdout, namely whether it can be anything, whether it should +// preferably be an `*os.File`, or whether it must be `nil`. The zero +// value `StreamAcceptAny` is a valid value that indicates that the +// stage has no particular requirements or preferences for its +// stdin/stdout, such as a typical `Function` stage. type StreamRequirement int const ( - // StreamOptional means the stream may be connected or nil. - StreamOptional StreamRequirement = iota + // StreamAcceptAny indicates that the stage hasn't declared what + // kind of stream it requires, maybe even `nil`. + StreamAcceptAny StreamRequirement = iota - // StreamForbidden means the stream must be nil. + // StreamPreferFile indicates that the stage prefers the + // corresponding stream to be backed by an `*os.File` (a real file + // descriptor), but it can work with any io.Reader/io.Writer. + StreamPreferFile + + // StreamForbidden indicates that the stage requires the + // corresponding stream to be nil. It won't read/write the stream + // or close it. StreamForbidden ) // validate checks that `req` has a valid value and returns an error // otherwise. -func (req StreamRequirement) validate() error { - switch req { - case StreamOptional, StreamForbidden: +func (requirement StreamRequirement) validate() error { + switch requirement { + case StreamAcceptAny, StreamPreferFile, StreamForbidden: return nil default: - return fmt.Errorf("invalid stream requirement %d", req) + return fmt.Errorf("invalid stream requirement %d", requirement) } } From cb3bf1d04c4a5f0981995f7e4c8c133db65582e0 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 13 Jun 2026 11:22:52 +0200 Subject: [PATCH 22/26] StreamRequirement.Validate(): make method public --- pipe/pipeline.go | 4 ++-- pipe/stream_requirement.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 4fb4f60..4a57382 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -225,10 +225,10 @@ type stageStarter struct { } func (requirements StageRequirements) validate(s Stage, stdinConnected, stdoutConnected bool) error { - if err := requirements.Stdin.validate(); err != nil { + if err := requirements.Stdin.Validate(); err != nil { return fmt.Errorf("stdin: %w", err) } - if err := requirements.Stdout.validate(); err != nil { + if err := requirements.Stdout.Validate(); err != nil { return fmt.Errorf("stdout: %w", err) } if requirements.Stdin == StreamForbidden && stdinConnected { diff --git a/pipe/stream_requirement.go b/pipe/stream_requirement.go index ff2fd47..ddff829 100644 --- a/pipe/stream_requirement.go +++ b/pipe/stream_requirement.go @@ -26,9 +26,9 @@ const ( StreamForbidden ) -// validate checks that `req` has a valid value and returns an error +// Validate checks that `req` has a valid value and returns an error // otherwise. -func (requirement StreamRequirement) validate() error { +func (requirement StreamRequirement) Validate() error { switch requirement { case StreamAcceptAny, StreamPreferFile, StreamForbidden: return nil From 8e22ddcb03d57b079a2324e47e96d0c283a5db8f Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Fri, 12 Jun 2026 16:06:58 +0200 Subject: [PATCH 23/26] stageJoiner: new helper type for creating pipes between stages The old code used a `stageStarter` type to help with starting up stages. There are two awkward things about that approach: * The pipe that is needed to join stages depends on the two adjacent stages, not on the stdin and stdout of a single stage. So `stageStarter` wasn't able to figure out what pipe was needed; that logic kindof needed to live in the `Pipeline` type. * Since the read and write ends of a pipe are created together, the `stageStarter` could also not be as helpful in closing those streams if there was an error. So instead, use a new helper type, `stageJoiner`, to figure out how to join two adjacent stages together. The `stageJoiner` can now do part of the work for us: * `stageJoiner.validate()` checks that the adjacent stages' requirements are met WRT whether they need stdin/stdout to be supplied at all. * `stageJoiner.createPipe()` figures out what kind of pipe to create for the two adjacent stages. * `stageJoiner.closePipe()` closes the pipe (if it has already been created and needs closing) on errors. This isn't a panacea; for example, some loops have to iterate over stages, and some over stage joiners. But I think that it's less confusing this way. --- pipe/pipeline.go | 177 +++++++++++++++---------------------------- pipe/stage_joiner.go | 126 ++++++++++++++++++++++++++++++ 2 files changed, 189 insertions(+), 114 deletions(-) create mode 100644 pipe/stage_joiner.go diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 4a57382..d237461 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "io" - "os" "sync/atomic" ) @@ -218,41 +217,6 @@ func (p *Pipeline) AddWithIgnoredError(em ErrorMatcher, stages ...Stage) { } } -type stageStarter struct { - requirements StageRequirements - stdin *InputStream - stdout *OutputStream -} - -func (requirements StageRequirements) validate(s Stage, stdinConnected, stdoutConnected bool) error { - if err := requirements.Stdin.Validate(); err != nil { - return fmt.Errorf("stdin: %w", err) - } - if err := requirements.Stdout.Validate(); err != nil { - return fmt.Errorf("stdout: %w", err) - } - if requirements.Stdin == StreamForbidden && stdinConnected { - return fmt.Errorf("stage %q forbids stdin, but stdin is connected", s.Name()) - } - if requirements.Stdout == StreamForbidden && stdoutConnected { - return fmt.Errorf("stage %q forbids stdout, but stdout is connected", s.Name()) - } - return nil -} - -func (p *Pipeline) abortBeforeStart(s Stage, err error) error { - _ = p.stdout.Close() - p.cancel() - p.eventHandler(&Event{ - Command: s.Name(), - Msg: "failed to start pipeline stage", - Err: err, - }) - return fmt.Errorf( - "starting pipeline stage %q: %w", s.Name(), err, - ) -} - func (p *Pipeline) stageOptions() StageOptions { return StageOptions{Env: p.env, PanicHandler: p.panicHandler} } @@ -300,50 +264,67 @@ func (p *Pipeline) Start(ctx context.Context) error { // We need to decide how to start the stages, especially what // pipes to use to connect adjacent stages (`os.Pipe()` vs. // `io.Pipe()`) based on the two stages' requirements. - stageStarters := make([]stageStarter, len(p.stages)) + stageJoiners := make([]stageJoiner, len(p.stages)+1) + + // Arrange for the input of the 0th stage to come from `p.stdin`: + stageJoiners[0].nextStdin = p.stdin + + // Arrange for the output of the last stage to go to `p.stdout`: + stageJoiners[len(p.stages)].prevStdout = p.stdout + + // closePipes closes all of the streams that are currently stored + // in the joiners. This should be called if startup fails. As we + // call `Stage.Start()` and pass that method streams, we clear + // them from the corresponding joiners to avoid closing them + // twice. + closePipes := func() { + for _, sj := range stageJoiners { + _ = sj.closePipe() + } + } - // Collect information about each stage's type and requirements: + // Store the stages in the joiners, and verify that the stages' + // requirements are well-formed: for i, s := range p.stages { - stageStarters[i].requirements = s.Requirements() + stageJoiners[i].nextStage = s + stageJoiners[i+1].prevStage = s - err := stageStarters[i].requirements.validate( - s, - i > 0 || p.stdin != nil, - i < len(p.stages)-1 || p.stdout != nil, - ) - if err != nil { - return p.abortBeforeStart(s, err) + // Make sure that the stage's requirements are well-formed: + requirements := s.Requirements() + if err := requirements.Stdin.Validate(); err != nil { + return fmt.Errorf("stdin: %w", err) + } + if err := requirements.Stdout.Validate(); err != nil { + return fmt.Errorf("stdout: %w", err) } } - if p.stdin != nil { - // Arrange for the input of the 0th stage to come from - // `p.stdin`: - stageStarters[0].stdin = p.stdin - } - - if p.stdout != nil { - // Arrange for the output of the last stage to go to - // `p.stdout`: - stageStarters[len(p.stages)-1].stdout = p.stdout + // Create the "inner" pipes (i.e, all but the first and last + // `stageJoiners`): + for i := 1; i < len(stageJoiners)-1; i++ { + if err := stageJoiners[i].createPipe(); err != nil { + closePipes() + return err + } } - // Clean up any processes and pipes that have been created. `i` is the - // index of the stage that failed to start. If the stage already received - // its streams, it owns any closing stream. - abort := func(i int, err error, closeFailedStageStdin bool) error { - // If the failing stage never received its stdin, close the pipe that - // the previous stage was writing to. That should cause it to exit - // even if it's not minding its context. - if closeFailedStageStdin { - _ = stageStarters[i].stdin.Close() + // Check that each of the stages' requirements are compatible with + // the pipes that we have created for them: + for i := range stageJoiners { + if err := stageJoiners[i].validate(); err != nil { + closePipes() + return err } + } - // If stdout was supplied with WithStdoutCloser but the final stage - // was never started, then the pipeline still owns that closer. - if i < len(p.stages)-1 { - _ = p.stdout.Close() - } + // We're about to start up the stages, one by one. If something + // goes wrong during that process, this function should be called + // to kill any stages that have already been started and to close + // any pipes that have not yet been passed to a stage. `i` is the + // index of the stage that failed to start. If the stage already + // received its streams, it is responsible for closing them. + abort := func(i int, err error) error { + closePipes() // Kill and wait for any stages that have been started // already to finish: @@ -361,52 +342,20 @@ func (p *Pipeline) Start(ctx context.Context) error { ) } - // Loop over all but the last stage, starting them. By the time we - // get to a stage, its stdin will have already been determined, - // but we still need to figure out its stdout and set the stdin - // that will be used for the subsequent stage. - for i, s := range p.stages[:len(p.stages)-1] { - ss := &stageStarters[i] - nextSS := &stageStarters[i+1] - - // We need to generate a pipe pair for this stage to use - // to communicate with its successor: - if ss.requirements.Stdout == StreamPreferFile || - nextSS.requirements.Stdin == StreamPreferFile { - // Use an OS-level pipe for the communication: - nextStdin, stdout, err := os.Pipe() - if err != nil { - return abort(i, err, true) - } - nextSS.stdin = ClosingInput(nextStdin) - ss.stdout = ClosingOutput(stdout) - } else { - nextStdin, stdout := io.Pipe() - nextSS.stdin = ClosingInput(nextStdin) - ss.stdout = ClosingOutput(stdout) - } - if err := s.Start( - ctx, p.stageOptions(), - ss.stdin, ss.stdout, - ); err != nil { - _ = nextSS.stdin.Close() - return abort(i, err, false) - } - } + // Loop over all of the stages, starting them in order. + for i, s := range p.stages { + prevSJ := &stageJoiners[i] + nextSJ := &stageJoiners[i+1] - // The last stage needs special handling, because its stdout - // doesn't need to flow into another stage (it's already set in - // `ss.stdout` if it's needed). - { - i := len(p.stages) - 1 - s := p.stages[i] - ss := &stageStarters[i] + err := s.Start(ctx, p.stageOptions(), prevSJ.nextStdin, nextSJ.prevStdout) + + // Even if that stage failed to start, we are no longer + // responsible for closing its streams: + prevSJ.nextStdin = nil + nextSJ.prevStdout = nil - if err := s.Start( - ctx, p.stageOptions(), - ss.stdin, ss.stdout, - ); err != nil { - return abort(i, err, false) + if err != nil { + return abort(i, err) } } diff --git a/pipe/stage_joiner.go b/pipe/stage_joiner.go new file mode 100644 index 0000000..24fb789 --- /dev/null +++ b/pipe/stage_joiner.go @@ -0,0 +1,126 @@ +package pipe + +import ( + "errors" + "fmt" + "io" + "os" +) + +// stageJoiner is a helper type that helps join two adjacent stages +// together. stageJoiners[i] tells how to connect stage `i-1` to stage +// `i`. From the point of view of stages, `stageJoiners[i].nextStdin` +// and `stageJoiners[i+1].prevStdout` are the input and output +// streams, respectively, of `stage[i]`. The first and last elements +// of `stageJoiners` manage `p.stdin` and `p.stdout`, respectively. +// Schematically, the data flows through like this: +// +// p.stdin == stageJoiners[0].nextStdin → +// stage[0] → +// stageJoiners[1].prevStdout → stageJoiners[1].nextStdin → +// stage[1] → +// stageJoiners[2].prevStdout → stageJoiners[2].nextStdin → +// stage[2] → +// ... → +// stageJoiners[i].prevStdout → stageJoiners[i].nextStdin → +// stage[i] → +// stageJoiners[i+1].prevStdout → stageJoiners[i+1].nextStdin → +// ... → +// stageJoiners[len(stages)-1].prevStdout → stageJoiners[len(stages)-1].nextStdin → +// stage[len(stages)-1] → +// stageJoiners[len(stages)].prevStdout == p.stdout +// +// In pseudo-Shell notation, the stages are run like this: +// +// stage[0] stageJoiners[1].prevStdout +// stage[1] stageJoiners[2].prevStdout +// stage[2] stageJoiners[3].prevStdout +// ... +// stage[i] stageJoiners[i].prevStdout +// ... +// stage[len(stages)-1] p.stdout +type stageJoiner struct { + // prevStage holds the stage that needs to write to the pipe. + prevStage Stage + + // prevStdout will be used as the stdout of `prevStage`. It is + // usually the "write" end of the `(nextStdin, prevStdout)` pipe + // pair, with the connected pipe ends in the same `stageJoiner` + // instance. + prevStdout *OutputStream + + // nextStage holds the stage that needs to read from the pipe. + nextStage Stage + + // nextStdin will be used as the stdin of `nextStage`. It is + // usually the "read" end of the `(nextStdin, prevStdout)` pipe + // pair. + nextStdin *InputStream +} + +// needFilePipe returns `true` if the pipe that joins the two adjacent +// stages should be an `os.Pipe()` rather than an `io.Pipe()`. +func (sj *stageJoiner) needFilePipe() bool { + if sj.prevStage.Requirements().Stdout == StreamPreferFile { + return true + } + if sj.nextStage.Requirements().Stdin == StreamPreferFile { + return true + } + return false +} + +func (sj *stageJoiner) createPipe() error { + var r io.ReadCloser + var w io.WriteCloser + if sj.needFilePipe() { + var err error + r, w, err = os.Pipe() + if err != nil { + return fmt.Errorf("creating os.Pipe: %w", err) + } + } else { + r, w = io.Pipe() + } + + sj.prevStdout = ClosingOutput(w) + sj.nextStdin = ClosingInput(r) + + return nil +} + +// closePipe closes both ends of the pipe that was allocated by +// `createPipe()`. This should only be called if the corresponding +// stage's `Start()` method was never called (otherwise the stage is +// responsible for closing its stdin and stdout). +func (sj *stageJoiner) closePipe() error { + return errors.Join( + sj.prevStdout.Close(), + sj.nextStdin.Close(), + ) +} + +// validate verifies that `sj.prevStdout` and `sj.nextStdin` are +// suitable for the adjacent stages, in particular that no pipe is +// created if the stage requirements are `StreamForbidden`. +func (sj *stageJoiner) validate() error { + if sj.prevStage != nil { + stdoutRequirements := sj.prevStage.Requirements().Stdout + if stdoutRequirements == StreamForbidden && sj.prevStdout != nil { + return fmt.Errorf( + "stage %q forbids stdout, but stdout is connected", sj.prevStage.Name(), + ) + } + } + + if sj.nextStage != nil { + stdinRequirements := sj.nextStage.Requirements().Stdin + if stdinRequirements == StreamForbidden && sj.nextStdin != nil { + return fmt.Errorf( + "stage %q forbids stdin, but stdin is connected", sj.nextStage.Name(), + ) + } + } + + return nil +} From 42a4910abe161403b2b69942a0e5385e0afc7595 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Mon, 15 Jun 2026 17:33:02 +0200 Subject: [PATCH 24/26] Stage: do some more work on the docstring Move the part of the explanation that is specific to command stages to `command.go`. --- pipe/command.go | 29 +++++++++++++ pipe/stage.go | 108 ++++++++---------------------------------------- 2 files changed, 46 insertions(+), 91 deletions(-) diff --git a/pipe/command.go b/pipe/command.go index 908516b..41ef520 100644 --- a/pipe/command.go +++ b/pipe/command.go @@ -99,6 +99,35 @@ func (s *commandStage) Start( s.setupEnv(ctx, opts.Env) + // It is important that the streams that are used by a command be + // closed at the right time. When that is depends on the type of + // the stream. + // + // A subprocess ultimately needs its own copies of `*os.File` file + // descriptors for its stdin and stdout. The external command will + // "always" close those when it exits. + // + // (It's theoretically possible for a command to pass the open + // file descriptor to another, longer-lived process, in which case + // the file descriptor wouldn't necessarily get closed even when + // the command finishes. But that's ill-behaved in a command that + // is being used in a pipeline, so we'll ignore that possibility.) + // + // If a stream provided for use as stdin/stdout is an `*os.File`, + // then we set the corresponding field of `exec.Cmd` to that + // argument. This causes `exec.Cmd` to duplicate that file + // descriptor and passes the dup to the subprocess. Therefore, we + // want to close our own copy "early", namely as soon as the + // external command has started, because the external command will + // keep its own copy open as long as necessary (and no longer!). + // + // If a stdin/stdout stream is _not_ an `*os.File`, then + // `exec.Cmd` will take care of creating an `os.Pipe()`, copying + // from the provided stream into/out of the pipe, and eventually + // close both ends of the pipe. In that case, we must close the + // provided stream "late", namely only after the external command + // and the copy have finished. + // Things that have to be closed as soon as the command has started: var earlyClosers []io.Closer diff --git a/pipe/stage.go b/pipe/stage.go index 9a35254..7f10dc8 100644 --- a/pipe/stage.go +++ b/pipe/stage.go @@ -9,103 +9,29 @@ import ( // // # Who closes stdin and stdout? // -// A `Stage` as a whole is responsible for closing its end of stdin -// and stdout if the corresponding stream is closing. That -// responsibility transfers to the stage as soon as the stage's -// `Start()` method is called and applies even if `Start()` returns an -// error. Before returning an error from `Start()`, the stage must -// close any closing stream that it has not already handed off to -// something else that will close it promptly. The caller must not -// close a closing stream after passing it to `Start()`. -// -// If the caller wants to retain ownership of stdin/stdout, it passes -// a non-closing stream. Calling `Close()` on a non-closing stream is -// a NOP, so it isn't harmful but isn't required. +// A `Stage` is responsible for calling `Close()` on the +// `InputStream`/`OutputStream` that represent its stdin and stdout as +// soon as it doesn't need them anymore. That responsibility begins as +// soon as the stage's `Start()` method is called, and applies +// regardless of whether `Start()` returns an error. It must close the +// streams before its `Wait()` method returns. The caller must not +// close the streams after calling `Start()`. // // Closing stdin/stdout tells the previous/next stage that this stage // is done reading/writing data, which can affect their behavior. -// Therefore, after a successful start, a stage should close each one -// as soon as it is done with it. Assume that this is done via the -// following local function -// -// closeStreams := func() { -// // Error handling omitted. -// _ = stdin.Close() -// _ = stdout.Close() -// } +// Therefore, it is important for a stage to close each one as soon as +// it is done with it. // // From the point of view of the pipeline as a whole, if stdin is // provided by the user (`WithStdin()`), then we don't want the first -// stage to close it at all, whether it's an `*os.File` or not. The -// pipeline communicates this by passing a non-closing `InputStream` -// when it starts that stage. For stdout, it depends on whether the -// user supplied it using `WithStdout()` or `WithStdoutCloser()`. In -// any case, this function can close the streams anyway, because -// `InputStream.Close()` and `OutputStream.Close()` do nothing in -// those cases. -// -// When these closes should happen depends on what kind of stage it is -// and whether stdin/stdout are of type `*os.File`. -// -// ## A command stage -// -// If a stage is an external command, then the subprocess ultimately -// needs its own copies of `*os.File` file descriptors for its stdin -// and stdout. The external command will "always" [1] close those when -// it exits. -// -// (It's theoretically possible for a command to pass the open file -// descriptor to another, longer-lived process, in which case the file -// descriptor wouldn't necessarily get closed when the command -// finishes. But that's ill-behaved in a command that is being used in -// a pipeline, so we'll ignore that possibility.) -// -// If the stage is an external command and one of the arguments is an -// `*os.File`, then it can set the corresponding field of `exec.Cmd` -// to that argument directly. This has the result that `exec.Cmd` -// duplicates that file descriptor and passes the dup to the -// subprocess. Therefore, the stage must close its copy of that -// argument as soon as the external command has started, because the -// external command will keep its own copy open as long as necessary -// (and no longer!). Therefore, it should use roughly the following -// sequence: -// -// cmd.Stdin = stdin.Reader() -// cmd.Stdout = stdout.Writer() -// err := cmd.Start(…) -// // Close our copies as soon as the command has started: -// closeStreams() -// if err != nil { -// return err -// } -// return cmd.Wait() -// -// If the stage is an external command and its stdin is not an -// `*os.File`, then `exec.Cmd` will take care of creating an -// `os.Pipe()`, copying from the provided reader into the pipe, and -// eventually closing both ends of the pipe. The stage must close the -// provided stdin itself, but only _after_ the external command and -// the copy have finished, like so: -// -// defer closeStreams() -// cmd.Stdin = stdin.Reader() -// cmd.Stdout = stdout.Writer() -// err := cmd.Start(…) -// if err != nil { -// return err -// } -// return cmd.Wait() -// -// ## A function stage -// -// If the stage is a Go function, then it holds the only copy of -// stdin/stdout, so it must wait until the function is done before -// closing them (regardless of their underlying type, like so: -// -// go func() { -// defer closeStreams() -// f(…, stdin.Reader(), stdout.Writer()) -// }() +// stage to close it at all. This is arranged by by passing a +// non-closing `InputStream` when it starts that stage. For stdout, it +// depends on whether the user supplied it using `WithStdout()` or +// `WithStdoutCloser()`, and in the former case provides the last +// stage with a non-closing `OutputStream`. Calling `Close()` on a +// non-closing stream (or even on a nil stream) is a NOP, so the +// `Stage` can always call `Close()` and doesn't have to worry about +// whether a stdin/stdout stream is non-closing. type Stage interface { // Name returns the name of the stage. Name() string From f2d53ab064330f25e91ef8de240b2435f1a34af5 Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Mon, 15 Jun 2026 15:08:37 +0200 Subject: [PATCH 25/26] stageJoiner: cache each stage's Requirements() needFilePipe() and validate() each call Stage.Requirements() for the adjacent stages, so a stage's requirements can be computed/allocated twice during startup. Caching the result on the joiner lets us call Requirements() only once. --- pipe/pipeline.go | 8 +++++--- pipe/stage_joiner.go | 25 ++++++++++++++----------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/pipe/pipeline.go b/pipe/pipeline.go index d237461..e764f10 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -286,9 +286,6 @@ func (p *Pipeline) Start(ctx context.Context) error { // Store the stages in the joiners, and verify that the stages' // requirements are well-formed: for i, s := range p.stages { - stageJoiners[i].nextStage = s - stageJoiners[i+1].prevStage = s - // Make sure that the stage's requirements are well-formed: requirements := s.Requirements() if err := requirements.Stdin.Validate(); err != nil { @@ -297,6 +294,11 @@ func (p *Pipeline) Start(ctx context.Context) error { if err := requirements.Stdout.Validate(); err != nil { return fmt.Errorf("stdout: %w", err) } + + stageJoiners[i].nextStage = s + stageJoiners[i].nextStageReq = requirements + stageJoiners[i+1].prevStage = s + stageJoiners[i+1].prevStageReq = requirements } // Create the "inner" pipes (i.e, all but the first and last diff --git a/pipe/stage_joiner.go b/pipe/stage_joiner.go index 24fb789..2730e73 100644 --- a/pipe/stage_joiner.go +++ b/pipe/stage_joiner.go @@ -43,6 +43,11 @@ type stageJoiner struct { // prevStage holds the stage that needs to write to the pipe. prevStage Stage + // prevStageReq caches `prevStage.Requirements()` so that it + // doesn't have to be recomputed. It is the zero value if + // `prevStage` is nil. + prevStageReq StageRequirements + // prevStdout will be used as the stdout of `prevStage`. It is // usually the "write" end of the `(nextStdin, prevStdout)` pipe // pair, with the connected pipe ends in the same `stageJoiner` @@ -52,6 +57,11 @@ type stageJoiner struct { // nextStage holds the stage that needs to read from the pipe. nextStage Stage + // nextStageReq caches `nextStage.Requirements()` so that it + // doesn't have to be recomputed. It is the zero value if + // `nextStage` is nil. + nextStageReq StageRequirements + // nextStdin will be used as the stdin of `nextStage`. It is // usually the "read" end of the `(nextStdin, prevStdout)` pipe // pair. @@ -61,13 +71,8 @@ type stageJoiner struct { // needFilePipe returns `true` if the pipe that joins the two adjacent // stages should be an `os.Pipe()` rather than an `io.Pipe()`. func (sj *stageJoiner) needFilePipe() bool { - if sj.prevStage.Requirements().Stdout == StreamPreferFile { - return true - } - if sj.nextStage.Requirements().Stdin == StreamPreferFile { - return true - } - return false + return sj.prevStageReq.Stdout == StreamPreferFile || + sj.nextStageReq.Stdin == StreamPreferFile } func (sj *stageJoiner) createPipe() error { @@ -105,8 +110,7 @@ func (sj *stageJoiner) closePipe() error { // created if the stage requirements are `StreamForbidden`. func (sj *stageJoiner) validate() error { if sj.prevStage != nil { - stdoutRequirements := sj.prevStage.Requirements().Stdout - if stdoutRequirements == StreamForbidden && sj.prevStdout != nil { + if sj.prevStageReq.Stdout == StreamForbidden && sj.prevStdout != nil { return fmt.Errorf( "stage %q forbids stdout, but stdout is connected", sj.prevStage.Name(), ) @@ -114,8 +118,7 @@ func (sj *stageJoiner) validate() error { } if sj.nextStage != nil { - stdinRequirements := sj.nextStage.Requirements().Stdin - if stdinRequirements == StreamForbidden && sj.nextStdin != nil { + if sj.nextStageReq.Stdin == StreamForbidden && sj.nextStdin != nil { return fmt.Errorf( "stage %q forbids stdin, but stdin is connected", sj.nextStage.Name(), ) From 7072f019b80fc47347f85d386d004ba0f06cd752 Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Mon, 15 Jun 2026 15:16:22 +0200 Subject: [PATCH 26/26] Pipeline.Start: validate stream requirements before creating pipes Reorder Start() to run stageJoiner.validate() before createPipe(), and make validate() decide whether a stream is "connected" from the pipeline topology (the presence of an adjacent stage or an already-set input/output stream). This lets us reject an invalid pipeline without allocating any io.Pipe() objects, and in the case of os.Pipe(), doing the system calls to create OS-level file descriptors. --- pipe/pipeline.go | 15 +++++++-------- pipe/stage_joiner.go | 34 +++++++++++++++++++--------------- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/pipe/pipeline.go b/pipe/pipeline.go index e764f10..44fedd1 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -301,19 +301,18 @@ func (p *Pipeline) Start(ctx context.Context) error { stageJoiners[i+1].prevStageReq = requirements } - // Create the "inner" pipes (i.e, all but the first and last - // `stageJoiners`): - for i := 1; i < len(stageJoiners)-1; i++ { - if err := stageJoiners[i].createPipe(); err != nil { + // Check that each of the stages' requirements are satisfiable: + for i := range stageJoiners { + if err := stageJoiners[i].validate(); err != nil { closePipes() return err } } - // Check that each of the stages' requirements are compatible with - // the pipes that we have created for them: - for i := range stageJoiners { - if err := stageJoiners[i].validate(); err != nil { + // Create the "inner" pipes (i.e, all but the first and last + // `stageJoiners`): + for i := 1; i < len(stageJoiners)-1; i++ { + if err := stageJoiners[i].createPipe(); err != nil { closePipes() return err } diff --git a/pipe/stage_joiner.go b/pipe/stage_joiner.go index 2730e73..03569cb 100644 --- a/pipe/stage_joiner.go +++ b/pipe/stage_joiner.go @@ -105,24 +105,28 @@ func (sj *stageJoiner) closePipe() error { ) } -// validate verifies that `sj.prevStdout` and `sj.nextStdin` are -// suitable for the adjacent stages, in particular that no pipe is -// created if the stage requirements are `StreamForbidden`. +// validate verifies that the adjacent stages' stream requirements are +// satisfiable, in particular that a stage that forbids its stdin or +// stdout is not connected to anything. func (sj *stageJoiner) validate() error { - if sj.prevStage != nil { - if sj.prevStageReq.Stdout == StreamForbidden && sj.prevStdout != nil { - return fmt.Errorf( - "stage %q forbids stdout, but stdout is connected", sj.prevStage.Name(), - ) - } + // `prevStage`'s stdout is connected if there is a `nextStage` to + // consume it (in which case an inner pipe will be created) or if + // a stream (`p.stdout`) has already been stored in `prevStdout`. + if sj.prevStage != nil && sj.prevStageReq.Stdout == StreamForbidden && + (sj.nextStage != nil || sj.prevStdout != nil) { + return fmt.Errorf( + "stage %q forbids stdout, but stdout is connected", sj.prevStage.Name(), + ) } - if sj.nextStage != nil { - if sj.nextStageReq.Stdin == StreamForbidden && sj.nextStdin != nil { - return fmt.Errorf( - "stage %q forbids stdin, but stdin is connected", sj.nextStage.Name(), - ) - } + // `nextStage`'s stdin is connected if there is a `prevStage` to + // produce it (in which case an inner pipe will be created) or if + // a stream (`p.stdin`) has already been stored in `nextStdin`. + if sj.nextStage != nil && sj.nextStageReq.Stdin == StreamForbidden && + (sj.prevStage != nil || sj.nextStdin != nil) { + return fmt.Errorf( + "stage %q forbids stdin, but stdin is connected", sj.nextStage.Name(), + ) } return nil