MongoDB Aggregation Frameworkを極める:実践的なデータ処理パイプラインの構築

Database tutorial - IT technology blog
Database tutorial - IT technology blog

なぜ find() 命令だけでは不十分なのか?

MongoDBを使い始めたばかりの頃、私は find()sort() さえあれば、どんな要望にも応えられると思っていました。しかし、現実はそう甘くありませんでした。2年前のECサイトのプロジェクトで、「先月支払い済みの注文のみを対象に、サプライヤー名を含めてカテゴリ別の売上を集計してほしい」という依頼を受けました。

当時、私はすべてのデータをバックエンドに取得してから、mapfilterreduce を使って計算しようとしました。その結果、サーバーのRAM使用率は90%に急上昇。約50万件のデータセットに対し、APIのレスポンスに15秒もかかってしまったのです。その時、**MongoDB Aggregation Framework** を真剣に学ぶ必要があると痛感しました。

Aggregationを「浄水ライン」に例えてみてください。一端から原水が入り、ろ過、除菌、ミネラル添加などの工程を経て、もう一端から純水が出てきます。数GBものデータをアプリ側に持ってくる代わりに、MongoDB自身の「家」で処理を完結させるのです。速度は上がり、帯域幅は節約され、バックエンドのコードも驚くほどスッキリします。

はじめに:パイプラインはどのように機能するのか?

嬉しいことに、Aggregationは標準機能として組み込まれているため、追加のインストールは不要です。効率的に作業するために、ステージごとのデータ変換を視覚化できる **MongoDB Compass** や、コマンドラインが好きな方は **Mongosh** の使用をお勧めします。

Aggregation命令の基本構造は、stages(ステージ)の配列です:

db.collection.aggregate([
  { $stage1: { ... } },
  { $stage2: { ... } },
  { $stage3: { ... } }
])

各ステージは、前のステップの結果を入力として受け取ります。**ステージの順序によって、クエリの実行時間が1秒になるか1分になるかが決まる**ことを忘れないでください。

練習用サンプルデータ

次のような orders コレクションがあると仮定します:

db.orders.insertMany([
  { _id: 1, product: "Laptop", category: "Electronics", amount: 1200, status: "completed" },
  { _id: 2, product: "Mouse", category: "Electronics", amount: 25, status: "completed" },
  { _id: 3, product: "Shirt", category: "Fashion", amount: 50, status: "pending" }
])

4つの「黄金」ステージによるパイプラインの構築

パイプライン最適化の鉄則は、**「早期フィルタリング」を行い、できるだけ早くデータを削減すること**です。

1. $match – 精鋭フィルタ

常に $match を先頭に配置することを優先してください。なぜなら、データの範囲を絞り込み、**インデックス(Index)**を活用できるからです。フィルタリングの前にグルーピング($group)を行うと、MongoDBはコレクション全体をスキャン(Collection Scan)することになり、システム遅延の原因となります。

{ $match: { status: "completed" } }

2. $group – ロジック処理の中心

ここで計算処理を行います。売上合計の算出、平均価格の検索、注文数のカウントなどを行いたい場合に、$group は特定の基準(_id)に基づいてドキュメントをまとめます。

{
  $group: {
    _id: "$category",
    totalRevenue: { $sum: "$amount" },
    avgOrderValue: { $avg: "$amount" },
    count: { $sum: 1 }
  }
}

実践的なアドバイス:$group の使用をためらわないでください。ステップ1で適切にフィルタリング($match)できていれば、このステップは非常にスムーズに動作します。

3. $lookup – NoSQLにおける「Join」の秘策

MongoDBはテーブル結合ができないという誤解を解きましょう。$lookup を使えば、別のコレクションからデータを取得できます。例えば、製品のサプライヤー詳細情報を取得する場合:

{
  $lookup: {
    from: "suppliers",
    localField: "supplierId",
    foreignField: "_id",
    as: "supplier_details"
  }
}

重要な注意点:$lookup はリソースをかなり消費します。外部テーブルのフィールド(foreignField)にインデックスが貼られていることを必ず確認してください。

4. $project – 出力の整形

不要なフィールドを返してはいけません。$project を使うと、APIが必要とするデータを正確に定義できます。これにより、クライアントに送信するデータ(ペイロード)のサイズを大幅に削減できます。

{
  $project: {
    _id: 0,
    category: "$_id",
    totalRevenue: 1,
    status: { $literal: "SHIPPED" }
  }
}

テストと最適化:パイプラインにRAMを使い果たさせないために

大規模なデータを扱う際、Aggregationは適切に制御しないと負荷が大きくなる可能性があります。コードをプッシュする前に、私が常に使用している2つのテクニックを紹介します:

explain() でパフォーマンスを確認する

SQLと同様に、MongoDBは explain() 命令を提供しています。これにより、パイプラインがインデックスを使用しているか、あるいはハードディスク全体を「苦労して」スキャンしているかを知ることができます。

db.orders.aggregate([...]).explain("executionStats")

winningPlan セクションに注目してください。フィルタリングステージで COLLSCAN が表示されたら、すぐにインデックスを貼る必要があるという合図です。

100MB RAM制限を超える

デフォルトでは、パイプラインの各ステージで使用できるRAMは最大100MBです。数百万件のレコードを処理する場合、MongoDBはすぐにエラーを返します。解決策は、allowDiskUse: true オプションを使用して、一時データをディスクに書き出すことです。

db.orders.aggregate([...], { allowDiskUse: true })

ただし、ディスクへの書き出しはRAMよりも低速です。最善の方法は、やはり $match$project を使用して、データセットを可能な限りコンパクトに保つことです。

効率的なデバッグのコツ

長いパイプラインを一気に書くのではなく、1ステージずつ実行してみてください。ステージ1の結果が正確であることを確認してから、ステージ2を書き始めます。MongoDB Compassの「Aggregation Pipeline Builder」機能は非常に便利で、変換結果をリアルタイムで確認できます。

要約すると、データの早期フィルタリングの原則をマスターすれば、MongoDB Aggregation Frameworkは決して難しくありません。単純な計算とインデックスの最適化から始めれば、数百万件のレコード処理ももはや恐怖ではなくなるはずです。

Share: