MongoDB Change Streams: Real-Time Data Processing with SQL-Style Event Handling

Modern applications increasingly require real-time data processing capabilities. Whether you're building collaborative editing tools, live dashboards, notification systems, or real-time analytics, the ability to react to data changes as they happen is essential for delivering responsive user experiences.

MongoDB Change Streams provide a powerful mechanism for building event-driven architectures that react to database changes in real time. Combined with SQL-style event handling patterns, they let you build sophisticated reactive systems that scale efficiently while keeping development patterns familiar.

The Real-Time Data Challenge

Traditional polling approaches to detecting data changes are inefficient and don't scale:

-- Inefficient polling approach
-- Check for new orders every 5 seconds
SELECT order_id, customer_id, total_amount, created_at
FROM orders 
WHERE created_at > '2025-08-25 10:00:00'
  AND status = 'pending'
ORDER BY created_at DESC;

-- Problems with polling:
-- - Constant database load
-- - Delayed reaction to changes (up to polling interval)
-- - Wasted resources when no changes occur
-- - Difficulty coordinating across multiple services

MongoDB Change Streams solve these problems by providing push-based notifications:

// Real-time change detection with MongoDB Change Streams
const changeStream = db.collection('orders').watch([
  {
    $match: {
      'operationType': { $in: ['insert', 'update'] },
      'fullDocument.status': 'pending'
    }
  }
], { fullDocument: 'updateLookup' });  // so update events also include fullDocument

changeStream.on('change', (change) => {
  console.log('New order event:', change);
  // React immediately to changes
  processNewOrder(change.fullDocument);
});
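
The examples in this post assume an existing db handle. A minimal setup sketch with the official Node.js driver (the connection string is a placeholder, and change streams require a replica set or sharded cluster rather than a standalone mongod):

// Minimal driver setup (connection string is a placeholder)
const { MongoClient } = require('mongodb');

async function getDb() {
  const client = new MongoClient('mongodb://localhost:27017/?replicaSet=rs0');
  await client.connect();
  return client.db('ecommerce');
}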

Understanding Change Streams

Change Stream Events

MongoDB Change Streams emit events for various database operations:

// Sample change stream event structure
{
  "_id": {
    "_data": "8264F1A2C4000000012B022C0100296E5A1004..."
  },
  "operationType": "insert",  // insert, update, delete, replace, invalidate
  "clusterTime": Timestamp(1693547204, 1),
  "wallTime": ISODate("2025-08-25T10:15:04.123Z"),
  "fullDocument": {
    "_id": ObjectId("64f1a2c4567890abcdef1234"),
    "customer_id": ObjectId("64f1a2c4567890abcdef5678"),
    "items": [
      {
        "product_id": ObjectId("64f1a2c4567890abcdef9012"),
        "name": "Wireless Headphones",
        "quantity": 2,
        "price": 79.99
      }
    ],
    "total_amount": 159.98,
    "status": "pending",
    "created_at": ISODate("2025-08-25T10:15:04.120Z")
  },
  "ns": {
    "db": "ecommerce",
    "coll": "orders"
  },
  "documentKey": {
    "_id": ObjectId("64f1a2c4567890abcdef1234")
  }
}
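
For update operations, the event carries an updateDescription instead of a full document (unless the stream is opened with fullDocument: 'updateLookup', in which case both are present); a representative, abbreviated shape:

// Abbreviated update event
{
  "operationType": "update",
  "documentKey": { "_id": ObjectId("64f1a2c4567890abcdef1234") },
  "updateDescription": {
    "updatedFields": { "status": "paid" },
    "removedFields": [],
    "truncatedArrays": []
  }
}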

SQL-style event interpretation:

-- Conceptual SQL trigger equivalent
CREATE TRIGGER order_changes
  AFTER INSERT OR UPDATE OR DELETE ON orders
  FOR EACH ROW
BEGIN
  -- Emit event with change details
  INSERT INTO change_events (
    event_id,
    operation_type,
    table_name,
    document_id, 
    new_document,
    old_document,
    timestamp
  ) VALUES (
    GENERATE_UUID(),
    CASE 
      WHEN TG_OP = 'INSERT' THEN 'insert'
      WHEN TG_OP = 'UPDATE' THEN 'update'
      WHEN TG_OP = 'DELETE' THEN 'delete'
    END,
    'orders',
    NEW.order_id,
    ROW_TO_JSON(NEW),
    ROW_TO_JSON(OLD),
    NOW()
  );
END;

Building Real-Time Applications

E-Commerce Order Processing

Create a real-time order processing system:

// Real-time order processing with Change Streams
class OrderProcessor {
  constructor(db) {
    this.db = db;
    this.orderChangeStream = null;
    this.inventoryChangeStream = null;
  }

  startProcessing() {
    // Watch for new orders
    this.orderChangeStream = this.db.collection('orders').watch([
      {
        $match: {
          $or: [
            { 
              'operationType': 'insert',
              'fullDocument.status': 'pending'
            },
            {
              'operationType': 'update',
              'updateDescription.updatedFields.status': 'paid'
            }
          ]
        }
      }
    ], { fullDocument: 'updateLookup' });

    this.orderChangeStream.on('change', async (change) => {
      try {
        await this.handleOrderChange(change);
      } catch (error) {
        console.error('Error processing order change:', error);
        await this.logErrorEvent(change, error);
      }
    });

    // Watch for inventory updates
    this.inventoryChangeStream = this.db.collection('inventory').watch([
      {
        $match: {
          'operationType': 'update',
          'updateDescription.updatedFields.quantity': { $exists: true }
        }
      }
    ]);

    this.inventoryChangeStream.on('change', async (change) => {
      await this.handleInventoryChange(change);
    });
  }

  async handleOrderChange(change) {
    const order = change.fullDocument;

    switch (change.operationType) {
      case 'insert':
        console.log(`New order received: ${order._id}`);
        await this.validateOrder(order);
        await this.reserveInventory(order);
        await this.notifyFulfillment(order);
        break;

      case 'update':
        if (order.status === 'paid') {
          console.log(`Order paid: ${order._id}`);
          await this.processPayment(order);
          await this.createShipmentRecord(order);
        }
        break;
    }
  }

  async validateOrder(order) {
    // Validate order data and business rules
    const customer = await this.db.collection('customers')
      .findOne({ _id: order.customer_id });

    if (!customer) {
      throw new Error('Invalid customer ID');
    }

    // Check product availability
    const productIds = order.items.map(item => item.product_id);
    const products = await this.db.collection('products')
      .find({ _id: { $in: productIds } }).toArray();

    if (products.length !== productIds.length) {
      throw new Error('Some products not found');
    }
  }

  async reserveInventory(order) {
    // Each updateOne below is atomic for its document; the quantity guard
    // prevents overselling, and matchedCount detects insufficient stock
    for (const item of order.items) {
      const result = await this.db.collection('inventory').updateOne(
        {
          product_id: item.product_id,
          quantity: { $gte: item.quantity }
        },
        {
          $inc: { 
            quantity: -item.quantity,
            reserved: item.quantity
          },
          $push: {
            reservations: {
              order_id: order._id,
              quantity: item.quantity,
              timestamp: new Date()
            }
          }
        }
      );

      if (result.matchedCount === 0) {
        throw new Error(`Insufficient inventory for product ${item.product_id}`);
      }
    }
  }
}
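
Starting the processor only requires a connected db handle; a brief usage sketch:

// Usage sketch: start order processing and close streams on shutdown
const processor = new OrderProcessor(db);
processor.startProcessing();

process.on('SIGTERM', async () => {
  await processor.orderChangeStream.close();
  await processor.inventoryChangeStream.close();
});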

Real-Time Dashboard Updates

Build live dashboards that update automatically:

// Real-time sales dashboard
class SalesDashboard {
  constructor(db, socketServer) {
    this.db = db;
    this.io = socketServer;
    this.metrics = new Map();
  }

  startMonitoring() {
    // Watch sales data changes
    const salesChangeStream = this.db.collection('orders').watch([
      {
        $match: {
          $or: [
            { 'operationType': 'insert' },
            { 
              'operationType': 'update',
              'updateDescription.updatedFields.status': 'completed'
            }
          ]
        }
      }
    ], { fullDocument: 'updateLookup' });

    salesChangeStream.on('change', async (change) => {
      await this.updateDashboardMetrics(change);
    });
  }

  async updateDashboardMetrics(change) {
    const order = change.fullDocument;

    // Calculate real-time metrics
    const now = new Date();
    const today = new Date(now.getFullYear(), now.getMonth(), now.getDate());

    if (change.operationType === 'insert' || 
        (change.operationType === 'update' && order.status === 'completed')) {

      // Update daily sales metrics
      const dailyStats = await this.calculateDailyStats(today);

      // Broadcast updates to connected dashboards
      this.io.emit('sales_update', {
        type: 'daily_stats',
        data: dailyStats,
        timestamp: now
      });

      // Update product performance metrics
      if (order.status === 'completed') {
        const productStats = await this.calculateProductStats(order);

        this.io.emit('sales_update', {
          type: 'product_performance', 
          data: productStats,
          timestamp: now
        });
      }
    }
  }

  async calculateDailyStats(date) {
    return await this.db.collection('orders').aggregate([
      {
        $match: {
          created_at: { 
            $gte: date,
            $lt: new Date(date.getTime() + 86400000) // Next day
          },
          status: { $in: ['pending', 'paid', 'completed'] }
        }
      },
      {
        $group: {
          _id: null,
          total_orders: { $sum: 1 },
          total_revenue: { $sum: '$total_amount' },
          completed_orders: {
            $sum: { $cond: [{ $eq: ['$status', 'completed'] }, 1, 0] }
          },
          pending_orders: {
            $sum: { $cond: [{ $eq: ['$status', 'pending'] }, 1, 0] }
          },
          avg_order_value: { $avg: '$total_amount' }
        }
      }
    ]).toArray();
  }
}
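
On the browser side, each connected dashboard just subscribes to the emitted events; a minimal Socket.IO client sketch (the element ID is hypothetical):

// Browser-side subscriber (element ID is hypothetical)
const socket = io();

socket.on('sales_update', (update) => {
  if (update.type === 'daily_stats' && update.data.length > 0) {
    const stats = update.data[0];  // the aggregation returns an array
    document.getElementById('total-revenue').textContent =
      stats.total_revenue.toFixed(2);
  }
});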

Advanced Change Stream Patterns

Filtering and Transformation

Use aggregation pipelines to filter and transform change events:

// Advanced change stream filtering
const changeStream = db.collection('user_activity').watch([
  // Stage 1: Filter for specific operations
  {
    $match: {
      'operationType': { $in: ['insert', 'update'] },
      $or: [
        { 'fullDocument.event_type': 'login' },
        { 'fullDocument.event_type': 'purchase' },
        { 'updateDescription.updatedFields.last_active': { $exists: true } }
      ]
    }
  },

  // Stage 2: Add computed fields
  {
    $addFields: {
      // Note: new Date() here is evaluated once, when the pipeline is
      // created, not per event
      'processedAt': new Date(),
      'priority': {
        $switch: {
          branches: [
            { 
              case: { $eq: ['$fullDocument.event_type', 'purchase'] },
              then: 'high'
            },
            {
              case: { $eq: ['$fullDocument.event_type', 'login'] }, 
              then: 'medium'
            }
          ],
          default: 'low'
        }
      }
    }
  },

  // Stage 3: Project specific fields
  {
    $project: {
      '_id': 1,
      'operationType': 1,
      'fullDocument.user_id': 1,
      'fullDocument.event_type': 1,
      'fullDocument.timestamp': 1,
      'priority': 1,
      'processedAt': 1
    }
  }
]);

SQL-style event filtering concept:

-- Equivalent SQL-style event filtering
WITH filtered_changes AS (
  SELECT 
    event_id,
    operation_type,
    user_id,
    event_type,
    event_timestamp,
    processed_at,
    CASE 
      WHEN event_type = 'purchase' THEN 'high'
      WHEN event_type = 'login' THEN 'medium'
      ELSE 'low'
    END AS priority
  FROM user_activity_changes
  WHERE operation_type IN ('insert', 'update')
    AND (
      event_type IN ('login', 'purchase') OR
      last_active_updated = true
    )
)
SELECT *
FROM filtered_changes
WHERE priority IN ('high', 'medium')
ORDER BY 
  CASE priority
    WHEN 'high' THEN 1
    WHEN 'medium' THEN 2
    ELSE 3
  END,
  event_timestamp DESC;

Resume Tokens and Fault Tolerance

Implement robust change stream processing with resume capability:

// Fault-tolerant change stream processing
class ResilientChangeProcessor {
  constructor(db, collection, pipeline) {
    this.db = db;
    this.collection = collection;
    this.pipeline = pipeline;
    this.resumeToken = null;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 5;
  }

  async start() {
    try {
      // Load last known resume token from persistent storage
      this.resumeToken = await this.loadResumeToken();

      const options = {
        fullDocument: 'updateLookup'
      };

      // Resume from last known position if available
      if (this.resumeToken) {
        options.resumeAfter = this.resumeToken;
        console.log('Resuming change stream from token:', this.resumeToken);
      }

      const changeStream = this.db.collection(this.collection)
        .watch(this.pipeline, options);

      changeStream.on('change', async (change) => {
        try {
          // Process the change event
          await this.processChange(change);

          // Save resume token for fault recovery
          this.resumeToken = change._id;
          await this.saveResumeToken(this.resumeToken);

          // Reset reconnect attempts on successful processing
          this.reconnectAttempts = 0;

        } catch (error) {
          console.error('Error processing change:', error);
          await this.handleProcessingError(change, error);
        }
      });

      changeStream.on('error', async (error) => {
        console.error('Change stream error:', error);
        await this.handleStreamError(error);
      });

      changeStream.on('close', () => {
        console.log('Change stream closed');
        this.scheduleReconnect();
      });

    } catch (error) {
      console.error('Failed to start change stream:', error);
      this.scheduleReconnect();
    }
  }

  async handleStreamError(error) {
    // Handle different types of errors appropriately
    if (error.code === 260 || error.code === 286) {
      // InvalidResumeToken (260) or ChangeStreamHistoryLost (286):
      // the stored token can no longer be used to resume
      console.log('Resume token invalid, starting from current time');
      this.resumeToken = null;
      await this.saveResumeToken(null);
      this.scheduleReconnect();
    } else {
      this.scheduleReconnect();
    }
  }

  scheduleReconnect() {
    // Guard against double scheduling when both 'error' and 'close' fire
    if (this.reconnectTimer) return;

    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.reconnectAttempts++;
      const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);

      console.log(`Scheduling reconnect in ${delay}ms (attempt ${this.reconnectAttempts})`);

      this.reconnectTimer = setTimeout(() => {
        this.reconnectTimer = null;
        this.start();
      }, delay);
    } else {
      console.error('Maximum reconnect attempts reached');
      process.exit(1);
    }
  }

  async loadResumeToken() {
    // Load from persistent storage (Redis, file, database, etc.)
    const tokenRecord = await this.db.collection('change_stream_tokens')
      .findOne({ processor_id: this.getProcessorId() });

    return tokenRecord ? tokenRecord.resume_token : null;
  }

  async saveResumeToken(token) {
    await this.db.collection('change_stream_tokens').updateOne(
      { processor_id: this.getProcessorId() },
      { 
        $set: { 
          resume_token: token,
          updated_at: new Date()
        }
      },
      { upsert: true }
    );
  }

  getProcessorId() {
    return `${this.collection}_processor_${process.env.HOSTNAME || 'default'}`;
  }
}
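
A brief usage sketch, reusing the pending-order filter from earlier (processChange is supplied by the caller here rather than by a subclass):

// Usage sketch: resilient processing of pending-order inserts
const processor = new ResilientChangeProcessor(db, 'orders', [
  { $match: { operationType: 'insert', 'fullDocument.status': 'pending' } }
]);

processor.processChange = async (change) => {
  console.log('Processing order', change.documentKey._id);
};

await processor.start();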

Change Streams for Microservices

Event-Driven Architecture

Use Change Streams to build loosely coupled microservices:

// Order service publishes events via Change Streams
class OrderService {
  constructor(db, eventBus) {
    this.db = db;
    this.eventBus = eventBus;
  }

  startEventPublisher() {
    const changeStream = this.db.collection('orders').watch([
      {
        $match: {
          'operationType': { $in: ['insert', 'update', 'delete'] }
        }
      }
    ], { fullDocument: 'updateLookup' });

    changeStream.on('change', async (change) => {
      const event = this.transformToBusinessEvent(change);
      await this.eventBus.publish(event);
    });
  }

  transformToBusinessEvent(change) {
    const baseEvent = {
      eventId: change._id._data,
      timestamp: change.wallTime,
      source: 'order-service',
      version: '1.0'
    };

    switch (change.operationType) {
      case 'insert':
        return {
          ...baseEvent,
          eventType: 'OrderCreated',
          data: {
            orderId: change.documentKey._id,
            customerId: change.fullDocument.customer_id,
            totalAmount: change.fullDocument.total_amount,
            items: change.fullDocument.items
          }
        };

      case 'update': {
        const updatedFields = change.updateDescription?.updatedFields || {};

        if (updatedFields.status) {
          return {
            ...baseEvent,
            eventType: 'OrderStatusChanged',
            data: {
              orderId: change.documentKey._id,
              // The prior value is not part of the event; retrieving it needs
              // pre-images (fullDocumentBeforeChange, MongoDB 6.0+) or a lookup
              oldStatus: this.getOldStatus(change),
              newStatus: updatedFields.status
            }
          };
        }

        return {
          ...baseEvent,
          eventType: 'OrderUpdated',
          data: {
            orderId: change.documentKey._id,
            updatedFields: updatedFields
          }
        };
      }

      case 'delete':
        return {
          ...baseEvent,
          eventType: 'OrderDeleted',
          data: {
            orderId: change.documentKey._id
          }
        };
    }
  }
}
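
The eventBus above is left abstract; a minimal in-process stand-in (a production system would publish to Kafka, RabbitMQ, or a similar broker instead):

// Minimal in-process event bus stand-in (not production-grade)
const { EventEmitter } = require('events');

class InProcessEventBus extends EventEmitter {
  async publish(event) {
    this.emit(event.eventType, event);
  }
}

const eventBus = new InProcessEventBus();
eventBus.on('OrderCreated', (event) => {
  console.log('New order event:', event.data.orderId);
});

new OrderService(db, eventBus).startEventPublisher();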

Cross-Service Data Synchronization

Synchronize data across services using Change Streams:

-- SQL-style approach to service synchronization
-- Service A updates user profile
UPDATE users 
SET email = 'new.email@example.com',
    updated_at = NOW()
WHERE user_id = 12345;

-- Service B receives event and updates its local cache
INSERT INTO user_cache (
  user_id,
  email,
  last_sync,
  sync_version
) VALUES (
  12345,
  'new.email@example.com',
  NOW(),
  (SELECT COALESCE(MAX(sync_version), 0) + 1 FROM user_cache WHERE user_id = 12345)
) ON CONFLICT (user_id) 
DO UPDATE SET
  email = EXCLUDED.email,
  last_sync = EXCLUDED.last_sync,
  sync_version = EXCLUDED.sync_version;

MongoDB Change Streams implementation:

// Service B subscribes to user changes from Service A
class UserSyncService {
  constructor(sourceDb, localDb, localClient) {
    this.sourceDb = sourceDb;
    this.localDb = localDb;
    this.localClient = localClient;  // MongoClient that owns localDb, for sessions
  }

  startSync() {
    const userChangeStream = this.sourceDb.collection('users').watch([
      {
        $match: {
          $or: [
            {
              'operationType': { $in: ['insert', 'update'] },
              'fullDocument.service_visibility': { $in: ['public', 'internal'] }
            },
            // delete events carry no fullDocument, so match them separately
            { 'operationType': 'delete' }
          ]
        }
      }
    ], { fullDocument: 'updateLookup' });

    userChangeStream.on('change', async (change) => {
      await this.syncUserChange(change);
    });
  }

  async syncUserChange(change) {
    const session = this.localClient.startSession();

    try {
      await session.withTransaction(async () => {
        switch (change.operationType) {
          case 'insert':
          case 'update':
            await this.localDb.collection('user_cache').updateOne(
              { user_id: change.documentKey._id },
              {
                $set: {
                  email: change.fullDocument.email,
                  name: change.fullDocument.name,
                  profile_data: change.fullDocument.profile_data,
                  last_sync: new Date(),
                  source_version: change.clusterTime
                }
              },
              { upsert: true, session }
            );
            break;

          case 'delete':
            await this.localDb.collection('user_cache').deleteOne(
              { user_id: change.documentKey._id },
              { session }
            );
            break;
        }

        // Log sync event for debugging
        await this.localDb.collection('sync_log').insertOne({
          operation: change.operationType,
          collection: 'users',
          document_id: change.documentKey._id,
          timestamp: new Date(),
          cluster_time: change.clusterTime
        }, { session });
      });

    } finally {
      await session.endSession();
    }
  }
}

Performance and Scalability

Change Stream Optimization

Optimize Change Streams for high-throughput scenarios:

// High-performance change stream configuration
const changeStreamOptions = {
  fullDocument: 'whenAvailable',  // Include post-image only when available (MongoDB 6.0+)
  batchSize: 100,                 // Process changes in batches
  maxAwaitTimeMS: 5000,           // Max wait per getMore before returning an empty batch
  collation: {
    locale: 'simple'             // Use simple collation for performance
  }
};

// Batch processing for high-throughput scenarios
class BatchChangeProcessor {
  constructor(db, collection, batchSize = 50) {
    this.db = db;
    this.collection = collection;
    this.batchSize = batchSize;
    this.changeBatch = [];
    this.batchTimer = null;
  }

  startProcessing() {
    const changeStream = this.db.collection(this.collection)
      .watch([], changeStreamOptions);

    changeStream.on('change', (change) => {
      this.changeBatch.push(change);

      // Process batch when full or after timeout
      if (this.changeBatch.length >= this.batchSize) {
        this.processBatch();
      } else if (!this.batchTimer) {
        this.batchTimer = setTimeout(() => {
          this.batchTimer = null;  // allow future timers to be scheduled
          if (this.changeBatch.length > 0) {
            this.processBatch();
          }
        }, 1000);
      }
    });
  }

  async processBatch() {
    const batch = this.changeBatch.splice(0);

    if (this.batchTimer) {
      clearTimeout(this.batchTimer);
      this.batchTimer = null;
    }

    try {
      // Process batch of changes
      await this.handleChangeBatch(batch);
    } catch (error) {
      console.error('Error processing change batch:', error);
      // Implement retry logic or dead letter queue
    }
  }

  async handleChangeBatch(changes) {
    // Group changes by operation type
    const inserts = changes.filter(c => c.operationType === 'insert');
    const updates = changes.filter(c => c.operationType === 'update');
    const deletes = changes.filter(c => c.operationType === 'delete');

    // Process each operation type in parallel
    await Promise.all([
      this.processInserts(inserts),
      this.processUpdates(updates), 
      this.processDeletes(deletes)
    ]);
  }
}
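
The per-type handlers can then use bulk operations; a sketch of processInserts that mirrors new documents into a downstream collection ('orders_analytics' is an assumed destination, not part of the original design):

// Sketch: bulk-write grouped inserts into an assumed downstream collection
async processInserts(inserts) {
  if (inserts.length === 0) return;

  const ops = inserts.map(change => ({
    insertOne: { document: change.fullDocument }
  }));

  // ordered: false lets independent inserts proceed past individual failures
  await this.db.collection('orders_analytics').bulkWrite(ops, { ordered: false });
}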

QueryLeaf Change Stream Integration

QueryLeaf can help translate Change Stream concepts to familiar SQL patterns:

-- QueryLeaf provides SQL-like syntax for change stream operations
CREATE TRIGGER user_activity_trigger 
  ON user_activity
  FOR INSERT, UPDATE, DELETE
AS
BEGIN
  -- Process real-time user activity changes
  WITH activity_changes AS (
    SELECT 
      CASE 
        WHEN operation = 'INSERT' THEN 'user_registered'
        WHEN operation = 'UPDATE' AND NEW.last_login != OLD.last_login THEN 'user_login'
        WHEN operation = 'DELETE' THEN 'user_deactivated'
      END AS event_type,
      NEW.user_id,
      NEW.email,
      NEW.last_login,
      CURRENT_TIMESTAMP AS event_timestamp
    FROM INSERTED NEW
    LEFT JOIN DELETED OLD ON NEW.user_id = OLD.user_id
    WHERE event_type IS NOT NULL
  )
  INSERT INTO user_events (
    event_type,
    user_id, 
    event_data,
    timestamp
  )
  SELECT 
    event_type,
    user_id,
    JSON_OBJECT(
      'email', email,
      'last_login', last_login
    ),
    event_timestamp
  FROM activity_changes;
END;

-- Query real-time user activity
SELECT 
  event_type,
  COUNT(*) as event_count,
  DATE_TRUNC('minute', timestamp) as minute
FROM user_events
WHERE timestamp >= NOW() - INTERVAL '1 hour'
GROUP BY event_type, DATE_TRUNC('minute', timestamp)
ORDER BY minute DESC, event_count DESC;

-- QueryLeaf automatically translates this to:
-- 1. MongoDB Change Stream with appropriate filters
-- 2. Aggregation pipeline for event grouping
-- 3. Real-time event emission to subscribers
-- 4. Automatic resume token management

Security and Access Control

Change Stream Permissions

Control access to change stream data:

// Role-based change stream access
db.createRole({
  role: "orderChangeStreamReader",
  privileges: [
    {
      resource: { db: "ecommerce", collection: "orders" },
      actions: ["changeStream", "find"]
    }
  ],
  roles: []
});

// Create user with limited change stream access
db.createUser({
  user: "orderProcessor",
  pwd: "securePassword",
  roles: ["orderChangeStreamReader"]
});

Data Filtering and Privacy

Filter sensitive data from change streams:

// Privacy-aware change stream
const privateFieldsFilter = {
  $unset: [
    'fullDocument.credit_card',
    'fullDocument.ssn',
    'fullDocument.personal_notes'
  ]
};

const changeStream = db.collection('customers').watch([
  {
    $match: {
      'operationType': { $in: ['insert', 'update'] }
    }
  },
  privateFieldsFilter  // Remove sensitive fields
]);

Best Practices for Change Streams

  1. Resume Token Management: Always persist resume tokens for fault tolerance
  2. Error Handling: Implement comprehensive error handling and retry logic
  3. Performance Monitoring: Monitor change stream lag and processing times (see the lag sketch after this list)
  4. Resource Management: Use appropriate batch sizes and connection pooling
  5. Security: Filter sensitive data and implement proper access controls
  6. Testing: Test resume behavior and failover scenarios regularly
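
For point 3, lag can be measured directly from each event's wallTime (a Date, available on MongoDB 6.0+); a minimal sketch:

// Sketch: measure change stream lag from the event's wallTime (MongoDB 6.0+)
changeStream.on('change', (change) => {
  if (change.wallTime) {
    const lagMs = Date.now() - change.wallTime.getTime();
    if (lagMs > 5000) {
      console.warn(`Change stream lag is ${lagMs}ms`);
    }
  }
});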

Conclusion

MongoDB Change Streams provide a powerful foundation for building real-time, event-driven applications. Combined with SQL-style event handling patterns, you can create responsive systems that react to data changes instantly while maintaining familiar development patterns.

Key benefits of Change Streams include:

  • Real-Time Processing: Immediate notification of database changes without polling
  • Event-Driven Architecture: Build loosely coupled microservices that react to data events
  • Fault Tolerance: Resume processing from any point using resume tokens
  • Scalability: Handle high-throughput scenarios with batch processing and filtering
  • Flexibility: Use aggregation pipelines to transform and filter events

Whether you're building collaborative applications, real-time dashboards, or distributed microservices, Change Streams enable you to create responsive systems that scale efficiently. The combination of MongoDB's powerful change detection with QueryLeaf's familiar SQL patterns makes building real-time applications both powerful and accessible.

From e-commerce order processing to live analytics dashboards, Change Streams provide the foundation for modern, event-driven applications that deliver exceptional user experiences through real-time data processing.