MongoDB Bulk Operations and Performance Optimization: High-Throughput Batch Processing with SQL-Style Bulk Operations

Modern applications frequently need to process large volumes of data efficiently, requiring sophisticated approaches to batch operations that can maintain high throughput while ensuring data consistency and optimal resource utilization. Traditional single-document operations become a significant bottleneck when dealing with thousands or millions of records, leading to performance degradation and resource exhaustion.

MongoDB's bulk operations provide powerful batch processing capabilities that can dramatically improve throughput for high-volume workloads. Where relational workflows often require application-side orchestration for batch processing, MongoDB's bulk API offers native support for optimized batch inserts, updates, deletes, and upserts, with built-in handling of partial failures and a single round trip per batch.

The Traditional High-Volume Data Processing Challenge

Conventional approaches to processing large datasets suffer from significant performance and scalability limitations:

-- Traditional PostgreSQL bulk processing - inefficient and resource-intensive

-- Standard approach using individual INSERT statements
DO $$
DECLARE
    record_data RECORD;
    batch_size INTEGER := 1000;
    processed_count INTEGER := 0;
    total_records INTEGER;
BEGIN
    -- Count total records to process
    SELECT COUNT(*) INTO total_records FROM staging_data;

    -- Process records one by one (inefficient)
    FOR record_data IN 
        SELECT id, user_id, transaction_amount, transaction_date, metadata 
        FROM staging_data 
        ORDER BY id
    LOOP
        -- Individual INSERT - causes overhead per operation
        INSERT INTO user_transactions (
            user_id,
            amount,
            transaction_date,
            metadata,
            processed_at
        ) VALUES (
            record_data.user_id,
            record_data.transaction_amount,
            record_data.transaction_date,
            record_data.metadata,
            CURRENT_TIMESTAMP
        );

        processed_count := processed_count + 1;

        -- Commit every batch_size records to avoid long transactions
        IF processed_count % batch_size = 0 THEN
            COMMIT;
            RAISE NOTICE 'Processed % of % records', processed_count, total_records;
        END IF;
    END LOOP;

    -- Final commit
    COMMIT;
    RAISE NOTICE 'Completed processing % total records', total_records;

-- NOTE: an EXCEPTION handler cannot be combined with COMMIT/ROLLBACK inside a
-- DO block, so per-batch error handling has to live in application code instead
END $$;

-- Problems with traditional single-record approach:
-- 1. Extremely high overhead - one network round-trip per operation
-- 2. Poor throughput - typically 1,000-5,000 operations/second maximum
-- 3. Resource exhaustion - excessive connection and memory usage
-- 4. Limited error handling - single failure can abort entire batch
-- 5. Lock contention - frequent commits cause index lock overhead
-- 6. No optimization for similar operations
-- 7. Difficult progress tracking and resume capability
-- 8. Poor CPU and I/O efficiency due to constant context switching

-- Attempt at bulk INSERT (better but still limited)
INSERT INTO user_transactions (user_id, amount, transaction_date, metadata, processed_at)
SELECT 
    user_id,
    transaction_amount,
    transaction_date,
    metadata,
    CURRENT_TIMESTAMP
FROM staging_data;

-- Issues with basic bulk INSERT:
-- - All-or-nothing behavior - single bad record fails entire batch
-- - No granular error reporting
-- - Limited to INSERT operations only
-- - Difficult to handle conflicts or duplicates
-- - No support for conditional operations
-- - Memory constraints with very large datasets
-- - Poor performance with complex transformations

-- MySQL limitations (even more restrictive)
INSERT INTO user_transactions (user_id, amount, transaction_date, metadata)
VALUES 
    (1001, 150.00, '2024-01-15', '{"category": "purchase"}'),
    (1002, 75.50, '2024-01-15', '{"category": "refund"}'),
    (1003, 200.00, '2024-01-15', '{"category": "purchase"}');
    -- Repeat for potentially millions of records...

-- MySQL bulk processing problems:
-- - Maximum query size limitations
-- - Poor error handling for mixed success/failure scenarios  
-- - Limited transaction size support
-- - Upserts limited to INSERT ... ON DUPLICATE KEY UPDATE semantics
-- - Difficult conflict resolution
-- - Poor performance with large batch sizes
-- - Memory exhaustion with complex operations

MongoDB provides comprehensive bulk operation capabilities. At the driver level the entry point is bulkWrite(), which accepts a mixed array of insert, update, and delete operations and executes them in a single round trip; the full processing framework further below builds on the same primitive.
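
A minimal sketch of bulkWrite() usage follows; the database, collection, and field names are illustrative rather than taken from the later examples.

// Minimal bulkWrite() sketch - database, collection, and field names are illustrative
const { MongoClient } = require('mongodb');

async function quickBulkWriteExample() {
  const client = new MongoClient('mongodb://localhost:27017');
  try {
    await client.connect();
    const orders = client.db('app').collection('orders');

    // One round trip carrying inserts, an update, and a delete
    const result = await orders.bulkWrite([
      { insertOne: { document: { orderId: 1001, status: 'new', total: 42.5 } } },
      { insertOne: { document: { orderId: 1002, status: 'new', total: 18.0 } } },
      { updateOne: { filter: { orderId: 1001 }, update: { $set: { status: 'paid' } } } },
      { deleteOne: { filter: { status: 'cancelled' } } }
    ], { ordered: false }); // unordered: keep going past individual failures

    console.log(result.insertedCount, result.modifiedCount, result.deletedCount);
  } finally {
    await client.close();
  }
}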

// MongoDB Bulk Operations - high-performance batch processing
const { MongoClient, ObjectId } = require('mongodb');

const client = new MongoClient('mongodb://localhost:27017');
const db = client.db('high_performance_db');

// Advanced bulk operations system for high-throughput data processing
class HighThroughputBulkProcessor {
  constructor(db, config = {}) {
    this.db = db;
    this.config = {
      batchSize: config.batchSize || 10000,
      maxParallelBatches: config.maxParallelBatches || 5,
      retryAttempts: config.retryAttempts || 3,
      retryDelay: config.retryDelay || 1000,
      enableMetrics: config.enableMetrics !== false,
      ...config
    };

    this.metrics = {
      totalOperations: 0,
      successfulOperations: 0,
      failedOperations: 0,
      batchesProcessed: 0,
      processingTime: 0,
      throughputHistory: []
    };

    this.collections = new Map();
    this.activeOperations = new Set();
  }

  async setupOptimizedCollections() {
    console.log('Setting up collections for high-performance bulk operations...');

    const collectionConfigs = [
      {
        name: 'user_transactions',
        indexes: [
          { key: { userId: 1, transactionDate: -1 } },
          { key: { transactionType: 1, status: 1 } },
          { key: { amount: 1 } },
          { key: { createdAt: -1 } }
        ],
        options: { 
          writeConcern: { w: 'majority', j: true },
          readConcern: { level: 'majority' }
        }
      },
      {
        name: 'user_profiles',
        indexes: [
          { key: { email: 1 }, unique: true },
          { key: { userId: 1 }, unique: true },
          { key: { lastLoginDate: -1 } },
          { key: { 'preferences.category': 1 } }
        ]
      },
      {
        name: 'product_analytics',
        indexes: [
          { key: { productId: 1, eventDate: -1 } },
          { key: { eventType: 1, processed: 1 } },
          { key: { userId: 1, eventDate: -1 } }
        ]
      },
      {
        name: 'audit_logs',
        indexes: [
          { key: { entityId: 1, timestamp: -1 } },
          { key: { action: 1, timestamp: -1 } },
          { key: { userId: 1, timestamp: -1 } }
        ]
      }
    ];

    for (const config of collectionConfigs) {
      // Apply any per-collection write/read concern options defined above
      const collection = this.db.collection(config.name, config.options || {});
      this.collections.set(config.name, collection);

      // Create indexes for optimal bulk operation performance
      for (const index of config.indexes) {
        try {
          await collection.createIndex(index.key, index.unique ? { unique: true } : {});
          console.log(`Created index on ${config.name}:`, index.key);
        } catch (error) {
          console.warn(`Index creation warning for ${config.name}:`, error.message);
        }
      }
    }

    console.log(`Initialized ${collectionConfigs.length} collections for bulk operations`);
    return this.collections;
  }

  async bulkInsertOptimized(collectionName, documents, options = {}) {
    console.log(`Starting bulk insert for ${documents.length} documents in ${collectionName}...`);
    const startTime = Date.now();

    const collection = this.collections.get(collectionName);
    if (!collection) {
      throw new Error(`Collection ${collectionName} not found. Initialize collections first.`);
    }

    const {
      batchSize = this.config.batchSize,
      ordered = false,
      writeConcern = { w: 'majority', j: true },
      enableValidation = true,
      enableProgressReporting = true
    } = options;

    let totalInserted = 0;
    let totalErrors = 0;
    const results = [];
    const errors = [];

    // Process documents in optimized batches
    for (let i = 0; i < documents.length; i += batchSize) {
      const batch = documents.slice(i, i + batchSize);
      const batchNumber = Math.floor(i / batchSize) + 1;

      try {
        // Validate documents if enabled
        if (enableValidation) {
          await this.validateDocumentBatch(batch, collectionName);
        }

        // Execute bulk insert with optimized settings
        const result = await collection.insertMany(batch, {
          ordered: ordered,
          writeConcern: writeConcern,
          bypassDocumentValidation: !enableValidation
        });

        totalInserted += result.insertedCount;
        results.push({
          batchNumber,
          insertedCount: result.insertedCount,
          insertedIds: result.insertedIds
        });

        // Progress reporting
        if (enableProgressReporting) {
          const progress = ((i + batch.length) / documents.length * 100).toFixed(1);
          const throughput = Math.round(totalInserted / ((Date.now() - startTime) / 1000));
          console.log(`Batch ${batchNumber}: ${result.insertedCount} inserted, ${progress}% complete, ${throughput} docs/sec`);
        }

      } catch (error) {
        totalErrors++;
        console.error(`Batch ${batchNumber} failed:`, error.message);

        // Handle partial failures in unordered mode
        if (!ordered && error.result) {
          const partialInserted = error.result.insertedCount ?? error.result.nInserted ?? 0;
          totalInserted += partialInserted;

          errors.push({
            batchNumber,
            error: error.message,
            insertedCount: partialInserted,
            failedOperations: error.writeErrors || []
          });
        } else {
          errors.push({
            batchNumber,
            error: error.message,
            insertedCount: 0
          });
        }
      }
    }

    const totalTime = Date.now() - startTime;
    const throughput = Math.round(totalInserted / (totalTime / 1000));

    // Update metrics
    this.updateMetrics('insert', totalInserted, totalErrors, totalTime);

    const summary = {
      success: totalErrors === 0,
      totalDocuments: documents.length,
      totalInserted: totalInserted,
      totalBatches: Math.ceil(documents.length / batchSize),
      failedBatches: totalErrors,
      executionTime: totalTime,
      throughput: throughput,
      results: results,
      errors: errors.length > 0 ? errors : undefined
    };

    console.log(`Bulk insert completed: ${totalInserted}/${documents.length} documents in ${totalTime}ms (${throughput} docs/sec)`);
    return summary;
  }

  async bulkUpdateOptimized(collectionName, updateOperations, options = {}) {
    console.log(`Starting bulk update for ${updateOperations.length} operations in ${collectionName}...`);
    const startTime = Date.now();

    const collection = this.collections.get(collectionName);
    if (!collection) {
      throw new Error(`Collection ${collectionName} not found`);
    }

    const {
      batchSize = this.config.batchSize,
      ordered = false,
      writeConcern = { w: 'majority', j: true }
    } = options;

    let totalModified = 0;
    let totalMatched = 0;
    let totalUpserted = 0;
    const results = [];
    const errors = [];

    // Process operations in batches
    for (let i = 0; i < updateOperations.length; i += batchSize) {
      const batch = updateOperations.slice(i, i + batchSize);
      const batchNumber = Math.floor(i / batchSize) + 1;

      try {
        // Build bulk update operation
        const bulkOp = collection.initializeUnorderedBulkOp();

        for (const operation of batch) {
          const { filter, update, options: opOptions = {} } = operation;

          if (opOptions.upsert) {
            bulkOp.find(filter).upsert().updateOne(update);
          } else if (operation.type === 'updateMany') {
            bulkOp.find(filter).update(update);
          } else {
            bulkOp.find(filter).updateOne(update);
          }
        }

        // Execute bulk operation
        const result = await bulkOp.execute({ writeConcern });

        totalModified += result.modifiedCount || 0;
        totalMatched += result.matchedCount || 0;  
        totalUpserted += result.upsertedCount || 0;

        results.push({
          batchNumber,
          matchedCount: result.matchedCount,
          modifiedCount: result.modifiedCount,
          upsertedCount: result.upsertedCount,
          upsertedIds: result.upsertedIds
        });

        const progress = ((i + batch.length) / updateOperations.length * 100).toFixed(1);
        console.log(`Update batch ${batchNumber}: ${result.modifiedCount} modified, ${progress}% complete`);

      } catch (error) {
        console.error(`Update batch ${batchNumber} failed:`, error.message);

        // Handle partial results from bulk write errors
        if (error.result) {
          totalModified += error.result.modifiedCount ?? error.result.nModified ?? 0;
          totalMatched += error.result.matchedCount ?? error.result.nMatched ?? 0;
          totalUpserted += error.result.upsertedCount ?? error.result.nUpserted ?? 0;
        }

        errors.push({
          batchNumber,
          error: error.message,
          writeErrors: error.writeErrors || []
        });
      }
    }

    const totalTime = Date.now() - startTime;
    const throughput = Math.round(totalModified / (totalTime / 1000));

    this.updateMetrics('update', totalModified, errors.length, totalTime);

    return {
      success: errors.length === 0,
      totalOperations: updateOperations.length,
      totalMatched: totalMatched,
      totalModified: totalModified, 
      totalUpserted: totalUpserted,
      executionTime: totalTime,
      throughput: throughput,
      results: results,
      errors: errors.length > 0 ? errors : undefined
    };
  }

  async bulkUpsertOptimized(collectionName, upsertOperations, options = {}) {
    console.log(`Starting bulk upsert for ${upsertOperations.length} operations in ${collectionName}...`);

    const {
      batchSize = this.config.batchSize,
      enableDeduplication = true
    } = options;

    // Deduplicate operations based on filter if enabled
    let processedOperations = upsertOperations;
    if (enableDeduplication) {
      processedOperations = this.deduplicateUpsertOperations(upsertOperations);
      console.log(`Deduplicated ${upsertOperations.length} operations to ${processedOperations.length}`);
    }

    // Convert upsert operations to bulk update operations
    const updateOperations = processedOperations.map(op => ({
      filter: op.filter,
      update: op.update,
      options: { upsert: true }
    }));

    return await this.bulkUpdateOptimized(collectionName, updateOperations, {
      ...options,
      batchSize
    });
  }

  async bulkDeleteOptimized(collectionName, deleteFilters, options = {}) {
    console.log(`Starting bulk delete for ${deleteFilters.length} operations in ${collectionName}...`);
    const startTime = Date.now();

    const collection = this.collections.get(collectionName);
    if (!collection) {
      throw new Error(`Collection ${collectionName} not found`);
    }

    const {
      batchSize = this.config.batchSize,
      deleteMany = false,
      writeConcern = { w: 'majority', j: true }
    } = options;

    let totalDeleted = 0;
    const results = [];
    const errors = [];

    for (let i = 0; i < deleteFilters.length; i += batchSize) {
      const batch = deleteFilters.slice(i, i + batchSize);
      const batchNumber = Math.floor(i / batchSize) + 1;

      try {
        const bulkOp = collection.initializeUnorderedBulkOp();

        for (const filter of batch) {
          if (deleteMany) {
            bulkOp.find(filter).delete();
          } else {
            bulkOp.find(filter).deleteOne();
          }
        }

        const result = await bulkOp.execute({ writeConcern });
        const deletedCount = result.deletedCount || 0;

        totalDeleted += deletedCount;
        results.push({
          batchNumber,
          deletedCount
        });

        const progress = ((i + batch.length) / deleteFilters.length * 100).toFixed(1);
        console.log(`Delete batch ${batchNumber}: ${deletedCount} deleted, ${progress}% complete`);

      } catch (error) {
        console.error(`Delete batch ${batchNumber} failed:`, error.message);
        errors.push({
          batchNumber,
          error: error.message
        });
      }
    }

    const totalTime = Date.now() - startTime;
    const throughput = Math.round(totalDeleted / (totalTime / 1000));

    this.updateMetrics('delete', totalDeleted, errors.length, totalTime);

    return {
      success: errors.length === 0,
      totalOperations: deleteFilters.length,
      totalDeleted: totalDeleted,
      executionTime: totalTime,
      throughput: throughput,
      results: results,
      errors: errors.length > 0 ? errors : undefined
    };
  }

  async mixedBulkOperations(collectionName, operations, options = {}) {
    console.log(`Starting mixed bulk operations: ${operations.length} operations in ${collectionName}...`);
    const startTime = Date.now();

    const collection = this.collections.get(collectionName);
    if (!collection) {
      throw new Error(`Collection ${collectionName} not found`);
    }

    const {
      batchSize = this.config.batchSize,
      writeConcern = { w: 'majority', j: true }
    } = options;

    let totalInserted = 0;
    let totalModified = 0;
    let totalDeleted = 0;
    let totalUpserted = 0;
    const results = [];
    const errors = [];

    for (let i = 0; i < operations.length; i += batchSize) {
      const batch = operations.slice(i, i + batchSize);
      const batchNumber = Math.floor(i / batchSize) + 1;

      try {
        const bulkOp = collection.initializeUnorderedBulkOp();

        for (const operation of batch) {
          switch (operation.type) {
            case 'insert':
              bulkOp.insert(operation.document);
              break;

            case 'update':
              bulkOp.find(operation.filter).updateOne(operation.update);
              break;

            case 'updateMany':
              bulkOp.find(operation.filter).update(operation.update);
              break;

            case 'upsert':
              bulkOp.find(operation.filter).upsert().updateOne(operation.update);
              break;

            case 'delete':
              bulkOp.find(operation.filter).deleteOne();
              break;

            case 'deleteMany':
              bulkOp.find(operation.filter).delete();
              break;

            case 'replace':
              bulkOp.find(operation.filter).replaceOne(operation.replacement);
              break;

            default:
              throw new Error(`Unknown operation type: ${operation.type}`);
          }
        }

        const result = await bulkOp.execute({ writeConcern });

        totalInserted += result.insertedCount || 0;
        totalModified += result.modifiedCount || 0;
        totalDeleted += result.deletedCount || 0;
        totalUpserted += result.upsertedCount || 0;

        results.push({
          batchNumber,
          insertedCount: result.insertedCount,
          modifiedCount: result.modifiedCount,
          deletedCount: result.deletedCount,
          upsertedCount: result.upsertedCount,
          matchedCount: result.matchedCount
        });

        const progress = ((i + batch.length) / operations.length * 100).toFixed(1);
        console.log(`Mixed batch ${batchNumber}: ${batch.length} operations completed, ${progress}% complete`);

      } catch (error) {
        console.error(`Mixed batch ${batchNumber} failed:`, error.message);
        errors.push({
          batchNumber,
          error: error.message,
          writeErrors: error.writeErrors || []
        });
      }
    }

    const totalTime = Date.now() - startTime;
    const totalOperations = totalInserted + totalModified + totalDeleted + totalUpserted;
    const throughput = Math.round(totalOperations / (totalTime / 1000));

    this.updateMetrics('mixed', totalOperations, errors.length, totalTime);

    return {
      success: errors.length === 0,
      totalOperations: operations.length,
      executionSummary: {
        inserted: totalInserted,
        modified: totalModified,
        deleted: totalDeleted,
        upserted: totalUpserted
      },
      executionTime: totalTime,
      throughput: throughput,
      results: results,
      errors: errors.length > 0 ? errors : undefined
    };
  }

  async parallelBulkProcessing(collectionName, operationBatches, operationType = 'insert') {
    console.log(`Starting parallel bulk processing: ${operationBatches.length} batches in ${collectionName}...`);
    const startTime = Date.now();

    const maxParallel = Math.min(this.config.maxParallelBatches, operationBatches.length);
    const results = [];
    const errors = [];

    // Process batches in parallel with controlled concurrency
    const processBatch = async (batch, batchIndex) => {
      try {
        let result;
        switch (operationType) {
          case 'insert':
            result = await this.bulkInsertOptimized(collectionName, batch, {
              enableProgressReporting: false
            });
            break;
          case 'update':
            result = await this.bulkUpdateOptimized(collectionName, batch, {});
            break;
          case 'delete':
            result = await this.bulkDeleteOptimized(collectionName, batch, {});
            break;
          default:
            throw new Error(`Unsupported parallel operation type: ${operationType}`);
        }

        return { batchIndex, result, success: true };
      } catch (error) {
        console.error(`Parallel batch ${batchIndex} failed:`, error.message);
        return { batchIndex, error: error.message, success: false };
      }
    };

    // Execute batches with controlled parallelism
    for (let i = 0; i < operationBatches.length; i += maxParallel) {
      const parallelBatch = operationBatches.slice(i, i + maxParallel);
      const promises = parallelBatch.map((batch, index) => processBatch(batch, i + index));

      const batchResults = await Promise.all(promises);

      for (const batchResult of batchResults) {
        if (batchResult.success) {
          results.push(batchResult.result);
        } else {
          errors.push(batchResult);
        }
      }

      const progress = ((i + parallelBatch.length) / operationBatches.length * 100).toFixed(1);
      console.log(`Parallel processing: ${progress}% complete`);
    }

    const totalTime = Date.now() - startTime;

    // Aggregate results
    const aggregatedResult = {
      success: errors.length === 0,
      totalBatches: operationBatches.length,
      successfulBatches: results.length,
      failedBatches: errors.length,
      executionTime: totalTime,
      results: results,
      errors: errors.length > 0 ? errors : undefined
    };

    // Calculate total operations processed
    let totalOperations = 0;
    for (const result of results) {
      totalOperations += result.totalInserted || result.totalModified || result.totalDeleted || 0;
    }

    aggregatedResult.totalOperations = totalOperations;
    aggregatedResult.throughput = Math.round(totalOperations / (totalTime / 1000));

    console.log(`Parallel bulk processing completed: ${totalOperations} operations in ${totalTime}ms (${aggregatedResult.throughput} ops/sec)`);
    return aggregatedResult;
  }

  async validateDocumentBatch(documents, collectionName) {
    // Basic document validation
    const requiredFields = this.getRequiredFields(collectionName);

    for (const doc of documents) {
      for (const field of requiredFields) {
        if (doc[field] === undefined || doc[field] === null) {
          throw new Error(`Required field '${field}' missing in document`);
        }
      }
    }

    return true;
  }

  getRequiredFields(collectionName) {
    const fieldMap = {
      'user_transactions': ['userId', 'amount', 'transactionType'],
      'user_profiles': ['userId', 'email'],
      'product_analytics': ['productId', 'eventType', 'eventDate'],
      'audit_logs': ['entityId', 'action', 'timestamp']
    };

    return fieldMap[collectionName] || [];
  }

  deduplicateUpsertOperations(operations) {
    const seen = new Map();
    const deduplicated = [];

    for (const operation of operations) {
      const filterKey = JSON.stringify(operation.filter);
      if (!seen.has(filterKey)) {
        seen.set(filterKey, true);
        deduplicated.push(operation);
      }
    }

    return deduplicated;
  }

  updateMetrics(operationType, successCount, errorCount, executionTime) {
    if (!this.config.enableMetrics) return;

    this.metrics.totalOperations += successCount + errorCount;
    this.metrics.successfulOperations += successCount;
    this.metrics.failedOperations += errorCount;
    this.metrics.batchesProcessed++;
    this.metrics.processingTime += executionTime;

    const throughput = Math.round(successCount / (executionTime / 1000));
    this.metrics.throughputHistory.push({
      timestamp: new Date(),
      operationType,
      throughput,
      successCount,
      errorCount
    });

    // Keep only last 100 throughput measurements
    if (this.metrics.throughputHistory.length > 100) {
      this.metrics.throughputHistory.shift();
    }
  }

  getPerformanceMetrics() {
    const recentThroughput = this.metrics.throughputHistory.slice(-10);
    const avgThroughput = recentThroughput.length > 0 
      ? Math.round(recentThroughput.reduce((sum, t) => sum + t.throughput, 0) / recentThroughput.length)
      : 0;

    return {
      totalOperations: this.metrics.totalOperations,
      successfulOperations: this.metrics.successfulOperations,
      failedOperations: this.metrics.failedOperations,
      successRate: this.metrics.totalOperations > 0 
        ? ((this.metrics.successfulOperations / this.metrics.totalOperations) * 100).toFixed(2) + '%'
        : '0%',
      batchesProcessed: this.metrics.batchesProcessed,
      totalProcessingTime: this.metrics.processingTime,
      averageThroughput: avgThroughput,
      recentThroughputHistory: recentThroughput
    };
  }

  async shutdown() {
    console.log('Shutting down bulk processor...');

    // Wait for active operations to complete
    if (this.activeOperations.size > 0) {
      console.log(`Waiting for ${this.activeOperations.size} active operations to complete...`);
      await Promise.allSettled(Array.from(this.activeOperations));
    }

    console.log('Bulk processor shutdown complete');
    console.log('Final Performance Metrics:', this.getPerformanceMetrics());
  }
}

// Example usage and demonstration
const demonstrateBulkOperations = async () => {
  try {
    const processor = new HighThroughputBulkProcessor(db, {
      batchSize: 5000,
      maxParallelBatches: 3,
      enableMetrics: true
    });

    // Initialize collections
    await processor.setupOptimizedCollections();

    // Generate sample data for demonstration
    const sampleTransactions = Array.from({ length: 50000 }, (_, index) => ({
      _id: new ObjectId(),
      userId: `user_${Math.floor(Math.random() * 10000)}`,
      amount: Math.round((Math.random() * 1000 + 10) * 100) / 100,
      transactionType: ['purchase', 'refund', 'transfer'][Math.floor(Math.random() * 3)],
      transactionDate: new Date(Date.now() - Math.random() * 30 * 24 * 60 * 60 * 1000),
      metadata: {
        category: ['electronics', 'clothing', 'books', 'food'][Math.floor(Math.random() * 4)],
        channel: ['web', 'mobile', 'api'][Math.floor(Math.random() * 3)]
      },
      createdAt: new Date()
    }));

    // Bulk insert demonstration
    console.log('\n=== Bulk Insert Demonstration ===');
    const insertResult = await processor.bulkInsertOptimized('user_transactions', sampleTransactions);
    console.log('Insert Result:', insertResult);

    // Bulk update demonstration
    console.log('\n=== Bulk Update Demonstration ===');
    const updateOperations = Array.from({ length: 10000 }, (_, index) => ({
      filter: { userId: `user_${index % 1000}` },
      update: { 
        $inc: { totalSpent: Math.round(Math.random() * 100) },
        $set: { lastUpdated: new Date() }
      },
      type: 'updateMany'
    }));

    const updateResult = await processor.bulkUpdateOptimized('user_transactions', updateOperations);
    console.log('Update Result:', updateResult);

    // Mixed operations demonstration
    console.log('\n=== Mixed Operations Demonstration ===');
    const mixedOperations = [
      // Insert operations
      ...Array.from({ length: 1000 }, (_, index) => ({
        type: 'insert',
        document: {
          userId: `new_user_${index}`,
          email: `newuser${index}@example.com`,
          createdAt: new Date()
        }
      })),

      // Update operations
      ...Array.from({ length: 500 }, (_, index) => ({
        type: 'update',
        filter: { userId: `user_${index}` },
        update: { $set: { status: 'active', lastLogin: new Date() } }
      })),

      // Upsert operations
      ...Array.from({ length: 300 }, (_, index) => ({
        type: 'upsert',
        filter: { email: `upsert${index}@example.com` },
        update: { 
          $set: { 
            email: `upsert${index}@example.com`,
            status: 'new'
          },
          $setOnInsert: { createdAt: new Date() }
        }
      }))
    ];

    const mixedResult = await processor.mixedBulkOperations('user_profiles', mixedOperations);
    console.log('Mixed Operations Result:', mixedResult);

    // Performance metrics
    console.log('\n=== Performance Metrics ===');
    console.log(processor.getPerformanceMetrics());

    return processor;

  } catch (error) {
    console.error('Bulk operations demonstration failed:', error);
    throw error;
  }
};

// Benefits of MongoDB Bulk Operations:
// - Dramatically improved throughput (10x-100x faster than individual operations)
// - Reduced network overhead with batch processing
// - Built-in error handling for partial failures
// - Flexible operation mixing (insert, update, delete in same batch)
// - Automatic optimization and connection pooling
// - Support for ordered and unordered operations (contrasted in the sketch below)
// - Native upsert capabilities with conflict resolution
// - Comprehensive result reporting and metrics
// - Memory-efficient processing of large datasets
// - Integration with MongoDB's write concerns and read preferences
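
// The ordered/unordered distinction above is easiest to see with a concrete
// contrast. A brief sketch (the collection handle and duplicate-_id data are
// illustrative): the same batch behaves differently under the two modes.
const demonstrateOrderedVsUnordered = async (collection) => {
  const docs = [
    { _id: 1, value: 'a' },
    { _id: 1, value: 'duplicate key - this write fails' },
    { _id: 2, value: 'b' }
  ];

  try {
    // ordered: true stops at the first error, so _id 2 is never attempted
    await collection.insertMany(docs, { ordered: true });
  } catch (error) {
    console.log('Ordered run inserted:', error.result?.insertedCount ?? 0); // typically 1
  }

  await collection.deleteMany({});

  try {
    // ordered: false continues past the failure, so _id 2 is still inserted
    await collection.insertMany(docs, { ordered: false });
  } catch (error) {
    console.log('Unordered run inserted:', error.result?.insertedCount ?? 0); // typically 2
  }
};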

module.exports = {
  HighThroughputBulkProcessor,
  demonstrateBulkOperations
};
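
To tie the pieces together, here is a minimal runner sketch. It assumes it lives in the same file as the client and demonstration function defined above; the explicit connect() call is optional on recent driver versions but makes the connection lifecycle clearer.

// Minimal runner sketch - assumes the client and demonstrateBulkOperations() defined above
const runBulkDemo = async () => {
  try {
    await client.connect(); // explicit connect; recent drivers also connect lazily
    const processor = await demonstrateBulkOperations();
    await processor.shutdown(); // waits for in-flight work and logs final metrics
  } catch (error) {
    console.error('Bulk demo run failed:', error);
  } finally {
    await client.close();
  }
};

// runBulkDemo();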

Understanding MongoDB Bulk Operation Performance Patterns

Advanced Bulk Processing Strategies

Implement sophisticated bulk processing patterns for different high-volume scenarios:

// Advanced bulk processing patterns and optimization strategies
class ProductionBulkProcessor extends HighThroughputBulkProcessor {
  constructor(db, config = {}) {
    super(db, config);
    this.processingQueue = [];
    this.workerPool = [];
    this.compressionEnabled = config.compressionEnabled || false;
    this.retryQueue = [];
    this.deadLetterQueue = [];
  }

  async setupStreamingBulkProcessor() {
    console.log('Setting up streaming bulk processor for continuous data ingestion...');

    const streamProcessor = {
      bufferSize: this.config.streamBufferSize || 1000,
      flushInterval: this.config.streamFlushInterval || 5000, // 5 seconds
      buffer: [],
      lastFlush: Date.now(),
      totalProcessed: 0
    };

    // Streaming insert processor
    const streamingInsert = async (documents, collectionName) => {
      streamProcessor.buffer.push(...documents);

      const shouldFlush = streamProcessor.buffer.length >= streamProcessor.bufferSize ||
                         (Date.now() - streamProcessor.lastFlush) >= streamProcessor.flushInterval;

      if (shouldFlush && streamProcessor.buffer.length > 0) {
        const toProcess = [...streamProcessor.buffer];
        streamProcessor.buffer = [];
        streamProcessor.lastFlush = Date.now();

        try {
          const result = await this.bulkInsertOptimized(collectionName, toProcess, {
            enableProgressReporting: false
          });

          streamProcessor.totalProcessed += result.totalInserted;
          console.log(`Streamed ${result.totalInserted} documents, total: ${streamProcessor.totalProcessed}`);

          return result;
        } catch (error) {
          console.error('Streaming insert error:', error);
          // Add failed documents to retry queue
          this.retryQueue.push(...toProcess);
        }
      }
    };

    // Automatic flushing interval
    const flushInterval = setInterval(async () => {
      if (streamProcessor.buffer.length > 0) {
        await streamingInsert([], 'user_transactions'); // Flush buffer
      }
    }, streamProcessor.flushInterval);

    return {
      streamingInsert,
      getStats: () => ({
        bufferSize: streamProcessor.buffer.length,
        totalProcessed: streamProcessor.totalProcessed,
        lastFlush: streamProcessor.lastFlush
      }),
      shutdown: () => {
        clearInterval(flushInterval);
        // Force the final flush regardless of buffer size or elapsed time
        streamProcessor.lastFlush = 0;
        return streamingInsert([], 'user_transactions'); // Final flush
      }
    };
  }

  async setupPriorityQueueProcessor() {
    console.log('Setting up priority queue bulk processor...');

    const priorityLevels = {
      CRITICAL: 1,
      HIGH: 2, 
      NORMAL: 3,
      LOW: 4
    };

    const priorityQueues = new Map();
    Object.values(priorityLevels).forEach(level => {
      priorityQueues.set(level, []);
    });

    const processQueue = async () => {
      // Process queues by priority
      for (const [priority, queue] of priorityQueues.entries()) {
        if (queue.length === 0) continue;

        const batchSize = this.calculateDynamicBatchSize(priority, queue.length);
        const batch = queue.splice(0, batchSize);

        if (batch.length > 0) {
          try {
            await this.processPriorityBatch(batch, priority);
          } catch (error) {
            console.error(`Priority ${priority} batch failed:`, error);
            // Re-queue with lower priority or move to retry queue
            this.handlePriorityFailure(batch, priority);
          }
        }
      }
    };

    // Start priority processor
    const processorInterval = setInterval(processQueue, 1000);

    return {
      addToPriorityQueue: (operations, priority = priorityLevels.NORMAL) => {
        if (!priorityQueues.has(priority)) {
          throw new Error(`Invalid priority level: ${priority}`);
        }
        priorityQueues.get(priority).push(...operations);
      },
      getQueueStats: () => {
        const stats = {};
        for (const [priority, queue] of priorityQueues.entries()) {
          stats[`priority_${priority}`] = queue.length;
        }
        return stats;
      },
      shutdown: () => clearInterval(processorInterval)
    };
  }

  calculateDynamicBatchSize(priority, queueLength) {
    // Adjust batch size based on priority and queue length
    const baseBatchSize = this.config.batchSize;

    switch (priority) {
      case 1: // CRITICAL - smaller batches for faster processing
        return Math.min(baseBatchSize / 2, queueLength);
      case 2: // HIGH - normal batch size
        return Math.min(baseBatchSize, queueLength);
      case 3: // NORMAL - larger batches for efficiency
        return Math.min(baseBatchSize * 1.5, queueLength);
      case 4: // LOW - maximum batch size for throughput
        return Math.min(baseBatchSize * 2, queueLength);
      default:
        return Math.min(baseBatchSize, queueLength);
    }
  }
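
  // The priority-queue processor above calls processPriorityBatch() and
  // handlePriorityFailure(), which are otherwise undefined. The two methods
  // below are minimal sketches (assumed implementations) so the class runs
  // end to end; the default target collection is an assumption.
  async processPriorityBatch(batch, priority, collectionName = 'user_transactions') {
    // Assumes priority batches hold mixed-operation descriptors
    // ({ type, document | filter, update }) as expected by mixedBulkOperations()
    console.log(`Processing priority ${priority} batch of ${batch.length} operations`);
    return await this.mixedBulkOperations(collectionName, batch, {
      batchSize: this.calculateDynamicBatchSize(priority, batch.length)
    });
  }

  handlePriorityFailure(batch, priority) {
    // Minimal strategy: park the failed operations in the retry queue for later
    // reprocessing; a production system might demote priority or dead-letter them
    this.retryQueue.push(...batch.map(operation => ({ operation, priority, failedAt: new Date() })));
    console.warn(`Re-queued ${batch.length} priority ${priority} operations for retry`);
  }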

  async setupAdaptiveBatchSizing() {
    console.log('Setting up adaptive batch sizing system...');

    const adaptiveConfig = {
      minBatchSize: 100,
      maxBatchSize: 20000,
      targetLatency: 2000, // 2 seconds
      adjustmentFactor: 0.1,
      performanceHistory: []
    };

    const adjustBatchSize = (currentSize, latency, throughput) => {
      let newSize = currentSize;

      if (latency > adaptiveConfig.targetLatency) {
        // Latency too high - reduce batch size
        newSize = Math.max(
          adaptiveConfig.minBatchSize,
          Math.floor(currentSize * (1 - adaptiveConfig.adjustmentFactor))
        );
      } else if (latency < adaptiveConfig.targetLatency * 0.5) {
        // Latency good - try increasing batch size for better throughput
        newSize = Math.min(
          adaptiveConfig.maxBatchSize,
          Math.floor(currentSize * (1 + adaptiveConfig.adjustmentFactor))
        );
      }

      adaptiveConfig.performanceHistory.push({
        timestamp: new Date(),
        batchSize: currentSize,
        latency,
        throughput,
        newBatchSize: newSize
      });

      // Keep only last 50 measurements
      if (adaptiveConfig.performanceHistory.length > 50) {
        adaptiveConfig.performanceHistory.shift();
      }

      return newSize;
    };

    return {
      getOptimalBatchSize: (operationType, currentLatency, currentThroughput) => {
        return adjustBatchSize(this.config.batchSize, currentLatency, currentThroughput);
      },
      getPerformanceHistory: () => adaptiveConfig.performanceHistory,
      getConfig: () => adaptiveConfig
    };
  }

  async setupCompressedBulkOperations() {
    console.log('Setting up compressed bulk operations...');

    if (!this.compressionEnabled) {
      console.log('Compression not enabled, skipping setup');
      return null;
    }

    const zlib = require('zlib');

    const compressDocuments = async (documents) => {
      const serialized = JSON.stringify(documents);
      return new Promise((resolve, reject) => {
        zlib.gzip(serialized, (error, compressed) => {
          if (error) reject(error);
          else resolve(compressed);
        });
      });
    };

    const decompressDocuments = async (compressed) => {
      return new Promise((resolve, reject) => {
        zlib.gunzip(compressed, (error, decompressed) => {
          if (error) reject(error);
          else resolve(JSON.parse(decompressed.toString()));
        });
      });
    };

    return {
      compressedBulkInsert: async (collectionName, documents) => {
        const startCompress = Date.now();
        const compressed = await compressDocuments(documents);
        const compressTime = Date.now() - startCompress;

        const originalSize = JSON.stringify(documents).length;
        const compressedSize = compressed.length;
        const compressionRatio = (compressedSize / originalSize * 100).toFixed(2);

        console.log(`Compression: ${originalSize} -> ${compressedSize} bytes (${compressionRatio}%) in ${compressTime}ms`);

        // For demo - in practice you'd send compressed data to a queue or storage
        const decompressed = await decompressDocuments(compressed);
        return await this.bulkInsertOptimized(collectionName, decompressed);
      },

      compressionStats: (documents) => {
        const originalSize = JSON.stringify(documents).length;
        return {
          originalSizeBytes: originalSize,
          estimatedCompressionRatio: '60-80%', // typical reduction for JSON payloads
          potentialSavings: `${Math.round(originalSize * 0.7)} bytes` // assuming ~70% reduction
        };
      }
    };
  }

  async setupRetryMechanism() {
    console.log('Setting up bulk operation retry mechanism...');

    const retryConfig = {
      maxRetries: 3,
      backoffMultiplier: 2,
      baseDelay: 1000,
      maxDelay: 30000,
      retryableErrors: [
        'NetworkTimeout',
        'ConnectionPoolClosed',
        'WriteConcernError'
      ]
    };

    const isRetryableError = (error) => {
      return retryConfig.retryableErrors.some(retryableError => 
        error.message.includes(retryableError)
      );
    };

    const calculateDelay = (attempt) => {
      const delay = retryConfig.baseDelay * Math.pow(retryConfig.backoffMultiplier, attempt - 1);
      return Math.min(delay, retryConfig.maxDelay);
    };

    const retryOperation = async (operation, attempt = 1) => {
      try {
        return await operation();
      } catch (error) {
        if (attempt >= retryConfig.maxRetries || !isRetryableError(error)) {
          console.error(`Operation failed after ${attempt} attempts:`, error.message);
          throw error;
        }

        const delay = calculateDelay(attempt);
        console.log(`Retry attempt ${attempt}/${retryConfig.maxRetries} after ${delay}ms delay`);

        await new Promise(resolve => setTimeout(resolve, delay));
        return await retryOperation(operation, attempt + 1);
      }
    };

    return {
      retryBulkOperation: retryOperation,
      isRetryable: isRetryableError,
      getRetryConfig: () => retryConfig
    };
  }

  async setupBulkOperationMonitoring() {
    console.log('Setting up comprehensive bulk operation monitoring...');

    const monitoring = {
      activeOperations: new Map(),
      operationHistory: [],
      alerts: [],
      thresholds: {
        maxLatency: 10000, // 10 seconds
        minThroughput: 1000, // 1000 ops/sec
        maxErrorRate: 0.05 // 5%
      }
    };

    const trackOperation = (operationId, operationType, startTime) => {
      monitoring.activeOperations.set(operationId, {
        type: operationType,
        startTime,
        status: 'running'
      });
    };

    const completeOperation = (operationId, result) => {
      const operation = monitoring.activeOperations.get(operationId);
      if (!operation) return;

      const endTime = Date.now();
      const duration = endTime - operation.startTime;

      const historyEntry = {
        operationId,
        type: operation.type,
        duration,
        success: result.success,
        throughput: result.throughput,
        errorCount: result.errors?.length || 0,
        timestamp: new Date(endTime)
      };

      monitoring.operationHistory.push(historyEntry);
      monitoring.activeOperations.delete(operationId);

      // Keep only last 1000 history entries
      if (monitoring.operationHistory.length > 1000) {
        monitoring.operationHistory.shift();
      }

      // Check for alerts
      this.checkPerformanceAlerts(historyEntry, monitoring);
    };

    return {
      trackOperation,
      completeOperation,
      getActiveOperations: () => Array.from(monitoring.activeOperations.entries()),
      getOperationHistory: () => monitoring.operationHistory,
      getAlerts: () => monitoring.alerts,
      getPerformanceSummary: () => this.generatePerformanceSummary(monitoring)
    };
  }

  checkPerformanceAlerts(operation, monitoring) {
    const alerts = [];

    // Latency alert
    if (operation.duration > monitoring.thresholds.maxLatency) {
      alerts.push({
        type: 'HIGH_LATENCY',
        message: `Operation ${operation.operationId} took ${operation.duration}ms (threshold: ${monitoring.thresholds.maxLatency}ms)`,
        severity: 'warning',
        timestamp: new Date()
      });
    }

    // Throughput alert
    if (operation.throughput < monitoring.thresholds.minThroughput) {
      alerts.push({
        type: 'LOW_THROUGHPUT',
        message: `Operation ${operation.operationId} achieved ${operation.throughput} ops/sec (threshold: ${monitoring.thresholds.minThroughput} ops/sec)`,
        severity: 'warning',
        timestamp: new Date()
      });
    }

    // Error rate alert
    const recentOperations = monitoring.operationHistory.slice(-10);
    const errorRate = recentOperations.reduce((sum, op) => sum + (op.success ? 0 : 1), 0) / recentOperations.length;

    if (errorRate > monitoring.thresholds.maxErrorRate) {
      alerts.push({
        type: 'HIGH_ERROR_RATE',
        message: `Error rate ${(errorRate * 100).toFixed(1)}% exceeds threshold ${monitoring.thresholds.maxErrorRate * 100}%`,
        severity: 'critical',
        timestamp: new Date()
      });
    }

    monitoring.alerts.push(...alerts);

    // Keep only last 100 alerts
    if (monitoring.alerts.length > 100) {
      monitoring.alerts.splice(0, monitoring.alerts.length - 100);
    }
  }

  generatePerformanceSummary(monitoring) {
    const recentOperations = monitoring.operationHistory.slice(-50);

    if (recentOperations.length === 0) {
      return { message: 'No recent operations' };
    }

    const avgDuration = recentOperations.reduce((sum, op) => sum + op.duration, 0) / recentOperations.length;
    const avgThroughput = recentOperations.reduce((sum, op) => sum + op.throughput, 0) / recentOperations.length;
    const successRate = recentOperations.reduce((sum, op) => sum + (op.success ? 1 : 0), 0) / recentOperations.length;

    return {
      recentOperations: recentOperations.length,
      averageDuration: Math.round(avgDuration),
      averageThroughput: Math.round(avgThroughput),
      successRate: (successRate * 100).toFixed(1) + '%',
      activeOperations: monitoring.activeOperations.size,
      recentAlerts: monitoring.alerts.filter(alert => 
        Date.now() - alert.timestamp.getTime() < 300000 // Last 5 minutes
      ).length
    };
  }

  async demonstrateAdvancedPatterns() {
    console.log('\n=== Advanced Bulk Processing Patterns Demo ===');

    try {
      // Setup advanced processors
      const streamProcessor = await this.setupStreamingBulkProcessor();
      const priorityProcessor = await this.setupPriorityQueueProcessor();
      const adaptiveSizing = await this.setupAdaptiveBatchSizing();
      const retryMechanism = await this.setupRetryMechanism();
      const monitoring = await this.setupBulkOperationMonitoring();

      // Demo streaming processing
      console.log('\n--- Streaming Processing Demo ---');
      const streamingData = Array.from({ length: 2500 }, (_, i) => ({
        _id: new ObjectId(),
        streamId: `stream_${i}`,
        data: `streaming_data_${i}`,
        timestamp: new Date()
      }));

      // Add data to stream in chunks
      for (let i = 0; i < streamingData.length; i += 300) {
        const chunk = streamingData.slice(i, i + 300);
        await streamProcessor.streamingInsert(chunk, 'user_transactions');
        await new Promise(resolve => setTimeout(resolve, 100)); // Simulate streaming delay
      }

      console.log('Streaming stats:', streamProcessor.getStats());

      // Demo priority processing
      console.log('\n--- Priority Processing Demo ---');
      const criticalOps = Array.from({ length: 50 }, (_, i) => ({
        type: 'insert',
        document: { priority: 'critical', id: i, timestamp: new Date() }
      }));

      const normalOps = Array.from({ length: 200 }, (_, i) => ({
        type: 'insert', 
        document: { priority: 'normal', id: i + 1000, timestamp: new Date() }
      }));

      priorityProcessor.addToPriorityQueue(criticalOps, 1); // CRITICAL
      priorityProcessor.addToPriorityQueue(normalOps, 3);   // NORMAL

      await new Promise(resolve => setTimeout(resolve, 3000)); // Let priority processor work
      console.log('Priority queue stats:', priorityProcessor.getQueueStats());

      // Cleanup
      await streamProcessor.shutdown();
      priorityProcessor.shutdown();

      return {
        streamingDemo: streamProcessor.getStats(),
        priorityDemo: priorityProcessor.getQueueStats(),
        adaptiveConfig: adaptiveSizing.getConfig()
      };

    } catch (error) {
      console.error('Advanced patterns demo failed:', error);
      throw error;
    }
  }
}
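
The retry and monitoring helpers above return plain closures, so wrapping a bulk call with them is straightforward. The following sketch assumes a ProductionBulkProcessor instance whose collections have already been initialized via setupOptimizedCollections(); the operation ID scheme and sample documents are illustrative.

// Sketch: wrapping a bulk insert with the retry and monitoring helpers above
const demonstrateRetryAndMonitoring = async (processor) => {
  const retry = await processor.setupRetryMechanism();
  const monitor = await processor.setupBulkOperationMonitoring();

  const operationId = `bulk_insert_${Date.now()}`; // illustrative ID scheme
  const documents = Array.from({ length: 1000 }, (_, i) => ({
    userId: `retry_user_${i}`,
    amount: 10 + i,
    transactionType: 'purchase',
    createdAt: new Date()
  }));

  monitor.trackOperation(operationId, 'insert', Date.now());

  // retryBulkOperation re-runs the closure with exponential backoff on
  // retryable errors (network timeouts, write concern errors, ...)
  const result = await retry.retryBulkOperation(() =>
    processor.bulkInsertOptimized('user_transactions', documents, {
      enableProgressReporting: false
    })
  );

  monitor.completeOperation(operationId, result);
  console.log('Monitoring summary:', monitor.getPerformanceSummary());
  return result;
};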

SQL-Style Bulk Operations with QueryLeaf

QueryLeaf provides familiar SQL syntax for MongoDB bulk operations:

-- QueryLeaf bulk operations with SQL-familiar syntax

-- Bulk INSERT operations
INSERT INTO user_transactions 
SELECT 
    UUID() as _id,
    user_id,
    transaction_amount as amount,
    transaction_type,
    transaction_date,
    JSON_BUILD_OBJECT(
        'category', category,
        'channel', channel,
        'location', location
    ) as metadata,
    CURRENT_TIMESTAMP as created_at
FROM staging_transactions
WHERE processing_status = 'pending'
LIMIT 100000;

-- Batch configuration for optimal performance
SET BULK_INSERT_BATCH_SIZE = 10000;
SET BULK_INSERT_ORDERED = false;
SET BULK_INSERT_WRITE_CONCERN = JSON_BUILD_OBJECT('w', 'majority', 'j', true);

-- Advanced bulk INSERT with conflict resolution
INSERT INTO user_profiles (user_id, email, profile_data, created_at)
SELECT 
    user_id,
    email,
    JSON_BUILD_OBJECT(
        'first_name', first_name,
        'last_name', last_name,
        'preferences', preferences,
        'registration_source', source
    ) as profile_data,
    registration_date as created_at
FROM user_registrations
WHERE processed = false
ON DUPLICATE KEY UPDATE 
    profile_data = JSON_MERGE_PATCH(profile_data, VALUES(profile_data)),
    last_updated = CURRENT_TIMESTAMP,
    update_count = COALESCE(update_count, 0) + 1;

-- Bulk UPDATE operations with complex conditions
UPDATE user_transactions 
SET 
    status = CASE 
        WHEN amount > 1000 AND transaction_type = 'purchase' THEN 'requires_approval'
        WHEN transaction_date < CURRENT_DATE - INTERVAL '30 days' THEN 'archived'
        WHEN metadata->>'$.category' = 'refund' THEN 'processed'
        ELSE 'completed'
    END,
    processing_fee = CASE
        WHEN amount > 500 THEN amount * 0.025
        WHEN amount > 100 THEN amount * 0.035
        ELSE amount * 0.05
    END,
    risk_score = CASE
        WHEN user_id IN (SELECT user_id FROM high_risk_users) THEN 100
        WHEN amount > 2000 THEN 75
        WHEN metadata->>'$.channel' = 'api' THEN 50
        ELSE 25
    END,
    last_updated = CURRENT_TIMESTAMP
WHERE status = 'pending'
    AND created_at >= CURRENT_DATE - INTERVAL '7 days'
BULK_OPTIONS (
    batch_size = 5000,
    ordered = false,
    write_concern = JSON_BUILD_OBJECT('w', 'majority')
);

-- Bulk UPSERT operations for data synchronization
UPSERT INTO user_analytics (
    user_id,
    daily_stats,
    calculation_date,
    last_updated
)
WITH daily_calculations AS (
    SELECT 
        user_id,
        DATE_TRUNC('day', transaction_date) as calculation_date,

        -- Aggregate daily statistics
        JSON_BUILD_OBJECT(
            'total_transactions', COUNT(*),
            'total_amount', SUM(amount),
            'avg_transaction', ROUND(AVG(amount)::NUMERIC, 2),
            'transaction_types', JSON_AGG(DISTINCT transaction_type),
            'categories', JSON_AGG(DISTINCT metadata->>'$.category'),
            'channels', JSON_AGG(DISTINCT metadata->>'$.channel'),

            -- Advanced metrics
            'largest_transaction', MAX(amount),
            'smallest_transaction', MIN(amount),
            'morning_transactions', COUNT(*) FILTER (WHERE EXTRACT(HOUR FROM transaction_date) BETWEEN 6 AND 11),
            'afternoon_transactions', COUNT(*) FILTER (WHERE EXTRACT(HOUR FROM transaction_date) BETWEEN 12 AND 17),
            'evening_transactions', COUNT(*) FILTER (WHERE EXTRACT(HOUR FROM transaction_date) BETWEEN 18 AND 23),
            'night_transactions', COUNT(*) FILTER (WHERE EXTRACT(HOUR FROM transaction_date) BETWEEN 0 AND 5),

            -- Spending patterns
            'high_value_transactions', COUNT(*) FILTER (WHERE amount > 500),
            'refund_count', COUNT(*) FILTER (WHERE transaction_type = 'refund'),
            'refund_amount', SUM(amount) FILTER (WHERE transaction_type = 'refund')
        ) as daily_stats

    FROM user_transactions
    WHERE transaction_date >= CURRENT_DATE - INTERVAL '30 days'
        AND status = 'completed'
    GROUP BY user_id, DATE_TRUNC('day', transaction_date)
)
SELECT 
    user_id,
    daily_stats,
    calculation_date,
    CURRENT_TIMESTAMP as last_updated
FROM daily_calculations
ON CONFLICT (user_id, calculation_date) DO UPDATE
SET 
    daily_stats = EXCLUDED.daily_stats,
    last_updated = EXCLUDED.last_updated,
    version = COALESCE(version, 0) + 1
BULK_OPTIONS (
    batch_size = 2000,
    ordered = false,
    enable_upsert = true
);

-- High-performance bulk DELETE with cascading
DELETE FROM user_sessions us
WHERE us.session_id IN (
    SELECT session_id 
    FROM expired_sessions 
    WHERE expiry_date < CURRENT_TIMESTAMP - INTERVAL '7 days'
)
BULK_OPTIONS (
    batch_size = 10000,
    cascade_delete = JSON_ARRAY(
        JSON_BUILD_OBJECT('collection', 'user_activity_logs', 'field', 'session_id'),
        JSON_BUILD_OBJECT('collection', 'session_analytics', 'field', 'session_id')
    )
);

-- Mixed bulk operations in single transaction
START BULK_TRANSACTION;

-- Insert new user activities
INSERT INTO user_activities (user_id, activity_type, activity_data, timestamp)
SELECT 
    user_id,
    'login' as activity_type,
    JSON_BUILD_OBJECT(
        'ip_address', ip_address,
        'user_agent', user_agent,
        'location', location
    ) as activity_data,
    login_timestamp as timestamp
FROM pending_logins
WHERE processed = false;

-- Update user login statistics
UPDATE user_profiles 
SET 
    last_login_date = (
        SELECT MAX(timestamp) 
        FROM user_activities ua 
        WHERE ua.user_id = user_profiles.user_id 
            AND ua.activity_type = 'login'
    ),
    login_count = COALESCE(login_count, 0) + (
        SELECT COUNT(*) 
        FROM pending_logins pl 
        WHERE pl.user_id = user_profiles.user_id 
            AND pl.processed = false
    ),
    profile_updated_at = CURRENT_TIMESTAMP
WHERE user_id IN (SELECT DISTINCT user_id FROM pending_logins WHERE processed = false);

-- Mark source data as processed
UPDATE pending_logins 
SET 
    processed = true,
    processed_at = CURRENT_TIMESTAMP
WHERE processed = false;

COMMIT BULK_TRANSACTION
WITH ROLLBACK_ON_ERROR = true;

-- Advanced bulk operation monitoring and analytics
WITH bulk_operation_metrics AS (
    SELECT 
        operation_type,
        collection_name,
        DATE_TRUNC('hour', operation_timestamp) as hour_bucket,

        -- Volume metrics
        COUNT(*) as operation_count,
        SUM(documents_processed) as total_documents,
        SUM(batch_count) as total_batches,
        AVG(documents_processed) as avg_documents_per_operation,

        -- Performance metrics
        AVG(execution_time_ms) as avg_execution_time,
        PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY execution_time_ms) as median_execution_time,
        PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY execution_time_ms) as p95_execution_time,
        MAX(execution_time_ms) as max_execution_time,

        -- Throughput calculations
        AVG(throughput_ops_per_sec) as avg_throughput,
        MAX(throughput_ops_per_sec) as peak_throughput,
        SUM(documents_processed) / GREATEST(SUM(execution_time_ms) / 1000.0, 1) as overall_throughput,

        -- Error tracking
        COUNT(*) FILTER (WHERE success = false) as failed_operations,
        SUM(error_count) as total_errors,
        AVG(error_count) FILTER (WHERE error_count > 0) as avg_errors_per_failed_op,

        -- Resource utilization
        AVG(memory_usage_mb) as avg_memory_usage,
        MAX(memory_usage_mb) as peak_memory_usage,
        AVG(cpu_usage_percent) as avg_cpu_usage

    FROM bulk_operation_logs 
    WHERE operation_timestamp >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
    GROUP BY operation_type, collection_name, DATE_TRUNC('hour', operation_timestamp)
),
performance_analysis AS (
    SELECT *,
        -- Performance classification
        CASE 
            WHEN avg_throughput >= 10000 THEN 'excellent'
            WHEN avg_throughput >= 5000 THEN 'good'
            WHEN avg_throughput >= 1000 THEN 'acceptable'
            ELSE 'needs_optimization'
        END as throughput_rating,

        -- Reliability assessment
        CASE 
            WHEN failed_operations = 0 THEN 'perfect'
            WHEN failed_operations::numeric / operation_count < 0.01 THEN 'excellent'
            WHEN failed_operations::numeric / operation_count < 0.05 THEN 'good'
            ELSE 'needs_attention'
        END as reliability_rating,

        -- Resource efficiency
        CASE 
            WHEN avg_memory_usage <= 100 AND avg_cpu_usage <= 50 THEN 'efficient'
            WHEN avg_memory_usage <= 500 AND avg_cpu_usage <= 75 THEN 'moderate'
            ELSE 'resource_intensive'
        END as resource_efficiency,

        -- Performance trends
        LAG(avg_throughput, 1) OVER (
            PARTITION BY operation_type, collection_name 
            ORDER BY hour_bucket
        ) as prev_hour_throughput,

        LAG(p95_execution_time, 1) OVER (
            PARTITION BY operation_type, collection_name 
            ORDER BY hour_bucket
        ) as prev_hour_p95_latency

    FROM bulk_operation_metrics
)
SELECT 
    operation_type,
    collection_name,
    hour_bucket,

    -- Volume summary
    operation_count,
    total_documents,
    ROUND(avg_documents_per_operation::NUMERIC, 0) as avg_docs_per_op,

    -- Performance summary
    ROUND(avg_execution_time::NUMERIC, 0) as avg_time_ms,
    ROUND(p95_execution_time::NUMERIC, 0) as p95_time_ms,
    ROUND(avg_throughput::NUMERIC, 0) as avg_throughput,
    ROUND(overall_throughput::NUMERIC, 0) as measured_throughput,

    -- Quality indicators
    throughput_rating,
    reliability_rating,
    resource_efficiency,

    -- Error statistics
    failed_operations,
    total_errors,
    ROUND((failed_operations::numeric / operation_count * 100)::NUMERIC, 2) as error_rate_pct,

    -- Resource usage
    ROUND(avg_memory_usage::NUMERIC, 1) as avg_memory_mb,
    ROUND(avg_cpu_usage::NUMERIC, 1) as avg_cpu_pct,

    -- Performance trends
    CASE 
        WHEN prev_hour_throughput IS NOT NULL AND avg_throughput > prev_hour_throughput * 1.1 THEN 'improving'
        WHEN prev_hour_throughput IS NOT NULL AND avg_throughput < prev_hour_throughput * 0.9 THEN 'degrading'
        ELSE 'stable'
    END as throughput_trend,

    CASE 
        WHEN prev_hour_p95_latency IS NOT NULL AND p95_execution_time > prev_hour_p95_latency * 1.2 THEN 'latency_increasing'
        WHEN prev_hour_p95_latency IS NOT NULL AND p95_execution_time < prev_hour_p95_latency * 0.8 THEN 'latency_improving'
        ELSE 'latency_stable'
    END as latency_trend,

    -- Optimization recommendations
    CASE 
        WHEN throughput_rating = 'needs_optimization' AND resource_efficiency = 'efficient' THEN 'Increase batch size or parallelism'
        WHEN throughput_rating = 'needs_optimization' AND resource_efficiency = 'resource_intensive' THEN 'Optimize query patterns or reduce batch size'
        WHEN reliability_rating = 'needs_attention' THEN 'Review error handling and retry logic'
        WHEN resource_efficiency = 'resource_intensive' THEN 'Consider memory optimization or connection pooling'
        ELSE 'Performance within acceptable parameters'
    END as optimization_recommendation

FROM performance_analysis
WHERE hour_bucket >= CURRENT_TIMESTAMP - INTERVAL '12 hours'
ORDER BY hour_bucket DESC, total_documents DESC;

-- Adaptive batch size optimization query
WITH batch_performance_analysis AS (
    SELECT 
        operation_type,
        collection_name,
        batch_size,

        -- Performance metrics per batch size
        COUNT(*) as operation_count,
        AVG(execution_time_ms) as avg_execution_time,
        AVG(throughput_ops_per_sec) as avg_throughput,
        STDDEV(throughput_ops_per_sec) as throughput_stddev,

        -- Error rates
        COUNT(*) FILTER (WHERE success = false) as failed_ops,
        AVG(error_count) as avg_errors,

        -- Resource utilization
        AVG(memory_usage_mb) as avg_memory,
        AVG(cpu_usage_percent) as avg_cpu,

        -- Efficiency calculation
        AVG(throughput_ops_per_sec) / GREATEST(AVG(memory_usage_mb), 1) as memory_efficiency,
        AVG(throughput_ops_per_sec) / GREATEST(AVG(cpu_usage_percent), 1) as cpu_efficiency

    FROM bulk_operation_logs
    WHERE operation_timestamp >= CURRENT_TIMESTAMP - INTERVAL '7 days'
    GROUP BY operation_type, collection_name, batch_size
),
optimal_batch_analysis AS (
    SELECT *,
        -- Rank batch sizes by different criteria
        ROW_NUMBER() OVER (
            PARTITION BY operation_type, collection_name 
            ORDER BY avg_throughput DESC
        ) as throughput_rank,

        ROW_NUMBER() OVER (
            PARTITION BY operation_type, collection_name 
            ORDER BY avg_execution_time ASC
        ) as latency_rank,

        ROW_NUMBER() OVER (
            PARTITION BY operation_type, collection_name 
            ORDER BY memory_efficiency DESC
        ) as memory_efficiency_rank,

        ROW_NUMBER() OVER (
            PARTITION BY operation_type, collection_name 
            ORDER BY failed_ops ASC, avg_errors ASC
        ) as reliability_rank,

        -- Calculate composite score
        (
            -- Throughput weight: 40%
            (ROW_NUMBER() OVER (PARTITION BY operation_type, collection_name ORDER BY avg_throughput DESC) * 0.4) +
            -- Latency weight: 30%  
            (ROW_NUMBER() OVER (PARTITION BY operation_type, collection_name ORDER BY avg_execution_time ASC) * 0.3) +
            -- Reliability weight: 20%
            (ROW_NUMBER() OVER (PARTITION BY operation_type, collection_name ORDER BY failed_ops ASC) * 0.2) +
            -- Efficiency weight: 10%
            (ROW_NUMBER() OVER (PARTITION BY operation_type, collection_name ORDER BY memory_efficiency DESC) * 0.1)
        ) as composite_score

    FROM batch_performance_analysis
    WHERE operation_count >= 5 -- Minimum sample size for reliability
)
SELECT 
    operation_type,
    collection_name,
    batch_size,
    operation_count,

    -- Performance metrics
    ROUND(avg_execution_time::NUMERIC, 0) as avg_time_ms,
    ROUND(avg_throughput::NUMERIC, 0) as avg_throughput,
    ROUND(throughput_stddev::NUMERIC, 0) as throughput_std_dev,

    -- Rankings
    throughput_rank,
    latency_rank,
    reliability_rank,
    ROUND(composite_score::NUMERIC, 2) as composite_score,

    -- Recommendations
    CASE 
        WHEN ROW_NUMBER() OVER (PARTITION BY operation_type, collection_name ORDER BY composite_score ASC) = 1 
        THEN 'OPTIMAL - Recommended batch size'
        WHEN throughput_rank <= 2 THEN 'HIGH_THROUGHPUT - Consider for bulk operations'
        WHEN latency_rank <= 2 THEN 'LOW_LATENCY - Consider for real-time operations'  
        WHEN reliability_rank <= 2 THEN 'HIGH_RELIABILITY - Consider for critical operations'
        ELSE 'SUBOPTIMAL - Not recommended'
    END as recommendation,

    -- Resource usage
    ROUND(avg_memory::NUMERIC, 1) as avg_memory_mb,
    ROUND(avg_cpu::NUMERIC, 1) as avg_cpu_pct,

    -- Quality indicators
    failed_ops,
    ROUND((failed_ops::numeric / operation_count * 100)::NUMERIC, 2) as error_rate_pct,

    -- Next steps
    CASE 
        WHEN batch_size < 1000 AND throughput_rank > 3 THEN 'Try larger batch size (2000-5000)'
        WHEN batch_size > 10000 AND reliability_rank > 3 THEN 'Try smaller batch size (5000-8000)'
        WHEN throughput_stddev > avg_throughput * 0.5 THEN 'Inconsistent performance - review system load'
        ELSE 'Batch size appears well-tuned'
    END as tuning_suggestion

FROM optimal_batch_analysis
ORDER BY operation_type, collection_name, composite_score ASC;

-- QueryLeaf provides comprehensive bulk operation capabilities:
-- 1. High-performance bulk INSERT, UPDATE, DELETE, and UPSERT operations
-- 2. Advanced batch processing with configurable batch sizes and write concerns  
-- 3. Mixed operation support within single transactions
-- 4. Comprehensive error handling and partial failure recovery
-- 5. Real-time monitoring and performance analytics
-- 6. Adaptive batch size optimization based on performance metrics
-- 7. Resource usage tracking and efficiency analysis
-- 8. SQL-familiar syntax for complex bulk operations
-- 9. Integration with MongoDB's native bulk operation optimizations
-- 10. Production-ready patterns for high-volume data processing
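
The monitoring and batch-tuning queries above assume that each bulk operation writes its own metrics to a bulk_operation_logs collection. A minimal Node.js sketch of that instrumentation is shown below; the field names mirror the queries above, while the helper function itself and its parameters are illustrative assumptions rather than part of QueryLeaf or the MongoDB driver.

// Illustrative instrumentation: record per-batch metrics into bulk_operation_logs
// so the monitoring and batch-size analysis queries above have data to analyze.
async function runInstrumentedBulkWrite(db, collectionName, operations, batchSize) {
  const startedAt = Date.now();
  let documentsProcessed = 0;
  let errorCount = 0;
  let success = true;

  for (let i = 0; i < operations.length; i += batchSize) {
    const batch = operations.slice(i, i + batchSize);
    try {
      const result = await db.collection(collectionName)
        .bulkWrite(batch, { ordered: false });
      documentsProcessed += result.insertedCount + result.modifiedCount +
        result.upsertedCount + result.deletedCount;
    } catch (err) {
      success = false;
      errorCount += err.writeErrors ? err.writeErrors.length : 1;
    }
  }

  const executionTimeMs = Date.now() - startedAt;
  await db.collection('bulk_operation_logs').insertOne({
    operation_type: 'bulkWrite',
    collection_name: collectionName,
    batch_size: batchSize,
    batch_count: Math.ceil(operations.length / batchSize),
    documents_processed: documentsProcessed,
    execution_time_ms: executionTimeMs,
    throughput_ops_per_sec: documentsProcessed / Math.max(executionTimeMs / 1000, 0.001),
    success: success,
    error_count: errorCount,
    memory_usage_mb: process.memoryUsage().rss / (1024 * 1024),
    cpu_usage_percent: null, // would come from OS-level monitoring in production
    operation_timestamp: new Date()
  });
}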

Best Practices for MongoDB Bulk Operations

Performance Optimization Strategies

Essential techniques for maximizing bulk operation throughput:

  1. Batch Size Optimization: Start with 1,000-10,000 documents per batch and adjust based on document size and system resources
  2. Unordered Operations: Use unordered bulk operations when possible to allow parallel processing and partial failure handling (see the sketch after this list)
  3. Write Concern Tuning: Balance durability and performance by configuring appropriate write concerns for your use case
  4. Index Strategy: Ensure optimal indexes exist before bulk operations, but consider temporarily dropping non-essential indexes for large imports
  5. Connection Pooling: Configure adequate connection pools to handle concurrent bulk operations efficiently
  6. Memory Management: Monitor memory usage and adjust batch sizes to avoid memory pressure and garbage collection overhead
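
A minimal sketch of techniques 2 and 3 above, using the Node.js driver's bulkWrite with unordered execution and an explicit write concern; the database, collection, and documents are placeholders.

// Unordered bulk insert with a tuned write concern (placeholder names and data)
const { MongoClient } = require('mongodb');

async function fastBulkInsert(uri, docs) {
  const client = new MongoClient(uri);
  await client.connect();
  try {
    const collection = client.db('analytics').collection('events');
    const operations = docs.map(doc => ({ insertOne: { document: doc } }));

    const result = await collection.bulkWrite(operations, {
      ordered: false,                   // continue past individual failures, allow parallel execution
      writeConcern: { w: 1, j: false }  // relax durability for ingest-style workloads
    });
    console.log(`Inserted ${result.insertedCount} documents`);
  } finally {
    await client.close();
  }
}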

Operational Excellence

Implement robust operational practices for production bulk processing:

  1. Error Handling: Design comprehensive error handling with retry logic for transient failures and dead letter queues for persistent errors (see the sketch after this list)
  2. Progress Monitoring: Implement detailed progress tracking and monitoring for long-running bulk operations
  3. Resource Monitoring: Monitor CPU, memory, and I/O usage during bulk operations to identify bottlenecks
  4. Graceful Degradation: Design fallback mechanisms and circuit breakers for bulk operation failures
  5. Testing at Scale: Test bulk operations with production-size datasets to validate performance and reliability
  6. Documentation: Maintain comprehensive documentation of bulk operation patterns, configurations, and troubleshooting procedures
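
For point 1 above, the sketch below retries only the writes that failed in an unordered bulk operation. The error shape (writeErrors entries carrying the index of each failed operation) comes from the Node.js driver; the backoff and retry policy are illustrative assumptions.

// Retry only the operations that failed in an unordered bulkWrite (illustrative policy)
async function bulkWriteWithRetry(collection, operations, maxAttempts = 3) {
  let pending = operations;

  for (let attempt = 1; attempt <= maxAttempts && pending.length > 0; attempt++) {
    try {
      await collection.bulkWrite(pending, { ordered: false });
      return; // all operations succeeded
    } catch (err) {
      if (!err.writeErrors) throw err; // not a partial failure - rethrow
      // Keep only the failed operations and retry after a linear backoff
      pending = err.writeErrors.map(writeError => pending[writeError.index]);
      await new Promise(resolve => setTimeout(resolve, attempt * 500));
    }
  }

  if (pending.length > 0) {
    // Persistent failures would typically be routed to a dead letter queue
    throw new Error(`${pending.length} operations still failing after ${maxAttempts} attempts`);
  }
}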

Conclusion

MongoDB bulk operations provide exceptional capabilities for high-throughput data processing that far exceed traditional single-operation approaches. The combination of flexible batch processing, intelligent error handling, and comprehensive monitoring makes MongoDB an ideal platform for applications requiring efficient bulk data management.

Key bulk operation benefits include:

  • Dramatic Performance Improvements: 10x-100x faster processing compared to individual operations
  • Intelligent Batch Processing: Configurable batch sizes with automatic optimization and adaptive sizing
  • Robust Error Handling: Partial failure recovery and comprehensive error reporting
  • Flexible Operation Mixing: Support for mixed INSERT, UPDATE, DELETE, and UPSERT operations in single batches
  • Production-Ready Features: Built-in monitoring, retry mechanisms, and resource management
  • Scalable Architecture: Seamless scaling across replica sets and sharded clusters

Whether you're processing data migrations, real-time analytics ingestion, or high-volume transaction processing, MongoDB bulk operations with QueryLeaf's familiar SQL interface provide the foundation for efficient, scalable data processing solutions.

QueryLeaf Integration: QueryLeaf seamlessly manages MongoDB bulk operations while providing SQL-familiar batch processing syntax, performance optimization patterns, and comprehensive monitoring capabilities. Advanced bulk processing strategies including adaptive batch sizing, priority queues, and streaming operations are elegantly handled through familiar SQL constructs, making sophisticated high-volume data processing both powerful and accessible to SQL-oriented development teams.

The combination of MongoDB's native bulk operation capabilities with SQL-style batch processing makes it an ideal platform for applications requiring both high-throughput data processing and familiar database interaction patterns, ensuring your bulk processing solutions remain both efficient and maintainable as they scale.

MongoDB GridFS for Large File Storage and Management: SQL-Style File Operations and Binary Data Handling

Modern applications frequently need to handle large files, multimedia content, binary data, and document attachments that exceed traditional database storage limitations. Whether you're building content management systems, media libraries, document repositories, or data archival platforms, efficient large file storage and retrieval becomes critical for application performance and user experience.

Traditional relational databases struggle with large binary data storage, often requiring complex external storage solutions, fragmented file management approaches, and intricate metadata synchronization. MongoDB GridFS provides a comprehensive solution for storing and managing files larger than the 16MB BSON document size limit while maintaining the benefits of database-integrated storage, atomic operations, and familiar query patterns.

The Large File Storage Challenge

Traditional database approaches to file storage face significant limitations:

-- PostgreSQL large file storage - complex and limited binary data handling

-- Basic file storage table with bytea limitations (limited to available memory)
CREATE TABLE file_storage (
    file_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    filename VARCHAR(255) NOT NULL,
    content_type VARCHAR(100) NOT NULL,
    file_size BIGINT NOT NULL,
    file_data BYTEA, -- Limited by available memory, inefficient for large files
    upload_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    created_by UUID NOT NULL,

    -- Basic metadata
    description TEXT,
    tags TEXT[],
    category VARCHAR(50),
    is_public BOOLEAN DEFAULT false,

    -- File characteristics
    file_hash VARCHAR(64), -- For duplicate detection
    original_filename VARCHAR(255),
    compression_type VARCHAR(20),

    CONSTRAINT check_file_size CHECK (file_size > 0 AND file_size <= 1073741824) -- 1GB limit
);

-- Large object storage approach (pg_largeobject) - complex management
CREATE TABLE file_metadata (
    file_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    filename VARCHAR(255) NOT NULL,
    content_type VARCHAR(100) NOT NULL,
    file_size BIGINT NOT NULL,
    large_object_oid OID NOT NULL, -- Reference to pg_largeobject
    upload_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    created_by UUID NOT NULL,
    description TEXT,
    tags TEXT[],
    category VARCHAR(50),
    is_public BOOLEAN DEFAULT false,
    file_hash VARCHAR(64),

    -- Complex management required
    CONSTRAINT check_file_size CHECK (file_size > 0)
);

-- File chunks table for manual chunking (complex to manage)
CREATE TABLE file_chunks (
    chunk_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    file_id UUID NOT NULL REFERENCES file_metadata(file_id) ON DELETE CASCADE,
    chunk_index INTEGER NOT NULL,
    chunk_data BYTEA NOT NULL,
    chunk_size INTEGER NOT NULL,
    checksum VARCHAR(32),

    UNIQUE(file_id, chunk_index)
);

-- Complex file upload process with manual chunking
CREATE OR REPLACE FUNCTION upload_file_chunked(
    p_filename VARCHAR(255),
    p_content_type VARCHAR(100),
    p_file_data BYTEA,
    p_created_by UUID,
    p_chunk_size INTEGER DEFAULT 262144 -- 256KB chunks
) RETURNS UUID AS $$
DECLARE
    v_file_id UUID;
    v_file_size BIGINT;
    v_chunk_count INTEGER;
    v_chunk_data BYTEA;
    v_offset INTEGER := 1;
    v_chunk_index INTEGER := 0;
    v_current_chunk_size INTEGER;
BEGIN
    -- Get file size
    v_file_size := LENGTH(p_file_data);
    v_chunk_count := CEIL(v_file_size::DECIMAL / p_chunk_size);

    -- Insert file metadata
    INSERT INTO file_metadata (filename, content_type, file_size, large_object_oid, created_by)
    VALUES (p_filename, p_content_type, v_file_size, 0, p_created_by) -- Placeholder OID
    RETURNING file_id INTO v_file_id;

    -- Insert chunks
    WHILE v_offset <= v_file_size LOOP
        v_current_chunk_size := LEAST(p_chunk_size, v_file_size - v_offset + 1);
        v_chunk_data := SUBSTRING(p_file_data FROM v_offset FOR v_current_chunk_size);

        INSERT INTO file_chunks (file_id, chunk_index, chunk_data, chunk_size, checksum)
        VALUES (
            v_file_id, 
            v_chunk_index, 
            v_chunk_data, 
            v_current_chunk_size,
            MD5(v_chunk_data)
        );

        v_offset := v_offset + p_chunk_size;
        v_chunk_index := v_chunk_index + 1;
    END LOOP;

    RETURN v_file_id;
END;
$$ LANGUAGE plpgsql;

-- Complex file retrieval with manual chunk reassembly
CREATE OR REPLACE FUNCTION download_file_chunked(p_file_id UUID)
RETURNS BYTEA AS $$
DECLARE
    v_file_data BYTEA := '';
    v_chunk RECORD;
BEGIN
    -- Reassemble file from chunks
    FOR v_chunk IN 
        SELECT chunk_data 
        FROM file_chunks 
        WHERE file_id = p_file_id 
        ORDER BY chunk_index
    LOOP
        v_file_data := v_file_data || v_chunk.chunk_data;
    END LOOP;

    RETURN v_file_data;
END;
$$ LANGUAGE plpgsql;

-- File search with basic metadata queries (limited functionality)
WITH file_search AS (
    SELECT 
        fm.file_id,
        fm.filename,
        fm.content_type,
        fm.file_size,
        fm.upload_timestamp,
        fm.created_by,
        fm.description,
        fm.tags,
        fm.category,

        -- Basic relevance scoring (very limited)
        CASE 
            WHEN LOWER(fm.filename) LIKE '%search_term%' THEN 3
            WHEN LOWER(fm.description) LIKE '%search_term%' THEN 2  
            WHEN 'search_term' = ANY(fm.tags) THEN 2
            ELSE 1
        END as relevance_score,

        -- File size categorization
        CASE 
            WHEN fm.file_size < 1048576 THEN 'Small (< 1MB)'
            WHEN fm.file_size < 10485760 THEN 'Medium (1-10MB)'
            WHEN fm.file_size < 104857600 THEN 'Large (10-100MB)'
            ELSE 'Very Large (> 100MB)'
        END as size_category,

        -- Check if file exists (chunks available)
        EXISTS(
            SELECT 1 FROM file_chunks fc WHERE fc.file_id = fm.file_id
        ) as file_available,

        -- Get chunk count for integrity verification
        (
            SELECT COUNT(*) FROM file_chunks fc WHERE fc.file_id = fm.file_id
        ) as chunk_count

    FROM file_metadata fm
    WHERE 
        (
            LOWER(fm.filename) LIKE '%search_term%' OR
            LOWER(fm.description) LIKE '%search_term%' OR  
            'search_term' = ANY(fm.tags)
        )
        AND fm.is_public = true
),
file_stats AS (
    SELECT 
        COUNT(*) as total_files,
        SUM(fs.file_size) as total_storage_used,
        AVG(fs.file_size) as avg_file_size,
        COUNT(*) FILTER (WHERE fs.file_available = false) as corrupted_files
    FROM file_search fs
)
SELECT 
    fs.file_id,
    fs.filename,
    fs.content_type,
    pg_size_pretty(fs.file_size) as formatted_size,
    fs.size_category,
    fs.upload_timestamp,
    u.name as uploaded_by,
    fs.description,
    fs.tags,
    fs.relevance_score,
    fs.file_available,
    fs.chunk_count,

    -- Download URL (requires application logic for reassembly)
    '/api/files/' || fs.file_id || '/download' as download_url,

    -- File status
    CASE 
        WHEN NOT fs.file_available THEN 'Missing/Corrupted'
        WHEN fs.chunk_count = 0 THEN 'Empty File'
        ELSE 'Available'
    END as file_status,

    -- Storage efficiency warning
    CASE 
        WHEN fs.chunk_count > 1000 THEN 'High fragmentation - consider optimization'
        ELSE 'Normal'
    END as storage_health

FROM file_search fs
JOIN users u ON fs.created_by = u.user_id
CROSS JOIN file_stats fst
WHERE fs.file_available = true
ORDER BY fs.relevance_score DESC, fs.upload_timestamp DESC
LIMIT 50;

-- Problems with traditional file storage approaches:
-- 1. Memory limitations with BYTEA for large files
-- 2. Complex manual chunking and reassembly processes
-- 3. No built-in file streaming or partial reads
-- 4. Limited metadata integration and search capabilities
-- 5. No automatic integrity checking or corruption detection
-- 6. Poor performance with large binary data operations
-- 7. Complex backup and replication scenarios
-- 8. No built-in compression or storage optimization
-- 9. Difficult scaling with growing file storage requirements
-- 10. Manual transaction management for file operations
-- 11. No streaming uploads or downloads
-- 12. Limited duplicate detection and deduplication
-- 13. Complex permission and access control implementation
-- 14. Poor integration with application object models
-- 15. No automatic metadata extraction capabilities

MongoDB GridFS provides comprehensive file storage capabilities:

// MongoDB GridFS - comprehensive large file storage with built-in optimization
const { MongoClient, GridFSBucket, ObjectId } = require('mongodb');
const fs = require('fs');
const crypto = require('crypto');
const path = require('path');

const client = new MongoClient('mongodb+srv://username:password@cluster.mongodb.net');
const db = client.db('file_storage_platform');

// Advanced GridFS file management system
class AdvancedGridFSManager {
  constructor(db, options = {}) {
    this.db = db;
    this.buckets = new Map();

    // Default bucket for general files
    this.defaultBucket = new GridFSBucket(db, {
      bucketName: 'files',
      chunkSizeBytes: options.chunkSize || 255 * 1024, // 255KB chunks
      writeConcern: { w: 'majority', j: true },
      readConcern: { level: 'majority' }
    });

    this.buckets.set('files', this.defaultBucket);

    // Specialized buckets for different content types
    this.setupSpecializedBuckets(options);

    // Configuration
    this.config = {
      maxFileSize: options.maxFileSize || 5 * 1024 * 1024 * 1024, // 5GB
      enableCompression: options.enableCompression !== false,
      enableDeduplication: options.enableDeduplication !== false,
      enableThumbnails: options.enableThumbnails !== false,
      enableMetadataExtraction: options.enableMetadataExtraction !== false,
      supportedMimeTypes: options.supportedMimeTypes || [
        'image/*', 'video/*', 'audio/*', 'application/pdf',
        'application/msword', 'application/vnd.openxmlformats-officedocument.*',
        'text/*', 'application/json', 'application/zip'
      ],

      // Advanced features
      enableVersioning: options.enableVersioning || false,
      enableEncryption: options.enableEncryption || false,
      enableAuditLogging: options.enableAuditLogging !== false
    };

    this.setupIndexes();
    this.initializeFileProcessing();
  }

  setupSpecializedBuckets(options) {
    // Images bucket with smaller chunks for better streaming
    const imagesBucket = new GridFSBucket(this.db, {
      bucketName: 'images',
      chunkSizeBytes: 64 * 1024 // 64KB for images
    });
    this.buckets.set('images', imagesBucket);

    // Videos bucket with larger chunks for efficiency
    const videosBucket = new GridFSBucket(this.db, {
      bucketName: 'videos', 
      chunkSizeBytes: 1024 * 1024 // 1MB for videos
    });
    this.buckets.set('videos', videosBucket);

    // Documents bucket for office files and PDFs
    const documentsBucket = new GridFSBucket(this.db, {
      bucketName: 'documents',
      chunkSizeBytes: 256 * 1024 // 256KB for documents
    });
    this.buckets.set('documents', documentsBucket);

    // Archives bucket for compressed files
    const archivesBucket = new GridFSBucket(this.db, {
      bucketName: 'archives',
      chunkSizeBytes: 512 * 1024 // 512KB for archives
    });
    this.buckets.set('archives', archivesBucket);
  }

  async setupIndexes() {
    console.log('Setting up GridFS indexes...');

    try {
      // Create indexes for each bucket
      for (const [bucketName, bucket] of this.buckets) {
        const filesCollection = this.db.collection(`${bucketName}.files`);
        const chunksCollection = this.db.collection(`${bucketName}.chunks`);

        // Files collection indexes
        await filesCollection.createIndexes([
          { key: { filename: 1, uploadDate: -1 } },
          { key: { 'metadata.contentType': 1, uploadDate: -1 } },
          { key: { 'metadata.tags': 1 } },
          { key: { 'metadata.category': 1, uploadDate: -1 } },
          { key: { 'metadata.uploadedBy': 1, uploadDate: -1 } },
          { key: { 'metadata.fileHash': 1 }, unique: true, sparse: true },
          { key: { 'metadata.isPublic': 1, uploadDate: -1 } },

          // Text search index for filename and metadata
          { key: { 
            filename: 'text', 
            'metadata.description': 'text',
            'metadata.tags': 'text'
          } },

          // Compound indexes for common queries
          { key: { 
            'metadata.contentType': 1, 
            'metadata.isPublic': 1, 
            uploadDate: -1 
          } }
        ]);

        // Chunks collection index (usually created automatically)
        await chunksCollection.createIndex({ files_id: 1, n: 1 }, { unique: true });
      }

      console.log('GridFS indexes created successfully');
    } catch (error) {
      console.error('Error setting up GridFS indexes:', error);
      throw error;
    }
  }

  async uploadFile(fileBuffer, filename, options = {}) {
    console.log(`Uploading file: ${filename} (${fileBuffer.length} bytes)`);

    try {
      // Validate file
      await this.validateFile(fileBuffer, filename, options);

      // Determine bucket based on content type
      const contentType = options.contentType || this.detectContentType(filename);
      const bucket = this.selectBucket(contentType);

      // Check for duplicates if enabled
      let existingFile = null;
      if (this.config.enableDeduplication) {
        existingFile = await this.checkForDuplicate(fileBuffer, options);
        if (existingFile) {
          console.log(`Duplicate file found: ${existingFile._id}`);
          return await this.handleDuplicate(existingFile, filename, options);
        }
      }

      // Prepare metadata
      const metadata = await this.prepareFileMetadata(fileBuffer, filename, contentType, options);

      // Create upload stream
      const uploadStream = bucket.openUploadStream(filename, {
        metadata: metadata,
        chunkSizeBytes: this.getOptimalChunkSize(fileBuffer.length, contentType)
      });

      return new Promise((resolve, reject) => {
        uploadStream.on('error', reject);
        uploadStream.on('finish', async () => {
          console.log(`File uploaded successfully: ${uploadStream.id}`);

          try {
            // Post-upload processing
            const processingResult = await this.postUploadProcessing(
              uploadStream.id, 
              fileBuffer, 
              filename, 
              contentType, 
              metadata
            );

            // Return comprehensive file information
            resolve({
              fileId: uploadStream.id,
              filename: filename,
              contentType: contentType,
              size: fileBuffer.length,
              metadata: metadata,
              bucket: bucket.bucketName,
              uploadDate: new Date(),
              processingResult: processingResult,
              downloadUrl: `/api/files/${uploadStream.id}`,
              thumbnailUrl: processingResult.thumbnail ? 
                `/api/files/${uploadStream.id}/thumbnail` : null
            });

          } catch (processingError) {
            console.error('Post-upload processing failed:', processingError);
            // Still resolve with basic file info even if processing fails
            resolve({
              fileId: uploadStream.id,
              filename: filename,
              contentType: contentType,
              size: fileBuffer.length,
              metadata: metadata,
              bucket: bucket.bucketName,
              uploadDate: new Date(),
              warning: 'Post-upload processing failed'
            });
          }
        });

        // Write file buffer to stream
        uploadStream.end(fileBuffer);
      });

    } catch (error) {
      console.error(`Error uploading file ${filename}:`, error);
      throw error;
    }
  }

  async validateFile(fileBuffer, filename, options) {
    // File size validation
    if (fileBuffer.length > this.config.maxFileSize) {
      throw new Error(
        `File too large: ${fileBuffer.length} bytes (max: ${this.config.maxFileSize})`
      );
    }

    if (fileBuffer.length === 0) {
      throw new Error('Cannot upload empty file');
    }

    // Content type validation
    const contentType = options.contentType || this.detectContentType(filename);
    if (!this.isContentTypeSupported(contentType)) {
      throw new Error(`Unsupported file type: ${contentType}`);
    }

    // Filename validation
    if (!filename || filename.trim().length === 0) {
      throw new Error('Filename is required');
    }

    // Check for malicious content (basic checks)
    await this.scanForMaliciousContent(fileBuffer, filename, contentType);
  }

  async prepareFileMetadata(fileBuffer, filename, contentType, options) {
    const fileHash = crypto.createHash('sha256').update(fileBuffer).digest('hex');

    const metadata = {
      originalFilename: filename,
      contentType: contentType,
      fileSize: fileBuffer.length,
      fileHash: fileHash,
      uploadedBy: options.uploadedBy || null,
      uploadedAt: new Date(),

      // File characteristics
      mimeType: contentType,
      fileExtension: path.extname(filename).toLowerCase(),

      // User-provided metadata
      description: options.description || null,
      tags: options.tags || [],
      category: options.category || this.categorizeByContentType(contentType),
      isPublic: options.isPublic !== false,
      accessLevel: options.accessLevel || 'public',

      // System metadata
      version: options.version || 1,
      parentFileId: options.parentFileId || null,
      processingStatus: 'pending',

      // Storage information
      compressionType: null,
      encryptionType: options.enableEncryption ? 'AES256' : null,

      // Additional metadata
      customFields: options.customFields || {}
    };

    // Extract additional metadata based on file type
    if (this.config.enableMetadataExtraction) {
      const extractedMetadata = await this.extractFileMetadata(fileBuffer, contentType);
      metadata.extracted = extractedMetadata;
    }

    return metadata;
  }

  async extractFileMetadata(fileBuffer, contentType) {
    const metadata = {};

    try {
      if (contentType.startsWith('image/')) {
        // Extract image metadata (simplified - would use actual image processing library)
        metadata.imageInfo = {
          format: contentType.split('/')[1],
          // In production, use libraries like sharp or jimp for actual metadata extraction
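          // e.g. with the optional 'sharp' dependency (an assumption, not required here):
          //   const info = await require('sharp')(fileBuffer).metadata();
          //   // info.width, info.height and info.format could then replace the nulls below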
          estimated_width: null,
          estimated_height: null,
          color_space: null,
          has_transparency: null
        };
      } else if (contentType.startsWith('video/')) {
        // Extract video metadata
        metadata.videoInfo = {
          format: contentType.split('/')[1],
          estimated_duration: null,
          estimated_bitrate: null,
          estimated_resolution: null
        };
      } else if (contentType === 'application/pdf') {
        // Extract PDF metadata
        metadata.documentInfo = {
          estimated_page_count: null,
          estimated_word_count: null,
          has_text_layer: null,
          has_forms: null
        };
      }
    } catch (extractionError) {
      console.error('Metadata extraction failed:', extractionError);
      metadata.extraction_error = extractionError.message;
    }

    return metadata;
  }

  async postUploadProcessing(fileId, fileBuffer, filename, contentType, metadata) {
    console.log(`Starting post-upload processing for file: ${fileId}`);

    const processingResult = {
      thumbnail: null,
      textContent: null,
      additionalFormats: [],
      processingErrors: []
    };

    try {
      // Generate thumbnail for images and videos
      if (this.config.enableThumbnails) {
        if (contentType.startsWith('image/') || contentType.startsWith('video/')) {
          processingResult.thumbnail = await this.generateThumbnail(
            fileId, 
            fileBuffer, 
            contentType
          );
        }
      }

      // Extract text content for searchable documents
      if (this.isTextExtractable(contentType)) {
        processingResult.textContent = await this.extractTextContent(
          fileBuffer, 
          contentType
        );
      }

      // Update processing status
      await this.updateFileMetadata(fileId, {
        'metadata.processingStatus': 'completed',
        'metadata.processingResult': processingResult,
        'metadata.processedAt': new Date()
      });

    } catch (processingError) {
      console.error('Post-upload processing failed:', processingError);
      processingResult.processingErrors.push(processingError.message);

      await this.updateFileMetadata(fileId, {
        'metadata.processingStatus': 'failed',
        'metadata.processingError': processingError.message,
        'metadata.processedAt': new Date()
      });
    }

    return processingResult;
  }

  async downloadFile(fileId, options = {}) {
    console.log(`Downloading file: ${fileId}`);

    try {
      // Get file metadata first
      const fileInfo = await this.getFileInfo(fileId);
      if (!fileInfo) {
        throw new Error(`File not found: ${fileId}`);
      }

      // Check permissions
      if (!this.hasAccessPermission(fileInfo, options.user)) {
        throw new Error('Access denied');
      }

      // Select appropriate bucket
      const bucket = this.selectBucketByFileInfo(fileInfo);

      // Create download stream
      const downloadStream = bucket.openDownloadStream(new ObjectId(fileId));

      // Handle range requests for partial downloads
      if (options.range) {
        return this.handleRangeRequest(downloadStream, fileInfo, options.range);
      }

      // Return full file stream
      return {
        stream: downloadStream,
        fileInfo: fileInfo,
        contentType: fileInfo.metadata.contentType,
        filename: fileInfo.filename,
        size: fileInfo.length
      };

    } catch (error) {
      console.error(`Error downloading file ${fileId}:`, error);
      throw error;
    }
  }

  async searchFiles(query, options = {}) {
    console.log(`Searching files with query: ${query}`);

    try {
      const searchCriteria = this.buildSearchCriteria(query, options);
      const pipeline = this.buildFileSearchPipeline(searchCriteria, options);

      // Execute search across all relevant buckets
      const results = [];
      const buckets = options.bucket ? [options.bucket] : Array.from(this.buckets.keys());

      for (const bucketName of buckets) {
        const filesCollection = this.db.collection(`${bucketName}.files`);
        const bucketResults = await filesCollection.aggregate(pipeline).toArray();

        // Add bucket information to results
        bucketResults.forEach(result => {
          result.bucket = bucketName;
          result.downloadUrl = `/api/files/${result._id}`;
          result.thumbnailUrl = result.metadata.processingResult?.thumbnail ? 
            `/api/files/${result._id}/thumbnail` : null;
        });

        results.push(...bucketResults);
      }

      // Sort combined results
      const sortedResults = this.sortSearchResults(results, options);

      // Apply pagination
      const limit = options.limit || 20;
      const offset = options.offset || 0;
      const paginatedResults = sortedResults.slice(offset, offset + limit);

      return {
        files: paginatedResults,
        totalCount: results.length,
        query: query,
        searchOptions: options,
        executionTime: Date.now() - (options.startTime || Date.now())
      };

    } catch (error) {
      console.error('Error searching files:', error);
      throw error;
    }
  }

  buildSearchCriteria(query, options) {
    const criteria = { $and: [] };

    // Text search
    if (query && query.trim().length > 0) {
      criteria.$and.push({
        $or: [
          { filename: { $regex: query, $options: 'i' } },
          { 'metadata.description': { $regex: query, $options: 'i' } },
          { 'metadata.tags': { $in: [new RegExp(query, 'i')] } },
          { $text: { $search: query } }
        ]
      });
    }

    // Content type filter
    if (options.contentType) {
      criteria.$and.push({
        'metadata.contentType': options.contentType
      });
    }

    // Category filter
    if (options.category) {
      criteria.$and.push({
        'metadata.category': options.category
      });
    }

    // Access level filter
    if (options.accessLevel) {
      criteria.$and.push({
        'metadata.accessLevel': options.accessLevel
      });
    }

    // Date range filter
    if (options.dateFrom || options.dateTo) {
      const dateFilter = {};
      if (options.dateFrom) dateFilter.$gte = new Date(options.dateFrom);
      if (options.dateTo) dateFilter.$lte = new Date(options.dateTo);
      criteria.$and.push({ uploadDate: dateFilter });
    }

    // Size filter
    if (options.minSize || options.maxSize) {
      const sizeFilter = {};
      if (options.minSize) sizeFilter.$gte = options.minSize;
      if (options.maxSize) sizeFilter.$lte = options.maxSize;
      criteria.$and.push({ length: sizeFilter });
    }

    // User filter
    if (options.uploadedBy) {
      criteria.$and.push({
        'metadata.uploadedBy': options.uploadedBy
      });
    }

    // Public access filter
    if (options.publicOnly) {
      criteria.$and.push({
        'metadata.isPublic': true
      });
    }

    return criteria.$and.length > 0 ? criteria : {};
  }

  buildFileSearchPipeline(criteria, options) {
    const pipeline = [];

    // Match stage
    if (Object.keys(criteria).length > 0) {
      pipeline.push({ $match: criteria });
    }

    // Add computed fields for relevance scoring
    pipeline.push({
      $addFields: {
        relevanceScore: {
          $add: [
            // Filename match bonus
            {
              $cond: {
                if: { $regexMatch: { input: '$filename', regex: options.query || '', options: 'i' } },
                then: 3,
                else: 0
              }
            },
            // Size factor (prefer reasonable file sizes)
            {
              $cond: {
                if: { $and: [{ $gte: ['$length', 1000] }, { $lte: ['$length', 10000000] }] },
                then: 1,
                else: 0
              }
            },
            // Recency bonus
            {
              $cond: {
                if: { $gte: ['$uploadDate', new Date(Date.now() - 30 * 24 * 60 * 60 * 1000)] },
                then: 2,
                else: 0
              }
            },
            // Processing status bonus
            {
              $cond: {
                if: { $eq: ['$metadata.processingStatus', 'completed'] },
                then: 1,
                else: 0
              }
            }
          ]
        },

        // Formatted file size
        formattedSize: {
          $switch: {
            branches: [
              { case: { $lt: ['$length', 1024] }, then: { $concat: [{ $toString: '$length' }, ' bytes'] } },
              { case: { $lt: ['$length', 1048576] }, then: { $concat: [{ $toString: { $round: [{ $divide: ['$length', 1024] }, 1] } }, ' KB'] } },
              { case: { $lt: ['$length', 1073741824] }, then: { $concat: [{ $toString: { $round: [{ $divide: ['$length', 1048576] }, 1] } }, ' MB'] } }
            ],
            default: { $concat: [{ $toString: { $round: [{ $divide: ['$length', 1073741824] }, 1] } }, ' GB'] }
          }
        }
      }
    });

    // Project relevant fields
    pipeline.push({
      $project: {
        _id: 1,
        filename: 1,
        length: 1,
        formattedSize: 1,
        uploadDate: 1,
        relevanceScore: 1,
        'metadata.contentType': 1,
        'metadata.category': 1,
        'metadata.description': 1,
        'metadata.tags': 1,
        'metadata.isPublic': 1,
        'metadata.uploadedBy': 1,
        'metadata.processingStatus': 1,
        'metadata.processingResult': 1,
        'metadata.fileHash': 1
      }
    });

    return pipeline;
  }

  // Utility methods

  selectBucket(contentType) {
    if (contentType.startsWith('image/')) return this.buckets.get('images');
    if (contentType.startsWith('video/')) return this.buckets.get('videos');
    if (contentType.includes('pdf') || contentType.includes('document')) return this.buckets.get('documents');
    if (contentType.includes('zip') || contentType.includes('archive')) return this.buckets.get('archives');
    return this.defaultBucket;
  }

  detectContentType(filename) {
    const ext = path.extname(filename).toLowerCase();
    const mimeTypes = {
      '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png', '.gif': 'image/gif',
      '.mp4': 'video/mp4', '.mov': 'video/quicktime', '.avi': 'video/x-msvideo',
      '.mp3': 'audio/mpeg', '.wav': 'audio/wav', '.flac': 'audio/flac',
      '.pdf': 'application/pdf', '.doc': 'application/msword',
      '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
      '.zip': 'application/zip', '.tar': 'application/x-tar', '.gz': 'application/gzip',
      '.txt': 'text/plain', '.csv': 'text/csv', '.json': 'application/json'
    };
    return mimeTypes[ext] || 'application/octet-stream';
  }

  isContentTypeSupported(contentType) {
    return this.config.supportedMimeTypes.some(pattern => 
      pattern.endsWith('*') ? 
        contentType.startsWith(pattern.slice(0, -1)) : 
        contentType === pattern
    );
  }

  categorizeByContentType(contentType) {
    if (contentType.startsWith('image/')) return 'images';
    if (contentType.startsWith('video/')) return 'videos';  
    if (contentType.startsWith('audio/')) return 'audio';
    if (contentType.includes('pdf')) return 'documents';
    if (contentType.includes('document') || contentType.includes('text')) return 'documents';
    if (contentType.includes('zip') || contentType.includes('archive')) return 'archives';
    return 'misc';
  }

  getOptimalChunkSize(fileSize, contentType) {
    // Optimize chunk size based on file size and type
    if (contentType.startsWith('image/') && fileSize < 1024 * 1024) return 64 * 1024; // 64KB for small images
    if (contentType.startsWith('video/')) return 1024 * 1024; // 1MB for videos
    if (fileSize > 100 * 1024 * 1024) return 512 * 1024; // 512KB for large files
    return 255 * 1024; // Default 255KB
  }
}

// Benefits of MongoDB GridFS:
// - Automatic file chunking and reassembly
// - Built-in streaming for large files
// - Integrated metadata storage and indexing
// - High-performance binary data operations
// - Automatic replication and sharding support
// - ACID transactions for file operations
// - Advanced query capabilities on file metadata
// - Built-in compression and optimization
// - Seamless integration with MongoDB operations
// - Production-ready scalability and performance

module.exports = { AdvancedGridFSManager };
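
A brief usage sketch for the manager defined above. The module path, connection string, file path, and user identifier are placeholders, and it assumes the helper methods referenced but not shown here (such as initializeFileProcessing and scanForMaliciousContent) are implemented.

// Example usage of AdvancedGridFSManager (placeholder connection string and paths)
const { MongoClient } = require('mongodb');
const fs = require('fs');
const { AdvancedGridFSManager } = require('./advanced-gridfs-manager');

async function main() {
  const client = new MongoClient('mongodb://localhost:27017');
  await client.connect();

  const manager = new AdvancedGridFSManager(client.db('file_storage_platform'), {
    chunkSize: 255 * 1024,
    enableThumbnails: true
  });

  // Upload a local file with user-supplied metadata
  const buffer = fs.readFileSync('./product_manual.pdf');
  const uploaded = await manager.uploadFile(buffer, 'product_manual.pdf', {
    contentType: 'application/pdf',
    tags: ['manual', 'product'],
    category: 'documentation',
    uploadedBy: 'user-123'
  });
  console.log('Uploaded file:', uploaded.fileId, uploaded.downloadUrl);

  // Search for PDF documents uploaded in the last 30 days
  const results = await manager.searchFiles('manual', {
    contentType: 'application/pdf',
    dateFrom: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000),
    limit: 10
  });
  console.log(`Found ${results.totalCount} matching files`);

  await client.close();
}

main().catch(console.error);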

File Management and Advanced Operations

Comprehensive File Operations and Metadata Management

Implement sophisticated file management capabilities:

// Advanced file management operations with GridFS
class ProductionGridFSOperations extends AdvancedGridFSManager {
  constructor(db, options) {
    super(db, options);
    this.setupAdvancedCapabilities();
  }

  async implementAdvancedFileOperations() {
    console.log('Setting up advanced GridFS operations...');

    // File versioning system
    await this.setupFileVersioning();

    // Duplicate detection and deduplication
    await this.setupDeduplicationSystem();

    // File sharing and collaboration
    await this.setupFileSharingSystem();

    // Automated file lifecycle management
    await this.setupLifecycleManagement();

    // File analytics and reporting
    await this.setupFileAnalytics();
  }

  async createFileVersion(originalFileId, fileBuffer, versionMetadata = {}) {
    console.log(`Creating new version for file: ${originalFileId}`);

    try {
      // Get original file information
      const originalFile = await this.getFileInfo(originalFileId);
      if (!originalFile) {
        throw new Error(`Original file not found: ${originalFileId}`);
      }

      // Increment version number
      const newVersion = (originalFile.metadata.version || 1) + 1;

      // Upload new version with linked metadata
      const uploadOptions = {
        ...versionMetadata,
        parentFileId: originalFileId,
        version: newVersion,
        originalFilename: originalFile.filename,
        versionType: versionMetadata.versionType || 'update',
        versionComment: versionMetadata.comment || 'Updated version',
        uploadedBy: versionMetadata.uploadedBy,
        contentType: originalFile.metadata.contentType
      };

      const newVersionFile = await this.uploadFile(fileBuffer, originalFile.filename, uploadOptions);

      // Update version history in original file
      await this.updateFileMetadata(originalFileId, {
        'metadata.hasVersions': true,
        'metadata.latestVersion': newVersion,
        'metadata.latestVersionId': newVersionFile.fileId,
        $push: {
          'metadata.versionHistory': {
            versionId: newVersionFile.fileId,
            version: newVersion,
            createdAt: new Date(),
            createdBy: versionMetadata.uploadedBy,
            comment: versionMetadata.comment || '',
            fileSize: fileBuffer.length
          }
        }
      });

      return {
        originalFileId: originalFileId,
        newVersionId: newVersionFile.fileId,
        version: newVersion,
        versionInfo: newVersionFile
      };

    } catch (error) {
      console.error('Error creating file version:', error);
      throw error;
    }
  }

  async getFileVersionHistory(fileId) {
    console.log(`Getting version history for file: ${fileId}`);

    try {
      const file = await this.getFileInfo(fileId);
      if (!file || !file.metadata.hasVersions) {
        return { fileId: fileId, versions: [] };
      }

      // Get all versions
      const pipeline = [
        {
          $match: {
            $or: [
              { _id: new ObjectId(fileId) },
              { 'metadata.parentFileId': fileId }
            ]
          }
        },
        {
          $sort: { 'metadata.version': 1 }
        },
        {
          $project: {
            _id: 1,
            filename: 1,
            length: 1,
            uploadDate: 1,
            'metadata.version': 1,
            'metadata.versionType': 1,
            'metadata.versionComment': 1,
            'metadata.uploadedBy': 1,
            'metadata.contentType': 1
          }
        }
      ];

      const filesCollection = this.db.collection('files.files');
      const versions = await filesCollection.aggregate(pipeline).toArray();

      return {
        fileId: fileId,
        originalFile: file,
        versions: versions,
        totalVersions: versions.length
      };

    } catch (error) {
      console.error('Error getting version history:', error);
      throw error;
    }
  }

  async shareFile(fileId, shareOptions = {}) {
    console.log(`Creating file share for: ${fileId}`);

    try {
      const file = await this.getFileInfo(fileId);
      if (!file) {
        throw new Error(`File not found: ${fileId}`);
      }

      // Generate share token
      const shareToken = crypto.randomBytes(32).toString('hex');

      const shareRecord = {
        _id: new ObjectId(),
        fileId: fileId,
        shareToken: shareToken,
        sharedBy: shareOptions.sharedBy,
        createdAt: new Date(),
        expiresAt: shareOptions.expiresAt || new Date(Date.now() + 7 * 24 * 60 * 60 * 1000), // 7 days

        // Share settings
        allowDownload: shareOptions.allowDownload !== false,
        allowView: shareOptions.allowView !== false,
        allowComment: shareOptions.allowComment || false,
        requireAuth: shareOptions.requireAuth || false,

        // Access tracking
        accessCount: 0,
        lastAccessedAt: null,
        accessLog: [],

        // Share metadata
        shareNote: shareOptions.note || '',
        shareName: shareOptions.name || `Share of ${file.filename}`,
        shareType: shareOptions.shareType || 'public_link',

        // Restrictions
        maxDownloads: shareOptions.maxDownloads || null,
        allowedDomains: shareOptions.allowedDomains || [],
        allowedUsers: shareOptions.allowedUsers || []
      };

      // Store share record
      const sharesCollection = this.db.collection('file_shares');
      await sharesCollection.insertOne(shareRecord);

      // Update file metadata
      await this.updateFileMetadata(fileId, {
        'metadata.isShared': true,
        $inc: { 'metadata.shareCount': 1 },
        $push: {
          'metadata.shareHistory': {
            shareId: shareRecord._id,
            shareToken: shareToken,
            createdAt: new Date(),
            sharedBy: shareOptions.sharedBy
          }
        }
      });

      return {
        shareId: shareRecord._id,
        shareToken: shareToken,
        shareUrl: `/api/shared/${shareToken}`,
        expiresAt: shareRecord.expiresAt,
        shareSettings: {
          allowDownload: shareRecord.allowDownload,
          allowView: shareRecord.allowView,
          allowComment: shareRecord.allowComment
        }
      };

    } catch (error) {
      console.error('Error creating file share:', error);
      throw error;
    }
  }

  async analyzeStorageUsage(options = {}) {
    console.log('Analyzing GridFS storage usage...');

    try {
      const analysisResults = {};

      // Analyze each bucket
      for (const [bucketName, bucket] of this.buckets) {
        const filesCollection = this.db.collection(`${bucketName}.files`);

        const bucketAnalysis = await filesCollection.aggregate([
          {
            $group: {
              _id: null,
              totalFiles: { $sum: 1 },
              totalSize: { $sum: '$length' },
              avgFileSize: { $avg: '$length' },
              maxFileSize: { $max: '$length' },
              minFileSize: { $min: '$length' },

              // Content type distribution
              contentTypes: {
                $push: '$metadata.contentType'
              },

              // Upload date analysis
              oldestFile: { $min: '$uploadDate' },
              newestFile: { $max: '$uploadDate' },

              // User analysis
              uploaders: {
                $addToSet: '$metadata.uploadedBy'
              }
            }
          },
          {
            $addFields: {
              // Content type statistics
              contentTypeStats: {
                $reduce: {
                  input: '$contentTypes',
                  initialValue: {},
                  in: {
                    $mergeObjects: [
                      '$$value',
                      {
                        $arrayToObject: [
                          [{ k: '$$this', v: { $add: [{ $ifNull: [{ $getField: { field: '$$this', input: '$$value' } }, 0] }, 1] } }]
                        ]
                      }
                    ]
                  }
                }
              },

              // Storage efficiency metrics
              avgChunkCount: {
                $divide: ['$totalSize', 255 * 1024] // Assuming 255KB chunks
              },

              storageEfficiency: {
                $multiply: [
                  { $divide: ['$totalSize', { $add: ['$totalSize', { $multiply: ['$totalFiles', 1024] }] }] }, // Account for metadata overhead
                  100
                ]
              }
            }
          }
        ]).toArray();

        // Additional bucket-specific analysis
        const categoryAnalysis = await filesCollection.aggregate([
          {
            $group: {
              _id: '$metadata.category',
              fileCount: { $sum: 1 },
              totalSize: { $sum: '$length' },
              avgSize: { $avg: '$length' }
            }
          },
          { $sort: { fileCount: -1 } }
        ]).toArray();

        const recentActivity = await filesCollection.aggregate([
          {
            $match: {
              uploadDate: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) }
            }
          },
          {
            $group: {
              _id: { 
                $dateToString: { format: '%Y-%m-%d', date: '$uploadDate' }
              },
              filesUploaded: { $sum: 1 },
              sizeUploaded: { $sum: '$length' }
            }
          },
          { $sort: { _id: 1 } }
        ]).toArray();

        analysisResults[bucketName] = {
          overview: bucketAnalysis[0] || {},
          categoryBreakdown: categoryAnalysis,
          recentActivity: recentActivity,
          recommendations: this.generateStorageRecommendations(bucketAnalysis[0], categoryAnalysis)
        };
      }

      // Overall system analysis
      const systemStats = {
        totalBuckets: this.buckets.size,
        analysisDate: new Date(),
        recommendations: this.generateSystemRecommendations(analysisResults)
      };

      return {
        systemStats: systemStats,
        bucketAnalysis: analysisResults,
        summary: this.generateStorageSummary(analysisResults)
      };

    } catch (error) {
      console.error('Error analyzing storage usage:', error);
      throw error;
    }
  }

  generateStorageRecommendations(bucketStats, categoryStats) {
    const recommendations = [];

    if (bucketStats) {
      // Size-based recommendations
      if (bucketStats.avgFileSize > 50 * 1024 * 1024) { // 50MB
        recommendations.push({
          type: 'optimization',
          priority: 'medium',
          message: 'Large average file size detected. Consider implementing file compression.',
          action: 'Enable compression for new uploads'
        });
      }

      if (bucketStats.totalFiles > 10000) {
        recommendations.push({
          type: 'performance',
          priority: 'high', 
          message: 'High file count may impact query performance.',
          action: 'Consider implementing file archiving or additional indexing'
        });
      }

      if (bucketStats.storageEfficiency < 85) {
        recommendations.push({
          type: 'efficiency',
          priority: 'low',
          message: 'Storage efficiency could be improved.',
          action: 'Review chunk size settings and consider deduplication'
        });
      }
    }

    // Category-based recommendations
    if (categoryStats) {
      const topCategory = categoryStats[0];
      if (topCategory && topCategory.avgSize > 100 * 1024 * 1024) { // 100MB
        recommendations.push({
          type: 'category_optimization',
          priority: 'medium',
          message: `Category "${topCategory._id}" has large average file sizes.`,
          action: 'Consider specialized handling for this content type'
        });
      }
    }

    return recommendations;
  }

  async cleanupExpiredShares() {
    console.log('Cleaning up expired file shares...');

    try {
      const sharesCollection = this.db.collection('file_shares');

      // Find expired shares
      const expiredShares = await sharesCollection.find({
        expiresAt: { $lt: new Date() }
      }).toArray();

      if (expiredShares.length === 0) {
        console.log('No expired shares found');
        return { deletedCount: 0, updatedFiles: 0 };
      }

      // Remove expired shares
      const deleteResult = await sharesCollection.deleteMany({
        expiresAt: { $lt: new Date() }
      });

      // Update affected files
      const fileIds = expiredShares.map(share => share.fileId);
      const updateResult = await this.db.collection('files.files').updateMany(
        { _id: { $in: fileIds } },
        {
          $set: { 'metadata.isShared': false },
          $unset: { 'metadata.activeShares': '' }
        }
      );

      console.log(`Cleaned up ${deleteResult.deletedCount} expired shares`);

      return {
        deletedCount: deleteResult.deletedCount,
        updatedFiles: updateResult.modifiedCount,
        expiredShareIds: expiredShares.map(s => s._id)
      };

    } catch (error) {
      console.error('Error cleaning up expired shares:', error);
      throw error;
    }
  }
}
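
A short sketch of how the extended operations above might be used together; the file identifiers, user identifier, and options are placeholders, and fileOps is assumed to be a fully initialized ProductionGridFSOperations instance.

// Illustrative lifecycle: version, share, and clean up a stored document
async function manageDocumentLifecycle(fileOps, fileId, revisedBuffer) {
  // Create a new version of an existing document
  const version = await fileOps.createFileVersion(fileId, revisedBuffer, {
    uploadedBy: 'user-123',
    comment: 'Quarterly revision'
  });
  console.log(`Created version ${version.version} (${version.newVersionId})`);

  // Review the full version history
  const history = await fileOps.getFileVersionHistory(fileId);
  console.log(`File now has ${history.totalVersions} versions`);

  // Share the latest version with a 48-hour expiry
  const share = await fileOps.shareFile(version.newVersionId, {
    sharedBy: 'user-123',
    expiresAt: new Date(Date.now() + 48 * 60 * 60 * 1000),
    allowDownload: true
  });
  console.log(`Share link: ${share.shareUrl}`);

  // Periodic maintenance: remove expired shares
  const cleanup = await fileOps.cleanupExpiredShares();
  console.log(`Removed ${cleanup.deletedCount} expired shares`);
}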

QueryLeaf GridFS Integration

QueryLeaf provides familiar SQL syntax for GridFS file operations and management:

-- QueryLeaf GridFS operations with SQL-familiar syntax

-- File upload and metadata management using SQL-style syntax
INSERT INTO gridfs_files (
  filename, 
  content, 
  content_type, 
  metadata
) VALUES (
  'product_manual.pdf',
  FILE_CONTENT('/path/to/local/file.pdf'),
  'application/pdf',
  JSON_OBJECT(
    'category', 'documentation',
    'tags', ARRAY['manual', 'product', 'guide'],
    'description', 'Product user manual version 2.1',
    'uploadedBy', CURRENT_USER_ID(),
    'accessLevel', 'public',
    'department', 'customer_support'
  )
);

-- Advanced file search with metadata filtering and full-text search
SELECT 
  f.file_id,
  f.filename,
  f.content_type,
  f.file_size,
  FORMAT_BYTES(f.file_size) as formatted_size,
  f.upload_date,
  f.metadata->>'category' as category,
  f.metadata->>'description' as description,
  JSON_EXTRACT(f.metadata, '$.tags') as tags,
  f.metadata->>'uploadedBy' as uploaded_by,

  -- File characteristics and analysis
  CASE f.content_type
    WHEN 'image/jpeg' THEN 'Image'
    WHEN 'image/png' THEN 'Image'
    WHEN 'application/pdf' THEN 'Document'
    WHEN 'video/mp4' THEN 'Video'
    WHEN 'audio/mpeg' THEN 'Audio'
    ELSE 'Other'
  END as file_type,

  -- File age and recency
  EXTRACT(DAYS FROM CURRENT_DATE - f.upload_date) as days_old,
  CASE 
    WHEN f.upload_date > CURRENT_DATE - INTERVAL '7 days' THEN 'Recent'
    WHEN f.upload_date > CURRENT_DATE - INTERVAL '30 days' THEN 'Current'
    WHEN f.upload_date > CURRENT_DATE - INTERVAL '90 days' THEN 'Older'
    ELSE 'Archive'
  END as age_category,

  -- Access and sharing information
  f.metadata->>'isPublic' as is_public,
  f.metadata->>'accessLevel' as access_level,
  COALESCE(f.metadata->>'shareCount', '0')::INTEGER as share_count,

  -- Processing status
  f.metadata->>'processingStatus' as processing_status,
  CASE f.metadata->>'processingStatus'
    WHEN 'completed' THEN '✓ Processed'
    WHEN 'pending' THEN '⏳ Processing'
    WHEN 'failed' THEN '❌ Failed'
    ELSE '❓ Unknown'
  END as processing_display,

  -- File URLs and access
  CONCAT('/api/files/', f.file_id, '/download') as download_url,
  CONCAT('/api/files/', f.file_id, '/view') as view_url,
  CASE 
    WHEN f.metadata->'processingResult'->>'thumbnail' IS NOT NULL 
    THEN CONCAT('/api/files/', f.file_id, '/thumbnail')
    ELSE NULL
  END as thumbnail_url,

  -- Relevance scoring for search
  (
    CASE 
      WHEN f.filename ILIKE '%document%' THEN 5
      WHEN f.metadata->>'description' ILIKE '%document%' THEN 3
      WHEN JSON_EXTRACT(f.metadata, '$.tags') @> '["document"]' THEN 4
      ELSE 1
    END +
    CASE f.metadata->>'processingStatus'
      WHEN 'completed' THEN 2
      ELSE 0
    END +
    CASE 
      WHEN f.upload_date > CURRENT_DATE - INTERVAL '30 days' THEN 1
      ELSE 0
    END
  ) as relevance_score

FROM gridfs_files f
WHERE 
  -- Text search across filename, description, and tags
  (
    f.filename ILIKE '%document%' OR
    f.metadata->>'description' ILIKE '%document%' OR
    JSON_EXTRACT(f.metadata, '$.tags') @> '["document"]'
  )

  -- Content type filtering
  AND f.content_type IN ('application/pdf', 'application/msword', 'text/plain')

  -- Access level filtering
  AND f.metadata->>'accessLevel' IN ('public', 'internal')

  -- Size filtering (documents between 1KB and 50MB)
  AND f.file_size BETWEEN 1024 AND 52428800

  -- Date range filtering (last 6 months)
  AND f.upload_date >= CURRENT_DATE - INTERVAL '6 months'

ORDER BY relevance_score DESC, f.upload_date DESC
LIMIT 25;

-- File analytics and storage insights
WITH file_statistics AS (
  SELECT 
    COUNT(*) as total_files,
    SUM(f.file_size) as total_storage_bytes,
    AVG(f.file_size) as avg_file_size,
    MIN(f.file_size) as smallest_file,
    MAX(f.file_size) as largest_file,
    COUNT(*) FILTER (WHERE f.upload_date > CURRENT_DATE - INTERVAL '30 days') as recent_uploads,
    COUNT(*) FILTER (WHERE f.metadata->>'processingStatus' = 'completed') as processed_files,
    COUNT(DISTINCT f.metadata->>'uploadedBy') as unique_uploaders,

    -- Content type distribution
    JSON_OBJECT_AGG(
      CASE f.content_type
        WHEN 'image/jpeg' THEN 'JPEG Images'
        WHEN 'image/png' THEN 'PNG Images'
        WHEN 'application/pdf' THEN 'PDF Documents'
        WHEN 'video/mp4' THEN 'MP4 Videos'
        WHEN 'audio/mpeg' THEN 'MP3 Audio'
        ELSE 'Other Files'
      END,
      COUNT(*)
    ) as content_type_distribution,

    -- Category breakdown
    JSON_OBJECT_AGG(
      COALESCE(f.metadata->>'category', 'Uncategorized'),
      COUNT(*)
    ) as category_distribution,

    -- Size category analysis
    JSON_OBJECT(
      'Small (<1MB)', COUNT(*) FILTER (WHERE f.file_size < 1048576),
      'Medium (1-10MB)', COUNT(*) FILTER (WHERE f.file_size BETWEEN 1048576 AND 10485760),
      'Large (10-100MB)', COUNT(*) FILTER (WHERE f.file_size BETWEEN 10485760 AND 104857600),
      'Very Large (>100MB)', COUNT(*) FILTER (WHERE f.file_size > 104857600)
    ) as size_distribution

  FROM gridfs_files f
),
storage_efficiency AS (
  SELECT 
    -- Storage efficiency metrics
    ROUND((fs.total_storage_bytes / (1024.0 * 1024 * 1024))::numeric, 2) as storage_gb,
    ROUND((fs.avg_file_size / 1048576.0)::numeric, 2) as avg_size_mb,

    -- Upload trends
    ROUND((fs.recent_uploads::numeric / fs.total_files * 100)::numeric, 1) as recent_upload_percentage,

    -- Processing efficiency
    ROUND((fs.processed_files::numeric / fs.total_files * 100)::numeric, 1) as processing_success_rate,

    -- Storage growth estimation
    CASE 
      WHEN fs.recent_uploads > 0 THEN
        ROUND((fs.recent_uploads * 12.0 / fs.total_files * fs.total_storage_bytes / (1024.0 * 1024 * 1024))::numeric, 2)
      ELSE 0
    END as estimated_yearly_growth_gb

  FROM file_statistics fs
),
top_uploaders AS (
  SELECT 
    f.metadata->>'uploadedBy' as user_id,
    u.name as user_name,
    COUNT(*) as files_uploaded,
    SUM(f.file_size) as total_bytes_uploaded,
    FORMAT_BYTES(SUM(f.file_size)) as formatted_total_size,
    AVG(f.file_size) as avg_file_size,
    MIN(f.upload_date) as first_upload,
    MAX(f.upload_date) as last_upload,

    -- User activity patterns
    COUNT(*) FILTER (WHERE f.upload_date > CURRENT_DATE - INTERVAL '7 days') as uploads_last_week,
    COUNT(*) FILTER (WHERE f.upload_date > CURRENT_DATE - INTERVAL '30 days') as uploads_last_month,

    -- Content preferences
    MODE() WITHIN GROUP (ORDER BY f.content_type) as most_common_content_type,
    COUNT(DISTINCT f.content_type) as content_type_diversity

  FROM gridfs_files f
  LEFT JOIN users u ON f.metadata->>'uploadedBy' = u.user_id
  GROUP BY f.metadata->>'uploadedBy', u.name
  HAVING COUNT(*) >= 5  -- Only users with at least 5 uploads
  ORDER BY files_uploaded DESC
  LIMIT 10
)

-- Final comprehensive analytics report
SELECT 
  -- Overall statistics
  fs.total_files,
  se.storage_gb as total_storage_gb,
  se.avg_size_mb as average_file_size_mb,
  fs.unique_uploaders,
  se.recent_upload_percentage as recent_activity_percentage,
  se.processing_success_rate as processing_success_percentage,
  se.estimated_yearly_growth_gb,

  -- Distribution insights
  fs.content_type_distribution,
  fs.category_distribution, 
  fs.size_distribution,

  -- Top users summary (as JSON array)
  (
    SELECT JSON_AGG(
      JSON_OBJECT(
        'user_name', tu.user_name,
        'files_uploaded', tu.files_uploaded,
        'total_size', tu.formatted_total_size,
        'uploads_last_month', tu.uploads_last_month
      )
      ORDER BY tu.files_uploaded DESC
    )
    FROM top_uploaders tu
  ) as top_uploaders_summary,

  -- Storage optimization recommendations
  CASE 
    WHEN se.storage_gb > 100 THEN 'Consider implementing file archiving and compression policies'
    WHEN se.recent_upload_percentage > 25 THEN 'High upload activity - monitor storage growth'
    WHEN se.processing_success_rate < 90 THEN 'Review file processing pipeline for efficiency'
    ELSE 'Storage usage is within normal parameters'
  END as optimization_recommendation,

  -- Health indicators
  JSON_OBJECT(
    'storage_health', CASE 
      WHEN se.storage_gb > 500 THEN 'High Usage'
      WHEN se.storage_gb > 100 THEN 'Moderate Usage'
      ELSE 'Low Usage'
    END,
    'activity_level', CASE 
      WHEN se.recent_upload_percentage > 20 THEN 'High Activity'
      WHEN se.recent_upload_percentage > 5 THEN 'Normal Activity'
      ELSE 'Low Activity'
    END,
    'processing_health', CASE 
      WHEN se.processing_success_rate > 95 THEN 'Excellent'
      WHEN se.processing_success_rate > 80 THEN 'Good'
      ELSE 'Needs Attention'
    END
  ) as system_health

FROM file_statistics fs
CROSS JOIN storage_efficiency se;

-- File version management and history tracking
WITH version_analysis AS (
  SELECT 
    f.file_id,
    f.filename,
    f.metadata->>'parentFileId' as parent_file_id,
    (f.metadata->>'version')::INTEGER as version_number,
    f.metadata->>'versionType' as version_type,
    f.metadata->>'versionComment' as version_comment,
    f.file_size,
    f.upload_date as version_date,
    f.metadata->>'uploadedBy' as version_author,

    -- Version relationships
    LAG(f.file_size) OVER (PARTITION BY COALESCE(f.metadata->>'parentFileId', f.file_id) ORDER BY (f.metadata->>'version')::INTEGER) as previous_version_size,
    LAG(f.upload_date) OVER (PARTITION BY COALESCE(f.metadata->>'parentFileId', f.file_id) ORDER BY (f.metadata->>'version')::INTEGER) as previous_version_date,

    -- Version statistics
    COUNT(*) OVER (PARTITION BY COALESCE(f.metadata->>'parentFileId', f.file_id)) as total_versions,
    ROW_NUMBER() OVER (PARTITION BY COALESCE(f.metadata->>'parentFileId', f.file_id) ORDER BY (f.metadata->>'version')::INTEGER DESC) as version_rank

  FROM gridfs_files f
  WHERE f.metadata->>'version' IS NOT NULL
),
version_insights AS (
  SELECT 
    va.*,

    -- Size change analysis
    CASE 
      WHEN va.previous_version_size IS NOT NULL THEN
        va.file_size - va.previous_version_size
      ELSE 0
    END as size_change_bytes,

    CASE 
      WHEN va.previous_version_size IS NOT NULL AND va.previous_version_size > 0 THEN
        ROUND(((va.file_size - va.previous_version_size)::numeric / va.previous_version_size * 100)::numeric, 1)
      ELSE 0
    END as size_change_percentage,

    -- Time between versions
    CASE 
      WHEN va.previous_version_date IS NOT NULL THEN
        EXTRACT(DAYS FROM va.version_date - va.previous_version_date)
      ELSE 0
    END as days_since_previous_version,

    -- Version classification
    CASE va.version_type
      WHEN 'major' THEN '🔴 Major Update'
      WHEN 'minor' THEN '🟡 Minor Update'  
      WHEN 'patch' THEN '🟢 Patch/Fix'
      WHEN 'update' THEN '🔵 Content Update'
      ELSE '⚪ Standard Update'
    END as version_type_display

  FROM version_analysis va
)

SELECT 
  vi.file_id,
  vi.filename,
  vi.version_number,
  vi.version_type_display,
  vi.version_comment,
  FORMAT_BYTES(vi.file_size) as current_size,
  vi.version_date,
  vi.version_author,
  vi.total_versions,

  -- Change analysis
  CASE 
    WHEN vi.size_change_bytes > 0 THEN 
      CONCAT('+', FORMAT_BYTES(vi.size_change_bytes))
    WHEN vi.size_change_bytes < 0 THEN 
      CONCAT('-', FORMAT_BYTES(ABS(vi.size_change_bytes)))
    ELSE 'No change'
  END as size_change_display,

  CONCAT(
    CASE 
      WHEN vi.size_change_percentage > 0 THEN '+'
      ELSE ''
    END,
    vi.size_change_percentage::text, '%'
  ) as size_change_percentage_display,

  -- Version timing
  CASE 
    WHEN vi.days_since_previous_version = 0 THEN 'Same day'
    WHEN vi.days_since_previous_version = 1 THEN '1 day'
    WHEN vi.days_since_previous_version < 7 THEN CONCAT(vi.days_since_previous_version, ' days')
    WHEN vi.days_since_previous_version < 30 THEN CONCAT(ROUND(vi.days_since_previous_version / 7.0, 1), ' weeks')
    ELSE CONCAT(ROUND(vi.days_since_previous_version / 30.0, 1), ' months')
  END as time_since_previous,

  -- Version context
  CASE vi.version_rank
    WHEN 1 THEN 'Latest Version'
    WHEN 2 THEN 'Previous Version'
    ELSE CONCAT(vi.version_rank - 1, ' versions back')
  END as version_status,

  -- Access URLs
  CONCAT('/api/files/', vi.file_id, '/download') as download_url,
  CONCAT('/api/files/', vi.file_id, '/version-info') as version_info_url,
  CASE 
    WHEN vi.parent_file_id IS NOT NULL THEN 
      CONCAT('/api/files/', vi.parent_file_id, '/versions')
    ELSE 
      CONCAT('/api/files/', vi.file_id, '/versions')
  END as version_history_url

FROM version_insights vi
WHERE vi.total_versions > 1  -- Only show files with multiple versions
ORDER BY vi.filename, vi.version_number DESC;

-- QueryLeaf provides comprehensive GridFS capabilities:
-- 1. Native file upload and download operations with SQL syntax
-- 2. Advanced metadata management and search capabilities
-- 3. Automatic chunking and streaming for large files
-- 4. Built-in file versioning and history tracking
-- 5. Comprehensive file analytics and storage insights
-- 6. Integrated permission and sharing management
-- 7. Automatic file processing and thumbnail generation
-- 8. SQL-familiar syntax for complex file operations
-- 9. Production-ready scalability with MongoDB's GridFS
-- 10. Seamless integration with application data models
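
For orientation, the SQL INSERT at the start of this listing corresponds roughly to the Node.js driver calls below. This is a sketch only: the gridfs_files bucket name and the local file path are carried over from the SQL example, and QueryLeaf's actual translation layer may differ.

// Rough driver-level equivalent of the SQL file upload above (illustrative)
const { GridFSBucket } = require('mongodb');
const fs = require('fs');

async function uploadManual(db) {
  // Bucket name mirrors the gridfs_files table used in the SQL examples
  const bucket = new GridFSBucket(db, { bucketName: 'gridfs_files' });

  await new Promise((resolve, reject) => {
    fs.createReadStream('/path/to/local/file.pdf')
      .pipe(bucket.openUploadStream('product_manual.pdf', {
        contentType: 'application/pdf',
        metadata: {
          category: 'documentation',
          tags: ['manual', 'product', 'guide'],
          description: 'Product user manual version 2.1',
          accessLevel: 'public',
          department: 'customer_support'
        }
      }))
      .on('finish', resolve)
      .on('error', reject);
  });
}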

Best Practices for GridFS Implementation

Design Guidelines for Production File Storage

Essential practices for MongoDB GridFS deployments:

  1. Content Type Organization: Use specialized buckets for different content types to optimize performance (see the bucket sketch after this list)
  2. Metadata Design: Structure metadata for efficient querying and filtering operations
  3. Chunk Size Optimization: Configure appropriate chunk sizes based on file types and access patterns
  4. Index Strategy: Create comprehensive indexes on metadata fields for fast file discovery
  5. Version Management: Implement systematic file versioning for collaborative environments
  6. Access Control: Design permission systems integrated with application security models
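
A minimal sketch of guidelines 1, 3, and 4 using the Node.js driver; the bucket names, chunk sizes, and index fields below are illustrative choices rather than prescriptions.

// Specialized buckets with tuned chunk sizes, plus metadata indexes on the
// backing files collections (all names and sizes are illustrative)
const { GridFSBucket } = require('mongodb');

function createContentBuckets(db) {
  return {
    // Smaller chunks waste less space on the final partial chunk of small documents
    documents: new GridFSBucket(db, { bucketName: 'documents', chunkSizeBytes: 255 * 1024 }),
    // Larger chunks keep the chunk-document count low for big media files
    videos: new GridFSBucket(db, { bucketName: 'videos', chunkSizeBytes: 2 * 1024 * 1024 })
  };
}

async function ensureMetadataIndexes(db) {
  // Each bucket stores its file documents in '<bucketName>.files'
  await db.collection('documents.files').createIndex({ 'metadata.category': 1, uploadDate: -1 });
  await db.collection('documents.files').createIndex({ 'metadata.tags': 1 });
  await db.collection('videos.files').createIndex({ 'metadata.uploadedBy': 1, uploadDate: -1 });
}

Smaller chunks waste less space on the final partial chunk of small files, while larger chunks reduce the per-file chunk-document count for large media.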

Performance and Scalability Optimization

Optimize GridFS for large-scale file storage requirements:

  1. Storage Efficiency: Implement deduplication and compression strategies
  2. Query Optimization: Design metadata structures for efficient search operations
  3. Streaming Operations: Use GridFS streaming for large file uploads and downloads (see the download sketch after this list)
  4. Caching Strategy: Implement intelligent caching for frequently accessed files
  5. Monitoring: Track storage usage, access patterns, and performance metrics
  6. Cleanup Automation: Automate expired file deletion and storage optimization
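
A streaming download sketch that follows guideline 3: an Express-style route pipes chunks straight from GridFS to the HTTP response without buffering the whole file in memory. The files bucket name and URL path are assumptions chosen to match the download URLs used earlier in the examples.

// Stream a stored file to the client chunk by chunk (illustrative route)
const { GridFSBucket, ObjectId } = require('mongodb');

function registerDownloadRoute(app, db) {
  const bucket = new GridFSBucket(db, { bucketName: 'files' });

  app.get('/api/files/:id/download', async (req, res) => {
    try {
      const fileId = new ObjectId(req.params.id);
      const [fileDoc] = await bucket.find({ _id: fileId }).limit(1).toArray();
      if (!fileDoc) {
        return res.status(404).json({ error: 'File not found' });
      }

      res.set('Content-Type', fileDoc.contentType || 'application/octet-stream');
      res.set('Content-Disposition', `attachment; filename="${fileDoc.filename}"`);

      // openDownloadStream reads chunk documents lazily and pipes them out
      bucket.openDownloadStream(fileId)
        .on('error', () => res.destroy()) // abort the response if a chunk read fails
        .pipe(res);
    } catch (error) {
      res.status(400).json({ error: 'Invalid file id' });
    }
  });
}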

Conclusion

MongoDB GridFS provides comprehensive large file storage capabilities that integrate seamlessly with database operations while delivering high performance, automatic chunking, and sophisticated metadata management. Unlike traditional file storage approaches, GridFS keeps files and their metadata in the same database as your application data, supports rich metadata queries, and scales horizontally with your application.

Key GridFS benefits include:

  • Seamless Integration: Native MongoDB integration with consistent APIs and operations
  • Automatic Management: Built-in chunking, streaming, and integrity checking without manual implementation
  • Scalable Architecture: Horizontal scaling with MongoDB's sharding and replication capabilities
  • Rich Metadata: Sophisticated metadata storage and indexing for complex file management scenarios
  • Performance Optimization: Optimized chunk sizes and streaming operations for various content types
  • Production Ready: Enterprise-grade reliability with comprehensive monitoring and analytics

Whether you're building content management systems, media libraries, document repositories, or data archival platforms, MongoDB GridFS with QueryLeaf's familiar SQL interface provides the foundation for scalable file storage solutions.

QueryLeaf Integration: QueryLeaf automatically manages MongoDB GridFS operations through SQL-familiar syntax, handling file uploads, metadata management, and complex file queries seamlessly. Advanced file operations, version management, and storage analytics are accessible through standard SQL constructs, making sophisticated file storage capabilities available to SQL-oriented development teams.

The combination of MongoDB GridFS capabilities with SQL-style file operations provides an ideal platform for applications that need both advanced file management and familiar database interaction patterns, ensuring your file storage solutions remain powerful and maintainable as they scale and evolve.