Node.jsでMongoDB Change Streams:リアルタイムデータ変更を追跡する

Database tutorial - IT technology blog
Database tutorial - IT technology blog

実際の課題:pollingではもう限界

こういう状況は何度も経験してきた:新しい注文が入ったとき、支払いステータスが変わったとき、documentが削除されたとき。システムはすぐに反応しなければならない。最初に多くの人が思いつく解決策はpollingだ——数秒ごとにdatabaseにqueryを投げる方法。

シンプルに聞こえるが、問題はすぐに顔を出す。5秒ごとのpollingは最大5秒の遅延を意味する——支払い確認を待つユーザーには不快だ。さらに50〜100クライアントが同時にpollingするようになると、databaseは毎分数千もの無駄なqueryを処理しなければならない。実際に、2秒ごとにpollingするサービスが「新しいものがあるか確認するだけ」のために1日に〜43,000 queryを生成するのを目にしたことがある。

MySQL、PostgreSQL、MongoDBと、さまざまなプロジェクトで使ってきたが、それぞれに強みがある。そしてMongoDBの真の強みの一つがChange Streamsだ——pollingなしで、データ変更をリアルタイムに監視できる機能。

Change Streamsの仕組み

内部的には、Change Streamsはoplog(operation log)——MongoDBが元から備えるreplicationの仕組みに基づいている。変更(insert、update、delete、replace)が発生するたびに、MongoDBはoplogにentryを書き込む。Change Streamsはそのoplogをフォローするcursorをオープンしてアプリケーションにイベントをプッシュする——通常100ms以内に。

一つ絶対に外せない条件がある:MongoDBはReplica SetまたはSharded Clusterモードで動作している必要がある。Standaloneインスタンスは、Change Streamsをサポートしない。MongoDB Atlasを使えばすぐに利用できるが、セルフホストの場合はreplica setを先に初期化する必要がある。

Replica Setの素早い初期化(dev/test)

# replica setでMongoDBを起動
mongod --replSet rs0 --dbpath /data/db --port 27017

# 接続して初期化
mongosh
rs.initiate()

Node.jsでChange Streamsを実装する

1. 基本セットアップ — collectionのすべての変更を監視

const { MongoClient } = require('mongodb');

const uri = 'mongodb://localhost:27017/?replicaSet=rs0';
const client = new MongoClient(uri);

async function watchOrders() {
  await client.connect();
  const db = client.db('shop');
  const collection = db.collection('orders');

  // change streamをオープン
  const changeStream = collection.watch();

  changeStream.on('change', (change) => {
    console.log('変更あり:', change.operationType);
    console.log('Document:', change.fullDocument);
  });

  changeStream.on('error', (err) => {
    console.error('Change streamエラー:', err);
  });
}

watchOrders();

2. aggregation pipelineでイベントをフィルタリング

あらゆるノイズイベントを処理するハメになってから学んだ教訓:アプリがすべて受け取ってからコードでフィルタリングするのはやめよう。pipeline内でフィルタリングする——MongoDBがサーバー側で処理してくれるので、アプリは必要なものだけを受け取れる。

// 注文ステータスが'paid'に変わったときだけ監視
const pipeline = [
  {
    $match: {
      operationType: 'update',
      'updateDescription.updatedFields.status': 'paid'
    }
  }
];

const changeStream = collection.watch(pipeline, {
  fullDocument: 'updateLookup' // update後のdocument全体を取得
});

changeStream.on('change', async (change) => {
  const order = change.fullDocument;
  console.log(`注文 ${order._id} の支払いが完了 — 確認メールを送信`);
  await sendConfirmationEmail(order);
});

3. Resume token — restart時のイベント漏れを防ぐ

最初はなかなか気づかない点だ。しかしアプリがcrashしたりrestartしたりすると、ダウンタイム中に発生したすべてのイベントが失われる——警告もなく、静かに。MongoDBはresume tokenを提供しており、最後の停止地点から正確に再開できる。

const fs = require('fs');
const TOKEN_FILE = './resume_token.json';

function loadResumeToken() {
  try {
    const data = fs.readFileSync(TOKEN_FILE, 'utf8');
    return JSON.parse(data);
  } catch {
    return null;
  }
}

function saveResumeToken(token) {
  fs.writeFileSync(TOKEN_FILE, JSON.stringify(token));
}

async function watchWithResume() {
  await client.connect();
  const collection = client.db('shop').collection('orders');

  const resumeToken = loadResumeToken();
  const options = resumeToken
    ? { resumeAfter: resumeToken, fullDocument: 'updateLookup' }
    : { fullDocument: 'updateLookup' };

  const changeStream = collection.watch([], options);

  changeStream.on('change', async (change) => {
    // 処理前にtokenを保存——処理後にcrashすると重複する可能性があるが、
    // 少なくともイベントを失うことはない
    saveResumeToken(change._id);
    await processChange(change);
  });
}

reconnect処理——最も見落とされやすい部分

Change StreamはMongoDBへの接続が切れても自動的にreconnectしない。deployして問題ないと思っていたら、翌朝ログを確認するとChange StreamはMongoDBのメンテナンスrestartが原因で夜中に止まっていた、なんてことがあった。明確なエラーもなく、alertもない——streamはただ静かにイベントを受け取らなくなる。

方法1:exponential backoffを使ったループ

async function watchWithRetry() {
  let retryDelay = 1000; // 初期は1秒
  const maxDelay = 30000; // 最大30秒

  while (true) {
    try {
      await runChangeStream();
    } catch (err) {
      console.error(`Change streamエラー、${retryDelay}ms後に再試行:`, err.message);
      await new Promise(resolve => setTimeout(resolve, retryDelay));
      retryDelay = Math.min(retryDelay * 2, maxDelay);
    }
  }
}

async function runChangeStream() {
  const resumeToken = loadResumeToken();
  const changeStream = collection.watch([], {
    fullDocument: 'updateLookup',
    ...(resumeToken && { resumeAfter: resumeToken })
  });

  return new Promise((resolve, reject) => {
    changeStream.on('change', async (change) => {
      saveResumeToken(change._id);
      await processChange(change);
      retryDelay = 1000; // 接続が安定したらdelayをリセット
    });
    changeStream.on('error', reject);
    changeStream.on('close', resolve);
  });
}

方法2:built-in retryのあるライブラリを使う

retry logicを自分で実装したくない場合は、mongodb-change-stream-watcherを使うか、lifecycleを管理するclassでラップする方法もある。小規模なプロジェクトなら上記の手動アプローチで十分だ——dependencyが少なく、問題発生時にデバッグしやすい。

production-readyなパターン:実戦からの知見まとめ

productionでChange Streamsに何度も痛い目を見た末に、このパターンにたどり着いた:

resume tokenはfileではなくdatabaseに保存する

Fileはcontainerのrestartや新しいdeployで消えてしまう可能性がある。別のMongoDBコレクションに保存することで、atomicかつ永続性が大幅に向上する。

async function saveResumeTokenToDb(token) {
  await client.db('meta').collection('stream_tokens').updateOne(
    { _id: 'orders_stream' },
    { $set: { token, updatedAt: new Date() } },
    { upsert: true }
  );
}

async function loadResumeTokenFromDb() {
  const doc = await client.db('meta').collection('stream_tokens').findOne(
    { _id: 'orders_stream' }
  );
  return doc?.token || null;
}

graceful shutdownの処理

let activeChangeStream = null;

process.on('SIGINT', async () => {
  console.log('change streamを閉じています...');
  if (activeChangeStream) {
    await activeChangeStream.close();
  }
  await client.close();
  process.exit(0);
});

Monitoring——streamが止まっても気づかない状況を防ぐ

// 60秒ごとのハートビート——staleSecsが異常に増加したらstreamに問題あり
let lastEventTime = Date.now();

changeStream.on('change', (change) => {
  lastEventTime = Date.now();
  // ... イベントを処理
});

setInterval(() => {
  const staleSecs = Math.round((Date.now() - lastEventTime) / 1000);
  console.log(`[ChangeStream] Last event: ${staleSecs}s ago`);
  // staleSecsが高すぎる場合はmonitoringにmetricを送信
}, 60000);

実戦から学んだ注意点

  • oplog window:MongoDBはoplogを一定期間しか保持しない——デフォルトは約24時間またはディスク容量による。アプリが長時間オフラインになると、resume tokenが期限切れになる可能性がある。静かにcrashするのではなく、ChangeStreamHistoryLostErrorをcatchして最初からrestartする処理が必要だ。
  • updateのfullDocument:デフォルトでは、updateイベントには変更されたフィールドのみが含まれ、document全体は含まれない。full documentが必要な場合はfullDocument: 'updateLookup'を追加しよう——ただし、このオプションはquery lookupを追加で発生させ、latencyがわずかに増加することを覚えておくこと。
  • 重いsynchronousな処理には使わない:Change Streamsはside effects——メール送信、cacheの更新、push notificationなどに向いている。重い処理でevent loopをブロックしてはいけない。必要に応じてasyncで処理するか、queue(BullMQ、RabbitMQ)に投げよう。
  • databaseレベルでのWatch:個々のcollectionをwatchする代わりにdb.watch()を使うと、複数のcollectionを同時に監視できる——audit logやservice間のdata syncに役立つ。

Share: