From 74557010da18a8ccd6c3f4a317fe0dfa96326afe Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 17 Feb 2026 21:56:32 +0300 Subject: [PATCH] WIP Signed-off-by: Leonard Lyubich --- pkg/services/object/put/distributed.go | 37 +- pkg/services/object/put/ec.go | 21 +- pkg/services/object/put/initial.go | 353 ++++++++++++++++ pkg/services/object/put/initial_test.go | 540 ++++++++++++++++++++++++ pkg/services/object/put/service_test.go | 50 ++- pkg/services/object/put/streamer.go | 1 + pkg/services/object/put/util.go | 20 + 7 files changed, 991 insertions(+), 31 deletions(-) create mode 100644 pkg/services/object/put/initial.go create mode 100644 pkg/services/object/put/initial_test.go diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 4f091d0f06..98b01be3bb 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -76,6 +76,8 @@ type distributedTarget struct { // When object from request is an EC part, ecPart.RuleIndex is >= 0. // Undefined when policy has no EC rules. ecPart iec.PartInfo + + initialPlacementPolicy *InitialPlacementPolicy } type nodeDesc struct { @@ -182,7 +184,30 @@ func (t *distributedTarget) saveObject(obj object.Object, encObj encodedObject) repRules := t.containerNodes.PrimaryCounts() ecRules := t.containerNodes.ECRules() - if typ := obj.Type(); len(repRules) > 0 || typ == object.TypeTombstone || typ == object.TypeLock || typ == object.TypeLink { + typ := obj.Type() + + initialPUT := t.sessionSigner != nil || !t.localOnly + if initialPUT && typ == object.TypeRegular && t.initialPlacementPolicy != nil { + if t.sessionSigner != nil || len(ecRules) == 0 { + err := t.handleInitialPlacementPolicy(*t.initialPlacementPolicy, repRules, ecRules, objNodeLists, obj, encObj) + if err != nil { + return fmt.Errorf("initial placement failed: %w", err) + } + return nil + } else if t.ecPart.RuleIndex < 0 { // prevent EC encoding + err := t.handleInitialPlacementPolicy(InitialPlacementPolicy{ + MaxReplicas: t.initialPlacementPolicy.MaxReplicas, + PreferLocal: t.initialPlacementPolicy.PreferLocal, + MaxPerRuleReplicas: t.initialPlacementPolicy.MaxPerRuleReplicas[:len(repRules)], + }, repRules, nil, objNodeLists[:len(repRules)], obj, encObj) + if err != nil { + return fmt.Errorf("initial placement failed: %w", err) + } + return nil + } // initial policy does not apply to individual EC parts + } + + if len(repRules) > 0 || typ == object.TypeTombstone || typ == object.TypeLock || typ == object.TypeLink { broadcast := typ == object.TypeTombstone || typ == object.TypeLink || (!t.localOnly && typ == object.TypeLock) || len(obj.Children()) > 0 useRepRules := repRules @@ -547,9 +572,7 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, replCounts []uint, busy.SetMessage(retErr.Error()) return busy } - var inc = new(apistatus.Incomplete) - inc.SetMessage(retErr.Error()) - return inc + return wrapIncompleteError(retErr) } nextNodeGroupKeys = slices.Grow(nextNodeGroupKeys, int(replRem))[:0] for ; nodesCounters[listInd].processed < listLen && uint(len(nextNodeGroupKeys)) < replRem; nodesCounters[listInd].processed++ { @@ -578,8 +601,7 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, replCounts []uint, } // critical error that may ultimately block the storage service. 
Normally it // should not appear because entry into the network map under strict control - l.Error("failed to decode network endpoints of the storage node from the network map, skip the node", - zap.String("public key", netmap.StringifyPublicKey(nodeLists[listInd][j])), zap.Error(nr.convertErr)) + logNodeConversionError(l, nodeLists[listInd][j], nr.convertErr) if listLen-nodesCounters[listInd].processed-1 < replRem { // -1 includes current node failure err := fmt.Errorf("%w (last node error: failed to decode network addresses: %w)", errNotEnoughNodes{listIndex: listInd, required: replRem, left: listLen - nodesCounters[listInd].processed - 1}, @@ -638,8 +660,7 @@ broadcast: if nr.convertErr != nil { // critical error that may ultimately block the storage service. Normally it // should not appear because entry into the network map under strict control - l.Error("failed to decode network endpoints of the storage node from the network map, skip the node", - zap.String("public key", netmap.StringifyPublicKey(nodeLists[i][j])), zap.Error(nr.convertErr)) + logNodeConversionError(l, nodeLists[i][j], nr.convertErr) continue // to send as many replicas as possible } wg.Add(1) diff --git a/pkg/services/object/put/ec.go b/pkg/services/object/put/ec.go index bd14b744af..567392fdd1 100644 --- a/pkg/services/object/put/ec.go +++ b/pkg/services/object/put/ec.go @@ -19,13 +19,9 @@ func (t *distributedTarget) ecAndSaveObject(signer neofscrypto.Signer, obj objec continue } - payloadParts, err := iec.Encode(ecRules[i], obj.Payload()) + payloadParts, err := t.ecAndSaveObjectByRule(signer, obj, i, ecRules[i], nodeLists[i]) if err != nil { - return fmt.Errorf("split object payload into EC parts for rule #%d (%s): %w", i, ecRules[i], err) - } - - if err := t.applyECRule(signer, obj, i, payloadParts, nodeLists[i]); err != nil { - return fmt.Errorf("apply EC rule #%d (%s): %w", i, ecRules[i], err) + return err } for j := i + 1; j < len(ecRules); j++ { @@ -41,6 +37,19 @@ func (t *distributedTarget) ecAndSaveObject(signer neofscrypto.Signer, obj objec return nil } +func (t *distributedTarget) ecAndSaveObjectByRule(signer neofscrypto.Signer, obj object.Object, ruleIdx int, rule iec.Rule, nodeLists []netmap.NodeInfo) ([][]byte, error) { + payloadParts, err := iec.Encode(rule, obj.Payload()) + if err != nil { + return nil, fmt.Errorf("split object payload into EC parts for rule #%d (%s): %w", ruleIdx, rule, err) + } + + if err := t.applyECRule(signer, obj, ruleIdx, payloadParts, nodeLists); err != nil { + return nil, fmt.Errorf("apply EC rule #%d (%s): %w", ruleIdx, rule, err) + } + + return payloadParts, nil +} + func (t *distributedTarget) applyECRule(signer neofscrypto.Signer, obj object.Object, ruleIdx int, payloadParts [][]byte, nodeList []netmap.NodeInfo) error { var eg errgroup.Group diff --git a/pkg/services/object/put/initial.go b/pkg/services/object/put/initial.go new file mode 100644 index 0000000000..830ca7d2ca --- /dev/null +++ b/pkg/services/object/put/initial.go @@ -0,0 +1,353 @@ +package putsvc + +import ( + "fmt" + "iter" + "slices" + + iec "github.com/nspcc-dev/neofs-node/internal/ec" + islices "github.com/nspcc-dev/neofs-node/internal/slices" + "github.com/nspcc-dev/neofs-sdk-go/netmap" + "github.com/nspcc-dev/neofs-sdk-go/object" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +type InitialPlacementPolicy struct { + MaxReplicas uint32 + PreferLocal bool + MaxPerRuleReplicas []uint32 +} + +type progressStatus uint8 + +const ( + statusUndone progressStatus = iota + statusWIP + statusOK + 
statusFail +) + +type nodeProgress struct { + status progressStatus + ok bool +} + +func (t *distributedTarget) handleInitialPlacementPolicy(inPolicy InitialPlacementPolicy, mainRepRules []uint, mainECRules []iec.Rule, + nodeLists [][]netmap.NodeInfo, obj object.Object, encObj encodedObject) error { + var repLimits, ecLimits []uint32 + if inPolicy.MaxPerRuleReplicas != nil { + repLimits = inPolicy.MaxPerRuleReplicas[:len(mainRepRules)] + ecLimits = inPolicy.MaxPerRuleReplicas[len(mainRepRules):] + } else { + // TODO: make ContainerNodes.PrimaryCounts() to just assign here + repLimits = make([]uint32, len(mainRepRules)) + for i := range repLimits { + repLimits[i] = uint32(mainRepRules[i]) + } + ecLimits = islices.RepeatElement(len(mainECRules), uint32(1)) + } + + listStatuses := make([][]nodeProgress, len(repLimits)+len(ecLimits)) + for i := range listStatuses { + if i < len(repLimits) { + if repLimits[i] > 0 { + listStatuses[i] = make([]nodeProgress, len(nodeLists[i])) + } + continue + } + + if ecLimits[i-len(repLimits)] > 0 { + listStatuses[i] = make([]nodeProgress, 1) + } + } + + var eg errgroup.Group + + if inPolicy.PreferLocal && inPolicy.MaxReplicas > 0 { + localLists := make([]int, 0, len(nodeLists)) + for i := range nodeLists { + if localNodeInSet(t.placementIterator.neoFSNet, nodeLists[i]) { + localLists = append(localLists, i) + } + } + + ok, err := t._handleInitialPlacementSeq(inPolicy.MaxReplicas, repLimits, ecLimits, mainECRules, nodeLists, obj, encObj, listStatuses, &eg, true, slices.All(localLists)) + if err != nil || ok == inPolicy.MaxReplicas { + return err + } + + inPolicy.MaxReplicas -= ok + } + + _, err := t._handleInitialPlacementSeq(inPolicy.MaxReplicas, repLimits, ecLimits, mainECRules, nodeLists, obj, encObj, listStatuses, &eg, false, func(yield func(int, int) bool) { + for i := range nodeLists { + if !yield(i, i) { + return + } + } + }) + return err +} + +func (t *distributedTarget) _handleInitialPlacementSeq(maxReplicas uint32, repLimits []uint32, ecLimits []uint32, ecRules []iec.Rule, nodeLists [][]netmap.NodeInfo, + obj object.Object, encObj encodedObject, listStatuses [][]nodeProgress, eg *errgroup.Group, local bool, listSeq iter.Seq2[int, int]) (uint32, error) { + leftReplicas := maxReplicas +group: + for { + t._fillNextInitialPlacementGroup(leftReplicas, repLimits, ecLimits, ecRules, nodeLists, obj, encObj, listStatuses, eg, local, listSeq) + + if err := eg.Wait(); err != nil { + return 0, err + } + + for _, i := range listSeq { + for j := range listStatuses[i] { + if listStatuses[i][j].status == statusWIP { + if listStatuses[i][j].ok { + listStatuses[i][j].status = statusOK + } else { + listStatuses[i][j].status = statusFail + } + } + } + } + + if maxReplicas > 0 { + var ok, undone uint32 + for _, i := range listSeq { + var limit uint32 + if i < len(repLimits) { + limit = repLimits[i] + } else { + limit = ecLimits[i] + } + + if limit == 0 { + continue + } + + for j := range listStatuses[i] { + switch listStatuses[i][j].status { + case statusUndone: + undone++ + case statusOK: + if ok++; ok == maxReplicas { + return ok, nil + } + case statusFail: + default: + panic(fmt.Sprintf("unexpected enum value %d", listStatuses[i][j].status)) + } + } + } + + if local { + if undone == 0 { + return ok, nil + } + continue + } + + if maxReplicas <= ok+undone { + leftReplicas = maxReplicas - ok + continue + } + + err := fmt.Errorf("unable to reach MaxReplicas %d (succeeded: %d, left nodes: %d)", maxReplicas, ok, undone) + if ok == 0 { + return 0, err + } + return 0, 
wrapIncompleteError(err) + } + + nextList: + for i := range listStatuses { + var limit uint32 + if i < len(repLimits) { + limit = repLimits[i] + } else { + limit = ecLimits[i] + } + + if limit == 0 { + continue + } + + var ok, undone uint32 + for j := range listStatuses[i] { + switch listStatuses[i][j].status { + case statusUndone: + undone++ + case statusOK: + if ok++; ok == limit { + continue nextList + } + case statusFail: + default: + panic(fmt.Sprintf("unexpected enum value %d", listStatuses[i][j].status)) + } + } + + if limit <= ok+undone { + continue group + } + + err := fmt.Errorf("unable to reach replica number for rule #%d (required: %d, succeeded: %d, left nodes: %d)", i, limit, ok, undone) + if ok == 0 { + return 0, err + } + return 0, wrapIncompleteError(err) + } + + return 0, nil + } +} + +func (t *distributedTarget) _fillNextInitialPlacementGroup(maxReplicas uint32, repLimits []uint32, ecLimits []uint32, ecRules []iec.Rule, nodeLists [][]netmap.NodeInfo, + obj object.Object, encObj encodedObject, listStatuses [][]nodeProgress, eg *errgroup.Group, local bool, listSeq iter.Seq2[int, int]) { + var added uint32 + for { + var extended bool + nextList: + for _, listIdx := range listSeq { + var limit uint32 + ecRuleIdx := -1 + if listIdx >= len(repLimits) { + ecRuleIdx = listIdx - len(repLimits) + limit = ecLimits[ecRuleIdx] + } else { + limit = repLimits[listIdx] + } + + if limit == 0 { + continue + } + + nodeIdx := -1 + var ok, wip uint32 + statusLoop: + for i := range listStatuses[listIdx] { + switch listStatuses[listIdx][i].status { + case statusUndone: + nodeIdx = i + break statusLoop + case statusOK: + ok++ + if ok+wip == limit { + continue nextList + } + case statusWIP: + wip++ + if ok+wip == limit { + continue nextList + } + case statusFail: + default: + panic(fmt.Sprintf("unexpected enum value %d", listStatuses[listIdx][i].status)) + } + } + + if nodeIdx < 0 { + continue + } + + if ecRuleIdx >= 0 { + eg.Go(func() error { + _, err := t.ecAndSaveObjectByRule(t.sessionSigner, obj, ecRuleIdx, ecRules[ecRuleIdx], nodeLists[listIdx]) + if err != nil { + t.placementIterator.log.Error("initial EC placement failed", + zap.Stringer("object", obj.Address()), zap.Error(err)) // error contains rule info + } + listStatuses[listIdx][0].ok = err == nil + if local { + err = nil + } // otherwise EC rule must succeed + return err + }) + + listStatuses[listIdx][0].status = statusWIP + + if added++; added == maxReplicas { + return + } + + extended = true + continue + } + + var node nodeDesc + node.local = t.placementIterator.neoFSNet.IsLocalNodePublicKey(nodeLists[listIdx][nodeIdx].PublicKey()) + node.placementVector = listIdx + if !node.local { + var err error + node.info, err = convertNodeInfo(nodeLists[listIdx][nodeIdx]) + if err != nil { + // https://github.com/nspcc-dev/neofs-node/issues/3565 + logNodeConversionError(t.placementIterator.log, nodeLists[listIdx][nodeIdx], err) + setNodeStatus(listStatuses[:len(repLimits)], nodeLists[:len(repLimits)], listIdx, nodeIdx, statusFail) + continue + } + } + + setNodeStatus(listStatuses[:len(repLimits)], nodeLists[:len(repLimits)], listIdx, nodeIdx, statusWIP) + + eg.Go(func() error { + err := t.sendObject(obj, encObj, node, &t.metaCollection) + if err != nil { + t.placementIterator.log.Error("initial REP placement to node failed", + zap.Stringer("object", obj.Address()), zap.Int("repRule", listIdx), zap.Int("nodeIdx", nodeIdx), + zap.Bool("local", node.local), zap.Error(err)) // error contains addresses if remote + 
markNodeFailure(listStatuses[:len(repLimits)], nodeLists[:len(repLimits)], listIdx, nodeIdx) + // TODO: this could have been the last chance to comply with the rule. If it is + // missed, the entire operation should be aborted asap. Currently, if some other + // worker handles slow node, request handler will wait for it delaying the response. + // Note that this is relevant if local is unset only. + return nil + } + markNodeSuccess(listStatuses[:len(repLimits)], nodeLists[:len(repLimits)], listIdx, nodeIdx) + return nil + }) + + if added++; added == maxReplicas { + return + } + + extended = true + } + + if !extended { + return + } + } +} + +func setNodeStatus(listStatuses [][]nodeProgress, nodeLists [][]netmap.NodeInfo, listIdx int, nodeIdx int, st progressStatus) { + for i := range listStatuses { + if i != listIdx { + if ind := nodeIndexInSet(nodeLists[listIdx][nodeIdx], nodeLists[i]); ind >= 0 { + listStatuses[i][ind].status = st + } + } + } + listStatuses[listIdx][nodeIdx].status = st +} + +func markNodeSuccess(listStatuses [][]nodeProgress, nodeLists [][]netmap.NodeInfo, listIdx int, nodeIdx int) { + _markNodeResult(listStatuses, nodeLists, listIdx, nodeIdx, true) +} + +func markNodeFailure(listStatuses [][]nodeProgress, nodeLists [][]netmap.NodeInfo, listIdx int, nodeIdx int) { + _markNodeResult(listStatuses, nodeLists, listIdx, nodeIdx, false) +} + +func _markNodeResult(listStatuses [][]nodeProgress, nodeLists [][]netmap.NodeInfo, listIdx int, nodeIdx int, ok bool) { + for i := range listStatuses { + if i != listIdx { + if ind := nodeIndexInSet(nodeLists[listIdx][nodeIdx], nodeLists[i]); ind >= 0 { + listStatuses[i][ind].ok = ok + } + } + } + listStatuses[listIdx][nodeIdx].ok = ok +} diff --git a/pkg/services/object/put/initial_test.go b/pkg/services/object/put/initial_test.go new file mode 100644 index 0000000000..cd083e4112 --- /dev/null +++ b/pkg/services/object/put/initial_test.go @@ -0,0 +1,540 @@ +package putsvc + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "slices" + "strconv" + "sync" + "sync/atomic" + "testing" + + iec "github.com/nspcc-dev/neofs-node/internal/ec" + islices "github.com/nspcc-dev/neofs-node/internal/slices" + "github.com/nspcc-dev/neofs-node/internal/testutil" + coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client" + "github.com/nspcc-dev/neofs-node/pkg/services/object/util" + "github.com/nspcc-dev/neofs-sdk-go/client" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + neofscryptotest "github.com/nspcc-dev/neofs-sdk-go/crypto/test" + "github.com/nspcc-dev/neofs-sdk-go/netmap" + "github.com/nspcc-dev/neofs-sdk-go/object" + objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test" + "github.com/nspcc-dev/neofs-sdk-go/user" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +type nopObjectWriter struct{} + +func (nopObjectWriter) Write(p []byte) (int, error) { return len(p), nil } +func (nopObjectWriter) Close() error { return nil } +func (nopObjectWriter) GetResult() client.ResObjectPut { return client.ResObjectPut{} } + +type nopPutClient struct { + unimplementedClient +} + +func (nopPutClient) ObjectPutInit(context.Context, object.Object, user.Signer, client.PrmObjectPutInit) (client.ObjectWriter, error) { + return nopObjectWriter{}, nil +} + +type testConnCollector struct { + errors map[string]error + + connsMtx sync.Mutex + conns [][]byte +} + +func (x *testConnCollector) Get(node coreclient.NodeInfo) (coreclient.MultiAddressClient, error) { + x.connsMtx.Lock() + x.conns = append(x.conns, node.PublicKey()) 
+ x.connsMtx.Unlock() + + if err := x.errors[string(node.PublicKey())]; err != nil { + return nil, err + } + + return nopPutClient{}, nil +} + +func (x *testConnCollector) failNode(node netmap.NodeInfo) { + if x.errors == nil { + x.errors = map[string]error{} + } + x.errors[string(node.PublicKey())] = errors.New("[test] forced error") +} + +func (x *testConnCollector) collectConns() [][]byte { + return x.conns +} + +type testLocalStorageCallCollector struct { + unimplementedLocalStorage + count atomic.Uint32 +} + +func (x *testLocalStorageCallCollector) Put(*object.Object, []byte) error { + x.count.Add(1) + return nil +} + +func (x *testLocalStorageCallCollector) callCount() uint32 { + return x.count.Load() +} + +func TestInitialPlacement_REPLimits(t *testing.T) { + anyNodeKey := neofscryptotest.Signer().ECDSAPrivateKey + obj := objecttest.Object() + var noEncObj encodedObject + keyStorage := util.NewKeyStorage(&anyNodeKey, nil, nil) + + t.Run("failures", func(t *testing.T) { + tgt := &distributedTarget{ + placementIterator: placementIterator{ + log: zap.NewNop(), + neoFSNet: testNetwork{}, + }, + keyStorage: keyStorage, + } + + for i, tc := range []struct { + main []uint + initial []uint32 + failNodes [][]int + okNodes int + failedRule int + lastNode int + }{ + {main: []uint{3}, initial: []uint32{1}, failNodes: [][]int{ + {0, 1, 2}, + }, okNodes: 0, failedRule: 0, lastNode: 2}, + {main: []uint{3}, initial: []uint32{2}, failNodes: [][]int{ + {0, 1}, + }, okNodes: 0, failedRule: 0, lastNode: 1}, + {main: []uint{3, 3, 3}, initial: []uint32{1, 1, 1}, failNodes: [][]int{ + {0, 1}, {0, 1, 2}, {0, 1}, + }, okNodes: 0, failedRule: 1, lastNode: 2}, + {main: []uint{1, 7, 11}, initial: []uint32{1, 3, 6}, failNodes: [][]int{ + {}, {1, 3}, {0, 2, 4, 6, 8, 10}, + }, okNodes: 5, failedRule: 2, lastNode: 10}, + {main: []uint{1, 7, 11}, initial: []uint32{1, 3, 6}, failNodes: [][]int{ + {}, {0, 1, 2, 4, 5}, {}, + }, okNodes: 1, failedRule: 1, lastNode: 5}, + {main: []uint{1, 7, 11}, initial: []uint32{1, 3, 6}, failNodes: [][]int{ + {}, {0, 1, 2, 3, 4}, {}, // 0, 1, 2, 3, 4, 5, 6 + }, okNodes: 1, failedRule: 1, lastNode: 5}, + } { + nodeLists := allocNodes(tc.main) + inPolicy := InitialPlacementPolicy{ + MaxPerRuleReplicas: tc.initial, + } + + var clientConns testConnCollector + for listIdx := range tc.failNodes { + for _, nodeIdx := range tc.failNodes[listIdx] { + clientConns.failNode(nodeLists[listIdx][nodeIdx]) + } + } + + tgt.clientConstructor = &clientConns + + err := tgt.handleInitialPlacementPolicy(inPolicy, tc.main, nil, nodeLists, obj, noEncObj) + require.Error(t, err, i) + + leftNodes := len(nodeLists[tc.failedRule]) - tc.okNodes - len(tc.failNodes[tc.failedRule]) + expErr := fmt.Sprintf("unable to reach replica number for rule #%d (required: %d, succeeded: %d, left nodes: %d)", + tc.failedRule, tc.initial[tc.failedRule], tc.okNodes, leftNodes) + require.ErrorContains(t, err, expErr, i) + if tc.okNodes > 0 { + require.ErrorIs(t, err, apistatus.ErrIncomplete, i) + } + + calledNodes := clientConns.collectConns() + for nodeIdx, node := range nodeLists[tc.failedRule] { + if nodeIdx <= tc.lastNode { + require.Contains(t, calledNodes, node.PublicKey()) + } else { + require.NotContains(t, calledNodes, node.PublicKey()) + } + } + } + }) + + t.Run("fault tolerance", func(t *testing.T) { + tgt := &distributedTarget{ + placementIterator: placementIterator{ + neoFSNet: testNetwork{}, + }, + keyStorage: keyStorage, + } + + for i, tc := range []struct { + main []uint + initial []uint32 + failNodes [][]int + 
okNodes [][]int + }{ + {main: []uint{3, 3, 3}, initial: []uint32{1, 1, 1}, failNodes: [][]int{ + {0, 1}, {0, 1}, {0, 1}, + }, okNodes: [][]int{ + {2}, {2}, {2}, + }}, + {main: []uint{1, 7, 11}, initial: []uint32{1, 3, 5}, failNodes: [][]int{ + {}, {1, 3}, {0, 2, 4, 5}, + }, okNodes: [][]int{ + {0}, {0, 2, 4}, {1, 3, 6, 7, 8}, + }}, + } { + nodeLists := allocNodes(tc.main) + inPolicy := InitialPlacementPolicy{ + MaxPerRuleReplicas: tc.initial, + } + + var clientConns testConnCollector + for listIdx := range tc.failNodes { + for _, nodeIdx := range tc.failNodes[listIdx] { + clientConns.failNode(nodeLists[listIdx][nodeIdx]) + } + } + + l, logBuf := testutil.NewBufferedLogger(t, zap.ErrorLevel) + + tgt.clientConstructor = &clientConns + tgt.placementIterator.log = l + + err := tgt.handleInitialPlacementPolicy(inPolicy, tc.main, nil, nodeLists, obj, noEncObj) + require.NoError(t, err, i) + + calledNodes := clientConns.collectConns() + + for listIdx := range nodeLists { + for nodeIdx, node := range nodeLists[listIdx] { + if slices.Contains(tc.failNodes[listIdx], nodeIdx) { + logBuf.AssertContains(testutil.LogEntry{ + Level: zap.ErrorLevel, + Message: "initial REP placement to node failed", + Fields: map[string]any{ + "object": obj.Address().String(), "repRule": json.Number(strconv.Itoa(listIdx)), "nodeIdx": json.Number(strconv.Itoa(nodeIdx)), "local": false, + "error": fmt.Sprintf("could not close object stream: could not create SDK client %x: [test] forced error", node.PublicKey()), + }, + }) + } else if !slices.Contains(tc.okNodes[listIdx], nodeIdx) { + require.NotContains(t, calledNodes, node.PublicKey(), i) + continue + } + + require.Contains(t, calledNodes, node.PublicKey(), i) + } + } + } + }) + + tgt := &distributedTarget{ + placementIterator: placementIterator{ + neoFSNet: testNetwork{}, + }, + keyStorage: keyStorage, + } + + for i, tc := range []struct { + main []uint + initial []uint32 + }{ + {main: []uint{3, 3, 3}, initial: []uint32{3, 2, 1}}, + {main: []uint{3, 3, 3}, initial: []uint32{1, 2, 3}}, + {main: []uint{3, 3, 3}, initial: []uint32{1, 1, 1}}, + {main: []uint{3, 3, 3}, initial: []uint32{1, 0, 1}}, + {main: []uint{3, 3, 3}, initial: []uint32{1, 0, 0}}, + {main: []uint{3, 3, 3}, initial: []uint32{0, 0, 1}}, + {main: []uint{1, 5, 10}, initial: []uint32{0, 3, 5}}, + } { + nodeLists := allocNodes(tc.main) + inPolicy := InitialPlacementPolicy{ + MaxPerRuleReplicas: tc.initial, + } + + var clientConns testConnCollector + tgt.clientConstructor = &clientConns + + err := tgt.handleInitialPlacementPolicy(inPolicy, tc.main, nil, nodeLists, obj, noEncObj) + require.NoError(t, err, i) + + calledNodes := clientConns.collectConns() + + for listIdx := range nodeLists { + for nodeIdx := range nodeLists[listIdx] { + if nodeIdx < int(tc.initial[listIdx]) { + require.Contains(t, calledNodes, nodeLists[listIdx][nodeIdx].PublicKey(), i) + } else { + require.NotContains(t, calledNodes, nodeLists[listIdx][nodeIdx].PublicKey(), i) + } + } + } + } +} + +func TestInitialPlacement_MaxReplicas(t *testing.T) { + anyNodeKey := neofscryptotest.Signer().ECDSAPrivateKey + obj := objecttest.Object() + var noEncObj encodedObject + keyStorage := util.NewKeyStorage(&anyNodeKey, nil, nil) + + t.Run("failures", func(t *testing.T) { + tgt := &distributedTarget{ + placementIterator: placementIterator{ + log: zap.NewNop(), + neoFSNet: testNetwork{}, + }, + keyStorage: keyStorage, + } + + for i, tc := range []struct { + main []uint + maxReplicas uint32 + failNodes [][2]int + called [][2]int + }{ + {main: []uint{3, 3, 3}, 
maxReplicas: 1, failNodes: [][2]int{ + {0, 0}, {1, 0}, {2, 0}, {0, 1}, {1, 1}, {2, 1}, {0, 2}, {1, 2}, {2, 2}, + }, called: [][2]int{ + {0, 0}, {1, 0}, {2, 0}, {0, 1}, {1, 1}, {2, 1}, {0, 2}, {1, 2}, {2, 2}, + }}, + {main: []uint{3, 3, 3}, maxReplicas: 6, failNodes: [][2]int{ + {0, 0}, {1, 1}, {1, 2}, {2, 2}, + }, called: [][2]int{ + {0, 0}, {1, 0}, {2, 0}, {0, 1}, {1, 1}, {2, 1}, {0, 2}, {1, 2}, {2, 2}, + }}, + } { + nodeLists := allocNodes(tc.main) + inPolicy := InitialPlacementPolicy{ + MaxReplicas: tc.maxReplicas, + } + + var clientConns testConnCollector + for _, failIdx := range tc.failNodes { + clientConns.failNode(nodeLists[failIdx[0]][failIdx[1]]) + } + + tgt.clientConstructor = &clientConns + + err := tgt.handleInitialPlacementPolicy(inPolicy, tc.main, nil, nodeLists, obj, noEncObj) + require.Error(t, err) + + leftNodes := islices.TwoDimSliceElementCount(nodeLists) - len(tc.called) + expErr := fmt.Sprintf("unable to reach MaxReplicas %d (succeeded: %d, left nodes: %d)", + tc.maxReplicas, len(tc.called)-len(tc.failNodes), leftNodes) + require.ErrorContains(t, err, expErr, i) + if len(tc.called)-len(tc.failNodes) > 0 { + require.ErrorIs(t, err, apistatus.ErrIncomplete, i) + } + + calledNodes := clientConns.collectConns() + require.Len(t, calledNodes, len(tc.called)) + for listIdx := range nodeLists { + for nodeIdx := range nodeLists[listIdx] { + if slices.Contains(tc.called, [2]int{listIdx, nodeIdx}) { + require.Contains(t, calledNodes, nodeLists[listIdx][nodeIdx].PublicKey(), i) + } else { + require.NotContains(t, calledNodes, nodeLists[listIdx][nodeIdx].PublicKey(), i) + } + } + } + } + }) + + t.Run("fault tolerance", func(t *testing.T) { + tgt := &distributedTarget{ + placementIterator: placementIterator{ + log: zap.NewNop(), + neoFSNet: testNetwork{}, + }, + keyStorage: keyStorage, + } + + for i, tc := range []struct { + main []uint + maxReplicas uint32 + failNodes [][2]int + called [][2]int + }{ + {main: []uint{3, 3, 3}, maxReplicas: 1, failNodes: [][2]int{ + {0, 0}, {1, 0}, {2, 0}, {0, 1}, {1, 1}, {2, 1}, {0, 2}, {1, 2}, + }, called: [][2]int{ + {0, 0}, {1, 0}, {2, 0}, {0, 1}, {1, 1}, {2, 1}, {0, 2}, {1, 2}, {2, 2}, + }}, + {main: []uint{3, 3, 3}, maxReplicas: 6, failNodes: [][2]int{ + {0, 0}, {1, 1}, {1, 2}, + }, called: [][2]int{ + {0, 0}, {1, 0}, {2, 0}, {0, 1}, {1, 1}, {2, 1}, {0, 2}, {1, 2}, {2, 2}, + }}, + } { + nodeLists := allocNodes(tc.main) + inPolicy := InitialPlacementPolicy{ + MaxReplicas: tc.maxReplicas, + } + + var clientConns testConnCollector + for _, failIdx := range tc.failNodes { + clientConns.failNode(nodeLists[failIdx[0]][failIdx[1]]) + } + + tgt.clientConstructor = &clientConns + + err := tgt.handleInitialPlacementPolicy(inPolicy, tc.main, nil, nodeLists, obj, noEncObj) + require.NoError(t, err, i) + + calledNodes := clientConns.collectConns() + require.Len(t, calledNodes, len(tc.called)) + for listIdx := range nodeLists { + for nodeIdx := range nodeLists[listIdx] { + if slices.Contains(tc.called, [2]int{listIdx, nodeIdx}) { + require.Contains(t, calledNodes, nodeLists[listIdx][nodeIdx].PublicKey(), i) + } else { + require.NotContains(t, calledNodes, nodeLists[listIdx][nodeIdx].PublicKey(), i) + } + } + } + } + }) + + tgt := &distributedTarget{ + placementIterator: placementIterator{ + neoFSNet: testNetwork{}, + }, + keyStorage: keyStorage, + } + + for i, tc := range []struct { + main []uint + maxReplicas uint32 + called [][2]int + }{ + {main: []uint{3, 3, 3}, maxReplicas: 1, called: [][2]int{ + {0, 0}, + }}, + {main: []uint{3, 3, 3}, maxReplicas: 7, 
called: [][2]int{ + {0, 0}, {1, 0}, {2, 0}, {0, 1}, {1, 1}, {2, 1}, {0, 2}, + }}, + {main: []uint{3, 1, 3}, maxReplicas: 6, called: [][2]int{ + {0, 0}, {1, 0}, {2, 0}, {0, 1}, {2, 1}, {0, 2}, + }}, + } { + nodeLists := allocNodes(tc.main) + inPolicy := InitialPlacementPolicy{ + MaxReplicas: tc.maxReplicas, + } + + var clientConns testConnCollector + tgt.clientConstructor = &clientConns + + err := tgt.handleInitialPlacementPolicy(inPolicy, tc.main, nil, nodeLists, obj, noEncObj) + require.NoError(t, err, i) + + calledNodes := clientConns.collectConns() + require.Len(t, calledNodes, len(tc.called)) + for listIdx := range nodeLists { + for nodeIdx := range nodeLists[listIdx] { + if slices.Contains(tc.called, [2]int{listIdx, nodeIdx}) { + require.Contains(t, calledNodes, nodeLists[listIdx][nodeIdx].PublicKey(), i) + } else { + require.NotContains(t, calledNodes, nodeLists[listIdx][nodeIdx].PublicKey(), i) + } + } + } + } +} + +func TestInitialPlacement_MaxReplicas_PreferLocal(t *testing.T) { + anyNodeSigner := neofscryptotest.Signer() + anyNodeKey := anyNodeSigner.ECDSAPrivateKey + obj := objecttest.Object() + var noEncObj encodedObject + keyStorage := util.NewKeyStorage(&anyNodeKey, nil, nil) + localPubKey := anyNodeSigner.PublicKeyBytes + + mainRepRules := []uint{3, 3, 3} + + nodeLists := allocNodes(mainRepRules) + nodeLists[1][1].SetPublicKey(localPubKey) + + inPolicy := InitialPlacementPolicy{ + MaxReplicas: 7, + PreferLocal: true, + } + + var clientConns testConnCollector + clientConns.failNode(nodeLists[1][0]) + + var localStorage testLocalStorageCallCollector + + tgt := &distributedTarget{ + placementIterator: placementIterator{ + log: zap.NewNop(), + neoFSNet: testNetwork{localPubKey: localPubKey}, + }, + keyStorage: keyStorage, + clientConstructor: &clientConns, + localStorage: &localStorage, + } + + err := tgt.handleInitialPlacementPolicy(inPolicy, mainRepRules, nil, nodeLists, obj, noEncObj) + require.NoError(t, err) + + calledNodes := clientConns.collectConns() + require.EqualValues(t, 1, localStorage.callCount()) + require.Len(t, calledNodes, 7) + require.Contains(t, calledNodes, nodeLists[1][0].PublicKey()) + require.Contains(t, calledNodes, nodeLists[1][2].PublicKey()) + require.Contains(t, calledNodes, nodeLists[0][0].PublicKey()) + require.Contains(t, calledNodes, nodeLists[2][0].PublicKey()) + require.Contains(t, calledNodes, nodeLists[0][1].PublicKey()) + require.Contains(t, calledNodes, nodeLists[2][1].PublicKey()) +} + +func TestInitialPlacement_EC(t *testing.T) { + anyNodeSigner := neofscryptotest.Signer() + anyNodeKey := anyNodeSigner.ECDSAPrivateKey + obj := objecttest.Object() + var noEncObj encodedObject + keyStorage := util.NewKeyStorage(&anyNodeKey, nil, nil) + localPubKey := anyNodeSigner.PublicKeyBytes + + mainECRules := []iec.Rule{ + {DataPartNum: 3, ParityPartNum: 1}, + {DataPartNum: 6, ParityPartNum: 3}, + {DataPartNum: 9, ParityPartNum: 4}, + } + + nodeLists := allocNodes([]uint{4, 9, 13}) + + inPolicy := InitialPlacementPolicy{ + MaxPerRuleReplicas: []uint32{0, 1, 1}, + } + + var clientConns testConnCollector + var localStorage testLocalStorageCallCollector + + tgt := &distributedTarget{ + placementIterator: placementIterator{ + log: zap.NewNop(), + neoFSNet: testNetwork{localPubKey: localPubKey}, + }, + keyStorage: keyStorage, + clientConstructor: &clientConns, + localStorage: &localStorage, + sessionSigner: neofscryptotest.Signer(), + containerNodes: mockContainerNodes{}, + } + + err := tgt.handleInitialPlacementPolicy(inPolicy, nil, mainECRules, nodeLists, 
obj, noEncObj) + require.NoError(t, err) + + calledNodes := clientConns.collectConns() + require.Len(t, calledNodes, 22) + for _, node := range slices.Concat(nodeLists[1], nodeLists[2]) { + require.Contains(t, calledNodes, node.PublicKey()) + } +} diff --git a/pkg/services/object/put/service_test.go b/pkg/services/object/put/service_test.go index 0b1f99468d..f7682a15d1 100644 --- a/pkg/services/object/put/service_test.go +++ b/pkg/services/object/put/service_test.go @@ -910,6 +910,7 @@ func (x mockNodeState) CurrentEpoch() uint64 { } type inMemLocalStorage struct { + unimplementedLocalStorage mtx sync.RWMutex objs []object.Object err error @@ -946,15 +947,21 @@ func (x *inMemLocalStorage) Put(obj *object.Object, objBin []byte) error { return nil } -func (x *inMemLocalStorage) Delete(oid.Address, uint64, []oid.ID) error { +type unimplementedLocalStorage struct{} + +func (unimplementedLocalStorage) Put(*object.Object, []byte) error { panic("unimplemented") } -func (x *inMemLocalStorage) Lock(oid.Address, []oid.ID) error { +func (unimplementedLocalStorage) Delete(oid.Address, uint64, []oid.ID) error { panic("unimplemented") } -func (x *inMemLocalStorage) IsLocked(oid.Address) (bool, error) { +func (unimplementedLocalStorage) Lock(oid.Address, []oid.ID) error { + panic("unimplemented") +} + +func (unimplementedLocalStorage) IsLocked(oid.Address) (bool, error) { panic("unimplemented") } @@ -975,7 +982,7 @@ func (x nodeServices) Get(node clientcore.NodeInfo) (clientcore.MultiAddressClie if err != nil { return nil, err } - return (*serviceClient)(svc), nil + return &serviceClient{svc: svc}, nil } func (x nodeServices) SendReplicationRequestToNode(_ context.Context, reqBin []byte, node clientcore.NodeInfo) ([]byte, error) { @@ -1005,10 +1012,13 @@ func (x nodeServices) SendReplicationRequestToNode(_ context.Context, reqBin []b return nil, nil } -type serviceClient Service +type serviceClient struct { + unimplementedClient + svc *Service +} func (m *serviceClient) ObjectPutInit(ctx context.Context, hdr object.Object, _ user.Signer, _ client.PrmObjectPutInit) (client.ObjectWriter, error) { - stream, err := (*Service)(m).Put(ctx) + stream, err := m.svc.Put(ctx) if err != nil { return nil, err } @@ -1033,48 +1043,54 @@ func (m *serviceClient) ObjectPutInit(ctx context.Context, hdr object.Object, _ return (*testPayloadStream)(stream), nil } -func (m *serviceClient) ReplicateObject(context.Context, oid.ID, io.ReadSeeker, neofscrypto.Signer, bool) (*neofscrypto.Signature, error) { +type unimplementedClient struct{} + +func (unimplementedClient) ObjectPutInit(context.Context, object.Object, user.Signer, client.PrmObjectPutInit) (client.ObjectWriter, error) { + panic("unimplemented") +} + +func (unimplementedClient) ReplicateObject(context.Context, oid.ID, io.ReadSeeker, neofscrypto.Signer, bool) (*neofscrypto.Signature, error) { panic("unimplemented") } -func (m *serviceClient) ObjectDelete(context.Context, cid.ID, oid.ID, user.Signer, client.PrmObjectDelete) (oid.ID, error) { +func (unimplementedClient) ObjectDelete(context.Context, cid.ID, oid.ID, user.Signer, client.PrmObjectDelete) (oid.ID, error) { panic("unimplemented") } -func (m *serviceClient) ObjectGetInit(context.Context, cid.ID, oid.ID, user.Signer, client.PrmObjectGet) (object.Object, *client.PayloadReader, error) { +func (unimplementedClient) ObjectGetInit(context.Context, cid.ID, oid.ID, user.Signer, client.PrmObjectGet) (object.Object, *client.PayloadReader, error) { panic("unimplemented") } -func (m *serviceClient) 
ObjectHead(context.Context, cid.ID, oid.ID, user.Signer, client.PrmObjectHead) (*object.Object, error) { +func (unimplementedClient) ObjectHead(context.Context, cid.ID, oid.ID, user.Signer, client.PrmObjectHead) (*object.Object, error) { panic("unimplemented") } -func (m *serviceClient) ObjectSearchInit(context.Context, cid.ID, user.Signer, client.PrmObjectSearch) (*client.ObjectListReader, error) { +func (unimplementedClient) ObjectSearchInit(context.Context, cid.ID, user.Signer, client.PrmObjectSearch) (*client.ObjectListReader, error) { panic("unimplemented") } -func (m *serviceClient) SearchObjects(context.Context, cid.ID, object.SearchFilters, []string, string, neofscrypto.Signer, client.SearchObjectsOptions) ([]client.SearchResultItem, string, error) { +func (unimplementedClient) SearchObjects(context.Context, cid.ID, object.SearchFilters, []string, string, neofscrypto.Signer, client.SearchObjectsOptions) ([]client.SearchResultItem, string, error) { panic("unimplemented") } -func (m *serviceClient) ObjectRangeInit(context.Context, cid.ID, oid.ID, uint64, uint64, user.Signer, client.PrmObjectRange) (*client.ObjectRangeReader, error) { +func (unimplementedClient) ObjectRangeInit(context.Context, cid.ID, oid.ID, uint64, uint64, user.Signer, client.PrmObjectRange) (*client.ObjectRangeReader, error) { panic("unimplemented") } -func (m *serviceClient) ObjectHash(context.Context, cid.ID, oid.ID, user.Signer, client.PrmObjectHash) ([][]byte, error) { +func (unimplementedClient) ObjectHash(context.Context, cid.ID, oid.ID, user.Signer, client.PrmObjectHash) ([][]byte, error) { panic("unimplemented") } -func (m *serviceClient) AnnounceLocalTrust(context.Context, uint64, []apireputation.Trust, client.PrmAnnounceLocalTrust) error { +func (unimplementedClient) AnnounceLocalTrust(context.Context, uint64, []apireputation.Trust, client.PrmAnnounceLocalTrust) error { // TODO: interfaces are oversaturated. This will never be needed to server object PUT. Refactor this. 
panic("unimplemented") } -func (m *serviceClient) AnnounceIntermediateTrust(context.Context, uint64, apireputation.PeerToPeerTrust, client.PrmAnnounceIntermediateTrust) error { +func (unimplementedClient) AnnounceIntermediateTrust(context.Context, uint64, apireputation.PeerToPeerTrust, client.PrmAnnounceIntermediateTrust) error { panic("unimplemented") } -func (m *serviceClient) ForEachGRPCConn(context.Context, func(context.Context, *grpc.ClientConn) error) error { +func (unimplementedClient) ForEachGRPCConn(context.Context, func(context.Context, *grpc.ClientConn) error) error { panic("unimplemented") } diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index e7f91c1ae1..0b6291093f 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -290,6 +290,7 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) internal.Target { }, metaSigner: prm.localSignerRFC6979, localOnly: prm.common.LocalOnly(), + // initialPlacementPolicy: prm.cnr.InitialPlacementPolicy(), // FIXME } } diff --git a/pkg/services/object/put/util.go b/pkg/services/object/put/util.go index 380bcf2c14..892bd5ceb4 100644 --- a/pkg/services/object/put/util.go +++ b/pkg/services/object/put/util.go @@ -1,9 +1,12 @@ package putsvc import ( + "bytes" "slices" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" "github.com/nspcc-dev/neofs-sdk-go/netmap" + "go.uber.org/zap" ) func localNodeInSets(n NeoFSNetwork, ss [][]netmap.NodeInfo) bool { @@ -17,3 +20,20 @@ func localNodeInSet(n NeoFSNetwork, nodes []netmap.NodeInfo) bool { return n.IsLocalNodePublicKey(node.PublicKey()) }) } + +func nodeIndexInSet(n netmap.NodeInfo, nn []netmap.NodeInfo) int { + return slices.IndexFunc(nn, func(node netmap.NodeInfo) bool { + return bytes.Equal(node.PublicKey(), n.PublicKey()) + }) +} + +func wrapIncompleteError(cause error) error { + var st apistatus.Incomplete + st.SetMessage(cause.Error()) + return st +} + +func logNodeConversionError(l *zap.Logger, node netmap.NodeInfo, err error) { + l.Error("failed to decode network endpoints of the storage node from the network map, skip the node", + zap.String("public key", netmap.StringifyPublicKey(node)), zap.Error(err)) +}
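
Illustrative note (not part of the patch): a minimal sketch of how a caller could fill the new InitialPlacementPolicy from pkg/services/object/put/initial.go once the container accessor marked FIXME in streamer.go is available. The rule counts and values below are assumed for illustration only.

	// Hypothetical example (Go); values are made up. With one REP rule and one
	// EC rule in the container policy, MaxPerRuleReplicas carries
	// len(repRules)+len(ecRules) == 2 entries: entry 0 caps how many replicas
	// are stored for REP rule #0 during the initial PUT, entry 1 caps EC rule #0
	// (1 means the EC encoding is performed once).
	policy := InitialPlacementPolicy{
		MaxPerRuleReplicas: []uint32{2, 1},
		// Alternatively, MaxReplicas (optionally with PreferLocal) acknowledges
		// the PUT after the given number of successful placements across all rules.
	}
	tgt.initialPlacementPolicy = &policy // tgt is a *distributedTarget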