diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go index e27791c13b..525bae37b6 100644 --- a/pkg/local_object_storage/engine/control.go +++ b/pkg/local_object_storage/engine/control.go @@ -81,12 +81,6 @@ func (e *StorageEngine) close(releasePools bool) error { e.mtx.RLock() defer e.mtx.RUnlock() - if releasePools { - for _, p := range e.shardPools { - p.Release() - } - } - for id, sh := range e.shards { if err := sh.Close(); err != nil { e.log.Debug("could not close shard", @@ -94,6 +88,9 @@ func (e *StorageEngine) close(releasePools bool) error { zap.Error(err), ) } + if releasePools { + close(sh.putCh) + } } return nil @@ -201,9 +198,6 @@ func (e *StorageEngine) Reload(rcfg ReConfiguration) error { } e.mtx.RLock() - for _, pool := range e.shardPools { - pool.Tune(int(e.shardPoolSize)) - } var shardsToRemove []string // shards IDs var shardsToAdd []string // shard config identifiers (blobstor paths concatenation) diff --git a/pkg/local_object_storage/engine/control_test.go b/pkg/local_object_storage/engine/control_test.go index 785e55904a..66a970240d 100644 --- a/pkg/local_object_storage/engine/control_test.go +++ b/pkg/local_object_storage/engine/control_test.go @@ -208,7 +208,6 @@ func TestReload(t *testing.T) { // no new paths => no new shards require.Equal(t, shardNum, len(e.shards)) - require.Equal(t, shardNum, len(e.shardPools)) newMeta := filepath.Join(addPath, fmt.Sprintf("%d.metabase", shardNum)) @@ -222,7 +221,6 @@ func TestReload(t *testing.T) { require.NoError(t, e.Reload(rcfg)) require.Equal(t, shardNum+1, len(e.shards)) - require.Equal(t, shardNum+1, len(e.shardPools)) }) t.Run("remove shards", func(t *testing.T) { @@ -240,7 +238,6 @@ func TestReload(t *testing.T) { // removed one require.Equal(t, shardNum-1, len(e.shards)) - require.Equal(t, shardNum-1, len(e.shardPools)) }) } @@ -268,7 +265,6 @@ func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []str } require.Equal(t, num, len(e.shards)) - require.Equal(t, num, len(e.shardPools)) require.NoError(t, e.Open()) require.NoError(t, e.Init()) diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index c9db679e6b..4f83e0718d 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -12,9 +12,9 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" - "github.com/nspcc-dev/neofs-node/pkg/util" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" ) @@ -30,8 +30,6 @@ type StorageEngine struct { shards map[string]shardWrapper - shardPools map[string]util.WorkerPool - closeCh chan struct{} setModeCh chan setModeRequest wg sync.WaitGroup @@ -53,10 +51,19 @@ type shardInterface interface { HeadECPart(cid.ID, oid.ID, iec.PartInfo) (object.Object, error) } +type putTask struct { + addr oid.Address + obj *objectSDK.Object + objBin []byte + retCh chan error +} + type shardWrapper struct { errorCount *atomic.Uint32 *shard.Shard shardIface shardInterface // TODO: make Shard a shardInterface + putCh chan putTask + engine *StorageEngine } type setModeRequest struct { @@ -242,12 +249,11 @@ func New(opts ...Option) *StorageEngine { } return &StorageEngine{ - cfg: c, - mtx: new(sync.RWMutex), - shards: make(map[string]shardWrapper), - shardPools: make(map[string]util.WorkerPool), - closeCh: make(chan struct{}), - setModeCh: make(chan setModeRequest), + cfg: c, + mtx: new(sync.RWMutex), + shards: make(map[string]shardWrapper), + closeCh: make(chan struct{}), + setModeCh: make(chan setModeRequest), sortShardsFn: (*StorageEngine).sortedShards, } diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index 4a22fe5d25..ea1e8968eb 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -28,7 +28,6 @@ import ( usertest "github.com/nspcc-dev/neofs-sdk-go/user/test" "github.com/nspcc-dev/neofs-sdk-go/version" "github.com/nspcc-dev/tzhash/tz" - "github.com/panjf2000/ants/v2" "github.com/stretchr/testify/require" "go.uber.org/zap" ) @@ -87,18 +86,11 @@ func testNewEngineWithShards(shards ...*shard.Shard) *StorageEngine { engine := New() for _, s := range shards { - pool, err := ants.NewPool(10, ants.WithNonblocking(true)) + err := engine.addShard(s) if err != nil { panic(err) } - - engine.shards[s.ID().String()] = shardWrapper{ - errorCount: new(atomic.Uint32), - Shard: s, - } - engine.shardPools[s.ID().String()] = pool } - return engine } diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index fddff9c370..a495c29f07 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -3,11 +3,12 @@ package engine import ( "errors" "fmt" + "maps" + "slices" "github.com/nspcc-dev/hrw/v2" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" - "github.com/nspcc-dev/neofs-node/pkg/util" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" @@ -15,11 +16,6 @@ import ( const defaultEvacuateBatchSize = 100 -type pooledShard struct { - shardWrapper - pool util.WorkerPool -} - var errMustHaveTwoShards = errors.New("must have at least 1 spare shard") // Evacuate moves data from a set of given shards to other shards available to @@ -60,13 +56,7 @@ func (e *StorageEngine) Evacuate(shardIDs []*shard.ID, ignoreErrors bool, faultH // We must have all shards, to have correct information about their // indexes in a sorted slice and set appropriate marks in the metabase. // Evacuated shard is skipped during put. - shards := make([]pooledShard, 0, len(e.shards)) - for id := range e.shards { - shards = append(shards, pooledShard{ - shardWrapper: e.shards[id], - pool: e.shardPools[id], - }) - } + shards := slices.Collect(maps.Values(e.shards)) e.mtx.RUnlock() shardMap := make(map[string]*shard.Shard) @@ -115,9 +105,9 @@ mainLoop: if _, ok := shardMap[shards[j].ID().String()]; ok { continue } - putDone, exists, _ := e.putToShard(shards[j].shardWrapper, j, shards[j].pool, addr, obj, nil) - if putDone || exists { - if putDone { + err = e.putToShard(shards[j], j, addr, obj, nil) + if err == nil || errors.Is(err, errExists) { + if !errors.Is(err, errExists) { e.log.Debug("object is moved to another shard", zap.String("from", sidList[n]), zap.Stringer("to", shards[j].ID()), diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index 452505f0c2..80d340209b 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -107,7 +107,6 @@ func TestEvacuateShard(t *testing.T) { e.mtx.Lock() delete(e.shards, evacuateShardID) - delete(e.shardPools, evacuateShardID) e.mtx.Unlock() checkHasObjects(t) diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index c48dd98927..0bf94e5991 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -1,6 +1,7 @@ package engine import ( + "context" "errors" "fmt" "time" @@ -8,15 +9,18 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" - "github.com/nspcc-dev/neofs-node/pkg/util" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" - "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) -var errPutShard = errors.New("could not put object to any shard") +var ( + errPutShard = errors.New("could not put object to any shard") + + errOverloaded = errors.New("overloaded") + errExists = errors.New("already exists") +) // Put saves an object to local storage. objBin and hdrLen parameters are // optional and used to optimize out object marshaling, when used both must @@ -84,43 +88,15 @@ func (e *StorageEngine) Put(obj *objectSDK.Object, objBin []byte) error { default: } - var ( - bestPool util.WorkerPool - bestShard shardWrapper - overloaded bool - ) + var overloaded bool for i, sh := range e.sortedShards(addr) { - e.mtx.RLock() - pool, ok := e.shardPools[sh.ID().String()] - if ok && bestPool == nil { - bestShard = sh - bestPool = pool - } - e.mtx.RUnlock() - if !ok { - // Shard was concurrently removed, skip. - continue - } + err = e.putToShard(sh, i, addr, obj, objBin) - putDone, exists, over := e.putToShard(sh, i, pool, addr, obj, objBin) - if putDone || exists { + if err == nil || errors.Is(err, errExists) { return nil } - if over { - overloaded = true - } - } - - e.log.Debug("failed to put object to shards, trying the best one more", - zap.Stringer("addr", addr), zap.Stringer("best shard", bestShard.ID())) - - if e.objectPutTimeout > 0 { - success, over := e.putToShardWithDeadLine(bestShard, 0, bestPool, addr, obj, objBin) - if success { - return nil - } - if over { + if errors.Is(err, errOverloaded) { overloaded = true } } @@ -131,111 +107,67 @@ func (e *StorageEngine) Put(obj *objectSDK.Object, objBin []byte) error { return busy } - return errPutShard + return err } // putToShard puts object to sh. -// First return value is true iff put has been successfully done. -// Second return value is true iff object already exists. -// Third return value is true iff object cannot be put because of max concurrent load. -func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte) (bool, bool, bool) { +// Returns error from shard put or errOverloaded (when shard pool can't accept +// the task) or errExists (if object is already stored there). +func (e *StorageEngine) putToShard(sh shardWrapper, ind int, addr oid.Address, obj *objectSDK.Object, objBin []byte) error { var ( - alreadyExists bool - err error - exitCh = make(chan struct{}) - id = sh.ID() - overloaded bool - putSuccess bool + exitCh = make(chan error) + ctx, cancel = context.WithTimeout(context.TODO(), e.objectPutTimeout+time.Millisecond) // 1ms to avoid zero value. ) + defer cancel() + + select { + case sh.putCh <- putTask{addr: addr, obj: obj, objBin: objBin, retCh: exitCh}: + case <-ctx.Done(): + return errOverloaded + } - err = pool.Submit(func() { - defer close(exitCh) + err := <-exitCh + return err +} - exists, err := sh.Exists(addr, false) +func (sh *shardWrapper) shardPutThread() { + var ( + id = sh.ID() + ) + for t := range sh.putCh { + exists, err := sh.Exists(t.addr, false) if err != nil { - e.log.Warn("object put: check object existence", - zap.Stringer("addr", addr), + sh.engine.log.Warn("object put: check object existence", + zap.Stringer("addr", t.addr), zap.Stringer("shard", id), zap.Error(err)) if shard.IsErrObjectExpired(err) { // object is already found but // expired => do nothing with it - alreadyExists = true + err = errExists } - - return // this is not ErrAlreadyRemoved error so we can go to the next shard + t.retCh <- err + continue // this is not ErrAlreadyRemoved error so we can go to the next task } - alreadyExists = exists - if alreadyExists { - if ind != 0 { - err = sh.ToMoveIt(addr) - if err != nil { - e.log.Warn("could not mark object for shard relocation", - zap.Stringer("shard", id), - zap.Error(err), - ) - } - } - - e.log.Debug("object put: object already exists", - zap.Stringer("shard", id), - zap.Stringer("addr", addr)) - - return + if exists { + t.retCh <- errExists + continue } - err = sh.Put(obj, objBin) + err = sh.Put(t.obj, t.objBin) if err != nil { if errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, common.ErrReadOnly) || errors.Is(err, common.ErrNoSpace) { - e.log.Warn("could not put object to shard", + sh.engine.log.Warn("could not put object to shard", zap.Stringer("shard_id", id), zap.Error(err)) - return - } - - e.reportShardError(sh, "could not put object to shard", err) - return - } - - putSuccess = true - }) - if err != nil { - e.log.Warn("object put: pool task submitting", zap.Stringer("shard", id), zap.Error(err)) - overloaded = errors.Is(err, ants.ErrPoolOverload) - close(exitCh) - } - - <-exitCh - - return putSuccess, alreadyExists, overloaded -} - -func (e *StorageEngine) putToShardWithDeadLine(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte) (bool, bool) { - const putCooldown = 100 * time.Millisecond - var ( - overloaded bool - ticker = time.NewTicker(putCooldown) - timer = time.NewTimer(e.objectPutTimeout) - ) - - for { - select { - case <-timer.C: - e.log.Error("could not put object", zap.Stringer("addr", addr), zap.Duration("deadline", e.objectPutTimeout)) - return false, overloaded - case <-ticker.C: - putDone, exists, over := e.putToShard(sh, ind, pool, addr, obj, objBin) - if over { - overloaded = true - ticker.Reset(putCooldown) - continue + } else { + sh.engine.reportShardError(*sh, "could not put object to shard", err) } - - return putDone || exists, false } + t.retCh <- err } } @@ -243,10 +175,9 @@ func (e *StorageEngine) putToShardWithDeadLine(sh shardWrapper, ind int, pool ut func (e *StorageEngine) broadcastObject(obj *objectSDK.Object, objBin []byte) error { var ( successCount int - pool util.WorkerPool - ok bool allShards = e.unsortedShards() addr = object.AddressOf(obj) + lastError error ) e.log.Debug("broadcasting object to all shards", @@ -256,19 +187,10 @@ func (e *StorageEngine) broadcastObject(obj *objectSDK.Object, objBin []byte) er zap.Int("shard_count", len(allShards))) for i, sh := range allShards { - e.mtx.RLock() - pool, ok = e.shardPools[sh.ID().String()] - e.mtx.RUnlock() - - if !ok { - // Shard was concurrently removed, skip. - continue - } - - putDone, exists, overloaded := e.putToShard(sh, i, pool, addr, obj, objBin) - if putDone || exists { + err := e.putToShard(sh, i, addr, obj, objBin) + if err == nil || errors.Is(err, errExists) { successCount++ - if exists { + if errors.Is(err, errExists) { e.log.Debug("object already exists on shard during broadcast", zap.Stringer("type", obj.Type()), zap.Stringer("associated", obj.AssociatedObject()), @@ -283,13 +205,19 @@ func (e *StorageEngine) broadcastObject(obj *objectSDK.Object, objBin []byte) er } continue } + lastError = err + if errors.Is(err, apistatus.ErrLockNonRegularObject) || + errors.Is(err, apistatus.ErrObjectLocked) || + errors.Is(err, apistatus.ErrObjectAlreadyRemoved) { + break + } e.log.Warn("failed to put object on shard during broadcast", zap.Stringer("type", obj.Type()), zap.Stringer("shard", sh.ID()), zap.Stringer("addr", addr), zap.Stringer("associated", obj.AssociatedObject()), - zap.Bool("overloaded", overloaded)) + zap.Error(err)) } e.log.Debug("object broadcast completed", @@ -300,7 +228,7 @@ func (e *StorageEngine) broadcastObject(obj *objectSDK.Object, objBin []byte) er zap.Int("total_shards", len(allShards))) if successCount == 0 { - return fmt.Errorf("failed to broadcast %s object to any shard", obj.Type()) + return fmt.Errorf("failed to broadcast %s object to any shard, last error: %w", obj.Type(), lastError) } return nil diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 5f6903be01..ce0dedce64 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -2,6 +2,8 @@ package engine import ( "fmt" + "maps" + "slices" "sync/atomic" "github.com/google/uuid" @@ -10,7 +12,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" - "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) @@ -115,22 +116,23 @@ func (e *StorageEngine) addShard(sh *shard.Shard) error { e.mtx.Lock() defer e.mtx.Unlock() - pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true)) - if err != nil { - return fmt.Errorf("could not create pool: %w", err) - } - strID := sh.ID().String() if _, ok := e.shards[strID]; ok { return fmt.Errorf("shard with id %s was already added", strID) } - e.shards[strID] = shardWrapper{ + var shw = shardWrapper{ errorCount: new(atomic.Uint32), Shard: sh, + putCh: make(chan putTask), + engine: e, } - e.shardPools[strID] = pool + for range e.shardPoolSize { + go shw.shardPutThread() + } + + e.shards[strID] = shw return nil } @@ -154,12 +156,6 @@ func (e *StorageEngine) removeShards(ids ...string) { ss = append(ss, sh) delete(e.shards, id) - pool, ok := e.shardPools[id] - if ok { - pool.Release() - delete(e.shardPools, id) - } - e.log.Info("shard has been removed", zap.String("id", id)) } @@ -173,6 +169,7 @@ func (e *StorageEngine) removeShards(ids ...string) { zap.Error(err), ) } + close(sh.putCh) } } @@ -206,13 +203,7 @@ func (e *StorageEngine) unsortedShards() []shardWrapper { e.mtx.RLock() defer e.mtx.RUnlock() - shards := make([]shardWrapper, 0, len(e.shards)) - - for _, sh := range e.shards { - shards = append(shards, sh) - } - - return shards + return slices.Collect(maps.Values(e.shards)) } func (e *StorageEngine) getShard(id string) shardWrapper { diff --git a/pkg/local_object_storage/engine/shards_test.go b/pkg/local_object_storage/engine/shards_test.go index 67a006b5ab..c9bcb500e2 100644 --- a/pkg/local_object_storage/engine/shards_test.go +++ b/pkg/local_object_storage/engine/shards_test.go @@ -16,7 +16,6 @@ func TestRemoveShard(t *testing.T) { os.RemoveAll(t.Name()) }) - require.Equal(t, numOfShards, len(e.shardPools)) require.Equal(t, numOfShards, len(e.shards)) removedNum := numOfShards / 2 @@ -36,7 +35,6 @@ func TestRemoveShard(t *testing.T) { } } - require.Equal(t, numOfShards-removedNum, len(e.shardPools)) require.Equal(t, numOfShards-removedNum, len(e.shards)) for id, removed := range mSh {