From 6231a1e53823d8aaa48d015fd31456907a267cb0 Mon Sep 17 00:00:00 2001 From: Leonid Borchuk Date: Thu, 7 May 2026 13:48:54 +0300 Subject: [PATCH 01/11] Fix yproxy error messages --- pkg/proc/interaction.go | 125 +++++++++++++++++++++++++++++++++------ pkg/proc/yio/yrreader.go | 17 +++++- pkg/storage/s3storage.go | 96 ++++++++++++++++++++++-------- 3 files changed, 194 insertions(+), 44 deletions(-) diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index 6764a06..a359ddf 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -2,10 +2,13 @@ package proc import ( "context" + "errors" "fmt" "io" "strings" "sync" + "syscall" + "time" "github.com/yezzey-gp/yproxy/config" "github.com/yezzey-gp/yproxy/pkg/backups" @@ -21,6 +24,8 @@ import ( "golang.org/x/sync/semaphore" ) +const catDecryptRetryLimit = 10 + func ProcessCatExtended( s storage.StorageInteractor, pr *ProtoReader, @@ -29,32 +34,116 @@ func ProcessCatExtended( ycl.SetExternalFilePath(name) - yr := yio.NewYRetryReader(yio.NewRestartReader(s, name, settings), ycl) + if kek { + err := fmt.Errorf("KEK is currently unsupported") + ylogger.Zero.Error().Err(err).Msg("cat failed") + // return err + } - var contentReader io.Reader - contentReader = yr + if decrypt { + // When decryption is active, the retry reader cannot do mid-stream + // restarts because the GPG decryptor has internal cipher state that + // is initialized from the beginning of the encrypted stream. A restart + // from an arbitrary encrypted byte offset would feed raw ciphertext + // into a decryptor expecting continuation bytes, producing garbage. + // + // Instead, we handle retries at this level: on any read error, we + // re-create the entire pipeline (S3 reader → decryptor → discard → + // copy) from scratch. + return processCatDecrypted(s, name, startOffset, settings, cr, ycl) + } + + // Non-decrypt path: the retry reader handles mid-stream restarts correctly + // since there is no cipher state to corrupt. 
+ yr := yio.NewYRetryReader(yio.NewRestartReader(s, name, settings), ycl) defer func() { _ = yr.Close() }() - var err error - if decrypt { - if cr == nil { - err := fmt.Errorf("failed to decrypt object, decrypter not configured") + var contentReader io.Reader = yr - ylogger.Zero.Error().Err(err).Msg("cat failed") + if startOffset != 0 { + if _, err := io.CopyN(io.Discard, contentReader, int64(startOffset)); err != nil { return err } - ylogger.Zero.Debug().Str("object-path", name).Msg("decrypt object") - contentReader, err = cr.Decrypt(yr) - if err != nil { - ylogger.Zero.Error().Err(err).Msg("failed to decrypt object") - return err + } + + n, err := io.Copy(ycl.GetRW(), contentReader) + if err != nil { + if errors.Is(err, syscall.EPIPE) || errors.Is(err, io.ErrClosedPipe) { + ylogger.Zero.Warn().Err(err).Uint("client id", ycl.ID()).Int64("copied bytes", n).Msg("client disconnected during cat") + } else { + ylogger.Zero.Error().Err(err).Uint("client id", ycl.ID()).Int64("copied bytes", n).Msg("failed to cat object") } + return err } + ylogger.Zero.Debug().Int64("copied bytes", n).Msg("cat object completed") - if kek { - err := fmt.Errorf("KEK is currently unsupported") + return nil +} + +// processCatDecrypted handles the cat operation when decryption is required. +// It retries the entire pipeline (S3 read → decrypt → discard → copy) on +// transient errors, because the GPG decryptor cannot survive mid-stream restarts. 
+func processCatDecrypted( + s storage.StorageInteractor, + name string, + startOffset uint64, + settings []settings.StorageSettings, + cr crypt.Crypter, + ycl client.YproxyClient, +) error { + if cr == nil { + err := fmt.Errorf("failed to decrypt object, decrypter not configured") ylogger.Zero.Error().Err(err).Msg("cat failed") - // return err + return err + } + + var lastErr error + for attempt := range catDecryptRetryLimit { + err := tryCatDecrypted(s, name, startOffset, settings, cr, ycl) + if err == nil { + return nil + } + + // Client disconnected — not retryable + if errors.Is(err, syscall.EPIPE) || errors.Is(err, io.ErrClosedPipe) { + ylogger.Zero.Warn().Err(err).Uint("client id", ycl.ID()).Msg("client disconnected during encrypted cat") + return err + } + + lastErr = err + ylogger.Zero.Warn().Err(err). + Str("object-path", name). + Int("attempt", attempt). + Msg("encrypted cat failed, retrying entire pipeline") + + time.Sleep(time.Second) + } + + ylogger.Zero.Error().Err(lastErr). + Str("object-path", name). + Int("retry-limit", catDecryptRetryLimit). + Msg("encrypted cat failed after all retries") + return lastErr +} + +// tryCatDecrypted performs a single attempt of: open S3 → decrypt → discard startOffset → copy to client. 
+func tryCatDecrypted( + s storage.StorageInteractor, + name string, + startOffset uint64, + setts []settings.StorageSettings, + cr crypt.Crypter, + ycl client.YproxyClient, +) error { + // Use no-retry reader: errors propagate up so we can restart the full pipeline + yr := yio.NewYRetryReaderNoRetry(yio.NewRestartReader(s, name, setts), ycl) + defer func() { _ = yr.Close() }() + + ylogger.Zero.Debug().Str("object-path", name).Msg("decrypt object") + contentReader, err := cr.Decrypt(yr) + if err != nil { + ylogger.Zero.Error().Err(err).Msg("failed to decrypt object") + return err } if startOffset != 0 { @@ -65,10 +154,10 @@ func ProcessCatExtended( n, err := io.Copy(ycl.GetRW(), contentReader) if err != nil { - ylogger.Zero.Error().Err(err).Uint("client id", ycl.ID()).Int64("copied bytes", n).Msg("failed to cat object") + ylogger.Zero.Error().Err(err).Uint("client id", ycl.ID()).Int64("copied bytes", n).Msg("failed to cat decrypted object") return err } - ylogger.Zero.Debug().Int64("copied bytes", n).Msg("decrypt object") + ylogger.Zero.Debug().Int64("copied bytes", n).Msg("cat decrypted object completed") return nil } diff --git a/pkg/proc/yio/yrreader.go b/pkg/proc/yio/yrreader.go index 8e44094..f43d8f5 100644 --- a/pkg/proc/yio/yrreader.go +++ b/pkg/proc/yio/yrreader.go @@ -65,7 +65,7 @@ func (y *YRestartReader) Restart(offsetStart int64) error { if offsetStart == 0 { ylogger.Zero.Debug().Str("object-path", y.name).Msg("cat object with offset") } else { - ylogger.Zero.Error().Str("object-path", y.name).Int64("offset", offsetStart).Msg("cat object with offset after possible error") + ylogger.Zero.Warn().Str("object-path", y.name).Int64("offset", offsetStart).Msg("cat object: restarting read from offset after transient error") } r, err := y.s.CatFileFromStorage(y.name, offsetStart, y.settings) if err != nil { @@ -172,4 +172,19 @@ func NewYRetryReader(r RestartReader, selfCl client.YproxyClient) io.ReadCloser } } +// NewYRetryReaderNoRetry creates a retry reader 
that does not retry on errors. +// Use this when the reader is wrapped by a decryptor (e.g. GPG) whose internal +// cipher state cannot survive a mid-stream restart from an arbitrary offset. +// In this case, retries must be handled at a higher level by re-creating the +// entire pipeline (reader + decryptor). +func NewYRetryReaderNoRetry(r RestartReader, selfCl client.YproxyClient) io.ReadCloser { + return &YproxyRetryReader{ + underlying: r, + retryLimit: 1, /* no retries — propagate errors immediately */ + selfCl: selfCl, + offsetReached: 0, + needReacquire: true, + } +} + var _ io.ReadCloser = &YproxyRetryReader{} diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index 2c50a9b..07c30b1 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -36,6 +36,26 @@ type S3StorageInteractor struct { multipartUploads sync.Map } +const maxUploadRetries = 3 + +func (s *S3StorageInteractor) getCredentials(bucket string) (config.StorageCredentials, error) { + cr, ok := s.credentialMap[bucket] + if !ok { + return cr, fmt.Errorf("no credentials configured for bucket %q; check credential_map in config", bucket) + } + if cr.AccessKeyId == "" || cr.SecretAccessKey == "" { + ylogger.Zero.Warn().Str("bucket", bucket).Msg("credentials for bucket have empty access_key_id or secret_access_key, will fall back to ambient credentials (env/IAM)") + } + return cr, nil +} + +func isNoSuchUploadError(err error) bool { + if err == nil { + return false + } + return strings.Contains(err.Error(), "NoSuchUpload") +} + // ListBuckets implements StorageInteractor. 
func (s *S3StorageInteractor) ListBuckets() []string { keys := []string{} @@ -66,8 +86,10 @@ func (s *S3StorageInteractor) CatFileFromStorage(name string, offset int64, sett return nil, err } - // XXX: fix this - cr := s.credentialMap[bucket] + cr, err := s.getCredentials(bucket) + if err != nil { + return nil, err + } sess, err := s.pool.GetSession(context.TODO(), &cr) if err != nil { ylogger.Zero.Err(err).Msg("failed to acquire s3 session") @@ -115,7 +137,10 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ return err } - cr := s.credentialMap[bucket] + cr, err := s.getCredentials(bucket) + if err != nil { + return err + } sess, err := s.pool.GetSession(context.TODO(), &cr) if err != nil { ylogger.Zero.Err(err).Msg("failed to acquire s3 session") @@ -129,14 +154,24 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ putLen := int(multipartChunkSize) if multipartUpload { s.multipartUploads.Store(objectPath, true) - _, err = up.Upload( - &s3manager.UploadInput{ - Bucket: aws.String(bucket), - Key: aws.String(objectPath), - Body: r, - StorageClass: aws.String(storageClass), - }, - ) + for attempt := 0; attempt < maxUploadRetries; attempt++ { + _, err = up.Upload( + &s3manager.UploadInput{ + Bucket: aws.String(bucket), + Key: aws.String(objectPath), + Body: r, + StorageClass: aws.String(storageClass), + }, + ) + if err == nil { + break + } + if isNoSuchUploadError(err) { + ylogger.Zero.Warn().Str("name", name).Int("attempt", attempt).Err(err).Msg("multipart upload ID expired, retrying full upload") + continue + } + break // non-retryable error + } s.multipartUploads.Delete(objectPath) } else { var body []byte @@ -160,13 +195,14 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ func (s *S3StorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffset int64) error { - /* XXX: fix usage of default bucket */ - cr := s.credentialMap[s.cnf.StorageBucket] - + cr, err 
:= s.getCredentials(s.cnf.StorageBucket) + if err != nil { + return err + } sess, err := s.pool.GetSession(context.TODO(), &cr) if err != nil { ylogger.Zero.Err(err).Msg("failed to acquire s3 session") - return nil + return err } objectPath := strings.TrimLeft(path.Join(s.cnf.StoragePrefix, name), "/") @@ -209,8 +245,10 @@ func (s *S3StorageInteractor) ListPath(prefix string, useCache bool, settings [] func (s *S3StorageInteractor) ListBucketPath(bucket, prefix string, useCache bool) ([]*object.ObjectInfo, error) { - /* XXX: fix usage of default bucket */ - cr := s.credentialMap[bucket] + cr, err := s.getCredentials(bucket) + if err != nil { + return nil, err + } sess, err := s.pool.GetSession(context.TODO(), &cr) if err != nil { ylogger.Zero.Err(err).Msg("failed to acquire s3 session") @@ -286,8 +324,10 @@ func (s *S3StorageInteractor) ListBucketPath(bucket, prefix string, useCache boo func (s *S3StorageInteractor) DeleteObject(bucket, key string) error { - /* XXX: fix usage of default bucket */ - cr := s.credentialMap[bucket] + cr, err := s.getCredentials(bucket) + if err != nil { + return err + } sess, err := s.pool.GetSession(context.TODO(), &cr) if err != nil { ylogger.Zero.Err(err).Msg("failed to acquire s3 session") @@ -316,8 +356,10 @@ func (s *S3StorageInteractor) DeleteObject(bucket, key string) error { func (s *S3StorageInteractor) SScopyObject(from, to, fromStoragePrefix, fromStorageBucket, toStorageBucket string) error { - /* XXX: fix usage of default bucket */ - cr := s.credentialMap[toStorageBucket] + cr, err := s.getCredentials(toStorageBucket) + if err != nil { + return err + } sess, err := s.pool.GetSession(context.TODO(), &cr) if err != nil { ylogger.Zero.Err(err).Msg("failed to acquire s3 session") @@ -370,8 +412,10 @@ func (s *S3StorageInteractor) CopyObject(bucket, from, to, fromStoragePrefix, fr func (s *S3StorageInteractor) AbortMultipartUpload(bucket, key, uploadId string) error { - /* XXX: fix usage of default bucket */ - cr := 
s.credentialMap[bucket] + cr, err := s.getCredentials(bucket) + if err != nil { + return err + } sess, err := s.pool.GetSession(context.TODO(), &cr) if err != nil { return err @@ -386,8 +430,10 @@ func (s *S3StorageInteractor) AbortMultipartUpload(bucket, key, uploadId string) } func (s *S3StorageInteractor) ListFailedMultipartUploads(bucket string) (map[string]string, error) { - /* XXX: fix usage of default bucket */ - cr := s.credentialMap[bucket] + cr, err := s.getCredentials(bucket) + if err != nil { + return nil, err + } sess, err := s.pool.GetSession(context.TODO(), &cr) if err != nil { return nil, err From 2e353b4208df9e7b8171e02f59aa1c7c59b1d4b4 Mon Sep 17 00:00:00 2001 From: Leonid <63977577+leborchuk@users.noreply.github.com> Date: Thu, 7 May 2026 14:07:30 +0300 Subject: [PATCH 02/11] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- pkg/proc/interaction.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index a359ddf..d517c67 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -37,7 +37,7 @@ func ProcessCatExtended( if kek { err := fmt.Errorf("KEK is currently unsupported") ylogger.Zero.Error().Err(err).Msg("cat failed") - // return err + return err } if decrypt { From c9ffaba50ed72820bf31ef5215714506f865b900 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 7 May 2026 11:11:37 +0000 Subject: [PATCH 03/11] Fix multipart retry for non-seekable upload readers Agent-Logs-Url: https://github.com/open-gpdb/yproxy/sessions/035ec91b-e699-4190-9305-a4dbdf19c0a3 Co-authored-by: leborchuk <63977577+leborchuk@users.noreply.github.com> --- pkg/storage/s3storage.go | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index 07c30b1..3e67d79 
100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -154,7 +154,21 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ putLen := int(multipartChunkSize) if multipartUpload { s.multipartUploads.Store(objectPath, true) - for attempt := 0; attempt < maxUploadRetries; attempt++ { + defer s.multipartUploads.Delete(objectPath) + + seeker, canRetry := r.(io.Seeker) + attempts := 1 + if canRetry { + attempts = maxUploadRetries + } + + for attempt := 0; attempt < attempts; attempt++ { + if canRetry { + if _, seekErr := seeker.Seek(0, io.SeekStart); seekErr != nil { + return fmt.Errorf("failed to reset multipart upload reader: %w", seekErr) + } + } + _, err = up.Upload( &s3manager.UploadInput{ Bucket: aws.String(bucket), @@ -163,16 +177,17 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ StorageClass: aws.String(storageClass), }, ) - if err == nil { + if err == nil || !isNoSuchUploadError(err) { break } - if isNoSuchUploadError(err) { - ylogger.Zero.Warn().Str("name", name).Int("attempt", attempt).Err(err).Msg("multipart upload ID expired, retrying full upload") - continue + + if !canRetry { + ylogger.Zero.Warn().Str("name", name).Err(err).Msg("multipart upload ID expired, cannot retry because upload source is not seekable") + break } - break // non-retryable error + + ylogger.Zero.Warn().Str("name", name).Int("attempt", attempt+1).Err(err).Msg("multipart upload ID expired, retrying full upload") } - s.multipartUploads.Delete(objectPath) } else { var body []byte body, err = io.ReadAll(r) From 673e594f8fa33b20f932932dbddfe6a07a2c23d2 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 7 May 2026 11:18:07 +0000 Subject: [PATCH 04/11] Propagate underlying read errors when retries are exhausted Agent-Logs-Url: https://github.com/open-gpdb/yproxy/sessions/bc2c617f-667c-4fef-a3be-ea84684a3d43 Co-authored-by: leborchuk 
<63977577+leborchuk@users.noreply.github.com> --- pkg/proc/yio/yrreader.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pkg/proc/yio/yrreader.go b/pkg/proc/yio/yrreader.go index f43d8f5..2786b21 100644 --- a/pkg/proc/yio/yrreader.go +++ b/pkg/proc/yio/yrreader.go @@ -105,6 +105,7 @@ func (y *YproxyRetryReader) Close() error { // Read implements io.ReadCloser. func (y *YproxyRetryReader) Read(p []byte) (int, error) { + var lastErr error for retry := range y.retryLimit { @@ -116,6 +117,7 @@ func (y *YproxyRetryReader) Read(p []byte) (int, error) { // log error and continue. // Try to mitigate overload problems with random sleep ylogger.Zero.Error().Err(err).Int("offset reached", int(y.offsetReached)).Int("retry count", int(retry)).Msg("failed to reacquire external storage connection, wait and retry") + lastErr = err time.Sleep(time.Second) continue @@ -134,6 +136,9 @@ func (y *YproxyRetryReader) Read(p []byte) (int, error) { if err != nil || n < 0 { metrics.ReadReqErrors.Inc() ylogger.Zero.Error().Err(err).Int64("offset reached", y.offsetReached).Int("bytes half-read", n).Int("retry count", int(retry)).Msg("encounter read error") + if err != nil { + lastErr = err + } if n > 0 { y.offsetReached += int64(n) @@ -155,6 +160,9 @@ func (y *YproxyRetryReader) Read(p []byte) (int, error) { return n, err } } + if lastErr != nil { + return -1, lastErr + } return -1, fmt.Errorf("failed to upload within retries") } From e1ea52258d1c674ae898aa4703df314ead26261f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 7 May 2026 11:19:28 +0000 Subject: [PATCH 05/11] Clarify retry-reader fallback error message Agent-Logs-Url: https://github.com/open-gpdb/yproxy/sessions/bc2c617f-667c-4fef-a3be-ea84684a3d43 Co-authored-by: leborchuk <63977577+leborchuk@users.noreply.github.com> --- pkg/proc/yio/yrreader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/proc/yio/yrreader.go 
b/pkg/proc/yio/yrreader.go index 2786b21..2100b3a 100644 --- a/pkg/proc/yio/yrreader.go +++ b/pkg/proc/yio/yrreader.go @@ -163,7 +163,7 @@ func (y *YproxyRetryReader) Read(p []byte) (int, error) { if lastErr != nil { return -1, lastErr } - return -1, fmt.Errorf("failed to upload within retries") + return -1, fmt.Errorf("failed to read within retries") } const ( From 034517cd851b1a31a3d239c2c2d631534400beec Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 7 May 2026 11:20:23 +0000 Subject: [PATCH 06/11] Add context to retry-reader fallback error Agent-Logs-Url: https://github.com/open-gpdb/yproxy/sessions/bc2c617f-667c-4fef-a3be-ea84684a3d43 Co-authored-by: leborchuk <63977577+leborchuk@users.noreply.github.com> --- pkg/proc/yio/yrreader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/proc/yio/yrreader.go b/pkg/proc/yio/yrreader.go index 2100b3a..0237f08 100644 --- a/pkg/proc/yio/yrreader.go +++ b/pkg/proc/yio/yrreader.go @@ -163,7 +163,7 @@ func (y *YproxyRetryReader) Read(p []byte) (int, error) { if lastErr != nil { return -1, lastErr } - return -1, fmt.Errorf("failed to read within retries") + return -1, fmt.Errorf("failed to read within retries: no error captured") } const ( From 308adac8c5c07018d2eab2fe726c0361cdb14c5e Mon Sep 17 00:00:00 2001 From: Leonid Borchuk Date: Thu, 7 May 2026 14:23:38 +0300 Subject: [PATCH 07/11] Do not fail on KEK --- pkg/proc/interaction.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index d517c67..e4f45bf 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -37,7 +37,8 @@ func ProcessCatExtended( if kek { err := fmt.Errorf("KEK is currently unsupported") ylogger.Zero.Error().Err(err).Msg("cat failed") - return err + // I do not know if we should actually fail here, just log error + // return err } if decrypt { From 085c0d9c72e0c8e07878ed040ca58a450fa931af 
Mon Sep 17 00:00:00 2001 From: Leonid Borchuk Date: Thu, 7 May 2026 16:52:19 +0300 Subject: [PATCH 08/11] Revert back behaviour with decrypt --- pkg/proc/interaction.go | 126 ++++++--------------------------------- pkg/proc/yio/yrreader.go | 17 ------ 2 files changed, 18 insertions(+), 125 deletions(-) diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index e4f45bf..6764a06 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -2,13 +2,10 @@ package proc import ( "context" - "errors" "fmt" "io" "strings" "sync" - "syscall" - "time" "github.com/yezzey-gp/yproxy/config" "github.com/yezzey-gp/yproxy/pkg/backups" @@ -24,8 +21,6 @@ import ( "golang.org/x/sync/semaphore" ) -const catDecryptRetryLimit = 10 - func ProcessCatExtended( s storage.StorageInteractor, pr *ProtoReader, @@ -34,117 +29,32 @@ func ProcessCatExtended( ycl.SetExternalFilePath(name) - if kek { - err := fmt.Errorf("KEK is currently unsupported") - ylogger.Zero.Error().Err(err).Msg("cat failed") - // I do not know if we should actually fail here, just log error - // return err - } - - if decrypt { - // When decryption is active, the retry reader cannot do mid-stream - // restarts because the GPG decryptor has internal cipher state that - // is initialized from the beginning of the encrypted stream. A restart - // from an arbitrary encrypted byte offset would feed raw ciphertext - // into a decryptor expecting continuation bytes, producing garbage. - // - // Instead, we handle retries at this level: on any read error, we - // re-create the entire pipeline (S3 reader → decryptor → discard → - // copy) from scratch. - return processCatDecrypted(s, name, startOffset, settings, cr, ycl) - } - - // Non-decrypt path: the retry reader handles mid-stream restarts correctly - // since there is no cipher state to corrupt. 
yr := yio.NewYRetryReader(yio.NewRestartReader(s, name, settings), ycl) + + var contentReader io.Reader + contentReader = yr defer func() { _ = yr.Close() }() + var err error - var contentReader io.Reader = yr + if decrypt { + if cr == nil { + err := fmt.Errorf("failed to decrypt object, decrypter not configured") - if startOffset != 0 { - if _, err := io.CopyN(io.Discard, contentReader, int64(startOffset)); err != nil { + ylogger.Zero.Error().Err(err).Msg("cat failed") return err } - } - - n, err := io.Copy(ycl.GetRW(), contentReader) - if err != nil { - if errors.Is(err, syscall.EPIPE) || errors.Is(err, io.ErrClosedPipe) { - ylogger.Zero.Warn().Err(err).Uint("client id", ycl.ID()).Int64("copied bytes", n).Msg("client disconnected during cat") - } else { - ylogger.Zero.Error().Err(err).Uint("client id", ycl.ID()).Int64("copied bytes", n).Msg("failed to cat object") - } - return err - } - ylogger.Zero.Debug().Int64("copied bytes", n).Msg("cat object completed") - - return nil -} - -// processCatDecrypted handles the cat operation when decryption is required. -// It retries the entire pipeline (S3 read → decrypt → discard → copy) on -// transient errors, because the GPG decryptor cannot survive mid-stream restarts. 
-func processCatDecrypted( - s storage.StorageInteractor, - name string, - startOffset uint64, - settings []settings.StorageSettings, - cr crypt.Crypter, - ycl client.YproxyClient, -) error { - if cr == nil { - err := fmt.Errorf("failed to decrypt object, decrypter not configured") - ylogger.Zero.Error().Err(err).Msg("cat failed") - return err - } - - var lastErr error - for attempt := range catDecryptRetryLimit { - err := tryCatDecrypted(s, name, startOffset, settings, cr, ycl) - if err == nil { - return nil - } - - // Client disconnected — not retryable - if errors.Is(err, syscall.EPIPE) || errors.Is(err, io.ErrClosedPipe) { - ylogger.Zero.Warn().Err(err).Uint("client id", ycl.ID()).Msg("client disconnected during encrypted cat") + ylogger.Zero.Debug().Str("object-path", name).Msg("decrypt object") + contentReader, err = cr.Decrypt(yr) + if err != nil { + ylogger.Zero.Error().Err(err).Msg("failed to decrypt object") return err } - - lastErr = err - ylogger.Zero.Warn().Err(err). - Str("object-path", name). - Int("attempt", attempt). - Msg("encrypted cat failed, retrying entire pipeline") - - time.Sleep(time.Second) } - ylogger.Zero.Error().Err(lastErr). - Str("object-path", name). - Int("retry-limit", catDecryptRetryLimit). - Msg("encrypted cat failed after all retries") - return lastErr -} - -// tryCatDecrypted performs a single attempt of: open S3 → decrypt → discard startOffset → copy to client. 
-func tryCatDecrypted( - s storage.StorageInteractor, - name string, - startOffset uint64, - setts []settings.StorageSettings, - cr crypt.Crypter, - ycl client.YproxyClient, -) error { - // Use no-retry reader: errors propagate up so we can restart the full pipeline - yr := yio.NewYRetryReaderNoRetry(yio.NewRestartReader(s, name, setts), ycl) - defer func() { _ = yr.Close() }() - - ylogger.Zero.Debug().Str("object-path", name).Msg("decrypt object") - contentReader, err := cr.Decrypt(yr) - if err != nil { - ylogger.Zero.Error().Err(err).Msg("failed to decrypt object") - return err + if kek { + err := fmt.Errorf("KEK is currently unsupported") + ylogger.Zero.Error().Err(err).Msg("cat failed") + // return err } if startOffset != 0 { @@ -155,10 +65,10 @@ func tryCatDecrypted( n, err := io.Copy(ycl.GetRW(), contentReader) if err != nil { - ylogger.Zero.Error().Err(err).Uint("client id", ycl.ID()).Int64("copied bytes", n).Msg("failed to cat decrypted object") + ylogger.Zero.Error().Err(err).Uint("client id", ycl.ID()).Int64("copied bytes", n).Msg("failed to cat object") return err } - ylogger.Zero.Debug().Int64("copied bytes", n).Msg("cat decrypted object completed") + ylogger.Zero.Debug().Int64("copied bytes", n).Msg("decrypt object") return nil } diff --git a/pkg/proc/yio/yrreader.go b/pkg/proc/yio/yrreader.go index 0237f08..7e51ea9 100644 --- a/pkg/proc/yio/yrreader.go +++ b/pkg/proc/yio/yrreader.go @@ -179,20 +179,3 @@ func NewYRetryReader(r RestartReader, selfCl client.YproxyClient) io.ReadCloser needReacquire: true, /* do initial storage request */ } } - -// NewYRetryReaderNoRetry creates a retry reader that does not retry on errors. -// Use this when the reader is wrapped by a decryptor (e.g. GPG) whose internal -// cipher state cannot survive a mid-stream restart from an arbitrary offset. -// In this case, retries must be handled at a higher level by re-creating the -// entire pipeline (reader + decryptor). 
-func NewYRetryReaderNoRetry(r RestartReader, selfCl client.YproxyClient) io.ReadCloser { - return &YproxyRetryReader{ - underlying: r, - retryLimit: 1, /* no retries — propagate errors immediately */ - selfCl: selfCl, - offsetReached: 0, - needReacquire: true, - } -} - -var _ io.ReadCloser = &YproxyRetryReader{} From 7e65569aaa798a3b46c32992fc1d43596443eb55 Mon Sep 17 00:00:00 2001 From: Leonid Borchuk Date: Thu, 7 May 2026 16:58:25 +0300 Subject: [PATCH 09/11] Add delay to listing --- pkg/storage/s3storage.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index 3e67d79..2222d24 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -36,7 +36,10 @@ type S3StorageInteractor struct { multipartUploads sync.Map } -const maxUploadRetries = 3 +const ( + maxUploadRetries = 3 + listingDelay = time.Second / 2 +) func (s *S3StorageInteractor) getCredentials(bucket string) (config.StorageCredentials, error) { cr, ok := s.credentialMap[bucket] if !ok { From 8c9a2d6d57e0b720d1726c20dd7a6754f84c8660 Mon Sep 17 00:00:00 2001 From: Leonid Borchuk Date: Thu, 7 May 2026 17:06:46 +0300 Subject: [PATCH 10/11] do not report errors about connections closed by client --- pkg/proc/interaction.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index 6764a06..785f6df 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -2,10 +2,12 @@ package proc import ( "context" + "errors" "fmt" "io" "strings" "sync" + "syscall" "github.com/yezzey-gp/yproxy/config" "github.com/yezzey-gp/yproxy/pkg/backups" @@ -65,7 +67,11 @@ func ProcessCatExtended( n, err := io.Copy(ycl.GetRW(), contentReader) if err != nil { - ylogger.Zero.Error().Err(err).Uint("client id", ycl.ID()).Int64("copied bytes", n).Msg("failed to cat object") + 
ylogger.Zero.Warn().Err(err).Uint("client id", ycl.ID()).Int64("copied bytes", n).Msg("client disconnected during cat") + } else { + ylogger.Zero.Error().Err(err).Uint("client id", ycl.ID()).Int64("copied bytes", n).Msg("failed to cat object") + } return err } ylogger.Zero.Debug().Int64("copied bytes", n).Msg("decrypt object") From 04a818abe863762a93ed309994fefb35710d6aa2 Mon Sep 17 00:00:00 2001 From: Leonid Borchuk Date: Fri, 8 May 2026 12:52:31 +0300 Subject: [PATCH 11/11] Remove retries in multipart uploader since it retries in aws sdk --- pkg/proc/yio/yrreader.go | 2 ++ pkg/storage/s3storage.go | 55 +++++++--------------------------------- 2 files changed, 11 insertions(+), 46 deletions(-) diff --git a/pkg/proc/yio/yrreader.go b/pkg/proc/yio/yrreader.go index 7e51ea9..653cf34 100644 --- a/pkg/proc/yio/yrreader.go +++ b/pkg/proc/yio/yrreader.go @@ -179,3 +179,5 @@ func NewYRetryReader(r RestartReader, selfCl client.YproxyClient) io.ReadCloser needReacquire: true, /* do initial storage request */ } } + +var _ io.ReadCloser = &YproxyRetryReader{} diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index 2222d24..81b242e 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -36,11 +36,6 @@ type S3StorageInteractor struct { multipartUploads sync.Map } -const ( - maxUploadRetries = 3 - listingDelay = time.Second / 2 -) - func (s *S3StorageInteractor) getCredentials(bucket string) (config.StorageCredentials, error) { cr, ok := s.credentialMap[bucket] if !ok { @@ -52,13 +47,6 @@ func (s *S3StorageInteractor) getCredentials(bucket string) (config.StorageCrede return cr, nil } -func isNoSuchUploadError(err error) bool { - if err == nil { - return false - } - return strings.Contains(err.Error(), "NoSuchUpload") -} - // ListBuckets implements StorageInteractor. 
func (s *S3StorageInteractor) ListBuckets() []string { keys := []string{} @@ -157,40 +145,15 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ putLen := int(multipartChunkSize) if multipartUpload { s.multipartUploads.Store(objectPath, true) - defer s.multipartUploads.Delete(objectPath) - - seeker, canRetry := r.(io.Seeker) - attempts := 1 - if canRetry { - attempts = maxUploadRetries - } - - for attempt := 0; attempt < attempts; attempt++ { - if canRetry { - if _, seekErr := seeker.Seek(0, io.SeekStart); seekErr != nil { - return fmt.Errorf("failed to reset multipart upload reader: %w", seekErr) - } - } - - _, err = up.Upload( - &s3manager.UploadInput{ - Bucket: aws.String(bucket), - Key: aws.String(objectPath), - Body: r, - StorageClass: aws.String(storageClass), - }, - ) - if err == nil || !isNoSuchUploadError(err) { - break - } - - if !canRetry { - ylogger.Zero.Warn().Str("name", name).Err(err).Msg("multipart upload ID expired, cannot retry because upload source is not seekable") - break - } - - ylogger.Zero.Warn().Str("name", name).Int("attempt", attempt+1).Err(err).Msg("multipart upload ID expired, retrying full upload") - } + _, err = up.Upload( + &s3manager.UploadInput{ + Bucket: aws.String(bucket), + Key: aws.String(objectPath), + Body: r, + StorageClass: aws.String(storageClass), + }, + ) + s.multipartUploads.Delete(objectPath) } else { var body []byte body, err = io.ReadAll(r)