From 39ef5d4a5486de1e3cc63075499c29adc4b4864f Mon Sep 17 00:00:00 2001 From: Andrey Butusov Date: Thu, 19 Feb 2026 14:34:03 +0300 Subject: [PATCH 1/2] object/get: change exec method receiver to pointer Previously, if an error occurred, it was not saved and was ignored because the receiver was by value. Signed-off-by: Andrey Butusov --- pkg/services/object/get/exec.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index 47efddb64c..10dab16f51 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -291,7 +291,7 @@ func (exec *execCtx) headChild(id oid.ID) (*object.Object, bool) { return nil, false } -func (exec execCtx) remoteClient(info clientcore.NodeInfo) (getClient, bool) { +func (exec *execCtx) remoteClient(info clientcore.NodeInfo) (getClient, bool) { c, err := exec.svc.clientCache.get(info) if err == nil { From 3ee916939980d364fcdf03e3253d4c30d5a32668 Mon Sep 17 00:00:00 2001 From: Andrey Butusov Date: Thu, 19 Feb 2026 14:43:03 +0300 Subject: [PATCH 2/2] object/get: return error if header has already been written There were situations when, upon receiving an object, the header had already been written, but an error occurred while writing chunks, when some of them had already been written. In this case, the system would start searching on other nodes. Then the header would be written again, resulting in an error: `unexpected message instead of chunk part: *object.GetResponse_Body_Init_`. However, even if we allow the chunks to continue to be written without writing the header, the stream will already be corrupted by the chunks that have already been written locally. Therefore, an error must be returned immediately when writing chunks. Closes #3807. Signed-off-by: Andrey Butusov --- CHANGELOG.md | 1 + pkg/services/object/get/exec.go | 6 + pkg/services/object/get/get.go | 2 +- pkg/services/object/get/get_test.go | 182 ++++++++++++++++++++++++++++ 4 files changed, 190 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 83e4a90241..d56477ee23 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ Changelog for NeoFS Node ### Added ### Fixed +- Resending the header after chunks have already been sent in object service `Get` handler (#3833) ### Changed diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index 10dab16f51..3ae66c7f94 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -58,6 +58,11 @@ type execCtx struct { nodeLists [][]netmap.NodeInfo repRules []uint + + // headerWritten is set to true after WriteHeader is successfully called. + // If an error occurs after that, the stream is already corrupted and + // no retry should be attempted. + headerWritten bool } type execOption func(*execCtx) @@ -331,6 +336,7 @@ func (exec *execCtx) writeCollectedHeader() bool { ) if err == nil { + exec.headerWritten = true exec.status = statusOK exec.err = nil } else { diff --git a/pkg/services/object/get/get.go b/pkg/services/object/get/get.go index 27c1ac941b..82113cdbe9 100644 --- a/pkg/services/object/get/get.go +++ b/pkg/services/object/get/get.go @@ -327,7 +327,7 @@ func (exec *execCtx) analyzeStatus(execCnr bool) { zap.Error(exec.err), ) - if execCnr { + if execCnr && !exec.headerWritten { exec.executeOnContainer() exec.analyzeStatus(false) } diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index d6aff2b117..9d7373857d 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "strconv" + "sync/atomic" "testing" iec "github.com/nspcc-dev/neofs-node/internal/ec" @@ -1136,3 +1137,184 @@ func parameterizeXHeaders(t testing.TB, p *Prm, ss []string) { p.SetCommonParameters(cp) } + +type failingReader struct { + data []byte + pos int + failAfter int + err error +} + +func (r *failingReader) Read(p []byte) (n int, err error) { + if r.pos >= len(r.data) { + return 0, io.EOF + } + + if r.pos >= r.failAfter && r.failAfter > 0 { + return 0, r.err + } + + n = copy(p, r.data[r.pos:]) + r.pos += n + return n, nil +} + +func (r *failingReader) Close() error { + return nil +} + +type trackingWriter struct { + writeHeaderCount atomic.Int32 + writeChunkCount atomic.Int32 + failAfterChunks int32 + err error +} + +func (w *trackingWriter) WriteHeader(*object.Object) error { + w.writeHeaderCount.Add(1) + return nil +} + +func (w *trackingWriter) WriteChunk([]byte) error { + count := w.writeChunkCount.Add(1) + + if w.failAfterChunks > 0 && count == w.failAfterChunks { + return w.err + } + return nil +} + +type testStorageWithFailingReader struct { + unimplementedLocalStorage + obj *object.Object + failAfter int + err error +} + +func (s *testStorageWithFailingReader) get(*execCtx) (*object.Object, io.ReadCloser, error) { + if s.obj == nil { + return nil, nil, errors.New("object not found") + } + + payload := s.obj.Payload() + reader := &failingReader{ + data: payload, + failAfter: s.failAfter, + err: s.err, + } + + objWithoutPayload := s.obj.CutPayload() + objWithoutPayload.SetPayloadSize(s.obj.PayloadSize()) + return objWithoutPayload, reader, nil +} + +func (s *testStorageWithFailingReader) Head(oid.Address, bool) (*object.Object, error) { + if s.obj == nil { + return nil, errors.New("object not found") + } + return s.obj.CutPayload(), nil +} + +func TestDoubleWriteHeaderOnPayloadReadFailure(t *testing.T) { + ctx := context.Background() + addr := oidtest.Address() + + payloadSize := 1024 * 1024 // 1MB > chunk (256KB) + payload := make([]byte, payloadSize) + _, _ = rand.Read(payload) + + obj := generateObject(addr, nil, payload) + + readErr := errors.New("simulated payload read error") + storage := &testStorageWithFailingReader{ + obj: obj, + failAfter: 300 * 1024, // > chunk + err: readErr, + } + + anyNodeLists, nodeStrs := testNodeMatrix(t, []int{1}) + + clientCache := &testClientCache{ + clients: make(map[string]*testClient), + } + remoteClient := newTestClient() + remoteClient.addResult(addr, obj, nil) + clientCache.clients[nodeStrs[0][0]] = remoteClient + + svc := &Service{cfg: new(cfg)} + svc.log = zaptest.NewLogger(t) + svc.localObjects = storage + svc.localStorage = storage + svc.clientCache = clientCache + svc.neoFSNet = &testNeoFS{ + vectors: map[oid.Address][][]netmap.NodeInfo{ + addr: anyNodeLists, + }, + } + + writer := &trackingWriter{} + + var prm Prm + prm.SetObjectWriter(writer) + prm.WithAddress(addr) + prm.common = new(util.CommonPrm) + + err := svc.Get(ctx, prm) + require.ErrorIs(t, err, readErr) + + t.Logf("WriteHeader called: %d times", writer.writeHeaderCount.Load()) + t.Logf("WriteChunk called: %d times", writer.writeChunkCount.Load()) + require.EqualValues(t, 1, writer.writeHeaderCount.Load()) +} + +func TestDoubleWriteHeaderOnChunkWriteFailure(t *testing.T) { + ctx := context.Background() + addr := oidtest.Address() + + payloadSize := 1024 * 1024 // 1MB > chunk (256KB) + payload := make([]byte, payloadSize) + _, _ = rand.Read(payload) + + obj := generateObject(addr, nil, payload) + + storage := newTestStorage() + storage.addPhy(addr, obj) + + anyNodeLists, nodeStrs := testNodeMatrix(t, []int{1}) + + clientCache := &testClientCache{ + clients: make(map[string]*testClient), + } + remoteClient := newTestClient() + remoteClient.addResult(addr, obj, nil) + clientCache.clients[nodeStrs[0][0]] = remoteClient + + svc := &Service{cfg: new(cfg)} + svc.log = zaptest.NewLogger(t) + svc.localObjects = storage + svc.localStorage = storage + svc.clientCache = clientCache + svc.neoFSNet = &testNeoFS{ + vectors: map[oid.Address][][]netmap.NodeInfo{ + addr: anyNodeLists, + }, + } + + writeErr := errors.New("simulated chunk write error") + writer := &trackingWriter{ + failAfterChunks: 2, + err: writeErr, + } + + var prm Prm + prm.SetObjectWriter(writer) + prm.WithAddress(addr) + prm.common = new(util.CommonPrm) + + err := svc.Get(ctx, prm) + require.ErrorIs(t, err, writeErr) + + t.Logf("WriteHeader called: %d times", writer.writeHeaderCount.Load()) + t.Logf("WriteChunk called: %d times", writer.writeChunkCount.Load()) + require.EqualValues(t, 1, writer.writeHeaderCount.Load()) +}