いつデータストリーミングが本当に必要になるのか?
キャリアの初期、私はすべてのデータをREST API経由で直接データベースに保存する習慣がありました。同時実行ユーザーが5万人に達するまでは問題ありませんでしたが、私が参加したプロジェクトでその節目を超えたとき、毎秒数百万のログ、トラッキングイベント、通知が押し寄せ、サーバーが限界を迎えました。レイテンシは200msから10sに跳ね上がり、データベースは「Too many connections」エラーを吐き出し続け、いいね(Like)ボタンのクリックが急増しただけでシステムは完全にダウンしました。
問題の本質は、伝統的なデータベースがこれほど高頻度かつ継続的な書き込みを想定して設計されていないことにあります。数百万のメッセージを飲み込み、安全に保存し、後から各サービスに配信できる「ブラックホール」のような存在が必要です。そこでApache Kafkaの出番です。
武器の選択:Kafka、RabbitMQ、それともRedis Pub/Sub?
最初のコードを書く前に、現在の主要なメッセージブローカーソリューションを比較してみましょう。
1. Redis Pub/Sub
長所: RAM上で動作するため極めて高速で、遅延はほぼゼロです。
短所: 「投げっぱなし(Fire and Forget)」の仕組みです。コンシューマー(受信側)が切断されている間に送られたメッセージは永久に消失します。データの保証が必要な場合には向きません。
2. RabbitMQ (Message Queue)
長所: 複雑なルーティング管理に優れ、メッセージを確実に目的地へ届けます。
短所: 未処理メッセージが数百万件溜まると、パフォーマンスが著しく低下します。スループットよりもメッセージの配信管理を優先する設計です。
3. Apache Kafka (Event Streaming)
長所: データをディスクにアペンドオンリー(追記型)ログとして書き込むため、非常に堅牢です。デバッグのために3日前のデータを「リプレイ」することも可能です。NetflixやUberのような大手は、1日あたり数兆件のイベントを処理するためにKafkaを使用しています。
短所: 学習曲線がやや険しく、Redisよりも運用リソースを消費します。
アドバイス: 絶対的な信頼性、過去データの遡及、そして無限の拡張性が必要なら、Kafkaを選びましょう。
Dockerによるクイックスタート
Kafkaの手動インストールは、JavaやZookeeperへの依存関係が多く、挫折しがちです。最も早い方法はDockerを使うことです。30秒足らずで標準的な環境を構築できます。
docker-compose.ymlファイルを作成します:
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
docker-compose up -dコマンドを実行します。これで、あなたのPCは立派なKafkaブローカーになりました。
Node.jsでのProducerとConsumerの実装
Node.jsの世界では、kafkajsライブラリが最も有力な候補です。軽量で純粋なJavaScriptで書かれており、パフォーマンスも非常に優れています。
ステップ1: プロジェクトのセットアップ
mkdir kafka-demo && cd kafka-demo
npm init -y
npm install kafkajs
ステップ2: Producer(送信側)の作成
Producerは、ニュースを本社に送る記者のようなものです。ここでは、注文完了イベントの送信をシミュレートします。
const { Kafka } = require('kafkajs');
const kafka = new Kafka({ clientId: 'order-service', brokers: ['localhost:9092'] });
const producer = kafka.producer();
(async () => {
await producer.connect();
console.log("✅ Producerの準備が完了しました");
await producer.send({
topic: 'orders',
messages: [
{ value: JSON.stringify({ id: 1, item: 'MacBook M3', price: 2500 }) },
],
});
await producer.disconnect();
})();
ステップ3: Consumer(受信側)の作成
Consumerは、トピックにメッセージが現れたらすぐに処理できるよう待機します。
const { Kafka } = require('kafkajs');
const kafka = new Kafka({ clientId: 'inventory-service', brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'inventory-group' });
(async () => {
await consumer.connect();
await consumer.subscribe({ topic: 'orders', fromBeginning: true });
await consumer.run({
eachMessage: async ({ message }) => {
const order = JSON.parse(message.value.toString());
console.log(`📦 注文の在庫処理中: ${order.id}`);
},
});
})();
実践導入における3つの重要な教訓
1日500GB以上のデータを処理するシステムを運用して気づいたのは、動くコードを書くことは道のりの30%に過ぎないということです。残りの70%は運用の最適化にあります。
1. 本番環境でパーティションを1つにしない
Kafkaは**パーティション(Partition)**を通じて処理を並列化します。トピックにパーティションが1つしかない場合、データを読み取れるコンシューマーも1つだけです。サーバーを10台に増やしても、9台は何もせず待機することになります。将来のスケールアウトを容易にするため、少なくとも3つか6つのパーティションから始めましょう。
2. 冪等性(Idempotency)の確保
分散システムにおいて、コンシューマーが同じメッセージを重複して受信することは必ず発生します(ネットワークタイムアウトやリバランシングのため)。
解決策: 処理前に必ずデータベースのステータスを確認してください。例えば、そのorder_idが既に在庫減算済みであれば、二重に処理せずスキップするようにします。
3. 常にConsumer Lagを監視する
Consumer Lag(コンシューマー・ラグ)は、実際のデータに対して処理がどれだけ遅れているかを示す数値です。この数値が数十万に達している場合、処理コードが遅すぎるか、流入データが多すぎることを意味します。ディスクがいっぱいになる(リテンションポリシー)まで、コンシューマーが2日前から停止していたことに気づかない、という事態は避けなければなりません。
まとめ
Kafkaは単なる技術ではなく、ソフトウェアの考え方を「リクエスト・レスポンス」から「**イベント駆動(Event-Driven)**」へと変えるものです。Node.jsのノンブロッキング機構は、スムーズなI/O処理のおかげでKafkaのコンシューマーとして非常に適しています。
小規模なアプリケーションなら、無理に複雑にする必要はありません。しかし、低遅延で数百万人を支えるシステムを目指すなら、Kafkaはマスターすべき鍵となるでしょう。

