Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Changelog for NeoFS Node
### Added

### Fixed
- Resending the header after chunks have already been sent in object service `Get` handler (#3833)

### Changed

Expand Down
8 changes: 7 additions & 1 deletion pkg/services/object/get/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ type execCtx struct {

nodeLists [][]netmap.NodeInfo
repRules []uint

// headerWritten is set to true after WriteHeader is successfully called.
// If an error occurs after that, the stream is already corrupted and
// no retry should be attempted.
headerWritten bool
}

type execOption func(*execCtx)
Expand Down Expand Up @@ -291,7 +296,7 @@ func (exec *execCtx) headChild(id oid.ID) (*object.Object, bool) {
return nil, false
}

func (exec execCtx) remoteClient(info clientcore.NodeInfo) (getClient, bool) {
func (exec *execCtx) remoteClient(info clientcore.NodeInfo) (getClient, bool) {
c, err := exec.svc.clientCache.get(info)

if err == nil {
Expand Down Expand Up @@ -331,6 +336,7 @@ func (exec *execCtx) writeCollectedHeader() bool {
)

if err == nil {
exec.headerWritten = true
exec.status = statusOK
exec.err = nil
} else {
Expand Down
2 changes: 1 addition & 1 deletion pkg/services/object/get/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (exec *execCtx) analyzeStatus(execCnr bool) {
zap.Error(exec.err),
)

if execCnr {
if execCnr && !exec.headerWritten {
exec.executeOnContainer()
exec.analyzeStatus(false)
}
Expand Down
182 changes: 182 additions & 0 deletions pkg/services/object/get/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"strconv"
"sync/atomic"
"testing"

iec "github.com/nspcc-dev/neofs-node/internal/ec"
Expand Down Expand Up @@ -1136,3 +1137,184 @@ func parameterizeXHeaders(t testing.TB, p *Prm, ss []string) {

p.SetCommonParameters(cp)
}

// failingReader is an io.ReadCloser that serves data until failAfter bytes
// have been read and then returns err on every subsequent Read. A failAfter
// of zero or less disables failure injection, in which case the reader
// simply streams data to completion (io.EOF).
type failingReader struct {
	data      []byte // full payload to serve
	pos       int    // number of bytes already served
	failAfter int    // byte offset at which reads start failing; <=0 disables
	err       error  // error returned once failAfter bytes were served
}

// Read implements io.Reader. Unlike a naive implementation, it never serves
// bytes past the failAfter boundary: a read that would cross the boundary is
// truncated so the very next call returns err. This makes the failure offset
// deterministic regardless of the caller's buffer size.
func (r *failingReader) Read(p []byte) (int, error) {
	if r.pos >= len(r.data) {
		return 0, io.EOF
	}

	if r.failAfter > 0 {
		if r.pos >= r.failAfter {
			return 0, r.err
		}
		// Cap the read so we do not overshoot the failure boundary.
		if rem := r.failAfter - r.pos; len(p) > rem {
			p = p[:rem]
		}
	}

	n := copy(p, r.data[r.pos:])
	r.pos += n
	return n, nil
}

// Close implements io.Closer; it is a no-op.
func (r *failingReader) Close() error {
	return nil
}

// trackingWriter counts WriteHeader and WriteChunk invocations and can
// inject a single failure on the failAfterChunks-th chunk write. Counters
// are atomic so they can be inspected safely after the request finishes.
type trackingWriter struct {
	writeHeaderCount atomic.Int32 // number of WriteHeader calls observed
	writeChunkCount  atomic.Int32 // number of WriteChunk calls observed
	failAfterChunks  int32        // chunk ordinal that fails; 0 disables injection
	err              error        // error injected on the failing chunk write
}

// WriteHeader records the invocation and always succeeds.
func (w *trackingWriter) WriteHeader(*object.Object) error {
	w.writeHeaderCount.Add(1)
	return nil
}

// WriteChunk records the invocation and returns the configured error exactly
// on the failAfterChunks-th call; every other call succeeds.
func (w *trackingWriter) WriteChunk([]byte) error {
	n := w.writeChunkCount.Add(1)
	if w.failAfterChunks <= 0 || n != w.failAfterChunks {
		return nil
	}
	return w.err
}

// testStorageWithFailingReader is a local-storage stub whose get returns the
// stored object's header together with a failingReader over its payload, so
// payload streaming can be made to fail mid-transfer.
type testStorageWithFailingReader struct {
	unimplementedLocalStorage
	obj       *object.Object // object to serve; nil means "not found"
	failAfter int            // byte offset handed to the failingReader
	err       error          // error the reader yields at the failAfter offset
}

// get serves the object header and a payload reader that fails after
// failAfter bytes.
func (s *testStorageWithFailingReader) get(*execCtx) (*object.Object, io.ReadCloser, error) {
	if s.obj == nil {
		return nil, nil, errors.New("object not found")
	}

	// Capture the payload before detaching it from the object below.
	rdr := &failingReader{
		data:      s.obj.Payload(),
		failAfter: s.failAfter,
		err:       s.err,
	}

	// Hand out the header only, but keep the declared payload size so the
	// caller still expects the full stream from the reader.
	hdr := s.obj.CutPayload()
	hdr.SetPayloadSize(s.obj.PayloadSize())
	return hdr, rdr, nil
}

// Head returns the stored object's header without its payload.
func (s *testStorageWithFailingReader) Head(oid.Address, bool) (*object.Object, error) {
	if s.obj == nil {
		return nil, errors.New("object not found")
	}
	return s.obj.CutPayload(), nil
}

// TestDoubleWriteHeaderOnPayloadReadFailure checks that when reading the
// payload from local storage fails mid-stream, the Get handler surfaces the
// read error and does not write the response header a second time.
func TestDoubleWriteHeaderOnPayloadReadFailure(t *testing.T) {
	ctx := context.Background()
	addr := oidtest.Address()

	// The payload must span several chunks: 1 MiB vs the 256 KiB chunk size.
	payload := make([]byte, 1024*1024)
	_, _ = rand.Read(payload)
	obj := generateObject(addr, nil, payload)

	readErr := errors.New("simulated payload read error")
	store := &testStorageWithFailingReader{
		obj:       obj,
		err:       readErr,
		failAfter: 300 * 1024, // fail only after at least one full chunk went out
	}

	nodeLists, nodeStrs := testNodeMatrix(t, []int{1})

	rc := newTestClient()
	rc.addResult(addr, obj, nil)
	cache := &testClientCache{
		clients: map[string]*testClient{nodeStrs[0][0]: rc},
	}

	svc := &Service{cfg: new(cfg)}
	svc.log = zaptest.NewLogger(t)
	svc.localObjects = store
	svc.localStorage = store
	svc.clientCache = cache
	svc.neoFSNet = &testNeoFS{
		vectors: map[oid.Address][][]netmap.NodeInfo{addr: nodeLists},
	}

	w := &trackingWriter{}

	var prm Prm
	prm.SetObjectWriter(w)
	prm.WithAddress(addr)
	prm.common = new(util.CommonPrm)

	err := svc.Get(ctx, prm)
	require.ErrorIs(t, err, readErr)

	t.Logf("WriteHeader called: %d times", w.writeHeaderCount.Load())
	t.Logf("WriteChunk called: %d times", w.writeChunkCount.Load())
	// The header must have been sent exactly once despite the failure.
	require.EqualValues(t, 1, w.writeHeaderCount.Load())
}

// TestDoubleWriteHeaderOnChunkWriteFailure checks that when forwarding a
// payload chunk to the client fails, the Get handler propagates the write
// error and does not re-send the response header.
func TestDoubleWriteHeaderOnChunkWriteFailure(t *testing.T) {
	ctx := context.Background()
	addr := oidtest.Address()

	// The payload must span several chunks: 1 MiB vs the 256 KiB chunk size.
	payload := make([]byte, 1024*1024)
	_, _ = rand.Read(payload)
	obj := generateObject(addr, nil, payload)

	store := newTestStorage()
	store.addPhy(addr, obj)

	nodeLists, nodeStrs := testNodeMatrix(t, []int{1})

	rc := newTestClient()
	rc.addResult(addr, obj, nil)
	cache := &testClientCache{
		clients: map[string]*testClient{nodeStrs[0][0]: rc},
	}

	svc := &Service{cfg: new(cfg)}
	svc.log = zaptest.NewLogger(t)
	svc.localObjects = store
	svc.localStorage = store
	svc.clientCache = cache
	svc.neoFSNet = &testNeoFS{
		vectors: map[oid.Address][][]netmap.NodeInfo{addr: nodeLists},
	}

	writeErr := errors.New("simulated chunk write error")
	w := &trackingWriter{
		failAfterChunks: 2,
		err:             writeErr,
	}

	var prm Prm
	prm.SetObjectWriter(w)
	prm.WithAddress(addr)
	prm.common = new(util.CommonPrm)

	err := svc.Get(ctx, prm)
	require.ErrorIs(t, err, writeErr)

	t.Logf("WriteHeader called: %d times", w.writeHeaderCount.Load())
	t.Logf("WriteChunk called: %d times", w.writeChunkCount.Load())
	// The header must have been sent exactly once despite the failure.
	require.EqualValues(t, 1, w.writeHeaderCount.Load())
}
Loading