From e772ff004f297411af48ca0a97f843cec121d461 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 13 Jun 2026 07:52:22 +0200 Subject: [PATCH 1/6] StreamRequirement: move to separate file --- pipe/pipeline.go | 9 --------- pipe/stage.go | 10 ---------- pipe/stream_requirement.go | 24 ++++++++++++++++++++++++ 3 files changed, 24 insertions(+), 19 deletions(-) create mode 100644 pipe/stream_requirement.go diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 085fdd3..7f6814e 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -224,15 +224,6 @@ type stageStarter struct { stdout *OutputStream } -func (requirement StreamRequirement) validate() error { - switch requirement { - case StreamOptional, StreamForbidden: - return nil - default: - return fmt.Errorf("invalid stream requirement %d", requirement) - } -} - func (requirements StageRequirements) validate(s Stage, stdinConnected, stdoutConnected bool) error { if err := requirements.Stdin.validate(); err != nil { return fmt.Errorf("stdin: %w", err) diff --git a/pipe/stage.go b/pipe/stage.go index 9a35254..9ffce1c 100644 --- a/pipe/stage.go +++ b/pipe/stage.go @@ -155,16 +155,6 @@ type StageOptions struct { // StagePanicHandler is a function that handles panics in a pipeline's stages. type StagePanicHandler func(p any) error -type StreamRequirement int - -const ( - // StreamOptional means the stream may be connected or nil. - StreamOptional StreamRequirement = iota - - // StreamForbidden means the stream must be nil. - StreamForbidden -) - // StageRequirements describes what a Stage needs from the streams connected to // its stdin and stdout. The zero value is correct for stages that are happy // with arbitrary io.Reader/io.Writer streams, such as Function stages. diff --git a/pipe/stream_requirement.go b/pipe/stream_requirement.go new file mode 100644 index 0000000..16e14a0 --- /dev/null +++ b/pipe/stream_requirement.go @@ -0,0 +1,24 @@ +package pipe + +import "fmt" + +type StreamRequirement int + +const ( + // StreamOptional means the stream may be connected or nil. + StreamOptional StreamRequirement = iota + + // StreamForbidden means the stream must be nil. + StreamForbidden +) + +// validate checks that `req` has a valid value and returns an error +// otherwise. +func (req StreamRequirement) validate() error { + switch req { + case StreamOptional, StreamForbidden: + return nil + default: + return fmt.Errorf("invalid stream requirement %d", req) + } +} From fbe470e67ebd0d60430bd231b106b18c29427d67 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 13 Jun 2026 11:09:04 +0200 Subject: [PATCH 2/6] StreamRequirement: encode also whether the stream has to be nil The two aspects of a stage's stream requirements are coupled. For example, of the four possible combinations of `(StreamRequirement, NeedsFile)`, one of them, `(StreamForbidden, true)`, makes no sense. So instead of encoding these two aspects separately, encode the three meaningful combinations into a single `StreamRequirement` type with possible values `StreamAcceptAny`, `StreamPreferFile`, and `StreamForbidden`. --- pipe/command.go | 4 +-- pipe/pipe_matching_test.go | 64 +++++++++++++++++++++++++------------- pipe/pipeline.go | 3 +- pipe/stage.go | 12 +++---- pipe/stream_requirement.go | 28 ++++++++++++----- 5 files changed, 71 insertions(+), 40 deletions(-) diff --git a/pipe/command.go b/pipe/command.go index 908516b..eb2a847 100644 --- a/pipe/command.go +++ b/pipe/command.go @@ -81,8 +81,8 @@ func (s *commandStage) Process() *os.Process { func (s *commandStage) Requirements() StageRequirements { return StageRequirements{ - StdinNeedsFile: true, - StdoutNeedsFile: true, + Stdin: StreamPreferFile, + Stdout: StreamPreferFile, } } diff --git a/pipe/pipe_matching_test.go b/pipe/pipe_matching_test.go index badb990..89c5ae7 100644 --- a/pipe/pipe_matching_test.go +++ b/pipe/pipe_matching_test.go @@ -49,14 +49,10 @@ func writeCloser() io.WriteCloser { } func newPipeSniffingStage( - stdinNeedsFile bool, stdinExpectation ioExpectation, - stdoutNeedsFile bool, stdoutExpectation ioExpectation, + req pipe.StageRequirements, stdinExpectation, stdoutExpectation ioExpectation, ) *pipeSniffingStage { return &pipeSniffingStage{ - requirements: pipe.StageRequirements{ - StdinNeedsFile: stdinNeedsFile, - StdoutNeedsFile: stdoutNeedsFile, - }, + requirements: req, expect: pipeExpectations{ stdin: stdinExpectation, stdout: stdoutExpectation, @@ -68,8 +64,11 @@ func newPipeSniffingFunc( stdinExpectation, stdoutExpectation ioExpectation, ) *pipeSniffingStage { return newPipeSniffingStage( - false, stdinExpectation, - false, stdoutExpectation, + pipe.StageRequirements{ + Stdin: pipe.StreamAcceptAny, + Stdout: pipe.StreamAcceptAny, + }, + stdinExpectation, stdoutExpectation, ) } @@ -77,8 +76,11 @@ func newPipeSniffingCmd( stdinExpectation, stdoutExpectation ioExpectation, ) *pipeSniffingStage { return newPipeSniffingStage( - true, stdinExpectation, - true, stdoutExpectation, + pipe.StageRequirements{ + Stdin: pipe.StreamPreferFile, + Stdout: pipe.StreamPreferFile, + }, + stdinExpectation, stdoutExpectation, ) } @@ -325,16 +327,25 @@ func TestPipeTypes(t *testing.T) { opts: []pipe.Option{}, stages: []pipe.Stage{ newPipeSniffingStage( - false, expectNil, - false, expectOther, + pipe.StageRequirements{ + Stdin: pipe.StreamAcceptAny, + Stdout: pipe.StreamAcceptAny, + }, + expectNil, expectOther, ), newPipeSniffingStage( - false, expectOther, - true, expectFile, + pipe.StageRequirements{ + Stdin: pipe.StreamAcceptAny, + Stdout: pipe.StreamPreferFile, + }, + expectOther, expectFile, ), newPipeSniffingStage( - false, expectFile, - false, expectNil, + pipe.StageRequirements{ + Stdin: pipe.StreamAcceptAny, + Stdout: pipe.StreamAcceptAny, + }, + expectFile, expectNil, ), }, }, @@ -343,16 +354,25 @@ func TestPipeTypes(t *testing.T) { opts: []pipe.Option{}, stages: []pipe.Stage{ newPipeSniffingStage( - false, expectNil, - false, expectFile, + pipe.StageRequirements{ + Stdin: pipe.StreamAcceptAny, + Stdout: pipe.StreamAcceptAny, + }, + expectNil, expectFile, ), newPipeSniffingStage( - true, expectFile, - false, expectOther, + pipe.StageRequirements{ + Stdin: pipe.StreamPreferFile, + Stdout: pipe.StreamAcceptAny, + }, + expectFile, expectOther, ), newPipeSniffingStage( - false, expectOther, - false, expectNil, + pipe.StageRequirements{ + Stdin: pipe.StreamAcceptAny, + Stdout: pipe.StreamAcceptAny, + }, + expectOther, expectNil, ), }, }, diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 7f6814e..4fb4f60 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -371,7 +371,8 @@ func (p *Pipeline) Start(ctx context.Context) error { // We need to generate a pipe pair for this stage to use // to communicate with its successor: - if ss.requirements.StdoutNeedsFile || nextSS.requirements.StdinNeedsFile { + if ss.requirements.Stdout == StreamPreferFile || + nextSS.requirements.Stdin == StreamPreferFile { // Use an OS-level pipe for the communication: nextStdin, stdout, err := os.Pipe() if err != nil { diff --git a/pipe/stage.go b/pipe/stage.go index 9ffce1c..0a086ba 100644 --- a/pipe/stage.go +++ b/pipe/stage.go @@ -155,15 +155,11 @@ type StageOptions struct { // StagePanicHandler is a function that handles panics in a pipeline's stages. type StagePanicHandler func(p any) error -// StageRequirements describes what a Stage needs from the streams connected to -// its stdin and stdout. The zero value is correct for stages that are happy -// with arbitrary io.Reader/io.Writer streams, such as Function stages. +// StageRequirements describes what a Stage needs from the streams +// connected to its stdin and stdout. The zero value is correct for +// stages that are happy with arbitrary io.Reader/io.Writer streams, +// such as Function stages. type StageRequirements struct { Stdin StreamRequirement Stdout StreamRequirement - - // {Stdin,Stdout}NeedsFile indicate that, if stdio is connected, the - // stage requires it to be backed by an *os.File (a real file descriptor) - StdinNeedsFile bool - StdoutNeedsFile bool } diff --git a/pipe/stream_requirement.go b/pipe/stream_requirement.go index 16e14a0..ff2fd47 100644 --- a/pipe/stream_requirement.go +++ b/pipe/stream_requirement.go @@ -2,23 +2,37 @@ package pipe import "fmt" +// StreamRequirement describes a `Stage`'s requirement for its stdin +// or stdout, namely whether it can be anything, whether it should +// preferably be an `*os.File`, or whether it must be `nil`. The zero +// value `StreamAcceptAny` is a valid value that indicates that the +// stage has no particular requirements or preferences for its +// stdin/stdout, such as a typical `Function` stage. type StreamRequirement int const ( - // StreamOptional means the stream may be connected or nil. - StreamOptional StreamRequirement = iota + // StreamAcceptAny indicates that the stage hasn't declared what + // kind of stream it requires, maybe even `nil`. + StreamAcceptAny StreamRequirement = iota - // StreamForbidden means the stream must be nil. + // StreamPreferFile indicates that the stage prefers the + // corresponding stream to be backed by an `*os.File` (a real file + // descriptor), but it can work with any io.Reader/io.Writer. + StreamPreferFile + + // StreamForbidden indicates that the stage requires the + // corresponding stream to be nil. It won't read/write the stream + // or close it. StreamForbidden ) // validate checks that `req` has a valid value and returns an error // otherwise. -func (req StreamRequirement) validate() error { - switch req { - case StreamOptional, StreamForbidden: +func (requirement StreamRequirement) validate() error { + switch requirement { + case StreamAcceptAny, StreamPreferFile, StreamForbidden: return nil default: - return fmt.Errorf("invalid stream requirement %d", req) + return fmt.Errorf("invalid stream requirement %d", requirement) } } From cb3bf1d04c4a5f0981995f7e4c8c133db65582e0 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 13 Jun 2026 11:22:52 +0200 Subject: [PATCH 3/6] StreamRequirement.Validate(): make method public --- pipe/pipeline.go | 4 ++-- pipe/stream_requirement.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 4fb4f60..4a57382 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -225,10 +225,10 @@ type stageStarter struct { } func (requirements StageRequirements) validate(s Stage, stdinConnected, stdoutConnected bool) error { - if err := requirements.Stdin.validate(); err != nil { + if err := requirements.Stdin.Validate(); err != nil { return fmt.Errorf("stdin: %w", err) } - if err := requirements.Stdout.validate(); err != nil { + if err := requirements.Stdout.Validate(); err != nil { return fmt.Errorf("stdout: %w", err) } if requirements.Stdin == StreamForbidden && stdinConnected { diff --git a/pipe/stream_requirement.go b/pipe/stream_requirement.go index ff2fd47..ddff829 100644 --- a/pipe/stream_requirement.go +++ b/pipe/stream_requirement.go @@ -26,9 +26,9 @@ const ( StreamForbidden ) -// validate checks that `req` has a valid value and returns an error +// Validate checks that `req` has a valid value and returns an error // otherwise. -func (requirement StreamRequirement) validate() error { +func (requirement StreamRequirement) Validate() error { switch requirement { case StreamAcceptAny, StreamPreferFile, StreamForbidden: return nil From 8e22ddcb03d57b079a2324e47e96d0c283a5db8f Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Fri, 12 Jun 2026 16:06:58 +0200 Subject: [PATCH 4/6] stageJoiner: new helper type for creating pipes between stages The old code used a `stageStarter` type to help with starting up stages. There are two awkward things about that approach: * The pipe that is needed to join stages depends on the two adjacent stages, not on the stdin and stdout of a single stage. So `stageStarter` wasn't able to figure out what pipe was needed; that logic kindof needed to live in the `Pipeline` type. * Since the read and write ends of a pipe are created together, the `stageStarter` could also not be as helpful in closing those streams if there was an error. So instead, use a new helper type, `stageJoiner`, to figure out how to join two adjacent stages together. The `stageJoiner` can now do part of the work for us: * `stageJoiner.validate()` checks that the adjacent stages' requirements are met WRT whether they need stdin/stdout to be supplied at all. * `stageJoiner.createPipe()` figures out what kind of pipe to create for the two adjacent stages. * `stageJoiner.closePipe()` closes the pipe (if it has already been created and needs closing) on errors. This isn't a panacea; for example, some loops have to iterate over stages, and some over stage joiners. But I think that it's less confusing this way. --- pipe/pipeline.go | 177 +++++++++++++++---------------------------- pipe/stage_joiner.go | 126 ++++++++++++++++++++++++++++++ 2 files changed, 189 insertions(+), 114 deletions(-) create mode 100644 pipe/stage_joiner.go diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 4a57382..d237461 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "io" - "os" "sync/atomic" ) @@ -218,41 +217,6 @@ func (p *Pipeline) AddWithIgnoredError(em ErrorMatcher, stages ...Stage) { } } -type stageStarter struct { - requirements StageRequirements - stdin *InputStream - stdout *OutputStream -} - -func (requirements StageRequirements) validate(s Stage, stdinConnected, stdoutConnected bool) error { - if err := requirements.Stdin.Validate(); err != nil { - return fmt.Errorf("stdin: %w", err) - } - if err := requirements.Stdout.Validate(); err != nil { - return fmt.Errorf("stdout: %w", err) - } - if requirements.Stdin == StreamForbidden && stdinConnected { - return fmt.Errorf("stage %q forbids stdin, but stdin is connected", s.Name()) - } - if requirements.Stdout == StreamForbidden && stdoutConnected { - return fmt.Errorf("stage %q forbids stdout, but stdout is connected", s.Name()) - } - return nil -} - -func (p *Pipeline) abortBeforeStart(s Stage, err error) error { - _ = p.stdout.Close() - p.cancel() - p.eventHandler(&Event{ - Command: s.Name(), - Msg: "failed to start pipeline stage", - Err: err, - }) - return fmt.Errorf( - "starting pipeline stage %q: %w", s.Name(), err, - ) -} - func (p *Pipeline) stageOptions() StageOptions { return StageOptions{Env: p.env, PanicHandler: p.panicHandler} } @@ -300,50 +264,67 @@ func (p *Pipeline) Start(ctx context.Context) error { // We need to decide how to start the stages, especially what // pipes to use to connect adjacent stages (`os.Pipe()` vs. // `io.Pipe()`) based on the two stages' requirements. - stageStarters := make([]stageStarter, len(p.stages)) + stageJoiners := make([]stageJoiner, len(p.stages)+1) + + // Arrange for the input of the 0th stage to come from `p.stdin`: + stageJoiners[0].nextStdin = p.stdin + + // Arrange for the output of the last stage to go to `p.stdout`: + stageJoiners[len(p.stages)].prevStdout = p.stdout + + // closePipes closes all of the streams that are currently stored + // in the joiners. This should be called if startup fails. As we + // call `Stage.Start()` and pass that method streams, we clear + // them from the corresponding joiners to avoid closing them + // twice. + closePipes := func() { + for _, sj := range stageJoiners { + _ = sj.closePipe() + } + } - // Collect information about each stage's type and requirements: + // Store the stages in the joiners, and verify that the stages' + // requirements are well-formed: for i, s := range p.stages { - stageStarters[i].requirements = s.Requirements() + stageJoiners[i].nextStage = s + stageJoiners[i+1].prevStage = s - err := stageStarters[i].requirements.validate( - s, - i > 0 || p.stdin != nil, - i < len(p.stages)-1 || p.stdout != nil, - ) - if err != nil { - return p.abortBeforeStart(s, err) + // Make sure that the stage's requirements are well-formed: + requirements := s.Requirements() + if err := requirements.Stdin.Validate(); err != nil { + return fmt.Errorf("stdin: %w", err) + } + if err := requirements.Stdout.Validate(); err != nil { + return fmt.Errorf("stdout: %w", err) } } - if p.stdin != nil { - // Arrange for the input of the 0th stage to come from - // `p.stdin`: - stageStarters[0].stdin = p.stdin - } - - if p.stdout != nil { - // Arrange for the output of the last stage to go to - // `p.stdout`: - stageStarters[len(p.stages)-1].stdout = p.stdout + // Create the "inner" pipes (i.e, all but the first and last + // `stageJoiners`): + for i := 1; i < len(stageJoiners)-1; i++ { + if err := stageJoiners[i].createPipe(); err != nil { + closePipes() + return err + } } - // Clean up any processes and pipes that have been created. `i` is the - // index of the stage that failed to start. If the stage already received - // its streams, it owns any closing stream. - abort := func(i int, err error, closeFailedStageStdin bool) error { - // If the failing stage never received its stdin, close the pipe that - // the previous stage was writing to. That should cause it to exit - // even if it's not minding its context. - if closeFailedStageStdin { - _ = stageStarters[i].stdin.Close() + // Check that each of the stages' requirements are compatible with + // the pipes that we have created for them: + for i := range stageJoiners { + if err := stageJoiners[i].validate(); err != nil { + closePipes() + return err } + } - // If stdout was supplied with WithStdoutCloser but the final stage - // was never started, then the pipeline still owns that closer. - if i < len(p.stages)-1 { - _ = p.stdout.Close() - } + // We're about to start up the stages, one by one. If something + // goes wrong during that process, this function should be called + // to kill any stages that have already been started and to close + // any pipes that have not yet been passed to a stage. `i` is the + // index of the stage that failed to start. If the stage already + // received its streams, it is responsible for closing them. + abort := func(i int, err error) error { + closePipes() // Kill and wait for any stages that have been started // already to finish: @@ -361,52 +342,20 @@ func (p *Pipeline) Start(ctx context.Context) error { ) } - // Loop over all but the last stage, starting them. By the time we - // get to a stage, its stdin will have already been determined, - // but we still need to figure out its stdout and set the stdin - // that will be used for the subsequent stage. - for i, s := range p.stages[:len(p.stages)-1] { - ss := &stageStarters[i] - nextSS := &stageStarters[i+1] - - // We need to generate a pipe pair for this stage to use - // to communicate with its successor: - if ss.requirements.Stdout == StreamPreferFile || - nextSS.requirements.Stdin == StreamPreferFile { - // Use an OS-level pipe for the communication: - nextStdin, stdout, err := os.Pipe() - if err != nil { - return abort(i, err, true) - } - nextSS.stdin = ClosingInput(nextStdin) - ss.stdout = ClosingOutput(stdout) - } else { - nextStdin, stdout := io.Pipe() - nextSS.stdin = ClosingInput(nextStdin) - ss.stdout = ClosingOutput(stdout) - } - if err := s.Start( - ctx, p.stageOptions(), - ss.stdin, ss.stdout, - ); err != nil { - _ = nextSS.stdin.Close() - return abort(i, err, false) - } - } + // Loop over all of the stages, starting them in order. + for i, s := range p.stages { + prevSJ := &stageJoiners[i] + nextSJ := &stageJoiners[i+1] - // The last stage needs special handling, because its stdout - // doesn't need to flow into another stage (it's already set in - // `ss.stdout` if it's needed). - { - i := len(p.stages) - 1 - s := p.stages[i] - ss := &stageStarters[i] + err := s.Start(ctx, p.stageOptions(), prevSJ.nextStdin, nextSJ.prevStdout) + + // Even if that stage failed to start, we are no longer + // responsible for closing its streams: + prevSJ.nextStdin = nil + nextSJ.prevStdout = nil - if err := s.Start( - ctx, p.stageOptions(), - ss.stdin, ss.stdout, - ); err != nil { - return abort(i, err, false) + if err != nil { + return abort(i, err) } } diff --git a/pipe/stage_joiner.go b/pipe/stage_joiner.go new file mode 100644 index 0000000..24fb789 --- /dev/null +++ b/pipe/stage_joiner.go @@ -0,0 +1,126 @@ +package pipe + +import ( + "errors" + "fmt" + "io" + "os" +) + +// stageJoiner is a helper type that helps join two adjacent stages +// together. stageJoiners[i] tells how to connect stage `i-1` to stage +// `i`. From the point of view of stages, `stageJoiners[i].nextStdin` +// and `stageJoiners[i+1].prevStdout` are the input and output +// streams, respectively, of `stage[i]`. The first and last elements +// of `stageJoiners` manage `p.stdin` and `p.stdout`, respectively. +// Schematically, the data flows through like this: +// +// p.stdin == stageJoiners[0].nextStdin → +// stage[0] → +// stageJoiners[1].prevStdout → stageJoiners[1].nextStdin → +// stage[1] → +// stageJoiners[2].prevStdout → stageJoiners[2].nextStdin → +// stage[2] → +// ... → +// stageJoiners[i].prevStdout → stageJoiners[i].nextStdin → +// stage[i] → +// stageJoiners[i+1].prevStdout → stageJoiners[i+1].nextStdin → +// ... → +// stageJoiners[len(stages)-1].prevStdout → stageJoiners[len(stages)-1].nextStdin → +// stage[len(stages)-1] → +// stageJoiners[len(stages)].prevStdout == p.stdout +// +// In pseudo-Shell notation, the stages are run like this: +// +// stage[0] stageJoiners[1].prevStdout +// stage[1] stageJoiners[2].prevStdout +// stage[2] stageJoiners[3].prevStdout +// ... +// stage[i] stageJoiners[i].prevStdout +// ... +// stage[len(stages)-1] p.stdout +type stageJoiner struct { + // prevStage holds the stage that needs to write to the pipe. + prevStage Stage + + // prevStdout will be used as the stdout of `prevStage`. It is + // usually the "write" end of the `(nextStdin, prevStdout)` pipe + // pair, with the connected pipe ends in the same `stageJoiner` + // instance. + prevStdout *OutputStream + + // nextStage holds the stage that needs to read from the pipe. + nextStage Stage + + // nextStdin will be used as the stdin of `nextStage`. It is + // usually the "read" end of the `(nextStdin, prevStdout)` pipe + // pair. + nextStdin *InputStream +} + +// needFilePipe returns `true` if the pipe that joins the two adjacent +// stages should be an `os.Pipe()` rather than an `io.Pipe()`. +func (sj *stageJoiner) needFilePipe() bool { + if sj.prevStage.Requirements().Stdout == StreamPreferFile { + return true + } + if sj.nextStage.Requirements().Stdin == StreamPreferFile { + return true + } + return false +} + +func (sj *stageJoiner) createPipe() error { + var r io.ReadCloser + var w io.WriteCloser + if sj.needFilePipe() { + var err error + r, w, err = os.Pipe() + if err != nil { + return fmt.Errorf("creating os.Pipe: %w", err) + } + } else { + r, w = io.Pipe() + } + + sj.prevStdout = ClosingOutput(w) + sj.nextStdin = ClosingInput(r) + + return nil +} + +// closePipe closes both ends of the pipe that was allocated by +// `createPipe()`. This should only be called if the corresponding +// stage's `Start()` method was never called (otherwise the stage is +// responsible for closing its stdin and stdout). +func (sj *stageJoiner) closePipe() error { + return errors.Join( + sj.prevStdout.Close(), + sj.nextStdin.Close(), + ) +} + +// validate verifies that `sj.prevStdout` and `sj.nextStdin` are +// suitable for the adjacent stages, in particular that no pipe is +// created if the stage requirements are `StreamForbidden`. +func (sj *stageJoiner) validate() error { + if sj.prevStage != nil { + stdoutRequirements := sj.prevStage.Requirements().Stdout + if stdoutRequirements == StreamForbidden && sj.prevStdout != nil { + return fmt.Errorf( + "stage %q forbids stdout, but stdout is connected", sj.prevStage.Name(), + ) + } + } + + if sj.nextStage != nil { + stdinRequirements := sj.nextStage.Requirements().Stdin + if stdinRequirements == StreamForbidden && sj.nextStdin != nil { + return fmt.Errorf( + "stage %q forbids stdin, but stdin is connected", sj.nextStage.Name(), + ) + } + } + + return nil +} From f2d53ab064330f25e91ef8de240b2435f1a34af5 Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Mon, 15 Jun 2026 15:08:37 +0200 Subject: [PATCH 5/6] stageJoiner: cache each stage's Requirements() needFilePipe() and validate() each call Stage.Requirements() for the adjacent stages, so a stage's requirements can be computed/allocated twice during startup. Caching the result on the joiner lets us call Requirements() only once. --- pipe/pipeline.go | 8 +++++--- pipe/stage_joiner.go | 25 ++++++++++++++----------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/pipe/pipeline.go b/pipe/pipeline.go index d237461..e764f10 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -286,9 +286,6 @@ func (p *Pipeline) Start(ctx context.Context) error { // Store the stages in the joiners, and verify that the stages' // requirements are well-formed: for i, s := range p.stages { - stageJoiners[i].nextStage = s - stageJoiners[i+1].prevStage = s - // Make sure that the stage's requirements are well-formed: requirements := s.Requirements() if err := requirements.Stdin.Validate(); err != nil { @@ -297,6 +294,11 @@ func (p *Pipeline) Start(ctx context.Context) error { if err := requirements.Stdout.Validate(); err != nil { return fmt.Errorf("stdout: %w", err) } + + stageJoiners[i].nextStage = s + stageJoiners[i].nextStageReq = requirements + stageJoiners[i+1].prevStage = s + stageJoiners[i+1].prevStageReq = requirements } // Create the "inner" pipes (i.e, all but the first and last diff --git a/pipe/stage_joiner.go b/pipe/stage_joiner.go index 24fb789..2730e73 100644 --- a/pipe/stage_joiner.go +++ b/pipe/stage_joiner.go @@ -43,6 +43,11 @@ type stageJoiner struct { // prevStage holds the stage that needs to write to the pipe. prevStage Stage + // prevStageReq caches `prevStage.Requirements()` so that it + // doesn't have to be recomputed. It is the zero value if + // `prevStage` is nil. + prevStageReq StageRequirements + // prevStdout will be used as the stdout of `prevStage`. It is // usually the "write" end of the `(nextStdin, prevStdout)` pipe // pair, with the connected pipe ends in the same `stageJoiner` @@ -52,6 +57,11 @@ type stageJoiner struct { // nextStage holds the stage that needs to read from the pipe. nextStage Stage + // nextStageReq caches `nextStage.Requirements()` so that it + // doesn't have to be recomputed. It is the zero value if + // `nextStage` is nil. + nextStageReq StageRequirements + // nextStdin will be used as the stdin of `nextStage`. It is // usually the "read" end of the `(nextStdin, prevStdout)` pipe // pair. @@ -61,13 +71,8 @@ type stageJoiner struct { // needFilePipe returns `true` if the pipe that joins the two adjacent // stages should be an `os.Pipe()` rather than an `io.Pipe()`. func (sj *stageJoiner) needFilePipe() bool { - if sj.prevStage.Requirements().Stdout == StreamPreferFile { - return true - } - if sj.nextStage.Requirements().Stdin == StreamPreferFile { - return true - } - return false + return sj.prevStageReq.Stdout == StreamPreferFile || + sj.nextStageReq.Stdin == StreamPreferFile } func (sj *stageJoiner) createPipe() error { @@ -105,8 +110,7 @@ func (sj *stageJoiner) closePipe() error { // created if the stage requirements are `StreamForbidden`. func (sj *stageJoiner) validate() error { if sj.prevStage != nil { - stdoutRequirements := sj.prevStage.Requirements().Stdout - if stdoutRequirements == StreamForbidden && sj.prevStdout != nil { + if sj.prevStageReq.Stdout == StreamForbidden && sj.prevStdout != nil { return fmt.Errorf( "stage %q forbids stdout, but stdout is connected", sj.prevStage.Name(), ) @@ -114,8 +118,7 @@ func (sj *stageJoiner) validate() error { } if sj.nextStage != nil { - stdinRequirements := sj.nextStage.Requirements().Stdin - if stdinRequirements == StreamForbidden && sj.nextStdin != nil { + if sj.nextStageReq.Stdin == StreamForbidden && sj.nextStdin != nil { return fmt.Errorf( "stage %q forbids stdin, but stdin is connected", sj.nextStage.Name(), ) From 7072f019b80fc47347f85d386d004ba0f06cd752 Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Mon, 15 Jun 2026 15:16:22 +0200 Subject: [PATCH 6/6] Pipeline.Start: validate stream requirements before creating pipes Reorder Start() to run stageJoiner.validate() before createPipe(), and make validate() decide whether a stream is "connected" from the pipeline topology (the presence of an adjacent stage or an already-set input/output stream). This lets us reject an invalid pipeline without allocating any io.Pipe() objects, and in the case of os.Pipe(), doing the system calls to create OS-level file descriptors. --- pipe/pipeline.go | 15 +++++++-------- pipe/stage_joiner.go | 34 +++++++++++++++++++--------------- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/pipe/pipeline.go b/pipe/pipeline.go index e764f10..44fedd1 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -301,19 +301,18 @@ func (p *Pipeline) Start(ctx context.Context) error { stageJoiners[i+1].prevStageReq = requirements } - // Create the "inner" pipes (i.e, all but the first and last - // `stageJoiners`): - for i := 1; i < len(stageJoiners)-1; i++ { - if err := stageJoiners[i].createPipe(); err != nil { + // Check that each of the stages' requirements are satisfiable: + for i := range stageJoiners { + if err := stageJoiners[i].validate(); err != nil { closePipes() return err } } - // Check that each of the stages' requirements are compatible with - // the pipes that we have created for them: - for i := range stageJoiners { - if err := stageJoiners[i].validate(); err != nil { + // Create the "inner" pipes (i.e, all but the first and last + // `stageJoiners`): + for i := 1; i < len(stageJoiners)-1; i++ { + if err := stageJoiners[i].createPipe(); err != nil { closePipes() return err } diff --git a/pipe/stage_joiner.go b/pipe/stage_joiner.go index 2730e73..03569cb 100644 --- a/pipe/stage_joiner.go +++ b/pipe/stage_joiner.go @@ -105,24 +105,28 @@ func (sj *stageJoiner) closePipe() error { ) } -// validate verifies that `sj.prevStdout` and `sj.nextStdin` are -// suitable for the adjacent stages, in particular that no pipe is -// created if the stage requirements are `StreamForbidden`. +// validate verifies that the adjacent stages' stream requirements are +// satisfiable, in particular that a stage that forbids its stdin or +// stdout is not connected to anything. func (sj *stageJoiner) validate() error { - if sj.prevStage != nil { - if sj.prevStageReq.Stdout == StreamForbidden && sj.prevStdout != nil { - return fmt.Errorf( - "stage %q forbids stdout, but stdout is connected", sj.prevStage.Name(), - ) - } + // `prevStage`'s stdout is connected if there is a `nextStage` to + // consume it (in which case an inner pipe will be created) or if + // a stream (`p.stdout`) has already been stored in `prevStdout`. + if sj.prevStage != nil && sj.prevStageReq.Stdout == StreamForbidden && + (sj.nextStage != nil || sj.prevStdout != nil) { + return fmt.Errorf( + "stage %q forbids stdout, but stdout is connected", sj.prevStage.Name(), + ) } - if sj.nextStage != nil { - if sj.nextStageReq.Stdin == StreamForbidden && sj.nextStdin != nil { - return fmt.Errorf( - "stage %q forbids stdin, but stdin is connected", sj.nextStage.Name(), - ) - } + // `nextStage`'s stdin is connected if there is a `prevStage` to + // produce it (in which case an inner pipe will be created) or if + // a stream (`p.stdin`) has already been stored in `nextStdin`. + if sj.nextStage != nil && sj.nextStageReq.Stdin == StreamForbidden && + (sj.prevStage != nil || sj.nextStdin != nil) { + return fmt.Errorf( + "stage %q forbids stdin, but stdin is connected", sj.nextStage.Name(), + ) } return nil