diff --git a/internal/impl/io/input_http_client.go b/internal/impl/io/input_http_client.go index d461a44ba..728f6874d 100644 --- a/internal/impl/io/input_http_client.go +++ b/internal/impl/io/input_http_client.go @@ -10,9 +10,9 @@ import ( "sync" "github.com/redpanda-data/benthos/v4/internal/component" - "github.com/redpanda-data/benthos/v4/internal/httpclient" "github.com/redpanda-data/benthos/v4/public/service" "github.com/redpanda-data/benthos/v4/public/service/codec" + "github.com/redpanda-data/benthos/v4/public/utils/httpclient" ) func httpClientInputSpec() *service.ConfigSpec { @@ -132,7 +132,7 @@ func newHTTPClientInputFromParsed(conf *service.ParsedConfig, mgr *service.Resou return nil, err } - client, err := httpclient.NewClientFromOldConfig(oldConf, mgr, httpclient.WithExplicitBody(payloadExpr)) + client, err := httpclient.NewClientFromConfig(oldConf, mgr, httpclient.WithExplicitBody(payloadExpr)) if err != nil { return nil, err } diff --git a/internal/impl/io/output_http_client.go b/internal/impl/io/output_http_client.go index e008c6e0e..cc39315ff 100644 --- a/internal/impl/io/output_http_client.go +++ b/internal/impl/io/output_http_client.go @@ -5,8 +5,8 @@ package io import ( "context" - "github.com/redpanda-data/benthos/v4/internal/httpclient" "github.com/redpanda-data/benthos/v4/public/service" + "github.com/redpanda-data/benthos/v4/public/utils/httpclient" ) func httpClientOutputSpec() *service.ConfigSpec { @@ -111,7 +111,7 @@ func newHTTPClientOutputFromParsed(conf *service.ParsedConfig, mgr *service.Reso return nil, err } - client, err := httpclient.NewClientFromOldConfig(oldHTTPConf, mgr, opts...) + client, err := httpclient.NewClientFromConfig(oldHTTPConf, mgr, opts...) if err != nil { return nil, err } diff --git a/internal/impl/io/processor_http.go b/internal/impl/io/processor_http.go index 5f17db9a4..44c059c35 100644 --- a/internal/impl/io/processor_http.go +++ b/internal/impl/io/processor_http.go @@ -7,8 +7,8 @@ import ( "errors" "fmt" - "github.com/redpanda-data/benthos/v4/internal/httpclient" "github.com/redpanda-data/benthos/v4/public/service" + "github.com/redpanda-data/benthos/v4/public/utils/httpclient" ) func httpProcSpec() *service.ConfigSpec { @@ -103,7 +103,7 @@ func newHTTPProcFromParsed(conf *service.ParsedConfig, mgr *service.Resources) ( asMultipart: asMultipart, parallel: parallel, } - if g.client, err = httpclient.NewClientFromOldConfig(oldConf, mgr); err != nil { + if g.client, err = httpclient.NewClientFromConfig(oldConf, mgr); err != nil { return nil, err } return g, nil diff --git a/internal/impl/pure/output_drop_on_test.go b/internal/impl/pure/output_drop_on_test.go index 9df8af841..5f4b67631 100644 --- a/internal/impl/pure/output_drop_on_test.go +++ b/internal/impl/pure/output_drop_on_test.go @@ -82,7 +82,7 @@ drop_on: t.Fatal("timed out") } - assert.EqualError(t, res, fmt.Sprintf("%s: HTTP request returned unexpected response code (403): 403 Forbidden, Error: test error", ts.URL)) + assert.EqualError(t, res, fmt.Sprintf("%s: HTTP request returned unexpected response code (403): 403 Forbidden, body: test error", ts.URL)) } func TestDropOnError(t *testing.T) { diff --git a/internal/httpclient/auth_oauth2.go b/public/utils/httpclient/auth_oauth2.go similarity index 77% rename from internal/httpclient/auth_oauth2.go rename to public/utils/httpclient/auth_oauth2.go index bbffe30bc..414e23671 100644 --- a/internal/httpclient/auth_oauth2.go +++ b/public/utils/httpclient/auth_oauth2.go @@ -34,37 +34,37 @@ type oauth2Config struct { EndpointParams map[string][]string } -// Client returns an http.Client with OAuth2 configured. -func (oauth oauth2Config) Client(ctx context.Context, base *http.Client) *http.Client { - if !oauth.Enabled { +// Client returns an [http.Client] with OAuth2 configured. +func (ac oauth2Config) Client(ctx context.Context, base *http.Client) *http.Client { + if !ac.Enabled { return base } // Support for refresh_token grant type with bootstrapped refresh token to obtain access token - if gt, ok := oauth.EndpointParams["grant_type"]; ok && gt[0] == "refresh_token" { + if gt, ok := ac.EndpointParams["grant_type"]; ok && gt[0] == "refresh_token" { conf := &oauth2.Config{ - ClientID: oauth.ClientKey, - ClientSecret: oauth.ClientSecret, + ClientID: ac.ClientKey, + ClientSecret: ac.ClientSecret, Endpoint: oauth2.Endpoint{ - TokenURL: oauth.TokenURL, + TokenURL: ac.TokenURL, AuthStyle: oauth2.AuthStyleAutoDetect, }, - Scopes: oauth.Scopes, + Scopes: ac.Scopes, } // We don't consider bootstrapped access token if any as it might be expired, rather we generate a new one token := &oauth2.Token{} - if rt, ok := oauth.EndpointParams["refresh_token"]; ok { + if rt, ok := ac.EndpointParams["refresh_token"]; ok { token.RefreshToken = rt[0] } return conf.Client(context.WithValue(ctx, oauth2.HTTPClient, base), token) } conf := &clientcredentials.Config{ - ClientID: oauth.ClientKey, - ClientSecret: oauth.ClientSecret, - TokenURL: oauth.TokenURL, - Scopes: oauth.Scopes, - EndpointParams: oauth.EndpointParams, + ClientID: ac.ClientKey, + ClientSecret: ac.ClientSecret, + TokenURL: ac.TokenURL, + Scopes: ac.Scopes, + EndpointParams: ac.EndpointParams, } return conf.Client(context.WithValue(ctx, oauth2.HTTPClient, base)) @@ -138,33 +138,33 @@ func oauth2ClientCtorFromParsed(conf *service.ParsedConfig) (res func(context.Co } conf = conf.Namespace(aFieldOAuth2) - var oldConf oauth2Config - if oldConf.Enabled, err = conf.FieldBool(ao2FieldEnabled); err != nil { + var oauthConf oauth2Config + if oauthConf.Enabled, err = conf.FieldBool(ao2FieldEnabled); err != nil { return } - if oldConf.ClientKey, err = conf.FieldString(ao2FieldClientKey); err != nil { + if oauthConf.ClientKey, err = conf.FieldString(ao2FieldClientKey); err != nil { return } - if oldConf.ClientSecret, err = conf.FieldString(ao2FieldClientSecret); err != nil { + if oauthConf.ClientSecret, err = conf.FieldString(ao2FieldClientSecret); err != nil { return } - if oldConf.TokenURL, err = conf.FieldString(ao2FieldTokenURL); err != nil { + if oauthConf.TokenURL, err = conf.FieldString(ao2FieldTokenURL); err != nil { return } - if oldConf.Scopes, err = conf.FieldStringList(ao2FieldScopes); err != nil { + if oauthConf.Scopes, err = conf.FieldStringList(ao2FieldScopes); err != nil { return } var endpointParams map[string]*service.ParsedConfig if endpointParams, err = conf.FieldAnyMap(ao2FieldEndpointParams); err != nil { return } - oldConf.EndpointParams = map[string][]string{} + oauthConf.EndpointParams = map[string][]string{} for k, v := range endpointParams { - if oldConf.EndpointParams[k], err = v.FieldStringList(); err != nil { + if oauthConf.EndpointParams[k], err = v.FieldStringList(); err != nil { return } } - res = oldConf.Client + res = oauthConf.Client return } diff --git a/internal/httpclient/client.go b/public/utils/httpclient/client.go similarity index 68% rename from internal/httpclient/client.go rename to public/utils/httpclient/client.go index 1b1543dd2..ee8d6ac63 100644 --- a/internal/httpclient/client.go +++ b/public/utils/httpclient/client.go @@ -53,16 +53,15 @@ type Client struct { codesMut sync.RWMutex } -// NewClientFromOldConfig creates a new request creator from an old struct style -// config. Eventually I'd like to phase these out for the more dynamic service -// style parses, but it'll take a while so we have this for now. -func NewClientFromOldConfig(conf OldConfig, mgr *service.Resources, opts ...RequestOpt) (*Client, error) { - reqCreator, err := RequestCreatorFromOldConfig(conf, mgr, opts...) +// NewClientFromConfig creates a new HTTP client from the provided +// configuration. +func NewClientFromConfig(conf Config, mgr *service.Resources, opts ...RequestOpt) (*Client, error) { + reqCreator, err := RequestCreatorFromConfig(conf, mgr, opts...) if err != nil { return nil, err } - h := Client{ + c := Client{ reqCreator: reqCreator, client: &http.Client{}, metaExtractFilter: conf.ExtractMetadata, @@ -74,26 +73,26 @@ func NewClientFromOldConfig(conf OldConfig, mgr *service.Resources, opts ...Requ mgr: mgr, log: mgr.Logger(), } - h.clientCtx, h.clientCancel = context.WithCancel(context.Background()) + c.clientCtx, c.clientCancel = context.WithCancel(context.Background()) if conf.Timeout > 0 { - h.client.Timeout = conf.Timeout + c.client.Timeout = conf.Timeout } - h.followRedirects = conf.FollowRedirects - if !h.followRedirects { - h.client.CheckRedirect = func(req *http.Request, via []*http.Request) error { + c.followRedirects = conf.FollowRedirects + if !c.followRedirects { + c.client.CheckRedirect = func(req *http.Request, via []*http.Request) error { return http.ErrUseLastResponse } } if conf.TLSEnabled && conf.TLSConf != nil { - if c, ok := http.DefaultTransport.(*http.Transport); ok { - cloned := c.Clone() + if transport, ok := http.DefaultTransport.(*http.Transport); ok { + cloned := transport.Clone() cloned.TLSClientConfig = conf.TLSConf - h.client.Transport = cloned + c.client.Transport = cloned } else { - h.client.Transport = &http.Transport{ + c.client.Transport = &http.Transport{ TLSClientConfig: conf.TLSConf, } } @@ -102,69 +101,69 @@ func NewClientFromOldConfig(conf OldConfig, mgr *service.Resources, opts ...Requ if conf.ProxyURL != "" { proxyURL, err := url.Parse(conf.ProxyURL) if err != nil { - return nil, fmt.Errorf("failed to parse proxy_url string: %v", err) + return nil, fmt.Errorf("failed to parse proxy_url string: %w", err) } - if h.client.Transport != nil { - if tr, ok := h.client.Transport.(*http.Transport); ok { + if c.client.Transport != nil { + if tr, ok := c.client.Transport.(*http.Transport); ok { tr.Proxy = http.ProxyURL(proxyURL) } else { - return nil, fmt.Errorf("unable to apply proxy_url to transport, unexpected type %T", h.client.Transport) + return nil, fmt.Errorf("unable to apply proxy_url to transport, unexpected type %T", c.client.Transport) } } else { - h.client.Transport = &http.Transport{ + c.client.Transport = &http.Transport{ Proxy: http.ProxyURL(proxyURL), } } } - h.client.Transport, err = newRequestLog(h.client.Transport, h.log, conf.DumpRequestLogLevel) + c.client.Transport, err = newRequestLog(c.client.Transport, c.log, conf.DumpRequestLogLevel) if err != nil { - return nil, fmt.Errorf("failed to config logger for request dump: %v", err) + return nil, fmt.Errorf("failed to config logger for request dump: %w", err) } if conf.DisableHTTP2 { - if c, ok := h.client.Transport.(*http.Transport); ok { - c.TLSNextProto = map[string]func(string, *tls.Conn) http.RoundTripper{} + if transport, ok := c.client.Transport.(*http.Transport); ok { + transport.TLSNextProto = map[string]func(string, *tls.Conn) http.RoundTripper{} } } - h.client = conf.clientCtor(h.clientCtx, h.client) + c.client = conf.clientCtor(c.clientCtx, c.client) - for _, c := range conf.BackoffOn { - h.backoffOn[c] = struct{}{} + for _, code := range conf.BackoffOn { + c.backoffOn[code] = struct{}{} } - for _, c := range conf.DropOn { - h.dropOn[c] = struct{}{} + for _, code := range conf.DropOn { + c.dropOn[code] = struct{}{} } - for _, c := range conf.SuccessfulOn { - h.successOn[c] = struct{}{} + for _, code := range conf.SuccessfulOn { + c.successOn[code] = struct{}{} } - h.mLatency = h.mgr.Metrics().NewTimer("http_request_latency_ns") - h.mCodes = map[int]*service.MetricCounter{} + c.mLatency = c.mgr.Metrics().NewTimer("http_request_latency_ns") + c.mCodes = map[int]*service.MetricCounter{} - if h.rateLimit = conf.RateLimit; h.rateLimit != "" { - if !h.mgr.HasRateLimit(h.rateLimit) { - return nil, fmt.Errorf("rate limit resource '%v' was not found", h.rateLimit) + if c.rateLimit = conf.RateLimit; c.rateLimit != "" { + if !c.mgr.HasRateLimit(c.rateLimit) { + return nil, fmt.Errorf("rate limit resource %q was not found", c.rateLimit) } } - h.numRetries = conf.NumRetries - h.retryThrottle = throttle.New( + c.numRetries = conf.NumRetries + c.retryThrottle = throttle.New( throttle.OptMaxUnthrottledRetries(0), throttle.OptThrottlePeriod(conf.Retry), throttle.OptMaxExponentPeriod(conf.MaxBackoff), ) - return &h, nil + return &c, nil } //------------------------------------------------------------------------------ -func (h *Client) incrCode(code int) { - h.codesMut.RLock() - ctr, exists := h.mCodes[code] - h.codesMut.RUnlock() +func (c *Client) incrCode(code int) { + c.codesMut.RLock() + ctr, exists := c.mCodes[code] + c.codesMut.RUnlock() if exists { ctr.Incr(1) @@ -175,28 +174,28 @@ func (h *Client) incrCode(code int) { if tier < 0 || tier > 5 { return } - ctr = h.mgr.Metrics().NewCounter(fmt.Sprintf("http_request_code_%vxx", tier)) + ctr = c.mgr.Metrics().NewCounter(fmt.Sprintf("http_request_code_%vxx", tier)) ctr.Incr(1) - h.codesMut.Lock() - h.mCodes[code] = ctr - h.codesMut.Unlock() + c.codesMut.Lock() + c.mCodes[code] = ctr + c.codesMut.Unlock() } -func (h *Client) waitForAccess(ctx context.Context) bool { - if h.rateLimit == "" { +func (c *Client) waitForAccess(ctx context.Context) bool { + if c.rateLimit == "" { return true } for { var period time.Duration var err error - if rerr := h.mgr.AccessRateLimit(ctx, h.rateLimit, func(rl service.RateLimit) { + if rerr := c.mgr.AccessRateLimit(ctx, c.rateLimit, func(rl service.RateLimit) { period, err = rl.Access(ctx) }); rerr != nil { err = rerr } if err != nil { - h.log.Errorf("Rate limit error: %v\n", err) + c.log.Errorf("Rate limit error: %v\n", err) period = time.Second } @@ -213,15 +212,15 @@ func (h *Client) waitForAccess(ctx context.Context) bool { } // ResponseToBatch attempts to parse an HTTP response into a 2D slice of bytes. -func (h *Client) ResponseToBatch(res *http.Response) (service.MessageBatch, error) { +func (c *Client) ResponseToBatch(res *http.Response) (service.MessageBatch, error) { var resMsg service.MessageBatch annotatePart := func(p *service.Message) { p.MetaSetMut("http_status_code", res.StatusCode) - if !h.metaExtractFilter.IsEmpty() { + if !c.metaExtractFilter.IsEmpty() { for k, values := range res.Header { normalisedHeader := strings.ToLower(k) - if len(values) > 0 && h.metaExtractFilter.Match(normalisedHeader) { + if len(values) > 0 && c.metaExtractFilter.Match(normalisedHeader) { if len(values) == 1 { p.MetaSetMut(normalisedHeader, values[0]) } else { @@ -249,7 +248,7 @@ func (h *Client) ResponseToBatch(res *http.Response) (service.MessageBatch, erro var err error if contentType := res.Header.Get("Content-Type"); contentType != "" { if mediaType, params, err = mime.ParseMediaType(contentType); err != nil { - h.log.Warnf("Failed to parse media type from Content-Type header: %v\n", err) + c.log.Warnf("Failed to parse media type from Content-Type header: %v\n", err) } } @@ -257,7 +256,7 @@ func (h *Client) ResponseToBatch(res *http.Response) (service.MessageBatch, erro if !strings.HasPrefix(mediaType, "multipart/") { var bytesRead int64 if bytesRead, err = buffer.ReadFrom(res.Body); err != nil { - h.log.Errorf("Failed to read response: %v\n", err) + c.log.Errorf("Failed to read response: %v\n", err) return resMsg, err } @@ -284,7 +283,7 @@ func (h *Client) ResponseToBatch(res *http.Response) (service.MessageBatch, erro var bytesRead int64 if bytesRead, err = buffer.ReadFrom(p); err != nil { - h.log.Errorf("Failed to read response: %v\n", err) + c.log.Errorf("Failed to read response: %v\n", err) return resMsg, err } @@ -309,17 +308,17 @@ const ( // checkStatus compares a returned status code against configured logic // determining whether the send succeeded, and if not what the retry strategy // should be. -func (h *Client) checkStatus(code int) (succeeded bool, retStrat retryStrategy) { - if _, exists := h.dropOn[code]; exists { +func (c *Client) checkStatus(code int) (succeeded bool, retStrat retryStrategy) { + if _, exists := c.dropOn[code]; exists { return false, noRetry } - if _, exists := h.backoffOn[code]; exists { + if _, exists := c.backoffOn[code]; exists { return false, retryBackoff } - if _, exists := h.successOn[code]; exists { + if _, exists := c.successOn[code]; exists { return true, noRetry } - if !h.followRedirects && code >= 300 && code <= 399 { + if !c.followRedirects && code >= 300 && code <= 399 { return true, noRetry } if code < 200 || code > 299 { @@ -348,10 +347,10 @@ func errorType(err error) string { // SendToResponse attempts to create an HTTP request from a provided message, // performs it, and then returns the *http.Response, allowing the raw response // to be consumed. -func (h *Client) SendToResponse(ctx context.Context, sendMsg service.MessageBatch) (res *http.Response, err error) { +func (c *Client) SendToResponse(ctx context.Context, sendMsg service.MessageBatch) (res *http.Response, err error) { var spans []*tracing.Span if sendMsg != nil { - sendMsg, spans = tracing.WithChildSpans(h.mgr.OtelTracer(), "http_request", sendMsg) + sendMsg, spans = tracing.WithChildSpans(c.mgr.OtelTracer(), "http_request", sendMsg) defer func() { for _, s := range spans { s.Finish() @@ -369,7 +368,7 @@ func (h *Client) SendToResponse(ctx context.Context, sendMsg service.MessageBatc } var req *http.Request - if req, err = h.reqCreator.Create(sendMsg); err != nil { + if req, err = c.reqCreator.Create(sendMsg); err != nil { logErr(err) return nil, err } @@ -398,7 +397,7 @@ func (h *Client) SendToResponse(ctx context.Context, sendMsg service.MessageBatc } }() - if !h.waitForAccess(ctx) { + if !c.waitForAccess(ctx) { if ctx.Err() != nil { return nil, ctx.Err() } @@ -406,16 +405,16 @@ func (h *Client) SendToResponse(ctx context.Context, sendMsg service.MessageBatc } rateLimited := false - numRetries := h.numRetries + numRetries := c.numRetries startedAt := time.Now() - if res, err = h.client.Do(req.WithContext(ctx)); err == nil { - h.incrCode(res.StatusCode) + if res, err = c.client.Do(req.WithContext(ctx)); err == nil { + c.incrCode(res.StatusCode) for _, s := range spans { s.SetTagInt("http.response.status_code", res.StatusCode) } - if resolved, retryStrat := h.checkStatus(res.StatusCode); !resolved { + if resolved, retryStrat := c.checkStatus(res.StatusCode); !resolved { rateLimited = retryStrat == retryBackoff if retryStrat == noRetry { numRetries = 0 @@ -426,7 +425,7 @@ func (h *Client) SendToResponse(ctx context.Context, sendMsg service.MessageBatc } } } - h.mLatency.Timing(time.Since(startedAt).Nanoseconds()) + c.mLatency.Timing(time.Since(startedAt).Nanoseconds()) i, j := 0, numRetries for i < j && err != nil { @@ -434,25 +433,25 @@ func (h *Client) SendToResponse(ctx context.Context, sendMsg service.MessageBatc for _, s := range spans { s.SetTagInt("http.request.resend_count", i+1) } - if req, err = h.reqCreator.Create(sendMsg); err != nil { + if req, err = c.reqCreator.Create(sendMsg); err != nil { continue } if rateLimited { - if !h.retryThrottle.ExponentialRetryWithContext(ctx) { + if !c.retryThrottle.ExponentialRetryWithContext(ctx) { if ctx.Err() != nil { return nil, ctx.Err() } return nil, errTimedOut } } else { - if !h.retryThrottle.RetryWithContext(ctx) { + if !c.retryThrottle.RetryWithContext(ctx) { if ctx.Err() != nil { return nil, ctx.Err() } return nil, errTimedOut } } - if !h.waitForAccess(ctx) { + if !c.waitForAccess(ctx) { if ctx.Err() != nil { return nil, ctx.Err() } @@ -461,12 +460,12 @@ func (h *Client) SendToResponse(ctx context.Context, sendMsg service.MessageBatc rateLimited = false startedAt = time.Now() - if res, err = h.client.Do(req.WithContext(ctx)); err == nil { - h.incrCode(res.StatusCode) + if res, err = c.client.Do(req.WithContext(ctx)); err == nil { + c.incrCode(res.StatusCode) for _, s := range spans { s.SetTagInt("http.response.status_code", res.StatusCode) } - if resolved, retryStrat := h.checkStatus(res.StatusCode); !resolved { + if resolved, retryStrat := c.checkStatus(res.StatusCode); !resolved { rateLimited = retryStrat == retryBackoff if retryStrat == noRetry { j = 0 @@ -477,7 +476,7 @@ func (h *Client) SendToResponse(ctx context.Context, sendMsg service.MessageBatc } } } - h.mLatency.Timing(time.Since(startedAt).Nanoseconds()) + c.mLatency.Timing(time.Since(startedAt).Nanoseconds()) i++ } if err != nil { @@ -485,7 +484,7 @@ func (h *Client) SendToResponse(ctx context.Context, sendMsg service.MessageBatc return nil, err } - h.retryThrottle.Reset() + c.retryThrottle.Reset() return res, nil } @@ -504,16 +503,16 @@ func unexpectedErr(res *http.Response) error { // // If the request is successful then the response is parsed into a message, // including headers added as metadata (when configured to do so). -func (h *Client) Send(ctx context.Context, sendMsg service.MessageBatch) (service.MessageBatch, error) { - res, err := h.SendToResponse(ctx, sendMsg) +func (c *Client) Send(ctx context.Context, sendMsg service.MessageBatch) (service.MessageBatch, error) { + res, err := c.SendToResponse(ctx, sendMsg) if err != nil { return nil, err } - return h.ResponseToBatch(res) + return c.ResponseToBatch(res) } -// Close the client. -func (h *Client) Close(ctx context.Context) error { - h.clientCancel() +// Close closes the client. +func (c *Client) Close(ctx context.Context) error { + c.clientCancel() return nil } diff --git a/internal/httpclient/client_test.go b/public/utils/httpclient/client_test.go similarity index 93% rename from internal/httpclient/client_test.go rename to public/utils/httpclient/client_test.go index 95868a7de..51d232041 100644 --- a/internal/httpclient/client_test.go +++ b/public/utils/httpclient/client_test.go @@ -25,7 +25,7 @@ import ( "github.com/redpanda-data/benthos/v4/public/service" ) -func clientConfig(t testing.TB, confStr string, args ...any) OldConfig { +func clientConfig(t testing.TB, confStr string, args ...any) Config { t.Helper() spec := service.NewConfigSpec().Field(ConfigField("GET", false)) @@ -51,7 +51,7 @@ retry_period: 1ms retries: 3 `, ts.URL+"/testpost") - h, err := NewClientFromOldConfig(conf, service.MockResources()) + h, err := NewClientFromConfig(conf, service.MockResources()) require.NoError(t, err) defer h.Close(t.Context()) @@ -68,7 +68,7 @@ retry_period: 1ms retries: 3 `) - h, err := NewClientFromOldConfig(conf, service.MockResources()) + h, err := NewClientFromConfig(conf, service.MockResources()) require.NoError(t, err) _, err = h.Send(t.Context(), service.MessageBatch{service.NewMessage([]byte("test"))}) @@ -96,7 +96,7 @@ func TestHTTPClientSendBasic(t *testing.T) { url: %v `, ts.URL+"/testpost") - h, err := NewClientFromOldConfig(conf, service.MockResources()) + h, err := NewClientFromConfig(conf, service.MockResources()) require.NoError(t, err) for i := 0; i < nTestLoops; i++ { @@ -131,7 +131,7 @@ func TestHTTPClientBadContentType(t *testing.T) { url: %v `, ts.URL+"/testpost") - h, err := NewClientFromOldConfig(conf, service.MockResources()) + h, err := NewClientFromConfig(conf, service.MockResources()) require.NoError(t, err) testMsg := service.MessageBatch{service.NewMessage([]byte("hello world"))} @@ -158,7 +158,7 @@ url: %v drop_on: [ 400 ] `, ts.URL+"/testpost") - h, err := NewClientFromOldConfig(conf, service.MockResources()) + h, err := NewClientFromConfig(conf, service.MockResources()) require.NoError(t, err) testMsg := service.MessageBatch{service.NewMessage([]byte(`{"bar":"baz"}`))} @@ -181,7 +181,7 @@ url: %v successful_on: [ 400 ] `, ts.URL+"/testpost") - h, err := NewClientFromOldConfig(conf, service.MockResources()) + h, err := NewClientFromConfig(conf, service.MockResources()) require.NoError(t, err) testMsg := service.MessageBatch{service.NewMessage([]byte(`{"bar":"baz"}`))} @@ -223,7 +223,7 @@ headers: "Host": "simpleHost.com" `, ts.URL+`/${! json("foo.bar") }`) - h, err := NewClientFromOldConfig(conf, service.MockResources()) + h, err := NewClientFromConfig(conf, service.MockResources()) require.NoError(t, err) for i := 0; i < nTestLoops; i++ { @@ -282,7 +282,7 @@ func TestHTTPClientSendMultipart(t *testing.T) { url: %v `, ts.URL+"/testpost") - h, err := NewClientFromOldConfig(conf, service.MockResources()) + h, err := NewClientFromConfig(conf, service.MockResources()) require.NoError(t, err) for i := 0; i < nTestLoops; i++ { @@ -323,7 +323,7 @@ func TestHTTPClientReceive(t *testing.T) { url: %v `, ts.URL+"/testpost") - h, err := NewClientFromOldConfig(conf, service.MockResources()) + h, err := NewClientFromConfig(conf, service.MockResources()) require.NoError(t, err) for i := 0; i < nTestLoops; i++ { @@ -367,7 +367,7 @@ metadata: include_prefixes: [ "foo_" ] `, ts.URL+"/testpost") - h, err := NewClientFromOldConfig(conf, service.MockResources()) + h, err := NewClientFromConfig(conf, service.MockResources()) require.NoError(t, err) sendMsg := service.MessageBatch{service.NewMessage([]byte("hello world"))} @@ -448,7 +448,7 @@ extract_headers: include_patterns: %v `, url, gabs.Wrap(tt.includePrefixes).String(), gabs.Wrap(tt.includePatterns).String()) - h, err := NewClientFromOldConfig(conf, service.MockResources()) + h, err := NewClientFromConfig(conf, service.MockResources()) if err != nil { t.Fatalf("%s: %s", tt.name, err) } @@ -527,7 +527,7 @@ func TestHTTPClientReceiveMultipart(t *testing.T) { url: %v `, ts.URL) - h, err := NewClientFromOldConfig(conf, service.MockResources()) + h, err := NewClientFromConfig(conf, service.MockResources()) require.NoError(t, err) for i := 0; i < nTestLoops; i++ { @@ -572,7 +572,7 @@ url: %v retries: 0 `, ts.URL) - h, err := NewClientFromOldConfig(conf, service.MockResources()) + h, err := NewClientFromConfig(conf, service.MockResources()) require.NoError(t, err) _, err = h.Send(t.Context(), service.MessageBatch{ @@ -600,7 +600,7 @@ url: %v proxy_url: %v `, ts.URL+"/testpost", tsProxy.URL) - h, err := NewClientFromOldConfig(conf, service.MockResources()) + h, err := NewClientFromConfig(conf, service.MockResources()) require.NoError(t, err) resBatch, err := h.Send(t.Context(), service.MessageBatch{ @@ -636,7 +636,7 @@ tls: proxy_url: %v `, ts.URL+"/testpost", tsProxy.URL) - h, err := NewClientFromOldConfig(conf, service.MockResources()) + h, err := NewClientFromConfig(conf, service.MockResources()) require.NoError(t, err) resBatch, err := h.Send(t.Context(), service.MessageBatch{ @@ -677,7 +677,7 @@ oauth2: client_secret: foosecret `, ts.URL+"/testpost", tsOAuth2.URL) - h, err := NewClientFromOldConfig(conf, service.MockResources()) + h, err := NewClientFromConfig(conf, service.MockResources()) require.NoError(t, err) resBatch, err := h.Send(t.Context(), service.MessageBatch{ @@ -723,7 +723,7 @@ oauth2: - foorefresh `, ts.URL+"/testpost", tsOAuth2.URL) - h, err := NewClientFromOldConfig(conf, service.MockResources()) + h, err := NewClientFromConfig(conf, service.MockResources()) require.NoError(t, err) resBatch, err := h.Send(t.Context(), service.MessageBatch{ @@ -767,7 +767,7 @@ oauth2: - refresh_token `, ts.URL+"/testpost", tsOAuth2.URL) - h, err := NewClientFromOldConfig(conf, service.MockResources()) + h, err := NewClientFromConfig(conf, service.MockResources()) require.NoError(t, err) _, err = h.Send(t.Context(), service.MessageBatch{ @@ -806,7 +806,7 @@ tls: skip_cert_verify: true `, ts.URL+"/testpost", tsOAuth2.URL) - h, err := NewClientFromOldConfig(conf, service.MockResources()) + h, err := NewClientFromConfig(conf, service.MockResources()) require.NoError(t, err) resBatch, err := h.Send(t.Context(), service.MessageBatch{ @@ -834,7 +834,7 @@ extract_headers: - '^location$' `, ts.URL) - h, err := NewClientFromOldConfig(conf, service.MockResources()) + h, err := NewClientFromConfig(conf, service.MockResources()) require.NoError(t, err) resBatch, err := h.Send(t.Context(), service.MessageBatch{ @@ -891,7 +891,7 @@ tls: skip_cert_verify: true `, ts.URL, test.disableHTTP2) - h, err := NewClientFromOldConfig(conf, service.MockResources()) + h, err := NewClientFromConfig(conf, service.MockResources()) require.NoError(t, err) dummyMsg := "hello world" diff --git a/internal/httpclient/config.go b/public/utils/httpclient/config.go similarity index 96% rename from internal/httpclient/config.go rename to public/utils/httpclient/config.go index 715f5be9a..3ffc514e2 100644 --- a/internal/httpclient/config.go +++ b/public/utils/httpclient/config.go @@ -122,9 +122,9 @@ func ConfigField(defaultVerb string, forOutput bool, extraChildren ...*service.C //------------------------------------------------------------------------------ -// ConfigFromParsed attempts to parse an http client config struct from a parsed -// plugin config. -func ConfigFromParsed(pConf *service.ParsedConfig) (conf OldConfig, err error) { +// ConfigFromParsed parses an HTTP client configuration from a parsed plugin +// config. +func ConfigFromParsed(pConf *service.ParsedConfig) (conf Config, err error) { if conf.URL, err = pConf.FieldInterpolatedString(hcFieldURL); err != nil { return } @@ -182,8 +182,8 @@ func ConfigFromParsed(pConf *service.ParsedConfig) (conf OldConfig, err error) { return } -// OldConfig is a configuration struct for an HTTP client. -type OldConfig struct { +// Config contains HTTP client configuration parameters. +type Config struct { URL *service.InterpolatedString Verb string Headers map[string]*service.InterpolatedString diff --git a/internal/httpclient/config_test.go b/public/utils/httpclient/config_test.go similarity index 90% rename from internal/httpclient/config_test.go rename to public/utils/httpclient/config_test.go index 07310fbdc..98d72a6f2 100644 --- a/internal/httpclient/config_test.go +++ b/public/utils/httpclient/config_test.go @@ -17,7 +17,7 @@ func TestNewStyleConfigs(t *testing.T) { verbOverride string forOutput bool inputYAML string - validator func(t *testing.T, c *OldConfig) + validator func(t *testing.T, c *Config) }{ { name: "basic fields", @@ -28,7 +28,7 @@ headers: foo1: bar1 foo2: bar2 `, - validator: func(t *testing.T, o *OldConfig) { + validator: func(t *testing.T, o *Config) { sURL, _ := o.URL.Static() assert.Equal(t, "example.com/foo1", sURL) assert.Equal(t, "PUT", o.Verb) @@ -50,7 +50,7 @@ url: example.com/foo2 rate_limit: nah `, verbOverride: "GET", - validator: func(t *testing.T, o *OldConfig) { + validator: func(t *testing.T, o *Config) { sURL, _ := o.URL.Static() assert.Equal(t, "example.com/foo2", sURL) assert.Equal(t, "GET", o.Verb) @@ -65,7 +65,7 @@ successful_on: [ 1, 2, 3 ] backoff_on: [ 4, 5, 6 ] drop_on: [ 7, 8, 9 ] `, - validator: func(t *testing.T, o *OldConfig) { + validator: func(t *testing.T, o *Config) { sURL, _ := o.URL.Static() assert.Equal(t, "example.com/foo3", sURL) assert.Equal(t, []int{1, 2, 3}, o.SuccessfulOn) diff --git a/internal/httpclient/errors.go b/public/utils/httpclient/errors.go similarity index 80% rename from internal/httpclient/errors.go rename to public/utils/httpclient/errors.go index c09f20387..4357b1819 100644 --- a/internal/httpclient/errors.go +++ b/public/utils/httpclient/errors.go @@ -15,8 +15,8 @@ type ErrUnexpectedHTTPRes struct { Body []byte } -// Error returns the Error string. +// Error returns the error message for [ErrUnexpectedHTTPRes]. func (e ErrUnexpectedHTTPRes) Error() string { body := strings.ReplaceAll(string(e.Body), "\n", "") - return fmt.Sprintf("HTTP request returned unexpected response code (%v): %v, Error: %v", e.Code, e.S, body) + return fmt.Sprintf("HTTP request returned unexpected response code (%d): %s, body: %s", e.Code, e.S, body) } diff --git a/internal/httpclient/errors_test.go b/public/utils/httpclient/errors_test.go similarity index 88% rename from internal/httpclient/errors_test.go rename to public/utils/httpclient/errors_test.go index fcd3ad840..23af91e56 100644 --- a/internal/httpclient/errors_test.go +++ b/public/utils/httpclient/errors_test.go @@ -11,7 +11,7 @@ func TestHTTPError(t *testing.T) { Body: []byte("test body str"), } - exp, act := `HTTP request returned unexpected response code (0): test str, Error: test body str`, err.Error() + exp, act := `HTTP request returned unexpected response code (0): test str, body: test body str`, err.Error() if exp != act { t.Errorf("Wrong Error() from ErrUnexpectedHTTPRes: %v != %v", exp, act) } diff --git a/internal/httpclient/logger.go b/public/utils/httpclient/logger.go similarity index 96% rename from internal/httpclient/logger.go rename to public/utils/httpclient/logger.go index 370fea1eb..0eb94c1b4 100644 --- a/internal/httpclient/logger.go +++ b/public/utils/httpclient/logger.go @@ -49,7 +49,7 @@ func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { ) var ( - reqBodyCaptured interface{} + reqBodyCaptured any reqBodyBuf = &bytes.Buffer{} reqBodyErr error ) @@ -80,7 +80,7 @@ func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { } var ( - respBodyCaptured interface{} + respBodyCaptured any respBodyBuf = &bytes.Buffer{} respErrBody error ) @@ -155,11 +155,10 @@ func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { return respOriginal, roundTripErr } -var toSimpleMap = func(h http.Header) map[string]string { +func toSimpleMap(h http.Header) map[string]string { out := map[string]string{} for k, v := range h { out[k] = strings.Join(v, " ") } - return out } diff --git a/internal/httpclient/logger_test.go b/public/utils/httpclient/logger_test.go similarity index 100% rename from internal/httpclient/logger_test.go rename to public/utils/httpclient/logger_test.go diff --git a/internal/httpclient/request.go b/public/utils/httpclient/request.go similarity index 95% rename from internal/httpclient/request.go rename to public/utils/httpclient/request.go index 05a684e53..8a13de782 100644 --- a/internal/httpclient/request.go +++ b/public/utils/httpclient/request.go @@ -46,10 +46,9 @@ type RequestCreator struct { // RequestOpt represents a customisation of a request creator. type RequestOpt func(r *RequestCreator) -// RequestCreatorFromOldConfig creates a new request creator from an old struct -// style config. Eventually I'd like to phase these out for the more dynamic -// service style parses, but it'll take a while so we have this for now. -func RequestCreatorFromOldConfig(conf OldConfig, mgr *service.Resources, opts ...RequestOpt) (*RequestCreator, error) { +// RequestCreatorFromConfig creates a new request creator from the provided +// configuration. +func RequestCreatorFromConfig(conf Config, mgr *service.Resources, opts ...RequestOpt) (*RequestCreator, error) { r := &RequestCreator{ fs: mgr.FS(), url: conf.URL, diff --git a/internal/httpclient/request_test.go b/public/utils/httpclient/request_test.go similarity index 92% rename from internal/httpclient/request_test.go rename to public/utils/httpclient/request_test.go index cb7ef764b..5592c9f7f 100644 --- a/internal/httpclient/request_test.go +++ b/public/utils/httpclient/request_test.go @@ -25,7 +25,7 @@ metadata: oldConf, err := ConfigFromParsed(parsed) require.NoError(t, err) - reqCreator, err := RequestCreatorFromOldConfig(oldConf, service.MockResources()) + reqCreator, err := RequestCreatorFromConfig(oldConf, service.MockResources()) require.NoError(t, err) part := service.NewMessage([]byte("hello world"))