diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index 6764a06..785f6df 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -2,10 +2,12 @@ package proc import ( "context" + "errors" "fmt" "io" "strings" "sync" + "syscall" "github.com/yezzey-gp/yproxy/config" "github.com/yezzey-gp/yproxy/pkg/backups" @@ -65,7 +67,11 @@ func ProcessCatExtended( n, err := io.Copy(ycl.GetRW(), contentReader) if err != nil { - ylogger.Zero.Error().Err(err).Uint("client id", ycl.ID()).Int64("copied bytes", n).Msg("failed to cat object") + if errors.Is(err, syscall.EPIPE) || errors.Is(err, io.ErrClosedPipe) { + ylogger.Zero.Warn().Err(err).Uint("client id", ycl.ID()).Int64("copied bytes", n).Msg("client disconnected during cat") + } else { + ylogger.Zero.Error().Err(err).Uint("client id", ycl.ID()).Int64("copied bytes", n).Msg("failed to cat object") + } return err } ylogger.Zero.Debug().Int64("copied bytes", n).Msg("decrypt object") diff --git a/pkg/proc/yio/yrreader.go b/pkg/proc/yio/yrreader.go index 8e44094..653cf34 100644 --- a/pkg/proc/yio/yrreader.go +++ b/pkg/proc/yio/yrreader.go @@ -65,7 +65,7 @@ func (y *YRestartReader) Restart(offsetStart int64) error { if offsetStart == 0 { ylogger.Zero.Debug().Str("object-path", y.name).Msg("cat object with offset") } else { - ylogger.Zero.Error().Str("object-path", y.name).Int64("offset", offsetStart).Msg("cat object with offset after possible error") + ylogger.Zero.Warn().Str("object-path", y.name).Int64("offset", offsetStart).Msg("cat object: restarting read from offset after transient error") } r, err := y.s.CatFileFromStorage(y.name, offsetStart, y.settings) if err != nil { @@ -105,6 +105,7 @@ func (y *YproxyRetryReader) Close() error { // Read implements io.ReadCloser. func (y *YproxyRetryReader) Read(p []byte) (int, error) { + var lastErr error for retry := range y.retryLimit { @@ -116,6 +117,7 @@ func (y *YproxyRetryReader) Read(p []byte) (int, error) { // log error and continue. // Try to mitigate overload problems with random sleep ylogger.Zero.Error().Err(err).Int("offset reached", int(y.offsetReached)).Int("retry count", int(retry)).Msg("failed to reacquire external storage connection, wait and retry") + lastErr = err time.Sleep(time.Second) continue @@ -134,6 +136,9 @@ func (y *YproxyRetryReader) Read(p []byte) (int, error) { if err != nil || n < 0 { metrics.ReadReqErrors.Inc() ylogger.Zero.Error().Err(err).Int64("offset reached", y.offsetReached).Int("bytes half-read", n).Int("retry count", int(retry)).Msg("encounter read error") + if err != nil { + lastErr = err + } if n > 0 { y.offsetReached += int64(n) @@ -155,7 +160,10 @@ func (y *YproxyRetryReader) Read(p []byte) (int, error) { return n, err } } - return -1, fmt.Errorf("failed to upload within retries") + if lastErr != nil { + return -1, lastErr + } + return -1, fmt.Errorf("failed to read within retries: no error captured") } const ( diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index 2c50a9b..81b242e 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -36,6 +36,17 @@ type S3StorageInteractor struct { multipartUploads sync.Map } +func (s *S3StorageInteractor) getCredentials(bucket string) (config.StorageCredentials, error) { + cr, ok := s.credentialMap[bucket] + if !ok { + return cr, fmt.Errorf("no credentials configured for bucket %q; check credential_map in config", bucket) + } + if cr.AccessKeyId == "" || cr.SecretAccessKey == "" { + ylogger.Zero.Warn().Str("bucket", bucket).Msg("credentials for bucket have empty access_key_id or secret_access_key, will fall back to ambient credentials (env/IAM)") + } + return cr, nil +} + // ListBuckets implements StorageInteractor. func (s *S3StorageInteractor) ListBuckets() []string { keys := []string{} @@ -66,8 +77,10 @@ func (s *S3StorageInteractor) CatFileFromStorage(name string, offset int64, sett return nil, err } - // XXX: fix this - cr := s.credentialMap[bucket] + cr, err := s.getCredentials(bucket) + if err != nil { + return nil, err + } sess, err := s.pool.GetSession(context.TODO(), &cr) if err != nil { ylogger.Zero.Err(err).Msg("failed to acquire s3 session") @@ -115,7 +128,10 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ return err } - cr := s.credentialMap[bucket] + cr, err := s.getCredentials(bucket) + if err != nil { + return err + } sess, err := s.pool.GetSession(context.TODO(), &cr) if err != nil { ylogger.Zero.Err(err).Msg("failed to acquire s3 session") @@ -160,13 +176,14 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ func (s *S3StorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffset int64) error { - /* XXX: fix usage of default bucket */ - cr := s.credentialMap[s.cnf.StorageBucket] - + cr, err := s.getCredentials(s.cnf.StorageBucket) + if err != nil { + return err + } sess, err := s.pool.GetSession(context.TODO(), &cr) if err != nil { ylogger.Zero.Err(err).Msg("failed to acquire s3 session") - return nil + return err } objectPath := strings.TrimLeft(path.Join(s.cnf.StoragePrefix, name), "/") @@ -209,8 +226,10 @@ func (s *S3StorageInteractor) ListPath(prefix string, useCache bool, settings [] func (s *S3StorageInteractor) ListBucketPath(bucket, prefix string, useCache bool) ([]*object.ObjectInfo, error) { - /* XXX: fix usage of default bucket */ - cr := s.credentialMap[bucket] + cr, err := s.getCredentials(bucket) + if err != nil { + return nil, err + } sess, err := s.pool.GetSession(context.TODO(), &cr) if err != nil { ylogger.Zero.Err(err).Msg("failed to acquire s3 session") @@ -286,8 +305,10 @@ func (s *S3StorageInteractor) ListBucketPath(bucket, prefix string, useCache boo func (s *S3StorageInteractor) DeleteObject(bucket, key string) error { - /* XXX: fix usage of default bucket */ - cr := s.credentialMap[bucket] + cr, err := s.getCredentials(bucket) + if err != nil { + return err + } sess, err := s.pool.GetSession(context.TODO(), &cr) if err != nil { ylogger.Zero.Err(err).Msg("failed to acquire s3 session") @@ -316,8 +337,10 @@ func (s *S3StorageInteractor) DeleteObject(bucket, key string) error { func (s *S3StorageInteractor) SScopyObject(from, to, fromStoragePrefix, fromStorageBucket, toStorageBucket string) error { - /* XXX: fix usage of default bucket */ - cr := s.credentialMap[toStorageBucket] + cr, err := s.getCredentials(toStorageBucket) + if err != nil { + return err + } sess, err := s.pool.GetSession(context.TODO(), &cr) if err != nil { ylogger.Zero.Err(err).Msg("failed to acquire s3 session") @@ -370,8 +393,10 @@ func (s *S3StorageInteractor) CopyObject(bucket, from, to, fromStoragePrefix, fr func (s *S3StorageInteractor) AbortMultipartUpload(bucket, key, uploadId string) error { - /* XXX: fix usage of default bucket */ - cr := s.credentialMap[bucket] + cr, err := s.getCredentials(bucket) + if err != nil { + return err + } sess, err := s.pool.GetSession(context.TODO(), &cr) if err != nil { return err @@ -386,8 +411,10 @@ func (s *S3StorageInteractor) AbortMultipartUpload(bucket, key, uploadId string) } func (s *S3StorageInteractor) ListFailedMultipartUploads(bucket string) (map[string]string, error) { - /* XXX: fix usage of default bucket */ - cr := s.credentialMap[bucket] + cr, err := s.getCredentials(bucket) + if err != nil { + return nil, err + } sess, err := s.pool.GetSession(context.TODO(), &cr) if err != nil { return nil, err