Change Data Capture from PostgreSQL to Kafka
As data becomes increasingly critical to businesses, the need to capture and process changes in real-time has never been more important. In this article, we'll explore how to read changed data from a PostgreSQL database and write it to a Kafka topic as event messages using Debezium's PostgreSQL CDC Source Connector.
Setting Up Environment
Before we dive into the code, make sure you have a Kafka ecosystem running on your macOS machine using Docker. You can refer to our previous article on setting up a Kafka container with Docker for more information.
Installing the Debezium Connector
To use the Debezium PostgreSQL CDC Source Connector, install it from Confluent Hub. Add the entry below to the kafka-connect service's command section in docker-compose.yml:
confluent-hub install --no-prompt debezium/debezium-connector-postgresql:2.5.4
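Once the stack is up, you can confirm the plugin is available through the Kafka Connect REST API. A quick check, assuming the Connect REST port is mapped to localhost:8083 as in our earlier Kafka setup:
# List the installed connector plugins and filter for the Debezium PostgreSQL connector
curl -s http://localhost:8083/connector-plugins | grep -i "postgres"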
Simulating PostgreSQL Source Database
Add a container service to the docker-compose.yml file to simulate a PostgreSQL source database:
# PostgreSQL Source Database
postgres_src:
  # Apple M1 Chip
  # platform: linux/amd64
  image: postgres:latest
  container_name: postgres_src
  restart: always
  environment:
    POSTGRES_USER: postgres
    POSTGRES_PASSWORD: Password1234
    POSTGRES_DB: postgres
    PGDATA: /var/lib/postgresql/data/pgdata
  ports:
    - 5434:5432
  volumes:
    - pgdata_dir:/var/lib/postgresql/data
  networks:
    - kafka-network
  command:
    - "postgres"
    - "-c"
    - "wal_level=logical"
Please also remember to declare pgdata_dir under the top-level volumes block.
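With the compose file updated, bring up the database container and check that it started cleanly (service and container names here match the compose snippet above):
# Start only the PostgreSQL source service
docker compose up -d postgres_src
# Follow the startup logs
docker logs -f postgres_src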
Note:
The PostgreSQL connector reads changes from a logical replication stream. The command "postgres", "-c", "wal_level=logical" starts the PostgreSQL database with wal_level set to logical, which logical decoding requires.
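You can verify the setting took effect from inside the container (assuming the container name postgres_src from the compose file):
# Should print: logical
docker exec -it postgres_src psql -U postgres -c "SHOW wal_level;"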
Create Source Table
Create a source table with some seed data in the PostgreSQL database for our demo. The ALTER TABLE ... REPLICA IDENTITY statement below tells PostgreSQL to use the primary-key index to identify rows in the WAL for updates and deletes, which Debezium uses as the event key:
CREATE TABLE consultants(
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
first_name VARCHAR(50),
last_name VARCHAR(50),
email VARCHAR(50),
rate NUMERIC(8,2),
status VARCHAR(20),
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
ALTER TABLE consultants REPLICA IDENTITY USING INDEX consultants_pkey;
INSERT INTO consultants(first_name, last_name, email, rate, status) VALUES
('John', 'Doe', 'john.doe@gmail.com', 3000.00, 'perm'),
('Tom', 'Hanks', 'tom.hanks@yahoo.com', 3500.75, 'contract'),
('Jane', 'Doe', 'jane.doe@moneybank.com', 3500.75, 'perm'),
('Duke', 'Johnson', 'duke@hello.com', 4500.25, 'contract'),
('Peter', 'Parker', 'peter@gmail.com', 4500.25, 'contract'),
('Rick', 'Nice', 'rick@gmail.com', 4900, 'contract'),
('Tommy', 'Hill', 'tommy@gmail.com', 4100, 'perm'),
('Jill', 'Stone', 'jill@gmail.com', 4250.50, 'contract'),
('Honey', 'Bee', 'honey@gmail.com', 3200, 'perm'),
('Bell', 'Doe', 'bell@gmail.com', 34000, 'contract')
;
SELECT * FROM consultants;
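These statements can be run through psql inside the container, or from the host via the mapped port 5434 (both sketches assume the credentials from the compose file):
# Inside the container
docker exec -it postgres_src psql -U postgres -d postgres
# Or from the host via the mapped port
psql -h localhost -p 5434 -U postgres -d postgres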
Kafka Connect Configuration
Create a configuration file for the Debezium PostgreSQL CDC Source Connector. Note that Debezium 2.x uses topic.prefix (the older database.server.name property has been removed), and the PostgreSQL connector does not keep a separate schema-history topic, so no database.history.* settings are needed:
{
  "name": "src-pg-debezium-cdc-consultants",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "host.docker.internal",
    "database.port": "5434",
    "database.user": "postgres",
    "database.password": "Password1234",
    "database.dbname": "postgres",
    "plugin.name": "pgoutput",
    "topic.prefix": "postgres_src",
    "table.include.list": "public.consultants",
    "include.schema.changes": "false",
    "decimal.handling.mode": "string",
    "time.precision.mode": "connect",
    "tasks.max": "1"
  }
}
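To register the connector, save the JSON above to a file (here assumed to be named src-pg-debezium-cdc-consultants.json) and post it to the Kafka Connect REST API, again assuming it listens on localhost:8083:
# Create the connector
curl -s -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @src-pg-debezium-cdc-consultants.json
# Check its status
curl -s http://localhost:8083/connectors/src-pg-debezium-cdc-consultants/status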
Once the Kafka connector is up and running, let us now look into the Kafka topic. Debezium names change topics <topic.prefix>.<schema>.<table>, so our events land in postgres_src.public.consultants.
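One way to watch the topic is with the console consumer bundled in the broker container (a sketch, assuming the broker container is named broker and listens on broker:9092 inside the Docker network, as in our earlier setup):
docker exec -it broker kafka-console-consumer \
  --bootstrap-server broker:9092 \
  --topic postgres_src.public.consultants \
  --from-beginning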
Capture Changed Data from Database
Modify some data in the PostgreSQL database to observe change data flowing from the database to the Kafka topic:
INSERT INTO consultants(first_name, last_name, email, rate, status) VALUES ('Saurav', 'Mitra', 'saurav.karate@gmail.com', 5000.00, 'perm');
UPDATE consultants SET rate=6500.00 WHERE email='saurav.karate@gmail.com';
INSERT INTO consultants(first_name, last_name, email, rate, status) VALUES ('Tim', 'Smith', 'tim.smith@freelance.com', 3500.70, 'contract');
DELETE FROM consultants WHERE email='tim.smith@freelance.com';
INSERT INTO consultants(first_name, last_name, email, rate, status) VALUES
('Shane', 'Wilson', 'shane.wilson@freelance.com', 5000.00, 'perm'),
('John', 'Sinha', 'john.sinha@freelance.com', 9000.00, 'contract')
;
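If events don't show up, a useful check is the replication slot Debezium created in PostgreSQL (the slot name defaults to debezium; active should be t while the connector runs):
# Inspect replication slots on the source database
docker exec -it postgres_src psql -U postgres \
  -c "SELECT slot_name, plugin, active FROM pg_replication_slots;"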
We can now see that the database events have been streamed in near real-time to the Kafka topic.
tombstones.on.delete: true is the connector's default setting. After a source row is deleted, the delete is represented by a delete event followed by a tombstone event in the Kafka topic. The tombstone event allows Kafka to completely remove all events that pertain to the key of the deleted row in case log compaction is enabled for the topic.
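To see the tombstone itself, print message keys while consuming; the deleted row appears as its key followed by a null value (same broker-container assumptions as above):
docker exec -it broker kafka-console-consumer \
  --bootstrap-server broker:9092 \
  --topic postgres_src.public.consultants \
  --from-beginning \
  --property print.key=true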
In this article, we've demonstrated how to capture change data from a PostgreSQL database and write it to a Kafka topic using Debezium's PostgreSQL CDC Source Connector. With this setup, you can now process changes in real-time and integrate them with your existing data pipelines.