Skip to content

Commit 3f1896d

Browse files
committed
Add a new test for our SIGPIPE handling in the common code
This test makes sure that we ignore SIGPIPE. Instead, writes to a broken pipe will cause the write call to return an error. The tools should always check errors on write calls, so these will be surfaced as normal "failure to write to an fd" errors, as opposed to the signal aborting the process.
1 parent 0bbfb8f commit 3f1896d

11 files changed

Lines changed: 230 additions & 157 deletions

File tree

bsondump/bsondump_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,28 @@ func runBsondumpWithLargeFile(t *testing.T, size int) (string, error) {
348348
return runBsondump("--bsonFile", bsonFile, "--outFile", outFile)
349349
}
350350

351+
// TestBrokenPipe verifies that bsondump handles a broken pipe gracefully
352+
// (exits with a write error rather than being killed by SIGPIPE).
353+
func TestBrokenPipe(t *testing.T) {
354+
testtype.SkipUnlessTestType(t, testtype.UnitTestType)
355+
356+
// Write 1000 docs (~1 KB each) so the JSON output exceeds the pipe buffer.
357+
dir := t.TempDir()
358+
bsonPath := filepath.Join(dir, "large.bson")
359+
f, err := os.Create(bsonPath)
360+
require.NoError(t, err)
361+
for i := range 1000 {
362+
doc := bson.D{{"_id", int32(i)}, {"data", strings.Repeat("x", 1000)}}
363+
raw, err := bson.Marshal(doc)
364+
require.NoError(t, err)
365+
_, err = f.Write(raw)
366+
require.NoError(t, err)
367+
}
368+
require.NoError(t, f.Close())
369+
370+
testutil.AssertBrokenPipeHandled(t, bsondumpCommand(bsonPath))
371+
}
372+
351373
func runBsondump(args ...string) (string, error) {
352374
cmd := []string{"go", "run", filepath.Join("..", "bsondump", "main")}
353375
cmd = append(cmd, args...)

common/signals/signals.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,24 +21,23 @@ func Handle() chan struct{} {
2121
return HandleWithInterrupt(nil)
2222
}
2323

24-
// HandleWithInterrupt starts a goroutine which listens for SIGTERM, SIGINT, and
25-
// SIGKILL and explicitly ignores SIGPIPE. It calls the finalizer function when
26-
// the first signal is received and forcibly terminates the program after the
27-
// second. If a nil function is provided, the program will exit after the first
28-
// signal.
24+
// HandleWithInterrupt starts a goroutine which listens for SIGTERM, SIGINT, and SIGKILL. It also
25+
// calles signal.Ignore to explicitly ignore SIGPIPE. It calls the finalizer function when the first
26+
// signal is received and forcibly terminates the program after the second. If a nil function is
27+
// provided, the program will exit after the first signal.
2928
func HandleWithInterrupt(finalizer func()) chan struct{} {
29+
// Ignore SIGPIPE synchronously so there is no race between this call and the first write: a
30+
// broken pipe must surface as an EPIPE write error, not kill the process.
31+
signal.Ignore(syscall.SIGPIPE)
32+
3033
finishedChan := make(chan struct{})
3134
go handleSignals(finalizer, finishedChan)
3235
return finishedChan
3336
}
3437

3538
func handleSignals(finalizer func(), finishedChan chan struct{}) {
36-
// explicitly ignore SIGPIPE; the tools should deal with write errors
37-
noopChan := make(chan os.Signal)
38-
signal.Notify(noopChan, syscall.SIGPIPE)
39-
4039
log.Logv(log.DebugLow, "will listen for SIGTERM, SIGINT, and SIGKILL")
41-
sigChan := make(chan os.Signal, 2)
40+
sigChan := make(chan os.Signal, 1)
4241
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
4342
defer signal.Stop(sigChan)
4443
if finalizer != nil {

common/signals/signals_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright (C) MongoDB, Inc. 2014-present.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
// not use this file except in compliance with the License. You may obtain
5+
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
package signals
8+
9+
import (
10+
"fmt"
11+
"os"
12+
"os/exec"
13+
"testing"
14+
15+
"github.com/mongodb/mongo-tools/common/testutil"
16+
)
17+
18+
// TestMain handles the subprocess mode used by TestBrokenPipeHandledAsWriteError.
19+
func TestMain(m *testing.M) {
20+
if os.Getenv("TEST_BROKEN_PIPE_SUBPROCESS") == "1" {
21+
// Set up SIGPIPE handling as our tools do.
22+
done := HandleWithInterrupt(nil)
23+
defer close(done)
24+
25+
// Write repeatedly to stdout. When the reader closes the pipe,
26+
// the write will return an EPIPE error. We exit 0 to signal that
27+
// the error was surfaced and handled — not a SIGPIPE death.
28+
for {
29+
_, err := fmt.Println("data")
30+
if err != nil {
31+
os.Exit(0)
32+
}
33+
}
34+
}
35+
os.Exit(m.Run())
36+
}
37+
38+
// TestBrokenPipeHandledAsWriteError verifies that when the read end of a pipe
39+
// is closed, our signal handler causes the write error to surface as an EPIPE
40+
// (allowing the tool to exit cleanly) rather than the process being killed by
41+
// SIGPIPE.
42+
func TestBrokenPipeHandledAsWriteError(t *testing.T) {
43+
cmd := exec.Command(os.Args[0], "-test.run=TestBrokenPipeHandledAsWriteError")
44+
cmd.Env = append(os.Environ(), "TEST_BROKEN_PIPE_SUBPROCESS=1")
45+
testutil.AssertBrokenPipeHandled(t, cmd)
46+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Copyright (C) MongoDB, Inc. 2014-present.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
// not use this file except in compliance with the License. You may obtain
5+
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
//go:build !windows
8+
9+
package testutil
10+
11+
import (
12+
"os"
13+
"os/exec"
14+
"syscall"
15+
"testing"
16+
17+
"github.com/stretchr/testify/require"
18+
)
19+
20+
// AssertBrokenPipeHandled starts cmd with its stdout connected to a pipe,
21+
// reads a small amount of output to confirm the process has started writing,
22+
// then breaks the pipe by closing the read end. It asserts the process was
23+
// not killed by SIGPIPE — a broken pipe must surface as a write error that
24+
// the process can handle cleanly.
25+
//
26+
// cmd.Stdout must not be set before calling; this function sets it to the
27+
// write end of a pipe.
28+
func AssertBrokenPipeHandled(t *testing.T, cmd *exec.Cmd) {
29+
t.Helper()
30+
31+
pr, pw, err := os.Pipe()
32+
require.NoError(t, err)
33+
cmd.Stdout = pw
34+
35+
require.NoError(t, cmd.Start())
36+
require.NoError(t, pw.Close())
37+
38+
buf := make([]byte, 64)
39+
_, err = pr.Read(buf)
40+
require.NoError(t, err, "process should write at least some output before the pipe breaks")
41+
require.NoError(t, pr.Close())
42+
43+
err = cmd.Wait()
44+
if err == nil {
45+
return
46+
}
47+
48+
var exitErr *exec.ExitError
49+
require.ErrorAs(t, err, &exitErr, "non-zero exit from cmd.Wait() should always be an ExitError")
50+
status, ok := exitErr.Sys().(syscall.WaitStatus)
51+
require.True(t, ok, "exitErr.Sys() should return a syscall.WaitStatus")
52+
if status.Signaled() {
53+
require.NotEqual(
54+
t,
55+
syscall.SIGPIPE,
56+
status.Signal(),
57+
"process should not be killed by SIGPIPE: broken pipe should surface as a write error",
58+
)
59+
}
60+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright (C) MongoDB, Inc. 2014-present.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
// not use this file except in compliance with the License. You may obtain
5+
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
//go:build windows
8+
9+
package testutil
10+
11+
import (
12+
"os/exec"
13+
"testing"
14+
)
15+
16+
// AssertBrokenPipeHandled is a no-op on Windows where SIGPIPE does not apply.
17+
func AssertBrokenPipeHandled(t *testing.T, cmd *exec.Cmd) {
18+
t.Skip("broken pipe handling via SIGPIPE is not applicable on Windows")
19+
}

mongodump/mongodump_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@ package mongodump
99
import (
1010
"bytes"
1111
"compress/gzip"
12+
"context"
1213
"crypto/sha1"
1314
"encoding/base64"
1415
"fmt"
1516
"io"
1617
"math/rand"
1718
"os"
19+
"os/exec"
1820
"path/filepath"
1921
"regexp"
2022
"strings"
@@ -2517,3 +2519,37 @@ func dumpAndCheckPipelineOrder(t *testing.T, collName string, pipeline bson.A) {
25172519
os.RemoveAll(dumpDir)
25182520
metaFile.Close()
25192521
}
2522+
2523+
// TestBrokenPipe verifies that mongodump handles a broken pipe gracefully
2524+
// (exits with a write error rather than being killed by SIGPIPE).
2525+
func TestBrokenPipe(t *testing.T) {
2526+
testtype.SkipUnlessTestType(t, testtype.IntegrationTestType)
2527+
2528+
const (
2529+
dbName = "mongodump_broken_pipe_test"
2530+
collName = "docs"
2531+
)
2532+
2533+
sessionProvider, _, err := testutil.GetBareSessionProvider()
2534+
require.NoError(t, err)
2535+
client, err := sessionProvider.GetSession()
2536+
require.NoError(t, err)
2537+
t.Cleanup(func() {
2538+
_ = client.Database(dbName).Drop(context.Background())
2539+
})
2540+
2541+
// Insert 2000 docs so the archive output exceeds the pipe buffer.
2542+
docs := make([]any, 2000)
2543+
for i := range 2000 {
2544+
docs[i] = bson.D{{"_id", int32(i)}, {"data", strings.Repeat("x", 500)}}
2545+
}
2546+
_, err = client.Database(dbName).Collection(collName).InsertMany(context.Background(), docs)
2547+
require.NoError(t, err)
2548+
2549+
args := append(
2550+
[]string{"run", filepath.Join("..", "mongodump", "main")},
2551+
testutil.GetBareArgs()...,
2552+
)
2553+
args = append(args, "--db", dbName, "--archive=-")
2554+
testutil.AssertBrokenPipeHandled(t, exec.Command("go", args...))
2555+
}

mongoexport/mongoexport_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,16 @@ package mongoexport
88

99
import (
1010
"bytes"
11+
"context"
1112
"encoding/json"
1213
"errors"
1314
"fmt"
1415
"io"
1516
"os"
17+
"os/exec"
18+
"path/filepath"
1619
"runtime"
20+
"strings"
1721
"testing"
1822

1923
"github.com/mongodb/mongo-tools/common/bsonutil"
@@ -312,3 +316,37 @@ func TestBadOptions(t *testing.T) {
312316
}
313317
}
314318
}
319+
320+
// TestBrokenPipe verifies that mongoexport handles a broken pipe gracefully
321+
// (exits with a write error rather than being killed by SIGPIPE).
322+
func TestBrokenPipe(t *testing.T) {
323+
testtype.SkipUnlessTestType(t, testtype.IntegrationTestType)
324+
325+
const (
326+
dbName = "mongoexport_broken_pipe_test"
327+
collName = "docs"
328+
)
329+
330+
sessionProvider, _, err := testutil.GetBareSessionProvider()
331+
require.NoError(t, err)
332+
client, err := sessionProvider.GetSession()
333+
require.NoError(t, err)
334+
t.Cleanup(func() {
335+
_ = client.Database(dbName).Drop(context.Background())
336+
})
337+
338+
// Insert 1000 docs with a large field so JSON output exceeds the pipe buffer.
339+
docs := make([]any, 1000)
340+
for i := range 1000 {
341+
docs[i] = bson.D{{"_id", int32(i)}, {"data", strings.Repeat("x", 1000)}}
342+
}
343+
_, err = client.Database(dbName).Collection(collName).InsertMany(context.Background(), docs)
344+
require.NoError(t, err)
345+
346+
args := append(
347+
[]string{"run", filepath.Join("..", "mongoexport", "main")},
348+
testutil.GetBareArgs()...,
349+
)
350+
args = append(args, "--db", dbName, "--collection", collName)
351+
testutil.AssertBrokenPipeHandled(t, exec.Command("go", args...))
352+
}

test/qa-tests/jstests/bson/bsondump_broken_pipe.js

Lines changed: 0 additions & 18 deletions
This file was deleted.

test/qa-tests/jstests/dump/dump_broken_pipe.js

Lines changed: 0 additions & 49 deletions
This file was deleted.

0 commit comments

Comments
 (0)