Chinh phục MongoDB Aggregation Framework: Xây dựng Pipeline xử lý dữ liệu thực tế

Database tutorial - IT technology blog
Database tutorial - IT technology blog

Tại sao lệnh find() là chưa đủ?

Hồi mới làm quen với MongoDB, mình cứ ngỡ chỉ cần find()sort() là đủ cân cả thế giới. Nhưng đời không như là mơ. Trong một dự án thương mại điện tử cách đây 2 năm, mình nhận được yêu cầu: ‘Thống kê doanh thu theo từng danh mục, chỉ tính đơn hàng đã thanh toán tháng trước và phải kèm tên nhà cung cấp’.

Lúc đó, mình thử kéo hết dữ liệu về Backend rồi dùng map, filter, reduce để tính toán. Kết quả là RAM server nhảy vọt lên 90%. API mất tận 15 giây mới phản hồi với tập dữ liệu khoảng 500.000 bản ghi. Đó là lúc mình nhận ra phải nghiêm túc học về MongoDB Aggregation Framework.

Hãy tưởng tượng Aggregation là một ‘dây chuyền lọc nước’. Dữ liệu thô đi vào đầu này, qua các lõi lọc, khử khuẩn, bù khoáng và cho ra nước tinh khiết ở đầu kia. Thay vì kéo hàng GB dữ liệu về app, bạn để MongoDB tự xử lý ngay tại ‘nhà’ của nó. Tốc độ nhanh hơn, tiết kiệm băng thông và code Backend cũng sạch sẽ hơn hẳn.

Bắt đầu: Pipeline hoạt động như thế nào?

Tin vui là bạn chẳng cần cài đặt gì thêm vì Aggregation đã tích hợp sẵn. Để làm việc hiệu quả, mình khuyên dùng MongoDB Compass để quan sát dữ liệu biến đổi qua từng stage, hoặc dùng Mongosh nếu bạn thích gõ lệnh.

Cấu trúc cơ bản của một lệnh Aggregation là một mảng các stages (giai đoạn):

db.collection.aggregate([
  { $stage1: { ... } },
  { $stage2: { ... } },
  { $stage3: { ... } }
])

Mỗi giai đoạn sẽ nhận đầu vào là kết quả của bước trước đó. Hãy nhớ kỹ: Thứ tự sắp xếp các stage sẽ quyết định query của bạn chạy mất 1 giây hay 1 phút.

Dữ liệu thực hành mẫu

Giả sử chúng ta có collection orders như sau:

db.orders.insertMany([
  { _id: 1, product: "Laptop", category: "Electronics", amount: 1200, status: "completed" },
  { _id: 2, product: "Mouse", category: "Electronics", amount: 25, status: "completed" },
  { _id: 3, product: "Shirt", category: "Fashion", amount: 50, status: "pending" }
])

Xây dựng Pipeline qua 4 giai đoạn “vàng”

Nguyên tắc sống còn để tối ưu pipeline là: Lọc sớm, giảm dữ liệu nhanh nhất có thể.

1. $match – Bộ lọc tinh nhuệ

Luôn ưu tiên $match ở vị trí đầu tiên. Tại sao? Vì nó thu hẹp phạm vi dữ liệu và có thể tận dụng được Index. Nếu bạn gom nhóm dữ liệu ($group) trước khi lọc, MongoDB sẽ phải quét toàn bộ collection (Collection Scan), gây trì trệ hệ thống.

{ $match: { status: "completed" } }

2. $group – Trung tâm xử lý logic

Đây là nơi bạn thực hiện các phép toán. Bạn muốn tính tổng doanh thu, tìm giá trung bình hay đếm số đơn hàng? $group sẽ gom các tài liệu dựa trên một tiêu chí (_id).

{
  $group: {
    _id: "$category",
    totalRevenue: { $sum: "$amount" },
    avgOrderValue: { $avg: "$amount" },
    count: { $sum: 1 }
  }
}

Kinh nghiệm thực tế: Đừng ngại dùng $group. Chỉ cần bạn đã lọc ($match) tốt ở bước 1, bước này sẽ chạy cực kỳ mượt mà.

3. $lookup – Tuyệt chiêu “Join” trong NoSQL

Xóa tan lầm tưởng MongoDB không thể liên kết bảng. $lookup cho phép bạn kéo dữ liệu từ collection khác sang. Ví dụ: lấy thông tin chi tiết nhà cung cấp cho sản phẩm:

{
  $lookup: {
    from: "suppliers",
    localField: "supplierId",
    foreignField: "_id",
    as: "supplier_details"
  }
}

Lưu ý quan trọng: $lookup khá tốn tài nguyên. Hãy chắc chắn rằng field ở bảng ngoại (foreignField) đã được đánh index.

4. $project – Gọt giũa đầu ra

Đừng bao giờ trả về những field dư thừa. $project giúp bạn định nghĩa chính xác những gì API cần. Điều này giúp giảm đáng kể kích thước gói tin (payload) gửi về cho client.

{
  $project: {
    _id: 0,
    category: "$_id",
    totalRevenue: 1,
    status: { $literal: "SHIPPED" }
  }
}

Kiểm tra & Tối ưu: Đừng để Pipeline “ngốn” sạch RAM

Khi làm việc với dữ liệu lớn, Aggregation có thể trở thành gánh nặng nếu không biết cách kiểm soát. Có hai kỹ thuật mình luôn dùng trước khi push code:

Dùng explain() để soi hiệu năng

Tương tự SQL, MongoDB cung cấp lệnh explain(). Nó cho bạn biết pipeline có đang dùng index hay đang “vật vã” quét toàn bộ đĩa cứng.

db.orders.aggregate([...]).explain("executionStats")

Hãy chú ý mục winningPlan. Nếu thấy xuất hiện COLLSCAN ở giai đoạn lọc, đó là dấu hiệu bạn cần đánh index ngay lập tức.

Vượt giới hạn 100MB RAM

Mặc định, mỗi giai đoạn trong pipeline chỉ được dùng tối đa 100MB RAM. Nếu xử lý hàng triệu bản ghi, MongoDB sẽ báo lỗi ngay. Giải pháp là dùng option allowDiskUse: true để ghi dữ liệu tạm ra đĩa.

db.orders.aggregate([...], { allowDiskUse: true })

Tuy nhiên, ghi dữ liệu ra đĩa sẽ chậm hơn RAM. Cách tốt nhất vẫn là dùng $match$project để giữ tập dữ liệu nhỏ gọn nhất có thể.

Mẹo Debug hiệu quả

Thay vì viết một pipeline dài loằng ngoằng, hãy chạy thử từng giai đoạn một. Khi kết quả giai đoạn 1 chuẩn xác, bạn mới bắt đầu viết giai đoạn 2. Tính năng ‘Aggregation Pipeline Builder’ của MongoDB Compass rất hữu ích, giúp bạn nhìn thấy kết quả biến đổi ngay lập tức.

Tóm lại, MongoDB Aggregation Framework không hề khó nếu bạn nắm vững nguyên lý lọc dữ liệu sớm. Hãy bắt đầu từ những phép tính đơn giản và tối ưu index, bạn sẽ thấy việc xử lý hàng triệu record không còn là nỗi ám ảnh.

Share: