MongoDB Change Streams: Real-Time Data Processing with SQL-Style Event Handling
Modern applications increasingly require real-time data processing capabilities. Whether you're building collaborative editing tools, live dashboards, notification systems, or real-time analytics, the ability to react to data changes as they happen is essential for delivering responsive user experiences.
MongoDB Change Streams provide a powerful mechanism for building event-driven architectures that react to database changes in real time. When combined with SQL-style event handling patterns, they let you create sophisticated reactive systems that scale efficiently while keeping development patterns familiar.
The Real-Time Data Challenge
Traditional polling approaches to detect data changes are inefficient and don't scale:
-- Inefficient polling approach
-- Check for new orders every 5 seconds
SELECT order_id, customer_id, total_amount, created_at
FROM orders
WHERE created_at > '2025-08-25 10:00:00'
AND status = 'pending'
ORDER BY created_at DESC;
-- Problems with polling:
-- - Constant database load
-- - Delayed reaction to changes (up to polling interval)
-- - Wasted resources when no changes occur
-- - Difficulty coordinating across multiple services
MongoDB Change Streams solve these problems by providing push-based notifications:
// Real-time change detection with MongoDB Change Streams
const changeStream = db.collection('orders').watch([
{
$match: {
'operationType': { $in: ['insert', 'update'] },
'fullDocument.status': 'pending'
}
}
]);
changeStream.on('change', (change) => {
console.log('New order event:', change);
// React immediately to changes
processNewOrder(change.fullDocument);
});
Understanding Change Streams
Change Stream Events
MongoDB Change Streams emit events for various database operations:
// Sample change stream event structure
{
"_id": {
"_data": "8264F1A2C4000000012B022C0100296E5A1004..."
},
"operationType": "insert", // insert, update, delete, replace, invalidate
"clusterTime": Timestamp(1693547204, 1),
"wallTime": ISODate("2025-08-25T10:15:04.123Z"),
"fullDocument": {
"_id": ObjectId("64f1a2c4567890abcdef1234"),
"customer_id": ObjectId("64f1a2c4567890abcdef5678"),
"items": [
{
"product_id": ObjectId("64f1a2c4567890abcdef9012"),
"name": "Wireless Headphones",
"quantity": 2,
"price": 79.99
}
],
"total_amount": 159.98,
"status": "pending",
"created_at": ISODate("2025-08-25T10:15:04.120Z")
},
"ns": {
"db": "ecommerce",
"coll": "orders"
},
"documentKey": {
"_id": ObjectId("64f1a2c4567890abcdef1234")
}
}
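The sample above is an insert event. For comparison, an update event carries an updateDescription containing only the changed fields; a hedged sketch follows (field values are illustrative, and fullDocument appears only when the stream is opened with fullDocument: 'updateLookup'):
// Illustrative update event structure (values are hypothetical)
{
  "_id": { "_data": "8264F1A2C4000000022B022C0100296E5A1004..." },
  "operationType": "update",
  "clusterTime": Timestamp(1693547304, 1),
  "ns": { "db": "ecommerce", "coll": "orders" },
  "documentKey": { "_id": ObjectId("64f1a2c4567890abcdef1234") },
  "updateDescription": {
    "updatedFields": { "status": "paid" },
    "removedFields": []
  }
}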
SQL-style event interpretation:
-- Conceptual SQL trigger equivalent
CREATE TRIGGER order_changes
AFTER INSERT OR UPDATE ON orders
FOR EACH ROW
BEGIN
-- Emit event with change details
INSERT INTO change_events (
event_id,
operation_type,
table_name,
document_id,
new_document,
old_document,
timestamp
) VALUES (
GENERATE_UUID(),
CASE
WHEN TG_OP = 'INSERT' THEN 'insert'
WHEN TG_OP = 'UPDATE' THEN 'update'
WHEN TG_OP = 'DELETE' THEN 'delete'
END,
'orders',
NEW.order_id,
ROW_TO_JSON(NEW),
ROW_TO_JSON(OLD),
NOW()
);
END;
Building Real-Time Applications
E-Commerce Order Processing
Create a real-time order processing system:
// Real-time order processing with Change Streams
class OrderProcessor {
constructor(db) {
this.db = db;
this.orderChangeStream = null;
this.inventoryChangeStream = null;
}
startProcessing() {
// Watch for new orders
this.orderChangeStream = this.db.collection('orders').watch([
{
$match: {
$or: [
{
'operationType': 'insert',
'fullDocument.status': 'pending'
},
{
'operationType': 'update',
'updateDescription.updatedFields.status': 'paid'
}
]
}
}
], { fullDocument: 'updateLookup' });
this.orderChangeStream.on('change', async (change) => {
try {
await this.handleOrderChange(change);
} catch (error) {
console.error('Error processing order change:', error);
await this.logErrorEvent(change, error);
}
});
// Watch for inventory updates
this.inventoryChangeStream = this.db.collection('inventory').watch([
{
$match: {
'operationType': 'update',
'updateDescription.updatedFields.quantity': { $exists: true }
}
}
]);
this.inventoryChangeStream.on('change', async (change) => {
await this.handleInventoryChange(change);
});
}
async handleOrderChange(change) {
const order = change.fullDocument;
switch (change.operationType) {
case 'insert':
console.log(`New order received: ${order._id}`);
await this.validateOrder(order);
await this.reserveInventory(order);
await this.notifyFulfillment(order);
break;
case 'update':
if (order.status === 'paid') {
console.log(`Order paid: ${order._id}`);
await this.processPayment(order);
await this.createShipmentRecord(order);
}
break;
}
}
async validateOrder(order) {
// Validate order data and business rules
const customer = await this.db.collection('customers')
.findOne({ _id: order.customer_id });
if (!customer) {
throw new Error('Invalid customer ID');
}
// Check product availability
const productIds = order.items.map(item => item.product_id);
const products = await this.db.collection('products')
.find({ _id: { $in: productIds } }).toArray();
if (products.length !== productIds.length) {
throw new Error('Some products not found');
}
}
async reserveInventory(order) {
// Each update is atomic per product; wrap the loop in a transaction
// if the whole order must reserve all-or-nothing
for (const item of order.items) {
const result = await this.db.collection('inventory').updateOne(
{
product_id: item.product_id,
quantity: { $gte: item.quantity }
},
{
$inc: {
quantity: -item.quantity,
reserved: item.quantity
},
$push: {
reservations: {
order_id: order._id,
quantity: item.quantity,
timestamp: new Date()
}
}
}
);
// The filter matches nothing when stock is insufficient
if (result.matchedCount === 0) {
throw new Error(`Insufficient inventory for product ${item.product_id}`);
}
}
}
}
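A minimal sketch of wiring the processor into an application follows; the connection string, database name, and shutdown handling are assumptions, and change streams require a replica set or sharded cluster:
// Hypothetical bootstrap for OrderProcessor (connection details are placeholders)
const { MongoClient } = require('mongodb');

async function main() {
  const client = new MongoClient('mongodb://localhost:27017/?replicaSet=rs0');
  await client.connect();

  const processor = new OrderProcessor(client.db('ecommerce'));
  processor.startProcessing();

  // Close the streams and the connection on shutdown
  process.on('SIGINT', async () => {
    await processor.orderChangeStream?.close();
    await processor.inventoryChangeStream?.close();
    await client.close();
    process.exit(0);
  });
}

main().catch(console.error);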
Real-Time Dashboard Updates
Build live dashboards that update automatically:
// Real-time sales dashboard
class SalesDashboard {
constructor(db, socketServer) {
this.db = db;
this.io = socketServer;
this.metrics = new Map();
}
startMonitoring() {
// Watch sales data changes
const salesChangeStream = this.db.collection('orders').watch([
{
$match: {
$or: [
{ 'operationType': 'insert' },
{
'operationType': 'update',
'updateDescription.updatedFields.status': 'completed'
}
]
}
}
], { fullDocument: 'updateLookup' });
salesChangeStream.on('change', async (change) => {
await this.updateDashboardMetrics(change);
});
}
async updateDashboardMetrics(change) {
const order = change.fullDocument;
// Calculate real-time metrics
const now = new Date();
const today = new Date(now.getFullYear(), now.getMonth(), now.getDate());
if (change.operationType === 'insert' ||
(change.operationType === 'update' && order.status === 'completed')) {
// Update daily sales metrics
const dailyStats = await this.calculateDailyStats(today);
// Broadcast updates to connected dashboards
this.io.emit('sales_update', {
type: 'daily_stats',
data: dailyStats,
timestamp: now
});
// Update product performance metrics
if (order.status === 'completed') {
const productStats = await this.calculateProductStats(order);
this.io.emit('sales_update', {
type: 'product_performance',
data: productStats,
timestamp: now
});
}
}
}
async calculateDailyStats(date) {
return await this.db.collection('orders').aggregate([
{
$match: {
created_at: {
$gte: date,
$lt: new Date(date.getTime() + 86400000) // Next day
},
status: { $in: ['pending', 'paid', 'completed'] }
}
},
{
$group: {
_id: null,
total_orders: { $sum: 1 },
total_revenue: { $sum: '$total_amount' },
completed_orders: {
$sum: { $cond: [{ $eq: ['$status', 'completed'] }, 1, 0] }
},
pending_orders: {
$sum: { $cond: [{ $eq: ['$status', 'pending'] }, 1, 0] }
},
avg_order_value: { $avg: '$total_amount' }
}
}
]).toArray();
}
}
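On the receiving side, a browser dashboard only needs to subscribe to the emitted events. A sketch assuming a standard Socket.IO client; the element IDs and rendering helper are placeholders:
// Hypothetical browser-side subscriber
const socket = io('https://dashboard.example.com');

socket.on('sales_update', (update) => {
  if (update.type === 'daily_stats') {
    const stats = update.data[0]; // calculateDailyStats returns a one-element array
    document.getElementById('total-orders').textContent = stats.total_orders;
    document.getElementById('total-revenue').textContent = stats.total_revenue.toFixed(2);
  } else if (update.type === 'product_performance') {
    renderProductTable(update.data); // assumed rendering helper
  }
});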
Advanced Change Stream Patterns
Filtering and Transformation
Use aggregation pipelines to filter and transform change events:
// Advanced change stream filtering
const changeStream = db.collection('user_activity').watch([
// Stage 1: Filter for specific operations
{
$match: {
'operationType': { $in: ['insert', 'update'] },
$or: [
{ 'fullDocument.event_type': 'login' },
{ 'fullDocument.event_type': 'purchase' },
{ 'updateDescription.updatedFields.last_active': { $exists: true } }
]
}
},
// Stage 2: Add computed fields
{
$addFields: {
'processedAt': '$$NOW', // evaluated per event; new Date() here would be fixed at stream creation
'priority': {
$switch: {
branches: [
{
case: { $eq: ['$fullDocument.event_type', 'purchase'] },
then: 'high'
},
{
case: { $eq: ['$fullDocument.event_type', 'login'] },
then: 'medium'
}
],
default: 'low'
}
}
}
},
// Stage 3: Project specific fields
{
$project: {
'_id': 1,
'operationType': 1,
'fullDocument.user_id': 1,
'fullDocument.event_type': 1,
'fullDocument.timestamp': 1,
'priority': 1,
'processedAt': 1
}
}
]);
SQL-style event filtering concept:
-- Equivalent SQL-style event filtering
WITH filtered_changes AS (
SELECT
event_id,
operation_type,
user_id,
event_type,
event_timestamp,
processed_at,
CASE
WHEN event_type = 'purchase' THEN 'high'
WHEN event_type = 'login' THEN 'medium'
ELSE 'low'
END AS priority
FROM user_activity_changes
WHERE operation_type IN ('insert', 'update')
AND (
event_type IN ('login', 'purchase') OR
last_active_updated = true
)
)
SELECT *
FROM filtered_changes
WHERE priority IN ('high', 'medium')
ORDER BY
CASE priority
WHEN 'high' THEN 1
WHEN 'medium' THEN 2
ELSE 3
END,
event_timestamp DESC;
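Once filtered and enriched, events can be routed by the computed priority field; a short sketch, with the queue objects assumed to exist elsewhere in the application:
// Route enriched events by priority (purchaseQueue/sessionQueue are assumed)
changeStream.on('change', (event) => {
  switch (event.priority) {
    case 'high':
      purchaseQueue.push(event); // e.g., receipts and fraud checks
      break;
    case 'medium':
      sessionQueue.push(event); // e.g., session analytics refresh
      break;
    default:
      break; // low-priority events can be sampled or dropped
  }
});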
Resume Tokens and Fault Tolerance
Implement robust change stream processing with resume capability:
// Fault-tolerant change stream processing
class ResilientChangeProcessor {
constructor(db, collection, pipeline) {
this.db = db;
this.collection = collection;
this.pipeline = pipeline;
this.resumeToken = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
}
async start() {
try {
// Load last known resume token from persistent storage
this.resumeToken = await this.loadResumeToken();
const options = {
fullDocument: 'updateLookup'
};
// Resume from last known position if available
if (this.resumeToken) {
options.resumeAfter = this.resumeToken;
console.log('Resuming change stream from token:', this.resumeToken);
}
const changeStream = this.db.collection(this.collection)
.watch(this.pipeline, options);
changeStream.on('change', async (change) => {
try {
// Process the change event
await this.processChange(change);
// Save resume token for fault recovery
this.resumeToken = change._id;
await this.saveResumeToken(this.resumeToken);
// Reset reconnect attempts on successful processing
this.reconnectAttempts = 0;
} catch (error) {
console.error('Error processing change:', error);
await this.handleProcessingError(change, error);
}
});
changeStream.on('error', async (error) => {
console.error('Change stream error:', error);
await this.handleStreamError(error);
});
changeStream.on('close', () => {
console.log('Change stream closed');
this.scheduleReconnect();
});
} catch (error) {
console.error('Failed to start change stream:', error);
this.scheduleReconnect();
}
}
async handleStreamError(error) {
// Handle different types of errors appropriately
if (error.code === 286) { // ChangeStreamHistoryLost: resume point no longer in the oplog
console.log('Resume token no longer resumable, starting from current time');
this.resumeToken = null;
await this.saveResumeToken(null);
this.scheduleReconnect();
} else {
this.scheduleReconnect();
}
}
scheduleReconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
console.log(`Scheduling reconnect in ${delay}ms (attempt ${this.reconnectAttempts})`);
setTimeout(() => {
this.start();
}, delay);
} else {
console.error('Maximum reconnect attempts reached');
process.exit(1);
}
}
async loadResumeToken() {
// Load from persistent storage (Redis, file, database, etc.)
const tokenRecord = await this.db.collection('change_stream_tokens')
.findOne({ processor_id: this.getProcessorId() });
return tokenRecord ? tokenRecord.resume_token : null;
}
async saveResumeToken(token) {
await this.db.collection('change_stream_tokens').updateOne(
{ processor_id: this.getProcessorId() },
{
$set: {
resume_token: token,
updated_at: new Date()
}
},
{ upsert: true }
);
}
getProcessorId() {
return `${this.collection}_processor_${process.env.HOSTNAME || 'default'}`;
}
}
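A usage sketch follows; processChange and handleProcessingError are not defined by the class above, so the application is assumed to supply them (the dead-letter collection name is illustrative):
// Hypothetical usage of ResilientChangeProcessor
const processor = new ResilientChangeProcessor(db, 'orders', [
  { $match: { operationType: 'insert' } }
]);

processor.processChange = async (change) => {
  console.log('Processing order:', change.documentKey._id);
  // ...application logic
};

processor.handleProcessingError = async (change, error) => {
  // Park failed events for later inspection
  await db.collection('failed_changes').insertOne({
    change_id: change._id,
    error: error.message,
    failed_at: new Date()
  });
};

processor.start();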
Change Streams for Microservices
Event-Driven Architecture
Use Change Streams to build loosely coupled microservices:
// Order service publishes events via Change Streams
class OrderService {
constructor(db, eventBus) {
this.db = db;
this.eventBus = eventBus;
}
startEventPublisher() {
const changeStream = this.db.collection('orders').watch([
{
$match: {
'operationType': { $in: ['insert', 'update', 'delete'] }
}
}
], { fullDocument: 'updateLookup' });
changeStream.on('change', async (change) => {
const event = this.transformToBusinessEvent(change);
await this.eventBus.publish(event);
});
}
transformToBusinessEvent(change) {
const baseEvent = {
eventId: change._id._data,
timestamp: change.wallTime,
source: 'order-service',
version: '1.0'
};
switch (change.operationType) {
case 'insert':
return {
...baseEvent,
eventType: 'OrderCreated',
data: {
orderId: change.documentKey._id,
customerId: change.fullDocument.customer_id,
totalAmount: change.fullDocument.total_amount,
items: change.fullDocument.items
}
};
case 'update':
const updatedFields = change.updateDescription?.updatedFields || {};
if (updatedFields.status) {
return {
...baseEvent,
eventType: 'OrderStatusChanged',
data: {
orderId: change.documentKey._id,
oldStatus: this.getOldStatus(change), // assumed helper: update events carry no prior values unless pre-images are enabled
newStatus: updatedFields.status
}
};
}
return {
...baseEvent,
eventType: 'OrderUpdated',
data: {
orderId: change.documentKey._id,
updatedFields: updatedFields
}
};
case 'delete':
return {
...baseEvent,
eventType: 'OrderDeleted',
data: {
orderId: change.documentKey._id
}
};
}
}
}
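Downstream services can then consume these business events with no knowledge of MongoDB at all. A sketch, assuming the eventBus exposes a simple subscribe(eventType, handler) interface and the notification helpers exist elsewhere:
// Hypothetical consumer in a separate notification service
eventBus.subscribe('OrderCreated', async (event) => {
  await sendOrderConfirmation(event.data.orderId, event.data.customerId); // assumed helper
});

eventBus.subscribe('OrderStatusChanged', async (event) => {
  if (event.data.newStatus === 'shipped') {
    await sendShippingNotification(event.data.orderId); // assumed helper
  }
});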
Cross-Service Data Synchronization
Synchronize data across services using Change Streams:
-- SQL-style approach to service synchronization
-- Service A updates user profile
UPDATE users
SET email = '[email protected]',
updated_at = NOW()
WHERE user_id = 12345;
-- Service B receives event and updates its local cache
INSERT INTO user_cache (
user_id,
email,
last_sync,
sync_version
) VALUES (
12345,
'[email protected]',
NOW(),
(SELECT COALESCE(MAX(sync_version), 0) + 1 FROM user_cache WHERE user_id = 12345)
) ON CONFLICT (user_id)
DO UPDATE SET
email = EXCLUDED.email,
last_sync = EXCLUDED.last_sync,
sync_version = EXCLUDED.sync_version;
MongoDB Change Streams implementation:
// Service B subscribes to user changes from Service A
class UserSyncService {
constructor(sourceDb, localDb) {
this.sourceDb = sourceDb;
this.localDb = localDb;
}
startSync() {
const userChangeStream = this.sourceDb.collection('users').watch([
{
$match: {
'operationType': { $in: ['insert', 'update', 'delete'] },
// Delete events have no fullDocument, so exempt them from the visibility filter
$or: [
{ 'operationType': 'delete' },
{ 'fullDocument.service_visibility': { $in: ['public', 'internal'] } }
]
}
}
], { fullDocument: 'updateLookup' });
userChangeStream.on('change', async (change) => {
await this.syncUserChange(change);
});
}
async syncUserChange(change) {
// Assumes a MongoClient reference is kept on this.localDb.client; sessions are started from the client
const session = this.localDb.client.startSession();
try {
await session.withTransaction(async () => {
switch (change.operationType) {
case 'insert':
case 'update':
await this.localDb.collection('user_cache').updateOne(
{ user_id: change.documentKey._id },
{
$set: {
email: change.fullDocument.email,
name: change.fullDocument.name,
profile_data: change.fullDocument.profile_data,
last_sync: new Date(),
source_version: change.clusterTime
}
},
{ upsert: true, session }
);
break;
case 'delete':
await this.localDb.collection('user_cache').deleteOne(
{ user_id: change.documentKey._id },
{ session }
);
break;
}
// Log sync event for debugging
await this.localDb.collection('sync_log').insertOne({
operation: change.operationType,
collection: 'users',
document_id: change.documentKey._id,
timestamp: new Date(),
cluster_time: change.clusterTime
}, { session });
});
} finally {
await session.endSession();
}
}
}
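One caveat worth noting: a change stream only delivers events from the moment it opens (or from its resume token), so a brand-new subscriber should backfill existing documents before tailing changes. A minimal sketch:
// Sketch: one-time backfill before starting the change stream
async function initialBackfill(sourceDb, localDb) {
  const cursor = sourceDb.collection('users')
    .find({ service_visibility: { $in: ['public', 'internal'] } });

  for await (const user of cursor) {
    await localDb.collection('user_cache').updateOne(
      { user_id: user._id },
      { $set: { email: user.email, name: user.name, last_sync: new Date() } },
      { upsert: true }
    );
  }
}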
Performance and Scalability
Change Stream Optimization
Optimize Change Streams for high-throughput scenarios:
// High-performance change stream configuration
const changeStreamOptions = {
fullDocument: 'whenAvailable', // Return post-images when available (MongoDB 6.0+) without failing when absent
batchSize: 100, // Process changes in batches
maxAwaitTimeMS: 5000, // How long each getMore waits for new events
collation: {
locale: 'simple' // Use simple collation for performance
}
};
// Batch processing for high-throughput scenarios
class BatchChangeProcessor {
constructor(db, collection, batchSize = 50) {
this.db = db;
this.collection = collection;
this.batchSize = batchSize;
this.changeBatch = [];
this.batchTimer = null;
}
startProcessing() {
const changeStream = this.db.collection(this.collection)
.watch([], changeStreamOptions);
changeStream.on('change', (change) => {
this.changeBatch.push(change);
// Process batch when full or after timeout
if (this.changeBatch.length >= this.batchSize) {
this.processBatch();
} else if (!this.batchTimer) {
this.batchTimer = setTimeout(() => {
if (this.changeBatch.length > 0) {
this.processBatch();
}
}, 1000);
}
});
}
async processBatch() {
const batch = this.changeBatch.splice(0);
if (this.batchTimer) {
clearTimeout(this.batchTimer);
this.batchTimer = null;
}
try {
// Process batch of changes
await this.handleChangeBatch(batch);
} catch (error) {
console.error('Error processing change batch:', error);
// Implement retry logic or dead letter queue
}
}
async handleChangeBatch(changes) {
// Group changes by operation type
const inserts = changes.filter(c => c.operationType === 'insert');
const updates = changes.filter(c => c.operationType === 'update');
const deletes = changes.filter(c => c.operationType === 'delete');
// Process each operation type in parallel
await Promise.all([
this.processInserts(inserts),
this.processUpdates(updates),
this.processDeletes(deletes)
]);
}
}
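The batch handlers referenced above (processInserts and friends) are application-specific; one plausible shape, assuming the goal is to maintain a downstream read model, folds each batch into a single bulk write:
// Sketch: apply a batch of insert events as one bulk write (read-model collection is assumed)
async function processInserts(db, inserts) {
  if (inserts.length === 0) return;

  const operations = inserts.map(change => ({
    replaceOne: {
      filter: { _id: change.documentKey._id },
      replacement: change.fullDocument,
      upsert: true
    }
  }));

  await db.collection('orders_read_model').bulkWrite(operations, { ordered: false });
}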
QueryLeaf Change Stream Integration
QueryLeaf can help translate Change Stream concepts to familiar SQL patterns:
-- QueryLeaf provides SQL-like syntax for change stream operations
CREATE TRIGGER user_activity_trigger
ON user_activity
FOR INSERT, UPDATE, DELETE
AS
BEGIN
-- Process real-time user activity changes
WITH activity_changes AS (
SELECT
CASE
WHEN operation = 'INSERT' THEN 'user_registered'
WHEN operation = 'UPDATE' AND NEW.last_login != OLD.last_login THEN 'user_login'
WHEN operation = 'DELETE' THEN 'user_deactivated'
END AS event_type,
NEW.user_id,
NEW.email,
NEW.last_login,
CURRENT_TIMESTAMP AS event_timestamp
FROM INSERTED NEW
LEFT JOIN DELETED OLD ON NEW.user_id = OLD.user_id
WHERE event_type IS NOT NULL
)
INSERT INTO user_events (
event_type,
user_id,
event_data,
timestamp
)
SELECT
event_type,
user_id,
JSON_OBJECT(
'email', email,
'last_login', last_login
),
event_timestamp
FROM activity_changes;
END;
-- Query real-time user activity
SELECT
event_type,
COUNT(*) as event_count,
DATE_TRUNC('minute', timestamp) as minute
FROM user_events
WHERE timestamp >= NOW() - INTERVAL '1 hour'
GROUP BY event_type, DATE_TRUNC('minute', timestamp)
ORDER BY minute DESC, event_count DESC;
-- QueryLeaf automatically translates this to:
-- 1. MongoDB Change Stream with appropriate filters
-- 2. Aggregation pipeline for event grouping
-- 3. Real-time event emission to subscribers
-- 4. Automatic resume token management
Security and Access Control
Change Stream Permissions
Control access to change stream data:
// Role-based change stream access
db.createRole({
role: "orderChangeStreamReader",
privileges: [
{
resource: { db: "ecommerce", collection: "orders" },
actions: ["changeStream", "find"]
}
],
roles: []
});
// Create user with limited change stream access
db.createUser({
user: "orderProcessor",
pwd: "securePassword",
roles: ["orderChangeStreamReader"]
});
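Applications then authenticate with the limited account; a sketch with placeholder host and credentials (note that authSource must match the database where the user was created):
// Hypothetical connection using the restricted account
const { MongoClient } = require('mongodb');

async function connectAsOrderProcessor() {
  const client = new MongoClient(
    'mongodb://orderProcessor:securePassword@db.example.com:27017/?authSource=ecommerce'
  );
  await client.connect();

  // This account can open change streams on ecommerce.orders and nothing else
  return client.db('ecommerce').collection('orders').watch();
}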
Data Filtering and Privacy
Filter sensitive data from change streams:
// Privacy-aware change stream ($unset as a pipeline stage requires MongoDB 4.2+)
const privateFieldsFilter = {
$unset: [
'fullDocument.credit_card',
'fullDocument.ssn',
'fullDocument.personal_notes'
]
};
const changeStream = db.collection('customers').watch([
{
$match: {
'operationType': { $in: ['insert', 'update'] }
}
},
privateFieldsFilter // Remove sensitive fields
]);
Best Practices for Change Streams
- Resume Token Management: Always persist resume tokens for fault tolerance
- Error Handling: Implement comprehensive error handling and retry logic
- Performance Monitoring: Monitor change stream lag and processing times (see the lag sketch after this list)
- Resource Management: Use appropriate batch sizes and connection pooling
- Security: Filter sensitive data and implement proper access controls
- Testing: Test resume behavior and failover scenarios regularly
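For the monitoring point above, change stream lag can be estimated by comparing each event's wall-clock time to the processor's clock; a sketch, where the metrics client is assumed:
// Sketch: estimate per-event lag (metrics helper is hypothetical)
changeStream.on('change', (change) => {
  // wallTime is the primary's clock at the time of the event (MongoDB 6.0+)
  const lagMillis = Date.now() - change.wallTime.getTime();

  metrics.gauge('change_stream.lag_ms', lagMillis); // assumed metrics client

  if (lagMillis > 10000) {
    console.warn(`Change stream lagging by ${lagMillis}ms`);
  }
});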
Conclusion
MongoDB Change Streams provide a powerful foundation for building real-time, event-driven applications. When combined with SQL-style event handling patterns, they let you create responsive systems that react to data changes instantly while maintaining familiar development patterns.
Key benefits of Change Streams include:
- Real-Time Processing: Immediate notification of database changes without polling
- Event-Driven Architecture: Build loosely coupled microservices that react to data events
- Fault Tolerance: Resume processing from any point using resume tokens
- Scalability: Handle high-throughput scenarios with batch processing and filtering
- Flexibility: Use aggregation pipelines to transform and filter events
Whether you're building collaborative applications, real-time dashboards, or distributed microservices, Change Streams enable you to create responsive systems that scale efficiently. The combination of MongoDB's change detection with QueryLeaf's familiar SQL patterns makes building real-time applications both powerful and accessible.
From e-commerce order processing to live analytics dashboards, Change Streams provide the foundation for modern, event-driven applications that deliver exceptional user experiences through real-time data processing.