DebeziumでMySQL・PostgreSQLのデータ変更を追跡する(CDC):アプリ改修不要でKafkaにリアルタイム同期する方法

Database tutorial - IT technology blog
Database tutorial - IT technology blog

深夜2時とデータ同期の古典的な問題

システムは安定稼働していた。そこへデータチームからの要求が届く:「データベースのすべての変更をKafkaにストリームして、リアルタイム分析に使いたい。」 一見シンプルに聞こえるが、MySQLに書き込んでいる数十のサービスを抱えた古いコードベースを前にすると、各サービスを書き直してイベントをemitさせるのは現実的ではない。

まさにこの状況に直面した。アプリは触れない、ダウンタイムもない、でもリアルタイム同期は必要。そのときに辿り着いたのがDebeziumだ――コードを一行も変えずに、この問題をきれいに解決してくれた。

Debeziumとは何か、なぜコード修正が不要なのか

DebeziumはオープンソースのCDC(Change Data Capture)プラットフォームで、Kafka Connectコネクターとして動作する。アプリケーション層にフックするのではなく、データベースのトランザクションログを直接読み取る:

  • MySQL:binlog
  • PostgreSQL:WAL(Write-Ahead Log)、ロジカルレプリケーション経由

INSERTUPDATEDELETEが発生するたびに、Debeziumはそのイベントをキャプチャして対応するKafkaトピックにpushする。元のアプリケーションは何も知らない。コードの修正は一切不要だ。

Debeziumは「この行が変更された」と通知するだけでなく、変更前後の状態(before/after)もemitする――つまり古い値と新しい値を同時に把握できる。監査ログやデータ整合性確認に非常に役立つ。実際のレイテンシは、データベースへのコミットからKafkaにイベントが現れるまで、通常1秒未満だ。

実践:Docker ComposeでDebezium + Kafka + MySQLスタックを構築する

ステップ1:Docker Composeの準備

必要なサービスをすべて含むdocker-compose.ymlを作成する:

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  mysql:
    image: mysql:8.0
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: rootpass
      MYSQL_DATABASE: inventory
      MYSQL_USER: debezium
      MYSQL_PASSWORD: dbzpass
    command: --server-id=1 --log-bin=mysql-bin --binlog-format=ROW --binlog-row-image=FULL

  connect:
    image: debezium/connect:2.5
    depends_on: [kafka, mysql]
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: debezium_connect_configs
      OFFSET_STORAGE_TOPIC: debezium_connect_offsets
      STATUS_STORAGE_TOPIC: debezium_connect_statuses
docker compose up -d
# 各サービスの起動を待つ(約30秒)
docker compose ps

ステップ2:MySQLのbinlogを設定する

DebeziumがbinlogをReadするには、適切な権限を持つユーザーが必要だ。MySQLコンテナに接続して権限を付与する:

docker exec -it <mysql-container> mysql -uroot -prootpass
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;

-- binlogが有効か確認
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';

出力はlog_bin = ONかつbinlog_format = ROWでなければならない。フォーマットがROWでない場合、DebeziumはフルのRow dataではなくSQL文のみを受け取る――コネクターは動作するが、イベントにbefore/afterフィールドが含まれなくなる。

ステップ3:MySQLコネクターを登録する

DebeziumコネクターはKafka Connect REST API経由で登録する:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "mysql-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbzpass",
      "database.server.id": "184054",
      "topic.prefix": "myapp",
      "database.include.list": "inventory",
      "schema.history.internal.kafka.bootstrap.servers": "kafka:29092",
      "schema.history.internal.kafka.topic": "schema-changes.inventory"
    }
  }'

コネクターが起動しているか確認する:

curl http://localhost:8083/connectors/mysql-connector/status
# "state": "RUNNING" なら成功

ステップ4:データを書き込んでKafkaイベントを確認する

MySQLにテーブルを作成してデータをInsertする:

USE inventory;
CREATE TABLE orders (
  id INT AUTO_INCREMENT PRIMARY KEY,
  product VARCHAR(100),
  quantity INT,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

INSERT INTO orders (product, quantity) VALUES ('Laptop', 2);
UPDATE orders SET quantity = 3 WHERE id = 1;
DELETE FROM orders WHERE id = 1;

KafkaトピックのイベントをReadする:

docker exec -it <kafka-container> \
  kafka-console-consumer \
  --bootstrap-server localhost:29092 \
  --topic myapp.inventory.orders \
  --from-beginning

上記3つの操作でそれぞれ別個のJSONメッセージが生成される。opフィールドで操作の種類がわかる:c(create/insert)、u(update)、d(delete)。またbeforeafterも含まれる――DELETEではafterがnull、INSERTではbeforeがnullになる。

DebeziumでPostgreSQL CDCを設定する

PostgreSQLはMySQLより一手間かかる:ロジカルレプリケーションを有効化する必要がある。postgresql.confに追記:

wal_level = logical
max_replication_slots = 4
max_wal_senders = 4

Dockerを使う場合は、設定ファイルをマウントせずにcommandで直接パラメーターを渡せる:

postgres:
  image: postgres:15
  command: postgres -c wal_level=logical -c max_replication_slots=4
  environment:
    POSTGRES_PASSWORD: pgpass
    POSTGRES_DB: mydb

レプリケーション権限を持つユーザーを作成する:

CREATE USER debezium REPLICATION LOGIN PASSWORD 'dbzpass';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;

PostgreSQLコネクターを登録する:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "postgres-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "debezium",
      "database.password": "dbzpass",
      "database.dbname": "mydb",
      "topic.prefix": "pgapp",
      "plugin.name": "pgoutput",
      "slot.name": "debezium_slot"
    }
  }'

PostgreSQLに関する重要な注意:Debeziumはpostgres上にレプリケーションスロットを作成し、そのスロットはconsumeされるまでWALを保持し続ける。書き込みが多いシステムでDebeziumが数時間停止すると、WALが数十GBに膨らんでディスクを圧迫する可能性がある。レプリケーションスロットのlagは必ず監視すること

SELECT slot_name, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
FROM pg_replication_slots;

lagカラムが数GBを超えていたら要注意――Debeziumがスタックしているか、下流のconsumerが遅すぎるかのどちらかだ。

よくあるエラーへの対処法

MySQL:「Access denied; you need the SUPER privilege」

MySQL 8.0でconsistentスナップショットにBACKUP_ADMIN権限が追加で必要な場合に発生する。以下を実行:

GRANT BACKUP_ADMIN ON *.* TO 'debezium'@'%';

コネクターがUNASSIGNEDのまま止まる

Kafka Connectの起動が完了していない場合によく起きる。コネクターを削除して30秒後に再登録する:

curl -X DELETE http://localhost:8083/connectors/mysql-connector
# 待機してから再度POSTする

スキーマヒストリートピックが見つからない

Debeziumがスキーマヒストリーを読み込めないエラーを出したら、トピックが存在するか確認し、このトピックのretentionをunlimitedに設定する――最も重要なトピックで、失うとコネクターが起動できなくなる:

kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name schema-changes.inventory \
  --alter --add-config retention.ms=-1

以前、環境間でconfigデータを照合する必要があったとき――CSVでexportしてPythonスクリプトにimportするためJSONに変換したかった。そのとき重宝したのがtoolcraft.app/ja/tools/data/csv-to-jsonのコンバーターだ。ブラウザ上で動くので本番データが外部に漏れる心配がなく、スクリプトを手書きするより圧倒的に速い。

まとめ

Debeziumは最も侵襲性の低い方法でCDCの課題を解決する:アプリケーションコードに触れず、データベーストリガーも不要で、データベースがトランザクションログに書き込んだものをそのまま読み取るだけだ。MySQLにはbinlog ROWフォーマット、PostgreSQLにはpgoutputプラグインによるロジカルレプリケーションを使う。

本番運用で押さえておくべき3つのポイント:

  • PostgreSQLのレプリケーションスロットlagを監視する――ディスク逼迫の最も一般的な原因で、数GBのlagは即対処のサインだ
  • スキーマヒストリートピックのretentionをunlimited(retention.ms=-1)に設定する
  • トラフィックが多い場合は専用のKafka Connectクラスターを用意し、アプリケーション用Kafkaと共用しない
  • フェイルオーバーシナリオをテストする:Debeziumを1時間停止してから再起動し、チェックポイントから正しくcatch upすることを確認する

このフローに慣れると、CDCはさまざまなユースケースの基盤になる:イベントソーシング、データレイクへの取り込み、キャッシュ無効化、マイクロサービス間の同期。すべてレガシーアプリケーションに手を加えることなく実現できる――それこそがDebeziumの最大の価値だ。

Share: