MongoDB Change Data Capture and Real-Time Data Synchronization: Advanced Event-Driven Architecture and Data Pipeline Management

Modern distributed applications require real-time data synchronization capabilities that enable immediate propagation of data changes across multiple systems, microservices, and external platforms without complex polling mechanisms or batch synchronization processes. Traditional change detection approaches rely on timestamp-based polling, database triggers, or application-level change tracking, leading to data inconsistencies, performance bottlenecks, and complex synchronization logic that fails to scale with growing data volumes.

MongoDB Change Data Capture provides comprehensive real-time change detection and streaming capabilities through change streams, enabling applications to react immediately to data modifications, maintain synchronized data across distributed systems, and build event-driven architectures that scale efficiently. Unlike traditional CDC approaches that require complex trigger systems or external change detection tools, MongoDB change streams offer native, scalable, and reliable change tracking with minimal performance impact.
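
As a quick sketch of the underlying primitive, a change stream can be opened directly on a collection and consumed as an async iterator (the connection string, database, and collection names below are illustrative assumptions):

// Minimal change stream example (assumes a replica set and a 'products' collection)
const { MongoClient } = require('mongodb');

async function watchProducts() {
  const client = new MongoClient('mongodb://localhost:27017/?replicaSet=rs0');
  await client.connect();

  const products = client.db('realtime_cdc_system').collection('products');

  // Open a change stream and include the full document on updates
  const changeStream = products.watch([], { fullDocument: 'updateLookup' });

  for await (const change of changeStream) {
    console.log(change.operationType, change.documentKey, change.fullDocument);
  }
}

watchProducts().catch(console.error);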

The Traditional Change Detection Challenge

Conventional approaches to change data capture and synchronization have significant limitations for modern distributed architectures:

-- Traditional PostgreSQL change detection - complex and resource-intensive approaches

-- Manual timestamp-based change tracking with performance limitations
CREATE TABLE products (
    product_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    product_name VARCHAR(200) NOT NULL,
    category VARCHAR(100),
    price DECIMAL(10,2) NOT NULL,
    stock_quantity INTEGER NOT NULL DEFAULT 0,
    supplier_id UUID,

    -- Manual change tracking fields (limited granularity)
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    version_number INTEGER NOT NULL DEFAULT 1,
    last_modified_by VARCHAR(100),

    -- Change type tracking (application-managed)
    change_type VARCHAR(20) DEFAULT 'insert',
    is_deleted BOOLEAN DEFAULT FALSE,

    -- Synchronization status tracking
    sync_status VARCHAR(50) DEFAULT 'pending',
    last_sync_timestamp TIMESTAMP,
    sync_retry_count INTEGER DEFAULT 0,
    sync_error TEXT
);

-- Trigger-based change detection with complex maintenance requirements
CREATE TABLE product_changes (
    change_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    product_id UUID NOT NULL,
    change_type VARCHAR(20) NOT NULL, -- 'INSERT', 'UPDATE', 'DELETE'
    change_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,

    -- Before and after data (limited effectiveness for complex documents)
    old_data JSONB,
    new_data JSONB,
    changed_fields TEXT[], -- Manual field tracking

    -- Change metadata
    changed_by VARCHAR(100),
    change_reason VARCHAR(200),
    transaction_id BIGINT,

    -- Synchronization tracking
    processed BOOLEAN DEFAULT FALSE,
    processed_at TIMESTAMP,
    processing_attempts INTEGER DEFAULT 0,
    error_message TEXT
);

-- Complex trigger system for change capture (high maintenance overhead)
CREATE OR REPLACE FUNCTION track_product_changes()
RETURNS TRIGGER AS $$
DECLARE
    change_type_value VARCHAR(20);
    old_json JSONB;
    new_json JSONB;
    changed_fields_array TEXT[] := '{}';
    field_name TEXT;
BEGIN
    -- Determine change type
    IF TG_OP = 'INSERT' THEN
        change_type_value := 'INSERT';
        new_json := to_jsonb(NEW);
        old_json := NULL;

    ELSIF TG_OP = 'UPDATE' THEN
        change_type_value := 'UPDATE';
        old_json := to_jsonb(OLD);
        new_json := to_jsonb(NEW);

        -- Manual field-by-field comparison (extremely limited)
        IF OLD.product_name != NEW.product_name THEN
            changed_fields_array := array_append(changed_fields_array, 'product_name');
        END IF;
        IF OLD.category != NEW.category OR (OLD.category IS NULL) != (NEW.category IS NULL) THEN
            changed_fields_array := array_append(changed_fields_array, 'category');
        END IF;
        IF OLD.price != NEW.price THEN
            changed_fields_array := array_append(changed_fields_array, 'price');
        END IF;
        IF OLD.stock_quantity != NEW.stock_quantity THEN
            changed_fields_array := array_append(changed_fields_array, 'stock_quantity');
        END IF;
        -- Limited to predefined fields, no support for dynamic schema

    ELSIF TG_OP = 'DELETE' THEN
        change_type_value := 'DELETE';
        old_json := to_jsonb(OLD);
        new_json := NULL;

    END IF;

    -- Insert change record (potential performance bottleneck)
    INSERT INTO product_changes (
        product_id,
        change_type,
        old_data,
        new_data,
        changed_fields,
        changed_by,
        transaction_id
    ) VALUES (
        COALESCE(NEW.product_id, OLD.product_id),
        change_type_value,
        old_json,
        new_json,
        changed_fields_array,
        current_user,
        txid_current()
    );

    -- Update the main record's change tracking
    IF TG_OP != 'DELETE' THEN
        NEW.updated_at := CURRENT_TIMESTAMP;
        NEW.version_number := COALESCE(OLD.version_number, 0) + 1;
        NEW.sync_status := 'pending';
        RETURN NEW;
    ELSE
        RETURN OLD;
    END IF;

EXCEPTION 
    WHEN OTHERS THEN
        -- Log error but don't fail the main operation
        RAISE WARNING 'Change tracking failed: %', SQLERRM;
        RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;

-- Create triggers (must be maintained for every table)
CREATE TRIGGER products_change_trigger
    BEFORE INSERT OR UPDATE OR DELETE ON products
    FOR EACH ROW EXECUTE FUNCTION track_product_changes();

-- Polling-based synchronization query (inefficient and resource-intensive)
WITH pending_changes AS (
    SELECT 
        pc.change_id,
        pc.product_id,
        pc.change_type,
        pc.change_timestamp,
        pc.new_data,
        pc.old_data,
        pc.changed_fields,

        -- Extract change details for synchronization
        CASE 
            WHEN pc.change_type = 'INSERT' THEN pc.new_data
            WHEN pc.change_type = 'UPDATE' THEN pc.new_data  
            WHEN pc.change_type = 'DELETE' THEN pc.old_data
        END as sync_data,

        -- Priority scoring (limited effectiveness)
        CASE 
            WHEN pc.change_type = 'DELETE' THEN 3
            WHEN pc.change_type = 'INSERT' THEN 2
            WHEN pc.change_type = 'UPDATE' AND 'price' = ANY(pc.changed_fields) THEN 2
            ELSE 1
        END as sync_priority,

        -- Calculate processing delay (for monitoring)
        EXTRACT(EPOCH FROM CURRENT_TIMESTAMP - pc.change_timestamp) as delay_seconds

    FROM product_changes pc
    WHERE pc.processed = false
    AND pc.processing_attempts < 5
    AND (pc.change_timestamp + INTERVAL '1 minute' * POWER(2, pc.processing_attempts)) <= CURRENT_TIMESTAMP

    ORDER BY sync_priority DESC, change_timestamp ASC
    LIMIT 1000  -- Batch processing limitation
),

synchronization_targets AS (
    -- Define external systems to sync with (static configuration)
    SELECT unnest(ARRAY[
        'search_service',
        'analytics_warehouse', 
        'recommendation_engine',
        'external_api',
        'cache_invalidation'
    ]) as target_system
)

SELECT 
    pc.change_id,
    pc.product_id,
    pc.change_type,
    pc.sync_data,
    pc.delay_seconds,
    st.target_system,

    -- Generate synchronization payloads (limited transformation capabilities)
    JSON_BUILD_OBJECT(
        'change_id', pc.change_id,
        'entity_type', 'product',
        'entity_id', pc.product_id,
        'operation', LOWER(pc.change_type),
        'timestamp', pc.change_timestamp,
        'data', pc.sync_data,
        'changed_fields', pc.changed_fields,
        'target_system', st.target_system,
        'priority', pc.sync_priority
    ) as sync_payload,

    -- Endpoint configuration (manual maintenance)
    CASE st.target_system
        WHEN 'search_service' THEN 'http://search-api/products/sync'
        WHEN 'analytics_warehouse' THEN 'http://warehouse-api/data/ingest'
        WHEN 'recommendation_engine' THEN 'http://recommendations/products/update'
        WHEN 'external_api' THEN 'http://external-partner/webhook/products'
        WHEN 'cache_invalidation' THEN 'http://cache-service/invalidate'
        ELSE 'http://default-sync-service/webhook'
    END as target_endpoint

FROM pending_changes pc
CROSS JOIN synchronization_targets st;

-- Update processing status (requires external application logic)
-- Note: the pending_changes CTE above is out of scope in this statement, so its filter must be repeated
UPDATE product_changes 
SET 
    processing_attempts = processing_attempts + 1,
    error_message = CASE 
        WHEN processing_attempts >= 4 THEN 'Max retry attempts exceeded'
        ELSE error_message
    END
WHERE processed = false
  AND processing_attempts < 5
  AND (change_timestamp + INTERVAL '1 minute' * POWER(2, processing_attempts)) <= CURRENT_TIMESTAMP;

-- Problems with traditional CDC approaches:
-- 1. Complex trigger maintenance and performance impact on write operations
-- 2. Limited change detection granularity and field-level change tracking
-- 3. Manual synchronization logic with no built-in retry or error handling
-- 4. Polling-based detection causing delays and resource waste
-- 5. No support for transaction-level change grouping or ordering guarantees  
-- 6. Difficult schema evolution and maintenance of change tracking infrastructure
-- 7. No built-in filtering or transformation capabilities for change streams
-- 8. Complex error handling and dead letter queue management
-- 9. Limited scalability for high-volume change processing
-- 10. No native support for distributed system synchronization patterns

-- Manual batch synchronization attempt (resource-intensive and delayed)
WITH hourly_changes AS (
    SELECT 
        product_id,
        array_agg(
            JSON_BUILD_OBJECT(
                'change_type', change_type,
                'timestamp', change_timestamp,
                'data', COALESCE(new_data, old_data)
            ) 
            ORDER BY change_timestamp
        ) as change_history,
        MIN(change_timestamp) as first_change,
        MAX(change_timestamp) as last_change,
        COUNT(*) as change_count

    FROM product_changes
    WHERE change_timestamp >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
    AND processed = false
    GROUP BY product_id
),

batch_sync_data AS (
    SELECT 
        hc.product_id,
        hc.change_history,
        hc.change_count,

        -- Get current product state (may be inconsistent due to timing)
        p.product_name,
        p.category, 
        p.price,
        p.stock_quantity,

        -- Calculate sync requirements
        CASE 
            WHEN p.product_id IS NULL THEN 'product_deleted'
            WHEN hc.change_count > 10 THEN 'full_refresh'
            ELSE 'incremental_sync'
        END as sync_strategy

    FROM hourly_changes hc
    LEFT JOIN products p ON hc.product_id = p.product_id
)

SELECT 
    COUNT(*) as total_products_to_sync,
    COUNT(*) FILTER (WHERE sync_strategy = 'full_refresh') as full_refresh_count,
    COUNT(*) FILTER (WHERE sync_strategy = 'incremental_sync') as incremental_count,  
    COUNT(*) FILTER (WHERE sync_strategy = 'product_deleted') as deletion_count,
    SUM(change_count) as total_changes,
    AVG(change_count) as avg_changes_per_product,

    -- Estimate processing time (rough calculation)
    CEIL(SUM(change_count) / 100.0) as estimated_processing_minutes

FROM batch_sync_data;

-- Traditional limitations:
-- 1. No real-time change detection - relies on polling with delays
-- 2. Complex trigger and stored procedure maintenance overhead
-- 3. Performance impact on write operations due to change tracking triggers
-- 4. Limited transformation and filtering capabilities for change data
-- 5. Manual error handling and retry logic implementation required
-- 6. No built-in support for distributed synchronization patterns
-- 7. Difficult to scale change processing for high-volume systems
-- 8. Schema evolution breaks change tracking infrastructure
-- 9. No transaction-level change ordering or consistency guarantees
-- 10. Complex debugging and monitoring of change propagation failures

MongoDB provides sophisticated Change Data Capture capabilities with advanced streaming and synchronization:

// MongoDB Advanced Change Data Capture and Real-Time Synchronization System
const { MongoClient, ChangeStream } = require('mongodb');
const { EventEmitter } = require('events');

const client = new MongoClient('mongodb://localhost:27017/?replicaSet=rs0');
const db = client.db('realtime_cdc_system');

// Comprehensive MongoDB Change Data Capture Manager
class AdvancedCDCManager extends EventEmitter {
  constructor(db, config = {}) {
    super();
    this.db = db;
    this.collections = {
      products: db.collection('products'),
      orders: db.collection('orders'),
      customers: db.collection('customers'),
      inventory: db.collection('inventory'),
      cdcConfiguration: db.collection('cdc_configuration'),
      changeLog: db.collection('change_log'),
      syncStatus: db.collection('sync_status'),
      errorLog: db.collection('error_log')
    };

    // Advanced CDC configuration
    this.config = {
      enableResumeTokens: config.enableResumeTokens !== false,
      batchSize: config.batchSize || 100,
      maxAwaitTimeMS: config.maxAwaitTimeMS || 1000,
      fullDocument: config.fullDocument || 'updateLookup',
      fullDocumentBeforeChange: config.fullDocumentBeforeChange || 'whenAvailable',

      // Change stream filtering
      enableFiltering: config.enableFiltering !== false,
      filterCriteria: config.filterCriteria || {},
      includeNamespaces: config.includeNamespaces || [],
      excludeNamespaces: config.excludeNamespaces || [],

      // Synchronization targets
      syncTargets: config.syncTargets || [
        { name: 'search_service', enabled: true, priority: 1 },
        { name: 'analytics_warehouse', enabled: true, priority: 2 },
        { name: 'recommendation_engine', enabled: true, priority: 3 },
        { name: 'cache_invalidation', enabled: true, priority: 1 }
      ],

      // Error handling and retry
      enableRetryLogic: config.enableRetryLogic !== false,
      maxRetries: config.maxRetries || 5,
      retryDelayBase: config.retryDelayBase || 1000,
      deadLetterQueue: config.enableDeadLetterQueue !== false,

      // Performance optimization
      enableBatchProcessing: config.enableBatchProcessing !== false,
      enableParallelSync: config.enableParallelSync !== false,
      maxConcurrentSyncs: config.maxConcurrentSyncs || 10,

      // Monitoring and metrics
      enableMetrics: config.enableMetrics !== false,
      metricsInterval: config.metricsInterval || 60000,
      enableHealthChecks: config.enableHealthChecks !== false
    };

    // CDC state management
    this.changeStreams = new Map();
    this.resumeTokens = new Map();
    this.syncQueues = new Map();
    this.processingStats = {
      totalChanges: 0,
      successfulSyncs: 0,
      failedSyncs: 0,
      avgProcessingTime: 0,
      lastProcessedTimestamp: null
    };

    // Initialize CDC system
    this.initializeCDCSystem();
  }

  async initializeCDCSystem() {
    console.log('Initializing comprehensive MongoDB Change Data Capture system...');

    try {
      // Setup change stream configurations
      await this.setupChangeStreamConfiguration();

      // Initialize synchronization targets
      await this.initializeSyncTargets();

      // Setup error handling and monitoring
      await this.setupErrorHandlingAndMonitoring();

      // Start change streams for configured collections
      await this.startChangeStreams();

      // Initialize metrics collection
      if (this.config.enableMetrics) {
        await this.startMetricsCollection();
      }

      // Setup health monitoring
      if (this.config.enableHealthChecks) {
        await this.setupHealthMonitoring();
      }

      console.log('Change Data Capture system initialized successfully');

    } catch (error) {
      console.error('Error initializing CDC system:', error);
      throw error;
    }
  }

  async setupChangeStreamConfiguration() {
    console.log('Setting up change stream configuration...');

    try {
      // Define collections to monitor with specific configurations
      const monitoringConfig = [
        {
          collection: 'products',
          pipeline: [
            {
              $match: {
                'operationType': { $in: ['insert', 'update', 'delete', 'replace'] },
                $or: [
                  { 'updateDescription.updatedFields.price': { $exists: true } },
                  { 'updateDescription.updatedFields.stock_quantity': { $exists: true } },
                  { 'updateDescription.updatedFields.status': { $exists: true } },
                  { 'operationType': { $in: ['insert', 'delete'] } }
                ]
              }
            }
          ],
          options: {
            fullDocument: 'updateLookup',
            fullDocumentBeforeChange: 'whenAvailable'
          },
          syncTargets: ['search_service', 'analytics_warehouse', 'cache_invalidation'],
          transformations: ['priceCalculation', 'stockValidation', 'searchIndexing']
        },
        {
          collection: 'orders',
          pipeline: [
            {
              $match: {
                'operationType': { $in: ['insert', 'update'] },
                $or: [
                  { 'operationType': 'insert' },
                  { 'updateDescription.updatedFields.status': { $exists: true } },
                  { 'updateDescription.updatedFields.payment_status': { $exists: true } }
                ]
              }
            }
          ],
          options: {
            fullDocument: 'updateLookup'
          },
          syncTargets: ['analytics_warehouse', 'recommendation_engine'],
          transformations: ['orderAnalytics', 'customerInsights', 'inventoryImpact']
        },
        {
          collection: 'customers',
          pipeline: [
            {
              $match: {
                'operationType': { $in: ['insert', 'update'] },
                $or: [
                  { 'operationType': 'insert' },
                  { 'updateDescription.updatedFields.preferences': { $exists: true } },
                  { 'updateDescription.updatedFields.profile': { $exists: true } }
                ]
              }
            }
          ],
          options: {
            fullDocument: 'updateLookup'
          },
          syncTargets: ['recommendation_engine', 'analytics_warehouse'],
          transformations: ['profileEnrichment', 'preferencesAnalysis', 'segmentation']
        }
      ];

      // Store configuration for runtime access
      await this.collections.cdcConfiguration.deleteMany({});
      await this.collections.cdcConfiguration.insertMany(
        monitoringConfig.map(config => ({
          ...config,
          enabled: true,
          createdAt: new Date(),
          lastResumeToken: null
        }))
      );

      this.monitoringConfig = monitoringConfig;

    } catch (error) {
      console.error('Error setting up change stream configuration:', error);
      throw error;
    }
  }

  async startChangeStreams() {
    console.log('Starting change streams for all configured collections...');

    try {
      for (const config of this.monitoringConfig) {
        await this.startCollectionChangeStream(config);
      }

      console.log(`Started ${this.changeStreams.size} change streams successfully`);

    } catch (error) {
      console.error('Error starting change streams:', error);
      throw error;
    }
  }

  async startCollectionChangeStream(config) {
    console.log(`Starting change stream for collection: ${config.collection}`);

    try {
      const collection = this.collections[config.collection];
      if (!collection) {
        throw new Error(`Collection ${config.collection} not found`);
      }

      // Retrieve resume token if available
      const savedConfig = await this.collections.cdcConfiguration.findOne({
        collection: config.collection
      });

      const changeStreamOptions = {
        ...config.options,
        batchSize: this.config.batchSize,
        maxAwaitTimeMS: this.config.maxAwaitTimeMS
      };

      // Add resume token if available and enabled
      if (this.config.enableResumeTokens && savedConfig?.lastResumeToken) {
        changeStreamOptions.resumeAfter = savedConfig.lastResumeToken;
        console.log(`Resuming change stream for ${config.collection} from saved token`);
      }

      // Create change stream with pipeline
      const changeStream = collection.watch(config.pipeline || [], changeStreamOptions);

      // Store change stream reference
      this.changeStreams.set(config.collection, {
        stream: changeStream,
        config: config,
        startTime: new Date(),
        processedCount: 0,
        errorCount: 0
      });

      // Setup change event handling
      changeStream.on('change', async (changeDoc) => {
        await this.handleChangeEvent(changeDoc, config);
      });

      changeStream.on('error', async (error) => {
        console.error(`Change stream error for ${config.collection}:`, error);
        await this.handleChangeStreamError(config.collection, error);
      });

      changeStream.on('close', () => {
        console.warn(`Change stream closed for ${config.collection}`);
        this.emit('changeStreamClosed', config.collection);
      });

      changeStream.on('end', () => {
        console.warn(`Change stream ended for ${config.collection}`);
        this.emit('changeStreamEnded', config.collection);
      });

      // Persist the resume token periodically so the stream can resume after a restart
      if (this.config.enableResumeTokens) {
        const tokenInterval = setInterval(async () => {
          try {
            const resumeToken = changeStream.resumeToken;
            if (resumeToken) {
              await this.saveResumeToken(config.collection, resumeToken);
            }
          } catch (error) {
            console.warn(`Error saving resume token for ${config.collection}:`, error.message);
          }
        }, 30000); // Save every 30 seconds

        // Stop persisting tokens once the stream closes to avoid leaking the interval
        changeStream.on('close', () => clearInterval(tokenInterval));
      }

    } catch (error) {
      console.error(`Error starting change stream for ${config.collection}:`, error);
      throw error;
    }
  }

  async handleChangeEvent(changeDoc, config) {
    const startTime = Date.now();

    try {
      // Update processing statistics
      this.processingStats.totalChanges++;
      this.processingStats.lastProcessedTimestamp = new Date();

      // Update collection-specific statistics
      const streamInfo = this.changeStreams.get(config.collection);
      if (streamInfo) {
        streamInfo.processedCount++;
      }

      console.log(`Processing change event for ${config.collection}:`, {
        operationType: changeDoc.operationType,
        documentKey: changeDoc.documentKey,
        timestamp: changeDoc.clusterTime
      });

      // Apply transformations if configured
      const transformedChangeDoc = await this.applyTransformations(changeDoc, config);

      // Log change event for audit trail
      await this.logChangeEvent(transformedChangeDoc, config);

      // Process synchronization to configured targets
      if (this.config.enableParallelSync) {
        // Execute synchronizations in parallel
        const syncPromises = config.syncTargets.map(targetName =>
          this.synchronizeToTarget(transformedChangeDoc, config, targetName)
        );
        const syncResults = await Promise.allSettled(syncPromises);
        await this.processSyncResults(syncResults, transformedChangeDoc, config);
      } else {
        // Execute synchronizations sequentially (each sync starts only after the previous one finishes)
        for (const targetName of config.syncTargets) {
          try {
            await this.synchronizeToTarget(transformedChangeDoc, config, targetName);
            this.processingStats.successfulSyncs++;
          } catch (error) {
            // synchronizeToTarget has already logged the failure and scheduled any retries
            this.processingStats.failedSyncs++;
            console.error('Sequential sync error:', error);
          }
        }
      }

      // Update processing time metrics
      const processingTime = Date.now() - startTime;
      this.updateProcessingMetrics(processingTime);

      // Emit processed event for external monitoring
      this.emit('changeProcessed', {
        collection: config.collection,
        operationType: changeDoc.operationType,
        documentKey: changeDoc.documentKey,
        processingTime: processingTime,
        syncTargets: config.syncTargets
      });

    } catch (error) {
      console.error('Error handling change event:', error);

      // Update error statistics
      const streamInfo = this.changeStreams.get(config.collection);
      if (streamInfo) {
        streamInfo.errorCount++;
      }

      // Log error for debugging
      await this.logError(error, changeDoc, config);

      // Emit error event
      this.emit('changeProcessingError', {
        error: error,
        changeDoc: changeDoc,
        config: config
      });
    }
  }

  async applyTransformations(changeDoc, config) {
    if (!config.transformations || config.transformations.length === 0) {
      return changeDoc;
    }

    console.log(`Applying ${config.transformations.length} transformations...`);

    let transformedDoc = { ...changeDoc };

    try {
      for (const transformationName of config.transformations) {
        transformedDoc = await this.applyTransformation(transformedDoc, transformationName, config);
      }

      return transformedDoc;

    } catch (error) {
      console.error('Error applying transformations:', error);
      // Return original document if transformation fails
      return changeDoc;
    }
  }

  async applyTransformation(changeDoc, transformationName, config) {
    switch (transformationName) {
      case 'priceCalculation':
        return await this.transformPriceCalculation(changeDoc);

      case 'stockValidation':
        return await this.transformStockValidation(changeDoc);

      case 'searchIndexing':
        return await this.transformSearchIndexing(changeDoc);

      case 'orderAnalytics':
        return await this.transformOrderAnalytics(changeDoc);

      case 'customerInsights':
        return await this.transformCustomerInsights(changeDoc);

      case 'inventoryImpact':
        return await this.transformInventoryImpact(changeDoc);

      case 'profileEnrichment':
        return await this.transformProfileEnrichment(changeDoc);

      case 'preferencesAnalysis':
        return await this.transformPreferencesAnalysis(changeDoc);

      case 'segmentation':
        return await this.transformSegmentation(changeDoc);

      default:
        console.warn(`Unknown transformation: ${transformationName}`);
        return changeDoc;
    }
  }

  async transformPriceCalculation(changeDoc) {
    if (changeDoc.operationType === 'update' && 
        changeDoc.updateDescription?.updatedFields?.price) {

      const newPrice = changeDoc.fullDocument?.price;
      const oldPrice = changeDoc.fullDocumentBeforeChange?.price;

      if (newPrice && oldPrice) {
        const priceChange = newPrice - oldPrice;
        const priceChangePercent = ((priceChange / oldPrice) * 100);

        changeDoc.enrichment = {
          ...changeDoc.enrichment,
          priceAnalysis: {
            oldPrice: oldPrice,
            newPrice: newPrice,
            priceChange: priceChange,
            priceChangePercent: Math.round(priceChangePercent * 100) / 100,
            priceDirection: priceChange > 0 ? 'increase' : 'decrease',
            significantChange: Math.abs(priceChangePercent) > 10
          }
        };
      }
    }

    return changeDoc;
  }

  async transformStockValidation(changeDoc) {
    if (changeDoc.fullDocument?.stock_quantity !== undefined) {
      const stockQuantity = changeDoc.fullDocument.stock_quantity;

      changeDoc.enrichment = {
        ...changeDoc.enrichment,
        stockAnalysis: {
          currentStock: stockQuantity,
          stockStatus: stockQuantity === 0 ? 'out_of_stock' : 
                      stockQuantity < 10 ? 'low_stock' : 'in_stock',
          restockNeeded: stockQuantity < 10,
          stockChangeAlert: changeDoc.operationType === 'update' && 
                           changeDoc.updateDescription?.updatedFields?.stock_quantity !== undefined
        }
      };
    }

    return changeDoc;
  }

  async transformSearchIndexing(changeDoc) {
    if (changeDoc.fullDocument) {
      const doc = changeDoc.fullDocument;

      // Generate search keywords and metadata
      const searchKeywords = [];
      if (doc.product_name) searchKeywords.push(...doc.product_name.toLowerCase().split(/\s+/));
      if (doc.category) searchKeywords.push(...doc.category.toLowerCase().split(/\s+/));
      if (doc.tags) searchKeywords.push(...doc.tags.map(tag => tag.toLowerCase()));

      changeDoc.enrichment = {
        ...changeDoc.enrichment,
        searchMetadata: {
          searchKeywords: [...new Set(searchKeywords)].filter(word => word.length > 2),
          searchableFields: ['product_name', 'category', 'description', 'tags'],
          indexPriority: doc.featured ? 'high' : 'normal',
          lastIndexUpdate: new Date()
        }
      };
    }

    return changeDoc;
  }

  async transformOrderAnalytics(changeDoc) {
    if (changeDoc.fullDocument && changeDoc.ns.coll === 'orders') {
      const order = changeDoc.fullDocument;

      // Calculate order metrics
      const orderValue = order.items?.reduce((sum, item) => sum + (item.price * item.quantity), 0) || 0;
      const itemCount = order.items?.reduce((sum, item) => sum + item.quantity, 0) || 0;

      changeDoc.enrichment = {
        ...changeDoc.enrichment,
        orderAnalytics: {
          orderValue: orderValue,
          itemCount: itemCount,
          averageItemValue: itemCount > 0 ? orderValue / itemCount : 0,
          customerSegment: orderValue > 500 ? 'high_value' : orderValue > 100 ? 'medium_value' : 'low_value',
          orderComplexity: itemCount > 5 ? 'complex' : 'simple'
        }
      };
    }

    return changeDoc;
  }

  async synchronizeToTarget(changeDoc, config, targetName) {
    console.log(`Synchronizing to target: ${targetName}`);

    try {
      // Find target configuration
      const targetConfig = this.config.syncTargets.find(t => t.name === targetName);
      if (!targetConfig || !targetConfig.enabled) {
        console.log(`Target ${targetName} is disabled, skipping sync`);
        return;
      }

      // Prepare synchronization payload
      const syncPayload = await this.prepareSyncPayload(changeDoc, config, targetName);

      // Execute synchronization based on target type
      const syncResult = await this.executeSynchronization(syncPayload, targetConfig);

      // Log successful synchronization
      await this.logSuccessfulSync(changeDoc, targetName, syncResult);

      return syncResult;

    } catch (error) {
      console.error(`Synchronization failed for target ${targetName}:`, error);

      // Handle sync error with retry logic
      await this.handleSyncError(error, changeDoc, config, targetName);
      throw error;
    }
  }

  async prepareSyncPayload(changeDoc, config, targetName) {
    const basePayload = {
      changeId: changeDoc._id?._data || `${Date.now()}-${Math.random()}`, // _id is the resume token; use its _data string
      timestamp: new Date(),
      source: {
        database: changeDoc.ns.db,
        collection: changeDoc.ns.coll,
        operationType: changeDoc.operationType
      },
      documentKey: changeDoc.documentKey,
      clusterTime: changeDoc.clusterTime,
      enrichment: changeDoc.enrichment || {}
    };

    // Add operation-specific data
    switch (changeDoc.operationType) {
      case 'insert':
        basePayload.document = changeDoc.fullDocument;
        break;

      case 'update':
        basePayload.document = changeDoc.fullDocument;
        basePayload.updateDescription = changeDoc.updateDescription;
        if (changeDoc.fullDocumentBeforeChange) {
          basePayload.documentBeforeChange = changeDoc.fullDocumentBeforeChange;
        }
        break;

      case 'replace':
        basePayload.document = changeDoc.fullDocument;
        if (changeDoc.fullDocumentBeforeChange) {
          basePayload.documentBeforeChange = changeDoc.fullDocumentBeforeChange;
        }
        break;

      case 'delete':
        if (changeDoc.fullDocumentBeforeChange) {
          basePayload.deletedDocument = changeDoc.fullDocumentBeforeChange;
        }
        break;
    }

    // Apply target-specific transformations
    return await this.applyTargetSpecificTransformations(basePayload, targetName);
  }

  async applyTargetSpecificTransformations(payload, targetName) {
    switch (targetName) {
      case 'search_service':
        return this.transformForSearchService(payload);

      case 'analytics_warehouse':
        return this.transformForAnalyticsWarehouse(payload);

      case 'recommendation_engine':
        return this.transformForRecommendationEngine(payload);

      case 'cache_invalidation':
        return this.transformForCacheInvalidation(payload);

      default:
        return payload;
    }
  }

  transformForSearchService(payload) {
    if (payload.source.collection === 'products') {
      return {
        ...payload,
        searchServiceData: {
          action: payload.source.operationType === 'delete' ? 'delete' : 'upsert',
          document: payload.document ? {
            id: payload.document._id,
            title: payload.document.product_name,
            content: payload.document.description,
            category: payload.document.category,
            price: payload.document.price,
            inStock: payload.document.stock_quantity > 0,
            keywords: payload.enrichment?.searchMetadata?.searchKeywords || [],
            lastUpdated: payload.timestamp
          } : null,
          priority: payload.enrichment?.searchMetadata?.indexPriority || 'normal'
        }
      };
    }

    return payload;
  }

  transformForAnalyticsWarehouse(payload) {
    return {
      ...payload,
      warehouseData: {
        entityType: payload.source.collection,
        eventType: `${payload.source.collection}_${payload.source.operationType}`,
        eventTimestamp: payload.timestamp,
        entityId: payload.documentKey._id,
        eventData: payload.document || payload.deletedDocument,
        changeMetadata: {
          updatedFields: payload.updateDescription?.updatedFields ? Object.keys(payload.updateDescription.updatedFields) : null,
          removedFields: payload.updateDescription?.removedFields || null
        },
        enrichment: payload.enrichment
      }
    };
  }

  transformForRecommendationEngine(payload) {
    if (payload.source.collection === 'orders') {
      return {
        ...payload,
        recommendationData: {
          userId: payload.document?.customer_id,
          items: payload.document?.items || [],
          orderValue: payload.enrichment?.orderAnalytics?.orderValue,
          customerSegment: payload.enrichment?.orderAnalytics?.customerSegment,
          eventType: 'purchase',
          timestamp: payload.timestamp
        }
      };
    }

    return payload;
  }

  transformForCacheInvalidation(payload) {
    const cacheKeys = [];

    // Generate cache keys based on the changed document
    if (payload.source.collection === 'products' && payload.documentKey._id) {
      cacheKeys.push(
        `product:${payload.documentKey._id}`,
        `products:category:${payload.document?.category}`,
        `products:search:*` // Wildcard for search result caches
      );
    }

    return {
      ...payload,
      cacheInvalidationData: {
        keys: cacheKeys,
        operation: 'invalidate',
        cascade: true,
        reason: `${payload.source.collection}_${payload.source.operationType}`
      }
    };
  }

  async executeSynchronization(syncPayload, targetConfig) {
    // Simulate different synchronization mechanisms
    switch (targetConfig.name) {
      case 'search_service':
        return await this.syncToSearchService(syncPayload);

      case 'analytics_warehouse':
        return await this.syncToAnalyticsWarehouse(syncPayload);

      case 'recommendation_engine':
        return await this.syncToRecommendationEngine(syncPayload);

      case 'cache_invalidation':
        return await this.syncToCacheService(syncPayload);

      default:
        throw new Error(`Unknown sync target: ${targetConfig.name}`);
    }
  }

  async syncToSearchService(payload) {
    // Simulate search service synchronization
    console.log('Syncing to search service:', payload.searchServiceData?.action);

    // Simulate API call delay
    await new Promise(resolve => setTimeout(resolve, 50));

    return {
      success: true,
      target: 'search_service',
      action: payload.searchServiceData?.action,
      processedAt: new Date(),
      responseTime: 50
    };
  }

  async syncToAnalyticsWarehouse(payload) {
    // Simulate analytics warehouse synchronization
    console.log('Syncing to analytics warehouse:', payload.warehouseData?.eventType);

    // Simulate processing delay
    await new Promise(resolve => setTimeout(resolve, 100));

    return {
      success: true,
      target: 'analytics_warehouse',
      eventType: payload.warehouseData?.eventType,
      processedAt: new Date(),
      responseTime: 100
    };
  }

  async syncToRecommendationEngine(payload) {
    // Simulate recommendation engine synchronization
    console.log('Syncing to recommendation engine');

    await new Promise(resolve => setTimeout(resolve, 75));

    return {
      success: true,
      target: 'recommendation_engine',
      processedAt: new Date(),
      responseTime: 75
    };
  }

  async syncToCacheService(payload) {
    // Simulate cache invalidation
    console.log('Invalidating cache keys:', payload.cacheInvalidationData?.keys);

    await new Promise(resolve => setTimeout(resolve, 25));

    return {
      success: true,
      target: 'cache_invalidation',
      keysInvalidated: payload.cacheInvalidationData?.keys?.length || 0,
      processedAt: new Date(),
      responseTime: 25
    };
  }

  async handleSyncError(error, changeDoc, config, targetName) {
    console.error(`Sync error for ${targetName}:`, error.message);

    // Log error for debugging
    await this.collections.errorLog.insertOne({
      errorType: 'sync_error',
      targetName: targetName,
      collection: config.collection,
      changeId: changeDoc._id?.toString(),
      documentKey: changeDoc.documentKey,
      error: {
        message: error.message,
        stack: error.stack,
        name: error.name
      },
      timestamp: new Date(),
      retryable: this.isRetryableError(error)
    });

    // Implement retry logic if enabled
    if (this.config.enableRetryLogic && this.isRetryableError(error)) {
      await this.scheduleRetry(changeDoc, config, targetName);
    } else if (this.config.deadLetterQueue) {
      await this.sendToDeadLetterQueue(changeDoc, config, targetName, error);
    }
  }

  isRetryableError(error) {
    // Define which errors should trigger retries
    const retryableErrorTypes = [
      'ECONNREFUSED',
      'ECONNRESET', 
      'ETIMEDOUT',
      'EAI_AGAIN',
      'ENOTFOUND'
    ];

    return retryableErrorTypes.includes(error.code) || 
           error.message?.includes('timeout') ||
           error.message?.includes('connection') ||
           (error.status >= 500 && error.status < 600);
  }

  async scheduleRetry(changeDoc, config, targetName) {
    // Implement exponential backoff retry
    const retryKey = `${config.collection}_${changeDoc.documentKey._id}_${targetName}`;

    // Get current retry count
    let retryCount = await this.getRetryCount(retryKey);

    if (retryCount < this.config.maxRetries) {
      const delayMs = this.config.retryDelayBase * Math.pow(2, retryCount);

      console.log(`Scheduling retry ${retryCount + 1} for ${targetName} in ${delayMs}ms`);

      setTimeout(async () => {
        try {
          await this.synchronizeToTarget(changeDoc, config, targetName);
          await this.clearRetryCount(retryKey);
        } catch (retryError) {
          await this.incrementRetryCount(retryKey);
          await this.handleSyncError(retryError, changeDoc, config, targetName);
        }
      }, delayMs);
    } else {
      console.error(`Max retries exceeded for ${targetName}, sending to dead letter queue`);
      await this.sendToDeadLetterQueue(changeDoc, config, targetName, new Error('Max retries exceeded'));
    }
  }

  async logChangeEvent(changeDoc, config) {
    try {
      await this.collections.changeLog.insertOne({
        changeId: changeDoc._id?._data || `${Date.now()}-${Math.random()}`, // resume token _data string as a stable identifier
        collection: config.collection,
        operationType: changeDoc.operationType,
        documentKey: changeDoc.documentKey,
        clusterTime: changeDoc.clusterTime,
        hasFullDocument: !!changeDoc.fullDocument,
        hasFullDocumentBeforeChange: !!changeDoc.fullDocumentBeforeChange,
        updateDescription: changeDoc.updateDescription,
        enrichment: changeDoc.enrichment,
        syncTargets: config.syncTargets,
        timestamp: new Date()
      });
    } catch (error) {
      console.warn('Error logging change event:', error.message);
    }
  }

  async saveResumeToken(collection, resumeToken) {
    try {
      await this.collections.cdcConfiguration.updateOne(
        { collection: collection },
        { 
          $set: { 
            lastResumeToken: resumeToken,
            lastResumeTokenUpdate: new Date()
          }
        }
      );

      this.resumeTokens.set(collection, resumeToken);
    } catch (error) {
      console.warn(`Error saving resume token for ${collection}:`, error.message);
    }
  }

  updateProcessingMetrics(processingTime) {
    const currentAvg = this.processingStats.avgProcessingTime;
    const totalProcessed = this.processingStats.totalChanges;

    this.processingStats.avgProcessingTime = 
      ((currentAvg * (totalProcessed - 1)) + processingTime) / totalProcessed;
  }

  async generateCDCHealthReport() {
    console.log('Generating CDC system health report...');

    try {
      const healthReport = {
        timestamp: new Date(),
        systemStatus: 'healthy',

        // Change stream status
        changeStreams: Array.from(this.changeStreams.entries()).map(([collection, info]) => ({
          collection: collection,
          status: info.stream.closed ? 'closed' : 'active',
          processedCount: info.processedCount,
          errorCount: info.errorCount,
          startTime: info.startTime,
          uptime: Date.now() - info.startTime.getTime()
        })),

        // Processing statistics
        processingStats: {
          ...this.processingStats,
          successRate: this.processingStats.totalChanges > 0 ? 
            (this.processingStats.successfulSyncs / this.processingStats.totalChanges * 100).toFixed(2) : 0,
          errorRate: this.processingStats.totalChanges > 0 ?
            (this.processingStats.failedSyncs / this.processingStats.totalChanges * 100).toFixed(2) : 0
        },

        // Sync target health
        syncTargetHealth: await this.checkSyncTargetHealth(),

        // Recent errors
        recentErrors: await this.collections.errorLog.find({
          timestamp: { $gte: new Date(Date.now() - 3600000) } // Last hour
        }).limit(10).toArray(),

        // Resume token status
        resumeTokenStatus: Array.from(this.resumeTokens.entries()).map(([collection, token]) => ({
          collection: collection,
          hasResumeToken: !!token,
          tokenAge: new Date() // Would calculate actual age in production
        }))
      };

      return healthReport;

    } catch (error) {
      console.error('Error generating health report:', error);
      return {
        timestamp: new Date(),
        systemStatus: 'error',
        error: error.message
      };
    }
  }

  async checkSyncTargetHealth() {
    const healthChecks = [];

    for (const target of this.config.syncTargets) {
      try {
        // Simulate health check for each target
        const healthCheck = {
          name: target.name,
          status: target.enabled ? 'healthy' : 'disabled',
          priority: target.priority,
          lastHealthCheck: new Date(),
          responseTime: Math.random() * 100 + 50 // Simulated response time
        };

        healthChecks.push(healthCheck);

      } catch (error) {
        healthChecks.push({
          name: target.name,
          status: 'unhealthy',
          error: error.message,
          lastHealthCheck: new Date()
        });
      }
    }

    return healthChecks;
  }

  // Utility methods for retry management
  async getRetryCount(retryKey) {
    const status = await this.collections.syncStatus.findOne({ retryKey: retryKey });
    return status ? status.retryCount : 0;
  }

  async incrementRetryCount(retryKey) {
    await this.collections.syncStatus.updateOne(
      { retryKey: retryKey },
      { 
        $inc: { retryCount: 1 },
        $set: { lastRetryAttempt: new Date() }
      },
      { upsert: true }
    );
  }

  async clearRetryCount(retryKey) {
    await this.collections.syncStatus.deleteOne({ retryKey: retryKey });
  }

  async sendToDeadLetterQueue(changeDoc, config, targetName, error) {
    console.log(`Sending to dead letter queue: ${config.collection} -> ${targetName}`);

    await this.collections.errorLog.insertOne({
      errorType: 'dead_letter_queue',
      collection: config.collection,
      targetName: targetName,
      changeDoc: changeDoc,
      config: config,
      error: {
        message: error.message,
        stack: error.stack
      },
      timestamp: new Date(),
      requiresManualIntervention: true
    });
  }
}

// Benefits of MongoDB Advanced Change Data Capture:
// - Real-time change detection with minimal latency and no polling overhead
// - Comprehensive change filtering and transformation capabilities
// - Built-in resume token support for fault-tolerant change stream processing
// - Advanced error handling with retry logic and dead letter queue management
// - Parallel synchronization to multiple targets with configurable priorities
// - Transaction-aware change ordering and consistency guarantees
// - Native MongoDB integration with minimal performance impact
// - Scalable architecture supporting high-volume change processing
// - Flexible transformation pipeline for data enrichment and formatting
// - SQL-compatible CDC operations through QueryLeaf integration

module.exports = {
  AdvancedCDCManager
};
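
A hedged usage sketch of the AdvancedCDCManager defined above (the module path, connection string, and reporting interval are illustrative assumptions, not part of the original listing):

// Example wiring for the AdvancedCDCManager class above (usage sketch)
const { MongoClient } = require('mongodb');
const { AdvancedCDCManager } = require('./advanced-cdc-manager'); // hypothetical module path

async function main() {
  const client = new MongoClient('mongodb://localhost:27017/?replicaSet=rs0');
  await client.connect();

  // The constructor kicks off initializeCDCSystem() and starts the configured change streams
  const cdcManager = new AdvancedCDCManager(client.db('realtime_cdc_system'), {
    enableParallelSync: true,
    maxRetries: 3
  });

  // React to events emitted by the manager
  cdcManager.on('changeProcessed', (event) => {
    console.log(`Processed ${event.operationType} on ${event.collection} in ${event.processingTime}ms`);
  });

  cdcManager.on('changeProcessingError', ({ error, config }) => {
    console.error(`Change processing failed for ${config.collection}:`, error.message);
  });

  // Periodic health reporting using the built-in report generator
  setInterval(async () => {
    const report = await cdcManager.generateCDCHealthReport();
    console.log('CDC system status:', report.systemStatus);
  }, 60000);
}

main().catch(console.error);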

Understanding MongoDB Change Data Capture Architecture

Advanced Change Stream and Synchronization Patterns

Implement sophisticated CDC patterns for production MongoDB deployments:

// Production-ready MongoDB CDC with enterprise-grade features
class EnterpriseCDCOrchestrator extends AdvancedCDCManager {
  constructor(db, enterpriseConfig) {
    super(db, enterpriseConfig);

    this.enterpriseConfig = {
      ...enterpriseConfig,
      enableDistributedProcessing: true,
      enableLoadBalancing: true,
      enableFailoverHandling: true,
      enableComplianceAuditing: true,
      enableDataLineage: true,
      enableSchemaEvolution: true
    };

    this.setupEnterpriseFeatures();
    this.initializeDistributedCDC();
    this.setupComplianceFramework();
  }

  async implementDistributedCDCProcessing() {
    console.log('Implementing distributed CDC processing across multiple nodes...');

    const distributedConfig = {
      // Multi-node change stream distribution
      nodeDistribution: {
        enableShardAwareness: true,
        balanceAcrossNodes: true,
        minimizeCrossShardOperations: true,
        optimizeForReplicaSetTopology: true
      },

      // Load balancing strategies
      loadBalancing: {
        dynamicWorkloadDistribution: true,
        nodeCapacityAware: true,
        latencyOptimized: true,
        failoverCapable: true
      },

      // Consistency guarantees
      consistencyManagement: {
        maintainChangeOrdering: true,
        transactionBoundaryRespect: true,
        causalConsistencyPreservation: true,
        replicationLagHandling: true
      }
    };

    return await this.deployDistributedCDC(distributedConfig);
  }

  async setupEnterpriseComplianceFramework() {
    console.log('Setting up enterprise compliance framework for CDC...');

    const complianceFramework = {
      // Data governance
      dataGovernance: {
        changeDataClassification: true,
        sensitiveDataDetection: true,
        accessControlEnforcement: true,
        dataRetentionPolicies: true
      },

      // Audit requirements
      auditCompliance: {
        comprehensiveChangeLogging: true,
        tamperEvidenceCapture: true,
        regulatoryReportingSupport: true,
        retentionPolicyEnforcement: true
      },

      // Security controls
      securityCompliance: {
        encryptionInTransit: true,
        encryptionAtRest: true,
        accessControlValidation: true,
        nonRepudiationSupport: true
      }
    };

    return await this.implementComplianceFramework(complianceFramework);
  }
}
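
The orchestrator above references helpers such as deployDistributedCDC without defining them. As one hedged sketch of how change stream work might be distributed across nodes, each worker could claim a collection through a short-lived lease document before opening its change stream (the cdc_leases collection, field names, and lease duration below are assumptions, not part of the original design):

// Hedged sketch: lease-based assignment of change stream work to worker nodes
// Assumes a unique index on cdc_leases.collection so only one node can hold a lease at a time
async function claimCollectionLease(db, collectionName, nodeId, leaseMs = 60000) {
  const now = new Date();
  try {
    const result = await db.collection('cdc_leases').updateOne(
      {
        collection: collectionName,
        $or: [
          { owner: nodeId },                  // lease already held by this node: renew it
          { leaseExpiresAt: { $lt: now } }    // previous owner's lease expired: take over
        ]
      },
      { $set: { owner: nodeId, leaseExpiresAt: new Date(now.getTime() + leaseMs) } },
      { upsert: true }
    );
    return result.matchedCount > 0 || result.upsertedCount > 0;
  } catch (error) {
    // Duplicate key on the unique index means another node currently holds a live lease
    if (error.code === 11000) return false;
    throw error;
  }
}

// A worker would only call startCollectionChangeStream(config) while it holds the lease,
// renewing it on an interval shorter than leaseMs and stopping the stream if renewal fails.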

SQL-Style Change Data Capture with QueryLeaf

QueryLeaf provides familiar SQL syntax for MongoDB Change Data Capture and real-time synchronization operations:

-- QueryLeaf advanced change data capture with SQL-familiar syntax for MongoDB

-- Configure comprehensive change data capture with advanced filtering and routing
CONFIGURE CHANGE_DATA_CAPTURE
SET enabled = true,
    resume_tokens = true,
    batch_size = 100,
    max_await_time_ms = 1000,
    full_document = 'updateLookup',
    full_document_before_change = 'whenAvailable';

-- Setup change stream monitoring with sophisticated filtering and transformation
CREATE CHANGE_STREAM product_changes_stream AS
WITH change_filtering AS (
  -- Advanced change detection filters
  SELECT 
    change_id,
    operation_type,
    collection_name,
    document_key,
    cluster_time,
    full_document,
    full_document_before_change,
    update_description,

    -- Intelligent change classification
    CASE 
      WHEN operation_type = 'insert' THEN 'new_product'
      WHEN operation_type = 'delete' THEN 'product_removal'
      WHEN operation_type = 'update' AND 
           JSON_EXTRACT(update_description, '$.updatedFields.price') IS NOT NULL THEN 'price_change'
      WHEN operation_type = 'update' AND
           JSON_EXTRACT(update_description, '$.updatedFields.stock_quantity') IS NOT NULL THEN 'inventory_change'
      WHEN operation_type = 'update' AND
           JSON_EXTRACT(update_description, '$.updatedFields.status') IS NOT NULL THEN 'status_change'
      ELSE 'general_update'
    END as change_category,

    -- Priority scoring for processing order
    CASE 
      WHEN operation_type = 'delete' THEN 5
      WHEN operation_type = 'insert' THEN 4
      WHEN JSON_EXTRACT(update_description, '$.updatedFields.price') IS NOT NULL THEN 4
      WHEN JSON_EXTRACT(update_description, '$.updatedFields.stock_quantity') IS NOT NULL THEN 3
      WHEN JSON_EXTRACT(update_description, '$.updatedFields.status') IS NOT NULL THEN 3
      ELSE 1
    END as processing_priority,

    -- Impact assessment
    CASE 
      WHEN operation_type IN ('insert', 'delete') THEN 'high'
      WHEN JSON_EXTRACT(update_description, '$.updatedFields.price') IS NOT NULL THEN
        CASE 
          WHEN ABS(
            CAST(JSON_EXTRACT(full_document, '$.price') AS DECIMAL(10,2)) - 
            CAST(JSON_EXTRACT(full_document_before_change, '$.price') AS DECIMAL(10,2))
          ) > 50 THEN 'high'
          WHEN ABS(
            CAST(JSON_EXTRACT(full_document, '$.price') AS DECIMAL(10,2)) - 
            CAST(JSON_EXTRACT(full_document_before_change, '$.price') AS DECIMAL(10,2))
          ) > 10 THEN 'medium'
          ELSE 'low'
        END
      WHEN JSON_EXTRACT(update_description, '$.updatedFields.stock_quantity') IS NOT NULL THEN
        CASE 
          WHEN CAST(JSON_EXTRACT(full_document, '$.stock_quantity') AS INTEGER) = 0 THEN 'high'
          WHEN CAST(JSON_EXTRACT(full_document, '$.stock_quantity') AS INTEGER) < 10 THEN 'medium'
          ELSE 'low'
        END
      ELSE 'low'
    END as business_impact,

    -- Synchronization target determination
    ARRAY[
      CASE WHEN change_category IN ('new_product', 'product_removal', 'price_change') 
           THEN 'search_service' ELSE NULL END,
      CASE WHEN change_category IN ('new_product', 'price_change', 'inventory_change')
           THEN 'analytics_warehouse' ELSE NULL END,
      CASE WHEN change_category IN ('new_product', 'price_change', 'inventory_change', 'status_change')
           THEN 'cache_invalidation' ELSE NULL END,
      CASE WHEN change_category IN ('new_product', 'price_change')
           THEN 'recommendation_engine' ELSE NULL END
    ]::TEXT[] as sync_targets,

    -- Enhanced change metadata
    JSON_OBJECT(
      'change_detected_at', CURRENT_TIMESTAMP,
      'source_replica_set', CONNECTION_INFO('replica_set'),
      'source_node', CONNECTION_INFO('host'),
      'change_stream_id', CHANGE_STREAM_INFO('stream_id'),
      'resume_token', CHANGE_STREAM_INFO('resume_token')
    ) as change_metadata,

    CURRENT_TIMESTAMP as processed_at

  FROM CHANGE_STREAM('products')
  WHERE 
    -- Filter criteria for relevant changes
    operation_type IN ('insert', 'update', 'delete')
    AND (
      operation_type IN ('insert', 'delete') OR
      update_description IS NOT NULL AND (
        JSON_EXTRACT(update_description, '$.updatedFields.price') IS NOT NULL OR
        JSON_EXTRACT(update_description, '$.updatedFields.stock_quantity') IS NOT NULL OR
        JSON_EXTRACT(update_description, '$.updatedFields.status') IS NOT NULL OR
        JSON_EXTRACT(update_description, '$.updatedFields.category') IS NOT NULL
      )
    )
),

change_enrichment AS (
  SELECT 
    cf.*,

    -- Price change analysis
    CASE 
      WHEN cf.change_category = 'price_change' THEN
        JSON_OBJECT(
          'old_price', CAST(JSON_EXTRACT(cf.full_document_before_change, '$.price') AS DECIMAL(10,2)),
          'new_price', CAST(JSON_EXTRACT(cf.full_document, '$.price') AS DECIMAL(10,2)),
          'price_change', 
            CAST(JSON_EXTRACT(cf.full_document, '$.price') AS DECIMAL(10,2)) - 
            CAST(JSON_EXTRACT(cf.full_document_before_change, '$.price') AS DECIMAL(10,2)),
          'price_change_percent',
            ROUND(
              ((CAST(JSON_EXTRACT(cf.full_document, '$.price') AS DECIMAL(10,2)) - 
                CAST(JSON_EXTRACT(cf.full_document_before_change, '$.price') AS DECIMAL(10,2))) /
               CAST(JSON_EXTRACT(cf.full_document_before_change, '$.price') AS DECIMAL(10,2))) * 100,
              2
            ),
          'price_direction',
            CASE 
              WHEN CAST(JSON_EXTRACT(cf.full_document, '$.price') AS DECIMAL(10,2)) > 
                   CAST(JSON_EXTRACT(cf.full_document_before_change, '$.price') AS DECIMAL(10,2))
              THEN 'increase' ELSE 'decrease'
            END
        )
      ELSE NULL
    END as price_analysis,

    -- Inventory change analysis
    CASE 
      WHEN cf.change_category = 'inventory_change' THEN
        JSON_OBJECT(
          'old_stock', CAST(JSON_EXTRACT(cf.full_document_before_change, '$.stock_quantity') AS INTEGER),
          'new_stock', CAST(JSON_EXTRACT(cf.full_document, '$.stock_quantity') AS INTEGER),
          'stock_change',
            CAST(JSON_EXTRACT(cf.full_document, '$.stock_quantity') AS INTEGER) -
            CAST(JSON_EXTRACT(cf.full_document_before_change, '$.stock_quantity') AS INTEGER),
          'stock_status',
            CASE 
              WHEN CAST(JSON_EXTRACT(cf.full_document, '$.stock_quantity') AS INTEGER) = 0 THEN 'out_of_stock'
              WHEN CAST(JSON_EXTRACT(cf.full_document, '$.stock_quantity') AS INTEGER) < 10 THEN 'low_stock'
              WHEN CAST(JSON_EXTRACT(cf.full_document, '$.stock_quantity') AS INTEGER) > 100 THEN 'high_stock'
              ELSE 'normal_stock'
            END,
          'restock_needed',
            CAST(JSON_EXTRACT(cf.full_document, '$.stock_quantity') AS INTEGER) < 10
        )
      ELSE NULL
    END as inventory_analysis,

    -- Generate search keywords for search service sync
    CASE 
      WHEN 'search_service' = ANY(cf.sync_targets) THEN
        ARRAY_CAT(
          STRING_TO_ARRAY(LOWER(JSON_EXTRACT_TEXT(cf.full_document, '$.product_name')), ' '),
          STRING_TO_ARRAY(LOWER(JSON_EXTRACT_TEXT(cf.full_document, '$.category')), ' ')
        )
      ELSE NULL
    END as search_keywords,

    -- Cache invalidation keys
    CASE 
      WHEN 'cache_invalidation' = ANY(cf.sync_targets) THEN
        ARRAY[
          'product:' || JSON_EXTRACT_TEXT(cf.document_key, '$._id'),
          'products:category:' || JSON_EXTRACT_TEXT(cf.full_document, '$.category'),
          'products:search:*'
        ]
      ELSE NULL
    END as cache_keys_to_invalidate

  FROM change_filtering cf
),

sync_routing AS (
  SELECT 
    ce.*,

    -- One row per sync target (see the LATERAL UNNEST in the FROM clause below)
    st.sync_target,

    -- Create sync payload based on target
    CASE st.sync_target
      WHEN 'search_service' THEN
        JSON_OBJECT(
          'action', 
            CASE ce.operation_type 
              WHEN 'delete' THEN 'delete'
              ELSE 'upsert'
            END,
          'document', 
            CASE ce.operation_type
              WHEN 'delete' THEN NULL
              ELSE JSON_OBJECT(
                'id', JSON_EXTRACT_TEXT(ce.document_key, '$._id'),
                'title', JSON_EXTRACT_TEXT(ce.full_document, '$.product_name'),
                'content', JSON_EXTRACT_TEXT(ce.full_document, '$.description'),
                'category', JSON_EXTRACT_TEXT(ce.full_document, '$.category'),
                'price', CAST(JSON_EXTRACT(ce.full_document, '$.price') AS DECIMAL(10,2)),
                'in_stock', CAST(JSON_EXTRACT(ce.full_document, '$.stock_quantity') AS INTEGER) > 0,
                'keywords', ce.search_keywords,
                'last_updated', ce.processed_at
              )
            END,
          'priority', 
            CASE ce.business_impact
              WHEN 'high' THEN 'urgent'
              WHEN 'medium' THEN 'normal'
              ELSE 'low'
            END
        )

      WHEN 'analytics_warehouse' THEN
        JSON_OBJECT(
          'entity_type', 'product',
          'event_type', 'product_' || ce.operation_type,
          'event_timestamp', ce.processed_at,
          'entity_id', JSON_EXTRACT_TEXT(ce.document_key, '$._id'),
          'event_data', 
            CASE ce.operation_type
              WHEN 'delete' THEN ce.full_document_before_change
              ELSE ce.full_document
            END,
          'change_metadata', JSON_OBJECT(
            'updated_fields', 
              CASE WHEN ce.update_description IS NOT NULL THEN
                JSON_EXTRACT(ce.update_description, '$.updatedFields')
              ELSE NULL END,
            'removed_fields',
              CASE WHEN ce.update_description IS NOT NULL THEN
                JSON_EXTRACT(ce.update_description, '$.removedFields') 
              ELSE NULL END
          ),
          'enrichment', JSON_OBJECT(
            'price_analysis', ce.price_analysis,
            'inventory_analysis', ce.inventory_analysis,
            'business_impact', ce.business_impact,
            'change_category', ce.change_category
          )
        )

      WHEN 'recommendation_engine' THEN
        JSON_OBJECT(
          'entity_type', 'product',
          'entity_id', JSON_EXTRACT_TEXT(ce.document_key, '$._id'),
          'action', 
            CASE ce.operation_type
              WHEN 'delete' THEN 'remove'
              WHEN 'insert' THEN 'add'
              ELSE 'update'
            END,
          'product_data', 
            CASE ce.operation_type
              WHEN 'delete' THEN ce.full_document_before_change
              ELSE ce.full_document
            END,
          'recommendation_hints', JSON_OBJECT(
            'price_changed', ce.price_analysis IS NOT NULL,
            'new_product', ce.operation_type = 'insert',
            'business_impact', ce.business_impact,
            'category', JSON_EXTRACT_TEXT(ce.full_document, '$.category')
          )
        )

      WHEN 'cache_invalidation' THEN  
        JSON_OBJECT(
          'operation', 'invalidate',
          'keys', ce.cache_keys_to_invalidate,
          'cascade', true,
          'reason', ce.change_category,
          'priority', 
            CASE ce.business_impact
              WHEN 'high' THEN 1
              WHEN 'medium' THEN 2
              ELSE 3
            END
        )

      ELSE JSON_OBJECT('error', 'unknown_sync_target')
    END as sync_payload

  FROM change_enrichment ce
  CROSS JOIN LATERAL UNNEST(ARRAY_REMOVE(ce.sync_targets, NULL)) AS st(sync_target)
)

-- Execute synchronization with comprehensive tracking and monitoring
INSERT INTO sync_operations (
  change_id,
  operation_type,
  collection_name,
  document_key,
  change_category,
  business_impact,
  processing_priority,
  sync_target,
  sync_payload,
  sync_status,
  sync_attempt_count,
  created_at,
  scheduled_for
)
SELECT 
  sr.change_id,
  sr.operation_type,
  sr.collection_name,
  sr.document_key,
  sr.change_category,
  sr.business_impact,
  sr.processing_priority,
  sr.sync_target,
  sr.sync_payload,
  'pending' as sync_status,
  0 as sync_attempt_count,
  sr.processed_at as created_at,

  -- Schedule based on priority
  sr.processed_at + 
  CASE sr.processing_priority
    WHEN 5 THEN INTERVAL '0 seconds'     -- Immediate for deletes
    WHEN 4 THEN INTERVAL '5 seconds'     -- Near immediate for high priority
    WHEN 3 THEN INTERVAL '30 seconds'    -- Medium priority
    WHEN 2 THEN INTERVAL '2 minutes'     -- Lower priority
    ELSE INTERVAL '5 minutes'            -- Lowest priority
  END as scheduled_for

FROM sync_routing sr
ORDER BY sr.processing_priority DESC, sr.processed_at ASC;

-- Advanced change stream monitoring and analytics
WITH change_stream_analytics AS (
  SELECT 
    DATE_TRUNC('minute', processed_at) as time_bucket,
    change_category,
    business_impact,

    -- Volume metrics
    COUNT(*) as change_count,
    COUNT(DISTINCT document_key) as unique_documents_changed,

    -- Operation type distribution
    COUNT(*) FILTER (WHERE operation_type = 'insert') as inserts,
    COUNT(*) FILTER (WHERE operation_type = 'update') as updates, 
    COUNT(*) FILTER (WHERE operation_type = 'delete') as deletes,

    -- Business impact distribution
    COUNT(*) FILTER (WHERE business_impact = 'high') as high_impact_changes,
    COUNT(*) FILTER (WHERE business_impact = 'medium') as medium_impact_changes,
    COUNT(*) FILTER (WHERE business_impact = 'low') as low_impact_changes,

    -- Sync target requirements
    SUM(ARRAY_LENGTH(sync_targets, 1)) as total_sync_operations,
    AVG(ARRAY_LENGTH(sync_targets, 1)) as avg_sync_targets_per_change,

    -- Processing latency (from cluster time to processing)
    AVG(
      EXTRACT(EPOCH FROM (processed_at - cluster_time)) * 1000
    ) as avg_processing_latency_ms,

    PERCENTILE_CONT(0.95) WITHIN GROUP (
      ORDER BY EXTRACT(EPOCH FROM (processed_at - cluster_time)) * 1000
    ) as p95_processing_latency_ms

  FROM CHANGE_STREAM_LOG
  WHERE processed_at >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
  GROUP BY DATE_TRUNC('minute', processed_at), change_category, business_impact
),

sync_performance_analysis AS (
  SELECT 
    DATE_TRUNC('minute', created_at) as time_bucket,
    sync_target,

    -- Sync success metrics
    COUNT(*) as total_sync_attempts,
    COUNT(*) FILTER (WHERE sync_status = 'completed') as successful_syncs,
    COUNT(*) FILTER (WHERE sync_status = 'failed') as failed_syncs,
    COUNT(*) FILTER (WHERE sync_status = 'pending') as pending_syncs,
    COUNT(*) FILTER (WHERE sync_status = 'retrying') as retrying_syncs,

    -- Performance metrics
    AVG(sync_duration_ms) as avg_sync_duration_ms,
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY sync_duration_ms) as p95_sync_duration_ms,
    AVG(sync_attempt_count) as avg_retry_count,

    -- Success rate calculation
    ROUND(
      (COUNT(*) FILTER (WHERE sync_status = 'completed')::NUMERIC / 
       NULLIF(COUNT(*), 0)) * 100, 
      2
    ) as success_rate_percent,

    -- Queue depth analysis
    AVG(
      EXTRACT(EPOCH FROM (sync_started_at - scheduled_for)) * 1000
    ) as avg_queue_wait_time_ms

  FROM sync_operations
  WHERE created_at >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
  GROUP BY DATE_TRUNC('minute', created_at), sync_target
)

SELECT 
  csa.time_bucket,
  csa.change_category,
  csa.business_impact,

  -- Change stream metrics
  csa.change_count,
  csa.unique_documents_changed,
  csa.inserts,
  csa.updates,
  csa.deletes,

  -- Impact distribution
  csa.high_impact_changes,
  csa.medium_impact_changes,
  csa.low_impact_changes,

  -- Processing performance
  ROUND(csa.avg_processing_latency_ms::NUMERIC, 2) as avg_processing_latency_ms,
  ROUND(csa.p95_processing_latency_ms::NUMERIC, 2) as p95_processing_latency_ms,

  -- Sync requirements
  csa.total_sync_operations,
  ROUND(csa.avg_sync_targets_per_change::NUMERIC, 2) as avg_sync_targets_per_change,

  -- Sync performance by target
  JSON_OBJECT_AGG(
    spa.sync_target,
    JSON_OBJECT(
      'success_rate', spa.success_rate_percent,
      'avg_duration_ms', ROUND(spa.avg_sync_duration_ms::NUMERIC, 2),
      'p95_duration_ms', ROUND(spa.p95_sync_duration_ms::NUMERIC, 2),
      'avg_queue_wait_ms', ROUND(spa.avg_queue_wait_time_ms::NUMERIC, 2),
      'pending_count', spa.pending_syncs,
      'retry_count', spa.retrying_syncs
    )
  ) FILTER (WHERE spa.sync_target IS NOT NULL) as sync_performance_by_target,

  -- Health indicators
  CASE 
    WHEN AVG(spa.success_rate_percent) > 95 THEN 'healthy'
    WHEN AVG(spa.success_rate_percent) > 90 THEN 'warning'
    ELSE 'critical'
  END as overall_health_status,

  -- Alerts and recommendations
  ARRAY_REMOVE(
    ARRAY[
      CASE WHEN csa.avg_processing_latency_ms > 1000 
           THEN 'High processing latency detected' END,
      CASE WHEN AVG(spa.success_rate_percent) < 95 
           THEN 'Sync success rate below threshold' END,
      CASE WHEN AVG(spa.avg_queue_wait_time_ms) > 30000 
           THEN 'High sync queue wait times' END,
      CASE WHEN csa.high_impact_changes > 100 
           THEN 'Unusual volume of high-impact changes' END
    ]::TEXT[],
    NULL
  ) as alert_conditions

FROM change_stream_analytics csa
LEFT JOIN sync_performance_analysis spa ON csa.time_bucket = spa.time_bucket
GROUP BY 
  csa.time_bucket, csa.change_category, csa.business_impact,
  csa.change_count, csa.unique_documents_changed, csa.inserts, csa.updates, csa.deletes,
  csa.high_impact_changes, csa.medium_impact_changes, csa.low_impact_changes,
  csa.avg_processing_latency_ms, csa.p95_processing_latency_ms,
  csa.total_sync_operations, csa.avg_sync_targets_per_change
ORDER BY csa.time_bucket DESC, csa.change_count DESC;

-- Real-time CDC health monitoring dashboard
CREATE VIEW cdc_health_dashboard AS
WITH real_time_metrics AS (
  SELECT 
    -- Current timestamp for real-time display
    CURRENT_TIMESTAMP as dashboard_time,

    -- Change stream status
    (SELECT COUNT(*) FROM ACTIVE_CHANGE_STREAMS) as active_streams,
    (SELECT COUNT(*) FROM CHANGE_STREAM_LOG 
     WHERE processed_at >= CURRENT_TIMESTAMP - INTERVAL '1 minute') as changes_last_minute,
    (SELECT COUNT(*) FROM CHANGE_STREAM_LOG 
     WHERE processed_at >= CURRENT_TIMESTAMP - INTERVAL '5 minutes') as changes_last_5_minutes,

    -- Sync operation status
    (SELECT COUNT(*) FROM sync_operations WHERE sync_status = 'pending') as pending_syncs,
    (SELECT COUNT(*) FROM sync_operations WHERE sync_status = 'retrying') as retrying_syncs,
    (SELECT COUNT(*) FROM sync_operations WHERE sync_status = 'failed') as failed_syncs,

    -- Performance indicators
    (SELECT AVG(EXTRACT(EPOCH FROM (processed_at - cluster_time)) * 1000)
     FROM CHANGE_STREAM_LOG 
     WHERE processed_at >= CURRENT_TIMESTAMP - INTERVAL '5 minutes') as avg_latency_5m,

    (SELECT AVG(sync_duration_ms) FROM sync_operations 
     WHERE sync_completed_at >= CURRENT_TIMESTAMP - INTERVAL '5 minutes'
     AND sync_status = 'completed') as avg_sync_duration_5m,

    -- Error rates
    (SELECT COUNT(*) FROM sync_operations 
     WHERE created_at >= CURRENT_TIMESTAMP - INTERVAL '5 minutes'
     AND sync_status = 'failed') as sync_failures_5m,

    (SELECT COUNT(*) FROM sync_operations
     WHERE created_at >= CURRENT_TIMESTAMP - INTERVAL '5 minutes') as total_syncs_5m
)

SELECT 
  dashboard_time,

  -- Stream health
  active_streams,
  changes_last_minute,
  changes_last_5_minutes,
  ROUND(changes_last_5_minutes / 5.0, 1) as avg_changes_per_minute,

  -- Queue status
  pending_syncs,
  retrying_syncs, 
  failed_syncs,

  -- Performance metrics
  ROUND(avg_latency_5m::NUMERIC, 2) as avg_processing_latency_ms,
  ROUND(avg_sync_duration_5m::NUMERIC, 2) as avg_sync_duration_ms,

  -- Health indicators
  CASE 
    WHEN pending_syncs + retrying_syncs > 1000 THEN 'critical'
    WHEN pending_syncs + retrying_syncs > 500 THEN 'warning'
    WHEN avg_latency_5m > 1000 THEN 'warning'
    ELSE 'healthy'
  END as system_health,

  -- Error rates
  sync_failures_5m,
  total_syncs_5m,
  CASE 
    WHEN total_syncs_5m > 0 THEN
      ROUND((sync_failures_5m::FLOAT / total_syncs_5m * 100)::NUMERIC, 2)
    ELSE 0
  END as error_rate_percent,

  -- Capacity indicators
  CASE 
    WHEN changes_last_minute > 1000 THEN 'high_volume'
    WHEN changes_last_minute > 100 THEN 'medium_volume'
    ELSE 'normal_volume'  
  END as current_load,

  -- Operational status
  ARRAY_REMOVE(
    ARRAY[
      CASE WHEN active_streams = 0 THEN 'No active change streams' END,
      CASE WHEN failed_syncs > 10 THEN 'High number of failed syncs' END,
      CASE WHEN pending_syncs > 500 THEN 'High sync queue depth' END,
      CASE WHEN avg_latency_5m > 1000 THEN 'High processing latency' END
    ]::TEXT[],
    NULL
  ) as current_alerts

FROM real_time_metrics;

-- QueryLeaf provides comprehensive MongoDB CDC capabilities:
-- 1. SQL-familiar syntax for change stream configuration and monitoring
-- 2. Advanced change filtering and routing with business logic integration
-- 3. Intelligent sync target determination based on change characteristics
-- 4. Real-time change processing with priority-based queue management
-- 5. Comprehensive error handling and retry mechanisms with dead letter queues
-- 6. Advanced performance monitoring and analytics with health indicators
-- 7. Production-ready CDC operations with resume token management
-- 8. Integration with MongoDB's native change stream capabilities
-- 9. Sophisticated transformation and enrichment pipeline support
-- 10. Enterprise-grade compliance and audit trail capabilities

Best Practices for Production CDC Implementation

Change Data Capture Strategy Design

Essential principles for effective MongoDB CDC deployment and management:

  1. Stream Configuration: Configure change streams with appropriate filtering, batch sizing, and resume token management for reliability (see the sketch after this list)
  2. Transformation Pipeline: Design flexible transformation pipelines that can adapt to schema evolution and business requirement changes
  3. Error Handling: Implement comprehensive error handling with retry logic, dead letter queues, and alert mechanisms
  4. Performance Optimization: Optimize CDC performance through intelligent batching, parallel processing, and resource management
  5. Monitoring and Observability: Deploy comprehensive monitoring that tracks change stream health, sync performance, and business metrics
  6. Scalability Planning: Design CDC architecture that can scale with data volume growth and increasing synchronization requirements
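
A minimal sketch of points 1 and 3 above using PyMongo's change stream API. The connection string, the ecommerce.products namespace, the ops.cdc_resume_tokens token-store collection, and the fixed 5-second retry policy are assumptions for illustration, not prescribed names or defaults.

# Change stream consumer with resume token persistence and basic retry (illustrative).
import time
from pymongo import MongoClient
from pymongo.errors import PyMongoError

client = MongoClient("mongodb://localhost:27017")   # placeholder connection string
products = client["ecommerce"]["products"]          # assumed source collection
token_store = client["ops"]["cdc_resume_tokens"]    # hypothetical token-store collection

def load_resume_token():
    doc = token_store.find_one({"_id": "products_stream"})
    return doc["token"] if doc else None

def save_resume_token(token):
    token_store.replace_one(
        {"_id": "products_stream"},
        {"_id": "products_stream", "token": token},
        upsert=True,
    )

def process(change):
    # Placeholder for routing/transformation logic (search sync, cache invalidation, etc.)
    print(change["operationType"], change["documentKey"])

while True:
    try:
        with products.watch(
            [{"$match": {"operationType": {"$in": ["insert", "update", "delete"]}}}],
            full_document="updateLookup",
            resume_after=load_resume_token(),
        ) as stream:
            for change in stream:
                process(change)
                save_resume_token(change["_id"])  # the event's _id is its resume token
    except PyMongoError as exc:
        # Transient errors: back off briefly, then resume from the last persisted token.
        print(f"change stream interrupted: {exc}; retrying in 5s")
        time.sleep(5)

Persisting the resume token only after the event has been processed gives at-least-once delivery, so downstream sync handlers should be idempotent.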

Enterprise CDC Deployment

Optimize CDC systems for production enterprise environments:

  1. Distributed Processing: Implement distributed CDC processing that can handle high-volume change streams across multiple nodes (a partitioning sketch follows this list)
  2. Compliance Integration: Ensure CDC operations meet regulatory requirements for data lineage, audit trails, and access controls
  3. Disaster Recovery: Design CDC systems with failover capabilities and data recovery procedures for business continuity
  4. Security Controls: Implement encryption, access controls, and security monitoring for CDC data flows
  5. Operational Integration: Integrate CDC with existing monitoring, alerting, and operational workflows for seamless management
  6. Cost Optimization: Monitor and optimize CDC resource usage and synchronization costs for efficient operations
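
One way to approach the distributed-processing point above is to give each worker a disjoint partition of the change traffic via a $match filter. The partition key (category), the static worker map, and the namespace below are assumptions chosen for illustration; production deployments would typically derive partitions from their own data distribution.

# Illustrative partitioned change stream consumers: each worker filters on a disjoint
# set of categories so high-volume change traffic is spread across processes/nodes.
from pymongo import MongoClient

WORKER_PARTITIONS = {                      # hypothetical static partition map
    "worker-1": ["electronics", "computers"],
    "worker-2": ["clothing", "footwear"],
    "worker-3": ["home", "garden", "toys"],
}

def run_worker(worker_id: str):
    client = MongoClient("mongodb://localhost:27017")   # placeholder connection string
    products = client["ecommerce"]["products"]          # assumed source collection
    categories = WORKER_PARTITIONS[worker_id]

    pipeline = [
        {"$match": {
            "$or": [
                {"fullDocument.category": {"$in": categories}},
                {"fullDocumentBeforeChange.category": {"$in": categories}},  # covers deletes
            ]
        }}
    ]

    # fullDocumentBeforeChange requires pre/post images enabled on the collection (MongoDB 6.0+).
    with products.watch(
        pipeline,
        full_document="updateLookup",
        full_document_before_change="whenAvailable",
    ) as stream:
        for change in stream:
            print(worker_id, change["operationType"], change["documentKey"])

if __name__ == "__main__":
    run_worker("worker-1")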

Conclusion

MongoDB Change Data Capture provides sophisticated real-time data synchronization capabilities that enable modern event-driven architectures, distributed system integration, and responsive data pipelines without the complexity and limitations of traditional CDC approaches. Native change streams offer reliable, scalable, and efficient change detection with minimal performance impact and comprehensive transformation capabilities.

Key MongoDB CDC benefits include:

  • Real-Time Synchronization: Immediate change detection and propagation without polling delays or batch processing limitations
  • Advanced Filtering: Sophisticated change stream filtering and routing based on business logic and data characteristics
  • Fault Tolerance: Built-in resume token support and error handling for reliable change stream processing
  • Scalable Architecture: Native MongoDB integration that scales efficiently with data volume and system complexity
  • Flexible Transformations: Comprehensive data transformation and enrichment capabilities for target-specific synchronization
  • SQL Accessibility: Familiar SQL-style CDC operations through QueryLeaf for accessible change data capture management

Whether you're building event-driven microservices, maintaining data warehouse synchronization, implementing search index updates, or orchestrating complex distributed system workflows, MongoDB CDC with QueryLeaf's familiar SQL interface provides the foundation for reliable, efficient, and scalable real-time data synchronization.

QueryLeaf Integration: QueryLeaf automatically translates SQL-style CDC configurations into MongoDB's native change streams, making advanced real-time synchronization accessible to SQL-oriented development teams. Complex change filtering, transformation pipelines, and sync orchestration are seamlessly handled through familiar SQL constructs, enabling sophisticated event-driven architectures without requiring deep MongoDB change stream expertise.

The combination of MongoDB's robust change data capture capabilities with SQL-style synchronization operations makes it an ideal platform for applications requiring both real-time data propagation and familiar database management patterns, ensuring your distributed systems can maintain data consistency and responsiveness as they scale and evolve.