Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions config/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ const (

/* 1 GB per second */
DefaultStorageRateLimit = 1024 * 1024 * 1024

DefaultFileChunkPerSec = 1000
)

func EmbedDefaults(cfgInstance *Instance) {
Expand Down Expand Up @@ -108,6 +110,10 @@ func EmbedDefaults(cfgInstance *Instance) {
if cfgInstance.MetricsPort == 0 {
cfgInstance.MetricsPort = DefaultMetricsPort
}

if cfgInstance.VacuumCnf.FileChunkPerSec == 0 {
cfgInstance.VacuumCnf.FileChunkPerSec = DefaultFileChunkPerSec
}
cfgInstance.YezzeyRestoreParanoid = false
}

Expand Down
3 changes: 2 additions & 1 deletion config/vacuum.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package config

type Vacuum struct {
CheckBackup bool `json:"check_backup" toml:"check_backup" yaml:"check_backup"`
CheckBackup bool `json:"check_backup" toml:"check_backup" yaml:"check_backup"`
FileChunkPerSec int `json:"file_chunk_per_sec" toml:"file_chunk_per_sec" yaml:"file_chunk_per_sec"`
}
19 changes: 19 additions & 0 deletions pkg/proc/delete_handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package proc

import (
"context"
"fmt"
"path"
"strings"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/yezzey-gp/yproxy/pkg/object"
"github.com/yezzey-gp/yproxy/pkg/storage"
"github.com/yezzey-gp/yproxy/pkg/ylogger"
"golang.org/x/time/rate"
)

//go:generate mockgen -destination=../../../test/mocks/mock_object.go -package mocks -build_flags -mod=readonly github.com/wal-g/wal-g/pkg/storages/storage Object
Expand Down Expand Up @@ -112,12 +114,23 @@ func (dh *BasicGarbageMgr) DeleteGarbageInBucket(bucket string, msg message.Dele
return nil
}

/* Burst at 20% of vacuum rate capacity. It is pretty arbitrary at this time,
* but its not like something we need config field for... */
limRate := config.InstanceConfig().VacuumCnf.FileChunkPerSec
limiter := rate.NewLimiter(rate.Limit(limRate), limRate/5)
ctx := context.Background()

var failed []string
retryCount := 0
for len(fileList) > 0 && retryCount < 10 {
retryCount++
for i := 0; i < len(fileList); i++ {

/* Dont move too fast */
if err := limiter.Wait(ctx); err != nil {
break
}

if msg.CrazyDrop {
ylogger.Zero.Info().Str("bucket", bucket).Str("path", fileList[i]).Msg("simply delete")
err = dh.StorageInterractor.DeleteObject(bucket, fileList[i])
Expand All @@ -141,6 +154,12 @@ func (dh *BasicGarbageMgr) DeleteGarbageInBucket(bucket string, msg message.Dele
}

for key, uploadId := range uploads {

/* Dont move too fast */
if err := limiter.Wait(ctx); err != nil {
break
}

if err := dh.StorageInterractor.AbortMultipartUpload(bucket, key, uploadId); err != nil {
return err
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/storage/s3storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/yezzey-gp/yproxy/pkg/settings"
"github.com/yezzey-gp/yproxy/pkg/tablespace"
"github.com/yezzey-gp/yproxy/pkg/ylogger"
"golang.org/x/time/rate"
)

type S3StorageInteractor struct {
Expand Down Expand Up @@ -222,7 +223,13 @@ func (s *S3StorageInteractor) ListBucketPath(bucket, prefix string, useCache boo

ylogger.Zero.Debug().Str("bucket", bucket).Str("bucket", bucket).Msg("listing bucket")

/* Use limiter to list files not too fast. */
limRate := config.InstanceConfig().VacuumCnf.FileChunkPerSec
limiter := rate.NewLimiter(rate.Limit(limRate), limRate/5)
ctx := context.Background()

for {

input := &s3.ListObjectsV2Input{
Bucket: &bucket,
Prefix: aws.String(prefix),
Expand All @@ -235,6 +242,11 @@ func (s *S3StorageInteractor) ListBucketPath(bucket, prefix string, useCache boo
return nil, err
}

/* Dont move too fast */
if err := limiter.WaitN(ctx, len(out.Contents)); err != nil {
break
}

for _, obj := range out.Contents {
path := *obj.Key

Expand Down
Loading