Lúc 2 giờ sáng và cái bài toán đồng bộ dữ liệu kinh điển
Hệ thống đang chạy ổn, rồi team data yêu cầu: “Tụi mình cần stream mọi thay đổi từ database lên Kafka để analytics real-time.” Nghe đơn giản, nhưng khi nhìn vào codebase cũ có hàng chục service đang ghi vào MySQL — viết lại từng service để emit event là việc không tưởng.
Mình đã gặp đúng tình huống này. Không thể sửa ứng dụng, không có downtime, nhưng vẫn phải đồng bộ real-time. Đó là lúc mình tìm đến Debezium — và nó xử lý gọn bài toán mà không cần đụng vào một dòng code nào.
Debezium là gì và tại sao nó không cần sửa code
Debezium là một open-source CDC (Change Data Capture) platform, chạy như Kafka Connect connector. Thay vì hook vào application layer, nó đọc thẳng từ transaction log của database:
- MySQL:
binlog - PostgreSQL:
WAL(Write-Ahead Log) qua logical replication
Mỗi khi có INSERT, UPDATE, DELETE — Debezium capture event đó và push lên Kafka topic tương ứng. Ứng dụng gốc không biết gì. Không cần sửa dòng code nào.
Debezium không chỉ báo “row này vừa bị sửa” mà còn emit cả before/after state — nghĩa là bạn có đủ dữ liệu để biết giá trị cũ lẫn giá trị mới cùng lúc. Rất hữu ích cho audit log hoặc data reconciliation. Latency thực tế thường dưới 1 giây tính từ lúc commit vào database đến khi event xuất hiện trên Kafka.
Thực hành: Dựng stack Debezium + Kafka + MySQL bằng Docker Compose
Bước 1: Chuẩn bị Docker Compose
Tạo file docker-compose.yml với đầy đủ các service cần thiết:
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on: [zookeeper]
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
mysql:
image: mysql:8.0
ports:
- "3306:3306"
environment:
MYSQL_ROOT_PASSWORD: rootpass
MYSQL_DATABASE: inventory
MYSQL_USER: debezium
MYSQL_PASSWORD: dbzpass
command: --server-id=1 --log-bin=mysql-bin --binlog-format=ROW --binlog-row-image=FULL
connect:
image: debezium/connect:2.5
depends_on: [kafka, mysql]
ports:
- "8083:8083"
environment:
BOOTSTRAP_SERVERS: kafka:29092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: debezium_connect_configs
OFFSET_STORAGE_TOPIC: debezium_connect_offsets
STATUS_STORAGE_TOPIC: debezium_connect_statuses
docker compose up -d
# Đợi khoảng 30 giây cho các service khởi động
docker compose ps
Bước 2: Cấu hình MySQL cho binlog
MySQL cần user có đủ quyền để Debezium đọc binlog. Kết nối vào MySQL container và tạo quyền:
docker exec -it <mysql-container> mysql -uroot -prootpass
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
-- Kiểm tra binlog đang bật
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
Output phải là log_bin = ON và binlog_format = ROW. Nếu format không phải ROW, Debezium chỉ nhận được statement SQL chứ không có full row data — connector sẽ chạy nhưng event thiếu trường before/after.
Bước 3: Đăng ký MySQL Connector
Debezium connector được register qua Kafka Connect REST API:
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type: application/json' \
-d '{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbzpass",
"database.server.id": "184054",
"topic.prefix": "myapp",
"database.include.list": "inventory",
"schema.history.internal.kafka.bootstrap.servers": "kafka:29092",
"schema.history.internal.kafka.topic": "schema-changes.inventory"
}
}'
Kiểm tra connector đã chạy chưa:
curl http://localhost:8083/connectors/mysql-connector/status
# "state": "RUNNING" là thành công
Bước 4: Thử ghi dữ liệu và xem Kafka event
Tạo table và insert data vào MySQL:
USE inventory;
CREATE TABLE orders (
id INT AUTO_INCREMENT PRIMARY KEY,
product VARCHAR(100),
quantity INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO orders (product, quantity) VALUES ('Laptop', 2);
UPDATE orders SET quantity = 3 WHERE id = 1;
DELETE FROM orders WHERE id = 1;
Xem event trên Kafka topic:
docker exec -it <kafka-container> \
kafka-console-consumer \
--bootstrap-server localhost:29092 \
--topic myapp.inventory.orders \
--from-beginning
Ba thao tác trên sẽ sinh ra ba JSON message riêng biệt. Field op cho biết loại thao tác: c (create/insert), u (update), d (delete). Kèm theo đó là before và after — với DELETE thì after là null, với INSERT thì before là null.
Cấu hình PostgreSQL CDC với Debezium
PostgreSQL cần thêm một bước so với MySQL: bật logical replication. Trong postgresql.conf:
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
Nếu dùng Docker, truyền tham số thẳng qua command để khỏi phải mount file config:
postgres:
image: postgres:15
command: postgres -c wal_level=logical -c max_replication_slots=4
environment:
POSTGRES_PASSWORD: pgpass
POSTGRES_DB: mydb
Tạo user với quyền replication:
CREATE USER debezium REPLICATION LOGIN PASSWORD 'dbzpass';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
Đăng ký PostgreSQL connector:
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type: application/json' \
-d '{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "dbzpass",
"database.dbname": "mydb",
"topic.prefix": "pgapp",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot"
}
}'
Cảnh báo quan trọng với PostgreSQL: Debezium tạo một replication slot trên Postgres và slot này giữ WAL lại cho đến khi được consume. Nếu Debezium bị dừng vài giờ trên hệ thống write-heavy, WAL có thể tích lũy vài chục GB và làm đầy disk. Luôn monitor lag của replication slot:
SELECT slot_name, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
FROM pg_replication_slots;
Nếu thấy cột lag vượt quá vài GB thì cần xem lại ngay — hoặc Debezium đang bị stuck, hoặc consumer downstream quá chậm.
Xử lý một vài lỗi hay gặp
MySQL: “Access denied; you need the SUPER privilege”
Xảy ra khi MySQL 8.0 yêu cầu thêm quyền BACKUP_ADMIN cho consistent snapshot. Thêm:
GRANT BACKUP_ADMIN ON *.* TO 'debezium'@'%';
Connector stuck ở UNASSIGNED
Thường do Kafka Connect chưa kịp khởi động xong. Xóa connector và đăng ký lại sau 30 giây:
curl -X DELETE http://localhost:8083/connectors/mysql-connector
# Đợi rồi POST lại
Schema history topic bị thiếu
Nếu Debezium báo lỗi không đọc được schema history, kiểm tra topic đã tồn tại chưa và đặt retention thành unlimited cho topic này — đây là topic quan trọng nhất, mất nó là connector không khởi động được:
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name schema-changes.inventory \
--alter --add-config retention.ms=-1
À, có lần mình cần đối chiếu dữ liệu config giữa các môi trường — export ra CSV rồi cần convert sang JSON để import vào script Python. Lúc đó mình hay dùng converter tại toolcraft.app/vi/tools/data/csv-to-json — chạy trên trình duyệt nên không sợ lộ data production ra ngoài, nhanh hơn nhiều so với viết script tay.
Kết luận
Debezium giải quyết bài toán CDC theo cách ít xâm lấn nhất: không đụng vào application code, không cần database trigger, chỉ đọc những gì database đã ghi sẵn vào transaction log. MySQL thì dùng binlog ROW format, PostgreSQL thì dùng logical replication với pgoutput plugin.
Ba điểm cần nhớ khi đưa vào production:
- Monitor replication slot lag với PostgreSQL — đây là nguồn gây đầy disk phổ biến nhất, lag vài GB là dấu hiệu cần xử lý ngay
- Đặt retention cho schema history topic thành unlimited (
retention.ms=-1) - Dùng dedicated Kafka Connect cluster riêng nếu traffic cao, tránh dùng chung với application Kafka
- Test failover scenario: tắt Debezium 1 tiếng rồi bật lại, đảm bảo nó catch up đúng từ checkpoint
Khi đã quen với flow này, CDC trở thành foundation cho hàng loạt use case khác: event sourcing, data lake ingestion, cache invalidation, microservice sync. Tất cả chạy được mà không cần chạm vào legacy application — đó mới là điểm đáng giá nhất.

