CDC
Change Data Capture (CDC) is both a technique and a design pattern. It is often used to replicate data between databases in real time.
With CDC, changes written to a source database are tracked and automatically synced to target databases. Because only the changes are transferred, CDC enables incremental loading and removes the need for bulk reloads.
Debezium
Debezium is an open-source CDC platform built on Apache Kafka. It reads the transaction log of each source database and turns every row-level change committed to the captured tables into a stream of change events. Any application listening to these events can then react to the incremental changes in the data.
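As a rough illustration of how a listening application might react, the sketch below applies Debezium change events to an in-memory replica. It makes three assumptions:

- Each event is the plain JSON envelope (`before`, `after`, `op`) that the connector in this project emits with schemas disabled.
- The primary-key column is named `id`.
- The function name `apply_change` is hypothetical.

A real consumer would read these events from Kafka and write to an actual target store.

```python
from typing import Any, Dict

Row = Dict[str, Any]


def apply_change(replica: Dict[Any, Row], event: Dict[str, Any], key: str = "id") -> None:
    """Apply one Debezium change event (plain JSON envelope) to an in-memory replica."""
    op = event["op"]
    if op in ("c", "r", "u"):  # create, snapshot read, update: upsert the new row state
        row = event["after"]
        replica[row[key]] = dict(row)
    elif op == "d":  # delete: "after" is null, so take the key from the "before" image
        replica.pop(event["before"][key], None)
    else:
        raise ValueError(f"unsupported op: {op!r}")


replica: Dict[Any, Row] = {}
apply_change(replica, {"op": "c", "before": None, "after": {"id": 25, "name": "pant", "stock": 4}})
apply_change(replica, {"op": "u", "before": None, "after": {"id": 25, "name": "pant", "stock": 6}})
print(replica)  # {25: {'id': 25, 'name': 'pant', 'stock': 6}}
```

Only the latest row state is kept per key. That is what makes the load incremental: each event touches one row instead of reloading the whole table.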
Project overview
docker-compose.yml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 52181:2181
  kafka:
    image: confluentinc/cp-kafka:7.0.1
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
  kafka-admin:
    image: provectuslabs/kafka-ui
    container_name: kafka-admin
    ports:
      - "9091:8080"
    restart: always
    depends_on:
      - kafka
      - zookeeper
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:29092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
  db:
    platform: linux/x86_64
    image: debezium/example-postgres
    restart: always
    environment:
      POSTGRES_PASSWORD: example
    ports:
      - 5432:5432
    extra_hosts:
      - "host.docker.internal:host-gateway"
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
    volumes:
      - ./resource/init.sql:/docker-entrypoint-initdb.d/create-db-tables.sql
  elasticsearch:
    image: elasticsearch:8.8.0
    ports:
      - 9200:9200
      - 9300:9300
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
  adminer:
    image: adminer
    restart: always
    ports:
      - 8001:8080
  kafka_connect:
    container_name: kafka_connect
    image: debezium/connect
    links:
      - db
      - kafka
    ports:
      - '8083:8083'
    environment:
      - BOOTSTRAP_SERVERS=kafka:29092
      - GROUP_ID=medium_debezium
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
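The kafka service advertises two listeners:

- PLAINTEXT on localhost:9092, for clients on the host machine.
- PLAINTEXT_INTERNAL on kafka:29092, for containers on the Compose network.

This is why kafka-admin and kafka_connect point at kafka:29092. The helpers below (`parse_listeners` and `bootstrap_for`, both hypothetical) only illustrate that setting. They parse the advertised-listeners string and pick the address a client should use; they are not part of the project.

```python
from typing import Dict, Tuple

# Value copied from KAFKA_ADVERTISED_LISTENERS in docker-compose.yml
ADVERTISED_LISTENERS = "PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092"


def parse_listeners(value: str) -> Dict[str, Tuple[str, int]]:
    """Split 'NAME://host:port,...' into {NAME: (host, port)}."""
    listeners: Dict[str, Tuple[str, int]] = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


def bootstrap_for(inside_compose_network: bool) -> str:
    """Pick the bootstrap address a client should use (hypothetical helper)."""
    # Containers such as kafka_connect and kafka-admin resolve the service name "kafka";
    # processes on the host reach the broker through the published port 9092.
    name = "PLAINTEXT_INTERNAL" if inside_compose_network else "PLAINTEXT"
    host, port = parse_listeners(ADVERTISED_LISTENERS)[name]
    return f"{host}:{port}"


print(bootstrap_for(True))   # kafka:29092
print(bootstrap_for(False))  # localhost:9092
```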
Kafka UI
Database
Connect with username postgres and password example.
Create the database used by the connector by running the following query:
create database debezium_demo;
properties.yml file example
username: postgres
password: example
Connector Requests
Connector information:
The connector has to be created first. The Debezium PostgreSQL connector performs change data capture on PostgreSQL. Debezium is commonly used in streaming data pipelines to capture database changes and propagate them to other systems or applications in real time.
To configure a Debezium connector for PostgreSQL, you typically need to provide configuration parameters such as database connection details, the replication slot name, and other connector-specific settings. Below is an example configuration for setting up a Debezium connector for PostgreSQL:
{
  "name": "product-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.hostname": "db",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "example",
    "database.dbname": "debezium_demo",
    "database.server.name": "postgres",
    "tombstones.on.delete": false,
    "topic.prefix": "product",
    "table.include.list": "public.product",
    "heartbeat.interval.ms": "5000",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "plugin.name": "pgoutput",
    "slot.name": "unique_slot_name"
  }
}
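connector.class : The class that implements the PostgreSQL connector.
tasks.max : The maximum number of tasks to create for this connector. The PostgreSQL connector always runs a single task, so "1" is appropriate.
database.hostname, database.port, database.user, database.password, database.dbname : Connection details for PostgreSQL. Here db is the service name from docker-compose.yml.
database.server.name : Logical name of the source database server. In Debezium 2.x (the events below come from 2.5.0.Final), this property is superseded by topic.prefix. topic.prefix identifies the server and prefixes every topic name, which is why the topic is product.public.product.
slot.name : The replication slot name used by the connector. Make sure it is unique for each connector.
plugin.name : The PostgreSQL logical decoding plugin that produces the changes. pgoutput is built into PostgreSQL 10 and later.
schema.include.list, table.include.list : Comma-separated regular expressions that select the schemas and tables to capture. Tables are matched as <schema>.<table>, e.g. public.product.
snapshot.mode : Defines how the initial snapshot of data is taken.
heartbeat.interval.ms : How often, in milliseconds, the connector sends heartbeat messages to a Kafka heartbeat topic. The default of 0 disables heartbeats.
heartbeat.action.query : An SQL statement the connector executes on the source database on each heartbeat.
snapshot.lock.timeout.ms : Timeout for acquiring table locks during the snapshot.
snapshot.select.statement.overrides : Overrides the default SELECT statement used during the snapshot.
tombstones.on.delete : Whether a delete event is followed by a tombstone (null-value) record. false disables tombstones.
database.history.kafka.bootstrap.servers : Kafka broker address for a schema history topic. The PostgreSQL connector does not keep a schema history, and Debezium 2.x renamed the property to schema.history.internal.kafka.bootstrap.servers, so this setting has no effect here.
key.converter, value.converter : Serialize record keys and values as JSON. With key.converter.schemas.enable and value.converter.schemas.enable set to false, each message contains only the payload, without an embedded schema.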
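To show how the properties fit together, the sketch below does two things:

- It builds a trimmed-down version of the registration body in Python.
- It derives the topic name that Debezium's default naming produces, which is <topic.prefix>.<schema>.<table>.

`connector_payload` and `topic_for` are hypothetical helpers, and only a subset of the properties above is included.

```python
import json
from typing import Any, Dict


def connector_payload(name: str, topic_prefix: str, table: str, slot_name: str) -> Dict[str, Any]:
    """Build a trimmed-down registration body for a Debezium PostgreSQL connector."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "database.hostname": "db",  # service name in docker-compose.yml
            "database.port": "5432",
            "database.user": "postgres",
            "database.password": "example",
            "database.dbname": "debezium_demo",
            "topic.prefix": topic_prefix,
            "table.include.list": table,  # "<schema>.<table>"
            "plugin.name": "pgoutput",
            "slot.name": slot_name,
        },
    }


def topic_for(topic_prefix: str, qualified_table: str) -> str:
    """Default Debezium topic name for a captured table: <topic.prefix>.<schema>.<table>."""
    return f"{topic_prefix}.{qualified_table}"


payload = connector_payload("product-connector", "product", "public.product", "unique_slot_name")
body = json.dumps(payload)  # what would be sent to POST /connectors
print(topic_for("product", "public.product"))  # product.public.product
```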
CURL Requests:
GET Connector :
curl --request GET \
  --url http://localhost:8083/connectors/product-connector
POST – add a connector
CURL Request :
curl --request POST \
  --url http://localhost:8083/connectors \
  --header 'Accept: application/json' \
  --header 'Content-Type: application/json' \
  --data '{
  "name": "product-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.hostname": "db",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "example",
    "database.dbname": "debezium_demo",
    "database.server.name": "postgres",
    "tombstones.on.delete": false,
    "topic.prefix": "product",
    "table.include.list": "public.product",
    "heartbeat.interval.ms": "5000",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "plugin.name": "pgoutput",
    "slot.name": "unique_slot_name"
  }
}'
DELETE Connector
curl --request DELETE \
  --url http://localhost:8083/connectors/product-connector
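The GET, POST and DELETE calls above can also be issued from code. Here is a minimal sketch using only Python's standard library. It builds the requests against the Kafka Connect REST port 8083 published in docker-compose.yml but does not send them. The helper name `connect_request` is hypothetical, and the POST body is shortened for brevity.

```python
import json
import urllib.request
from typing import Optional

CONNECT_URL = "http://localhost:8083"  # Kafka Connect REST port published by kafka_connect


def connect_request(method: str, path: str, body: Optional[dict] = None) -> urllib.request.Request:
    """Build (but do not send) a request to the Kafka Connect REST API."""
    headers = {"Accept": "application/json"}
    data = None
    if body is not None:
        data = json.dumps(body).encode("utf-8")
        headers["Content-Type"] = "application/json"
    return urllib.request.Request(CONNECT_URL + path, data=data, headers=headers, method=method)


get_req = connect_request("GET", "/connectors/product-connector")
delete_req = connect_request("DELETE", "/connectors/product-connector")
# In practice, pass the full connector config from the POST example as the body.
post_req = connect_request("POST", "/connectors", {
    "name": "product-connector",
    "config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector"},
})

# Sending requires the Docker stack to be running, e.g.:
# with urllib.request.urlopen(get_req) as resp:
#     print(json.load(resp))
```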
Extra information for slot.name:
Creating a replication slot fails if a slot with the same name already exists, for example when a second connector also uses the default name "debezium". This typically happens when several connectors are set up for the same database host with the same replication slot name.
To resolve this, give each connector on the same database host its own replication slot name by setting the slot.name parameter in the Debezium PostgreSQL connector configuration.
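The sketch below checks connector configurations that target the same database host for duplicate or invalid slot names before they are registered. It relies on two assumptions:

- When slot.name is absent, Debezium falls back to its default slot name debezium.
- PostgreSQL slot names may contain only lower-case letters, digits and underscores, up to 63 characters.

`check_slot_names` and the order/customer connectors in the usage are hypothetical.

```python
import re
from typing import Dict, Iterable, List

# PostgreSQL slot names: lower-case letters, digits and underscore, at most 63 characters
SLOT_NAME = re.compile(r"[a-z0-9_]{1,63}")


def check_slot_names(configs: Iterable[dict]) -> List[str]:
    """Return problems with slot.name across connectors that share one database host."""
    problems: List[str] = []
    seen: Dict[str, str] = {}
    for cfg in configs:
        name = cfg["name"]
        slot = cfg["config"].get("slot.name", "debezium")  # Debezium's default slot name
        if not SLOT_NAME.fullmatch(slot):
            problems.append(f"{name}: invalid slot name {slot!r}")
        if slot in seen:
            problems.append(f"{name}: slot {slot!r} already used by {seen[slot]}")
        else:
            seen[slot] = name
    return problems


configs = [
    {"name": "product-connector", "config": {"slot.name": "unique_slot_name"}},
    {"name": "order-connector", "config": {}},     # hypothetical, uses the default slot
    {"name": "customer-connector", "config": {}},  # hypothetical, also uses the default slot
]
for problem in check_slot_names(configs):
    print(problem)  # customer-connector: slot 'debezium' already used by order-connector
```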
Before null problem
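By default, PostgreSQL tables use REPLICA IDENTITY DEFAULT, so the previous row values are not written to the WAL and update events arrive with "before": null. To include the full previous row in update and delete events, run this command in the database console:
alter table product replica identity full;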
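With REPLICA IDENTITY FULL in place, update events carry the whole previous row, so a consumer can work out exactly which columns changed. The sketch below makes the following assumptions:

- Events are the plain JSON envelope produced with schemas disabled.
- The function name `changed_fields` is hypothetical.
- The sample values are taken from the update event in this article.

```python
from typing import Any, Dict, Tuple


def changed_fields(event: Dict[str, Any]) -> Dict[str, Tuple[Any, Any]]:
    """Return {column: (old, new)} for the columns an update event actually changed."""
    before, after = event.get("before"), event.get("after")
    if before is None or after is None:
        # Without REPLICA IDENTITY FULL, update events arrive with "before": null
        raise ValueError("event has no before/after image; check the table's REPLICA IDENTITY")
    return {col: (before.get(col), new) for col, new in after.items() if before.get(col) != new}


update = {
    "op": "u",
    "before": {"id": 14, "name": "pant", "price": "Opg=", "stock": 4},
    "after": {"id": 14, "name": "pant", "price": "Ak6g", "stock": 6},
}
print(changed_fields(update))  # {'price': ('Opg=', 'Ak6g'), 'stock': (4, 6)}
```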
Project Running Configuration
Build the project.
Start Docker on your local machine.
Run docker-compose up -d to start the services defined in docker-compose.yml; adjust the file for other configurations.
Run the project.
Open http://localhost:9091/ui/clusters/local/all-topics?perPage=25 to see the Kafka topics and messages.
Create the connector by sending the POST request from the Connector Requests section.
Open the messages in Kafka UI (localhost:9091); the product.public.product topic should be listed.
Send a request to create a product:
--header 'Content-Type: application/json'
--data '{
  "name": "pant",
  "price": 150,
  "stock": 4
}'
Check the Debezium topic for the create event [CREATE]:
{
  "before": null,
  "after": {
    "id": 25,
    "name": "pant",
    "price": "Opg=",
    "stock": 4
  },
  "source": {
    "version": "2.5.0.Final",
    "connector": "postgresql",
    "name": "product",
    "ts_ms": 1710684511415,
    "snapshot": "false",
    "db": "debezium_demo",
    "sequence": "[\"25659272\",\"25659664\"]",
    "schema": "public",
    "table": "product",
    "txId": 771,
    "lsn": 25659664,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1710684511616,
  "transaction": null
}
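The price arrives as "Opg=" rather than 150, presumably because it is a NUMERIC/DECIMAL column (the table definition is not shown). Debezium's default decimal.handling.mode is precise. In that mode the unscaled integer is sent as big-endian two's-complement bytes, which the JSON converter then base64-encodes.

Because schemas are disabled, the scale is not part of the message. The sketch below assumes a scale of 2, which matches the requested prices 150 and 1512. `decode_decimal` is a hypothetical helper. Setting decimal.handling.mode to string or double on the connector would avoid the decoding step.

```python
import base64
from decimal import Decimal


def decode_decimal(encoded: str, scale: int) -> Decimal:
    """Decode a base64 big-endian two's-complement unscaled integer into a Decimal."""
    unscaled = int.from_bytes(base64.b64decode(encoded), byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)  # shift the decimal point by the column's scale


print(decode_decimal("Opg=", 2))  # 150.00
print(decode_decimal("Ak6g", 2))  # 1512.00
```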
Make an update operation to product:
--header 'Content-Type: application/json'
--data '{
  "name": "pant",
  "price": 1512,
  "stock": 6
}'
Check messages in debezium topic: product.public.product
{
  "before": {
    "id": 14,
    "name": "pant",
    "price": "Opg=",
    "stock": 4
  },
  "after": {
    "id": 14,
    "name": "pant",
    "price": "Ak6g",
    "stock": 6
  },
  "source": {
    "version": "2.5.0.Final",
    "connector": "postgresql",
    "name": "product",
    "ts_ms": 1710680240132,
    "snapshot": "false",
    "db": "debezium_demo",
    "sequence": "[\"25645448\",\"25645504\"]",
    "schema": "public",
    "table": "product",
    "txId": 760,
    "lsn": 25645504,
    "xmin": null
  },
  "op": "u",
  "ts_ms": 1710680240470,
  "transaction": null
}