Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions pkg/services/object/put/distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ type distributedTarget struct {
// When object from request is an EC part, ecPart.RuleIndex is >= 0.
// Undefined when policy has no EC rules.
ecPart iec.PartInfo

initialPlacementPolicy *InitialPlacementPolicy
}

type nodeDesc struct {
Expand Down Expand Up @@ -182,7 +184,30 @@ func (t *distributedTarget) saveObject(obj object.Object, encObj encodedObject)

repRules := t.containerNodes.PrimaryCounts()
ecRules := t.containerNodes.ECRules()
if typ := obj.Type(); len(repRules) > 0 || typ == object.TypeTombstone || typ == object.TypeLock || typ == object.TypeLink {
typ := obj.Type()

initialPUT := t.sessionSigner != nil || !t.localOnly
if initialPUT && typ == object.TypeRegular && t.initialPlacementPolicy != nil {
if t.sessionSigner != nil || len(ecRules) == 0 {
err := t.handleInitialPlacementPolicy(*t.initialPlacementPolicy, repRules, ecRules, objNodeLists, obj, encObj)
if err != nil {
return fmt.Errorf("initial placement failed: %w", err)
}
return nil
} else if t.ecPart.RuleIndex < 0 { // prevent EC encoding
err := t.handleInitialPlacementPolicy(InitialPlacementPolicy{
MaxReplicas: t.initialPlacementPolicy.MaxReplicas,
PreferLocal: t.initialPlacementPolicy.PreferLocal,
MaxPerRuleReplicas: t.initialPlacementPolicy.MaxPerRuleReplicas[:len(repRules)],
}, repRules, nil, objNodeLists[:len(repRules)], obj, encObj)
if err != nil {
return fmt.Errorf("initial placement failed: %w", err)
}
return nil
} // initial policy does not apply to individual EC parts
}

if len(repRules) > 0 || typ == object.TypeTombstone || typ == object.TypeLock || typ == object.TypeLink {
broadcast := typ == object.TypeTombstone || typ == object.TypeLink || (!t.localOnly && typ == object.TypeLock) || len(obj.Children()) > 0

useRepRules := repRules
Expand Down Expand Up @@ -547,9 +572,7 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, replCounts []uint,
busy.SetMessage(retErr.Error())
return busy
}
var inc = new(apistatus.Incomplete)
inc.SetMessage(retErr.Error())
return inc
return wrapIncompleteError(retErr)
}
nextNodeGroupKeys = slices.Grow(nextNodeGroupKeys, int(replRem))[:0]
for ; nodesCounters[listInd].processed < listLen && uint(len(nextNodeGroupKeys)) < replRem; nodesCounters[listInd].processed++ {
Expand Down Expand Up @@ -578,8 +601,7 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, replCounts []uint,
}
// critical error that may ultimately block the storage service. Normally it
// should not appear because entry into the network map under strict control
l.Error("failed to decode network endpoints of the storage node from the network map, skip the node",
zap.String("public key", netmap.StringifyPublicKey(nodeLists[listInd][j])), zap.Error(nr.convertErr))
logNodeConversionError(l, nodeLists[listInd][j], nr.convertErr)
if listLen-nodesCounters[listInd].processed-1 < replRem { // -1 includes current node failure
err := fmt.Errorf("%w (last node error: failed to decode network addresses: %w)",
errNotEnoughNodes{listIndex: listInd, required: replRem, left: listLen - nodesCounters[listInd].processed - 1},
Expand Down Expand Up @@ -638,8 +660,7 @@ broadcast:
if nr.convertErr != nil {
// critical error that may ultimately block the storage service. Normally it
// should not appear because entry into the network map under strict control
l.Error("failed to decode network endpoints of the storage node from the network map, skip the node",
zap.String("public key", netmap.StringifyPublicKey(nodeLists[i][j])), zap.Error(nr.convertErr))
logNodeConversionError(l, nodeLists[i][j], nr.convertErr)
continue // to send as many replicas as possible
}
wg.Add(1)
Expand Down
21 changes: 15 additions & 6 deletions pkg/services/object/put/ec.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,9 @@ func (t *distributedTarget) ecAndSaveObject(signer neofscrypto.Signer, obj objec
continue
}

payloadParts, err := iec.Encode(ecRules[i], obj.Payload())
payloadParts, err := t.ecAndSaveObjectByRule(signer, obj, i, ecRules[i], nodeLists[i])
if err != nil {
return fmt.Errorf("split object payload into EC parts for rule #%d (%s): %w", i, ecRules[i], err)
}

if err := t.applyECRule(signer, obj, i, payloadParts, nodeLists[i]); err != nil {
return fmt.Errorf("apply EC rule #%d (%s): %w", i, ecRules[i], err)
return err
}

for j := i + 1; j < len(ecRules); j++ {
Expand All @@ -41,6 +37,19 @@ func (t *distributedTarget) ecAndSaveObject(signer neofscrypto.Signer, obj objec
return nil
}

func (t *distributedTarget) ecAndSaveObjectByRule(signer neofscrypto.Signer, obj object.Object, ruleIdx int, rule iec.Rule, nodeLists []netmap.NodeInfo) ([][]byte, error) {
payloadParts, err := iec.Encode(rule, obj.Payload())
if err != nil {
return nil, fmt.Errorf("split object payload into EC parts for rule #%d (%s): %w", ruleIdx, rule, err)
}

if err := t.applyECRule(signer, obj, ruleIdx, payloadParts, nodeLists); err != nil {
return nil, fmt.Errorf("apply EC rule #%d (%s): %w", ruleIdx, rule, err)
}

return payloadParts, nil
}

func (t *distributedTarget) applyECRule(signer neofscrypto.Signer, obj object.Object, ruleIdx int, payloadParts [][]byte, nodeList []netmap.NodeInfo) error {
var eg errgroup.Group

Expand Down
Loading
Loading