diff --git a/lib/asyncwrite/asyncwrite.go b/lib/asyncwrite/asyncwrite.go index 6e653b105..39d016d2d 100644 --- a/lib/asyncwrite/asyncwrite.go +++ b/lib/asyncwrite/asyncwrite.go @@ -32,6 +32,10 @@ func (bfw *BackgroundWriter) writeWorker() { pool.Put(data) if writeErr != nil { err = writeErr + // Drain remaining buffers from channel to avoid memory leak + for remaining := range bfw.ch { + pool.Put(remaining) + } break } } @@ -53,6 +57,7 @@ func (bfw *BackgroundWriter) Write(p []byte) (n int, err error) { case bfw.ch <- b: return len(b), nil case err := <-bfw.done: + pool.Put(b) return 0, err } } diff --git a/lib/ffi/cunative/decode_sdr.go b/lib/ffi/cunative/decode_sdr.go index 60b820e91..52aca05e4 100644 --- a/lib/ffi/cunative/decode_sdr.go +++ b/lib/ffi/cunative/decode_sdr.go @@ -121,6 +121,8 @@ func Decode(replica, key io.Reader, out io.Writer) error { // Read replica rn, err := io.ReadFull(replica, rbuf) if err != nil && err != io.ErrUnexpectedEOF { + pool.Put(rbuf) + pool.Put(kbuf) if err == io.EOF { return } @@ -131,11 +133,15 @@ func Decode(replica, key io.Reader, out io.Writer) error { // Read key kn, err := io.ReadFull(key, kbuf[:rn]) if err != nil && err != io.ErrUnexpectedEOF { + pool.Put(rbuf) + pool.Put(kbuf) errChan <- err return } if kn != rn { + pool.Put(rbuf) + pool.Put(kbuf) errChan <- io.ErrUnexpectedEOF return } diff --git a/lib/ffi/cunative/decode_snap.go b/lib/ffi/cunative/decode_snap.go index ab34e312d..63554f99b 100644 --- a/lib/ffi/cunative/decode_snap.go +++ b/lib/ffi/cunative/decode_snap.go @@ -122,6 +122,8 @@ func DecodeSnap(spt abi.RegisteredSealProof, commD, commK cid.Cid, key, replica // Read replica rn, err := io.ReadFull(replica, rbuf) if err != nil && err != io.ErrUnexpectedEOF { + pool.Put(rbuf) + pool.Put(kbuf) if err == io.EOF { return } @@ -132,11 +134,15 @@ func DecodeSnap(spt abi.RegisteredSealProof, commD, commK cid.Cid, key, replica // Read key kn, err := io.ReadFull(key, kbuf[:rn]) if err != nil && err != io.ErrUnexpectedEOF { + pool.Put(rbuf) + pool.Put(kbuf) errChan <- err return } if kn != rn { + pool.Put(rbuf) + pool.Put(kbuf) errChan <- io.ErrUnexpectedEOF return } diff --git a/lib/pieceprovider/sector_reader.go b/lib/pieceprovider/sector_reader.go index 02a7dd406..de3600267 100644 --- a/lib/pieceprovider/sector_reader.go +++ b/lib/pieceprovider/sector_reader.go @@ -82,6 +82,7 @@ func (p *SectorReader) ReadPiece(ctx context.Context, sector storiface.SectorRef upr, err := fr32.NewUnpadReaderBuf(r, pieceSize.Padded(), buf) if err != nil { + pool.Put(buf) r.Close() // nolint return nil, xerrors.Errorf("creating unpadded reader: %w", err) } @@ -89,6 +90,7 @@ func (p *SectorReader) ReadPiece(ctx context.Context, sector storiface.SectorRef bir := bufio.NewReaderSize(upr, 127) if startOffset > uint64(startOffsetAligned) { if _, err := bir.Discard(startOffsetDiff); err != nil { + pool.Put(buf) r.Close() // nolint return nil, xerrors.Errorf("discarding bytes for startOffset: %w", err) } diff --git a/tasks/pdp/task_prove.go b/tasks/pdp/task_prove.go index b5772c831..aca6ed8f6 100644 --- a/tasks/pdp/task_prove.go +++ b/tasks/pdp/task_prove.go @@ -15,6 +15,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" "github.com/ipfs/go-cid" + pool "github.com/libp2p/go-buffer-pool" "github.com/minio/sha256-simd" "github.com/oklog/ulid" "github.com/samber/lo" @@ -435,6 +436,7 @@ func (p *ProveTask) proveRoot(ctx context.Context, dataSetID int64, pieceID int6 if err != nil { return contract.IPDPTypesProof{}, xerrors.Errorf("failed to build memtree: %w", err) } + defer pool.Put(memTree) log.Debugw("provePiece", "rootChallengeOffset", rootChallengeOffset, "challengedLeaf", challengedLeaf) mProof, err := proof.MemtreeProof(memTree, challengedLeaf) @@ -519,6 +521,7 @@ func (p *ProveTask) proveRoot(ctx context.Context, dataSetID int64, pieceID int6 if err != nil { return contract.IPDPTypesProof{}, xerrors.Errorf("failed to build memtree: %w", err) } + defer pool.Put(memtree) // Get challenge leaf in subTree subTreeChallenge := challengedLeaf - startLeaf @@ -552,6 +555,7 @@ func (p *ProveTask) proveRoot(ctx context.Context, dataSetID int64, pieceID int6 if err != nil { return contract.IPDPTypesProof{}, xerrors.Errorf("failed to build memtree from snapshot: %w", err) } + defer pool.Put(mtree) // Generate merkle proof from snapShot node to commP proofs, err := proof.MemtreeProof(mtree, snapshotNodeIndex)