Instaclustr AWS S3 Apache Kafka Connect Plugin
AWS S3 Sink Connector Plugin
This plugin stores data from Kafka topics in a specified S3 bucket. It currently supports writing only in the format described below. Note that value.converter and key.converter must always be set to the ByteArrayConverter shown below.
Configurations Available
- “aws.s3.bucket” : S3 bucket to be written to.
- “prefix” : The path prefix under which the S3 objects are written
- “aws.secretKey” : AWS credentials (secret access key)
- “aws.accessKeyId” : AWS credentials (access key ID)
- “value.converter” : org.apache.kafka.connect.converters.ByteArrayConverter
- “key.converter” : org.apache.kafka.connect.converters.ByteArrayConverter
- “aws.region” : AWS region the client will be configured with (optional)
Data format
The sink connector buffers the messages of each topic partition separately, up to 5 MB of data per partition. Each buffer is then written as a single S3 object containing multiple messages. The object key is defined as follows:
user-defined-prefix/<topic>/<partition>/<start offset>-<end offset>
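As an illustration of the key layout above, the following minimal Python sketch splits an object key back into its parts. The names ObjectKey and parse_object_key are hypothetical and not part of the plugin, and the sketch assumes the offsets appear in the key as plain decimal integers.

```python
from typing import NamedTuple


class ObjectKey(NamedTuple):
    prefix: str
    topic: str
    partition: int
    start_offset: int
    end_offset: int


def parse_object_key(key: str) -> ObjectKey:
    """Split '<prefix>/<topic>/<partition>/<start>-<end>' into its fields."""
    # Split from the right so that a prefix containing "/" stays intact.
    prefix, topic, partition, offsets = key.rsplit("/", 3)
    # Assumption: offsets are decimal integers separated by a single "-".
    start, end = offsets.split("-", 1)
    return ObjectKey(prefix, topic, int(partition), int(start), int(end))


parsed = parse_object_key("backups/2024/orders/3/100-199")
print(parsed.topic, parsed.partition, parsed.start_offset, parsed.end_offset)
```

Splitting from the right keeps the sketch independent of how many path segments the user-defined prefix contains.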
Each S3 object starts with a version identifier, written as an integer in the first four bytes of the object. The remaining bytes are Kafka record data. Each Kafka record is written in the format below, with every field sized according to its data type (e.g. long: 8 bytes, int: 4 bytes).
| offset (long) | timestamp (long) | key byte length (int) | value byte length (int) | number of headers (int) | key | value | header-1 key length (int) | header-1 key | header-1 value length (int) | header-1 value | …
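The layout above could be decoded along the following lines. This is a sketch under stated assumptions rather than the plugin's own reader: it assumes big-endian byte order (the default for Java data streams), assumes all length fields are non-negative, and uses the hypothetical names Record and read_records. How null keys or values are encoded is not stated here, so the sketch rejects negative lengths instead of guessing.

```python
import struct
from typing import List, NamedTuple, Tuple


class Record(NamedTuple):
    offset: int
    timestamp: int
    key: bytes
    value: bytes
    headers: List[Tuple[bytes, bytes]]


# Assumption: big-endian. offset, timestamp (long); key length,
# value length, number of headers (int).
FIXED = struct.Struct(">qqiii")
INT = struct.Struct(">i")


def read_records(data: bytes) -> Tuple[int, List[Record]]:
    """Return (version, records) decoded from the bytes of one S3 object."""
    pos = 0

    def take(n: int) -> bytes:
        nonlocal pos
        if n < 0 or pos + n > len(data):
            raise ValueError("unexpected length field or truncated object")
        chunk = data[pos:pos + n]
        pos += n
        return chunk

    # The first four bytes hold the version identifier.
    version = INT.unpack(take(INT.size))[0]
    records = []
    while pos < len(data):
        offset, timestamp, key_len, value_len, n_headers = FIXED.unpack(take(FIXED.size))
        key = take(key_len)
        value = take(value_len)
        headers = []
        for _ in range(n_headers):
            h_key = take(INT.unpack(take(INT.size))[0])
            h_value = take(INT.unpack(take(INT.size))[0])
            headers.append((h_key, h_value))
        records.append(Record(offset, timestamp, key, value, headers))
    return version, records
```

A round trip on hand-built bytes shows the intended use:

```python
sample = (
    struct.pack(">i", 1)
    + struct.pack(">qqiii", 42, 1700000000000, 1, 2, 1)
    + b"k" + b"vv"
    + struct.pack(">i", 2) + b"hk"
    + struct.pack(">i", 1) + b"h"
)
version, records = read_records(sample)
print(version, records[0].offset, records[0].headers)
```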
Memory Requirements
Because each partition of a Kafka topic is buffered up to 5 MB, a rough minimum memory requirement can be calculated as [number of partitions handled * 5 MB].
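The estimate can be worked through with a short Python sketch. The function name is hypothetical, and the sketch assumes that 5 MB means 5 * 1024 * 1024 bytes; it covers only the partition buffers, not any other worker overhead.

```python
# Assumption: the 5 MB buffer is 5 * 1024 * 1024 bytes.
BUFFER_BYTES_PER_PARTITION = 5 * 1024 * 1024


def min_buffer_memory_bytes(partitions_handled: int) -> int:
    """Rough lower bound on buffer memory: partitions handled * 5 MB."""
    return partitions_handled * BUFFER_BYTES_PER_PARTITION


# Example: 3 topics with 12 partitions each, all handled by this connector.
partitions = 3 * 12
print(min_buffer_memory_bytes(partitions) // (1024 * 1024), "MB")
```

For 36 partitions this gives 180 MB of buffer space as a starting point.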
Example Connector Creation Request
curl https://<ip>:8083/connectors -X POST -H 'Content-Type: application/json' -d '{
    "name":"sink",
    "config":{
        "connector.class":"com.instaclustr.kafka.connect.s3.sink.AwsStorageSinkConnector",
        "tasks.max":"3",
        "topics":"<topic-1,topic-2...>",
        "prefix":"<s3-prefix-to-data>",
        "aws.s3.bucket":"<s3-bucket-name>",
        "aws.accessKeyId":"<key>",
        "aws.secretKey":"<secret>",
        "value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
        "key.converter":"org.apache.kafka.connect.converters.ByteArrayConverter"
    }
}' -k -u ic_kc_user:<password>
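For scripted setups, the same request could be assembled in Python using only the standard library. This is a sketch, not an official client: the function build_sink_request and its parameters are hypothetical, and the connector name, tasks.max value and user name are simply copied from the curl example. The sketch only builds the request; it does not send it.

```python
import base64
import json
import urllib.request

CONVERTER = "org.apache.kafka.connect.converters.ByteArrayConverter"


def build_sink_request(host, password, topics, bucket, s3_prefix,
                       access_key, secret_key):
    """Build (but do not send) the POST request that creates the sink connector."""
    payload = {
        "name": "sink",
        "config": {
            "connector.class": "com.instaclustr.kafka.connect.s3.sink.AwsStorageSinkConnector",
            "tasks.max": "3",
            "topics": ",".join(topics),
            "prefix": s3_prefix,
            "aws.s3.bucket": bucket,
            "aws.accessKeyId": access_key,
            "aws.secretKey": secret_key,
            # Both converters must be the ByteArrayConverter.
            "value.converter": CONVERTER,
            "key.converter": CONVERTER,
        },
    }
    # Equivalent of curl's "-u ic_kc_user:<password>" (HTTP basic auth).
    token = base64.b64encode(f"ic_kc_user:{password}".encode()).decode()
    return urllib.request.Request(
        f"https://{host}:8083/connectors",
        data=json.dumps(payload).encode(),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
        method="POST",
    )
```

The returned object can be passed to urllib.request.urlopen. Reproducing curl's -k flag would additionally require an ssl context with certificate verification disabled, which is best limited to test environments.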
AWS S3 Source Connector Plugin
Currently, this plugin can only read back into Kafka topics data that was written by the Instaclustr AWS S3 sink plugin. Note that value.converter and key.converter must always be set to the ByteArrayConverter shown below.
When reading data from an S3 bucket, the source connector expects the target Kafka cluster to contain a topic with the same name and the same number of partitions. Setting kafka.topicPrefix changes the target topic name to <prefix>-<topic name in s3>; a topic with that name must then be created in the Kafka cluster.
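The naming rule can be summarised in a few lines of Python. The helper target_topic is hypothetical and only mirrors the description above; it is useful for working out which topics need to exist before the connector starts.

```python
from typing import Optional


def target_topic(s3_topic: str, topic_prefix: Optional[str] = None) -> str:
    """Name of the Kafka topic that data for s3_topic is written to."""
    if topic_prefix:
        # With kafka.topicPrefix set, the name becomes <prefix>-<topic name in s3>.
        return f"{topic_prefix}-{s3_topic}"
    # Without a prefix, the topic name from S3 is used unchanged.
    return s3_topic


# Topics to create in the target cluster for kafka.topicPrefix "s3".
for name in ["orders", "payments"]:
    print(target_topic(name, "s3"))
```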
Configurations Available
- “aws.s3.bucket” : S3 bucket to be read from
- “prefix” : The path prefix of the location the S3 objects are read from
- “aws.secretKey” : AWS credentials (secret access key)
- “aws.accessKeyId” : AWS credentials (access key ID)
- “value.converter” : org.apache.kafka.connect.converters.ByteArrayConverter
- “key.converter” : org.apache.kafka.connect.converters.ByteArrayConverter
- “s3.topics” : The topics found in the S3 bucket location that should be processed
- “kafka.topicPrefix” : Specify a prefix for the kafka topic written to
- “maxRecordsPerSecond” : The maximum rate at which records are produced to Kafka. Use this to tune throughput to the capacity of a worker.
- “aws.region” : AWS region the client will be configured with (optional)
Example Connector Creation Request
curl https://<ip>:8083/connectors -X POST -H 'Content-Type: application/json' -d '{
    "name":"source",
    "config":{
        "connector.class":"com.instaclustr.kafka.connect.s3.source.AwsStorageSourceConnector",
        "tasks.max":"3",
        "kafka.topicPrefix":"s3",
        "s3.topics":"<topic-1,topic-2...>",
        "aws.s3.bucket":"<s3-bucket-name>",
        "aws.accessKeyId":"<key>",
        "aws.secretKey":"<secret>",
        "value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
        "key.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
        "prefix":"<s3-prefix-to-data>"
    }
}' -k -u ic_kc_user:<password>