From 7e861d771f9afceeb4e160edc42517acf511fda8 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Bourgeot Date: Thu, 9 Nov 2023 18:21:49 +0100 Subject: [PATCH 1/8] Send pending records --- go.mod | 7 ++++++- go.sum | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ out_sqs.go | 11 +++++++++++ 3 files changed, 68 insertions(+), 1 deletion(-) create mode 100644 go.sum diff --git a/go.mod b/go.mod index f3647af..4671429 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,8 @@ -module github.com/PayU/fluentBit-sqs-plugin +module github.com/piequi/fluentBit-sqs-plugin go 1.14 + +require ( + github.com/aws/aws-sdk-go v1.44.285 + github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..8950a2b --- /dev/null +++ b/go.sum @@ -0,0 +1,51 @@ +github.com/aws/aws-sdk-go v1.44.285 h1:rgoWYl+NdmKzRgoi/fZLEtGXOjCkcWIa5jPH02Uahdo= +github.com/aws/aws-sdk-go v1.44.285/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c h1:yKN46XJHYC/gvgH2UsisJ31+n4K3S7QYZSfU2uAWjuI= +github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c/go.mod h1:L92h+dgwElEyUuShEwjbiHjseW410WIcNz+Bjutc8YQ= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= +github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= +github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= +github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.1.0 h1:hZ/3BUoy5aId7sCpA/Tc5lt8DkFgdVS2onTpJsZ/fl0= +golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg= +golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/out_sqs.go b/out_sqs.go index 05604a2..79a5b86 100644 --- a/out_sqs.go +++ b/out_sqs.go @@ -1,3 +1,5 @@ +//go:build linux + package main import ( @@ -236,6 +238,15 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int } } + if SqsRecords != nil { + writeInfoLog(fmt.Sprintf("Flushing pending %d records before exiting", len(SqsRecords))) + err := sendBatchToSqs(sqsConf, SqsRecords) + if err != nil { + writeErrorLog(err) + return output.FLB_ERROR + } + } + return output.FLB_OK } From f406852b0fa3f7a190e368b86f8645fc15e5687b Mon Sep 17 00:00:00 2001 From: Marc-Antoine Bourgeot Date: Mon, 4 Dec 2023 14:05:05 +0100 Subject: [PATCH 2/8] Add endpoint config --- out_sqs.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/out_sqs.go b/out_sqs.go index 79a5b86..d3feee2 100644 --- a/out_sqs.go +++ b/out_sqs.go @@ -37,6 +37,7 @@ var MessageCounter int = 0 var SqsRecords []*sqs.SendMessageBatchRequestEntry type sqsConfig struct { + endpoint string queueURL string queueMessageGroupID string mySQS *sqs.SQS @@ -53,6 +54,7 @@ func FLBPluginRegister(def unsafe.Pointer) int { //export FLBPluginInit func FLBPluginInit(plugin unsafe.Pointer) int { + endpoint := output.FLBPluginConfigKey(plugin, "SQSEndpoint") queueURL := output.FLBPluginConfigKey(plugin, "QueueUrl") queueRegion := output.FLBPluginConfigKey(plugin, "QueueRegion") queueMessageGroupID := output.FLBPluginConfigKey(plugin, "QueueMessageGroupId") @@ -60,6 +62,7 @@ func FLBPluginInit(plugin unsafe.Pointer) int { proxyURL := output.FLBPluginConfigKey(plugin, "ProxyUrl") batchSizeString := output.FLBPluginConfigKey(plugin, "BatchSize") + writeInfoLog(fmt.Sprintf("SQSEndpoint is: %s", endpoint)) writeInfoLog(fmt.Sprintf("QueueUrl is: %s", queueURL)) writeInfoLog(fmt.Sprintf("QueueRegion is: %s", queueRegion)) writeInfoLog(fmt.Sprintf("QueueMessageGroupId is: %s", queueMessageGroupID)) @@ -67,6 +70,11 @@ func FLBPluginInit(plugin unsafe.Pointer) int { writeInfoLog(fmt.Sprintf("ProxyUrl is: %s", proxyURL)) writeInfoLog(fmt.Sprintf("BatchSize is: %s", batchSizeString)) + if endpoint == "" { + endpoint = fmt.Sprintf("sqs.%s.amazonaws.com", queueRegion) + writeInfoLog(fmt.Sprintf("Using default regional AWS endpoint: %s", endpoint)) + } + if queueURL == "" { writeErrorLog(errors.New("QueueUrl configuration key is mandatory")) return output.FLB_ERROR @@ -110,6 +118,7 @@ func FLBPluginInit(plugin unsafe.Pointer) int { Region: aws.String(queueRegion), CredentialsChainVerboseErrors: aws.Bool(true), Credentials: awsCredentials, + Endpoint: aws.String(endpoint), } } From 937ff24aa883a21977997ae552f8674ea7dd4905 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Bourgeot Date: Mon, 4 Dec 2023 14:29:00 +0100 Subject: [PATCH 3/8] Add log statements --- out_sqs.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/out_sqs.go b/out_sqs.go index d3feee2..b079311 100644 --- a/out_sqs.go +++ b/out_sqs.go @@ -113,7 +113,7 @@ func FLBPluginInit(plugin unsafe.Pointer) int { CredentialsChainVerboseErrors: aws.Bool(true), } } else { - writeInfoLog("environment variables credentials where found") + writeInfoLog("environment variables credentials were found") awsConfig = &aws.Config{ Region: aws.String(queueRegion), CredentialsChainVerboseErrors: aws.Bool(true), @@ -140,6 +140,7 @@ func FLBPluginInit(plugin unsafe.Pointer) int { writeErrorLog(sessionError) return output.FLB_ERROR } + writeInfoLog("AWS session created") // Set the context to point to any Go variable output.FLBPluginSetContext(plugin, &sqsConfig{ @@ -150,6 +151,8 @@ func FLBPluginInit(plugin unsafe.Pointer) int { batchSize: batchSize, }) + writeInfoLog("Fluentbit context created") + return output.FLB_OK } @@ -248,7 +251,7 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int } if SqsRecords != nil { - writeInfoLog(fmt.Sprintf("Flushing pending %d records before exiting", len(SqsRecords))) + writeInfoLog(fmt.Sprintf("Flushing pending %d records", len(SqsRecords))) err := sendBatchToSqs(sqsConf, SqsRecords) if err != nil { writeErrorLog(err) @@ -261,6 +264,7 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int //export FLBPluginExit func FLBPluginExit() int { + writeInfoLog("Exiting plugin now.") return output.FLB_OK } From 563099de3f7bb488a146c0967e5205a8abbc0b2a Mon Sep 17 00:00:00 2001 From: Marc-Antoine Bourgeot Date: Mon, 4 Dec 2023 14:30:43 +0100 Subject: [PATCH 4/8] Reset after flushing --- out_sqs.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/out_sqs.go b/out_sqs.go index b079311..b44d534 100644 --- a/out_sqs.go +++ b/out_sqs.go @@ -257,6 +257,8 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int writeErrorLog(err) return output.FLB_ERROR } + SqsRecords = nil + MessageCounter = 0 } return output.FLB_OK From c93a9a2fc7d6345ce0198cdec22a62cadf8e4dae Mon Sep 17 00:00:00 2001 From: Marc-Antoine Bourgeot Date: Mon, 4 Dec 2023 15:13:37 +0100 Subject: [PATCH 5/8] Cleanup code --- out_sqs.go | 53 +++++++++++++++++++++++++++++------------------------ 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/out_sqs.go b/out_sqs.go index b44d534..fe9af3c 100644 --- a/out_sqs.go +++ b/out_sqs.go @@ -49,7 +49,7 @@ type sqsConfig struct { //export FLBPluginRegister func FLBPluginRegister(def unsafe.Pointer) int { setLogLevel() - return output.FLBPluginRegister(def, "sqs", "aws sqs output plugin") + return output.FLBPluginRegister(def, "sqs", "AWS SQS output plugin") } //export FLBPluginInit @@ -76,12 +76,12 @@ func FLBPluginInit(plugin unsafe.Pointer) int { } if queueURL == "" { - writeErrorLog(errors.New("QueueUrl configuration key is mandatory")) + writeErrorLog(errors.New("QueueUrl configuration key is mandatory.")) return output.FLB_ERROR } if queueRegion == "" { - writeErrorLog(errors.New("QueueRegion configuration key is mandatory")) + writeErrorLog(errors.New("QueueRegion configuration key is mandatory.")) return output.FLB_ERROR } @@ -94,11 +94,11 @@ func FLBPluginInit(plugin unsafe.Pointer) int { batchSize, err := strconv.Atoi(batchSizeString) if err != nil || (0 > batchSize && batchSize > 10) { - writeErrorLog(errors.New("BatchSize should be integer value between 1 and 10")) + writeErrorLog(errors.New("BatchSize should be integer value between 1 and 10.")) return output.FLB_ERROR } - writeInfoLog("retrieving aws credentials from environment variables") + writeInfoLog("Retrieving AWS credentials from environment variables...") awsCredentials := credentials.NewEnvCredentials() var myAWSSession *session.Session var sessionError error @@ -107,13 +107,13 @@ func FLBPluginInit(plugin unsafe.Pointer) int { // Retrieve the credentials value _, credError := awsCredentials.Get() if credError != nil { - writeInfoLog("unable to find aws credentials from environment variables..using credentials chain") + writeInfoLog("Unable to find AWS credentials from environment variables... Trying credentials chain") awsConfig = &aws.Config{ Region: aws.String(queueRegion), CredentialsChainVerboseErrors: aws.Bool(true), } } else { - writeInfoLog("environment variables credentials were found") + writeInfoLog("AWS credentials found in environment") awsConfig = &aws.Config{ Region: aws.String(queueRegion), CredentialsChainVerboseErrors: aws.Bool(true), @@ -124,7 +124,7 @@ func FLBPluginInit(plugin unsafe.Pointer) int { // if proxy if proxyURL != "" { - writeInfoLog("set http client struct on aws configuration since proxy url has been found") + writeInfoLog("Configuring AWS HTTP client using the provided proxy URL...") awsConfig.HTTPClient = &http.Client{ Transport: &http.Transport{ Proxy: func(*http.Request) (*url.URL, error) { @@ -140,7 +140,7 @@ func FLBPluginInit(plugin unsafe.Pointer) int { writeErrorLog(sessionError) return output.FLB_ERROR } - writeInfoLog("AWS session created") + writeInfoLog("AWS session created.") // Set the context to point to any Go variable output.FLBPluginSetContext(plugin, &sqsConfig{ @@ -151,7 +151,7 @@ func FLBPluginInit(plugin unsafe.Pointer) int { batchSize: batchSize, }) - writeInfoLog("Fluentbit context created") + writeInfoLog("Fluentbit context populated.") return output.FLB_OK } @@ -163,11 +163,14 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int var record map[interface{}]interface{} var sqsRecord *sqs.SendMessageBatchRequestEntry + + writeDebugLog("===== Entering FLBPluginFlushCtx() =====") + // Type assert context back into the original type for the Go variable sqsConf, ok := output.FLBPluginGetContext(ctx).(*sqsConfig) if !ok { - writeErrorLog(errors.New("Unexpected error during get plugin context in flush function")) + writeErrorLog(errors.New("Unexpected error from FLBPluginGetContext().")) return output.FLB_ERROR } @@ -182,10 +185,10 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int break } - writeDebugLog(fmt.Sprintf("got new record from input. record length is: %d", len(record))) + writeDebugLog(fmt.Sprintf("Got new record from input (length : %d)", len(record))) if len(record) == 0 { - writeInfoLog("got empty record from input. skipping it") + writeInfoLog("Got empty record from input. Skipping it.") continue } @@ -197,12 +200,12 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int case uint64: timeStamp = time.Unix(int64(t), 0) default: - writeInfoLog("given time is not in a known format, defaulting to now") + writeInfoLog("The record time format is unknown, defaulting to now") timeStamp = time.Now() } tagStr := C.GoString(tag) - recordString, err := createRecordString(timeStamp, tagStr, record) + recordString, err := createRecordString(timeStamp, record) if err != nil { writeErrorLog(err) @@ -214,8 +217,8 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int MessageCounter++ - writeDebugLog(fmt.Sprintf("record string: %s", recordString)) - writeDebugLog(fmt.Sprintf("message counter: %d", MessageCounter)) + writeDebugLog(fmt.Sprintf("Record string: %s", recordString)) + writeDebugLog(fmt.Sprintf("Message counter: %d", MessageCounter)) sqsRecord = &sqs.SendMessageBatchRequestEntry{ Id: aws.String(fmt.Sprintf("MessageNumber-%d", MessageCounter)), @@ -238,20 +241,19 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int SqsRecords = append(SqsRecords, sqsRecord) if MessageCounter == sqsConf.batchSize { + writeDebugLog(fmt.Sprintf("Sending %d records in a SQS message batch...", len(SqsRecords))) err := sendBatchToSqs(sqsConf, SqsRecords) - - SqsRecords = nil - MessageCounter = 0 - if err != nil { writeErrorLog(err) return output.FLB_ERROR } + SqsRecords = nil + MessageCounter = 0 } } if SqsRecords != nil { - writeInfoLog(fmt.Sprintf("Flushing pending %d records", len(SqsRecords))) + writeDebugLog(fmt.Sprintf("Flushing pending %d records", len(SqsRecords))) err := sendBatchToSqs(sqsConf, SqsRecords) if err != nil { writeErrorLog(err) @@ -261,6 +263,8 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int MessageCounter = 0 } + writeDebugLog("===== Exiting FLBPluginFlushCtx() =====") + return output.FLB_OK } @@ -279,6 +283,7 @@ func sendBatchToSqs(sqsConf *sqsConfig, sqsRecords []*sqs.SendMessageBatchReques output, err := sqsConf.mySQS.SendMessageBatch(&sqsBatch) if err != nil { + writeErrorLog(fmt.Errorf("Failed sending SQS message batch: %v", err)) return err } @@ -289,7 +294,7 @@ func sendBatchToSqs(sqsConf *sqsConfig, sqsRecords []*sqs.SendMessageBatchReques return nil } -func createRecordString(timestamp time.Time, tag string, record map[interface{}]interface{}) (string, error) { +func createRecordString(timestamp time.Time, record map[interface{}]interface{}) (string, error) { m := make(map[string]interface{}) // convert timestamp to RFC3339Nano m["@timestamp"] = timestamp.UTC().Format(time.RFC3339Nano) @@ -304,7 +309,7 @@ func createRecordString(timestamp time.Time, tag string, record map[interface{}] } js, err := json.Marshal(m) if err != nil { - writeErrorLog(fmt.Errorf("error creating message for sqs. tag: %s. error: %v", tag, err)) + writeErrorLog(fmt.Errorf("Failed creating SQS message content: %v", err)) return "", err } From 116ec5c5eae7f5834d97c2cfed6881a136e72339 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Bourgeot Date: Mon, 4 Dec 2023 16:08:04 +0100 Subject: [PATCH 6/8] Add flag to flush messages --- out_sqs.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/out_sqs.go b/out_sqs.go index fe9af3c..19003cf 100644 --- a/out_sqs.go +++ b/out_sqs.go @@ -44,6 +44,7 @@ type sqsConfig struct { pluginTagAttribute string proxyURL string batchSize int + flushPendingRecords bool } //export FLBPluginRegister @@ -61,6 +62,7 @@ func FLBPluginInit(plugin unsafe.Pointer) int { pluginTagAttribute := output.FLBPluginConfigKey(plugin, "PluginTagAttribute") proxyURL := output.FLBPluginConfigKey(plugin, "ProxyUrl") batchSizeString := output.FLBPluginConfigKey(plugin, "BatchSize") + flushPendingRecordsString := output.FLBPluginConfigKey(plugin, "FlushPendingRecords") writeInfoLog(fmt.Sprintf("SQSEndpoint is: %s", endpoint)) writeInfoLog(fmt.Sprintf("QueueUrl is: %s", queueURL)) @@ -69,6 +71,7 @@ func FLBPluginInit(plugin unsafe.Pointer) int { writeInfoLog(fmt.Sprintf("pluginTagAttribute is: %s", pluginTagAttribute)) writeInfoLog(fmt.Sprintf("ProxyUrl is: %s", proxyURL)) writeInfoLog(fmt.Sprintf("BatchSize is: %s", batchSizeString)) + writeInfoLog(fmt.Sprintf("flushPendingRecords is: %s", flushPendingRecordsString)) if endpoint == "" { endpoint = fmt.Sprintf("sqs.%s.amazonaws.com", queueRegion) @@ -98,6 +101,16 @@ func FLBPluginInit(plugin unsafe.Pointer) int { return output.FLB_ERROR } + if flushPendingRecordsString == "" { + writeInfoLog("Flushing pending records by default.") + flushPendingRecordsString = "true" + } + flushPendingRecords, err := strconv.ParseBool(flushPendingRecordsString) + if err != nil { + writeErrorLog(errors.New(fmt.Sprintf("Cannot set flushPendingRecords as boolean; got %s", flushPendingRecordsString))) + return output.FLB_ERROR + } + writeInfoLog("Retrieving AWS credentials from environment variables...") awsCredentials := credentials.NewEnvCredentials() var myAWSSession *session.Session @@ -149,6 +162,7 @@ func FLBPluginInit(plugin unsafe.Pointer) int { mySQS: sqs.New(myAWSSession), pluginTagAttribute: pluginTagAttribute, batchSize: batchSize, + flushPendingRecords: flushPendingRecords, }) writeInfoLog("Fluentbit context populated.") @@ -252,7 +266,7 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int } } - if SqsRecords != nil { + if SqsRecords != nil && sqsConf.flushPendingRecords { writeDebugLog(fmt.Sprintf("Flushing pending %d records", len(SqsRecords))) err := sendBatchToSqs(sqsConf, SqsRecords) if err != nil { From ec9e55d71a61b1b1dc6c75654d5b86f8d365f44c Mon Sep 17 00:00:00 2001 From: Marc-Antoine Bourgeot Date: Mon, 4 Dec 2023 17:02:33 +0100 Subject: [PATCH 7/8] Update README --- README.md | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 7283267..bceb505 100644 --- a/README.md +++ b/README.md @@ -4,14 +4,16 @@ FluntBit custom output plugin which allows sending messages to AWS-SQS. ## Configuration Parameters -| Configuration Key Name | Description | Mandatory | -| ---------------------- | -------------------------------------------------------- | --------- | -| QueueUrl | the queue url in your aws account | yes | -| QueueRegion | the queue region in your aws account | yes | -| PluginTagAttribute | attribute name of the message tag | no | -| QueueMessageGroupId | the group id required for fifo queues | fifo-only | -| ProxyUrl | the proxy address between fluentbit and sqs (if exists) | no | -| BatchSize | set amount of messages to be sent in a batch request | yes | +| Configuration Key Name | Description | Mandatory | +| ---------------------- | ------------------------------------------------------------- | --------- | +| QueueUrl | the queue url in your aws account | yes | +| QueueRegion | the queue region in your aws account | yes | +| SQSEndpoint | the SQS endpoint to connect to to configure AWS client | no | +| PluginTagAttribute | attribute name of the message tag | no | +| QueueMessageGroupId | the group id required for fifo queues | fifo-only | +| ProxyUrl | the proxy address between fluentbit and sqs (if exists) | no | +| BatchSize | set amount of messages to be sent in a batch request | yes | +| FlushPendingRecords | if true, pending records will be sent following `Flush` value | no | ```conf [SERVICE] @@ -65,7 +67,7 @@ ENTRYPOINT ["/fluent-bit/bin/fluent-bit"] CMD ["-c", "/fluent-bit/etc/some_configuration.conf", "-e", "/fluent-bit/bin/fluentBit-sqs-plugin.so"] ``` -More information about the usage and installation of golang plugins can be found here: https://docs.fluentbit.io/manual/development/golang_plugins +More information about the usage and installation of golang plugins can be found here: https://docs.fluentbit.io/manual/development/golang-output-plugins ## Special Notes From 59b3b5419d130127cfbf11dd18e42af917e41542 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Bourgeot Date: Mon, 4 Dec 2023 17:21:18 +0100 Subject: [PATCH 8/8] Restore package name --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 4671429..6dc79da 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/piequi/fluentBit-sqs-plugin +module github.com/PayU/fluentBit-sqs-plugin go 1.14