Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ FluntBit custom output plugin which allows sending messages to AWS-SQS.

## Configuration Parameters

| Configuration Key Name | Description | Mandatory |
| ---------------------- | -------------------------------------------------------- | --------- |
| QueueUrl | the queue url in your aws account | yes |
| QueueRegion | the queue region in your aws account | yes |
| PluginTagAttribute | attribute name of the message tag | no |
| QueueMessageGroupId | the group id required for fifo queues | fifo-only |
| ProxyUrl | the proxy address between fluentbit and sqs (if exists) | no |
| BatchSize | set amount of messages to be sent in a batch request | yes |
| Configuration Key Name | Description | Mandatory |
| ---------------------- | ------------------------------------------------------------- | --------- |
| QueueUrl | the queue url in your aws account | yes |
| QueueRegion | the queue region in your aws account | yes |
| SQSEndpoint | the SQS endpoint to connect to to configure AWS client | no |
| PluginTagAttribute | attribute name of the message tag | no |
| QueueMessageGroupId | the group id required for fifo queues | fifo-only |
| ProxyUrl | the proxy address between fluentbit and sqs (if exists) | no |
| BatchSize | set amount of messages to be sent in a batch request | yes |
| FlushPendingRecords | if true, pending records will be sent following `Flush` value | no |

```conf
[SERVICE]
Expand Down Expand Up @@ -65,7 +67,7 @@ ENTRYPOINT ["/fluent-bit/bin/fluent-bit"]
CMD ["-c", "/fluent-bit/etc/some_configuration.conf", "-e", "/fluent-bit/bin/fluentBit-sqs-plugin.so"]
```

More information about the usage and installation of golang plugins can be found here: https://docs.fluentbit.io/manual/development/golang_plugins
More information about the usage and installation of golang plugins can be found here: https://docs.fluentbit.io/manual/development/golang-output-plugins

## Special Notes

Expand Down
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
module github.com/PayU/fluentBit-sqs-plugin

go 1.14

require (
github.com/aws/aws-sdk-go v1.44.285
github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c
)
51 changes: 51 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
github.com/aws/aws-sdk-go v1.44.285 h1:rgoWYl+NdmKzRgoi/fZLEtGXOjCkcWIa5jPH02Uahdo=
github.com/aws/aws-sdk-go v1.44.285/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c h1:yKN46XJHYC/gvgH2UsisJ31+n4K3S7QYZSfU2uAWjuI=
github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c/go.mod h1:L92h+dgwElEyUuShEwjbiHjseW410WIcNz+Bjutc8YQ=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.1.0 h1:hZ/3BUoy5aId7sCpA/Tc5lt8DkFgdVS2onTpJsZ/fl0=
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
87 changes: 66 additions & 21 deletions out_sqs.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:build linux

package main

import (
Expand Down Expand Up @@ -35,43 +37,54 @@ var MessageCounter int = 0
var SqsRecords []*sqs.SendMessageBatchRequestEntry

type sqsConfig struct {
endpoint string
queueURL string
queueMessageGroupID string
mySQS *sqs.SQS
pluginTagAttribute string
proxyURL string
batchSize int
flushPendingRecords bool
}

//export FLBPluginRegister
func FLBPluginRegister(def unsafe.Pointer) int {
setLogLevel()
return output.FLBPluginRegister(def, "sqs", "aws sqs output plugin")
return output.FLBPluginRegister(def, "sqs", "AWS SQS output plugin")
}

//export FLBPluginInit
func FLBPluginInit(plugin unsafe.Pointer) int {
endpoint := output.FLBPluginConfigKey(plugin, "SQSEndpoint")
queueURL := output.FLBPluginConfigKey(plugin, "QueueUrl")
queueRegion := output.FLBPluginConfigKey(plugin, "QueueRegion")
queueMessageGroupID := output.FLBPluginConfigKey(plugin, "QueueMessageGroupId")
pluginTagAttribute := output.FLBPluginConfigKey(plugin, "PluginTagAttribute")
proxyURL := output.FLBPluginConfigKey(plugin, "ProxyUrl")
batchSizeString := output.FLBPluginConfigKey(plugin, "BatchSize")
flushPendingRecordsString := output.FLBPluginConfigKey(plugin, "FlushPendingRecords")

writeInfoLog(fmt.Sprintf("SQSEndpoint is: %s", endpoint))
writeInfoLog(fmt.Sprintf("QueueUrl is: %s", queueURL))
writeInfoLog(fmt.Sprintf("QueueRegion is: %s", queueRegion))
writeInfoLog(fmt.Sprintf("QueueMessageGroupId is: %s", queueMessageGroupID))
writeInfoLog(fmt.Sprintf("pluginTagAttribute is: %s", pluginTagAttribute))
writeInfoLog(fmt.Sprintf("ProxyUrl is: %s", proxyURL))
writeInfoLog(fmt.Sprintf("BatchSize is: %s", batchSizeString))
writeInfoLog(fmt.Sprintf("flushPendingRecords is: %s", flushPendingRecordsString))

if endpoint == "" {
endpoint = fmt.Sprintf("sqs.%s.amazonaws.com", queueRegion)
writeInfoLog(fmt.Sprintf("Using default regional AWS endpoint: %s", endpoint))
}

if queueURL == "" {
writeErrorLog(errors.New("QueueUrl configuration key is mandatory"))
writeErrorLog(errors.New("QueueUrl configuration key is mandatory."))
return output.FLB_ERROR
}

if queueRegion == "" {
writeErrorLog(errors.New("QueueRegion configuration key is mandatory"))
writeErrorLog(errors.New("QueueRegion configuration key is mandatory."))
return output.FLB_ERROR
}

Expand All @@ -84,11 +97,21 @@ func FLBPluginInit(plugin unsafe.Pointer) int {

batchSize, err := strconv.Atoi(batchSizeString)
if err != nil || (0 > batchSize && batchSize > 10) {
writeErrorLog(errors.New("BatchSize should be integer value between 1 and 10"))
writeErrorLog(errors.New("BatchSize should be integer value between 1 and 10."))
return output.FLB_ERROR
}

if flushPendingRecordsString == "" {
writeInfoLog("Flushing pending records by default.")
flushPendingRecordsString = "true"
}
flushPendingRecords, err := strconv.ParseBool(flushPendingRecordsString)
if err != nil {
writeErrorLog(errors.New(fmt.Sprintf("Cannot set flushPendingRecords as boolean; got %s", flushPendingRecordsString)))
return output.FLB_ERROR
}

writeInfoLog("retrieving aws credentials from environment variables")
writeInfoLog("Retrieving AWS credentials from environment variables...")
awsCredentials := credentials.NewEnvCredentials()
var myAWSSession *session.Session
var sessionError error
Expand All @@ -97,23 +120,24 @@ func FLBPluginInit(plugin unsafe.Pointer) int {
// Retrieve the credentials value
_, credError := awsCredentials.Get()
if credError != nil {
writeInfoLog("unable to find aws credentials from environment variables..using credentials chain")
writeInfoLog("Unable to find AWS credentials from environment variables... Trying credentials chain")
awsConfig = &aws.Config{
Region: aws.String(queueRegion),
CredentialsChainVerboseErrors: aws.Bool(true),
}
} else {
writeInfoLog("environment variables credentials where found")
writeInfoLog("AWS credentials found in environment")
awsConfig = &aws.Config{
Region: aws.String(queueRegion),
CredentialsChainVerboseErrors: aws.Bool(true),
Credentials: awsCredentials,
Endpoint: aws.String(endpoint),
}
}

// if proxy
if proxyURL != "" {
writeInfoLog("set http client struct on aws configuration since proxy url has been found")
writeInfoLog("Configuring AWS HTTP client using the provided proxy URL...")
awsConfig.HTTPClient = &http.Client{
Transport: &http.Transport{
Proxy: func(*http.Request) (*url.URL, error) {
Expand All @@ -129,6 +153,7 @@ func FLBPluginInit(plugin unsafe.Pointer) int {
writeErrorLog(sessionError)
return output.FLB_ERROR
}
writeInfoLog("AWS session created.")

// Set the context to point to any Go variable
output.FLBPluginSetContext(plugin, &sqsConfig{
Expand All @@ -137,8 +162,11 @@ func FLBPluginInit(plugin unsafe.Pointer) int {
mySQS: sqs.New(myAWSSession),
pluginTagAttribute: pluginTagAttribute,
batchSize: batchSize,
flushPendingRecords: flushPendingRecords,
})

writeInfoLog("Fluentbit context populated.")

return output.FLB_OK
}

Expand All @@ -149,11 +177,14 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
var record map[interface{}]interface{}
var sqsRecord *sqs.SendMessageBatchRequestEntry


writeDebugLog("===== Entering FLBPluginFlushCtx() =====")

// Type assert context back into the original type for the Go variable
sqsConf, ok := output.FLBPluginGetContext(ctx).(*sqsConfig)

if !ok {
writeErrorLog(errors.New("Unexpected error during get plugin context in flush function"))
writeErrorLog(errors.New("Unexpected error from FLBPluginGetContext()."))
return output.FLB_ERROR
}

Expand All @@ -168,10 +199,10 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
break
}

writeDebugLog(fmt.Sprintf("got new record from input. record length is: %d", len(record)))
writeDebugLog(fmt.Sprintf("Got new record from input (length : %d)", len(record)))

if len(record) == 0 {
writeInfoLog("got empty record from input. skipping it")
writeInfoLog("Got empty record from input. Skipping it.")
continue
}

Expand All @@ -183,12 +214,12 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
case uint64:
timeStamp = time.Unix(int64(t), 0)
default:
writeInfoLog("given time is not in a known format, defaulting to now")
writeInfoLog("The record time format is unknown, defaulting to now")
timeStamp = time.Now()
}

tagStr := C.GoString(tag)
recordString, err := createRecordString(timeStamp, tagStr, record)
recordString, err := createRecordString(timeStamp, record)

if err != nil {
writeErrorLog(err)
Expand All @@ -200,8 +231,8 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int

MessageCounter++

writeDebugLog(fmt.Sprintf("record string: %s", recordString))
writeDebugLog(fmt.Sprintf("message counter: %d", MessageCounter))
writeDebugLog(fmt.Sprintf("Record string: %s", recordString))
writeDebugLog(fmt.Sprintf("Message counter: %d", MessageCounter))

sqsRecord = &sqs.SendMessageBatchRequestEntry{
Id: aws.String(fmt.Sprintf("MessageNumber-%d", MessageCounter)),
Expand All @@ -224,23 +255,36 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
SqsRecords = append(SqsRecords, sqsRecord)

if MessageCounter == sqsConf.batchSize {
writeDebugLog(fmt.Sprintf("Sending %d records in a SQS message batch...", len(SqsRecords)))
err := sendBatchToSqs(sqsConf, SqsRecords)

SqsRecords = nil
MessageCounter = 0

if err != nil {
writeErrorLog(err)
return output.FLB_ERROR
}
SqsRecords = nil
MessageCounter = 0
}
}

if SqsRecords != nil && sqsConf.flushPendingRecords {
writeDebugLog(fmt.Sprintf("Flushing pending %d records", len(SqsRecords)))
err := sendBatchToSqs(sqsConf, SqsRecords)
if err != nil {
writeErrorLog(err)
return output.FLB_ERROR
}
SqsRecords = nil
MessageCounter = 0
}

writeDebugLog("===== Exiting FLBPluginFlushCtx() =====")

return output.FLB_OK
}

//export FLBPluginExit
func FLBPluginExit() int {
writeInfoLog("Exiting plugin now.")
return output.FLB_OK
}

Expand All @@ -253,6 +297,7 @@ func sendBatchToSqs(sqsConf *sqsConfig, sqsRecords []*sqs.SendMessageBatchReques
output, err := sqsConf.mySQS.SendMessageBatch(&sqsBatch)

if err != nil {
writeErrorLog(fmt.Errorf("Failed sending SQS message batch: %v", err))
return err
}

Expand All @@ -263,7 +308,7 @@ func sendBatchToSqs(sqsConf *sqsConfig, sqsRecords []*sqs.SendMessageBatchReques
return nil
}

func createRecordString(timestamp time.Time, tag string, record map[interface{}]interface{}) (string, error) {
func createRecordString(timestamp time.Time, record map[interface{}]interface{}) (string, error) {
m := make(map[string]interface{})
// convert timestamp to RFC3339Nano
m["@timestamp"] = timestamp.UTC().Format(time.RFC3339Nano)
Expand All @@ -278,7 +323,7 @@ func createRecordString(timestamp time.Time, tag string, record map[interface{}]
}
js, err := json.Marshal(m)
if err != nil {
writeErrorLog(fmt.Errorf("error creating message for sqs. tag: %s. error: %v", tag, err))
writeErrorLog(fmt.Errorf("Failed creating SQS message content: %v", err))
return "", err
}

Expand Down