MongoDB Bulk Operations and Performance Optimization: High-Throughput Batch Processing with SQL-Style Bulk Operations

Modern applications frequently need to process large volumes of data efficiently, which calls for batch operations that sustain high throughput while preserving data consistency and making good use of resources. Traditional single-document operations become a significant bottleneck when dealing with thousands or millions of records, leading to degraded performance and resource exhaustion.

MongoDB's bulk operations provide powerful batch processing capabilities that can dramatically improve throughput for high-volume data workloads. Unlike approaches that require complex batch orchestration at the application layer, MongoDB's bulk operations offer native support for optimized batch inserts, updates, deletes, and upserts, with built-in handling of partial failures.
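
As a quick preview of the API this article builds on, the Node.js driver exposes bulk processing directly through bulkWrite(), which carries a mix of insert, update, and delete operations in a single round trip. The following is a minimal sketch assuming driver v4 or later; the database, collection, and field names are illustrative:

// Minimal sketch of the native bulkWrite API (illustrative names and values)
const { MongoClient } = require('mongodb');

async function quickBulkWriteExample() {
  const client = new MongoClient('mongodb://localhost:27017');
  await client.connect();
  const orders = client.db('shop').collection('orders');

  // One round trip carrying mixed write operations
  const result = await orders.bulkWrite([
    { insertOne: { document: { orderId: 1, status: 'new', total: 42.5 } } },
    { updateOne: { filter: { orderId: 2 }, update: { $set: { status: 'shipped' } }, upsert: true } },
    { deleteMany: { filter: { status: 'cancelled' } } }
  ], { ordered: false }); // unordered: continue past individual failures

  console.log(result.insertedCount, result.modifiedCount, result.deletedCount, result.upsertedCount);
  await client.close();
}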

The Traditional High-Volume Data Processing Challenge

Conventional approaches to processing large datasets suffer from significant performance and scalability limitations:

-- Traditional PostgreSQL bulk processing - inefficient and resource-intensive

-- Standard approach using individual INSERT statements
DO $$
DECLARE
    record_data RECORD;
    batch_size INTEGER := 1000;
    processed_count INTEGER := 0;
    total_records INTEGER;
BEGIN
    -- Count total records to process
    SELECT COUNT(*) INTO total_records FROM staging_data;

    -- Process records one by one (inefficient)
    FOR record_data IN 
        SELECT id, user_id, transaction_amount, transaction_date, metadata 
        FROM staging_data 
        ORDER BY id
    LOOP
        -- Individual INSERT - causes overhead per operation
        INSERT INTO user_transactions (
            user_id,
            amount,
            transaction_date,
            metadata,
            processed_at
        ) VALUES (
            record_data.user_id,
            record_data.transaction_amount,
            record_data.transaction_date,
            record_data.metadata,
            CURRENT_TIMESTAMP
        );

        processed_count := processed_count + 1;

        -- Commit every batch_size records to avoid long transactions
        -- (transaction control inside a DO block requires PostgreSQL 11+
        --  and is not allowed when the block has an EXCEPTION handler,
        --  so any error simply aborts the block)
        IF processed_count % batch_size = 0 THEN
            COMMIT;
            RAISE NOTICE 'Processed % of % records', processed_count, total_records;
        END IF;
    END LOOP;

    -- Final commit
    COMMIT;
    RAISE NOTICE 'Completed processing % total records', total_records;
END $$;

-- Problems with traditional single-record approach:
-- 1. Extremely high overhead - one network round-trip per operation
-- 2. Poor throughput - typically 1,000-5,000 operations/second maximum
-- 3. Resource exhaustion - excessive connection and memory usage
-- 4. Limited error handling - single failure can abort entire batch
-- 5. Lock contention - per-row index maintenance and frequent commits add locking overhead
-- 6. No optimization for similar operations
-- 7. Difficult progress tracking and resume capability
-- 8. Poor CPU and I/O efficiency due to constant context switching

-- Attempt at bulk INSERT (better but still limited)
INSERT INTO user_transactions (user_id, amount, transaction_date, metadata, processed_at)
SELECT 
    user_id,
    transaction_amount,
    transaction_date,
    metadata,
    CURRENT_TIMESTAMP
FROM staging_data;

-- Issues with basic bulk INSERT:
-- - All-or-nothing behavior - single bad record fails entire batch
-- - No granular error reporting
-- - Limited to INSERT operations only
-- - Conflict and duplicate handling requires explicit ON CONFLICT clauses
-- - No support for conditional operations
-- - Memory constraints with very large datasets
-- - Poor performance with complex transformations

-- MySQL limitations (even more restrictive)
INSERT INTO user_transactions (user_id, amount, transaction_date, metadata)
VALUES 
    (1001, 150.00, '2024-01-15', '{"category": "purchase"}'),
    (1002, 75.50, '2024-01-15', '{"category": "refund"}'),
    (1003, 200.00, '2024-01-15', '{"category": "purchase"}');
    -- Repeat for potentially millions of records...

-- MySQL bulk processing problems:
-- - Maximum query size limitations
-- - Poor error handling for mixed success/failure scenarios  
-- - Limited transaction size support
-- - Upserts limited to INSERT ... ON DUPLICATE KEY UPDATE / REPLACE semantics
-- - Difficult conflict resolution
-- - Poor performance with large batch sizes
-- - Memory exhaustion with complex operations

MongoDB provides comprehensive bulk operation capabilities:

// MongoDB Bulk Operations - high-performance batch processing
const { MongoClient, ObjectId } = require('mongodb');

const client = new MongoClient('mongodb://localhost:27017');
const db = client.db('high_performance_db');

// Advanced bulk operations system for high-throughput data processing
class HighThroughputBulkProcessor {
  constructor(db, config = {}) {
    this.db = db;
    this.config = {
      batchSize: config.batchSize || 10000,
      maxParallelBatches: config.maxParallelBatches || 5,
      retryAttempts: config.retryAttempts || 3,
      retryDelay: config.retryDelay || 1000,
      enableMetrics: config.enableMetrics !== false,
      ...config
    };

    this.metrics = {
      totalOperations: 0,
      successfulOperations: 0,
      failedOperations: 0,
      batchesProcessed: 0,
      processingTime: 0,
      throughputHistory: []
    };

    this.collections = new Map();
    this.activeOperations = new Set();
  }

  async setupOptimizedCollections() {
    console.log('Setting up collections for high-performance bulk operations...');

    const collectionConfigs = [
      {
        name: 'user_transactions',
        indexes: [
          { key: { userId: 1, transactionDate: -1 } },
          { key: { transactionType: 1, status: 1 } },
          { key: { amount: 1 } },
          { key: { createdAt: -1 } }
        ],
        options: { 
          writeConcern: { w: 'majority', j: true },
          readConcern: { level: 'majority' }
        }
      },
      {
        name: 'user_profiles',
        indexes: [
          { key: { email: 1 }, unique: true },
          { key: { userId: 1 }, unique: true },
          { key: { lastLoginDate: -1 } },
          { key: { 'preferences.category': 1 } }
        ]
      },
      {
        name: 'product_analytics',
        indexes: [
          { key: { productId: 1, eventDate: -1 } },
          { key: { eventType: 1, processed: 1 } },
          { key: { userId: 1, eventDate: -1 } }
        ]
      },
      {
        name: 'audit_logs',
        indexes: [
          { key: { entityId: 1, timestamp: -1 } },
          { key: { action: 1, timestamp: -1 } },
          { key: { userId: 1, timestamp: -1 } }
        ]
      }
    ];

    for (const config of collectionConfigs) {
      // Apply any per-collection write/read concern options defined above
      const collection = this.db.collection(config.name, config.options || {});
      this.collections.set(config.name, collection);

      // Create indexes for optimal bulk operation performance
      for (const index of config.indexes) {
        try {
          await collection.createIndex(index.key, index.unique ? { unique: true } : {});
          console.log(`Created index on ${config.name}:`, index.key);
        } catch (error) {
          console.warn(`Index creation warning for ${config.name}:`, error.message);
        }
      }
    }

    console.log(`Initialized ${collectionConfigs.length} collections for bulk operations`);
    return this.collections;
  }

  async bulkInsertOptimized(collectionName, documents, options = {}) {
    console.log(`Starting bulk insert for ${documents.length} documents in ${collectionName}...`);
    const startTime = Date.now();

    const collection = this.collections.get(collectionName);
    if (!collection) {
      throw new Error(`Collection ${collectionName} not found. Initialize collections first.`);
    }

    const {
      batchSize = this.config.batchSize,
      ordered = false,
      writeConcern = { w: 'majority', j: true },
      enableValidation = true,
      enableProgressReporting = true
    } = options;

    let totalInserted = 0;
    let totalErrors = 0;
    const results = [];
    const errors = [];

    // Process documents in optimized batches
    for (let i = 0; i < documents.length; i += batchSize) {
      const batch = documents.slice(i, i + batchSize);
      const batchNumber = Math.floor(i / batchSize) + 1;

      try {
        // Validate documents if enabled
        if (enableValidation) {
          await this.validateDocumentBatch(batch, collectionName);
        }

        // Execute bulk insert with optimized settings
        const result = await collection.insertMany(batch, {
          ordered: ordered,
          writeConcern: writeConcern,
          bypassDocumentValidation: !enableValidation
        });

        totalInserted += result.insertedCount;
        results.push({
          batchNumber,
          insertedCount: result.insertedCount,
          insertedIds: result.insertedIds
        });

        // Progress reporting
        if (enableProgressReporting) {
          const progress = ((i + batch.length) / documents.length * 100).toFixed(1);
          const throughput = Math.round(totalInserted / ((Date.now() - startTime) / 1000));
          console.log(`Batch ${batchNumber}: ${result.insertedCount} inserted, ${progress}% complete, ${throughput} docs/sec`);
        }

      } catch (error) {
        totalErrors++;
        console.error(`Batch ${batchNumber} failed:`, error.message);

        // Handle partial failures in unordered mode
        if (!ordered && error.result) {
          const partialInserted = error.result.insertedCount ?? error.result.nInserted ?? 0;
          totalInserted += partialInserted;

          errors.push({
            batchNumber,
            error: error.message,
            insertedCount: partialInserted,
            failedOperations: error.writeErrors || []
          });
        } else {
          errors.push({
            batchNumber,
            error: error.message,
            insertedCount: 0
          });
        }
      }
    }

    const totalTime = Date.now() - startTime;
    const throughput = Math.round(totalInserted / (totalTime / 1000));

    // Update metrics
    this.updateMetrics('insert', totalInserted, totalErrors, totalTime);

    const summary = {
      success: totalErrors === 0,
      totalDocuments: documents.length,
      totalInserted: totalInserted,
      totalBatches: Math.ceil(documents.length / batchSize),
      failedBatches: totalErrors,
      executionTime: totalTime,
      throughput: throughput,
      results: results,
      errors: errors.length > 0 ? errors : undefined
    };

    console.log(`Bulk insert completed: ${totalInserted}/${documents.length} documents in ${totalTime}ms (${throughput} docs/sec)`);
    return summary;
  }

  async bulkUpdateOptimized(collectionName, updateOperations, options = {}) {
    console.log(`Starting bulk update for ${updateOperations.length} operations in ${collectionName}...`);
    const startTime = Date.now();

    const collection = this.collections.get(collectionName);
    if (!collection) {
      throw new Error(`Collection ${collectionName} not found`);
    }

    const {
      batchSize = this.config.batchSize,
      ordered = false,
      writeConcern = { w: 'majority', j: true }
    } = options;

    let totalModified = 0;
    let totalMatched = 0;
    let totalUpserted = 0;
    const results = [];
    const errors = [];

    // Process operations in batches
    for (let i = 0; i < updateOperations.length; i += batchSize) {
      const batch = updateOperations.slice(i, i + batchSize);
      const batchNumber = Math.floor(i / batchSize) + 1;

      try {
        // Build bulk update operation
        const bulkOp = collection.initializeUnorderedBulkOp();

        for (const operation of batch) {
          const { filter, update, options: opOptions = {} } = operation;

          if (opOptions.upsert) {
            bulkOp.find(filter).upsert().updateOne(update);
          } else if (operation.type === 'updateMany') {
            bulkOp.find(filter).update(update);
          } else {
            bulkOp.find(filter).updateOne(update);
          }
        }

        // Execute bulk operation
        const result = await bulkOp.execute({ writeConcern });

        totalModified += result.modifiedCount || 0;
        totalMatched += result.matchedCount || 0;  
        totalUpserted += result.upsertedCount || 0;

        results.push({
          batchNumber,
          matchedCount: result.matchedCount,
          modifiedCount: result.modifiedCount,
          upsertedCount: result.upsertedCount,
          upsertedIds: result.upsertedIds
        });

        const progress = ((i + batch.length) / updateOperations.length * 100).toFixed(1);
        console.log(`Update batch ${batchNumber}: ${result.modifiedCount} modified, ${progress}% complete`);

      } catch (error) {
        console.error(`Update batch ${batchNumber} failed:`, error.message);

        // Handle partial results from bulk write errors
        if (error.result) {
          totalModified += error.result.modifiedCount ?? error.result.nModified ?? 0;
          totalMatched += error.result.matchedCount ?? error.result.nMatched ?? 0;
          totalUpserted += error.result.upsertedCount ?? error.result.nUpserted ?? 0;
        }

        errors.push({
          batchNumber,
          error: error.message,
          writeErrors: error.writeErrors || []
        });
      }
    }

    const totalTime = Date.now() - startTime;
    const throughput = Math.round(totalModified / (totalTime / 1000));

    this.updateMetrics('update', totalModified, errors.length, totalTime);

    return {
      success: errors.length === 0,
      totalOperations: updateOperations.length,
      totalMatched: totalMatched,
      totalModified: totalModified, 
      totalUpserted: totalUpserted,
      executionTime: totalTime,
      throughput: throughput,
      results: results,
      errors: errors.length > 0 ? errors : undefined
    };
  }

  async bulkUpsertOptimized(collectionName, upsertOperations, options = {}) {
    console.log(`Starting bulk upsert for ${upsertOperations.length} operations in ${collectionName}...`);

    const {
      batchSize = this.config.batchSize,
      enableDeduplication = true
    } = options;

    // Deduplicate operations based on filter if enabled
    let processedOperations = upsertOperations;
    if (enableDeduplication) {
      processedOperations = this.deduplicateUpsertOperations(upsertOperations);
      console.log(`Deduplicated ${upsertOperations.length} operations to ${processedOperations.length}`);
    }

    // Convert upsert operations to bulk update operations
    const updateOperations = processedOperations.map(op => ({
      filter: op.filter,
      update: op.update,
      options: { upsert: true }
    }));

    return await this.bulkUpdateOptimized(collectionName, updateOperations, {
      ...options,
      batchSize
    });
  }

  async bulkDeleteOptimized(collectionName, deleteFilters, options = {}) {
    console.log(`Starting bulk delete for ${deleteFilters.length} operations in ${collectionName}...`);
    const startTime = Date.now();

    const collection = this.collections.get(collectionName);
    if (!collection) {
      throw new Error(`Collection ${collectionName} not found`);
    }

    const {
      batchSize = this.config.batchSize,
      deleteMany = false,
      writeConcern = { w: 'majority', j: true }
    } = options;

    let totalDeleted = 0;
    const results = [];
    const errors = [];

    for (let i = 0; i < deleteFilters.length; i += batchSize) {
      const batch = deleteFilters.slice(i, i + batchSize);
      const batchNumber = Math.floor(i / batchSize) + 1;

      try {
        const bulkOp = collection.initializeUnorderedBulkOp();

        for (const filter of batch) {
          if (deleteMany) {
            bulkOp.find(filter).delete();
          } else {
            bulkOp.find(filter).deleteOne();
          }
        }

        const result = await bulkOp.execute({ writeConcern });
        const deletedCount = result.deletedCount || 0;

        totalDeleted += deletedCount;
        results.push({
          batchNumber,
          deletedCount
        });

        const progress = ((i + batch.length) / deleteFilters.length * 100).toFixed(1);
        console.log(`Delete batch ${batchNumber}: ${deletedCount} deleted, ${progress}% complete`);

      } catch (error) {
        console.error(`Delete batch ${batchNumber} failed:`, error.message);
        errors.push({
          batchNumber,
          error: error.message
        });
      }
    }

    const totalTime = Date.now() - startTime;
    const throughput = Math.round(totalDeleted / (totalTime / 1000));

    this.updateMetrics('delete', totalDeleted, errors.length, totalTime);

    return {
      success: errors.length === 0,
      totalOperations: deleteFilters.length,
      totalDeleted: totalDeleted,
      executionTime: totalTime,
      throughput: throughput,
      results: results,
      errors: errors.length > 0 ? errors : undefined
    };
  }

  async mixedBulkOperations(collectionName, operations, options = {}) {
    console.log(`Starting mixed bulk operations: ${operations.length} operations in ${collectionName}...`);
    const startTime = Date.now();

    const collection = this.collections.get(collectionName);
    const {
      batchSize = this.config.batchSize,
      writeConcern = { w: 'majority', j: true }
    } = options;

    let totalInserted = 0;
    let totalModified = 0;
    let totalDeleted = 0;
    let totalUpserted = 0;
    const results = [];
    const errors = [];

    for (let i = 0; i < operations.length; i += batchSize) {
      const batch = operations.slice(i, i + batchSize);
      const batchNumber = Math.floor(i / batchSize) + 1;

      try {
        const bulkOp = collection.initializeUnorderedBulkOp();

        for (const operation of batch) {
          switch (operation.type) {
            case 'insert':
              bulkOp.insert(operation.document);
              break;

            case 'update':
              bulkOp.find(operation.filter).updateOne(operation.update);
              break;

            case 'updateMany':
              bulkOp.find(operation.filter).update(operation.update);
              break;

            case 'upsert':
              bulkOp.find(operation.filter).upsert().updateOne(operation.update);
              break;

            case 'delete':
              bulkOp.find(operation.filter).deleteOne();
              break;

            case 'deleteMany':
              bulkOp.find(operation.filter).delete();
              break;

            case 'replace':
              bulkOp.find(operation.filter).replaceOne(operation.replacement);
              break;

            default:
              throw new Error(`Unknown operation type: ${operation.type}`);
          }
        }

        const result = await bulkOp.execute({ writeConcern });

        totalInserted += result.insertedCount || 0;
        totalModified += result.modifiedCount || 0;
        totalDeleted += result.deletedCount || 0;
        totalUpserted += result.upsertedCount || 0;

        results.push({
          batchNumber,
          insertedCount: result.insertedCount,
          modifiedCount: result.modifiedCount,
          deletedCount: result.deletedCount,
          upsertedCount: result.upsertedCount,
          matchedCount: result.matchedCount
        });

        const progress = ((i + batch.length) / operations.length * 100).toFixed(1);
        console.log(`Mixed batch ${batchNumber}: ${batch.length} operations completed, ${progress}% complete`);

      } catch (error) {
        console.error(`Mixed batch ${batchNumber} failed:`, error.message);
        errors.push({
          batchNumber,
          error: error.message,
          writeErrors: error.writeErrors || []
        });
      }
    }

    const totalTime = Date.now() - startTime;
    const totalOperations = totalInserted + totalModified + totalDeleted;
    const throughput = Math.round(totalOperations / (totalTime / 1000));

    this.updateMetrics('mixed', totalOperations, errors.length, totalTime);

    return {
      success: errors.length === 0,
      totalOperations: operations.length,
      executionSummary: {
        inserted: totalInserted,
        modified: totalModified,
        deleted: totalDeleted,
        upserted: totalUpserted
      },
      executionTime: totalTime,
      throughput: throughput,
      results: results,
      errors: errors.length > 0 ? errors : undefined
    };
  }

  async parallelBulkProcessing(collectionName, operationBatches, operationType = 'insert') {
    console.log(`Starting parallel bulk processing: ${operationBatches.length} batches in ${collectionName}...`);
    const startTime = Date.now();

    const maxParallel = Math.min(this.config.maxParallelBatches, operationBatches.length);
    const results = [];
    const errors = [];

    // Process batches in parallel with controlled concurrency
    const processBatch = async (batch, batchIndex) => {
      try {
        let result;
        switch (operationType) {
          case 'insert':
            result = await this.bulkInsertOptimized(collectionName, batch, {
              enableProgressReporting: false
            });
            break;
          case 'update':
            result = await this.bulkUpdateOptimized(collectionName, batch, {});
            break;
          case 'delete':
            result = await this.bulkDeleteOptimized(collectionName, batch, {});
            break;
          default:
            throw new Error(`Unsupported parallel operation type: ${operationType}`);
        }

        return { batchIndex, result, success: true };
      } catch (error) {
        console.error(`Parallel batch ${batchIndex} failed:`, error.message);
        return { batchIndex, error: error.message, success: false };
      }
    };

    // Execute batches with controlled parallelism
    for (let i = 0; i < operationBatches.length; i += maxParallel) {
      const parallelBatch = operationBatches.slice(i, i + maxParallel);
      const promises = parallelBatch.map((batch, index) => processBatch(batch, i + index));

      const batchResults = await Promise.all(promises);

      for (const batchResult of batchResults) {
        if (batchResult.success) {
          results.push(batchResult.result);
        } else {
          errors.push(batchResult);
        }
      }

      const progress = ((i + parallelBatch.length) / operationBatches.length * 100).toFixed(1);
      console.log(`Parallel processing: ${progress}% complete`);
    }

    const totalTime = Date.now() - startTime;

    // Aggregate results
    const aggregatedResult = {
      success: errors.length === 0,
      totalBatches: operationBatches.length,
      successfulBatches: results.length,
      failedBatches: errors.length,
      executionTime: totalTime,
      results: results,
      errors: errors.length > 0 ? errors : undefined
    };

    // Calculate total operations processed
    let totalOperations = 0;
    for (const result of results) {
      totalOperations += result.totalInserted || result.totalModified || result.totalDeleted || 0;
    }

    aggregatedResult.totalOperations = totalOperations;
    aggregatedResult.throughput = Math.round(totalOperations / (totalTime / 1000));

    console.log(`Parallel bulk processing completed: ${totalOperations} operations in ${totalTime}ms (${aggregatedResult.throughput} ops/sec)`);
    return aggregatedResult;
  }

  async validateDocumentBatch(documents, collectionName) {
    // Basic document validation
    const requiredFields = this.getRequiredFields(collectionName);

    for (const doc of documents) {
      for (const field of requiredFields) {
        if (doc[field] === undefined || doc[field] === null) {
          throw new Error(`Required field '${field}' missing in document`);
        }
      }
    }

    return true;
  }

  getRequiredFields(collectionName) {
    const fieldMap = {
      'user_transactions': ['userId', 'amount', 'transactionType'],
      'user_profiles': ['userId', 'email'],
      'product_analytics': ['productId', 'eventType', 'eventDate'],
      'audit_logs': ['entityId', 'action', 'timestamp']
    };

    return fieldMap[collectionName] || [];
  }

  deduplicateUpsertOperations(operations) {
    const seen = new Map();
    const deduplicated = [];

    for (const operation of operations) {
      const filterKey = JSON.stringify(operation.filter);
      if (!seen.has(filterKey)) {
        seen.set(filterKey, true);
        deduplicated.push(operation);
      }
    }

    return deduplicated;
  }

  updateMetrics(operationType, successCount, errorCount, executionTime) {
    if (!this.config.enableMetrics) return;

    this.metrics.totalOperations += successCount + errorCount;
    this.metrics.successfulOperations += successCount;
    this.metrics.failedOperations += errorCount;
    this.metrics.batchesProcessed++;
    this.metrics.processingTime += executionTime;

    const throughput = Math.round(successCount / (executionTime / 1000));
    this.metrics.throughputHistory.push({
      timestamp: new Date(),
      operationType,
      throughput,
      successCount,
      errorCount
    });

    // Keep only last 100 throughput measurements
    if (this.metrics.throughputHistory.length > 100) {
      this.metrics.throughputHistory.shift();
    }
  }

  getPerformanceMetrics() {
    const recentThroughput = this.metrics.throughputHistory.slice(-10);
    const avgThroughput = recentThroughput.length > 0 
      ? Math.round(recentThroughput.reduce((sum, t) => sum + t.throughput, 0) / recentThroughput.length)
      : 0;

    return {
      totalOperations: this.metrics.totalOperations,
      successfulOperations: this.metrics.successfulOperations,
      failedOperations: this.metrics.failedOperations,
      successRate: this.metrics.totalOperations > 0 
        ? ((this.metrics.successfulOperations / this.metrics.totalOperations) * 100).toFixed(2) + '%'
        : '0%',
      batchesProcessed: this.metrics.batchesProcessed,
      totalProcessingTime: this.metrics.processingTime,
      averageThroughput: avgThroughput,
      recentThroughputHistory: recentThroughput
    };
  }

  async shutdown() {
    console.log('Shutting down bulk processor...');

    // Wait for active operations to complete
    if (this.activeOperations.size > 0) {
      console.log(`Waiting for ${this.activeOperations.size} active operations to complete...`);
      await Promise.allSettled(Array.from(this.activeOperations));
    }

    console.log('Bulk processor shutdown complete');
    console.log('Final Performance Metrics:', this.getPerformanceMetrics());
  }
}

// Example usage and demonstration
const demonstrateBulkOperations = async () => {
  try {
    // Establish the connection before running any bulk operations
    await client.connect();

    const processor = new HighThroughputBulkProcessor(db, {
      batchSize: 5000,
      maxParallelBatches: 3,
      enableMetrics: true
    });

    // Initialize collections
    await processor.setupOptimizedCollections();

    // Generate sample data for demonstration
    const sampleTransactions = Array.from({ length: 50000 }, (_, index) => ({
      _id: new ObjectId(),
      userId: `user_${Math.floor(Math.random() * 10000)}`,
      amount: Math.round((Math.random() * 1000 + 10) * 100) / 100,
      transactionType: ['purchase', 'refund', 'transfer'][Math.floor(Math.random() * 3)],
      transactionDate: new Date(Date.now() - Math.random() * 30 * 24 * 60 * 60 * 1000),
      metadata: {
        category: ['electronics', 'clothing', 'books', 'food'][Math.floor(Math.random() * 4)],
        channel: ['web', 'mobile', 'api'][Math.floor(Math.random() * 3)]
      },
      createdAt: new Date()
    }));

    // Bulk insert demonstration
    console.log('\n=== Bulk Insert Demonstration ===');
    const insertResult = await processor.bulkInsertOptimized('user_transactions', sampleTransactions);
    console.log('Insert Result:', insertResult);

    // Bulk update demonstration
    console.log('\n=== Bulk Update Demonstration ===');
    const updateOperations = Array.from({ length: 10000 }, (_, index) => ({
      filter: { userId: `user_${index % 1000}` },
      update: { 
        $inc: { totalSpent: Math.round(Math.random() * 100) },
        $set: { lastUpdated: new Date() }
      },
      type: 'updateMany'
    }));

    const updateResult = await processor.bulkUpdateOptimized('user_transactions', updateOperations);
    console.log('Update Result:', updateResult);

    // Mixed operations demonstration
    console.log('\n=== Mixed Operations Demonstration ===');
    const mixedOperations = [
      // Insert operations
      ...Array.from({ length: 1000 }, (_, index) => ({
        type: 'insert',
        document: {
          userId: `new_user_${index}`,
          email: `newuser${index}@example.com`,
          createdAt: new Date()
        }
      })),

      // Update operations
      ...Array.from({ length: 500 }, (_, index) => ({
        type: 'update',
        filter: { userId: `user_${index}` },
        update: { $set: { status: 'active', lastLogin: new Date() } }
      })),

      // Upsert operations
      ...Array.from({ length: 300 }, (_, index) => ({
        type: 'upsert',
        filter: { email: `upsert${index}@example.com` },
        update: { 
          $set: { 
            email: `upsert${index}@example.com`,
            status: 'new'
          },
          $setOnInsert: { createdAt: new Date() }
        }
      }))
    ];

    const mixedResult = await processor.mixedBulkOperations('user_profiles', mixedOperations);
    console.log('Mixed Operations Result:', mixedResult);

    // Performance metrics
    console.log('\n=== Performance Metrics ===');
    console.log(processor.getPerformanceMetrics());

    return processor;

  } catch (error) {
    console.error('Bulk operations demonstration failed:', error);
    throw error;
  }
};

// Benefits of MongoDB Bulk Operations:
// - Dramatically improved throughput (commonly 10x-100x faster than issuing operations individually)
// - Reduced network overhead with batch processing
// - Built-in error handling for partial failures
// - Flexible operation mixing (insert, update, delete in same batch)
// - Automatic optimization and connection pooling
// - Support for ordered and unordered operations
// - Native upsert capabilities with conflict resolution
// - Comprehensive result reporting and metrics
// - Memory-efficient processing of large datasets
// - Integration with MongoDB's write concerns and read preferences

module.exports = {
  HighThroughputBulkProcessor,
  demonstrateBulkOperations
};
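
The partial-failure behavior highlighted in the benefits list above deserves a concrete illustration. With unordered writes, the Node.js driver (v4+) raises a MongoBulkWriteError whose attached result still reports what succeeded, so callers can log or retry just the rejected documents. The sketch below assumes that driver version; the helper name is illustrative:

// Sketch: reporting partial results after an unordered bulk insert fails
const { MongoBulkWriteError } = require('mongodb');

async function insertWithPartialFailureReport(collection, docs) {
  try {
    const result = await collection.insertMany(docs, { ordered: false });
    return { inserted: result.insertedCount, failed: 0 };
  } catch (error) {
    if (error instanceof MongoBulkWriteError) {
      // Unordered mode keeps going past bad documents (e.g. duplicate keys),
      // so the attached result still carries the count of successful inserts
      const inserted = error.result?.insertedCount ?? 0;
      const failed = docs.length - inserted;
      console.warn(`Partial failure: ${inserted} inserted, ${failed} rejected`);
      return { inserted, failed };
    }
    throw error; // not a bulk write error - rethrow
  }
}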

Understanding MongoDB Bulk Operation Performance Patterns

Advanced Bulk Processing Strategies

Implement sophisticated bulk processing patterns for different high-volume scenarios:

// Advanced bulk processing patterns and optimization strategies
const { ObjectId } = require('mongodb');

class ProductionBulkProcessor extends HighThroughputBulkProcessor {
  constructor(db, config = {}) {
    super(db, config);
    this.processingQueue = [];
    this.workerPool = [];
    this.compressionEnabled = config.compressionEnabled || false;
    this.retryQueue = [];
    this.deadLetterQueue = [];
  }

  async setupStreamingBulkProcessor() {
    console.log('Setting up streaming bulk processor for continuous data ingestion...');

    const streamProcessor = {
      bufferSize: this.config.streamBufferSize || 1000,
      flushInterval: this.config.streamFlushInterval || 5000, // 5 seconds
      buffer: [],
      lastFlush: Date.now(),
      totalProcessed: 0
    };

    // Streaming insert processor
    const streamingInsert = async (documents, collectionName) => {
      streamProcessor.buffer.push(...documents);

      const shouldFlush = streamProcessor.buffer.length >= streamProcessor.bufferSize ||
                         (Date.now() - streamProcessor.lastFlush) >= streamProcessor.flushInterval;

      if (shouldFlush && streamProcessor.buffer.length > 0) {
        const toProcess = [...streamProcessor.buffer];
        streamProcessor.buffer = [];
        streamProcessor.lastFlush = Date.now();

        try {
          const result = await this.bulkInsertOptimized(collectionName, toProcess, {
            enableProgressReporting: false
          });

          streamProcessor.totalProcessed += result.totalInserted;
          console.log(`Streamed ${result.totalInserted} documents, total: ${streamProcessor.totalProcessed}`);

          return result;
        } catch (error) {
          console.error('Streaming insert error:', error);
          // Add failed documents to retry queue
          this.retryQueue.push(...toProcess);
        }
      }
    };

    // Automatic flushing interval
    const flushInterval = setInterval(async () => {
      if (streamProcessor.buffer.length > 0) {
        await streamingInsert([], 'user_transactions'); // Flush buffer
      }
    }, streamProcessor.flushInterval);

    return {
      streamingInsert,
      getStats: () => ({
        bufferSize: streamProcessor.buffer.length,
        totalProcessed: streamProcessor.totalProcessed,
        lastFlush: streamProcessor.lastFlush
      }),
      shutdown: () => {
        clearInterval(flushInterval);
        return streamingInsert([], 'user_transactions'); // Final flush
      }
    };
  }

  async setupPriorityQueueProcessor() {
    console.log('Setting up priority queue bulk processor...');

    const priorityLevels = {
      CRITICAL: 1,
      HIGH: 2, 
      NORMAL: 3,
      LOW: 4
    };

    const priorityQueues = new Map();
    Object.values(priorityLevels).forEach(level => {
      priorityQueues.set(level, []);
    });

    const processQueue = async () => {
      // Process queues by priority
      for (const [priority, queue] of priorityQueues.entries()) {
        if (queue.length === 0) continue;

        const batchSize = this.calculateDynamicBatchSize(priority, queue.length);
        const batch = queue.splice(0, batchSize);

        if (batch.length > 0) {
          try {
            await this.processPriorityBatch(batch, priority);
          } catch (error) {
            console.error(`Priority ${priority} batch failed:`, error);
            // Re-queue with lower priority or move to retry queue
            this.handlePriorityFailure(batch, priority);
          }
        }
      }
    };

    // Start priority processor
    const processorInterval = setInterval(processQueue, 1000);

    return {
      addToPriorityQueue: (operations, priority = priorityLevels.NORMAL) => {
        if (!priorityQueues.has(priority)) {
          throw new Error(`Invalid priority level: ${priority}`);
        }
        priorityQueues.get(priority).push(...operations);
      },
      getQueueStats: () => {
        const stats = {};
        for (const [priority, queue] of priorityQueues.entries()) {
          stats[`priority_${priority}`] = queue.length;
        }
        return stats;
      },
      shutdown: () => clearInterval(processorInterval)
    };
  }

  calculateDynamicBatchSize(priority, queueLength) {
    // Adjust batch size based on priority and queue length
    const baseBatchSize = this.config.batchSize;

    switch (priority) {
      case 1: // CRITICAL - smaller batches for faster processing
        return Math.min(baseBatchSize / 2, queueLength);
      case 2: // HIGH - normal batch size
        return Math.min(baseBatchSize, queueLength);
      case 3: // NORMAL - larger batches for efficiency
        return Math.min(baseBatchSize * 1.5, queueLength);
      case 4: // LOW - maximum batch size for throughput
        return Math.min(baseBatchSize * 2, queueLength);
      default:
        return Math.min(baseBatchSize, queueLength);
    }
  }
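
  // The priority queue processor above calls two helpers that are not defined
  // elsewhere in this listing; minimal sketches are provided here. The
  // collection name and retry strategy are assumptions, not a fixed design.
  async processPriorityBatch(batch, priority) {
    // Batches are assumed to hold mixed-operation descriptors compatible
    // with mixedBulkOperations() (e.g. { type: 'insert', document: {...} })
    return await this.mixedBulkOperations('user_transactions', batch);
  }

  handlePriorityFailure(batch, priority) {
    // Simple fallback: park the failed operations for a later retry pass
    this.retryQueue.push(...batch);
    console.warn(`Re-queued ${batch.length} priority-${priority} operations for retry`);
  }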

  async setupAdaptiveBatchSizing() {
    console.log('Setting up adaptive batch sizing system...');

    const adaptiveConfig = {
      minBatchSize: 100,
      maxBatchSize: 20000,
      targetLatency: 2000, // 2 seconds
      adjustmentFactor: 0.1,
      performanceHistory: []
    };

    const adjustBatchSize = (currentSize, latency, throughput) => {
      let newSize = currentSize;

      if (latency > adaptiveConfig.targetLatency) {
        // Latency too high - reduce batch size
        newSize = Math.max(
          adaptiveConfig.minBatchSize,
          Math.floor(currentSize * (1 - adaptiveConfig.adjustmentFactor))
        );
      } else if (latency < adaptiveConfig.targetLatency * 0.5) {
        // Latency good - try increasing batch size for better throughput
        newSize = Math.min(
          adaptiveConfig.maxBatchSize,
          Math.floor(currentSize * (1 + adaptiveConfig.adjustmentFactor))
        );
      }

      adaptiveConfig.performanceHistory.push({
        timestamp: new Date(),
        batchSize: currentSize,
        latency,
        throughput,
        newBatchSize: newSize
      });

      // Keep only last 50 measurements
      if (adaptiveConfig.performanceHistory.length > 50) {
        adaptiveConfig.performanceHistory.shift();
      }

      return newSize;
    };

    return {
      getOptimalBatchSize: (operationType, currentLatency, currentThroughput) => {
        return adjustBatchSize(this.config.batchSize, currentLatency, currentThroughput);
      },
      getPerformanceHistory: () => adaptiveConfig.performanceHistory,
      getConfig: () => adaptiveConfig
    };
  }

  async setupCompressedBulkOperations() {
    console.log('Setting up compressed bulk operations...');

    if (!this.compressionEnabled) {
      console.log('Compression not enabled, skipping setup');
      return null;
    }

    const zlib = require('zlib');

    const compressDocuments = async (documents) => {
      const serialized = JSON.stringify(documents);
      return new Promise((resolve, reject) => {
        zlib.gzip(serialized, (error, compressed) => {
          if (error) reject(error);
          else resolve(compressed);
        });
      });
    };

    const decompressDocuments = async (compressed) => {
      return new Promise((resolve, reject) => {
        zlib.gunzip(compressed, (error, decompressed) => {
          if (error) reject(error);
          else resolve(JSON.parse(decompressed.toString()));
        });
      });
    };

    return {
      compressedBulkInsert: async (collectionName, documents) => {
        const startCompress = Date.now();
        const compressed = await compressDocuments(documents);
        const compressTime = Date.now() - startCompress;

        const originalSize = JSON.stringify(documents).length;
        const compressedSize = compressed.length;
        const compressionRatio = (compressedSize / originalSize * 100).toFixed(2);

        console.log(`Compression: ${originalSize} -> ${compressedSize} bytes (${compressionRatio}%) in ${compressTime}ms`);

        // For demo - in practice you'd send compressed data to a queue or storage
        const decompressed = await decompressDocuments(compressed);
        return await this.bulkInsertOptimized(collectionName, decompressed);
      },

      compressionStats: (documents) => {
        const originalSize = JSON.stringify(documents).length;
        return {
          originalSizeBytes: originalSize,
          estimatedSizeReduction: '60-80%', // typical for JSON payloads
          potentialSavings: `~${Math.round(originalSize * 0.7)} bytes`
        };
      }
    };
  }

  async setupRetryMechanism() {
    console.log('Setting up bulk operation retry mechanism...');

    const retryConfig = {
      maxRetries: 3,
      backoffMultiplier: 2,
      baseDelay: 1000,
      maxDelay: 30000,
      retryableErrors: [
        'NetworkTimeout',
        'ConnectionPoolClosed',
        'WriteConcernError'
      ]
    };

    const isRetryableError = (error) => {
      return retryConfig.retryableErrors.some(retryableError => 
        error.message.includes(retryableError)
      );
    };

    const calculateDelay = (attempt) => {
      const delay = retryConfig.baseDelay * Math.pow(retryConfig.backoffMultiplier, attempt - 1);
      return Math.min(delay, retryConfig.maxDelay);
    };

    const retryOperation = async (operation, attempt = 1) => {
      try {
        return await operation();
      } catch (error) {
        if (attempt >= retryConfig.maxRetries || !isRetryableError(error)) {
          console.error(`Operation failed after ${attempt} attempts:`, error.message);
          throw error;
        }

        const delay = calculateDelay(attempt);
        console.log(`Retry attempt ${attempt}/${retryConfig.maxRetries} after ${delay}ms delay`);

        await new Promise(resolve => setTimeout(resolve, delay));
        return await retryOperation(operation, attempt + 1);
      }
    };

    return {
      retryBulkOperation: retryOperation,
      isRetryable: isRetryableError,
      getRetryConfig: () => retryConfig
    };
  }

  async setupBulkOperationMonitoring() {
    console.log('Setting up comprehensive bulk operation monitoring...');

    const monitoring = {
      activeOperations: new Map(),
      operationHistory: [],
      alerts: [],
      thresholds: {
        maxLatency: 10000, // 10 seconds
        minThroughput: 1000, // 1000 ops/sec
        maxErrorRate: 0.05 // 5%
      }
    };

    const trackOperation = (operationId, operationType, startTime) => {
      monitoring.activeOperations.set(operationId, {
        type: operationType,
        startTime,
        status: 'running'
      });
    };

    const completeOperation = (operationId, result) => {
      const operation = monitoring.activeOperations.get(operationId);
      if (!operation) return;

      const endTime = Date.now();
      const duration = endTime - operation.startTime;

      const historyEntry = {
        operationId,
        type: operation.type,
        duration,
        success: result.success,
        throughput: result.throughput,
        errorCount: result.errors?.length || 0,
        timestamp: new Date(endTime)
      };

      monitoring.operationHistory.push(historyEntry);
      monitoring.activeOperations.delete(operationId);

      // Keep only last 1000 history entries
      if (monitoring.operationHistory.length > 1000) {
        monitoring.operationHistory.shift();
      }

      // Check for alerts
      this.checkPerformanceAlerts(historyEntry, monitoring);
    };

    return {
      trackOperation,
      completeOperation,
      getActiveOperations: () => Array.from(monitoring.activeOperations.entries()),
      getOperationHistory: () => monitoring.operationHistory,
      getAlerts: () => monitoring.alerts,
      getPerformanceSummary: () => this.generatePerformanceSummary(monitoring)
    };
  }

  checkPerformanceAlerts(operation, monitoring) {
    const alerts = [];

    // Latency alert
    if (operation.duration > monitoring.thresholds.maxLatency) {
      alerts.push({
        type: 'HIGH_LATENCY',
        message: `Operation ${operation.operationId} took ${operation.duration}ms (threshold: ${monitoring.thresholds.maxLatency}ms)`,
        severity: 'warning',
        timestamp: new Date()
      });
    }

    // Throughput alert
    if (operation.throughput < monitoring.thresholds.minThroughput) {
      alerts.push({
        type: 'LOW_THROUGHPUT',
        message: `Operation ${operation.operationId} achieved ${operation.throughput} ops/sec (threshold: ${monitoring.thresholds.minThroughput} ops/sec)`,
        severity: 'warning',
        timestamp: new Date()
      });
    }

    // Error rate alert
    const recentOperations = monitoring.operationHistory.slice(-10);
    const errorRate = recentOperations.reduce((sum, op) => sum + (op.success ? 0 : 1), 0) / recentOperations.length;

    if (errorRate > monitoring.thresholds.maxErrorRate) {
      alerts.push({
        type: 'HIGH_ERROR_RATE',
        message: `Error rate ${(errorRate * 100).toFixed(1)}% exceeds threshold ${monitoring.thresholds.maxErrorRate * 100}%`,
        severity: 'critical',
        timestamp: new Date()
      });
    }

    monitoring.alerts.push(...alerts);

    // Keep only last 100 alerts
    if (monitoring.alerts.length > 100) {
      monitoring.alerts.splice(0, monitoring.alerts.length - 100);
    }
  }

  generatePerformanceSummary(monitoring) {
    const recentOperations = monitoring.operationHistory.slice(-50);

    if (recentOperations.length === 0) {
      return { message: 'No recent operations' };
    }

    const avgDuration = recentOperations.reduce((sum, op) => sum + op.duration, 0) / recentOperations.length;
    const avgThroughput = recentOperations.reduce((sum, op) => sum + op.throughput, 0) / recentOperations.length;
    const successRate = recentOperations.reduce((sum, op) => sum + (op.success ? 1 : 0), 0) / recentOperations.length;

    return {
      recentOperations: recentOperations.length,
      averageDuration: Math.round(avgDuration),
      averageThroughput: Math.round(avgThroughput),
      successRate: (successRate * 100).toFixed(1) + '%',
      activeOperations: monitoring.activeOperations.size,
      recentAlerts: monitoring.alerts.filter(alert => 
        Date.now() - alert.timestamp.getTime() < 300000 // Last 5 minutes
      ).length
    };
  }

  async demonstrateAdvancedPatterns() {
    console.log('\n=== Advanced Bulk Processing Patterns Demo ===');

    try {
      // Setup advanced processors
      const streamProcessor = await this.setupStreamingBulkProcessor();
      const priorityProcessor = await this.setupPriorityQueueProcessor();
      const adaptiveSizing = await this.setupAdaptiveBatchSizing();
      const retryMechanism = await this.setupRetryMechanism();
      const monitoring = await this.setupBulkOperationMonitoring();

      // Demo streaming processing
      console.log('\n--- Streaming Processing Demo ---');
      const streamingData = Array.from({ length: 2500 }, (_, i) => ({
        _id: new ObjectId(),
        streamId: `stream_${i}`,
        data: `streaming_data_${i}`,
        timestamp: new Date()
      }));

      // Add data to stream in chunks
      for (let i = 0; i < streamingData.length; i += 300) {
        const chunk = streamingData.slice(i, i + 300);
        await streamProcessor.streamingInsert(chunk, 'user_transactions');
        await new Promise(resolve => setTimeout(resolve, 100)); // Simulate streaming delay
      }

      console.log('Streaming stats:', streamProcessor.getStats());

      // Demo priority processing
      console.log('\n--- Priority Processing Demo ---');
      const criticalOps = Array.from({ length: 50 }, (_, i) => ({
        type: 'insert',
        document: { priority: 'critical', id: i, timestamp: new Date() }
      }));

      const normalOps = Array.from({ length: 200 }, (_, i) => ({
        type: 'insert', 
        document: { priority: 'normal', id: i + 1000, timestamp: new Date() }
      }));

      priorityProcessor.addToPriorityQueue(criticalOps, 1); // CRITICAL
      priorityProcessor.addToPriorityQueue(normalOps, 3);   // NORMAL

      await new Promise(resolve => setTimeout(resolve, 3000)); // Let priority processor work
      console.log('Priority queue stats:', priorityProcessor.getQueueStats());

      // Cleanup
      await streamProcessor.shutdown();
      priorityProcessor.shutdown();

      return {
        streamingDemo: streamProcessor.getStats(),
        priorityDemo: priorityProcessor.getQueueStats(),
        adaptiveConfig: adaptiveSizing.getConfig()
      };

    } catch (error) {
      console.error('Advanced patterns demo failed:', error);
      throw error;
    }
  }
}
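
To exercise these advanced patterns end to end, a short driver like the following can be used. This is a hedged usage sketch that assumes the client, db, and classes defined earlier in this article are in scope; the configuration values are illustrative:

// Usage sketch for ProductionBulkProcessor (reuses the earlier client/db objects)
const runAdvancedPatterns = async () => {
  await client.connect();
  const processor = new ProductionBulkProcessor(db, {
    batchSize: 5000,
    maxParallelBatches: 3,
    compressionEnabled: false
  });

  try {
    await processor.setupOptimizedCollections();
    const summary = await processor.demonstrateAdvancedPatterns();
    console.log('Advanced pattern summary:', summary);
  } finally {
    await processor.shutdown();
    await client.close();
  }
};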

SQL-Style Bulk Operations with QueryLeaf

QueryLeaf provides familiar SQL syntax for MongoDB bulk operations:

-- QueryLeaf bulk operations with SQL-familiar syntax

-- Bulk INSERT operations
INSERT INTO user_transactions 
SELECT 
    UUID() as _id,
    user_id,
    transaction_amount as amount,
    transaction_type,
    transaction_date,
    JSON_BUILD_OBJECT(
        'category', category,
        'channel', channel,
        'location', location
    ) as metadata,
    CURRENT_TIMESTAMP as created_at
FROM staging_transactions
WHERE processing_status = 'pending'
LIMIT 100000;

-- Batch configuration for optimal performance
SET BULK_INSERT_BATCH_SIZE = 10000;
SET BULK_INSERT_ORDERED = false;
SET BULK_INSERT_WRITE_CONCERN = JSON_BUILD_OBJECT('w', 'majority', 'j', true);

-- Advanced bulk INSERT with conflict resolution
INSERT INTO user_profiles (user_id, email, profile_data, created_at)
SELECT 
    user_id,
    email,
    JSON_BUILD_OBJECT(
        'first_name', first_name,
        'last_name', last_name,
        'preferences', preferences,
        'registration_source', source
    ) as profile_data,
    registration_date as created_at
FROM user_registrations
WHERE processed = false
ON DUPLICATE KEY UPDATE 
    profile_data = JSON_MERGE_PATCH(profile_data, VALUES(profile_data)),
    last_updated = CURRENT_TIMESTAMP,
    update_count = COALESCE(update_count, 0) + 1;

-- Bulk UPDATE operations with complex conditions
UPDATE user_transactions 
SET 
    status = CASE 
        WHEN amount > 1000 AND transaction_type = 'purchase' THEN 'requires_approval'
        WHEN transaction_date < CURRENT_DATE - INTERVAL '30 days' THEN 'archived'
        WHEN metadata->>'$.category' = 'refund' THEN 'processed'
        ELSE 'completed'
    END,
    processing_fee = CASE
        WHEN amount > 500 THEN amount * 0.025
        WHEN amount > 100 THEN amount * 0.035
        ELSE amount * 0.05
    END,
    risk_score = CASE
        WHEN user_id IN (SELECT user_id FROM high_risk_users) THEN 100
        WHEN amount > 2000 THEN 75
        WHEN metadata->>'$.channel' = 'api' THEN 50
        ELSE 25
    END,
    last_updated = CURRENT_TIMESTAMP
WHERE status = 'pending'
    AND created_at >= CURRENT_DATE - INTERVAL '7 days'
BULK_OPTIONS (
    batch_size = 5000,
    ordered = false,
    write_concern = JSON_BUILD_OBJECT('w', 'majority')
);

-- Bulk UPSERT operations for data synchronization
UPSERT INTO user_analytics (
    user_id,
    daily_stats,
    calculation_date,
    last_updated
)
WITH daily_calculations AS (
    SELECT 
        user_id,
        DATE_TRUNC('day', transaction_date) as calculation_date,

        -- Aggregate daily statistics
        JSON_BUILD_OBJECT(
            'total_transactions', COUNT(*),
            'total_amount', SUM(amount),
            'avg_transaction', ROUND(AVG(amount)::NUMERIC, 2),
            'transaction_types', JSON_AGG(DISTINCT transaction_type),
            'categories', JSON_AGG(DISTINCT metadata->>'$.category'),
            'channels', JSON_AGG(DISTINCT metadata->>'$.channel'),

            -- Advanced metrics
            'largest_transaction', MAX(amount),
            'smallest_transaction', MIN(amount),
            'morning_transactions', COUNT(*) FILTER (WHERE EXTRACT(HOUR FROM transaction_date) BETWEEN 6 AND 11),
            'afternoon_transactions', COUNT(*) FILTER (WHERE EXTRACT(HOUR FROM transaction_date) BETWEEN 12 AND 17),
            'evening_transactions', COUNT(*) FILTER (WHERE EXTRACT(HOUR FROM transaction_date) BETWEEN 18 AND 23),
            'night_transactions', COUNT(*) FILTER (WHERE EXTRACT(HOUR FROM transaction_date) BETWEEN 0 AND 5),

            -- Spending patterns
            'high_value_transactions', COUNT(*) FILTER (WHERE amount > 500),
            'refund_count', COUNT(*) FILTER (WHERE transaction_type = 'refund'),
            'refund_amount', SUM(amount) FILTER (WHERE transaction_type = 'refund')
        ) as daily_stats

    FROM user_transactions
    WHERE transaction_date >= CURRENT_DATE - INTERVAL '30 days'
        AND status = 'completed'
    GROUP BY user_id, DATE_TRUNC('day', transaction_date)
)
SELECT 
    user_id,
    daily_stats,
    calculation_date,
    CURRENT_TIMESTAMP as last_updated
FROM daily_calculations
ON CONFLICT (user_id, calculation_date) DO UPDATE
SET 
    daily_stats = EXCLUDED.daily_stats,
    last_updated = EXCLUDED.last_updated,
    version = COALESCE(version, 0) + 1
BULK_OPTIONS (
    batch_size = 2000,
    ordered = false,
    enable_upsert = true
);

-- High-performance bulk DELETE with cascading
DELETE FROM user_sessions us
WHERE us.session_id IN (
    SELECT session_id 
    FROM expired_sessions 
    WHERE expiry_date < CURRENT_TIMESTAMP - INTERVAL '7 days'
)
BULK_OPTIONS (
    batch_size = 10000,
    cascade_delete = JSON_ARRAY(
        JSON_BUILD_OBJECT('collection', 'user_activity_logs', 'field', 'session_id'),
        JSON_BUILD_OBJECT('collection', 'session_analytics', 'field', 'session_id')
    )
);

-- Mixed bulk operations in single transaction
START BULK_TRANSACTION;

-- Insert new user activities
INSERT INTO user_activities (user_id, activity_type, activity_data, timestamp)
SELECT 
    user_id,
    'login' as activity_type,
    JSON_BUILD_OBJECT(
        'ip_address', ip_address,
        'user_agent', user_agent,
        'location', location
    ) as activity_data,
    login_timestamp as timestamp
FROM pending_logins
WHERE processed = false;

-- Update user login statistics
UPDATE user_profiles 
SET 
    last_login_date = (
        SELECT MAX(timestamp) 
        FROM user_activities ua 
        WHERE ua.user_id = user_profiles.user_id 
            AND ua.activity_type = 'login'
    ),
    login_count = COALESCE(login_count, 0) + (
        SELECT COUNT(*) 
        FROM pending_logins pl 
        WHERE pl.user_id = user_profiles.user_id 
            AND pl.processed = false
    ),
    profile_updated_at = CURRENT_TIMESTAMP
WHERE user_id IN (SELECT DISTINCT user_id FROM pending_logins WHERE processed = false);

-- Mark source data as processed
UPDATE pending_logins 
SET 
    processed = true,
    processed_at = CURRENT_TIMESTAMP
WHERE processed = false;

COMMIT BULK_TRANSACTION
WITH ROLLBACK_ON_ERROR = true;

-- Advanced bulk operation monitoring and analytics
WITH bulk_operation_metrics AS (
    SELECT 
        operation_type,
        collection_name,
        DATE_TRUNC('hour', operation_timestamp) as hour_bucket,

        -- Volume metrics
        COUNT(*) as operation_count,
        SUM(documents_processed) as total_documents,
        SUM(batch_count) as total_batches,
        AVG(documents_processed) as avg_documents_per_operation,

        -- Performance metrics
        AVG(execution_time_ms) as avg_execution_time,
        PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY execution_time_ms) as median_execution_time,
        PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY execution_time_ms) as p95_execution_time,
        MAX(execution_time_ms) as max_execution_time,

        -- Throughput calculations
        AVG(throughput_ops_per_sec) as avg_throughput,
        MAX(throughput_ops_per_sec) as peak_throughput,
        SUM(documents_processed) / GREATEST(SUM(execution_time_ms) / 1000.0, 1) as overall_throughput,

        -- Error tracking
        COUNT(*) FILTER (WHERE success = false) as failed_operations,
        SUM(error_count) as total_errors,
        AVG(error_count) FILTER (WHERE error_count > 0) as avg_errors_per_failed_op,

        -- Resource utilization
        AVG(memory_usage_mb) as avg_memory_usage,
        MAX(memory_usage_mb) as peak_memory_usage,
        AVG(cpu_usage_percent) as avg_cpu_usage

    FROM bulk_operation_logs 
    WHERE operation_timestamp >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
    GROUP BY operation_type, collection_name, DATE_TRUNC('hour', operation_timestamp)
),
performance_analysis AS (
    SELECT *,
        -- Performance classification
        CASE 
            WHEN avg_throughput >= 10000 THEN 'excellent'
            WHEN avg_throughput >= 5000 THEN 'good'
            WHEN avg_throughput >= 1000 THEN 'acceptable'
            ELSE 'needs_optimization'
        END as throughput_rating,

        -- Reliability assessment
        CASE 
            WHEN failed_operations = 0 THEN 'perfect'
            WHEN failed_operations::numeric / operation_count < 0.01 THEN 'excellent'
            WHEN failed_operations::numeric / operation_count < 0.05 THEN 'good'
            ELSE 'needs_attention'
        END as reliability_rating,

        -- Resource efficiency
        CASE 
            WHEN avg_memory_usage <= 100 AND avg_cpu_usage <= 50 THEN 'efficient'
            WHEN avg_memory_usage <= 500 AND avg_cpu_usage <= 75 THEN 'moderate'
            ELSE 'resource_intensive'
        END as resource_efficiency,

        -- Performance trends
        LAG(avg_throughput, 1) OVER (
            PARTITION BY operation_type, collection_name 
            ORDER BY hour_bucket
        ) as prev_hour_throughput,

        LAG(p95_execution_time, 1) OVER (
            PARTITION BY operation_type, collection_name 
            ORDER BY hour_bucket
        ) as prev_hour_p95_latency

    FROM bulk_operation_metrics
)
SELECT 
    operation_type,
    collection_name,
    hour_bucket,

    -- Volume summary
    operation_count,
    total_documents,
    ROUND(avg_documents_per_operation::NUMERIC, 0) as avg_docs_per_op,

    -- Performance summary
    ROUND(avg_execution_time::NUMERIC, 0) as avg_time_ms,
    ROUND(p95_execution_time::NUMERIC, 0) as p95_time_ms,
    ROUND(avg_throughput::NUMERIC, 0) as avg_throughput,
    ROUND(overall_throughput::NUMERIC, 0) as measured_throughput,

    -- Quality indicators
    throughput_rating,
    reliability_rating,
    resource_efficiency,

    -- Error statistics
    failed_operations,
    total_errors,
    ROUND((failed_operations::numeric / operation_count * 100)::NUMERIC, 2) as error_rate_pct,

    -- Resource usage
    ROUND(avg_memory_usage::NUMERIC, 1) as avg_memory_mb,
    ROUND(avg_cpu_usage::NUMERIC, 1) as avg_cpu_pct,

    -- Performance trends
    CASE 
        WHEN prev_hour_throughput IS NOT NULL AND avg_throughput > prev_hour_throughput * 1.1 THEN 'improving'
        WHEN prev_hour_throughput IS NOT NULL AND avg_throughput < prev_hour_throughput * 0.9 THEN 'degrading'
        ELSE 'stable'
    END as throughput_trend,

    CASE 
        WHEN prev_hour_p95_latency IS NOT NULL AND p95_execution_time > prev_hour_p95_latency * 1.2 THEN 'latency_increasing'
        WHEN prev_hour_p95_latency IS NOT NULL AND p95_execution_time < prev_hour_p95_latency * 0.8 THEN 'latency_improving'
        ELSE 'latency_stable'
    END as latency_trend,

    -- Optimization recommendations
    CASE 
        WHEN throughput_rating = 'needs_optimization' AND resource_efficiency = 'efficient' THEN 'Increase batch size or parallelism'
        WHEN throughput_rating = 'needs_optimization' AND resource_efficiency = 'resource_intensive' THEN 'Optimize query patterns or reduce batch size'
        WHEN reliability_rating = 'needs_attention' THEN 'Review error handling and retry logic'
        WHEN resource_efficiency = 'resource_intensive' THEN 'Consider memory optimization or connection pooling'
        ELSE 'Performance within acceptable parameters'
    END as optimization_recommendation

FROM performance_analysis
WHERE hour_bucket >= CURRENT_TIMESTAMP - INTERVAL '12 hours'
ORDER BY hour_bucket DESC, total_documents DESC;

-- Adaptive batch size optimization query
WITH batch_performance_analysis AS (
    SELECT 
        operation_type,
        collection_name,
        batch_size,

        -- Performance metrics per batch size
        COUNT(*) as operation_count,
        AVG(execution_time_ms) as avg_execution_time,
        AVG(throughput_ops_per_sec) as avg_throughput,
        STDDEV(throughput_ops_per_sec) as throughput_stddev,

        -- Error rates
        COUNT(*) FILTER (WHERE success = false) as failed_ops,
        AVG(error_count) as avg_errors,

        -- Resource utilization
        AVG(memory_usage_mb) as avg_memory,
        AVG(cpu_usage_percent) as avg_cpu,

        -- Efficiency calculation
        AVG(throughput_ops_per_sec) / GREATEST(AVG(memory_usage_mb), 1) as memory_efficiency,
        AVG(throughput_ops_per_sec) / GREATEST(AVG(cpu_usage_percent), 1) as cpu_efficiency

    FROM bulk_operation_logs
    WHERE operation_timestamp >= CURRENT_TIMESTAMP - INTERVAL '7 days'
    GROUP BY operation_type, collection_name, batch_size
),
optimal_batch_analysis AS (
    SELECT *,
        -- Rank batch sizes by different criteria
        ROW_NUMBER() OVER (
            PARTITION BY operation_type, collection_name 
            ORDER BY avg_throughput DESC
        ) as throughput_rank,

        ROW_NUMBER() OVER (
            PARTITION BY operation_type, collection_name 
            ORDER BY avg_execution_time ASC
        ) as latency_rank,

        ROW_NUMBER() OVER (
            PARTITION BY operation_type, collection_name 
            ORDER BY memory_efficiency DESC
        ) as memory_efficiency_rank,

        ROW_NUMBER() OVER (
            PARTITION BY operation_type, collection_name 
            ORDER BY failed_ops ASC, avg_errors ASC
        ) as reliability_rank,

        -- Calculate composite score
        (
            -- Throughput weight: 40%
            (ROW_NUMBER() OVER (PARTITION BY operation_type, collection_name ORDER BY avg_throughput DESC) * 0.4) +
            -- Latency weight: 30%  
            (ROW_NUMBER() OVER (PARTITION BY operation_type, collection_name ORDER BY avg_execution_time ASC) * 0.3) +
            -- Reliability weight: 20%
            (ROW_NUMBER() OVER (PARTITION BY operation_type, collection_name ORDER BY failed_ops ASC) * 0.2) +
            -- Efficiency weight: 10%
            (ROW_NUMBER() OVER (PARTITION BY operation_type, collection_name ORDER BY memory_efficiency DESC) * 0.1)
        ) as composite_score

    FROM batch_performance_analysis
    WHERE operation_count >= 5 -- Minimum sample size for reliability
)
SELECT 
    operation_type,
    collection_name,
    batch_size,
    operation_count,

    -- Performance metrics
    ROUND(avg_execution_time::NUMERIC, 0) as avg_time_ms,
    ROUND(avg_throughput::NUMERIC, 0) as avg_throughput,
    ROUND(throughput_stddev::NUMERIC, 0) as throughput_std_dev,

    -- Rankings
    throughput_rank,
    latency_rank,
    reliability_rank,
    ROUND(composite_score::NUMERIC, 2) as composite_score,

    -- Recommendations
    CASE 
        WHEN ROW_NUMBER() OVER (PARTITION BY operation_type, collection_name ORDER BY composite_score ASC) = 1 
        THEN 'OPTIMAL - Recommended batch size'
        WHEN throughput_rank <= 2 THEN 'HIGH_THROUGHPUT - Consider for bulk operations'
        WHEN latency_rank <= 2 THEN 'LOW_LATENCY - Consider for real-time operations'  
        WHEN reliability_rank <= 2 THEN 'HIGH_RELIABILITY - Consider for critical operations'
        ELSE 'SUBOPTIMAL - Not recommended'
    END as recommendation,

    -- Resource usage
    ROUND(avg_memory::NUMERIC, 1) as avg_memory_mb,
    ROUND(avg_cpu::NUMERIC, 1) as avg_cpu_pct,

    -- Quality indicators
    failed_ops,
    ROUND((failed_ops::numeric / operation_count * 100)::NUMERIC, 2) as error_rate_pct,

    -- Next steps
    CASE 
        WHEN batch_size < 1000 AND throughput_rank > 3 THEN 'Try larger batch size (2000-5000)'
        WHEN batch_size > 10000 AND reliability_rank > 3 THEN 'Try smaller batch size (5000-8000)'
        WHEN throughput_stddev > avg_throughput * 0.5 THEN 'Inconsistent performance - review system load'
        ELSE 'Batch size appears well-tuned'
    END as tuning_suggestion

FROM optimal_batch_analysis
ORDER BY operation_type, collection_name, composite_score ASC;

-- QueryLeaf provides comprehensive bulk operation capabilities:
-- 1. High-performance bulk INSERT, UPDATE, DELETE, and UPSERT operations
-- 2. Advanced batch processing with configurable batch sizes and write concerns  
-- 3. Mixed operation support within single transactions
-- 4. Comprehensive error handling and partial failure recovery
-- 5. Real-time monitoring and performance analytics
-- 6. Adaptive batch size optimization based on performance metrics
-- 7. Resource usage tracking and efficiency analysis
-- 8. SQL-familiar syntax for complex bulk operations
-- 9. Integration with MongoDB's native bulk operation optimizations
-- 10. Production-ready patterns for high-volume data processing

Best Practices for MongoDB Bulk Operations

Performance Optimization Strategies

Essential techniques for maximizing bulk operation throughput:

  1. Batch Size Optimization: Start with 1,000-10,000 documents per batch and adjust based on document size and system resources (see the sketch after this list)
  2. Unordered Operations: Use unordered bulk operations when possible to allow parallel processing and partial failure handling
  3. Write Concern Tuning: Balance durability and performance by configuring appropriate write concerns for your use case
  4. Index Strategy: Ensure optimal indexes exist before bulk operations, but consider temporarily dropping non-essential indexes for large imports
  5. Connection Pooling: Configure adequate connection pools to handle concurrent bulk operations efficiently
  6. Memory Management: Monitor memory usage and adjust batch sizes to avoid memory pressure and garbage collection overhead
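
As a concrete illustration of the first three items, the following minimal sketch uses PyMongo (one of several possible drivers) to insert documents in fixed-size, unordered batches against a relaxed write concern. The connection string, database and collection names, batch size, and write concern values are illustrative assumptions, not settings prescribed by this article.

# Minimal PyMongo sketch: fixed-size, unordered batches with a relaxed write concern.
# Connection string, database/collection names, batch size, and write concern values
# below are illustrative assumptions.
from pymongo import MongoClient, InsertOne
from pymongo.write_concern import WriteConcern

client = MongoClient("mongodb://localhost:27017")
collection = client["app"].get_collection(
    "user_transactions",
    write_concern=WriteConcern(w=1, j=False)  # trade some durability for throughput
)

BATCH_SIZE = 5000  # starting point; tune against document size and memory headroom

def bulk_insert(documents):
    """Insert documents in unordered batches and return the total inserted count."""
    inserted = 0
    for start in range(0, len(documents), BATCH_SIZE):
        requests = [InsertOne(doc) for doc in documents[start:start + BATCH_SIZE]]
        # ordered=False lets the server continue past individual failures
        # and apply independent writes with more parallelism
        result = collection.bulk_write(requests, ordered=False)
        inserted += result.inserted_count
    return inserted

Unordered execution changes failure semantics: a bad document no longer aborts the rest of the batch, so error inspection (covered in the next list) becomes the primary recovery mechanism.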

Operational Excellence

Implement robust operational practices for production bulk processing:

  1. Error Handling: Design comprehensive error handling with retry logic for transient failures and dead letter queues for persistent errors (a retry sketch follows this list)
  2. Progress Monitoring: Implement detailed progress tracking and monitoring for long-running bulk operations
  3. Resource Monitoring: Monitor CPU, memory, and I/O usage during bulk operations to identify bottlenecks
  4. Graceful Degradation: Design fallback mechanisms and circuit breakers for bulk operation failures
  5. Testing at Scale: Test bulk operations with production-size datasets to validate performance and reliability
  6. Documentation: Maintain comprehensive documentation of bulk operation patterns, configurations, and troubleshooting procedures
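
The retry and dead-letter guidance above can be sketched roughly as follows, again using PyMongo as an assumed driver. The error-code set, backoff policy, and the bulk_dead_letter collection name are illustrative choices rather than fixed recommendations.

# Rough sketch: retry transient bulk-write failures with exponential backoff and
# route permanent failures to a dead-letter collection. Error codes, retry policy,
# and collection names are illustrative assumptions.
import time
from pymongo import MongoClient, UpdateOne
from pymongo.errors import AutoReconnect, BulkWriteError

db = MongoClient("mongodb://localhost:27017")["app"]
RETRYABLE_CODES = {91, 189, 11600, 11602}  # typical shutdown/stepdown error codes

def bulk_write_with_retry(requests, max_retries=3):
    attempt = 0
    while True:
        try:
            return db.user_transactions.bulk_write(requests, ordered=False)
        except BulkWriteError as exc:
            errors = (exc.details or {}).get("writeErrors", [])
            # Split failures into retryable and permanent buckets
            retryable = [requests[e["index"]] for e in errors if e.get("code") in RETRYABLE_CODES]
            permanent = [e for e in errors if e.get("code") not in RETRYABLE_CODES]
            if permanent:
                # Persist unrecoverable failures for later inspection
                db.bulk_dead_letter.insert_many([{"error": e, "ts": time.time()} for e in permanent])
            if not retryable or attempt >= max_retries:
                return exc.details
            requests, attempt = retryable, attempt + 1
            time.sleep(2 ** attempt)  # back off before retrying only the failed subset
        except AutoReconnect:
            if attempt >= max_retries:
                raise
            attempt += 1
            time.sleep(2 ** attempt)

# Hypothetical usage: mark a set of documents as processed
# ops = [UpdateOne({"_id": doc_id}, {"$set": {"processed": True}}) for doc_id in ids]
# bulk_write_with_retry(ops)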

Conclusion

MongoDB bulk operations deliver high-throughput data processing that far exceeds what traditional single-operation approaches can achieve. The combination of flexible batch processing, intelligent error handling, and comprehensive monitoring makes MongoDB an ideal platform for applications requiring efficient bulk data management.

Key bulk operation benefits include:

  • Dramatic Performance Improvements: 10x-100x faster processing compared to individual operations
  • Intelligent Batch Processing: Configurable batch sizes with automatic optimization and adaptive sizing
  • Robust Error Handling: Partial failure recovery and comprehensive error reporting
  • Flexible Operation Mixing: Support for mixed INSERT, UPDATE, DELETE, and UPSERT operations in single batches (see the sketch below)
  • Production-Ready Features: Built-in monitoring, retry mechanisms, and resource management
  • Scalable Architecture: Seamless scaling across replica sets and sharded clusters
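
As a brief illustration of the mixed-operation point above, a single unordered bulk write can combine insert, upsert, and delete requests in one round trip; the field names and filter values here are hypothetical.

# Sketch of mixing inserts, upserts, and deletes in one unordered batch.
# Field names and filter values are hypothetical.
from pymongo import MongoClient, InsertOne, UpdateOne, DeleteOne

collection = MongoClient("mongodb://localhost:27017")["app"]["user_transactions"]

mixed_ops = [
    InsertOne({"user_id": 1001, "amount": 25.00, "status": "new"}),
    UpdateOne({"user_id": 1002}, {"$inc": {"amount": 10.00}}, upsert=True),
    DeleteOne({"user_id": 1003, "status": "void"}),
]

result = collection.bulk_write(mixed_ops, ordered=False)
print(result.inserted_count, result.upserted_count, result.modified_count, result.deleted_count)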

Whether you're processing data migrations, real-time analytics ingestion, or high-volume transaction processing, MongoDB bulk operations with QueryLeaf's familiar SQL interface provide the foundation for efficient, scalable data processing solutions.

QueryLeaf Integration: QueryLeaf seamlessly manages MongoDB bulk operations while providing SQL-familiar batch processing syntax, performance optimization patterns, and comprehensive monitoring capabilities. Advanced bulk processing strategies including adaptive batch sizing, priority queues, and streaming operations are elegantly handled through familiar SQL constructs, making sophisticated high-volume data processing both powerful and accessible to SQL-oriented development teams.

The combination of MongoDB's native bulk operation capabilities and SQL-style batch processing provides an ideal platform for applications that need both high-throughput data processing and familiar database interaction patterns, ensuring your bulk processing solutions remain efficient and maintainable as they scale.