Change Data Capture from Oracle to Kafka

Updated on Jul 03, 2024

As data becomes increasingly critical to businesses, the need to capture and process changes in real time has never been more important. In this article, we'll explore how to read changed data from an Oracle database and write it to a Kafka topic as event messages using the Confluent Oracle CDC Source Connector.

Important:

The Confluent Oracle CDC Source Connector is a premium Confluent connector that requires an additional subscription. It captures changes in an Oracle database and writes them as event messages to Kafka topics.

Setting Up Environment

Before we dive into the code, make sure you have a Kafka ecosystem set up on your macOS machine using Docker. You can refer to our previous article on setting up a Kafka container using Docker for more information.

Also refer to our previous article to set up an Oracle Docker Image.

Installing the Confluent Connector

To use the Confluent Oracle CDC Source Connector, install it using Confluent Hub. Add the entry below to the kafka-connect service in docker-compose.yml, under its command section.

confluent-hub install --no-prompt confluentinc/kafka-connect-oracle-cdc:2.14.0
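
For reference, here is a minimal sketch of how that install step can be wired into the kafka-connect service's command override; the service name and the /etc/confluent/docker/run launch script are assumptions based on a typical Confluent Platform image:

  kafka-connect:
    # ...existing image, environment, ports and network settings...
    command:
      - bash
      - -c
      - |
        # Install the connector plugin, then hand off to the normal entrypoint
        confluent-hub install --no-prompt confluentinc/kafka-connect-oracle-cdc:2.14.0
        /etc/confluent/docker/run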

Simulating Oracle Source Database

Add a Docker container service to simulate an Oracle source database in the docker-compose.yml file.

# Oracle Source Database
  oracle_src:
    image: oracle/database:19.3.0-ee
    hostname: oracle_src
    container_name: oracle_src
    restart: always
    shm_size: 1gb
    ports:
      - "1521:1521"
    environment:
      - ORACLE_SID=ORCLCDB
      - ORACLE_PDB=ORCLPDB1
      - ORACLE_PWD=Password1234
      - ENABLE_ARCHIVELOG=true
    volumes:
      - oradata:/opt/oracle/oradata
    networks:
      - kafka-network

Please also remember to declare oradata under the top-level volumes block, as sketched below.
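
For reference, the top-level declaration looks like this (the kafka-network network is assumed to be declared already as part of the earlier Kafka setup):

volumes:
  oradata: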

Note:

The connector uses Oracle LogMiner to read the database's redo log and requires supplemental logging with "ALL" columns. The connector supports Oracle 11g, 12c, 18c, and 19c. It supports both container databases and non-container databases, and supports databases running on-premises or in the cloud.

Once the Docker containers are up and running, refer to the article on the Oracle Docker Image to set up the Oracle database and enable CDC.
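
The key prerequisite from that article is supplemental logging, since the connector reads the redo log through LogMiner. As a minimal sketch, assuming the container name and SYS password from the compose file above:

# Connect as SYSDBA inside the container and enable supplemental logging
docker exec -i oracle_src sqlplus -S sys/Password1234@//localhost:1521/ORCLCDB as sysdba <<'SQL'
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
SQL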

Create Source Table

Create a source table with some data in the Oracle database for our demo:

ALTER SESSION SET CONTAINER=ORCLPDB1;

CREATE TABLE ORCL_USER.CONSULTANTS (
    "ID"         INTEGER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    "FIRST_NAME" VARCHAR(50),
    "LAST_NAME"  VARCHAR(50),
    "EMAIL"      VARCHAR(50),
    "RATE"       NUMBER(8,2),
    "STATUS"     VARCHAR(20),
    "CREATED_AT" TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
    "UPDATED_AT" TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
);
GRANT SELECT ON ORCL_USER.CONSULTANTS TO C##MYUSER;

INSERT INTO ORCL_USER.CONSULTANTS(FIRST_NAME, LAST_NAME, EMAIL, RATE, STATUS) VALUES ('John', 'Doe', 'john.doe@gmail.com', 3000.00, 'perm');
INSERT INTO ORCL_USER.CONSULTANTS(FIRST_NAME, LAST_NAME, EMAIL, RATE, STATUS) VALUES ('Tom', 'Hanks', 'tom.hanks@yahoo.com', 3500.75, 'contract');
INSERT INTO ORCL_USER.CONSULTANTS(FIRST_NAME, LAST_NAME, EMAIL, RATE, STATUS) VALUES ('Jane', 'Doe', 'jane.doe@moneybank.com', 3500.75, 'perm');
INSERT INTO ORCL_USER.CONSULTANTS(FIRST_NAME, LAST_NAME, EMAIL, RATE, STATUS) VALUES ('Duke', 'Johnson', 'duke@hello.com', 4500.25, 'contract');
INSERT INTO ORCL_USER.CONSULTANTS(FIRST_NAME, LAST_NAME, EMAIL, RATE, STATUS) VALUES ('Peter', 'Parker', 'peter@gmail.com', 5000, 'contract');
INSERT INTO ORCL_USER.CONSULTANTS(FIRST_NAME, LAST_NAME, EMAIL, RATE, STATUS) VALUES ('Rick', 'Nice', 'rick@gmail.com', 4900, 'contract');
INSERT INTO ORCL_USER.CONSULTANTS(FIRST_NAME, LAST_NAME, EMAIL, RATE, STATUS) VALUES ('Tommy', 'Hill', 'tommy@gmail.com', 4100, 'perm');
INSERT INTO ORCL_USER.CONSULTANTS(FIRST_NAME, LAST_NAME, EMAIL, RATE, STATUS) VALUES ('Jill', 'Stone', 'jill@gmail.com', 4250.50, 'contract');
INSERT INTO ORCL_USER.CONSULTANTS(FIRST_NAME, LAST_NAME, EMAIL, RATE, STATUS) VALUES ('Honey', 'Bee', 'honey@gmail.com', 3200, 'perm');
INSERT INTO ORCL_USER.CONSULTANTS(FIRST_NAME, LAST_NAME, EMAIL, RATE, STATUS) VALUES ('Bell', 'Doe', 'bell@gmail.com', 34000, 'contract');
COMMIT;

Kafka Connect Configuration

First, create a Redo Log Topic in Kafka with the following settings (a CLI sketch follows the list):

  • Name: oracle-redo-log-topic
  • Replication Factor: 1
  • Partitions: 1
  • Cleanup Policy: delete
  • Retention Time (ms): 172800000
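
If you prefer the command line over a UI, a topic with these settings can be created with the kafka-topics tool; the broker container name and listener port are assumptions based on the earlier Kafka setup:

# Create the redo log topic with the settings listed above
docker exec -it broker kafka-topics --bootstrap-server localhost:9092 \
  --create --topic oracle-redo-log-topic \
  --partitions 1 --replication-factor 1 \
  --config cleanup.policy=delete --config retention.ms=172800000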

Next, we will create a configuration file for the Confluent Oracle CDC Source Connector:

{
    "name": "src-oracle-confluent-cdc-consultants",
    "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
    "oracle.server": "host.docker.internal",
    "oracle.port": "1521",
    "oracle.sid": "ORCLCDB",
    "oracle.pdb.name": "ORCLPDB1",
    "oracle.username": "C##MYUSER",
    "oracle.password": "PASSWORD",
    "table.inclusion.regex": "ORCLPDB1[.]ORCL_USER[.](CONSULTANTS)",
    "numeric.mapping": "best_fit",
    "start.from": "snapshot",
    "emit.tombstone.on.delete": "true",

    "connection.pool.max.size": "5",
    "max.batch.size": "100",
    "snapshot.row.fetch.size": "1",
    "auto.create.topics.enable": "true",

    "table.topic.name.template": "ORCL-CDC-${databaseName}-${tableName}",
    "lob.topic.name.template": "ORCL-CDC-${databaseName}-${tableName}-${columnName}",
    "confluent.topic.replication.factor": "1",
    "redo.log.topic.name": "oracle-redo-log-topic",
    "redo.log.consumer.fetch.min.bytes": "1",
    "redo.log.row.fetch.size": "1",
    "topic.creation.redo.include": "oracle-redo-log-topic",

    "bootstrap.servers": "broker:29092",
    "confluent.topic.bootstrap.servers": "broker:29092",
    "redo.log.consumer.bootstrap.servers": "broker:29092",

    "topic.creation.enable": "true",
    "topic.creation.groups": "redo",
    "topic.creation.redo.replication.factor": "1",
    "topic.creation.redo.partitions": "1",
    "topic.creation.redo.cleanup.policy": "delete",
    "topic.creation.redo.retention.ms": "1209600000",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.partitions": "1",
    "topic.creation.default.cleanup.policy": "compact",

    "key.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "tasks.max": "1"
}
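
The connector can then be registered with the Kafka Connect REST API. The sketch below assumes the worker's REST port is published on localhost:8083 and that the JSON above is saved as oracle-cdc.json:

# Register (or update) the connector; the name in the URL must match the config
curl -s -X PUT -H "Content-Type: application/json" \
  --data @oracle-cdc.json \
  http://localhost:8083/connectors/src-oracle-confluent-cdc-consultants/config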

Once the Kafka connector is up and running, let us now look into the Kafka topic.
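
Since the record values are Avro-encoded, we can inspect them with the Avro console consumer. A sketch, assuming the broker and schema-registry container names from our setup; the table topic's exact name depends on how ${databaseName} resolves, so list the topics first if unsure:

# List the topics created by the connector, then consume the table topic
docker exec -it broker kafka-topics --bootstrap-server localhost:9092 --list
docker exec -it schema-registry kafka-avro-console-consumer \
  --bootstrap-server broker:29092 \
  --property schema.registry.url=http://schema-registry:8081 \
  --topic ORCL-CDC-ORCLCDB-CONSULTANTS --from-beginning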

Capture Changed Data from Database

Modify some data in the Oracle database to observe the changes flowing from the database to the Kafka topic.

ALTER SESSION SET CONTAINER=ORCLPDB1;

INSERT INTO ORCL_USER.CONSULTANTS(FIRST_NAME, LAST_NAME, EMAIL, RATE, STATUS) VALUES ('Saurav', 'Mitra', 'saurav.karate@gmail.com', 5000.00, 'perm'); COMMIT;
UPDATE ORCL_USER.CONSULTANTS SET rate=6500.00 WHERE email='saurav.karate@gmail.com'; COMMIT;
INSERT INTO ORCL_USER.CONSULTANTS(FIRST_NAME, LAST_NAME, EMAIL, RATE, STATUS) VALUES ('Tim', 'Smith', 'tim.smith@freelance.com', 3500.70, 'contract'); COMMIT;
DELETE FROM ORCL_USER.CONSULTANTS WHERE email='tim.smith@freelance.com'; COMMIT;
INSERT INTO ORCL_USER.CONSULTANTS(FIRST_NAME, LAST_NAME, EMAIL, RATE, STATUS) VALUES ('Shane', 'Wilson', 'shane.wilson@freelance.com', 5000.00, 'perm'); COMMIT;
INSERT INTO ORCL_USER.CONSULTANTS(FIRST_NAME, LAST_NAME, EMAIL, RATE, STATUS) VALUES ('John', 'Sinha', 'john.sinha@freelance.com', 9000.00, 'contract'); COMMIT;

SELECT * FROM ORCL_USER.CONSULTANTS;

We can now see that the database events have been streamed in near real time to the Kafka topic.

Here we have configured the property emit.tombstone.on.delete: true.
When a source record is deleted, the operation is represented by a delete event followed by a tombstone event in the Kafka topic. The tombstone event allows Kafka to completely remove all events that pertain to the key of the deleted row when log compaction is enabled for the topic.

Using Debezium Connector for Oracle

Alternatively, we can use the open-source Debezium Connector for Oracle.

First, download the Debezium Oracle connector plugin archive, then extract it and place the plugin directory under the data folder.
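
A sketch of the download and extraction, assuming version 2.6.2.Final from Maven Central; substitute the release you intend to run:

# Fetch the Debezium Oracle connector plugin and unpack it into ./data
curl -fSL -o debezium-connector-oracle-plugin.tar.gz \
  https://repo1.maven.org/maven2/io/debezium/debezium-connector-oracle/2.6.2.Final/debezium-connector-oracle-2.6.2.Final-plugin.tar.gz
mkdir -p data
tar -xzf debezium-connector-oracle-plugin.tar.gz -C data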

Modify the kafka-connect docker compose service:

Set the environment variable:

  • CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data

Mount the path of the downloaded plugin under volumes:

    volumes:
      - $PWD/data:/data

Debezium Kafka Connect Configuration

{
    "name": "src-oracle-debezium-cdc-consultants",
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "database.hostname": "host.docker.internal",
    "database.port": "1521",
    "database.dbname": "ORCLCDB",
    "database.pdb.name": "ORCLPDB1",
    "database.user": "C##MYUSER",
    "database.password": "PASSWORD",
    "table.include.list": "ORCL_USER.CONSULTANTS",
    "decimal.handling.mode": "string",
    "time.precision.mode": "connect",
    "topic.prefix": "oracle_src",
    "schema.history.internal.kafka.topic": "dbhistory.oracle-orcl-user",
    "schema.history.internal.kafka.bootstrap.servers": "broker:29092",
    "tasks.max": "1"
}
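
The Debezium configuration is deployed through the Connect REST API in the same way as the Confluent one; afterwards, a quick status check (again assuming the REST port on localhost:8083) confirms the connector and its task are RUNNING:

# Verify that the Debezium connector and its task are running
curl -s http://localhost:8083/connectors/src-oracle-debezium-cdc-consultants/status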

In this article, we've demonstrated how to capture change data from an Oracle database and write it to a Kafka topic using the Confluent Oracle CDC Source Connector. With this setup, you can now process changes in real time and integrate them with your existing data pipelines.
