diff --git a/config/instance.go b/config/instance.go index cba8a03..59de60b 100644 --- a/config/instance.go +++ b/config/instance.go @@ -78,6 +78,8 @@ const ( /* 1 GB per second */ DefaultStorageRateLimit = 1024 * 1024 * 1024 + + DefaultFileChunkPerSec = 1000 ) func EmbedDefaults(cfgInstance *Instance) { @@ -108,6 +110,10 @@ func EmbedDefaults(cfgInstance *Instance) { if cfgInstance.MetricsPort == 0 { cfgInstance.MetricsPort = DefaultMetricsPort } + + if cfgInstance.VacuumCnf.FileChunkPerSec == 0 { + cfgInstance.VacuumCnf.FileChunkPerSec = DefaultFileChunkPerSec + } cfgInstance.YezzeyRestoreParanoid = false } diff --git a/config/vacuum.go b/config/vacuum.go index d8617ba..4076896 100644 --- a/config/vacuum.go +++ b/config/vacuum.go @@ -1,5 +1,6 @@ package config type Vacuum struct { - CheckBackup bool `json:"check_backup" toml:"check_backup" yaml:"check_backup"` + CheckBackup bool `json:"check_backup" toml:"check_backup" yaml:"check_backup"` + FileChunkPerSec int `json:"file_chunk_per_sec" toml:"file_chunk_per_sec" yaml:"file_chunk_per_sec"` } diff --git a/pkg/proc/delete_handler.go b/pkg/proc/delete_handler.go index 2b94374..fe00f36 100644 --- a/pkg/proc/delete_handler.go +++ b/pkg/proc/delete_handler.go @@ -1,6 +1,7 @@ package proc import ( + "context" "fmt" "path" "strings" @@ -14,6 +15,7 @@ import ( "github.com/yezzey-gp/yproxy/pkg/object" "github.com/yezzey-gp/yproxy/pkg/storage" "github.com/yezzey-gp/yproxy/pkg/ylogger" + "golang.org/x/time/rate" ) //go:generate mockgen -destination=../../../test/mocks/mock_object.go -package mocks -build_flags -mod=readonly github.com/wal-g/wal-g/pkg/storages/storage Object @@ -112,12 +114,23 @@ func (dh *BasicGarbageMgr) DeleteGarbageInBucket(bucket string, msg message.Dele return nil } + /* Burst at 20% of vacuum rate capacity. It is pretty arbitrary at this time, + * but its not like something we need config field for... */ + limRate := config.InstanceConfig().VacuumCnf.FileChunkPerSec + limiter := rate.NewLimiter(rate.Limit(limRate), limRate/5) + ctx := context.Background() + var failed []string retryCount := 0 for len(fileList) > 0 && retryCount < 10 { retryCount++ for i := 0; i < len(fileList); i++ { + /* Dont move too fast */ + if err := limiter.Wait(ctx); err != nil { + break + } + if msg.CrazyDrop { ylogger.Zero.Info().Str("bucket", bucket).Str("path", fileList[i]).Msg("simply delete") err = dh.StorageInterractor.DeleteObject(bucket, fileList[i]) @@ -141,6 +154,12 @@ func (dh *BasicGarbageMgr) DeleteGarbageInBucket(bucket string, msg message.Dele } for key, uploadId := range uploads { + + /* Dont move too fast */ + if err := limiter.Wait(ctx); err != nil { + break + } + if err := dh.StorageInterractor.AbortMultipartUpload(bucket, key, uploadId); err != nil { return err } diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index 3553766..2c50a9b 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -23,6 +23,7 @@ import ( "github.com/yezzey-gp/yproxy/pkg/settings" "github.com/yezzey-gp/yproxy/pkg/tablespace" "github.com/yezzey-gp/yproxy/pkg/ylogger" + "golang.org/x/time/rate" ) type S3StorageInteractor struct { @@ -222,7 +223,13 @@ func (s *S3StorageInteractor) ListBucketPath(bucket, prefix string, useCache boo ylogger.Zero.Debug().Str("bucket", bucket).Str("bucket", bucket).Msg("listing bucket") + /* Use limiter to list files not too fast. */ + limRate := config.InstanceConfig().VacuumCnf.FileChunkPerSec + limiter := rate.NewLimiter(rate.Limit(limRate), limRate/5) + ctx := context.Background() + for { + input := &s3.ListObjectsV2Input{ Bucket: &bucket, Prefix: aws.String(prefix), @@ -235,6 +242,11 @@ func (s *S3StorageInteractor) ListBucketPath(bucket, prefix string, useCache boo return nil, err } + /* Dont move too fast */ + if err := limiter.WaitN(ctx, len(out.Contents)); err != nil { + break + } + for _, obj := range out.Contents { path := *obj.Key