Change Data Capture from MongoDB to Kafka
As data becomes increasingly critical to businesses, the need to capture and process changes in real-time has never been more important. In this article, we'll explore how to read changed data from a MongoDB server and write it to a Kafka topic as event messages using Debezium's MongoDB CDC Source Connector.
Setting Up the Environment
Before we dive into the code, make sure you have a Kafka ecosystem set up on your macOS machine using Docker. You can refer to our previous article on setting up a Kafka container with Docker for more information.
Installing the Debezium Connector
To use the Debezium MongoDB CDC Source Connector, install it from Confluent Hub by adding the entry below to the kafka-connect service's command section in the docker-compose.yml file:
confluent-hub install --no-prompt debezium/debezium-connector-mongodb:2.4.2
Simulating MongoDB Source Database
Add a Docker container service to simulate a MongoDB source database in the docker-compose.yml file.
# MongoDB Source Database
mongo_src:
  # Apple M1 Chip
  # platform: linux/amd64
  image: mongo:4.0
  container_name: mongo_src
  restart: always
  environment:
    MONGO_INITDB_ROOT_USERNAME: root
    MONGO_INITDB_ROOT_PASSWORD: Password1234
  ports:
    - 27019:27017
  volumes:
    - mongo_datadir:/data/db
  networks:
    - kafka-network
  command:
    - "--replSet"
    - "rs0"
    - "--bind_ip"
    - "localhost,mongo_src"
Please also remember to declare mongo_datadir under the top-level volumes block of the docker-compose.yml file.
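A minimal top-level declaration, letting Docker manage the named volume, looks like this:

volumes:
  mongo_datadir: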
Initialize MongoDB Replica Set
docker-compose exec mongo_src /usr/bin/mongo --username root --authenticationDatabase admin --password Password1234 --eval "rs.initiate({_id: \"rs0\", members: [{_id: 0, host: \"mongo_src\"}]})"
docker-compose exec mongo_src /usr/bin/mongo --username root --authenticationDatabase admin --password Password1234 --eval "rs.status()"
Note: The connector relies on the MongoDB Change Streams feature, through which the MongoDB server exposes the changes that occur in a collection as an event stream. The Debezium connector monitors the stream and delivers the changes downstream. Change streams require a replica set, which is why the container above starts mongod with --replSet.
Create Source Collection
Create a source collection with some documents in the MongoDB database for our demo.
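To run the statements below, first open a mongo shell on the source container (reusing the root credentials from the replica-set step):

docker-compose exec mongo_src /usr/bin/mongo --username root --authenticationDatabase admin --password Password1234

Then, inside the shell: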
use sales
db.consultants.insert({first_name: "John", last_name: "Doe", email: "john.doe@gmail.com", rate: 3000.00, status: "perm", created_at: new Date(), updated_at: new Date()})
db.consultants.insert({first_name: "Tom", last_name: "Hanks", email: "tom.hanks@gmail.com", rate: 3500.75, status: "contract", created_at: new Date(), updated_at: new Date()})
db.consultants.insert({first_name: "Jane", last_name: "Doe", email: "jane.doe@gmail.com", rate: 3500.75, status: "perm", created_at: new Date(), updated_at: new Date()})
db.consultants.insert({first_name: "Duke", last_name: "Johnson", email: "duke@hello.com", rate: 4500.25, status: "contract", created_at: new Date(), updated_at: new Date()})
db.consultants.insert({first_name: "Peter", last_name: "Parker", email: "peter@gmail.com", rate: 4500.25, status: "contract", created_at: new Date(), updated_at: new Date()})
db.consultants.insert({first_name: "Rick", last_name: "Nice", email: "rick@gmail.com", rate: 4900, status: "contract", created_at: new Date(), updated_at: new Date()})
db.consultants.insert({first_name: "Tommy", last_name: "Hill", email: "tommy@gmail.com", rate: 4100, status: "perm", created_at: new Date(), updated_at: new Date()})
db.consultants.insert({first_name: "Jill", last_name: "Stone", email: "jill@gmail.com", rate: 4250.50, status: "contract", created_at: new Date(), updated_at: new Date()})
db.consultants.insert({first_name: "Honey", last_name: "Bee", email: "honey@gmail.com", rate: 3200, status: "perm", created_at: new Date(), updated_at: new Date()})
db.consultants.insert({first_name: "Bell", last_name: "Doe", email: "bell@gmail.com", rate: 3400, status: "contract", created_at: new Date(), updated_at: new Date()})
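To verify the seed data, count the documents; at this point the shell should report 10:

db.consultants.count()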
Kafka Connect Configuration
Create a configuration file (e.g. debezium-mongo-source.json) for the Debezium MongoDB CDC Source Connector:
{
  "name": "src-mongo-debezium-cdc-consultants",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.connection.string": "mongodb://host.docker.internal:27019/?replicaSet=rs0",
    "mongodb.connection.mode": "replica_set",
    "mongodb.user": "root",
    "mongodb.password": "Password1234",
    "database.include.list": "sales",
    "collection.include.list": "sales.consultants",
    "topic.prefix": "mongo_src",
    "capture.mode": "change_streams_update_full",
    "tasks.max": "1"
  }
}
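Assuming the file is saved as debezium-mongo-source.json and the Kafka Connect REST API is exposed on its default port 8083 (adjust to your setup), register the connector with:

curl -X POST -H "Content-Type: application/json" --data @debezium-mongo-source.json http://localhost:8083/connectors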
Once the connector is up and running, Debezium writes the captured documents to the Kafka topic mongo_src.sales.consultants (the topic.prefix, database, and collection names joined by dots). Let us now look into the Kafka topic.
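One way to inspect the topic is a console consumer. The command below assumes your compose file defines a broker service named kafka with a listener on kafka:9092 and that the Connect workers use the default JSON converters; adjust the service name and bootstrap address to your setup:

docker-compose exec kafka kafka-console-consumer --bootstrap-server kafka:9092 --topic mongo_src.sales.consultants --from-beginning --property print.key=true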
Capture Changed Data from Database
Modify some documents in the MongoDB database to observe the CDC events flowing from the database to the Kafka topic.
db.consultants.insert({first_name: "Saurav", last_name: "Mitra", email: "saurav.karate@gmail.com", rate: 5000.00, status: "perm", created_at: new Date(), updated_at: new Date()})
db.consultants.updateOne({email: "saurav.karate@gmail.com"}, {$set: {rate: 6500.00}, $currentDate: { updated_at: true }})
db.consultants.insert({first_name: "Tim", last_name: "Smith", email: "tim.smith@freelance.com", rate: 3500.70, status: "contract", created_at: new Date(), updated_at: new Date()})
db.consultants.deleteOne({ email: "tim.smith@freelance.com" })
db.consultants.insertMany([
  {first_name: "Shane", last_name: "Wilson", email: "shane.wilson@freelance.com", rate: 5000.00, status: "perm", created_at: new Date(), updated_at: new Date()},
  {first_name: "John", last_name: "Sinha", email: "john.sinha@freelance.com", rate: 9000.00, status: "contract", created_at: new Date(), updated_at: new Date()}
])
We can now see that the database events have been streamed in near real-time to the Kafka topic.
tombstones.on.delete: true is the default setting. After a source record is deleted, the delete operation is represented by a delete event followed by a tombstone event in the Kafka topic. The tombstone event allows Kafka to completely remove all events that pertain to the key of the deleted row when log compaction is enabled for the topic.
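Schematically, with print.key=true on the console consumer, the deletion of tim.smith@freelance.com above surfaces as two records sharing the same key (payloads abbreviated here; the exact envelope depends on your converter settings):

{"id": "..."}    {"op": "d", ...}
{"id": "..."}    null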
Alternative MongoDB Connector
As an alternative to Debezium, you can use MongoDB's official Kafka source connector. Install it from Confluent Hub by adding the entry below to the kafka-connect service's command section in the docker-compose.yml file:
confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.13.0
Create a configuration file (e.g. mongodb-source.json) for the MongoDB CDC Source Connector:
{
  "name": "src-mongo-mongodb-cdc-consultants",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://root:Password1234@host.docker.internal:27019/?replicaSet=rs0",
    "database": "sales",
    "collection": "consultants",
    "topic.prefix": "mongodb_src",
    "startup.mode": "copy_existing",
    "publish.full.document.only": "true",
    "publish.full.document.only.tombstone.on.delete": "true",
    "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
    "output.format.key": "json",
    "output.format.value": "schema",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "true",
    "output.schema.infer.value": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "tasks.max": "1"
  }
}
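As before, assuming the file is saved as mongodb-source.json and Connect listens on port 8083, register it with:

curl -X POST -H "Content-Type: application/json" --data @mongodb-source.json http://localhost:8083/connectors

With this connector, the records land in the topic mongodb_src.sales.consultants.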
In this article, we've demonstrated how to capture change data from a MongoDB database and write it to a Kafka topic as event messages, using Debezium's MongoDB CDC Source Connector as well as MongoDB's official Kafka source connector. With this setup, you can process changes in near real-time and integrate them with your existing data pipelines.