MongoDB Change Streams in Node.js: Real-Time Data Change Tracking

Database tutorial - IT technology blog

The Real Problem: When Polling Is No Longer the Answer

I’ve run into this situation more times than I can count: a system needs to react immediately when a new order comes in, when a payment status changes, or when a document gets deleted. The first solution most people reach for is polling — querying the database every few seconds.

It sounds simple, but the problems surface quickly. Polling every 5 seconds means a maximum latency of 5 seconds — frustrating for a user waiting on a payment confirmation. And when you scale up to 50–100 clients polling simultaneously, the database absorbs thousands of unnecessary queries every minute. I once saw a service polling every 2 seconds generate ~43,000 queries per day just to “check if anything is new.”
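
The arithmetic behind those numbers is easy to check. The sketch below is only an illustration: the helper names are invented for this example, and it assumes every client sends exactly one query per polling interval.

```javascript
// Hypothetical helpers: estimate the query load generated by polling.
// Assumes each client issues exactly one query per interval.
const SECONDS_PER_DAY = 24 * 60 * 60; // 86,400

function pollingQueriesPerDay(intervalSeconds, clients = 1) {
  return Math.floor(SECONDS_PER_DAY / intervalSeconds) * clients;
}

function pollingQueriesPerMinute(intervalSeconds, clients = 1) {
  return (60 / intervalSeconds) * clients;
}

// One service polling every 2 seconds
console.log(pollingQueriesPerDay(2)); // 43200

// 100 clients polling every 5 seconds
console.log(pollingQueriesPerMinute(5, 100)); // 1200
```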

I’ve worked with MySQL, PostgreSQL, and MongoDB across different projects — each has its strengths. And one of MongoDB’s genuine advantages is Change Streams — a feature that lets you listen for data changes in real time, no polling required.

How Change Streams Work

Under the hood, Change Streams are built on the oplog (operation log) — MongoDB’s native replication mechanism. Every time a change occurs (insert, update, delete, replace), MongoDB writes an entry to the oplog. Change Streams open a cursor that tails this oplog and pushes events to your application — typically within 100ms.
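
To make the event shape concrete, here is a small sketch that summarizes a change event. The field names (operationType, documentKey, fullDocument, updateDescription) are the ones found on change events; the describeChange helper and the sample events are invented for illustration and are not part of the driver.

```javascript
// Hypothetical helper: turn a change event into a one-line summary.
function describeChange(change) {
  const id = change.documentKey ? change.documentKey._id : 'unknown';
  switch (change.operationType) {
    case 'insert':
    case 'replace':
      // Inserts and replaces carry the whole document in fullDocument
      return `${change.operationType} ${id}: ${JSON.stringify(change.fullDocument)}`;
    case 'update': {
      // Updates carry only the delta unless fullDocument: 'updateLookup' is requested
      const fields = Object.keys(change.updateDescription.updatedFields);
      return `update ${id}: changed ${fields.join(', ')}`;
    }
    case 'delete':
      // Deletes carry only the key of the removed document
      return `delete ${id}`;
    default:
      return `${change.operationType} (not handled in this sketch)`;
  }
}

// Hand-written sample events that mimic what the application receives
const sampleUpdate = {
  operationType: 'update',
  documentKey: { _id: 42 },
  updateDescription: { updatedFields: { status: 'paid' }, removedFields: [] }
};
const sampleDelete = { operationType: 'delete', documentKey: { _id: 42 } };

console.log(describeChange(sampleUpdate)); // update 42: changed status
console.log(describeChange(sampleDelete)); // delete 42
```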

One requirement you can’t skip: MongoDB must be running as a Replica Set or Sharded Cluster. Standalone instances do not support Change Streams. MongoDB Atlas is ready out of the box; self-hosted setups need a replica set initialized first.

Quick Replica Set Setup (dev/test)

# Start MongoDB with a replica set
mongod --replSet rs0 --dbpath /data/db --port 27017

# Connect and initialize
mongosh
rs.initiate()
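
Before opening a stream, it can help to verify that the server really is part of a replica set. The sketch below assumes the response of the hello command: replica set members report their set name in setName, and mongos routers report msg: 'isdbgrid'. The supportsChangeStreams helper is a made-up name for this example.

```javascript
// Hypothetical helper: decide from a `hello` response whether the server
// can serve Change Streams (replica set member or mongos router).
function supportsChangeStreams(hello) {
  return typeof hello.setName === 'string' || hello.msg === 'isdbgrid';
}

// Usage with the driver (assumes a connected MongoClient named `client`):
//   const hello = await client.db('admin').command({ hello: 1 });
//   if (!supportsChangeStreams(hello)) throw new Error('Replica set required');

// Hand-written responses, trimmed to the fields this check reads
console.log(supportsChangeStreams({ setName: 'rs0', isWritablePrimary: true })); // true
console.log(supportsChangeStreams({ isWritablePrimary: true })); // false
```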

Implementing Change Streams in Node.js

1. Basic Setup — Listening to All Changes in a Collection

const { MongoClient } = require('mongodb');

const uri = 'mongodb://localhost:27017/?replicaSet=rs0';
const client = new MongoClient(uri);

async function watchOrders() {
  await client.connect();
  const db = client.db('shop');
  const collection = db.collection('orders');

  // Open a change stream
  const changeStream = collection.watch();

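  changeStream.on('change', (change) => {
    console.log('Change detected:', change.operationType);
    // fullDocument is set for inserts and replaces; it is undefined for
    // updates (unless fullDocument: 'updateLookup' is set) and for deletes
    console.log('Document:', change.fullDocument);
  });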

  changeStream.on('error', (err) => {
    console.error('Change stream error:', err);
  });
}

// Surface connection failures instead of leaving an unhandled rejection
watchOrders().catch(console.error);

2. Filtering Events with an Aggregation Pipeline

After getting flooded with all kinds of noise events, I learned an important lesson: don’t let your application receive everything and filter in code. Filter inside the pipeline — MongoDB handles it on the server side, and your application only receives exactly what it needs.

// Only listen when an order status changes to 'paid'
const pipeline = [
  {
    $match: {
      operationType: 'update',
      'updateDescription.updatedFields.status': 'paid'
    }
  }
];

const changeStream = collection.watch(pipeline, {
  fullDocument: 'updateLookup' // Fetch the full document after update
});

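changeStream.on('change', async (change) => {
  // With updateLookup, fullDocument can be null if the document was
  // deleted before the lookup ran
  const order = change.fullDocument;
  if (!order) return;
  console.log(`Order ${order._id} paid: sending confirmation email`);
  await sendConfirmationEmail(order); // Assumed to be defined elsewhere
});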
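
The same idea extends to several statuses and to trimming the event itself. The following is a sketch under assumptions: buildStatusPipeline is a hypothetical helper, and the projected fields are only an example. Note that the $project stage keeps _id, because that field is the resume token, and that it drops fullDocument.

```javascript
// Hypothetical helper: build a pipeline that matches updates setting
// `status` to any of the given values, then trims the event on the server.
function buildStatusPipeline(statuses) {
  return [
    {
      $match: {
        operationType: 'update',
        'updateDescription.updatedFields.status': { $in: statuses }
      }
    },
    {
      // Keep _id (the resume token) plus the few fields the handler needs
      $project: {
        _id: 1,
        operationType: 1,
        documentKey: 1,
        'updateDescription.updatedFields.status': 1
      }
    }
  ];
}

const statusPipeline = buildStatusPipeline(['paid', 'refunded']);
console.log(JSON.stringify(statusPipeline, null, 2));

// Usage (assumes `collection` from the earlier examples):
//   const changeStream = collection.watch(statusPipeline);
```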

3. Resume Tokens — Never Miss an Event on Restart

Few people think about this upfront. But if your application crashes or restarts, you’ll silently skip every event that occurred during the downtime — no warnings, no alerts. MongoDB provides a resume token so you can pick up exactly where you left off.

const fs = require('fs');
const TOKEN_FILE = './resume_token.json';

function loadResumeToken() {
  try {
    const data = fs.readFileSync(TOKEN_FILE, 'utf8');
    return JSON.parse(data);
  } catch {
    return null;
  }
}

function saveResumeToken(token) {
  fs.writeFileSync(TOKEN_FILE, JSON.stringify(token));
}

async function watchWithResume() {
  await client.connect();
  const collection = client.db('shop').collection('orders');

  const resumeToken = loadResumeToken();
  const options = resumeToken
    ? { resumeAfter: resumeToken, fullDocument: 'updateLookup' }
    : { fullDocument: 'updateLookup' };

  const changeStream = collection.watch([], options);

  changeStream.on('change', async (change) => {
    // Process first, then save the token. If we crash between the two steps,
    // the event is delivered again on restart (a possible duplicate),
    // but it is never lost
    await processChange(change);
    saveResumeToken(change._id);
  });
}
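
One caveat with event listeners: an async listener does not hold back the next event, so two handlers can overlap and tokens can be saved out of order. Below is a minimal sketch of a strictly sequential consumer. It assumes a driver version in which the change stream is an async iterable; consumeInOrder and fakeStream are names invented for this example, and the fake stream stands in for collection.watch() so the sketch runs without a database.

```javascript
// Hypothetical helper: consume events strictly one at a time.
// `stream` is any async iterable of change events; `processChange` and
// `saveToken` are supplied by the caller.
async function consumeInOrder(stream, processChange, saveToken) {
  for await (const change of stream) {
    await processChange(change); // Finish the work first
    await saveToken(change._id); // Then record how far we got
  }
}

// Stand-in for a real change stream
async function* fakeStream() {
  yield { _id: { _data: 'token-1' }, operationType: 'insert' };
  yield { _id: { _data: 'token-2' }, operationType: 'update' };
}

const log = [];
const done = consumeInOrder(
  fakeStream(),
  async (change) => { log.push(`processed ${change.operationType}`); },
  async (token) => { log.push(`saved ${token._data}`); }
);
done.then(() => console.log(log));
```

Each token is written only after its event has been handled, so a crash can cause a repeat but not a gap.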

Handling Reconnects — The Most Overlooked Part

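Do not count on a Change Stream to bring itself back after the MongoDB connection drops. The driver may resume the stream after some transient network errors, but once it hits an error it cannot recover from, or the stream is closed, nothing restarts it for you. I once deployed what I thought was a solid setup, only to check the logs the next morning and find the Change Stream had died overnight because MongoDB restarted for maintenance. No clear error, no alert: the stream just went silent and stopped receiving events.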

Option 1: Retry Loop with Exponential Backoff

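let retryDelay = 1000; // Start at 1 second; module scope so the stream can reset it
const maxDelay = 30000; // Cap at 30 seconds

async function watchWithRetry() {
  await client.connect();
  const collection = client.db('shop').collection('orders');

  while (true) {
    try {
      // Resolves when the stream closes, rejects when it errors
      await runChangeStream(collection);
      console.warn(`Change stream closed, reopening in ${retryDelay}ms`);
    } catch (err) {
      console.error(`Change stream error, retrying in ${retryDelay}ms:`, err.message);
    }
    await new Promise(resolve => setTimeout(resolve, retryDelay));
    retryDelay = Math.min(retryDelay * 2, maxDelay);
  }
}

async function runChangeStream(collection) {
  const resumeToken = loadResumeToken();
  const changeStream = collection.watch([], {
    fullDocument: 'updateLookup',
    ...(resumeToken && { resumeAfter: resumeToken })
  });

  return new Promise((resolve, reject) => {
    // Close the broken stream before handing the error to the retry loop
    const fail = (err) => {
      changeStream.close().catch(() => {});
      reject(err);
    };

    changeStream.on('change', async (change) => {
      try {
        await processChange(change);
        saveResumeToken(change._id); // Save only after processing succeeds
        retryDelay = 1000; // Reset delay once events are flowing again
      } catch (err) {
        fail(err);
      }
    });
    changeStream.on('error', fail);
    changeStream.on('close', resolve);
  });
}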
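
The delay schedule used by the retry loop can be isolated into a small pure function, which makes it easy to test. This is a sketch: backoffDelay and withJitter are hypothetical helpers, and the jitter step is an optional addition that the loop above does not use.

```javascript
// Hypothetical helper: delay before the nth retry (attempt starts at 0),
// doubling from baseMs and capped at maxMs.
function backoffDelay(attempt, baseMs = 1000, maxMs = 30000) {
  return Math.min(baseMs * 2 ** attempt, maxMs);
}

// Optional: spread retries out so many workers do not reconnect at once.
// Returns a value between half the delay and the full delay.
function withJitter(delayMs, random = Math.random) {
  return Math.round(delayMs / 2 + random() * (delayMs / 2));
}

const schedule = [0, 1, 2, 3, 4, 5, 6].map((attempt) => backoffDelay(attempt));
console.log(schedule); // [ 1000, 2000, 4000, 8000, 16000, 30000, 30000 ]
```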

Option 2: Use a Library with Built-In Retry

Don’t want to implement retry logic yourself? You can use mongodb-change-stream-watcher or wrap everything in a lifecycle-managing class. For smaller projects, the manual approach above is sufficient — fewer dependencies and easier to debug when things go wrong.

Production-Ready Pattern: Lessons from the Field

After getting burned enough times with Change Streams in production, I’ve settled on this pattern:

Save Resume Tokens to the Database, Not a File

Files can be lost when a container restarts or a new deployment rolls out. A dedicated MongoDB collection survives both, and a single-document upsert is atomic, so a crash cannot leave a half-written token behind.

async function saveResumeTokenToDb(token) {
  await client.db('meta').collection('stream_tokens').updateOne(
    { _id: 'orders_stream' },
    { $set: { token, updatedAt: new Date() } },
    { upsert: true }
  );
}

async function loadResumeTokenFromDb() {
  const doc = await client.db('meta').collection('stream_tokens').findOne(
    { _id: 'orders_stream' }
  );
  return doc?.token || null;
}

Graceful Shutdown

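let activeChangeStream = null; // Assign this wherever you call collection.watch()

async function shutdown() {
  console.log('Closing change stream...');
  if (activeChangeStream) {
    await activeChangeStream.close();
  }
  await client.close();
  process.exit(0);
}

// SIGINT covers Ctrl+C; SIGTERM is what container runtimes usually send on stop
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);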

Monitoring — Don’t Let Your Stream Die Silently

// Heartbeat every 60 seconds. A climbing staleSecs means either a dead stream
// or a genuinely quiet collection, so judge it against your normal traffic
let lastEventTime = Date.now();

changeStream.on('change', (change) => {
  lastEventTime = Date.now();
  // ... process event
});

setInterval(() => {
  const staleSecs = Math.round((Date.now() - lastEventTime) / 1000);
  console.log(`[ChangeStream] Last event: ${staleSecs}s ago`);
  // Push metric to monitoring if staleSecs is too high
}, 60000);
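
To turn that log line into an alertable signal, the staleness check can be pulled into a function with an explicit threshold. This is only a sketch: checkStaleness is a hypothetical helper, and the five-minute threshold is an assumption that should sit well above the longest quiet period you expect in normal traffic.

```javascript
// Hypothetical helper: flag the stream as stale once no event has arrived
// for longer than maxQuietMs. All times are in milliseconds.
function checkStaleness(lastEventTime, now, maxQuietMs) {
  const quietMs = now - lastEventTime;
  return { staleSecs: Math.round(quietMs / 1000), stale: quietMs > maxQuietMs };
}

const fiveMinutes = 5 * 60 * 1000; // Assumed threshold

console.log(checkStaleness(0, 90 * 1000, fiveMinutes));  // { staleSecs: 90, stale: false }
console.log(checkStaleness(0, 600 * 1000, fiveMinutes)); // { staleSecs: 600, stale: true }

// Usage inside the heartbeat (assumes lastEventTime from the snippet above):
//   const { staleSecs, stale } = checkStaleness(lastEventTime, Date.now(), fiveMinutes);
//   if (stale) console.warn(`[ChangeStream] No events for ${staleSecs}s`);
```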

Key Lessons from Production Experience

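  • oplog window: The oplog is a capped collection, so how far back it reaches depends on its size and your write volume rather than on a fixed duration. If your application is offline for too long, the resume token may point to an entry that has already been overwritten. Resuming then fails with a ChangeStreamHistoryLost error (code 286). Catch it, open a fresh stream without the token, and reconcile whatever you missed instead of letting the process die or retry forever.
  • fullDocument with updates: By default, an update event only contains the changed fields, not the full document. Add fullDocument: 'updateLookup' when you need the complete document, but keep in mind that this option triggers an additional lookup query. That adds a small amount of latency, and the lookup returns the document as it is at lookup time, which is not necessarily how it looked right after that particular update.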
  • Not suitable for heavy synchronous workloads: Change Streams are ideal for side effects — sending emails, updating caches, pushing notifications. Don’t block the event loop with heavy tasks. Process asynchronously or push to a queue (BullMQ, RabbitMQ) when needed.
  • Watch at the database level: Use db.watch() instead of watching individual collections to monitor multiple collections at once — useful for audit logs or syncing data between services.
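
Two of the lessons above can be sketched in a few lines. The first helper decides whether a saved token is still worth using after an error; it assumes the server reports an expired token with code 286 and code name ChangeStreamHistoryLost. The second routes events from a database-level stream by the collection name in change.ns.coll. All function names here are hypothetical.

```javascript
const CHANGE_STREAM_HISTORY_LOST = 286; // Assumed server error code

// True when the resume point is no longer available in the oplog
function isHistoryLost(err) {
  return err.code === CHANGE_STREAM_HISTORY_LOST ||
    err.codeName === 'ChangeStreamHistoryLost';
}

// Build watch() options for the next attempt: drop the token if it expired
function watchOptionsAfterError(err, savedToken) {
  const options = { fullDocument: 'updateLookup' };
  if (savedToken && !isHistoryLost(err)) {
    options.resumeAfter = savedToken;
  }
  return options;
}

// Dispatch an event from db.watch() to a per-collection handler
function routeChange(change, handlers) {
  const handler = handlers[change.ns.coll];
  return handler ? handler(change) : undefined;
}

const token = { _data: 'example-token' }; // Placeholder, not a real token
console.log(watchOptionsAfterError({ code: 286 }, token));
// { fullDocument: 'updateLookup' }
console.log(routeChange(
  { ns: { db: 'shop', coll: 'orders' }, operationType: 'insert' },
  { orders: (change) => `orders got ${change.operationType}` }
)); // orders got insert
```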
