Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions pkg/local_object_storage/engine/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,16 @@ func (e *StorageEngine) close(releasePools bool) error {
e.mtx.RLock()
defer e.mtx.RUnlock()

if releasePools {
for _, p := range e.shardPools {
p.Release()
}
}

for id, sh := range e.shards {
if err := sh.Close(); err != nil {
e.log.Debug("could not close shard",
zap.String("id", id),
zap.Error(err),
)
}
if releasePools {
close(sh.putCh)
}
}

return nil
Expand Down Expand Up @@ -201,9 +198,6 @@ func (e *StorageEngine) Reload(rcfg ReConfiguration) error {
}

e.mtx.RLock()
for _, pool := range e.shardPools {
pool.Tune(int(e.shardPoolSize))
}

var shardsToRemove []string // shards IDs
var shardsToAdd []string // shard config identifiers (blobstor paths concatenation)
Expand Down
4 changes: 0 additions & 4 deletions pkg/local_object_storage/engine/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ func TestReload(t *testing.T) {

// no new paths => no new shards
require.Equal(t, shardNum, len(e.shards))
require.Equal(t, shardNum, len(e.shardPools))

newMeta := filepath.Join(addPath, fmt.Sprintf("%d.metabase", shardNum))

Expand All @@ -222,7 +221,6 @@ func TestReload(t *testing.T) {
require.NoError(t, e.Reload(rcfg))

require.Equal(t, shardNum+1, len(e.shards))
require.Equal(t, shardNum+1, len(e.shardPools))
})

t.Run("remove shards", func(t *testing.T) {
Expand All @@ -240,7 +238,6 @@ func TestReload(t *testing.T) {

// removed one
require.Equal(t, shardNum-1, len(e.shards))
require.Equal(t, shardNum-1, len(e.shardPools))
})
}

Expand Down Expand Up @@ -268,7 +265,6 @@ func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []str
}

require.Equal(t, num, len(e.shards))
require.Equal(t, num, len(e.shardPools))

require.NoError(t, e.Open())
require.NoError(t, e.Init())
Expand Down
24 changes: 15 additions & 9 deletions pkg/local_object_storage/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
"github.com/nspcc-dev/neofs-node/pkg/util"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)
Expand All @@ -30,8 +30,6 @@ type StorageEngine struct {

shards map[string]shardWrapper

shardPools map[string]util.WorkerPool

closeCh chan struct{}
setModeCh chan setModeRequest
wg sync.WaitGroup
Expand All @@ -53,10 +51,19 @@ type shardInterface interface {
HeadECPart(cid.ID, oid.ID, iec.PartInfo) (object.Object, error)
}

type putTask struct {
addr oid.Address
obj *objectSDK.Object
objBin []byte
retCh chan error
}

type shardWrapper struct {
errorCount *atomic.Uint32
*shard.Shard
shardIface shardInterface // TODO: make Shard a shardInterface
putCh chan putTask
engine *StorageEngine
}

type setModeRequest struct {
Expand Down Expand Up @@ -242,12 +249,11 @@ func New(opts ...Option) *StorageEngine {
}

return &StorageEngine{
cfg: c,
mtx: new(sync.RWMutex),
shards: make(map[string]shardWrapper),
shardPools: make(map[string]util.WorkerPool),
closeCh: make(chan struct{}),
setModeCh: make(chan setModeRequest),
cfg: c,
mtx: new(sync.RWMutex),
shards: make(map[string]shardWrapper),
closeCh: make(chan struct{}),
setModeCh: make(chan setModeRequest),

sortShardsFn: (*StorageEngine).sortedShards,
}
Expand Down
10 changes: 1 addition & 9 deletions pkg/local_object_storage/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
usertest "github.com/nspcc-dev/neofs-sdk-go/user/test"
"github.com/nspcc-dev/neofs-sdk-go/version"
"github.com/nspcc-dev/tzhash/tz"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -87,18 +86,11 @@ func testNewEngineWithShards(shards ...*shard.Shard) *StorageEngine {
engine := New()

for _, s := range shards {
pool, err := ants.NewPool(10, ants.WithNonblocking(true))
err := engine.addShard(s)
if err != nil {
panic(err)
}

engine.shards[s.ID().String()] = shardWrapper{
errorCount: new(atomic.Uint32),
Shard: s,
}
engine.shardPools[s.ID().String()] = pool
}

return engine
}

Expand Down
22 changes: 6 additions & 16 deletions pkg/local_object_storage/engine/evacuate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,19 @@ package engine
import (
"errors"
"fmt"
"maps"
"slices"

"github.com/nspcc-dev/hrw/v2"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/nspcc-dev/neofs-node/pkg/util"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)

const defaultEvacuateBatchSize = 100

type pooledShard struct {
shardWrapper
pool util.WorkerPool
}

var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")

// Evacuate moves data from a set of given shards to other shards available to
Expand Down Expand Up @@ -60,13 +56,7 @@ func (e *StorageEngine) Evacuate(shardIDs []*shard.ID, ignoreErrors bool, faultH
// We must have all shards, to have correct information about their
// indexes in a sorted slice and set appropriate marks in the metabase.
// Evacuated shard is skipped during put.
shards := make([]pooledShard, 0, len(e.shards))
for id := range e.shards {
shards = append(shards, pooledShard{
shardWrapper: e.shards[id],
pool: e.shardPools[id],
})
}
shards := slices.Collect(maps.Values(e.shards))
e.mtx.RUnlock()

shardMap := make(map[string]*shard.Shard)
Expand Down Expand Up @@ -115,9 +105,9 @@ mainLoop:
if _, ok := shardMap[shards[j].ID().String()]; ok {
continue
}
putDone, exists, _ := e.putToShard(shards[j].shardWrapper, j, shards[j].pool, addr, obj, nil)
if putDone || exists {
if putDone {
err = e.putToShard(shards[j], j, addr, obj, nil)
if err == nil || errors.Is(err, errExists) {
if !errors.Is(err, errExists) {
e.log.Debug("object is moved to another shard",
zap.String("from", sidList[n]),
zap.Stringer("to", shards[j].ID()),
Expand Down
1 change: 0 additions & 1 deletion pkg/local_object_storage/engine/evacuate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ func TestEvacuateShard(t *testing.T) {

e.mtx.Lock()
delete(e.shards, evacuateShardID)
delete(e.shardPools, evacuateShardID)
e.mtx.Unlock()

checkHasObjects(t)
Expand Down
Loading
Loading