Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions bsondump/bsondump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,28 @@ func runBsondumpWithLargeFile(t *testing.T, size int) (string, error) {
return runBsondump("--bsonFile", bsonFile, "--outFile", outFile)
}

// TestBrokenPipe checks that bsondump copes with its stdout pipe being closed
// while it is still writing: the process must observe a write error instead of
// being killed by SIGPIPE.
func TestBrokenPipe(t *testing.T) {
	testtype.SkipUnlessTestType(t, testtype.UnitTestType)

	const (
		docCount    = 1000
		payloadSize = 1000
	)

	// 1000 documents of roughly 1 KB each, so that the JSON output exceeds
	// the pipe buffer.
	bsonPath := filepath.Join(t.TempDir(), "large.bson")
	out, err := os.Create(bsonPath)
	require.NoError(t, err)

	payload := strings.Repeat("x", payloadSize)
	for id := 0; id < docCount; id++ {
		encoded, marshalErr := bson.Marshal(bson.D{{"_id", int32(id)}, {"data", payload}})
		require.NoError(t, marshalErr)

		_, writeErr := out.Write(encoded)
		require.NoError(t, writeErr)
	}
	require.NoError(t, out.Close())

	testutil.AssertBrokenPipeHandled(t, bsondumpCommand(bsonPath))
}

func runBsondump(args ...string) (string, error) {
cmd := []string{"go", "run", filepath.Join("..", "bsondump", "main")}
cmd = append(cmd, args...)
Expand Down
19 changes: 9 additions & 10 deletions common/signals/signals.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,23 @@ func Handle() chan struct{} {
return HandleWithInterrupt(nil)
}

// HandleWithInterrupt starts a goroutine which listens for SIGTERM, SIGINT, and
// SIGKILL and explicitly ignores SIGPIPE. It calls the finalizer function when
// the first signal is received and forcibly terminates the program after the
// second. If a nil function is provided, the program will exit after the first
// signal.
// HandleWithInterrupt starts a goroutine which listens for SIGTERM and SIGINT (SIGKILL cannot be
// caught, so it is never delivered to the handler). It also calls signal.Ignore to explicitly
// ignore SIGPIPE. It calls the finalizer function when the first signal is received and forcibly
// terminates the program after the second. If a nil function is provided, the program will exit
// after the first signal.
func HandleWithInterrupt(finalizer func()) chan struct{} {
// Ignore SIGPIPE synchronously so there is no race between this call and the first write: a
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the race-prevention logic here also be applied to the handling of TERM & INT?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Aside: I’m a bit surprised this compiles on Windows …)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good question re: race. I have no idea why the code is written this way. I think that it's not really a big deal. The worst case is that we get a TERM or INT signal before our handlers are installed, but I don't think that would cause any breakage.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As for Windows, I think the Go stdlib & runtime handles the platform-specific details to make this work.

// broken pipe must surface as an EPIPE write error, not kill the process.
signal.Ignore(syscall.SIGPIPE)

finishedChan := make(chan struct{})
go handleSignals(finalizer, finishedChan)
return finishedChan
}

func handleSignals(finalizer func(), finishedChan chan struct{}) {
// explicitly ignore SIGPIPE; the tools should deal with write errors
noopChan := make(chan os.Signal)
signal.Notify(noopChan, syscall.SIGPIPE)

log.Logv(log.DebugLow, "will listen for SIGTERM, SIGINT, and SIGKILL")
sigChan := make(chan os.Signal, 2)
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
defer signal.Stop(sigChan)
if finalizer != nil {
Expand Down
46 changes: 46 additions & 0 deletions common/signals/signals_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (C) MongoDB, Inc. 2014-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package signals

import (
"fmt"
"os"
"os/exec"
"testing"

"github.com/mongodb/mongo-tools/common/testutil"
)

// TestMain handles the subprocess mode used by TestBrokenPipeHandledAsWriteError.
//
// When the TEST_BROKEN_PIPE_SUBPROCESS environment variable is "1", the test
// binary does not run any tests. Instead it installs the signal handling that
// the tools use and writes to stdout until a write fails, then exits 0.
// Otherwise it runs the package's tests as usual.
func TestMain(m *testing.M) {
	if os.Getenv("TEST_BROKEN_PIPE_SUBPROCESS") != "1" {
		os.Exit(m.Run())
	}

	// Set up SIGPIPE handling as our tools do. The returned channel is
	// deliberately discarded: this branch only ever leaves through os.Exit,
	// which does not run deferred functions, so a deferred close could never
	// execute.
	HandleWithInterrupt(nil)

	// Write repeatedly to stdout. When the reader closes the pipe, the write
	// will return an EPIPE error. We exit 0 to signal that the error was
	// surfaced and handled — not a SIGPIPE death.
	for {
		if _, err := fmt.Println("data"); err != nil {
			os.Exit(0)
		}
	}
}

// TestBrokenPipeHandledAsWriteError re-executes the current test binary in
// subprocess mode (see TestMain) and checks that, once the read end of its
// stdout pipe is closed, the child is not killed by SIGPIPE; the broken pipe
// must reach it as a write error it can handle.
func TestBrokenPipeHandledAsWriteError(t *testing.T) {
	// The environment variable switches the child into the write loop in TestMain.
	childEnv := append(os.Environ(), "TEST_BROKEN_PIPE_SUBPROCESS=1")

	child := exec.Command(os.Args[0], "-test.run=TestBrokenPipeHandledAsWriteError")
	child.Env = childEnv

	testutil.AssertBrokenPipeHandled(t, child)
}
60 changes: 60 additions & 0 deletions common/testutil/broken_pipe_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (C) MongoDB, Inc. 2014-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

//go:build !windows

package testutil

import (
"os"
"os/exec"
"syscall"
"testing"

"github.com/stretchr/testify/require"
)

// AssertBrokenPipeHandled starts cmd with its stdout connected to a pipe,
// reads a small amount of output to confirm the process has started writing,
// then breaks the pipe by closing the read end. It asserts the process was
// not killed by SIGPIPE — a broken pipe must surface as a write error that
// the process can handle cleanly.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This description seems a bit misleading with the function written as it is because—to me, at least—it implies that we expect a failure due to write error (albeit one that the program handles cleanly).

(I can also read it as not implying that, but it’s not the first impression I got when reading this paragraph.)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we expect the command we run to fail due to a write error. That's what the code looks for. It checks that the process exited but not via a SIGPIPE.

I wondered why we have the conditional on whether we got an exit error, but I think that makes sense. It's possible that a tool could handle a write error and exit 0. Maybe they shouldn't do that, but that's somewhat out of scope for these tests, so I think having the test pass with exit 0 is reasonable.

//
// cmd.Stdout must not be set before calling; this function sets it to the
// write end of a pipe.
func AssertBrokenPipeHandled(t *testing.T, cmd *exec.Cmd) {
t.Helper()

pr, pw, err := os.Pipe()
require.NoError(t, err)
cmd.Stdout = pw

require.NoError(t, cmd.Start())
require.NoError(t, pw.Close())

buf := make([]byte, 64)
_, err = pr.Read(buf)
require.NoError(t, err, "process should write at least some output before the pipe breaks")
require.NoError(t, pr.Close())

err = cmd.Wait()
if err == nil {
return
}

var exitErr *exec.ExitError
require.ErrorAs(t, err, &exitErr, "non-zero exit from cmd.Wait() should always be an ExitError")
status, ok := exitErr.Sys().(syscall.WaitStatus)
require.True(t, ok, "exitErr.Sys() should return a syscall.WaitStatus")
if status.Signaled() {
require.NotEqual(
t,
syscall.SIGPIPE,
status.Signal(),
"process should not be killed by SIGPIPE: broken pipe should surface as a write error",
)
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we retain the check for broken pipe in the program’s stdout/stderr? That’ll ensure that the pipe is what caused the failure, and that we handled it cleanly.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we should. In the JS code, this had been commented out for mongodump with this comment: TODO: TOOLS-1883 Following tests are flaky.

I don't want to try to figure that out right now, especially since I think this is all pointless and we don't need to suppress SIGPIPE any more (this is a legacy of very early Go versions being weird with SIGPIPE).

}
19 changes: 19 additions & 0 deletions common/testutil/broken_pipe_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (C) MongoDB, Inc. 2014-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

//go:build windows

package testutil

import (
"os/exec"
"testing"
)

// AssertBrokenPipeHandled skips the calling test on Windows, where SIGPIPE
// does not apply. It never starts cmd; the parameter exists only so the
// signature matches the non-Windows implementation.
func AssertBrokenPipeHandled(t *testing.T, cmd *exec.Cmd) {
	// Mark this function as a helper so the skip message is attributed to
	// the calling test rather than to this file.
	t.Helper()

	t.Skip("broken pipe handling via SIGPIPE is not applicable on Windows")
}
36 changes: 36 additions & 0 deletions mongodump/mongodump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ package mongodump
import (
"bytes"
"compress/gzip"
"context"
"crypto/sha1"
"encoding/base64"
"fmt"
"io"
"math/rand"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
Expand Down Expand Up @@ -2517,3 +2519,37 @@ func dumpAndCheckPipelineOrder(t *testing.T, collName string, pipeline bson.A) {
os.RemoveAll(dumpDir)
metaFile.Close()
}

// TestBrokenPipe checks that mongodump copes with its stdout pipe being closed
// while it is still writing an archive: the process must observe a write error
// instead of being killed by SIGPIPE.
func TestBrokenPipe(t *testing.T) {
	testtype.SkipUnlessTestType(t, testtype.IntegrationTestType)

	const (
		dbName   = "mongodump_broken_pipe_test"
		collName = "docs"
		docCount = 2000
	)

	provider, _, err := testutil.GetBareSessionProvider()
	require.NoError(t, err)
	session, err := provider.GetSession()
	require.NoError(t, err)
	t.Cleanup(func() {
		// The error from Drop is deliberately ignored.
		_ = session.Database(dbName).Drop(context.Background())
	})

	// Insert 2000 docs so the archive output exceeds the pipe buffer.
	payload := strings.Repeat("x", 500)
	docs := make([]any, 0, docCount)
	for id := 0; id < docCount; id++ {
		docs = append(docs, bson.D{{"_id", int32(id)}, {"data", payload}})
	}
	_, err = session.Database(dbName).Collection(collName).InsertMany(context.Background(), docs)
	require.NoError(t, err)

	// Run mongodump through "go run", writing the archive to stdout.
	goArgs := []string{"run", filepath.Join("..", "mongodump", "main")}
	goArgs = append(goArgs, testutil.GetBareArgs()...)
	goArgs = append(goArgs, "--db", dbName, "--archive=-")

	testutil.AssertBrokenPipeHandled(t, exec.Command("go", goArgs...))
}
38 changes: 38 additions & 0 deletions mongoexport/mongoexport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@ package mongoexport

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"testing"

"github.com/mongodb/mongo-tools/common/bsonutil"
Expand Down Expand Up @@ -312,3 +316,37 @@ func TestBadOptions(t *testing.T) {
}
}
}

// TestBrokenPipe checks that mongoexport copes with its stdout pipe being
// closed while it is still writing: the process must observe a write error
// instead of being killed by SIGPIPE.
func TestBrokenPipe(t *testing.T) {
	testtype.SkipUnlessTestType(t, testtype.IntegrationTestType)

	const (
		dbName   = "mongoexport_broken_pipe_test"
		collName = "docs"
		docCount = 1000
	)

	provider, _, err := testutil.GetBareSessionProvider()
	require.NoError(t, err)
	session, err := provider.GetSession()
	require.NoError(t, err)
	t.Cleanup(func() {
		// The error from Drop is deliberately ignored.
		_ = session.Database(dbName).Drop(context.Background())
	})

	// Insert 1000 docs with a large field so JSON output exceeds the pipe buffer.
	payload := strings.Repeat("x", 1000)
	docs := make([]any, 0, docCount)
	for id := 0; id < docCount; id++ {
		docs = append(docs, bson.D{{"_id", int32(id)}, {"data", payload}})
	}
	_, err = session.Database(dbName).Collection(collName).InsertMany(context.Background(), docs)
	require.NoError(t, err)

	// Run mongoexport through "go run"; with no output file given, the
	// helper's pipe becomes its stdout.
	goArgs := []string{"run", filepath.Join("..", "mongoexport", "main")}
	goArgs = append(goArgs, testutil.GetBareArgs()...)
	goArgs = append(goArgs, "--db", dbName, "--collection", collName)

	testutil.AssertBrokenPipeHandled(t, exec.Command("go", goArgs...))
}
18 changes: 0 additions & 18 deletions test/qa-tests/jstests/bson/bsondump_broken_pipe.js

This file was deleted.

49 changes: 0 additions & 49 deletions test/qa-tests/jstests/dump/dump_broken_pipe.js

This file was deleted.

Loading