This example demonstrates how Benthos can be used to stream an S3 bucket of
.tar.gz archives containing JSON documents into any output target. It listens
for newly added archives, then downloads, decompresses and unarchives them, and
streams the JSON documents found within to a Kafka topic. The Kafka output in
this example can be replaced with any Benthos output target.
New archives are discovered via an SQS queue, which is a common pattern. Benthos can work either with S3 events sent to SQS directly, or with S3 events broadcast via SNS to SQS; the latter requires a small adjustment to the config, which is explained in the input section.
The full config for this example can be found here.
input:
  type: s3
  s3:
    region: eu-west-1 # TODO
    bucket: TODO
    delete_objects: false
    sqs_url: TODO
    sqs_body_path: Records.s3.object.key
    sqs_envelope_path: ""
    sqs_max_messages: 10
    credentials:
      id: "TODO"
      secret: "TODO"
      token: "TODO"
role: "TODO"This input section contains lots of fields to be completed which are self
explanatory, such as bucket, sqs_url and the credentials section.
The sqs_body_path field is the JSON path within an SQS message that contains
the key of a newly added S3 object. It should be left as Records.s3.object.key
unless you have built a custom solution.
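For reference, here is a trimmed sketch of the JSON body of an S3 event notification as it arrives on the SQS queue. The bucket name and object key are made-up placeholders, and real events contain many more fields; the path Records.s3.object.key resolves to the key of the uploaded archive:

{
  "Records": [
    {
      "eventName": "ObjectCreated:Put",
      "s3": {
        "bucket": { "name": "my-example-bucket" },
        "object": { "key": "archives/batch-0001.tar.gz" }
      }
    }
  ]
}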
If SNS is being used to broadcast S3 events instead of connecting SQS directly,
you will also need to fill in sqs_envelope_path, which is the JSON path inside
an SNS message that contains the enveloped S3 event. The value of
sqs_envelope_path should be Message when using the standard AWS set up.
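With that in place the SQS-related part of the input config would look something like this sketch (other fields omitted):

input:
  type: s3
  s3:
    sqs_url: TODO
    sqs_body_path: Records.s3.object.key
    sqs_envelope_path: Message # Unwrap the S3 event from the SNS message body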
This example uses a single consumer, but if the throughput isn't high enough to
keep up with the bucket, it is possible to use a broker type to run multiple
parallel consumers:
input:
  type: broker
  broker:
    copies: 8 # Increase this to gain more parallel consumers
    inputs:
      - type: s3
        s3:
          ... etc

You can have any number of consumers of a bucket, and messages (archives) will automatically be distributed amongst them via the SQS queue.
pipeline:
  threads: 4 # Try to match the number of available logical CPU cores
  processors:
    - type: decompress
      decompress:
        algorithm: gzip
    - type: unarchive
      unarchive:
        format: tar
    - type: split
    - type: batch
      batch:
        count: 10 # The size of message batches to send to Kafka

The processors in this example start off with a simple decompress and unarchive of the payload. This results in a single payload containing multiple documents. The split processor then turns this payload into individual messages.
The final processor is optional. It is a batch stage that bundles the individual messages back into batches to be sent to the Kafka topic, increasing throughput. If the batch processor is used, its count should be a factor of the number of documents inside the S3 archives; for example, if each archive contains 100 documents, a count of 10 produces ten full batches per archive. The count also needs to be low enough that the overall size of a batch doesn't exceed the maximum bytes of a Kafka request.
These processors are heavy on CPU, which is why they are configured inside the pipeline section. This allows you to explicitly set the number of parallel threads to exactly match the number of logical CPU cores available.
The output config is a standard Kafka output.
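The exact fields depend on your Kafka deployment, but a minimal sketch might look like the following, where the broker address and topic are placeholders to fill in:

output:
  type: kafka
  kafka:
    addresses:
      - localhost:9092 # TODO: your broker addresses
    topic: TODO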