Bài toán thực tế: Khi polling không còn là giải pháp
Mình từng gặp tình huống này nhiều lần: hệ thống cần phản ứng ngay khi có đơn hàng mới, khi trạng thái thanh toán thay đổi, hoặc khi một document bị xóa. Giải pháp đầu tiên mà hầu hết mọi người nghĩ đến là polling — cứ mỗi vài giây lại query database một lần.
Nghe thì đơn giản, nhưng vấn đề nhanh chóng lộ ra. Poll mỗi 5 giây nghĩa là độ trễ tối đa 5 giây — khó chịu với người dùng đang chờ xác nhận thanh toán. Còn khi scale lên 50–100 clients đồng thời poll, database gánh thêm hàng nghìn query thừa mỗi phút. Mình đã từng thấy một service polling mỗi 2 giây tạo ra ~43.000 query/ngày chỉ để “kiểm tra xem có gì mới không”.
Mình đã làm việc với cả MySQL, PostgreSQL và MongoDB trong các dự án khác nhau — mỗi cái có điểm mạnh riêng. Và một trong những lợi thế thực sự của MongoDB là Change Streams — tính năng cho phép bạn lắng nghe thay đổi dữ liệu ngay lập tức, không cần polling.
Change Streams hoạt động thế nào?
Bên dưới hood, Change Streams dựa trên oplog (operation log) — cơ chế replication vốn có của MongoDB. Mỗi lần có thay đổi (insert, update, delete, replace), MongoDB ghi một entry vào oplog. Change Streams mở một cursor theo dõi oplog đó và đẩy sự kiện về ứng dụng — thường trong vòng dưới 100ms.
Một điều kiện không thể bỏ qua: MongoDB phải chạy ở chế độ Replica Set hoặc Sharded Cluster. Standalone instance không hỗ trợ Change Streams. Dùng MongoDB Atlas thì đã sẵn sàng ngay; tự host thì cần khởi tạo replica set trước.
Khởi tạo Replica Set nhanh (dev/test)
# Chạy MongoDB với replica set
mongod --replSet rs0 --dbpath /data/db --port 27017
# Kết nối và khởi tạo
mongosh
rs.initiate()
Triển khai Change Streams trong Node.js
1. Setup cơ bản — lắng nghe toàn bộ thay đổi trong collection
const { MongoClient } = require('mongodb');
const uri = 'mongodb://localhost:27017/?replicaSet=rs0';
const client = new MongoClient(uri);
async function watchOrders() {
await client.connect();
const db = client.db('shop');
const collection = db.collection('orders');
// Mở change stream
const changeStream = collection.watch();
changeStream.on('change', (change) => {
console.log('Có thay đổi:', change.operationType);
console.log('Document:', change.fullDocument);
});
changeStream.on('error', (err) => {
console.error('Change stream lỗi:', err);
});
}
watchOrders();
2. Lọc sự kiện với aggregation pipeline
Sau khi bị “ăn” đủ loại sự kiện rác, mình học được một bài: đừng để ứng dụng nhận tất cả rồi mới lọc ở code. Lọc ngay trong pipeline — MongoDB xử lý ở server, ứng dụng chỉ nhận đúng thứ mình cần.
// Chỉ lắng nghe khi trạng thái đơn hàng chuyển sang 'paid'
const pipeline = [
{
$match: {
operationType: 'update',
'updateDescription.updatedFields.status': 'paid'
}
}
];
const changeStream = collection.watch(pipeline, {
fullDocument: 'updateLookup' // Lấy toàn bộ document sau khi update
});
changeStream.on('change', async (change) => {
const order = change.fullDocument;
console.log(`Đơn hàng ${order._id} đã thanh toán — gửi email xác nhận`);
await sendConfirmationEmail(order);
});
3. Resume token — không bỏ sót sự kiện khi restart
Ít ai nghĩ đến cái này lúc đầu. Nhưng nếu ứng dụng crash hoặc restart, bạn sẽ bỏ qua tất cả sự kiện xảy ra trong lúc downtime — âm thầm, không có cảnh báo nào. MongoDB cung cấp resume token để tiếp tục chính xác từ điểm dừng cuối cùng.
const fs = require('fs');
const TOKEN_FILE = './resume_token.json';
function loadResumeToken() {
try {
const data = fs.readFileSync(TOKEN_FILE, 'utf8');
return JSON.parse(data);
} catch {
return null;
}
}
function saveResumeToken(token) {
fs.writeFileSync(TOKEN_FILE, JSON.stringify(token));
}
async function watchWithResume() {
await client.connect();
const collection = client.db('shop').collection('orders');
const resumeToken = loadResumeToken();
const options = resumeToken
? { resumeAfter: resumeToken, fullDocument: 'updateLookup' }
: { fullDocument: 'updateLookup' };
const changeStream = collection.watch([], options);
changeStream.on('change', async (change) => {
// Lưu token trước khi xử lý — nếu crash sau khi xử lý thì có thể bị trùng,
// nhưng ít nhất không bị mất sự kiện
saveResumeToken(change._id);
await processChange(change);
});
}
Xử lý reconnect — phần dễ bị bỏ quên nhất
Change Stream không tự reconnect khi mất kết nối MongoDB. Mình đã từng deploy xong tưởng ổn, sáng hôm sau vào xem log thì Change Stream đã chết từ đêm vì MongoDB restart bảo trì. Không có lỗi rõ ràng, không có alert — stream chỉ im lặng không nhận sự kiện nữa.
Cách 1: Vòng lặp với exponential backoff
async function watchWithRetry() {
let retryDelay = 1000; // 1 giây ban đầu
const maxDelay = 30000; // Tối đa 30 giây
while (true) {
try {
await runChangeStream();
} catch (err) {
console.error(`Change stream lỗi, thử lại sau ${retryDelay}ms:`, err.message);
await new Promise(resolve => setTimeout(resolve, retryDelay));
retryDelay = Math.min(retryDelay * 2, maxDelay);
}
}
}
async function runChangeStream() {
const resumeToken = loadResumeToken();
const changeStream = collection.watch([], {
fullDocument: 'updateLookup',
...(resumeToken && { resumeAfter: resumeToken })
});
return new Promise((resolve, reject) => {
changeStream.on('change', async (change) => {
saveResumeToken(change._id);
await processChange(change);
retryDelay = 1000; // Reset delay khi kết nối ổn định
});
changeStream.on('error', reject);
changeStream.on('close', resolve);
});
}
Cách 2: Dùng thư viện có built-in retry
Không muốn tự implement retry logic? Có thể dùng mongodb-change-stream-watcher hoặc wrap trong một class quản lý lifecycle. Với dự án nhỏ thì cách thủ công trên đủ dùng — ít dependency, dễ debug hơn khi có vấn đề.
Pattern production-ready: tổng hợp từ thực chiến
Sau nhiều lần đau đớn với Change Streams trên production, mình đúc kết ra pattern này:
Lưu resume token vào database, không phải file
File có thể mất khi container restart hoặc deploy mới. Lưu vào MongoDB collection khác đảm bảo atomic và bền hơn nhiều.
async function saveResumeTokenToDb(token) {
await client.db('meta').collection('stream_tokens').updateOne(
{ _id: 'orders_stream' },
{ $set: { token, updatedAt: new Date() } },
{ upsert: true }
);
}
async function loadResumeTokenFromDb() {
const doc = await client.db('meta').collection('stream_tokens').findOne(
{ _id: 'orders_stream' }
);
return doc?.token || null;
}
Xử lý graceful shutdown
let activeChangeStream = null;
process.on('SIGINT', async () => {
console.log('Đang đóng change stream...');
if (activeChangeStream) {
await activeChangeStream.close();
}
await client.close();
process.exit(0);
});
Monitoring — đừng để stream chết mà không hay biết
// Heartbeat mỗi 60 giây — nếu staleSecs tăng bất thường thì stream có vấn đề
let lastEventTime = Date.now();
changeStream.on('change', (change) => {
lastEventTime = Date.now();
// ... xử lý sự kiện
});
setInterval(() => {
const staleSecs = Math.round((Date.now() - lastEventTime) / 1000);
console.log(`[ChangeStream] Last event: ${staleSecs}s ago`);
// Gửi metric lên monitoring nếu staleSecs quá cao
}, 60000);
Những điểm cần lưu ý từ thực chiến
- oplog window: MongoDB chỉ giữ oplog trong một khoảng thời gian nhất định — mặc định khoảng 24h hoặc theo dung lượng disk. Nếu ứng dụng offline quá lâu, resume token có thể đã hết hạn. Cần catch lỗi
ChangeStreamHistoryLostErrorvà restart từ đầu thay vì crash im lặng. - fullDocument với update: Mặc định, event
updatechỉ chứa các field được thay đổi, không phải toàn bộ document. ThêmfullDocument: 'updateLookup'nếu cần full document — nhưng nhớ rằng option này tạo thêm một query lookup, tăng nhẹ latency. - Không dùng cho heavy workload synchronous: Change Streams phù hợp cho side effects — gửi email, cập nhật cache, push notification. Đừng block event loop bằng tác vụ nặng. Xử lý async hoặc đẩy vào queue (BullMQ, RabbitMQ) nếu cần.
- Watch ở cấp database: Dùng
db.watch()thay vì watch từng collection để theo dõi nhiều collection cùng lúc — hữu ích cho audit log hoặc sync data giữa các service.

