From 1bdfe3b6cee229dec2b5d2200e3f47648f0ca2bf Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 10 Nov 2025 23:10:22 +0300 Subject: [PATCH 1/2] engine: improve multishard error handling on put 1. If any shard returns something like "object is locked" it's final, there is no point in continuing iterations. 2. Wrap and return the last error from the method, don't lose it. 3. Simplify putToShard signature, single error is sufficient here. Signed-off-by: Roman Khimov --- pkg/local_object_storage/engine/evacuate.go | 6 +- pkg/local_object_storage/engine/put.go | 68 ++++++++++++--------- 2 files changed, 43 insertions(+), 31 deletions(-) diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index fddff9c370..994fe398aa 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -115,9 +115,9 @@ mainLoop: if _, ok := shardMap[shards[j].ID().String()]; ok { continue } - putDone, exists, _ := e.putToShard(shards[j].shardWrapper, j, shards[j].pool, addr, obj, nil) - if putDone || exists { - if putDone { + err = e.putToShard(shards[j].shardWrapper, j, shards[j].pool, addr, obj, nil) + if err == nil || errors.Is(err, errExists) { + if !errors.Is(err, errExists) { e.log.Debug("object is moved to another shard", zap.String("from", sidList[n]), zap.Stringer("to", shards[j].ID()), diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index c48dd98927..a6c644eb31 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -16,7 +16,12 @@ import ( "go.uber.org/zap" ) -var errPutShard = errors.New("could not put object to any shard") +var ( + errPutShard = errors.New("could not put object to any shard") + + errOverloaded = ants.ErrPoolOverload + errExists = errors.New("already exists") +) // Put saves an object to local storage. objBin and hdrLen parameters are // optional and used to optimize out object marshaling, when used both must @@ -103,11 +108,11 @@ func (e *StorageEngine) Put(obj *objectSDK.Object, objBin []byte) error { continue } - putDone, exists, over := e.putToShard(sh, i, pool, addr, obj, objBin) - if putDone || exists { + err = e.putToShard(sh, i, pool, addr, obj, objBin) + if err == nil || errors.Is(err, errExists) { return nil } - if over { + if errors.Is(err, errOverloaded) { overloaded = true } } @@ -135,17 +140,15 @@ func (e *StorageEngine) Put(obj *objectSDK.Object, objBin []byte) error { } // putToShard puts object to sh. -// First return value is true iff put has been successfully done. -// Second return value is true iff object already exists. -// Third return value is true iff object cannot be put because of max concurrent load. -func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte) (bool, bool, bool) { +// Returns error from shard put or errOverloaded (when shard pool can't accept +// the task) or errExists (if object is already stored there). +func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte) error { var ( alreadyExists bool err error exitCh = make(chan struct{}) id = sh.ID() - overloaded bool - putSuccess bool + putError error ) err = pool.Submit(func() { @@ -186,31 +189,33 @@ func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPoo return } - err = sh.Put(obj, objBin) - if err != nil { - if errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, common.ErrReadOnly) || - errors.Is(err, common.ErrNoSpace) { + putError = sh.Put(obj, objBin) + if putError != nil { + if errors.Is(putError, shard.ErrReadOnlyMode) || errors.Is(putError, common.ErrReadOnly) || + errors.Is(putError, common.ErrNoSpace) { e.log.Warn("could not put object to shard", zap.Stringer("shard_id", id), - zap.Error(err)) + zap.Error(putError)) return } - e.reportShardError(sh, "could not put object to shard", err) + e.reportShardError(sh, "could not put object to shard", putError) return } - - putSuccess = true }) if err != nil { e.log.Warn("object put: pool task submitting", zap.Stringer("shard", id), zap.Error(err)) - overloaded = errors.Is(err, ants.ErrPoolOverload) close(exitCh) + return err } <-exitCh - return putSuccess, alreadyExists, overloaded + if alreadyExists { + return errExists + } + + return putError } func (e *StorageEngine) putToShardWithDeadLine(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte) (bool, bool) { @@ -227,14 +232,14 @@ func (e *StorageEngine) putToShardWithDeadLine(sh shardWrapper, ind int, pool ut e.log.Error("could not put object", zap.Stringer("addr", addr), zap.Duration("deadline", e.objectPutTimeout)) return false, overloaded case <-ticker.C: - putDone, exists, over := e.putToShard(sh, ind, pool, addr, obj, objBin) - if over { + err := e.putToShard(sh, ind, pool, addr, obj, objBin) + if errors.Is(err, errOverloaded) { overloaded = true ticker.Reset(putCooldown) continue } - return putDone || exists, false + return err == nil || errors.Is(err, errExists), false } } } @@ -247,6 +252,7 @@ func (e *StorageEngine) broadcastObject(obj *objectSDK.Object, objBin []byte) er ok bool allShards = e.unsortedShards() addr = object.AddressOf(obj) + lastError error ) e.log.Debug("broadcasting object to all shards", @@ -265,10 +271,10 @@ func (e *StorageEngine) broadcastObject(obj *objectSDK.Object, objBin []byte) er continue } - putDone, exists, overloaded := e.putToShard(sh, i, pool, addr, obj, objBin) - if putDone || exists { + err := e.putToShard(sh, i, pool, addr, obj, objBin) + if err == nil || errors.Is(err, errExists) { successCount++ - if exists { + if errors.Is(err, errExists) { e.log.Debug("object already exists on shard during broadcast", zap.Stringer("type", obj.Type()), zap.Stringer("associated", obj.AssociatedObject()), @@ -283,13 +289,19 @@ func (e *StorageEngine) broadcastObject(obj *objectSDK.Object, objBin []byte) er } continue } + lastError = err + if errors.Is(err, apistatus.ErrLockNonRegularObject) || + errors.Is(err, apistatus.ErrObjectLocked) || + errors.Is(err, apistatus.ErrObjectAlreadyRemoved) { + break + } e.log.Warn("failed to put object on shard during broadcast", zap.Stringer("type", obj.Type()), zap.Stringer("shard", sh.ID()), zap.Stringer("addr", addr), zap.Stringer("associated", obj.AssociatedObject()), - zap.Bool("overloaded", overloaded)) + zap.Error(err)) } e.log.Debug("object broadcast completed", @@ -300,7 +312,7 @@ func (e *StorageEngine) broadcastObject(obj *objectSDK.Object, objBin []byte) er zap.Int("total_shards", len(allShards))) if successCount == 0 { - return fmt.Errorf("failed to broadcast %s object to any shard", obj.Type()) + return fmt.Errorf("failed to broadcast %s object to any shard, last error: %w", obj.Type(), lastError) } return nil From bafa66bd9e29ec023d489cc5368b90d1d829b95a Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 20 Nov 2025 22:00:51 +0300 Subject: [PATCH 2/2] engine: drop ants.Pool --- pkg/local_object_storage/engine/control.go | 12 +- .../engine/control_test.go | 4 - pkg/local_object_storage/engine/engine.go | 24 ++- .../engine/engine_test.go | 10 +- pkg/local_object_storage/engine/evacuate.go | 18 +- .../engine/evacuate_test.go | 1 - pkg/local_object_storage/engine/put.go | 168 +++++------------- pkg/local_object_storage/engine/shards.go | 33 ++-- .../engine/shards_test.go | 2 - 9 files changed, 77 insertions(+), 195 deletions(-) diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go index e27791c13b..525bae37b6 100644 --- a/pkg/local_object_storage/engine/control.go +++ b/pkg/local_object_storage/engine/control.go @@ -81,12 +81,6 @@ func (e *StorageEngine) close(releasePools bool) error { e.mtx.RLock() defer e.mtx.RUnlock() - if releasePools { - for _, p := range e.shardPools { - p.Release() - } - } - for id, sh := range e.shards { if err := sh.Close(); err != nil { e.log.Debug("could not close shard", @@ -94,6 +88,9 @@ func (e *StorageEngine) close(releasePools bool) error { zap.Error(err), ) } + if releasePools { + close(sh.putCh) + } } return nil @@ -201,9 +198,6 @@ func (e *StorageEngine) Reload(rcfg ReConfiguration) error { } e.mtx.RLock() - for _, pool := range e.shardPools { - pool.Tune(int(e.shardPoolSize)) - } var shardsToRemove []string // shards IDs var shardsToAdd []string // shard config identifiers (blobstor paths concatenation) diff --git a/pkg/local_object_storage/engine/control_test.go b/pkg/local_object_storage/engine/control_test.go index 785e55904a..66a970240d 100644 --- a/pkg/local_object_storage/engine/control_test.go +++ b/pkg/local_object_storage/engine/control_test.go @@ -208,7 +208,6 @@ func TestReload(t *testing.T) { // no new paths => no new shards require.Equal(t, shardNum, len(e.shards)) - require.Equal(t, shardNum, len(e.shardPools)) newMeta := filepath.Join(addPath, fmt.Sprintf("%d.metabase", shardNum)) @@ -222,7 +221,6 @@ func TestReload(t *testing.T) { require.NoError(t, e.Reload(rcfg)) require.Equal(t, shardNum+1, len(e.shards)) - require.Equal(t, shardNum+1, len(e.shardPools)) }) t.Run("remove shards", func(t *testing.T) { @@ -240,7 +238,6 @@ func TestReload(t *testing.T) { // removed one require.Equal(t, shardNum-1, len(e.shards)) - require.Equal(t, shardNum-1, len(e.shardPools)) }) } @@ -268,7 +265,6 @@ func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []str } require.Equal(t, num, len(e.shards)) - require.Equal(t, num, len(e.shardPools)) require.NoError(t, e.Open()) require.NoError(t, e.Init()) diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index c9db679e6b..4f83e0718d 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -12,9 +12,9 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" - "github.com/nspcc-dev/neofs-node/pkg/util" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" ) @@ -30,8 +30,6 @@ type StorageEngine struct { shards map[string]shardWrapper - shardPools map[string]util.WorkerPool - closeCh chan struct{} setModeCh chan setModeRequest wg sync.WaitGroup @@ -53,10 +51,19 @@ type shardInterface interface { HeadECPart(cid.ID, oid.ID, iec.PartInfo) (object.Object, error) } +type putTask struct { + addr oid.Address + obj *objectSDK.Object + objBin []byte + retCh chan error +} + type shardWrapper struct { errorCount *atomic.Uint32 *shard.Shard shardIface shardInterface // TODO: make Shard a shardInterface + putCh chan putTask + engine *StorageEngine } type setModeRequest struct { @@ -242,12 +249,11 @@ func New(opts ...Option) *StorageEngine { } return &StorageEngine{ - cfg: c, - mtx: new(sync.RWMutex), - shards: make(map[string]shardWrapper), - shardPools: make(map[string]util.WorkerPool), - closeCh: make(chan struct{}), - setModeCh: make(chan setModeRequest), + cfg: c, + mtx: new(sync.RWMutex), + shards: make(map[string]shardWrapper), + closeCh: make(chan struct{}), + setModeCh: make(chan setModeRequest), sortShardsFn: (*StorageEngine).sortedShards, } diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index 4a22fe5d25..ea1e8968eb 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -28,7 +28,6 @@ import ( usertest "github.com/nspcc-dev/neofs-sdk-go/user/test" "github.com/nspcc-dev/neofs-sdk-go/version" "github.com/nspcc-dev/tzhash/tz" - "github.com/panjf2000/ants/v2" "github.com/stretchr/testify/require" "go.uber.org/zap" ) @@ -87,18 +86,11 @@ func testNewEngineWithShards(shards ...*shard.Shard) *StorageEngine { engine := New() for _, s := range shards { - pool, err := ants.NewPool(10, ants.WithNonblocking(true)) + err := engine.addShard(s) if err != nil { panic(err) } - - engine.shards[s.ID().String()] = shardWrapper{ - errorCount: new(atomic.Uint32), - Shard: s, - } - engine.shardPools[s.ID().String()] = pool } - return engine } diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 994fe398aa..a495c29f07 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -3,11 +3,12 @@ package engine import ( "errors" "fmt" + "maps" + "slices" "github.com/nspcc-dev/hrw/v2" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" - "github.com/nspcc-dev/neofs-node/pkg/util" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" @@ -15,11 +16,6 @@ import ( const defaultEvacuateBatchSize = 100 -type pooledShard struct { - shardWrapper - pool util.WorkerPool -} - var errMustHaveTwoShards = errors.New("must have at least 1 spare shard") // Evacuate moves data from a set of given shards to other shards available to @@ -60,13 +56,7 @@ func (e *StorageEngine) Evacuate(shardIDs []*shard.ID, ignoreErrors bool, faultH // We must have all shards, to have correct information about their // indexes in a sorted slice and set appropriate marks in the metabase. // Evacuated shard is skipped during put. - shards := make([]pooledShard, 0, len(e.shards)) - for id := range e.shards { - shards = append(shards, pooledShard{ - shardWrapper: e.shards[id], - pool: e.shardPools[id], - }) - } + shards := slices.Collect(maps.Values(e.shards)) e.mtx.RUnlock() shardMap := make(map[string]*shard.Shard) @@ -115,7 +105,7 @@ mainLoop: if _, ok := shardMap[shards[j].ID().String()]; ok { continue } - err = e.putToShard(shards[j].shardWrapper, j, shards[j].pool, addr, obj, nil) + err = e.putToShard(shards[j], j, addr, obj, nil) if err == nil || errors.Is(err, errExists) { if !errors.Is(err, errExists) { e.log.Debug("object is moved to another shard", diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index 452505f0c2..80d340209b 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -107,7 +107,6 @@ func TestEvacuateShard(t *testing.T) { e.mtx.Lock() delete(e.shards, evacuateShardID) - delete(e.shardPools, evacuateShardID) e.mtx.Unlock() checkHasObjects(t) diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index a6c644eb31..0bf94e5991 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -1,6 +1,7 @@ package engine import ( + "context" "errors" "fmt" "time" @@ -8,18 +9,16 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" - "github.com/nspcc-dev/neofs-node/pkg/util" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" - "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) var ( errPutShard = errors.New("could not put object to any shard") - errOverloaded = ants.ErrPoolOverload + errOverloaded = errors.New("overloaded") errExists = errors.New("already exists") ) @@ -89,26 +88,11 @@ func (e *StorageEngine) Put(obj *objectSDK.Object, objBin []byte) error { default: } - var ( - bestPool util.WorkerPool - bestShard shardWrapper - overloaded bool - ) + var overloaded bool for i, sh := range e.sortedShards(addr) { - e.mtx.RLock() - pool, ok := e.shardPools[sh.ID().String()] - if ok && bestPool == nil { - bestShard = sh - bestPool = pool - } - e.mtx.RUnlock() - if !ok { - // Shard was concurrently removed, skip. - continue - } + err = e.putToShard(sh, i, addr, obj, objBin) - err = e.putToShard(sh, i, pool, addr, obj, objBin) if err == nil || errors.Is(err, errExists) { return nil } @@ -117,130 +101,73 @@ func (e *StorageEngine) Put(obj *objectSDK.Object, objBin []byte) error { } } - e.log.Debug("failed to put object to shards, trying the best one more", - zap.Stringer("addr", addr), zap.Stringer("best shard", bestShard.ID())) - - if e.objectPutTimeout > 0 { - success, over := e.putToShardWithDeadLine(bestShard, 0, bestPool, addr, obj, objBin) - if success { - return nil - } - if over { - overloaded = true - } - } - if overloaded { var busy = new(apistatus.Busy) busy.SetMessage(errPutShard.Error()) return busy } - return errPutShard + return err } // putToShard puts object to sh. // Returns error from shard put or errOverloaded (when shard pool can't accept // the task) or errExists (if object is already stored there). -func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte) error { +func (e *StorageEngine) putToShard(sh shardWrapper, ind int, addr oid.Address, obj *objectSDK.Object, objBin []byte) error { var ( - alreadyExists bool - err error - exitCh = make(chan struct{}) - id = sh.ID() - putError error + exitCh = make(chan error) + ctx, cancel = context.WithTimeout(context.TODO(), e.objectPutTimeout+time.Millisecond) // 1ms to avoid zero value. ) + defer cancel() - err = pool.Submit(func() { - defer close(exitCh) + select { + case sh.putCh <- putTask{addr: addr, obj: obj, objBin: objBin, retCh: exitCh}: + case <-ctx.Done(): + return errOverloaded + } + + err := <-exitCh + return err +} - exists, err := sh.Exists(addr, false) +func (sh *shardWrapper) shardPutThread() { + var ( + id = sh.ID() + ) + for t := range sh.putCh { + exists, err := sh.Exists(t.addr, false) if err != nil { - e.log.Warn("object put: check object existence", - zap.Stringer("addr", addr), + sh.engine.log.Warn("object put: check object existence", + zap.Stringer("addr", t.addr), zap.Stringer("shard", id), zap.Error(err)) if shard.IsErrObjectExpired(err) { // object is already found but // expired => do nothing with it - alreadyExists = true + err = errExists } - - return // this is not ErrAlreadyRemoved error so we can go to the next shard + t.retCh <- err + continue // this is not ErrAlreadyRemoved error so we can go to the next task } - alreadyExists = exists - if alreadyExists { - if ind != 0 { - err = sh.ToMoveIt(addr) - if err != nil { - e.log.Warn("could not mark object for shard relocation", - zap.Stringer("shard", id), - zap.Error(err), - ) - } - } - - e.log.Debug("object put: object already exists", - zap.Stringer("shard", id), - zap.Stringer("addr", addr)) - - return + if exists { + t.retCh <- errExists + continue } - putError = sh.Put(obj, objBin) - if putError != nil { - if errors.Is(putError, shard.ErrReadOnlyMode) || errors.Is(putError, common.ErrReadOnly) || - errors.Is(putError, common.ErrNoSpace) { - e.log.Warn("could not put object to shard", + err = sh.Put(t.obj, t.objBin) + if err != nil { + if errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, common.ErrReadOnly) || + errors.Is(err, common.ErrNoSpace) { + sh.engine.log.Warn("could not put object to shard", zap.Stringer("shard_id", id), - zap.Error(putError)) - return - } - - e.reportShardError(sh, "could not put object to shard", putError) - return - } - }) - if err != nil { - e.log.Warn("object put: pool task submitting", zap.Stringer("shard", id), zap.Error(err)) - close(exitCh) - return err - } - - <-exitCh - - if alreadyExists { - return errExists - } - - return putError -} - -func (e *StorageEngine) putToShardWithDeadLine(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte) (bool, bool) { - const putCooldown = 100 * time.Millisecond - var ( - overloaded bool - ticker = time.NewTicker(putCooldown) - timer = time.NewTimer(e.objectPutTimeout) - ) - - for { - select { - case <-timer.C: - e.log.Error("could not put object", zap.Stringer("addr", addr), zap.Duration("deadline", e.objectPutTimeout)) - return false, overloaded - case <-ticker.C: - err := e.putToShard(sh, ind, pool, addr, obj, objBin) - if errors.Is(err, errOverloaded) { - overloaded = true - ticker.Reset(putCooldown) - continue + zap.Error(err)) + } else { + sh.engine.reportShardError(*sh, "could not put object to shard", err) } - - return err == nil || errors.Is(err, errExists), false } + t.retCh <- err } } @@ -248,8 +175,6 @@ func (e *StorageEngine) putToShardWithDeadLine(sh shardWrapper, ind int, pool ut func (e *StorageEngine) broadcastObject(obj *objectSDK.Object, objBin []byte) error { var ( successCount int - pool util.WorkerPool - ok bool allShards = e.unsortedShards() addr = object.AddressOf(obj) lastError error @@ -262,16 +187,7 @@ func (e *StorageEngine) broadcastObject(obj *objectSDK.Object, objBin []byte) er zap.Int("shard_count", len(allShards))) for i, sh := range allShards { - e.mtx.RLock() - pool, ok = e.shardPools[sh.ID().String()] - e.mtx.RUnlock() - - if !ok { - // Shard was concurrently removed, skip. - continue - } - - err := e.putToShard(sh, i, pool, addr, obj, objBin) + err := e.putToShard(sh, i, addr, obj, objBin) if err == nil || errors.Is(err, errExists) { successCount++ if errors.Is(err, errExists) { diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 5f6903be01..ce0dedce64 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -2,6 +2,8 @@ package engine import ( "fmt" + "maps" + "slices" "sync/atomic" "github.com/google/uuid" @@ -10,7 +12,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" - "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) @@ -115,22 +116,23 @@ func (e *StorageEngine) addShard(sh *shard.Shard) error { e.mtx.Lock() defer e.mtx.Unlock() - pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true)) - if err != nil { - return fmt.Errorf("could not create pool: %w", err) - } - strID := sh.ID().String() if _, ok := e.shards[strID]; ok { return fmt.Errorf("shard with id %s was already added", strID) } - e.shards[strID] = shardWrapper{ + var shw = shardWrapper{ errorCount: new(atomic.Uint32), Shard: sh, + putCh: make(chan putTask), + engine: e, } - e.shardPools[strID] = pool + for range e.shardPoolSize { + go shw.shardPutThread() + } + + e.shards[strID] = shw return nil } @@ -154,12 +156,6 @@ func (e *StorageEngine) removeShards(ids ...string) { ss = append(ss, sh) delete(e.shards, id) - pool, ok := e.shardPools[id] - if ok { - pool.Release() - delete(e.shardPools, id) - } - e.log.Info("shard has been removed", zap.String("id", id)) } @@ -173,6 +169,7 @@ func (e *StorageEngine) removeShards(ids ...string) { zap.Error(err), ) } + close(sh.putCh) } } @@ -206,13 +203,7 @@ func (e *StorageEngine) unsortedShards() []shardWrapper { e.mtx.RLock() defer e.mtx.RUnlock() - shards := make([]shardWrapper, 0, len(e.shards)) - - for _, sh := range e.shards { - shards = append(shards, sh) - } - - return shards + return slices.Collect(maps.Values(e.shards)) } func (e *StorageEngine) getShard(id string) shardWrapper { diff --git a/pkg/local_object_storage/engine/shards_test.go b/pkg/local_object_storage/engine/shards_test.go index 67a006b5ab..c9bcb500e2 100644 --- a/pkg/local_object_storage/engine/shards_test.go +++ b/pkg/local_object_storage/engine/shards_test.go @@ -16,7 +16,6 @@ func TestRemoveShard(t *testing.T) { os.RemoveAll(t.Name()) }) - require.Equal(t, numOfShards, len(e.shardPools)) require.Equal(t, numOfShards, len(e.shards)) removedNum := numOfShards / 2 @@ -36,7 +35,6 @@ func TestRemoveShard(t *testing.T) { } } - require.Equal(t, numOfShards-removedNum, len(e.shardPools)) require.Equal(t, numOfShards-removedNum, len(e.shards)) for id, removed := range mSh {