Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
a327b2f
Make commandStage close stdout streams if pooled stdout setup fails
znull Jun 12, 2026
f040a53
Close stdout streams when startup fails before the final stage starts
znull Jun 12, 2026
08d5b4b
Introduce stream types
znull Jun 12, 2026
c6d12cb
windows could have these, we don't know until we try
znull Jun 11, 2026
1d7a969
test Go producer behavior after downstream early exit
znull Jun 10, 2026
33a4e28
document producer pipe errors in v2 migration
znull Jun 10, 2026
ac585a8
fully document stream ownership (close responsibility)
znull Jun 12, 2026
a65fafb
Document early-close producer caveats
znull Jun 12, 2026
433e104
Align start-failure cleanup with stream ownership
znull Jun 12, 2026
642eb3e
InputStream, OutputStream: move types to a separate file
mhagger Jun 13, 2026
67ca961
InputStream, OutputStream: change `Close()` methods to pass errors th…
mhagger Jun 13, 2026
e3a8223
Stage: update docstring to reflect use of streams
mhagger Jun 13, 2026
aa3db90
InputStream, OutputStream: remove the `Closer()` methods
mhagger Jun 13, 2026
63d8e51
commandStage.Start(): rename some local variables
mhagger Jun 13, 2026
6ef82b5
InputStream, OutputStream: make these into pointer types
mhagger Jun 13, 2026
29edba3
InputStream, OutputStream: make `Close()` idempotent
mhagger Jun 13, 2026
8466274
InputStream, OutputStream: improve docstrings
mhagger Jun 13, 2026
b30a165
Pipeline: use `InputStream` and `OutputStream`
mhagger Jun 13, 2026
07cbbcf
Stage: improve docstring
mhagger Jun 14, 2026
e772ff0
StreamRequirement: move to separate file
mhagger Jun 13, 2026
fbe470e
StreamRequirement: encode also whether the stream has to be nil
mhagger Jun 13, 2026
cb3bf1d
StreamRequirement.Validate(): make method public
mhagger Jun 13, 2026
8e22ddc
stageJoiner: new helper type for creating pipes between stages
mhagger Jun 12, 2026
42a4910
Stage: do some more work on the docstring
mhagger Jun 15, 2026
f2d53ab
stageJoiner: cache each stage's Requirements()
znull Jun 15, 2026
7072f01
Pipeline.Start: validate stream requirements before creating pipes
znull Jun 15, 2026
a274f74
Merge pull request #58 from github/moar-streams
mhagger Jun 15, 2026
a86cf5f
Merge pull request #59 from github/stage-joiner
mhagger Jun 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,31 @@ A package used to easily build command pipelines in your Go applications
# Important
We have not thoroughly tested this package on OSs other than Linux, especially Windows. At this time, using this package on Windows based systems is considered experimental and will be supported only on a best effort basis.

# Migrating to v2

It's normal for pipelines to stop before all input has been consumed[^1]. If an earlier stage continues writing after that happens, the write side of the pipe can fail with `EPIPE`, `SIGPIPE`, or `io.ErrClosedPipe`.

In go-pipe v1 it was possible to get away without handling this case, because a command stage's stdin was connected in a way that often (but not necessarily!) drained the write side and hid the error from the previous stage feeding it. That was an implementation detail, not a guarantee. In go-pipe v2, producer stages are more likely to be connected directly to a command's stdin, and thus see the error themselves.

Fortunately, this is easily handled by wrapping the stage with `pipe.IgnoreError(stage, IsPipeError)`. If the producer only writes output and is otherwise stateless, that's the only thing needed.

If the producer also updates state, metrics, cursors, or has other side effects, in a way that depends on how much of the output was produced, then in addition to using `pipe.IgnoreError`, you must also ensure producer-owned state is brought to a consistent point before returning the error.

For example, if a stateful producer function must process its entire input for correctness regardless of whether it was read by the consumer, it should use a pattern like:

```go
var writeErr error
for _, item := range items {
updateState(item)
if writeErr == nil {
_, writeErr = fmt.Fprintln(stdout, item)
}
}
return writeErr
```

# Links

* [Docs](https://pkg.go.dev/github.com/github/go-pipe/v2)

[^1]: In `cat foo | head | grep -q`, for example, either `head` or `grep` could exit before its input is fully consumed.
114 changes: 82 additions & 32 deletions pipe/close_responsibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,28 @@ import (
// readCloseSpy records whether Close was called.
type readCloseSpy struct {
io.Reader
closed atomic.Bool
closeCount atomic.Uint32
}

func (r *readCloseSpy) Close() error {
r.closed.Store(true)
r.closeCount.Add(1)
return nil
}

// writeCloseSpy records whether Close was called.
type writeCloseSpy struct {
io.Writer
closed atomic.Bool
closeCount atomic.Uint32
}

func (w *writeCloseSpy) Close() error {
w.closed.Store(true)
w.closeCount.Add(1)
return nil
}

// TestGoStageHonorsCloseFlags verifies that a Function stage closes
// stdin/stdout iff the corresponding close flag is true.
func TestGoStageHonorsCloseFlags(t *testing.T) {
// TestGoStageHonorsStreamOwnership verifies that a Function stage closes
// stdin/stdout iff the corresponding stream is closing.
func TestGoStageHonorsStreamOwnership(t *testing.T) {
cases := []struct {
name string
leaveIn, leaveOut bool
Expand All @@ -58,34 +58,62 @@ func TestGoStageHonorsCloseFlags(t *testing.T) {

require.NoError(t, s.Start(
context.Background(), StageOptions{},
in, !tc.leaveIn,
out, !tc.leaveOut,
inputForTest(in, !tc.leaveIn),
outputForTest(out, !tc.leaveOut),
))
require.NoError(t, s.Wait())

assert.Equal(t, !tc.leaveIn, in.closed.Load(), "closeStdin=%v", !tc.leaveIn)
assert.Equal(t, !tc.leaveOut, out.closed.Load(), "closeStdout=%v", !tc.leaveOut)
if tc.leaveIn {
assert.EqualValues(t, 0, in.closeCount.Load(), "closing stdin=%v", !tc.leaveIn)
} else {
assert.EqualValues(t, 1, in.closeCount.Load(), "closing stdin=%v", !tc.leaveIn)
}
if tc.leaveOut {
assert.EqualValues(t, 0, out.closeCount.Load(), "closing stdout=%v", !tc.leaveOut)
} else {
assert.EqualValues(t, 1, out.closeCount.Load(), "closing stdout=%v", !tc.leaveOut)
}
})
}
}

func TestStagePanicsWhenOwnedStreamIsNotCloseable(t *testing.T) {
s := Function("f", func(_ context.Context, _ Env, _ io.Reader, _ io.Writer) error {
return nil
})

assert.PanicsWithValue(t, "stage asked to close *strings.Reader, which does not implement io.Closer", func() {
_ = s.Start(
context.Background(), StageOptions{},
strings.NewReader("not closeable"), true,
nil, false,
)
})
func TestStreamConstructorsPreserveOwnershipAndDynamicType(t *testing.T) {
borrowedReader := &readCloseSpy{Reader: strings.NewReader("borrowed")}
borrowedInput := Input(borrowedReader)
assert.Same(t, borrowedReader, borrowedInput.Reader())
assert.NoError(t, borrowedInput.Close())
assert.EqualValues(t, 0, borrowedReader.closeCount.Load())
assert.NoError(t, borrowedInput.Close())
assert.EqualValues(t, 0, borrowedReader.closeCount.Load())

ownedReader := &readCloseSpy{Reader: strings.NewReader("owned")}
ownedInput := ClosingInput(ownedReader)
assert.Same(t, ownedReader, ownedInput.Reader())
assert.NoError(t, ownedInput.Close())
assert.EqualValues(t, 1, ownedReader.closeCount.Load())
assert.NoError(t, ownedInput.Close())
assert.EqualValues(t, 1, ownedReader.closeCount.Load())

borrowedWriter := &writeCloseSpy{Writer: &strings.Builder{}}
borrowedOutput := Output(borrowedWriter)
assert.Same(t, borrowedWriter, borrowedOutput.Writer())
assert.NoError(t, borrowedOutput.Close())
assert.EqualValues(t, 0, borrowedWriter.closeCount.Load())
assert.NoError(t, borrowedOutput.Close())
assert.EqualValues(t, 0, borrowedWriter.closeCount.Load())

ownedWriter := &writeCloseSpy{Writer: &writeCloseSpy{Writer: io.Discard}}
ownedOutput := ClosingOutput(ownedWriter)
assert.Same(t, ownedWriter, ownedOutput.Writer())
assert.NoError(t, ownedOutput.Close())
assert.EqualValues(t, 1, ownedWriter.closeCount.Load())
assert.NoError(t, ownedOutput.Close())
assert.EqualValues(t, 1, ownedWriter.closeCount.Load())
}

// TestCommandStageHonorsCloseStdin verifies that a command stage closes a
// non-file stdin (a "late" closer) iff closeStdin is true. An empty
// reader is used so exec.Cmd's input-copy goroutine sees EOF promptly.
// non-file stdin (a "late" closer) iff the input stream is closing. An
// empty reader is used so exec.Cmd's input-copy goroutine sees EOF promptly.
func TestCommandStageHonorsCloseStdin(t *testing.T) {
for _, leave := range []bool{false, true} {
name := "owns stdin"
Expand All @@ -100,19 +128,23 @@ func TestCommandStageHonorsCloseStdin(t *testing.T) {

require.NoError(t, s.Start(
context.Background(), StageOptions{},
in, !leave,
nil, false,
inputForTest(in, !leave),
Output(nil),
))
require.NoError(t, s.Wait())

assert.Equal(t, !leave, in.closed.Load(), "closeStdin=%v", !leave)
if leave {
assert.EqualValues(t, 0, in.closeCount.Load(), "closing stdin=%v", !leave)
} else {
assert.EqualValues(t, 1, in.closeCount.Load(), "closing stdin=%v", !leave)
}
})
}
}

// TestCommandStageHonorsCloseStdout verifies the stdout counterpart: a
// non-file stdout (routed through the pooled-copy path) is closed iff
// closeStdout is true.
// the output stream is closing.
func TestCommandStageHonorsCloseStdout(t *testing.T) {
for _, leave := range []bool{false, true} {
name := "owns stdout"
Expand All @@ -127,12 +159,30 @@ func TestCommandStageHonorsCloseStdout(t *testing.T) {

require.NoError(t, s.Start(
context.Background(), StageOptions{},
nil, false,
out, !leave,
Input(nil),
outputForTest(out, !leave),
))
require.NoError(t, s.Wait())

assert.Equal(t, !leave, out.closed.Load(), "closeStdout=%v", !leave)
if leave {
assert.EqualValues(t, 0, out.closeCount.Load(), "closing stdout=%v", !leave)
} else {
assert.EqualValues(t, 1, out.closeCount.Load(), "closing stdout=%v", !leave)
}
})
}
}

func inputForTest(r io.ReadCloser, closing bool) *InputStream {
if closing {
return ClosingInput(r)
}
return Input(r)
}

func outputForTest(w io.WriteCloser, closing bool) *OutputStream {
if closing {
return ClosingOutput(w)
}
return Output(w)
}
101 changes: 62 additions & 39 deletions pipe/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,64 +81,67 @@ func (s *commandStage) Process() *os.Process {

func (s *commandStage) Requirements() StageRequirements {
return StageRequirements{
StdinNeedsFile: true,
StdoutNeedsFile: true,
Stdin: StreamPreferFile,
Stdout: StreamPreferFile,
}
}

func (s *commandStage) Start(
ctx context.Context, opts StageOptions,
stdin io.Reader, closeStdin bool,
stdout io.Writer, closeStdout bool,
stdin *InputStream, stdout *OutputStream,
) error {
stdinCloser := ownedCloser(stdin, closeStdin)
stdoutCloser := ownedCloser(stdout, closeStdout)
r := stdin.Reader()
w := stdout.Writer()

if s.cmd.Dir == "" {
s.cmd.Dir = opts.Dir
}

s.setupEnv(ctx, opts.Env)

// It is important that the streams that are used by a command be
// closed at the right time. When that is depends on the type of
// the stream.
//
// A subprocess ultimately needs its own copies of `*os.File` file
// descriptors for its stdin and stdout. The external command will
// "always" close those when it exits.
//
// (It's theoretically possible for a command to pass the open
// file descriptor to another, longer-lived process, in which case
// the file descriptor wouldn't necessarily get closed even when
// the command finishes. But that's ill-behaved in a command that
// is being used in a pipeline, so we'll ignore that possibility.)
//
// If a stream provided for use as stdin/stdout is an `*os.File`,
// then we set the corresponding field of `exec.Cmd` to that
// argument. This causes `exec.Cmd` to duplicate that file
// descriptor and passes the dup to the subprocess. Therefore, we
// want to close our own copy "early", namely as soon as the
// external command has started, because the external command will
// keep its own copy open as long as necessary (and no longer!).
//
// If a stdin/stdout stream is _not_ an `*os.File`, then
// `exec.Cmd` will take care of creating an `os.Pipe()`, copying
// from the provided stream into/out of the pipe, and eventually
// close both ends of the pipe. In that case, we must close the
// provided stream "late", namely only after the external command
// and the copy have finished.

// Things that have to be closed as soon as the command has started:
var earlyClosers []io.Closer

// See the type comment for `Stage` for the explanation of this closing behavior.
if stdin != nil {
s.cmd.Stdin = stdin
if r != nil {
s.cmd.Stdin = r
}

if stdinCloser != nil {
if _, ok := stdin.(*os.File); ok {
// We can close our copy as soon as the command has started
earlyClosers = append(earlyClosers, stdinCloser)
} else {
// We need to close `stdin`, but only after the command has finished
s.lateClosers = append(s.lateClosers, stdinCloser)
}
}

if stdout != nil {
if f, ok := stdout.(*os.File); ok {
s.cmd.Stdout = f
if stdoutCloser != nil {
earlyClosers = append(earlyClosers, stdoutCloser)
}
} else {
// Route the copy through our own pipe so we can use a
// pooled buffer rather than letting exec.Cmd allocate a
// fresh 32KB buffer for its internal io.Copy.
ec, err := s.setupPooledStdout(stdout)
if err != nil {
return err
}
earlyClosers = append(earlyClosers, ec)
if stdoutCloser != nil {
s.lateClosers = append(s.lateClosers, stdoutCloser)
}
}
} else if stdoutCloser != nil {
s.lateClosers = append(s.lateClosers, stdoutCloser)
if _, ok := r.(*os.File); ok {
// We can close our copy as soon as the command has started
earlyClosers = append(earlyClosers, stdin)
} else {
// We need to close `stdin`, but only after the command has finished
s.lateClosers = append(s.lateClosers, stdin)
}

closeEarlyClosers := func() {
Expand All @@ -155,6 +158,26 @@ func (s *commandStage) Start(
_ = s.closeLateClosers()
}

if w != nil {
if f, ok := w.(*os.File); ok {
s.cmd.Stdout = f
earlyClosers = append(earlyClosers, stdout)
} else {
s.lateClosers = append(s.lateClosers, stdout)
// Route the copy through our own pipe so we can use a
// pooled buffer rather than letting exec.Cmd allocate a
// fresh 32KB buffer for its internal io.Copy.
ec, err := s.setupPooledStdout(w)
if err != nil {
cleanupOnStartFailure()
return err
}
earlyClosers = append(earlyClosers, ec)
}
} else {
s.lateClosers = append(s.lateClosers, stdout)
}

// If the caller hasn't arranged otherwise, read the command's
// standard error into our `stderr` field:
if s.cmd.Stderr == nil {
Expand Down
19 changes: 13 additions & 6 deletions pipe/command_stdout_fastpath_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ import (
// subprocess can detect when that fd is closed.
func TestCommandStageStdoutFastPath(t *testing.T) {
cases := []struct {
name string
closeStdout bool
name string
closingStdout bool
}{
{
name: "raw *os.File with closeStdout",
closeStdout: true,
name: "raw *os.File with closing stdout",
closingStdout: true,
},
{
name: "raw *os.File without closeStdout",
name: "raw *os.File with non-closing stdout",
},
}
for _, tc := range cases {
Expand All @@ -42,7 +42,14 @@ func TestCommandStageStdoutFastPath(t *testing.T) {
cmd := exec.Command("true")
s := CommandStage("true", cmd).(*commandStage)

require.NoError(t, s.Start(ctx, StageOptions{}, nil, false, f, tc.closeStdout))
var stdout *OutputStream
if tc.closingStdout {
stdout = ClosingOutput(f)
} else {
stdout = Output(f)
}

require.NoError(t, s.Start(ctx, StageOptions{}, Input(nil), stdout))
t.Cleanup(func() { _ = s.Wait() })

gotFile, ok := s.cmd.Stdout.(*os.File)
Expand Down
Loading
Loading