Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
c4146f4
Shush the linter
mhagger Dec 17, 2023
de3225f
pipeline_test.go: remove unnecessary tmpdirs and simplify test setup
mhagger Apr 8, 2026
9fd5b2c
Add some benchmarks of moving a bunch of data through a pipeline
mhagger Dec 16, 2023
ca78f76
Simplify the `NopCloser`s
mhagger Dec 15, 2023
2f59602
Stage: change the interface to make stdin/stdout handling more flexible
mhagger Dec 15, 2023
cc9cd67
Add some tests that `Pipeline.Start()` picks the right stdin/stdout
mhagger Dec 17, 2023
6c6dfe5
Port MemoryLimitWithObserver to new Stage interface
znull Apr 8, 2026
cd47557
Restore panic handler for Function stages
znull Apr 8, 2026
48dffbd
Fix memoryWatchStage.Wait() to always call stopWatching()
znull Apr 8, 2026
92ad57c
Fix lint errors
znull Apr 9, 2026
6a0abf3
Restore identity-copy behavior for empty pipelines
znull Apr 8, 2026
9606368
Add tests pinning command stdout fd-pass fast path
znull May 28, 2026
3a1f8d8
Pool buffers for the non-*os.File command stdout copy path
znull May 28, 2026
46f19e4
Forward panic handler through wrapper stages
znull May 28, 2026
121ae94
Avoid leaking pooled-stdout goroutine when cmd.Start() fails
znull May 29, 2026
bf5820b
Bump module path to v2 for breaking Stage interface change
znull May 29, 2026
6102173
remove IOPreferenceNil
znull May 29, 2026
754784b
properly handle nil input/output
znull May 29, 2026
9dca23e
allow earlier GC of lateClosers
znull May 29, 2026
9ee30b1
export Unwrap{Reader,Writer}; unwrap goStage stdin fully
znull May 30, 2026
75797b9
rewrite cleanup/unwind in forward order
znull May 29, 2026
512c345
Thread panic handler through Start; drop StagePanicHandlerAware
znull May 29, 2026
4278c36
Recover panics escaping the memory-watch goroutine
znull May 30, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# go-pipe [![GoDoc](https://pkg.go.dev/badge/github.com/github/docs)](https://pkg.go.dev/github.com/github/go-pipe)
# go-pipe [![GoDoc](https://pkg.go.dev/badge/github.com/github/docs)](https://pkg.go.dev/github.com/github/go-pipe/v2)
A package used to easily build command pipelines in your Go applications

# Important
We have not thoroughly tested this package on operating systems other than Linux, especially Windows. At this time, using this package on Windows-based systems is considered experimental and will be supported only on a best-effort basis.

# Links

* [Docs](https://pkg.go.dev/github.com/github/go-pipe)
* [Docs](https://pkg.go.dev/github.com/github/go-pipe/v2)
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/github/go-pipe
module github.com/github/go-pipe/v2

go 1.24.0

Expand Down
2 changes: 1 addition & 1 deletion internal/ptree/ptree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"strconv"
"testing"

"github.com/github/go-pipe/internal/ptree"
"github.com/github/go-pipe/v2/internal/ptree"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down
167 changes: 140 additions & 27 deletions pipe/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@ var errProcessInfoMissing = errors.New("cmd.Process is nil")
// commandStage is a pipeline `Stage` based on running an external
// command and piping the data through its stdin and stdout.
type commandStage struct {
name string
stdin io.Closer
cmd *exec.Cmd
name string
cmd *exec.Cmd

// lateClosers is a list of things that have to be closed once the
// command has finished.
lateClosers []io.Closer

done chan struct{}
wg errgroup.Group
stderr bytes.Buffer
Expand All @@ -32,6 +36,10 @@ type commandStage struct {
ctxErr atomic.Value
}

// Compile-time assertion that *commandStage satisfies the Stage
// interface.
var _ Stage = (*commandStage)(nil)

// Command returns a pipeline `Stage` based on the specified external
// `command`, run with the given command-line `args`. Its stdin and
// stdout are handled as usual, and its stderr is collected and
Expand Down Expand Up @@ -61,33 +69,100 @@ func (s *commandStage) Name() string {
return s.name
}

// Preferences reports the kinds of stdin and stdout that this stage
// would like to be handed. A command stage asks for
// `IOPreferenceFile` on both ends.
func (s *commandStage) Preferences() StagePreferences {
	var prefs StagePreferences
	prefs.StdinPreference = IOPreferenceFile
	prefs.StdoutPreference = IOPreferenceFile
	return prefs
}

func (s *commandStage) Start(
ctx context.Context, env Env, stdin io.ReadCloser,
) (io.ReadCloser, error) {
ctx context.Context, env Env, stdin io.ReadCloser, stdout io.WriteCloser, _ StartOptions,
) error {
if s.cmd.Dir == "" {
s.cmd.Dir = env.Dir
}

s.setupEnv(ctx, env)

// Things that have to be closed as soon as the command has
// started:
var earlyClosers []io.Closer

// See the type comment for `Stage` and the long comment in
// `Pipeline.WithStdin()` for the explanation of this unwrapping
// and closing behavior.

if stdin != nil {
// See the long comment in `Pipeline.Start()` for the
// explanation of this special case.
switch stdin := stdin.(type) {
case nopCloser:
s.cmd.Stdin = stdin.Reader
case nopCloserWriterTo:
s.cmd.Stdin = stdin.Reader
case readerNopCloser:
// In this case, we shouldn't close it. But unwrap it for
// efficiency's sake:
s.cmd.Stdin = UnwrapReader(stdin)
case *os.File:
// In this case, we can close stdin as soon as the command
// has started:
s.cmd.Stdin = stdin
earlyClosers = append(earlyClosers, stdin)
default:
// In this case, we need to close `stdin`, but we should
// only do so after the command has finished:
s.cmd.Stdin = stdin
s.lateClosers = append(s.lateClosers, stdin)
}
// Also keep a copy so that we can close it when the command exits:
s.stdin = stdin
}

stdout, err := s.cmd.StdoutPipe()
if err != nil {
return nil, err
if stdout != nil {
// See the long comment in `Pipeline.Start()` for the
// explanation of this special case.
switch stdout := stdout.(type) {
case writerNopCloser:
// We shouldn't close the wrapped writer. Unwrap it; if
// it's an `*os.File`, exec.Cmd can pass the fd directly
// to the child. Otherwise route the copy through our own
// pipe so we can use a pooled buffer.
writer := UnwrapWriter(stdout)
if f, ok := writer.(*os.File); ok {
s.cmd.Stdout = f
} else {
ec, err := s.setupPooledStdout(writer)
if err != nil {
return err
}
earlyClosers = append(earlyClosers, ec)
}
case *os.File:
// In this case, we can close stdout as soon as the command
// has started:
s.cmd.Stdout = stdout
earlyClosers = append(earlyClosers, stdout)
default:
// In this case, we need to close `stdout`, but we should
// only do so after the command has finished. We also
// route the copy through our own pipe so we can use a
// pooled buffer rather than letting exec.Cmd allocate a
// fresh 32KB buffer for its internal io.Copy.
ec, err := s.setupPooledStdout(stdout)
if err != nil {
return err
}
earlyClosers = append(earlyClosers, ec)
s.lateClosers = append(s.lateClosers, stdout)
}
}

closeEarlyClosers := func() {
for _, closer := range earlyClosers {
_ = closer.Close()
}
}

// On error, Close any pipes we created and wait for the goroutines to
// exit before propagating the error.
cleanupOnStartFailure := func() {
closeEarlyClosers()
_ = s.wg.Wait()
_ = s.closeLateClosers()
}

// If the caller hasn't arranged otherwise, read the command's
Expand All @@ -99,7 +174,8 @@ func (s *commandStage) Start(
// can be sure.
p, err := s.cmd.StderrPipe()
if err != nil {
return nil, err
cleanupOnStartFailure()
return err
}
s.wg.Go(func() error {
_, err := io.Copy(&s.stderr, p)
Expand All @@ -116,9 +192,12 @@ func (s *commandStage) Start(
s.runInOwnProcessGroup()

if err := s.cmd.Start(); err != nil {
return nil, err
cleanupOnStartFailure()
return err
}

closeEarlyClosers()

// Arrange for the process to be killed (gently) if the context
// expires before the command exits normally:
go func() {
Expand All @@ -130,7 +209,7 @@ func (s *commandStage) Start(
}
}()

return stdout, nil
return nil
}

// setupEnv sets or modifies the environment that will be passed to
Expand Down Expand Up @@ -219,21 +298,55 @@ func (s *commandStage) Wait() error {

// Make sure that any stderr is copied before `s.cmd.Wait()`
// closes the read end of the pipe:
wErr := s.wg.Wait()
wgErr := s.wg.Wait()

err := s.cmd.Wait()
err = s.filterCmdError(err)

if err == nil && wErr != nil {
err = wErr
if err == nil && wgErr != nil {
err = wgErr
}

if s.stdin != nil {
cErr := s.stdin.Close()
if cErr != nil && err == nil {
return cErr
}
if closeErr := s.closeLateClosers(); err == nil {
err = closeErr
}

return err
}

func (s *commandStage) closeLateClosers() error {
var err error
for _, closer := range s.lateClosers {
if closeErr := closer.Close(); closeErr != nil && err == nil {
err = closeErr
}
Comment on lines +319 to 322
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would you think about setting s.lateClosers = nil here, to allow those objects to be freed in case the stage object remains reachable for longer?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems reasonable and low risk, I'll add it

}
s.lateClosers = nil
return err
}

// setupPooledStdout wires the command's stdout through a pipe that
// this package owns. It opens an `os.Pipe()`, installs the write end
// as `cmd.Stdout`, and registers a goroutine on `s.wg` that drains the
// read end into `dst` via `pooledCopy` (a pooled buffer, or
// `dst.ReadFrom` when `dst` implements it).
//
// The write end is returned as an `io.Closer`. The caller must add it
// to `earlyClosers` so that our copy of it is closed once the command
// has started.
//
// This path is meant for command stages whose stdout is not an
// `*os.File`. Without it, `exec.Cmd` would create its own pipe and run
// `io.Copy` with a freshly allocated 32KB buffer on every invocation.
//
// The copy goroutine closes the read end when it finishes. A copy
// error that matches `os.ErrClosed` is not reported; any other copy
// error is returned through `s.wg`.
func (s *commandStage) setupPooledStdout(dst io.Writer) (io.Closer, error) {
	readEnd, writeEnd, err := os.Pipe()
	if err != nil {
		return nil, err
	}
	s.cmd.Stdout = writeEnd

	drain := func() error {
		defer readEnd.Close()
		if _, copyErr := pooledCopy(dst, readEnd); copyErr != nil && !errors.Is(copyErr, os.ErrClosed) {
			return copyErr
		}
		return nil
	}
	s.wg.Go(drain)

	return writeEnd, nil
}
2 changes: 1 addition & 1 deletion pipe/command_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package pipe
import (
"context"

"github.com/github/go-pipe/internal/ptree"
"github.com/github/go-pipe/v2/internal/ptree"
)

// On linux, we can limit or observe memory usage in command stages.
Expand Down
2 changes: 1 addition & 1 deletion pipe/command_nil_panic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestKillWithFailedStart(t *testing.T) {

stage := Command("/this/path/does/not/exist/invalid_command_12345")

_, err := stage.Start(ctx, Env{}, nil)
err := stage.Start(ctx, Env{}, nil, nil, StartOptions{})
if err == nil {
t.Fatal("Expected start to fail, but it succeeded")
}
Expand Down
63 changes: 63 additions & 0 deletions pipe/command_starterror_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package pipe_test

import (
"bytes"
"context"
"os/exec"
"sync/atomic"
"testing"

"github.com/github/go-pipe/v2/pipe"
)

// TestCommandStageStartFailureNoRace checks that a failing
// `cmd.Start()` (here: a command that cannot be found) does not leave
// the goroutine spawned by `setupPooledStdout` running after
// `Pipeline.Run()` has returned.
//
// `bytes.Buffer.ReadFrom` modifies the buffer's slice header via
// `grow()` before its first `Read()`. A leaked goroutine would
// therefore race with the caller touching the destination buffer once
// Run has returned its error. The scenario is repeated in a tight loop
// so that `-race` is likely to catch any regression.
func TestCommandStageStartFailureNoRace(t *testing.T) {
	const attempts = 50
	for attempt := 0; attempt < attempts; attempt++ {
		var out bytes.Buffer
		missing := exec.Command("this-binary-does-not-exist-xyz123")

		p := pipe.New(pipe.WithStdout(&out))
		p.Add(pipe.CommandStage("nope", missing))

		err := p.Run(context.Background())
		if err == nil {
			t.Fatal("expected error from non-existent command, got nil")
		}

		// Access the buffer after Run has returned; this is the read
		// that a leaked copy goroutine would race with.
		_ = out.String()
	}
}

// trackingWriteCloser is an `io.WriteCloser` that is deliberately not
// an `*os.File` and that remembers whether `Close` has been called.
// Since it isn't an `*os.File`, a command stage routes it through
// `setupPooledStdout` and treats it as a "late closer" (closed only
// after the command finishes or cleanup runs).
type trackingWriteCloser struct {
	// closed is set to true by Close.
	closed atomic.Bool
}

// Write discards `p` and reports that all of it was written.
func (w *trackingWriteCloser) Write(p []byte) (int, error) {
	written := len(p)
	return written, nil
}

// Close records that the writer has been closed. It never fails.
func (w *trackingWriteCloser) Close() error {
	w.closed.Store(true)
	return nil
}

// TestCommandStageStartFailureClosesLateClosers checks that a writer
// supplied via `WithStdoutCloser` for the last stage still gets closed
// when `cmd.Start()` fails. Such a writer is registered as a "late
// closer", which is normally drained by `Wait()`. Because `Wait()`
// never runs after a failed `Start()`, the start-failure cleanup path
// has to close it instead.
func TestCommandStageStartFailureClosesLateClosers(t *testing.T) {
	sink := &trackingWriteCloser{}
	missing := exec.Command("this-binary-does-not-exist-xyz123")

	p := pipe.New(pipe.WithStdoutCloser(sink))
	p.Add(pipe.CommandStage("nope", missing))

	err := p.Run(context.Background())
	if err == nil {
		t.Fatal("expected error from non-existent command, got nil")
	}

	wasClosed := sink.closed.Load()
	if !wasClosed {
		t.Fatal("expected late closer to be closed after Start() failure")
	}
}
Loading
Loading