Hướng dẫn sử dụng Debezium để theo dõi thay đổi dữ liệu (CDC) từ MySQL và PostgreSQL: Đồng bộ real-time lên Kafka không cần sửa ứng dụng

Database tutorial - IT technology blog
Database tutorial - IT technology blog

Lúc 2 giờ sáng và cái bài toán đồng bộ dữ liệu kinh điển

Hệ thống đang chạy ổn, rồi team data yêu cầu: “Tụi mình cần stream mọi thay đổi từ database lên Kafka để analytics real-time.” Nghe đơn giản, nhưng khi nhìn vào codebase cũ có hàng chục service đang ghi vào MySQL — viết lại từng service để emit event là việc không tưởng.

Mình đã gặp đúng tình huống này. Không thể sửa ứng dụng, không có downtime, nhưng vẫn phải đồng bộ real-time. Đó là lúc mình tìm đến Debezium — và nó xử lý gọn bài toán mà không cần đụng vào một dòng code nào.

Debezium là gì và tại sao nó không cần sửa code

Debezium là một open-source CDC (Change Data Capture) platform, chạy như Kafka Connect connector. Thay vì hook vào application layer, nó đọc thẳng từ transaction log của database:

  • MySQL: binlog
  • PostgreSQL: WAL (Write-Ahead Log) qua logical replication

Mỗi khi có INSERT, UPDATE, DELETE — Debezium capture event đó và push lên Kafka topic tương ứng. Ứng dụng gốc không biết gì. Không cần sửa dòng code nào.

Debezium không chỉ báo “row này vừa bị sửa” mà còn emit cả before/after state — nghĩa là bạn có đủ dữ liệu để biết giá trị cũ lẫn giá trị mới cùng lúc. Rất hữu ích cho audit log hoặc data reconciliation. Latency thực tế thường dưới 1 giây tính từ lúc commit vào database đến khi event xuất hiện trên Kafka.

Thực hành: Dựng stack Debezium + Kafka + MySQL bằng Docker Compose

Bước 1: Chuẩn bị Docker Compose

Tạo file docker-compose.yml với đầy đủ các service cần thiết:

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  mysql:
    image: mysql:8.0
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: rootpass
      MYSQL_DATABASE: inventory
      MYSQL_USER: debezium
      MYSQL_PASSWORD: dbzpass
    command: --server-id=1 --log-bin=mysql-bin --binlog-format=ROW --binlog-row-image=FULL

  connect:
    image: debezium/connect:2.5
    depends_on: [kafka, mysql]
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: debezium_connect_configs
      OFFSET_STORAGE_TOPIC: debezium_connect_offsets
      STATUS_STORAGE_TOPIC: debezium_connect_statuses
docker compose up -d
# Đợi khoảng 30 giây cho các service khởi động
docker compose ps

Bước 2: Cấu hình MySQL cho binlog

MySQL cần user có đủ quyền để Debezium đọc binlog. Kết nối vào MySQL container và tạo quyền:

docker exec -it <mysql-container> mysql -uroot -prootpass
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;

-- Kiểm tra binlog đang bật
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';

Output phải là log_bin = ONbinlog_format = ROW. Nếu format không phải ROW, Debezium chỉ nhận được statement SQL chứ không có full row data — connector sẽ chạy nhưng event thiếu trường before/after.

Bước 3: Đăng ký MySQL Connector

Debezium connector được register qua Kafka Connect REST API:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "mysql-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbzpass",
      "database.server.id": "184054",
      "topic.prefix": "myapp",
      "database.include.list": "inventory",
      "schema.history.internal.kafka.bootstrap.servers": "kafka:29092",
      "schema.history.internal.kafka.topic": "schema-changes.inventory"
    }
  }'

Kiểm tra connector đã chạy chưa:

curl http://localhost:8083/connectors/mysql-connector/status
# "state": "RUNNING" là thành công

Bước 4: Thử ghi dữ liệu và xem Kafka event

Tạo table và insert data vào MySQL:

USE inventory;
CREATE TABLE orders (
  id INT AUTO_INCREMENT PRIMARY KEY,
  product VARCHAR(100),
  quantity INT,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

INSERT INTO orders (product, quantity) VALUES ('Laptop', 2);
UPDATE orders SET quantity = 3 WHERE id = 1;
DELETE FROM orders WHERE id = 1;

Xem event trên Kafka topic:

docker exec -it <kafka-container> \
  kafka-console-consumer \
  --bootstrap-server localhost:29092 \
  --topic myapp.inventory.orders \
  --from-beginning

Ba thao tác trên sẽ sinh ra ba JSON message riêng biệt. Field op cho biết loại thao tác: c (create/insert), u (update), d (delete). Kèm theo đó là beforeafter — với DELETE thì after là null, với INSERT thì before là null.

Cấu hình PostgreSQL CDC với Debezium

PostgreSQL cần thêm một bước so với MySQL: bật logical replication. Trong postgresql.conf:

wal_level = logical
max_replication_slots = 4
max_wal_senders = 4

Nếu dùng Docker, truyền tham số thẳng qua command để khỏi phải mount file config:

postgres:
  image: postgres:15
  command: postgres -c wal_level=logical -c max_replication_slots=4
  environment:
    POSTGRES_PASSWORD: pgpass
    POSTGRES_DB: mydb

Tạo user với quyền replication:

CREATE USER debezium REPLICATION LOGIN PASSWORD 'dbzpass';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;

Đăng ký PostgreSQL connector:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "postgres-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "debezium",
      "database.password": "dbzpass",
      "database.dbname": "mydb",
      "topic.prefix": "pgapp",
      "plugin.name": "pgoutput",
      "slot.name": "debezium_slot"
    }
  }'

Cảnh báo quan trọng với PostgreSQL: Debezium tạo một replication slot trên Postgres và slot này giữ WAL lại cho đến khi được consume. Nếu Debezium bị dừng vài giờ trên hệ thống write-heavy, WAL có thể tích lũy vài chục GB và làm đầy disk. Luôn monitor lag của replication slot:

SELECT slot_name, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
FROM pg_replication_slots;

Nếu thấy cột lag vượt quá vài GB thì cần xem lại ngay — hoặc Debezium đang bị stuck, hoặc consumer downstream quá chậm.

Xử lý một vài lỗi hay gặp

MySQL: “Access denied; you need the SUPER privilege”

Xảy ra khi MySQL 8.0 yêu cầu thêm quyền BACKUP_ADMIN cho consistent snapshot. Thêm:

GRANT BACKUP_ADMIN ON *.* TO 'debezium'@'%';

Connector stuck ở UNASSIGNED

Thường do Kafka Connect chưa kịp khởi động xong. Xóa connector và đăng ký lại sau 30 giây:

curl -X DELETE http://localhost:8083/connectors/mysql-connector
# Đợi rồi POST lại

Schema history topic bị thiếu

Nếu Debezium báo lỗi không đọc được schema history, kiểm tra topic đã tồn tại chưa và đặt retention thành unlimited cho topic này — đây là topic quan trọng nhất, mất nó là connector không khởi động được:

kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name schema-changes.inventory \
  --alter --add-config retention.ms=-1

À, có lần mình cần đối chiếu dữ liệu config giữa các môi trường — export ra CSV rồi cần convert sang JSON để import vào script Python. Lúc đó mình hay dùng converter tại toolcraft.app/vi/tools/data/csv-to-json — chạy trên trình duyệt nên không sợ lộ data production ra ngoài, nhanh hơn nhiều so với viết script tay.

Kết luận

Debezium giải quyết bài toán CDC theo cách ít xâm lấn nhất: không đụng vào application code, không cần database trigger, chỉ đọc những gì database đã ghi sẵn vào transaction log. MySQL thì dùng binlog ROW format, PostgreSQL thì dùng logical replication với pgoutput plugin.

Ba điểm cần nhớ khi đưa vào production:

  • Monitor replication slot lag với PostgreSQL — đây là nguồn gây đầy disk phổ biến nhất, lag vài GB là dấu hiệu cần xử lý ngay
  • Đặt retention cho schema history topic thành unlimited (retention.ms=-1)
  • Dùng dedicated Kafka Connect cluster riêng nếu traffic cao, tránh dùng chung với application Kafka
  • Test failover scenario: tắt Debezium 1 tiếng rồi bật lại, đảm bảo nó catch up đúng từ checkpoint

Khi đã quen với flow này, CDC trở thành foundation cho hàng loạt use case khác: event sourcing, data lake ingestion, cache invalidation, microservice sync. Tất cả chạy được mà không cần chạm vào legacy application — đó mới là điểm đáng giá nhất.

Share: