diff --git a/plugin/input/kafka/README.md b/plugin/input/kafka/README.md
index ff4ebbbeb..7947c8b89 100755
--- a/plugin/input/kafka/README.md
+++ b/plugin/input/kafka/README.md
@@ -81,7 +81,7 @@ If set, the plugin will use SASL authentications mechanism.
-**`sasl_mechanism`** *`string`* *`default=SCRAM-SHA-512`* *`options=PLAIN|SCRAM-SHA-256|SCRAM-SHA-512`*
+**`sasl_mechanism`** *`string`* *`default=SCRAM-SHA-512`* *`options=PLAIN|SCRAM-SHA-256|SCRAM-SHA-512|OAUTHBEARER`*
SASL mechanism to use.
@@ -99,6 +99,15 @@ SASL password.
+**`sasl_oauth`** *`xoauth.Config`*
+
+SASL OAuth config.
+* `client_id` - client ID
+* `client_secret` - client secret
+* `token_url` - token url
+
+
+
**`is_ssl_enabled`** *`bool`* *`default=false`*
If set, the plugin will use SSL/TLS connections method.
diff --git a/plugin/input/kafka/kafka.go b/plugin/input/kafka/kafka.go
index 8ea76b154..94d4c6e24 100644
--- a/plugin/input/kafka/kafka.go
+++ b/plugin/input/kafka/kafka.go
@@ -10,6 +10,7 @@ import (
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/pipeline"
+ "github.com/ozontech/file.d/xoauth"
"github.com/ozontech/file.d/xscram"
"github.com/ozontech/file.d/xtls"
"github.com/prometheus/client_golang/prometheus"
@@ -120,7 +121,7 @@ type Config struct {
// > @3@4@5@6
// >
// > SASL mechanism to use.
- SaslMechanism string `json:"sasl_mechanism" default:"SCRAM-SHA-512" options:"PLAIN|SCRAM-SHA-256|SCRAM-SHA-512"` // *
+ SaslMechanism string `json:"sasl_mechanism" default:"SCRAM-SHA-512" options:"PLAIN|SCRAM-SHA-256|SCRAM-SHA-512|OAUTHBEARER"` // *
// > @3@4@5@6
// >
@@ -132,6 +133,14 @@ type Config struct {
// > SASL password.
SaslPassword string `json:"sasl_password" default:"password"` // *
+ // > @3@4@5@6
+ // >
+ // > SASL OAuth config.
+ // > * `client_id` - client ID
+ // > * `client_secret` - client secret
+ // > * `token_url` - token url
+ SaslOauth xoauth.Config `json:"sasl_oauth" child:"true"` // *
+
// > @3@4@5@6
// >
// > If set, the plugin will use SSL/TLS connections method.
@@ -241,6 +250,12 @@ func NewConsumerGroup(c *Config, l *zap.SugaredLogger) sarama.ConsumerGroup {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return xscram.NewClient(xscram.SHA256) }
case sarama.SASLTypeSCRAMSHA512:
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return xscram.NewClient(xscram.SHA512) }
+ case sarama.SASLTypeOAuth:
+ provider, err := xoauth.NewSaramaTokenProvider(c.SaslOauth)
+ if err != nil {
+ l.Fatalf("can't create OAuth token provider: %s", err.Error())
+ }
+ config.Net.SASL.TokenProvider = provider
}
}
diff --git a/plugin/output/kafka/README.md b/plugin/output/kafka/README.md
index 767a723f9..ca054e9a7 100755
--- a/plugin/output/kafka/README.md
+++ b/plugin/output/kafka/README.md
@@ -114,6 +114,15 @@ SASL password.
+**`sasl_oauth`** *`xoauth.Config`*
+
+SASL OAuth config.
+* `client_id` - client ID
+* `client_secret` - client secret
+* `token_url` - token url
+
+
+
**`is_ssl_enabled`** *`bool`* *`default=false`*
If set, the plugin will use SSL/TLS connections method.
diff --git a/plugin/output/kafka/kafka.go b/plugin/output/kafka/kafka.go
index 3edbe4afc..179089230 100644
--- a/plugin/output/kafka/kafka.go
+++ b/plugin/output/kafka/kafka.go
@@ -10,6 +10,7 @@ import (
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/pipeline"
+ "github.com/ozontech/file.d/xoauth"
"github.com/ozontech/file.d/xscram"
"github.com/ozontech/file.d/xtls"
"github.com/prometheus/client_golang/prometheus"
@@ -146,6 +147,14 @@ type Config struct {
// > SASL password.
SaslPassword string `json:"sasl_password" default:"password"` // *
+ // > @3@4@5@6
+ // >
+ // > SASL OAuth config.
+ // > * `client_id` - client ID
+ // > * `client_secret` - client secret
+ // > * `token_url` - token url
+ SaslOauth xoauth.Config `json:"sasl_oauth" child:"true"` // *
+
// > @3@4@5@6
// >
// > If set, the plugin will use SSL/TLS connections method.
@@ -322,6 +331,12 @@ func NewProducer(c *Config, l *zap.SugaredLogger) sarama.SyncProducer {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return xscram.NewClient(xscram.SHA256) }
case sarama.SASLTypeSCRAMSHA512:
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return xscram.NewClient(xscram.SHA512) }
+ case sarama.SASLTypeOAuth:
+ provider, err := xoauth.NewSaramaTokenProvider(c.SaslOauth)
+ if err != nil {
+ l.Fatalf("can't create OAuth token provider: %s", err.Error())
+ }
+ config.Net.SASL.TokenProvider = provider
}
}
diff --git a/xoauth/config.go b/xoauth/config.go
new file mode 100644
index 000000000..255f31bb6
--- /dev/null
+++ b/xoauth/config.go
@@ -0,0 +1,22 @@
+package xoauth
+
+import "errors"
+
+type Config struct {
+ ClientID string `json:"client_id"`
+ ClientSecret string `json:"client_secret"`
+ TokenURL string `json:"token_url"`
+}
+
+func (c Config) validate() error {
+ if c.ClientID == "" {
+ return errors.New("'client_id' must be set")
+ }
+ if c.ClientSecret == "" {
+ return errors.New("'client_secret' must be set")
+ }
+ if c.TokenURL == "" {
+ return errors.New("'token_url' must be set")
+ }
+ return nil
+}
diff --git a/xoauth/sarama_token_provider.go b/xoauth/sarama_token_provider.go
new file mode 100644
index 000000000..0ad0327ab
--- /dev/null
+++ b/xoauth/sarama_token_provider.go
@@ -0,0 +1,42 @@
+package xoauth
+
+import (
+ "context"
+
+ "github.com/Shopify/sarama"
+ "golang.org/x/oauth2"
+ cred "golang.org/x/oauth2/clientcredentials"
+)
+
+// saramaTokenProvider implements sarama.AccessTokenProvider
+type saramaTokenProvider struct {
+ tokenSource oauth2.TokenSource
+}
+
+// NewSaramaTokenProvider creates a new sarama.AccessTokenProvider with the provided clientID and clientSecret.
+// The provided tokenURL is used to perform the 2 legged client credentials flow.
+func NewSaramaTokenProvider(cfg Config) (sarama.AccessTokenProvider, error) {
+ if err := cfg.validate(); err != nil {
+ return nil, err
+ }
+
+ credCfg := cred.Config{
+ ClientID: cfg.ClientID,
+ ClientSecret: cfg.ClientSecret,
+ TokenURL: cfg.TokenURL,
+ }
+
+ return &saramaTokenProvider{
+ tokenSource: credCfg.TokenSource(context.Background()),
+ }, nil
+}
+
+// Token returns a new sarama.AccessToken
+func (t *saramaTokenProvider) Token() (*sarama.AccessToken, error) {
+ token, err := t.tokenSource.Token()
+ if err != nil {
+ return nil, err
+ }
+
+ return &sarama.AccessToken{Token: token.AccessToken}, nil
+}