diff --git a/README.md b/README.md index 7283267..bceb505 100644 --- a/README.md +++ b/README.md @@ -4,14 +4,16 @@ FluntBit custom output plugin which allows sending messages to AWS-SQS. ## Configuration Parameters -| Configuration Key Name | Description | Mandatory | -| ---------------------- | -------------------------------------------------------- | --------- | -| QueueUrl | the queue url in your aws account | yes | -| QueueRegion | the queue region in your aws account | yes | -| PluginTagAttribute | attribute name of the message tag | no | -| QueueMessageGroupId | the group id required for fifo queues | fifo-only | -| ProxyUrl | the proxy address between fluentbit and sqs (if exists) | no | -| BatchSize | set amount of messages to be sent in a batch request | yes | +| Configuration Key Name | Description | Mandatory | +| ---------------------- | ------------------------------------------------------------- | --------- | +| QueueUrl | the queue url in your aws account | yes | +| QueueRegion | the queue region in your aws account | yes | +| SQSEndpoint | the SQS endpoint to connect to to configure AWS client | no | +| PluginTagAttribute | attribute name of the message tag | no | +| QueueMessageGroupId | the group id required for fifo queues | fifo-only | +| ProxyUrl | the proxy address between fluentbit and sqs (if exists) | no | +| BatchSize | set amount of messages to be sent in a batch request | yes | +| FlushPendingRecords | if true, pending records will be sent following `Flush` value | no | ```conf [SERVICE] @@ -65,7 +67,7 @@ ENTRYPOINT ["/fluent-bit/bin/fluent-bit"] CMD ["-c", "/fluent-bit/etc/some_configuration.conf", "-e", "/fluent-bit/bin/fluentBit-sqs-plugin.so"] ``` -More information about the usage and installation of golang plugins can be found here: https://docs.fluentbit.io/manual/development/golang_plugins +More information about the usage and installation of golang plugins can be found here: https://docs.fluentbit.io/manual/development/golang-output-plugins ## Special Notes diff --git a/go.mod b/go.mod index f3647af..6dc79da 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,8 @@ module github.com/PayU/fluentBit-sqs-plugin go 1.14 + +require ( + github.com/aws/aws-sdk-go v1.44.285 + github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..8950a2b --- /dev/null +++ b/go.sum @@ -0,0 +1,51 @@ +github.com/aws/aws-sdk-go v1.44.285 h1:rgoWYl+NdmKzRgoi/fZLEtGXOjCkcWIa5jPH02Uahdo= +github.com/aws/aws-sdk-go v1.44.285/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c h1:yKN46XJHYC/gvgH2UsisJ31+n4K3S7QYZSfU2uAWjuI= +github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c/go.mod h1:L92h+dgwElEyUuShEwjbiHjseW410WIcNz+Bjutc8YQ= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= +github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= +github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= +github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.1.0 h1:hZ/3BUoy5aId7sCpA/Tc5lt8DkFgdVS2onTpJsZ/fl0= +golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg= +golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/out_sqs.go b/out_sqs.go index 05604a2..19003cf 100644 --- a/out_sqs.go +++ b/out_sqs.go @@ -1,3 +1,5 @@ +//go:build linux + package main import ( @@ -35,43 +37,54 @@ var MessageCounter int = 0 var SqsRecords []*sqs.SendMessageBatchRequestEntry type sqsConfig struct { + endpoint string queueURL string queueMessageGroupID string mySQS *sqs.SQS pluginTagAttribute string proxyURL string batchSize int + flushPendingRecords bool } //export FLBPluginRegister func FLBPluginRegister(def unsafe.Pointer) int { setLogLevel() - return output.FLBPluginRegister(def, "sqs", "aws sqs output plugin") + return output.FLBPluginRegister(def, "sqs", "AWS SQS output plugin") } //export FLBPluginInit func FLBPluginInit(plugin unsafe.Pointer) int { + endpoint := output.FLBPluginConfigKey(plugin, "SQSEndpoint") queueURL := output.FLBPluginConfigKey(plugin, "QueueUrl") queueRegion := output.FLBPluginConfigKey(plugin, "QueueRegion") queueMessageGroupID := output.FLBPluginConfigKey(plugin, "QueueMessageGroupId") pluginTagAttribute := output.FLBPluginConfigKey(plugin, "PluginTagAttribute") proxyURL := output.FLBPluginConfigKey(plugin, "ProxyUrl") batchSizeString := output.FLBPluginConfigKey(plugin, "BatchSize") + flushPendingRecordsString := output.FLBPluginConfigKey(plugin, "FlushPendingRecords") + writeInfoLog(fmt.Sprintf("SQSEndpoint is: %s", endpoint)) writeInfoLog(fmt.Sprintf("QueueUrl is: %s", queueURL)) writeInfoLog(fmt.Sprintf("QueueRegion is: %s", queueRegion)) writeInfoLog(fmt.Sprintf("QueueMessageGroupId is: %s", queueMessageGroupID)) writeInfoLog(fmt.Sprintf("pluginTagAttribute is: %s", pluginTagAttribute)) writeInfoLog(fmt.Sprintf("ProxyUrl is: %s", proxyURL)) writeInfoLog(fmt.Sprintf("BatchSize is: %s", batchSizeString)) + writeInfoLog(fmt.Sprintf("flushPendingRecords is: %s", flushPendingRecordsString)) + + if endpoint == "" { + endpoint = fmt.Sprintf("sqs.%s.amazonaws.com", queueRegion) + writeInfoLog(fmt.Sprintf("Using default regional AWS endpoint: %s", endpoint)) + } if queueURL == "" { - writeErrorLog(errors.New("QueueUrl configuration key is mandatory")) + writeErrorLog(errors.New("QueueUrl configuration key is mandatory.")) return output.FLB_ERROR } if queueRegion == "" { - writeErrorLog(errors.New("QueueRegion configuration key is mandatory")) + writeErrorLog(errors.New("QueueRegion configuration key is mandatory.")) return output.FLB_ERROR } @@ -84,11 +97,21 @@ func FLBPluginInit(plugin unsafe.Pointer) int { batchSize, err := strconv.Atoi(batchSizeString) if err != nil || (0 > batchSize && batchSize > 10) { - writeErrorLog(errors.New("BatchSize should be integer value between 1 and 10")) + writeErrorLog(errors.New("BatchSize should be integer value between 1 and 10.")) + return output.FLB_ERROR + } + + if flushPendingRecordsString == "" { + writeInfoLog("Flushing pending records by default.") + flushPendingRecordsString = "true" + } + flushPendingRecords, err := strconv.ParseBool(flushPendingRecordsString) + if err != nil { + writeErrorLog(errors.New(fmt.Sprintf("Cannot set flushPendingRecords as boolean; got %s", flushPendingRecordsString))) return output.FLB_ERROR } - writeInfoLog("retrieving aws credentials from environment variables") + writeInfoLog("Retrieving AWS credentials from environment variables...") awsCredentials := credentials.NewEnvCredentials() var myAWSSession *session.Session var sessionError error @@ -97,23 +120,24 @@ func FLBPluginInit(plugin unsafe.Pointer) int { // Retrieve the credentials value _, credError := awsCredentials.Get() if credError != nil { - writeInfoLog("unable to find aws credentials from environment variables..using credentials chain") + writeInfoLog("Unable to find AWS credentials from environment variables... Trying credentials chain") awsConfig = &aws.Config{ Region: aws.String(queueRegion), CredentialsChainVerboseErrors: aws.Bool(true), } } else { - writeInfoLog("environment variables credentials where found") + writeInfoLog("AWS credentials found in environment") awsConfig = &aws.Config{ Region: aws.String(queueRegion), CredentialsChainVerboseErrors: aws.Bool(true), Credentials: awsCredentials, + Endpoint: aws.String(endpoint), } } // if proxy if proxyURL != "" { - writeInfoLog("set http client struct on aws configuration since proxy url has been found") + writeInfoLog("Configuring AWS HTTP client using the provided proxy URL...") awsConfig.HTTPClient = &http.Client{ Transport: &http.Transport{ Proxy: func(*http.Request) (*url.URL, error) { @@ -129,6 +153,7 @@ func FLBPluginInit(plugin unsafe.Pointer) int { writeErrorLog(sessionError) return output.FLB_ERROR } + writeInfoLog("AWS session created.") // Set the context to point to any Go variable output.FLBPluginSetContext(plugin, &sqsConfig{ @@ -137,8 +162,11 @@ func FLBPluginInit(plugin unsafe.Pointer) int { mySQS: sqs.New(myAWSSession), pluginTagAttribute: pluginTagAttribute, batchSize: batchSize, + flushPendingRecords: flushPendingRecords, }) + writeInfoLog("Fluentbit context populated.") + return output.FLB_OK } @@ -149,11 +177,14 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int var record map[interface{}]interface{} var sqsRecord *sqs.SendMessageBatchRequestEntry + + writeDebugLog("===== Entering FLBPluginFlushCtx() =====") + // Type assert context back into the original type for the Go variable sqsConf, ok := output.FLBPluginGetContext(ctx).(*sqsConfig) if !ok { - writeErrorLog(errors.New("Unexpected error during get plugin context in flush function")) + writeErrorLog(errors.New("Unexpected error from FLBPluginGetContext().")) return output.FLB_ERROR } @@ -168,10 +199,10 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int break } - writeDebugLog(fmt.Sprintf("got new record from input. record length is: %d", len(record))) + writeDebugLog(fmt.Sprintf("Got new record from input (length : %d)", len(record))) if len(record) == 0 { - writeInfoLog("got empty record from input. skipping it") + writeInfoLog("Got empty record from input. Skipping it.") continue } @@ -183,12 +214,12 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int case uint64: timeStamp = time.Unix(int64(t), 0) default: - writeInfoLog("given time is not in a known format, defaulting to now") + writeInfoLog("The record time format is unknown, defaulting to now") timeStamp = time.Now() } tagStr := C.GoString(tag) - recordString, err := createRecordString(timeStamp, tagStr, record) + recordString, err := createRecordString(timeStamp, record) if err != nil { writeErrorLog(err) @@ -200,8 +231,8 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int MessageCounter++ - writeDebugLog(fmt.Sprintf("record string: %s", recordString)) - writeDebugLog(fmt.Sprintf("message counter: %d", MessageCounter)) + writeDebugLog(fmt.Sprintf("Record string: %s", recordString)) + writeDebugLog(fmt.Sprintf("Message counter: %d", MessageCounter)) sqsRecord = &sqs.SendMessageBatchRequestEntry{ Id: aws.String(fmt.Sprintf("MessageNumber-%d", MessageCounter)), @@ -224,23 +255,36 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int SqsRecords = append(SqsRecords, sqsRecord) if MessageCounter == sqsConf.batchSize { + writeDebugLog(fmt.Sprintf("Sending %d records in a SQS message batch...", len(SqsRecords))) err := sendBatchToSqs(sqsConf, SqsRecords) - - SqsRecords = nil - MessageCounter = 0 - if err != nil { writeErrorLog(err) return output.FLB_ERROR } + SqsRecords = nil + MessageCounter = 0 + } + } + + if SqsRecords != nil && sqsConf.flushPendingRecords { + writeDebugLog(fmt.Sprintf("Flushing pending %d records", len(SqsRecords))) + err := sendBatchToSqs(sqsConf, SqsRecords) + if err != nil { + writeErrorLog(err) + return output.FLB_ERROR } + SqsRecords = nil + MessageCounter = 0 } + writeDebugLog("===== Exiting FLBPluginFlushCtx() =====") + return output.FLB_OK } //export FLBPluginExit func FLBPluginExit() int { + writeInfoLog("Exiting plugin now.") return output.FLB_OK } @@ -253,6 +297,7 @@ func sendBatchToSqs(sqsConf *sqsConfig, sqsRecords []*sqs.SendMessageBatchReques output, err := sqsConf.mySQS.SendMessageBatch(&sqsBatch) if err != nil { + writeErrorLog(fmt.Errorf("Failed sending SQS message batch: %v", err)) return err } @@ -263,7 +308,7 @@ func sendBatchToSqs(sqsConf *sqsConfig, sqsRecords []*sqs.SendMessageBatchReques return nil } -func createRecordString(timestamp time.Time, tag string, record map[interface{}]interface{}) (string, error) { +func createRecordString(timestamp time.Time, record map[interface{}]interface{}) (string, error) { m := make(map[string]interface{}) // convert timestamp to RFC3339Nano m["@timestamp"] = timestamp.UTC().Format(time.RFC3339Nano) @@ -278,7 +323,7 @@ func createRecordString(timestamp time.Time, tag string, record map[interface{}] } js, err := json.Marshal(m) if err != nil { - writeErrorLog(fmt.Errorf("error creating message for sqs. tag: %s. error: %v", tag, err)) + writeErrorLog(fmt.Errorf("Failed creating SQS message content: %v", err)) return "", err }