diff --git a/CHANGELOG.md b/CHANGELOG.md
index 83e4a90241..d56477ee23 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@ Changelog for NeoFS Node
 ### Added
 
 ### Fixed
+- Resending the header after chunks have already been sent in object service `Get` handler (#3833)
 
 ### Changed
 
diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go
index 47efddb64c..3ae66c7f94 100644
--- a/pkg/services/object/get/exec.go
+++ b/pkg/services/object/get/exec.go
@@ -58,6 +58,11 @@ type execCtx struct {
 
 	nodeLists [][]netmap.NodeInfo
 	repRules  []uint
+
+	// headerWritten is set to true after WriteHeader is successfully called.
+	// If an error occurs after that, the stream is already corrupted and
+	// no retry should be attempted.
+	headerWritten bool
 }
 
 type execOption func(*execCtx)
@@ -291,7 +296,7 @@ func (exec *execCtx) headChild(id oid.ID) (*object.Object, bool) {
 	return nil, false
 }
 
-func (exec execCtx) remoteClient(info clientcore.NodeInfo) (getClient, bool) {
+func (exec *execCtx) remoteClient(info clientcore.NodeInfo) (getClient, bool) {
 	c, err := exec.svc.clientCache.get(info)
 
 	if err == nil {
@@ -331,6 +336,7 @@ func (exec *execCtx) writeCollectedHeader() bool {
 	)
 
 	if err == nil {
+		exec.headerWritten = true
 		exec.status = statusOK
 		exec.err = nil
 	} else {
diff --git a/pkg/services/object/get/get.go b/pkg/services/object/get/get.go
index 27c1ac941b..82113cdbe9 100644
--- a/pkg/services/object/get/get.go
+++ b/pkg/services/object/get/get.go
@@ -327,7 +327,7 @@ func (exec *execCtx) analyzeStatus(execCnr bool) {
 		zap.Error(exec.err),
 	)
 
-	if execCnr {
+	if execCnr && !exec.headerWritten {
 		exec.executeOnContainer()
 		exec.analyzeStatus(false)
 	}
diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go
index d6aff2b117..9d7373857d 100644
--- a/pkg/services/object/get/get_test.go
+++ b/pkg/services/object/get/get_test.go
@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"io"
 	"strconv"
+	"sync/atomic"
 	"testing"
 
 	iec "github.com/nspcc-dev/neofs-node/internal/ec"
@@ -1136,3 +1137,187 @@ func parameterizeXHeaders(t testing.TB, p *Prm, ss []string) {
 
 	p.SetCommonParameters(cp)
 }
+
+type failingReader struct {
+	data      []byte
+	pos       int
+	failAfter int
+	err       error
+}
+
+func (r *failingReader) Read(p []byte) (n int, err error) {
+	if r.pos >= len(r.data) {
+		return 0, io.EOF
+	}
+	// cap reads at failAfter so the error fires regardless of buffer size
+	limit := len(r.data)
+	if r.failAfter > 0 {
+		if r.pos >= r.failAfter {
+			return 0, r.err
+		}
+		limit = r.failAfter
+	}
+	n = copy(p, r.data[r.pos:limit])
+	r.pos += n
+	return n, nil
+}
+
+func (r *failingReader) Close() error {
+	return nil
+}
+
+type trackingWriter struct {
+	writeHeaderCount atomic.Int32
+	writeChunkCount  atomic.Int32
+	failAfterChunks  int32
+	err              error
+}
+
+func (w *trackingWriter) WriteHeader(*object.Object) error {
+	w.writeHeaderCount.Add(1)
+	return nil
+}
+
+func (w *trackingWriter) WriteChunk([]byte) error {
+	count := w.writeChunkCount.Add(1)
+
+	if w.failAfterChunks > 0 && count == w.failAfterChunks {
+		return w.err
+	}
+	return nil
+}
+
+type testStorageWithFailingReader struct {
+	unimplementedLocalStorage
+	obj       *object.Object
+	failAfter int
+	err       error
+}
+
+func (s *testStorageWithFailingReader) get(*execCtx) (*object.Object, io.ReadCloser, error) {
+	if s.obj == nil {
+		return nil, nil, errors.New("object not found")
+	}
+
+	payload := s.obj.Payload()
+	reader := &failingReader{
+		data:      payload,
+		failAfter: s.failAfter,
+		err:       s.err,
+	}
+
+	objWithoutPayload := s.obj.CutPayload()
+	objWithoutPayload.SetPayloadSize(s.obj.PayloadSize())
+	return objWithoutPayload, reader, nil
+}
+
+func (s *testStorageWithFailingReader) Head(oid.Address, bool) (*object.Object, error) {
+	if s.obj == nil {
+		return nil, errors.New("object not found")
+	}
+	return s.obj.CutPayload(), nil
+}
+
+func TestDoubleWriteHeaderOnPayloadReadFailure(t *testing.T) {
+	ctx := context.Background()
+	addr := oidtest.Address()
+
+	payloadSize := 1024 * 1024 // 1MB > chunk (256KB)
+	payload := make([]byte, payloadSize)
+	_, _ = rand.Read(payload)
+
+	obj := generateObject(addr, nil, payload)
+
+	readErr := errors.New("simulated payload read error")
+	storage := &testStorageWithFailingReader{
+		obj:       obj,
+		failAfter: 300 * 1024, // > chunk
+		err:       readErr,
+	}
+
+	anyNodeLists, nodeStrs := testNodeMatrix(t, []int{1})
+
+	clientCache := &testClientCache{
+		clients: make(map[string]*testClient),
+	}
+	remoteClient := newTestClient()
+	remoteClient.addResult(addr, obj, nil)
+	clientCache.clients[nodeStrs[0][0]] = remoteClient
+
+	svc := &Service{cfg: new(cfg)}
+	svc.log = zaptest.NewLogger(t)
+	svc.localObjects = storage
+	svc.localStorage = storage
+	svc.clientCache = clientCache
+	svc.neoFSNet = &testNeoFS{
+		vectors: map[oid.Address][][]netmap.NodeInfo{
+			addr: anyNodeLists,
+		},
+	}
+
+	writer := &trackingWriter{}
+
+	var prm Prm
+	prm.SetObjectWriter(writer)
+	prm.WithAddress(addr)
+	prm.common = new(util.CommonPrm)
+
+	err := svc.Get(ctx, prm)
+	require.ErrorIs(t, err, readErr)
+
+	t.Logf("WriteHeader called: %d times", writer.writeHeaderCount.Load())
+	t.Logf("WriteChunk called: %d times", writer.writeChunkCount.Load())
+	require.EqualValues(t, 1, writer.writeHeaderCount.Load())
+}
+
+func TestDoubleWriteHeaderOnChunkWriteFailure(t *testing.T) {
+	ctx := context.Background()
+	addr := oidtest.Address()
+
+	payloadSize := 1024 * 1024 // 1MB > chunk (256KB)
+	payload := make([]byte, payloadSize)
+	_, _ = rand.Read(payload)
+
+	obj := generateObject(addr, nil, payload)
+
+	storage := newTestStorage()
+	storage.addPhy(addr, obj)
+
+	anyNodeLists, nodeStrs := testNodeMatrix(t, []int{1})
+
+	clientCache := &testClientCache{
+		clients: make(map[string]*testClient),
+	}
+	remoteClient := newTestClient()
+	remoteClient.addResult(addr, obj, nil)
+	clientCache.clients[nodeStrs[0][0]] = remoteClient
+
+	svc := &Service{cfg: new(cfg)}
+	svc.log = zaptest.NewLogger(t)
+	svc.localObjects = storage
+	svc.localStorage = storage
+	svc.clientCache = clientCache
+	svc.neoFSNet = &testNeoFS{
+		vectors: map[oid.Address][][]netmap.NodeInfo{
+			addr: anyNodeLists,
+		},
+	}
+
+	writeErr := errors.New("simulated chunk write error")
+	writer := &trackingWriter{
+		failAfterChunks: 2,
+		err:             writeErr,
+	}
+
+	var prm Prm
+	prm.SetObjectWriter(writer)
+	prm.WithAddress(addr)
+	prm.common = new(util.CommonPrm)
+
+	err := svc.Get(ctx, prm)
+	require.ErrorIs(t, err, writeErr)
+
+	t.Logf("WriteHeader called: %d times", writer.writeHeaderCount.Load())
+	t.Logf("WriteChunk called: %d times", writer.writeChunkCount.Load())
+	require.EqualValues(t, 1, writer.writeHeaderCount.Load())
+}