-
Notifications
You must be signed in to change notification settings - Fork 7
v2 review fixes: lifecycle cleanup and docs #62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -99,6 +99,16 @@ func WithDir(dir string) Option { | |
| // WithStdin assigns stdin to the first command in the pipeline. The | ||
| // caller retains ownership of stdin; the pipeline will not close it, | ||
| // even if `Start()` returns an error. | ||
| // | ||
| // If the first stage is a `Command` and stdin is not an `*os.File`, | ||
| // `exec.Cmd` has to copy stdin through an internal goroutine, and | ||
| // `Cmd.Wait()` waits for that copy to finish. This is fine for bounded | ||
| // readers such as `strings.Reader` and `bytes.Reader`, and for | ||
| // `*os.File` values, which are passed to the command directly. But a | ||
| // borrowed, non-file reader that can block forever can also block the | ||
| // pipeline forever if the command exits without consuming all of its | ||
| // stdin. See `TestPipelineIOPipeStdinThatIsNeverClosed` for the known | ||
| // limitation. | ||
| func WithStdin(stdin io.Reader) Option { | ||
| return func(p *Pipeline) { | ||
| p.stdin = Input(stdin) | ||
|
|
@@ -238,6 +248,12 @@ func (p *Pipeline) Start(ctx context.Context) error { | |
|
|
||
| atomic.StoreUint32(&p.started, 1) | ||
| ctx, p.cancel = context.WithCancel(ctx) | ||
| startedOK := false | ||
| defer func() { | ||
| if !startedOK { | ||
| p.cancel() | ||
| } | ||
| }() | ||
|
|
||
| if len(p.stages) == 0 { | ||
| if p.stdout == nil { | ||
|
|
@@ -290,11 +306,15 @@ func (p *Pipeline) Start(ctx context.Context) error { | |
| requirements := s.Requirements() | ||
| if err := requirements.Stdin.Validate(); err != nil { | ||
| closePipes() | ||
| return fmt.Errorf("stdin: %w", err) | ||
| return fmt.Errorf( | ||
| "stage %q has invalid stdin requirement: %w", s.Name(), err, | ||
| ) | ||
| } | ||
| if err := requirements.Stdout.Validate(); err != nil { | ||
| closePipes() | ||
| return fmt.Errorf("stdout: %w", err) | ||
| return fmt.Errorf( | ||
| "stage %q has invalid stdout requirement: %w", s.Name(), err, | ||
| ) | ||
| } | ||
|
|
||
| stageJoiners[i].nextStage = s | ||
|
|
@@ -362,10 +382,19 @@ func (p *Pipeline) Start(ctx context.Context) error { | |
| } | ||
| } | ||
|
|
||
| startedOK = true | ||
| return nil | ||
| } | ||
|
|
||
| func (p *Pipeline) Output(ctx context.Context) ([]byte, error) { | ||
| if p.hasStarted() { | ||
| panic("attempt to get output from a pipeline that has already started") | ||
| } | ||
|
|
||
| if err := p.stdout.Close(); err != nil { | ||
| return nil, fmt.Errorf("closing previous stdout: %w", err) | ||
| } | ||
|
|
||
|
Comment on lines
+394
to
+397
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ISTM that trying to call For that matter, using
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd support that, yeah. Making things impossible to misuse is good. |
||
| var buf bytes.Buffer | ||
| p.stdout = Output(&buf) | ||
| err := p.Run(ctx) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.