Skip to content
8 changes: 7 additions & 1 deletion pkg/proc/interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package proc

import (
"context"
"errors"
"fmt"
"io"
"strings"
"sync"
"syscall"

"github.com/yezzey-gp/yproxy/config"
"github.com/yezzey-gp/yproxy/pkg/backups"
Expand Down Expand Up @@ -65,7 +67,11 @@ func ProcessCatExtended(

n, err := io.Copy(ycl.GetRW(), contentReader)
if err != nil {
ylogger.Zero.Error().Err(err).Uint("client id", ycl.ID()).Int64("copied bytes", n).Msg("failed to cat object")
if errors.Is(err, syscall.EPIPE) || errors.Is(err, io.ErrClosedPipe) {
ylogger.Zero.Warn().Err(err).Uint("client id", ycl.ID()).Int64("copied bytes", n).Msg("client disconnected during cat")
} else {
ylogger.Zero.Error().Err(err).Uint("client id", ycl.ID()).Int64("copied bytes", n).Msg("failed to cat object")
}
return err
}
ylogger.Zero.Debug().Int64("copied bytes", n).Msg("decrypt object")
Expand Down
12 changes: 10 additions & 2 deletions pkg/proc/yio/yrreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (y *YRestartReader) Restart(offsetStart int64) error {
if offsetStart == 0 {
ylogger.Zero.Debug().Str("object-path", y.name).Msg("cat object with offset")
} else {
ylogger.Zero.Error().Str("object-path", y.name).Int64("offset", offsetStart).Msg("cat object with offset after possible error")
ylogger.Zero.Warn().Str("object-path", y.name).Int64("offset", offsetStart).Msg("cat object: restarting read from offset after transient error")
}
r, err := y.s.CatFileFromStorage(y.name, offsetStart, y.settings)
if err != nil {
Expand Down Expand Up @@ -105,6 +105,7 @@ func (y *YproxyRetryReader) Close() error {

// Read implements io.ReadCloser.
func (y *YproxyRetryReader) Read(p []byte) (int, error) {
var lastErr error

for retry := range y.retryLimit {

Expand All @@ -116,6 +117,7 @@ func (y *YproxyRetryReader) Read(p []byte) (int, error) {
// log error and continue.
// Try to mitigate overload problems with random sleep
ylogger.Zero.Error().Err(err).Int("offset reached", int(y.offsetReached)).Int("retry count", int(retry)).Msg("failed to reacquire external storage connection, wait and retry")
lastErr = err

time.Sleep(time.Second)
continue
Expand All @@ -134,6 +136,9 @@ func (y *YproxyRetryReader) Read(p []byte) (int, error) {
if err != nil || n < 0 {
metrics.ReadReqErrors.Inc()
ylogger.Zero.Error().Err(err).Int64("offset reached", y.offsetReached).Int("bytes half-read", n).Int("retry count", int(retry)).Msg("encounter read error")
if err != nil {
lastErr = err
}

if n > 0 {
y.offsetReached += int64(n)
Expand All @@ -155,7 +160,10 @@ func (y *YproxyRetryReader) Read(p []byte) (int, error) {
return n, err
}
}
return -1, fmt.Errorf("failed to upload within retries")
if lastErr != nil {
return -1, lastErr
}
return -1, fmt.Errorf("failed to read within retries: no error captured")
}

const (
Expand Down
61 changes: 44 additions & 17 deletions pkg/storage/s3storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ type S3StorageInteractor struct {
multipartUploads sync.Map
}

func (s *S3StorageInteractor) getCredentials(bucket string) (config.StorageCredentials, error) {
cr, ok := s.credentialMap[bucket]
if !ok {
return cr, fmt.Errorf("no credentials configured for bucket %q; check credential_map in config", bucket)
}
if cr.AccessKeyId == "" || cr.SecretAccessKey == "" {
ylogger.Zero.Warn().Str("bucket", bucket).Msg("credentials for bucket have empty access_key_id or secret_access_key, will fall back to ambient credentials (env/IAM)")
}
return cr, nil
}

// ListBuckets implements StorageInteractor.
func (s *S3StorageInteractor) ListBuckets() []string {
keys := []string{}
Expand Down Expand Up @@ -66,8 +77,10 @@ func (s *S3StorageInteractor) CatFileFromStorage(name string, offset int64, sett
return nil, err
}

// XXX: fix this
cr := s.credentialMap[bucket]
cr, err := s.getCredentials(bucket)
if err != nil {
return nil, err
}
sess, err := s.pool.GetSession(context.TODO(), &cr)
if err != nil {
ylogger.Zero.Err(err).Msg("failed to acquire s3 session")
Expand Down Expand Up @@ -115,7 +128,10 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [
return err
}

cr := s.credentialMap[bucket]
cr, err := s.getCredentials(bucket)
if err != nil {
return err
}
sess, err := s.pool.GetSession(context.TODO(), &cr)
if err != nil {
ylogger.Zero.Err(err).Msg("failed to acquire s3 session")
Expand Down Expand Up @@ -160,13 +176,14 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [

func (s *S3StorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffset int64) error {

/* XXX: fix usage of default bucket */
cr := s.credentialMap[s.cnf.StorageBucket]

cr, err := s.getCredentials(s.cnf.StorageBucket)
if err != nil {
return err
}
sess, err := s.pool.GetSession(context.TODO(), &cr)
if err != nil {
ylogger.Zero.Err(err).Msg("failed to acquire s3 session")
return nil
return err
}

objectPath := strings.TrimLeft(path.Join(s.cnf.StoragePrefix, name), "/")
Expand Down Expand Up @@ -209,8 +226,10 @@ func (s *S3StorageInteractor) ListPath(prefix string, useCache bool, settings []

func (s *S3StorageInteractor) ListBucketPath(bucket, prefix string, useCache bool) ([]*object.ObjectInfo, error) {

/* XXX: fix usage of default bucket */
cr := s.credentialMap[bucket]
cr, err := s.getCredentials(bucket)
if err != nil {
return nil, err
}
sess, err := s.pool.GetSession(context.TODO(), &cr)
if err != nil {
ylogger.Zero.Err(err).Msg("failed to acquire s3 session")
Expand Down Expand Up @@ -286,8 +305,10 @@ func (s *S3StorageInteractor) ListBucketPath(bucket, prefix string, useCache boo

func (s *S3StorageInteractor) DeleteObject(bucket, key string) error {

/* XXX: fix usage of default bucket */
cr := s.credentialMap[bucket]
cr, err := s.getCredentials(bucket)
if err != nil {
return err
}
sess, err := s.pool.GetSession(context.TODO(), &cr)
if err != nil {
ylogger.Zero.Err(err).Msg("failed to acquire s3 session")
Expand Down Expand Up @@ -316,8 +337,10 @@ func (s *S3StorageInteractor) DeleteObject(bucket, key string) error {

func (s *S3StorageInteractor) SScopyObject(from, to, fromStoragePrefix, fromStorageBucket, toStorageBucket string) error {

/* XXX: fix usage of default bucket */
cr := s.credentialMap[toStorageBucket]
cr, err := s.getCredentials(toStorageBucket)
if err != nil {
return err
}
sess, err := s.pool.GetSession(context.TODO(), &cr)
if err != nil {
ylogger.Zero.Err(err).Msg("failed to acquire s3 session")
Expand Down Expand Up @@ -370,8 +393,10 @@ func (s *S3StorageInteractor) CopyObject(bucket, from, to, fromStoragePrefix, fr

func (s *S3StorageInteractor) AbortMultipartUpload(bucket, key, uploadId string) error {

/* XXX: fix usage of default bucket */
cr := s.credentialMap[bucket]
cr, err := s.getCredentials(bucket)
if err != nil {
return err
}
sess, err := s.pool.GetSession(context.TODO(), &cr)
if err != nil {
return err
Expand All @@ -386,8 +411,10 @@ func (s *S3StorageInteractor) AbortMultipartUpload(bucket, key, uploadId string)
}

func (s *S3StorageInteractor) ListFailedMultipartUploads(bucket string) (map[string]string, error) {
/* XXX: fix usage of default bucket */
cr := s.credentialMap[bucket]
cr, err := s.getCredentials(bucket)
if err != nil {
return nil, err
}
sess, err := s.pool.GetSession(context.TODO(), &cr)
if err != nil {
return nil, err
Expand Down
Loading