MongoDB Change Streams for Event-Driven Microservices: Real-Time Data Processing and Distributed System Architecture
Modern distributed applications require sophisticated event-driven architectures that can react to data changes in real-time, maintain consistency across microservices, and process streaming data with minimal latency. Traditional database approaches struggle to provide efficient change detection, often requiring complex polling mechanisms, external message brokers, or custom trigger implementations that introduce significant overhead and operational complexity.
MongoDB Change Streams provide native, real-time change detection capabilities that enable applications to reactively process database modifications with millisecond latency. Unlike traditional approaches that require periodic polling or complex event sourcing implementations, Change Streams deliver ordered, resumable streams of database changes that integrate seamlessly with microservices architectures and event-driven patterns.
The Traditional Change Detection Challenge
Conventional approaches to detecting and reacting to database changes have significant limitations for modern applications:
-- Traditional PostgreSQL change detection - complex and resource-intensive
-- Polling-based approach with timestamps
CREATE TABLE orders (
order_id SERIAL PRIMARY KEY,
customer_id INTEGER NOT NULL,
order_status VARCHAR(50) DEFAULT 'pending',
total_amount DECIMAL(10,2) NOT NULL,
items JSONB NOT NULL,
shipping_address JSONB NOT NULL,
payment_info JSONB,
-- Tracking fields for change detection
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
version INTEGER DEFAULT 1,
-- Change tracking
last_processed_at TIMESTAMP,
change_events TEXT[] DEFAULT ARRAY[]::TEXT[],
-- Indexes for polling queries
INDEX idx_orders_updated_at (updated_at),
INDEX idx_orders_status_updated (order_status, updated_at),
INDEX idx_orders_processing (last_processed_at, updated_at)
);
-- Trigger-based change tracking (complex maintenance)
CREATE TABLE order_change_log (
log_id SERIAL PRIMARY KEY,
order_id INTEGER REFERENCES orders(order_id),
change_type VARCHAR(20) NOT NULL, -- INSERT, UPDATE, DELETE
old_values JSONB,
new_values JSONB,
changed_fields TEXT[],
changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
processed BOOLEAN DEFAULT FALSE,
processing_attempts INTEGER DEFAULT 0,
INDEX idx_change_log_processing (processed, changed_at),
INDEX idx_change_log_order (order_id, changed_at)
);
-- Complex trigger function for change tracking
CREATE OR REPLACE FUNCTION track_order_changes()
RETURNS TRIGGER AS $$
DECLARE
old_json JSONB;
new_json JSONB;
changed_fields TEXT[] := ARRAY[]::TEXT[];
field_name TEXT;
BEGIN
-- Handle different operation types
IF TG_OP = 'DELETE' THEN
INSERT INTO order_change_log (order_id, change_type, old_values)
VALUES (OLD.order_id, 'DELETE', to_jsonb(OLD));
RETURN OLD;
END IF;
IF TG_OP = 'INSERT' THEN
INSERT INTO order_change_log (order_id, change_type, new_values)
VALUES (NEW.order_id, 'INSERT', to_jsonb(NEW));
RETURN NEW;
END IF;
-- UPDATE operation - detect changed fields
old_json := to_jsonb(OLD);
new_json := to_jsonb(NEW);
-- Compare each field
FOR field_name IN
SELECT DISTINCT key
FROM jsonb_each(old_json)
UNION
SELECT DISTINCT key
FROM jsonb_each(new_json)
LOOP
IF old_json->field_name != new_json->field_name OR
(old_json->field_name IS NULL) != (new_json->field_name IS NULL) THEN
changed_fields := array_append(changed_fields, field_name);
END IF;
END LOOP;
-- Only log if fields actually changed
IF array_length(changed_fields, 1) > 0 THEN
INSERT INTO order_change_log (
order_id, change_type, old_values, new_values, changed_fields
) VALUES (
NEW.order_id, 'UPDATE', old_json, new_json, changed_fields
);
-- Update version and tracking fields
NEW.updated_at := CURRENT_TIMESTAMP;
NEW.version := OLD.version + 1;
NEW.change_events := array_append(OLD.change_events,
'updated_' || array_to_string(changed_fields, ',') || '_at_' ||
extract(epoch from CURRENT_TIMESTAMP)::text
);
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Create triggers (high overhead on write operations)
CREATE TRIGGER orders_change_trigger
BEFORE INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION track_order_changes();
-- Application polling logic (inefficient and high-latency)
WITH pending_changes AS (
SELECT
ocl.*,
o.order_status,
o.customer_id,
o.total_amount,
-- Determine change significance
CASE
WHEN 'order_status' = ANY(ocl.changed_fields) THEN 'high'
WHEN 'total_amount' = ANY(ocl.changed_fields) THEN 'medium'
WHEN 'items' = ANY(ocl.changed_fields) THEN 'medium'
ELSE 'low'
END as change_priority,
-- Calculate processing delay
EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - ocl.changed_at)) as delay_seconds
FROM order_change_log ocl
JOIN orders o ON ocl.order_id = o.order_id
WHERE
ocl.processed = FALSE
AND ocl.processing_attempts < 3
AND ocl.changed_at >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
),
prioritized_changes AS (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY order_id
ORDER BY changed_at DESC
) as change_sequence,
-- Batch processing grouping
CASE change_priority
WHEN 'high' THEN 1
WHEN 'medium' THEN 2
ELSE 3
END as processing_batch
FROM pending_changes
)
SELECT
pc.log_id,
pc.order_id,
pc.change_type,
pc.changed_fields,
pc.old_values,
pc.new_values,
pc.change_priority,
pc.delay_seconds,
pc.processing_batch,
-- Processing metadata
CASE
WHEN pc.delay_seconds > 300 THEN 'DELAYED'
WHEN pc.processing_attempts > 0 THEN 'RETRY'
ELSE 'READY'
END as processing_status,
-- Related order context
pc.order_status,
pc.customer_id,
pc.total_amount
FROM prioritized_changes pc
WHERE
pc.change_sequence = 1 -- Only latest change per order
AND (
pc.change_priority = 'high'
OR (pc.change_priority = 'medium' AND pc.delay_seconds < 60)
OR (pc.change_priority = 'low' AND pc.delay_seconds < 300)
)
ORDER BY
pc.processing_batch,
pc.changed_at ASC
LIMIT 100;
-- Problems with traditional change detection:
-- 1. High overhead from triggers on every write operation
-- 2. Complex polling logic with high latency and resource usage
-- 3. Risk of missing changes during application downtime
-- 4. Difficult to scale across multiple application instances
-- 5. No guaranteed delivery or ordering of change events
-- 6. Complex state management for processed vs unprocessed changes
-- 7. Performance degradation with high-volume write workloads
-- 8. Backup and restore complications with change log tables
-- 9. Cross-database change coordination challenges
-- 10. Limited filtering and transformation capabilities
-- MySQL change detection (even more limited)
CREATE TABLE mysql_orders (
id INT AUTO_INCREMENT PRIMARY KEY,
status VARCHAR(50),
amount DECIMAL(10,2),
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX(updated_at)
);
-- Basic polling approach (no trigger support in standard MySQL)
SELECT
id, status, amount, updated_at,
UNIX_TIMESTAMP() - UNIX_TIMESTAMP(updated_at) as age_seconds
FROM mysql_orders
WHERE updated_at > DATE_SUB(NOW(), INTERVAL 5 MINUTE)
ORDER BY updated_at DESC
LIMIT 1000;
-- MySQL limitations:
-- - No comprehensive trigger system for change tracking
-- - Limited JSON functionality for change metadata
-- - Basic polling only - no streaming capabilities
-- - Poor performance with high-volume change detection
-- - No built-in change stream or event sourcing support
-- - Complex custom implementation required for real-time processing
-- - Limited scalability for distributed architectures
MongoDB Change Streams provide powerful, real-time change detection with minimal overhead:
// MongoDB Change Streams - native real-time change processing
const { MongoClient } = require('mongodb');
const client = new MongoClient('mongodb://localhost:27017');
const db = client.db('ecommerce');
// Comprehensive order management with change stream support
const setupOrderManagement = async () => {
const orders = db.collection('orders');
// Create sample order document structure
const orderDocument = {
_id: new ObjectId(),
customerId: new ObjectId("64a1b2c3d4e5f6789012347a"),
// Order details
orderNumber: "ORD-2024-001234",
status: "pending", // pending, confirmed, processing, shipped, delivered, cancelled
// Financial information
financial: {
subtotal: 299.99,
tax: 24.00,
shipping: 15.99,
discount: 25.00,
total: 314.98,
currency: "USD"
},
// Items with detailed tracking
items: [
{
productId: new ObjectId("64b2c3d4e5f6789012347b1a"),
sku: "LAPTOP-PRO-2024",
name: "Professional Laptop 2024",
quantity: 1,
unitPrice: 1299.99,
totalPrice: 1299.99,
// Item-level tracking
status: "pending", // pending, reserved, picked, shipped
warehouse: "WEST-01",
trackingNumber: null
},
{
productId: new ObjectId("64b2c3d4e5f6789012347b1b"),
sku: "MOUSE-WIRELESS-PREMIUM",
name: "Premium Wireless Mouse",
quantity: 2,
unitPrice: 79.99,
totalPrice: 159.98,
status: "pending",
warehouse: "WEST-01",
trackingNumber: null
}
],
// Customer information
customer: {
customerId: new ObjectId("64a1b2c3d4e5f6789012347a"),
email: "[email protected]",
name: "John Smith",
phone: "+1-555-0123",
loyaltyTier: "gold"
},
// Shipping details
shipping: {
method: "standard", // standard, express, overnight
carrier: "FedEx",
trackingNumber: null,
address: {
street: "123 Main Street",
unit: "Apt 4B",
city: "San Francisco",
state: "CA",
country: "USA",
postalCode: "94105",
// Geospatial data for logistics
coordinates: {
type: "Point",
coordinates: [-122.3937, 37.7955]
}
},
// Delivery preferences
preferences: {
signatureRequired: true,
leaveAtDoor: false,
deliveryInstructions: "Ring doorbell, apartment entrance on left side",
preferredTimeWindow: "9AM-12PM"
}
},
// Payment information
payment: {
method: "credit_card", // credit_card, paypal, apple_pay, etc.
status: "pending", // pending, authorized, captured, failed, refunded
transactionId: null,
// Payment processor details (sensitive data encrypted/redacted)
processor: {
name: "stripe",
paymentIntentId: "pi_1234567890abcdef",
chargeId: null,
receiptUrl: null
},
// Billing address
billingAddress: {
street: "123 Main Street",
city: "San Francisco",
state: "CA",
country: "USA",
postalCode: "94105"
}
},
// Fulfillment tracking
fulfillment: {
warehouseId: "WEST-01",
assignedAt: null,
pickedAt: null,
packedAt: null,
shippedAt: null,
deliveredAt: null,
// Fulfillment team
assignedTo: {
pickerId: null,
packerId: null
},
// Special handling
specialInstructions: [],
requiresSignature: true,
isFragile: false,
isGift: false
},
// Analytics and tracking
analytics: {
source: "web", // web, mobile, api, phone
campaign: "summer_sale_2024",
referrer: "google_ads",
sessionId: "sess_abc123def456",
// Customer journey
customerJourney: [
{
event: "product_view",
productId: new ObjectId("64b2c3d4e5f6789012347b1a"),
timestamp: new Date("2024-11-07T14:15:00Z")
},
{
event: "add_to_cart",
productId: new ObjectId("64b2c3d4e5f6789012347b1a"),
timestamp: new Date("2024-11-07T14:18:00Z")
},
{
event: "checkout_initiated",
timestamp: new Date("2024-11-07T14:25:00Z")
}
]
},
// Communication history
communications: [
{
type: "email",
subject: "Order Confirmation",
sentAt: new Date("2024-11-07T14:30:00Z"),
status: "sent",
templateId: "order_confirmation_v2"
}
],
// Audit trail
audit: {
createdAt: new Date("2024-11-07T14:30:00Z"),
createdBy: "system",
updatedAt: new Date("2024-11-07T14:30:00Z"),
updatedBy: "system",
version: 1,
// Change history for compliance
changes: []
},
// System metadata
metadata: {
environment: "production",
region: "us-west-1",
tenantId: "tenant_123",
// Feature flags
features: {
realTimeTracking: true,
smsNotifications: true,
expressDelivery: false
}
}
};
// Insert sample order
await orders.insertOne(orderDocument);
// Create indexes for change stream performance
await orders.createIndex({ status: 1, "audit.updatedAt": 1 });
await orders.createIndex({ customerId: 1, "audit.createdAt": -1 });
await orders.createIndex({ "items.status": 1 });
await orders.createIndex({ "payment.status": 1 });
await orders.createIndex({ "shipping.trackingNumber": 1 });
console.log('Order management setup completed');
return orders;
};
// Advanced Change Stream processing for event-driven architecture
class OrderEventProcessor {
constructor(db) {
this.db = db;
this.orders = db.collection('orders');
this.eventHandlers = new Map();
this.changeStream = null;
this.resumeToken = null;
this.processedEvents = new Set();
// Event processing statistics
this.stats = {
eventsProcessed: 0,
eventsSkipped: 0,
processingErrors: 0,
lastProcessedAt: null
};
}
async startChangeStreamProcessing() {
console.log('Starting MongoDB Change Stream processing...');
// Configure change stream with comprehensive options
const changeStreamOptions = {
// Pipeline to filter relevant changes
pipeline: [
{
// Only process specific operation types
$match: {
operationType: { $in: ['insert', 'update', 'delete', 'replace'] }
}
},
{
// Add additional metadata for processing
$addFields: {
// Extract key fields for quick processing decisions
documentKey: '$documentKey',
changeType: '$operationType',
changedFields: { $objectToArray: '$updateDescription.updatedFields' },
removedFields: '$updateDescription.removedFields',
// Processing metadata
processingPriority: {
$switch: {
branches: [
// High priority changes
{
case: {
$or: [
{ $eq: ['$operationType', 'insert'] },
{ $eq: ['$operationType', 'delete'] },
{
$anyElementTrue: {
$map: {
input: '$updateDescription.updatedFields',
in: {
$regexMatch: {
input: '$$this.k',
regex: '^(status|payment\\.status|fulfillment\\.).*'
}
}
}
}
}
]
},
then: 'high'
},
// Medium priority changes
{
case: {
$anyElementTrue: {
$map: {
input: '$updateDescription.updatedFields',
in: {
$regexMatch: {
input: '$$this.k',
regex: '^(items\\.|shipping\\.|customer\\.).*'
}
}
}
}
},
then: 'medium'
}
],
default: 'low'
}
}
}
}
],
// Change stream configuration
fullDocument: 'updateLookup', // Always include full document
fullDocumentBeforeChange: 'whenAvailable', // Include pre-change document when available
maxAwaitTimeMS: 1000, // Maximum wait time for new changes
batchSize: 100, // Process changes in batches
// Resume from stored token if available
startAfter: this.resumeToken
};
try {
// Create change stream on orders collection
this.changeStream = this.orders.watch(changeStreamOptions);
console.log('Change stream established - listening for events...');
// Process change events asynchronously
for await (const change of this.changeStream) {
try {
await this.processChangeEvent(change);
// Store resume token for recovery
this.resumeToken = change._id;
// Update statistics
this.stats.eventsProcessed++;
this.stats.lastProcessedAt = new Date();
} catch (error) {
console.error('Error processing change event:', error);
this.stats.processingErrors++;
// Implement retry logic or dead letter queue here
await this.handleProcessingError(change, error);
}
}
} catch (error) {
console.error('Change stream error:', error);
// Implement reconnection logic
await this.handleChangeStreamError(error);
}
}
async processChangeEvent(change) {
const { operationType, fullDocument, documentKey, updateDescription } = change;
const orderId = documentKey._id;
console.log(`Processing ${operationType} event for order ${orderId}`);
// Prevent duplicate processing
const eventId = `${orderId}_${change._id.toString()}`;
if (this.processedEvents.has(eventId)) {
console.log(`Skipping duplicate event: ${eventId}`);
this.stats.eventsSkipped++;
return;
}
// Add to processed events (with TTL cleanup)
this.processedEvents.add(eventId);
setTimeout(() => this.processedEvents.delete(eventId), 300000); // 5 minute TTL
// Route event based on operation type and changed fields
switch (operationType) {
case 'insert':
await this.handleOrderCreated(fullDocument);
break;
case 'update':
await this.handleOrderUpdated(fullDocument, updateDescription, change.fullDocumentBeforeChange);
break;
case 'delete':
await this.handleOrderDeleted(documentKey);
break;
case 'replace':
await this.handleOrderReplaced(fullDocument, change.fullDocumentBeforeChange);
break;
default:
console.warn(`Unhandled operation type: ${operationType}`);
}
}
async handleOrderCreated(order) {
console.log(`New order created: ${order.orderNumber}`);
// Parallel processing of order creation events
const creationTasks = [
// Send order confirmation email
this.sendOrderConfirmation(order),
// Reserve inventory for ordered items
this.reserveInventory(order),
// Create payment authorization
this.authorizePayment(order),
// Notify fulfillment center
this.notifyFulfillmentCenter(order),
// Update customer metrics
this.updateCustomerMetrics(order),
// Log analytics event
this.logAnalyticsEvent('order_created', order),
// Check for fraud indicators
this.performFraudCheck(order)
];
try {
const results = await Promise.allSettled(creationTasks);
// Handle any failed tasks
const failedTasks = results.filter(result => result.status === 'rejected');
if (failedTasks.length > 0) {
console.error(`${failedTasks.length} tasks failed for order creation:`, failedTasks);
await this.handlePartialFailure(order, failedTasks);
}
console.log(`Order creation processing completed: ${order.orderNumber}`);
} catch (error) {
console.error(`Error processing order creation: ${error}`);
throw error;
}
}
async handleOrderUpdated(order, updateDescription, previousOrder) {
console.log(`Order updated: ${order.orderNumber}`);
const updatedFields = Object.keys(updateDescription.updatedFields || {});
const removedFields = updateDescription.removedFields || [];
// Process specific field changes
for (const fieldPath of updatedFields) {
await this.processFieldChange(order, previousOrder, fieldPath, updateDescription.updatedFields[fieldPath]);
}
// Handle removed fields
for (const fieldPath of removedFields) {
await this.processFieldRemoval(order, previousOrder, fieldPath);
}
// Log comprehensive change event
await this.logAnalyticsEvent('order_updated', {
order,
changedFields: updatedFields,
removedFields: removedFields
});
}
async processFieldChange(order, previousOrder, fieldPath, newValue) {
console.log(`Field changed: ${fieldPath} = ${JSON.stringify(newValue)}`);
// Route processing based on changed field
if (fieldPath === 'status') {
await this.handleStatusChange(order, previousOrder);
} else if (fieldPath.startsWith('payment.')) {
await this.handlePaymentChange(order, previousOrder, fieldPath, newValue);
} else if (fieldPath.startsWith('fulfillment.')) {
await this.handleFulfillmentChange(order, previousOrder, fieldPath, newValue);
} else if (fieldPath.startsWith('shipping.')) {
await this.handleShippingChange(order, previousOrder, fieldPath, newValue);
} else if (fieldPath.startsWith('items.')) {
await this.handleItemChange(order, previousOrder, fieldPath, newValue);
}
}
async handleStatusChange(order, previousOrder) {
const newStatus = order.status;
const previousStatus = previousOrder?.status;
console.log(`Order status changed: ${previousStatus} → ${newStatus}`);
// Status-specific processing
switch (newStatus) {
case 'confirmed':
await Promise.all([
this.processPayment(order),
this.sendStatusUpdateEmail(order, 'order_confirmed'),
this.createFulfillmentTasks(order)
]);
break;
case 'processing':
await Promise.all([
this.notifyWarehouse(order),
this.updateInventoryReservations(order),
this.sendStatusUpdateEmail(order, 'order_processing')
]);
break;
case 'shipped':
await Promise.all([
this.generateTrackingInfo(order),
this.sendShippingNotification(order),
this.releaseInventoryReservations(order),
this.updateDeliveryEstimate(order)
]);
break;
case 'delivered':
await Promise.all([
this.sendDeliveryConfirmation(order),
this.triggerReviewRequest(order),
this.updateCustomerLoyaltyPoints(order),
this.closeOrderInSystems(order)
]);
break;
case 'cancelled':
await Promise.all([
this.processRefund(order),
this.releaseInventoryReservations(order),
this.sendCancellationNotification(order),
this.updateAnalytics(order, 'cancelled')
]);
break;
}
}
async handlePaymentChange(order, previousOrder, fieldPath, newValue) {
console.log(`Payment change: ${fieldPath} = ${newValue}`);
if (fieldPath === 'payment.status') {
switch (newValue) {
case 'authorized':
await this.handlePaymentAuthorized(order);
break;
case 'captured':
await this.handlePaymentCaptured(order);
break;
case 'failed':
await this.handlePaymentFailed(order);
break;
case 'refunded':
await this.handlePaymentRefunded(order);
break;
}
}
}
async handleShippingChange(order, previousOrder, fieldPath, newValue) {
if (fieldPath === 'shipping.trackingNumber' && newValue) {
console.log(`Tracking number assigned: ${newValue}`);
// Send tracking information to customer
await Promise.all([
this.sendTrackingInfo(order),
this.setupDeliveryNotifications(order),
this.updateShippingPartnerSystems(order)
]);
}
}
// Implementation of helper methods for event processing
async sendOrderConfirmation(order) {
console.log(`Sending order confirmation for ${order.orderNumber}`);
// Simulate email service call
const emailData = {
to: order.customer.email,
subject: `Order Confirmation - ${order.orderNumber}`,
template: 'order_confirmation',
data: {
orderNumber: order.orderNumber,
customerName: order.customer.name,
items: order.items,
total: order.financial.total,
estimatedDelivery: this.calculateDeliveryDate(order)
}
};
// Would integrate with actual email service
await this.sendEmail(emailData);
}
async reserveInventory(order) {
console.log(`Reserving inventory for order ${order.orderNumber}`);
const inventoryUpdates = order.items.map(item => ({
productId: item.productId,
sku: item.sku,
quantity: item.quantity,
reservedFor: order._id,
reservedAt: new Date()
}));
// Update inventory collection
const inventory = this.db.collection('inventory');
for (const update of inventoryUpdates) {
await inventory.updateOne(
{
productId: update.productId,
availableQuantity: { $gte: update.quantity }
},
{
$inc: {
availableQuantity: -update.quantity,
reservedQuantity: update.quantity
},
$push: {
reservations: {
orderId: update.reservedFor,
quantity: update.quantity,
reservedAt: update.reservedAt
}
}
}
);
}
}
async authorizePayment(order) {
console.log(`Authorizing payment for order ${order.orderNumber}`);
// Simulate payment processor call
const paymentResult = await this.callPaymentProcessor({
action: 'authorize',
amount: order.financial.total,
currency: order.financial.currency,
paymentMethod: order.payment.method,
paymentIntentId: order.payment.processor.paymentIntentId
});
if (paymentResult.success) {
// Update order with payment authorization
await this.orders.updateOne(
{ _id: order._id },
{
$set: {
'payment.status': 'authorized',
'payment.processor.chargeId': paymentResult.chargeId,
'audit.updatedAt': new Date(),
'audit.updatedBy': 'payment_processor'
},
$inc: { 'audit.version': 1 }
}
);
} else {
throw new Error(`Payment authorization failed: ${paymentResult.error}`);
}
}
// Helper methods (simplified implementations)
async sendEmail(emailData) {
console.log(`Email sent: ${emailData.subject} to ${emailData.to}`);
}
async callPaymentProcessor(request) {
// Simulate payment processor response
await new Promise(resolve => setTimeout(resolve, 100));
return {
success: true,
chargeId: `ch_${Math.random().toString(36).substr(2, 9)}`
};
}
calculateDeliveryDate(order) {
const baseDate = new Date();
const daysToAdd = order.shipping.method === 'express' ? 2 :
order.shipping.method === 'overnight' ? 1 : 5;
baseDate.setDate(baseDate.getDate() + daysToAdd);
return baseDate;
}
async logAnalyticsEvent(eventType, data) {
const analytics = this.db.collection('analytics_events');
await analytics.insertOne({
eventType,
data,
timestamp: new Date(),
source: 'change_stream_processor'
});
}
async handleProcessingError(change, error) {
console.error(`Processing error for change ${change._id}:`, error);
// Log error for monitoring
const errorLog = this.db.collection('processing_errors');
await errorLog.insertOne({
changeId: change._id,
operationType: change.operationType,
documentKey: change.documentKey,
error: {
message: error.message,
stack: error.stack
},
timestamp: new Date(),
retryCount: 0
});
}
async handleChangeStreamError(error) {
console.error('Change stream error:', error);
// Wait before attempting reconnection
await new Promise(resolve => setTimeout(resolve, 5000));
// Restart change stream processing
await this.startChangeStreamProcessing();
}
getProcessingStatistics() {
return {
...this.stats,
resumeToken: this.resumeToken,
processedEventsInMemory: this.processedEvents.size
};
}
}
// Multi-service change stream coordination
class DistributedOrderEventSystem {
constructor(db) {
this.db = db;
this.serviceProcessors = new Map();
this.eventBus = new Map(); // Simple in-memory event bus
this.globalStats = {
totalEventsProcessed: 0,
servicesActive: 0,
lastProcessingTime: null
};
}
async setupDistributedProcessing() {
console.log('Setting up distributed order event processing...');
// Create specialized processors for different services
const services = [
'inventory-service',
'payment-service',
'fulfillment-service',
'notification-service',
'analytics-service',
'customer-service'
];
for (const serviceName of services) {
const processor = new ServiceSpecificProcessor(this.db, serviceName, this);
await processor.initialize();
this.serviceProcessors.set(serviceName, processor);
}
console.log(`Distributed processing setup completed with ${services.length} services`);
}
async publishEvent(eventType, data, source) {
console.log(`Publishing event: ${eventType} from ${source}`);
// Add to event bus
if (!this.eventBus.has(eventType)) {
this.eventBus.set(eventType, []);
}
const event = {
id: new ObjectId(),
type: eventType,
data,
source,
timestamp: new Date(),
processed: new Set()
};
this.eventBus.get(eventType).push(event);
// Notify interested services
for (const [serviceName, processor] of this.serviceProcessors.entries()) {
if (processor.isInterestedInEvent(eventType)) {
await processor.processEvent(event);
}
}
this.globalStats.totalEventsProcessed++;
this.globalStats.lastProcessingTime = new Date();
}
getGlobalStatistics() {
const serviceStats = {};
for (const [serviceName, processor] of this.serviceProcessors.entries()) {
serviceStats[serviceName] = processor.getStatistics();
}
return {
global: this.globalStats,
services: serviceStats,
eventBusSize: Array.from(this.eventBus.values()).reduce((total, events) => total + events.length, 0)
};
}
}
// Service-specific processor for handling events relevant to each microservice
class ServiceSpecificProcessor {
constructor(db, serviceName, eventSystem) {
this.db = db;
this.serviceName = serviceName;
this.eventSystem = eventSystem;
this.eventFilters = new Map();
this.stats = {
eventsProcessed: 0,
eventsFiltered: 0,
lastProcessedAt: null
};
this.setupEventFilters();
}
setupEventFilters() {
// Define which events each service cares about
const filterConfigs = {
'inventory-service': [
'order_created',
'order_cancelled',
'item_status_changed'
],
'payment-service': [
'order_created',
'order_confirmed',
'order_cancelled',
'payment_status_changed'
],
'fulfillment-service': [
'order_confirmed',
'payment_authorized',
'inventory_reserved'
],
'notification-service': [
'order_created',
'status_changed',
'payment_status_changed',
'shipping_updated'
],
'analytics-service': [
'*' // Analytics service processes all events
],
'customer-service': [
'order_created',
'order_delivered',
'order_cancelled'
]
};
const filters = filterConfigs[this.serviceName] || [];
filters.forEach(filter => this.eventFilters.set(filter, true));
}
async initialize() {
console.log(`Initializing ${this.serviceName} processor...`);
// Service-specific initialization
switch (this.serviceName) {
case 'inventory-service':
await this.initializeInventoryTracking();
break;
case 'payment-service':
await this.initializePaymentProcessing();
break;
// ... other services
}
console.log(`${this.serviceName} processor initialized`);
}
isInterestedInEvent(eventType) {
return this.eventFilters.has('*') || this.eventFilters.has(eventType);
}
async processEvent(event) {
if (!this.isInterestedInEvent(event.type)) {
this.stats.eventsFiltered++;
return;
}
console.log(`${this.serviceName} processing event: ${event.type}`);
try {
// Service-specific event processing
await this.handleServiceEvent(event);
event.processed.add(this.serviceName);
this.stats.eventsProcessed++;
this.stats.lastProcessedAt = new Date();
} catch (error) {
console.error(`${this.serviceName} error processing event ${event.id}:`, error);
throw error;
}
}
async handleServiceEvent(event) {
// Dispatch to service-specific handlers
const handlerMethod = `handle${event.type.split('_').map(word =>
word.charAt(0).toUpperCase() + word.slice(1)
).join('')}`;
if (typeof this[handlerMethod] === 'function') {
await this[handlerMethod](event);
} else {
console.warn(`No handler found: ${handlerMethod} in ${this.serviceName}`);
}
}
// Service-specific event handlers
async handleOrderCreated(event) {
if (this.serviceName === 'inventory-service') {
await this.reserveInventoryForOrder(event.data);
} else if (this.serviceName === 'notification-service') {
await this.sendOrderConfirmationEmail(event.data);
}
}
async handleStatusChanged(event) {
if (this.serviceName === 'customer-service') {
await this.updateCustomerOrderHistory(event.data);
}
}
// Helper methods for specific services
async reserveInventoryForOrder(order) {
console.log(`Reserving inventory for order: ${order.orderNumber}`);
// Implementation would interact with inventory management system
}
async sendOrderConfirmationEmail(order) {
console.log(`Sending confirmation email for order: ${order.orderNumber}`);
// Implementation would use email service
}
async initializeInventoryTracking() {
// Setup inventory-specific collections and indexes
const inventory = this.db.collection('inventory');
await inventory.createIndex({ productId: 1, warehouse: 1 });
}
async initializePaymentProcessing() {
// Setup payment-specific configurations
console.log('Payment service initialized with fraud detection enabled');
}
getStatistics() {
return this.stats;
}
}
// Benefits of MongoDB Change Streams:
// - Real-time change detection with minimal latency
// - Native event sourcing capabilities without complex triggers
// - Resumable streams with automatic recovery from failures
// - Ordered event processing with guaranteed delivery
// - Fine-grained filtering and transformation pipelines
// - Horizontal scaling across multiple application instances
// - Integration with MongoDB's replica set and sharding architecture
// - No polling overhead or resource waste
// - Built-in clustering and high availability support
// - Simple integration with existing MongoDB applications
module.exports = {
setupOrderManagement,
OrderEventProcessor,
DistributedOrderEventSystem,
ServiceSpecificProcessor
};
Understanding MongoDB Change Streams Architecture
Change Stream Processing Patterns
MongoDB Change Streams operate at the replica set level and provide several key capabilities for event-driven architectures:
// Advanced change stream patterns and configurations
class AdvancedChangeStreamManager {
constructor(client) {
this.client = client;
this.db = client.db('ecommerce');
this.changeStreams = new Map();
this.resumeTokens = new Map();
this.errorHandlers = new Map();
}
async setupMultiCollectionStreams() {
console.log('Setting up multi-collection change streams...');
// 1. Collection-specific streams with targeted processing
const collectionConfigs = [
{
name: 'orders',
pipeline: [
{
$match: {
$or: [
{ operationType: 'insert' },
{ operationType: 'update', 'updateDescription.updatedFields.status': { $exists: true } },
{ operationType: 'update', 'updateDescription.updatedFields.payment.status': { $exists: true } }
]
}
}
],
handler: this.handleOrderChanges.bind(this)
},
{
name: 'inventory',
pipeline: [
{
$match: {
$or: [
{ operationType: 'update', 'updateDescription.updatedFields.availableQuantity': { $exists: true } },
{ operationType: 'update', 'updateDescription.updatedFields.reservedQuantity': { $exists: true } }
]
}
}
],
handler: this.handleInventoryChanges.bind(this)
},
{
name: 'customers',
pipeline: [
{
$match: {
operationType: { $in: ['insert', 'update'] },
$or: [
{ 'fullDocument.loyaltyTier': { $exists: true } },
{ 'updateDescription.updatedFields.loyaltyTier': { $exists: true } },
{ 'updateDescription.updatedFields.preferences': { $exists: true } }
]
}
}
],
handler: this.handleCustomerChanges.bind(this)
}
];
// Start streams for each collection
for (const config of collectionConfigs) {
await this.startCollectionStream(config);
}
// 2. Database-level change stream for cross-collection events
await this.startDatabaseStream();
console.log(`Started ${collectionConfigs.length + 1} change streams`);
}
async startCollectionStream(config) {
const collection = this.db.collection(config.name);
const resumeToken = this.resumeTokens.get(config.name);
const options = {
pipeline: config.pipeline,
fullDocument: 'updateLookup',
fullDocumentBeforeChange: 'whenAvailable',
maxAwaitTimeMS: 1000,
startAfter: resumeToken
};
try {
const changeStream = collection.watch(options);
this.changeStreams.set(config.name, changeStream);
// Process changes asynchronously
this.processChangeStream(config.name, changeStream, config.handler);
} catch (error) {
console.error(`Error starting stream for ${config.name}:`, error);
this.scheduleStreamRestart(config);
}
}
async startDatabaseStream() {
// Database-level stream for cross-collection coordination
const pipeline = [
{
$match: {
// Monitor for significant cross-collection events
$or: [
{
operationType: 'insert',
'fullDocument.metadata.requiresCrossCollectionSync': true
},
{
operationType: 'update',
'updateDescription.updatedFields.syncRequired': { $exists: true }
}
]
}
},
{
$addFields: {
// Add processing metadata
collectionName: '$ns.coll',
databaseName: '$ns.db',
changeSignature: {
$concat: [
'$ns.coll', '_',
'$operationType', '_',
{ $toString: '$clusterTime' }
]
}
}
}
];
const options = {
pipeline,
fullDocument: 'updateLookup',
maxAwaitTimeMS: 2000
};
const dbStream = this.db.watch(options);
this.changeStreams.set('_database', dbStream);
this.processChangeStream('_database', dbStream, this.handleDatabaseChanges.bind(this));
}
async processChangeStream(streamName, changeStream, handler) {
console.log(`Processing change stream: ${streamName}`);
try {
for await (const change of changeStream) {
try {
// Store resume token
this.resumeTokens.set(streamName, change._id);
// Process the change
await handler(change);
// Persist resume token for recovery
await this.persistResumeToken(streamName, change._id);
} catch (processingError) {
console.error(`Error processing change in ${streamName}:`, processingError);
await this.handleProcessingError(streamName, change, processingError);
}
}
} catch (streamError) {
console.error(`Stream error in ${streamName}:`, streamError);
await this.handleStreamError(streamName, streamError);
}
}
async handleOrderChanges(change) {
console.log(`Order change detected: ${change.operationType}`);
const { operationType, fullDocument, documentKey, updateDescription } = change;
// Route based on change type and affected fields
if (operationType === 'insert') {
await this.processNewOrder(fullDocument);
} else if (operationType === 'update') {
const updatedFields = Object.keys(updateDescription.updatedFields || {});
// Process specific field updates
if (updatedFields.includes('status')) {
await this.processOrderStatusChange(fullDocument, updateDescription);
}
if (updatedFields.some(field => field.startsWith('payment.'))) {
await this.processPaymentChange(fullDocument, updateDescription);
}
if (updatedFields.some(field => field.startsWith('fulfillment.'))) {
await this.processFulfillmentChange(fullDocument, updateDescription);
}
}
}
async handleInventoryChanges(change) {
console.log(`Inventory change detected: ${change.operationType}`);
const { fullDocument, updateDescription } = change;
const updatedFields = Object.keys(updateDescription.updatedFields || {});
// Check for low stock conditions
if (updatedFields.includes('availableQuantity')) {
const newQuantity = updateDescription.updatedFields.availableQuantity;
if (newQuantity <= fullDocument.reorderLevel) {
await this.triggerReorderAlert(fullDocument);
}
}
// Propagate inventory changes to dependent systems
await this.syncInventoryWithExternalSystems(fullDocument, updatedFields);
}
async handleCustomerChanges(change) {
console.log(`Customer change detected: ${change.operationType}`);
const { fullDocument, updateDescription } = change;
// Handle loyalty tier changes
if (updateDescription?.updatedFields?.loyaltyTier) {
await this.processLoyaltyTierChange(fullDocument, updateDescription);
}
// Handle preference updates
if (updateDescription?.updatedFields?.preferences) {
await this.updatePersonalizationEngine(fullDocument);
}
}
async handleDatabaseChanges(change) {
console.log(`Database-level change: ${change.collectionName}.${change.operationType}`);
// Handle cross-collection synchronization events
await this.coordinateCrossCollectionSync(change);
}
// Resilience and error handling
async handleStreamError(streamName, error) {
console.error(`Stream ${streamName} encountered error:`, error);
// Implement exponential backoff for reconnection
const baseDelay = 1000; // 1 second
const maxRetries = 5;
let retryCount = 0;
while (retryCount < maxRetries) {
const delay = baseDelay * Math.pow(2, retryCount);
console.log(`Attempting to restart ${streamName} in ${delay}ms (retry ${retryCount + 1})`);
await new Promise(resolve => setTimeout(resolve, delay));
try {
// Restart the specific stream
await this.restartStream(streamName);
console.log(`Successfully restarted ${streamName}`);
break;
} catch (restartError) {
console.error(`Failed to restart ${streamName}:`, restartError);
retryCount++;
}
}
if (retryCount >= maxRetries) {
console.error(`Failed to restart ${streamName} after ${maxRetries} attempts`);
// Implement alerting for operations team
await this.sendOperationalAlert(`Critical: Change stream ${streamName} failed to restart`);
}
}
async restartStream(streamName) {
// Close existing stream if it exists
const existingStream = this.changeStreams.get(streamName);
if (existingStream) {
try {
await existingStream.close();
} catch (closeError) {
console.warn(`Error closing ${streamName}:`, closeError);
}
this.changeStreams.delete(streamName);
}
// Restart based on stream type
if (streamName === '_database') {
await this.startDatabaseStream();
} else {
// Find and restart collection stream
const config = this.getCollectionConfig(streamName);
if (config) {
await this.startCollectionStream(config);
}
}
}
async persistResumeToken(streamName, resumeToken) {
// Store resume tokens in MongoDB for crash recovery
const tokenCollection = this.db.collection('change_stream_tokens');
await tokenCollection.updateOne(
{ streamName },
{
$set: {
resumeToken,
lastUpdated: new Date(),
streamName
}
},
{ upsert: true }
);
}
async loadPersistedResumeTokens() {
console.log('Loading persisted resume tokens...');
const tokenCollection = this.db.collection('change_stream_tokens');
const tokens = await tokenCollection.find({}).toArray();
for (const token of tokens) {
this.resumeTokens.set(token.streamName, token.resumeToken);
console.log(`Loaded resume token for ${token.streamName}`);
}
}
// Performance monitoring and optimization
async getChangeStreamMetrics() {
const metrics = {
activeStreams: this.changeStreams.size,
resumeTokens: this.resumeTokens.size,
streamStatus: {},
systemHealth: await this.checkSystemHealth()
};
// Check status of each stream
for (const [streamName, stream] of this.changeStreams.entries()) {
metrics.streamStatus[streamName] = {
isActive: !stream.closed,
hasResumeToken: this.resumeTokens.has(streamName)
};
}
return metrics;
}
async checkSystemHealth() {
try {
// Check MongoDB replica set status
const replicaSetStatus = await this.client.db('admin').admin().replSetGetStatus();
const healthMetrics = {
replicaSetHealthy: replicaSetStatus.ok === 1,
primaryNode: replicaSetStatus.members.find(member => member.state === 1)?.name,
secondaryNodes: replicaSetStatus.members.filter(member => member.state === 2).length,
oplogSize: await this.getOplogSize(),
changeStreamSupported: true
};
return healthMetrics;
} catch (error) {
console.error('Error checking system health:', error);
return {
replicaSetHealthy: false,
error: error.message
};
}
}
async getOplogSize() {
// Check oplog size to ensure sufficient retention for change streams
const oplog = this.client.db('local').collection('oplog.rs');
const stats = await oplog.stats();
return {
sizeBytes: stats.size,
sizeMB: Math.round(stats.size / 1024 / 1024),
maxSizeBytes: stats.maxSize,
maxSizeMB: Math.round(stats.maxSize / 1024 / 1024),
utilizationPercent: Math.round((stats.size / stats.maxSize) * 100)
};
}
// Cleanup and shutdown
async shutdown() {
console.log('Shutting down change stream manager...');
const shutdownPromises = [];
// Close all active streams
for (const [streamName, stream] of this.changeStreams.entries()) {
console.log(`Closing stream: ${streamName}`);
shutdownPromises.push(
stream.close().catch(error =>
console.warn(`Error closing ${streamName}:`, error)
)
);
}
await Promise.allSettled(shutdownPromises);
// Clear internal state
this.changeStreams.clear();
this.resumeTokens.clear();
console.log('Change stream manager shutdown complete');
}
}
// Helper methods for event processing
async function processNewOrder(order) {
console.log(`Processing new order: ${order.orderNumber}`);
// Comprehensive order processing workflow
const processingTasks = [
validateOrderData(order),
checkInventoryAvailability(order),
validatePaymentMethod(order),
calculateShippingOptions(order),
applyPromotionsAndDiscounts(order),
createFulfillmentWorkflow(order),
sendCustomerNotifications(order),
updateAnalyticsAndReporting(order)
];
const results = await Promise.allSettled(processingTasks);
// Handle any failed tasks
const failures = results.filter(result => result.status === 'rejected');
if (failures.length > 0) {
console.error(`${failures.length} tasks failed for order ${order.orderNumber}`);
await handleOrderProcessingFailures(order, failures);
}
}
async function triggerReorderAlert(inventoryItem) {
console.log(`Low stock alert: ${inventoryItem.sku} - ${inventoryItem.availableQuantity} remaining`);
// Create automatic reorder if conditions are met
if (inventoryItem.autoReorder && inventoryItem.availableQuantity <= inventoryItem.criticalLevel) {
const reorderQuantity = inventoryItem.maxStock - inventoryItem.availableQuantity;
await createPurchaseOrder({
productId: inventoryItem.productId,
sku: inventoryItem.sku,
quantity: reorderQuantity,
supplier: inventoryItem.preferredSupplier,
urgency: 'high',
reason: 'automated_reorder_low_stock'
});
}
}
// Example helper implementations
async function validateOrderData(order) {
// Comprehensive order validation
const validationResults = {
customerValid: await validateCustomer(order.customerId),
itemsValid: await validateOrderItems(order.items),
addressValid: await validateShippingAddress(order.shipping.address),
paymentValid: await validatePaymentInfo(order.payment)
};
const isValid = Object.values(validationResults).every(result => result === true);
if (!isValid) {
throw new Error(`Order validation failed: ${JSON.stringify(validationResults)}`);
}
}
async function createPurchaseOrder(orderData) {
console.log(`Creating purchase order: ${orderData.sku} x ${orderData.quantity}`);
// Implementation would create purchase order in procurement system
}
async function sendOperationalAlert(message) {
console.error(`OPERATIONAL ALERT: ${message}`);
// Implementation would integrate with alerting system (PagerDuty, Slack, etc.)
}
SQL-Style Change Stream Operations with QueryLeaf
QueryLeaf provides SQL-familiar syntax for MongoDB Change Stream operations:
-- QueryLeaf change stream operations with SQL-familiar syntax
-- Create change stream listeners with SQL-style syntax
CREATE CHANGE STREAM order_status_changes
ON orders
WHERE
operation_type IN ('update', 'insert')
AND (
changed_fields CONTAINS 'status'
OR changed_fields CONTAINS 'payment.status'
)
WITH (
full_document = 'update_lookup',
full_document_before_change = 'when_available',
max_await_time = '1 second',
batch_size = 50
);
-- Multi-collection change stream with filtering
CREATE CHANGE STREAM inventory_and_orders
ON DATABASE ecommerce
WHERE
collection_name IN ('orders', 'inventory', 'products')
AND (
(collection_name = 'orders' AND operation_type = 'insert')
OR (collection_name = 'inventory' AND changed_fields CONTAINS 'availableQuantity')
OR (collection_name = 'products' AND changed_fields CONTAINS 'price')
)
WITH (
resume_after = '8264BEB9F3000000012B0229296E04'
);
-- Real-time order processing with change stream triggers
CREATE TRIGGER process_order_changes
ON CHANGE STREAM order_status_changes
FOR EACH CHANGE AS
BEGIN
-- Route processing based on change type
CASE change.operation_type
WHEN 'insert' THEN
-- New order created
CALL process_new_order(change.full_document);
-- Send notifications
INSERT INTO notification_queue (
recipient,
type,
message,
data
)
VALUES (
change.full_document.customer.email,
'order_confirmation',
'Your order has been received',
change.full_document
);
WHEN 'update' THEN
-- Order updated - check what changed
IF change.changed_fields CONTAINS 'status' THEN
CALL process_status_change(
change.full_document,
change.update_description.updated_fields.status
);
END IF;
IF change.changed_fields CONTAINS 'payment.status' THEN
CALL process_payment_status_change(
change.full_document,
change.update_description.updated_fields['payment.status']
);
END IF;
END CASE;
-- Update processing metrics
UPDATE change_stream_metrics
SET
events_processed = events_processed + 1,
last_processed_at = CURRENT_TIMESTAMP
WHERE stream_name = 'order_status_changes';
END;
-- Change stream analytics and monitoring
WITH change_stream_analytics AS (
SELECT
stream_name,
operation_type,
collection_name,
DATE_TRUNC('minute', change_timestamp) as minute_bucket,
COUNT(*) as change_count,
COUNT(DISTINCT document_key._id) as unique_documents,
-- Processing latency analysis
AVG(processing_time_ms) as avg_processing_time,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY processing_time_ms) as p95_processing_time,
-- Change characteristics
COUNT(*) FILTER (WHERE operation_type = 'insert') as insert_count,
COUNT(*) FILTER (WHERE operation_type = 'update') as update_count,
COUNT(*) FILTER (WHERE operation_type = 'delete') as delete_count,
-- Field change patterns
STRING_AGG(DISTINCT changed_fields, ',') as common_changed_fields,
-- Error tracking
COUNT(*) FILTER (WHERE processing_status = 'error') as error_count,
COUNT(*) FILTER (WHERE processing_status = 'retry') as retry_count
FROM change_stream_events
WHERE change_timestamp >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
GROUP BY stream_name, operation_type, collection_name, minute_bucket
),
stream_performance AS (
SELECT
stream_name,
SUM(change_count) as total_changes,
AVG(avg_processing_time) as overall_avg_processing_time,
MAX(p95_processing_time) as max_p95_processing_time,
-- Throughput analysis
SUM(change_count) / 60.0 as changes_per_second,
-- Error rates
SUM(error_count) as total_errors,
(SUM(error_count)::numeric / SUM(change_count)) * 100 as error_rate_percent,
-- Change type distribution
SUM(insert_count) as total_inserts,
SUM(update_count) as total_updates,
SUM(delete_count) as total_deletes,
-- Field change frequency
COUNT(DISTINCT common_changed_fields) as unique_field_patterns,
-- Performance assessment
CASE
WHEN AVG(avg_processing_time) > 1000 THEN 'SLOW'
WHEN AVG(avg_processing_time) > 500 THEN 'MODERATE'
ELSE 'FAST'
END as performance_rating,
-- Health indicators
CASE
WHEN (SUM(error_count)::numeric / SUM(change_count)) > 0.05 THEN 'UNHEALTHY'
WHEN (SUM(error_count)::numeric / SUM(change_count)) > 0.01 THEN 'WARNING'
ELSE 'HEALTHY'
END as health_status
FROM change_stream_analytics
GROUP BY stream_name
)
SELECT
sp.stream_name,
sp.total_changes,
ROUND(sp.changes_per_second, 2) as changes_per_sec,
ROUND(sp.overall_avg_processing_time, 1) as avg_processing_ms,
ROUND(sp.max_p95_processing_time, 1) as max_p95_ms,
sp.performance_rating,
sp.health_status,
-- Change breakdown
sp.total_inserts,
sp.total_updates,
sp.total_deletes,
-- Error analysis
sp.total_errors,
ROUND(sp.error_rate_percent, 2) as error_rate_pct,
-- Field change patterns
sp.unique_field_patterns,
-- Recommendations
CASE
WHEN sp.performance_rating = 'SLOW' THEN 'Optimize change processing logic or increase resources'
WHEN sp.error_rate_percent > 5 THEN 'Investigate error patterns and improve error handling'
WHEN sp.changes_per_second > 1000 THEN 'Consider stream partitioning for better throughput'
ELSE 'Performance within acceptable parameters'
END as recommendation
FROM stream_performance sp
ORDER BY sp.total_changes DESC;
-- Advanced change stream query patterns
CREATE VIEW real_time_order_insights AS
WITH order_changes AS (
SELECT
full_document.*,
change_timestamp,
operation_type,
changed_fields,
-- Calculate order lifecycle timing
CASE
WHEN operation_type = 'insert' THEN 'order_created'
WHEN changed_fields CONTAINS 'status' THEN
CONCAT('status_changed_to_', full_document.status)
WHEN changed_fields CONTAINS 'payment.status' THEN
CONCAT('payment_', full_document.payment.status)
ELSE 'other_update'
END as change_event_type,
-- Time-based analytics
DATE_TRUNC('hour', change_timestamp) as hour_bucket,
EXTRACT(DOW FROM change_timestamp) as day_of_week,
EXTRACT(HOUR FROM change_timestamp) as hour_of_day,
-- Order value categories
CASE
WHEN full_document.financial.total >= 500 THEN 'high_value'
WHEN full_document.financial.total >= 100 THEN 'medium_value'
ELSE 'low_value'
END as order_value_category,
-- Customer segment analysis
full_document.customer.loyaltyTier as customer_segment,
-- Geographic analysis
full_document.shipping.address.state as shipping_state,
full_document.shipping.address.country as shipping_country
FROM CHANGE_STREAM(orders)
WHERE change_timestamp >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
),
order_metrics AS (
SELECT
hour_bucket,
day_of_week,
hour_of_day,
change_event_type,
order_value_category,
customer_segment,
shipping_state,
COUNT(*) as event_count,
COUNT(DISTINCT full_document._id) as unique_orders,
AVG(full_document.financial.total) as avg_order_value,
SUM(full_document.financial.total) as total_order_value,
-- Conversion funnel analysis
COUNT(*) FILTER (WHERE change_event_type = 'order_created') as orders_created,
COUNT(*) FILTER (WHERE change_event_type = 'status_changed_to_confirmed') as orders_confirmed,
COUNT(*) FILTER (WHERE change_event_type = 'status_changed_to_shipped') as orders_shipped,
COUNT(*) FILTER (WHERE change_event_type = 'status_changed_to_delivered') as orders_delivered,
COUNT(*) FILTER (WHERE change_event_type = 'status_changed_to_cancelled') as orders_cancelled,
-- Payment analysis
COUNT(*) FILTER (WHERE change_event_type = 'payment_authorized') as payments_authorized,
COUNT(*) FILTER (WHERE change_event_type = 'payment_captured') as payments_captured,
COUNT(*) FILTER (WHERE change_event_type = 'payment_failed') as payments_failed,
-- Customer behavior
COUNT(DISTINCT full_document.customer.customerId) as unique_customers,
AVG(ARRAY_LENGTH(full_document.items, 1)) as avg_items_per_order
FROM order_changes
GROUP BY
hour_bucket, day_of_week, hour_of_day, change_event_type,
order_value_category, customer_segment, shipping_state
)
SELECT
hour_bucket,
change_event_type,
order_value_category,
customer_segment,
event_count,
unique_orders,
ROUND(avg_order_value, 2) as avg_order_value,
ROUND(total_order_value, 2) as total_order_value,
-- Conversion rates
CASE
WHEN orders_created > 0 THEN
ROUND((orders_confirmed::numeric / orders_created) * 100, 1)
ELSE 0
END as confirmation_rate_pct,
CASE
WHEN orders_confirmed > 0 THEN
ROUND((orders_shipped::numeric / orders_confirmed) * 100, 1)
ELSE 0
END as fulfillment_rate_pct,
CASE
WHEN orders_shipped > 0 THEN
ROUND((orders_delivered::numeric / orders_shipped) * 100, 1)
ELSE 0
END as delivery_rate_pct,
-- Payment success rates
CASE
WHEN payments_authorized > 0 THEN
ROUND((payments_captured::numeric / payments_authorized) * 100, 1)
ELSE 0
END as payment_success_rate_pct,
-- Business insights
unique_customers,
ROUND(avg_items_per_order, 1) as avg_items_per_order,
-- Time-based patterns
day_of_week,
hour_of_day,
-- Geographic insights
shipping_state,
-- Performance indicators
CASE
WHEN change_event_type = 'order_created' AND event_count > 100 THEN 'HIGH_VOLUME'
WHEN change_event_type = 'payment_failed' AND event_count > 10 THEN 'PAYMENT_ISSUES'
WHEN change_event_type = 'status_changed_to_cancelled' AND event_count > 20 THEN 'HIGH_CANCELLATION'
ELSE 'NORMAL'
END as alert_status
FROM order_metrics
WHERE event_count > 0
ORDER BY hour_bucket DESC, event_count DESC;
-- Resume token management for change stream reliability
CREATE TABLE change_stream_resume_tokens (
stream_name VARCHAR(255) PRIMARY KEY,
resume_token TEXT NOT NULL,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- Stream configuration
collection_name VARCHAR(255),
database_name VARCHAR(255),
filter_pipeline JSONB,
-- Monitoring
events_processed BIGINT DEFAULT 0,
last_event_timestamp TIMESTAMP,
stream_status VARCHAR(50) DEFAULT 'active',
-- Performance tracking
avg_processing_latency_ms INTEGER,
last_error_message TEXT,
last_error_timestamp TIMESTAMP,
consecutive_errors INTEGER DEFAULT 0
);
-- Automatic resume token persistence
CREATE TRIGGER update_resume_tokens
AFTER INSERT OR UPDATE ON change_stream_events
FOR EACH ROW
EXECUTE FUNCTION update_stream_resume_token();
-- Change stream health monitoring
SELECT
cst.stream_name,
cst.collection_name,
cst.events_processed,
cst.stream_status,
-- Time since last activity
EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - cst.last_event_timestamp)) / 60 as minutes_since_last_event,
-- Performance metrics
cst.avg_processing_latency_ms,
cst.consecutive_errors,
-- Health assessment
CASE
WHEN cst.stream_status != 'active' THEN 'INACTIVE'
WHEN cst.consecutive_errors >= 5 THEN 'FAILING'
WHEN EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - cst.last_event_timestamp)) > 300 THEN 'STALE'
WHEN cst.avg_processing_latency_ms > 1000 THEN 'SLOW'
ELSE 'HEALTHY'
END as health_status,
-- Recovery information
cst.resume_token,
cst.last_updated,
-- Error details
cst.last_error_message,
cst.last_error_timestamp
FROM change_stream_resume_tokens cst
ORDER BY
CASE health_status
WHEN 'FAILING' THEN 1
WHEN 'INACTIVE' THEN 2
WHEN 'STALE' THEN 3
WHEN 'SLOW' THEN 4
ELSE 5
END,
cst.events_processed DESC;
-- QueryLeaf change stream features provide:
-- 1. SQL-familiar syntax for MongoDB Change Stream operations
-- 2. Real-time event processing with familiar trigger patterns
-- 3. Advanced filtering and transformation using SQL expressions
-- 4. Built-in analytics and monitoring with SQL aggregation functions
-- 5. Resume token management for reliable stream processing
-- 6. Performance monitoring and health assessment queries
-- 7. Integration with existing SQL-based reporting and analytics
-- 8. Event-driven architecture patterns using familiar SQL constructs
-- 9. Multi-collection change coordination with SQL joins and unions
-- 10. Seamless scaling from simple change detection to complex event processing
Best Practices for Change Stream Implementation
Performance and Scalability Considerations
Optimize Change Streams for high-throughput, production environments:
- Pipeline Filtering: Use aggregation pipelines to filter changes at the database level
- Resume Token Management: Implement robust resume token persistence for crash recovery
- Batch Processing: Process changes in batches to improve throughput
- Resource Management: Monitor memory and connection usage for long-running streams
- Error Handling: Implement comprehensive error handling and retry logic
- Oplog Sizing: Ensure adequate oplog size for change stream retention requirements
Event-Driven Architecture Patterns
Design scalable event-driven systems with Change Streams:
- Event Sourcing: Use Change Streams as the foundation for event sourcing patterns
- CQRS Integration: Implement Command Query Responsibility Segregation with change-driven read model updates
- Microservice Communication: Coordinate microservices through change-driven events
- Data Synchronization: Maintain consistency across distributed systems
- Real-time Analytics: Power real-time dashboards and analytics with streaming changes
- Audit and Compliance: Implement comprehensive audit trails with change event logging
Conclusion
MongoDB Change Streams provide comprehensive real-time change detection capabilities that eliminate the complexity and overhead of traditional polling-based approaches while enabling sophisticated event-driven architectures. The native integration with MongoDB's replica set architecture, combined with resumable streams and fine-grained filtering, makes building reactive applications both powerful and reliable.
Key Change Stream benefits include:
- Real-time Processing: Millisecond latency change detection without polling overhead
- Guaranteed Delivery: Ordered, resumable streams with crash recovery capabilities
- Rich Filtering: Aggregation pipeline-based change filtering and transformation
- Horizontal Scaling: Native support for distributed processing across multiple application instances
- Operational Simplicity: No external message brokers or complex trigger maintenance required
- Event Sourcing Support: Built-in capabilities for implementing event sourcing patterns
Whether you're building microservices architectures, real-time analytics platforms, data synchronization systems, or event-driven applications, MongoDB Change Streams with QueryLeaf's familiar SQL interface provides the foundation for sophisticated reactive data processing. This combination enables you to implement complex event-driven functionality while preserving familiar database interaction patterns.
QueryLeaf Integration: QueryLeaf automatically manages MongoDB Change Stream operations while providing SQL-familiar event processing syntax, resume token handling, and stream analytics functions. Advanced change detection, event routing, and stream monitoring are seamlessly handled through familiar SQL patterns, making event-driven architecture development both powerful and accessible.
The integration of native change detection capabilities with SQL-style stream processing makes MongoDB an ideal platform for applications requiring both real-time reactivity and familiar database interaction patterns, ensuring your event-driven solutions remain both effective and maintainable as they scale and evolve.