From 9937acb8be0fc048c20d08017188ab8b183fa24c Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Wed, 7 Jan 2026 02:33:33 +0100 Subject: [PATCH] fix(bitswap/network): set read deadline before stream Close to prevent blocking SendMessage() can block indefinitely when the remote peer is slow or unresponsive during the multistream-select handshake completion. The fix sets a read deadline (using the calculated send timeout) before calling stream.Close(), ensuring the operation will time out rather than block indefinitely. See: https://github.com/multiformats/go-multistream/issues/47 See: https://github.com/ipshipyard/waterworks-infra/issues/860 --- CHANGELOG.md | 2 + bitswap/network/bsnet/ipfs_impl.go | 8 ++ bitswap/network/bsnet/ipfs_impl_test.go | 105 ++++++++++++++++++++++-- 3 files changed, 106 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index af5b02e5c..7bb502cec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,8 @@ The following emojis are used to highlight certain changes: ### Fixed +- `bitswap/network`: Fixed goroutine leak that could cause bitswap to stop serving blocks after extended uptime. The root cause is `stream.Close()` blocking indefinitely when remote peers are unresponsive during multistream handshake ([go-libp2p#3448](https://github.com/libp2p/go-libp2p/pull/3448)). This PR ([#1083](https://github.com/ipfs/boxo/pull/1083)) adds a localized fix specific to bitswap's `SendMessage` by setting a read deadline before closing streams. + ### Security diff --git a/bitswap/network/bsnet/ipfs_impl.go b/bitswap/network/bsnet/ipfs_impl.go index 4310bee05..9420159ba 100644 --- a/bitswap/network/bsnet/ipfs_impl.go +++ b/bitswap/network/bsnet/ipfs_impl.go @@ -390,6 +390,14 @@ func (bsnet *impl) SendMessage( return err } + // Set a read deadline to prevent Close() from blocking indefinitely + // when the remote peer is slow or unresponsive during multistream + // handshake completion. + // See: https://github.com/multiformats/go-multistream/issues/47 + // See: https://github.com/ipshipyard/waterworks-infra/issues/860 + if err := s.SetReadDeadline(time.Now().Add(timeout)); err != nil { + log.Debugf("error setting read deadline: %s", err) + } return s.Close() } diff --git a/bitswap/network/bsnet/ipfs_impl_test.go b/bitswap/network/bsnet/ipfs_impl_test.go index 670cea6f4..38f370ddf 100644 --- a/bitswap/network/bsnet/ipfs_impl_test.go +++ b/bitswap/network/bsnet/ipfs_impl_test.go @@ -73,18 +73,22 @@ var errMockNetErr = errors.New("network err") type ErrStream struct { p2pnet.Stream - lk sync.Mutex - err error - timingOut bool - closed bool + lk sync.Mutex + err error + timingOut bool + closed bool + blockOnClose bool // if true, Close() will block until deadline + readDeadlineSet bool // tracks if SetReadDeadline was called + readDeadline time.Time // the deadline that was set } type ErrHost struct { host.Host - lk sync.Mutex - err error - timingOut bool - streams []*ErrStream + lk sync.Mutex + err error + timingOut bool + blockOnClose bool + streams []*ErrStream } func (es *ErrStream) Write(b []byte) (int, error) { @@ -100,11 +104,36 @@ func (es *ErrStream) Write(b []byte) (int, error) { return es.Stream.Write(b) } +func (es *ErrStream) SetReadDeadline(t time.Time) error { + es.lk.Lock() + defer es.lk.Unlock() + es.readDeadlineSet = true + es.readDeadline = t + return es.Stream.SetReadDeadline(t) +} + func (es *ErrStream) Close() error { es.lk.Lock() + blockOnClose := es.blockOnClose + readDeadlineSet := es.readDeadlineSet + readDeadline := es.readDeadline es.closed = true es.lk.Unlock() + if blockOnClose { + if readDeadlineSet && !readDeadline.IsZero() { + // Simulate blocking until deadline (the fix sets a deadline, so this will timeout) + waitTime := time.Until(readDeadline) + if waitTime > 0 { + time.Sleep(waitTime) + } + } else { + // No deadline set - would block forever (demonstrates the bug without fix) + // In test, we use a channel to avoid actually blocking forever + select {} + } + } + return es.Stream.Close() } @@ -140,7 +169,7 @@ func (eh *ErrHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID return nil, context.DeadlineExceeded } stream, err := eh.Host.NewStream(ctx, p, pids...) - estrm := &ErrStream{Stream: stream, err: eh.err, timingOut: eh.timingOut} + estrm := &ErrStream{Stream: stream, err: eh.err, timingOut: eh.timingOut, blockOnClose: eh.blockOnClose} eh.streams = append(eh.streams, estrm) return estrm, err @@ -170,6 +199,18 @@ func (eh *ErrHost) setTimeoutState(timingOut bool) { } } +func (eh *ErrHost) setBlockOnClose(block bool) { + eh.lk.Lock() + defer eh.lk.Unlock() + + eh.blockOnClose = block + for _, s := range eh.streams { + s.lk.Lock() + s.blockOnClose = block + s.lk.Unlock() + } +} + func TestMessageSendAndReceive(t *testing.T) { // create network ctx := context.Background() @@ -671,3 +712,49 @@ func TestNetworkCounters(t *testing.T) { testNetworkCounters(t, 10-n, n) } } + +// TestSendMessageCloseDoesNotHang verifies that SendMessage calls SetReadDeadline +// before Close(), preventing indefinite blocking when the remote peer is +// unresponsive during multistream handshake completion. +// +// This test uses ErrStream to simulate a blocking Close() that only unblocks +// when SetReadDeadline has been called. This proves the fix works without +// relying on real network timeouts. +func TestSendMessageCloseDoesNotHang(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + p1 := tnet.RandIdentityOrFatal(t) + r1 := newReceiver() + p2 := tnet.RandIdentityOrFatal(t) + r2 := newReceiver() + + // Use prepareNetwork but we'll configure blocking after + eh1, bsnet1, _, _, msg := prepareNetwork(t, ctx, p1, r1, p2, r2) + + // Configure h1's streams to block on Close() - this simulates the scenario + // where multistream handshake read would block indefinitely. + // With the fix, SetReadDeadline is called before Close(), so the simulated + // blocking will respect the deadline and unblock. + eh1.setBlockOnClose(true) + + // SendMessage should complete because the fix sets a read deadline before + // calling Close(). The ErrStream.Close() will block until the deadline, + // simulating the real-world scenario where Close() would hang without + // a deadline. + start := time.Now() + err := bsnet1.SendMessage(ctx, p2.ID(), msg) + elapsed := time.Since(start) + + // The sendTimeout for a small message is minSendTimeout (10s). + // With the fix, Close() should return after waiting until the deadline. + // Without the fix, it would hang forever (ErrStream.Close blocks indefinitely + // when blockOnClose=true and no deadline is set). + maxExpected := 15 * time.Second // minSendTimeout + margin + if elapsed > maxExpected { + t.Fatalf("SendMessage took %v, expected < %v (should timeout via SetReadDeadline)", elapsed, maxExpected) + } + + // Error is expected because the simulated blocking causes the deadline to be reached + t.Logf("SendMessage returned in %v with error: %v", elapsed, err) +}