From bfe62f8d4e4e42cb8be7a03b948302d1223cddaa Mon Sep 17 00:00:00 2001 From: Eng Keong <5253775+engkeong@users.noreply.github.com> Date: Wed, 3 Jun 2026 19:08:37 +0800 Subject: [PATCH 1/2] feat: Add OAuth2 client credentials support for Kafka SASL - Implements dynamic token renewal for OAUTHBEARER mechanism - Supports OAuth2 client credentials grant type - Compatible with Microsoft Entra, Okta, Auth0, and Keycloak - Adds configurable scopes and endpoint parameters - Includes comprehensive test coverage for OAuth2 flow The new oauth2 configuration allows automatic token management without requiring manual token updates, improving reliability and reducing operational overhead. --- internal/impl/kafka/sasl.go | 80 +++++++++++++++++ internal/impl/kafka/sasl_oauth_test.go | 118 +++++++++++++++++++++++++ 2 files changed, 198 insertions(+) create mode 100644 internal/impl/kafka/sasl_oauth_test.go diff --git a/internal/impl/kafka/sasl.go b/internal/impl/kafka/sasl.go index f9da84dbc0..a1022e529c 100644 --- a/internal/impl/kafka/sasl.go +++ b/internal/impl/kafka/sasl.go @@ -24,6 +24,7 @@ import ( "github.com/redpanda-data/benthos/v4/public/service" "github.com/redpanda-data/connect/v4/internal/impl/aws/config" + connectoauth2 "github.com/redpanda-data/connect/v4/internal/oauth2" "github.com/redpanda-data/connect/v4/internal/serviceaccount" "github.com/twmb/franz-go/pkg/sasl" @@ -64,6 +65,32 @@ func SASLFields() *service.ConfigField { service.NewStringMapField("extensions"). Description("Key/value pairs to add to OAUTHBEARER authentication requests."). Optional(), + service.NewObjectField("oauth2", + service.NewBoolField("enabled"). + Description("Whether to use OAuth2 client credentials for dynamic token renewal."). + Default(false), + service.NewStringField("client_key"). + Description("The OAuth2 client ID."). + Default(""), + service.NewStringField("client_secret"). + Description("The OAuth2 client secret."). + Default(""). + Secret(), + service.NewStringField("token_url"). + Description("The URL of the token provider."). + Default(""), + service.NewStringListField("scopes"). + Description("Optional list of requested permissions."). + Default([]any{}). + Advanced(), + service.NewStringMapField("endpoint_params"). + Description("Optional additional parameters for the token endpoint (e.g. audience)."). + Optional(). + Advanced(), + ). + Description("Allows you to specify OAuth2 client credentials for automatic token renewal on OAUTHBEARER. Compatible with any OAuth2 identity provider including Microsoft Entra, Okta, Auth0, and Keycloak."). + Optional(). + Advanced(), service.NewObjectField("aws", config.SessionFields()...). Description("Contains AWS specific fields for when the `mechanism` is set to `AWS_MSK_IAM`."). Optional(), @@ -151,6 +178,59 @@ func plainSaslFromConfig(c *service.ParsedConfig) (sasl.Mechanism, error) { } func oauthSaslFromConfig(c *service.ParsedConfig) (sasl.Mechanism, error) { + if c.Contains("oauth2") { + oauth2Parsed := c.Namespace("oauth2") + enabled, err := oauth2Parsed.FieldBool("enabled") + if err != nil { + return nil, err + } + if enabled { + clientKey, err := oauth2Parsed.FieldString("client_key") + if err != nil { + return nil, err + } + clientSecret, err := oauth2Parsed.FieldString("client_secret") + if err != nil { + return nil, err + } + tokenURL, err := oauth2Parsed.FieldString("token_url") + if err != nil { + return nil, err + } + scopes, err := oauth2Parsed.FieldStringList("scopes") + if err != nil { + return nil, err + } + + oauth2Conf := connectoauth2.Config{ + Enabled: true, + ClientKey: clientKey, + ClientSecret: clientSecret, + TokenURL: tokenURL, + Scopes: scopes, + } + if oauth2Parsed.Contains("endpoint_params") { + ep, err := oauth2Parsed.FieldStringMap("endpoint_params") + if err != nil { + return nil, err + } + oauth2Conf.EndpointParams = make(map[string][]string, len(ep)) + for k, v := range ep { + oauth2Conf.EndpointParams[k] = []string{v} + } + } + + tokenSource := oauth2Conf.TokenSource(context.Background()) + return oauth.Oauth(func(context.Context) (oauth.Auth, error) { + t, err := tokenSource.Token() + if err != nil { + return oauth.Auth{}, err + } + return oauth.Auth{Token: t.AccessToken}, nil + }), nil + } + } + token, err := c.FieldString("token") if err != nil { return nil, err diff --git a/internal/impl/kafka/sasl_oauth_test.go b/internal/impl/kafka/sasl_oauth_test.go new file mode 100644 index 0000000000..860f27a408 --- /dev/null +++ b/internal/impl/kafka/sasl_oauth_test.go @@ -0,0 +1,118 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/redpanda-data/benthos/v4/public/service" +) + +func TestOAuthSASLFromConfigOAuth2ClientCredentials(t *testing.T) { + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, http.MethodPost, r.Method) + require.NoError(t, r.ParseForm()) + require.Equal(t, "client_credentials", r.PostForm.Get("grant_type")) + + w.Header().Set("Content-Type", "application/json") + _, _ = io.WriteString(w, `{"access_token":"test-token","token_type":"Bearer","expires_in":3600}`) + })) + t.Cleanup(testServer.Close) + + mConf := parseFirstSASLObjectConfig(t, ` +sasl: + - mechanism: OAUTHBEARER + oauth2: + enabled: true + client_key: test-client + client_secret: test-secret + token_url: `+testServer.URL+` + scopes: + - scope-a + endpoint_params: + audience: test-audience +`) + + mechanism, err := oauthSaslFromConfig(mConf) + require.NoError(t, err) + + _, authBytes, err := mechanism.Authenticate(context.Background(), "localhost:9092") + require.NoError(t, err) + require.Contains(t, string(authBytes), "test-token") +} + +func TestOAuthSASLFromConfigStaticTokenFallback(t *testing.T) { + tests := []struct { + name string + yaml string + expectedToken string + }{ + { + name: "oauth2 absent", + yaml: ` +sasl: + - mechanism: OAUTHBEARER + token: static-token-1 +`, + expectedToken: "static-token-1", + }, + { + name: "oauth2 disabled", + yaml: ` +sasl: + - mechanism: OAUTHBEARER + token: static-token-2 + oauth2: + enabled: false + client_key: ignored + client_secret: ignored + token_url: https://example.invalid/token +`, + expectedToken: "static-token-2", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mConf := parseFirstSASLObjectConfig(t, tt.yaml) + mechanism, err := oauthSaslFromConfig(mConf) + require.NoError(t, err) + + _, authBytes, err := mechanism.Authenticate(context.Background(), "localhost:9092") + require.NoError(t, err) + require.Contains(t, string(authBytes), tt.expectedToken) + }) + } +} + +func parseFirstSASLObjectConfig(t *testing.T, rawYAML string) *service.ParsedConfig { + t.Helper() + + spec := service.NewConfigSpec().Field(SASLFields()) + parsed, err := spec.ParseYAML(rawYAML, nil) + require.NoError(t, err) + + saslObjs, err := parsed.FieldObjectList("sasl") + require.NoError(t, err) + require.Len(t, saslObjs, 1) + + return saslObjs[0] +} From b234e07483bce137d8dcf954f773ad8718df184d Mon Sep 17 00:00:00 2001 From: engkeong <5253775+engkeong@users.noreply.github.com> Date: Fri, 19 Jun 2026 13:33:16 +0800 Subject: [PATCH 2/2] docs: regenerate for oauth2 SASL fields --- .../components/pages/caches/redpanda.adoc | 66 +++++++++++++++++++ .../components/pages/inputs/redpanda.adoc | 66 +++++++++++++++++++ .../pages/inputs/redpanda_migrator.adoc | 66 +++++++++++++++++++ .../components/pages/outputs/kafka_franz.adoc | 66 +++++++++++++++++++ .../components/pages/outputs/redpanda.adoc | 66 +++++++++++++++++++ .../pages/outputs/redpanda_migrator.adoc | 66 +++++++++++++++++++ .../components/pages/redpanda/about.adoc | 66 +++++++++++++++++++ .../components/pages/tracers/redpanda.adoc | 66 +++++++++++++++++++ 8 files changed, 528 insertions(+) diff --git a/docs/modules/components/pages/caches/redpanda.adoc b/docs/modules/components/pages/caches/redpanda.adoc index 4746c4dae7..9b988907df 100644 --- a/docs/modules/components/pages/caches/redpanda.adoc +++ b/docs/modules/components/pages/caches/redpanda.adoc @@ -364,6 +364,72 @@ Key/value pairs to add to OAUTHBEARER authentication requests. *Type*: `object` +=== `sasl[].oauth2` + +Allows you to specify OAuth2 client credentials for automatic token renewal on OAUTHBEARER. Compatible with any OAuth2 identity provider including Microsoft Entra, Okta, Auth0, and Keycloak. + + +*Type*: `object` + + +=== `sasl[].oauth2.enabled` + +Whether to use OAuth2 client credentials for dynamic token renewal. + + +*Type*: `bool` + +*Default*: `false` + +=== `sasl[].oauth2.client_key` + +The OAuth2 client ID. + + +*Type*: `string` + +*Default*: `""` + +=== `sasl[].oauth2.client_secret` + +The OAuth2 client secret. +[CAUTION] +==== +This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info]. +==== + + + +*Type*: `string` + +*Default*: `""` + +=== `sasl[].oauth2.token_url` + +The URL of the token provider. + + +*Type*: `string` + +*Default*: `""` + +=== `sasl[].oauth2.scopes` + +Optional list of requested permissions. + + +*Type*: `array` + +*Default*: `[]` + +=== `sasl[].oauth2.endpoint_params` + +Optional additional parameters for the token endpoint (e.g. audience). + + +*Type*: `object` + + === `sasl[].aws` Contains AWS specific fields for when the `mechanism` is set to `AWS_MSK_IAM`. diff --git a/docs/modules/components/pages/inputs/redpanda.adoc b/docs/modules/components/pages/inputs/redpanda.adoc index 7068941f75..a3b431f179 100644 --- a/docs/modules/components/pages/inputs/redpanda.adoc +++ b/docs/modules/components/pages/inputs/redpanda.adoc @@ -444,6 +444,72 @@ Key/value pairs to add to OAUTHBEARER authentication requests. *Type*: `object` +=== `sasl[].oauth2` + +Allows you to specify OAuth2 client credentials for automatic token renewal on OAUTHBEARER. Compatible with any OAuth2 identity provider including Microsoft Entra, Okta, Auth0, and Keycloak. + + +*Type*: `object` + + +=== `sasl[].oauth2.enabled` + +Whether to use OAuth2 client credentials for dynamic token renewal. + + +*Type*: `bool` + +*Default*: `false` + +=== `sasl[].oauth2.client_key` + +The OAuth2 client ID. + + +*Type*: `string` + +*Default*: `""` + +=== `sasl[].oauth2.client_secret` + +The OAuth2 client secret. +[CAUTION] +==== +This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info]. +==== + + + +*Type*: `string` + +*Default*: `""` + +=== `sasl[].oauth2.token_url` + +The URL of the token provider. + + +*Type*: `string` + +*Default*: `""` + +=== `sasl[].oauth2.scopes` + +Optional list of requested permissions. + + +*Type*: `array` + +*Default*: `[]` + +=== `sasl[].oauth2.endpoint_params` + +Optional additional parameters for the token endpoint (e.g. audience). + + +*Type*: `object` + + === `sasl[].aws` Contains AWS specific fields for when the `mechanism` is set to `AWS_MSK_IAM`. diff --git a/docs/modules/components/pages/inputs/redpanda_migrator.adoc b/docs/modules/components/pages/inputs/redpanda_migrator.adoc index c3f7bad040..b83be08f9d 100644 --- a/docs/modules/components/pages/inputs/redpanda_migrator.adoc +++ b/docs/modules/components/pages/inputs/redpanda_migrator.adoc @@ -427,6 +427,72 @@ Key/value pairs to add to OAUTHBEARER authentication requests. *Type*: `object` +=== `sasl[].oauth2` + +Allows you to specify OAuth2 client credentials for automatic token renewal on OAUTHBEARER. Compatible with any OAuth2 identity provider including Microsoft Entra, Okta, Auth0, and Keycloak. + + +*Type*: `object` + + +=== `sasl[].oauth2.enabled` + +Whether to use OAuth2 client credentials for dynamic token renewal. + + +*Type*: `bool` + +*Default*: `false` + +=== `sasl[].oauth2.client_key` + +The OAuth2 client ID. + + +*Type*: `string` + +*Default*: `""` + +=== `sasl[].oauth2.client_secret` + +The OAuth2 client secret. +[CAUTION] +==== +This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info]. +==== + + + +*Type*: `string` + +*Default*: `""` + +=== `sasl[].oauth2.token_url` + +The URL of the token provider. + + +*Type*: `string` + +*Default*: `""` + +=== `sasl[].oauth2.scopes` + +Optional list of requested permissions. + + +*Type*: `array` + +*Default*: `[]` + +=== `sasl[].oauth2.endpoint_params` + +Optional additional parameters for the token endpoint (e.g. audience). + + +*Type*: `object` + + === `sasl[].aws` Contains AWS specific fields for when the `mechanism` is set to `AWS_MSK_IAM`. diff --git a/docs/modules/components/pages/outputs/kafka_franz.adoc b/docs/modules/components/pages/outputs/kafka_franz.adoc index 1804eb23f4..b00af4c5fc 100644 --- a/docs/modules/components/pages/outputs/kafka_franz.adoc +++ b/docs/modules/components/pages/outputs/kafka_franz.adoc @@ -399,6 +399,72 @@ Key/value pairs to add to OAUTHBEARER authentication requests. *Type*: `object` +=== `sasl[].oauth2` + +Allows you to specify OAuth2 client credentials for automatic token renewal on OAUTHBEARER. Compatible with any OAuth2 identity provider including Microsoft Entra, Okta, Auth0, and Keycloak. + + +*Type*: `object` + + +=== `sasl[].oauth2.enabled` + +Whether to use OAuth2 client credentials for dynamic token renewal. + + +*Type*: `bool` + +*Default*: `false` + +=== `sasl[].oauth2.client_key` + +The OAuth2 client ID. + + +*Type*: `string` + +*Default*: `""` + +=== `sasl[].oauth2.client_secret` + +The OAuth2 client secret. +[CAUTION] +==== +This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info]. +==== + + + +*Type*: `string` + +*Default*: `""` + +=== `sasl[].oauth2.token_url` + +The URL of the token provider. + + +*Type*: `string` + +*Default*: `""` + +=== `sasl[].oauth2.scopes` + +Optional list of requested permissions. + + +*Type*: `array` + +*Default*: `[]` + +=== `sasl[].oauth2.endpoint_params` + +Optional additional parameters for the token endpoint (e.g. audience). + + +*Type*: `object` + + === `sasl[].aws` Contains AWS specific fields for when the `mechanism` is set to `AWS_MSK_IAM`. diff --git a/docs/modules/components/pages/outputs/redpanda.adoc b/docs/modules/components/pages/outputs/redpanda.adoc index b0cc38a11c..493f2f119b 100644 --- a/docs/modules/components/pages/outputs/redpanda.adoc +++ b/docs/modules/components/pages/outputs/redpanda.adoc @@ -436,6 +436,72 @@ Key/value pairs to add to OAUTHBEARER authentication requests. *Type*: `object` +=== `sasl[].oauth2` + +Allows you to specify OAuth2 client credentials for automatic token renewal on OAUTHBEARER. Compatible with any OAuth2 identity provider including Microsoft Entra, Okta, Auth0, and Keycloak. + + +*Type*: `object` + + +=== `sasl[].oauth2.enabled` + +Whether to use OAuth2 client credentials for dynamic token renewal. + + +*Type*: `bool` + +*Default*: `false` + +=== `sasl[].oauth2.client_key` + +The OAuth2 client ID. + + +*Type*: `string` + +*Default*: `""` + +=== `sasl[].oauth2.client_secret` + +The OAuth2 client secret. +[CAUTION] +==== +This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info]. +==== + + + +*Type*: `string` + +*Default*: `""` + +=== `sasl[].oauth2.token_url` + +The URL of the token provider. + + +*Type*: `string` + +*Default*: `""` + +=== `sasl[].oauth2.scopes` + +Optional list of requested permissions. + + +*Type*: `array` + +*Default*: `[]` + +=== `sasl[].oauth2.endpoint_params` + +Optional additional parameters for the token endpoint (e.g. audience). + + +*Type*: `object` + + === `sasl[].aws` Contains AWS specific fields for when the `mechanism` is set to `AWS_MSK_IAM`. diff --git a/docs/modules/components/pages/outputs/redpanda_migrator.adoc b/docs/modules/components/pages/outputs/redpanda_migrator.adoc index 4d1c62f411..dc42b21a16 100644 --- a/docs/modules/components/pages/outputs/redpanda_migrator.adoc +++ b/docs/modules/components/pages/outputs/redpanda_migrator.adoc @@ -616,6 +616,72 @@ Key/value pairs to add to OAUTHBEARER authentication requests. *Type*: `object` +=== `sasl[].oauth2` + +Allows you to specify OAuth2 client credentials for automatic token renewal on OAUTHBEARER. Compatible with any OAuth2 identity provider including Microsoft Entra, Okta, Auth0, and Keycloak. + + +*Type*: `object` + + +=== `sasl[].oauth2.enabled` + +Whether to use OAuth2 client credentials for dynamic token renewal. + + +*Type*: `bool` + +*Default*: `false` + +=== `sasl[].oauth2.client_key` + +The OAuth2 client ID. + + +*Type*: `string` + +*Default*: `""` + +=== `sasl[].oauth2.client_secret` + +The OAuth2 client secret. +[CAUTION] +==== +This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info]. +==== + + + +*Type*: `string` + +*Default*: `""` + +=== `sasl[].oauth2.token_url` + +The URL of the token provider. + + +*Type*: `string` + +*Default*: `""` + +=== `sasl[].oauth2.scopes` + +Optional list of requested permissions. + + +*Type*: `array` + +*Default*: `[]` + +=== `sasl[].oauth2.endpoint_params` + +Optional additional parameters for the token endpoint (e.g. audience). + + +*Type*: `object` + + === `sasl[].aws` Contains AWS specific fields for when the `mechanism` is set to `AWS_MSK_IAM`. diff --git a/docs/modules/components/pages/redpanda/about.adoc b/docs/modules/components/pages/redpanda/about.adoc index f87bfb7f80..281aa7337a 100644 --- a/docs/modules/components/pages/redpanda/about.adoc +++ b/docs/modules/components/pages/redpanda/about.adoc @@ -361,6 +361,72 @@ Key/value pairs to add to OAUTHBEARER authentication requests. *Type*: `object` +=== `sasl[].oauth2` + +Allows you to specify OAuth2 client credentials for automatic token renewal on OAUTHBEARER. Compatible with any OAuth2 identity provider including Microsoft Entra, Okta, Auth0, and Keycloak. + + +*Type*: `object` + + +=== `sasl[].oauth2.enabled` + +Whether to use OAuth2 client credentials for dynamic token renewal. + + +*Type*: `bool` + +*Default*: `false` + +=== `sasl[].oauth2.client_key` + +The OAuth2 client ID. + + +*Type*: `string` + +*Default*: `""` + +=== `sasl[].oauth2.client_secret` + +The OAuth2 client secret. +[CAUTION] +==== +This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info]. +==== + + + +*Type*: `string` + +*Default*: `""` + +=== `sasl[].oauth2.token_url` + +The URL of the token provider. + + +*Type*: `string` + +*Default*: `""` + +=== `sasl[].oauth2.scopes` + +Optional list of requested permissions. + + +*Type*: `array` + +*Default*: `[]` + +=== `sasl[].oauth2.endpoint_params` + +Optional additional parameters for the token endpoint (e.g. audience). + + +*Type*: `object` + + === `sasl[].aws` Contains AWS specific fields for when the `mechanism` is set to `AWS_MSK_IAM`. diff --git a/docs/modules/components/pages/tracers/redpanda.adoc b/docs/modules/components/pages/tracers/redpanda.adoc index c50e8281f9..8e8f111cf8 100644 --- a/docs/modules/components/pages/tracers/redpanda.adoc +++ b/docs/modules/components/pages/tracers/redpanda.adoc @@ -409,6 +409,72 @@ Key/value pairs to add to OAUTHBEARER authentication requests. *Type*: `object` +=== `sasl[].oauth2` + +Allows you to specify OAuth2 client credentials for automatic token renewal on OAUTHBEARER. Compatible with any OAuth2 identity provider including Microsoft Entra, Okta, Auth0, and Keycloak. + + +*Type*: `object` + + +=== `sasl[].oauth2.enabled` + +Whether to use OAuth2 client credentials for dynamic token renewal. + + +*Type*: `bool` + +*Default*: `false` + +=== `sasl[].oauth2.client_key` + +The OAuth2 client ID. + + +*Type*: `string` + +*Default*: `""` + +=== `sasl[].oauth2.client_secret` + +The OAuth2 client secret. +[CAUTION] +==== +This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info]. +==== + + + +*Type*: `string` + +*Default*: `""` + +=== `sasl[].oauth2.token_url` + +The URL of the token provider. + + +*Type*: `string` + +*Default*: `""` + +=== `sasl[].oauth2.scopes` + +Optional list of requested permissions. + + +*Type*: `array` + +*Default*: `[]` + +=== `sasl[].oauth2.endpoint_params` + +Optional additional parameters for the token endpoint (e.g. audience). + + +*Type*: `object` + + === `sasl[].aws` Contains AWS specific fields for when the `mechanism` is set to `AWS_MSK_IAM`.