Kafka Connect Debezium Source to AWS S3 Sink

Updated on Aug 07, 2023

In this article we will learn how to write Kafka event messages from Debezium source database topics to AWS S3 using Kafka Connect. We will use the Amazon S3 Sink Connector to write the messages as Parquet files to our S3 data lake. We will also write Kafka tombstone records to a separate file so that downstream delete operations can be handled.

The S3 sink connector allows you to export data from Kafka topics to S3 objects in Avro, Parquet or JSON format. Being a sink, the S3 connector periodically polls data from Kafka and in turn uploads it to S3. A partitioner is used to split the data of every Kafka partition into chunks. Each chunk of data is represented as an S3 object whose key name encodes the topic, the Kafka partition and the start offset of this data chunk.
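
For example, with the connector's default file naming, a committed chunk typically ends up under an object key of the following shape (the exact prefix comes from topics.dir and the partitioner, and the extension depends on the chosen format and compression codec):

<topics.dir>/<topic>/<encoded partition>/<topic>+<kafkaPartition>+<startOffset>.<extension>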

We will use the Confluent hub client to install this connector in our Kafka Connect cluster.

confluent-hub install confluentinc/kafka-connect-s3:10.5.1
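
After the installation, restart the Kafka Connect workers so that they pick up the new plugin. Assuming the Connect REST API is reachable at localhost:8083 (adjust the host and port to your setup), you can confirm that the connector class is now available:

curl -s http://localhost:8083/connector-plugins | grep -i S3SinkConnector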

Now let us look into the Connector configuration.

"name": "vendors",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"topics": "vendors",
  • The Name of the Connector Configuration
  • The Name of the Connector Class
  • The underlying Storage layer
  • The Name of the Kafka Topic
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter.schemas.enable": "false",
"key.converter.use.latest.version": "true",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schemas.enable": "true",
"value.converter.use.latest.version": "true",
"schema.compatibility": "BACKWARD",
  • The Key Converter Class
  • The Value Converter Class
  • The Schema Registry URL (a lookup example follows this list)
  • Use Latest Version of the Schemas
  • The schema compatibility rule to use when the connector is observing schema changes
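
Since the value converter resolves Avro schemas through the Schema Registry, it can be useful to check what Debezium has registered for the topic. Assuming the default TopicNameStrategy, the value schema of the vendors topic is stored under the vendors-value subject:

curl -s http://schema-registry:8081/subjects/vendors-value/versions/latest
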
"s3.bucket.name": "s3-datalake",
"s3.region": "eu-central-1",
"s3.object.tagging": "true",
"topics.dir": "kafka"
  • The S3 Bucket
  • The AWS region to be used by the connector
  • Tag S3 objects with start and end offsets, as well as record count
  • Top level directory to store the data ingested from Kafka
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"timestamp.extractor": "Record",
"timezone": "UTC",
"locale": "en-US",
  • The Format Class to use when writing data to the store
  • The Partitioner Class to use when writing data to the store
  • The Format of the Data Directories when partitioning with TimeBasedPartitioner (an example path follows this list)
  • The Extractor that gets the timestamp for records when partitioning with TimeBasedPartitioner
  • The Timezone to use when partitioning with TimeBasedPartitioner
  • The Locale to use when partitioning with TimeBasedPartitioner
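
With this partitioner configuration, records whose timestamps fall in, say, the 12:00 UTC hour of 07 Aug 2023 would be written under a prefix like the one below in the s3-datalake bucket (the date values are only an illustration). If you have the AWS CLI configured, you can list the resulting objects directly:

kafka/vendors/year=2023/month=08/day=07/hour=12/

aws s3 ls s3://s3-datalake/kafka/vendors/year=2023/month=08/day=07/hour=12/
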
"transforms": "unwrap,extractKey,kafkaMetaData,formatTs",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.add.fields": "op,table,lsn,source.ts_ms",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field": "id",
"transforms.kafkaMetaData.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.kafkaMetaData.topic.field": "kafka_topic",
"transforms.kafkaMetaData.partition.field": "kafka_partition",
"transforms.kafkaMetaData.offset.field": "kafka_offset",
"transforms.kafkaMetaData.timestamp.field": "kafka_timestamp",
"transforms.formatTs.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.formatTs.format": "yyyy-MM-dd HH:mm:ss:SSS",
"transforms.formatTs.target.type": "string",
"transforms.formatTs.field": "message_ts",

We will apply a few Single Message Transforms (SMTs) to make downstream data handling easier.

  • To simplify the format of the event records that the Debezium connectors produce, we will use the Debezium event flattening single message transformation (SMT). The event flattening SMT extracts the after field from a Debezium change event in a Kafka record and replaces the original change event with only its after field to create a simple Kafka record.
  • Keeps tombstone records for DELETE operations in the event stream
  • Adds change event metadata for the op, table, lsn and source.ts_ms fields to the simplified Kafka record
  • ExtractField pulls a field out of a complex (non-primitive, Map or Struct) key or value and replaces the entire key or value with the extracted field; here it replaces the record key with the id field
  • InsertField inserts fields using attributes from the record metadata
  • Adds information about the Kafka message (topic, partition, offset, timestamp)
  • TimestampConverter converts the message_ts timestamp field into a formatted string (a sample transformed record follows this list)
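
To illustrate the combined effect of these transforms, this is roughly what a record value could look like after the SMT chain for a hypothetical vendors row with id, name and message_ts columns (the __-prefixed fields come from the add.fields option of the event flattening SMT, and the record key would simply be the extracted id value):

{
    "id": 42,
    "name": "Acme GmbH",
    "message_ts": "2023-08-07 12:34:56:789",
    "__op": "u",
    "__table": "vendors",
    "__lsn": 987654321,
    "__source_ts_ms": 1691411696000,
    "kafka_topic": "vendors",
    "kafka_partition": 0,
    "kafka_offset": 120,
    "kafka_timestamp": 1691411696123
}
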
"delete.handling.mode": "none",
"behavior.on.null.values": "write",
"store.kafka.keys": "true",
"keys.format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  • Debezium generates a change event record for each DELETE operation. The default behavior is that the event flattening SMT removes these records from the stream. To keep Kafka records for DELETE operations in the stream, set the SMT's delete.handling.mode option (transforms.unwrap.delete.handling.mode) to none
  • How to handle records with a null value, i.e. Kafka tombstone records
  • Enables writing record keys to storage
  • The Format Class to use when writing keys to the store
"s3.part.size": "5242880",
"flush.size": "1000",
"rotate.interval.ms": "300000",
"partition.duration.ms": "300000",
  • The Part Size in S3 Multi-part Uploads
  • Number of Records written to store before invoking file commits
  • The Time Interval in milliseconds to periodically invoke file commits
  • The Duration of a Partition in milliseconds, used by TimeBasedPartitioner

This is what the final connector configuration looks like:

{
    "name": "vendors",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "topics": "vendors",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.schemas.enable": "false",
    "key.converter.use.latest.version": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schemas.enable": "true",
    "value.converter.use.latest.version": "true",
    "schema.compatibility": "BACKWARD",
    "s3.bucket.name": "s3-datalake",
    "s3.region": "eu-central-1",
    "s3.object.tagging": "true",
    "topics.dir": "kafka",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "timestamp.extractor": "Record",
    "timezone": "UTC",
    "locale": "en-US",
    "transforms": "unwrap,extractKey,kafkaMetaData,formatTs",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.add.fields": "op,table,lsn,source.ts_ms",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field": "id",
    "transforms.kafkaMetaData.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.kafkaMetaData.topic.field": "kafka_topic",
    "transforms.kafkaMetaData.partition.field": "kafka_partition",
    "transforms.kafkaMetaData.offset.field": "kafka_offset",
    "transforms.kafkaMetaData.timestamp.field": "kafka_timestamp",
    "transforms.formatTs.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.formatTs.format": "yyyy-MM-dd HH:mm:ss:SSS",
    "transforms.formatTs.target.type": "string",
    "transforms.formatTs.field": "message_ts",
    "delete.handling.mode": "none",
    "behavior.on.null.values": "write",
    "store.kafka.keys": "true",
    "keys.format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "s3.part.size": "5242880",
    "flush.size": "1000",
    "rotate.interval.ms": "300000",
    "partition.duration.ms": "300000",
    "tasks.max": "3",
    "aws.access.key.id": "AWS_ACCESS_KEY",
    "aws.secret.access.key": "AWS_ACCESS_SECRET"
}
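
Save this JSON to a file, for example vendors.json, and register it with the Kafka Connect REST API (the worker address below is just an assumption for a local setup). The second call reports the state of the connector and its tasks once it has been created:

curl -s -X PUT -H "Content-Type: application/json" --data @vendors.json http://localhost:8083/connectors/vendors/config

curl -s http://localhost:8083/connectors/vendors/status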