MongoDB Change Data Capture and Real-Time Data Synchronization: Advanced Event-Driven Architecture and Data Pipeline Management
Modern distributed applications need real-time data synchronization so that data changes propagate immediately across systems, microservices, and external platforms without polling loops or batch synchronization jobs. Traditional change detection relies on timestamp-based polling, database triggers, or application-level change tracking, which leads to data inconsistencies, performance bottlenecks, and synchronization logic that fails to scale as data volumes grow.
MongoDB provides Change Data Capture through change streams, which deliver real-time change detection and streaming so applications can react immediately to data modifications, keep distributed systems synchronized, and build event-driven architectures that scale efficiently. Unlike traditional CDC approaches that depend on trigger systems or external change detection tools, change streams offer native, reliable change tracking built on the replica set oplog, with minimal impact on write performance.
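Before diving into the comparison below, here is a minimal sketch of consuming a change stream with the Node.js driver; the connection string, database, and collection names are illustrative assumptions:

```javascript
const { MongoClient } = require('mongodb');

async function watchProducts() {
  // Change streams require a replica set or sharded cluster
  const client = new MongoClient('mongodb://localhost:27017/?replicaSet=rs0');
  await client.connect();
  const products = client.db('realtime_cdc_system').collection('products');

  // Only surface inserts, updates, and deletes (ignore drops, invalidates, etc.)
  const pipeline = [
    { $match: { operationType: { $in: ['insert', 'update', 'delete'] } } }
  ];

  const stream = products.watch(pipeline, { fullDocument: 'updateLookup' });
  stream.on('change', (change) => {
    console.log(change.operationType, change.documentKey, change.fullDocument);
  });

  return stream; // caller is responsible for stream.close() and client.close()
}
```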
The Traditional Change Detection Challenge
Conventional approaches to change data capture and synchronization have significant limitations for modern distributed architectures:
-- Traditional PostgreSQL change detection - complex and resource-intensive approaches
-- Manual timestamp-based change tracking with performance limitations
CREATE TABLE products (
product_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
product_name VARCHAR(200) NOT NULL,
category VARCHAR(100),
price DECIMAL(10,2) NOT NULL,
stock_quantity INTEGER NOT NULL DEFAULT 0,
supplier_id UUID,
-- Manual change tracking fields (limited granularity)
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
version_number INTEGER NOT NULL DEFAULT 1,
last_modified_by VARCHAR(100),
-- Change type tracking (application-managed)
change_type VARCHAR(20) DEFAULT 'insert',
is_deleted BOOLEAN DEFAULT FALSE,
-- Synchronization status tracking
sync_status VARCHAR(50) DEFAULT 'pending',
last_sync_timestamp TIMESTAMP,
sync_retry_count INTEGER DEFAULT 0,
sync_error TEXT
);
-- Trigger-based change detection with complex maintenance requirements
CREATE TABLE product_changes (
change_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
product_id UUID NOT NULL,
change_type VARCHAR(20) NOT NULL, -- 'INSERT', 'UPDATE', 'DELETE'
change_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
-- Before and after data (limited effectiveness for complex documents)
old_data JSONB,
new_data JSONB,
changed_fields TEXT[], -- Manual field tracking
-- Change metadata
changed_by VARCHAR(100),
change_reason VARCHAR(200),
transaction_id BIGINT,
-- Synchronization tracking
processed BOOLEAN DEFAULT FALSE,
processed_at TIMESTAMP,
processing_attempts INTEGER DEFAULT 0,
error_message TEXT
);
-- Complex trigger system for change capture (high maintenance overhead)
CREATE OR REPLACE FUNCTION track_product_changes()
RETURNS TRIGGER AS $$
DECLARE
change_type_value VARCHAR(20);
old_json JSONB;
new_json JSONB;
changed_fields_array TEXT[] := '{}';
field_name TEXT;
BEGIN
-- Determine change type
IF TG_OP = 'INSERT' THEN
change_type_value := 'INSERT';
new_json := to_jsonb(NEW);
old_json := NULL;
ELSIF TG_OP = 'UPDATE' THEN
change_type_value := 'UPDATE';
old_json := to_jsonb(OLD);
new_json := to_jsonb(NEW);
-- Manual field-by-field comparison (extremely limited)
IF OLD.product_name != NEW.product_name THEN
changed_fields_array := array_append(changed_fields_array, 'product_name');
END IF;
IF OLD.category != NEW.category OR (OLD.category IS NULL) != (NEW.category IS NULL) THEN
changed_fields_array := array_append(changed_fields_array, 'category');
END IF;
IF OLD.price != NEW.price THEN
changed_fields_array := array_append(changed_fields_array, 'price');
END IF;
IF OLD.stock_quantity != NEW.stock_quantity THEN
changed_fields_array := array_append(changed_fields_array, 'stock_quantity');
END IF;
-- Limited to predefined fields, no support for dynamic schema
ELSIF TG_OP = 'DELETE' THEN
change_type_value := 'DELETE';
old_json := to_jsonb(OLD);
new_json := NULL;
END IF;
-- Insert change record (potential performance bottleneck)
INSERT INTO product_changes (
product_id,
change_type,
old_data,
new_data,
changed_fields,
changed_by,
transaction_id
) VALUES (
COALESCE(NEW.product_id, OLD.product_id),
change_type_value,
old_json,
new_json,
changed_fields_array,
current_user,
txid_current()
);
-- Update the main record's change tracking
IF TG_OP != 'DELETE' THEN
NEW.updated_at := CURRENT_TIMESTAMP;
NEW.version_number := COALESCE(OLD.version_number, 0) + 1;
NEW.sync_status := 'pending';
RETURN NEW;
ELSE
RETURN OLD;
END IF;
EXCEPTION
WHEN OTHERS THEN
-- Log error but don't fail the main operation
RAISE WARNING 'Change tracking failed: %', SQLERRM;
RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;
-- Create triggers (must be maintained for every table)
CREATE TRIGGER products_change_trigger
BEFORE INSERT OR UPDATE OR DELETE ON products
FOR EACH ROW EXECUTE FUNCTION track_product_changes();
-- Polling-based synchronization query (inefficient and resource-intensive)
WITH pending_changes AS (
SELECT
pc.change_id,
pc.product_id,
pc.change_type,
pc.change_timestamp,
pc.new_data,
pc.old_data,
pc.changed_fields,
-- Extract change details for synchronization
CASE
WHEN pc.change_type = 'INSERT' THEN pc.new_data
WHEN pc.change_type = 'UPDATE' THEN pc.new_data
WHEN pc.change_type = 'DELETE' THEN pc.old_data
END as sync_data,
-- Priority scoring (limited effectiveness)
CASE
WHEN pc.change_type = 'DELETE' THEN 3
WHEN pc.change_type = 'INSERT' THEN 2
WHEN pc.change_type = 'UPDATE' AND 'price' = ANY(pc.changed_fields) THEN 2
ELSE 1
END as sync_priority,
-- Calculate processing delay (for monitoring)
EXTRACT(EPOCH FROM CURRENT_TIMESTAMP - pc.change_timestamp) as delay_seconds
FROM product_changes pc
WHERE pc.processed = false
AND pc.processing_attempts < 5
AND (pc.change_timestamp + INTERVAL '1 minute' * POWER(2, pc.processing_attempts)) <= CURRENT_TIMESTAMP
ORDER BY sync_priority DESC, change_timestamp ASC
LIMIT 1000 -- Batch processing limitation
),
synchronization_targets AS (
-- Define external systems to sync with (static configuration)
SELECT unnest(ARRAY[
'search_service',
'analytics_warehouse',
'recommendation_engine',
'external_api',
'cache_invalidation'
]) as target_system
)
SELECT
pc.change_id,
pc.product_id,
pc.change_type,
pc.sync_data,
pc.delay_seconds,
st.target_system,
-- Generate synchronization payloads (limited transformation capabilities)
JSON_BUILD_OBJECT(
'change_id', pc.change_id,
'entity_type', 'product',
'entity_id', pc.product_id,
'operation', LOWER(pc.change_type),
'timestamp', pc.change_timestamp,
'data', pc.sync_data,
'changed_fields', pc.changed_fields,
'target_system', st.target_system,
'priority', pc.sync_priority
) as sync_payload,
-- Endpoint configuration (manual maintenance)
CASE st.target_system
WHEN 'search_service' THEN 'http://search-api/products/sync'
WHEN 'analytics_warehouse' THEN 'http://warehouse-api/data/ingest'
WHEN 'recommendation_engine' THEN 'http://recommendations/products/update'
WHEN 'external_api' THEN 'http://external-partner/webhook/products'
WHEN 'cache_invalidation' THEN 'http://cache-service/invalidate'
ELSE 'http://default-sync-service/webhook'
END as target_endpoint
FROM pending_changes pc
CROSS JOIN synchronization_targets st;
-- Update processing status (requires external application logic)
UPDATE product_changes
SET
processing_attempts = processing_attempts + 1,
error_message = CASE
WHEN processing_attempts >= 4 THEN 'Max retry attempts exceeded'
ELSE error_message
END
-- The pending_changes CTE above is not visible in this separate statement;
-- the application must supply the specific change_ids it just attempted to sync
WHERE processed = false
AND processing_attempts < 5;
-- Problems with traditional CDC approaches:
-- 1. Complex trigger maintenance and performance impact on write operations
-- 2. Limited change detection granularity and field-level change tracking
-- 3. Manual synchronization logic with no built-in retry or error handling
-- 4. Polling-based detection causing delays and resource waste
-- 5. No support for transaction-level change grouping or ordering guarantees
-- 6. Difficult schema evolution and maintenance of change tracking infrastructure
-- 7. No built-in filtering or transformation capabilities for change streams
-- 8. Complex error handling and dead letter queue management
-- 9. Limited scalability for high-volume change processing
-- 10. No native support for distributed system synchronization patterns
-- Manual batch synchronization attempt (resource-intensive and delayed)
WITH hourly_changes AS (
SELECT
product_id,
array_agg(
JSON_BUILD_OBJECT(
'change_type', change_type,
'timestamp', change_timestamp,
'data', COALESCE(new_data, old_data)
)
ORDER BY change_timestamp
) as change_history,
MIN(change_timestamp) as first_change,
MAX(change_timestamp) as last_change,
COUNT(*) as change_count
FROM product_changes
WHERE change_timestamp >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
AND processed = false
GROUP BY product_id
),
batch_sync_data AS (
SELECT
hc.product_id,
hc.change_history,
hc.change_count,
-- Get current product state (may be inconsistent due to timing)
p.product_name,
p.category,
p.price,
p.stock_quantity,
-- Calculate sync requirements
CASE
WHEN p.product_id IS NULL THEN 'product_deleted'
WHEN hc.change_count > 10 THEN 'full_refresh'
ELSE 'incremental_sync'
END as sync_strategy
FROM hourly_changes hc
LEFT JOIN products p ON hc.product_id = p.product_id
)
SELECT
COUNT(*) as total_products_to_sync,
COUNT(*) FILTER (WHERE sync_strategy = 'full_refresh') as full_refresh_count,
COUNT(*) FILTER (WHERE sync_strategy = 'incremental_sync') as incremental_count,
COUNT(*) FILTER (WHERE sync_strategy = 'product_deleted') as deletion_count,
SUM(change_count) as total_changes,
AVG(change_count) as avg_changes_per_product,
-- Estimate processing time (rough calculation)
CEIL(SUM(change_count) / 100.0) as estimated_processing_minutes
FROM batch_sync_data;
-- Traditional limitations:
-- 1. No real-time change detection - relies on polling with delays
-- 2. Complex trigger and stored procedure maintenance overhead
-- 3. Performance impact on write operations due to change tracking triggers
-- 4. Limited transformation and filtering capabilities for change data
-- 5. Manual error handling and retry logic implementation required
-- 6. No built-in support for distributed synchronization patterns
-- 7. Difficult to scale change processing for high-volume systems
-- 8. Schema evolution breaks change tracking infrastructure
-- 9. No transaction-level change ordering or consistency guarantees
-- 10. Complex debugging and monitoring of change propagation failures
MongoDB provides sophisticated Change Data Capture capabilities with advanced streaming and synchronization:
// MongoDB Advanced Change Data Capture and Real-Time Synchronization System
const { MongoClient } = require('mongodb');
const { EventEmitter } = require('events');
// Change streams require a replica set or sharded cluster; client.connect() is assumed to be awaited before use
const client = new MongoClient('mongodb://localhost:27017/?replicaSet=rs0');
const db = client.db('realtime_cdc_system');
// Comprehensive MongoDB Change Data Capture Manager
class AdvancedCDCManager extends EventEmitter {
constructor(db, config = {}) {
super();
this.db = db;
this.collections = {
products: db.collection('products'),
orders: db.collection('orders'),
customers: db.collection('customers'),
inventory: db.collection('inventory'),
cdcConfiguration: db.collection('cdc_configuration'),
changeLog: db.collection('change_log'),
syncStatus: db.collection('sync_status'),
errorLog: db.collection('error_log')
};
// Advanced CDC configuration
this.config = {
enableResumeTokens: config.enableResumeTokens !== false,
batchSize: config.batchSize || 100,
maxAwaitTimeMS: config.maxAwaitTimeMS || 1000,
fullDocument: config.fullDocument || 'updateLookup',
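// Note: fullDocumentBeforeChange requires MongoDB 6.0+ with changeStreamPreAndPostImages enabled on each watched collection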
fullDocumentBeforeChange: config.fullDocumentBeforeChange || 'whenAvailable',
// Change stream filtering
enableFiltering: config.enableFiltering !== false,
filterCriteria: config.filterCriteria || {},
includeNamespaces: config.includeNamespaces || [],
excludeNamespaces: config.excludeNamespaces || [],
// Synchronization targets
syncTargets: config.syncTargets || [
{ name: 'search_service', enabled: true, priority: 1 },
{ name: 'analytics_warehouse', enabled: true, priority: 2 },
{ name: 'recommendation_engine', enabled: true, priority: 3 },
{ name: 'cache_invalidation', enabled: true, priority: 1 }
],
// Error handling and retry
enableRetryLogic: config.enableRetryLogic !== false,
maxRetries: config.maxRetries || 5,
retryDelayBase: config.retryDelayBase || 1000,
deadLetterQueue: config.enableDeadLetterQueue !== false,
// Performance optimization
enableBatchProcessing: config.enableBatchProcessing !== false,
enableParallelSync: config.enableParallelSync !== false,
maxConcurrentSyncs: config.maxConcurrentSyncs || 10,
// Monitoring and metrics
enableMetrics: config.enableMetrics !== false,
metricsInterval: config.metricsInterval || 60000,
enableHealthChecks: config.enableHealthChecks !== false
};
// CDC state management
this.changeStreams = new Map();
this.resumeTokens = new Map();
this.syncQueues = new Map();
this.processingStats = {
totalChanges: 0,
successfulSyncs: 0,
failedSyncs: 0,
avgProcessingTime: 0,
lastProcessedTimestamp: null
};
// Initialize CDC system
this.initializeCDCSystem();
}
async initializeCDCSystem() {
console.log('Initializing comprehensive MongoDB Change Data Capture system...');
try {
// Setup change stream configurations
await this.setupChangeStreamConfiguration();
// Initialize synchronization targets
await this.initializeSyncTargets();
// Setup error handling and monitoring
await this.setupErrorHandlingAndMonitoring();
// Start change streams for configured collections
await this.startChangeStreams();
// Initialize metrics collection
if (this.config.enableMetrics) {
await this.startMetricsCollection();
}
// Setup health monitoring
if (this.config.enableHealthChecks) {
await this.setupHealthMonitoring();
}
console.log('Change Data Capture system initialized successfully');
} catch (error) {
console.error('Error initializing CDC system:', error);
throw error;
}
}
async setupChangeStreamConfiguration() {
console.log('Setting up change stream configuration...');
try {
// Define collections to monitor with specific configurations
const monitoringConfig = [
{
collection: 'products',
pipeline: [
{
$match: {
'operationType': { $in: ['insert', 'update', 'delete', 'replace'] },
$or: [
{ 'updateDescription.updatedFields.price': { $exists: true } },
{ 'updateDescription.updatedFields.stock_quantity': { $exists: true } },
{ 'updateDescription.updatedFields.status': { $exists: true } },
{ 'operationType': { $in: ['insert', 'delete'] } }
]
}
}
],
options: {
fullDocument: 'updateLookup',
fullDocumentBeforeChange: 'whenAvailable'
},
syncTargets: ['search_service', 'analytics_warehouse', 'cache_invalidation'],
transformations: ['priceCalculation', 'stockValidation', 'searchIndexing']
},
{
collection: 'orders',
pipeline: [
{
$match: {
'operationType': { $in: ['insert', 'update'] },
$or: [
{ 'operationType': 'insert' },
{ 'updateDescription.updatedFields.status': { $exists: true } },
{ 'updateDescription.updatedFields.payment_status': { $exists: true } }
]
}
}
],
options: {
fullDocument: 'updateLookup'
},
syncTargets: ['analytics_warehouse', 'recommendation_engine'],
transformations: ['orderAnalytics', 'customerInsights', 'inventoryImpact']
},
{
collection: 'customers',
pipeline: [
{
$match: {
'operationType': { $in: ['insert', 'update'] },
$or: [
{ 'operationType': 'insert' },
{ 'updateDescription.updatedFields.preferences': { $exists: true } },
{ 'updateDescription.updatedFields.profile': { $exists: true } }
]
}
}
],
options: {
fullDocument: 'updateLookup'
},
syncTargets: ['recommendation_engine', 'analytics_warehouse'],
transformations: ['profileEnrichment', 'preferencesAnalysis', 'segmentation']
}
];
// Store configuration for runtime access
await this.collections.cdcConfiguration.deleteMany({});
await this.collections.cdcConfiguration.insertMany(
monitoringConfig.map(config => ({
...config,
enabled: true,
createdAt: new Date(),
lastResumeToken: null
}))
);
this.monitoringConfig = monitoringConfig;
} catch (error) {
console.error('Error setting up change stream configuration:', error);
throw error;
}
}
async startChangeStreams() {
console.log('Starting change streams for all configured collections...');
try {
for (const config of this.monitoringConfig) {
await this.startCollectionChangeStream(config);
}
console.log(`Started ${this.changeStreams.size} change streams successfully`);
} catch (error) {
console.error('Error starting change streams:', error);
throw error;
}
}
async startCollectionChangeStream(config) {
console.log(`Starting change stream for collection: ${config.collection}`);
try {
const collection = this.collections[config.collection];
if (!collection) {
throw new Error(`Collection ${config.collection} not found`);
}
// Retrieve resume token if available
const savedConfig = await this.collections.cdcConfiguration.findOne({
collection: config.collection
});
const changeStreamOptions = {
...config.options,
batchSize: this.config.batchSize,
maxAwaitTimeMS: this.config.maxAwaitTimeMS
};
// Add resume token if available and enabled
if (this.config.enableResumeTokens && savedConfig?.lastResumeToken) {
changeStreamOptions.resumeAfter = savedConfig.lastResumeToken;
console.log(`Resuming change stream for ${config.collection} from saved token`);
}
// Create change stream with pipeline
const changeStream = collection.watch(config.pipeline || [], changeStreamOptions);
// Store change stream reference
this.changeStreams.set(config.collection, {
stream: changeStream,
config: config,
startTime: new Date(),
processedCount: 0,
errorCount: 0
});
// Setup change event handling
changeStream.on('change', async (changeDoc) => {
await this.handleChangeEvent(changeDoc, config);
});
changeStream.on('error', async (error) => {
console.error(`Change stream error for ${config.collection}:`, error);
await this.handleChangeStreamError(config.collection, error);
});
changeStream.on('close', () => {
console.warn(`Change stream closed for ${config.collection}`);
this.emit('changeStreamClosed', config.collection);
});
changeStream.on('end', () => {
console.warn(`Change stream ended for ${config.collection}`);
this.emit('changeStreamEnded', config.collection);
});
// Store resume token periodically
if (this.config.enableResumeTokens) {
setInterval(async () => {
try {
const resumeToken = changeStream.resumeToken;
if (resumeToken) {
await this.saveResumeToken(config.collection, resumeToken);
}
} catch (error) {
console.warn(`Error saving resume token for ${config.collection}:`, error.message);
}
}, 30000); // Save every 30 seconds
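// Note: a production implementation would keep this interval handle and clear it when the change stream closes to avoid timer leaks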
}
} catch (error) {
console.error(`Error starting change stream for ${config.collection}:`, error);
throw error;
}
}
async handleChangeEvent(changeDoc, config) {
const startTime = Date.now();
try {
// Update processing statistics
this.processingStats.totalChanges++;
this.processingStats.lastProcessedTimestamp = new Date();
// Update collection-specific statistics
const streamInfo = this.changeStreams.get(config.collection);
if (streamInfo) {
streamInfo.processedCount++;
}
console.log(`Processing change event for ${config.collection}:`, {
operationType: changeDoc.operationType,
documentKey: changeDoc.documentKey,
timestamp: changeDoc.clusterTime
});
// Apply transformations if configured
const transformedChangeDoc = await this.applyTransformations(changeDoc, config);
// Log change event for audit trail
await this.logChangeEvent(transformedChangeDoc, config);
// Process synchronization to configured targets
const syncPromises = config.syncTargets.map(targetName =>
this.synchronizeToTarget(transformedChangeDoc, config, targetName)
);
if (this.config.enableParallelSync) {
// Execute synchronizations in parallel
const syncResults = await Promise.allSettled(syncPromises);
await this.processSyncResults(syncResults, transformedChangeDoc, config);
} else {
// Execute synchronizations sequentially
for (const syncPromise of syncPromises) {
try {
await syncPromise;
this.processingStats.successfulSyncs++;
} catch (error) {
this.processingStats.failedSyncs++;
console.error('Sequential sync error:', error);
await this.handleSyncError(error, transformedChangeDoc, config);
}
}
}
// Update processing time metrics
const processingTime = Date.now() - startTime;
this.updateProcessingMetrics(processingTime);
// Emit processed event for external monitoring
this.emit('changeProcessed', {
collection: config.collection,
operationType: changeDoc.operationType,
documentKey: changeDoc.documentKey,
processingTime: processingTime,
syncTargets: config.syncTargets
});
} catch (error) {
console.error('Error handling change event:', error);
// Update error statistics
const streamInfo = this.changeStreams.get(config.collection);
if (streamInfo) {
streamInfo.errorCount++;
}
// Log error for debugging
await this.logError(error, changeDoc, config);
// Emit error event
this.emit('changeProcessingError', {
error: error,
changeDoc: changeDoc,
config: config
});
}
}
async applyTransformations(changeDoc, config) {
if (!config.transformations || config.transformations.length === 0) {
return changeDoc;
}
console.log(`Applying ${config.transformations.length} transformations...`);
let transformedDoc = { ...changeDoc };
try {
for (const transformationName of config.transformations) {
transformedDoc = await this.applyTransformation(transformedDoc, transformationName, config);
}
return transformedDoc;
} catch (error) {
console.error('Error applying transformations:', error);
// Return original document if transformation fails
return changeDoc;
}
}
async applyTransformation(changeDoc, transformationName, config) {
switch (transformationName) {
case 'priceCalculation':
return await this.transformPriceCalculation(changeDoc);
case 'stockValidation':
return await this.transformStockValidation(changeDoc);
case 'searchIndexing':
return await this.transformSearchIndexing(changeDoc);
case 'orderAnalytics':
return await this.transformOrderAnalytics(changeDoc);
case 'customerInsights':
return await this.transformCustomerInsights(changeDoc);
case 'inventoryImpact':
return await this.transformInventoryImpact(changeDoc);
case 'profileEnrichment':
return await this.transformProfileEnrichment(changeDoc);
case 'preferencesAnalysis':
return await this.transformPreferencesAnalysis(changeDoc);
case 'segmentation':
return await this.transformSegmentation(changeDoc);
default:
console.warn(`Unknown transformation: ${transformationName}`);
return changeDoc;
}
}
async transformPriceCalculation(changeDoc) {
if (changeDoc.operationType === 'update' &&
changeDoc.updateDescription?.updatedFields?.price) {
const newPrice = changeDoc.fullDocument?.price;
const oldPrice = changeDoc.fullDocumentBeforeChange?.price;
if (newPrice && oldPrice) {
const priceChange = newPrice - oldPrice;
const priceChangePercent = ((priceChange / oldPrice) * 100);
changeDoc.enrichment = {
...changeDoc.enrichment,
priceAnalysis: {
oldPrice: oldPrice,
newPrice: newPrice,
priceChange: priceChange,
priceChangePercent: Math.round(priceChangePercent * 100) / 100,
priceDirection: priceChange > 0 ? 'increase' : 'decrease',
significantChange: Math.abs(priceChangePercent) > 10
}
};
}
}
return changeDoc;
}
async transformStockValidation(changeDoc) {
if (changeDoc.fullDocument?.stock_quantity !== undefined) {
const stockQuantity = changeDoc.fullDocument.stock_quantity;
changeDoc.enrichment = {
...changeDoc.enrichment,
stockAnalysis: {
currentStock: stockQuantity,
stockStatus: stockQuantity === 0 ? 'out_of_stock' :
stockQuantity < 10 ? 'low_stock' : 'in_stock',
restockNeeded: stockQuantity < 10,
stockChangeAlert: changeDoc.operationType === 'update' &&
changeDoc.updateDescription?.updatedFields?.stock_quantity !== undefined
}
};
}
return changeDoc;
}
async transformSearchIndexing(changeDoc) {
if (changeDoc.fullDocument) {
const doc = changeDoc.fullDocument;
// Generate search keywords and metadata
const searchKeywords = [];
if (doc.product_name) searchKeywords.push(...doc.product_name.toLowerCase().split(/\s+/));
if (doc.category) searchKeywords.push(...doc.category.toLowerCase().split(/\s+/));
if (doc.tags) searchKeywords.push(...doc.tags.map(tag => tag.toLowerCase()));
changeDoc.enrichment = {
...changeDoc.enrichment,
searchMetadata: {
searchKeywords: [...new Set(searchKeywords)].filter(word => word.length > 2),
searchableFields: ['product_name', 'category', 'description', 'tags'],
indexPriority: doc.featured ? 'high' : 'normal',
lastIndexUpdate: new Date()
}
};
}
return changeDoc;
}
async transformOrderAnalytics(changeDoc) {
if (changeDoc.fullDocument && changeDoc.ns.coll === 'orders') {
const order = changeDoc.fullDocument;
// Calculate order metrics
const orderValue = order.items?.reduce((sum, item) => sum + (item.price * item.quantity), 0) || 0;
const itemCount = order.items?.reduce((sum, item) => sum + item.quantity, 0) || 0;
changeDoc.enrichment = {
...changeDoc.enrichment,
orderAnalytics: {
orderValue: orderValue,
itemCount: itemCount,
averageItemValue: itemCount > 0 ? orderValue / itemCount : 0,
customerSegment: orderValue > 500 ? 'high_value' : orderValue > 100 ? 'medium_value' : 'low_value',
orderComplexity: itemCount > 5 ? 'complex' : 'simple'
}
};
}
return changeDoc;
}
async synchronizeToTarget(changeDoc, config, targetName) {
console.log(`Synchronizing to target: ${targetName}`);
try {
// Find target configuration
const targetConfig = this.config.syncTargets.find(t => t.name === targetName);
if (!targetConfig || !targetConfig.enabled) {
console.log(`Target ${targetName} is disabled, skipping sync`);
return;
}
// Prepare synchronization payload
const syncPayload = await this.prepareSyncPayload(changeDoc, config, targetName);
// Execute synchronization based on target type
const syncResult = await this.executeSynchronization(syncPayload, targetConfig);
// Log successful synchronization
await this.logSuccessfulSync(changeDoc, targetName, syncResult);
return syncResult;
} catch (error) {
console.error(`Synchronization failed for target ${targetName}:`, error);
// Handle sync error with retry logic
await this.handleSyncError(error, changeDoc, config, targetName);
throw error;
}
}
async prepareSyncPayload(changeDoc, config, targetName) {
const basePayload = {
changeId: changeDoc._id?._data || `${Date.now()}-${Math.random()}`, // _id is a resume token document; use its _data string rather than toString()
timestamp: new Date(),
source: {
database: changeDoc.ns.db,
collection: changeDoc.ns.coll,
operationType: changeDoc.operationType
},
documentKey: changeDoc.documentKey,
clusterTime: changeDoc.clusterTime,
enrichment: changeDoc.enrichment || {}
};
// Add operation-specific data
switch (changeDoc.operationType) {
case 'insert':
basePayload.document = changeDoc.fullDocument;
break;
case 'update':
basePayload.document = changeDoc.fullDocument;
basePayload.updateDescription = changeDoc.updateDescription;
if (changeDoc.fullDocumentBeforeChange) {
basePayload.documentBeforeChange = changeDoc.fullDocumentBeforeChange;
}
break;
case 'replace':
basePayload.document = changeDoc.fullDocument;
if (changeDoc.fullDocumentBeforeChange) {
basePayload.documentBeforeChange = changeDoc.fullDocumentBeforeChange;
}
break;
case 'delete':
if (changeDoc.fullDocumentBeforeChange) {
basePayload.deletedDocument = changeDoc.fullDocumentBeforeChange;
}
break;
}
// Apply target-specific transformations
return await this.applyTargetSpecificTransformations(basePayload, targetName);
}
async applyTargetSpecificTransformations(payload, targetName) {
switch (targetName) {
case 'search_service':
return this.transformForSearchService(payload);
case 'analytics_warehouse':
return this.transformForAnalyticsWarehouse(payload);
case 'recommendation_engine':
return this.transformForRecommendationEngine(payload);
case 'cache_invalidation':
return this.transformForCacheInvalidation(payload);
default:
return payload;
}
}
transformForSearchService(payload) {
if (payload.source.collection === 'products') {
return {
...payload,
searchServiceData: {
action: payload.source.operationType === 'delete' ? 'delete' : 'upsert',
document: payload.document ? {
id: payload.document._id,
title: payload.document.product_name,
content: payload.document.description,
category: payload.document.category,
price: payload.document.price,
inStock: payload.document.stock_quantity > 0,
keywords: payload.enrichment?.searchMetadata?.searchKeywords || [],
lastUpdated: payload.timestamp
} : null,
priority: payload.enrichment?.searchMetadata?.indexPriority || 'normal'
}
};
}
return payload;
}
transformForAnalyticsWarehouse(payload) {
return {
...payload,
warehouseData: {
entityType: payload.source.collection,
eventType: `${payload.source.collection}_${payload.source.operationType}`,
eventTimestamp: payload.timestamp,
entityId: payload.documentKey._id,
eventData: payload.document || payload.deletedDocument,
changeMetadata: {
updatedFields: payload.updateDescription?.updatedFields ? Object.keys(payload.updateDescription.updatedFields) : null,
removedFields: payload.updateDescription?.removedFields || null
},
enrichment: payload.enrichment
}
};
}
transformForRecommendationEngine(payload) {
if (payload.source.collection === 'orders') {
return {
...payload,
recommendationData: {
userId: payload.document?.customer_id,
items: payload.document?.items || [],
orderValue: payload.enrichment?.orderAnalytics?.orderValue,
customerSegment: payload.enrichment?.orderAnalytics?.customerSegment,
eventType: 'purchase',
timestamp: payload.timestamp
}
};
}
return payload;
}
transformForCacheInvalidation(payload) {
const cacheKeys = [];
// Generate cache keys based on the changed document
if (payload.source.collection === 'products' && payload.documentKey._id) {
cacheKeys.push(
`product:${payload.documentKey._id}`,
`products:category:${payload.document?.category}`,
`products:search:*` // Wildcard for search result caches
);
}
return {
...payload,
cacheInvalidationData: {
keys: cacheKeys,
operation: 'invalidate',
cascade: true,
reason: `${payload.source.collection}_${payload.source.operationType}`
}
};
}
async executeSynchronization(syncPayload, targetConfig) {
// Simulate different synchronization mechanisms
switch (targetConfig.name) {
case 'search_service':
return await this.syncToSearchService(syncPayload);
case 'analytics_warehouse':
return await this.syncToAnalyticsWarehouse(syncPayload);
case 'recommendation_engine':
return await this.syncToRecommendationEngine(syncPayload);
case 'cache_invalidation':
return await this.syncToCacheService(syncPayload);
default:
throw new Error(`Unknown sync target: ${targetConfig.name}`);
}
}
async syncToSearchService(payload) {
// Simulate search service synchronization
console.log('Syncing to search service:', payload.searchServiceData?.action);
// Simulate API call delay
await new Promise(resolve => setTimeout(resolve, 50));
return {
success: true,
target: 'search_service',
action: payload.searchServiceData?.action,
processedAt: new Date(),
responseTime: 50
};
}
async syncToAnalyticsWarehouse(payload) {
// Simulate analytics warehouse synchronization
console.log('Syncing to analytics warehouse:', payload.warehouseData?.eventType);
// Simulate processing delay
await new Promise(resolve => setTimeout(resolve, 100));
return {
success: true,
target: 'analytics_warehouse',
eventType: payload.warehouseData?.eventType,
processedAt: new Date(),
responseTime: 100
};
}
async syncToRecommendationEngine(payload) {
// Simulate recommendation engine synchronization
console.log('Syncing to recommendation engine');
await new Promise(resolve => setTimeout(resolve, 75));
return {
success: true,
target: 'recommendation_engine',
processedAt: new Date(),
responseTime: 75
};
}
async syncToCacheService(payload) {
// Simulate cache invalidation
console.log('Invalidating cache keys:', payload.cacheInvalidationData?.keys);
await new Promise(resolve => setTimeout(resolve, 25));
return {
success: true,
target: 'cache_invalidation',
keysInvalidated: payload.cacheInvalidationData?.keys?.length || 0,
processedAt: new Date(),
responseTime: 25
};
}
async handleSyncError(error, changeDoc, config, targetName) {
console.error(`Sync error for ${targetName}:`, error.message);
// Log error for debugging
await this.collections.errorLog.insertOne({
errorType: 'sync_error',
targetName: targetName,
collection: config.collection,
changeId: changeDoc._id?._data,
documentKey: changeDoc.documentKey,
error: {
message: error.message,
stack: error.stack,
name: error.name
},
timestamp: new Date(),
retryable: this.isRetryableError(error)
});
// Implement retry logic if enabled
if (this.config.enableRetryLogic && this.isRetryableError(error)) {
await this.scheduleRetry(changeDoc, config, targetName);
} else if (this.config.deadLetterQueue) {
await this.sendToDeadLetterQueue(changeDoc, config, targetName, error);
}
}
isRetryableError(error) {
// Define which errors should trigger retries
const retryableErrorTypes = [
'ECONNREFUSED',
'ECONNRESET',
'ETIMEDOUT',
'EAI_AGAIN',
'ENOTFOUND'
];
return retryableErrorTypes.includes(error.code) ||
error.message?.includes('timeout') ||
error.message?.includes('connection') ||
(error.status >= 500 && error.status < 600);
}
async scheduleRetry(changeDoc, config, targetName) {
// Implement exponential backoff retry
const retryKey = `${config.collection}_${changeDoc.documentKey._id}_${targetName}`;
// Get current retry count
let retryCount = await this.getRetryCount(retryKey);
if (retryCount < this.config.maxRetries) {
const delayMs = this.config.retryDelayBase * Math.pow(2, retryCount);
console.log(`Scheduling retry ${retryCount + 1} for ${targetName} in ${delayMs}ms`);
setTimeout(async () => {
try {
await this.synchronizeToTarget(changeDoc, config, targetName);
await this.clearRetryCount(retryKey);
} catch (retryError) {
await this.incrementRetryCount(retryKey);
await this.handleSyncError(retryError, changeDoc, config, targetName);
}
}, delayMs);
} else {
console.error(`Max retries exceeded for ${targetName}, sending to dead letter queue`);
await this.sendToDeadLetterQueue(changeDoc, config, targetName, new Error('Max retries exceeded'));
}
}
async logChangeEvent(changeDoc, config) {
try {
await this.collections.changeLog.insertOne({
changeId: changeDoc._id?._data || `${Date.now()}-${Math.random()}`,
collection: config.collection,
operationType: changeDoc.operationType,
documentKey: changeDoc.documentKey,
clusterTime: changeDoc.clusterTime,
hasFullDocument: !!changeDoc.fullDocument,
hasFullDocumentBeforeChange: !!changeDoc.fullDocumentBeforeChange,
updateDescription: changeDoc.updateDescription,
enrichment: changeDoc.enrichment,
syncTargets: config.syncTargets,
timestamp: new Date()
});
} catch (error) {
console.warn('Error logging change event:', error.message);
}
}
async saveResumeToken(collection, resumeToken) {
try {
await this.collections.cdcConfiguration.updateOne(
{ collection: collection },
{
$set: {
lastResumeToken: resumeToken,
lastResumeTokenUpdate: new Date()
}
}
);
this.resumeTokens.set(collection, resumeToken);
} catch (error) {
console.warn(`Error saving resume token for ${collection}:`, error.message);
}
}
updateProcessingMetrics(processingTime) {
const currentAvg = this.processingStats.avgProcessingTime;
const totalProcessed = this.processingStats.totalChanges;
this.processingStats.avgProcessingTime =
((currentAvg * (totalProcessed - 1)) + processingTime) / totalProcessed;
}
async generateCDCHealthReport() {
console.log('Generating CDC system health report...');
try {
const healthReport = {
timestamp: new Date(),
systemStatus: 'healthy',
// Change stream status
changeStreams: Array.from(this.changeStreams.entries()).map(([collection, info]) => ({
collection: collection,
status: info.stream.closed ? 'closed' : 'active',
processedCount: info.processedCount,
errorCount: info.errorCount,
startTime: info.startTime,
uptime: Date.now() - info.startTime.getTime()
})),
// Processing statistics
processingStats: {
...this.processingStats,
successRate: this.processingStats.totalChanges > 0 ?
(this.processingStats.successfulSyncs / this.processingStats.totalChanges * 100).toFixed(2) : 0,
errorRate: this.processingStats.totalChanges > 0 ?
(this.processingStats.failedSyncs / this.processingStats.totalChanges * 100).toFixed(2) : 0
},
// Sync target health
syncTargetHealth: await this.checkSyncTargetHealth(),
// Recent errors
recentErrors: await this.collections.errorLog.find({
timestamp: { $gte: new Date(Date.now() - 3600000) } // Last hour
}).limit(10).toArray(),
// Resume token status
resumeTokenStatus: Array.from(this.resumeTokens.entries()).map(([collection, token]) => ({
collection: collection,
hasResumeToken: !!token,
tokenAge: new Date() // Would calculate actual age in production
}))
};
return healthReport;
} catch (error) {
console.error('Error generating health report:', error);
return {
timestamp: new Date(),
systemStatus: 'error',
error: error.message
};
}
}
async checkSyncTargetHealth() {
const healthChecks = [];
for (const target of this.config.syncTargets) {
try {
// Simulate health check for each target
const healthCheck = {
name: target.name,
status: target.enabled ? 'healthy' : 'disabled',
priority: target.priority,
lastHealthCheck: new Date(),
responseTime: Math.random() * 100 + 50 // Simulated response time
};
healthChecks.push(healthCheck);
} catch (error) {
healthChecks.push({
name: target.name,
status: 'unhealthy',
error: error.message,
lastHealthCheck: new Date()
});
}
}
return healthChecks;
}
// Utility methods for retry management
async getRetryCount(retryKey) {
const status = await this.collections.syncStatus.findOne({ retryKey: retryKey });
return status ? status.retryCount : 0;
}
async incrementRetryCount(retryKey) {
await this.collections.syncStatus.updateOne(
{ retryKey: retryKey },
{
$inc: { retryCount: 1 },
$set: { lastRetryAttempt: new Date() }
},
{ upsert: true }
);
}
async clearRetryCount(retryKey) {
await this.collections.syncStatus.deleteOne({ retryKey: retryKey });
}
async sendToDeadLetterQueue(changeDoc, config, targetName, error) {
console.log(`Sending to dead letter queue: ${config.collection} -> ${targetName}`);
await this.collections.errorLog.insertOne({
errorType: 'dead_letter_queue',
collection: config.collection,
targetName: targetName,
changeDoc: changeDoc,
config: config,
error: {
message: error.message,
stack: error.stack
},
timestamp: new Date(),
requiresManualIntervention: true
});
}
}
// Benefits of MongoDB Advanced Change Data Capture:
// - Real-time change detection with minimal latency and no polling overhead
// - Comprehensive change filtering and transformation capabilities
// - Built-in resume token support for fault-tolerant change stream processing
// - Advanced error handling with retry logic and dead letter queue management
// - Parallel synchronization to multiple targets with configurable priorities
// - Transaction-aware change ordering and consistency guarantees
// - Native MongoDB integration with minimal performance impact
// - Scalable architecture supporting high-volume change processing
// - Flexible transformation pipeline for data enrichment and formatting
// - SQL-compatible CDC operations through QueryLeaf integration
module.exports = {
AdvancedCDCManager
};
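Assuming the class above is exported from a local module (the module path, connection string, and intervals are illustrative), a minimal usage sketch looks like this; enabling pre- and post-images per collection is required for fullDocumentBeforeChange on MongoDB 6.0+:

```javascript
const { MongoClient } = require('mongodb');
const { AdvancedCDCManager } = require('./advanced-cdc-manager'); // hypothetical module path

async function main() {
  const client = new MongoClient('mongodb://localhost:27017/?replicaSet=rs0');
  await client.connect();
  const db = client.db('realtime_cdc_system');

  // Enable pre/post images so fullDocumentBeforeChange is available (MongoDB 6.0+)
  await db.command({ collMod: 'products', changeStreamPreAndPostImages: { enabled: true } });

  // The constructor kicks off initializeCDCSystem() internally
  const cdc = new AdvancedCDCManager(db, { maxConcurrentSyncs: 5 });

  cdc.on('changeProcessed', (event) => console.log('Processed change:', event));
  cdc.on('changeProcessingError', ({ error }) => console.error('CDC error:', error.message));

  // Periodically inspect system health
  setInterval(async () => {
    const report = await cdc.generateCDCHealthReport();
    console.log('CDC health:', report.systemStatus);
  }, 60000);
}

main().catch(console.error);
```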
Understanding MongoDB Change Data Capture Architecture
Advanced Change Stream and Synchronization Patterns
Implement sophisticated CDC patterns for production MongoDB deployments:
// Production-ready MongoDB CDC with enterprise-grade features
class EnterpriseCDCOrchestrator extends AdvancedCDCManager {
constructor(db, enterpriseConfig) {
super(db, enterpriseConfig);
this.enterpriseConfig = {
...enterpriseConfig,
enableDistributedProcessing: true,
enableLoadBalancing: true,
enableFailoverHandling: true,
enableComplianceAuditing: true,
enableDataLineage: true,
enableSchemaEvolution: true
};
this.setupEnterpriseFeatures();
this.initializeDistributedCDC();
this.setupComplianceFramework();
}
async implementDistributedCDCProcessing() {
console.log('Implementing distributed CDC processing across multiple nodes...');
const distributedConfig = {
// Multi-node change stream distribution
nodeDistribution: {
enableShardAwareness: true,
balanceAcrossNodes: true,
minimizeCrossShardOperations: true,
optimizeForReplicaSetTopology: true
},
// Load balancing strategies
loadBalancing: {
dynamicWorkloadDistribution: true,
nodeCapacityAware: true,
latencyOptimized: true,
failoverCapable: true
},
// Consistency guarantees
consistencyManagement: {
maintainChangeOrdering: true,
transactionBoundaryRespect: true,
causalConsistencyPreservation: true,
replicationLagHandling: true
}
};
return await this.deployDistributedCDC(distributedConfig);
}
async setupEnterpriseComplianceFramework() {
console.log('Setting up enterprise compliance framework for CDC...');
const complianceFramework = {
// Data governance
dataGovernance: {
changeDataClassification: true,
sensitiveDataDetection: true,
accessControlEnforcement: true,
dataRetentionPolicies: true
},
// Audit requirements
auditCompliance: {
comprehensiveChangeLogging: true,
tamperEvidenceCapture: true,
regulatoryReportingSupport: true,
retentionPolicyEnforcement: true
},
// Security controls
securityCompliance: {
encryptionInTransit: true,
encryptionAtRest: true,
accessControlValidation: true,
nonRepudiationSupport: true
}
};
return await this.implementComplianceFramework(complianceFramework);
}
}
SQL-Style Change Data Capture with QueryLeaf
QueryLeaf provides familiar SQL syntax for MongoDB Change Data Capture and real-time synchronization operations:
-- QueryLeaf advanced change data capture with SQL-familiar syntax for MongoDB
-- Configure comprehensive change data capture with advanced filtering and routing
CONFIGURE CHANGE_DATA_CAPTURE
SET enabled = true,
resume_tokens = true,
batch_size = 100,
max_await_time_ms = 1000,
full_document = 'updateLookup',
full_document_before_change = 'whenAvailable';
-- Setup change stream monitoring with sophisticated filtering and transformation
CREATE CHANGE_STREAM product_changes_stream AS
WITH change_filtering AS (
-- Advanced change detection filters
SELECT
change_id,
operation_type,
collection_name,
document_key,
cluster_time,
full_document,
full_document_before_change,
update_description,
-- Intelligent change classification
CASE
WHEN operation_type = 'insert' THEN 'new_product'
WHEN operation_type = 'delete' THEN 'product_removal'
WHEN operation_type = 'update' AND
JSON_EXTRACT(update_description, '$.updatedFields.price') IS NOT NULL THEN 'price_change'
WHEN operation_type = 'update' AND
JSON_EXTRACT(update_description, '$.updatedFields.stock_quantity') IS NOT NULL THEN 'inventory_change'
WHEN operation_type = 'update' AND
JSON_EXTRACT(update_description, '$.updatedFields.status') IS NOT NULL THEN 'status_change'
ELSE 'general_update'
END as change_category,
-- Priority scoring for processing order
CASE
WHEN operation_type = 'delete' THEN 5
WHEN operation_type = 'insert' THEN 4
WHEN JSON_EXTRACT(update_description, '$.updatedFields.price') IS NOT NULL THEN 4
WHEN JSON_EXTRACT(update_description, '$.updatedFields.stock_quantity') IS NOT NULL THEN 3
WHEN JSON_EXTRACT(update_description, '$.updatedFields.status') IS NOT NULL THEN 3
ELSE 1
END as processing_priority,
-- Impact assessment
CASE
WHEN operation_type IN ('insert', 'delete') THEN 'high'
WHEN JSON_EXTRACT(update_description, '$.updatedFields.price') IS NOT NULL THEN
CASE
WHEN ABS(
CAST(JSON_EXTRACT(full_document, '$.price') AS DECIMAL(10,2)) -
CAST(JSON_EXTRACT(full_document_before_change, '$.price') AS DECIMAL(10,2))
) > 50 THEN 'high'
WHEN ABS(
CAST(JSON_EXTRACT(full_document, '$.price') AS DECIMAL(10,2)) -
CAST(JSON_EXTRACT(full_document_before_change, '$.price') AS DECIMAL(10,2))
) > 10 THEN 'medium'
ELSE 'low'
END
WHEN JSON_EXTRACT(update_description, '$.updatedFields.stock_quantity') IS NOT NULL THEN
CASE
WHEN CAST(JSON_EXTRACT(full_document, '$.stock_quantity') AS INTEGER) = 0 THEN 'high'
WHEN CAST(JSON_EXTRACT(full_document, '$.stock_quantity') AS INTEGER) < 10 THEN 'medium'
ELSE 'low'
END
ELSE 'low'
END as business_impact,
-- Synchronization target determination
ARRAY[
CASE WHEN change_category IN ('new_product', 'product_removal', 'price_change')
THEN 'search_service' ELSE NULL END,
CASE WHEN change_category IN ('new_product', 'price_change', 'inventory_change')
THEN 'analytics_warehouse' ELSE NULL END,
CASE WHEN change_category IN ('new_product', 'price_change', 'inventory_change', 'status_change')
THEN 'cache_invalidation' ELSE NULL END,
CASE WHEN change_category IN ('new_product', 'price_change')
THEN 'recommendation_engine' ELSE NULL END
]::TEXT[] as sync_targets,
-- Enhanced change metadata
JSON_OBJECT(
'change_detected_at', CURRENT_TIMESTAMP,
'source_replica_set', CONNECTION_INFO('replica_set'),
'source_node', CONNECTION_INFO('host'),
'change_stream_id', CHANGE_STREAM_INFO('stream_id'),
'resume_token', CHANGE_STREAM_INFO('resume_token')
) as change_metadata,
CURRENT_TIMESTAMP as processed_at
FROM CHANGE_STREAM('products')
WHERE
-- Filter criteria for relevant changes
operation_type IN ('insert', 'update', 'delete')
AND (
operation_type IN ('insert', 'delete') OR
update_description IS NOT NULL AND (
JSON_EXTRACT(update_description, '$.updatedFields.price') IS NOT NULL OR
JSON_EXTRACT(update_description, '$.updatedFields.stock_quantity') IS NOT NULL OR
JSON_EXTRACT(update_description, '$.updatedFields.status') IS NOT NULL OR
JSON_EXTRACT(update_description, '$.updatedFields.category') IS NOT NULL
)
)
),
change_enrichment AS (
SELECT
cf.*,
-- Price change analysis
CASE
WHEN cf.change_category = 'price_change' THEN
JSON_OBJECT(
'old_price', CAST(JSON_EXTRACT(cf.full_document_before_change, '$.price') AS DECIMAL(10,2)),
'new_price', CAST(JSON_EXTRACT(cf.full_document, '$.price') AS DECIMAL(10,2)),
'price_change',
CAST(JSON_EXTRACT(cf.full_document, '$.price') AS DECIMAL(10,2)) -
CAST(JSON_EXTRACT(cf.full_document_before_change, '$.price') AS DECIMAL(10,2)),
'price_change_percent',
ROUND(
((CAST(JSON_EXTRACT(cf.full_document, '$.price') AS DECIMAL(10,2)) -
CAST(JSON_EXTRACT(cf.full_document_before_change, '$.price') AS DECIMAL(10,2))) /
CAST(JSON_EXTRACT(cf.full_document_before_change, '$.price') AS DECIMAL(10,2))) * 100,
2
),
'price_direction',
CASE
WHEN CAST(JSON_EXTRACT(cf.full_document, '$.price') AS DECIMAL(10,2)) >
CAST(JSON_EXTRACT(cf.full_document_before_change, '$.price') AS DECIMAL(10,2))
THEN 'increase' ELSE 'decrease'
END
)
ELSE NULL
END as price_analysis,
-- Inventory change analysis
CASE
WHEN cf.change_category = 'inventory_change' THEN
JSON_OBJECT(
'old_stock', CAST(JSON_EXTRACT(cf.full_document_before_change, '$.stock_quantity') AS INTEGER),
'new_stock', CAST(JSON_EXTRACT(cf.full_document, '$.stock_quantity') AS INTEGER),
'stock_change',
CAST(JSON_EXTRACT(cf.full_document, '$.stock_quantity') AS INTEGER) -
CAST(JSON_EXTRACT(cf.full_document_before_change, '$.stock_quantity') AS INTEGER),
'stock_status',
CASE
WHEN CAST(JSON_EXTRACT(cf.full_document, '$.stock_quantity') AS INTEGER) = 0 THEN 'out_of_stock'
WHEN CAST(JSON_EXTRACT(cf.full_document, '$.stock_quantity') AS INTEGER) < 10 THEN 'low_stock'
WHEN CAST(JSON_EXTRACT(cf.full_document, '$.stock_quantity') AS INTEGER) > 100 THEN 'high_stock'
ELSE 'normal_stock'
END,
'restock_needed',
CAST(JSON_EXTRACT(cf.full_document, '$.stock_quantity') AS INTEGER) < 10
)
ELSE NULL
END as inventory_analysis,
-- Generate search keywords for search service sync
CASE
WHEN 'search_service' = ANY(cf.sync_targets) THEN
ARRAY_CAT(
STRING_TO_ARRAY(LOWER(JSON_EXTRACT_TEXT(cf.full_document, '$.product_name')), ' '),
STRING_TO_ARRAY(LOWER(JSON_EXTRACT_TEXT(cf.full_document, '$.category')), ' ')
)
ELSE NULL
END as search_keywords,
-- Cache invalidation keys
CASE
WHEN 'cache_invalidation' = ANY(cf.sync_targets) THEN
ARRAY[
'product:' || JSON_EXTRACT_TEXT(cf.document_key, '$._id'),
'products:category:' || JSON_EXTRACT_TEXT(cf.full_document, '$.category'),
'products:search:*'
]
ELSE NULL
END as cache_keys_to_invalidate
FROM change_filtering cf
),
sync_routing AS (
SELECT
ce.*,
-- Generate target-specific sync payloads
UNNEST(
ARRAY_REMOVE(ce.sync_targets, NULL)
) as sync_target,
-- Create sync payload based on target
CASE UNNEST(ARRAY_REMOVE(ce.sync_targets, NULL))
WHEN 'search_service' THEN
JSON_OBJECT(
'action',
CASE ce.operation_type
WHEN 'delete' THEN 'delete'
ELSE 'upsert'
END,
'document',
CASE ce.operation_type
WHEN 'delete' THEN NULL
ELSE JSON_OBJECT(
'id', JSON_EXTRACT_TEXT(ce.document_key, '$._id'),
'title', JSON_EXTRACT_TEXT(ce.full_document, '$.product_name'),
'content', JSON_EXTRACT_TEXT(ce.full_document, '$.description'),
'category', JSON_EXTRACT_TEXT(ce.full_document, '$.category'),
'price', CAST(JSON_EXTRACT(ce.full_document, '$.price') AS DECIMAL(10,2)),
'in_stock', CAST(JSON_EXTRACT(ce.full_document, '$.stock_quantity') AS INTEGER) > 0,
'keywords', ce.search_keywords,
'last_updated', ce.processed_at
)
END,
'priority',
CASE ce.business_impact
WHEN 'high' THEN 'urgent'
WHEN 'medium' THEN 'normal'
ELSE 'low'
END
)
WHEN 'analytics_warehouse' THEN
JSON_OBJECT(
'entity_type', 'product',
'event_type', 'product_' || ce.operation_type,
'event_timestamp', ce.processed_at,
'entity_id', JSON_EXTRACT_TEXT(ce.document_key, '$._id'),
'event_data',
CASE ce.operation_type
WHEN 'delete' THEN ce.full_document_before_change
ELSE ce.full_document
END,
'change_metadata', JSON_OBJECT(
'updated_fields',
CASE WHEN ce.update_description IS NOT NULL THEN
JSON_EXTRACT(ce.update_description, '$.updatedFields')
ELSE NULL END,
'removed_fields',
CASE WHEN ce.update_description IS NOT NULL THEN
JSON_EXTRACT(ce.update_description, '$.removedFields')
ELSE NULL END
),
'enrichment', JSON_OBJECT(
'price_analysis', ce.price_analysis,
'inventory_analysis', ce.inventory_analysis,
'business_impact', ce.business_impact,
'change_category', ce.change_category
)
)
WHEN 'recommendation_engine' THEN
JSON_OBJECT(
'entity_type', 'product',
'entity_id', JSON_EXTRACT_TEXT(ce.document_key, '$._id'),
'action',
CASE ce.operation_type
WHEN 'delete' THEN 'remove'
WHEN 'insert' THEN 'add'
ELSE 'update'
END,
'product_data',
CASE ce.operation_type
WHEN 'delete' THEN ce.full_document_before_change
ELSE ce.full_document
END,
'recommendation_hints', JSON_OBJECT(
'price_changed', ce.price_analysis IS NOT NULL,
'new_product', ce.operation_type = 'insert',
'business_impact', ce.business_impact,
'category', JSON_EXTRACT_TEXT(ce.full_document, '$.category')
)
)
WHEN 'cache_invalidation' THEN
JSON_OBJECT(
'operation', 'invalidate',
'keys', ce.cache_keys_to_invalidate,
'cascade', true,
'reason', ce.change_category,
'priority',
CASE ce.business_impact
WHEN 'high' THEN 1
WHEN 'medium' THEN 2
ELSE 3
END
)
ELSE JSON_OBJECT('error', 'unknown_sync_target')
END as sync_payload
FROM change_enrichment ce
)
-- Execute synchronization with comprehensive tracking and monitoring
INSERT INTO sync_operations (
change_id,
operation_type,
collection_name,
document_key,
change_category,
business_impact,
processing_priority,
sync_target,
sync_payload,
sync_status,
sync_attempt_count,
created_at,
scheduled_for
)
SELECT
sr.change_id,
sr.operation_type,
sr.collection_name,
sr.document_key,
sr.change_category,
sr.business_impact,
sr.processing_priority,
sr.sync_target,
sr.sync_payload,
'pending' as sync_status,
0 as sync_attempt_count,
sr.processed_at as created_at,
-- Schedule based on priority
sr.processed_at +
CASE sr.processing_priority
WHEN 5 THEN INTERVAL '0 seconds' -- Immediate for deletes
WHEN 4 THEN INTERVAL '5 seconds' -- Near immediate for high priority
WHEN 3 THEN INTERVAL '30 seconds' -- Medium priority
WHEN 2 THEN INTERVAL '2 minutes' -- Lower priority
ELSE INTERVAL '5 minutes' -- Lowest priority
END as scheduled_for
FROM sync_routing sr
ORDER BY sr.processing_priority DESC, sr.processed_at ASC;
-- Advanced change stream monitoring and analytics
WITH change_stream_analytics AS (
SELECT
DATE_TRUNC('minute', processed_at) as time_bucket,
change_category,
business_impact,
-- Volume metrics
COUNT(*) as change_count,
COUNT(DISTINCT document_key) as unique_documents_changed,
-- Operation type distribution
COUNT(*) FILTER (WHERE operation_type = 'insert') as inserts,
COUNT(*) FILTER (WHERE operation_type = 'update') as updates,
COUNT(*) FILTER (WHERE operation_type = 'delete') as deletes,
-- Business impact distribution
COUNT(*) FILTER (WHERE business_impact = 'high') as high_impact_changes,
COUNT(*) FILTER (WHERE business_impact = 'medium') as medium_impact_changes,
COUNT(*) FILTER (WHERE business_impact = 'low') as low_impact_changes,
-- Sync target requirements
SUM(ARRAY_LENGTH(sync_targets, 1)) as total_sync_operations,
AVG(ARRAY_LENGTH(sync_targets, 1)) as avg_sync_targets_per_change,
-- Processing latency (from cluster time to processing)
AVG(
EXTRACT(MILLISECONDS FROM processed_at - cluster_time)
) as avg_processing_latency_ms,
PERCENTILE_CONT(0.95) WITHIN GROUP (
ORDER BY EXTRACT(MILLISECONDS FROM processed_at - cluster_time)
) as p95_processing_latency_ms
FROM CHANGE_STREAM_LOG
WHERE processed_at >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
GROUP BY DATE_TRUNC('minute', processed_at), change_category, business_impact
),
sync_performance_analysis AS (
SELECT
DATE_TRUNC('minute', created_at) as time_bucket,
sync_target,
-- Sync success metrics
COUNT(*) as total_sync_attempts,
COUNT(*) FILTER (WHERE sync_status = 'completed') as successful_syncs,
COUNT(*) FILTER (WHERE sync_status = 'failed') as failed_syncs,
COUNT(*) FILTER (WHERE sync_status = 'pending') as pending_syncs,
COUNT(*) FILTER (WHERE sync_status = 'retrying') as retrying_syncs,
-- Performance metrics
AVG(sync_duration_ms) as avg_sync_duration_ms,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY sync_duration_ms) as p95_sync_duration_ms,
AVG(sync_attempt_count) as avg_retry_count,
-- Success rate calculation
ROUND(
(COUNT(*) FILTER (WHERE sync_status = 'completed')::FLOAT /
NULLIF(COUNT(*), 0)) * 100,
2
) as success_rate_percent,
-- Queue depth analysis
AVG(
EXTRACT(MILLISECONDS FROM sync_started_at - scheduled_for)
) as avg_queue_wait_time_ms
FROM sync_operations
WHERE created_at >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
GROUP BY DATE_TRUNC('minute', created_at), sync_target
)
SELECT
csa.time_bucket,
csa.change_category,
csa.business_impact,
-- Change stream metrics
csa.change_count,
csa.unique_documents_changed,
csa.inserts,
csa.updates,
csa.deletes,
-- Impact distribution
csa.high_impact_changes,
csa.medium_impact_changes,
csa.low_impact_changes,
-- Processing performance
ROUND(csa.avg_processing_latency_ms::NUMERIC, 2) as avg_processing_latency_ms,
ROUND(csa.p95_processing_latency_ms::NUMERIC, 2) as p95_processing_latency_ms,
-- Sync requirements
csa.total_sync_operations,
ROUND(csa.avg_sync_targets_per_change::NUMERIC, 2) as avg_sync_targets_per_change,
-- Sync performance by target
JSON_OBJECT_AGG(
spa.sync_target,
JSON_OBJECT(
'success_rate', spa.success_rate_percent,
'avg_duration_ms', ROUND(spa.avg_sync_duration_ms::NUMERIC, 2),
'p95_duration_ms', ROUND(spa.p95_sync_duration_ms::NUMERIC, 2),
'avg_queue_wait_ms', ROUND(spa.avg_queue_wait_time_ms::NUMERIC, 2),
'pending_count', spa.pending_syncs,
'retry_count', spa.retrying_syncs
)
) as sync_performance_by_target,
-- Health indicators
CASE
WHEN AVG(spa.success_rate_percent) > 95 THEN 'healthy'
WHEN AVG(spa.success_rate_percent) > 90 THEN 'warning'
ELSE 'critical'
END as overall_health_status,
-- Alerts and recommendations
ARRAY[
CASE WHEN csa.avg_processing_latency_ms > 1000
THEN 'High processing latency detected' END,
CASE WHEN AVG(spa.success_rate_percent) < 95
THEN 'Sync success rate below threshold' END,
CASE WHEN AVG(spa.avg_queue_wait_time_ms) > 30000
THEN 'High sync queue wait times' END,
CASE WHEN csa.high_impact_changes > 100
THEN 'Unusual volume of high-impact changes' END
]::TEXT[] as alert_conditions
FROM change_stream_analytics csa
LEFT JOIN sync_performance_analysis spa ON csa.time_bucket = spa.time_bucket
GROUP BY
csa.time_bucket, csa.change_category, csa.business_impact,
csa.change_count, csa.unique_documents_changed, csa.inserts, csa.updates, csa.deletes,
csa.high_impact_changes, csa.medium_impact_changes, csa.low_impact_changes,
csa.avg_processing_latency_ms, csa.p95_processing_latency_ms,
csa.total_sync_operations, csa.avg_sync_targets_per_change
ORDER BY csa.time_bucket DESC, csa.change_count DESC;
-- Real-time CDC health monitoring dashboard
CREATE VIEW cdc_health_dashboard AS
WITH real_time_metrics AS (
SELECT
-- Current timestamp for real-time display
CURRENT_TIMESTAMP as dashboard_time,
-- Change stream status
(SELECT COUNT(*) FROM ACTIVE_CHANGE_STREAMS) as active_streams,
(SELECT COUNT(*) FROM CHANGE_STREAM_LOG
WHERE processed_at >= CURRENT_TIMESTAMP - INTERVAL '1 minute') as changes_last_minute,
(SELECT COUNT(*) FROM CHANGE_STREAM_LOG
WHERE processed_at >= CURRENT_TIMESTAMP - INTERVAL '5 minutes') as changes_last_5_minutes,
-- Sync operation status
(SELECT COUNT(*) FROM sync_operations WHERE sync_status = 'pending') as pending_syncs,
(SELECT COUNT(*) FROM sync_operations WHERE sync_status = 'retrying') as retrying_syncs,
(SELECT COUNT(*) FROM sync_operations WHERE sync_status = 'failed') as failed_syncs,
-- Performance indicators
(SELECT AVG(EXTRACT(EPOCH FROM (processed_at - cluster_time)) * 1000)
FROM CHANGE_STREAM_LOG
WHERE processed_at >= CURRENT_TIMESTAMP - INTERVAL '5 minutes') as avg_latency_5m,
(SELECT AVG(sync_duration_ms) FROM sync_operations
WHERE sync_completed_at >= CURRENT_TIMESTAMP - INTERVAL '5 minutes'
AND sync_status = 'completed') as avg_sync_duration_5m,
-- Error rates
(SELECT COUNT(*) FROM sync_operations
WHERE created_at >= CURRENT_TIMESTAMP - INTERVAL '5 minutes'
AND sync_status = 'failed') as sync_failures_5m,
(SELECT COUNT(*) FROM sync_operations
WHERE created_at >= CURRENT_TIMESTAMP - INTERVAL '5 minutes') as total_syncs_5m
)
SELECT
dashboard_time,
-- Stream health
active_streams,
changes_last_minute,
changes_last_5_minutes,
ROUND(changes_last_5_minutes / 5.0, 1) as avg_changes_per_minute,
-- Queue status
pending_syncs,
retrying_syncs,
failed_syncs,
-- Performance metrics
ROUND(avg_latency_5m::NUMERIC, 2) as avg_processing_latency_ms,
ROUND(avg_sync_duration_5m::NUMERIC, 2) as avg_sync_duration_ms,
-- Health indicators
CASE
WHEN pending_syncs + retrying_syncs > 1000 THEN 'critical'
WHEN pending_syncs + retrying_syncs > 500 THEN 'warning'
WHEN avg_latency_5m > 1000 THEN 'warning'
ELSE 'healthy'
END as system_health,
-- Error rates
sync_failures_5m,
total_syncs_5m,
CASE
WHEN total_syncs_5m > 0 THEN
ROUND((sync_failures_5m::FLOAT / total_syncs_5m * 100)::NUMERIC, 2)
ELSE 0
END as error_rate_percent,
-- Capacity indicators
CASE
WHEN changes_last_minute > 1000 THEN 'high_volume'
WHEN changes_last_minute > 100 THEN 'medium_volume'
ELSE 'normal_volume'
END as current_load,
-- Operational status
ARRAY_REMOVE(ARRAY[
CASE WHEN active_streams = 0 THEN 'No active change streams' END,
CASE WHEN failed_syncs > 10 THEN 'High number of failed syncs' END,
CASE WHEN pending_syncs > 500 THEN 'High sync queue depth' END,
CASE WHEN avg_latency_5m > 1000 THEN 'High processing latency' END
]::TEXT[], NULL) as current_alerts
FROM real_time_metrics;
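-- Example consumer of the dashboard view above (illustrative sketch only):
-- surface any non-healthy snapshot together with its active alert messages,
-- for example to feed an external alerting or paging integration.
SELECT
d.dashboard_time,
d.system_health,
d.current_load,
d.error_rate_percent,
a.alert_message
FROM cdc_health_dashboard d
LEFT JOIN LATERAL UNNEST(d.current_alerts) AS a(alert_message) ON TRUE
WHERE d.system_health <> 'healthy'
OR a.alert_message IS NOT NULL;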
-- QueryLeaf provides comprehensive MongoDB CDC capabilities:
-- 1. SQL-familiar syntax for change stream configuration and monitoring
-- 2. Advanced change filtering and routing with business logic integration
-- 3. Intelligent sync target determination based on change characteristics
-- 4. Real-time change processing with priority-based queue management
-- 5. Comprehensive error handling and retry mechanisms with dead letter queues
-- 6. Advanced performance monitoring and analytics with health indicators (see the example query after this list)
-- 7. Production-ready CDC operations with resume token management
-- 8. Integration with MongoDB's native change stream capabilities
-- 9. Sophisticated transformation and enrichment pipeline support
-- 10. Enterprise-grade compliance and audit trail capabilities
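To make the monitoring capabilities listed above concrete, change stream processing lag can also be summarized directly from the CHANGE_STREAM_LOG table referenced in the dashboard view. The query below is a minimal sketch under that assumption, reusing the processed_at and cluster_time timestamps; the one-hour window and per-minute bucketing are illustrative choices, not prescribed settings.
-- Minimal lag summary sketch (assumes the CHANGE_STREAM_LOG schema used above)
SELECT
DATE_TRUNC('minute', processed_at) as minute_bucket,
COUNT(*) as changes_processed,
ROUND(AVG(EXTRACT(EPOCH FROM (processed_at - cluster_time)) * 1000)::NUMERIC, 2) as avg_lag_ms,
ROUND(MAX(EXTRACT(EPOCH FROM (processed_at - cluster_time)) * 1000)::NUMERIC, 2) as max_lag_ms
FROM CHANGE_STREAM_LOG
WHERE processed_at >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
GROUP BY DATE_TRUNC('minute', processed_at)
ORDER BY minute_bucket DESC;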
Best Practices for Production CDC Implementation
Change Data Capture Strategy Design
Essential principles for effective MongoDB CDC deployment and management:
- Stream Configuration: Configure change streams with appropriate filtering, batch sizing, and resume token management for reliability
- Transformation Pipeline: Design flexible transformation pipelines that can adapt to schema evolution and business requirement changes
- Error Handling: Implement comprehensive error handling with retry logic, dead letter queues, and alert mechanisms (see the dead letter sketch after this list)
- Performance Optimization: Optimize CDC performance through intelligent batching, parallel processing, and resource management
- Monitoring and Observability: Deploy comprehensive monitoring that tracks change stream health, sync performance, and business metrics
- Scalability Planning: Design CDC architecture that can scale with data volume growth and increasing synchronization requirements
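As a concrete illustration of the error handling principle above, operations that have exhausted their retries can be surfaced for dead letter review directly from the sync_operations table used in the monitoring queries earlier. This is a minimal sketch assuming the same column names; the retry limit of 5 is an illustrative threshold, not a recommended value.
-- Dead letter review sketch (illustrative threshold; adjust to your retry policy)
SELECT
sync_target,
COUNT(*) as exhausted_operations,
MIN(created_at) as oldest_failure,
MAX(sync_attempt_count) as max_attempts_seen
FROM sync_operations
WHERE sync_status = 'failed'
AND sync_attempt_count >= 5
GROUP BY sync_target
ORDER BY exhausted_operations DESC;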
Enterprise CDC Deployment
Optimize CDC systems for production enterprise environments:
- Distributed Processing: Implement distributed CDC processing that can handle high-volume change streams across multiple nodes
- Compliance Integration: Ensure CDC operations meet regulatory requirements for data lineage, audit trails, and access controls
- Disaster Recovery: Design CDC systems with failover capabilities and data recovery procedures for business continuity
- Security Controls: Implement encryption, access controls, and security monitoring for CDC data flows
- Operational Integration: Integrate CDC with existing monitoring, alerting, and operational workflows for seamless management
- Cost Optimization: Monitor and optimize CDC resource usage and synchronization costs for efficient operations (see the cost review sketch after this list)
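For the cost optimization point above, a simple starting place is tracking synchronization volume and cumulative sync time per target over time, again using the sync_operations table from the monitoring examples. The query below is a sketch under that assumption; the 30-day window is arbitrary and the derived minutes-per-day figure is only a rough proxy for resource consumption.
-- Per-target sync volume and time sketch for capacity and cost reviews
SELECT
sync_target,
DATE_TRUNC('day', created_at) as sync_day,
COUNT(*) as sync_operations_count,
ROUND((SUM(sync_duration_ms) / 1000.0 / 60.0)::NUMERIC, 1) as total_sync_minutes,
ROUND(AVG(sync_duration_ms)::NUMERIC, 1) as avg_sync_duration_ms
FROM sync_operations
WHERE created_at >= CURRENT_TIMESTAMP - INTERVAL '30 days'
GROUP BY sync_target, DATE_TRUNC('day', created_at)
ORDER BY sync_day DESC, total_sync_minutes DESC;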
Conclusion
MongoDB Change Data Capture provides sophisticated real-time data synchronization capabilities that enable modern event-driven architectures, distributed system integration, and responsive data pipelines without the complexity and limitations of traditional CDC approaches. Native change streams offer reliable, scalable, and efficient change detection with minimal performance impact and comprehensive transformation capabilities.
Key MongoDB CDC benefits include:
- Real-Time Synchronization: Immediate change detection and propagation without polling delays or batch processing limitations
- Advanced Filtering: Sophisticated change stream filtering and routing based on business logic and data characteristics
- Fault Tolerance: Built-in resume token support and error handling for reliable change stream processing
- Scalable Architecture: Native MongoDB integration that scales efficiently with data volume and system complexity
- Flexible Transformations: Comprehensive data transformation and enrichment capabilities for target-specific synchronization
- SQL Accessibility: Familiar SQL-style CDC operations through QueryLeaf for accessible change data capture management
Whether you're building event-driven microservices, maintaining data warehouse synchronization, implementing search index updates, or orchestrating complex distributed system workflows, MongoDB CDC with QueryLeaf's familiar SQL interface provides the foundation for reliable, efficient, and scalable real-time data synchronization.
QueryLeaf Integration: QueryLeaf automatically translates SQL-style CDC configurations into MongoDB's native change streams, making advanced real-time synchronization accessible to SQL-oriented development teams. Complex change filtering, transformation pipelines, and sync orchestration are seamlessly handled through familiar SQL constructs, enabling sophisticated event-driven architectures without requiring deep MongoDB change stream expertise.
The combination of MongoDB's robust change data capture capabilities with SQL-style synchronization operations makes it an ideal platform for applications requiring both real-time data propagation and familiar database management patterns, ensuring your distributed systems can maintain data consistency and responsiveness as they scale and evolve.