Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/impl/io/input_http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"sync"

"github.com/redpanda-data/benthos/v4/internal/component"
"github.com/redpanda-data/benthos/v4/internal/httpclient"
"github.com/redpanda-data/benthos/v4/public/service"
"github.com/redpanda-data/benthos/v4/public/service/codec"
"github.com/redpanda-data/benthos/v4/public/utils/httpclient"
)

func httpClientInputSpec() *service.ConfigSpec {
Expand Down Expand Up @@ -132,7 +132,7 @@ func newHTTPClientInputFromParsed(conf *service.ParsedConfig, mgr *service.Resou
return nil, err
}

client, err := httpclient.NewClientFromOldConfig(oldConf, mgr, httpclient.WithExplicitBody(payloadExpr))
client, err := httpclient.NewClientFromConfig(oldConf, mgr, httpclient.WithExplicitBody(payloadExpr))
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/impl/io/output_http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ package io
import (
"context"

"github.com/redpanda-data/benthos/v4/internal/httpclient"
"github.com/redpanda-data/benthos/v4/public/service"
"github.com/redpanda-data/benthos/v4/public/utils/httpclient"
)

func httpClientOutputSpec() *service.ConfigSpec {
Expand Down Expand Up @@ -111,7 +111,7 @@ func newHTTPClientOutputFromParsed(conf *service.ParsedConfig, mgr *service.Reso
return nil, err
}

client, err := httpclient.NewClientFromOldConfig(oldHTTPConf, mgr, opts...)
client, err := httpclient.NewClientFromConfig(oldHTTPConf, mgr, opts...)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/impl/io/processor_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"errors"
"fmt"

"github.com/redpanda-data/benthos/v4/internal/httpclient"
"github.com/redpanda-data/benthos/v4/public/service"
"github.com/redpanda-data/benthos/v4/public/utils/httpclient"
)

func httpProcSpec() *service.ConfigSpec {
Expand Down Expand Up @@ -103,7 +103,7 @@ func newHTTPProcFromParsed(conf *service.ParsedConfig, mgr *service.Resources) (
asMultipart: asMultipart,
parallel: parallel,
}
if g.client, err = httpclient.NewClientFromOldConfig(oldConf, mgr); err != nil {
if g.client, err = httpclient.NewClientFromConfig(oldConf, mgr); err != nil {
return nil, err
}
return g, nil
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/pure/output_drop_on_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ drop_on:
t.Fatal("timed out")
}

assert.EqualError(t, res, fmt.Sprintf("%s: HTTP request returned unexpected response code (403): 403 Forbidden, Error: test error", ts.URL))
assert.EqualError(t, res, fmt.Sprintf("%s: HTTP request returned unexpected response code (403): 403 Forbidden, body: test error", ts.URL))
}

func TestDropOnError(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,37 +34,37 @@ type oauth2Config struct {
EndpointParams map[string][]string
}

// Client returns an http.Client with OAuth2 configured.
func (oauth oauth2Config) Client(ctx context.Context, base *http.Client) *http.Client {
if !oauth.Enabled {
// Client returns an [http.Client] with OAuth2 configured.
func (ac oauth2Config) Client(ctx context.Context, base *http.Client) *http.Client {
if !ac.Enabled {
return base
}

// Support for refresh_token grant type with bootstrapped refresh token to obtain access token
if gt, ok := oauth.EndpointParams["grant_type"]; ok && gt[0] == "refresh_token" {
if gt, ok := ac.EndpointParams["grant_type"]; ok && gt[0] == "refresh_token" {
conf := &oauth2.Config{
ClientID: oauth.ClientKey,
ClientSecret: oauth.ClientSecret,
ClientID: ac.ClientKey,
ClientSecret: ac.ClientSecret,
Endpoint: oauth2.Endpoint{
TokenURL: oauth.TokenURL,
TokenURL: ac.TokenURL,
AuthStyle: oauth2.AuthStyleAutoDetect,
},
Scopes: oauth.Scopes,
Scopes: ac.Scopes,
}
// We don't consider bootstrapped access token if any as it might be expired, rather we generate a new one
token := &oauth2.Token{}
if rt, ok := oauth.EndpointParams["refresh_token"]; ok {
if rt, ok := ac.EndpointParams["refresh_token"]; ok {
token.RefreshToken = rt[0]
}
return conf.Client(context.WithValue(ctx, oauth2.HTTPClient, base), token)
}

conf := &clientcredentials.Config{
ClientID: oauth.ClientKey,
ClientSecret: oauth.ClientSecret,
TokenURL: oauth.TokenURL,
Scopes: oauth.Scopes,
EndpointParams: oauth.EndpointParams,
ClientID: ac.ClientKey,
ClientSecret: ac.ClientSecret,
TokenURL: ac.TokenURL,
Scopes: ac.Scopes,
EndpointParams: ac.EndpointParams,
}

return conf.Client(context.WithValue(ctx, oauth2.HTTPClient, base))
Expand Down Expand Up @@ -138,33 +138,33 @@ func oauth2ClientCtorFromParsed(conf *service.ParsedConfig) (res func(context.Co
}
conf = conf.Namespace(aFieldOAuth2)

var oldConf oauth2Config
if oldConf.Enabled, err = conf.FieldBool(ao2FieldEnabled); err != nil {
var oauthConf oauth2Config
if oauthConf.Enabled, err = conf.FieldBool(ao2FieldEnabled); err != nil {
return
}
if oldConf.ClientKey, err = conf.FieldString(ao2FieldClientKey); err != nil {
if oauthConf.ClientKey, err = conf.FieldString(ao2FieldClientKey); err != nil {
return
}
if oldConf.ClientSecret, err = conf.FieldString(ao2FieldClientSecret); err != nil {
if oauthConf.ClientSecret, err = conf.FieldString(ao2FieldClientSecret); err != nil {
return
}
if oldConf.TokenURL, err = conf.FieldString(ao2FieldTokenURL); err != nil {
if oauthConf.TokenURL, err = conf.FieldString(ao2FieldTokenURL); err != nil {
return
}
if oldConf.Scopes, err = conf.FieldStringList(ao2FieldScopes); err != nil {
if oauthConf.Scopes, err = conf.FieldStringList(ao2FieldScopes); err != nil {
return
}
var endpointParams map[string]*service.ParsedConfig
if endpointParams, err = conf.FieldAnyMap(ao2FieldEndpointParams); err != nil {
return
}
oldConf.EndpointParams = map[string][]string{}
oauthConf.EndpointParams = map[string][]string{}
for k, v := range endpointParams {
if oldConf.EndpointParams[k], err = v.FieldStringList(); err != nil {
if oauthConf.EndpointParams[k], err = v.FieldStringList(); err != nil {
return
}
}

res = oldConf.Client
res = oauthConf.Client
return
}
Loading
Loading