Docker Kafka Container
Are you looking to set up a Kafka ecosystem of containers using Docker on your macOS machine? Look no further! In this article, we will explore the process of setting up a Kafka cluster, creating topics, connectors, and streams using Confluent's tools, including Schema Registry and ksqlDB. We will also cover how to use Docker to run all of these components in a local environment.
Setup
We will be deploying Kafka based on KRaft rather than ZooKeeper.
The components we will deploy are:
- Kafka Broker
- Confluent Schema Registry
- Confluent REST Proxy
- Kafka Connect
- ksqlDB
- Kafka-UI
Create an Environment File
The first step is to create an environment file that will store our Cluster ID.
.env
CLUSTER_ID=AsK12ABCDExyABCDEFG99x
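If you want to generate your own unique Cluster ID instead of reusing the sample value above, one option (a minimal sketch, assuming the confluentinc/cp-kafka image is pullable on your machine) is to run the kafka-storage tool in a throwaway container and write the result into the .env file:
# Generate a random base64 UUID for the Kafka cluster
docker run --rm confluentinc/cp-kafka:7.4.0 kafka-storage random-uuid
# Replace <generated-uuid> with the value printed above
echo "CLUSTER_ID=<generated-uuid>" > .env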
Create a Docker Compose File
Next, we'll create a Docker compose file that defines our Kafka containers.
kafka-docker-compose.yml
version: '3.9'
services:
broker:
# Apple M1 Chip
# platform: linux/amd64
image: confluentinc/cp-kafka:7.4.0
container_name: broker
hostname: broker
restart: always
env_file:
- .env
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: broker
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=broker -Dcom.sun.management.jmxremote.rmi.port=9101
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
KAFKA_LISTENERS: PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
# KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
KAFKA_LOG_DIRS: /var/lib/kafka/data
# Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
# See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
CLUSTER_ID: $CLUSTER_ID
ports:
- 9092:9092
- 9101:9101
volumes:
- broker_logdir:/var/lib/kafka/data
networks:
- kafka-network
schema-registry:
# Apple M1 Chip
# platform: linux/amd64
image: confluentinc/cp-schema-registry:7.4.0
container_name: schema-registry
hostname: schema-registry
restart: always
env_file:
- .env
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:29092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: __schemas
ports:
- 8081:8081
networks:
- kafka-network
depends_on:
- broker
rest-proxy:
# Apple M1 Chip
# platform: linux/amd64
image: confluentinc/cp-kafka-rest:7.4.0
container_name: rest-proxy
hostname: rest-proxy
restart: always
env_file:
- .env
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: broker:29092
KAFKA_REST_LISTENERS: http://0.0.0.0:8082
KAFKA_REST_SCHEMA_REGISTRY_URL: http://schema-registry:8081
ports:
- 8082:8082
networks:
- kafka-network
depends_on:
- broker
- schema-registry
kafka-connect:
# Apple M1 Chip
# platform: linux/amd64
image: confluentinc/cp-kafka-connect:7.4.0
container_name: kafka-connect
hostname: kafka-connect
restart: always
env_file:
- .env
environment:
CONNECT_BOOTSTRAP_SERVERS: broker:29092
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
CONNECT_GROUP_ID: kafka-connect
CONNECT_CONFIG_STORAGE_TOPIC: __connect-config
CONNECT_OFFSET_STORAGE_TOPIC: __connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: __connect-status
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
# CLASSPATH required due to CC-2422
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.4.0.jar
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
command:
- bash
- -c
- |
confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.6.0
confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.5.0
confluent-hub install --no-prompt confluentinc/connect-transforms:1.4.3
/etc/confluent/docker/run &
sleep infinity
ports:
- 8083:8083
volumes:
- $PWD/data:/data
networks:
- kafka-network
depends_on:
- broker
- schema-registry
ksqldb-server:
# Apple M1 Chip
# platform: linux/amd64
image: confluentinc/cp-ksqldb-server:7.4.0
container_name: ksqldb-server
hostname: ksqldb-server
restart: always
env_file:
- .env
environment:
KSQL_CONFIG_DIR: /etc/ksql
KSQL_BOOTSTRAP_SERVERS: broker:29092
KSQL_HOST_NAME: ksqldb-server
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
KSQL_PRODUCER_INTERCEPTOR_CLASSES: io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
KSQL_CONSUMER_INTERCEPTOR_CLASSES: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
KSQL_KSQL_CONNECT_URL: http://kafka-connect:8083
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: true
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: true
ports:
- 8088:8088
networks:
- kafka-network
depends_on:
- broker
- schema-registry
- kafka-connect
kafka-ui:
# Apple M1 Chip
# platform: linux/amd64
image: provectuslabs/kafka-ui:master
# image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
restart: always
env_file:
- .env
environment:
DYNAMIC_CONFIG_ENABLED: true
KAFKA_CLUSTERS_0_NAME: kafka-cluster
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: broker:29092
KAFKA_CLUSTERS_0_METRICS_PORT: 9101
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: kafka-connect-cluster
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect:8083
KAFKA_CLUSTERS_0_KSQLDBSERVER: http://ksqldb-server:8088
ports:
- 8888:8080
volumes:
#- /tmp/config.yml:/etc/kafkaui/dynamic_config.yaml
- kafkaui_dir:/etc/kafkaui
networks:
- kafka-network
depends_on:
- broker
- schema-registry
- kafka-connect
- ksqldb-server
networks:
kafka-network:
driver: bridge
volumes:
broker_logdir:
kafkaui_dir:
This file defines the services that will be used to run the Kafka broker, Schema Registry, REST Proxy, Kafka Connect, ksqlDB, and Kafka-UI. It also sets up the necessary volumes and networks.
Start the Container
Now that we have our environment file and Docker Compose file set up, it's time to start the containers! Run the following command:
docker-compose -f kafka-docker-compose.yml up -d
This will start all the containers/services in detached mode, meaning they will run in the background.
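To confirm that everything came up cleanly, you can check the container status and tail the broker logs (a quick sanity check; the service and container names are the ones defined in the Compose file above):
docker-compose -f kafka-docker-compose.yml ps
docker logs broker --tail 50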
List Kafka Clusters
Once the containers are running, you can list the available Kafka clusters using the following command:
rest_proxy='http://127.0.0.1:8082'
curl -s -k -X GET ${rest_proxy}/v3/clusters | jq
This will display a list of all available Kafka clusters.
Get Kafka Cluster ID
cluster_id=`curl -s -k -X GET ${rest_proxy}/v3/clusters | jq -r '.data | .[0].cluster_id'`
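As a quick check that the variable was populated, you can print the cluster ID and list the brokers registered in the cluster via the REST Proxy v3 brokers endpoint:
echo ${cluster_id}
curl -s -k -X GET ${rest_proxy}/v3/clusters/${cluster_id}/brokers | jq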
Create Kafka Topics
Now let us create two Kafka topics, namely pageviews and users:
curl -s -k -X POST -H "Content-Type: application/json" ${rest_proxy}/v3/clusters/${cluster_id}/topics \
--data '{"topic_name": "pageviews", "partitions_count": 1, "replication_factor": 1, "configs": [{"name": "cleanup.policy", "value": "delete"},{"name": "retention.ms", "value": 3600000}]}' | jq
curl -s -k -X POST -H "Content-Type: application/json" ${rest_proxy}/v3/clusters/${cluster_id}/topics \
--data '{"topic_name": "users", "partitions_count": 1, "replication_factor": 1, "configs": [{"name": "cleanup.policy", "value": "delete"},{"name": "retention.ms", "value": 3600000}]}' | jq
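To verify that a topic was created with the expected settings, you can also describe it through the REST Proxy (shown here for pageviews):
curl -s -k -X GET ${rest_proxy}/v3/clusters/${cluster_id}/topics/pageviews | jq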
List Kafka Topics
curl -s -k -X GET ${rest_proxy}/v3/clusters/${cluster_id}/topics | jq -r '.data[].topic_name'
List Kafka Connector Plugins
kafka_connect='http://127.0.0.1:8083'
curl -s -k -X GET ${kafka_connect}/connector-plugins | jq
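Since the Datagen, S3, and transforms plugins are installed by the confluent-hub commands in the Compose file, it can take a short while after startup before they show up. A quick way to confirm the Datagen plugin is loaded (filtering on the plugin class names):
curl -s -k -X GET ${kafka_connect}/connector-plugins | jq -r '.[].class' | grep -i datagen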
Create Kafka Connectors
We will generate mock data using the Datagen Source Connector.
datagen-pageviews.json
{
"name": "datagen-pageviews",
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"kafka.topic": "pageviews",
"quickstart": "pageviews",
"max.interval": 10000,
"tasks.max": "1"
}
datagen-users.json
{
"name": "datagen-users",
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"kafka.topic": "users",
"quickstart": "users",
"max.interval": 10000,
"tasks.max": "1"
}
file='datagen-pageviews.json'
cname='datagen-pageviews'
curl -k -s -X PUT -H "Content-Type: application/json" -d @${file} ${kafka_connect}/connectors/${cname}/config | jq .
file='datagen-users.json'
cname='datagen-users'
curl -k -s -X PUT -H "Content-Type: application/json" -d @${file} ${kafka_connect}/connectors/${cname}/config | jq .
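Once the Datagen connectors are running, you can peek at a few of the generated Avro records using the Avro console consumer that ships with the Schema Registry image (a sketch; it assumes the schema-registry container defined above is running):
docker exec schema-registry kafka-avro-console-consumer \
  --bootstrap-server broker:29092 \
  --property schema.registry.url=http://schema-registry:8081 \
  --topic pageviews --from-beginning --max-messages 5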
Also, let us add an S3 Sink Connector.
tgt-s3-sink-insert-user.json
{
"aws.access.key.id": "XXXXXXXXXXXXX",
"aws.secret.access.key": "XXXXXXXXXXXXXXX",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"flush.size": "300",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"name": "tgt-s3-sink-insert-user",
"rotate.schedule.interval.ms": "60000",
"s3.bucket.name": "kafka-s3-tgt",
"s3.region": "ap-southeast-1",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"tasks.max": "1",
"timezone": "UTC",
"topics": "users",
"topics.dir": "kafka"
}
file='tgt-s3-sink-insert-user.json'
cname='tgt-s3-sink-insert-user'
curl -k -s -X PUT -H "Content-Type: application/json" -d @${file} ${kafka_connect}/connectors/${cname}/config | jq .
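To confirm the sink connector and its task reach the RUNNING state, you can query the status endpoint of the Kafka Connect REST API (cname is still set to tgt-s3-sink-insert-user from the previous step):
curl -s -k -X GET ${kafka_connect}/connectors/${cname}/status | jq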
List Kafka Connectors
curl -s -k -X GET ${kafka_connect}/connectors | jq
List Schemas in Schema Registry
schema_registry='http://127.0.0.1:8081'
curl -s -k -X GET ${schema_registry}/schemas | jq
curl -s -k -X GET ${schema_registry}/subjects | jq
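With the default subject naming strategy, the Avro value schema for the pageviews topic should be registered under the pageviews-value subject, so its latest version can be fetched like this:
curl -s -k -X GET ${schema_registry}/subjects/pageviews-value/versions/latest | jq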
Kafka-UI
Now let's visit the Kafka user interface (Kafka-UI) at http://127.0.0.1:8888 in a browser.
Create ksqlDB stream and table
Now we will create a stream and a table from the UI.
CREATE STREAM pageviews_stream
WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');
This will create a new stream named pageviews_stream that reads data from the pageviews topic.
Use the command below to display the contents of the stream.
SELECT * FROM pageviews_stream EMIT CHANGES;
To create a table, we can use the following command:
CREATE TABLE users_table (id VARCHAR PRIMARY KEY)
WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO');
This will create a new table named users_table that reads data from the users topic.
Joining Streams and Tables
To join the stream with the table, you can use the following command:
CREATE STREAM user_pageviews
AS SELECT users_table.id AS userid, pageid, regionid, gender
FROM pageviews_stream
LEFT JOIN users_table ON pageviews_stream.userid = users_table.id
EMIT CHANGES;
This will create a new stream named user_pageviews that joins the pageviews_stream and users_table.
SELECT * FROM user_pageviews EMIT CHANGES;
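The same statements can also be issued against the ksqlDB REST API instead of the UI. For example, to list the streams we created (a minimal sketch using the /ksql endpoint exposed on port 8088 in the Compose file):
ksqldb='http://127.0.0.1:8088'
curl -s -X POST ${ksqldb}/ksql \
  -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
  --data '{"ksql": "SHOW STREAMS;", "streamsProperties": {}}' | jq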
Stop the Container
Finally, we can stop the containers by running the following command:
docker-compose -f kafka-docker-compose.yml down
This will stop and remove the containers, along with the network created by Compose.
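If you also want to delete the named volumes created by this setup (broker_logdir and kafkaui_dir), pass the -v flag:
docker-compose -f kafka-docker-compose.yml down -v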
NOTE:
For macOS on Apple Silicon (M1), Confluent has also released images based on the ARM architecture.
https://hub.docker.com/u/confluentinc
- confluentinc/cp-kafka:7.5.1.arm64
- confluentinc/cp-schema-registry:7.5.1.arm64
- confluentinc/cp-kafka-rest:7.5.1.arm64
- confluentinc/cp-kafka-connect:7.5.1.arm64
- confluentinc/cp-ksqldb-server:7.5.1.arm64
In this article, we have covered how to set up a Kafka cluster using Docker, create topics and connectors, and use ksqlDB to process data. We have also demonstrated how to join streams and tables using ksqlDB.