Kafka Connect Debezium Source to AWS S3 Sink
In this article we will learn how to write Kafka event messages from Debezium source database topics to AWS S3 using Kafka Connect. We will use the Amazon S3 Sink Connector to write the messages as Parquet files to our S3 data lake. We will also write the Kafka tombstone records to a separate file to handle downstream delete operations.
The S3 sink connector exports data from Kafka topics to S3 objects in Avro, Parquet, or JSON format. Being a sink, the S3 connector periodically polls data from Kafka and uploads it to S3. A partitioner is used to split the data of every Kafka partition into chunks. Each chunk of data is represented as an S3 object, whose key name encodes the topic, the Kafka partition, and the start offset of this data chunk.
We will use the Confluent Hub client to install this connector in our Kafka Connect cluster.
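To make the key naming concrete, here is a minimal Python sketch of how such an object key could be assembled. The layout (`<topics.dir>/<topic>/<encoded partition>/<topic>+<partition>+<start offset>`), the zero-padded offset, and the `.snappy.parquet` extension are assumptions based on the connector's commonly documented naming, not something verified here. The function name `s3_object_key` is purely illustrative.

```python
def s3_object_key(topics_dir, topic, encoded_partition, kafka_partition,
                  start_offset, extension=".snappy.parquet"):
    """Assemble an S3 object key that encodes topic, partition and start offset.

    Assumed layout (illustrative, not taken from the connector source):
    <topics_dir>/<topic>/<encoded_partition>/<topic>+<partition>+<offset><ext>
    """
    # The start offset is assumed to be zero-padded to 10 digits.
    filename = f"{topic}+{kafka_partition}+{start_offset:010d}{extension}"
    return "/".join([topics_dir, topic, encoded_partition, filename])


print(s3_object_key("kafka", "vendors",
                    "year=2023/month=01/day=15/hour=10", 0, 42))
# kafka/vendors/year=2023/month=01/day=15/hour=10/vendors+0+0000000042.snappy.parquet
```

Because the start offset is part of the key, a retried upload of the same chunk would map to the same object name rather than create a new one.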
confluent-hub install confluentinc/kafka-connect-s3:10.5.1
Now let us look into the Connector configuration.
"name": "vendors",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"topics": "vendors",
- The Name of the Connector Configuration
- The Name of the Connector Class
- The underlying Storage layer
- The Name of the Kafka Topic
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter.schemas.enable": "false",
"key.converter.use.latest.version": "true",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schemas.enable": "true",
"value.converter.use.latest.version": "true",
"schema.compatibility": "BACKWARD",
- The Key Converter Class
- The Value Converter Class
- The Schema Registry URL
- Use Latest Version of the Schemas
- The schema compatibility rule to use when the connector is observing schema changes
"s3.bucket.name": "s3-datalake",
"s3.region": "eu-central-1",
"s3.object.tagging": "true",
"topics.dir": "kafka",
- The S3 Bucket
- The AWS region to be used by the connector
- Tag S3 objects with start and end offsets, as well as record count
- Top level directory to store the data ingested from Kafka
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"timestamp.extractor": "Record",
"timezone": "UTC",
"locale": "en-US",
- The Format Class to use when writing data to the store
- The Partitioner Class to use when writing data to the store
- The Format of the Data Directories when partitioning with TimeBasedPartitioner
- The Extractor that gets the timestamp for records when partitioning with TimeBasedPartitioner
- The Timezone to use when partitioning with TimeBasedPartitioner
- The Locale to use when partitioning with TimeBasedPartitioner
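As a rough illustration of what the partitioning settings above produce, the following Python sketch renders the directory path for a record timestamp in UTC. It only mimics the `path.format` pattern with `strftime`; the function name `partition_path` and the sample timestamp are assumptions for illustration, and the connector itself formats the path internally on the JVM.

```python
from datetime import datetime, timezone


def partition_path(timestamp_ms):
    """Render the hourly partition directory for a record timestamp (epoch ms)."""
    ts = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    # Mirrors 'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH with timezone UTC.
    return ts.strftime("year=%Y/month=%m/day=%d/hour=%H")


# 1673778600000 ms is 2023-01-15 10:30:00 UTC
print(partition_path(1673778600000))
# year=2023/month=01/day=15/hour=10
```

With `timestamp.extractor` set to `Record`, the timestamp fed into this path is the Kafka record timestamp rather than the wall-clock time at which the connector processes the record.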
"transforms": "unwrap,extractKey,kafkaMetaData,formatTs",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.add.fields": "op,table,lsn,source.ts_ms",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field": "id",
"transforms.kafkaMetaData.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.kafkaMetaData.topic.field": "kafka_topic",
"transforms.kafkaMetaData.partition.field": "kafka_partition",
"transforms.kafkaMetaData.offset.field": "kafka_offset",
"transforms.kafkaMetaData.timestamp.field": "kafka_timestamp",
"transforms.formatTs.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.formatTs.format": "yyyy-MM-dd HH:mm:ss:SSS",
"transforms.formatTs.target.type": "string",
"transforms.formatTs.field": "message_ts",
We will apply a few Single Message Transforms (SMTs) to make downstream data handling easier.
- To simplify the format of the event records that the Debezium connectors produce, we will use the Debezium event flattening single message transformation (SMT), ExtractNewRecordState. The event flattening SMT extracts the after field from a Debezium change event in a Kafka record and replaces the original change event with only its after field to create a simple Kafka record
  - Keeps tombstone records for DELETE operations in the event stream
  - Adds change event metadata for the op, table, lsn and source.ts_ms fields to the simplified Kafka record
- ExtractField pulls a field out of a complex (non-primitive, Map or Struct) key or value and replaces the entire key or value with the extracted field. Here it replaces the record key with its id field
- InsertField inserts fields using attributes from the record metadata
  - Adds information about the Kafka message (topic, partition, offset, timestamp)
- TimestampConverter transforms the timestamp format. Here it converts the message_ts field to a formatted string
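The effect of the flattening and key extraction steps can be sketched in plain Python. This is only a simplified model for create and update events, not the Debezium implementation. The sample event, the function names, and the assumption that added metadata fields get a `__` prefix with dots replaced by underscores are all illustrative.

```python
def flatten(envelope, add_fields=("op", "table", "lsn", "source.ts_ms")):
    """Roughly mimic event flattening for create/update change events."""
    if envelope is None:
        return None  # a tombstone (null value) passes through unchanged
    record = dict(envelope["after"])  # keep only the row state after the change
    source = envelope.get("source", {})
    for field in add_fields:
        if field.startswith("source."):
            value = source.get(field.split(".", 1)[1])
        elif field in envelope:
            value = envelope[field]
        else:
            value = source.get(field)
        # Assumed naming: "__" prefix, dots replaced by underscores.
        record["__" + field.replace(".", "_")] = value
    return record


def extract_key(key, field="id"):
    """Replace a struct key with one of its fields, like ExtractField$Key."""
    return key[field]


event = {
    "before": None,
    "after": {"id": 7, "name": "Acme"},
    "source": {"table": "vendors", "lsn": 123, "ts_ms": 1673778600000},
    "op": "c",
    "ts_ms": 1673778600123,
}
print(flatten(event))
print(extract_key({"id": 7}))
```

The flattened record carries the row columns plus the metadata fields, which is a much simpler shape to query in Parquet than the nested before/after envelope.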
"transforms.unwrap.delete.handling.mode": "none",
"behavior.on.null.values": "write",
"store.kafka.keys": "true",
"keys.format.class": "io.confluent.connect.s3.format.json.JsonFormat",
- Debezium generates a change event record for each DELETE operation. The default behavior is that the event flattening SMT removes these records from the stream. To keep Kafka records for DELETE operations in the stream, set delete.handling.mode to none
- How to handle records with a null value, i.e. Kafka tombstone records
- Enable Writing record keys to storage
- The Format Class to use when writing keys to the store
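How the stored keys and tombstones are consumed is up to the downstream job. As a hedged sketch, assume that job has already read the data records and tombstone keys back from S3 into (key, value) pairs in offset order. It could then apply them to a keyed table as below. The function `apply_changes` and the in-memory dict are hypothetical stand-ins for whatever merge logic the data lake actually uses.

```python
def apply_changes(table, records):
    """Apply (key, value) pairs in order; a None value is a tombstone (delete)."""
    for key, value in records:
        if value is None:
            table.pop(key, None)  # tombstone: remove the row if it exists
        else:
            table[key] = value    # insert or update the row
    return table


changes = [
    (7, {"id": 7, "name": "Acme"}),
    (8, {"id": 8, "name": "Globex"}),
    (7, None),  # tombstone for key 7
]
print(apply_changes({}, changes))
# {8: {'id': 8, 'name': 'Globex'}}
```

This only works because the record key is preserved for tombstones, which is why writing null values goes together with storing the Kafka keys.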
"s3.part.size": "5242880",
"flush.size": "1000",
"rotate.interval.ms": "300000",
"partition.duration.ms": "300000",
- The Part Size in S3 Multi-part Uploads
- Number of Records written to store before invoking file commits
- The Time Interval in milliseconds to periodically invoke file commits
- The Duration of a Partition in milliseconds, used by TimeBasedPartitioner
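To put these numbers in perspective, 5242880 bytes is 5 MiB and 300000 ms is 5 minutes. The sketch below models the commit decision in a simplified way: a file is committed once it holds flush.size records, or once the next record's timestamp is at least rotate.interval.ms later than the first record in the file. This is an approximation for illustration, and `should_commit` is a hypothetical name; the connector's actual rotation logic has more cases.

```python
FLUSH_SIZE = 1000                  # flush.size
ROTATE_INTERVAL_MS = 5 * 60 * 1000  # rotate.interval.ms = 300000
S3_PART_SIZE = 5 * 1024 * 1024      # s3.part.size = 5242880 bytes (5 MiB)


def should_commit(record_count, first_record_ts_ms, next_record_ts_ms):
    """Simplified model of when the open file is committed to S3."""
    if record_count >= FLUSH_SIZE:
        return True
    return next_record_ts_ms - first_record_ts_ms >= ROTATE_INTERVAL_MS


print(should_commit(1000, 0, 1))       # True: record count reached
print(should_commit(10, 0, 299_999))   # False: still inside the interval
print(should_commit(10, 0, 300_000))   # True: interval elapsed
```

Setting partition.duration.ms to the same value as rotate.interval.ms keeps file rotation aligned with the time-based partition boundaries.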
This is what the final connector configuration looks like:
{
"name": "vendors",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"topics": "vendors",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter.schemas.enable": "false",
"key.converter.use.latest.version": "true",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schemas.enable": "true",
"value.converter.use.latest.version": "true",
"schema.compatibility": "BACKWARD",
"s3.bucket.name": "s3-datalake",
"s3.region": "eu-central-1",
"s3.object.tagging": "true",
"topics.dir": "kafka",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"timestamp.extractor": "Record",
"timezone": "UTC",
"locale": "en-US",
"transforms": "unwrap,extractKey,kafkaMetaData,formatTs",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.add.fields": "op,table,lsn,source.ts_ms",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field": "id",
"transforms.kafkaMetaData.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.kafkaMetaData.topic.field": "kafka_topic",
"transforms.kafkaMetaData.partition.field": "kafka_partition",
"transforms.kafkaMetaData.offset.field": "kafka_offset",
"transforms.kafkaMetaData.timestamp.field": "kafka_timestamp",
"transforms.formatTs.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.formatTs.format": "yyyy-MM-dd HH:mm:ss:SSS",
"transforms.formatTs.target.type": "string",
"transforms.formatTs.field": "message_ts",
"transforms.unwrap.delete.handling.mode": "none",
"behavior.on.null.values": "write",
"store.kafka.keys": "true",
"keys.format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"s3.part.size": "5242880",
"flush.size": "1000",
"rotate.interval.ms": "300000",
"partition.duration.ms": "300000",
"tasks.max": "3",
"aws.access.key.id": "AWS_ACCESS_KEY",
"aws.secret.access.key": "AWS_ACCESS_SECRET"
}
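One way to register this configuration is through the Kafka Connect REST API, which accepts a flat configuration map via PUT on /connectors/{name}/config. The Python sketch below builds such a request. The worker URL `http://localhost:8083`, the file name `vendors-s3-sink.json`, and the helper name `build_deploy_request` are assumptions for illustration.

```python
import json
import urllib.request


def build_deploy_request(config, connect_url="http://localhost:8083"):
    """Build a PUT request that creates or updates the connector config."""
    name = config["name"]
    return urllib.request.Request(
        url=f"{connect_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Usage, assuming the JSON above is saved as vendors-s3-sink.json
# and a Connect worker is reachable at the assumed URL:
#
# with open("vendors-s3-sink.json") as f:
#     config = json.load(f)
# with urllib.request.urlopen(build_deploy_request(config)) as resp:
#     print(resp.status)
```

Using PUT makes the call idempotent: the same request creates the connector the first time and updates its configuration afterwards. In a real deployment the AWS credentials would typically come from the environment or a secrets provider rather than the configuration file.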