Logo AppDev24 Login / Sign Up
Sign Up
Have Login?
This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.
Login
New Account?
Recovery
Go to Login
By continuing you indicate that you agree to Terms of Service and Privacy Policy of the site.
Applications

Kafka Connect to AWS S3 Sink

Updated on Aug 07, 2023

In this article we will learn how to write Kafka event messages to AWS S3 using Kafka Connect. We will use Amazon S3 Sink Connector to write the messages as Parquet files to our S3 Datalake. Also we will write the Kafka Tombstone records to a separate file to handle downstream delete operations.

The S3 sink connector, allows to export data from Kafka topics to S3 objects in either Avro, Parquet or JSON formats. Being a sink, the S3 connector periodically polls data from Kafka and in turn uploads it to S3. A partitioner is used to split the data of every Kafka partition into chunks. Each chunk of data is represented as an S3 object, whose key name encodes the topic, the Kafka partition and the start offset of this data chunk.

We will use the Confluent hub client to install this connector in our Kafka Connect cluster.

confluent-hub install confluentinc/kafka-connect-s3:10.5.1

Now let us look into the Connector configuration.

"name": "customers",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"topics": "customers",
  • The Name of the Connector Configuration
  • The Name of the Connector Class
  • The underlying Storage layer
  • The Name of the Kafka Topic
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter.schemas.enable": "false",
"key.converter.use.latest.version": "true",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schemas.enable": "true",
"value.converter.use.latest.version": "true",
"schema.compatibility": "BACKWARD",
  • The Key Converter Class
  • The Value Converter Class
  • The Schema Registry URL
  • Use Latest Version of the Schemas
  • The schema compatibility rule to use when the connector is observing schema changes
"s3.bucket.name": "s3-datalake",
"s3.region": "eu-central-1",
"s3.object.tagging": "true",
"topics.dir": "kafka"
  • The S3 Bucket
  • The AWS region to be used by the connector
  • Tag S3 objects with start and end offsets, as well as record count
  • Top level directory to store the data ingested from Kafka
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"timestamp.extractor": "Record",
"timezone": "UTC",
"locale": "en-US",
  • The Format Class to use when writing data to the store
  • The Partitioner Class to use when writing data to the store
  • The Format of the Data Directories when partitioning with TimeBasedPartitioner
  • The Extractor that gets the timestamp for records when partitioning with TimeBasedPartitioner
  • The Timezone to use when partitioning with TimeBasedPartitioner
  • The Locale to use when partitioning with TimeBasedPartitioner
"transforms": "kafkaMetaData,formatTs",
"transforms.kafkaMetaData.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.kafkaMetaData.topic.field": "kafka_topic",
"transforms.kafkaMetaData.partition.field": "kafka_partition",
"transforms.kafkaMetaData.offset.field": "kafka_offset",
"transforms.kafkaMetaData.timestamp.field": "kafka_timestamp",
"transforms.formatTs.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.formatTs.format": "yyyy-MM-dd HH:mm:ss:SSS",
"transforms.formatTs.target.type": "string",
"transforms.formatTs.field": "message_ts",

We will apply few Single Message Transforms to facilitate ease of downstream data handling.

  • InsertField insert fields using attributes from the record metadata
  • Add information about the Kafka message (topic, partition, offset, timestamp)
  • Transform the Kafka timestamp format
"behavior.on.null.values": "write",
"store.kafka.keys": "true",
"keys.format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  • How to handle records with a null value, i.e. Kafka tombstone records
  • Enable Writing record keys to storage
  • The Format Class to use when writing keys to the store
"s3.part.size": "5242880",
"flush.size": "1000",
"rotate.interval.ms": "300000",
"partition.duration.ms": "300000",
  • The Part Size in S3 Multi-part Uploads
  • Number of Records written to store before invoking file commits
  • The Time Interval in milliseconds to periodically invoke file commits
  • The Duration of a Partition milliseconds used by TimeBasedPartitioner

Now this is how the final connector configuration properties looks like:

{
    "name": "customers",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "topics": "customers",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.schemas.enable": "false",
    "key.converter.use.latest.version": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schemas.enable": "true",
    "value.converter.use.latest.version": "true",
    "schema.compatibility": "BACKWARD",
    "s3.bucket.name": "s3-datalake",
    "s3.region": "eu-central-1",
    "s3.object.tagging": "true",
    "topics.dir": "kafka",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "timestamp.extractor": "Record",
    "timezone": "UTC",
    "locale": "en-US",
    "transforms": "kafkaMetaData,formatTs",
    "transforms.kafkaMetaData.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.kafkaMetaData.topic.field": "kafka_topic",
    "transforms.kafkaMetaData.partition.field": "kafka_partition",
    "transforms.kafkaMetaData.offset.field": "kafka_offset",
    "transforms.kafkaMetaData.timestamp.field": "kafka_timestamp",
    "transforms.formatTs.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.formatTs.format": "yyyy-MM-dd HH:mm:ss:SSS",
    "transforms.formatTs.target.type": "string",
    "transforms.formatTs.field": "message_ts",
    "behavior.on.null.values": "write",
    "store.kafka.keys": "true",
    "keys.format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "s3.part.size": "5242880",
    "flush.size": "1000",
    "rotate.interval.ms": "300000",
    "partition.duration.ms": "300000",
    "tasks.max": "3",
    "aws.access.key.id": "AWS_ACCESS_KEY",
    "aws.secret.access.key": "AWS_ACCESS_SECRET"
}

This is how the output looks like in S3 Bucket.

PrimeChess

PrimeChess.org

PrimeChess.org makes elite chess training accessible and affordable for everyone. For the past 6 years, we have offered free chess camps for kids in Singapore and India, and during that time, we also observed many average-rated coaches charging far too much for their services.

To change that, we assembled a team of top-rated coaches including International Masters (IM) or coaches with multiple IM or GM norms, to provide online classes starting from $50 per month (8 classes each month + 4 tournaments)

This affordability is only possible if we get more students. This is why it will be very helpful if you could please pass-on this message to others.

Exclucively For Indian Residents: 
Basic - ₹1500
Intermediate- ₹2000
Advanced - ₹2500

Top 10 Articles