Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
175c1d0
Add retry logic for HTTP fetcher
roger2hk May 11, 2026
7279e26
Fix linter error
roger2hk May 11, 2026
533ee96
Drain response body on transient errors to reuse connection
roger2hk May 11, 2026
61da9fd
Fix `time.After` memory leak
roger2hk May 11, 2026
d551f9b
Handle past-dated `Retry-After` headers in fetcher
roger2hk May 11, 2026
a6bf8d0
Restrict transient network error classification to timeout errors only
roger2hk May 11, 2026
3955135
Handle non-positive Retry-After values by defaulting to minimal duration
roger2hk May 12, 2026
bdb1e57
Expand transient network error detection and handle read errors in fe…
roger2hk May 12, 2026
fc741c0
Adjust retry loop to correctly perform maxRetries attempts plus the i…
roger2hk May 12, 2026
a4aa8cb
Limit response body drainage to 4KB to prevent hangs on large inputs
roger2hk May 12, 2026
6acc055
Update fetcher to support RFC 850 and ANSI C date formats for Retry-A…
roger2hk May 12, 2026
9338d0b
Cap retry delay at maxBackoff to prevent excessively long waits
roger2hk May 12, 2026
cfe152d
Treat io.EOF as a retryable error in fetcher
roger2hk May 12, 2026
0386207
Adjust retry logic to correctly propagate the final error and last re…
roger2hk May 12, 2026
24683f2
Exclude context cancellation and deadline errors from transient netwo…
roger2hk May 12, 2026
66132ec
Add jitter to backoff duration in fetcher retry logic
roger2hk May 12, 2026
89148cf
Prevent panic in backoff jitter calculation when backoff is zero
roger2hk May 12, 2026
086fa0a
Adjust backoff calculation
roger2hk May 12, 2026
b9a3eb4
Return error when Retry-After duration exceeds maxBackoff instead of …
roger2hk May 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 213 additions & 7 deletions client/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,40 @@ package client

import (
"context"
"errors"
"fmt"
"io"
"math/rand/v2"
"net"
"net/http"
"net/url"
"os"
"path"
"strconv"
"strings"
"syscall"
"time"

"log/slog"

"github.com/transparency-dev/tessera/api/layout"
"github.com/transparency-dev/tessera/internal/fetcher"
)

// TransientError indicates that an error is temporary and the operation can be retried.
type TransientError struct {
Err error
RetryAfter time.Duration // Optional, parsed from header if available
}

func (e TransientError) Error() string {
return fmt.Sprintf("transient error: %v", e.Err)
}

func (e TransientError) Unwrap() error {
return e.Err
}

// NewHTTPFetcher creates a new HTTPFetcher for the log rooted at the given URL, using
// the provided HTTP client.
//
Expand Down Expand Up @@ -61,6 +81,29 @@ func (h *HTTPFetcher) SetAuthorizationHeader(v string) {
h.authHeader = v
}

func isTransientNetworkError(err error) bool {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return false
}
var netErr net.Error
if errors.As(err, &netErr) {
if netErr.Timeout() {
return true
}
}
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
return true
}
var errno syscall.Errno
if errors.As(err, &errno) {
switch errno {
case syscall.ECONNRESET, syscall.ECONNABORTED, syscall.ECONNREFUSED:
return true
}
}
return false
}

func (h HTTPFetcher) fetch(ctx context.Context, p string) ([]byte, error) {
u, err := h.rootURL.Parse(p)
if err != nil {
Expand All @@ -75,24 +118,70 @@ func (h HTTPFetcher) fetch(ctx context.Context, p string) ([]byte, error) {
}
r, err := h.c.Do(req)
if err != nil {
return nil, fmt.Errorf("get(%q): %v", u.String(), err)
if isTransientNetworkError(err) {
return nil, TransientError{Err: err}
}
return nil, err
}
defer func() {
// Drain the body to ensure the underlying TCP connection can be returned
// to the keep-alive pool and reused for future requests.
// Limit the drain to avoid hanging on large or infinite responses.
_, _ = io.Copy(io.Discard, io.LimitReader(r.Body, 4096))

if err := r.Body.Close(); err != nil {
slog.ErrorContext(ctx, "resp.Body.Close", slog.Any("error", err))
}
}()

switch r.StatusCode {
case http.StatusOK:
// All good, continue below
data, err := io.ReadAll(r.Body)
if err != nil {
if isTransientNetworkError(err) {
return nil, TransientError{Err: err}
}
return nil, err
}
return data, nil
case http.StatusNotFound:
// Need to return ErrNotExist here, by contract.
return nil, fmt.Errorf("get(%q): %w", u.String(), os.ErrNotExist)
case http.StatusTooManyRequests, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout:
var retryAfter time.Duration
if ra := r.Header.Get("Retry-After"); ra != "" {
retryAfter = parseRetryAfter(ra)
}
return nil, TransientError{
Err: fmt.Errorf("get(%q): status %d", u.String(), r.StatusCode),
RetryAfter: retryAfter,
}
default:
return nil, fmt.Errorf("get(%q): %v", u.String(), r.StatusCode)
}
}

defer func() {
if err := r.Body.Close(); err != nil {
slog.ErrorContext(ctx, "resp.Body.Close", slog.Any("error", err))
// parseRetryAfter parses the Retry-After header and returns a time.Duration.
func parseRetryAfter(retryAfter string) time.Duration {
if retryAfter == "" {
return 0
}
d, err := http.ParseTime(retryAfter)
if err == nil {
dur := time.Until(d)
if dur <= 0 {
return time.Nanosecond
}
}()
return io.ReadAll(r.Body)
return dur
}
s, err := strconv.Atoi(retryAfter)
if err == nil {
if s <= 0 {
return time.Nanosecond
}
return time.Duration(s) * time.Second
}
return 0
}

func (h HTTPFetcher) ReadCheckpoint(ctx context.Context) ([]byte, error) {
Expand All @@ -111,6 +200,123 @@ func (h HTTPFetcher) ReadEntryBundle(ctx context.Context, i uint64, p uint8) ([]
})
}

// retryOpts holds the configuration for retry logic.
type retryOpts struct {
maxRetries int
initialBackoff time.Duration
maxBackoff time.Duration
}

// RetryOption is a function that modifies retryOpts.
type RetryOption func(*retryOpts)

// WithMaxRetries sets the maximum number of retries.
func WithMaxRetries(n int) RetryOption {
return func(o *retryOpts) { o.maxRetries = n }
}

// WithInitialBackoff sets the initial backoff duration.
func WithInitialBackoff(d time.Duration) RetryOption {
return func(o *retryOpts) { o.initialBackoff = d }
}

// WithMaxBackoff sets the maximum backoff duration.
func WithMaxBackoff(d time.Duration) RetryOption {
return func(o *retryOpts) { o.maxBackoff = d }
}

func defaultRetryOpts() retryOpts {
return retryOpts{
maxRetries: 5,
initialBackoff: 100 * time.Millisecond,
maxBackoff: 2 * time.Second,
}
}

// WithTileRetry decorates a TileFetcherFunc with retry logic.
// Any supplied options override the defaults from defaultRetryOpts.
func WithTileRetry(f TileFetcherFunc, opts ...RetryOption) TileFetcherFunc {
	cfg := defaultRetryOpts()
	for _, apply := range opts {
		apply(&cfg)
	}
	return func(ctx context.Context, level, index uint64, p uint8) ([]byte, error) {
		return retry(ctx, cfg, func() ([]byte, error) { return f(ctx, level, index, p) })
	}
}

// WithEntryBundleRetry decorates an EntryBundleFetcherFunc with retry logic.
// Any supplied options override the defaults from defaultRetryOpts.
func WithEntryBundleRetry(f EntryBundleFetcherFunc, opts ...RetryOption) EntryBundleFetcherFunc {
	cfg := defaultRetryOpts()
	for _, apply := range opts {
		apply(&cfg)
	}
	return func(ctx context.Context, bundleIndex uint64, p uint8) ([]byte, error) {
		return retry(ctx, cfg, func() ([]byte, error) { return f(ctx, bundleIndex, p) })
	}
}

// WithCheckpointRetry decorates a CheckpointFetcherFunc with retry logic.
// Any supplied options override the defaults from defaultRetryOpts.
func WithCheckpointRetry(f CheckpointFetcherFunc, opts ...RetryOption) CheckpointFetcherFunc {
	cfg := defaultRetryOpts()
	for _, apply := range opts {
		apply(&cfg)
	}
	return func(ctx context.Context) ([]byte, error) {
		return retry(ctx, cfg, func() ([]byte, error) { return f(ctx) })
	}
}

// retry retries the function f with exponential backoff up to maxRetries.
//
// Only errors that unwrap to TransientError are retried; any other error is
// returned immediately. A TransientError carrying a positive RetryAfter hint
// replaces the computed backoff for that attempt; a hint exceeding
// opts.maxBackoff is rejected and the last error is returned instead of
// waiting. Exponential growth with jitter is applied only when no hint was
// given. The loop makes at most opts.maxRetries+1 calls of f (the initial
// attempt plus opts.maxRetries retries).
func retry[T any](ctx context.Context, opts retryOpts, f func() (T, error)) (T, error) {
	var backoff = opts.initialBackoff
	var err error
	var res T
	for attempt := 0; attempt <= opts.maxRetries; attempt++ {
		res, err = f()
		if err == nil {
			return res, nil
		}

		var tErr TransientError
		if errors.As(err, &tErr) {
			// Retry budget exhausted: fall through to the "after N retries"
			// error below rather than sleeping again.
			if attempt == opts.maxRetries {
				break
			}
			delay := backoff
			if tErr.RetryAfter > 0 {
				// Honour the server's Retry-After hint, but refuse to wait
				// longer than the configured ceiling.
				if tErr.RetryAfter > opts.maxBackoff {
					return res, fmt.Errorf("Retry-After %v exceeds maxBackoff %v: %w", tErr.RetryAfter, opts.maxBackoff, err)
				}
				delay = tErr.RetryAfter
			}
			// An explicit timer (stopped on cancellation) is used instead of
			// time.After so the timer can be reclaimed promptly if ctx ends first.
			timer := time.NewTimer(delay)
			select {
			case <-ctx.Done():
				timer.Stop()
				return res, ctx.Err()
			case <-timer.C:
				if tErr.RetryAfter == 0 {
					// No server hint: double the backoff, add jitter drawn from
					// [0, previous backoff), and cap the result at maxBackoff.
					nextBackoff := backoff * 2
					var jitter time.Duration
					// rand.Int64N panics for n <= 0, so guard a zero backoff.
					if n := int64(backoff); n > 0 {
						jitter = time.Duration(rand.Int64N(n))
					}
					backoff = min(nextBackoff+jitter, opts.maxBackoff)
				}
				continue
			}
		}
		// Non-transient error: propagate immediately without retrying.
		return res, err
	}
	return res, fmt.Errorf("after %d retries: %w", opts.maxRetries, err)
}

// FileFetcher knows how to fetch log artifacts from a filesystem rooted at Root.
type FileFetcher struct {
Root string
Expand Down
Loading
Loading