MongoDB Change Streams and Event-Driven Architecture: Real-Time Data Processing and Reactive Application Development with SQL-Compatible Operations
Modern applications increasingly require real-time responsiveness to data changes, enabling immediate updates across distributed systems, live dashboards, notification systems, and collaborative features. Traditional polling-based approaches create significant performance overhead, increase database load, and introduce unacceptable latency for responsive user experiences.
MongoDB Change Streams provide native event-driven capabilities that eliminate polling overhead through real-time change notifications, enabling sophisticated reactive architectures with ordered, resumable delivery and comprehensive server-side filtering. Unlike traditional database triggers or external message queues that require complex infrastructure management, Change Streams deliver enterprise-grade real-time data processing with automatic failover, cluster-wide ordering, and seamless integration with MongoDB's operational model.
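At its core, a change stream is an open cursor the driver maintains against the replica set oplog. The minimal sketch below (the connection string, database, collection, and price-change filter are illustrative placeholders) shows the basic pattern before the fuller implementations later in this article:
// Minimal change stream sketch - URI, database, and collection names are placeholders
const { MongoClient } = require('mongodb');

async function watchPriceChanges() {
  const client = new MongoClient('mongodb://localhost:27017');
  await client.connect();
  const products = client.db('ecommerce_platform').collection('products');

  // Server-side filter: only surface updates that touch the price field
  const pipeline = [
    { $match: { operationType: 'update', 'updateDescription.updatedFields.price': { $exists: true } } }
  ];

  const stream = products.watch(pipeline, { fullDocument: 'updateLookup' });
  for await (const change of stream) {
    console.log('Price changed on', change.documentKey._id, '->', change.fullDocument?.price);
  }
}

watchPriceChanges().catch(console.error);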
The Traditional Change Detection Challenge
Conventional approaches to detecting data changes involve significant complexity, performance penalties, and reliability issues:
-- Traditional PostgreSQL change detection - complex polling with performance overhead
-- Audit table approach with triggers (complex maintenance and performance impact)
CREATE TABLE product_audit (
audit_id BIGSERIAL PRIMARY KEY,
product_id BIGINT NOT NULL,
operation_type VARCHAR(10) NOT NULL, -- INSERT, UPDATE, DELETE
old_data JSONB,
new_data JSONB,
changed_fields TEXT[],
change_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
user_id BIGINT,
session_id VARCHAR(100),
application_context JSONB
);
-- Performance indexes (PostgreSQL requires separate CREATE INDEX statements)
CREATE INDEX audit_product_time_idx ON product_audit (product_id, change_timestamp DESC);
CREATE INDEX audit_operation_time_idx ON product_audit (operation_type, change_timestamp DESC);
-- Complex trigger function for change tracking
CREATE OR REPLACE FUNCTION track_product_changes()
RETURNS TRIGGER AS $$
DECLARE
old_json JSONB;
new_json JSONB;
changed_fields TEXT[] := ARRAY[]::TEXT[];
field_name TEXT;
field_value_old TEXT;
field_value_new TEXT;
BEGIN
-- Handle different operation types
CASE TG_OP
WHEN 'INSERT' THEN
new_json := row_to_json(NEW)::JSONB;
INSERT INTO product_audit (
product_id, operation_type, new_data,
changed_fields, user_id, session_id
) VALUES (
NEW.product_id, 'INSERT', new_json,
array(select key from jsonb_each(new_json)),
NEW.last_modified_by, NEW.session_id
);
RETURN NEW;
WHEN 'UPDATE' THEN
old_json := row_to_json(OLD)::JSONB;
new_json := row_to_json(NEW)::JSONB;
-- Complex field-by-field comparison for change detection
FOR field_name IN SELECT key FROM jsonb_each(new_json) LOOP
field_value_old := COALESCE((old_json->>field_name), '');
field_value_new := COALESCE((new_json->>field_name), '');
IF field_value_old != field_value_new THEN
changed_fields := array_append(changed_fields, field_name);
END IF;
END LOOP;
-- Only log if there are actual changes
IF array_length(changed_fields, 1) > 0 THEN
INSERT INTO product_audit (
product_id, operation_type, old_data, new_data,
changed_fields, user_id, session_id
) VALUES (
NEW.product_id, 'UPDATE', old_json, new_json,
changed_fields, NEW.last_modified_by, NEW.session_id
);
END IF;
RETURN NEW;
WHEN 'DELETE' THEN
old_json := row_to_json(OLD)::JSONB;
INSERT INTO product_audit (
product_id, operation_type, old_data,
changed_fields, user_id, session_id
) VALUES (
OLD.product_id, 'DELETE', old_json,
array(select key from jsonb_each(old_json)),
OLD.last_modified_by, OLD.session_id
);
RETURN OLD;
END CASE;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
-- Create triggers on multiple tables (maintenance overhead)
CREATE TRIGGER product_changes_trigger
AFTER INSERT OR UPDATE OR DELETE ON products
FOR EACH ROW EXECUTE FUNCTION track_product_changes();
CREATE TRIGGER inventory_changes_trigger
AFTER INSERT OR UPDATE OR DELETE ON inventory
FOR EACH ROW EXECUTE FUNCTION track_inventory_changes();
-- Polling-based change consumption (expensive and unreliable)
WITH recent_changes AS (
SELECT
pa.audit_id,
pa.product_id,
pa.operation_type,
pa.old_data,
pa.new_data,
pa.changed_fields,
pa.change_timestamp,
pa.user_id,
pa.session_id,
-- Product context enrichment (expensive joins)
p.name as product_name,
p.category_id,
p.current_price,
p.status,
c.category_name,
-- Change analysis
CASE
WHEN pa.operation_type = 'INSERT' THEN 'Product Created'
WHEN pa.operation_type = 'UPDATE' AND 'current_price' = ANY(pa.changed_fields) THEN 'Price Updated'
WHEN pa.operation_type = 'UPDATE' AND 'status' = ANY(pa.changed_fields) THEN 'Status Changed'
WHEN pa.operation_type = 'UPDATE' THEN 'Product Modified'
WHEN pa.operation_type = 'DELETE' THEN 'Product Removed'
END as change_description,
-- Business impact assessment
CASE
WHEN pa.operation_type = 'UPDATE' AND 'current_price' = ANY(pa.changed_fields) THEN
CASE
WHEN (pa.new_data->>'current_price')::DECIMAL > (pa.old_data->>'current_price')::DECIMAL
THEN 'Price Increase'
ELSE 'Price Decrease'
END
WHEN pa.operation_type = 'UPDATE' AND 'inventory_count' = ANY(pa.changed_fields) THEN
CASE
WHEN (pa.new_data->>'inventory_count')::INTEGER = 0 THEN 'Out of Stock'
WHEN (pa.new_data->>'inventory_count')::INTEGER <= 5 THEN 'Low Stock Alert'
ELSE 'Inventory Updated'
END
END as business_impact,
-- Notification targeting
CASE
WHEN pa.operation_type = 'INSERT' THEN ARRAY['product_managers', 'inventory_team']
WHEN pa.operation_type = 'UPDATE' AND 'current_price' = ANY(pa.changed_fields)
THEN ARRAY['pricing_team', 'sales_team', 'customers']
WHEN pa.operation_type = 'UPDATE' AND 'inventory_count' = ANY(pa.changed_fields)
THEN ARRAY['inventory_team', 'fulfillment']
WHEN pa.operation_type = 'DELETE' THEN ARRAY['product_managers', 'customers']
ELSE ARRAY['general_subscribers']
END as notification_targets
FROM product_audit pa
LEFT JOIN products p ON pa.product_id = p.product_id
LEFT JOIN categories c ON p.category_id = c.category_id
WHERE pa.change_timestamp > (
-- Get last processed timestamp (requires external state management)
SELECT COALESCE(last_processed_timestamp, CURRENT_TIMESTAMP - INTERVAL '5 minutes')
FROM change_processing_checkpoint
WHERE processor_name = 'product_change_handler'
)
ORDER BY pa.change_timestamp ASC
),
change_aggregation AS (
-- Complex aggregation for batch processing
SELECT
rc.product_id,
rc.product_name,
rc.category_name,
COUNT(*) as total_changes,
-- Change type counts
COUNT(*) FILTER (WHERE operation_type = 'INSERT') as creates,
COUNT(*) FILTER (WHERE operation_type = 'UPDATE') as updates,
COUNT(*) FILTER (WHERE operation_type = 'DELETE') as deletes,
-- Business impact analysis
COUNT(*) FILTER (WHERE business_impact LIKE '%Price%') as price_changes,
COUNT(*) FILTER (WHERE business_impact LIKE '%Stock%') as inventory_changes,
-- Change timeline
MIN(change_timestamp) as first_change,
MAX(change_timestamp) as last_change,
EXTRACT(EPOCH FROM (MAX(change_timestamp) - MIN(change_timestamp))) as change_window_seconds,
-- Most recent change details
(array_agg(rc.operation_type ORDER BY rc.change_timestamp DESC))[1] as latest_operation,
(array_agg(rc.change_description ORDER BY rc.change_timestamp DESC))[1] as latest_description,
(array_agg(rc.business_impact ORDER BY rc.change_timestamp DESC))[1] as latest_impact,
-- Notification consolidation
(
SELECT array_agg(DISTINCT t.target)
FROM recent_changes rc2
CROSS JOIN LATERAL unnest(rc2.notification_targets) AS t(target)
WHERE rc2.product_id = rc.product_id
) as all_notification_targets,
-- Change velocity (changes per minute)
CASE
WHEN EXTRACT(EPOCH FROM (MAX(change_timestamp) - MIN(change_timestamp))) > 0
THEN COUNT(*)::DECIMAL / (EXTRACT(EPOCH FROM (MAX(change_timestamp) - MIN(change_timestamp))) / 60)
ELSE COUNT(*)::DECIMAL
END as changes_per_minute
FROM recent_changes rc
GROUP BY rc.product_id, rc.product_name, rc.category_name
),
notification_prioritization AS (
SELECT
ca.*,
-- Priority scoring
(
-- Change frequency component
LEAST(changes_per_minute * 2, 10) +
-- Business impact component
CASE
WHEN price_changes > 0 THEN 5
WHEN inventory_changes > 0 THEN 4
WHEN creates > 0 THEN 3
WHEN deletes > 0 THEN 6
ELSE 1
END +
-- Recency component
CASE
WHEN change_window_seconds < 300 THEN 3 -- Within 5 minutes
WHEN change_window_seconds < 3600 THEN 2 -- Within 1 hour
ELSE 1
END
) as priority_score,
-- Alert classification
CASE
WHEN deletes > 0 THEN 'critical'
WHEN price_changes > 0 AND changes_per_minute > 1 THEN 'high'
WHEN inventory_changes > 0 THEN 'medium'
WHEN creates > 0 THEN 'low'
ELSE 'informational'
END as alert_level,
-- Message formatting
CASE
WHEN total_changes = 1 THEN latest_description
ELSE CONCAT(total_changes, ' changes to ', product_name, ' (', latest_description, ')')
END as notification_message
FROM change_aggregation ca
)
-- Final change processing output (still requires external message queue)
SELECT
np.product_id,
np.product_name,
np.category_name,
np.total_changes,
np.priority_score,
np.alert_level,
np.notification_message,
np.all_notification_targets,
np.last_change,
-- Processing metadata
CURRENT_TIMESTAMP as processed_at,
'product_change_handler' as processor_name,
-- External system integration requirements
CASE alert_level
WHEN 'critical' THEN 'immediate_push_notification'
WHEN 'high' THEN 'priority_email_and_push'
WHEN 'medium' THEN 'email_notification'
ELSE 'dashboard_update_only'
END as delivery_method,
-- Routing information for message queue
CASE
WHEN 'customers' = ANY(all_notification_targets) THEN 'customer_notifications_queue'
WHEN 'pricing_team' = ANY(all_notification_targets) THEN 'internal_alerts_queue'
ELSE 'general_updates_queue'
END as routing_key,
-- Deduplication key (manual implementation required)
MD5(CONCAT(product_id, ':', array_to_string(all_notification_targets, ','), ':', DATE_TRUNC('minute', last_change))) as deduplication_key
FROM notification_prioritization np
WHERE priority_score >= 3 -- Filter low-priority notifications
ORDER BY priority_score DESC, last_change DESC;
-- Update checkpoint after processing (manual transaction management)
UPDATE change_processing_checkpoint
SET
last_processed_timestamp = CURRENT_TIMESTAMP,
processed_count = processed_count + :processed_batch_size, -- the recent_changes CTE is out of scope here; the batch size must be tracked and supplied by the application
last_updated = CURRENT_TIMESTAMP
WHERE processor_name = 'product_change_handler';
-- Traditional polling approach problems:
-- 1. Expensive polling operations creating unnecessary database load
-- 2. Complex trigger-based audit tables requiring extensive maintenance
-- 3. Race conditions and missed changes during high-concurrency periods
-- 4. Manual checkpoint management and external state tracking required
-- 5. Complex field-level change detection with performance overhead
-- 6. No guaranteed delivery or automatic failure recovery mechanisms
-- 7. Difficult horizontal scaling of change processing systems
-- 8. External message queue infrastructure required for reliability
-- 9. Manual deduplication and ordering logic implementation required
-- 10. Limited filtering capabilities and expensive context enrichment queries
MongoDB Change Streams eliminate polling complexity with native real-time change notifications:
// MongoDB Change Streams - native real-time change processing with comprehensive event handling
const { MongoClient, ObjectId } = require('mongodb');
const client = new MongoClient('mongodb://localhost:27017');
const db = client.db('ecommerce_platform');
// Advanced MongoDB Change Streams Manager
class MongoDBChangeStreamsManager {
constructor(db, config = {}) {
this.db = db;
this.config = {
// Change stream configuration
enableChangeStreams: config.enableChangeStreams !== false,
resumeAfterFailure: config.resumeAfterFailure !== false,
batchSize: config.batchSize || 100,
maxAwaitTimeMS: config.maxAwaitTimeMS || 1000,
// Event processing configuration
enableEventEnrichment: config.enableEventEnrichment !== false,
enableEventFiltering: config.enableEventFiltering !== false,
enableEventAggregation: config.enableEventAggregation !== false,
enableEventRouting: config.enableEventRouting !== false,
// Reliability and resilience
enableAutoResume: config.enableAutoResume !== false,
enableDeadLetterQueue: config.enableDeadLetterQueue !== false,
maxRetries: config.maxRetries || 3,
retryDelayMs: config.retryDelayMs || 1000,
// Performance optimization
enableParallelProcessing: config.enableParallelProcessing !== false,
processingConcurrency: config.processingConcurrency || 10,
enableBatchProcessing: config.enableBatchProcessing !== false,
batchProcessingWindowMs: config.batchProcessingWindowMs || 5000,
// Monitoring and observability
enableMetrics: config.enableMetrics !== false,
enableLogging: config.enableLogging !== false,
logLevel: config.logLevel || 'info',
...config
};
// Collection references
this.collections = {
products: db.collection('products'),
inventory: db.collection('inventory'),
orders: db.collection('orders'),
customers: db.collection('customers'),
// Event processing collections
changeEvents: db.collection('change_events'),
processingCheckpoints: db.collection('processing_checkpoints'),
deadLetterQueue: db.collection('dead_letter_queue'),
eventMetrics: db.collection('event_metrics'),
// Downstream collections referenced by the routing handlers below
analytics: db.collection('analytics'),
notifications: db.collection('notifications'),
searchIndexUpdates: db.collection('search_index_updates')
};
// Change stream management
this.changeStreams = new Map();
this.eventProcessors = new Map();
this.processingQueues = new Map();
this.resumeTokens = new Map();
// Performance metrics
this.metrics = {
eventsProcessed: 0,
eventsFailed: 0,
averageProcessingTime: 0,
totalProcessingTime: 0,
lastProcessedAt: null,
processingErrors: []
};
this.initializeChangeStreams();
}
async initializeChangeStreams() {
console.log('Initializing MongoDB Change Streams for real-time data processing...');
try {
// Setup change streams for different collections
await this.setupProductChangeStream();
await this.setupInventoryChangeStream();
await this.setupOrderChangeStream();
await this.setupCustomerChangeStream();
// Setup cross-collection change aggregation
await this.setupDatabaseChangeStream();
// Initialize event processing infrastructure
await this.setupEventProcessingInfrastructure();
console.log('Change streams initialized successfully');
} catch (error) {
console.error('Error initializing change streams:', error);
throw error;
}
}
async setupProductChangeStream() {
console.log('Setting up product change stream...');
const productsCollection = this.collections.products;
// Advanced change stream pipeline with filtering and enrichment
const changeStreamPipeline = [
// Stage 1: Filter relevant operations
{
$match: {
$or: [
{ 'operationType': 'insert' },
{ 'operationType': 'update' },
{ 'operationType': 'delete' },
{ 'operationType': 'replace' }
],
// Optional namespace filtering
'ns.db': this.db.databaseName,
'ns.coll': 'products'
}
},
// Note: change stream pipelines only support a limited set of stages
// ($match, $addFields/$set, $project, $replaceRoot, $replaceWith, $redact, $unset),
// so $lookup-based enrichment is deferred to the client-side enrichProductChangeEvent() below.
// Stage 2: Add computed fields and change analysis
{
$addFields: {
// Event metadata
eventId: '$_id._data', // the resume token's _data string; $toString on the token document itself would fail
eventTimestamp: '$$NOW',
collectionName: '$ns.coll',
// Change analysis
changedFields: {
$cond: {
if: { $eq: ['$operationType', 'update'] },
then: { $objectToArray: '$updateDescription.updatedFields' },
else: []
}
},
// Priority assessment
eventPriority: {
$switch: {
branches: [
{
case: { $eq: ['$operationType', 'delete'] },
then: 'critical'
},
{
case: {
$and: [
{ $eq: ['$operationType', 'update'] },
{ $ne: [{ $type: '$updateDescription.updatedFields.price' }, 'missing'] }
]
},
then: 'high'
},
{
case: {
$and: [
{ $eq: ['$operationType', 'update'] },
{ $ne: [{ $type: '$updateDescription.updatedFields.inventory' }, 'missing'] }
]
},
then: 'medium'
},
{
case: { $eq: ['$operationType', 'insert'] },
then: 'low'
}
],
default: 'informational'
}
},
// Notification routing
notificationTargets: {
$switch: {
branches: [
{
case: { $eq: ['$operationType', 'insert'] },
then: ['product_managers', 'inventory_team']
},
{
case: {
$and: [
{ $eq: ['$operationType', 'update'] },
{ $ne: [{ $type: '$updateDescription.updatedFields.price' }, 'missing'] }
]
},
then: ['pricing_team', 'sales_team', 'customers']
},
{
case: {
$and: [
{ $eq: ['$operationType', 'update'] },
{ $ne: [{ $type: '$updateDescription.updatedFields.inventory' }, 'missing'] }
]
},
then: ['inventory_team', 'fulfillment']
},
{
case: { $eq: ['$operationType', 'delete'] },
then: ['product_managers', 'customers']
}
],
default: ['general_subscribers']
}
}
}
}
];
// Create change stream with pipeline and options
const productChangeStream = productsCollection.watch(changeStreamPipeline, {
fullDocument: 'updateLookup',
fullDocumentBeforeChange: 'whenAvailable', // requires MongoDB 6.0+ with changeStreamPreAndPostImages enabled on the collection
batchSize: this.config.batchSize,
maxAwaitTimeMS: this.config.maxAwaitTimeMS,
resumeAfter: this.resumeTokens.get('products')
});
// Event processing handler
productChangeStream.on('change', async (changeEvent) => {
await this.processProductChangeEvent(changeEvent);
});
// Error handling and resume token management
productChangeStream.on('error', async (error) => {
console.error('Product change stream error:', error);
await this.handleChangeStreamError('products', error);
});
productChangeStream.on('resumeTokenChanged', (resumeToken) => {
this.resumeTokens.set('products', resumeToken);
this.persistResumeToken('products', resumeToken);
});
this.changeStreams.set('products', productChangeStream);
console.log('Product change stream setup complete');
}
async processProductChangeEvent(changeEvent) {
const startTime = Date.now();
try {
console.log(`Processing product change event: ${changeEvent.operationType} for product ${changeEvent.documentKey._id}`);
// Enrich change event with additional context
const enrichedEvent = await this.enrichProductChangeEvent(changeEvent);
// Apply business logic and routing
const processedEvent = await this.applyProductBusinessLogic(enrichedEvent);
// Route to appropriate handlers
await this.routeProductChangeEvent(processedEvent);
// Store event for audit and analytics
await this.storeChangeEvent(processedEvent);
// Update metrics
this.updateProcessingMetrics(startTime, 'success');
} catch (error) {
console.error('Error processing product change event:', error);
// Handle processing failure
await this.handleEventProcessingError(changeEvent, error);
this.updateProcessingMetrics(startTime, 'error');
}
}
async enrichProductChangeEvent(changeEvent) {
console.log('Enriching product change event with business context...');
try {
const enrichedEvent = {
...changeEvent,
// Processing metadata
processingId: new ObjectId(),
processingTimestamp: new Date(),
processorVersion: '1.0',
// Document context (current and previous state)
currentDocument: changeEvent.fullDocument,
previousDocument: changeEvent.fullDocumentBeforeChange,
// Change analysis
changeAnalysis: await this.analyzeProductChange(changeEvent),
// Business impact assessment
businessImpact: await this.assessProductBusinessImpact(changeEvent),
// Related data enrichment
relatedData: await this.getRelatedProductData(changeEvent.documentKey._id),
// Notification configuration
notificationConfig: await this.getProductNotificationConfig(changeEvent),
// Processing context
processingContext: {
correlationId: changeEvent.eventId,
sourceCollection: changeEvent.collectionName,
processingPipeline: 'product_changes',
retryCount: 0,
maxRetries: this.config.maxRetries
}
};
return enrichedEvent;
} catch (error) {
console.error('Error enriching product change event:', error);
throw error;
}
}
async analyzeProductChange(changeEvent) {
const analysis = {
operationType: changeEvent.operationType,
affectedFields: [],
fieldChanges: {},
changeType: 'unknown',
changeSignificance: 'low'
};
switch (changeEvent.operationType) {
case 'insert':
analysis.changeType = 'product_creation';
analysis.changeSignificance = 'medium';
analysis.affectedFields = Object.keys(changeEvent.fullDocument || {});
break;
case 'update':
if (changeEvent.updateDescription && changeEvent.updateDescription.updatedFields) {
analysis.affectedFields = Object.keys(changeEvent.updateDescription.updatedFields);
// Analyze specific field changes
const updatedFields = changeEvent.updateDescription.updatedFields;
for (const [field, newValue] of Object.entries(updatedFields)) {
const oldValue = changeEvent.fullDocumentBeforeChange?.[field];
analysis.fieldChanges[field] = {
oldValue,
newValue,
changeType: this.classifyFieldChange(field, oldValue, newValue)
};
}
// Determine change type and significance
if ('price' in updatedFields) {
analysis.changeType = 'price_update';
analysis.changeSignificance = 'high';
} else if ('inventory' in updatedFields) {
analysis.changeType = 'inventory_update';
analysis.changeSignificance = 'medium';
} else if ('status' in updatedFields) {
analysis.changeType = 'status_change';
analysis.changeSignificance = 'medium';
} else {
analysis.changeType = 'product_modification';
analysis.changeSignificance = 'low';
}
}
break;
case 'delete':
analysis.changeType = 'product_deletion';
analysis.changeSignificance = 'critical';
break;
case 'replace':
analysis.changeType = 'product_replacement';
analysis.changeSignificance = 'high';
break;
}
return analysis;
}
classifyFieldChange(fieldName, oldValue, newValue) {
switch (fieldName) {
case 'price':
if (newValue > oldValue) return 'price_increase';
if (newValue < oldValue) return 'price_decrease';
return 'price_change';
case 'inventory':
if (newValue === 0) return 'out_of_stock';
if (newValue <= 5) return 'low_stock';
if (newValue > oldValue) return 'stock_increase';
if (newValue < oldValue) return 'stock_decrease';
return 'inventory_adjustment';
case 'status':
if (newValue === 'discontinued') return 'product_discontinued';
if (newValue === 'active' && oldValue !== 'active') return 'product_activated';
if (newValue !== 'active' && oldValue === 'active') return 'product_deactivated';
return 'status_change';
default:
return 'field_update';
}
}
async assessProductBusinessImpact(changeEvent) {
const impact = {
impactLevel: 'low',
impactAreas: [],
affectedSystems: [],
businessMetrics: {},
actionRequired: false,
recommendations: []
};
const productId = changeEvent.documentKey._id;
const analysis = await this.analyzeProductChange(changeEvent);
// Assess impact based on change type
switch (analysis.changeType) {
case 'price_update':
impact.impactLevel = 'high';
impact.impactAreas = ['revenue', 'customer_experience', 'competitive_positioning'];
impact.affectedSystems = ['pricing_engine', 'recommendation_system', 'customer_notifications'];
impact.actionRequired = true;
impact.recommendations = [
'Notify customers of price changes',
'Update marketing materials',
'Review competitive pricing'
];
// Calculate price change impact
const priceChange = analysis.fieldChanges?.price;
if (priceChange) {
impact.businessMetrics.priceChangePercentage =
((priceChange.newValue - priceChange.oldValue) / priceChange.oldValue * 100).toFixed(2);
}
break;
case 'inventory_update':
impact.impactLevel = 'medium';
impact.impactAreas = ['fulfillment', 'customer_experience'];
impact.affectedSystems = ['inventory_management', 'order_processing'];
const inventoryChange = analysis.fieldChanges?.inventory;
if (inventoryChange) {
if (inventoryChange.newValue === 0) {
impact.impactLevel = 'high';
impact.actionRequired = true;
impact.recommendations = ['Update product availability', 'Notify backordered customers'];
} else if (inventoryChange.newValue <= 5) {
impact.recommendations = ['Monitor inventory levels', 'Plan restocking'];
}
}
break;
case 'product_deletion':
impact.impactLevel = 'critical';
impact.impactAreas = ['customer_experience', 'revenue', 'data_integrity'];
impact.affectedSystems = ['catalog_management', 'order_processing', 'recommendations'];
impact.actionRequired = true;
impact.recommendations = [
'Handle existing orders',
'Update customer wishlists',
'Archive product data',
'Redirect product URLs'
];
break;
case 'product_creation':
impact.impactLevel = 'medium';
impact.impactAreas = ['catalog_expansion', 'revenue_opportunity'];
impact.affectedSystems = ['search_indexing', 'recommendation_system', 'inventory_tracking'];
impact.recommendations = [
'Index for search',
'Generate recommendations',
'Create marketing content'
];
break;
}
return impact;
}
async getRelatedProductData(productId) {
try {
// Get product relationships and context
const relatedData = await Promise.allSettled([
// Category information
this.collections.products.aggregate([
{ $match: { _id: productId } },
{
$lookup: {
from: 'categories',
localField: 'categoryId',
foreignField: '_id',
as: 'category'
}
},
{ $project: { category: { $arrayElemAt: ['$category', 0] } } }
]).toArray(),
// Inventory information
this.collections.inventory.findOne({ productId: productId }),
// Recent orders for this product
this.collections.orders.find({
'items.productId': productId,
createdAt: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) } // Last 30 days
}).limit(10).toArray(),
// Customer interest metrics
this.collections.analytics.findOne({
productId: productId,
type: 'product_engagement'
})
]);
const [categoryResult, inventoryResult, ordersResult, analyticsResult] = relatedData;
return {
category: categoryResult.status === 'fulfilled' ? categoryResult.value[0]?.category : null,
inventory: inventoryResult.status === 'fulfilled' ? inventoryResult.value : null,
recentOrders: ordersResult.status === 'fulfilled' ? ordersResult.value : [],
analytics: analyticsResult.status === 'fulfilled' ? analyticsResult.value : null,
dataRetrievedAt: new Date()
};
} catch (error) {
console.error('Error getting related product data:', error);
return { error: error.message };
}
}
async getProductNotificationConfig(changeEvent) {
const config = {
enableNotifications: true,
notificationTargets: changeEvent.notificationTargets || [],
deliveryMethods: ['push', 'email'],
priority: changeEvent.eventPriority || 'low',
batching: {
enabled: true,
windowMs: 60000, // 1 minute
maxBatchSize: 10
},
filtering: {
enabled: true,
rules: []
}
};
// Customize based on event type and priority
switch (changeEvent.operationType) {
case 'delete':
config.deliveryMethods = ['push', 'email', 'sms'];
config.batching.enabled = false; // Immediate delivery
break;
case 'update':
if (changeEvent.eventPriority === 'high') {
config.deliveryMethods = ['push', 'email'];
config.batching.windowMs = 30000; // 30 seconds
}
break;
}
return config;
}
async applyProductBusinessLogic(enrichedEvent) {
console.log('Applying business logic to product change event...');
try {
const processedEvent = {
...enrichedEvent,
// Business rules execution results
businessRules: await this.executeProductBusinessRules(enrichedEvent),
// Workflow triggers
workflowTriggers: await this.identifyWorkflowTriggers(enrichedEvent),
// Integration requirements
integrationRequirements: await this.identifyIntegrationRequirements(enrichedEvent),
// Compliance and governance
complianceChecks: await this.performComplianceChecks(enrichedEvent)
};
return processedEvent;
} catch (error) {
console.error('Error applying business logic:', error);
throw error;
}
}
async executeProductBusinessRules(enrichedEvent) {
const rules = [];
const analysis = enrichedEvent.changeAnalysis;
// Price change rules
if (analysis.changeType === 'price_update') {
const priceChange = analysis.fieldChanges.price;
const changePercent = Math.abs(
((priceChange.newValue - priceChange.oldValue) / priceChange.oldValue) * 100
);
if (changePercent > 20) {
rules.push({
rule: 'significant_price_change',
triggered: true,
severity: 'high',
action: 'require_manager_approval',
details: `Price change of ${changePercent.toFixed(2)}% requires approval`
});
}
if (priceChange.newValue < priceChange.oldValue * 0.5) {
rules.push({
rule: 'deep_discount_alert',
triggered: true,
severity: 'medium',
action: 'fraud_detection_review',
details: 'Price reduced by more than 50%'
});
}
}
// Inventory rules
if (analysis.changeType === 'inventory_update') {
const inventoryChange = analysis.fieldChanges.inventory;
if (inventoryChange?.newValue === 0) {
rules.push({
rule: 'out_of_stock',
triggered: true,
severity: 'high',
action: 'update_product_availability',
details: 'Product is now out of stock'
});
}
if (inventoryChange?.newValue <= 5 && inventoryChange?.newValue > 0) {
rules.push({
rule: 'low_stock_warning',
triggered: true,
severity: 'medium',
action: 'reorder_notification',
details: `Low stock: ${inventoryChange.newValue} units remaining`
});
}
}
// Product lifecycle rules
if (analysis.changeType === 'product_deletion') {
rules.push({
rule: 'product_deletion',
triggered: true,
severity: 'critical',
action: 'cleanup_related_data',
details: 'Product deleted - cleanup required'
});
}
return rules;
}
async routeProductChangeEvent(processedEvent) {
console.log('Routing product change event to appropriate handlers...');
try {
const routingTasks = [];
// Real-time notification routing
if (processedEvent.notificationConfig.enableNotifications) {
routingTasks.push(this.routeToNotificationSystem(processedEvent));
}
// Search index updates
if (['insert', 'update', 'replace'].includes(processedEvent.operationType)) {
routingTasks.push(this.routeToSearchIndexing(processedEvent));
}
// Analytics and reporting
routingTasks.push(this.routeToAnalytics(processedEvent));
// Integration webhooks
if (processedEvent.integrationRequirements?.webhooks?.length > 0) {
routingTasks.push(this.routeToWebhooks(processedEvent));
}
// Workflow automation
if (processedEvent.workflowTriggers?.length > 0) {
routingTasks.push(this.routeToWorkflowEngine(processedEvent));
}
// Business intelligence
routingTasks.push(this.routeToBusinessIntelligence(processedEvent));
// Execute routing tasks concurrently
await Promise.allSettled(routingTasks);
} catch (error) {
console.error('Error routing product change event:', error);
throw error;
}
}
async routeToNotificationSystem(processedEvent) {
console.log('Routing to notification system...');
const notification = {
eventId: processedEvent.processingId,
eventType: 'product_change',
operationType: processedEvent.operationType,
productId: processedEvent.documentKey._id,
priority: processedEvent.eventPriority,
targets: processedEvent.notificationTargets,
deliveryMethods: processedEvent.notificationConfig.deliveryMethods,
message: this.generateNotificationMessage(processedEvent),
payload: {
productDetails: processedEvent.currentDocument,
changeAnalysis: processedEvent.changeAnalysis,
businessImpact: processedEvent.businessImpact
},
routing: {
immediate: processedEvent.eventPriority === 'critical',
batchable: processedEvent.notificationConfig.batching.enabled,
batchWindowMs: processedEvent.notificationConfig.batching.windowMs
},
createdAt: new Date()
};
// Route to notification queue (could be MongoDB collection, message queue, etc.)
await this.collections.notifications.insertOne(notification);
return notification;
}
generateNotificationMessage(processedEvent) {
const analysis = processedEvent.changeAnalysis;
const product = processedEvent.currentDocument;
switch (analysis.changeType) {
case 'product_creation':
return `New product added: ${product.name}`;
case 'price_update':
const priceChange = analysis.fieldChanges.price;
const direction = priceChange.newValue > priceChange.oldValue ? 'increased' : 'decreased';
return `Price ${direction} for ${product.name}: $${priceChange.oldValue} → $${priceChange.newValue}`;
case 'inventory_update':
const inventoryChange = analysis.fieldChanges.inventory;
if (inventoryChange.newValue === 0) {
return `${product.name} is now out of stock`;
} else if (inventoryChange.newValue <= 5) {
return `Low stock alert: ${product.name} (${inventoryChange.newValue} remaining)`;
} else {
return `Inventory updated for ${product.name}: ${inventoryChange.newValue} units`;
}
case 'product_deletion':
return `Product removed: ${product.name}`;
default:
return `Product updated: ${product.name}`;
}
}
async routeToSearchIndexing(processedEvent) {
console.log('Routing to search indexing system...');
const indexUpdate = {
eventId: processedEvent.processingId,
operationType: processedEvent.operationType,
documentId: processedEvent.documentKey._id,
collection: 'products',
document: processedEvent.currentDocument,
priority: processedEvent.eventPriority === 'critical' ? 'immediate' : 'normal',
indexingInstructions: {
fullReindex: processedEvent.operationType === 'insert',
partialUpdate: processedEvent.operationType === 'update',
deleteFromIndex: processedEvent.operationType === 'delete',
affectedFields: processedEvent.changeAnalysis.affectedFields
},
createdAt: new Date()
};
await this.collections.searchIndexUpdates.insertOne(indexUpdate);
return indexUpdate;
}
async setupDatabaseChangeStream() {
console.log('Setting up database-wide change stream for cross-collection analytics...');
// Database-level change stream for comprehensive monitoring
const databaseChangeStream = this.db.watch([
{
$match: {
'operationType': { $in: ['insert', 'update', 'delete'] },
'ns.db': this.db.databaseName,
'ns.coll': { $in: ['products', 'orders', 'customers', 'inventory'] }
}
},
{
$addFields: {
eventId: '$_id._data', // the resume token's _data string; $toString on the token document itself would fail
eventTimestamp: '$$NOW',
// Cross-collection correlation
correlationContext: {
$switch: {
branches: [
{
case: { $eq: ['$ns.coll', 'products'] },
then: {
type: 'product_event',
productId: '$documentKey._id',
correlationKey: '$documentKey._id'
}
},
{
case: { $eq: ['$ns.coll', 'orders'] },
then: {
type: 'order_event',
orderId: '$documentKey._id',
correlationKey: '$fullDocument.customerId'
}
}
],
default: { type: 'generic_event' }
}
}
}
}
], {
fullDocument: 'updateLookup'
});
databaseChangeStream.on('change', async (changeEvent) => {
await this.processDatabaseChangeEvent(changeEvent);
});
this.changeStreams.set('database', databaseChangeStream);
console.log('Database change stream setup complete');
}
async processDatabaseChangeEvent(changeEvent) {
try {
// Cross-collection event correlation and analytics
await this.performCrossCollectionAnalytics(changeEvent);
// Real-time business metrics updates
await this.updateRealTimeMetrics(changeEvent);
// Event pattern detection
await this.detectEventPatterns(changeEvent);
} catch (error) {
console.error('Error processing database change event:', error);
}
}
async storeChangeEvent(processedEvent) {
try {
const changeEventRecord = {
eventId: processedEvent.processingId,
resumeToken: processedEvent._id,
// Event identification
operationType: processedEvent.operationType,
collection: processedEvent.ns?.coll,
documentId: processedEvent.documentKey._id,
// Timing information
clusterTime: processedEvent.clusterTime,
eventTimestamp: processedEvent.eventTimestamp,
processingTimestamp: processedEvent.processingTimestamp,
// Change details
changeAnalysis: processedEvent.changeAnalysis,
businessImpact: processedEvent.businessImpact,
// Processing results
businessRules: processedEvent.businessRules,
routingResults: processedEvent.routingResults,
// Status and metadata
processingStatus: 'completed',
processingVersion: processedEvent.processorVersion,
// Audit trail
createdAt: new Date(),
retentionPolicy: 'standard' // Keep for standard retention period
};
await this.collections.changeEvents.insertOne(changeEventRecord);
} catch (error) {
console.error('Error storing change event:', error);
// Don't throw - storage failure shouldn't stop processing
}
}
async handleEventProcessingError(changeEvent, error) {
console.log('Handling event processing error...');
try {
const errorRecord = {
eventId: new ObjectId(),
originalEventId: changeEvent.eventId,
// Error details
error: {
name: error.name,
message: error.message,
stack: error.stack,
code: error.code
},
// Event context
changeEvent: changeEvent,
processingAttempt: (changeEvent.processingContext?.retryCount || 0) + 1,
maxRetries: this.config.maxRetries,
// Status
status: 'pending_retry',
nextRetryAt: new Date(Date.now() + this.config.retryDelayMs),
createdAt: new Date()
};
// Store in dead letter queue if max retries exceeded
if (errorRecord.processingAttempt >= this.config.maxRetries) {
errorRecord.status = 'dead_letter';
errorRecord.nextRetryAt = null;
}
await this.collections.deadLetterQueue.insertOne(errorRecord);
// Schedule retry if applicable
if (errorRecord.status === 'pending_retry') {
setTimeout(() => {
this.retryEventProcessing(errorRecord);
}, this.config.retryDelayMs);
}
} catch (storeError) {
console.error('Error storing failed event:', storeError);
}
}
updateProcessingMetrics(startTime, status) {
const processingTime = Date.now() - startTime;
this.metrics.eventsProcessed++;
this.metrics.totalProcessingTime += processingTime;
this.metrics.averageProcessingTime = this.metrics.totalProcessingTime / this.metrics.eventsProcessed;
this.metrics.lastProcessedAt = new Date();
if (status === 'error') {
this.metrics.eventsFailed++;
}
if (this.config.enableMetrics) {
// Log metrics periodically
if (this.metrics.eventsProcessed % 100 === 0) {
console.log(`Processing metrics: ${this.metrics.eventsProcessed} events processed, ` +
`${this.metrics.averageProcessingTime.toFixed(2)}ms avg processing time, ` +
`${this.metrics.eventsFailed} failures`);
}
}
}
async persistResumeToken(streamName, resumeToken) {
try {
await this.collections.processingCheckpoints.updateOne(
{ streamName: streamName },
{
$set: {
resumeToken: resumeToken,
lastUpdated: new Date()
}
},
{ upsert: true }
);
} catch (error) {
console.error(`Error persisting resume token for ${streamName}:`, error);
}
}
async loadResumeTokens() {
try {
const checkpoints = await this.collections.processingCheckpoints.find({}).toArray();
for (const checkpoint of checkpoints) {
this.resumeTokens.set(checkpoint.streamName, checkpoint.resumeToken);
}
console.log(`Loaded ${checkpoints.length} resume tokens`);
} catch (error) {
console.error('Error loading resume tokens:', error);
}
}
async getProcessingStatistics() {
const processed = this.metrics.eventsProcessed;
return {
activeStreams: this.changeStreams.size,
eventsProcessed: processed,
eventsFailed: this.metrics.eventsFailed,
averageProcessingTime: this.metrics.averageProcessingTime,
// Guard against division by zero before any events have been processed
successRate: processed > 0 ? ((processed - this.metrics.eventsFailed) / processed * 100).toFixed(2) : '100.00',
lastProcessedAt: this.metrics.lastProcessedAt,
// Stream-specific metrics (Map.keys() returns an iterator, so materialize it first)
streamMetrics: Object.fromEntries(Array.from(this.changeStreams.keys()).map(name => [
name,
{ active: true, resumeToken: this.resumeTokens.has(name) }
]))
};
}
async shutdown() {
console.log('Shutting down Change Streams Manager...');
// Close all change streams
for (const [name, stream] of this.changeStreams) {
try {
await stream.close();
console.log(`Closed change stream: ${name}`);
} catch (error) {
console.error(`Error closing change stream ${name}:`, error);
}
}
// Final metrics log
const stats = await this.getProcessingStatistics();
console.log('Final processing statistics:', stats);
console.log('Change Streams Manager shutdown complete');
}
}
// Benefits of MongoDB Change Streams:
// - Real-time change notifications without polling overhead
// - Resumable processing with automatic recovery via resume tokens after failures
// - Advanced filtering and aggregation pipelines for targeted event processing
// - Comprehensive change context including before/after document state
// - Native integration with MongoDB's replica set and sharding architecture
// - Atomic change detection with cluster-wide ordering guarantees
// - Efficient resource utilization with intelligent batching and buffering
// - Seamless integration with existing MongoDB operations and security
// - SQL-compatible event processing through QueryLeaf integration
// - Production-ready reliability with built-in error handling and retry logic
module.exports = {
MongoDBChangeStreamsManager
};
Understanding MongoDB Change Streams Architecture
Advanced Event-Driven Patterns for Real-Time Applications
Implement sophisticated change stream patterns for production event-driven systems:
// Production-ready Change Streams with advanced event processing and routing
class ProductionChangeStreamsProcessor extends MongoDBChangeStreamsManager {
constructor(db, productionConfig) {
super(db, productionConfig);
this.productionConfig = {
...productionConfig,
enableEventSourcing: true,
enableCQRS: true,
enableEventStore: true,
enableSagaOrchestration: true,
enableEventProjections: true,
enableSnapshotting: true
};
this.setupProductionEventProcessing();
this.initializeEventSourcing();
this.setupCQRSProjections();
this.setupSagaOrchestration();
}
async implementEventSourcingPattern() {
console.log('Implementing event sourcing pattern with Change Streams...');
const eventSourcingStrategy = {
// Event store management
eventStore: {
enableEventPersistence: true,
enableEventReplay: true,
enableSnapshotting: true,
snapshotFrequency: 1000
},
// Command handling
commandHandling: {
enableCommandValidation: true,
enableCommandProjections: true,
enableCommandSagas: true
},
// Query projections
queryProjections: {
enableRealTimeProjections: true,
enableMaterializedViews: true,
enableProjectionRecovery: true
}
};
return await this.deployEventSourcing(eventSourcingStrategy);
}
async setupAdvancedEventRouting() {
console.log('Setting up advanced event routing and distribution...');
const routingStrategy = {
// Message routing
messageRouting: {
enableTopicRouting: true,
enableContentRouting: true,
enableGeographicRouting: true,
enableLoadBalancing: true
},
// Event transformation
eventTransformation: {
enableEventEnrichment: true,
enableEventFiltering: true,
enableEventAggregation: true,
enableEventSplitting: true
},
// Delivery guarantees
deliveryGuarantees: {
enableAtLeastOnceDelivery: true,
enableExactlyOnceDelivery: true,
enableOrderedDelivery: true,
enableDuplicateDetection: true
}
};
return await this.deployAdvancedRouting(routingStrategy);
}
async implementReactiveStreams() {
console.log('Implementing reactive streams for backpressure management...');
const reactiveConfig = {
// Backpressure handling
backpressure: {
enableFlowControl: true,
bufferStrategy: 'drop_oldest',
maxBufferSize: 10000,
backpressureThreshold: 0.8
},
// Stream processing
streamProcessing: {
enableParallelProcessing: true,
parallelismLevel: 10,
enableBatching: true,
batchSize: 100
},
// Error handling
errorHandling: {
enableCircuitBreaker: true,
enableRetryLogic: true,
enableDeadLetterQueue: true,
enableGracefulDegradation: true
}
};
return await this.deployReactiveStreams(reactiveConfig);
}
}
SQL-Style Change Stream Operations with QueryLeaf
QueryLeaf provides familiar SQL syntax for MongoDB Change Streams and event-driven operations:
-- QueryLeaf change stream operations with SQL-familiar syntax
-- Create change stream monitoring with SQL-style syntax
CREATE CHANGE_STREAM product_changes
ON products
WITH (
-- Change stream configuration
full_document = 'updateLookup',
full_document_before_change = 'whenAvailable',
batch_size = 100,
max_await_time_ms = 1000,
-- Event filtering
FILTER (
operation_type IN ('insert', 'update', 'delete') AND
namespace.database = 'ecommerce' AND
namespace.collection = 'products'
),
-- Event enrichment pipeline
ENRICH (
-- Add business context
category_info FROM categories USING fullDocument.categoryId,
inventory_info FROM inventory USING documentKey._id,
-- Compute derived fields
event_priority = CASE
WHEN operation_type = 'delete' THEN 'critical'
WHEN operation_type = 'update' AND updateDescription.updatedFields.price IS NOT NULL THEN 'high'
WHEN operation_type = 'update' AND updateDescription.updatedFields.inventory IS NOT NULL THEN 'medium'
ELSE 'low'
END,
-- Change analysis
change_type = CASE
WHEN operation_type = 'insert' THEN 'product_creation'
WHEN operation_type = 'update' AND updateDescription.updatedFields.price IS NOT NULL THEN 'price_update'
WHEN operation_type = 'update' AND updateDescription.updatedFields.inventory IS NOT NULL THEN 'inventory_update'
WHEN operation_type = 'delete' THEN 'product_deletion'
ELSE 'product_modification'
END
)
);
-- Monitor change events with SQL queries
SELECT
event_id,
operation_type,
document_key._id as product_id,
full_document.name as product_name,
full_document.price as current_price,
-- Change analysis
change_type,
event_priority,
cluster_time,
-- Business context
category_info.name as category_name,
inventory_info.quantity as current_inventory,
-- Change details for updates
CASE
WHEN operation_type = 'update' THEN
JSON_BUILD_OBJECT(
'updated_fields', updateDescription.updatedFields,
'removed_fields', updateDescription.removedFields,
'truncated_arrays', updateDescription.truncatedArrays
)
ELSE NULL
END as update_details,
-- Price change analysis
CASE
WHEN change_type = 'price_update' THEN
JSON_BUILD_OBJECT(
'old_price', fullDocumentBeforeChange.price,
'new_price', fullDocument.price,
'change_amount', fullDocument.price - fullDocumentBeforeChange.price,
'change_percentage',
ROUND(
((fullDocument.price - fullDocumentBeforeChange.price) /
fullDocumentBeforeChange.price) * 100,
2
)
)
ELSE NULL
END as price_change_analysis,
-- Inventory change analysis
CASE
WHEN change_type = 'inventory_update' THEN
JSON_BUILD_OBJECT(
'old_inventory', fullDocumentBeforeChange.inventory,
'new_inventory', fullDocument.inventory,
'change_amount', fullDocument.inventory - fullDocumentBeforeChange.inventory,
'stock_status',
CASE
WHEN fullDocument.inventory = 0 THEN 'out_of_stock'
WHEN fullDocument.inventory <= 5 THEN 'low_stock'
WHEN fullDocument.inventory > fullDocumentBeforeChange.inventory THEN 'restocked'
ELSE 'normal'
END
)
ELSE NULL
END as inventory_change_analysis
FROM CHANGE_STREAM product_changes
WHERE cluster_time > TIMESTAMP '2025-01-05 00:00:00'
ORDER BY cluster_time DESC;
-- Event aggregation and analytics
WITH change_events AS (
SELECT
*,
DATE_TRUNC('hour', cluster_time) as hour_bucket,
DATE_TRUNC('day', cluster_time) as day_bucket
FROM CHANGE_STREAM product_changes
WHERE cluster_time >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
),
hourly_change_metrics AS (
SELECT
hour_bucket,
-- Operation counts
COUNT(*) as total_events,
COUNT(*) FILTER (WHERE operation_type = 'insert') as product_creates,
COUNT(*) FILTER (WHERE operation_type = 'update') as product_updates,
COUNT(*) FILTER (WHERE operation_type = 'delete') as product_deletes,
-- Change type analysis
COUNT(*) FILTER (WHERE change_type = 'price_update') as price_changes,
COUNT(*) FILTER (WHERE change_type = 'inventory_update') as inventory_changes,
COUNT(*) FILTER (WHERE change_type = 'product_creation') as new_products,
-- Priority distribution
COUNT(*) FILTER (WHERE event_priority = 'critical') as critical_events,
COUNT(*) FILTER (WHERE event_priority = 'high') as high_priority_events,
COUNT(*) FILTER (WHERE event_priority = 'medium') as medium_priority_events,
COUNT(*) FILTER (WHERE event_priority = 'low') as low_priority_events,
-- Business impact metrics
AVG(CAST(price_change_analysis->>'change_percentage' AS DECIMAL)) as avg_price_change_pct,
COUNT(*) FILTER (WHERE inventory_change_analysis->>'stock_status' = 'out_of_stock') as out_of_stock_events,
COUNT(*) FILTER (WHERE inventory_change_analysis->>'stock_status' = 'low_stock') as low_stock_events,
-- Unique products affected
COUNT(DISTINCT document_key._id) as unique_products_affected,
COUNT(DISTINCT category_info.name) as categories_affected
FROM change_events
GROUP BY hour_bucket
),
change_velocity_analysis AS (
SELECT
hcm.*,
-- Change velocity metrics
total_events / 60.0 as events_per_minute,
unique_products_affected / 60.0 as products_changed_per_minute,
-- Change intensity scoring
CASE
WHEN critical_events > 10 THEN 'very_high_intensity'
WHEN high_priority_events > 50 THEN 'high_intensity'
WHEN total_events > 100 THEN 'moderate_intensity'
ELSE 'normal_intensity'
END as change_intensity,
-- Business activity classification
CASE
WHEN price_changes > total_events * 0.3 THEN 'pricing_focused'
WHEN inventory_changes > total_events * 0.4 THEN 'inventory_focused'
WHEN new_products > total_events * 0.2 THEN 'catalog_expansion'
ELSE 'general_maintenance'
END as activity_pattern,
-- Alert thresholds
CASE
WHEN critical_events > 5 OR out_of_stock_events > 20 THEN 'alert_required'
WHEN high_priority_events > 30 OR total_events / 60.0 > 5 THEN 'monitoring_required' -- the events_per_minute alias is not visible at this level
ELSE 'normal_operations'
END as operational_status
FROM hourly_change_metrics hcm
)
SELECT
TO_CHAR(hour_bucket, 'YYYY-MM-DD HH24:00') as hour,
-- Core metrics
total_events,
ROUND(events_per_minute, 2) as events_per_minute,
unique_products_affected,
categories_affected,
-- Operation breakdown
product_creates,
product_updates,
product_deletes,
-- Change type breakdown
price_changes,
inventory_changes,
-- Priority breakdown
critical_events,
high_priority_events,
medium_priority_events,
low_priority_events,
-- Business insights
change_intensity,
activity_pattern,
operational_status,
-- Impact metrics
ROUND(COALESCE(avg_price_change_pct, 0), 2) as avg_price_change_pct,
out_of_stock_events,
low_stock_events,
-- Health indicators
ROUND((total_events - critical_events)::DECIMAL / total_events * 100, 1) as operational_health_pct,
-- Recommendations
CASE operational_status
WHEN 'alert_required' THEN 'Immediate attention required - high critical event volume'
WHEN 'monitoring_required' THEN 'Increased monitoring recommended'
ELSE 'Normal operations - continue monitoring'
END as recommendation
FROM change_velocity_analysis
ORDER BY hour_bucket DESC;
-- Real-time event routing and notifications
CREATE TRIGGER change_event_router
ON CHANGE_STREAM product_changes
FOR EACH CHANGE_EVENT
EXECUTE FUNCTION (
-- Route critical events immediately
WHEN event_priority = 'critical' THEN
NOTIFY 'critical_alerts' WITH PAYLOAD JSON_BUILD_OBJECT(
'event_id', event_id,
'product_id', document_key._id,
'operation', operation_type,
'priority', event_priority,
'timestamp', cluster_time
),
-- Batch medium/low priority events
WHEN event_priority IN ('medium', 'low') THEN
INSERT INTO event_batch_queue (
event_id, event_priority, event_data, batch_window
) VALUES (
event_id,
event_priority,
JSON_BUILD_OBJECT(
'product_id', document_key._id,
'operation', operation_type,
'change_type', change_type,
'details', full_document
),
DATE_TRUNC('minute', CURRENT_TIMESTAMP, 5) -- 5-minute batching window
),
-- Route to search indexing
WHEN operation_type IN ('insert', 'update') THEN
INSERT INTO search_index_updates (
document_id, collection_name, operation_type,
document_data, priority, created_at
) VALUES (
document_key._id,
'products',
operation_type,
full_document,
CASE WHEN event_priority = 'critical' THEN 'immediate' ELSE 'normal' END,
CURRENT_TIMESTAMP
),
-- Route to analytics pipeline
ALWAYS THEN
INSERT INTO analytics_events (
event_id, event_type, collection_name, document_id,
operation_type, event_data, processing_priority, created_at
) VALUES (
event_id,
'change_stream_event',
'products',
document_key._id,
operation_type,
JSON_BUILD_OBJECT(
'change_type', change_type,
'priority', event_priority,
'business_context', JSON_BUILD_OBJECT(
'category', category_info.name,
'inventory', inventory_info.quantity
)
),
event_priority,
CURRENT_TIMESTAMP
)
);
-- Event sourcing and audit trail queries
CREATE VIEW product_change_audit AS
SELECT
event_id,
document_key._id as product_id,
operation_type,
cluster_time as event_time,
-- Change details
change_type,
-- Document states
full_document as current_state,
full_document_before_change as previous_state,
-- Change delta for updates
CASE
WHEN operation_type = 'update' THEN
JSON_BUILD_OBJECT(
'updated_fields', updateDescription.updatedFields,
'removed_fields', updateDescription.removedFields
)
ELSE NULL
END as change_delta,
-- Business impact
event_priority,
-- Audit metadata
resume_token,
wall_time,
-- Computed audit fields
ROW_NUMBER() OVER (
PARTITION BY document_key._id
ORDER BY cluster_time
) as change_sequence,
LAG(cluster_time) OVER (
PARTITION BY document_key._id
ORDER BY cluster_time
) as previous_change_time,
-- Time between changes
EXTRACT(SECONDS FROM (
cluster_time - LAG(cluster_time) OVER (
PARTITION BY document_key._id
ORDER BY cluster_time
)
)) as seconds_since_last_change
FROM CHANGE_STREAM product_changes
ORDER BY document_key._id, cluster_time;
-- Advanced pattern detection
WITH product_lifecycle_events AS (
SELECT
document_key._id as product_id,
operation_type,
change_type,
cluster_time,
-- Lifecycle stage detection
CASE
WHEN operation_type = 'insert' THEN 'creation'
WHEN operation_type = 'delete' THEN 'deletion'
WHEN change_type = 'price_update' THEN 'pricing_management'
WHEN change_type = 'inventory_update' THEN 'inventory_management'
ELSE 'maintenance'
END as lifecycle_stage,
-- Change frequency analysis
COUNT(*) OVER (
PARTITION BY document_key._id
ORDER BY cluster_time
RANGE BETWEEN INTERVAL '1 hour' PRECEDING AND CURRENT ROW
) as changes_last_hour,
-- Pattern detection
LAG(change_type) OVER (
PARTITION BY document_key._id
ORDER BY cluster_time
) as previous_change_type,
LEAD(change_type) OVER (
PARTITION BY document_key._id
ORDER BY cluster_time
) as next_change_type
FROM CHANGE_STREAM product_changes
WHERE cluster_time >= CURRENT_TIMESTAMP - INTERVAL '7 days'
),
change_patterns AS (
SELECT
product_id,
lifecycle_stage,
change_type,
previous_change_type,
next_change_type,
changes_last_hour,
cluster_time,
-- Pattern identification
CASE
WHEN changes_last_hour > 10 THEN 'high_frequency_changes'
WHEN change_type = 'price_update' AND previous_change_type = 'price_update' THEN 'price_oscillation'
WHEN change_type = 'inventory_update' AND changes_last_hour > 5 THEN 'inventory_volatility'
WHEN lifecycle_stage = 'creation' AND next_change_type = 'price_update' THEN 'immediate_pricing_adjustment'
WHEN lifecycle_stage = 'deletion' AND previous_change_type = 'inventory_update' THEN 'clearance_deletion'
ELSE 'normal_pattern'
END as pattern_type,
-- Anomaly scoring
CASE
WHEN changes_last_hour > 20 THEN 5 -- Very high frequency
WHEN changes_last_hour > 10 THEN 3 -- High frequency
WHEN change_type = previous_change_type AND change_type = next_change_type THEN 2 -- Repetitive changes
ELSE 0
END as anomaly_score
FROM product_lifecycle_events
),
pattern_alerts AS (
SELECT
cp.*,
-- Alert classification
CASE
WHEN anomaly_score >= 5 THEN 'critical_pattern_anomaly'
WHEN anomaly_score >= 3 THEN 'unusual_pattern_detected'
WHEN pattern_type IN ('price_oscillation', 'inventory_volatility') THEN 'business_pattern_concern'
ELSE 'normal_pattern'
END as alert_level,
-- Recommended actions
CASE pattern_type
WHEN 'high_frequency_changes' THEN 'Investigate automated system behavior'
WHEN 'price_oscillation' THEN 'Review pricing strategy and rules'
WHEN 'inventory_volatility' THEN 'Check inventory management system'
WHEN 'clearance_deletion' THEN 'Verify clearance process completion'
ELSE 'Continue monitoring'
END as recommended_action
FROM change_patterns cp
WHERE anomaly_score > 0 OR pattern_type != 'normal_pattern'
)
SELECT
product_id,
lifecycle_stage,
pattern_type,
alert_level,
anomaly_score,
changes_last_hour,
TO_CHAR(cluster_time, 'YYYY-MM-DD HH24:MI:SS') as event_time,
recommended_action,
-- Pattern context
CASE
WHEN alert_level = 'critical_pattern_anomaly' THEN
'CRITICAL: Unusual change frequency detected - immediate investigation required'
WHEN alert_level = 'unusual_pattern_detected' THEN
'WARNING: Pattern anomaly detected - monitoring recommended'
WHEN alert_level = 'business_pattern_concern' THEN
'BUSINESS ALERT: Review business process associated with detected pattern'
ELSE 'INFO: Pattern identified for awareness'
END as alert_description
FROM pattern_alerts
ORDER BY anomaly_score DESC, cluster_time DESC
LIMIT 100;
-- QueryLeaf provides comprehensive change stream capabilities:
-- 1. SQL-familiar change stream creation and monitoring syntax
-- 2. Advanced event filtering and enrichment with business context
-- 3. Real-time event routing and notification triggers
-- 4. Comprehensive change analytics and velocity analysis
-- 5. Pattern detection and anomaly identification
-- 6. Event sourcing and audit trail capabilities
-- 7. Business rule integration and automated responses
-- 8. Cross-collection change correlation and analysis
-- 9. Production-ready error handling and resume capabilities
-- 10. Native integration with MongoDB Change Streams performance optimization
Best Practices for Change Streams Implementation
Event Processing Strategy and Performance Optimization
Essential principles for effective MongoDB Change Streams deployment:
- Resume Token Management: Persist resume tokens durably and recover from them on restart so processing can continue without silently skipping events (see the sketch after this list)
- Pipeline Optimization: Design change stream pipelines that minimize network traffic and processing overhead
- Error Handling: Implement comprehensive error handling with retry logic and dead letter queue management
- Filtering Strategy: Apply server-side filtering to reduce client processing load and network usage
- Batch Processing: Implement intelligent batching for high-volume event processing scenarios
- Performance Monitoring: Track change stream performance metrics and optimize based on usage patterns
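The following sketch ties the resume token, error handling, and filtering recommendations together in a single consumer loop. It is a minimal sketch, assuming a processing_checkpoints collection and an application-defined handleChange() function; database, collection, and field names are illustrative rather than prescribed.
// Resumable change stream loop with durable checkpointing - a minimal sketch,
// assuming a 'processing_checkpoints' collection; names are illustrative only.
const { MongoClient } = require('mongodb');

async function watchWithResume(uri, streamName = 'products') {
  const client = new MongoClient(uri);
  await client.connect();
  const db = client.db('ecommerce_platform');
  const checkpoints = db.collection('processing_checkpoints');

  // Load the last persisted resume token, if any
  const checkpoint = await checkpoints.findOne({ streamName });
  const options = { fullDocument: 'updateLookup' };
  if (checkpoint?.resumeToken) options.resumeAfter = checkpoint.resumeToken;

  while (true) {
    const stream = db.collection(streamName).watch([], options);
    try {
      for await (const change of stream) {
        await handleChange(change); // application-specific processing (placeholder)
        // Persist the token only after successful processing (at-least-once semantics)
        await checkpoints.updateOne(
          { streamName },
          { $set: { resumeToken: change._id, lastUpdated: new Date() } },
          { upsert: true }
        );
        options.resumeAfter = change._id;
      }
    } catch (err) {
      // Transient errors (elections, network blips): back off and resume from the last token
      console.error(`Change stream '${streamName}' error, resuming:`, err.message);
      await new Promise(resolve => setTimeout(resolve, 1000));
    } finally {
      await stream.close().catch(() => {});
    }
  }
}

async function handleChange(change) {
  console.log(change.operationType, change.documentKey?._id);
}
Persisting the token only after the handler succeeds trades occasional reprocessing for the guarantee that no acknowledged event is lost, which is usually the right default for notification and indexing pipelines.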
Production Event-Driven Architecture
Optimize Change Streams for enterprise-scale event-driven systems:
- Scalability Design: Plan for horizontal scaling with appropriate sharding and replica set configurations
- Fault Tolerance: Implement automatic failover and recovery mechanisms for change stream processors
- Event Enrichment: Design efficient event enrichment patterns that balance context with performance
- Integration Patterns: Establish clear integration patterns with external systems and message queues
- Security Considerations: Implement proper authentication and authorization for change stream access
- Operational Monitoring: Deploy comprehensive monitoring and alerting for change stream health (a minimal health-tracking sketch follows this list)
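To make the monitoring recommendation concrete, here is a small, framework-agnostic health tracker that a change stream consumer could update from its event and error handlers; the class name, staleness threshold, and status values are illustrative assumptions, not part of the MongoDB driver API.
// Lightweight health tracking for a change stream consumer - a sketch with illustrative thresholds
class ChangeStreamHealth {
  constructor(staleAfterMs = 60000) {
    this.staleAfterMs = staleAfterMs;
    this.lastEventAt = null;
    this.eventsSeen = 0;
    this.errors = 0;
  }

  // Call from the 'change' handler
  recordEvent() {
    this.eventsSeen += 1;
    this.lastEventAt = new Date();
  }

  // Call from the 'error' handler
  recordError() {
    this.errors += 1;
  }

  // Suitable for wiring into an HTTP /health endpoint or a metrics scraper
  getHealth() {
    const idleMs = this.lastEventAt ? Date.now() - this.lastEventAt.getTime() : null;
    return {
      // Note: a long idle period may simply mean a quiet collection; tune staleAfterMs accordingly
      status: idleMs !== null && idleMs > this.staleAfterMs ? 'stale' : 'ok',
      eventsSeen: this.eventsSeen,
      errors: this.errors,
      lastEventAt: this.lastEventAt,
      idleMs
    };
  }
}

module.exports = { ChangeStreamHealth };
Exposing getHealth() through an existing health endpoint or metrics scraper gives operators early warning when a stream stops receiving events or starts accumulating errors.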
Conclusion
MongoDB Change Streams provide comprehensive real-time event-driven capabilities that eliminate the complexity and performance overhead of traditional polling-based change detection. The combination of ordered delivery, automatic resume capabilities, and sophisticated server-side filtering makes Change Streams ideal for building responsive, event-driven applications that scale efficiently with growing data volumes.
Key MongoDB Change Streams benefits include:
- Real-Time Notifications: Native change notifications without polling overhead or database performance impact
- Resumable Delivery: Automatic resume capability and failure recovery with cluster-wide ordering guarantees
- Advanced Filtering: Server-side aggregation pipelines for targeted event processing and context enrichment
- Production Ready: Built-in error handling, retry logic, and integration with MongoDB's operational model
- Event-Driven Architecture: Native support for reactive patterns, event sourcing, and CQRS implementations
- SQL Accessibility: Familiar SQL-style change stream operations through QueryLeaf for accessible event processing
Whether you're building real-time dashboards, notification systems, data synchronization services, or comprehensive event-driven architectures, MongoDB Change Streams with QueryLeaf's familiar SQL interface provide the foundation for scalable, responsive applications.
QueryLeaf Integration: QueryLeaf seamlessly manages MongoDB Change Streams while providing SQL-familiar syntax for change event monitoring, filtering, and processing. Advanced event-driven patterns including real-time analytics, pattern detection, and automated routing are elegantly handled through familiar SQL constructs, making sophisticated event processing both powerful and accessible to SQL-oriented development teams.
The combination of MongoDB's robust change stream capabilities with SQL-style event operations makes it an ideal platform for applications requiring both real-time responsiveness and familiar database interaction patterns, ensuring your event-driven systems can evolve with changing requirements while maintaining reliable, high-performance operation.