MongoDB Time Series Collections for IoT and Real-Time Analytics: High-Performance Sensor Data Management and Stream Processing

Modern IoT applications generate massive volumes of time-stamped sensor data that demand specialized storage and query optimization strategies. Traditional relational databases struggle with the volume, velocity, and analytical requirements of these workloads, particularly when ingesting millions of data points per second from distributed sensor networks while simultaneously serving real-time alerting and analytical queries across long historical time ranges.

MongoDB time series collections provide purpose-built storage and query optimization for time-stamped data. They automatically organize measurements into compressed buckets and pair that layout with optimized aggregation pipelines, so a single collection can absorb high-velocity IoT ingestion while still supporting real-time analytics and historical trend analysis at scale.
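As a minimal sketch of what this looks like in practice with the Node.js driver (the database, collection, and field names here are illustrative; a full implementation follows later in this article):

// Minimal time series collection sketch (illustrative names, MongoDB 5.0+)
const { MongoClient } = require('mongodb');

async function quickStart() {
  const client = new MongoClient('mongodb://localhost:27017');
  await client.connect();
  const db = client.db('iot_quickstart');

  // One call creates bucketed, compressed, TTL-managed storage
  await db.createCollection('readings', {
    timeseries: { timeField: 'ts', metaField: 'sensor', granularity: 'seconds' },
    expireAfterSeconds: 60 * 60 * 24 * 30 // 30-day retention
  });

  // Documents are inserted like any other collection
  await db.collection('readings').insertOne({
    ts: new Date(),
    sensor: { id: 'device-001', type: 'temperature' },
    value: 21.7
  });

  await client.close();
}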

The IoT Data Challenge

Traditional approaches to storing and analyzing time series data face significant scalability and performance limitations:

-- Traditional PostgreSQL time series approach - performance bottlenecks
CREATE TABLE sensor_readings (
    reading_id BIGSERIAL,
    device_id VARCHAR(50) NOT NULL,
    sensor_type VARCHAR(50) NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL,
    value DECIMAL(15,4) NOT NULL,
    unit VARCHAR(20),
    location_lat DECIMAL(10,8),
    location_lng DECIMAL(11,8),

    -- Basic metadata
    device_status VARCHAR(20) DEFAULT 'online',
    data_quality INTEGER DEFAULT 100,

    -- Partitioned tables must include the partition key in the primary key
    PRIMARY KEY (reading_id, timestamp),
    CONSTRAINT valid_quality CHECK (data_quality BETWEEN 0 AND 100)
) PARTITION BY RANGE (timestamp);

-- Partitioning by time (complex maintenance)
CREATE TABLE sensor_readings_2025_01 PARTITION OF sensor_readings
FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');

CREATE TABLE sensor_readings_2025_02 PARTITION OF sensor_readings
FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');

-- Multiple indexes required for different query patterns
CREATE INDEX idx_sensor_device_time ON sensor_readings (device_id, timestamp);
CREATE INDEX idx_sensor_type_time ON sensor_readings (sensor_type, timestamp);
CREATE INDEX idx_sensor_location ON sensor_readings (location_lat, location_lng);
CREATE INDEX idx_sensor_timestamp ON sensor_readings (timestamp);

-- Complex aggregation queries for analytics
WITH hourly_averages AS (
    SELECT 
        device_id,
        sensor_type,
        DATE_TRUNC('hour', timestamp) as hour_bucket,
        AVG(value) as avg_value,
        COUNT(*) as reading_count,
        MIN(value) as min_value,
        MAX(value) as max_value,
        STDDEV(value) as stddev_value
    FROM sensor_readings
    WHERE timestamp >= NOW() - INTERVAL '24 hours'
      AND device_status = 'online'
      AND data_quality > 80
    GROUP BY device_id, sensor_type, DATE_TRUNC('hour', timestamp)
),

device_statistics AS (
    SELECT 
        device_id,
        sensor_type,
        COUNT(*) as total_hours,
        AVG(avg_value) as daily_average,
        MAX(max_value) as daily_peak,
        MIN(min_value) as daily_low,
        AVG(reading_count) as avg_readings_per_hour,

        -- Calculate trend using linear regression approximation
        CASE 
            WHEN COUNT(*) > 1 THEN
                (COUNT(*) * SUM(EXTRACT(EPOCH FROM hour_bucket) * avg_value) - 
                 SUM(EXTRACT(EPOCH FROM hour_bucket)) * SUM(avg_value)) /
                (COUNT(*) * SUM(POWER(EXTRACT(EPOCH FROM hour_bucket), 2)) - 
                 POWER(SUM(EXTRACT(EPOCH FROM hour_bucket)), 2))
            ELSE 0
        END as trend_slope
    FROM hourly_averages
    GROUP BY device_id, sensor_type
)

-- Final aggregation (performance intensive)
SELECT 
    ds.device_id,
    ds.sensor_type,
    ROUND(ds.daily_average, 2) as avg_24h,
    ROUND(ds.daily_peak, 2) as peak_24h,
    ROUND(ds.daily_low, 2) as low_24h,
    ROUND(ds.avg_readings_per_hour, 0) as readings_per_hour,

    -- Trend analysis
    CASE 
        WHEN ds.trend_slope > 0.1 THEN 'rising'
        WHEN ds.trend_slope < -0.1 THEN 'falling'
        ELSE 'stable'
    END as trend_direction,

    -- Alert conditions
    CASE 
        WHEN ds.daily_peak > 100 THEN 'high_alert'
        WHEN ds.daily_low < 10 THEN 'low_alert'
        WHEN ds.avg_readings_per_hour < 5 THEN 'connectivity_alert'
        ELSE 'normal'
    END as alert_status,

    ds.total_hours

FROM device_statistics ds
WHERE ds.total_hours >= 20  -- At least 20 hours of data
ORDER BY ds.device_id, ds.sensor_type;

-- Challenges with traditional time series approaches:
-- 1. Storage overhead - separate tables and partition management
-- 2. Index explosion - multiple indexes needed for various query patterns
-- 3. Query complexity - complex CTEs and window functions for basic analytics
-- 4. Maintenance burden - manual partition creation and cleanup
-- 5. Limited compression - basic storage compression insufficient for time series patterns
-- 6. Scaling bottlenecks - horizontal scaling requires complex sharding strategies
-- 7. Real-time constraints - difficult to optimize for both writes and analytics
-- 8. Data lifecycle management - complex procedures for archiving and cleanup

-- Real-time ingestion performance issues
INSERT INTO sensor_readings (
    device_id, sensor_type, timestamp, value, unit, 
    location_lat, location_lng, device_status, data_quality
)
SELECT 
    'device_' || (random() * 1000)::int,
    CASE (random() * 5)::int 
        WHEN 0 THEN 'temperature'
        WHEN 1 THEN 'humidity'
        WHEN 2 THEN 'pressure'
        WHEN 3 THEN 'light'
        ELSE 'motion'
    END,
    NOW() - (random() * interval '1 hour'),
    random() * 100,
    CASE (random() * 5)::int 
        WHEN 0 THEN 'celsius'
        WHEN 1 THEN 'percent'
        WHEN 2 THEN 'pascal'
        WHEN 3 THEN 'lux'
        ELSE 'boolean'
    END,
    40.7128 + (random() - 0.5) * 0.1,
    -74.0060 + (random() - 0.5) * 0.1,
    CASE WHEN random() > 0.1 THEN 'online' ELSE 'offline' END,
    80 + (random() * 20)::int
FROM generate_series(1, 10000) -- a single 10K-row test batch; sustained IoT ingest rates run orders of magnitude higher
ON CONFLICT DO NOTHING;

-- Problems:
-- 1. Linear performance degradation with data volume
-- 2. Index maintenance overhead during high-velocity writes
-- 3. Lock contention during concurrent analytics and writes
-- 4. Complex query optimization required for time-range queries
-- 5. Storage bloat due to lack of time-series specific compression
-- 6. Difficult to implement real-time alerting efficiently
-- 7. Complex setup for distributed deployments across geographic regions
-- 8. Limited built-in support for time-series specific operations

MongoDB time series collections address these limitations with purpose-built time series capabilities:

// MongoDB Time Series Collections - optimized for IoT and analytics workloads
const { MongoClient, ObjectId } = require('mongodb');

const client = new MongoClient('mongodb://localhost:27017/?replicaSet=rs0');
const db = client.db('iot_platform');

class MongoDBTimeSeriesManager {
  constructor(db) {
    this.db = db;

    // Time series collections with automatic optimization
    this.sensorData = null;
    this.deviceMetrics = null;
    this.analyticsCache = null;

    // Kick off async setup and expose the promise so callers can `await manager.ready`
    this.ready = this.initializeTimeSeriesCollections();
  }

  async initializeTimeSeriesCollections() {
    console.log('Setting up optimized time series collections...');

    try {
      // Primary sensor data collection
      await this.db.createCollection('sensor_readings', {
        timeseries: {
          timeField: 'timestamp',
          metaField: 'device',
          // 'seconds' granularity suits high-frequency sensor data; custom
          // bucketMaxSpanSeconds/bucketRoundingSeconds (MongoDB 6.3+) cannot
          // be combined with granularity
          granularity: 'seconds'
        },
        expireAfterSeconds: 31536000, // 1 year retention
        storageEngine: { wiredTiger: { configString: 'block_compressor=zstd' } }
      });

      // Device analytics and metrics collection
      await this.db.createCollection('device_metrics', {
        timeseries: {
          timeField: 'timestamp',
          metaField: 'device_info',
          granularity: 'minutes' // Pre-aggregated health and performance metrics
        },
        expireAfterSeconds: 94608000 // 3 year retention for analytics
      });

      // Real-time analytics cache for dashboard queries
      await this.db.createCollection('analytics_cache', {
        timeseries: {
          timeField: 'computed_at',
          metaField: 'computation_type',
          granularity: 'minutes'
        },
        expireAfterSeconds: 604800 // 1 week cache retention
      });

      this.sensorData = this.db.collection('sensor_readings');
      this.deviceMetrics = this.db.collection('device_metrics');
      this.analyticsCache = this.db.collection('analytics_cache');

      // Create specialized indexes for time series queries
      await this.createOptimizedIndexes();

      console.log('Time series collections initialized successfully');

    } catch (error) {
      console.error('Error initializing time series collections:', error);
      throw error;
    }
  }

  async createOptimizedIndexes() {
    console.log('Creating optimized time series indexes...');

    // Sensor data indexes
    await this.sensorData.createIndexes([
      // Compound index for device + time range queries
      { 
        key: { 'device.id': 1, timestamp: 1 },
        name: 'device_time_optimal'
      },

      // Sensor type + time for analytics
      { 
        key: { 'device.sensor_type': 1, timestamp: 1 },
        name: 'sensor_type_time'
      },

      // Geospatial index for location-based queries
      { 
        key: { 'device.location': '2dsphere' },
        name: 'device_location_geo'
      },

      // Value range index for threshold queries on a specific measurement type
      // (measurements are keyed by type, e.g. measurements.temperature.value)
      { 
        key: { 'measurements.temperature.value': 1, timestamp: 1 },
        name: 'temperature_threshold_time',
        partialFilterExpression: { 'measurements.temperature.value': { $exists: true } }
      }
    ]);

    // Device metrics indexes
    await this.deviceMetrics.createIndexes([
      {
        key: { 'device_info.id': 1, timestamp: -1 },
        name: 'device_metrics_latest'
      },

      {
        key: { 'device_info.facility': 1, timestamp: 1 },
        name: 'facility_time_series'
      }
    ]);

    console.log('Time series indexes created successfully');
  }

  async ingestSensorData(deviceId, sensorType, measurements, metadata = {}) {
    const timestamp = new Date();

    try {
      const document = {
        timestamp: timestamp,

        // Metadata field for efficient bucketing
        device: {
          id: deviceId,
          sensor_type: sensorType,
          facility: metadata.facility || 'default',
          zone: metadata.zone || 'unspecified',
          location: metadata.location ? {
            type: 'Point',
            coordinates: [metadata.location.lng, metadata.location.lat]
          } : null,

          // Device characteristics
          model: metadata.model || 'unknown',
          firmware_version: metadata.firmware_version || '1.0',
          installation_date: metadata.installation_date,

          // Network information
          network_info: {
            connection_type: metadata.connection_type || 'wifi',
            signal_strength: metadata.signal_strength || -50,
            gateway_id: metadata.gateway_id
          }
        },

        // Time series measurements
        measurements: this.normalizeMeasurements(measurements),

        // Data quality and status
        quality_metrics: {
          data_quality_score: this.calculateDataQuality(measurements, metadata),
          sensor_health: metadata.sensor_health || 'normal',
          calibration_status: metadata.calibration_status || 'valid',
          measurement_accuracy: metadata.measurement_accuracy || 0.95
        },

        // Processing metadata
        processing_info: {
          ingestion_timestamp: timestamp,
          processing_latency_ms: 0,
          source: metadata.source || 'sensor_direct',
          batch_id: metadata.batch_id,
          schema_version: '2.0'
        }
      };

      const result = await this.sensorData.insertOne(document);

      // Trigger real-time processing if enabled
      if (metadata.enable_realtime_processing !== false) {
        await this.processRealTimeAnalytics(document);
      }

      return {
        success: true,
        documentId: result.insertedId,
        timestamp: timestamp,
        bucketed: true, // Time series collections automatically bucket
        processing_time_ms: Date.now() - timestamp.getTime()
      };

    } catch (error) {
      console.error('Sensor data ingestion failed:', error);
      return {
        success: false,
        error: error.message,
        timestamp: timestamp
      };
    }
  }

  normalizeMeasurements(rawMeasurements) {
    // Normalize different measurement formats into consistent structure
    const normalized = {};

    if (Array.isArray(rawMeasurements)) {
      // Handle array of measurement objects
      rawMeasurements.forEach(measurement => {
        if (measurement.type && measurement.value !== undefined) {
          normalized[measurement.type] = {
            value: Number(measurement.value),
            unit: measurement.unit || '',
            precision: measurement.precision || 2,
            range: measurement.range || { min: null, max: null }
          };
        }
      });
    } else if (rawMeasurements && typeof rawMeasurements === 'object') {
      // Handle object with measurement properties
      Object.entries(rawMeasurements).forEach(([key, value]) => {
        if (typeof value === 'number') {
          normalized[key] = {
            value: value,
            unit: '',
            precision: 2,
            range: { min: null, max: null }
          };
        } else if (typeof value === 'object' && value.value !== undefined) {
          normalized[key] = {
            value: Number(value.value),
            unit: value.unit || '',
            precision: value.precision || 2,
            range: value.range || { min: null, max: null }
          };
        }
      });
    }

    return normalized;
  }

  calculateDataQuality(measurements, metadata) {
    let qualityScore = 100;

    // Check signal strength impact
    if (metadata.signal_strength < -80) {
      qualityScore -= 20;
    } else if (metadata.signal_strength < -70) {
      qualityScore -= 10;
    }

    // Check measurement consistency
    Object.values(measurements).forEach(measurement => {
      if (typeof measurement === 'object' && measurement.value !== undefined) {
        const value = Number(measurement.value);
        const range = measurement.range;

        if (range && range.min !== null && range.max !== null) {
          if (value < range.min || value > range.max) {
            qualityScore -= 15; // Out of expected range
          }
        }

        // Check for anomalous readings
        if (isNaN(value) || !isFinite(value)) {
          qualityScore -= 30;
        }
      }
    });

    return Math.max(0, qualityScore);
  }

  async processRealTimeAnalytics(document) {
    const deviceId = document.device.id;
    const timestamp = document.timestamp;

    // Real-time threshold monitoring
    await this.checkAlertThresholds(document);

    // Update device status and health metrics
    await this.updateDeviceHealthMetrics(deviceId, document);

    // Calculate rolling averages for dashboard
    await this.updateRollingAverages(deviceId, document);
  }

  async checkAlertThresholds(document) {
    const measurements = document.measurements;
    const deviceId = document.device.id;
    const sensorType = document.device.sensor_type;

    // Define threshold rules (could be stored in configuration collection)
    const thresholds = {
      temperature: { min: -10, max: 60, critical: 80 },
      humidity: { min: 0, max: 100, critical: 95 },
      pressure: { min: 900, max: 1100, critical: 1200 },
      light: { min: 0, max: 100000, critical: 120000 },
      motion: { min: 0, max: 1, critical: null }
    };

    const sensorThresholds = thresholds[sensorType];
    if (!sensorThresholds) return;

    // Use a sequential loop so each alert check is awaited
    // (async callbacks passed to forEach are not awaited)
    for (const [measurementType, measurement] of Object.entries(measurements)) {
      const value = measurement.value;
      const threshold = sensorThresholds;

      let alertLevel = null;
      let alertMessage = null;

      if (threshold.critical && value > threshold.critical) {
        alertLevel = 'critical';
        alertMessage = `Critical ${measurementType} level: ${value} (threshold: ${threshold.critical})`;
      } else if (value > threshold.max) {
        alertLevel = 'high';
        alertMessage = `High ${measurementType} level: ${value} (max: ${threshold.max})`;
      } else if (value < threshold.min) {
        alertLevel = 'low';
        alertMessage = `Low ${measurementType} level: ${value} (min: ${threshold.min})`;
      }

      if (alertLevel) {
        await this.createAlert({
          device_id: deviceId,
          sensor_type: sensorType,
          measurement_type: measurementType,
          alert_level: alertLevel,
          message: alertMessage,
          value: value,
          threshold: threshold,
          timestamp: document.timestamp,
          location: document.device.location
        });
      }
    }
  }

  async createAlert(alertData) {
    const alertsCollection = this.db.collection('alerts');

    const alert = {
      _id: new ObjectId(),
      ...alertData,
      created_at: new Date(),
      status: 'active',
      acknowledged: false,
      acknowledged_by: null,
      acknowledged_at: null,
      resolved: false,
      resolved_at: null,
      escalation_level: 0,

      // Alert metadata
      correlation_id: `${alertData.device_id}_${alertData.sensor_type}_${alertData.measurement_type}`,
      alert_hash: this.calculateAlertHash(alertData)
    };

    // Check for duplicate recent alerts (deduplication)
    const recentAlert = await alertsCollection.findOne({
      alert_hash: alert.alert_hash,
      created_at: { $gte: new Date(Date.now() - 300000) }, // Last 5 minutes
      status: 'active'
    });

    if (!recentAlert) {
      await alertsCollection.insertOne(alert);

      // Trigger real-time notifications
      await this.sendAlertNotification(alert);
    } else {
      // Update escalation level for repeated alerts
      await alertsCollection.updateOne(
        { _id: recentAlert._id },
        { 
          $inc: { escalation_level: 1 },
          $set: { last_occurrence: new Date() }
        }
      );
    }
  }

  calculateAlertHash(alertData) {
    const crypto = require('crypto');
    const hashString = `${alertData.device_id}:${alertData.sensor_type}:${alertData.measurement_type}:${alertData.alert_level}`;
    return crypto.createHash('md5').update(hashString).digest('hex');
  }

  async sendAlertNotification(alert) {
    // Implementation would integrate with notification systems
    console.log(`ALERT [${alert.alert_level.toUpperCase()}]: ${alert.message}`);

    // Here you would integrate with:
    // - Email/SMS services
    // - Slack/Teams webhooks  
    // - PagerDuty/OpsGenie
    // - Custom notification APIs
  }

  async updateDeviceHealthMetrics(deviceId, document) {
    const now = new Date();

    // Calculate device health score based on multiple factors
    const healthMetrics = {
      timestamp: now,
      device_info: {
        id: deviceId,
        facility: document.device.facility,
        zone: document.device.zone
      },

      health_indicators: {
        data_quality_score: document.quality_metrics.data_quality_score,
        signal_strength: document.device.network_info.signal_strength,
        sensor_health: document.quality_metrics.sensor_health,
        measurement_frequency: await this.calculateMeasurementFrequency(deviceId),
        last_communication: now,

        // Calculated health score
        overall_health_score: this.calculateOverallHealthScore(document),

        // Status indicators
        is_online: true,
        is_responsive: true,
        calibration_valid: document.quality_metrics.calibration_status === 'valid'
      },

      performance_metrics: {
        uptime_percentage: await this.calculateUptimePercentage(deviceId),
        average_response_time_ms: document.processing_info.processing_latency_ms,
        data_completeness_percentage: 100, // Could be calculated based on expected vs actual measurements
        error_rate_percentage: 0 // Could be calculated from failed measurements
      }
    };

    await this.deviceMetrics.insertOne(healthMetrics);
  }

  calculateOverallHealthScore(document) {
    let score = 100;

    // Factor in data quality
    score = score * (document.quality_metrics.data_quality_score / 100);

    // Factor in signal strength
    const signalStrength = document.device.network_info.signal_strength;
    if (signalStrength < -80) {
      score *= 0.8;
    } else if (signalStrength < -70) {
      score *= 0.9;
    }

    // Factor in sensor health
    if (document.quality_metrics.sensor_health !== 'normal') {
      score *= 0.7;
    }

    return Math.round(score);
  }

  async calculateMeasurementFrequency(deviceId, windowMinutes = 60) {
    const windowStart = new Date(Date.now() - windowMinutes * 60 * 1000);

    const count = await this.sensorData.countDocuments({
      'device.id': deviceId,
      timestamp: { $gte: windowStart }
    });

    return count / windowMinutes; // Measurements per minute
  }

  async calculateUptimePercentage(deviceId, windowHours = 24) {
    const windowStart = new Date(Date.now() - windowHours * 60 * 60 * 1000);

    // Get expected measurement intervals (assuming every minute)
    const expectedMeasurements = windowHours * 60;

    const actualMeasurements = await this.sensorData.countDocuments({
      'device.id': deviceId,
      timestamp: { $gte: windowStart }
    });

    return Math.min(100, (actualMeasurements / expectedMeasurements) * 100);
  }

  async updateRollingAverages(deviceId, document) {
    // Update cached analytics for dashboard performance
    const measurementTypes = Object.keys(document.measurements);

    for (const measurementType of measurementTypes) {
      const value = document.measurements[measurementType].value;

      // Calculate rolling averages for different time windows
      const timeWindows = [
        { name: '5min', minutes: 5 },
        { name: '1hour', minutes: 60 },
        { name: '24hour', minutes: 1440 }
      ];

      for (const window of timeWindows) {
        await this.updateWindowAverage(deviceId, measurementType, value, window);
      }
    }
  }

  async updateWindowAverage(deviceId, measurementType, currentValue, window) {
    const windowStart = new Date(Date.now() - window.minutes * 60 * 1000);

    // Calculate average for the time window using aggregation
    const pipeline = [
      {
        $match: {
          'device.id': deviceId,
          timestamp: { $gte: windowStart },
          [`measurements.${measurementType}`]: { $exists: true }
        }
      },
      {
        $group: {
          _id: null,
          average: { $avg: `$measurements.${measurementType}.value` },
          count: { $sum: 1 },
          min: { $min: `$measurements.${measurementType}.value` },
          max: { $max: `$measurements.${measurementType}.value` },
          stddev: { $stdDevPop: `$measurements.${measurementType}.value` }
        }
      }
    ];

    const result = await this.sensorData.aggregate(pipeline).next();

    if (result) {
      const cacheDocument = {
        computed_at: new Date(),
        computation_type: {
          type: 'rolling_average',
          device_id: deviceId,
          measurement_type: measurementType,
          window: window.name
        },

        statistics: {
          average: result.average,
          count: result.count,
          min: result.min,
          max: result.max,
          stddev: result.stddev || 0,
          current_value: currentValue,

          // Trend calculation
          trend: currentValue > result.average ? 'rising' : 
                 currentValue < result.average ? 'falling' : 'stable',
          deviation_percentage: Math.abs((currentValue - result.average) / result.average * 100)
        }
      };

      // Time series collections only support inserts for measurement documents,
      // so append a new cache entry rather than replacing in place; readers use
      // the most recent entry per device/measurement/window (see getCachedAnalytics)
      await this.analyticsCache.insertOne(cacheDocument);
    }
  }

  async getDeviceAnalytics(deviceId, options = {}) {
    const timeRange = options.timeRange || '24h';
    const measurementTypes = options.measurementTypes || null;
    const includeAggregations = options.includeAggregations !== false;

    try {
      // Parse time range
      const timeRangeMs = this.parseTimeRange(timeRange);
      const startTime = new Date(Date.now() - timeRangeMs);

      // Build aggregation pipeline
      const pipeline = [
        {
          $match: {
            'device.id': deviceId,
            timestamp: { $gte: startTime }
          }
        }
      ];

      // Add measurement type filtering if specified
      if (measurementTypes && measurementTypes.length > 0) {
        const measurementFilters = {};
        measurementTypes.forEach(type => {
          measurementFilters[`measurements.${type}`] = { $exists: true };
        });
        pipeline.push({ $match: { $or: Object.entries(measurementFilters).map(([key, value]) => ({ [key]: value })) } });
      }

      if (includeAggregations) {
        // Add aggregation stages for comprehensive analytics
        pipeline.push(
          {
            $addFields: {
              hour_bucket: {
                $dateTrunc: { date: '$timestamp', unit: 'hour' }
              }
            }
          },
          {
            $group: {
              _id: {
                hour: '$hour_bucket',
                sensor_type: '$device.sensor_type'
              },

              // Time and count metrics
              measurement_count: { $sum: 1 },
              first_measurement: { $min: '$timestamp' },
              last_measurement: { $max: '$timestamp' },

              // Data quality metrics
              avg_quality_score: { $avg: '$quality_metrics.data_quality_score' },
              min_quality_score: { $min: '$quality_metrics.data_quality_score' },

              // Network metrics
              avg_signal_strength: { $avg: '$device.network_info.signal_strength' },

              // Measurement statistics (dynamic based on available measurements)
              measurements: { $push: '$measurements' }
            }
          },
          {
            $addFields: {
              // Process measurements to calculate statistics for each type
              measurement_stats: {
                $reduce: {
                  input: '$measurements',
                  initialValue: {},
                  in: {
                    $mergeObjects: [
                      '$$value',
                      {
                        $arrayToObject: {
                          $map: {
                            input: { $objectToArray: '$$this' },
                            in: {
                              k: '$$this.k',
                              v: {
                                values: { $concatArrays: [{ $ifNull: [{ $getField: { field: 'values', input: { $getField: { field: '$$this.k', input: '$$value' } } } }, []] }, ['$$this.v.value']] },
                                unit: '$$this.v.unit'
                              }
                            }
                          }
                        }
                      }
                    ]
                  }
                }
              }
            }
          },
          {
            $addFields: {
              // Calculate final statistics for each measurement type
              final_measurement_stats: {
                $arrayToObject: {
                  $map: {
                    input: { $objectToArray: '$measurement_stats' },
                    in: {
                      k: '$$this.k',
                      v: {
                        count: { $size: '$$this.v.values' },
                        average: { $avg: '$$this.v.values' },
                        min: { $min: '$$this.v.values' },
                        max: { $max: '$$this.v.values' },
                        stddev: { $stdDevPop: '$$this.v.values' },
                        unit: '$$this.v.unit'
                      }
                    }
                  }
                }
              }
            }
          },
          {
            $sort: { '_id.hour': 1 }
          }
        );
      } else {
        // Simple data retrieval without aggregations
        pipeline.push(
          {
            $sort: { timestamp: -1 }
          },
          {
            $limit: options.limit || 1000
          }
        );
      }

      const results = await this.sensorData.aggregate(pipeline).toArray();

      // Get cached analytics for quick dashboard metrics
      const cachedAnalytics = await this.getCachedAnalytics(deviceId, measurementTypes);

      return {
        success: true,
        device_id: deviceId,
        time_range: timeRange,
        query_timestamp: new Date(),
        data_points: results,
        cached_analytics: cachedAnalytics,

        summary: {
          total_measurements: results.reduce((sum, item) => sum + (item.measurement_count || 1), 0),
          time_span: {
            start: startTime,
            end: new Date()
          },
          measurement_types: measurementTypes || 'all'
        }
      };

    } catch (error) {
      console.error('Error retrieving device analytics:', error);
      return {
        success: false,
        error: error.message,
        device_id: deviceId
      };
    }
  }

  async getCachedAnalytics(deviceId, measurementTypes = null) {
    const query = {
      'computation_type.device_id': deviceId
    };

    if (measurementTypes && measurementTypes.length > 0) {
      query['computation_type.measurement_type'] = { $in: measurementTypes };
    }

    const cachedResults = await this.analyticsCache.find(query)
      .sort({ computed_at: -1 })
      .toArray();

    // Organize cached results by measurement type and window
    const organized = {};

    cachedResults.forEach(result => {
      const measurementType = result.computation_type.measurement_type;
      const window = result.computation_type.window;

      if (!organized[measurementType]) {
        organized[measurementType] = {};
      }

      // Results are sorted newest-first; keep only the latest entry per window
      if (!organized[measurementType][window]) {
        organized[measurementType][window] = {
          ...result.statistics,
          computed_at: result.computed_at
        };
      }
    });

    return organized;
  }

  parseTimeRange(timeRange) {
    const ranges = {
      '5m': 5 * 60 * 1000,
      '15m': 15 * 60 * 1000,
      '1h': 60 * 60 * 1000,
      '6h': 6 * 60 * 60 * 1000,
      '24h': 24 * 60 * 60 * 1000,
      '7d': 7 * 24 * 60 * 60 * 1000,
      '30d': 30 * 24 * 60 * 60 * 1000
    };

    return ranges[timeRange] || ranges['24h'];
  }

  async getFacilityOverview(facility, options = {}) {
    const timeRange = options.timeRange || '24h';
    const timeRangeMs = this.parseTimeRange(timeRange);
    const startTime = new Date(Date.now() - timeRangeMs);

    try {
      const pipeline = [
        {
          $match: {
            'device.facility': facility,
            timestamp: { $gte: startTime }
          }
        },
        {
          $group: {
            _id: {
              device_id: '$device.id',
              zone: '$device.zone',
              sensor_type: '$device.sensor_type'
            },

            latest_timestamp: { $max: '$timestamp' },
            measurement_count: { $sum: 1 },
            avg_data_quality: { $avg: '$quality_metrics.data_quality_score' },
            avg_signal_strength: { $avg: '$device.network_info.signal_strength' },

            // Latest measurements for current values
            latest_measurements: { $last: '$measurements' },

            // Device info
            device_info: { $last: '$device' },
            latest_quality_metrics: { $last: '$quality_metrics' }
          }
        },
        {
          $addFields: {
            // Calculate device status
            device_status: {
              $switch: {
                branches: [
                  {
                    case: { $lt: ['$latest_timestamp', { $subtract: [new Date(), 300000] }] }, // 5 minutes
                    then: 'offline'
                  },
                  {
                    case: { $lt: ['$avg_data_quality', 50] },
                    then: 'degraded'
                  },
                  {
                    case: { $lt: ['$avg_signal_strength', -80] },
                    then: 'poor_connectivity'
                  }
                ],
                default: 'online'
              }
            },

            // Time since last measurement
            minutes_since_last_measurement: {
              $divide: [
                { $subtract: [new Date(), '$latest_timestamp'] },
                60000
              ]
            }
          }
        },
        {
          $group: {
            _id: '$_id.zone',

            device_count: { $sum: 1 },

            // Status distribution
            online_devices: {
              $sum: { $cond: [{ $eq: ['$device_status', 'online'] }, 1, 0] }
            },
            offline_devices: {
              $sum: { $cond: [{ $eq: ['$device_status', 'offline'] }, 1, 0] }
            },
            degraded_devices: {
              $sum: { $cond: [{ $eq: ['$device_status', 'degraded'] }, 1, 0] }
            },

            // Performance metrics
            avg_data_quality: { $avg: '$avg_data_quality' },
            avg_signal_strength: { $avg: '$avg_signal_strength' },
            total_measurements: { $sum: '$measurement_count' },

            // Sensor type distribution
            sensor_types: { $addToSet: '$_id.sensor_type' },

            // Device details
            devices: {
              $push: {
                device_id: '$_id.device_id',
                sensor_type: '$_id.sensor_type',
                status: '$device_status',
                last_seen: '$latest_timestamp',
                data_quality: '$avg_data_quality',
                signal_strength: '$avg_signal_strength',
                measurement_count: '$measurement_count',
                latest_measurements: '$latest_measurements'
              }
            }
          }
        },
        {
          $sort: { '_id': 1 }
        }
      ];

      const results = await this.sensorData.aggregate(pipeline).toArray();

      // Calculate facility-wide statistics
      const facilityStats = results.reduce((stats, zone) => {
        stats.total_devices += zone.device_count;
        stats.total_online += zone.online_devices;
        stats.total_offline += zone.offline_devices;
        stats.total_degraded += zone.degraded_devices;
        stats.total_measurements += zone.total_measurements;

        stats.avg_data_quality = (stats.avg_data_quality * stats.zones_processed + zone.avg_data_quality) / (stats.zones_processed + 1);
        stats.avg_signal_strength = (stats.avg_signal_strength * stats.zones_processed + zone.avg_signal_strength) / (stats.zones_processed + 1);
        stats.zones_processed += 1;

        // Collect unique sensor types
        zone.sensor_types.forEach(type => {
          if (!stats.sensor_types.includes(type)) {
            stats.sensor_types.push(type);
          }
        });

        return stats;
      }, {
        total_devices: 0,
        total_online: 0,
        total_offline: 0,
        total_degraded: 0,
        total_measurements: 0,
        avg_data_quality: 0,
        avg_signal_strength: 0,
        sensor_types: [],
        zones_processed: 0
      });

      // Calculate a 0-100 facility health score
      // (weighted: uptime 60%, data quality 30%, connectivity 10%)
      const healthScore = Math.round(
        ((facilityStats.total_online / Math.max(facilityStats.total_devices, 1)) * 0.6 +
         (facilityStats.avg_data_quality / 100) * 0.3 +
         ((facilityStats.avg_signal_strength + 100) / 50) * 0.1) * 100
      );

      return {
        success: true,
        facility: facility,
        time_range: timeRange,
        generated_at: new Date(),

        facility_overview: {
          total_devices: facilityStats.total_devices,
          online_devices: facilityStats.total_online,
          offline_devices: facilityStats.total_offline,
          degraded_devices: facilityStats.total_degraded,

          uptime_percentage: Math.round((facilityStats.total_online / Math.max(facilityStats.total_devices, 1)) * 100),
          avg_data_quality: Math.round(facilityStats.avg_data_quality),
          avg_signal_strength: Math.round(facilityStats.avg_signal_strength),
          facility_health_score: healthScore,

          sensor_types: facilityStats.sensor_types,
          total_measurements_today: facilityStats.total_measurements,

          zones: results
        }
      };

    } catch (error) {
      console.error('Error generating facility overview:', error);
      return {
        success: false,
        error: error.message,
        facility: facility
      };
    }
  }

  async performBatchIngestion(batchData, options = {}) {
    const batchSize = options.batchSize || 1000;
    const enableValidation = options.enableValidation !== false;
    const startTime = Date.now();

    console.log(`Starting batch ingestion of ${batchData.length} records...`);

    try {
      const results = {
        total_records: batchData.length,
        processed_records: 0,
        failed_records: 0,
        batches_processed: 0,
        processing_time_ms: 0,
        errors: []
      };

      // Process in batches for optimal performance
      for (let i = 0; i < batchData.length; i += batchSize) {
        const batch = batchData.slice(i, i + batchSize);
        const batchStartTime = Date.now();

        // Prepare documents for insertion
        const documents = batch.map((record, recordIndex) => {
          try {
            if (enableValidation) {
              this.validateBatchRecord(record);
            }

            return {
              timestamp: new Date(record.timestamp),

              device: {
                id: record.device_id,
                sensor_type: record.sensor_type,
                facility: record.facility || 'unknown',
                zone: record.zone || 'unspecified',
                location: record.location ? {
                  type: 'Point',
                  coordinates: [record.location.lng, record.location.lat]
                } : null,
                model: record.device_model || 'unknown',
                firmware_version: record.firmware_version || '1.0',
                network_info: {
                  connection_type: record.connection_type || 'unknown',
                  signal_strength: record.signal_strength || -50,
                  gateway_id: record.gateway_id
                }
              },

              measurements: this.normalizeMeasurements(record.measurements || record.values),

              quality_metrics: {
                data_quality_score: record.data_quality_score || 95,
                sensor_health: record.sensor_health || 'normal',
                calibration_status: record.calibration_status || 'valid',
                measurement_accuracy: record.measurement_accuracy || 0.95
              },

              processing_info: {
                ingestion_timestamp: new Date(),
                processing_latency_ms: 0,
                source: 'batch_import',
                batch_id: options.batchId || `batch_${Date.now()}`,
                schema_version: '2.0'
              }
            };
          } catch (validationError) {
            results.errors.push({
              record_index: i + recordIndex,
              error: validationError.message,
              record: record
            });
            return null;
          }
        }).filter(doc => doc !== null);

        if (documents.length > 0) {
          try {
            await this.sensorData.insertMany(documents, { 
              ordered: false,
              writeConcern: { w: 'majority', j: true }
            });

            results.processed_records += documents.length;
          } catch (insertError) {
            results.failed_records += documents.length;
            results.errors.push({
              batch_index: results.batches_processed,
              error: insertError.message,
              documents_count: documents.length
            });
          }
        }

        results.batches_processed += 1;
        const batchTime = Date.now() - batchStartTime;

        console.log(`Batch ${results.batches_processed} processed: ${documents.length} records in ${batchTime}ms`);
      }

      results.processing_time_ms = Date.now() - startTime;
      results.success_rate = (results.processed_records / results.total_records) * 100;
      results.throughput_records_per_second = Math.round(results.processed_records / (results.processing_time_ms / 1000));

      console.log(`Batch ingestion completed: ${results.processed_records}/${results.total_records} records processed in ${results.processing_time_ms}ms`);

      return {
        success: true,
        results: results
      };

    } catch (error) {
      console.error('Batch ingestion failed:', error);
      return {
        success: false,
        error: error.message,
        processing_time_ms: Date.now() - startTime
      };
    }
  }

  validateBatchRecord(record) {
    if (!record.device_id) {
      throw new Error('device_id is required');
    }

    if (!record.sensor_type) {
      throw new Error('sensor_type is required');
    }

    if (!record.timestamp) {
      throw new Error('timestamp is required');
    }

    if (!record.measurements && !record.values) {
      throw new Error('measurements or values are required');
    }

    // Validate timestamp format
    const timestamp = new Date(record.timestamp);
    if (isNaN(timestamp.getTime())) {
      throw new Error('Invalid timestamp format');
    }

    // Validate timestamp is not in the future
    if (timestamp > new Date()) {
      throw new Error('Timestamp cannot be in the future');
    }

    // Validate timestamp is not too old (more than 1 year)
    const oneYearAgo = new Date(Date.now() - 365 * 24 * 60 * 60 * 1000);
    if (timestamp < oneYearAgo) {
      throw new Error('Timestamp too old (more than 1 year)');
    }
  }
}

module.exports = { MongoDBTimeSeriesManager };
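
To tie the pieces together, here is a minimal usage sketch for the manager above. The connection string, database name, device ID, measurement values, and the './mongodb-timeseries-manager' require path are illustrative assumptions, and it presumes a fresh database so that collection creation succeeds:

// Example wiring for MongoDBTimeSeriesManager (illustrative values)
const { MongoClient } = require('mongodb');
const { MongoDBTimeSeriesManager } = require('./mongodb-timeseries-manager');

async function main() {
  const client = new MongoClient('mongodb://localhost:27017/?replicaSet=rs0');
  await client.connect();

  const manager = new MongoDBTimeSeriesManager(client.db('iot_platform'));
  await manager.ready; // wait for time series collections and indexes

  // Ingest a single reading with device metadata
  const ingestResult = await manager.ingestSensorData('device-042', 'temperature', {
    temperature: { value: 23.4, unit: 'celsius', precision: 1 }
  }, {
    facility: 'plant-north',
    zone: 'assembly',
    location: { lat: 40.7128, lng: -74.006 },
    signal_strength: -62
  });
  console.log('Ingested document:', ingestResult.documentId);

  // Query hourly analytics for the last 6 hours
  const analytics = await manager.getDeviceAnalytics('device-042', {
    timeRange: '6h',
    measurementTypes: ['temperature']
  });
  console.log('Hourly buckets returned:', analytics.data_points.length);

  await client.close();
}

main().catch(console.error);

Because the device metadata lives in the metaField, readings from the same device land in the same buckets, which is what keeps both ingestion and device-scoped range queries efficient.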

Advanced Time Series Analytics Patterns

Real-Time Aggregation Pipelines

Implement sophisticated real-time analytics using MongoDB aggregation frameworks:

// Advanced analytics and alerting system
class TimeSeriesAnalyticsEngine {
  constructor(timeSeriesManager) {
    this.tsManager = timeSeriesManager;
    this.db = timeSeriesManager.db;
    this.alertRules = new Map();
    this.analyticsCache = new Map();
  }

  async createAdvancedAnalyticsPipeline(analysisConfig) {
    const {
      deviceFilter = {},
      timeRange = '24h',
      aggregationLevel = 'hour',
      analysisTypes = ['trend', 'anomaly', 'correlation'],
      realTimeEnabled = true
    } = analysisConfig;

    try {
      const timeRangeMs = this.tsManager.parseTimeRange(timeRange);
      const startTime = new Date(Date.now() - timeRangeMs);

      // Build comprehensive analytics pipeline
      const pipeline = [
        // Stage 1: Filter data by time and device criteria
        {
          $match: {
            timestamp: { $gte: startTime },
            ...this.buildDeviceFilter(deviceFilter)
          }
        },

        // Stage 2: Add time bucketing for aggregation
        {
          $addFields: {
            time_bucket: this.getTimeBucketExpression(aggregationLevel),
            hour_of_day: { $hour: '$timestamp' },
            day_of_week: { $dayOfWeek: '$timestamp' },
            is_business_hours: {
              $and: [
                { $gte: [{ $hour: '$timestamp' }, 8] },
                { $lte: [{ $hour: '$timestamp' }, 18] }
              ]
            }
          }
        },

        // Stage 3: Unwind measurements for individual analysis
        {
          $addFields: {
            measurement_array: {
              $objectToArray: '$measurements'
            }
          }
        },

        {
          $unwind: '$measurement_array'
        },

        // Stage 4: Group by time bucket, device, and measurement type
        {
          $group: {
            _id: {
              time_bucket: '$time_bucket',
              device_id: '$device.id',
              measurement_type: '$measurement_array.k',
              facility: '$device.facility',
              zone: '$device.zone'
            },

            // Statistical aggregations
            count: { $sum: 1 },
            avg_value: { $avg: '$measurement_array.v.value' },
            min_value: { $min: '$measurement_array.v.value' },
            max_value: { $max: '$measurement_array.v.value' },
            sum_value: { $sum: '$measurement_array.v.value' },
            stddev_value: { $stdDevPop: '$measurement_array.v.value' },

            // Data quality metrics
            avg_data_quality: { $avg: '$quality_metrics.data_quality_score' },
            min_data_quality: { $min: '$quality_metrics.data_quality_score' },

            // Network performance
            avg_signal_strength: { $avg: '$device.network_info.signal_strength' },

            // Time-based metrics
            first_timestamp: { $min: '$timestamp' },
            last_timestamp: { $max: '$timestamp' },

            // Business context
            business_hours_readings: {
              $sum: { $cond: ['$is_business_hours', 1, 0] }
            },

            // Value arrays for advanced calculations
            values: { $push: '$measurement_array.v.value' },
            timestamps: { $push: '$timestamp' },

            // Metadata
            unit: { $last: '$measurement_array.v.unit' },
            device_info: { $last: '$device' }
          }
        },

        // Stage 5: Calculate advanced metrics
        {
          $addFields: {
            // Variance and coefficient of variation
            variance: { $pow: ['$stddev_value', 2] },
            coefficient_of_variation: {
              $cond: [
                { $ne: ['$avg_value', 0] },
                { $divide: ['$stddev_value', '$avg_value'] },
                0
              ]
            },

            // Range and percentiles (approximated)
            value_range: { $subtract: ['$max_value', '$min_value'] },

            // Data completeness
            expected_readings: {
              $divide: [
                { $subtract: ['$last_timestamp', '$first_timestamp'] },
                { $multiply: [this.getExpectedInterval(aggregationLevel), 1000] }
              ]
            },

            // Time span coverage
            time_span_hours: {
              $divide: [
                { $subtract: ['$last_timestamp', '$first_timestamp'] },
                3600000
              ]
            },

            // Business hours coverage
            business_hours_percentage: {
              $multiply: [
                { $divide: ['$business_hours_readings', '$count'] },
                100
              ]
            }
          }
        },

        // Stage 6: Calculate trend indicators
        {
          $addFields: {
            // Simple trend approximation
            trend_direction: {
              $switch: {
                branches: [
                  {
                    case: { $gt: ['$coefficient_of_variation', 0.5] },
                    then: 'highly_variable'
                  },
                  {
                    case: { $gt: ['$max_value', { $multiply: ['$avg_value', 1.2] }] },
                    then: 'trending_high'
                  },
                  {
                    case: { $lt: ['$min_value', { $multiply: ['$avg_value', 0.8] }] },
                    then: 'trending_low'
                  }
                ],
                default: 'stable'
              }
            },

            // Anomaly detection flags
            anomaly_indicators: {
              $let: {
                vars: {
                  three_sigma_upper: { $add: ['$avg_value', { $multiply: ['$stddev_value', 3] }] },
                  three_sigma_lower: { $subtract: ['$avg_value', { $multiply: ['$stddev_value', 3] }] }
                },
                in: {
                  has_outliers: {
                    $or: [
                      { $gt: ['$max_value', '$$three_sigma_upper'] },
                      { $lt: ['$min_value', '$$three_sigma_lower'] }
                    ]
                  },
                  outlier_percentage: {
                    $multiply: [
                      {
                        $divide: [
                          {
                            $size: {
                              $filter: {
                                input: '$values',
                                cond: {
                                  $or: [
                                    { $gt: ['$$this', '$$three_sigma_upper'] },
                                    { $lt: ['$$this', '$$three_sigma_lower'] }
                                  ]
                                }
                              }
                            }
                          },
                          '$count'
                        ]
                      },
                      100
                    ]
                  }
                }
              }
            },

            // Performance indicators: weighted sum of data quality (40%),
            // connectivity (30%), and completeness (30%), scaled to 0-100
            performance_score: {
              $multiply: [
                {
                  $add: [
                    // Data quality component (40%)
                    { $multiply: [{ $divide: ['$avg_data_quality', 100] }, 0.4] },

                    // Connectivity component (30%)
                    { $multiply: [{ $divide: [{ $add: ['$avg_signal_strength', 100] }, 50] }, 0.3] },

                    // Completeness component (30%)
                    { $multiply: [{ $min: [{ $divide: ['$count', '$expected_readings'] }, 1] }, 0.3] }
                  ]
                },
                100
              ]
            }
          }
        },

        // Stage 7: Add comparative context
        {
          $lookup: {
            from: 'sensor_readings',
            let: {
              device_id: '$_id.device_id',
              measurement_type: '$_id.measurement_type',
              current_start: '$first_timestamp'
            },
            pipeline: [
              {
                $match: {
                  $expr: {
                    $and: [
                      { $eq: ['$device.id', '$$device_id'] },
                      { $lt: ['$timestamp', '$$current_start'] },
                      { $gte: ['$timestamp', { $subtract: ['$$current_start', timeRangeMs] }] }
                    ]
                  }
                }
              },
              {
                $group: {
                  _id: null,
                  // Average the numeric reading for this measurement type
                  // (dynamic field access requires a server version where
                  // $getField accepts an expression for 'field')
                  historical_avg: {
                    $avg: {
                      $getField: {
                        field: 'value',
                        input: { $getField: { field: '$$measurement_type', input: '$measurements' } }
                      }
                    }
                  }
                }
              }
            ],
            as: 'historical_context'
          }
        },

        // Stage 8: Final calculations and categorization
        {
          $addFields: {
            // Historical comparison
            historical_avg: {
              $ifNull: [
                { $arrayElemAt: ['$historical_context.historical_avg', 0] },
                '$avg_value'
              ]
            },

            // Calculate change from historical baseline
            historical_change_percentage: {
              $let: {
                vars: {
                  historical: {
                    $ifNull: [
                      { $arrayElemAt: ['$historical_context.historical_avg', 0] },
                      '$avg_value'
                    ]
                  }
                },
                in: {
                  $cond: [
                    { $ne: ['$$historical', 0] },
                    {
                      $multiply: [
                        { $divide: [{ $subtract: ['$avg_value', '$$historical'] }, '$$historical'] },
                        100
                      ]
                    },
                    0
                  ]
                }
              }
            },

            // Overall health assessment
            health_status: {
              $switch: {
                branches: [
                  {
                    case: { $lt: ['$performance_score', 50] },
                    then: 'critical'
                  },
                  {
                    case: { $lt: ['$performance_score', 70] },
                    then: 'warning'
                  },
                  {
                    case: { $gt: ['$anomaly_indicators.outlier_percentage', 10] },
                    then: 'anomalous'
                  }
                ],
                default: 'healthy'
              }
            },

            // Analysis metadata (evaluated client-side when the pipeline is built)
            analyzed_at: new Date(),
            analysis_window_ms: timeRangeMs
          }
        },

        // Stage 9: Sort by relevance
        {
          $sort: {
            performance_score: 1, // Worst performers first
            'anomaly_indicators.outlier_percentage': -1,
            '_id.time_bucket': -1
          }
        }
      ];

      const results = await this.tsManager.sensorData.aggregate(pipeline).toArray();

      // Process results for real-time actions
      if (realTimeEnabled) {
        await this.processAnalyticsForAlerts(results);
      }

      // Cache results for dashboard performance
      const cacheKey = this.generateAnalyticsCacheKey(analysisConfig);
      this.analyticsCache.set(cacheKey, {
        results: results,
        generated_at: new Date(),
        config: analysisConfig
      });

      return {
        success: true,
        analysis_config: analysisConfig,
        results: results,
        summary: this.generateAnalyticsSummary(results),
        generated_at: new Date(),
        cache_key: cacheKey
      };

    } catch (error) {
      console.error('Advanced analytics pipeline failed:', error);
      return {
        success: false,
        error: error.message,
        analysis_config: analysisConfig
      };
    }
  }

  buildDeviceFilter(deviceFilter) {
    const filter = {};

    if (deviceFilter.device_ids && deviceFilter.device_ids.length > 0) {
      filter['device.id'] = { $in: deviceFilter.device_ids };
    }

    if (deviceFilter.facilities && deviceFilter.facilities.length > 0) {
      filter['device.facility'] = { $in: deviceFilter.facilities };
    }

    if (deviceFilter.zones && deviceFilter.zones.length > 0) {
      filter['device.zone'] = { $in: deviceFilter.zones };
    }

    if (deviceFilter.sensor_types && deviceFilter.sensor_types.length > 0) {
      filter['device.sensor_type'] = { $in: deviceFilter.sensor_types };
    }

    if (deviceFilter.location_radius) {
      const { center, radius_meters } = deviceFilter.location_radius;
      filter['device.location'] = {
        $geoWithin: {
          $centerSphere: [[center.lng, center.lat], radius_meters / 6378137] // Earth radius in meters
        }
      };
    }

    return filter;
  }

  getTimeBucketExpression(aggregationLevel) {
    const buckets = {
      minute: { $dateTrunc: { date: '$timestamp', unit: 'minute' } },
      hour: { $dateTrunc: { date: '$timestamp', unit: 'hour' } },
      day: { $dateTrunc: { date: '$timestamp', unit: 'day' } },
      week: { $dateTrunc: { date: '$timestamp', unit: 'week', startOfWeek: 'monday' } },
      month: { $dateTrunc: { date: '$timestamp', unit: 'month' } }
    };

    return buckets[aggregationLevel] || buckets.hour;
  }

  getExpectedInterval(aggregationLevel) {
    const intervals = {
      minute: 60,     // 60 seconds
      hour: 3600,     // 3600 seconds  
      day: 86400,     // 86400 seconds
      week: 604800,   // 604800 seconds
      month: 2592000  // ~30 days in seconds
    };

    return intervals[aggregationLevel] || intervals.hour;
  }

  async processAnalyticsForAlerts(analyticsResults) {
    for (const result of analyticsResults) {
      // Check for alert conditions
      if (result.health_status === 'critical') {
        await this.createAnalyticsAlert('critical_performance', result);
      }

      if (result.anomaly_indicators.outlier_percentage > 15) {
        await this.createAnalyticsAlert('anomaly_detected', result);
      }

      if (Math.abs(result.historical_change_percentage) > 50) {
        await this.createAnalyticsAlert('significant_trend_change', result);
      }

      if (result.performance_score < 30) {
        await this.createAnalyticsAlert('poor_performance', result);
      }
    }
  }

  async createAnalyticsAlert(alertType, analyticsData) {
    const alertsCollection = this.db.collection('analytics_alerts');

    const alert = {
      _id: new ObjectId(),
      alert_type: alertType,
      device_id: analyticsData._id.device_id,
      measurement_type: analyticsData._id.measurement_type,
      facility: analyticsData._id.facility,
      zone: analyticsData._id.zone,

      // Alert details
      severity: this.calculateAlertSeverity(alertType, analyticsData),
      description: this.generateAlertDescription(alertType, analyticsData),

      // Analytics context
      analytics_data: {
        time_bucket: analyticsData._id.time_bucket,
        performance_score: analyticsData.performance_score,
        health_status: analyticsData.health_status,
        anomaly_indicators: analyticsData.anomaly_indicators,
        historical_change_percentage: analyticsData.historical_change_percentage,
        avg_value: analyticsData.avg_value,
        trend_direction: analyticsData.trend_direction
      },

      // Timestamps
      created_at: new Date(),
      acknowledged: false,
      resolved: false
    };

    await alertsCollection.insertOne(alert);
    console.log(`Analytics Alert Created: ${alertType} for device ${analyticsData._id.device_id}`);
  }

  calculateAlertSeverity(alertType, analyticsData) {
    switch (alertType) {
      case 'critical_performance':
        return analyticsData.performance_score < 20 ? 'critical' : 'high';

      case 'anomaly_detected':
        return analyticsData.anomaly_indicators.outlier_percentage > 25 ? 'high' : 'medium';

      case 'significant_trend_change':
        return Math.abs(analyticsData.historical_change_percentage) > 100 ? 'high' : 'medium';

      case 'poor_performance':
        return analyticsData.performance_score < 20 ? 'high' : 'medium';

      default:
        return 'medium';
    }
  }

  generateAlertDescription(alertType, analyticsData) {
    const device = analyticsData._id.device_id;
    const measurement = analyticsData._id.measurement_type;

    switch (alertType) {
      case 'critical_performance':
        return `Critical performance degradation detected for ${device} ${measurement} sensor. Performance score: ${Math.round(analyticsData.performance_score)}%`;

      case 'anomaly_detected':
        return `Anomalous readings detected for ${device} ${measurement}. ${Math.round(analyticsData.anomaly_indicators.outlier_percentage)}% of readings are outliers`;

      case 'significant_trend_change':
        return `Significant trend change for ${device} ${measurement}. ${Math.round(analyticsData.historical_change_percentage)}% change from historical baseline`;

      case 'poor_performance':
        return `Poor performance detected for ${device} ${measurement}. Performance score: ${Math.round(analyticsData.performance_score)}%`;

      default:
        return `Analytics alert for ${device} ${measurement}`;
    }
  }

  generateAnalyticsSummary(results) {
    if (results.length === 0) {
      return { total_analyses: 0, unique_devices: 0, unique_measurements: 0 };
    }

    const summary = {
      total_analyses: results.length,
      unique_devices: new Set(results.map(r => r._id.device_id)).size,
      unique_measurements: new Set(results.map(r => r._id.measurement_type)).size,
      unique_facilities: new Set(results.map(r => r._id.facility)).size,

      // Health distribution
      health_status_distribution: {},

      // Performance metrics
      avg_performance_score: 0,
      min_performance_score: 100,
      max_performance_score: 0,

      // Anomaly statistics
      anomalous_analyses: 0,
      avg_outlier_percentage: 0,

      // Trend distribution
      trend_distribution: {},

      // Time range
      earliest_bucket: null,
      latest_bucket: null
    };

    // Calculate distributions and averages
    results.forEach(result => {
      // Health status distribution
      const status = result.health_status;
      summary.health_status_distribution[status] = (summary.health_status_distribution[status] || 0) + 1;

      // Performance metrics
      summary.avg_performance_score += result.performance_score;
      summary.min_performance_score = Math.min(summary.min_performance_score, result.performance_score);
      summary.max_performance_score = Math.max(summary.max_performance_score, result.performance_score);

      // Anomaly tracking
      if (result.anomaly_indicators.has_outliers) {
        summary.anomalous_analyses++;
      }
      summary.avg_outlier_percentage += result.anomaly_indicators.outlier_percentage;

      // Trend distribution
      const trend = result.trend_direction;
      summary.trend_distribution[trend] = (summary.trend_distribution[trend] || 0) + 1;

      // Time range
      const bucket = result._id.time_bucket;
      if (!summary.earliest_bucket || bucket < summary.earliest_bucket) {
        summary.earliest_bucket = bucket;
      }
      if (!summary.latest_bucket || bucket > summary.latest_bucket) {
        summary.latest_bucket = bucket;
      }
    });

    // Calculate averages
    summary.avg_performance_score = Math.round(summary.avg_performance_score / results.length);
    summary.avg_outlier_percentage = Math.round(summary.avg_outlier_percentage / results.length);
    summary.anomaly_rate = Math.round((summary.anomalous_analyses / results.length) * 100);

    return summary;
  }

  generateAnalyticsCacheKey(analysisConfig) {
    const keyData = {
      devices: JSON.stringify(analysisConfig.deviceFilter || {}),
      timeRange: analysisConfig.timeRange,
      aggregationLevel: analysisConfig.aggregationLevel,
      analysisTypes: JSON.stringify(analysisConfig.analysisTypes || [])
    };

    const crypto = require('crypto');
    return crypto.createHash('md5').update(JSON.stringify(keyData)).digest('hex');
  }
}

module.exports = { TimeSeriesAnalyticsEngine };
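
For completeness, a minimal invocation sketch follows. The runAdvancedAnalytics method name, the constructor arguments, the module path, and the timeRange shape are assumptions based on the fields referenced above (deviceFilter, aggregationLevel, analysisTypes, realTimeEnabled), not a documented API.

// Hypothetical usage of the analytics engine (method name and config shape assumed)
const { MongoClient } = require('mongodb');
const { TimeSeriesAnalyticsEngine } = require('./timeSeriesAnalyticsEngine'); // path assumed

async function runDashboardAnalysis() {
  const client = new MongoClient(process.env.MONGODB_URI || 'mongodb://localhost:27017');
  await client.connect();

  try {
    const engine = new TimeSeriesAnalyticsEngine(client.db('iot_platform')); // constructor args assumed

    const report = await engine.runAdvancedAnalytics({ // method name assumed
      deviceFilter: { facilities: ['facility_north'], sensor_types: ['temperature'] },
      timeRange: { hours: 24 },            // shape assumed
      aggregationLevel: 'hour',
      analysisTypes: ['trend', 'anomaly'],
      realTimeEnabled: true
    });

    if (report.success) {
      console.log('Analytics summary:', report.summary);
    } else {
      console.error('Analysis failed:', report.error);
    }
  } finally {
    await client.close();
  }
}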

SQL-Style Time Series Operations with QueryLeaf

QueryLeaf provides familiar SQL syntax for MongoDB time series operations:

-- QueryLeaf Time Series operations with SQL-familiar syntax

-- Insert time series data with automatic optimization
INSERT INTO sensor_readings (
  timestamp,
  device_id,
  sensor_type,
  measurements,
  location,
  quality_metrics
)
VALUES (
  CURRENT_TIMESTAMP,
  'device_001',
  'temperature',
  JSON_BUILD_OBJECT(
    'temperature', JSON_BUILD_OBJECT('value', 23.5, 'unit', 'celsius'),
    'humidity', JSON_BUILD_OBJECT('value', 65.2, 'unit', 'percent')
  ),
  ST_GeomFromText('POINT(-74.0060 40.7128)', 4326),
  JSON_BUILD_OBJECT(
    'data_quality_score', 95,
    'sensor_health', 'normal',
    'signal_strength', -45
  )
);

-- Advanced time series analytics with window functions
WITH hourly_analytics AS (
  SELECT 
    device_id,
    sensor_type,
    DATE_TRUNC('hour', timestamp) as hour_bucket,

    -- Basic statistics
    COUNT(*) as reading_count,
    AVG(measurements->>'temperature'->>'value') as avg_temperature,
    MIN(measurements->>'temperature'->>'value') as min_temperature,
    MAX(measurements->>'temperature'->>'value') as max_temperature,
    STDDEV(measurements->>'temperature'->>'value') as temp_stddev,

    -- Time series specific aggregations
    FIRST_VALUE(measurements->>'temperature'->>'value') OVER (
      PARTITION BY device_id, DATE_TRUNC('hour', timestamp)
      ORDER BY timestamp ASC
      ROWS UNBOUNDED PRECEDING
    ) as first_reading,

    LAST_VALUE(measurements->>'temperature'->>'value') OVER (
      PARTITION BY device_id, DATE_TRUNC('hour', timestamp) 
      ORDER BY timestamp ASC
      ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ) as last_reading,

    -- Moving averages
    AVG(measurements->>'temperature'->>'value') OVER (
      PARTITION BY device_id
      ORDER BY timestamp
      ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
    ) as moving_avg_6_readings,

    -- Data quality metrics
    AVG(quality_metrics->>'data_quality_score') as avg_data_quality,
    MIN(quality_metrics->>'signal_strength') as min_signal_strength,

    -- Geographical aggregation
    device.facility,
    device.zone,
    ST_AsText(AVG(device.location)) as avg_location

  FROM sensor_readings
  WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
    AND device.sensor_type = 'temperature'
  GROUP BY 
    device_id, 
    sensor_type, 
    DATE_TRUNC('hour', timestamp),
    device.facility,
    device.zone
),

trend_analysis AS (
  -- Calculate trends and changes over time
  SELECT 
    *,

    -- Hour-over-hour trend calculation
    LAG(avg_temperature, 1) OVER (
      PARTITION BY device_id 
      ORDER BY hour_bucket
    ) as prev_hour_temp,

    -- Trend direction
    CASE 
      WHEN avg_temperature > LAG(avg_temperature, 1) OVER (
        PARTITION BY device_id ORDER BY hour_bucket
      ) + temp_stddev THEN 'rising_fast'
      WHEN avg_temperature > LAG(avg_temperature, 1) OVER (
        PARTITION BY device_id ORDER BY hour_bucket
      ) THEN 'rising'
      WHEN avg_temperature < LAG(avg_temperature, 1) OVER (
        PARTITION BY device_id ORDER BY hour_bucket
      ) - temp_stddev THEN 'falling_fast'
      WHEN avg_temperature < LAG(avg_temperature, 1) OVER (
        PARTITION BY device_id ORDER BY hour_bucket
      ) THEN 'falling'
      ELSE 'stable'
    END as trend_direction,

    -- Anomaly detection using statistical boundaries
    CASE 
      WHEN ABS(avg_temperature - AVG(avg_temperature) OVER (
        PARTITION BY device_id 
        ORDER BY hour_bucket 
        ROWS BETWEEN 23 PRECEDING AND CURRENT ROW
      )) > 3 * STDDEV(avg_temperature) OVER (
        PARTITION BY device_id 
        ORDER BY hour_bucket 
        ROWS BETWEEN 23 PRECEDING AND CURRENT ROW
      ) THEN true
      ELSE false
    END as is_anomaly,

    -- Performance scoring
    CASE 
      WHEN avg_data_quality >= 90 AND min_signal_strength >= -60 THEN 'excellent'
      WHEN avg_data_quality >= 80 AND min_signal_strength >= -70 THEN 'good'
      WHEN avg_data_quality >= 70 AND min_signal_strength >= -80 THEN 'fair'
      ELSE 'poor'
    END as performance_rating,

    -- Operational status
    CASE 
      WHEN reading_count < 50 THEN 'low_frequency'  -- Expected: 60 readings per hour
      WHEN reading_count > 70 THEN 'high_frequency'
      ELSE 'normal_frequency'
    END as operational_status

  FROM hourly_analytics
),

facility_overview AS (
  -- Facility-level aggregations and insights
  SELECT 
    facility,
    zone,
    hour_bucket,

    -- Device and measurement counts
    COUNT(DISTINCT device_id) as active_devices,
    SUM(reading_count) as total_readings,

    -- Temperature analytics
    AVG(avg_temperature) as facility_avg_temp,
    MIN(min_temperature) as facility_min_temp,
    MAX(max_temperature) as facility_max_temp,

    -- Performance metrics
    AVG(avg_data_quality) as facility_data_quality,
    AVG(min_signal_strength) as facility_avg_signal,

    -- Status distribution
    COUNT(*) FILTER (WHERE performance_rating = 'excellent') as excellent_devices,
    COUNT(*) FILTER (WHERE performance_rating = 'good') as good_devices,
    COUNT(*) FILTER (WHERE performance_rating = 'fair') as fair_devices,
    COUNT(*) FILTER (WHERE performance_rating = 'poor') as poor_devices,

    -- Anomaly and trend insights
    COUNT(*) FILTER (WHERE is_anomaly = true) as anomalous_devices,
    COUNT(*) FILTER (WHERE trend_direction LIKE '%rising%') as rising_trend_devices,
    COUNT(*) FILTER (WHERE trend_direction LIKE '%falling%') as falling_trend_devices,

    -- Operational health
    COUNT(*) FILTER (WHERE operational_status = 'normal_frequency') as normal_operation_devices,
    COUNT(*) FILTER (WHERE operational_status = 'low_frequency') as low_frequency_devices,

    -- Geographic insights (if location data available)
    COUNT(DISTINCT avg_location) as location_diversity

  FROM trend_analysis
  GROUP BY facility, zone, hour_bucket
)

-- Final comprehensive time series analytics dashboard
SELECT 
  f.facility,
  f.zone,
  f.hour_bucket,

  -- Device and data summary
  f.active_devices,
  f.total_readings,
  ROUND(f.total_readings::numeric / NULLIF(f.active_devices, 0), 0) as avg_readings_per_device,

  -- Environmental metrics
  ROUND(f.facility_avg_temp, 2) as avg_temperature,
  ROUND(f.facility_min_temp, 2) as min_temperature,
  ROUND(f.facility_max_temp, 2) as max_temperature,
  ROUND(f.facility_max_temp - f.facility_min_temp, 2) as temperature_range,

  -- Performance assessment
  ROUND(f.facility_data_quality, 1) as data_quality_percentage,
  ROUND(f.facility_avg_signal, 0) as avg_signal_strength,

  -- Health score calculation
  ROUND(
    (f.excellent_devices * 100 + f.good_devices * 80 + f.fair_devices * 60 + f.poor_devices * 40) 
    / NULLIF(f.active_devices, 0), 
    1
  ) as facility_health_score,

  -- Status distribution percentages
  ROUND((f.excellent_devices::numeric / NULLIF(f.active_devices, 0)) * 100, 1) as excellent_percentage,
  ROUND((f.good_devices::numeric / NULLIF(f.active_devices, 0)) * 100, 1) as good_percentage,
  ROUND((f.fair_devices::numeric / NULLIF(f.active_devices, 0)) * 100, 1) as fair_percentage,
  ROUND((f.poor_devices::numeric / NULLIF(f.active_devices, 0)) * 100, 1) as poor_percentage,

  -- Trend and anomaly insights
  f.anomalous_devices,
  ROUND((f.anomalous_devices::numeric / NULLIF(f.active_devices, 0)) * 100, 1) as anomaly_percentage,
  f.rising_trend_devices,
  f.falling_trend_devices,

  -- Operational status
  f.normal_operation_devices,
  f.low_frequency_devices,
  ROUND((f.normal_operation_devices::numeric / NULLIF(f.active_devices, 0)) * 100, 1) as operational_health_percentage,

  -- Alert conditions
  CASE 
    WHEN f.anomalous_devices > (f.active_devices * 0.2) THEN 'high_anomaly_alert'
    WHEN f.poor_devices > (f.active_devices * 0.3) THEN 'poor_performance_alert'
    WHEN f.low_frequency_devices > (f.active_devices * 0.4) THEN 'connectivity_alert'
    WHEN f.facility_avg_temp > 40 OR f.facility_avg_temp < 0 THEN 'environmental_alert'
    ELSE 'normal'
  END as alert_status,

  -- Recommendations
  CASE 
    WHEN f.poor_devices > (f.active_devices * 0.2) THEN 'Investigate device performance issues'
    WHEN f.anomalous_devices > (f.active_devices * 0.1) THEN 'Review anomalous readings for pattern analysis'
    WHEN f.facility_data_quality < 80 THEN 'Improve data quality monitoring and sensor calibration'
    WHEN f.facility_avg_signal < -70 THEN 'Consider network infrastructure improvements'
    ELSE 'System operating within normal parameters'
  END as recommendation,

  -- Metadata
  CURRENT_TIMESTAMP as report_generated_at,
  '24h_analysis' as analysis_type

FROM facility_overview f
WHERE f.hour_bucket >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
ORDER BY 
  f.facility, 
  f.zone, 
  f.hour_bucket DESC;

-- Real-time alerting with time series patterns
WITH real_time_thresholds AS (
  SELECT 
    device_id,
    sensor_type,

    -- Current reading
    measurements->>'temperature'->>'value' as current_temp,
    measurements->>'humidity'->>'value' as current_humidity,
    quality_metrics->>'data_quality_score' as current_quality,

    -- Historical context (last hour average)
    AVG(measurements->>'temperature'->>'value') OVER (
      PARTITION BY device_id
      ORDER BY timestamp
      RANGE BETWEEN INTERVAL '1 hour' PRECEDING AND INTERVAL '1 minute' PRECEDING
    ) as historical_avg_temp,

    -- Recent trend (last 5 readings)
    AVG(measurements->>'temperature'->>'value') OVER (
      PARTITION BY device_id
      ORDER BY timestamp
      ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
    ) as recent_avg_temp,

    -- Device metadata
    device.facility,
    device.zone,
    device.location,
    timestamp,

    -- Network health
    quality_metrics->>'signal_strength' as signal_strength

  FROM sensor_readings
  WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '5 minutes'
    AND device.sensor_type = 'temperature'
),

alert_conditions AS (
  SELECT 
    *,

    -- Threshold breaches
    CASE 
      WHEN current_temp > 80 THEN 'critical_high_temperature'
      WHEN current_temp < -10 THEN 'critical_low_temperature'
      WHEN current_temp > 60 THEN 'high_temperature_warning'
      WHEN current_temp < 5 THEN 'low_temperature_warning'
      ELSE null
    END as temperature_alert,

    -- Rapid changes
    CASE 
      WHEN ABS(current_temp - recent_avg_temp) > 10 THEN 'rapid_temperature_change'
      WHEN ABS(current_temp - historical_avg_temp) > 15 THEN 'significant_deviation'
      ELSE null
    END as change_alert,

    -- Data quality issues
    CASE 
      WHEN current_quality < 50 THEN 'critical_data_quality'
      WHEN current_quality < 70 THEN 'poor_data_quality'
      WHEN signal_strength < -90 THEN 'poor_connectivity'
      ELSE null
    END as quality_alert,

    -- Combined severity assessment
    CASE 
      WHEN current_temp > 80 OR current_temp < -10 OR current_quality < 50 THEN 'critical'
      WHEN current_temp > 60 OR current_temp < 5 OR current_quality < 70 OR ABS(current_temp - recent_avg_temp) > 10 THEN 'warning'
      WHEN ABS(current_temp - historical_avg_temp) > 15 OR signal_strength < -80 THEN 'info'
      ELSE 'normal'
    END as overall_severity

  FROM real_time_thresholds
)

-- Generate active alerts with context
SELECT 
  device_id,
  facility,
  zone,
  ST_AsText(location) as device_location,
  timestamp as alert_timestamp,
  overall_severity,

  -- Primary alert
  COALESCE(temperature_alert, change_alert, quality_alert, 'normal') as primary_alert_type,

  -- Alert message
  CASE 
    WHEN temperature_alert IS NOT NULL THEN 
      CONCAT('Temperature alert: ', current_temp, '°C detected')
    WHEN change_alert IS NOT NULL THEN 
      CONCAT('Temperature change alert: ', ROUND(ABS(current_temp - recent_avg_temp), 1), '°C change detected')
    WHEN quality_alert IS NOT NULL THEN 
      CONCAT('Data quality alert: ', current_quality, '% quality score')
    ELSE 'System normal'
  END as alert_message,

  -- Current readings
  ROUND(current_temp, 2) as current_temperature,
  ROUND(current_humidity, 1) as current_humidity,
  current_quality as data_quality_percentage,
  signal_strength as signal_strength_dbm,

  -- Context
  ROUND(historical_avg_temp, 2) as hourly_avg_temperature,
  ROUND(recent_avg_temp, 2) as recent_avg_temperature,
  ROUND(ABS(current_temp - historical_avg_temp), 2) as deviation_from_hourly_avg,

  -- Action required
  CASE overall_severity
    WHEN 'critical' THEN 'IMMEDIATE ACTION REQUIRED'
    WHEN 'warning' THEN 'Investigation recommended'
    WHEN 'info' THEN 'Monitor for trends'
    ELSE 'No action required'
  END as recommended_action,

  -- Contact priority
  CASE overall_severity
    WHEN 'critical' THEN 'Notify operations team immediately'
    WHEN 'warning' THEN 'Escalate to facility manager'
    ELSE 'Log for review'
  END as escalation_level

FROM alert_conditions
WHERE overall_severity IN ('critical', 'warning', 'info')
ORDER BY 
  CASE overall_severity 
    WHEN 'critical' THEN 1 
    WHEN 'warning' THEN 2 
    WHEN 'info' THEN 3 
  END,
  timestamp DESC;

-- Time series data lifecycle management
WITH data_lifecycle_analysis AS (
  SELECT 
    DATE_TRUNC('day', timestamp) as date_bucket,
    device.facility,
    COUNT(*) as daily_record_count,
    AVG(quality_metrics->>'data_quality_score') as avg_daily_quality,

    -- Data size estimation (approximate)
    COUNT(*) * 1024 as estimated_bytes_per_day, -- Rough estimate

    -- Retention category
    CASE 
      WHEN DATE_TRUNC('day', timestamp) >= CURRENT_DATE - INTERVAL '7 days' THEN 'hot_data'
      WHEN DATE_TRUNC('day', timestamp) >= CURRENT_DATE - INTERVAL '90 days' THEN 'warm_data'
      WHEN DATE_TRUNC('day', timestamp) >= CURRENT_DATE - INTERVAL '365 days' THEN 'cold_data'
      ELSE 'archive_data'
    END as retention_category,

    -- Archive recommendations
    CASE 
      WHEN DATE_TRUNC('day', timestamp) < CURRENT_DATE - INTERVAL '2 years' THEN 'candidate_for_deletion'
      WHEN DATE_TRUNC('day', timestamp) < CURRENT_DATE - INTERVAL '1 year' 
           AND AVG(quality_metrics->>'data_quality_score') < 70 THEN 'candidate_for_archive'
      ELSE 'keep_active'
    END as lifecycle_recommendation

  FROM sensor_readings
  WHERE timestamp >= CURRENT_DATE - INTERVAL '2 years'
  GROUP BY DATE_TRUNC('day', timestamp), device.facility
)

SELECT 
  retention_category,
  COUNT(*) as day_buckets,
  SUM(daily_record_count) as total_records,
  ROUND(AVG(avg_daily_quality), 1) as avg_quality_score,

  -- Storage estimates
  ROUND(SUM(estimated_bytes_per_day) / (1024 * 1024), 1) as estimated_mb,
  ROUND(SUM(estimated_bytes_per_day) / (1024 * 1024 * 1024), 2) as estimated_gb,

  -- Lifecycle recommendations
  SUM(CASE WHEN lifecycle_recommendation = 'candidate_for_deletion' THEN daily_record_count ELSE 0 END) as records_for_deletion,
  SUM(CASE WHEN lifecycle_recommendation = 'candidate_for_archive' THEN daily_record_count ELSE 0 END) as records_for_archive,
  SUM(CASE WHEN lifecycle_recommendation = 'keep_active' THEN daily_record_count ELSE 0 END) as records_to_keep,

  -- Storage optimization potential
  ROUND(
    SUM(CASE WHEN lifecycle_recommendation IN ('candidate_for_deletion', 'candidate_for_archive') 
             THEN estimated_bytes_per_day ELSE 0 END) / (1024 * 1024), 1
  ) as potential_storage_savings_mb

FROM data_lifecycle_analysis
GROUP BY retention_category
ORDER BY 
  CASE retention_category 
    WHEN 'hot_data' THEN 1 
    WHEN 'warm_data' THEN 2 
    WHEN 'cold_data' THEN 3 
    WHEN 'archive_data' THEN 4 
  END;

-- QueryLeaf provides comprehensive time series capabilities:
-- 1. High-performance data ingestion with automatic time series optimization
-- 2. Advanced analytics using familiar SQL window functions and aggregations
-- 3. Real-time alerting and threshold monitoring with SQL expressions
-- 4. Facility and device-level dashboards using complex analytical queries
-- 5. Trend analysis and anomaly detection through statistical SQL functions
-- 6. Data lifecycle management with retention and archiving recommendations
-- 7. Geospatial analytics for location-aware IoT deployments
-- 8. Integration with MongoDB's native time series compression and bucketing

Best Practices for Production Time Series Deployments

Performance Optimization Strategies

Essential optimization techniques for high-throughput time series workloads (a collection-configuration sketch follows the list):

  1. Time Series Collection Configuration: Choose optimal granularity and bucket settings based on data patterns
  2. Index Strategy: Create compound indexes optimized for time-range and device queries
  3. Data Retention: Implement automated lifecycle policies for different data temperatures
  4. Aggregation Performance: Design materialized views for frequently accessed analytics
  5. Real-Time Processing: Optimize change streams and triggers for low-latency analytics
  6. Compression Settings: Configure appropriate compression algorithms for time series data patterns
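
As a starting point for items 1–3 above, the sketch below creates a time series collection with an explicit granularity, a retention TTL, and a compound index for device and time-range queries. The timestamp and device field names mirror those used earlier in this article; the collection name, granularity, and retention window are assumptions to adapt to your workload.

// Sketch: time series collection setup (names and values are illustrative, MongoDB 5.0+)
async function setupSensorCollection(db) {
  // Bucketing is driven by granularity; pick the value closest to your ingest interval
  await db.createCollection('sensor_readings', {
    timeseries: {
      timeField: 'timestamp',   // required: the time field
      metaField: 'device',      // per-device metadata used for bucketing
      granularity: 'minutes'    // 'seconds' | 'minutes' | 'hours'
    },
    expireAfterSeconds: 60 * 60 * 24 * 90  // automatic retention: drop data older than ~90 days
  });

  // Compound index supporting device + time-range queries
  await db.collection('sensor_readings').createIndex(
    { 'device.id': 1, timestamp: -1 }
  );
}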

IoT Architecture Design

Design principles for scalable IoT time series systems (a device-registry sketch follows the list):

  1. Device Management: Implement device registration, health monitoring, and metadata management
  2. Network Optimization: Design efficient data transmission protocols for IoT constraints
  3. Edge Processing: Implement edge analytics to reduce data transmission and latency
  4. Fault Tolerance: Design robust error handling and offline data synchronization
  5. Security Implementation: Implement device authentication, encryption, and access controls
  6. Scalability Planning: Plan for horizontal scaling across geographic regions and device growth
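
For item 1, a minimal device-registry sketch is shown below. The device_registry collection name and the document shape are assumptions rather than a prescribed schema; the point is to keep device metadata and health state separate from the raw readings in the time series collection.

// Sketch: device registration and heartbeat tracking (collection and schema are illustrative)
async function registerDevice(db, device) {
  await db.collection('device_registry').updateOne(
    { device_id: device.device_id },
    {
      $set: {
        facility: device.facility,
        zone: device.zone,
        sensor_type: device.sensor_type,
        firmware_version: device.firmware_version,
        last_updated: new Date()
      },
      $setOnInsert: { registered_at: new Date(), status: 'active' }
    },
    { upsert: true }
  );
}

async function recordHeartbeat(db, deviceId, health) {
  // Keep only lightweight health state here; raw readings belong in the time series collection
  await db.collection('device_registry').updateOne(
    { device_id: deviceId },
    { $set: { last_seen: new Date(), health } }
  );
}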

Conclusion

MongoDB Time Series Collections provide enterprise-grade IoT data management capabilities that address the unique challenges of sensor data ingestion, real-time analytics, and long-term historical analysis. The purpose-built time series optimizations eliminate the complexity and performance limitations of traditional database approaches while delivering sophisticated analytics and monitoring capabilities at IoT scale.

Key MongoDB Time Series advantages include:

  • Optimized Storage: Automatic bucketing and compression specifically designed for time-stamped data
  • High-Velocity Ingestion: Purpose-built write optimization for high-frequency sensor data streams
  • Advanced Analytics: Sophisticated aggregation pipelines for real-time and historical analytics
  • Automatic Lifecycle Management: Built-in data retention and archiving capabilities
  • Scalable Architecture: Horizontal scaling optimized for time series query patterns
  • SQL Accessibility: Familiar time series operations through QueryLeaf's SQL interface

Whether you're building IoT monitoring systems, industrial sensor networks, environmental tracking applications, or real-time analytics platforms, MongoDB Time Series Collections with QueryLeaf's SQL interface provide a foundation for efficient, scalable, and maintainable time series data management. The approach adapts to evolving IoT requirements while preserving familiar database interaction patterns.

QueryLeaf Integration: QueryLeaf automatically optimizes SQL-style time series operations for MongoDB's specialized time series collections, enabling developers to leverage advanced IoT analytics through familiar SQL syntax. Complex sensor data aggregations, real-time alerting logic, and trend analysis queries are seamlessly translated into MongoDB's high-performance time series operations, making sophisticated IoT analytics accessible without requiring specialized time series expertise.

The combination of MongoDB's time series optimizations with SQL-familiar operations creates a strong platform for IoT applications that need both high-performance data processing and familiar database interaction patterns. It lets IoT systems scale efficiently while keeping data accessible and analytically useful.

MongoDB ETL and Data Pipeline Processing: High-Performance Data Transformation and Stream Processing with SQL-Familiar Pipeline Architecture

Modern data-driven organizations require sophisticated ETL (Extract, Transform, Load) processes that can handle diverse data sources, perform complex transformations, and deliver processed data to multiple downstream systems in real-time. Traditional ETL tools often struggle with the volume, variety, and velocity requirements of contemporary data workflows, particularly when dealing with semi-structured data, real-time streaming sources, and the need for flexible schema evolution.

MongoDB's aggregation framework, combined with change streams and flexible document storage, provides a powerful foundation for building high-performance ETL pipelines that can process data at scale while maintaining the familiar SQL-style operations that development teams understand. This approach enables efficient data transformation, real-time processing capabilities, and seamless integration with existing data infrastructure.
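
As a concrete illustration of the change-stream side of this, the sketch below tails inserts and updates on a hypothetical customers collection and upserts a lightly transformed copy into a target collection. The collection names are assumptions; a production pipeline would plug in the aggregation-based transformations shown later in this article.

// Sketch: change-stream-driven ETL (collection names are assumptions)
async function watchAndTransform(db) {
  const changeStream = db.collection('customers').watch(
    [{ $match: { operationType: { $in: ['insert', 'update', 'replace'] } } }],
    { fullDocument: 'updateLookup' }  // deliver the full document for updates
  );

  for await (const change of changeStream) {
    const doc = change.fullDocument;
    if (!doc) continue;

    // Minimal in-process transform; real pipelines would reuse the
    // aggregation-based transformation stages described below
    await db.collection('transformed_customers').updateOne(
      { customer_key: doc.customer_id },
      { $set: { ...doc, transformed_at: new Date() } },
      { upsert: true }
    );
  }
}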

The Traditional ETL Challenge

Conventional ETL architectures face significant limitations when dealing with modern data requirements:

-- Traditional PostgreSQL ETL limitations
-- Fixed schema constraints limit data source flexibility

CREATE TABLE raw_customer_data (
    customer_id BIGINT PRIMARY KEY,
    email VARCHAR(255),
    first_name VARCHAR(100),
    last_name VARCHAR(100),
    registration_date DATE,

    -- Static schema can't accommodate varying data structures
    profile_data JSONB  -- Limited JSON support
);

-- Complex transformation logic requires stored procedures
CREATE OR REPLACE FUNCTION transform_customer_data()
RETURNS TABLE(
    customer_key BIGINT,
    full_name VARCHAR(201),
    email_domain VARCHAR(100),
    registration_month VARCHAR(7),
    profile_score INTEGER
) AS $$
BEGIN
    RETURN QUERY
    SELECT 
        c.customer_id,
        CONCAT(c.first_name, ' ', c.last_name),
        SUBSTRING(c.email FROM POSITION('@' IN c.email) + 1),
        TO_CHAR(c.registration_date, 'YYYY-MM'),
        CASE 
            WHEN c.profile_data->>'premium' = 'true' THEN 100
            WHEN c.profile_data->>'verified' = 'true' THEN 50
            ELSE 25
        END
    FROM raw_customer_data c
    WHERE c.registration_date >= CURRENT_DATE - INTERVAL '30 days';
END;
$$ LANGUAGE plpgsql;

-- Batch processing limitations
INSERT INTO transformed_customers 
SELECT * FROM transform_customer_data();

-- Problems:
-- 1. Rigid schema requirements
-- 2. Limited real-time processing
-- 3. Complex stored procedure logic
-- 4. Poor scaling for large datasets
-- 5. Limited support for nested/complex data structures

MongoDB ETL pipelines address these limitations with flexible aggregation-based transformations:

// MongoDB flexible ETL pipeline
const customerTransformPipeline = [
  // Extract: Flexible data ingestion from multiple sources
  {
    $match: {
      registration_date: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) },
      status: { $ne: "deleted" }
    }
  },

  // Transform: Complex data transformation with aggregation operators
  {
    $addFields: {
      full_name: { $concat: ["$first_name", " ", "$last_name"] },
      email_domain: {
        $substr: [
          "$email",
          { $add: [{ $indexOfBytes: ["$email", "@"] }, 1] },
          { $strLenCP: "$email" }
        ]
      },
      registration_month: {
        $dateToString: { format: "%Y-%m", date: "$registration_date" }
      },
      profile_score: {
        $switch: {
          branches: [
            { case: { $eq: ["$profile.premium", true] }, then: 100 },
            { case: { $eq: ["$profile.verified", true] }, then: 50 }
          ],
          default: 25
        }
      },
      // Handle complex nested transformations
      preferences: {
        $map: {
          input: "$profile.preferences",
          as: "pref",
          in: {
            category: "$$pref.type",
            enabled: "$$pref.active",
            weight: { $multiply: ["$$pref.priority", 10] }
          }
        }
      }
    }
  },

  // Load: Flexible output with computed fields
  {
    $project: {
      customer_key: "$customer_id",
      full_name: 1,
      email_domain: 1,
      registration_month: 1,
      profile_score: 1,
      preferences: 1,
      transformation_timestamp: new Date(),
      source_system: "customer_api"
    }
  }
];

// Process with high-performance aggregation
const transformedCustomers = await db.customers.aggregate(customerTransformPipeline).toArray();

// Benefits:
// - Flexible schema handling
// - Complex nested data transformation
// - High-performance parallel processing
// - Real-time processing capabilities
// - Rich transformation operators

ETL Pipeline Architecture

Data Extraction Patterns

Implement flexible data extraction from multiple sources:

// Comprehensive data extraction service
class DataExtractionService {
  constructor(db, config) {
    this.db = db;
    this.config = config;
    this.rawCollection = db.collection('raw_data');
    this.metadataCollection = db.collection('etl_metadata');
  }

  async extractFromAPI(sourceConfig, extractionId) {
    const extractionMetadata = {
      extraction_id: extractionId,
      source_type: 'api',
      source_config: sourceConfig,
      start_time: new Date(),
      status: 'in_progress',
      records_processed: 0,
      errors: []
    };

    try {
      // API data extraction with pagination
      let hasMore = true;
      let offset = 0;
      const batchSize = sourceConfig.batch_size || 1000;

      while (hasMore) {
        const response = await this.fetchAPIData(sourceConfig, offset, batchSize);

        if (response.data && response.data.length > 0) {
          // Prepare documents for insertion
          const documents = response.data.map(record => ({
            ...record,
            _extraction_metadata: {
              extraction_id: extractionId,
              source_url: sourceConfig.url,
              extracted_at: new Date(),
              batch_offset: offset,
              raw_record: true
            }
          }));

          // Bulk insert with ordered operations
          await this.rawCollection.insertMany(documents, {
            ordered: false,
            writeConcern: { w: 1, j: true }
          });

          extractionMetadata.records_processed += documents.length;
          offset += batchSize;
          hasMore = response.has_more;
        } else {
          hasMore = false;
        }

        // Update progress
        await this.updateExtractionProgress(extractionId, extractionMetadata);
      }

      extractionMetadata.status = 'completed';
      extractionMetadata.end_time = new Date();

    } catch (error) {
      extractionMetadata.status = 'failed';
      extractionMetadata.error = error.message;
      extractionMetadata.end_time = new Date();
      throw error;
    } finally {
      await this.metadataCollection.replaceOne(
        { extraction_id: extractionId },
        extractionMetadata,
        { upsert: true }
      );
    }

    return extractionMetadata;
  }

  async extractFromDatabase(dbConfig, query, extractionId) {
    // Database extraction with change tracking
    const extractionMetadata = {
      extraction_id: extractionId,
      source_type: 'database',
      source_config: dbConfig,
      start_time: new Date(),
      status: 'in_progress',
      records_processed: 0
    };

    try {
      // Get last extraction timestamp for incremental updates
      const lastExtraction = await this.metadataCollection.findOne(
        {
          source_type: 'database',
          'source_config.connection_string': dbConfig.connection_string,
          status: 'completed'
        },
        { sort: { end_time: -1 } }
      );

      // Build incremental query
      let incrementalQuery = { ...query };
      if (lastExtraction && dbConfig.incremental_field) {
        incrementalQuery[dbConfig.incremental_field] = {
          $gt: lastExtraction.end_time
        };
      }

      // Extract with cursor for memory efficiency
      const cursor = this.db.collection(dbConfig.collection).find(incrementalQuery);
      const batchSize = 1000;
      let batch = [];
      let recordCount = 0;

      for await (const doc of cursor) {
        // Add extraction metadata
        const enrichedDoc = {
          ...doc,
          _extraction_metadata: {
            extraction_id: extractionId,
            source_database: dbConfig.database,
            source_collection: dbConfig.collection,
            extracted_at: new Date()
          }
        };

        batch.push(enrichedDoc);

        if (batch.length >= batchSize) {
          await this.rawCollection.insertMany(batch, { ordered: false });
          recordCount += batch.length;
          batch = [];

          // Update progress
          extractionMetadata.records_processed = recordCount;
          await this.updateExtractionProgress(extractionId, extractionMetadata);
        }
      }

      // Insert final batch
      if (batch.length > 0) {
        await this.rawCollection.insertMany(batch, { ordered: false });
        recordCount += batch.length;
      }

      extractionMetadata.records_processed = recordCount;
      extractionMetadata.status = 'completed';
      extractionMetadata.end_time = new Date();

    } catch (error) {
      extractionMetadata.status = 'failed';
      extractionMetadata.error = error.message;
      extractionMetadata.end_time = new Date();
      throw error;
    } finally {
      await this.metadataCollection.replaceOne(
        { extraction_id: extractionId },
        extractionMetadata,
        { upsert: true }
      );
    }

    return extractionMetadata;
  }

  async extractFromFiles(fileConfig, extractionId) {
    // File-based extraction (CSV, JSON, XML, etc.)
    const extractionMetadata = {
      extraction_id: extractionId,
      source_type: 'file',
      source_config: fileConfig,
      start_time: new Date(),
      status: 'in_progress',
      files_processed: 0,
      records_processed: 0
    };

    try {
      const files = await this.getFilesFromSource(fileConfig);

      for (const filePath of files) {
        const fileData = await this.parseFile(filePath, fileConfig.format);

        if (fileData && fileData.length > 0) {
          const enrichedDocuments = fileData.map(record => ({
            ...record,
            _extraction_metadata: {
              extraction_id: extractionId,
              source_file: filePath,
              extracted_at: new Date(),
              file_format: fileConfig.format
            }
          }));

          // Batch insert file data
          const batchSize = 1000;
          for (let i = 0; i < enrichedDocuments.length; i += batchSize) {
            const batch = enrichedDocuments.slice(i, i + batchSize);
            await this.rawCollection.insertMany(batch, { ordered: false });
            extractionMetadata.records_processed += batch.length;
          }
        }

        extractionMetadata.files_processed++;
        await this.updateExtractionProgress(extractionId, extractionMetadata);
      }

      extractionMetadata.status = 'completed';
      extractionMetadata.end_time = new Date();

    } catch (error) {
      extractionMetadata.status = 'failed';
      extractionMetadata.error = error.message;
      extractionMetadata.end_time = new Date();
      throw error;
    } finally {
      await this.metadataCollection.replaceOne(
        { extraction_id: extractionId },
        extractionMetadata,
        { upsert: true }
      );
    }

    return extractionMetadata;
  }

  async fetchAPIData(config, offset, limit) {
    // Implement API-specific data fetching logic
    // This would integrate with actual API clients
    const url = `${config.url}?offset=${offset}&limit=${limit}`;
    // Return { data: [...], has_more: boolean }
    return { data: [], has_more: false }; // Placeholder
  }

  async parseFile(filePath, format) {
    // Implement file parsing logic for various formats
    // CSV, JSON, XML, Parquet, etc.
    return []; // Placeholder
  }

  async getFilesFromSource(config) {
    // Get file list from various sources (S3, FTP, local filesystem)
    return []; // Placeholder
  }

  async updateExtractionProgress(extractionId, metadata) {
    await this.metadataCollection.updateOne(
      { extraction_id: extractionId },
      { $set: metadata },
      { upsert: true }
    );
  }
}
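
The fetchAPIData placeholder above could, for instance, be filled in with Node 18's built-in fetch. This is a minimal sketch assuming the remote endpoint accepts offset and limit query parameters and returns a JSON body with data and has_more, matching what extractFromAPI expects; the optional api_key field is also an assumption.

// Sketch: one possible fetchAPIData implementation (Node 18+, global fetch)
async function fetchAPIData(config, offset, limit) {
  const url = `${config.url}?offset=${offset}&limit=${limit}`;

  const response = await fetch(url, {
    // api_key is an assumed config field; adapt to the source API's auth scheme
    headers: config.api_key ? { Authorization: `Bearer ${config.api_key}` } : {}
  });

  if (!response.ok) {
    throw new Error(`API extraction failed: ${response.status} ${response.statusText}`);
  }

  const body = await response.json();

  // Normalise to the shape extractFromAPI expects: { data: [...], has_more: boolean }
  return {
    data: body.data || [],
    has_more: Boolean(body.has_more)
  };
}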

Data Transformation Engine

Build sophisticated data transformation pipelines:

// Advanced data transformation service
class DataTransformationService {
  constructor(db) {
    this.db = db;
    this.rawCollection = db.collection('raw_data');
    this.transformedCollection = db.collection('transformed_data');
    this.errorCollection = db.collection('transformation_errors');
  }

  async executeTransformationPipeline(pipelineConfig, transformationId) {
    const transformationMetadata = {
      transformation_id: transformationId,
      pipeline_name: pipelineConfig.name,
      start_time: new Date(),
      status: 'in_progress',
      records_processed: 0,
      records_transformed: 0,
      errors_encountered: 0,
      stages: []
    };

    try {
      // Build dynamic aggregation pipeline
      const aggregationPipeline = this.buildAggregationPipeline(pipelineConfig);

      // Execute transformation with error handling
      const transformationResults = await this.executeWithErrorHandling(
        aggregationPipeline, 
        transformationId,
        transformationMetadata
      );

      transformationMetadata.records_transformed = transformationResults.successCount;
      transformationMetadata.errors_encountered = transformationResults.errorCount;
      transformationMetadata.status = 'completed';
      transformationMetadata.end_time = new Date();

      return transformationMetadata;

    } catch (error) {
      transformationMetadata.status = 'failed';
      transformationMetadata.error = error.message;
      transformationMetadata.end_time = new Date();
      throw error;
    }
  }

  buildAggregationPipeline(config) {
    const pipeline = [];

    // Stage 1: Data filtering and source selection
    if (config.source_filter) {
      pipeline.push({
        $match: {
          ...config.source_filter,
          '_extraction_metadata.extraction_id': { $exists: true }
        }
      });
    }

    // Stage 2: Data cleansing and validation
    if (config.cleansing_rules) {
      pipeline.push({
        $addFields: {
          // Clean and validate data
          cleaned_data: {
            $let: {
              vars: {
                cleaned: {
                  $switch: {
                    branches: config.cleansing_rules.map(rule => ({
                      case: rule.condition,
                      then: rule.transformation
                    })),
                    default: "$$ROOT"
                  }
                }
              },
              in: "$$cleaned"
            }
          }
        }
      });
    }

    // Stage 3: Data enrichment and computed fields
    if (config.enrichment_rules) {
      const enrichmentFields = {};

      config.enrichment_rules.forEach(rule => {
        enrichmentFields[rule.target_field] = rule.computation;
      });

      pipeline.push({
        $addFields: enrichmentFields
      });
    }

    // Stage 4: Nested document transformations
    if (config.nested_transformations) {
      config.nested_transformations.forEach(transformation => {
        if (transformation.type === 'array_processing') {
          pipeline.push({
            $addFields: {
              [transformation.target_field]: {
                $map: {
                  input: `$${transformation.source_field}`,
                  as: "item",
                  in: transformation.item_transformation
                }
              }
            }
          });
        } else if (transformation.type === 'object_flattening') {
          pipeline.push({
            $addFields: this.buildFlatteningTransformation(transformation)
          });
        }
      });
    }

    // Stage 5: Data aggregation and grouping
    if (config.aggregation_rules) {
      config.aggregation_rules.forEach(rule => {
        if (rule.type === 'group') {
          pipeline.push({
            $group: {
              _id: rule.group_by,
              ...rule.aggregations
            }
          });
        } else if (rule.type === 'bucket') {
          pipeline.push({
            $bucket: {
              groupBy: rule.group_by,
              boundaries: rule.boundaries,
              default: rule.default_bucket,
              output: rule.output
            }
          });
        }
      });
    }

    // Stage 6: Output formatting and projection
    if (config.output_format) {
      pipeline.push({
        $project: {
          ...config.output_format.fields,
          _transformation_metadata: {
            transformation_id: config.transformation_id,
            pipeline_name: config.name,
            transformed_at: new Date(),
            source_extraction_id: "$_extraction_metadata.extraction_id"
          }
        }
      });
    }

    return pipeline;
  }

  async executeWithErrorHandling(pipeline, transformationId, metadata) {
    let successCount = 0;
    let errorCount = 0;
    const batchSize = 1000;

    try {
      // Use aggregation cursor for memory-efficient processing
      const cursor = this.rawCollection.aggregate(pipeline, {
        allowDiskUse: true,
        cursor: { batchSize }
      });

      let batch = [];

      for await (const document of cursor) {
        try {
          // Additional validation can be performed here
          if (this.validateTransformedDocument(document)) {
            batch.push(document);
            successCount++;
          } else {
            await this.logTransformationError(
              transformationId,
              document,
              'validation_failed',
              'Document failed validation rules'
            );
            errorCount++;
          }

          // Process batch when full
          if (batch.length >= batchSize) {
            await this.insertTransformedBatch(batch, transformationId);
            batch = [];
          }

        } catch (docError) {
          await this.logTransformationError(
            transformationId,
            document,
            'processing_error',
            docError.message
          );
          errorCount++;
        }
      }

      // Insert remaining documents
      if (batch.length > 0) {
        await this.insertTransformedBatch(batch, transformationId);
      }

      return { successCount, errorCount };

    } catch (pipelineError) {
      throw new Error(`Pipeline execution failed: ${pipelineError.message}`);
    }
  }

  buildFlatteningTransformation(config) {
    const flattenedFields = {};

    // Flatten nested object structure
    const flattenObject = (obj, prefix = '') => {
      for (const [key, value] of Object.entries(obj)) {
        const newKey = prefix ? `${prefix}.${key}` : key;

        if (value !== null && typeof value === 'object' && !Array.isArray(value)) {
          flattenObject(value, newKey);
        } else {
          // Replace every dot so nested paths become flat field names
          flattenedFields[newKey.replace(/\./g, '_')] = `$${newKey}`;
        }
      }
    };

    flattenObject(config.source_structure);
    return flattenedFields;
  }

  async insertTransformedBatch(batch, transformationId) {
    try {
      await this.transformedCollection.insertMany(batch, {
        ordered: false,
        writeConcern: { w: 1, j: true }
      });
    } catch (error) {
      // Handle partial failures in batch
      if (error.writeErrors) {
        for (const writeError of error.writeErrors) {
          await this.logTransformationError(
            transformationId,
            batch[writeError.index],
            'insert_error',
            writeError.errmsg
          );
        }
      } else {
        throw error;
      }
    }
  }

  validateTransformedDocument(doc) {
    // Implement document validation logic
    // Check required fields, data types, business rules, etc.
    return true; // Placeholder
  }

  async logTransformationError(transformationId, document, errorType, errorMessage) {
    const errorDoc = {
      transformation_id: transformationId,
      error_type: errorType,
      error_message: errorMessage,
      failed_document_id: document._id,
      failed_document_sample: JSON.stringify(document).substring(0, 1000),
      timestamp: new Date()
    };

    await this.errorCollection.insertOne(errorDoc);
  }

  // Complex transformation functions
  async executeCustomTransformations(documents, transformationConfig) {
    return documents.map(doc => {
      let transformedDoc = { ...doc };

      // Text processing transformations
      if (transformationConfig.text_processing) {
        transformedDoc = this.applyTextTransformations(transformedDoc, transformationConfig.text_processing);
      }

      // Date and time transformations
      if (transformationConfig.date_processing) {
        transformedDoc = this.applyDateTransformations(transformedDoc, transformationConfig.date_processing);
      }

      // Numerical computations
      if (transformationConfig.numerical_processing) {
        transformedDoc = this.applyNumericalTransformations(transformedDoc, transformationConfig.numerical_processing);
      }

      return transformedDoc;
    });
  }

  applyTextTransformations(doc, config) {
    // Text cleaning, normalization, extraction
    config.forEach(rule => {
      const fieldValue = this.getNestedValue(doc, rule.field);
      if (fieldValue && typeof fieldValue === 'string') {
        switch (rule.operation) {
          case 'normalize':
            this.setNestedValue(doc, rule.field, fieldValue.toLowerCase().trim());
            break;
          case 'extract_email':
            const emailMatch = fieldValue.match(/[\w\.-]+@[\w\.-]+\.\w+/);
            if (emailMatch) {
              this.setNestedValue(doc, rule.target_field, emailMatch[0]);
            }
            break;
          case 'extract_phone':
            const phoneMatch = fieldValue.match(/(\+?\d{1,3}[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}/);
            if (phoneMatch) {
              this.setNestedValue(doc, rule.target_field, phoneMatch[0]);
            }
            break;
          case 'split':
            const parts = fieldValue.split(rule.delimiter);
            this.setNestedValue(doc, rule.target_field, parts);
            break;
        }
      }
    });

    return doc;
  }

  applyDateTransformations(doc, config) {
    // Date parsing, formatting, calculations
    config.forEach(rule => {
      const fieldValue = this.getNestedValue(doc, rule.field);
      if (fieldValue) {
        switch (rule.operation) {
          case 'parse_date':
            const parsedDate = new Date(fieldValue);
            if (!isNaN(parsedDate.getTime())) {
              this.setNestedValue(doc, rule.target_field, parsedDate);
            }
            break;
          case 'extract_components':
            const date = new Date(fieldValue);
            if (!isNaN(date.getTime())) {
              this.setNestedValue(doc, `${rule.target_field}_year`, date.getFullYear());
              this.setNestedValue(doc, `${rule.target_field}_month`, date.getMonth() + 1);
              this.setNestedValue(doc, `${rule.target_field}_day`, date.getDate());
            }
            break;
          case 'age_calculation':
            const birthDate = new Date(fieldValue);
            const age = Math.floor((Date.now() - birthDate.getTime()) / (1000 * 60 * 60 * 24 * 365));
            this.setNestedValue(doc, rule.target_field, age);
            break;
        }
      }
    });

    return doc;
  }

  applyNumericalTransformations(doc, config) {
    // Numerical calculations, conversions, aggregations
    config.forEach(rule => {
      const fieldValue = this.getNestedValue(doc, rule.field);
      if (typeof fieldValue === 'number') {
        switch (rule.operation) {
          case 'currency_conversion':
            const convertedValue = fieldValue * rule.exchange_rate;
            this.setNestedValue(doc, rule.target_field, Math.round(convertedValue * 100) / 100);
            break;
          case 'percentage_calculation':
            const total = this.getNestedValue(doc, rule.total_field);
            if (total && total !== 0) {
              const percentage = (fieldValue / total) * 100;
              this.setNestedValue(doc, rule.target_field, Math.round(percentage * 100) / 100);
            }
            break;
          case 'range_classification':
            let classification = 'unknown';
            for (const range of rule.ranges) {
              if (fieldValue >= range.min && fieldValue <= range.max) {
                classification = range.label;
                break;
              }
            }
            this.setNestedValue(doc, rule.target_field, classification);
            break;
        }
      }
    });

    return doc;
  }

  getNestedValue(obj, path) {
    return path.split('.').reduce((current, prop) => current?.[prop], obj);
  }

  setNestedValue(obj, path, value) {
    const props = path.split('.');
    const lastProp = props.pop();
    const target = props.reduce((current, prop) => {
      if (!current[prop]) current[prop] = {};
      return current[prop];
    }, obj);
    target[lastProp] = value;
  }
}
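
For reference, here is a hedged example of the configuration object executeTransformationPipeline consumes. The field names follow those read by buildAggregationPipeline above; the concrete filter values, rules, and identifiers are illustrative only.

// Sketch: a pipelineConfig consumed by executeTransformationPipeline (rules are illustrative)
const customerPipelineConfig = {
  name: 'customer_daily_transform',
  transformation_id: 'transform_2025_01_15',

  // Stage 1: only pull raw records from a given extraction run
  source_filter: { '_extraction_metadata.extraction_id': 'extract_2025_01_15' },

  // Stage 3: computed fields expressed as aggregation expressions
  enrichment_rules: [
    { target_field: 'full_name', computation: { $concat: ['$first_name', ' ', '$last_name'] } },
    { target_field: 'registration_month',
      computation: { $dateToString: { format: '%Y-%m', date: '$registration_date' } } }
  ],

  // Stage 6: shape of the documents written to transformed_data
  output_format: {
    fields: { customer_id: 1, full_name: 1, registration_month: 1 }
  }
};

// const service = new DataTransformationService(db);
// await service.executeTransformationPipeline(customerPipelineConfig, 'transform_2025_01_15');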

Data Loading and Output Management

Implement flexible data loading strategies:

// Data loading and output service
class DataLoadingService {
  constructor(db) {
    this.db = db;
    this.transformedCollection = db.collection('transformed_data');
    this.outputMetadataCollection = db.collection('output_metadata');
  }

  async loadToMongoDB(targetConfig, loadId) {
    const loadMetadata = {
      load_id: loadId,
      target_type: 'mongodb',
      target_config: targetConfig,
      start_time: new Date(),
      status: 'in_progress',
      records_loaded: 0,
      errors: []
    };

    try {
      const targetCollection = this.db.collection(targetConfig.collection);

      // Configure loading strategy
      const loadingStrategy = targetConfig.strategy || 'append';

      if (loadingStrategy === 'replace') {
        // Clear target collection before loading
        await targetCollection.deleteMany({});
      }

      // Load data in batches
      const batchSize = targetConfig.batch_size || 1000;
      const pipeline = this.buildLoadingPipeline(targetConfig);
      const cursor = this.transformedCollection.aggregate(pipeline, {
        allowDiskUse: true,
        cursor: { batchSize }
      });

      let batch = [];
      let recordCount = 0;

      for await (const doc of cursor) {
        // Apply final transformations for target schema
        const finalDoc = this.applyTargetTransformations(doc, targetConfig);

        batch.push(finalDoc);

        if (batch.length >= batchSize) {
          await this.insertBatch(targetCollection, batch, loadingStrategy);
          recordCount += batch.length;
          batch = [];

          // Update progress
          loadMetadata.records_loaded = recordCount;
          await this.updateLoadProgress(loadId, loadMetadata);
        }
      }

      // Insert final batch
      if (batch.length > 0) {
        await this.insertBatch(targetCollection, batch, loadingStrategy);
        recordCount += batch.length;
      }

      // Create indexes if specified
      if (targetConfig.indexes) {
        await this.createTargetIndexes(targetCollection, targetConfig.indexes);
      }

      loadMetadata.records_loaded = recordCount;
      loadMetadata.status = 'completed';
      loadMetadata.end_time = new Date();

    } catch (error) {
      loadMetadata.status = 'failed';
      loadMetadata.error = error.message;
      loadMetadata.end_time = new Date();
      throw error;
    } finally {
      await this.outputMetadataCollection.replaceOne(
        { load_id: loadId },
        loadMetadata,
        { upsert: true }
      );
    }

    return loadMetadata;
  }

  async loadToWarehouse(warehouseConfig, loadId) {
    // Load data to external data warehouse (Snowflake, Redshift, BigQuery)
    const loadMetadata = {
      load_id: loadId,
      target_type: 'warehouse',
      target_config: warehouseConfig,
      start_time: new Date(),
      status: 'in_progress'
    };

    try {
      // Export data to format compatible with warehouse
      const exportFormat = warehouseConfig.format || 'parquet';
      const exportPath = await this.exportToFile(warehouseConfig, exportFormat);

      // Upload to warehouse staging area
      await this.uploadToWarehouse(exportPath, warehouseConfig);

      // Execute warehouse loading commands
      const loadResults = await this.executeWarehouseLoad(warehouseConfig);

      loadMetadata.records_loaded = loadResults.recordCount;
      loadMetadata.warehouse_table = loadResults.tableName;
      loadMetadata.status = 'completed';
      loadMetadata.end_time = new Date();

    } catch (error) {
      loadMetadata.status = 'failed';
      loadMetadata.error = error.message;
      loadMetadata.end_time = new Date();
      throw error;
    } finally {
      await this.outputMetadataCollection.replaceOne(
        { load_id: loadId },
        loadMetadata,
        { upsert: true }
      );
    }

    return loadMetadata;
  }

  async loadToAPI(apiConfig, loadId) {
    // Push data to external APIs
    const loadMetadata = {
      load_id: loadId,
      target_type: 'api',
      target_config: apiConfig,
      start_time: new Date(),
      status: 'in_progress',
      api_calls_made: 0,
      records_sent: 0
    };

    try {
      const batchSize = apiConfig.batch_size || 100;
      const pipeline = this.buildLoadingPipeline(apiConfig);
      const cursor = this.transformedCollection.aggregate(pipeline);

      let batch = [];
      let apiCallCount = 0;
      let recordCount = 0;

      for await (const doc of cursor) {
        const apiDoc = this.formatForAPI(doc, apiConfig);
        batch.push(apiDoc);

        if (batch.length >= batchSize) {
          await this.sendToAPI(batch, apiConfig);
          apiCallCount++;
          recordCount += batch.length;
          batch = [];

          // Rate limiting
          if (apiConfig.rate_limit_delay) {
            await this.delay(apiConfig.rate_limit_delay);
          }

          // Update progress
          loadMetadata.api_calls_made = apiCallCount;
          loadMetadata.records_sent = recordCount;
          await this.updateLoadProgress(loadId, loadMetadata);
        }
      }

      // Send final batch
      if (batch.length > 0) {
        await this.sendToAPI(batch, apiConfig);
        apiCallCount++;
        recordCount += batch.length;
      }

      loadMetadata.api_calls_made = apiCallCount;
      loadMetadata.records_sent = recordCount;
      loadMetadata.status = 'completed';
      loadMetadata.end_time = new Date();

    } catch (error) {
      loadMetadata.status = 'failed';
      loadMetadata.error = error.message;
      loadMetadata.end_time = new Date();
      throw error;
    } finally {
      await this.outputMetadataCollection.replaceOne(
        { load_id: loadId },
        loadMetadata,
        { upsert: true }
      );
    }

    return loadMetadata;
  }

  buildLoadingPipeline(config) {
    const pipeline = [];

    // Filter data for loading
    if (config.filter) {
      pipeline.push({ $match: config.filter });
    }

    // Sort for consistent ordering
    if (config.sort) {
      pipeline.push({ $sort: config.sort });
    }

    // Limit if specified
    if (config.limit) {
      pipeline.push({ $limit: config.limit });
    }

    // Project fields for target format
    if (config.projection) {
      pipeline.push({ $project: config.projection });
    }

    return pipeline;
  }

  applyTargetTransformations(doc, config) {
    let transformedDoc = { ...doc };

    // Apply target-specific field mappings
    if (config.field_mappings) {
      const mappedDoc = {};
      for (const [sourceField, targetField] of Object.entries(config.field_mappings)) {
        const value = this.getNestedValue(transformedDoc, sourceField);
        if (value !== undefined) {
          this.setNestedValue(mappedDoc, targetField, value);
        }
      }
      transformedDoc = { ...transformedDoc, ...mappedDoc };
    }

    // Apply data type conversions
    if (config.type_conversions) {
      config.type_conversions.forEach(conversion => {
        const value = this.getNestedValue(transformedDoc, conversion.field);
        if (value !== undefined) {
          const convertedValue = this.convertDataType(value, conversion.target_type, conversion.options);
          this.setNestedValue(transformedDoc, conversion.field, convertedValue);
        }
      });
    }

    return transformedDoc;
  }

  async insertBatch(collection, batch, strategy) {
    if (strategy === 'upsert') {
      // Perform upsert operations
      const bulkOps = batch.map(doc => ({
        replaceOne: {
          filter: { [doc._id ? '_id' : 'unique_key']: doc._id || doc.unique_key },
          replacement: doc,
          upsert: true
        }
      }));
      await collection.bulkWrite(bulkOps, { ordered: false });
    } else {
      // Regular insert
      await collection.insertMany(batch, { ordered: false });
    }
  }

  async createTargetIndexes(collection, indexDefinitions) {
    for (const indexDef of indexDefinitions) {
      try {
        await collection.createIndex(indexDef.fields, indexDef.options || {});
      } catch (error) {
        console.warn(`Failed to create index: ${error.message}`);
      }
    }
  }

  convertDataType(value, targetType, options = {}) {
    switch (targetType) {
      case 'string':
        return String(value);
      case 'number':
        return Number(value);
      case 'date':
        return new Date(value);
      case 'boolean':
        return Boolean(value);
      case 'array':
        return Array.isArray(value) ? value : [value];
      case 'object':
        return typeof value === 'object' ? value : { value };
      default:
        return value;
    }
  }

  getNestedValue(obj, path) {
    return path.split('.').reduce((current, prop) => current?.[prop], obj);
  }

  setNestedValue(obj, path, value) {
    const props = path.split('.');
    const lastProp = props.pop();
    const target = props.reduce((current, prop) => {
      if (!current[prop]) current[prop] = {};
      return current[prop];
    }, obj);
    target[lastProp] = value;
  }

  async updateLoadProgress(loadId, metadata) {
    await this.outputMetadataCollection.updateOne(
      { load_id: loadId },
      { $set: metadata },
      { upsert: true }
    );
  }

  async exportToFile(config, format) {
    // Implement file export logic
    return '/tmp/export.parquet'; // Placeholder
  }

  async uploadToWarehouse(filePath, config) {
    // Implement warehouse upload logic
  }

  async executeWarehouseLoad(config) {
    // Execute warehouse-specific loading commands
    return { recordCount: 0, tableName: config.table }; // Placeholder
  }

  formatForAPI(doc, config) {
    // Format document according to API requirements
    return doc; // Placeholder
  }

  async sendToAPI(batch, config) {
    // Send data to external API
  }

  delay(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
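
The sendToAPI, exportToFile, and related helpers above are intentionally left as placeholders. As one hedged possibility, sendToAPI could be implemented with the global fetch available in Node 18+; the endpoint_url and auth_token fields on apiConfig are assumptions for this sketch, not part of the service above:

// Hypothetical sendToAPI implementation using the global fetch available in Node 18+
async function sendToAPI(batch, apiConfig) {
  const response = await fetch(apiConfig.endpoint_url, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      // apiConfig.auth_token is an assumed field used only for this illustration
      ...(apiConfig.auth_token ? { Authorization: `Bearer ${apiConfig.auth_token}` } : {})
    },
    body: JSON.stringify({ records: batch })
  });

  if (!response.ok) {
    // Surface HTTP failures so the loader's try/catch can record them in load metadata
    throw new Error(`API load failed: ${response.status} ${response.statusText}`);
  }

  return response.json();
}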

Real-Time Stream Processing

Change Stream ETL

Implement real-time ETL using MongoDB change streams:

// Real-time ETL using change streams
class StreamETLProcessor {
  constructor(db, config) {
    this.db = db;
    this.config = config;
    this.changeStreams = new Map();
    this.transformationService = new DataTransformationService(db);
    this.loadingService = new DataLoadingService(db);
  }

  async startStreamProcessing(streamConfig) {
    const sourceCollection = this.db.collection(streamConfig.source_collection);

    // Configure change stream options
    const changeStreamOptions = {
      fullDocument: 'updateLookup',
      // Needed for the delete handling below; requires pre-/post-images
      // to be enabled on the source collection (MongoDB 6.0+)
      fullDocumentBeforeChange: 'whenAvailable',
      resumeAfter: streamConfig.resume_token,
      maxAwaitTimeMS: 1000
    };

    // Create change stream with pipeline filter
    const pipeline = [];
    if (streamConfig.operation_filter) {
      pipeline.push({
        $match: {
          'operationType': { $in: streamConfig.operation_filter }
        }
      });
    }

    if (streamConfig.document_filter) {
      pipeline.push({
        $match: {
          'fullDocument': streamConfig.document_filter
        }
      });
    }

    const changeStream = sourceCollection.watch(pipeline, changeStreamOptions);
    this.changeStreams.set(streamConfig.stream_id, changeStream);

    // Process change events
    changeStream.on('change', async (changeEvent) => {
      try {
        await this.processChangeEvent(changeEvent, streamConfig);
      } catch (error) {
        console.error('Error processing change event:', error);
        await this.handleStreamError(error, changeEvent, streamConfig);
      }
    });

    changeStream.on('error', async (error) => {
      console.error('Change stream error:', error);
      await this.handleStreamError(error, null, streamConfig);
    });

    console.log(`Started stream processing for ${streamConfig.stream_id}`);
    return changeStream;
  }

  async processChangeEvent(changeEvent, streamConfig) {
    const processingMetadata = {
      stream_id: streamConfig.stream_id,
      change_event_id: changeEvent._id,
      operation_type: changeEvent.operationType,
      processing_time: new Date(),
      status: 'processing'
    };

    try {
      let documentToProcess = null;

      // Extract document based on operation type
      switch (changeEvent.operationType) {
        case 'insert':
        case 'replace':
          documentToProcess = changeEvent.fullDocument;
          break;
        case 'update':
          documentToProcess = changeEvent.fullDocument;
          // Include update description for delta processing
          documentToProcess._updateDescription = changeEvent.updateDescription;
          break;
        case 'delete':
          documentToProcess = changeEvent.fullDocumentBeforeChange || 
                             { _id: changeEvent.documentKey._id, _deleted: true };
          break;
      }

      if (documentToProcess) {
        // Apply real-time transformations
        const transformedDocument = await this.applyStreamTransformations(
          documentToProcess,
          streamConfig.transformations,
          changeEvent,
          streamConfig
        );

        // Apply business rules and validations
        if (streamConfig.business_rules) {
          const validationResult = await this.validateBusinessRules(
            transformedDocument,
            streamConfig.business_rules,
            changeEvent
          );

          if (!validationResult.valid) {
            await this.handleValidationFailure(validationResult, transformedDocument, streamConfig);
            return;
          }
        }

        // Load to target systems
        if (streamConfig.target_systems) {
          await this.loadToTargetSystems(
            transformedDocument,
            streamConfig.target_systems,
            changeEvent
          );
        }

        // Update processing status
        processingMetadata.status = 'completed';
        processingMetadata.records_processed = 1;

      }

      await this.logStreamProcessing(processingMetadata);

    } catch (error) {
      processingMetadata.status = 'failed';
      processingMetadata.error = error.message;
      await this.logStreamProcessing(processingMetadata);
      throw error;
    }
  }

  async applyStreamTransformations(document, transformations, changeEvent, streamConfig) {
    let transformedDoc = { ...document };

    for (const transformation of transformations) {
      switch (transformation.type) {
        case 'field_mapping':
          transformedDoc = this.applyFieldMapping(transformedDoc, transformation.mapping);
          break;

        case 'computed_fields':
          transformedDoc = await this.applyComputedFields(transformedDoc, transformation.computations);
          break;

        case 'enrichment':
          transformedDoc = await this.applyEnrichment(transformedDoc, transformation.enrichment_config);
          break;

        case 'aggregation':
          transformedDoc = await this.applyStreamAggregation(transformedDoc, transformation.aggregation_config, changeEvent);
          break;

        case 'custom_function':
          transformedDoc = await transformation.function(transformedDoc, changeEvent);
          break;
      }
    }

    // Add stream processing metadata
    transformedDoc._stream_metadata = {
      stream_id: streamConfig.stream_id,
      change_event_id: changeEvent._id,
      operation_type: changeEvent.operationType,
      processed_at: new Date(),
      cluster_time: changeEvent.clusterTime
    };

    return transformedDoc;
  }

  async applyComputedFields(document, computations) {
    const enrichedDoc = { ...document };

    for (const computation of computations) {
      try {
        let computedValue = null;

        switch (computation.type) {
          case 'lookup':
            computedValue = await this.performLookup(document, computation.config);
            break;

          case 'calculation':
            computedValue = this.performCalculation(document, computation.config);
            break;

          case 'text_processing':
            computedValue = this.performTextProcessing(document, computation.config);
            break;

          case 'date_calculation':
            computedValue = this.performDateCalculation(document, computation.config);
            break;
        }

        if (computedValue !== null) {
          this.setNestedValue(enrichedDoc, computation.target_field, computedValue);
        }

      } catch (error) {
        console.warn(`Failed to compute field ${computation.target_field}:`, error);
      }
    }

    return enrichedDoc;
  }

  async applyEnrichment(document, enrichmentConfig) {
    const enrichedDoc = { ...document };

    for (const enrichment of enrichmentConfig) {
      try {
        const lookupCollection = this.db.collection(enrichment.lookup_collection);
        const lookupKey = this.getNestedValue(document, enrichment.source_field);

        if (lookupKey) {
          const lookupDoc = await lookupCollection.findOne({
            [enrichment.lookup_field]: lookupKey
          });

          if (lookupDoc) {
            // Merge lookup data
            if (enrichment.merge_strategy === 'full') {
              Object.assign(enrichedDoc, lookupDoc);
            } else if (enrichment.merge_strategy === 'selective') {
              enrichment.fields_to_merge.forEach(field => {
                if (lookupDoc[field] !== undefined) {
                  this.setNestedValue(enrichedDoc, enrichment.target_prefix + field, lookupDoc[field]);
                }
              });
            }
          }
        }

      } catch (error) {
        console.warn(`Failed to apply enrichment ${enrichment.name}:`, error);
      }
    }

    return enrichedDoc;
  }

  async applyStreamAggregation(document, aggregationConfig, changeEvent) {
    const aggregatedDoc = { ...document };

    // Real-time aggregation using upsert operations
    for (const aggregation of aggregationConfig) {
      try {
        const aggregationCollection = this.db.collection(aggregation.target_collection);
        const groupingKey = this.buildGroupingKey(document, aggregation.group_by);

        // Build update operations for real-time aggregation
        const updateOps = {};

        aggregation.aggregations.forEach(agg => {
          const sourceValue = this.getNestedValue(document, agg.source_field);

          switch (agg.operation) {
            case 'sum':
              updateOps.$inc = updateOps.$inc || {};
              updateOps.$inc[agg.target_field] = sourceValue || 0;
              break;

            case 'count':
              updateOps.$inc = updateOps.$inc || {};
              updateOps.$inc[agg.target_field] = 1;
              break;

            case 'avg':
              updateOps.$inc = updateOps.$inc || {};
              updateOps.$inc[`${agg.target_field}_sum`] = sourceValue || 0;
              updateOps.$inc[`${agg.target_field}_count`] = 1;
              break;

            case 'min':
              updateOps.$min = updateOps.$min || {};
              updateOps.$min[agg.target_field] = sourceValue;
              break;

            case 'max':
              updateOps.$max = updateOps.$max || {};
              updateOps.$max[agg.target_field] = sourceValue;
              break;

            case 'addToSet':
              updateOps.$addToSet = updateOps.$addToSet || {};
              updateOps.$addToSet[agg.target_field] = sourceValue;
              break;
          }
        });

        // Set metadata fields
        updateOps.$set = updateOps.$set || {};
        updateOps.$set.last_updated = new Date();
        updateOps.$set.last_change_event = changeEvent._id;

        // Perform upsert operation
        await aggregationCollection.updateOne(
          groupingKey,
          updateOps,
          { upsert: true }
        );

      } catch (error) {
        console.warn(`Failed to apply stream aggregation ${aggregation.name}:`, error);
      }
    }

    return aggregatedDoc;
  }

  buildGroupingKey(document, groupByConfig) {
    const groupingKey = {};

    if (Array.isArray(groupByConfig)) {
      groupByConfig.forEach(field => {
        const value = this.getNestedValue(document, field);
        groupingKey[field.split('.').join('_')] = value; // flatten every dot in nested paths
      });
    } else if (typeof groupByConfig === 'object') {
      Object.entries(groupByConfig).forEach(([key, expression]) => {
        // Support for complex grouping expressions
        groupingKey[key] = this.evaluateExpression(document, expression);
      });
    }

    return groupingKey;
  }

  async validateBusinessRules(document, businessRules, changeEvent) {
    const validationResult = {
      valid: true,
      failures: [],
      warnings: []
    };

    for (const rule of businessRules) {
      try {
        const ruleResult = await this.evaluateBusinessRule(document, rule, changeEvent);

        if (!ruleResult.passed) {
          if (rule.severity === 'error') {
            validationResult.valid = false;
            validationResult.failures.push({
              rule: rule.name,
              message: ruleResult.message
            });
          } else if (rule.severity === 'warning') {
            validationResult.warnings.push({
              rule: rule.name,
              message: ruleResult.message
            });
          }
        }

      } catch (error) {
        validationResult.valid = false;
        validationResult.failures.push({
          rule: rule.name,
          message: `Rule evaluation failed: ${error.message}`
        });
      }
    }

    return validationResult;
  }

  async evaluateBusinessRule(document, rule, changeEvent) {
    switch (rule.type) {
      case 'field_validation':
        return this.validateField(document, rule.config);

      case 'cross_document_validation':
        return await this.validateCrossDocument(document, rule.config);

      case 'temporal_validation':
        return this.validateTemporal(document, changeEvent, rule.config);

      case 'custom_validation':
        return await rule.config.validator(document, changeEvent);

      default:
        return { passed: true };
    }
  }

  validateField(document, config) {
    const fieldValue = this.getNestedValue(document, config.field);

    // Check required fields
    if (config.required && (fieldValue === undefined || fieldValue === null)) {
      return {
        passed: false,
        message: `Required field '${config.field}' is missing`
      };
    }

    // Check data type
    if (fieldValue !== undefined && config.data_type) {
      const actualType = typeof fieldValue;
      if (actualType !== config.data_type) {
        return {
          passed: false,
          message: `Field '${config.field}' expected type ${config.data_type}, got ${actualType}`
        };
      }
    }

    // Check value range
    if (fieldValue !== undefined && config.range) {
      if (fieldValue < config.range.min || fieldValue > config.range.max) {
        return {
          passed: false,
          message: `Field '${config.field}' value ${fieldValue} is outside range [${config.range.min}, ${config.range.max}]`
        };
      }
    }

    // Check allowed values
    if (fieldValue !== undefined && config.allowed_values) {
      if (!config.allowed_values.includes(fieldValue)) {
        return {
          passed: false,
          message: `Field '${config.field}' value '${fieldValue}' is not in allowed values: ${config.allowed_values.join(', ')}`
        };
      }
    }

    return { passed: true };
  }

  async validateCrossDocument(document, config) {
    const referenceCollection = this.db.collection(config.reference_collection);
    const referenceValue = this.getNestedValue(document, config.source_field);

    if (referenceValue) {
      const referenceDoc = await referenceCollection.findOne({
        [config.reference_field]: referenceValue
      });

      if (!referenceDoc && config.required) {
        return {
          passed: false,
          message: `Reference document not found for ${config.source_field}: ${referenceValue}`
        };
      }

      // Additional cross-document validations
      if (referenceDoc && config.additional_checks) {
        for (const check of config.additional_checks) {
          const refValue = this.getNestedValue(referenceDoc, check.reference_field);
          const docValue = this.getNestedValue(document, check.document_field);

          if (!this.compareValues(docValue, refValue, check.comparison)) {
            return {
              passed: false,
              message: `Cross-document validation failed: ${check.message}`
            };
          }
        }
      }
    }

    return { passed: true };
  }

  validateTemporal(document, changeEvent, config) {
    // changeEvent.clusterTime is a BSON Timestamp (epoch seconds in the high-order bits), not a Date
    const clusterTime = changeEvent?.clusterTime;
    const timestamp = clusterTime
      ? new Date(clusterTime.getHighBits() * 1000)
      : new Date();

    // Check business hours
    if (config.business_hours) {
      const hour = timestamp.getHours();
      if (hour < config.business_hours.start || hour > config.business_hours.end) {
        return {
          passed: false,
          message: `Operation outside business hours: ${hour}`
        };
      }
    }

    // Check rate limits
    if (config.rate_limit) {
      // This would require additional state tracking
      // Implementation depends on specific rate limiting strategy
    }

    return { passed: true };
  }

  compareValues(value1, value2, comparison) {
    switch (comparison) {
      case 'eq': return value1 === value2;
      case 'ne': return value1 !== value2;
      case 'gt': return value1 > value2;
      case 'gte': return value1 >= value2;
      case 'lt': return value1 < value2;
      case 'lte': return value1 <= value2;
      default: return false;
    }
  }

  async loadToTargetSystems(document, targetSystems, changeEvent) {
    for (const target of targetSystems) {
      try {
        switch (target.type) {
          case 'mongodb':
            await this.loadToMongoTarget(document, target.config);
            break;

          case 'elasticsearch':
            await this.loadToElasticsearch(document, target.config);
            break;

          case 'kafka':
            await this.loadToKafka(document, target.config, changeEvent);
            break;

          case 'webhook':
            await this.loadToWebhook(document, target.config);
            break;
        }

      } catch (error) {
        console.error(`Failed to load to target system ${target.name}:`, error);
        await this.handleTargetLoadError(error, document, target, changeEvent);
      }
    }
  }

  async loadToMongoTarget(document, config) {
    const targetCollection = this.db.collection(config.collection);

    if (config.strategy === 'upsert') {
      const filter = {};
      config.unique_fields.forEach(field => {
        filter[field] = this.getNestedValue(document, field);
      });

      await targetCollection.replaceOne(filter, document, { upsert: true });
    } else {
      await targetCollection.insertOne(document);
    }
  }

  async loadToElasticsearch(document, config) {
    // Placeholder: index the document into Elasticsearch (referenced by loadToTargetSystems)
    console.log(`Would index into Elasticsearch index ${config.index}:`, document);
  }

  async loadToKafka(document, config, changeEvent) {
    // Send to Kafka topic
    const message = {
      key: this.getNestedValue(document, config.key_field),
      value: JSON.stringify(document),
      headers: {
        operation_type: changeEvent.operationType,
        timestamp: new Date().toISOString()
      }
    };

    // This would use a Kafka producer client
    console.log(`Would send to Kafka topic ${config.topic}:`, message);
  }

  async loadToWebhook(document, config) {
    // Send HTTP request to webhook
    const payload = {
      data: document,
      timestamp: new Date().toISOString(),
      source: 'mongodb-etl'
    };

    // This would use an HTTP client
    console.log(`Would send webhook to ${config.url}:`, payload);
  }

  async handleStreamError(error, changeEvent, streamConfig) {
    // Log stream processing errors
    const errorDoc = {
      stream_id: streamConfig.stream_id,
      error_type: 'stream_processing_error',
      error_message: error.message,
      change_event: changeEvent,
      timestamp: new Date()
    };

    await this.db.collection('stream_errors').insertOne(errorDoc);
  }

  async handleValidationFailure(validationResult, document, streamConfig) {
    // Handle business rule validation failures
    const failureDoc = {
      stream_id: streamConfig.stream_id,
      failure_type: 'validation_failure',
      validation_failures: validationResult.failures,
      validation_warnings: validationResult.warnings,
      document: document,
      timestamp: new Date()
    };

    await this.db.collection('validation_failures').insertOne(failureDoc);
  }

  async handleTargetLoadError(error, document, target, changeEvent) {
    // Handle target system loading errors
    const errorDoc = {
      target_system: target.name,
      error_type: 'target_load_error',
      error_message: error.message,
      document: document,
      change_event_id: changeEvent._id,
      timestamp: new Date()
    };

    await this.db.collection('target_load_errors').insertOne(errorDoc);
  }

  performLookup(document, config) {
    // Implement lookup logic
    return null; // Placeholder
  }

  performCalculation(document, config) {
    // Implement calculation logic
    return null; // Placeholder
  }

  performTextProcessing(document, config) {
    // Implement text processing logic
    return null; // Placeholder
  }

  performDateCalculation(document, config) {
    // Implement date calculation logic
    return null; // Placeholder
  }

  evaluateExpression(document, expression) {
    // Implement expression evaluation
    return null; // Placeholder
  }

  applyFieldMapping(document, mapping) {
    // Implement field mapping logic
    return document; // Placeholder
  }

  getNestedValue(obj, path) {
    return path.split('.').reduce((current, prop) => current?.[prop], obj);
  }

  setNestedValue(obj, path, value) {
    const props = path.split('.');
    const lastProp = props.pop();
    const target = props.reduce((current, prop) => {
      if (!current[prop]) current[prop] = {};
      return current[prop];
    }, obj);
    target[lastProp] = value;
  }

  async logStreamProcessing(metadata) {
    await this.db.collection('stream_processing_log').insertOne(metadata);
  }

  async stopStreamProcessing(streamId) {
    const changeStream = this.changeStreams.get(streamId);
    if (changeStream) {
      await changeStream.close();
      this.changeStreams.delete(streamId);
      console.log(`Stopped stream processing for ${streamId}`);
    }
  }
}
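
Before moving on, here is a hypothetical wiring example for the processor above. The connection string, database, collection names, and the inline transformation are illustrative assumptions, and the example presumes the DataTransformationService and DataLoadingService classes shown earlier in this post are in scope:

// Illustrative wiring for StreamETLProcessor (names and connection details are assumptions)
const { MongoClient } = require('mongodb');

async function main() {
  const client = await MongoClient.connect('mongodb://localhost:27017');
  const db = client.db('etl_demo');

  const processor = new StreamETLProcessor(db, {});

  await processor.startStreamProcessing({
    stream_id: 'orders_to_analytics',
    source_collection: 'orders',
    operation_filter: ['insert', 'update'],
    transformations: [
      // Minimal custom transformation: tag each document as processed
      { type: 'custom_function', function: async (doc) => ({ ...doc, processed_flag: true }) }
    ],
    business_rules: [
      { name: 'amount_required', type: 'field_validation', severity: 'error',
        config: { field: 'amount', required: true, data_type: 'number' } }
    ],
    target_systems: [
      { name: 'analytics_store', type: 'mongodb',
        config: { collection: 'order_analytics', strategy: 'upsert', unique_fields: ['_id'] } }
    ]
  });
}

main().catch(console.error);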

SQL-Style ETL with QueryLeaf

QueryLeaf provides familiar SQL-style ETL operations with MongoDB's powerful aggregation capabilities:

-- QueryLeaf ETL operations with SQL-familiar syntax

-- Data extraction with SQL-style filtering and projection
WITH extracted_data AS (
  SELECT 
    customer_id,
    email,
    first_name,
    last_name,
    registration_date,
    profile_data,
    last_login_date,
    CASE 
      WHEN profile_data->>'premium' = 'true' THEN 'premium'
      WHEN profile_data->>'verified' = 'true' THEN 'verified'
      ELSE 'basic'
    END AS customer_tier
  FROM raw_customers
  WHERE registration_date >= CURRENT_DATE - INTERVAL '30 days'
    AND email IS NOT NULL
    AND email LIKE '%@%.%'
)

-- Data transformation with complex calculations
, transformed_data AS (
  SELECT 
    customer_id,
    CONCAT(first_name, ' ', last_name) AS full_name,
    LOWER(email) AS normalized_email,
    SUBSTRING(email FROM POSITION('@' IN email) + 1) AS email_domain,
    DATE_TRUNC('month', registration_date) AS registration_month,
    customer_tier,

    -- Customer lifecycle calculations
    CASE 
      WHEN last_login_date >= CURRENT_DATE - INTERVAL '7 days' THEN 'active'
      WHEN last_login_date >= CURRENT_DATE - INTERVAL '30 days' THEN 'inactive'
      ELSE 'dormant'
    END AS activity_status,

    -- Age calculation
    EXTRACT(DAYS FROM (CURRENT_DATE - registration_date)) AS days_since_registration,

    -- JSON processing and nested field extraction
    JSON_EXTRACT(profile_data, '$.preferences.notifications') AS notification_preferences,
    ARRAY_LENGTH(JSON_EXTRACT(profile_data, '$.purchase_history')) AS purchase_count,

    -- Computed engagement score
    CASE 
      WHEN customer_tier = 'premium' THEN 100
      WHEN days_since_registration < 30 AND last_login_date >= CURRENT_DATE - INTERVAL '7 days' THEN 75
      WHEN customer_tier = 'verified' THEN 50
      ELSE 25
    END AS engagement_score,

    CURRENT_TIMESTAMP AS transformation_timestamp
  FROM extracted_data
)

-- Data aggregation and analytical computations
, aggregated_metrics AS (
  SELECT 
    email_domain,
    registration_month,
    customer_tier,
    activity_status,
    COUNT(*) as customer_count,
    AVG(engagement_score) as avg_engagement_score,
    COUNT(CASE WHEN activity_status = 'active' THEN 1 END) as active_customers,
    COUNT(CASE WHEN days_since_registration < 7 THEN 1 END) as new_customers,

    -- Advanced aggregations
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY engagement_score) as median_engagement,
    STDDEV(engagement_score) as engagement_variance,

    -- Array aggregations
    ARRAY_AGG(customer_id ORDER BY engagement_score DESC LIMIT 10) as top_customers,

    -- JSON aggregation for complex data structures
    JSON_OBJECT_AGG(
      customer_tier, 
      JSON_OBJECT(
        'count', COUNT(*),
        'avg_score', AVG(engagement_score)
      )
    ) as tier_metrics

  FROM transformed_data
  GROUP BY email_domain, registration_month, customer_tier, activity_status
  HAVING COUNT(*) >= 5  -- Filter out small groups
)

-- Final data loading with upsert semantics
INSERT INTO customer_analytics (
  email_domain,
  registration_month, 
  customer_tier,
  activity_status,
  customer_count,
  avg_engagement_score,
  active_customers,
  new_customers,
  median_engagement,
  engagement_variance,
  top_customers,
  tier_metrics,
  last_updated,
  etl_run_id
)
SELECT 
  email_domain,
  registration_month,
  customer_tier, 
  activity_status,
  customer_count,
  ROUND(avg_engagement_score, 2),
  active_customers,
  new_customers,
  ROUND(median_engagement, 2),
  ROUND(engagement_variance, 2),
  top_customers,
  tier_metrics,
  CURRENT_TIMESTAMP,
  'etl_run_' || EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)
FROM aggregated_metrics
ON CONFLICT (email_domain, registration_month, customer_tier, activity_status)
DO UPDATE SET
  customer_count = EXCLUDED.customer_count,
  avg_engagement_score = EXCLUDED.avg_engagement_score,
  active_customers = EXCLUDED.active_customers,
  new_customers = EXCLUDED.new_customers,
  median_engagement = EXCLUDED.median_engagement,
  engagement_variance = EXCLUDED.engagement_variance,
  top_customers = EXCLUDED.top_customers,
  tier_metrics = EXCLUDED.tier_metrics,
  last_updated = EXCLUDED.last_updated,
  etl_run_id = EXCLUDED.etl_run_id;

-- Real-time streaming ETL with change data capture
CREATE OR REPLACE TRIGGER customer_changes_trigger
AFTER INSERT OR UPDATE OR DELETE ON customers
FOR EACH ROW
BEGIN
  -- Capture change event
  INSERT INTO customer_change_stream (
    change_type,
    customer_id,
    old_data,
    new_data,
    change_timestamp,
    change_sequence
  ) VALUES (
    TG_OP,  -- INSERT, UPDATE, or DELETE
    COALESCE(NEW.customer_id, OLD.customer_id),
    CASE WHEN TG_OP = 'DELETE' THEN ROW_TO_JSON(OLD) ELSE NULL END,
    CASE WHEN TG_OP != 'DELETE' THEN ROW_TO_JSON(NEW) ELSE NULL END,
    CURRENT_TIMESTAMP,
    nextval('change_sequence')
  );

  -- Trigger downstream processing
  PERFORM pg_notify('customer_changed', 
    JSON_BUILD_OBJECT(
      'operation', TG_OP,
      'customer_id', COALESCE(NEW.customer_id, OLD.customer_id),
      'timestamp', CURRENT_TIMESTAMP
    )::TEXT
  );
END;

-- Advanced window functions for trend analysis
WITH customer_trends AS (
  SELECT 
    customer_id,
    registration_date,
    engagement_score,

    -- Running totals and moving averages
    SUM(engagement_score) OVER (
      PARTITION BY email_domain 
      ORDER BY registration_date 
      ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) as cumulative_engagement,

    AVG(engagement_score) OVER (
      PARTITION BY customer_tier
      ORDER BY registration_date
      ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) as moving_avg_engagement,

    -- Ranking and percentile calculations
    PERCENT_RANK() OVER (
      PARTITION BY registration_month 
      ORDER BY engagement_score
    ) as engagement_percentile,

    ROW_NUMBER() OVER (
      PARTITION BY email_domain, customer_tier 
      ORDER BY engagement_score DESC
    ) as tier_rank,

    -- Lead/Lag for sequential analysis
    LAG(engagement_score, 1) OVER (
      PARTITION BY customer_id 
      ORDER BY last_login_date
    ) as previous_engagement,

    -- First/Last value analytics
    FIRST_VALUE(engagement_score) OVER (
      PARTITION BY customer_id 
      ORDER BY registration_date 
      ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ) as initial_engagement

  FROM transformed_data
)

-- Pivot tables for cross-dimensional analysis
SELECT *
FROM (
  SELECT 
    registration_month,
    customer_tier,
    activity_status,
    customer_count
  FROM aggregated_metrics
) AS source_data
PIVOT (
  SUM(customer_count)
  FOR activity_status IN ('active', 'inactive', 'dormant')
) AS pivoted_activity;

-- Time-series analysis for trend detection
WITH monthly_trends AS (
  SELECT 
    DATE_TRUNC('month', registration_date) as month,
    customer_tier,
    COUNT(*) as registrations,
    AVG(engagement_score) as avg_engagement,

    -- Year-over-year comparisons
    LAG(COUNT(*), 12) OVER (
      PARTITION BY customer_tier 
      ORDER BY DATE_TRUNC('month', registration_date)
    ) as registrations_year_ago,

    -- Growth rate calculations
    CASE 
      WHEN LAG(COUNT(*), 1) OVER (
        PARTITION BY customer_tier 
        ORDER BY DATE_TRUNC('month', registration_date)
      ) > 0 THEN
        ROUND(
          ((COUNT(*)::FLOAT / LAG(COUNT(*), 1) OVER (
            PARTITION BY customer_tier 
            ORDER BY DATE_TRUNC('month', registration_date)
          )) - 1) * 100, 2
        )
      ELSE NULL
    END as month_over_month_growth

  FROM customers
  WHERE registration_date >= CURRENT_DATE - INTERVAL '24 months'
  GROUP BY DATE_TRUNC('month', registration_date), customer_tier
)

-- Data quality monitoring and validation
, quality_metrics AS (
  SELECT 
    'customers' as table_name,
    COUNT(*) as total_records,
    COUNT(CASE WHEN email IS NULL OR email = '' THEN 1 END) as missing_email,
    COUNT(CASE WHEN first_name IS NULL OR first_name = '' THEN 1 END) as missing_first_name,
    COUNT(CASE WHEN email !~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$' THEN 1 END) as invalid_email,
    COUNT(CASE WHEN registration_date > CURRENT_DATE THEN 1 END) as future_registration,

    -- Data completeness percentage
    ROUND(
      (COUNT(*) - COUNT(CASE WHEN email IS NULL OR email = '' THEN 1 END)) * 100.0 / COUNT(*), 
      2
    ) as email_completeness_pct,

    -- Data freshness
    MAX(registration_date) as latest_registration,
    MIN(registration_date) as earliest_registration,
    EXTRACT(DAYS FROM (CURRENT_DATE - MAX(registration_date))) as days_since_last_record

  FROM customers
)

-- Output final results with data lineage tracking
SELECT 
  m.*,
  q.total_records,
  q.email_completeness_pct,
  q.days_since_last_record,
  'etl_pipeline_v2.1' as pipeline_version,
  CURRENT_TIMESTAMP as processed_at
FROM monthly_trends m
CROSS JOIN quality_metrics q
ORDER BY m.month DESC, m.customer_tier;

-- QueryLeaf automatically handles:
-- 1. MongoDB aggregation pipeline generation for complex SQL operations
-- 2. Nested document processing and JSON operations
-- 3. Change stream integration for real-time ETL
-- 4. Parallel processing and optimization for large datasets
-- 5. Error handling and data validation
-- 6. Integration with external systems and data formats
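
As a rough illustration of point 1 in the list above, the GROUP BY ... HAVING step in aggregated_metrics maps naturally onto a $group stage followed by a $match. The hand-written pipeline below approximates a simplified subset of that query (the transformed_customers collection name is assumed); it is not the exact pipeline QueryLeaf generates:

// Hand-written approximation of the simplified GROUP BY ... HAVING step above
const aggregatedMetricsPipeline = [
  { $group: {
      _id: {
        email_domain: '$email_domain',
        registration_month: '$registration_month',
        customer_tier: '$customer_tier',
        activity_status: '$activity_status'
      },
      customer_count: { $sum: 1 },
      avg_engagement_score: { $avg: '$engagement_score' }
  }},
  // HAVING COUNT(*) >= 5
  { $match: { customer_count: { $gte: 5 } } },
  { $sort: { customer_count: -1 } }
];

// db.collection('transformed_customers').aggregate(aggregatedMetricsPipeline)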

ETL Monitoring and Management

Performance Monitoring

Implement comprehensive ETL monitoring:

// ETL monitoring and performance tracking service
class ETLMonitoringService {
  constructor(db) {
    this.db = db;
    this.metricsCollection = db.collection('etl_metrics');
    this.alertsCollection = db.collection('etl_alerts');
    this.performanceCollection = db.collection('etl_performance');
  }

  async trackPipelineExecution(pipelineId, executionMetadata) {
    const metrics = {
      pipeline_id: pipelineId,
      execution_id: executionMetadata.execution_id,
      start_time: executionMetadata.start_time,
      end_time: executionMetadata.end_time,
      duration_ms: executionMetadata.end_time - executionMetadata.start_time,
      status: executionMetadata.status,

      // Data volume metrics
      records_extracted: executionMetadata.records_extracted || 0,
      records_transformed: executionMetadata.records_transformed || 0,
      records_loaded: executionMetadata.records_loaded || 0,
      records_failed: executionMetadata.records_failed || 0,

      // Performance metrics
      extraction_duration_ms: executionMetadata.extraction_duration || 0,
      transformation_duration_ms: executionMetadata.transformation_duration || 0,
      loading_duration_ms: executionMetadata.loading_duration || 0,

      // Resource utilization
      memory_peak_mb: executionMetadata.memory_peak || 0,
      cpu_usage_percent: executionMetadata.cpu_usage || 0,
      disk_io_mb: executionMetadata.disk_io || 0,

      // Error information
      error_count: executionMetadata.errors?.length || 0,
      error_details: executionMetadata.errors || [],

      timestamp: new Date()
    };

    await this.metricsCollection.insertOne(metrics);

    // Check for performance anomalies
    await this.checkPerformanceAlerts(pipelineId, metrics);

    return metrics;
  }

  async generatePipelineReport(pipelineId, timeRange = 7) {
    const sinceDate = new Date(Date.now() - timeRange * 24 * 60 * 60 * 1000);

    const reportPipeline = [
      {
        $match: {
          pipeline_id: pipelineId,
          timestamp: { $gte: sinceDate }
        }
      },
      {
        $group: {
          _id: {
            date: { $dateToString: { format: "%Y-%m-%d", date: "$timestamp" } }
          },

          // Execution metrics
          total_executions: { $sum: 1 },
          successful_executions: {
            $sum: { $cond: [{ $eq: ["$status", "completed"] }, 1, 0] }
          },
          failed_executions: {
            $sum: { $cond: [{ $eq: ["$status", "failed"] }, 1, 0] }
          },

          // Duration statistics
          avg_duration_ms: { $avg: "$duration_ms" },
          max_duration_ms: { $max: "$duration_ms" },
          min_duration_ms: { $min: "$duration_ms" },

          // Data volume statistics
          total_records_processed: { $sum: "$records_transformed" },
          avg_records_per_execution: { $avg: "$records_transformed" },

          // Performance metrics
          avg_memory_usage_mb: { $avg: "$memory_peak_mb" },
          avg_cpu_usage: { $avg: "$cpu_usage_percent" },

          // Error analysis
          total_errors: { $sum: "$error_count" },
          unique_error_types: { $addToSet: "$error_details.error_type" }
        }
      },
      {
        $addFields: {
          success_rate: {
            $round: [
              { $multiply: [
                { $divide: ["$successful_executions", "$total_executions"] },
                100
              ]},
              2
            ]
          },
          throughput_records_per_minute: {
            $round: [
              { $divide: [
                "$avg_records_per_execution",
                { $divide: ["$avg_duration_ms", 60000] }
              ]},
              0
            ]
          }
        }
      },
      {
        $sort: { "_id.date": -1 }
      }
    ];

    const dailyMetrics = await this.metricsCollection.aggregate(reportPipeline).toArray();

    // Generate overall statistics
    const overallStats = await this.generateOverallStatistics(pipelineId, sinceDate);

    // Generate trend analysis
    const trendAnalysis = await this.generateTrendAnalysis(pipelineId, sinceDate);

    return {
      pipeline_id: pipelineId,
      report_period_days: timeRange,
      generated_at: new Date(),
      daily_metrics: dailyMetrics,
      overall_statistics: overallStats,
      trend_analysis: trendAnalysis,
      recommendations: await this.generateRecommendations(pipelineId, dailyMetrics, overallStats)
    };
  }

  async generateOverallStatistics(pipelineId, sinceDate) {
    const statsPipeline = [
      {
        $match: {
          pipeline_id: pipelineId,
          timestamp: { $gte: sinceDate }
        }
      },
      {
        $group: {
          _id: null,
          total_executions: { $sum: 1 },
          successful_executions: {
            $sum: { $cond: [{ $eq: ["$status", "completed"] }, 1, 0] }
          },
          failed_executions: {
            $sum: { $cond: [{ $eq: ["$status", "failed"] }, 1, 0] }
          },
          avg_duration_minutes: {
            $avg: { $divide: ["$duration_ms", 60000] }
          },
          total_records_processed: { $sum: "$records_transformed" },
          total_errors: { $sum: "$error_count" },

          // Performance percentiles
          duration_p50: { $percentile: { input: "$duration_ms", p: [0.5], method: 'approximate' } },
          duration_p95: { $percentile: { input: "$duration_ms", p: [0.95], method: 'approximate' } },
          duration_p99: { $percentile: { input: "$duration_ms", p: [0.99], method: 'approximate' } },

          memory_p95: { $percentile: { input: "$memory_peak_mb", p: [0.95], method: 'approximate' } },

          // First and last execution
          first_execution: { $min: "$timestamp" },
          last_execution: { $max: "$timestamp" }
        }
      },
      {
        $addFields: {
          success_rate: {
            $round: [
              { $multiply: [
                { $divide: ["$successful_executions", "$total_executions"] },
                100
              ]},
              2
            ]
          },
          avg_throughput_records_per_hour: {
            $round: [
              { $divide: [
                "$total_records_processed",
                { $divide: [
                  { $subtract: ["$last_execution", "$first_execution"] },
                  3600000  // Convert ms to hours
                ]}
              ]},
              0
            ]
          },
          error_rate: {
            $round: [
              { $multiply: [
                { $divide: ["$failed_executions", "$total_executions"] },
                100
              ]},
              2
            ]
          }
        }
      }
    ];

    const stats = await this.metricsCollection.aggregate(statsPipeline).toArray();
    return stats[0] || {};
  }

  async generateTrendAnalysis(pipelineId, sinceDate) {
    const trendPipeline = [
      {
        $match: {
          pipeline_id: pipelineId,
          timestamp: { $gte: sinceDate }
        }
      },
      {
        $group: {
          _id: {
            week: { $week: "$timestamp" },
            year: { $year: "$timestamp" }
          },
          avg_duration: { $avg: "$duration_ms" },
          avg_records_processed: { $avg: "$records_transformed" },
          success_rate: {
            $avg: { $cond: [{ $eq: ["$status", "completed"] }, 1, 0] }
          },
          execution_count: { $sum: 1 }
        }
      },
      {
        $sort: { "_id.year": 1, "_id.week": 1 }
      },
      {
        $group: {
          _id: null,
          weekly_data: {
            $push: {
              week: "$_id.week",
              year: "$_id.year",
              avg_duration: "$avg_duration",
              avg_records: "$avg_records_processed",
              success_rate: "$success_rate",
              execution_count: "$execution_count"
            }
          }
        }
      },
      {
        $addFields: {
          // Calculate trends using linear regression approximation
          duration_trend: {
            $let: {
              vars: {
                n: { $size: "$weekly_data" },
                data: "$weekly_data"
              },
              in: {
                $cond: {
                  if: { $gte: ["$$n", 3] },
                  then: {
                    // Simple trend calculation (last value vs first value)
                    $subtract: [
                      { $arrayElemAt: ["$$data.avg_duration", -1] },
                      { $arrayElemAt: ["$$data.avg_duration", 0] }
                    ]
                  },
                  else: null
                }
              }
            }
          },
          performance_trend: {
            $let: {
              vars: {
                n: { $size: "$weekly_data" },
                data: "$weekly_data"
              },
              in: {
                $cond: {
                  if: { $gte: ["$$n", 3] },
                  then: {
                    $subtract: [
                      { $arrayElemAt: ["$$data.avg_records", -1] },
                      { $arrayElemAt: ["$$data.avg_records", 0] }
                    ]
                  },
                  else: null
                }
              }
            }
          }
        }
      }
    ];

    const trendData = await this.metricsCollection.aggregate(trendPipeline).toArray();
    return trendData[0] || { weekly_data: [], duration_trend: null, performance_trend: null };
  }

  async generateRecommendations(pipelineId, dailyMetrics, overallStats) {
    const recommendations = [];

    // Performance recommendations
    if (overallStats.success_rate < 95) {
      recommendations.push({
        type: 'reliability',
        priority: 'high',
        message: `Pipeline success rate is ${overallStats.success_rate}%. Consider implementing error handling and retry logic.`,
        suggested_actions: [
          'Add retry mechanisms for transient failures',
          'Implement circuit breakers for external dependencies',
          'Add validation checks for input data quality'
        ]
      });
    }

    if (overallStats.avg_duration_minutes > 60) {
      recommendations.push({
        type: 'performance',
        priority: 'medium',
        message: `Average execution time is ${Math.round(overallStats.avg_duration_minutes)} minutes. Consider optimization.`,
        suggested_actions: [
          'Implement parallel processing for transformation steps',
          'Optimize database queries and indexing',
          'Consider breaking pipeline into smaller, concurrent jobs'
        ]
      });
    }

    // Resource utilization recommendations
    if (overallStats.memory_p95 > 8000) {  // 8GB
      recommendations.push({
        type: 'resource',
        priority: 'medium',
        message: `Memory usage is high (95th percentile: ${Math.round(overallStats.memory_p95)}MB).`,
        suggested_actions: [
          'Implement streaming processing for large datasets',
          'Optimize data structures and reduce memory footprint',
          'Consider increasing available memory resources'
        ]
      });
    }

    // Data quality recommendations
    if (overallStats.error_rate > 5) {
      recommendations.push({
        type: 'data_quality',
        priority: 'high',
        message: `High error rate detected (${overallStats.error_rate}%).`,
        suggested_actions: [
          'Implement comprehensive data validation',
          'Add data profiling to identify quality issues',
          'Set up data quality monitoring dashboards'
        ]
      });
    }

    // Operational recommendations
    const recentFailures = dailyMetrics.filter(day => day.failed_executions > 0).length;
    if (recentFailures > dailyMetrics.length * 0.3) {
      recommendations.push({
        type: 'operational',
        priority: 'high',
        message: `Frequent failures detected in ${recentFailures} of the last ${dailyMetrics.length} days.`,
        suggested_actions: [
          'Set up real-time alerting for pipeline failures',
          'Implement automated recovery procedures',
          'Schedule regular pipeline health checks'
        ]
      });
    }

    return recommendations;
  }

  async checkPerformanceAlerts(pipelineId, currentMetrics) {
    // Define alert thresholds
    const alertThresholds = await this.getAlertThresholds(pipelineId);

    const alerts = [];

    // Duration alert
    if (currentMetrics.duration_ms > alertThresholds.max_duration_ms) {
      alerts.push({
        type: 'performance_degradation',
        severity: 'warning',
        message: `Pipeline execution took ${currentMetrics.duration_ms}ms, exceeding threshold of ${alertThresholds.max_duration_ms}ms`,
        metric_value: currentMetrics.duration_ms,
        threshold: alertThresholds.max_duration_ms
      });
    }

    // Failure rate alert
    if (currentMetrics.status === 'failed') {
      const recentFailures = await this.getRecentFailureCount(pipelineId, 24); // Last 24 hours
      if (recentFailures >= alertThresholds.max_failures_per_day) {
        alerts.push({
          type: 'high_failure_rate',
          severity: 'critical',
          message: `Pipeline has failed ${recentFailures} times in the last 24 hours`,
          metric_value: recentFailures,
          threshold: alertThresholds.max_failures_per_day
        });
      }
    }

    // Memory usage alert
    if (currentMetrics.memory_peak_mb > alertThresholds.max_memory_mb) {
      alerts.push({
        type: 'high_memory_usage',
        severity: 'warning',
        message: `Memory usage peaked at ${currentMetrics.memory_peak_mb}MB, exceeding threshold`,
        metric_value: currentMetrics.memory_peak_mb,
        threshold: alertThresholds.max_memory_mb
      });
    }

    // Data volume anomaly
    if (currentMetrics.records_transformed < alertThresholds.min_records_expected * 0.5) {
      alerts.push({
        type: 'low_data_volume',
        severity: 'warning',
        message: `Processed only ${currentMetrics.records_transformed} records, significantly below expected ${alertThresholds.min_records_expected}`,
        metric_value: currentMetrics.records_transformed,
        threshold: alertThresholds.min_records_expected
      });
    }

    // Store alerts
    for (const alert of alerts) {
      const alertDoc = {
        ...alert,
        pipeline_id: pipelineId,
        execution_id: currentMetrics.execution_id,
        timestamp: new Date(),
        status: 'active'
      };

      await this.alertsCollection.insertOne(alertDoc);

      // Send notifications
      await this.sendAlertNotification(alertDoc);
    }

    return alerts;
  }

  async getAlertThresholds(pipelineId) {
    // Get baseline performance metrics for threshold calculation
    const baseline = await this.metricsCollection.aggregate([
      {
        $match: {
          pipeline_id: pipelineId,
          status: 'completed',
          timestamp: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) }
        }
      },
      {
        $group: {
          _id: null,
          avg_duration: { $avg: "$duration_ms" },
          p95_duration: { $percentile: { input: "$duration_ms", p: [0.95], method: 'approximate' } },
          avg_memory: { $avg: "$memory_peak_mb" },
          p95_memory: { $percentile: { input: "$memory_peak_mb", p: [0.95], method: 'approximate' } },
          avg_records: { $avg: "$records_transformed" }
        }
      }
    ]).toArray();

    if (baseline.length === 0) {
      // Default thresholds for new pipelines
      return {
        max_duration_ms: 3600000,  // 1 hour
        max_memory_mb: 4000,       // 4GB
        max_failures_per_day: 3,
        min_records_expected: 1000
      };
    }

    const stats = baseline[0];
    // $percentile returns an array (one value per requested percentile), so unwrap the single entries
    const p95Duration = Array.isArray(stats.p95_duration) ? stats.p95_duration[0] : stats.p95_duration;
    const p95Memory = Array.isArray(stats.p95_memory) ? stats.p95_memory[0] : stats.p95_memory;

    return {
      max_duration_ms: Math.max(p95Duration * 1.5, stats.avg_duration * 2),
      max_memory_mb: Math.max(p95Memory * 1.5, stats.avg_memory * 2),
      max_failures_per_day: 3,
      min_records_expected: Math.max(stats.avg_records * 0.1, 100)
    };
  }

  async getRecentFailureCount(pipelineId, hours) {
    const since = new Date(Date.now() - hours * 60 * 60 * 1000);
    const result = await this.metricsCollection.countDocuments({
      pipeline_id: pipelineId,
      status: 'failed',
      timestamp: { $gte: since }
    });

    return result;
  }

  async sendAlertNotification(alert) {
    // Implement notification logic (email, Slack, PagerDuty, etc.)
    console.log('ALERT:', alert.message);

    // This would integrate with actual notification services
    // Example: await this.slackService.sendAlert(alert);
    // Example: await this.emailService.sendAlert(alert);
  }

  async getDashboardMetrics(pipelineIds = [], timeRange = 24) {
    const sinceDate = new Date(Date.now() - timeRange * 60 * 60 * 1000);

    const matchStage = {
      timestamp: { $gte: sinceDate }
    };

    if (pipelineIds.length > 0) {
      matchStage.pipeline_id = { $in: pipelineIds };
    }

    const dashboardPipeline = [
      { $match: matchStage },
      {
        $group: {
          _id: {
            pipeline_id: "$pipeline_id",
            hour: { $dateToString: { format: "%Y-%m-%d %H:00", date: "$timestamp" } }
          },
          executions: { $sum: 1 },
          successes: {
            $sum: { $cond: [{ $eq: ["$status", "completed"] }, 1, 0] }
          },
          failures: {
            $sum: { $cond: [{ $eq: ["$status", "failed"] }, 1, 0] }
          },
          avg_duration: { $avg: "$duration_ms" },
          total_records: { $sum: "$records_transformed" },
          total_errors: { $sum: "$error_count" }
        }
      },
      {
        $group: {
          _id: "$_id.pipeline_id",
          hourly_metrics: {
            $push: {
              hour: "$_id.hour",
              executions: "$executions",
              success_rate: {
                $round: [
                  { $multiply: [{ $divide: ["$successes", "$executions"] }, 100] },
                  1
                ]
              },
              avg_duration_minutes: { $round: [{ $divide: ["$avg_duration", 60000] }, 1] },
              total_records: "$total_records",
              total_errors: "$total_errors"
            }
          },
          total_executions: { $sum: "$executions" },
          total_successes: { $sum: "$successes" },
          total_records_processed: { $sum: "$total_records" },
          total_errors: { $sum: "$total_errors" }
        }
      },
      {
        // $group only accepts accumulator expressions, so derive the overall
        // success rate in a follow-up $addFields stage
        $addFields: {
          overall_success_rate: {
            $round: [
              { $multiply: [
                { $divide: ["$total_successes", "$total_executions"] },
                100
              ]},
              1
            ]
          }
        }
      },
      { $sort: { "_id": 1 } }
    ];

    const dashboardData = await this.metricsCollection.aggregate(dashboardPipeline).toArray();

    return {
      time_range_hours: timeRange,
      generated_at: new Date(),
      pipeline_metrics: dashboardData
    };
  }
}
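
To tie the monitoring service together, here is a brief usage sketch. The pipeline ID, record counts, and durations are made-up values, db is assumed to be an already-connected Db instance, and note that the $percentile accumulator used in the reporting queries requires MongoDB 7.0 or newer:

// Example usage of ETLMonitoringService (illustrative values; assumes a connected Db instance)
async function recordAndReport(db) {
  const monitoring = new ETLMonitoringService(db);

  await monitoring.trackPipelineExecution('customer_etl', {
    execution_id: 'run_2025_01_15_001',
    start_time: new Date(Date.now() - 5 * 60 * 1000),
    end_time: new Date(),
    status: 'completed',
    records_extracted: 120000,
    records_transformed: 118500,
    records_loaded: 118500,
    extraction_duration: 45000,
    transformation_duration: 180000,
    loading_duration: 60000,
    memory_peak: 2100
  });

  const report = await monitoring.generatePipelineReport('customer_etl', 7);
  console.log(report.overall_statistics, report.recommendations);
}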

Best Practices for ETL Implementation

ETL Design Principles

Essential guidelines for building robust ETL pipelines:

  1. Idempotency: Design pipelines that can be safely re-run without side effects (see the upsert sketch after this list)
  2. Error Handling: Implement comprehensive error handling and recovery mechanisms
  3. Data Validation: Validate data quality at every stage of the pipeline
  4. Monitoring: Track performance, data quality, and operational metrics
  5. Scalability: Design for horizontal scaling and parallel processing
  6. Documentation: Maintain clear documentation of data transformations and business logic
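
As a minimal sketch of the idempotency principle above, key every write on a deterministic natural key and use upserts so that re-running a batch overwrites rather than duplicates; the collection and field names here are assumptions for illustration:

// Idempotent load: re-running the same batch produces the same end state
async function idempotentLoad(db, batch) {
  const ops = batch.map(doc => ({
    replaceOne: {
      // Deterministic natural key: the same source record always maps to the same filter
      filter: { source_system: doc.source_system, source_record_id: doc.source_record_id },
      replacement: doc,
      upsert: true
    }
  }));

  // ordered: false lets valid documents load even if some writes fail
  await db.collection('curated_customers').bulkWrite(ops, { ordered: false });
}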

Performance Optimization

Optimize ETL pipeline performance:

  1. Parallel Processing: Use MongoDB's aggregation framework for concurrent data processing
  2. Incremental Loading: Process only changed data to reduce processing time (a watermark-based sketch follows this list)
  3. Index Optimization: Create appropriate indexes for extraction and lookup operations
  4. Batch Size Tuning: Optimize batch sizes for memory and throughput balance
  5. Resource Management: Monitor and optimize CPU, memory, and I/O utilization
  6. Caching: Cache frequently accessed reference data and transformation results
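
For incremental loading, a common pattern is to persist a per-pipeline high-watermark timestamp and extract only documents modified since the last successful run. The sketch below assumes documents carry an updated_at field and that watermarks live in an etl_watermarks collection (both assumptions):

// Incremental extraction driven by a stored high-watermark timestamp
async function extractIncrementally(db, pipelineId) {
  const watermarks = db.collection('etl_watermarks');
  const state = await watermarks.findOne({ pipeline_id: pipelineId });
  const since = state ? state.last_processed_at : new Date(0);

  const cursor = db.collection('raw_customers')
    .find({ updated_at: { $gt: since } })
    .sort({ updated_at: 1 });

  let newest = since;
  for await (const doc of cursor) {
    // ... transform and load the document here ...
    if (doc.updated_at > newest) newest = doc.updated_at;
  }

  // Advance the watermark only after the batch completes successfully
  await watermarks.updateOne(
    { pipeline_id: pipelineId },
    { $set: { last_processed_at: newest } },
    { upsert: true }
  );
}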

QueryLeaf ETL Integration

QueryLeaf enables familiar SQL-style ETL development while leveraging MongoDB's powerful aggregation capabilities. Teams can implement complex data transformations using SQL patterns they already know while benefiting from MongoDB's document-oriented storage and processing advantages.

Key QueryLeaf ETL benefits include:

  • SQL Familiarity: Write ETL logic using familiar SQL syntax and patterns
  • MongoDB Performance: Leverage MongoDB's high-performance aggregation pipeline
  • Flexible Schema: Handle semi-structured and evolving data schemas effortlessly
  • Real-Time Processing: Integrate change streams for real-time ETL processing
  • Scalable Architecture: Build ETL pipelines that scale horizontally with data growth

Whether you're migrating from traditional ETL tools or building new data processing workflows, MongoDB with QueryLeaf's SQL interface provides a powerful foundation for modern ETL architectures that can handle the complexity and scale requirements of contemporary data environments.

Conclusion

MongoDB ETL and data pipeline processing capabilities provide enterprise-grade data transformation and processing infrastructure that addresses the challenges of modern data workflows. Combined with QueryLeaf's familiar SQL interface, MongoDB enables teams to build sophisticated ETL pipelines while preserving development patterns and query approaches they already understand.

The combination of MongoDB's flexible document model, powerful aggregation framework, and real-time change streams creates an ideal platform for handling diverse data sources, complex transformations, and scalable processing requirements. QueryLeaf's SQL-style syntax makes these capabilities accessible to broader development teams while maintaining the performance and flexibility advantages of MongoDB's native architecture.

Whether you're building batch ETL processes, real-time streaming pipelines, or hybrid architectures, MongoDB with QueryLeaf provides the tools and patterns necessary to implement robust, scalable, and maintainable data processing solutions that can adapt and evolve with your organization's growing data requirements.

MongoDB Change Data Capture and Event-Driven Architecture: Real-Time Data Processing and System Integration

Modern distributed systems require real-time data synchronization and event-driven communication to maintain consistency across microservices, trigger automated workflows, and enable responsive user experiences. Traditional databases provide limited change capture capabilities that require complex polling mechanisms, trigger-based solutions, or external tools that add significant operational overhead and latency to data processing pipelines.

MongoDB Change Data Capture through Change Streams provides native, real-time monitoring of database changes that enables building sophisticated event-driven architectures without external dependencies. Unlike traditional databases that require complex trigger setups or third-party CDC tools, MongoDB's Change Streams deliver ordered, resumable streams of data changes that can power real-time analytics, data synchronization, and reactive application architectures.

The Traditional Change Detection Challenge

Implementing change detection and event-driven patterns in traditional databases requires complex infrastructure:

-- Traditional PostgreSQL change detection - complex trigger-based approach

-- Change tracking table for audit and CDC
CREATE TABLE data_change_log (
    change_id SERIAL PRIMARY KEY,
    table_name VARCHAR(100) NOT NULL,
    operation_type VARCHAR(10) NOT NULL, -- INSERT, UPDATE, DELETE
    record_id UUID NOT NULL,
    old_values JSONB,
    new_values JSONB,
    changed_columns TEXT[],
    change_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    user_context JSONB,
    transaction_id BIGINT,

    -- CDC processing metadata
    processed BOOLEAN DEFAULT FALSE,
    processed_at TIMESTAMP,
    processing_errors TEXT[],
    retry_count INTEGER DEFAULT 0,

    -- Event routing information
    event_type VARCHAR(100),
    event_source VARCHAR(100),
    correlation_id UUID
);

-- Complex trigger function for change capture
CREATE OR REPLACE FUNCTION capture_table_changes() 
RETURNS TRIGGER AS $$
DECLARE
    old_record JSONB := '{}';
    new_record JSONB := '{}';
    changed_cols TEXT[] := '{}';
    col_name TEXT;
    event_type_val VARCHAR(100);
    correlation_id_val UUID;
BEGIN
    -- Determine operation type and build change record
    IF TG_OP = 'DELETE' THEN
        old_record := row_to_json(OLD)::JSONB;
        event_type_val := TG_TABLE_NAME || '_deleted';

        -- Extract correlation ID from old record if available
        correlation_id_val := (old_record->>'correlation_id')::UUID;

        INSERT INTO data_change_log (
            table_name, operation_type, record_id, old_values, 
            event_type, event_source, correlation_id, transaction_id
        ) VALUES (
            TG_TABLE_NAME, 'DELETE', (old_record->>'id')::UUID, old_record,
            event_type_val, 'database_trigger', correlation_id_val, txid_current()
        );

        RETURN OLD;

    ELSIF TG_OP = 'UPDATE' THEN
        old_record := row_to_json(OLD)::JSONB;
        new_record := row_to_json(NEW)::JSONB;
        event_type_val := TG_TABLE_NAME || '_updated';

        -- Identify changed columns
        FOR col_name IN 
            SELECT column_name 
            FROM information_schema.columns 
            WHERE table_name = TG_TABLE_NAME 
                AND table_schema = TG_TABLE_SCHEMA
        LOOP
            IF (old_record->>col_name) IS DISTINCT FROM (new_record->>col_name) THEN
                changed_cols := array_append(changed_cols, col_name);
            END IF;
        END LOOP;

        -- Only log if there are actual changes
        IF array_length(changed_cols, 1) > 0 THEN
            correlation_id_val := COALESCE(
                (new_record->>'correlation_id')::UUID,
                (old_record->>'correlation_id')::UUID
            );

            INSERT INTO data_change_log (
                table_name, operation_type, record_id, old_values, new_values,
                changed_columns, event_type, event_source, correlation_id, transaction_id
            ) VALUES (
                TG_TABLE_NAME, 'UPDATE', (new_record->>'id')::UUID, old_record, new_record,
                changed_cols, event_type_val, 'database_trigger', correlation_id_val, txid_current()
            );
        END IF;

        RETURN NEW;

    ELSIF TG_OP = 'INSERT' THEN
        new_record := row_to_json(NEW)::JSONB;
        event_type_val := TG_TABLE_NAME || '_created';
        correlation_id_val := (new_record->>'correlation_id')::UUID;

        INSERT INTO data_change_log (
            table_name, operation_type, record_id, new_values,
            event_type, event_source, correlation_id, transaction_id
        ) VALUES (
            TG_TABLE_NAME, 'INSERT', (new_record->>'id')::UUID, new_record,
            event_type_val, 'database_trigger', correlation_id_val, txid_current()
        );

        RETURN NEW;
    END IF;

    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Apply triggers to tables that need change tracking
CREATE TRIGGER users_change_trigger
    AFTER INSERT OR UPDATE OR DELETE ON users
    FOR EACH ROW EXECUTE FUNCTION capture_table_changes();

CREATE TRIGGER orders_change_trigger
    AFTER INSERT OR UPDATE OR DELETE ON orders
    FOR EACH ROW EXECUTE FUNCTION capture_table_changes();

CREATE TRIGGER products_change_trigger
    AFTER INSERT OR UPDATE OR DELETE ON products
    FOR EACH ROW EXECUTE FUNCTION capture_table_changes();

-- Complex change processing and event dispatch
CREATE OR REPLACE FUNCTION process_pending_changes()
RETURNS INTEGER AS $$
DECLARE
    change_record RECORD;
    processed_count INTEGER := 0;
    event_payload JSONB;
    webhook_url TEXT;
    http_response INTEGER;
    max_retries INTEGER := 3;
BEGIN
    -- Process unprocessed changes in chronological order
    FOR change_record IN 
        SELECT * FROM data_change_log 
        WHERE processed = FALSE 
            AND retry_count < max_retries
        ORDER BY change_timestamp ASC
        LIMIT 1000 -- Process in batches
    LOOP
        BEGIN
            -- Build event payload for external systems
            event_payload := jsonb_build_object(
                'eventId', change_record.change_id,
                'eventType', change_record.event_type,
                'eventSource', change_record.event_source,
                'eventTime', change_record.change_timestamp,
                'correlationId', change_record.correlation_id,
                'data', jsonb_build_object(
                    'tableName', change_record.table_name,
                    'operationType', change_record.operation_type,
                    'recordId', change_record.record_id,
                    'oldValues', change_record.old_values,
                    'newValues', change_record.new_values,
                    'changedColumns', change_record.changed_columns
                ),
                'metadata', jsonb_build_object(
                    'transactionId', change_record.transaction_id,
                    'processingAttempt', change_record.retry_count + 1,
                    'processingTime', CURRENT_TIMESTAMP
                )
            );

            -- Route events based on event type (simplified webhook example)
            -- Event types are built as <table_name>_<action> (e.g. users_created),
            -- so route on the table-name prefix
            webhook_url := CASE 
                WHEN change_record.event_type LIKE 'users_%' THEN 'http://user-service/api/events'
                WHEN change_record.event_type LIKE 'orders_%' THEN 'http://order-service/api/events'
                WHEN change_record.event_type LIKE 'products_%' THEN 'http://catalog-service/api/events'
                ELSE 'http://default-event-handler/api/events'
            END;

            -- Simulate HTTP webhook call (would use actual HTTP extension in practice)
            -- SELECT http_post(webhook_url, event_payload::TEXT, 'application/json') INTO http_response;
            http_response := 200; -- Simulated success

            IF http_response BETWEEN 200 AND 299 THEN
                -- Mark as successfully processed
                UPDATE data_change_log 
                SET processed = TRUE,
                    processed_at = CURRENT_TIMESTAMP,
                    processing_errors = NULL
                WHERE change_id = change_record.change_id;

                processed_count := processed_count + 1;
            ELSE
                -- Record processing failure
                UPDATE data_change_log 
                SET retry_count = retry_count + 1,
                    processing_errors = array_append(
                        COALESCE(processing_errors, '{}'), 
                        'HTTP ' || http_response || ' at ' || CURRENT_TIMESTAMP
                    )
                WHERE change_id = change_record.change_id;
            END IF;

        EXCEPTION WHEN OTHERS THEN
            -- Record processing exception
            UPDATE data_change_log 
            SET retry_count = retry_count + 1,
                processing_errors = array_append(
                    COALESCE(processing_errors, '{}'), 
                    'Exception: ' || SQLERRM || ' at ' || CURRENT_TIMESTAMP
                )
            WHERE change_id = change_record.change_id;
        END;
    END LOOP;

    RETURN processed_count;
END;
$$ LANGUAGE plpgsql;

-- Scheduled job to process changes (requires external cron setup)
-- */5 * * * * psql -d production -c "SELECT process_pending_changes();"

-- Complex monitoring for change processing pipeline
SELECT 
    table_name,
    operation_type,
    event_type,

    -- Processing statistics
    COUNT(*) as total_changes,
    COUNT(*) FILTER (WHERE processed = TRUE) as processed_changes,
    COUNT(*) FILTER (WHERE processed = FALSE) as pending_changes,
    COUNT(*) FILTER (WHERE retry_count >= 3) as failed_changes,

    -- Performance metrics
    AVG(EXTRACT(EPOCH FROM (processed_at - change_timestamp))) as avg_processing_latency_seconds,
    MAX(EXTRACT(EPOCH FROM (processed_at - change_timestamp))) as max_processing_latency_seconds,

    -- Error analysis
    COUNT(*) FILTER (WHERE processing_errors IS NOT NULL) as changes_with_errors,
    AVG(retry_count) as avg_retry_count,

    -- Time-based analysis
    MIN(change_timestamp) as oldest_change,
    MAX(change_timestamp) as newest_change,

    -- Health indicators
    ROUND(
        COUNT(*) FILTER (WHERE processed = TRUE)::DECIMAL / COUNT(*) * 100, 
        2
    ) as success_rate_percent

FROM data_change_log
WHERE change_timestamp >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY table_name, operation_type, event_type
ORDER BY total_changes DESC;

-- Problems with traditional change data capture:
-- 1. Complex trigger infrastructure requiring careful maintenance and testing
-- 2. Performance overhead from trigger execution on every database operation  
-- 3. Manual event routing and delivery logic with limited reliability guarantees
-- 4. Difficulty handling high-throughput scenarios without impacting database performance
-- 5. Complex error handling and retry logic for failed event deliveries
-- 6. Limited ordering guarantees for related changes across multiple tables
-- 7. Challenges with transaction boundaries and event atomicity
-- 8. Manual setup and maintenance of change processing infrastructure
-- 9. Limited scalability for high-volume change streams
-- 10. Complex monitoring and alerting for change processing pipeline health

MongoDB provides native Change Data Capture through Change Streams with real-time event processing:

// MongoDB Change Data Capture - native real-time event-driven architecture
const { MongoClient } = require('mongodb');

// Advanced MongoDB Change Data Capture Manager
class MongoChangeDataCaptureManager {
  constructor() {
    this.client = null;
    this.db = null;
    this.changeStreams = new Map();
    this.eventHandlers = new Map();
    this.processingMetrics = new Map();
    this.eventQueue = [];
    this.isProcessing = false;
  }

  async initialize() {
    console.log('Initializing MongoDB Change Data Capture Manager...');

    // Connect with optimized settings for change streams
    this.client = new MongoClient(process.env.MONGODB_URI || 'mongodb://localhost:27017', {
      // Replica set required for change streams
      replicaSet: process.env.MONGODB_REPLICA_SET || 'rs0',

      // Connection pool settings for change streams
      maxPoolSize: 20,
      minPoolSize: 5,
      maxIdleTimeMS: 60000,

      // Read preferences for change streams
      readPreference: 'primary',
      readConcern: { level: 'majority' },

      // Write concern for reliable change stream processing
      writeConcern: { w: 'majority', j: true },

      // Compression for change stream data
      compressors: ['zlib'],

      appName: 'ChangeDataCaptureManager'
    });

    await this.client.connect();
    this.db = this.client.db('ecommerce');

    // Initialize event handlers and change stream configurations
    await this.setupEventHandlers();
    await this.initializeChangeStreams();

    console.log('✅ MongoDB Change Data Capture Manager initialized');
  }

  async setupEventHandlers() {
    console.log('Setting up event handlers for different change types...');

    // User-related event handlers
    this.eventHandlers.set('user_created', async (changeEvent) => {
      await this.handleUserCreated(changeEvent);
    });

    this.eventHandlers.set('user_updated', async (changeEvent) => {
      await this.handleUserUpdated(changeEvent);
    });

    this.eventHandlers.set('user_deleted', async (changeEvent) => {
      await this.handleUserDeleted(changeEvent);
    });

    // Order-related event handlers
    this.eventHandlers.set('order_created', async (changeEvent) => {
      await this.handleOrderCreated(changeEvent);
    });

    this.eventHandlers.set('order_status_updated', async (changeEvent) => {
      await this.handleOrderStatusUpdated(changeEvent);
    });

    this.eventHandlers.set('order_cancelled', async (changeEvent) => {
      await this.handleOrderCancelled(changeEvent);
    });

    // Product catalog event handlers
    this.eventHandlers.set('product_created', async (changeEvent) => {
      await this.handleProductCreated(changeEvent);
    });

    this.eventHandlers.set('product_updated', async (changeEvent) => {
      await this.handleProductUpdated(changeEvent);
    });

    this.eventHandlers.set('inventory_updated', async (changeEvent) => {
      await this.handleInventoryUpdated(changeEvent);
    });

    // Low-stock and out-of-stock events are routed to the same inventory handler
    this.eventHandlers.set('inventory_low_stock', async (changeEvent) => {
      await this.handleInventoryUpdated(changeEvent);
    });

    this.eventHandlers.set('inventory_out_of_stock', async (changeEvent) => {
      await this.handleInventoryUpdated(changeEvent);
    });

    console.log('✅ Event handlers configured');
  }

  async initializeChangeStreams() {
    console.log('Initializing MongoDB change streams...');

    // Watch users collection for account-related events
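    // Note: 'fullDocumentBeforeChange' requires MongoDB 6.0+ and collections with
    // changeStreamPreAndPostImages enabled (via collMod); when pre-images are not
    // available, 'whenAvailable' simply omits them from the change event.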
    await this.createChangeStream('users', {
      fullDocument: 'updateLookup',
      fullDocumentBeforeChange: 'whenAvailable'
    }, this.processUserChanges.bind(this));

    // Watch orders collection for order lifecycle events
    await this.createChangeStream('orders', {
      fullDocument: 'updateLookup',
      fullDocumentBeforeChange: 'whenAvailable'
    }, this.processOrderChanges.bind(this));

    // Watch products collection for catalog changes
    await this.createChangeStream('products', {
      fullDocument: 'updateLookup',
      fullDocumentBeforeChange: 'whenAvailable'
    }, this.processProductChanges.bind(this));

    // Watch inventory collection for stock changes
    await this.createChangeStream('inventory', {
      fullDocument: 'updateLookup',
      fullDocumentBeforeChange: 'whenAvailable'
    }, this.processInventoryChanges.bind(this));

    console.log('✅ Change streams initialized and watching for changes');
  }

  async createChangeStream(collectionName, options, changeHandler) {
    try {
      const collection = this.db.collection(collectionName);
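      // The first argument to watch() accepts an aggregation pipeline for
      // server-side filtering; an empty pipeline watches every change on the collection.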
      const changeStream = collection.watch([], options);

      // Store change stream for management
      this.changeStreams.set(collectionName, {
        stream: changeStream,
        collection: collectionName,
        options: options,
        handler: changeHandler,
        createdAt: new Date(),
        isActive: true,
        errorCount: 0,
        lastError: null,
        processedEvents: 0
      });

      // Set up change event processing
      changeStream.on('change', async (changeDoc) => {
        try {
          await changeHandler(changeDoc);

          // Update metrics
          const streamInfo = this.changeStreams.get(collectionName);
          streamInfo.processedEvents++;
          streamInfo.lastProcessedAt = new Date();

        } catch (error) {
          console.error(`Error processing change for ${collectionName}:`, error);
          this.recordStreamError(collectionName, error);
        }
      });

      // Handle stream errors
      changeStream.on('error', (error) => {
        console.error(`Change stream error for ${collectionName}:`, error);
        this.recordStreamError(collectionName, error);
        this.handleStreamError(collectionName, error);
      });

      // Handle stream close
      changeStream.on('close', () => {
        console.warn(`Change stream closed for ${collectionName}`);
        const streamInfo = this.changeStreams.get(collectionName);
        if (streamInfo) {
          streamInfo.isActive = false;
          streamInfo.closedAt = new Date();
        }
      });

      console.log(`✅ Change stream created for collection: ${collectionName}`);

    } catch (error) {
      console.error(`Error creating change stream for ${collectionName}:`, error);
      throw error;
    }
  }

  async processUserChanges(changeDoc) {
    const { operationType, fullDocument, fullDocumentBeforeChange, documentKey } = changeDoc;

    // Map MongoDB operation types to the business event names registered in setupEventHandlers
    const opSuffix = { insert: 'created', update: 'updated', delete: 'deleted' }[operationType] || operationType;

    // Build standardized event object
    const event = {
      eventId: changeDoc._id,
      eventType: `user_${opSuffix}`,
      eventTime: changeDoc.clusterTime,
      source: 'mongodb_change_stream',

      // Document information
      documentId: documentKey._id,
      operationType: operationType,

      // Document data
      currentDocument: fullDocument,
      previousDocument: fullDocumentBeforeChange,

      // Change metadata
      namespace: changeDoc.ns,
      transactionId: changeDoc.txnNumber,
      sessionId: changeDoc.lsid,

      // Processing metadata
      receivedAt: new Date(),
      processingStatus: 'pending'
    };

    // Add operation-specific data
    if (operationType === 'update') {
      event.updatedFields = changeDoc.updateDescription?.updatedFields || {};
      event.removedFields = changeDoc.updateDescription?.removedFields || [];

      // Detect specific user events
      if (event.updatedFields.status) {
        event.eventType = `user_status_changed`;
        event.statusChange = {
          from: fullDocumentBeforeChange?.status,
          to: fullDocument?.status
        };
      }

      if (event.updatedFields.email) {
        event.eventType = `user_email_changed`;
        event.emailChange = {
          from: fullDocumentBeforeChange?.email,
          to: fullDocument?.email
        };
      }
    }

    // Route to appropriate event handler
    const handler = this.eventHandlers.get(event.eventType);
    if (handler) {
      await handler(event);
    } else {
      console.warn(`No handler found for event type: ${event.eventType}`);
      await this.handleGenericEvent(event);
    }
  }

  async processOrderChanges(changeDoc) {
    const { operationType, fullDocument, fullDocumentBeforeChange, documentKey } = changeDoc;

    // Map operation types to business event names (insert -> order_created, etc.)
    const opSuffix = { insert: 'created', update: 'updated', delete: 'deleted' }[operationType] || operationType;

    const event = {
      eventId: changeDoc._id,
      eventType: `order_${opSuffix}`,
      eventTime: changeDoc.clusterTime,
      source: 'mongodb_change_stream',

      documentId: documentKey._id,
      operationType: operationType,
      currentDocument: fullDocument,
      previousDocument: fullDocumentBeforeChange,

      namespace: changeDoc.ns,
      receivedAt: new Date(),
      processingStatus: 'pending'
    };

    // Detect order-specific events
    if (operationType === 'update') {
      event.updatedFields = changeDoc.updateDescription?.updatedFields || {};

      // Order status changes
      if (event.updatedFields.status) {
        event.eventType = 'order_status_updated';
        event.statusChange = {
          from: fullDocumentBeforeChange?.status,
          to: fullDocument?.status,
          orderId: fullDocument?.orderNumber,
          customerId: fullDocument?.customerId
        };

        // Specific status-based events
        if (fullDocument?.status === 'cancelled') {
          event.eventType = 'order_cancelled';
        } else if (fullDocument?.status === 'shipped') {
          event.eventType = 'order_shipped';
        } else if (fullDocument?.status === 'delivered') {
          event.eventType = 'order_delivered';
        }
      }

      // Payment status changes
      if (event.updatedFields['payment.status']) {
        event.eventType = 'order_payment_updated';
        event.paymentChange = {
          from: fullDocumentBeforeChange?.payment?.status,
          to: fullDocument?.payment?.status
        };
      }
    }

    // Route to handler
    const handler = this.eventHandlers.get(event.eventType);
    if (handler) {
      await handler(event);
    } else {
      await this.handleGenericEvent(event);
    }
  }

  async processProductChanges(changeDoc) {
    const { operationType, fullDocument, fullDocumentBeforeChange, documentKey } = changeDoc;

    // Map operation types to business event names (insert -> product_created, etc.)
    const opSuffix = { insert: 'created', update: 'updated', delete: 'deleted' }[operationType] || operationType;

    const event = {
      eventId: changeDoc._id,
      eventType: `product_${opSuffix}`,
      eventTime: changeDoc.clusterTime,
      source: 'mongodb_change_stream',

      documentId: documentKey._id,
      operationType: operationType,
      currentDocument: fullDocument,
      previousDocument: fullDocumentBeforeChange,

      receivedAt: new Date(),
      processingStatus: 'pending'
    };

    // Detect product-specific events
    if (operationType === 'update') {
      event.updatedFields = changeDoc.updateDescription?.updatedFields || {};

      // Price changes
      if (event.updatedFields.price) {
        event.eventType = 'product_price_changed';
        event.priceChange = {
          from: fullDocumentBeforeChange?.price,
          to: fullDocument?.price,
          sku: fullDocument?.sku,
          changePercent: fullDocumentBeforeChange?.price ? 
            ((fullDocument.price - fullDocumentBeforeChange.price) / fullDocumentBeforeChange.price * 100) : null
        };
      }

      // Status changes (active/inactive)
      if (event.updatedFields.status) {
        event.eventType = 'product_status_changed';
        event.statusChange = {
          from: fullDocumentBeforeChange?.status,
          to: fullDocument?.status,
          sku: fullDocument?.sku
        };
      }
    }

    const handler = this.eventHandlers.get(event.eventType);
    if (handler) {
      await handler(event);
    } else {
      await this.handleGenericEvent(event);
    }
  }

  async processInventoryChanges(changeDoc) {
    const { operationType, fullDocument, fullDocumentBeforeChange, documentKey } = changeDoc;

    // Map operation types to business event names (insert -> inventory_created, etc.)
    const opSuffix = { insert: 'created', update: 'updated', delete: 'deleted' }[operationType] || operationType;

    const event = {
      eventId: changeDoc._id,
      eventType: `inventory_${opSuffix}`,
      eventTime: changeDoc.clusterTime,
      source: 'mongodb_change_stream',

      documentId: documentKey._id,
      operationType: operationType,
      currentDocument: fullDocument,
      previousDocument: fullDocumentBeforeChange,

      receivedAt: new Date(),
      processingStatus: 'pending'
    };

    // Inventory-specific event detection
    if (operationType === 'update') {
      event.updatedFields = changeDoc.updateDescription?.updatedFields || {};

      // Stock level changes
      if (event.updatedFields.stockQuantity !== undefined) {
        event.eventType = 'inventory_updated';
        event.stockChange = {
          from: fullDocumentBeforeChange?.stockQuantity || 0,
          to: fullDocument?.stockQuantity || 0,
          productId: fullDocument?.productId,
          sku: fullDocument?.sku,
          change: (fullDocument?.stockQuantity || 0) - (fullDocumentBeforeChange?.stockQuantity || 0)
        };

        // Low stock alerts
        if (fullDocument?.stockQuantity <= (fullDocument?.lowStockThreshold || 10)) {
          event.eventType = 'inventory_low_stock';
          event.lowStockAlert = {
            currentStock: fullDocument?.stockQuantity,
            threshold: fullDocument?.lowStockThreshold,
            productId: fullDocument?.productId
          };
        }

        // Out of stock alerts  
        if (fullDocument?.stockQuantity <= 0 && (fullDocumentBeforeChange?.stockQuantity || 0) > 0) {
          event.eventType = 'inventory_out_of_stock';
        }
      }
    }

    const handler = this.eventHandlers.get(event.eventType);
    if (handler) {
      await handler(event);
    } else {
      await this.handleGenericEvent(event);
    }
  }

  // Event handler implementations
  async handleUserCreated(event) {
    console.log(`Processing user created event: ${event.currentDocument.email}`);

    try {
      // Send welcome email
      await this.sendWelcomeEmail(event.currentDocument);

      // Create user profile in analytics system
      await this.createAnalyticsProfile(event.currentDocument);

      // Add to mailing list
      await this.addToMailingList(event.currentDocument);

      // Log event processing
      await this.logEventProcessed(event, 'success');

    } catch (error) {
      console.error('Error handling user created event:', error);
      await this.logEventProcessed(event, 'error', error.message);
      throw error;
    }
  }

  async handleOrderStatusUpdated(event) {
    console.log(`Processing order status update: ${event.statusChange.from} -> ${event.statusChange.to}`);

    try {
      // Send status update notification
      await this.sendOrderStatusNotification(event);

      // Update order analytics
      await this.updateOrderAnalytics(event);

      // Trigger fulfillment workflows
      if (event.statusChange.to === 'confirmed') {
        await this.triggerFulfillmentWorkflow(event.currentDocument);
      }

      // Update inventory reservations
      if (event.statusChange.to === 'cancelled') {
        await this.releaseInventoryReservation(event.currentDocument);
      }

      await this.logEventProcessed(event, 'success');

    } catch (error) {
      console.error('Error handling order status update:', error);
      await this.logEventProcessed(event, 'error', error.message);
      throw error;
    }
  }

  async handleInventoryUpdated(event) {
    console.log(`Processing inventory update: ${event.stockChange.sku} stock changed by ${event.stockChange.change}`);

    try {
      // Update search index with new stock levels
      await this.updateSearchIndex(event.currentDocument);

      // Notify interested customers about restocking
      if (event.stockChange.change > 0 && event.stockChange.from === 0) {
        await this.notifyRestocking(event.currentDocument);
      }

      // Update real-time inventory dashboard
      await this.updateInventoryDashboard(event);

      // Trigger reorder notifications for low stock
      if (event.eventType === 'inventory_low_stock') {
        await this.triggerReorderAlert(event.lowStockAlert);
      }

      await this.logEventProcessed(event, 'success');

    } catch (error) {
      console.error('Error handling inventory update:', error);
      await this.logEventProcessed(event, 'error', error.message);
      throw error;
    }
  }

  async handleGenericEvent(event) {
    console.log(`Processing generic event: ${event.eventType}`);

    // Store event for audit purposes
    await this.db.collection('event_audit_log').insertOne({
      eventId: event.eventId,
      eventType: event.eventType,
      eventTime: event.eventTime,
      documentId: event.documentId,
      operationType: event.operationType,
      processedAt: new Date(),
      handlerType: 'generic'
    });
  }

  // Helper methods for event processing
  async sendWelcomeEmail(user) {
    // Integration with email service
    console.log(`Sending welcome email to ${user.email}`);
    // await emailService.sendWelcomeEmail(user);
  }

  async sendOrderStatusNotification(event) {
    // Integration with notification service
    console.log(`Sending order notification for order ${event.currentDocument.orderNumber}`);
    // await notificationService.sendOrderUpdate(event);
  }

  async updateSearchIndex(inventoryDoc) {
    // Integration with search service
    console.log(`Updating search index for product ${inventoryDoc.sku}`);
    // await searchService.updateProductInventory(inventoryDoc);
  }

  async logEventProcessed(event, status, errorMessage = null) {
    await this.db.collection('event_processing_log').insertOne({
      eventId: event.eventId,
      eventType: event.eventType,
      documentId: event.documentId,
      processingStatus: status,
      processedAt: new Date(),
      receivedAt: event.receivedAt,
      processingDuration: Date.now() - event.receivedAt.getTime(),
      errorMessage: errorMessage
    });
  }

  recordStreamError(collectionName, error) {
    const streamInfo = this.changeStreams.get(collectionName);
    if (streamInfo) {
      streamInfo.errorCount++;
      streamInfo.lastError = {
        message: error.message,
        timestamp: new Date(),
        stack: error.stack
      };
    }
  }

  async handleStreamError(collectionName, error) {
    console.error(`Handling stream error for ${collectionName}:`, error);

    // Attempt to restart the change stream
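    // Note: this simple restart re-opens the stream from the current point in time.
    // To avoid missing events, persist the last resume token (changeDoc._id) and
    // pass it via the 'resumeAfter' option when re-creating the stream.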
    setTimeout(async () => {
      try {
        const streamInfo = this.changeStreams.get(collectionName);
        if (streamInfo && !streamInfo.isActive) {
          console.log(`Attempting to restart change stream for ${collectionName}`);
          await this.createChangeStream(
            collectionName, 
            streamInfo.options, 
            streamInfo.handler
          );
        }
      } catch (restartError) {
        console.error(`Failed to restart change stream for ${collectionName}:`, restartError);
      }
    }, 5000); // Wait 5 seconds before restart attempt
  }

  async getChangeStreamMetrics() {
    const metrics = {
      timestamp: new Date(),
      streams: {},
      systemHealth: 'unknown',
      totalEventsProcessed: 0,
      activeStreams: 0
    };

    for (const [collectionName, streamInfo] of this.changeStreams) {
      metrics.streams[collectionName] = {
        collection: collectionName,
        isActive: streamInfo.isActive,
        createdAt: streamInfo.createdAt,
        processedEvents: streamInfo.processedEvents,
        errorCount: streamInfo.errorCount,
        lastError: streamInfo.lastError,
        lastProcessedAt: streamInfo.lastProcessedAt,

        healthStatus: streamInfo.isActive ? 
          (streamInfo.errorCount < 5 ? 'healthy' : 'warning') : 'inactive'
      };

      metrics.totalEventsProcessed += streamInfo.processedEvents;
      if (streamInfo.isActive) metrics.activeStreams++;
    }

    // Determine system health
    const totalStreams = this.changeStreams.size;
    if (metrics.activeStreams === totalStreams) {
      metrics.systemHealth = 'healthy';
    } else if (metrics.activeStreams > totalStreams / 2) {
      metrics.systemHealth = 'degraded';
    } else {
      metrics.systemHealth = 'critical';
    }

    return metrics;
  }

  async shutdown() {
    console.log('Shutting down MongoDB Change Data Capture Manager...');

    // Close all change streams
    for (const [collectionName, streamInfo] of this.changeStreams) {
      try {
        if (streamInfo.stream && streamInfo.isActive) {
          await streamInfo.stream.close();
          console.log(`✅ Closed change stream for ${collectionName}`);
        }
      } catch (error) {
        console.error(`Error closing change stream for ${collectionName}:`, error);
      }
    }

    // Close MongoDB connection
    if (this.client) {
      await this.client.close();
      console.log('✅ MongoDB connection closed');
    }

    this.changeStreams.clear();
    this.eventHandlers.clear();
    this.processingMetrics.clear();
  }
}

// Export the change data capture manager
module.exports = { MongoChangeDataCaptureManager };

// Benefits of MongoDB Change Data Capture:
// - Native real-time change streams eliminate polling and trigger complexity
// - Ordered, resumable event streams ensure reliable event processing
// - Full document context provides complete change information
// - Built-in error handling and automatic reconnection capabilities
// - Transaction-aware change detection with ACID guarantees
// - Scalable event processing without performance impact on source database
// - Flexible event routing and transformation capabilities
// - Production-ready monitoring and metrics for change stream health
// - Zero external dependencies for change data capture functionality
// - SQL-compatible event processing patterns through QueryLeaf integration
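
A minimal hosting sketch for the manager above might look like the following; the module path and the shutdown wiring are assumptions about how the class would typically be run rather than part of the implementation itself:

// Example wiring for the change data capture manager (illustrative only)
const { MongoChangeDataCaptureManager } = require('./mongo-cdc-manager');

async function main() {
  const cdcManager = new MongoChangeDataCaptureManager();
  await cdcManager.initialize();

  // Periodically report change stream health
  setInterval(async () => {
    const metrics = await cdcManager.getChangeStreamMetrics();
    console.log(`CDC health: ${metrics.systemHealth}, events processed: ${metrics.totalEventsProcessed}`);
  }, 60000);

  // Shut down cleanly when the process is terminated
  process.on('SIGTERM', async () => {
    await cdcManager.shutdown();
    process.exit(0);
  });
}

main().catch(err => {
  console.error('CDC manager failed to start:', err);
  process.exit(1);
});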

SQL-Style Change Data Capture with QueryLeaf

QueryLeaf provides familiar SQL syntax for MongoDB change data capture and event processing:

-- QueryLeaf Change Data Capture with SQL-familiar syntax

-- Create change stream monitors
CREATE CHANGE STREAM user_changes 
ON COLLECTION users
WITH OPTIONS (
  full_document = 'updateLookup',
  full_document_before_change = 'whenAvailable',
  resume_token_collection = 'change_stream_tokens'
)
AS SELECT 
  change_id,
  operation_type,
  document_id,
  cluster_time as event_time,

  -- Document data
  full_document as current_document,
  full_document_before_change as previous_document,

  -- Change details
  updated_fields,
  removed_fields,

  -- Event classification
  CASE operation_type
    WHEN 'insert' THEN 'user_created'
    WHEN 'update' THEN 
      CASE 
        WHEN updated_fields ? 'status' THEN 'user_status_changed'
        WHEN updated_fields ? 'email' THEN 'user_email_changed'
        ELSE 'user_updated'
      END
    WHEN 'delete' THEN 'user_deleted'
  END as event_type,

  -- Processing metadata
  CURRENT_TIMESTAMP as received_at,
  'pending' as processing_status

FROM mongodb_change_stream;

-- Query change stream events
SELECT 
  event_type,
  event_time,
  document_id,
  operation_type,

  -- Extract specific field changes
  current_document->>'email' as current_email,
  previous_document->>'email' as previous_email,
  current_document->>'status' as current_status,
  previous_document->>'status' as previous_status,

  -- Change analysis
  CASE 
    WHEN operation_type = 'update' AND updated_fields ? 'status' THEN
      JSON_OBJECT(
        'field', 'status',
        'from', previous_document->>'status',
        'to', current_document->>'status',
        'change_type', 'status_transition'
      )
  END as change_details,

  processing_status,
  received_at

FROM user_changes
WHERE event_time >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
ORDER BY event_time DESC;

-- Event processing pipeline with SQL
WITH processed_events AS (
  SELECT 
    change_id,
    event_type,
    document_id,

    -- Route events to handlers
    CASE event_type
      WHEN 'user_created' THEN 'user_management_service'
      WHEN 'user_status_changed' THEN 'notification_service'
      WHEN 'order_status_updated' THEN 'order_fulfillment_service'
      WHEN 'inventory_updated' THEN 'inventory_management_service'
      ELSE 'default_event_handler'
    END as target_service,

    -- Event priority
    CASE event_type
      WHEN 'order_cancelled' THEN 'high'
      WHEN 'inventory_out_of_stock' THEN 'high'
      WHEN 'user_created' THEN 'medium'
      ELSE 'low'
    END as priority,

    -- Event payload
    JSON_OBJECT(
      'eventId', change_id,
      'eventType', event_type,
      'documentId', document_id,
      'currentDocument', current_document,
      'previousDocument', previous_document,
      'changeDetails', change_details,
      'eventTime', event_time,
      'receivedAt', received_at
    ) as event_payload

  FROM user_changes
  WHERE processing_status = 'pending'
),

event_routing AS (
  SELECT 
    *,
    -- Generate webhook URLs for event delivery
    CONCAT('https://api.example.com/services/', target_service, '/events') as webhook_url,

    -- Retry configuration
    CASE priority
      WHEN 'high' THEN 5
      WHEN 'medium' THEN 3
      ELSE 1
    END as max_retries

  FROM processed_events
)

-- Process events (would integrate with actual webhook delivery)
SELECT 
  change_id,
  event_type,
  target_service,
  priority,
  webhook_url,
  event_payload,
  max_retries,

  -- Processing recommendations
  CASE priority
    WHEN 'high' THEN 'Process immediately with dedicated queue'
    WHEN 'medium' THEN 'Process within 30 seconds'
    ELSE 'Process in batch queue'
  END as processing_strategy

FROM event_routing
ORDER BY 
  CASE priority
    WHEN 'high' THEN 1
    WHEN 'medium' THEN 2
    ELSE 3
  END,
  event_time;

-- Change stream performance monitoring
SELECT 
  stream_name,
  collection_name,

  -- Activity metrics
  total_events_processed,
  events_per_hour,

  -- Event type distribution
  (events_by_type->>'insert')::INTEGER as insert_events,
  (events_by_type->>'update')::INTEGER as update_events,
  (events_by_type->>'delete')::INTEGER as delete_events,

  -- Performance metrics
  ROUND(avg_processing_latency_ms::NUMERIC, 2) as avg_latency_ms,
  ROUND(p95_processing_latency_ms::NUMERIC, 2) as p95_latency_ms,

  -- Error handling
  error_count,
  ROUND(error_rate::NUMERIC * 100, 2) as error_rate_percent,
  last_error_time,

  -- Stream health
  is_active,
  last_heartbeat,

  CASE 
    WHEN NOT is_active THEN 'critical'
    WHEN error_rate > 0.05 THEN 'warning'
    WHEN avg_processing_latency_ms > 1000 THEN 'slow'
    ELSE 'healthy'
  END as health_status

FROM change_stream_metrics
WHERE last_updated >= CURRENT_TIMESTAMP - INTERVAL '5 minutes'
ORDER BY events_per_hour DESC;

-- Event-driven architecture analytics
CREATE VIEW event_driven_analytics AS
WITH event_patterns AS (
  SELECT 
    event_type,
    target_service,
    DATE_TRUNC('hour', event_time) as hour_bucket,

    -- Volume metrics
    COUNT(*) as event_count,
    COUNT(DISTINCT document_id) as unique_documents,

    -- Processing metrics
    AVG(EXTRACT(EPOCH FROM (processed_at - received_at))) as avg_processing_time_seconds,
    COUNT(*) FILTER (WHERE processing_status = 'success') as successful_events,
    COUNT(*) FILTER (WHERE processing_status = 'error') as failed_events,

    -- Event characteristics
    AVG(JSON_LENGTH(event_payload)) as avg_payload_size,
    COUNT(*) FILTER (WHERE priority = 'high') as high_priority_events

  FROM change_stream_events
  WHERE event_time >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
  GROUP BY event_type, target_service, DATE_TRUNC('hour', event_time)
)

SELECT 
  event_type,
  target_service,
  TO_CHAR(hour_bucket, 'YYYY-MM-DD HH24:00') as analysis_hour,

  -- Volume analysis
  event_count,
  unique_documents,
  high_priority_events,

  -- Performance analysis
  ROUND(avg_processing_time_seconds::NUMERIC, 3) as avg_processing_seconds,
  ROUND((successful_events::DECIMAL / event_count * 100)::NUMERIC, 2) as success_rate_percent,

  -- System load indicators
  CASE 
    WHEN event_count > 10000 THEN 'very_high'
    WHEN event_count > 1000 THEN 'high'
    WHEN event_count > 100 THEN 'medium'
    ELSE 'low'
  END as event_volume_category,

  -- Performance assessment
  CASE 
    WHEN avg_processing_time_seconds > 5 THEN 'processing_slow'
    WHEN successful_events::DECIMAL / event_count < 0.95 THEN 'high_error_rate'
    WHEN event_count > 5000 AND avg_processing_time_seconds > 1 THEN 'capacity_strain'
    ELSE 'performing_well'
  END as performance_indicator,

  -- Optimization recommendations
  CASE 
    WHEN high_priority_events > event_count * 0.3 THEN 'Consider dedicated high-priority queue'
    WHEN failed_events > 10 THEN 'Review error handling and retry logic'
    WHEN avg_processing_time_seconds > 2 THEN 'Optimize event processing pipeline'
    WHEN event_count > 1000 AND unique_documents < event_count * 0.1 THEN 'Consider event deduplication'
    ELSE 'Event processing optimized for current load'
  END as optimization_recommendation

FROM event_patterns
ORDER BY event_count DESC, hour_bucket DESC;

-- QueryLeaf provides comprehensive Change Data Capture capabilities:
-- 1. SQL-familiar syntax for creating and managing change streams
-- 2. Real-time event processing with automatic routing and prioritization
-- 3. Comprehensive monitoring and analytics for event-driven architectures
-- 4. Error handling and retry logic integrated into SQL workflows
-- 5. Performance optimization recommendations based on event patterns
-- 6. Integration with MongoDB's native change stream capabilities
-- 7. Enterprise-grade event processing accessible through familiar SQL constructs
-- 8. Scalable event-driven architecture patterns with SQL-style management

Best Practices for MongoDB Change Data Capture

Change Stream Design Patterns

Essential practices for implementing change data capture:

  1. Event Classification: Design clear event taxonomies that map business operations to technical changes
  2. Error Handling Strategy: Implement comprehensive retry logic and dead letter queues for failed events
  3. Performance Monitoring: Establish metrics and alerting for change stream health and processing latency
  4. Resumability: Use resume tokens to ensure reliable event processing across application restarts (see the resumable stream sketch after this list)
  5. Filtering Strategy: Apply appropriate filters to change streams to process only relevant events
  6. Scalability Planning: Design event processing pipelines that can handle high-throughput scenarios
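
The resumability and filtering guidelines above can be combined in a single stream: a server-side $match pipeline limits which events are delivered, and the last processed resume token is persisted so processing can continue across restarts. The change_stream_tokens collection and the orders filter below are illustrative assumptions rather than part of the manager shown earlier:

const { MongoClient } = require('mongodb');

// Minimal sketch of a filtered, resumable change stream
async function watchOrderStatusChanges(uri) {
  const client = new MongoClient(uri);
  await client.connect();
  const db = client.db('ecommerce');

  // Load the last persisted resume token, if any
  const tokenDoc = await db.collection('change_stream_tokens')
    .findOne({ _id: 'orders_stream' });

  // Server-side filter: only new orders and status updates
  const pipeline = [
    { $match: {
      $or: [
        { operationType: 'insert' },
        { 'updateDescription.updatedFields.status': { $exists: true } }
      ]
    }}
  ];

  const changeStream = db.collection('orders').watch(pipeline, {
    fullDocument: 'updateLookup',
    ...(tokenDoc ? { resumeAfter: tokenDoc.token } : {})
  });

  changeStream.on('change', async (change) => {
    // ... process the event here ...

    // Persist the resume token only after successful processing
    await db.collection('change_stream_tokens').updateOne(
      { _id: 'orders_stream' },
      { $set: { token: change._id, updatedAt: new Date() } },
      { upsert: true }
    );
  });
}

Persisting the token after the handler succeeds means a restarted process resumes at the first unprocessed event instead of the current point in time.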

Production Deployment Considerations

Key factors for enterprise change data capture deployments:

  1. Replica Set Requirements: Ensure proper replica set configuration for change stream availability (see the configuration sketch after this list)
  2. Resource Planning: Account for change stream resource consumption and event processing overhead
  3. Event Ordering: Understand and leverage MongoDB's event ordering guarantees for related changes
  4. Disaster Recovery: Plan for change stream recovery and event replay scenarios
  5. Security Configuration: Implement proper authentication and authorization for change stream access
  6. Monitoring Integration: Integrate change stream metrics with existing monitoring and alerting systems
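
Because the earlier examples request fullDocumentBeforeChange, the watched collections also need pre- and post-images enabled, which is a deployment-time setting. A minimal configuration sketch, assuming MongoDB 6.0+ and the collection names used above, might look like this:

const { MongoClient } = require('mongodb');

// Enable change stream pre- and post-images on the collections being watched.
// Pre-images are stored in config.system.preimages, so plan for the extra storage.
async function enablePreAndPostImages(uri) {
  const client = new MongoClient(uri);
  await client.connect();
  const db = client.db('ecommerce');

  for (const collectionName of ['users', 'orders', 'products', 'inventory']) {
    await db.command({
      collMod: collectionName,
      changeStreamPreAndPostImages: { enabled: true }
    });
    console.log(`Pre/post images enabled for ${collectionName}`);
  }

  await client.close();
}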

Conclusion

MongoDB Change Data Capture through Change Streams provides enterprise-grade real-time event processing that enables sophisticated event-driven architectures without external dependencies. The combination of native change detection, ordered event delivery, and comprehensive error handling enables applications to build reactive systems that respond instantly to data changes.

Key MongoDB Change Data Capture benefits include:

  • Real-Time Processing: Native change streams provide immediate notification of data changes with minimal latency
  • Event Ordering: Guaranteed ordering of related events ensures consistent event processing across services
  • Resumable Streams: Built-in resume token support enables reliable event processing across application restarts
  • Full Context: Complete document information including before and after states for comprehensive change analysis
  • Production Ready: Enterprise-grade error handling, monitoring, and scalability capabilities
  • SQL Compatibility: Familiar change processing patterns accessible through SQL-style operations

Whether you're building microservices architectures, real-time analytics pipelines, or reactive user interfaces, MongoDB Change Data Capture with QueryLeaf's SQL-familiar interface provides the foundation for scalable event-driven systems that maintain consistency and responsiveness while simplifying operational complexity.

QueryLeaf Integration: QueryLeaf automatically optimizes MongoDB Change Data Capture while providing SQL-familiar syntax for creating, monitoring, and processing change streams. Advanced event routing, error handling, and performance analytics are seamlessly accessible through familiar SQL constructs, making sophisticated event-driven architecture both powerful and approachable for SQL-oriented teams.

The combination of MongoDB's intelligent change detection with familiar SQL-style management makes it an ideal platform for applications that require both real-time data processing and operational simplicity, ensuring your event-driven architecture scales efficiently while maintaining familiar development and operational patterns.