Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 13 additions & 14 deletions pkg/gatherers/conditional/conditional_gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/openshift/insights-operator/pkg/gatherers/common"
"github.com/openshift/insights-operator/pkg/insights/insightsclient"
"github.com/openshift/insights-operator/pkg/record"
"github.com/openshift/insights-operator/pkg/retry"
"github.com/openshift/insights-operator/pkg/utils"
)

Expand Down Expand Up @@ -271,35 +272,33 @@ func (g *Gatherer) getRemoteConfiguration(ctx context.Context) ([]byte, error) {
Cap: 3 * time.Minute,
}
endpointWithVersion := fmt.Sprintf(endpoint, ocpVersion)
var remoteConfigData []byte
err = wait.ExponentialBackoffWithContext(ctx, backOff, func(ctx context.Context) (done bool, err error) {

result, err := retry.RetryWithExpBackOff(ctx, backOff, retry.RetryOnNon200HTTP, func() (retry.Result, error) {
resp, err := g.insightsCli.GetWithPathParam(ctx, endpoint, ocpVersion, false)
if err != nil {
return false, err
return retry.Result{}, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
klog.Infof("Received HTTP status code %d, trying again in %s", resp.StatusCode, backOff.Step())
if backOff.Steps > 1 {
return false, nil
}
return true, insightsclient.HttpError{
Err: fmt.Errorf("received HTTP %s from %s. Using the default built-in configuration", resp.Status, endpointWithVersion),
return retry.Result{}, insightsclient.HttpError{
Err: fmt.Errorf("received HTTP %s from %s", resp.Status, endpointWithVersion),
StatusCode: resp.StatusCode,
}
}
remoteConfigData, err = io.ReadAll(resp.Body)
defer resp.Body.Close()

data, err := io.ReadAll(resp.Body)
if err != nil {
return false, nil
return retry.Result{}, err
}
return true, nil
return retry.Result{Data: data}, nil
})

if err != nil {
return nil, err
}

return remoteConfigData, nil
return result.Data, nil
}

func (g *Gatherer) getRemoteConfigEndpoint() (string, error) {
Expand Down
24 changes: 9 additions & 15 deletions pkg/insights/insightsuploader/insightsuploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/openshift/insights-operator/pkg/config/configobserver"
"github.com/openshift/insights-operator/pkg/controllerstatus"
"github.com/openshift/insights-operator/pkg/insights/insightsclient"
"github.com/openshift/insights-operator/pkg/retry"
)

type Authorizer interface {
Expand Down Expand Up @@ -206,25 +207,18 @@ func (c *Controller) Upload(ctx context.Context, s *insightsclient.Source) (stri
start := time.Now()
s.ID = start.Format(time.RFC3339)
s.Type = "application/vnd.redhat.openshift.periodic"
var requestID string
var statusCode int
err := wait.ExponentialBackoff(c.backoff, func() (done bool, err error) {
requestID, statusCode, err = c.client.SendAndGetID(ctx, c.configurator.Config().DataReporting.UploadEndpoint, *s)
if err != nil {
// do no return the error if it's not the last attempt
if c.backoff.Steps > 1 {
klog.Infof("Unable to upload report after %s: %v", time.Since(start).Truncate(time.Second/100), err)
klog.Errorf("%v. Trying again in %s", err, c.backoff.Step())
return false, nil
}
}
return true, err

result, err := retry.RetryWithExpBackOff(ctx, c.backoff, retry.RetryOnAll, func() (retry.Result, error) {
requestID, statusCode, err := c.client.SendAndGetID(ctx, c.configurator.Config().DataReporting.UploadEndpoint, *s)
return retry.Result{RequestID: requestID, StatusCode: statusCode}, err
})

if err != nil {
return "", statusCode, err
klog.Infof("Unable to upload report after %s: %v", time.Since(start).Truncate(time.Second/100), err)
return "", result.StatusCode, err
}
klog.Infof("Uploaded report successfully in %s", time.Since(start))
return requestID, statusCode, nil
return result.RequestID, result.StatusCode, nil
}

func reportToLogs(source io.Reader) error {
Expand Down
31 changes: 8 additions & 23 deletions pkg/ocm/clustertransfer/cluster_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/openshift/insights-operator/pkg/controllerstatus"
"github.com/openshift/insights-operator/pkg/insights/insightsclient"
"github.com/openshift/insights-operator/pkg/ocm"
"github.com/openshift/insights-operator/pkg/retry"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -81,7 +82,7 @@ func (c *Controller) Run(ctx context.Context) {
// in the response then check if a secret update is required, and if so, perform the update.
func (c *Controller) requestDataAndUpdateSecret(ctx context.Context, endpoint string) {
klog.Infof("checking the availability of cluster transfer. Next check is in %s", c.configurator.Config().ClusterTransfer.Interval)
data, err := c.requestClusterTransferWithExponentialBackoff(endpoint)
data, err := c.requestClusterTransferWithExponentialBackoff(ctx, endpoint)
if err != nil {
msg := fmt.Sprintf("failed to pull cluster transfer: %v", err)
httpErr, ok := err.(insightsclient.HttpError)
Expand Down Expand Up @@ -212,7 +213,7 @@ func (c *Controller) updatePullSecret(ctx context.Context, newData []byte) error
// from the OCM API with exponential backoff.
// It returns HttpError (see insightsclient.go) in case of any HTTP error response from the OCM API.
// The exponential backoff is applied only for HTTP errors >= 500.
func (c *Controller) requestClusterTransferWithExponentialBackoff(endpoint string) ([]byte, error) {
func (c *Controller) requestClusterTransferWithExponentialBackoff(ctx context.Context, endpoint string) ([]byte, error) {
bo := wait.Backoff{
Duration: c.configurator.Config().ClusterTransfer.Interval / 24, // 30 min as the first waiting
Factor: 2,
Expand All @@ -221,31 +222,15 @@ func (c *Controller) requestClusterTransferWithExponentialBackoff(endpoint strin
Cap: c.configurator.Config().ClusterTransfer.Interval,
}

var data []byte
err := wait.ExponentialBackoff(bo, func() (bool, error) {
var err error
data, err = c.client.RecvClusterTransfer(endpoint)
if err == nil {
return true, nil
}
// don't try again in case it's not an HTTP error - it could mean we're in disconnected env
if !insightsclient.IsHttpError(err) {
return true, err
}
httpErr := err.(insightsclient.HttpError)
if httpErr.StatusCode >= http.StatusInternalServerError {
// check the number of steps to prevent "timeout waiting for condition" error - we want to propagate the HTTP error below
if bo.Steps > 1 {
klog.Errorf("Got HTTP %v. Trying again in %s", httpErr.StatusCode, bo.Step())
return false, nil
}
}
return true, httpErr
result, err := retry.RetryWithExpBackOff(ctx, bo, retry.RetryOn50xHTTP, func() (retry.Result, error) {
data, err := c.client.RecvClusterTransfer(endpoint)
return retry.Result{Data: data}, err
})

if err != nil {
return nil, err
}
return data, nil
return result.Data, nil
}

func (c *Controller) updateStatus(healthy bool, msg, reason string, httpErr *insightsclient.HttpError) {
Expand Down
29 changes: 6 additions & 23 deletions pkg/ocm/sca/sca.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/openshift/insights-operator/pkg/controllerstatus"
"github.com/openshift/insights-operator/pkg/insights/insightsclient"
"github.com/openshift/insights-operator/pkg/ocm"
"github.com/openshift/insights-operator/pkg/retry"
)

const (
Expand Down Expand Up @@ -299,35 +300,17 @@ func (c *Controller) requestSCAWithExpBackoff(
Cap: c.configurator.Config().SCA.Interval,
}

var err error
var data []byte
err = wait.ExponentialBackoff(bo, func() (bool, error) {
data, err = c.client.RecvSCACerts(ctx, endpoint, nodeArchitectures)
if err != nil {
// don't try again in case it's not an HTTP error - it could mean we're in disconnected env
if !insightsclient.IsHttpError(err) {
return true, err
}
httpErr := err.(insightsclient.HttpError)
// try again only in case of 500 or higher
if httpErr.StatusCode >= http.StatusInternalServerError {
// check the number of steps to prevent "timeout waiting for condition" error - we want to propagate the HTTP error
if bo.Steps > 1 {
klog.Errorf("%v. Trying again in %s", httpErr, bo.Step())
return false, nil
}
}
return true, httpErr
}

return true, nil
result, err := retry.RetryWithExpBackOff(ctx, bo, retry.RetryOn50xHTTP, func() (retry.Result, error) {
data, err := c.client.RecvSCACerts(ctx, endpoint, nodeArchitectures)
return retry.Result{Data: data}, err
})

if err != nil {
return nil, err
}

var response Response
err = json.Unmarshal(data, &response)
err = json.Unmarshal(result.Data, &response)
if err != nil {
klog.Errorf("Unable to decode response: %v", err)
return nil, err
Expand Down
138 changes: 138 additions & 0 deletions pkg/retry/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Package retry provides shared retry logic with exponential backoff for HTTP operations.
//
// Usage example:
//
// result, err := retry.RetryWithExpBackOff(
// ctx,
// wait.Backoff{
// Duration: interval/32,
// Factor: 2,
// Steps: ocm.FailureCountThreshold,
// Cap: interval,
// },
// retry.RetryOn50xHTTP,
// func() (retry.Result, error) {
// data, err := client.RecvSCACerts(ctx, endpoint, nodeArchs)
// return retry.Result{Data: data}, err
// },
// )
package retry

import (
"context"
"errors"
"net/http"

"github.com/openshift/insights-operator/pkg/insights/insightsclient"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
)

type RetryStrategy int64

const (
// RetryOn50xHTTP retries only on HTTP 500+ errors, skips retry for non-HTTP errors (disconnected env)
// Used by: sca.go, clustertransfer.go
RetryOn50xHTTP RetryStrategy = iota

// RetryOnNon200HTTP retries on any non-200 HTTP status code
// Used by: conditional_gatherer.go
RetryOnNon200HTTP

// RetryOnAll retries on all errors
// Used by: insightsuploader.go
RetryOnAll
)

// Result holds the response data from retry operations
type Result struct {
Data []byte
StatusCode int
RequestID string
}
Comment thread
katushiik11 marked this conversation as resolved.

// shouldRetry determines if an error should be retried based on the strategy.
// Returns true if retry should be attempted (when steps remain).
// Returns false immediately if the context is canceled or deadline exceeded.
func shouldRetry(ctx context.Context, err error, strategy RetryStrategy) bool {
// Don't retry if context is canceled or deadline exceeded
if ctx.Err() != nil {
return false
}

// Don't retry context cancellation or deadline errors
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return false
}

// Extract status code from HttpError (handles both pointer and non-pointer)
var statusCode int
var isHTTPError bool

switch e := err.(type) {
case *insightsclient.HttpError:
// Pointer - real-world case from newHTTPErrorFromResponse
statusCode = e.StatusCode
isHTTPError = true
case insightsclient.HttpError:
// Non-pointer - test case
statusCode = e.StatusCode
isHTTPError = true
}

switch strategy {
case RetryOn50xHTTP:
// Only retry HTTP 500+ errors, skip non-HTTP errors (disconnected env)
if !isHTTPError {
return false
}
return statusCode >= http.StatusInternalServerError

case RetryOnNon200HTTP:
// Retry on any non-200 HTTP status, or non-HTTP errors
if !isHTTPError {
return true // retry non-HTTP errors
}
return statusCode != http.StatusOK

case RetryOnAll:
// Retry on all errors
return true
Comment thread
katushiik11 marked this conversation as resolved.

default:
// Unknown strategy, don't retry
klog.Infof("Unknown strategy %d for retry mechanism", strategy)
return false
}
}

func RetryWithExpBackOff(ctx context.Context, bo wait.Backoff, strategy RetryStrategy, operation func() (Result, error)) (Result, error) {
var lastErr error
var result Result

attempt := 0
maxAttempts := bo.Steps

err := wait.ExponentialBackoffWithContext(ctx, bo, func(context.Context) (bool, error) {
attempt++
result, lastErr = operation()
if lastErr != nil {
// Use strategy to determine if we should retry
if shouldRetry(ctx, lastErr, strategy) {
klog.Errorf("%v. Retrying (attempt %d/%d)", lastErr, attempt, maxAttempts)
return false, nil
}
// Don't retry based on strategy
return true, lastErr
}

return true, nil
})

// If we exhausted retries, return the last operation error instead of the timeout error
if wait.Interrupted(err) && lastErr != nil {
return result, lastErr
}

return result, err
}
Loading