Kafka Connect to AWS S3 Sink
In this article, we will learn how to write Kafka event messages to AWS S3 using Kafka Connect. We will use the Amazon S3 Sink Connector to write the messages as Parquet files to our S3 data lake. We will also write the Kafka tombstone records to a separate file so that downstream delete operations can be handled.
The S3 sink connector exports data from Kafka topics to S3 objects in Avro, Parquet, or JSON format. Being a sink, the S3 connector periodically polls data from Kafka and uploads it to S3. A partitioner splits the data of every Kafka partition into chunks. Each chunk is stored as one S3 object, whose key name encodes the topic, the Kafka partition, and the start offset of that chunk.
We will use the Confluent Hub client to install this connector in our Kafka Connect cluster.
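To make the key layout concrete, here is a minimal Python sketch of how such an object key could be assembled. It assumes the connector's default naming (`+` as the file delimiter, the start offset zero-padded to 10 digits) and a `.snappy.parquet` extension; `s3_object_key` and its parameters are hypothetical names used only for illustration.

```python
def s3_object_key(topics_dir, topic, encoded_partition, kafka_partition,
                  start_offset, extension=".snappy.parquet", pad_width=10):
    """Build a key shaped like
    <topics_dir>/<topic>/<encoded_partition>/<topic>+<partition>+<offset><ext>."""
    # Zero-pad the start offset so that keys sort lexicographically by offset.
    filename = f"{topic}+{kafka_partition}+{start_offset:0{pad_width}d}{extension}"
    return "/".join([topics_dir, topic, encoded_partition, filename])


key = s3_object_key("kafka", "customers",
                    "year=2023/month=01/day=15/hour=09", 0, 1000)
print(key)
```

The encoded partition segment (`year=.../hour=...`) comes from the partitioner, which is configured further down.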
confluent-hub install confluentinc/kafka-connect-s3:10.5.1
Now let us look into the Connector configuration.
"name": "customers",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"topics": "customers",
- The Name of the Connector Configuration
- The Name of the Connector Class
- The underlying Storage layer
- The Name of the Kafka Topic
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter.schemas.enable": "false",
"key.converter.use.latest.version": "true",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schemas.enable": "true",
"value.converter.use.latest.version": "true",
"schema.compatibility": "BACKWARD",
- The Key Converter Class
- The Value Converter Class
- The Schema Registry URL
- Use Latest Version of the Schemas
- The schema compatibility rule to use when the connector is observing schema changes
"s3.bucket.name": "s3-datalake",
"s3.region": "eu-central-1",
"s3.object.tagging": "true",
"topics.dir": "kafka",
- The S3 Bucket
- The AWS region to be used by the connector
- Tag S3 objects with start and end offsets, as well as record count
- Top level directory to store the data ingested from Kafka
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"timestamp.extractor": "Record",
"timezone": "UTC",
"locale": "en-US",
- The Format Class to use when writing data to the store
- The Partitioner Class to use when writing data to the store
- The Format of the Data Directories when partitioning with TimeBasedPartitioner
- The Extractor that gets the timestamp for records when partitioning with TimeBasedPartitioner
- The Timezone to use when partitioning with TimeBasedPartitioner
- The Locale to use when partitioning with TimeBasedPartitioner
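As a rough illustration of the time-based partitioning described above, the following Python sketch maps a record timestamp to a directory path. It is a simplified model, not the connector's code: it assumes UTC, the `year=/month=/day=/hour=` layout from `path.format`, and that the timestamp is first rounded down to the start of its `partition.duration.ms` window. The function name `encoded_partition` is hypothetical.

```python
from datetime import datetime, timezone


def encoded_partition(timestamp_ms, partition_duration_ms=300_000):
    """Map a record timestamp (epoch millis) to a UTC directory path."""
    # Round the timestamp down to the start of its partition window.
    window_start_ms = (timestamp_ms // partition_duration_ms) * partition_duration_ms
    dt = datetime.fromtimestamp(window_start_ms / 1000, tz=timezone.utc)
    return dt.strftime("year=%Y/month=%m/day=%d/hour=%H")


record_ts = int(datetime(2023, 1, 15, 9, 5, 30, tzinfo=timezone.utc).timestamp() * 1000)
print(encoded_partition(record_ts))
```

Because the path format only goes down to the hour, every window that starts within the same hour lands in the same directory.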
"transforms": "kafkaMetaData,formatTs",
"transforms.kafkaMetaData.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.kafkaMetaData.topic.field": "kafka_topic",
"transforms.kafkaMetaData.partition.field": "kafka_partition",
"transforms.kafkaMetaData.offset.field": "kafka_offset",
"transforms.kafkaMetaData.timestamp.field": "kafka_timestamp",
"transforms.formatTs.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.formatTs.format": "yyyy-MM-dd HH:mm:ss:SSS",
"transforms.formatTs.target.type": "string",
"transforms.formatTs.field": "message_ts",
We will apply a few Single Message Transforms (SMTs) to ease downstream data handling.
- InsertField: insert fields using attributes from the record metadata, adding information about the Kafka message (topic, partition, offset, timestamp)
- TimestampConverter: convert the timestamp in the message_ts field to a string in the configured format
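The effect of the two transforms on a record value can be pictured with a small Python emulation. This is only a sketch of the resulting shape, not how Kafka Connect implements SMTs; it assumes `message_ts` arrives as epoch milliseconds and that the formatted string is rendered in UTC. The helper names and the sample record are hypothetical.

```python
from datetime import datetime, timezone


def insert_kafka_metadata(value, topic, partition, offset, timestamp_ms):
    """Emulate InsertField$Value: copy record metadata into the value."""
    enriched = dict(value)  # leave the input untouched
    enriched["kafka_topic"] = topic
    enriched["kafka_partition"] = partition
    enriched["kafka_offset"] = offset
    enriched["kafka_timestamp"] = timestamp_ms
    return enriched


def format_message_ts(value, field="message_ts"):
    """Emulate TimestampConverter$Value with format 'yyyy-MM-dd HH:mm:ss:SSS'."""
    converted = dict(value)
    ts_ms = converted[field]
    dt = datetime.fromtimestamp(ts_ms // 1000, tz=timezone.utc)
    # Append the millisecond part after a colon, as in the configured format.
    converted[field] = f"{dt:%Y-%m-%d %H:%M:%S}:{ts_ms % 1000:03d}"
    return converted


record_value = {"id": 42, "message_ts": 1673773500123}
out = format_message_ts(
    insert_kafka_metadata(record_value, "customers", 0, 1000, 1673773500456)
)
print(out)
```

The transforms run in the order listed in `transforms`, so the metadata fields are added before `message_ts` is reformatted.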
"behavior.on.null.values": "write",
"store.kafka.keys": "true",
"keys.format.class": "io.confluent.connect.s3.format.json.JsonFormat",
- How to handle records with a null value, i.e. Kafka tombstone records
- Enable Writing record keys to storage
- The Format Class to use when writing keys to the store
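These settings depend on each other: as far as I understand the connector documentation, writing tombstones requires that record keys are stored as well, since a tombstone has no value to write. A small pre-flight check, sketched in Python, could look like this; `check_tombstone_settings` is a hypothetical helper and not part of any Kafka Connect API.

```python
def check_tombstone_settings(config):
    """Return a list of problems with the null-value handling settings."""
    problems = []
    behavior = config.get("behavior.on.null.values")
    keys_stored = str(config.get("store.kafka.keys", "false")).lower() == "true"
    # Tombstones carry only a key, so the key must be written somewhere.
    if behavior == "write" and not keys_stored:
        problems.append(
            "behavior.on.null.values=write needs store.kafka.keys=true"
        )
    return problems


good = {"behavior.on.null.values": "write", "store.kafka.keys": "true"}
bad = {"behavior.on.null.values": "write"}
print(check_tombstone_settings(good))
print(check_tombstone_settings(bad))
```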
"s3.part.size": "5242880",
"flush.size": "1000",
"rotate.interval.ms": "300000",
"partition.duration.ms": "300000",
- The Part Size in S3 Multi-part Uploads
- Number of Records written to store before invoking file commits
- The Time Interval in milliseconds to periodically invoke file commits
- The Duration of a Partition in milliseconds, used by TimeBasedPartitioner
This is what the final connector configuration looks like:
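To see how `flush.size` and `rotate.interval.ms` interact, consider this simplified Python model of the commit decision. It is an assumption-laden sketch: it only covers the record-count and record-timestamp conditions, while the real connector also commits on other events such as a change of the encoded partition. `should_commit` is a hypothetical name.

```python
def should_commit(record_count, first_record_ts_ms, current_record_ts_ms,
                  flush_size=1000, rotate_interval_ms=300_000):
    """Return True when the open file should be committed (simplified)."""
    # Condition 1: enough records have been buffered.
    if record_count >= flush_size:
        return True
    # Condition 2: the file spans at least rotate.interval.ms of record time.
    return current_record_ts_ms - first_record_ts_ms >= rotate_interval_ms


# s3.part.size of 5242880 bytes is exactly 5 MiB.
part_size_bytes = 5 * 1024 * 1024
print(part_size_bytes)
print(should_commit(1000, 0, 10_000))    # record count reached
print(should_commit(10, 0, 300_000))     # time span reached
print(should_commit(10, 0, 299_999))     # neither condition met
```

Note that the time condition is driven by record timestamps, so a file stays open while no new records arrive.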
{
"name": "customers",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"topics": "customers",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter.schemas.enable": "false",
"key.converter.use.latest.version": "true",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schemas.enable": "true",
"value.converter.use.latest.version": "true",
"schema.compatibility": "BACKWARD",
"s3.bucket.name": "s3-datalake",
"s3.region": "eu-central-1",
"s3.object.tagging": "true",
"topics.dir": "kafka",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"timestamp.extractor": "Record",
"timezone": "UTC",
"locale": "en-US",
"transforms": "kafkaMetaData,formatTs",
"transforms.kafkaMetaData.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.kafkaMetaData.topic.field": "kafka_topic",
"transforms.kafkaMetaData.partition.field": "kafka_partition",
"transforms.kafkaMetaData.offset.field": "kafka_offset",
"transforms.kafkaMetaData.timestamp.field": "kafka_timestamp",
"transforms.formatTs.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.formatTs.format": "yyyy-MM-dd HH:mm:ss:SSS",
"transforms.formatTs.target.type": "string",
"transforms.formatTs.field": "message_ts",
"behavior.on.null.values": "write",
"store.kafka.keys": "true",
"keys.format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"s3.part.size": "5242880",
"flush.size": "1000",
"rotate.interval.ms": "300000",
"partition.duration.ms": "300000",
"tasks.max": "3",
"aws.access.key.id": "AWS_ACCESS_KEY",
"aws.secret.access.key": "AWS_ACCESS_SECRET"
}
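One way to deploy this configuration is through the Kafka Connect REST API, using `PUT /connectors/{name}/config`, which creates the connector or updates it if it already exists. The Python sketch below only builds the request; the worker URL `http://localhost:8083` and the helper `build_connector_request` are assumptions for illustration, and the config is shortened to a few keys.

```python
import json
import urllib.request


def build_connector_request(connect_url, config):
    """Build a PUT request that creates or updates a connector."""
    name = config["name"]
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url}/connectors/{name}/config",
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Shortened config; in practice, load the full JSON shown above.
config = {
    "name": "customers",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "customers",
}
request = build_connector_request("http://localhost:8083", config)
print(request.get_method(), request.full_url)
# To actually submit it: urllib.request.urlopen(request)
```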
This is what the output looks like in the S3 bucket.