Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions pkg/local_object_storage/blobstor/fstree/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ func BenchmarkFSTree_Head(b *testing.B) {
}
}

// BenchmarkFSTree_HeadToBuffer measures FSTree.ReadHeader across the
// standard payload sizes, reading the header into a caller-provided buffer.
func BenchmarkFSTree_HeadToBuffer(b *testing.B) {
	for i := range payloadSizes {
		sz := payloadSizes[i]
		b.Run(generateSizeLabel(sz), func(b *testing.B) {
			runReadBenchmark(b, "ReadHeader", sz)
		})
	}
}

func BenchmarkFSTree_Get(b *testing.B) {
for _, size := range payloadSizes {
b.Run(generateSizeLabel(size), func(b *testing.B) {
Expand Down Expand Up @@ -89,6 +97,18 @@ func runReadBenchmark(b *testing.B, methodName string, payloadSize int) {
addr := prepareSingleObject(b, fsTree, payloadSize)

b.ReportAllocs()

if methodName == "ReadHeader" {
buf := make([]byte, object.MaxHeaderLen*2)

for b.Loop() {
_, err := fsTree.ReadHeader(addr, func() []byte { return buf })
require.NoError(b, err)
}

return
}

for b.Loop() {
testRead(fsTree, addr)
}
Expand All @@ -99,6 +119,19 @@ func runReadBenchmark(b *testing.B, methodName string, payloadSize int) {
addrs := prepareMultipleObjects(b, fsTree, payloadSize)

b.ReportAllocs()

if methodName == "ReadHeader" {
buf := make([]byte, object.MaxHeaderLen*2)

b.ResetTimer()
for k := range b.N {
_, err := fsTree.ReadHeader(addrs[k%len(addrs)], func() []byte { return buf })
require.NoError(b, err)
}

return
}

b.ResetTimer()
for k := range b.N {
testRead(fsTree, addrs[k%len(addrs)])
Expand All @@ -111,6 +144,18 @@ func runReadBenchmark(b *testing.B, methodName string, payloadSize int) {
addr := prepareSingleObject(b, fsTree, payloadSize)

b.ReportAllocs()

if methodName == "ReadHeader" {
buf := make([]byte, object.MaxHeaderLen*2)

for b.Loop() {
_, err := fsTree.ReadHeader(addr, func() []byte { return buf })
require.NoError(b, err)
}

return
}

for b.Loop() {
testRead(fsTree, addr)
}
Expand All @@ -122,6 +167,19 @@ func runReadBenchmark(b *testing.B, methodName string, payloadSize int) {
addrs := prepareMultipleObjects(b, fsTree, payloadSize)

b.ReportAllocs()

if methodName == "ReadHeader" {
buf := make([]byte, object.MaxHeaderLen*2)

b.ResetTimer()
for k := range b.N {
_, err := fsTree.ReadHeader(addrs[k%len(addrs)], func() []byte { return buf })
require.NoError(b, err)
}

return
}

b.ResetTimer()
for k := range b.N {
testRead(fsTree, addrs[k%len(addrs)])
Expand Down
121 changes: 107 additions & 14 deletions pkg/local_object_storage/blobstor/fstree/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,81 @@ func (t *FSTree) Head(addr oid.Address) (*object.Object, error) {
return obj, nil
}

// ReadHeader reads header of the referenced object from t into buffer provided
// by getBuffer. Returns number of bytes read.
//
// If object is missing, ReadHeader returns [apistatus.ErrObjectNotFound].
//
// The buffer must have 2*[object.MaxHeaderLen] bytes len at least.
func (t *FSTree) ReadHeader(addr oid.Address, getBuffer func() []byte) (int, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about ReadHeaderInto(addr oid.Address, buf []byte) (int, error) which is very close to https://pkg.go.dev/io#Reader (or returning ([]byte, error) like https://pkg.go.dev/math/big#Int.FillBytes or https://pkg.go.dev/encoding#TextAppender (although that one appends)). Buffering is still completely out of FSTree scope, but there are no functions involved.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i thought about io.Reader-like interface, others are not appropriate for current needs

it's good, but idle allocs when file cannot be opened (e.g. 404) concern me. Dynamic allocation after opening the file is what we have now. Buffer's parameterization requires its preallocation. It may remain unused. Buffer pool (TBD) will allow to potentially reuse the buffer, but there can also be a lot of erroneous calls

what do u think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't expect a lot of 404 at this level. Normally it's filtered through metabase and it knows what we have or not correctly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in dont mind to accept this within shard. But a similar issue will arise at higher levels: who manages buffers - service or storage engine?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have to pass something down anyway, whether it's a callback or a slice. Then you can consider this to be a price of a call (successful or not). Also, if you're to manage response message buffers you can arrange a slice prefixed with HEAD (or GET) reply wrapper (or some space for it) and then pass a subslice for header itself here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whether it's a callback or a slice

this is the only thing that is rly being discussed. Im proposing a callback. Is there any real reason to choose another option?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Much simpler interface. Consider addr/buf matching for example, you have to resolve it in callback instead of having some direct result (where to apply that int?). Consider debugging, where does this buffer come from really? Can I check its contents at shard/engine level (where callers won't even see it)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in practice, the buffer is allocated once and reused. Callback simply allows to allocate it when it is really needed

i understand the interface will be a bit simpler. But idle creation in some cases and closeness to the current behavior are still relevant to me. I don't see any real reason to change this behavior yet

p := t.treePath(addr)

f, err := os.Open(p)
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
return 0, logicerr.Wrap(apistatus.ErrObjectNotFound)
}
return 0, fmt.Errorf("read file %q: %w", p, err)
}
defer f.Close()

buf := getBuffer()
if len(buf) < 2*object.MaxHeaderLen {
return 0, fmt.Errorf("too short buffer %d bytes", len(buf))
}

from, to, rem, err := t.readHeader(buf, addr.Object(), f)
if err != nil {
return 0, err
}

// following mostly copied from readHeaderAndPayload()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want to duplicate the code.


compressed := t.IsCompressed(buf[from:to])
if !compressed {
return copy(buf, buf[from:to]), nil
}

if to-from < object.MaxHeaderLen {
dec, err := t.DecompressForce(buf[from:to])
if err != nil {
return 0, fmt.Errorf("decompress initial data: %w", err)
}
if len(dec) > len(buf) {
return 0, fmt.Errorf("decompressed data len %d overflows buffer len %d", len(dec), len(buf))
}

return copy(buf, dec), nil
}

var r io.Reader
if rem < 0 { // non-combined
r = io.MultiReader(bytes.NewReader(buf[from:to]), f)
} else if rem == 0 {
r = bytes.NewReader(buf[from:to])
} else {
r = io.MultiReader(bytes.NewReader(buf[from:to]), io.LimitReader(f, rem))
}

decoder, err := zstd.NewReader(r)
if err != nil {
return 0, fmt.Errorf("zstd decoder: %w", err)
}
defer decoder.Close()

decBuf := make([]byte, object.MaxHeaderLen)

n, err := decoder.Read(decBuf)
if err != nil && !errors.Is(err, io.EOF) {
return 0, fmt.Errorf("zstd read: %w", err)
}
if n > len(buf) {
return 0, fmt.Errorf("decompressed data len %d overflows buffer len %d", n, len(buf))
}

return copy(buf, decBuf[:n]), nil
}

// getObjectStream reads an object from the storage by address as a stream.
// It returns the object with header only, and a reader for the payload.
func (t *FSTree) getObjectStream(addr oid.Address) (*object.Object, io.ReadSeekCloser, error) {
Expand Down Expand Up @@ -58,17 +133,39 @@ func (t *FSTree) getObjectStream(addr oid.Address) (*object.Object, io.ReadSeekC
// The caller is responsible for closing the returned io.ReadCloser if it is not nil.
func (t *FSTree) extractHeaderAndStream(id oid.ID, f *os.File) (*object.Object, io.ReadSeekCloser, error) {
	// Scratch buffer sized to the documented minimum for readHeader:
	// twice the maximum header length.
	buf := make([]byte, 2*object.MaxHeaderLen)

	// readHeader finds the header of object id in f (which may be a combined
	// file holding several objects) and returns its [from:to) span in buf.
	// rem is the number of this object's bytes left unread in f; negative
	// means f is not combined.
	from, to, rem, err := t.readHeader(buf, id, f)
	if err != nil {
		return nil, nil, err
	}

	if rem <= 0 {
		// Non-combined file (rem < 0) or object fully buffered (rem == 0):
		// pass the file itself on for any remaining payload.
		return t.readHeaderAndPayload(f, buf[from:to])
	}

	// Combined file with unread bytes: bound payload reading to this
	// object's remaining rem bytes while still closing the underlying file.
	rc := struct {
		io.Reader
		io.Closer
	}{Reader: io.LimitReader(f, rem), Closer: f}

	return t.readHeaderAndPayload(rc, buf[from:to])
}

// reads header of object by id from f into buf. Returns first and last
// bytes in buf containing the result. Third return is the number of bytes left
// if f is combined (negative if not combined).
func (t *FSTree) readHeader(buf []byte, id oid.ID, f *os.File) (int, int, int64, error) {
n, err := io.ReadFull(f, buf[:object.MaxHeaderLen])
if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
return nil, f, err
return 0, 0, 0, err
}
if n < combinedDataOff {
return t.readHeaderAndPayload(f, buf[:n])
return 0, n, -1, nil
}

thisOID, l := parseCombinedPrefix(buf)
if thisOID == nil {
return t.readHeaderAndPayload(f, buf[:n])
return 0, n, -1, nil
}

offset := combinedDataOff
Expand All @@ -78,44 +175,40 @@ func (t *FSTree) extractHeaderAndStream(id oid.ID, f *os.File) (*object.Object,
if n < size {
_, err = io.ReadFull(f, buf[n:size])
if err != nil {
return nil, f, fmt.Errorf("read up to size: %w", err)
return 0, 0, 0, fmt.Errorf("read up to size: %w", err)
}
}

f := io.ReadCloser(f)
if buffered := uint32(size - offset); l > buffered {
f = struct {
io.Reader
io.Closer
}{Reader: io.LimitReader(f, int64(l-buffered)), Closer: f}
return offset, size, int64(l - buffered), nil
}

return t.readHeaderAndPayload(f, buf[offset:size])
return offset, size, 0, nil
}

offset += int(l)
if n-offset < combinedDataOff {
if offset > n {
_, err = f.Seek(int64(offset-n), io.SeekCurrent)
if err != nil {
return nil, f, err
return 0, 0, 0, err
}
}
n = copy(buf, buf[min(offset, n):n])
offset = 0
k, err := io.ReadFull(f, buf[n:n+object.MaxHeaderLen])
if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
return nil, f, fmt.Errorf("read full: %w", err)
return 0, 0, 0, fmt.Errorf("read full: %w", err)
}
if k == 0 {
return nil, f, fmt.Errorf("file was found, but this object is not in it: %w", io.ErrUnexpectedEOF)
return 0, 0, 0, fmt.Errorf("file was found, but this object is not in it: %w", io.ErrUnexpectedEOF)
}
n += k
}

thisOID, l = parseCombinedPrefix(buf[offset:])
if thisOID == nil {
return nil, f, errors.New("malformed combined file")
return 0, 0, 0, errors.New("malformed combined file")
}

offset += combinedDataOff
Expand Down
16 changes: 16 additions & 0 deletions pkg/local_object_storage/blobstor/fstree/head_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package fstree_test

import (
"testing"

"github.com/nspcc-dev/neofs-sdk-go/object"
)

func BenchmarkFSTree_HeadVsGet(b *testing.B) {
Expand Down Expand Up @@ -41,6 +43,20 @@ func runHeadVsGetBenchmark(b *testing.B, payloadSize int, compressed bool) {
}
})

b.Run("ReadHeader"+suffix, func(b *testing.B) {
buf := make([]byte, object.MaxHeaderLen*2)

b.ReportAllocs()
for b.Loop() {
_, err := fsTree.ReadHeader(addr, func() []byte {
return buf
})
if err != nil {
b.Fatal(err)
}
}
})

b.Run("Get"+suffix, func(b *testing.B) {
b.ReportAllocs()
for b.Loop() {
Expand Down
39 changes: 39 additions & 0 deletions pkg/local_object_storage/blobstor/fstree/head_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package fstree_test

import (
"bytes"
"encoding/binary"
"fmt"
"testing"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -33,6 +36,8 @@ func TestHeadStorage(t *testing.T) {
fullObj, err := fsTree.Get(obj.Address())
require.NoError(t, err)
require.Equal(t, obj, fullObj)

testHeadToBufferOK(t, fsTree, *obj)
}

testCombinedObjects := func(t *testing.T, fsTree *fstree.FSTree, size int) {
Expand Down Expand Up @@ -60,6 +65,8 @@ func TestHeadStorage(t *testing.T) {
require.Len(t, attrs, 1)
require.Equal(t, fmt.Sprintf("key-%d", i), attrs[0].Key())
require.Equal(t, fmt.Sprintf("value-%d", i), attrs[0].Value())

testHeadToBufferOK(t, fsTree, *objects[i])
}
}

Expand All @@ -78,6 +85,8 @@ func TestHeadStorage(t *testing.T) {
require.NoError(t, err)
require.Equal(t, obj.CutPayload(), res)
require.Len(t, res.Attributes(), numAttrs)

testHeadToBufferOK(t, fsTree, *obj)
})

t.Run("non-existent object", func(t *testing.T) {
Expand All @@ -86,6 +95,11 @@ func TestHeadStorage(t *testing.T) {

_, err := fsTree.Head(addr)
require.Error(t, err)

_, err = fsTree.ReadHeader(obj.Address(), func() []byte {
return make([]byte, object.MaxHeaderLen*2)
})
require.ErrorIs(t, err, apistatus.ErrObjectNotFound)
})

t.Run("different payload sizes", func(t *testing.T) {
Expand Down Expand Up @@ -119,3 +133,28 @@ func TestHeadStorage(t *testing.T) {
}
})
}

// testHeadToBufferOK reads the header of obj via FSTree.ReadHeader into a
// freshly allocated buffer and verifies the buffer contents: the marshaled
// header (object without payload) followed by the payload field prefix and
// the beginning of the payload, either of which may be cut off by the read
// limit.
func testHeadToBufferOK(t *testing.T, fst *fstree.FSTree, obj object.Object) {
	var buf []byte
	n, err := fst.ReadHeader(obj.Address(), func() []byte {
		buf = make([]byte, object.MaxHeaderLen*2)
		return buf
	})
	require.NoError(t, err)

	// The read bytes must contain the marshaled header; tail is whatever
	// follows it (the start of the payload field, if any was read).
	_, tail, ok := bytes.Cut(buf[:n], obj.CutPayload().Marshal())
	require.True(t, ok)

	// Expected protobuf prefix of the payload field: tag byte 34
	// (field 4, wire type 2 = length-delimited) plus varint payload length.
	prefix := make([]byte, 1+binary.MaxVarintLen64)
	prefix[0] = 34 // payload field tag
	prefix = prefix[:1+binary.PutUvarint(prefix[1:], uint64(len(obj.Payload())))]

	if len(tail) < len(prefix) {
		// The read stopped inside the prefix itself: tail must be a
		// leading fragment of the expected prefix.
		require.True(t, bytes.HasPrefix(prefix, tail))
		return
	}

	// Otherwise the prefix must be complete and the remainder of tail must
	// be a leading fragment of the payload.
	tail, ok = bytes.CutPrefix(tail, prefix)
	require.True(t, ok)
	require.True(t, bytes.HasPrefix(obj.Payload(), tail))
}
Loading