実際の課題:pollingではもう限界
こういう状況は何度も経験してきた:新しい注文が入ったとき、支払いステータスが変わったとき、documentが削除されたとき。システムはすぐに反応しなければならない。最初に多くの人が思いつく解決策はpollingだ——数秒ごとにdatabaseにqueryを投げる方法。
シンプルに聞こえるが、問題はすぐに顔を出す。5秒ごとのpollingは最大5秒の遅延を意味する——支払い確認を待つユーザーには不快だ。さらに50〜100クライアントが同時にpollingするようになると、databaseは毎分数千もの無駄なqueryを処理しなければならない。実際に、2秒ごとにpollingするサービスが「新しいものがあるか確認するだけ」のために1日に〜43,000 queryを生成するのを目にしたことがある。
MySQL、PostgreSQL、MongoDBと、さまざまなプロジェクトで使ってきたが、それぞれに強みがある。そしてMongoDBの真の強みの一つがChange Streamsだ——pollingなしで、データ変更をリアルタイムに監視できる機能。
Change Streamsの仕組み
内部的には、Change Streamsはoplog(operation log)——MongoDBが元から備えるreplicationの仕組みに基づいている。変更(insert、update、delete、replace)が発生するたびに、MongoDBはoplogにentryを書き込む。Change Streamsはそのoplogをフォローするcursorをオープンしてアプリケーションにイベントをプッシュする——通常100ms以内に。
一つ絶対に外せない条件がある:MongoDBはReplica SetまたはSharded Clusterモードで動作している必要がある。Standaloneインスタンスは、Change Streamsをサポートしない。MongoDB Atlasを使えばすぐに利用できるが、セルフホストの場合はreplica setを先に初期化する必要がある。
Replica Setの素早い初期化(dev/test)
# replica setでMongoDBを起動
mongod --replSet rs0 --dbpath /data/db --port 27017
# 接続して初期化
mongosh
rs.initiate()
Node.jsでChange Streamsを実装する
1. 基本セットアップ — collectionのすべての変更を監視
const { MongoClient } = require('mongodb');
const uri = 'mongodb://localhost:27017/?replicaSet=rs0';
const client = new MongoClient(uri);
async function watchOrders() {
await client.connect();
const db = client.db('shop');
const collection = db.collection('orders');
// change streamをオープン
const changeStream = collection.watch();
changeStream.on('change', (change) => {
console.log('変更あり:', change.operationType);
console.log('Document:', change.fullDocument);
});
changeStream.on('error', (err) => {
console.error('Change streamエラー:', err);
});
}
watchOrders();
2. aggregation pipelineでイベントをフィルタリング
あらゆるノイズイベントを処理するハメになってから学んだ教訓:アプリがすべて受け取ってからコードでフィルタリングするのはやめよう。pipeline内でフィルタリングする——MongoDBがサーバー側で処理してくれるので、アプリは必要なものだけを受け取れる。
// 注文ステータスが'paid'に変わったときだけ監視
const pipeline = [
{
$match: {
operationType: 'update',
'updateDescription.updatedFields.status': 'paid'
}
}
];
const changeStream = collection.watch(pipeline, {
fullDocument: 'updateLookup' // update後のdocument全体を取得
});
changeStream.on('change', async (change) => {
const order = change.fullDocument;
console.log(`注文 ${order._id} の支払いが完了 — 確認メールを送信`);
await sendConfirmationEmail(order);
});
3. Resume token — restart時のイベント漏れを防ぐ
最初はなかなか気づかない点だ。しかしアプリがcrashしたりrestartしたりすると、ダウンタイム中に発生したすべてのイベントが失われる——警告もなく、静かに。MongoDBはresume tokenを提供しており、最後の停止地点から正確に再開できる。
const fs = require('fs');
const TOKEN_FILE = './resume_token.json';
function loadResumeToken() {
try {
const data = fs.readFileSync(TOKEN_FILE, 'utf8');
return JSON.parse(data);
} catch {
return null;
}
}
function saveResumeToken(token) {
fs.writeFileSync(TOKEN_FILE, JSON.stringify(token));
}
async function watchWithResume() {
await client.connect();
const collection = client.db('shop').collection('orders');
const resumeToken = loadResumeToken();
const options = resumeToken
? { resumeAfter: resumeToken, fullDocument: 'updateLookup' }
: { fullDocument: 'updateLookup' };
const changeStream = collection.watch([], options);
changeStream.on('change', async (change) => {
// 処理前にtokenを保存——処理後にcrashすると重複する可能性があるが、
// 少なくともイベントを失うことはない
saveResumeToken(change._id);
await processChange(change);
});
}
reconnect処理——最も見落とされやすい部分
Change StreamはMongoDBへの接続が切れても自動的にreconnectしない。deployして問題ないと思っていたら、翌朝ログを確認するとChange StreamはMongoDBのメンテナンスrestartが原因で夜中に止まっていた、なんてことがあった。明確なエラーもなく、alertもない——streamはただ静かにイベントを受け取らなくなる。
方法1:exponential backoffを使ったループ
async function watchWithRetry() {
let retryDelay = 1000; // 初期は1秒
const maxDelay = 30000; // 最大30秒
while (true) {
try {
await runChangeStream();
} catch (err) {
console.error(`Change streamエラー、${retryDelay}ms後に再試行:`, err.message);
await new Promise(resolve => setTimeout(resolve, retryDelay));
retryDelay = Math.min(retryDelay * 2, maxDelay);
}
}
}
async function runChangeStream() {
const resumeToken = loadResumeToken();
const changeStream = collection.watch([], {
fullDocument: 'updateLookup',
...(resumeToken && { resumeAfter: resumeToken })
});
return new Promise((resolve, reject) => {
changeStream.on('change', async (change) => {
saveResumeToken(change._id);
await processChange(change);
retryDelay = 1000; // 接続が安定したらdelayをリセット
});
changeStream.on('error', reject);
changeStream.on('close', resolve);
});
}
方法2:built-in retryのあるライブラリを使う
retry logicを自分で実装したくない場合は、mongodb-change-stream-watcherを使うか、lifecycleを管理するclassでラップする方法もある。小規模なプロジェクトなら上記の手動アプローチで十分だ——dependencyが少なく、問題発生時にデバッグしやすい。
production-readyなパターン:実戦からの知見まとめ
productionでChange Streamsに何度も痛い目を見た末に、このパターンにたどり着いた:
resume tokenはfileではなくdatabaseに保存する
Fileはcontainerのrestartや新しいdeployで消えてしまう可能性がある。別のMongoDBコレクションに保存することで、atomicかつ永続性が大幅に向上する。
async function saveResumeTokenToDb(token) {
await client.db('meta').collection('stream_tokens').updateOne(
{ _id: 'orders_stream' },
{ $set: { token, updatedAt: new Date() } },
{ upsert: true }
);
}
async function loadResumeTokenFromDb() {
const doc = await client.db('meta').collection('stream_tokens').findOne(
{ _id: 'orders_stream' }
);
return doc?.token || null;
}
graceful shutdownの処理
let activeChangeStream = null;
process.on('SIGINT', async () => {
console.log('change streamを閉じています...');
if (activeChangeStream) {
await activeChangeStream.close();
}
await client.close();
process.exit(0);
});
Monitoring——streamが止まっても気づかない状況を防ぐ
// 60秒ごとのハートビート——staleSecsが異常に増加したらstreamに問題あり
let lastEventTime = Date.now();
changeStream.on('change', (change) => {
lastEventTime = Date.now();
// ... イベントを処理
});
setInterval(() => {
const staleSecs = Math.round((Date.now() - lastEventTime) / 1000);
console.log(`[ChangeStream] Last event: ${staleSecs}s ago`);
// staleSecsが高すぎる場合はmonitoringにmetricを送信
}, 60000);
実戦から学んだ注意点
- oplog window:MongoDBはoplogを一定期間しか保持しない——デフォルトは約24時間またはディスク容量による。アプリが長時間オフラインになると、resume tokenが期限切れになる可能性がある。静かにcrashするのではなく、
ChangeStreamHistoryLostErrorをcatchして最初からrestartする処理が必要だ。 - updateのfullDocument:デフォルトでは、
updateイベントには変更されたフィールドのみが含まれ、document全体は含まれない。full documentが必要な場合はfullDocument: 'updateLookup'を追加しよう——ただし、このオプションはquery lookupを追加で発生させ、latencyがわずかに増加することを覚えておくこと。 - 重いsynchronousな処理には使わない:Change Streamsはside effects——メール送信、cacheの更新、push notificationなどに向いている。重い処理でevent loopをブロックしてはいけない。必要に応じてasyncで処理するか、queue(BullMQ、RabbitMQ)に投げよう。
- databaseレベルでのWatch:個々のcollectionをwatchする代わりに
db.watch()を使うと、複数のcollectionを同時に監視できる——audit logやservice間のdata syncに役立つ。

