From a01a1eb59b666f94916ef09c9306d6c027327a13 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Wed, 11 Feb 2026 09:50:30 +0300 Subject: [PATCH] storage/fstree: Provide method for buffered HEAD Going to be used in #3783. Signed-off-by: Leonard Lyubich --- .../blobstor/fstree/bench_test.go | 58 +++++++++ .../blobstor/fstree/head.go | 121 ++++++++++++++++-- .../blobstor/fstree/head_bench_test.go | 16 +++ .../blobstor/fstree/head_test.go | 39 ++++++ 4 files changed, 220 insertions(+), 14 deletions(-) diff --git a/pkg/local_object_storage/blobstor/fstree/bench_test.go b/pkg/local_object_storage/blobstor/fstree/bench_test.go index 2c8af2c286..cc29935f81 100644 --- a/pkg/local_object_storage/blobstor/fstree/bench_test.go +++ b/pkg/local_object_storage/blobstor/fstree/bench_test.go @@ -18,6 +18,14 @@ func BenchmarkFSTree_Head(b *testing.B) { } } +func BenchmarkFSTree_HeadToBuffer(b *testing.B) { + for _, size := range payloadSizes { + b.Run(generateSizeLabel(size), func(b *testing.B) { + runReadBenchmark(b, "ReadHeader", size) + }) + } +} + func BenchmarkFSTree_Get(b *testing.B) { for _, size := range payloadSizes { b.Run(generateSizeLabel(size), func(b *testing.B) { @@ -89,6 +97,18 @@ func runReadBenchmark(b *testing.B, methodName string, payloadSize int) { addr := prepareSingleObject(b, fsTree, payloadSize) b.ReportAllocs() + + if methodName == "ReadHeader" { + buf := make([]byte, object.MaxHeaderLen*2) + + for b.Loop() { + _, err := fsTree.ReadHeader(addr, func() []byte { return buf }) + require.NoError(b, err) + } + + return + } + for b.Loop() { testRead(fsTree, addr) } @@ -99,6 +119,19 @@ func runReadBenchmark(b *testing.B, methodName string, payloadSize int) { addrs := prepareMultipleObjects(b, fsTree, payloadSize) b.ReportAllocs() + + if methodName == "ReadHeader" { + buf := make([]byte, object.MaxHeaderLen*2) + + b.ResetTimer() + for k := range b.N { + _, err := fsTree.ReadHeader(addrs[k%len(addrs)], func() []byte { return buf }) + require.NoError(b, err) + } + 
+ return + } + b.ResetTimer() for k := range b.N { testRead(fsTree, addrs[k%len(addrs)]) @@ -111,6 +144,18 @@ func runReadBenchmark(b *testing.B, methodName string, payloadSize int) { addr := prepareSingleObject(b, fsTree, payloadSize) b.ReportAllocs() + + if methodName == "ReadHeader" { + buf := make([]byte, object.MaxHeaderLen*2) + + for b.Loop() { + _, err := fsTree.ReadHeader(addr, func() []byte { return buf }) + require.NoError(b, err) + } + + return + } + for b.Loop() { testRead(fsTree, addr) } @@ -122,6 +167,19 @@ func runReadBenchmark(b *testing.B, methodName string, payloadSize int) { addrs := prepareMultipleObjects(b, fsTree, payloadSize) b.ReportAllocs() + + if methodName == "ReadHeader" { + buf := make([]byte, object.MaxHeaderLen*2) + + b.ResetTimer() + for k := range b.N { + _, err := fsTree.ReadHeader(addrs[k%len(addrs)], func() []byte { return buf }) + require.NoError(b, err) + } + + return + } + b.ResetTimer() for k := range b.N { testRead(fsTree, addrs[k%len(addrs)]) diff --git a/pkg/local_object_storage/blobstor/fstree/head.go b/pkg/local_object_storage/blobstor/fstree/head.go index 172c56d62e..3f1776700f 100644 --- a/pkg/local_object_storage/blobstor/fstree/head.go +++ b/pkg/local_object_storage/blobstor/fstree/head.go @@ -27,6 +27,81 @@ func (t *FSTree) Head(addr oid.Address) (*object.Object, error) { return obj, nil } +// ReadHeader reads header of the referenced object from t into buffer provided +// by getBuffer. Returns number of bytes read. +// +// If object is missing, ReadHeader returns [apistatus.ErrObjectNotFound]. +// +// The buffer must have 2*[object.MaxHeaderLen] bytes len at least. 
+func (t *FSTree) ReadHeader(addr oid.Address, getBuffer func() []byte) (int, error) { + p := t.treePath(addr) + + f, err := os.Open(p) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + return 0, logicerr.Wrap(apistatus.ErrObjectNotFound) + } + return 0, fmt.Errorf("read file %q: %w", p, err) + } + defer f.Close() + + buf := getBuffer() + if len(buf) < 2*object.MaxHeaderLen { + return 0, fmt.Errorf("too short buffer %d bytes", len(buf)) + } + + from, to, rem, err := t.readHeader(buf, addr.Object(), f) + if err != nil { + return 0, err + } + + // following mostly copied from readHeaderAndPayload() + + compressed := t.IsCompressed(buf[from:to]) + if !compressed { + return copy(buf, buf[from:to]), nil + } + + if to-from < object.MaxHeaderLen { + dec, err := t.DecompressForce(buf[from:to]) + if err != nil { + return 0, fmt.Errorf("decompress initial data: %w", err) + } + if len(dec) > len(buf) { + return 0, fmt.Errorf("decompressed data len %d overflows buffer len %d", len(dec), len(buf)) + } + + return copy(buf, dec), nil + } + + var r io.Reader + if rem < 0 { // non-combined + r = io.MultiReader(bytes.NewReader(buf[from:to]), f) + } else if rem == 0 { + r = bytes.NewReader(buf[from:to]) + } else { + r = io.MultiReader(bytes.NewReader(buf[from:to]), io.LimitReader(f, rem)) + } + + decoder, err := zstd.NewReader(r) + if err != nil { + return 0, fmt.Errorf("zstd decoder: %w", err) + } + defer decoder.Close() + + decBuf := make([]byte, object.MaxHeaderLen) + + n, err := decoder.Read(decBuf) + if err != nil && !errors.Is(err, io.EOF) { + return 0, fmt.Errorf("zstd read: %w", err) + } + if n > len(buf) { + return 0, fmt.Errorf("decompressed data len %d overflows buffer len %d", n, len(buf)) + } + + return copy(buf, decBuf[:n]), nil +} + // getObjectStream reads an object from the storage by address as a stream. // It returns the object with header only, and a reader for the payload. 
func (t *FSTree) getObjectStream(addr oid.Address) (*object.Object, io.ReadSeekCloser, error) { @@ -58,17 +133,39 @@ func (t *FSTree) getObjectStream(addr oid.Address) (*object.Object, io.ReadSeekC // The caller is responsible for closing the returned io.ReadCloser if it is not nil. func (t *FSTree) extractHeaderAndStream(id oid.ID, f *os.File) (*object.Object, io.ReadSeekCloser, error) { buf := make([]byte, 2*object.MaxHeaderLen) + + from, to, rem, err := t.readHeader(buf, id, f) + if err != nil { + return nil, nil, err + } + + if rem <= 0 { + return t.readHeaderAndPayload(f, buf[from:to]) + } + + rc := struct { + io.Reader + io.Closer + }{Reader: io.LimitReader(f, rem), Closer: f} + + return t.readHeaderAndPayload(rc, buf[from:to]) +} + +// reads header of object by id from f into buf. Returns first and last +// bytes in buf containing the result. Third return is the number of bytes left +// if f is combined (negative if not combined). +func (t *FSTree) readHeader(buf []byte, id oid.ID, f *os.File) (int, int, int64, error) { n, err := io.ReadFull(f, buf[:object.MaxHeaderLen]) if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) { - return nil, f, err + return 0, 0, 0, err } if n < combinedDataOff { - return t.readHeaderAndPayload(f, buf[:n]) + return 0, n, -1, nil } thisOID, l := parseCombinedPrefix(buf) if thisOID == nil { - return t.readHeaderAndPayload(f, buf[:n]) + return 0, n, -1, nil } offset := combinedDataOff @@ -78,19 +175,15 @@ func (t *FSTree) extractHeaderAndStream(id oid.ID, f *os.File) (*object.Object, if n < size { _, err = io.ReadFull(f, buf[n:size]) if err != nil { - return nil, f, fmt.Errorf("read up to size: %w", err) + return 0, 0, 0, fmt.Errorf("read up to size: %w", err) } } - f := io.ReadCloser(f) if buffered := uint32(size - offset); l > buffered { - f = struct { - io.Reader - io.Closer - }{Reader: io.LimitReader(f, int64(l-buffered)), Closer: f} + return offset, size, int64(l - buffered), nil } - return 
t.readHeaderAndPayload(f, buf[offset:size]) + return offset, size, 0, nil } offset += int(l) @@ -98,24 +191,24 @@ func (t *FSTree) extractHeaderAndStream(id oid.ID, f *os.File) (*object.Object, if offset > n { _, err = f.Seek(int64(offset-n), io.SeekCurrent) if err != nil { - return nil, f, err + return 0, 0, 0, err } } n = copy(buf, buf[min(offset, n):n]) offset = 0 k, err := io.ReadFull(f, buf[n:n+object.MaxHeaderLen]) if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) { - return nil, f, fmt.Errorf("read full: %w", err) + return 0, 0, 0, fmt.Errorf("read full: %w", err) } if k == 0 { - return nil, f, fmt.Errorf("file was found, but this object is not in it: %w", io.ErrUnexpectedEOF) + return 0, 0, 0, fmt.Errorf("file was found, but this object is not in it: %w", io.ErrUnexpectedEOF) } n += k } thisOID, l = parseCombinedPrefix(buf[offset:]) if thisOID == nil { - return nil, f, errors.New("malformed combined file") + return 0, 0, 0, errors.New("malformed combined file") } offset += combinedDataOff diff --git a/pkg/local_object_storage/blobstor/fstree/head_bench_test.go b/pkg/local_object_storage/blobstor/fstree/head_bench_test.go index 0076bbef9d..5c7240c804 100644 --- a/pkg/local_object_storage/blobstor/fstree/head_bench_test.go +++ b/pkg/local_object_storage/blobstor/fstree/head_bench_test.go @@ -2,6 +2,8 @@ package fstree_test import ( "testing" + + "github.com/nspcc-dev/neofs-sdk-go/object" ) func BenchmarkFSTree_HeadVsGet(b *testing.B) { @@ -41,6 +43,20 @@ func runHeadVsGetBenchmark(b *testing.B, payloadSize int, compressed bool) { } }) + b.Run("ReadHeader"+suffix, func(b *testing.B) { + buf := make([]byte, object.MaxHeaderLen*2) + + b.ReportAllocs() + for b.Loop() { + _, err := fsTree.ReadHeader(addr, func() []byte { + return buf + }) + if err != nil { + b.Fatal(err) + } + } + }) + b.Run("Get"+suffix, func(b *testing.B) { b.ReportAllocs() for b.Loop() { diff --git a/pkg/local_object_storage/blobstor/fstree/head_test.go 
b/pkg/local_object_storage/blobstor/fstree/head_test.go index 76a5e44ed5..b638afccfd 100644 --- a/pkg/local_object_storage/blobstor/fstree/head_test.go +++ b/pkg/local_object_storage/blobstor/fstree/head_test.go @@ -1,10 +1,13 @@ package fstree_test import ( + "bytes" + "encoding/binary" "fmt" "testing" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/stretchr/testify/require" @@ -33,6 +36,8 @@ func TestHeadStorage(t *testing.T) { fullObj, err := fsTree.Get(obj.Address()) require.NoError(t, err) require.Equal(t, obj, fullObj) + + testHeadToBufferOK(t, fsTree, *obj) } testCombinedObjects := func(t *testing.T, fsTree *fstree.FSTree, size int) { @@ -60,6 +65,8 @@ func TestHeadStorage(t *testing.T) { require.Len(t, attrs, 1) require.Equal(t, fmt.Sprintf("key-%d", i), attrs[0].Key()) require.Equal(t, fmt.Sprintf("value-%d", i), attrs[0].Value()) + + testHeadToBufferOK(t, fsTree, *objects[i]) } } @@ -78,6 +85,8 @@ func TestHeadStorage(t *testing.T) { require.NoError(t, err) require.Equal(t, obj.CutPayload(), res) require.Len(t, res.Attributes(), numAttrs) + + testHeadToBufferOK(t, fsTree, *obj) }) t.Run("non-existent object", func(t *testing.T) { @@ -86,6 +95,11 @@ func TestHeadStorage(t *testing.T) { _, err := fsTree.Head(addr) require.Error(t, err) + + _, err = fsTree.ReadHeader(obj.Address(), func() []byte { + return make([]byte, object.MaxHeaderLen*2) + }) + require.ErrorIs(t, err, apistatus.ErrObjectNotFound) }) t.Run("different payload sizes", func(t *testing.T) { @@ -119,3 +133,28 @@ func TestHeadStorage(t *testing.T) { } }) } + +func testHeadToBufferOK(t *testing.T, fst *fstree.FSTree, obj object.Object) { + var buf []byte + n, err := fst.ReadHeader(obj.Address(), func() []byte { + buf = make([]byte, object.MaxHeaderLen*2) + return buf + }) + require.NoError(t, err) + + _, 
tail, ok := bytes.Cut(buf[:n], obj.CutPayload().Marshal()) + require.True(t, ok) + + prefix := make([]byte, 1+binary.MaxVarintLen64) + prefix[0] = 34 // payload field tag + prefix = prefix[:1+binary.PutUvarint(prefix[1:], uint64(len(obj.Payload())))] + + if len(tail) < len(prefix) { + require.True(t, bytes.HasPrefix(prefix, tail)) + return + } + + tail, ok = bytes.CutPrefix(tail, prefix) + require.True(t, ok) + require.True(t, bytes.HasPrefix(obj.Payload(), tail)) +}