MongoDB Change Streams and Event-Driven Architecture: Building Reactive Applications with SQL-Style Event Processing
Modern applications increasingly require real-time responsiveness and event-driven architectures that can react instantly to data changes across distributed systems. Traditional polling-based approaches for change detection introduce significant latency, resource overhead, and scaling challenges that make building responsive applications complex and inefficient.
MongoDB Change Streams provide native event streaming capabilities that enable applications to watch for data changes in real-time, triggering immediate reactions without polling overhead. Unlike traditional database triggers or external change data capture systems, MongoDB Change Streams offer a unified, scalable approach to event-driven architecture that works seamlessly across replica sets and sharded clusters.
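Before diving into the comparison, here is a minimal sketch of what a change stream consumer looks like with the Node.js driver. The connection string, database, and collection names are illustrative assumptions; note that change streams require a replica set or sharded cluster rather than a standalone mongod.
// Minimal change stream sketch (Node.js driver) - assumes a replica set or
// Atlas cluster, since change streams are unavailable on standalone servers
const { MongoClient } = require('mongodb');

async function watchOrders() {
  const client = new MongoClient('mongodb://localhost:27017');
  await client.connect();

  const orders = client.db('shop').collection('orders');

  // Open a change stream; the aggregation pipeline filters to inserts only
  const changeStream = orders.watch([{ $match: { operationType: 'insert' } }]);

  changeStream.on('change', (event) => {
    // event.fullDocument holds the newly inserted order document
    console.log('New order:', event.fullDocument);
  });
}

watchOrders().catch(console.error);
The rest of this post builds on this basic watch-and-consume loop.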
The Traditional Change Detection Challenge
Traditional approaches to detecting and reacting to data changes have significant architectural and performance limitations:
-- Traditional polling approach - inefficient and high-latency
-- PostgreSQL polling-based change detection
CREATE TABLE user_activities (
activity_id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL,
activity_type VARCHAR(50) NOT NULL,
activity_data JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
processed BOOLEAN DEFAULT FALSE
);
-- Polling query runs every few seconds
SELECT
activity_id,
user_id,
activity_type,
activity_data,
created_at
FROM user_activities
WHERE processed = FALSE
ORDER BY created_at ASC
LIMIT 100;
-- Mark as processed after handling
UPDATE user_activities
SET processed = TRUE, updated_at = CURRENT_TIMESTAMP
WHERE activity_id IN (1, 2, 3, ...);
-- Problems with polling approach:
-- 1. High latency - changes only detected on poll intervals
-- 2. Resource waste - constant querying even when no changes
-- 3. Scaling issues - increased polling frequency impacts performance
-- 4. Race conditions - multiple consumers competing for same records
-- 5. Complex state management - tracking processed vs unprocessed
-- 6. Poor real-time experience - delays in reaction to changes
-- Database trigger approach (limited and complex)
CREATE OR REPLACE FUNCTION notify_activity_change()
RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('activity_changes',
json_build_object(
'activity_id', NEW.activity_id,
'user_id', NEW.user_id,
'activity_type', NEW.activity_type,
'operation', TG_OP
)::text
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER activity_change_trigger
AFTER INSERT OR UPDATE OR DELETE ON user_activities
FOR EACH ROW EXECUTE FUNCTION notify_activity_change();
-- Trigger limitations:
-- - Limited to single database instance
-- - No ordering guarantees across tables
-- - Difficult error handling and retry logic
-- - Complex setup for distributed systems
-- - No built-in filtering or transformation
-- - Poor integration with modern event architectures
-- MySQL limitations (even more restrictive)
CREATE TABLE change_log (
id INT AUTO_INCREMENT PRIMARY KEY,
table_name VARCHAR(100),
record_id VARCHAR(100),
operation VARCHAR(10),
change_data JSON,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Basic trigger for change tracking
DELIMITER $$
CREATE TRIGGER user_change_tracker
AFTER INSERT ON users
FOR EACH ROW
BEGIN
INSERT INTO change_log (table_name, record_id, operation, change_data)
VALUES ('users', NEW.id, 'INSERT', JSON_OBJECT('user_id', NEW.id));
END$$
DELIMITER ;
-- MySQL trigger limitations:
-- - Very limited JSON functionality
-- - No advanced event routing capabilities
-- - Poor performance with high-volume changes
-- - Complex maintenance and debugging
-- - No distributed system support
MongoDB Change Streams provide comprehensive event-driven capabilities:
// MongoDB Change Streams - native event-driven architecture
const { MongoClient } = require('mongodb');
const { randomUUID } = require('crypto');
const client = new MongoClient('mongodb://localhost:27017');
const db = client.db('event_driven_platform');
// Advanced Change Stream implementation for event-driven architecture
class EventDrivenMongoDBPlatform {
constructor(db) {
this.db = db;
this.changeStreams = new Map();
this.eventHandlers = new Map();
this.metrics = {
eventsProcessed: 0,
lastEvent: null,
errorCount: 0
};
}
async setupEventDrivenCollections() {
// Create collections for different event types
const collections = {
userActivities: this.db.collection('user_activities'),
orderEvents: this.db.collection('order_events'),
inventoryChanges: this.db.collection('inventory_changes'),
systemEvents: this.db.collection('system_events'),
auditLog: this.db.collection('audit_log')
};
// Create indexes for optimal change stream performance
for (const [name, collection] of Object.entries(collections)) {
await collection.createIndex({ userId: 1, timestamp: -1 });
await collection.createIndex({ eventType: 1, status: 1 });
await collection.createIndex({ createdAt: -1 });
}
return collections;
}
async startChangeStreamWatchers() {
console.log('Starting change stream watchers...');
// 1. Watch all changes across entire database
await this.watchDatabaseChanges();
// 2. Watch specific collection changes with filtering
await this.watchUserActivityChanges();
// 3. Watch order processing pipeline
await this.watchOrderEvents();
// 4. Watch inventory for real-time stock updates
await this.watchInventoryChanges();
console.log('All change stream watchers started');
}
async watchDatabaseChanges() {
console.log('Setting up database-level change stream...');
const changeStream = this.db.watch(
[
// Pipeline to filter and transform events
{
$match: {
// Only watch insert, update, delete operations
operationType: { $in: ['insert', 'update', 'delete', 'replace'] },
// Exclude system collections and temporary data
'ns.coll': {
$not: { $regex: '^(system\\.|temp_)' }
}
}
},
{
$addFields: {
// Add event metadata
eventId: '$_id._data', // resume token as a string ($toString cannot convert the token document)
eventTimestamp: '$clusterTime',
database: '$ns.db',
collection: '$ns.coll',
// Create standardized event structure
eventData: {
$switch: {
branches: [
{
case: { $eq: ['$operationType', 'insert'] },
then: {
operation: 'created',
document: '$fullDocument'
}
},
{
case: { $eq: ['$operationType', 'update'] },
then: {
operation: 'updated',
documentKey: '$documentKey',
updatedFields: '$updateDescription.updatedFields',
removedFields: '$updateDescription.removedFields'
}
},
{
case: { $eq: ['$operationType', 'delete'] },
then: {
operation: 'deleted',
documentKey: '$documentKey'
}
}
],
default: {
operation: '$operationType',
documentKey: '$documentKey'
}
}
}
}
}
],
{
fullDocument: 'updateLookup', // Include full document for updates
fullDocumentBeforeChange: 'whenAvailable' // Include before state
}
);
this.changeStreams.set('database', changeStream);
// Handle database-level events
changeStream.on('change', async (changeEvent) => {
try {
await this.handleDatabaseEvent(changeEvent);
this.updateMetrics('database', changeEvent);
} catch (error) {
console.error('Error handling database event:', error);
this.metrics.errorCount++;
}
});
changeStream.on('error', (error) => {
console.error('Database change stream error:', error);
this.handleChangeStreamError('database', error);
});
}
async watchUserActivityChanges() {
console.log('Setting up user activity change stream...');
const userActivities = this.db.collection('user_activities');
const changeStream = userActivities.watch(
[
{
$match: {
operationType: { $in: ['insert', 'update'] },
// Only watch for significant user activities
$or: [
{ 'fullDocument.activityType': 'login' },
{ 'fullDocument.activityType': 'purchase' },
{ 'fullDocument.activityType': 'subscription_change' },
{ 'fullDocument.status': 'completed' },
{ 'updateDescription.updatedFields.status': 'completed' }
]
}
}
],
{
fullDocument: 'updateLookup',
fullDocumentBeforeChange: 'whenAvailable'
}
);
this.changeStreams.set('userActivities', changeStream);
changeStream.on('change', async (changeEvent) => {
try {
await this.handleUserActivityEvent(changeEvent);
// Trigger downstream events based on activity type
await this.triggerDownstreamEvents('user_activity', changeEvent);
} catch (error) {
console.error('Error handling user activity event:', error);
await this.logEventError('user_activities', changeEvent, error);
}
});
}
async watchOrderEvents() {
console.log('Setting up order events change stream...');
const orderEvents = this.db.collection('order_events');
const changeStream = orderEvents.watch(
[
{
$match: {
operationType: 'insert',
// Order lifecycle events
'fullDocument.eventType': {
$in: ['order_created', 'payment_processed', 'order_shipped',
'order_delivered', 'order_cancelled', 'refund_processed']
}
}
},
{
$addFields: {
// Enrich with order context
orderStage: {
$switch: {
branches: [
{ case: { $eq: ['$fullDocument.eventType', 'order_created'] }, then: 'pending' },
{ case: { $eq: ['$fullDocument.eventType', 'payment_processed'] }, then: 'confirmed' },
{ case: { $eq: ['$fullDocument.eventType', 'order_shipped'] }, then: 'in_transit' },
{ case: { $eq: ['$fullDocument.eventType', 'order_delivered'] }, then: 'completed' },
{ case: { $eq: ['$fullDocument.eventType', 'order_cancelled'] }, then: 'cancelled' }
],
default: 'unknown'
}
},
// Priority for event processing
processingPriority: {
$switch: {
branches: [
{ case: { $eq: ['$fullDocument.eventType', 'payment_processed'] }, then: 1 },
{ case: { $eq: ['$fullDocument.eventType', 'order_created'] }, then: 2 },
{ case: { $eq: ['$fullDocument.eventType', 'order_cancelled'] }, then: 1 },
{ case: { $eq: ['$fullDocument.eventType', 'refund_processed'] }, then: 1 }
],
default: 3
}
}
}
}
],
{ fullDocument: 'updateLookup' }
);
this.changeStreams.set('orderEvents', changeStream);
changeStream.on('change', async (changeEvent) => {
try {
// Route to appropriate order processing handler
await this.processOrderEventChange(changeEvent);
// Update order state machine
await this.updateOrderStateMachine(changeEvent);
// Trigger business logic workflows
await this.triggerOrderWorkflows(changeEvent);
} catch (error) {
console.error('Error processing order event:', error);
await this.handleOrderEventError(changeEvent, error);
}
});
}
async watchInventoryChanges() {
console.log('Setting up inventory change stream...');
const inventoryChanges = this.db.collection('inventory_changes');
const changeStream = inventoryChanges.watch(
[
{
$match: {
$or: [
// Stock level changes
{
operationType: 'update',
'updateDescription.updatedFields.stockLevel': { $exists: true }
},
// New inventory items
{
operationType: 'insert',
'fullDocument.itemType': 'product'
},
// Inventory alerts
{
operationType: 'insert',
'fullDocument.alertType': { $in: ['low_stock', 'out_of_stock', 'restock'] }
}
]
}
}
],
{
fullDocument: 'updateLookup',
fullDocumentBeforeChange: 'whenAvailable'
}
);
this.changeStreams.set('inventoryChanges', changeStream);
changeStream.on('change', async (changeEvent) => {
try {
// Real-time inventory updates
await this.handleInventoryChange(changeEvent);
// Check for low stock alerts
await this.checkInventoryAlerts(changeEvent);
// Update product availability in real-time
await this.updateProductAvailability(changeEvent);
// Notify relevant systems (pricing, recommendations, etc.)
await this.notifyInventorySubscribers(changeEvent);
} catch (error) {
console.error('Error handling inventory change:', error);
await this.logInventoryError(changeEvent, error);
}
});
}
async handleDatabaseEvent(changeEvent) {
const { database, collection, eventData, operationType } = changeEvent;
console.log(`Database Event: ${operationType} in ${database}.${collection}`);
// Global event logging
await this.logGlobalEvent({
eventId: changeEvent.eventId,
timestamp: new Date(changeEvent.clusterTime.getHighBits() * 1000), // Timestamp high bits hold seconds since epoch
database: database,
collection: collection,
operation: operationType,
eventData: eventData
});
// Route to collection-specific handlers
await this.routeCollectionEvent(collection, changeEvent);
// Update global metrics and monitoring
await this.updateGlobalMetrics(changeEvent);
}
async handleUserActivityEvent(changeEvent) {
const { fullDocument, operationType } = changeEvent;
const activity = fullDocument;
console.log(`User Activity: ${activity.activityType} for user ${activity.userId}`);
// Real-time user analytics
if (activity.activityType === 'login') {
await this.updateUserSession(activity);
await this.trackUserLocation(activity);
}
// Purchase events
if (activity.activityType === 'purchase') {
await this.processRealtimePurchase(activity);
await this.updateRecommendations(activity.userId);
await this.triggerLoyaltyUpdates(activity);
}
// Subscription changes
if (activity.activityType === 'subscription_change') {
await this.processSubscriptionChange(activity);
await this.updateBilling(activity);
}
// Create reactive events for downstream systems
await this.publishUserEvent(activity, operationType);
}
async processOrderEventChange(changeEvent) {
const { fullDocument: orderEvent } = changeEvent;
console.log(`Order Event: ${orderEvent.eventType} for order ${orderEvent.orderId}`);
switch (orderEvent.eventType) {
case 'order_created':
await this.processNewOrder(orderEvent);
break;
case 'payment_processed':
await this.confirmOrderPayment(orderEvent);
await this.triggerFulfillment(orderEvent);
break;
case 'order_shipped':
await this.updateShippingTracking(orderEvent);
await this.notifyCustomer(orderEvent);
break;
case 'order_delivered':
await this.completeOrder(orderEvent);
await this.triggerPostDeliveryWorkflow(orderEvent);
break;
case 'order_cancelled':
await this.processCancellation(orderEvent);
await this.handleRefund(orderEvent);
break;
}
// Update order analytics in real-time
await this.updateOrderAnalytics(orderEvent);
}
async handleInventoryChange(changeEvent) {
const { fullDocument: inventory, operationType } = changeEvent;
console.log(`Inventory Change: ${operationType} for item ${inventory.itemId}`);
// Real-time stock updates
if (changeEvent.updateDescription?.updatedFields?.stockLevel !== undefined) {
const newStock = changeEvent.fullDocument.stockLevel;
const previousStock = changeEvent.fullDocumentBeforeChange?.stockLevel || 0;
await this.handleStockLevelChange({
itemId: inventory.itemId,
previousStock: previousStock,
newStock: newStock,
changeAmount: newStock - previousStock
});
}
// Product availability updates
await this.updateProductCatalog(inventory);
// Pricing adjustments based on stock levels
await this.updateDynamicPricing(inventory);
}
async triggerDownstreamEvents(eventType, changeEvent) {
// Message queue integration for external systems
const event = {
eventId: randomUUID(),
eventType: eventType,
timestamp: new Date(),
source: 'mongodb-change-stream',
data: changeEvent,
version: '1.0'
};
// Publish to different channels based on event type
await this.publishToEventBus(event);
await this.updateEventSourcing(event);
await this.triggerWebhooks(event);
}
async publishToEventBus(event) {
// Integration with message queues (Kafka, RabbitMQ, etc.)
console.log(`Publishing event ${event.eventId} to event bus`);
// Route to appropriate topics/queues
const routingKey = `${event.eventType}.${event.data.operationType}`;
// Simulate message queue publishing
// await messageQueue.publish(routingKey, event);
}
async setupResumeTokenPersistence() {
// Persist resume tokens for fault tolerance
const resumeTokens = this.db.collection('change_stream_resume_tokens');
// Save resume tokens periodically
setInterval(async () => {
for (const [streamName, changeStream] of this.changeStreams.entries()) {
try {
const resumeToken = changeStream.resumeToken;
if (resumeToken) {
await resumeTokens.updateOne(
{ streamName: streamName },
{
$set: {
resumeToken: resumeToken,
lastUpdated: new Date()
}
},
{ upsert: true }
);
}
} catch (error) {
console.error(`Error saving resume token for ${streamName}:`, error);
}
}
}, 10000); // Every 10 seconds
}
async handleChangeStreamError(streamName, error) {
console.error(`Change stream ${streamName} encountered error:`, error);
// Implement retry logic with exponential backoff
setTimeout(async () => {
try {
console.log(`Attempting to restart change stream: ${streamName}`);
// Load last known resume token
const resumeTokenDoc = await this.db.collection('change_stream_resume_tokens')
.findOne({ streamName: streamName });
// Restart stream from last known position
if (resumeTokenDoc?.resumeToken) {
// Restart with resume token
await this.restartChangeStream(streamName, resumeTokenDoc.resumeToken);
} else {
// Restart from current time
await this.restartChangeStream(streamName);
}
} catch (retryError) {
console.error(`Failed to restart change stream ${streamName}:`, retryError);
// Implement exponential backoff retry
}
}, 5000); // Initial 5-second delay
}
async getChangeStreamMetrics() {
return {
activeStreams: this.changeStreams.size,
eventsProcessed: this.metrics.eventsProcessed,
lastEventTime: this.metrics.lastEvent,
errorCount: this.metrics.errorCount,
streamHealth: Array.from(this.changeStreams.entries()).map(([name, stream]) => ({
name: name,
isActive: !stream.closed,
hasResumeToken: !!stream.resumeToken
}))
};
}
updateMetrics(streamName, changeEvent) {
this.metrics.eventsProcessed++;
this.metrics.lastEvent = new Date();
console.log(`Processed event from ${streamName}: ${changeEvent.operationType}`);
}
async shutdown() {
console.log('Shutting down change streams...');
// Close all change streams gracefully
for (const [name, changeStream] of this.changeStreams.entries()) {
try {
await changeStream.close();
console.log(`Closed change stream: ${name}`);
} catch (error) {
console.error(`Error closing change stream ${name}:`, error);
}
}
this.changeStreams.clear();
console.log('All change streams closed');
}
}
// Usage example
const startEventDrivenPlatform = async () => {
try {
await client.connect(); // establish the connection before any operations
const platform = new EventDrivenMongoDBPlatform(db);
// Setup collections and indexes
await platform.setupEventDrivenCollections();
// Start change stream watchers
await platform.startChangeStreamWatchers();
// Setup fault tolerance
await platform.setupResumeTokenPersistence();
// Monitor platform health
setInterval(async () => {
const metrics = await platform.getChangeStreamMetrics();
console.log('Platform Metrics:', metrics);
}, 30000); // Every 30 seconds
console.log('Event-driven platform started successfully');
return platform;
} catch (error) {
console.error('Error starting event-driven platform:', error);
throw error;
}
};
// Benefits of MongoDB Change Streams:
// - Real-time event processing without polling overhead
// - Ordered, durable event streams with resume token support
// - Cluster-wide change detection across replica sets and shards
// - Rich filtering and transformation capabilities through aggregation pipelines
// - Built-in fault tolerance and automatic failover
// - Integration with MongoDB's ACID transactions
// - Scalable event-driven architecture foundation
// - Native integration with MongoDB ecosystem and tools
module.exports = {
EventDrivenMongoDBPlatform,
startEventDrivenPlatform
};
Understanding MongoDB Change Streams Architecture
Advanced Change Stream Patterns
Implement sophisticated change stream patterns for different event-driven scenarios:
// Advanced change stream patterns and event processing
class AdvancedChangeStreamPatterns {
constructor(db) {
this.db = db;
this.eventProcessors = new Map();
this.eventStore = db.collection('event_store');
this.eventProjections = db.collection('event_projections');
}
async setupEventSourcingPattern() {
// Event sourcing with change streams
console.log('Setting up event sourcing pattern...');
const aggregateCollections = [
'user_aggregates',
'order_aggregates',
'inventory_aggregates',
'payment_aggregates'
];
for (const collectionName of aggregateCollections) {
const collection = this.db.collection(collectionName);
const changeStream = collection.watch(
[
{
$match: {
operationType: { $in: ['insert', 'update', 'replace'] }
}
},
{
$addFields: {
// Create event sourcing envelope
eventEnvelope: {
eventId: '$_id._data', // resume token string
eventType: '$operationType',
aggregateId: '$documentKey._id',
aggregateType: collectionName,
eventVersion: { $ifNull: ['$fullDocument.version', 1] },
eventData: '$fullDocument',
eventMetadata: {
timestamp: '$clusterTime',
source: 'change-stream',
causationId: '$fullDocument.causationId',
correlationId: '$fullDocument.correlationId'
}
}
}
}
],
{
fullDocument: 'updateLookup',
fullDocumentBeforeChange: 'whenAvailable'
}
);
changeStream.on('change', async (changeEvent) => {
await this.processEventSourcingEvent(changeEvent);
});
this.eventProcessors.set(`${collectionName}_eventsourcing`, changeStream);
}
}
async processEventSourcingEvent(changeEvent) {
const { eventEnvelope } = changeEvent;
// Store event in event store
await this.eventStore.insertOne({
...eventEnvelope,
storedAt: new Date(),
processedBy: [],
projectionStatus: 'pending'
});
// Update read model projections
await this.updateProjections(eventEnvelope);
// Trigger sagas and process managers
await this.triggerSagas(eventEnvelope);
}
async setupCQRSPattern() {
// Command Query Responsibility Segregation with change streams
console.log('Setting up CQRS pattern...');
const commandCollections = ['commands', 'command_results'];
for (const collectionName of commandCollections) {
const collection = this.db.collection(collectionName);
const changeStream = collection.watch(
[
{
$match: {
operationType: 'insert',
'fullDocument.status': { $ne: 'processed' }
}
}
],
{ fullDocument: 'updateLookup' }
);
changeStream.on('change', async (changeEvent) => {
await this.processCommand(changeEvent.fullDocument);
});
this.eventProcessors.set(`${collectionName}_cqrs`, changeStream);
}
}
async setupSagaOrchestration() {
// Saga pattern for distributed transaction coordination
console.log('Setting up saga orchestration...');
const sagaCollection = this.db.collection('sagas');
const changeStream = sagaCollection.watch(
[
{
$match: {
$or: [
{ operationType: 'insert' },
{
operationType: 'update',
'updateDescription.updatedFields.status': { $exists: true }
}
]
}
}
],
{ fullDocument: 'updateLookup' }
);
changeStream.on('change', async (changeEvent) => {
await this.processSagaEvent(changeEvent);
});
this.eventProcessors.set('saga_orchestration', changeStream);
}
async processSagaEvent(changeEvent) {
const saga = changeEvent.fullDocument;
const { sagaId, status, currentStep, steps } = saga;
console.log(`Processing saga ${sagaId}: ${status} at step ${currentStep}`);
switch (status) {
case 'started':
await this.executeSagaStep(saga, 0);
break;
case 'step_completed':
if (currentStep + 1 < steps.length) {
await this.executeSagaStep(saga, currentStep + 1);
} else {
await this.completeSaga(sagaId);
}
break;
case 'step_failed':
await this.compensateSaga(saga, currentStep);
break;
case 'compensating':
if (currentStep > 0) {
await this.executeCompensation(saga, currentStep - 1);
} else {
await this.failSaga(sagaId);
}
break;
}
}
async setupStreamProcessing() {
// Stream processing with windowed aggregations
console.log('Setting up stream processing...');
const eventStream = this.db.collection('events');
const changeStream = eventStream.watch(
[
{
$match: {
operationType: 'insert',
'fullDocument.eventType': { $in: ['user_activity', 'transaction', 'system_event'] }
}
},
{
$addFields: {
processingWindow: {
$dateTrunc: {
date: '$fullDocument.timestamp',
unit: 'minute',
binSize: 5 // 5-minute windows
}
}
}
}
],
{ fullDocument: 'updateLookup' }
);
let windowBuffer = new Map();
changeStream.on('change', async (changeEvent) => {
await this.processStreamEvent(changeEvent, windowBuffer);
});
// Process window aggregations every minute
setInterval(async () => {
await this.processWindowedAggregations(windowBuffer);
}, 60000);
this.eventProcessors.set('stream_processing', changeStream);
}
async processStreamEvent(changeEvent, windowBuffer) {
const event = changeEvent.fullDocument;
const window = changeEvent.processingWindow;
const windowKey = window.toISOString();
if (!windowBuffer.has(windowKey)) {
windowBuffer.set(windowKey, {
window: window,
events: [],
aggregations: {
count: 0,
userActivities: 0,
transactions: 0,
systemEvents: 0,
totalValue: 0
}
});
}
const windowData = windowBuffer.get(windowKey);
windowData.events.push(event);
windowData.aggregations.count++;
// Type-specific aggregations
switch (event.eventType) {
case 'user_activity':
windowData.aggregations.userActivities++;
break;
case 'transaction':
windowData.aggregations.transactions++;
windowData.aggregations.totalValue += event.amount || 0;
break;
case 'system_event':
windowData.aggregations.systemEvents++;
break;
}
// Real-time alerting for anomalies
if (windowData.aggregations.count > 1000) {
await this.triggerVolumeAlert(windowKey, windowData);
}
}
async setupMultiCollectionCoordination() {
// Coordinate changes across multiple collections
console.log('Setting up multi-collection coordination...');
const coordinationConfig = [
{
collections: ['users', 'user_preferences', 'user_activities'],
coordinator: 'userProfileCoordinator'
},
{
collections: ['orders', 'order_items', 'payments', 'shipping'],
coordinator: 'orderProcessingCoordinator'
},
{
collections: ['products', 'inventory', 'pricing', 'reviews'],
coordinator: 'productManagementCoordinator'
}
];
for (const config of coordinationConfig) {
await this.setupCollectionCoordinator(config);
}
}
async setupCollectionCoordinator(config) {
const { collections, coordinator } = config;
for (const collectionName of collections) {
const collection = this.db.collection(collectionName);
const changeStream = collection.watch(
[
{
$match: {
operationType: { $in: ['insert', 'update', 'delete'] }
}
},
{
$addFields: {
coordinationContext: {
coordinator: coordinator,
sourceCollection: collectionName,
relatedCollections: collections.filter(c => c !== collectionName)
}
}
}
],
{ fullDocument: 'updateLookup' }
);
changeStream.on('change', async (changeEvent) => {
await this.processCoordinatedChange(changeEvent);
});
this.eventProcessors.set(`${collectionName}_${coordinator}`, changeStream);
}
}
async processCoordinatedChange(changeEvent) {
const { coordinationContext, fullDocument, operationType } = changeEvent;
const { coordinator, sourceCollection, relatedCollections } = coordinationContext;
console.log(`Coordinated change in ${sourceCollection} via ${coordinator}`);
// Execute coordination logic based on coordinator type
switch (coordinator) {
case 'userProfileCoordinator':
await this.coordinateUserProfileChanges(changeEvent);
break;
case 'orderProcessingCoordinator':
await this.coordinateOrderProcessing(changeEvent);
break;
case 'productManagementCoordinator':
await this.coordinateProductManagement(changeEvent);
break;
}
}
async coordinateUserProfileChanges(changeEvent) {
const { fullDocument, operationType, ns } = changeEvent;
const sourceCollection = ns.coll;
if (sourceCollection === 'users' && operationType === 'update') {
// User profile updated - sync preferences and activities
await this.syncUserPreferences(fullDocument._id);
await this.updateUserActivityContext(fullDocument._id);
}
if (sourceCollection === 'user_activities' && operationType === 'insert') {
// New activity - update user profile analytics
await this.updateUserAnalytics(fullDocument.userId, fullDocument);
}
}
async setupChangeStreamHealthMonitoring() {
// Health monitoring and metrics collection
console.log('Setting up change stream health monitoring...');
const healthMetrics = {
totalStreams: 0,
activeStreams: 0,
eventsProcessed: 0,
errorCount: 0,
lastProcessedEvent: null,
streamLatency: new Map()
};
// Monitor each change stream
for (const [streamName, changeStream] of this.eventProcessors.entries()) {
healthMetrics.totalStreams++;
if (!changeStream.closed) {
healthMetrics.activeStreams++;
}
// Monitor stream latency
const originalEmit = changeStream.emit;
changeStream.emit = function(event, ...args) {
if (event === 'change') {
const latency = Date.now() - args[0].clusterTime.getHighBits() * 1000; // clusterTime is a BSON Timestamp (seconds in high bits)
healthMetrics.streamLatency.set(streamName, latency);
healthMetrics.lastProcessedEvent = new Date();
healthMetrics.eventsProcessed++;
}
return originalEmit.call(this, event, ...args);
};
// Monitor errors
changeStream.on('error', (error) => {
healthMetrics.errorCount++;
console.error(`Stream ${streamName} error:`, error);
});
}
// Periodic health reporting
setInterval(() => {
this.reportHealthMetrics(healthMetrics);
}, 30000); // Every 30 seconds
return healthMetrics;
}
reportHealthMetrics(metrics) {
const avgLatency = Array.from(metrics.streamLatency.values())
.reduce((sum, latency) => sum + latency, 0) / metrics.streamLatency.size || 0;
console.log('Change Stream Health Report:', {
totalStreams: metrics.totalStreams,
activeStreams: metrics.activeStreams,
eventsProcessed: metrics.eventsProcessed,
errorCount: metrics.errorCount,
averageLatency: Math.round(avgLatency) + 'ms',
lastActivity: metrics.lastProcessedEvent
});
}
async shutdown() {
console.log('Shutting down advanced change stream patterns...');
for (const [name, processor] of this.eventProcessors.entries()) {
try {
await processor.close();
console.log(`Closed processor: ${name}`);
} catch (error) {
console.error(`Error closing processor ${name}:`, error);
}
}
this.eventProcessors.clear();
}
}
SQL-Style Change Stream Operations with QueryLeaf
QueryLeaf provides familiar SQL syntax for MongoDB Change Stream operations:
-- QueryLeaf change stream operations with SQL-familiar syntax
-- Create change stream watchers with SQL-style syntax
CREATE CHANGE_STREAM user_activity_watcher ON user_activities
WITH (
operations = ['insert', 'update'],
full_document = 'updateLookup',
full_document_before_change = 'whenAvailable'
)
FILTER (
activity_type IN ('login', 'purchase', 'subscription_change')
OR status = 'completed'
);
-- Advanced change stream with aggregation pipeline
CREATE CHANGE_STREAM order_processing_watcher ON order_events
WITH (
operations = ['insert'],
full_document = 'updateLookup'
)
PIPELINE (
FILTER (
event_type IN ('order_created', 'payment_processed', 'order_shipped', 'order_delivered')
),
ADD_FIELDS (
order_stage = CASE
WHEN event_type = 'order_created' THEN 'pending'
WHEN event_type = 'payment_processed' THEN 'confirmed'
WHEN event_type = 'order_shipped' THEN 'in_transit'
WHEN event_type = 'order_delivered' THEN 'completed'
ELSE 'unknown'
END,
processing_priority = CASE
WHEN event_type = 'payment_processed' THEN 1
WHEN event_type = 'order_created' THEN 2
ELSE 3
END
)
);
-- Database-level change stream monitoring
CREATE CHANGE_STREAM database_monitor ON DATABASE
WITH (
operations = ['insert', 'update', 'delete'],
full_document = 'updateLookup'
)
FILTER (
-- Exclude system collections
ns.coll NOT LIKE 'system.%'
AND ns.coll NOT LIKE 'temp_%'
)
PIPELINE (
ADD_FIELDS (
event_id = CAST(_id AS VARCHAR),
event_timestamp = cluster_time,
database_name = ns.db,
collection_name = ns.coll,
event_data = CASE operation_type
WHEN 'insert' THEN JSON_BUILD_OBJECT('operation', 'created', 'document', full_document)
WHEN 'update' THEN JSON_BUILD_OBJECT(
'operation', 'updated',
'document_key', document_key,
'updated_fields', update_description.updated_fields,
'removed_fields', update_description.removed_fields
)
WHEN 'delete' THEN JSON_BUILD_OBJECT('operation', 'deleted', 'document_key', document_key)
ELSE JSON_BUILD_OBJECT('operation', operation_type, 'document_key', document_key)
END
)
);
-- Event-driven reactive queries
WITH CHANGE_STREAM inventory_changes AS (
SELECT
document_key._id as item_id,
full_document.item_name,
full_document.stock_level,
full_document_before_change.stock_level as previous_stock_level,
operation_type,
cluster_time as event_time,
-- Calculate stock change
full_document.stock_level - COALESCE(full_document_before_change.stock_level, 0) as stock_change
FROM CHANGE_STREAM ON inventory
WHERE operation_type IN ('insert', 'update')
AND (full_document.stock_level != full_document_before_change.stock_level OR operation_type = 'insert')
),
stock_alerts AS (
SELECT *,
CASE
WHEN stock_level = 0 THEN 'OUT_OF_STOCK'
WHEN stock_level <= 10 THEN 'LOW_STOCK'
WHEN stock_change > 0 AND previous_stock_level = 0 THEN 'RESTOCKED'
ELSE 'NORMAL'
END as alert_type,
CASE
WHEN stock_level = 0 THEN 'critical'
WHEN stock_level <= 10 THEN 'warning'
WHEN stock_change > 100 THEN 'info'
ELSE 'normal'
END as alert_severity
FROM inventory_changes
)
SELECT
item_id,
item_name,
stock_level,
previous_stock_level,
stock_change,
alert_type,
alert_severity,
event_time,
-- Generate alert message
CASE alert_type
WHEN 'OUT_OF_STOCK' THEN CONCAT('Item ', item_name, ' is now out of stock')
WHEN 'LOW_STOCK' THEN CONCAT('Item ', item_name, ' is running low (', stock_level, ' remaining)')
WHEN 'RESTOCKED' THEN CONCAT('Item ', item_name, ' has been restocked (', stock_level, ' units)')
ELSE CONCAT('Stock updated for ', item_name, ': ', stock_change, ' units')
END as alert_message
FROM stock_alerts
WHERE alert_type != 'NORMAL'
ORDER BY alert_severity DESC, event_time DESC;
-- Real-time user activity aggregation
WITH CHANGE_STREAM user_events AS (
SELECT
full_document.user_id,
full_document.activity_type,
full_document.session_id,
full_document.timestamp,
full_document.metadata,
cluster_time as event_time
FROM CHANGE_STREAM ON user_activities
WHERE operation_type = 'insert'
AND full_document.activity_type IN ('page_view', 'click', 'purchase', 'login')
),
session_aggregations AS (
SELECT
user_id,
session_id,
TIME_WINDOW('5 minutes', event_time) as time_window,
-- Activity counts
COUNT(*) as total_activities,
COUNT(*) FILTER (WHERE activity_type = 'page_view') as page_views,
COUNT(*) FILTER (WHERE activity_type = 'click') as clicks,
COUNT(*) FILTER (WHERE activity_type = 'purchase') as purchases,
-- Session metrics
MIN(timestamp) as session_start,
MAX(timestamp) as session_end,
MAX(timestamp) - MIN(timestamp) as session_duration,
-- Engagement scoring
COUNT(DISTINCT metadata.page_url) as unique_pages_visited,
AVG(EXTRACT(EPOCH FROM (LEAD(timestamp) OVER (ORDER BY timestamp) - timestamp))) as avg_time_between_activities
FROM user_events
GROUP BY user_id, session_id, TIME_WINDOW('5 minutes', event_time)
),
user_behavior_insights AS (
SELECT *,
-- Engagement level
CASE
WHEN session_duration > INTERVAL '30 minutes' AND clicks > 20 THEN 'highly_engaged'
WHEN session_duration > INTERVAL '10 minutes' AND clicks > 5 THEN 'engaged'
WHEN session_duration > INTERVAL '2 minutes' THEN 'browsing'
ELSE 'quick_visit'
END as engagement_level,
-- Conversion indicators
purchases > 0 as converted_session,
clicks / GREATEST(page_views, 1) as click_through_rate,
-- Behavioral patterns
CASE
WHEN unique_pages_visited > 10 THEN 'explorer'
WHEN avg_time_between_activities > 60 THEN 'reader'
WHEN clicks > page_views * 2 THEN 'active_clicker'
ELSE 'standard'
END as behavior_pattern
FROM session_aggregations
)
SELECT
user_id,
session_id,
time_window,
total_activities,
page_views,
clicks,
purchases,
session_duration,
engagement_level,
behavior_pattern,
converted_session,
ROUND(click_through_rate, 3) as ctr,
-- Real-time recommendations
CASE behavior_pattern
WHEN 'explorer' THEN 'Show product recommendations based on browsed categories'
WHEN 'reader' THEN 'Provide detailed product information and reviews'
WHEN 'active_clicker' THEN 'Present clear call-to-action buttons and offers'
ELSE 'Standard personalization approach'
END as recommendation_strategy
FROM user_behavior_insights
WHERE engagement_level IN ('engaged', 'highly_engaged')
ORDER BY session_start DESC;
-- Event sourcing with change streams
CREATE EVENT_STORE aggregate_events AS
SELECT
CAST(cluster_time AS VARCHAR) as event_id,
operation_type as event_type,
document_key._id as aggregate_id,
ns.coll as aggregate_type,
COALESCE(full_document.version, 1) as event_version,
full_document as event_data,
-- Event metadata
JSON_BUILD_OBJECT(
'timestamp', cluster_time,
'source', 'change-stream',
'causation_id', full_document.causation_id,
'correlation_id', full_document.correlation_id,
'user_id', full_document.user_id
) as event_metadata
FROM CHANGE_STREAM ON DATABASE
WHERE operation_type IN ('insert', 'update', 'replace')
AND ns.coll LIKE '%_aggregates'
ORDER BY cluster_time ASC;
-- CQRS read model projections
CREATE MATERIALIZED VIEW user_profile_projection AS
WITH user_events AS (
SELECT *
FROM aggregate_events
WHERE aggregate_type = 'user_aggregates'
AND event_type IN ('insert', 'update')
ORDER BY event_version ASC
),
profile_changes AS (
SELECT
aggregate_id as user_id,
event_data.email,
event_data.first_name,
event_data.last_name,
event_data.preferences,
event_data.subscription_status,
event_data.total_orders,
event_data.lifetime_value,
event_metadata.timestamp as last_updated,
-- Calculate derived fields
ROW_NUMBER() OVER (PARTITION BY aggregate_id ORDER BY event_version DESC) as rn
FROM user_events
)
SELECT
user_id,
email,
CONCAT(first_name, ' ', last_name) as full_name,
preferences,
subscription_status,
total_orders,
lifetime_value,
last_updated,
-- User segments
CASE
WHEN lifetime_value > 1000 THEN 'premium'
WHEN total_orders > 10 THEN 'loyal'
WHEN total_orders > 0 THEN 'customer'
ELSE 'prospect'
END as user_segment,
-- Activity status
CASE
WHEN last_updated >= CURRENT_TIMESTAMP - INTERVAL '7 days' THEN 'active'
WHEN last_updated >= CURRENT_TIMESTAMP - INTERVAL '30 days' THEN 'recent'
WHEN last_updated >= CURRENT_TIMESTAMP - INTERVAL '90 days' THEN 'inactive'
ELSE 'dormant'
END as activity_status
FROM profile_changes
WHERE rn = 1; -- Latest version only
-- Saga orchestration monitoring
WITH CHANGE_STREAM saga_events AS (
SELECT
full_document.saga_id,
full_document.saga_type,
full_document.status,
full_document.current_step,
full_document.steps,
full_document.started_at,
full_document.completed_at,
cluster_time as event_time,
operation_type
FROM CHANGE_STREAM ON sagas
WHERE operation_type IN ('insert', 'update')
),
saga_monitoring AS (
SELECT
saga_id,
saga_type,
status,
current_step,
ARRAY_LENGTH(steps, 1) as total_steps,
started_at,
completed_at,
event_time,
-- Progress calculation
CASE
WHEN status = 'completed' THEN 100.0
WHEN status = 'failed' THEN 0.0
WHEN ARRAY_LENGTH(steps, 1) > 0 THEN (current_step::numeric / ARRAY_LENGTH(steps, 1)) * 100.0
ELSE 0.0
END as progress_percentage,
-- Duration tracking
CASE
WHEN completed_at IS NOT NULL THEN completed_at - started_at
ELSE CURRENT_TIMESTAMP - started_at
END as duration,
-- Status classification
CASE status
WHEN 'completed' THEN 'success'
WHEN 'failed' THEN 'error'
WHEN 'compensating' THEN 'warning'
WHEN 'started' THEN 'in_progress'
ELSE 'unknown'
END as status_category
FROM saga_events
),
saga_health AS (
SELECT
saga_type,
status_category,
COUNT(*) as saga_count,
AVG(progress_percentage) as avg_progress,
AVG(EXTRACT(EPOCH FROM duration)) as avg_duration_seconds,
-- Performance metrics
COUNT(*) FILTER (WHERE status = 'completed') as success_count,
COUNT(*) FILTER (WHERE status = 'failed') as failure_count,
COUNT(*) FILTER (WHERE duration > INTERVAL '5 minutes') as slow_saga_count
FROM saga_monitoring
WHERE event_time >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
GROUP BY saga_type, status_category
)
SELECT
saga_type,
status_category,
saga_count,
ROUND(avg_progress, 1) as avg_progress_pct,
ROUND(avg_duration_seconds, 2) as avg_duration_sec,
success_count,
failure_count,
slow_saga_count,
-- Health indicators
CASE
WHEN failure_count > success_count THEN 'unhealthy'
WHEN slow_saga_count > saga_count * 0.5 THEN 'degraded'
ELSE 'healthy'
END as health_status,
-- Success rate
CASE
WHEN (success_count + failure_count) > 0
THEN ROUND((success_count::numeric / (success_count + failure_count)) * 100, 1)
ELSE 0.0
END as success_rate_pct
FROM saga_health
ORDER BY saga_type, status_category;
-- Resume token management for fault tolerance
CREATE TABLE change_stream_resume_tokens (
stream_name VARCHAR(100) PRIMARY KEY,
resume_token DOCUMENT NOT NULL,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
stream_config DOCUMENT,
-- Health tracking
last_event_time TIMESTAMP,
error_count INTEGER DEFAULT 0,
restart_count INTEGER DEFAULT 0
);
-- Monitoring and alerting for change streams
WITH stream_health AS (
SELECT
stream_name,
resume_token,
last_updated,
last_event_time,
error_count,
restart_count,
-- Health calculation
CURRENT_TIMESTAMP - last_event_time as time_since_last_event,
CURRENT_TIMESTAMP - last_updated as time_since_update,
CASE
WHEN last_event_time IS NULL THEN 'never_active'
WHEN CURRENT_TIMESTAMP - last_event_time > INTERVAL '5 minutes' THEN 'stalled'
WHEN error_count > 5 THEN 'error_prone'
WHEN restart_count > 3 THEN 'unstable'
ELSE 'healthy'
END as health_status
FROM change_stream_resume_tokens
)
SELECT
stream_name,
health_status,
EXTRACT(EPOCH FROM time_since_last_event) as seconds_since_last_event,
error_count,
restart_count,
-- Alert conditions
CASE health_status
WHEN 'never_active' THEN 'Stream has never processed events - check configuration'
WHEN 'stalled' THEN 'Stream has not processed events recently - investigate connectivity'
WHEN 'error_prone' THEN 'High error rate - review error logs and handlers'
WHEN 'unstable' THEN 'Frequent restarts - check resource limits and stability'
ELSE 'Stream operating normally'
END as alert_message,
CASE health_status
WHEN 'never_active' THEN 'critical'
WHEN 'stalled' THEN 'warning'
WHEN 'error_prone' THEN 'warning'
WHEN 'unstable' THEN 'info'
ELSE 'normal'
END as alert_severity
FROM stream_health
WHERE health_status != 'healthy'
ORDER BY
CASE health_status
WHEN 'never_active' THEN 1
WHEN 'stalled' THEN 2
WHEN 'error_prone' THEN 3
WHEN 'unstable' THEN 4
ELSE 5
END;
-- QueryLeaf provides comprehensive change stream capabilities:
-- 1. SQL-familiar change stream creation and management syntax
-- 2. Real-time event processing with filtering and transformation
-- 3. Event-driven architecture patterns (CQRS, Event Sourcing, Sagas)
-- 4. Advanced stream processing with windowed aggregations
-- 5. Fault tolerance with resume token management
-- 6. Health monitoring and alerting for change streams
-- 7. Integration with MongoDB's native change stream optimizations
-- 8. Reactive query patterns for real-time analytics
-- 9. Multi-collection coordination and event correlation
-- 10. Familiar SQL syntax for complex event-driven applications
Best Practices for Change Stream Implementation
Event-Driven Architecture Design
Essential patterns for building robust event-driven systems:
- Event Schema Design: Create consistent event schemas with proper versioning and backward compatibility
- Resume Token Management: Implement reliable resume token persistence for fault tolerance (see the consumer sketch after this list)
- Error Handling: Design comprehensive error handling with retry logic and dead letter queues
- Ordering Guarantees: Understand MongoDB's ordering guarantees and design accordingly
- Filtering Optimization: Use aggregation pipelines to filter events at the database level
- Resource Management: Monitor memory usage and connection limits for change streams
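Several of these points, resume token persistence, retry logic, and fault tolerance, can be combined into a single durable consumer loop. The following is a minimal sketch, assuming a resume_tokens collection and a caller-supplied handleEvent callback (both are illustrative names, not part of any existing API):
// Hedged sketch: durable change stream consumer with resume token persistence
// and exponential-backoff restarts; collection and handler names are assumptions
async function watchWithResume(db, collectionName, handleEvent) {
  const tokens = db.collection('resume_tokens');
  let delayMs = 1000;

  for (;;) {
    // Resume from the last persisted token when one exists
    const saved = await tokens.findOne({ _id: collectionName });
    const options = saved?.token ? { resumeAfter: saved.token } : {};

    try {
      const stream = db.collection(collectionName).watch([], options);

      for await (const event of stream) {
        await handleEvent(event);

        // Persist the resume token only after the event has been handled
        await tokens.updateOne(
          { _id: collectionName },
          { $set: { token: event._id, updatedAt: new Date() } },
          { upsert: true }
        );
        delayMs = 1000; // reset backoff after successful processing
      }
    } catch (error) {
      console.error(`Change stream on ${collectionName} failed:`, error);
      await new Promise((resolve) => setTimeout(resolve, delayMs));
      delayMs = Math.min(delayMs * 2, 60000); // exponential backoff, capped at 1 minute
    }
  }
}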
Performance and Scalability
Optimize change streams for high-performance event processing:
- Connection Pooling: Use appropriate connection pooling for change stream connections
- Batch Processing: Process events in batches where possible to improve throughput (see the batching sketch after this list)
- Parallel Processing: Design for parallel event processing while maintaining ordering
- Resource Limits: Set appropriate limits on change stream cursors and connections
- Monitoring: Implement comprehensive monitoring for stream health and performance
- Graceful Degradation: Design fallback mechanisms for change stream failures
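The batching and graceful-degradation advice above can be sketched as a small wrapper that buffers change events and flushes them on size or time thresholds. The thresholds and the processBatch callback below are illustrative assumptions:
// Hedged sketch: micro-batching change events to reduce per-event overhead
function startBatchedConsumer(collection, processBatch, { maxBatch = 100, flushMs = 500 } = {}) {
  const stream = collection.watch([], { fullDocument: 'updateLookup' });
  let buffer = [];

  const flush = async () => {
    if (buffer.length === 0) return;
    const batch = buffer;
    buffer = [];
    await processBatch(batch); // e.g. bulk-write a read model or publish to a queue
  };

  stream.on('change', async (event) => {
    buffer.push(event);
    if (buffer.length >= maxBatch) await flush();
  });

  const timer = setInterval(flush, flushMs);

  // Returns a shutdown function for graceful degradation paths
  return async () => {
    clearInterval(timer);
    await stream.close();
    await flush();
  };
}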
Conclusion
MongoDB Change Streams provide native event-driven capabilities that eliminate the complexity and limitations of traditional polling and trigger-based approaches. The ability to react to data changes in real time through ordered, resumable event streams makes responsive, scalable applications far simpler to build and operate.
Key Change Streams benefits include:
- Real-Time Reactivity: Instant response to data changes without polling overhead
- Ordered Event Processing: Changes delivered in order, with a total ordering across shards and resume token support for replay
- Scalable Architecture: Works seamlessly across replica sets and sharded clusters
- Rich Filtering: Aggregation pipeline support for sophisticated event filtering and transformation
- Fault Tolerance: Built-in resume capabilities and error handling for production reliability
- Ecosystem Integration: Native integration with MongoDB's ACID transactions and tooling
Whether you're building microservices architectures, real-time dashboards, event sourcing systems, or any application requiring immediate response to data changes, MongoDB Change Streams with QueryLeaf's familiar SQL interface provides the foundation for modern event-driven applications.
QueryLeaf Integration: QueryLeaf seamlessly manages MongoDB Change Streams while providing SQL-familiar event processing syntax, change detection patterns, and reactive query capabilities. Advanced event-driven architecture patterns including CQRS, Event Sourcing, and Sagas are elegantly handled through familiar SQL constructs, making sophisticated reactive applications both powerful and accessible to SQL-oriented development teams.
The combination of native change stream capabilities with SQL-style event processing makes MongoDB an ideal platform for applications requiring both real-time responsiveness and familiar database interaction patterns, ensuring your event-driven solutions remain both effective and maintainable as they evolve and scale.