From 175c1d01e582015bcf91b947d6e121d72a39d363 Mon Sep 17 00:00:00 2001 From: Roger Ng Date: Mon, 11 May 2026 16:51:29 +0000 Subject: [PATCH 01/19] Add retry logic for HTTP fetcher --- client/fetcher.go | 163 +++++++++++++++++++++++++++++++++++-- client/fetcher_test.go | 179 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 334 insertions(+), 8 deletions(-) diff --git a/client/fetcher.go b/client/fetcher.go index 774c62ef8..fc848fb5c 100644 --- a/client/fetcher.go +++ b/client/fetcher.go @@ -16,13 +16,16 @@ package client import ( "context" + "errors" "fmt" "io" "net/http" "net/url" "os" "path" + "strconv" "strings" + "time" "log/slog" @@ -30,6 +33,20 @@ import ( "github.com/transparency-dev/tessera/internal/fetcher" ) +// TransientError indicates that an error is temporary and the operation can be retried. +type TransientError struct { + Err error + RetryAfter time.Duration // Optional, parsed from header if available +} + +func (e TransientError) Error() string { + return fmt.Sprintf("transient error: %v", e.Err) +} + +func (e TransientError) Unwrap() error { + return e.Err +} + // NewHTTPFetcher creates a new HTTPFetcher for the log rooted at the given URL, using // the provided HTTP client. // @@ -75,24 +92,49 @@ func (h HTTPFetcher) fetch(ctx context.Context, p string) ([]byte, error) { } r, err := h.c.Do(req) if err != nil { - return nil, fmt.Errorf("get(%q): %v", u.String(), err) + // Network errors are considered transient + return nil, TransientError{Err: err} } + defer func() { + if err := r.Body.Close(); err != nil { + slog.ErrorContext(ctx, "resp.Body.Close", slog.Any("error", err)) + } + }() + switch r.StatusCode { case http.StatusOK: - // All good, continue below + return io.ReadAll(r.Body) case http.StatusNotFound: // Need to return ErrNotExist here, by contract. 
return nil, fmt.Errorf("get(%q): %w", u.String(), os.ErrNotExist) + case http.StatusTooManyRequests, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout: + var retryAfter time.Duration + if ra := r.Header.Get("Retry-After"); ra != "" { + retryAfter = parseRetryAfter(ra) + } + return nil, TransientError{ + Err: fmt.Errorf("get(%q): status %d", u.String(), r.StatusCode), + RetryAfter: retryAfter, + } default: return nil, fmt.Errorf("get(%q): %v", u.String(), r.StatusCode) } +} - defer func() { - if err := r.Body.Close(); err != nil { - slog.ErrorContext(ctx, "resp.Body.Close", slog.Any("error", err)) - } - }() - return io.ReadAll(r.Body) +// parseRetryAfter parses the Retry-After header and returns a time.Duration. +func parseRetryAfter(retryAfter string) time.Duration { + if retryAfter == "" { + return 0 + } + d, err := time.Parse(http.TimeFormat, retryAfter) + if err == nil { + return time.Until(d) + } + s, err := strconv.Atoi(retryAfter) + if err == nil { + return time.Duration(s) * time.Second + } + return 0 } func (h HTTPFetcher) ReadCheckpoint(ctx context.Context) ([]byte, error) { @@ -111,6 +153,111 @@ func (h HTTPFetcher) ReadEntryBundle(ctx context.Context, i uint64, p uint8) ([] }) } +// retryOpts holds the configuration for retry logic. +type retryOpts struct { + maxRetries int + initialBackoff time.Duration + maxBackoff time.Duration +} + +// RetryOption is a function that modifies retryOpts. +type RetryOption func(*retryOpts) + +// WithMaxRetries sets the maximum number of retries. +func WithMaxRetries(n int) RetryOption { + return func(o *retryOpts) { o.maxRetries = n } +} + +// WithInitialBackoff sets the initial backoff duration. +func WithInitialBackoff(d time.Duration) RetryOption { + return func(o *retryOpts) { o.initialBackoff = d } +} + +// WithMaxBackoff sets the maximum backoff duration. 
+func WithMaxBackoff(d time.Duration) RetryOption { + return func(o *retryOpts) { o.maxBackoff = d } +} + +func defaultRetryOpts() retryOpts { + return retryOpts{ + maxRetries: 5, + initialBackoff: 100 * time.Millisecond, + maxBackoff: 2 * time.Second, + } +} + +// WithTileRetry decorates a TileFetcherFunc with retry logic. +func WithTileRetry(f TileFetcherFunc, opts ...RetryOption) TileFetcherFunc { + o := defaultRetryOpts() + for _, opt := range opts { + opt(&o) + } + return func(ctx context.Context, level, index uint64, p uint8) ([]byte, error) { + return retry(ctx, o, func() ([]byte, error) { + return f(ctx, level, index, p) + }) + } +} + +// WithEntryBundleRetry decorates an EntryBundleFetcherFunc with retry logic. +func WithEntryBundleRetry(f EntryBundleFetcherFunc, opts ...RetryOption) EntryBundleFetcherFunc { + o := defaultRetryOpts() + for _, opt := range opts { + opt(&o) + } + return func(ctx context.Context, bundleIndex uint64, p uint8) ([]byte, error) { + return retry(ctx, o, func() ([]byte, error) { + return f(ctx, bundleIndex, p) + }) + } +} + +// WithCheckpointRetry decorates a CheckpointFetcherFunc with retry logic. +func WithCheckpointRetry(f CheckpointFetcherFunc, opts ...RetryOption) CheckpointFetcherFunc { + o := defaultRetryOpts() + for _, opt := range opts { + opt(&o) + } + return func(ctx context.Context) ([]byte, error) { + return retry(ctx, o, func() ([]byte, error) { + return f(ctx) + }) + } +} + +// retry retries the function f with exponential backoff up to maxRetries. 
+func retry[T any](ctx context.Context, opts retryOpts, f func() (T, error)) (T, error) { + var backoff = opts.initialBackoff + for attempt := 0; attempt < opts.maxRetries; attempt++ { + res, err := f() + if err == nil { + return res, nil + } + + var tErr TransientError + if errors.As(err, &tErr) { + if attempt == opts.maxRetries-1 { + return res, fmt.Errorf("after %d attempts: %w", opts.maxRetries, err) + } + delay := backoff + if tErr.RetryAfter > 0 { + delay = tErr.RetryAfter + } + select { + case <-ctx.Done(): + return res, ctx.Err() + case <-time.After(delay): + if tErr.RetryAfter == 0 { + backoff = min(backoff*2, opts.maxBackoff) + } + continue + } + } + return res, err + } + return *new(T), fmt.Errorf("max retries reached") +} + // FileFetcher knows how to fetch log artifacts from a filesystem rooted at Root. type FileFetcher struct { Root string diff --git a/client/fetcher_test.go b/client/fetcher_test.go index 3c0c69d3c..ae1b48ef8 100644 --- a/client/fetcher_test.go +++ b/client/fetcher_test.go @@ -3,7 +3,13 @@ package client import ( "context" "errors" + "net/http" + "net/http/httptest" + "net/url" + "os" + "strings" "testing" + "time" ) func TestFileFetcherContextCancellation(t *testing.T) { @@ -31,3 +37,176 @@ func TestFileFetcherContextCancellation(t *testing.T) { t.Errorf("ReadEntryBundle: got error %v, want %v", err, context.Canceled) } } + +func TestHTTPFetcherRetry(t *testing.T) { + tests := []struct { + name string + responses []int + retryAfter string + expectedError error + wantAttempts int + }{ + { + name: "SuccessFirstTry", + responses: []int{http.StatusOK}, + wantAttempts: 1, + }, + { + name: "RetryThenSuccess", + responses: []int{http.StatusServiceUnavailable, http.StatusServiceUnavailable, http.StatusOK}, + wantAttempts: 3, + }, + { + name: "MaxRetriesExceeded", + responses: []int{http.StatusServiceUnavailable, http.StatusServiceUnavailable, http.StatusServiceUnavailable, http.StatusServiceUnavailable, http.StatusServiceUnavailable}, + 
expectedError: errors.New("after 5 attempts"), + wantAttempts: 5, + }, + { + name: "NotFoundNoRetry", + responses: []int{http.StatusNotFound}, + expectedError: os.ErrNotExist, + wantAttempts: 1, + }, + { + name: "RetryAfterRespected", + responses: []int{http.StatusTooManyRequests, http.StatusOK}, + retryAfter: "1", // 1 second + wantAttempts: 2, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + attempts := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if attempts < len(tc.responses) { + status := tc.responses[attempts] + attempts++ + if tc.retryAfter != "" && status == http.StatusTooManyRequests { + w.Header().Set("Retry-After", tc.retryAfter) + } + w.WriteHeader(status) + if status == http.StatusOK { + w.Write([]byte("data")) + } + return + } + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + u, _ := url.Parse(server.URL) + fetcher, err := NewHTTPFetcher(u, nil) + if err != nil { + t.Fatal(err) + } + + // Decorate the fetch call using the helper + decoratedFetch := func(ctx context.Context) ([]byte, error) { + return retry(ctx, defaultRetryOpts(), func() ([]byte, error) { + return fetcher.fetch(ctx, "/") + }) + } + + ctx := context.Background() + + startTime := time.Now() + _, err = decoratedFetch(ctx) + duration := time.Since(startTime) + + if tc.expectedError != nil { + if err == nil { + t.Errorf("expected error %v, got nil", tc.expectedError) + } else if !errors.Is(err, tc.expectedError) && !strings.Contains(err.Error(), tc.expectedError.Error()) { + t.Errorf("expected error containing %v, got %v", tc.expectedError, err) + } + } else if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if attempts != tc.wantAttempts { + t.Errorf("got %d attempts, want %d", attempts, tc.wantAttempts) + } + + if tc.retryAfter != "" && duration < time.Second { + t.Errorf("expected retry delay of at least 1s, took %v", duration) + } + }) + } +} + +func 
TestWithTileRetry(t *testing.T) { + tests := []struct { + name string + responses []error + options []RetryOption + expectedError error + wantAttempts int + }{ + { + name: "SuccessFirstTry", + responses: []error{nil}, + wantAttempts: 1, + }, + { + name: "RetryThenSuccess", + responses: []error{TransientError{Err: errors.New("temporary")}, TransientError{Err: errors.New("temporary")}, nil}, + wantAttempts: 3, + }, + { + name: "MaxRetriesExceeded", + responses: []error{TransientError{Err: errors.New("temporary")}, TransientError{Err: errors.New("temporary")}, TransientError{Err: errors.New("temporary")}, TransientError{Err: errors.New("temporary")}, TransientError{Err: errors.New("temporary")}}, + expectedError: errors.New("after 5 attempts"), + wantAttempts: 5, + }, + { + name: "NonTransientErrorNoRetry", + responses: []error{errors.New("fatal")}, + expectedError: errors.New("fatal"), + wantAttempts: 1, + }, + { + name: "CustomMaxRetries", + responses: []error{TransientError{Err: errors.New("temporary")}, TransientError{Err: errors.New("temporary")}}, + options: []RetryOption{WithMaxRetries(2)}, + expectedError: errors.New("after 2 attempts"), + wantAttempts: 2, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + attempts := 0 + dummyFetcher := func(ctx context.Context, level, index uint64, p uint8) ([]byte, error) { + if attempts < len(tc.responses) { + err := tc.responses[attempts] + attempts++ + if err != nil { + return nil, err + } + return []byte("data"), nil + } + return nil, errors.New("unexpected call") + } + + decorated := WithTileRetry(dummyFetcher, tc.options...) 
+ + _, err := decorated(context.Background(), 0, 0, 0) + + if tc.expectedError != nil { + if err == nil { + t.Errorf("expected error %v, got nil", tc.expectedError) + } else if !strings.Contains(err.Error(), tc.expectedError.Error()) { + t.Errorf("expected error containing %v, got %v", tc.expectedError, err) + } + } else if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if attempts != tc.wantAttempts { + t.Errorf("got %d attempts, want %d", attempts, tc.wantAttempts) + } + }) + } +} From 7279e26d05ca957a4a12546edc38f955502d61af Mon Sep 17 00:00:00 2001 From: Roger Ng Date: Mon, 11 May 2026 16:54:55 +0000 Subject: [PATCH 02/19] Fix linter error --- client/fetcher_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/fetcher_test.go b/client/fetcher_test.go index ae1b48ef8..f9f1c4fae 100644 --- a/client/fetcher_test.go +++ b/client/fetcher_test.go @@ -88,7 +88,7 @@ func TestHTTPFetcherRetry(t *testing.T) { } w.WriteHeader(status) if status == http.StatusOK { - w.Write([]byte("data")) + _, _ = w.Write([]byte("data")) } return } From 533ee96ebfae9c33a7fcd0083edbaf9f0f809e1a Mon Sep 17 00:00:00 2001 From: Roger Ng Date: Mon, 11 May 2026 18:40:54 +0000 Subject: [PATCH 03/19] Drain response body on transient errors to reuse connection --- client/fetcher.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/client/fetcher.go b/client/fetcher.go index fc848fb5c..f331538a7 100644 --- a/client/fetcher.go +++ b/client/fetcher.go @@ -96,6 +96,10 @@ func (h HTTPFetcher) fetch(ctx context.Context, p string) ([]byte, error) { return nil, TransientError{Err: err} } defer func() { + // Drain the body to ensure the underlying TCP connection can be returned + // to the keep-alive pool and reused for future requests. 
+ _, _ = io.Copy(io.Discard, r.Body) + if err := r.Body.Close(); err != nil { slog.ErrorContext(ctx, "resp.Body.Close", slog.Any("error", err)) } From 61da9fd3a5787f867b18bc431fbf8ea28d713618 Mon Sep 17 00:00:00 2001 From: Roger Ng Date: Mon, 11 May 2026 18:42:25 +0000 Subject: [PATCH 04/19] Fix `time.After` memory leak --- client/fetcher.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/client/fetcher.go b/client/fetcher.go index f331538a7..5940b51f0 100644 --- a/client/fetcher.go +++ b/client/fetcher.go @@ -247,10 +247,12 @@ func retry[T any](ctx context.Context, opts retryOpts, f func() (T, error)) (T, if tErr.RetryAfter > 0 { delay = tErr.RetryAfter } + timer := time.NewTimer(delay) select { case <-ctx.Done(): + timer.Stop() return res, ctx.Err() - case <-time.After(delay): + case <-timer.C: if tErr.RetryAfter == 0 { backoff = min(backoff*2, opts.maxBackoff) } From d551f9b9af915d7617cec850ec70baf535cac612 Mon Sep 17 00:00:00 2001 From: Roger Ng Date: Mon, 11 May 2026 19:01:16 +0000 Subject: [PATCH 05/19] Handle past-dated `Retry-After` headers in fetcher --- client/fetcher.go | 6 +++++- client/fetcher_test.go | 13 +++++++++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/client/fetcher.go b/client/fetcher.go index 5940b51f0..ffd1dd85b 100644 --- a/client/fetcher.go +++ b/client/fetcher.go @@ -132,7 +132,11 @@ func parseRetryAfter(retryAfter string) time.Duration { } d, err := time.Parse(http.TimeFormat, retryAfter) if err == nil { - return time.Until(d) + dur := time.Until(d) + if dur <= 0 { + return time.Nanosecond + } + return dur } s, err := strconv.Atoi(retryAfter) if err == nil { diff --git a/client/fetcher_test.go b/client/fetcher_test.go index f9f1c4fae..4488a3e98 100644 --- a/client/fetcher_test.go +++ b/client/fetcher_test.go @@ -45,6 +45,7 @@ func TestHTTPFetcherRetry(t *testing.T) { retryAfter string expectedError error wantAttempts int + minDuration time.Duration }{ { name: "SuccessFirstTry", @@ -73,6 +74,14 
@@ func TestHTTPFetcherRetry(t *testing.T) { responses: []int{http.StatusTooManyRequests, http.StatusOK}, retryAfter: "1", // 1 second wantAttempts: 2, + minDuration: time.Second, + }, + { + name: "RetryAfterPastDate", + responses: []int{http.StatusTooManyRequests, http.StatusOK}, + retryAfter: "Wed, 21 Oct 2015 07:28:00 GMT", + wantAttempts: 2, + minDuration: 0, }, } @@ -129,8 +138,8 @@ func TestHTTPFetcherRetry(t *testing.T) { t.Errorf("got %d attempts, want %d", attempts, tc.wantAttempts) } - if tc.retryAfter != "" && duration < time.Second { - t.Errorf("expected retry delay of at least 1s, took %v", duration) + if tc.minDuration > 0 && duration < tc.minDuration { + t.Errorf("expected retry delay of at least %v, took %v", tc.minDuration, duration) } }) } From a6bf8d0790eca976abaeb92ab29c937430891a80 Mon Sep 17 00:00:00 2001 From: Roger Ng Date: Mon, 11 May 2026 19:05:11 +0000 Subject: [PATCH 06/19] Restrict transient network error classification to timeout errors only --- client/fetcher.go | 15 ++++++++++++-- client/fetcher_test.go | 45 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 2 deletions(-) diff --git a/client/fetcher.go b/client/fetcher.go index ffd1dd85b..a5053607c 100644 --- a/client/fetcher.go +++ b/client/fetcher.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "io" + "net" "net/http" "net/url" "os" @@ -78,6 +79,14 @@ func (h *HTTPFetcher) SetAuthorizationHeader(v string) { h.authHeader = v } +func isTransientNetworkError(err error) bool { + var netErr net.Error + if errors.As(err, &netErr) { + return netErr.Timeout() + } + return false +} + func (h HTTPFetcher) fetch(ctx context.Context, p string) ([]byte, error) { u, err := h.rootURL.Parse(p) if err != nil { @@ -92,8 +101,10 @@ func (h HTTPFetcher) fetch(ctx context.Context, p string) ([]byte, error) { } r, err := h.c.Do(req) if err != nil { - // Network errors are considered transient - return nil, TransientError{Err: err} + if isTransientNetworkError(err) { + return 
nil, TransientError{Err: err} + } + return nil, err } defer func() { // Drain the body to ensure the underlying TCP connection can be returned diff --git a/client/fetcher_test.go b/client/fetcher_test.go index 4488a3e98..75849bf8c 100644 --- a/client/fetcher_test.go +++ b/client/fetcher_test.go @@ -219,3 +219,48 @@ func TestWithTileRetry(t *testing.T) { }) } } + +type myNetError struct { + timeout bool +} + +func (e myNetError) Error() string { return "error" } +func (e myNetError) Timeout() bool { return e.timeout } +func (e myNetError) Temporary() bool { return false } + +func TestIsTransientNetworkError(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + { + name: "NilError", + err: nil, + want: false, + }, + { + name: "GenericError", + err: errors.New("generic error"), + want: false, + }, + { + name: "TimeoutError", + err: myNetError{timeout: true}, + want: true, + }, + { + name: "NonTimeoutNetError", + err: myNetError{timeout: false}, + want: false, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + if got := isTransientNetworkError(tc.err); got != tc.want { + t.Errorf("isTransientNetworkError() = %v, want %v", got, tc.want) + } + }) + } +} From 395513502522149603df99ce3b5ab363076e55f3 Mon Sep 17 00:00:00 2001 From: Roger Ng Date: Tue, 12 May 2026 08:51:38 +0000 Subject: [PATCH 07/19] Handle non-positive Retry-After values by defaulting to minimal duration --- client/fetcher.go | 5 ++++- client/fetcher_test.go | 7 +++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/client/fetcher.go b/client/fetcher.go index a5053607c..5b90b625a 100644 --- a/client/fetcher.go +++ b/client/fetcher.go @@ -112,7 +112,7 @@ func (h HTTPFetcher) fetch(ctx context.Context, p string) ([]byte, error) { _, _ = io.Copy(io.Discard, r.Body) if err := r.Body.Close(); err != nil { - slog.ErrorContext(ctx, "resp.Body.Close", slog.Any("error", err)) + slog.ErrorContext(ctx, "resp.Body.Close", slog.Any("error", err)) 
} }() @@ -151,6 +151,9 @@ func parseRetryAfter(retryAfter string) time.Duration { } s, err := strconv.Atoi(retryAfter) if err == nil { + if s <= 0 { + return time.Nanosecond + } return time.Duration(s) * time.Second } return 0 diff --git a/client/fetcher_test.go b/client/fetcher_test.go index 75849bf8c..3061d3354 100644 --- a/client/fetcher_test.go +++ b/client/fetcher_test.go @@ -83,6 +83,13 @@ func TestHTTPFetcherRetry(t *testing.T) { wantAttempts: 2, minDuration: 0, }, + { + name: "RetryAfterZero", + responses: []int{http.StatusTooManyRequests, http.StatusOK}, + retryAfter: "0", + wantAttempts: 2, + minDuration: 0, + }, } for _, tc := range tests { From bdb1e572a92020a4cf6a0ca0a6f46bec3f999ede Mon Sep 17 00:00:00 2001 From: Roger Ng Date: Tue, 12 May 2026 09:19:36 +0000 Subject: [PATCH 08/19] Expand transient network error detection and handle read errors in fetcher --- client/fetcher.go | 24 ++++++++++++++++++++++-- client/fetcher_test.go | 17 +++++++++++++++++ 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/client/fetcher.go b/client/fetcher.go index 5b90b625a..1440b69f2 100644 --- a/client/fetcher.go +++ b/client/fetcher.go @@ -26,6 +26,7 @@ import ( "path" "strconv" "strings" + "syscall" "time" "log/slog" @@ -82,7 +83,19 @@ func (h *HTTPFetcher) SetAuthorizationHeader(v string) { func isTransientNetworkError(err error) bool { var netErr net.Error if errors.As(err, &netErr) { - return netErr.Timeout() + if netErr.Timeout() { + return true + } + } + if errors.Is(err, io.ErrUnexpectedEOF) { + return true + } + var errno syscall.Errno + if errors.As(err, &errno) { + switch errno { + case syscall.ECONNRESET, syscall.ECONNABORTED, syscall.ECONNREFUSED: + return true + } } return false } @@ -118,7 +131,14 @@ func (h HTTPFetcher) fetch(ctx context.Context, p string) ([]byte, error) { switch r.StatusCode { case http.StatusOK: - return io.ReadAll(r.Body) + data, err := io.ReadAll(r.Body) + if err != nil { + if isTransientNetworkError(err) { + return 
nil, TransientError{Err: err} + } + return nil, err + } + return data, nil case http.StatusNotFound: // Need to return ErrNotExist here, by contract. return nil, fmt.Errorf("get(%q): %w", u.String(), os.ErrNotExist) diff --git a/client/fetcher_test.go b/client/fetcher_test.go index 3061d3354..dc31361a4 100644 --- a/client/fetcher_test.go +++ b/client/fetcher_test.go @@ -3,11 +3,13 @@ package client import ( "context" "errors" + "io" "net/http" "net/http/httptest" "net/url" "os" "strings" + "syscall" "testing" "time" ) @@ -261,6 +263,21 @@ func TestIsTransientNetworkError(t *testing.T) { err: myNetError{timeout: false}, want: false, }, + { + name: "UnexpectedEOF", + err: io.ErrUnexpectedEOF, + want: true, + }, + { + name: "ConnReset", + err: syscall.ECONNRESET, + want: true, + }, + { + name: "ConnRefused", + err: syscall.ECONNREFUSED, + want: true, + }, } for _, tc := range tests { From fc741c0321bf3859c86f9e6cbd118ac3e621a76f Mon Sep 17 00:00:00 2001 From: Roger Ng Date: Tue, 12 May 2026 09:22:44 +0000 Subject: [PATCH 09/19] Adjust retry loop to correctly perform maxRetries attempts plus the initial request --- client/fetcher.go | 6 +++--- client/fetcher_test.go | 18 +++++++++--------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/client/fetcher.go b/client/fetcher.go index 1440b69f2..0ac4271f9 100644 --- a/client/fetcher.go +++ b/client/fetcher.go @@ -270,7 +270,7 @@ func WithCheckpointRetry(f CheckpointFetcherFunc, opts ...RetryOption) Checkpoin // retry retries the function f with exponential backoff up to maxRetries. 
func retry[T any](ctx context.Context, opts retryOpts, f func() (T, error)) (T, error) { var backoff = opts.initialBackoff - for attempt := 0; attempt < opts.maxRetries; attempt++ { + for attempt := 0; attempt <= opts.maxRetries; attempt++ { res, err := f() if err == nil { return res, nil @@ -278,8 +278,8 @@ func retry[T any](ctx context.Context, opts retryOpts, f func() (T, error)) (T, var tErr TransientError if errors.As(err, &tErr) { - if attempt == opts.maxRetries-1 { - return res, fmt.Errorf("after %d attempts: %w", opts.maxRetries, err) + if attempt == opts.maxRetries { + return res, fmt.Errorf("after %d retries: %w", opts.maxRetries, err) } delay := backoff if tErr.RetryAfter > 0 { diff --git a/client/fetcher_test.go b/client/fetcher_test.go index dc31361a4..937d08e9d 100644 --- a/client/fetcher_test.go +++ b/client/fetcher_test.go @@ -61,9 +61,9 @@ func TestHTTPFetcherRetry(t *testing.T) { }, { name: "MaxRetriesExceeded", - responses: []int{http.StatusServiceUnavailable, http.StatusServiceUnavailable, http.StatusServiceUnavailable, http.StatusServiceUnavailable, http.StatusServiceUnavailable}, - expectedError: errors.New("after 5 attempts"), - wantAttempts: 5, + responses: []int{http.StatusServiceUnavailable, http.StatusServiceUnavailable, http.StatusServiceUnavailable, http.StatusServiceUnavailable, http.StatusServiceUnavailable, http.StatusServiceUnavailable}, + expectedError: errors.New("after 5 retries"), + wantAttempts: 6, }, { name: "NotFoundNoRetry", @@ -174,9 +174,9 @@ func TestWithTileRetry(t *testing.T) { }, { name: "MaxRetriesExceeded", - responses: []error{TransientError{Err: errors.New("temporary")}, TransientError{Err: errors.New("temporary")}, TransientError{Err: errors.New("temporary")}, TransientError{Err: errors.New("temporary")}, TransientError{Err: errors.New("temporary")}}, - expectedError: errors.New("after 5 attempts"), - wantAttempts: 5, + responses: []error{TransientError{Err: errors.New("temporary")}, TransientError{Err: 
errors.New("temporary")}, TransientError{Err: errors.New("temporary")}, TransientError{Err: errors.New("temporary")}, TransientError{Err: errors.New("temporary")}, TransientError{Err: errors.New("temporary")}}, + expectedError: errors.New("after 5 retries"), + wantAttempts: 6, }, { name: "NonTransientErrorNoRetry", @@ -186,10 +186,10 @@ func TestWithTileRetry(t *testing.T) { }, { name: "CustomMaxRetries", - responses: []error{TransientError{Err: errors.New("temporary")}, TransientError{Err: errors.New("temporary")}}, + responses: []error{TransientError{Err: errors.New("temporary")}, TransientError{Err: errors.New("temporary")}, TransientError{Err: errors.New("temporary")}}, options: []RetryOption{WithMaxRetries(2)}, - expectedError: errors.New("after 2 attempts"), - wantAttempts: 2, + expectedError: errors.New("after 2 retries"), + wantAttempts: 3, }, } From a4aa8cb0adec1c78d7520ef220424fcd92a43bfc Mon Sep 17 00:00:00 2001 From: Roger Ng Date: Tue, 12 May 2026 10:23:14 +0000 Subject: [PATCH 10/19] Limit response body drainage to 4KB to prevent hangs on large inputs --- client/fetcher.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/client/fetcher.go b/client/fetcher.go index 0ac4271f9..9b002ad61 100644 --- a/client/fetcher.go +++ b/client/fetcher.go @@ -122,7 +122,8 @@ func (h HTTPFetcher) fetch(ctx context.Context, p string) ([]byte, error) { defer func() { // Drain the body to ensure the underlying TCP connection can be returned // to the keep-alive pool and reused for future requests. - _, _ = io.Copy(io.Discard, r.Body) + // Limit the drain to avoid hanging on large or infinite responses. 
+ _, _ = io.Copy(io.Discard, io.LimitReader(r.Body, 4096)) if err := r.Body.Close(); err != nil { slog.ErrorContext(ctx, "resp.Body.Close", slog.Any("error", err)) From 6acc055bde5c79ab281f367d2f11d9f17a468f20 Mon Sep 17 00:00:00 2001 From: Roger Ng Date: Tue, 12 May 2026 10:34:35 +0000 Subject: [PATCH 11/19] Update fetcher to support RFC 850 and ANSI C date formats for Retry-After header --- client/fetcher.go | 2 +- client/fetcher_test.go | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/client/fetcher.go b/client/fetcher.go index 9b002ad61..8a5262e03 100644 --- a/client/fetcher.go +++ b/client/fetcher.go @@ -162,7 +162,7 @@ func parseRetryAfter(retryAfter string) time.Duration { if retryAfter == "" { return 0 } - d, err := time.Parse(http.TimeFormat, retryAfter) + d, err := http.ParseTime(retryAfter) if err == nil { dur := time.Until(d) if dur <= 0 { diff --git a/client/fetcher_test.go b/client/fetcher_test.go index 937d08e9d..3645ae39b 100644 --- a/client/fetcher_test.go +++ b/client/fetcher_test.go @@ -92,6 +92,20 @@ func TestHTTPFetcherRetry(t *testing.T) { wantAttempts: 2, minDuration: 0, }, + { + name: "RetryAfterRFC850", + responses: []int{http.StatusTooManyRequests, http.StatusOK}, + retryAfter: "Sunday, 06-Nov-94 08:49:37 GMT", + wantAttempts: 2, + minDuration: 0, + }, + { + name: "RetryAfterANSIC", + responses: []int{http.StatusTooManyRequests, http.StatusOK}, + retryAfter: "Sun Nov 6 08:49:37 1994", + wantAttempts: 2, + minDuration: 0, + }, } for _, tc := range tests { From 9338d0bcf97efe391738e66c61c57a68b85a4812 Mon Sep 17 00:00:00 2001 From: Roger Ng Date: Tue, 12 May 2026 10:41:54 +0000 Subject: [PATCH 12/19] Cap retry delay at maxBackoff to prevent excessively long waits --- client/fetcher.go | 2 +- client/fetcher_test.go | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/client/fetcher.go b/client/fetcher.go index 8a5262e03..fbe78ca33 100644 --- a/client/fetcher.go +++ b/client/fetcher.go @@ 
-284,7 +284,7 @@ func retry[T any](ctx context.Context, opts retryOpts, f func() (T, error)) (T, } delay := backoff if tErr.RetryAfter > 0 { - delay = tErr.RetryAfter + delay = min(tErr.RetryAfter, opts.maxBackoff) } timer := time.NewTimer(delay) select { diff --git a/client/fetcher_test.go b/client/fetcher_test.go index 3645ae39b..ac5659b2d 100644 --- a/client/fetcher_test.go +++ b/client/fetcher_test.go @@ -106,6 +106,13 @@ func TestHTTPFetcherRetry(t *testing.T) { wantAttempts: 2, minDuration: 0, }, + { + name: "RetryAfterLargeCapped", + responses: []int{http.StatusTooManyRequests, http.StatusOK}, + retryAfter: "3600", // 1 hour + wantAttempts: 2, + minDuration: 2 * time.Second, // Capped at maxBackoff (2s) + }, } for _, tc := range tests { From cfe152df352eaf633c1af2b5e754b7ebb90e9c83 Mon Sep 17 00:00:00 2001 From: Roger Ng Date: Tue, 12 May 2026 10:43:45 +0000 Subject: [PATCH 13/19] Treat io.EOF as a retryable error in fetcher --- client/fetcher.go | 2 +- client/fetcher_test.go | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/client/fetcher.go b/client/fetcher.go index fbe78ca33..2ae55e2d5 100644 --- a/client/fetcher.go +++ b/client/fetcher.go @@ -87,7 +87,7 @@ func isTransientNetworkError(err error) bool { return true } } - if errors.Is(err, io.ErrUnexpectedEOF) { + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { return true } var errno syscall.Errno diff --git a/client/fetcher_test.go b/client/fetcher_test.go index ac5659b2d..546aed3ec 100644 --- a/client/fetcher_test.go +++ b/client/fetcher_test.go @@ -289,6 +289,11 @@ func TestIsTransientNetworkError(t *testing.T) { err: io.ErrUnexpectedEOF, want: true, }, + { + name: "EOF", + err: io.EOF, + want: true, + }, { name: "ConnReset", err: syscall.ECONNRESET, From 03862078b5758d50de2791d1b6f422cee3688a9a Mon Sep 17 00:00:00 2001 From: Roger Ng Date: Tue, 12 May 2026 10:45:22 +0000 Subject: [PATCH 14/19] Adjust retry logic to correctly propagate the final error and last 
result after exhausted attempts --- client/fetcher.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/client/fetcher.go b/client/fetcher.go index 2ae55e2d5..c58e464c4 100644 --- a/client/fetcher.go +++ b/client/fetcher.go @@ -271,8 +271,10 @@ func WithCheckpointRetry(f CheckpointFetcherFunc, opts ...RetryOption) Checkpoin // retry retries the function f with exponential backoff up to maxRetries. func retry[T any](ctx context.Context, opts retryOpts, f func() (T, error)) (T, error) { var backoff = opts.initialBackoff + var err error + var res T for attempt := 0; attempt <= opts.maxRetries; attempt++ { - res, err := f() + res, err = f() if err == nil { return res, nil } @@ -280,7 +282,7 @@ func retry[T any](ctx context.Context, opts retryOpts, f func() (T, error)) (T, var tErr TransientError if errors.As(err, &tErr) { if attempt == opts.maxRetries { - return res, fmt.Errorf("after %d retries: %w", opts.maxRetries, err) + break } delay := backoff if tErr.RetryAfter > 0 { @@ -300,7 +302,7 @@ func retry[T any](ctx context.Context, opts retryOpts, f func() (T, error)) (T, } return res, err } - return *new(T), fmt.Errorf("max retries reached") + return res, fmt.Errorf("after %d retries: %w", opts.maxRetries, err) } // FileFetcher knows how to fetch log artifacts from a filesystem rooted at Root. 
From 24683f2267d13ec89aa737b4054b657b56adecaf Mon Sep 17 00:00:00 2001 From: Roger Ng Date: Tue, 12 May 2026 10:51:55 +0000 Subject: [PATCH 15/19] Exclude context cancellation and deadline errors from transient network error check --- client/fetcher.go | 3 +++ client/fetcher_test.go | 10 ++++++++++ 2 files changed, 13 insertions(+) diff --git a/client/fetcher.go b/client/fetcher.go index c58e464c4..8509d30c3 100644 --- a/client/fetcher.go +++ b/client/fetcher.go @@ -81,6 +81,9 @@ func (h *HTTPFetcher) SetAuthorizationHeader(v string) { } func isTransientNetworkError(err error) bool { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return false + } var netErr net.Error if errors.As(err, &netErr) { if netErr.Timeout() { diff --git a/client/fetcher_test.go b/client/fetcher_test.go index 546aed3ec..d63213fb5 100644 --- a/client/fetcher_test.go +++ b/client/fetcher_test.go @@ -274,6 +274,16 @@ func TestIsTransientNetworkError(t *testing.T) { err: errors.New("generic error"), want: false, }, + { + name: "ContextCanceled", + err: context.Canceled, + want: false, + }, + { + name: "ContextDeadlineExceeded", + err: context.DeadlineExceeded, + want: false, + }, { name: "TimeoutError", err: myNetError{timeout: true}, From 66132ecd92babc398c64b7ede55ee40117c1266d Mon Sep 17 00:00:00 2001 From: Roger Ng Date: Tue, 12 May 2026 10:53:08 +0000 Subject: [PATCH 16/19] Add jitter to backoff duration in fetcher retry logic --- client/fetcher.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/client/fetcher.go b/client/fetcher.go index 8509d30c3..52a607c10 100644 --- a/client/fetcher.go +++ b/client/fetcher.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "io" + "math/rand/v2" "net" "net/http" "net/url" @@ -298,7 +299,9 @@ func retry[T any](ctx context.Context, opts retryOpts, f func() (T, error)) (T, return res, ctx.Err() case <-timer.C: if tErr.RetryAfter == 0 { - backoff = min(backoff*2, opts.maxBackoff) + // Add jitter up to 
50% of the current backoff + jitter := time.Duration(rand.Int64N(int64(backoff / 2))) + backoff = min(backoff*2, opts.maxBackoff) + jitter } continue } From 89148cfda72ff32c1791b281061d38f6882c6cb7 Mon Sep 17 00:00:00 2001 From: Roger Ng Date: Tue, 12 May 2026 11:02:00 +0000 Subject: [PATCH 17/19] Prevent panic in backoff jitter calculation when backoff is zero --- client/fetcher.go | 5 ++++- client/fetcher_test.go | 6 ++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/client/fetcher.go b/client/fetcher.go index 52a607c10..54e896b24 100644 --- a/client/fetcher.go +++ b/client/fetcher.go @@ -300,7 +300,10 @@ func retry[T any](ctx context.Context, opts retryOpts, f func() (T, error)) (T, case <-timer.C: if tErr.RetryAfter == 0 { // Add jitter up to 50% of the current backoff - jitter := time.Duration(rand.Int64N(int64(backoff / 2))) + var jitter time.Duration + if n := int64(backoff / 2); n > 0 { + jitter = time.Duration(rand.Int64N(n)) + } backoff = min(backoff*2, opts.maxBackoff) + jitter } continue diff --git a/client/fetcher_test.go b/client/fetcher_test.go index d63213fb5..dfe228b37 100644 --- a/client/fetcher_test.go +++ b/client/fetcher_test.go @@ -212,6 +212,12 @@ func TestWithTileRetry(t *testing.T) { expectedError: errors.New("after 2 retries"), wantAttempts: 3, }, + { + name: "InitialBackoffZero", + responses: []error{TransientError{Err: errors.New("temporary")}, nil}, + options: []RetryOption{WithInitialBackoff(0)}, + wantAttempts: 2, + }, } for _, tc := range tests { From 086fa0a52419d4c6ad572beaf210eb91a9a458dc Mon Sep 17 00:00:00 2001 From: Roger Ng Date: Tue, 12 May 2026 12:34:50 +0000 Subject: [PATCH 18/19] Widen backoff jitter to the full backoff and cap the sum at maxBackoff --- client/fetcher.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/fetcher.go b/client/fetcher.go index 54e896b24..dc199b050 100644 --- a/client/fetcher.go +++ b/client/fetcher.go @@ -299,12 +299,12 @@ func retry[T any](ctx context.Context, opts retryOpts, f func() (T,
error)) (T, return res, ctx.Err() case <-timer.C: if tErr.RetryAfter == 0 { - // Add jitter up to 50% of the current backoff + nextBackoff := backoff * 2 var jitter time.Duration - if n := int64(backoff / 2); n > 0 { + if n := int64(backoff); n > 0 { jitter = time.Duration(rand.Int64N(n)) } - backoff = min(backoff*2, opts.maxBackoff) + jitter + backoff = min(nextBackoff+jitter, opts.maxBackoff) } continue } From b9a3eb4422b7dc81348f9493f1f03305285e1923 Mon Sep 17 00:00:00 2001 From: Roger Ng Date: Tue, 12 May 2026 12:37:56 +0000 Subject: [PATCH 19/19] Return error when Retry-After duration exceeds maxBackoff instead of capping it --- client/fetcher.go | 5 ++++- client/fetcher_test.go | 10 +++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/client/fetcher.go b/client/fetcher.go index dc199b050..687e31256 100644 --- a/client/fetcher.go +++ b/client/fetcher.go @@ -290,7 +290,10 @@ func retry[T any](ctx context.Context, opts retryOpts, f func() (T, error)) (T, } delay := backoff if tErr.RetryAfter > 0 { - delay = min(tErr.RetryAfter, opts.maxBackoff) + if tErr.RetryAfter > opts.maxBackoff { + return res, fmt.Errorf("Retry-After %v exceeds maxBackoff %v: %w", tErr.RetryAfter, opts.maxBackoff, err) + } + delay = tErr.RetryAfter } timer := time.NewTimer(delay) select { diff --git a/client/fetcher_test.go b/client/fetcher_test.go index dfe228b37..409927ed7 100644 --- a/client/fetcher_test.go +++ b/client/fetcher_test.go @@ -107,11 +107,11 @@ func TestHTTPFetcherRetry(t *testing.T) { minDuration: 0, }, { - name: "RetryAfterLargeCapped", - responses: []int{http.StatusTooManyRequests, http.StatusOK}, - retryAfter: "3600", // 1 hour - wantAttempts: 2, - minDuration: 2 * time.Second, // Capped at maxBackoff (2s) + name: "RetryAfterExceedsMaxBackoff", + responses: []int{http.StatusTooManyRequests}, + retryAfter: "3600", // 1 hour + expectedError: errors.New("exceeds maxBackoff"), + wantAttempts: 1, }, }