MongoDB GridFS Advanced File Streaming and Compression: High-Performance Large File Management and Optimization

Modern applications require efficient handling of large files, from media assets and document repositories to backup systems and content delivery networks. Traditional file storage approaches struggle with distributed architectures, automatic failover, and efficient streaming, especially when dealing with multi-gigabyte files or high-throughput workloads.

MongoDB GridFS provides advanced file storage capabilities that integrate directly with your database infrastructure, offering chunked storage, streaming access, and the replication and sharding of the underlying MongoDB deployment. Unlike traditional file systems that require separate infrastructure and complex synchronization mechanisms, GridFS stores files as documents with queryable metadata, making it straightforward to layer compression, versioning, and lifecycle management on top.
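
A minimal sketch of the core GridFS round trip illustrates how file bytes and metadata end up in ordinary collections (the connection string, database name, bucket name, and file paths below are illustrative):

// Minimal GridFS round trip - upload, download, and metadata query
const { MongoClient, GridFSBucket } = require('mongodb');
const fs = require('fs');

async function basicGridFSExample() {
  const client = await MongoClient.connect('mongodb://localhost:27017');
  const db = client.db('files_demo');
  const bucket = new GridFSBucket(db, { bucketName: 'assets' });

  // Upload: the driver splits the byte stream into documents in assets.chunks
  // and writes a metadata document to assets.files
  await new Promise((resolve, reject) => {
    fs.createReadStream('./report.pdf')
      .pipe(bucket.openUploadStream('report.pdf', {
        metadata: { contentType: 'application/pdf', owner: 'user_123' }
      }))
      .on('finish', resolve)
      .on('error', reject);
  });

  // Download: chunks are streamed back in order
  await new Promise((resolve, reject) => {
    bucket.openDownloadStreamByName('report.pdf')
      .pipe(fs.createWriteStream('./report-copy.pdf'))
      .on('finish', resolve)
      .on('error', reject);
  });

  // File metadata is queryable like any other collection
  const fileDoc = await db.collection('assets.files').findOne({ filename: 'report.pdf' });
  console.log(fileDoc.length, fileDoc.uploadDate, fileDoc.metadata);

  await client.close();
}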

The Large File Storage Challenge

Traditional file storage approaches have significant limitations for modern distributed applications:

// Traditional file system approach - limited scalability and integration
const fs = require('fs');
const path = require('path');
const crypto = require('crypto');

class TraditionalFileStorage {
  constructor(baseDirectory) {
    this.baseDirectory = baseDirectory;
    this.metadata = new Map(); // In-memory metadata - lost on restart
  }

  async uploadFile(filename, fileStream, metadata = {}) {
    try {
      const filePath = path.join(this.baseDirectory, filename);
      const writeStream = fs.createWriteStream(filePath);

      // No built-in compression
      fileStream.pipe(writeStream);

      await new Promise((resolve, reject) => {
        writeStream.on('finish', resolve);
        writeStream.on('error', reject);
      });

      // Manual metadata management
      const stats = fs.statSync(filePath);
      this.metadata.set(filename, {
        size: stats.size,
        uploadDate: new Date(),
        contentType: metadata.contentType || 'application/octet-stream',
        ...metadata
      });

      return { success: true, filename, size: stats.size };

    } catch (error) {
      console.error('Upload failed:', error);
      return { success: false, error: error.message };
    }
  }

  async downloadFile(filename) {
    try {
      const filePath = path.join(this.baseDirectory, filename);

      // No streaming optimization
      if (!fs.existsSync(filePath)) {
        throw new Error('File not found');
      }

      const readStream = fs.createReadStream(filePath);
      const metadata = this.metadata.get(filename) || {};

      return {
        success: true,
        stream: readStream,
        metadata: metadata
      };

    } catch (error) {
      console.error('Download failed:', error);
      return { success: false, error: error.message };
    }
  }

  async getFileMetadata(filename) {
    // Limited metadata capabilities
    return this.metadata.get(filename) || null;
  }

  async deleteFile(filename) {
    try {
      const filePath = path.join(this.baseDirectory, filename);
      fs.unlinkSync(filePath);
      this.metadata.delete(filename);

      return { success: true };
    } catch (error) {
      return { success: false, error: error.message };
    }
  }

  // Problems with traditional file storage:
  // 1. No automatic replication or high availability
  // 2. No built-in compression or optimization
  // 3. Limited metadata and search capabilities  
  // 4. No streaming optimization for large files
  // 5. Manual synchronization across distributed systems
  // 6. No versioning or audit trail capabilities
  // 7. Limited concurrent access and locking mechanisms
  // 8. No integration with database transactions
  // 9. Complex backup and recovery procedures
  // 10. No automatic sharding for very large files
}

MongoDB GridFS eliminates these limitations with comprehensive file storage features:

// MongoDB GridFS - comprehensive file storage with advanced features
const { MongoClient, GridFSBucket, ObjectId } = require('mongodb');
const { Transform, PassThrough } = require('stream');
const zlib = require('zlib');
const sharp = require('sharp'); // For image processing
const ffmpeg = require('fluent-ffmpeg'); // For video processing

class AdvancedGridFSManager {
  constructor(db) {
    this.db = db;
    this.bucket = new GridFSBucket(db, {
      bucketName: 'advanced_files',
      chunkSizeBytes: 1048576 * 4 // 4MB chunks for optimal performance
    });

    // Specialized buckets for different file types
    this.imageBucket = new GridFSBucket(db, { 
      bucketName: 'images',
      chunkSizeBytes: 1048576 * 2 // 2MB for images
    });

    this.videoBucket = new GridFSBucket(db, { 
      bucketName: 'videos',
      chunkSizeBytes: 1048576 * 8 // 8MB for videos
    });

    this.documentBucket = new GridFSBucket(db, { 
      bucketName: 'documents',
      chunkSizeBytes: 1048576 * 1 // 1MB for documents
    });

    // Performance monitoring
    this.metrics = {
      uploads: { count: 0, totalBytes: 0, totalTime: 0 },
      downloads: { count: 0, totalBytes: 0, totalTime: 0 },
      compressionRatios: []
    };
  }

  async uploadFileWithAdvancedFeatures(fileStream, metadata = {}) {
    const startTime = Date.now();

    try {
      // Determine optimal bucket and processing pipeline
      const fileType = this.detectFileType(metadata.contentType);
      const bucket = this.selectOptimalBucket(fileType);

      // Generate unique filename with collision prevention
      const filename = this.generateUniqueFilename(metadata.originalName || 'file');

      // Create processing pipeline based on file type
      const { processedStream, finalMetadata } = await this.createProcessingPipeline(
        fileStream, fileType, metadata
      );

      // Advanced upload with compression and optimization
      const uploadResult = await this.performAdvancedUpload(
        processedStream, filename, finalMetadata, bucket
      );

      // Record performance metrics
      const duration = Date.now() - startTime;
      this.updateMetrics('upload', uploadResult.size, duration);

      // Create file registry entry for advanced querying
      await this.createFileRegistryEntry(uploadResult, finalMetadata);

      return {
        success: true,
        fileId: uploadResult._id,
        filename: filename,
        size: uploadResult.size,
        processingTime: duration,
        compressionRatio: finalMetadata.compressionRatio || 1.0,
        optimizations: finalMetadata.optimizations || []
      };

    } catch (error) {
      console.error('Advanced upload failed:', error);
      return {
        success: false,
        error: error.message,
        processingTime: Date.now() - startTime
      };
    }
  }

  detectFileType(contentType) {
    if (!contentType) return 'unknown';

    if (contentType.startsWith('image/')) return 'image';
    if (contentType.startsWith('video/')) return 'video';
    if (contentType.startsWith('audio/')) return 'audio';
    if (contentType.includes('pdf')) return 'document';
    if (contentType.includes('text/')) return 'text';
    if (contentType.includes('application/json')) return 'data';
    if (contentType.includes('zip') || contentType.includes('gzip')) return 'archive';

    return 'binary';
  }

  selectOptimalBucket(fileType) {
    switch (fileType) {
      case 'image': return this.imageBucket;
      case 'video': 
      case 'audio': return this.videoBucket;
      case 'document':
      case 'text': return this.documentBucket;
      default: return this.bucket;
    }
  }

  generateUniqueFilename(originalName) {
    const timestamp = Date.now();
    const random = Math.random().toString(36).substring(2, 15);
    const extension = originalName.includes('.') ? 
      originalName.split('.').pop() : '';

    return `${timestamp}_${random}${extension ? '.' + extension : ''}`;
  }

  async createProcessingPipeline(inputStream, fileType, metadata) {
    const transforms = [];
    const finalMetadata = { ...metadata };
    let compressionRatio = 1.0;

    // Add compression based on file type
    if (this.shouldCompress(fileType, metadata)) {
      const compressionLevel = this.getOptimalCompressionLevel(fileType);

      const compressionTransform = this.createCompressionTransform(
        fileType, compressionLevel
      );

      transforms.push(compressionTransform);

      finalMetadata.compressed = true;
      finalMetadata.compressionLevel = compressionLevel;
      finalMetadata.originalContentType = metadata.contentType;
    }

    // Add file type specific optimizations
    if (fileType === 'image' && metadata.enableImageOptimization !== false) {
      const imageTransform = await this.createImageOptimizationTransform(metadata);
      transforms.push(imageTransform);
      finalMetadata.optimizations = ['image_optimization'];
    }

    // Add encryption if required
    if (metadata.encrypt === true) {
      const encryptionTransform = this.createEncryptionTransform(metadata.encryptionKey);
      transforms.push(encryptionTransform);
      finalMetadata.encrypted = true;
    }

    // Add integrity checking
    const integrityTransform = this.createIntegrityTransform();
    transforms.push(integrityTransform);

    // Create processing pipeline
    let processedStream = inputStream;

    for (const transform of transforms) {
      processedStream = processedStream.pipe(transform);
    }

    // Add size tracking
    const sizeTracker = this.createSizeTrackingTransform();
    processedStream = processedStream.pipe(sizeTracker);

    // Placeholder ratio; the actual ratio can be derived after upload by comparing
    // the size tracker's byte count with metadata.originalSize
    finalMetadata.compressionRatio = compressionRatio;

    return { 
      processedStream, 
      finalMetadata: {
        ...finalMetadata,
        processingTimestamp: new Date(),
        pipeline: transforms.map(t => t.constructor.name)
      }
    };
  }

  shouldCompress(fileType, metadata) {
    // Don't compress already compressed formats
    const skipCompression = ['image/jpeg', 'image/png', 'video/', 'audio/', 'zip', 'gzip'];
    if (skipCompression.some(type => metadata.contentType?.includes(type))) {
      return false;
    }

    // Always compress text and data files
    return ['text', 'document', 'data', 'binary'].includes(fileType);
  }

  getOptimalCompressionLevel(fileType) {
    const compressionLevels = {
      'text': 9,      // Maximum compression for text
      'document': 7,  // High compression for documents
      'data': 8,      // High compression for data files
      'binary': 6     // Moderate compression for binary
    };

    return compressionLevels[fileType] || 6;
  }

  createCompressionTransform(fileType, level) {
    // Use gzip compression with tuned settings
    // (zlib has no text-specific strategy; the default strategy works well for text)
    return zlib.createGzip({
      level: level,
      windowBits: 15,
      memLevel: 8,
      strategy: zlib.constants.Z_DEFAULT_STRATEGY
    });
  }

  async createImageOptimizationTransform(metadata) {
    const quality = metadata.imageQuality || 85;

    // Each sharp format call selects the output format, so only one should be
    // applied per pipeline; keep the original format where it is known
    const transform = sharp();

    if (metadata.contentType === 'image/png') {
      return transform.png({ compressionLevel: 9, adaptiveFiltering: true });
    }
    if (metadata.contentType === 'image/webp') {
      return transform.webp({ quality: quality, effort: 6 });
    }

    // Default to progressive JPEG for other raster inputs
    return transform.jpeg({ quality: quality, progressive: true, optimizeScans: true });
  }

  createEncryptionTransform(encryptionKey) {
    const crypto = require('crypto');
    const algorithm = 'aes-256-gcm';
    const iv = crypto.randomBytes(12); // 96-bit IV recommended for GCM

    // createCipheriv is required (createCipher is deprecated and cannot be used
    // with GCM); encryptionKey must be a 32-byte key. The IV and auth tag need
    // to be persisted in the file metadata so the download pipeline can decrypt.
    const cipher = crypto.createCipheriv(algorithm, encryptionKey, iv);
    this.lastEncryptionIV = iv;
    const manager = this;

    return new Transform({
      transform(chunk, encoding, callback) {
        try {
          callback(null, cipher.update(chunk));
        } catch (error) {
          callback(error);
        }
      },
      flush(callback) {
        try {
          const final = cipher.final();
          manager.lastEncryptionAuthTag = cipher.getAuthTag();
          callback(null, final);
        } catch (error) {
          callback(error);
        }
      }
    });
  }

  createIntegrityTransform() {
    const crypto = require('crypto');
    const hash = crypto.createHash('sha256');

    return new Transform({
      transform(chunk, encoding, callback) {
        hash.update(chunk);
        callback(null, chunk); // Pass through while calculating hash
      },
      flush(callback) {
        this.fileHash = hash.digest('hex');
        callback();
      }
    });
  }

  createSizeTrackingTransform() {
    let totalSize = 0;

    return new Transform({
      transform(chunk, encoding, callback) {
        totalSize += chunk.length;
        callback(null, chunk);
      },
      flush(callback) {
        this.totalSize = totalSize;
        callback();
      }
    });
  }

  async performAdvancedUpload(processedStream, filename, metadata, bucket) {
    return new Promise((resolve, reject) => {
      const uploadStream = bucket.openUploadStream(filename, {
        metadata: {
          ...metadata,
          uploadedAt: new Date(),
          processingVersion: '2.0',

          // Add searchable tags
          tags: this.generateSearchTags(metadata),

          // Add file categorization
          category: this.categorizeFile(metadata),

          // Add retention policy
          retentionPolicy: metadata.retentionDays || 365,
          expirationDate: metadata.retentionDays ? 
            new Date(Date.now() + metadata.retentionDays * 24 * 60 * 60 * 1000) : null
        }
      });

      uploadStream.on('error', reject);
      uploadStream.on('finish', () => {
        // Recent MongoDB drivers no longer pass the file document to 'finish';
        // it is exposed on the stream as gridFSFile (with the id on uploadStream.id)
        const file = uploadStream.gridFSFile || {};
        resolve({
          _id: file._id || uploadStream.id,
          filename: file.filename || filename,
          size: file.length,
          uploadDate: file.uploadDate,
          metadata: file.metadata
        });
      });

      // Start the upload
      processedStream.pipe(uploadStream);
    });
  }

  generateSearchTags(metadata) {
    const tags = [];

    if (metadata.contentType) {
      tags.push(metadata.contentType.split('/')[0]); // e.g., 'image' from 'image/jpeg'
      tags.push(metadata.contentType); // Full content type
    }

    if (metadata.originalName) {
      const extension = metadata.originalName.split('.').pop()?.toLowerCase();
      if (extension) tags.push(extension);
    }

    if (metadata.category) tags.push(metadata.category);
    if (metadata.compressed) tags.push('compressed');
    if (metadata.encrypted) tags.push('encrypted');
    if (metadata.optimized) tags.push('optimized');

    return tags;
  }

  categorizeFile(metadata) {
    const contentType = metadata.contentType || '';

    if (contentType.startsWith('image/')) {
      return metadata.category || 'media';
    } else if (contentType.startsWith('video/') || contentType.startsWith('audio/')) {
      return metadata.category || 'multimedia';
    } else if (contentType.includes('pdf') || contentType.includes('document')) {
      return metadata.category || 'document';
    } else if (contentType.includes('text/')) {
      return metadata.category || 'text';
    } else {
      return metadata.category || 'data';
    }
  }

  async downloadFileWithStreaming(fileId, options = {}) {
    const startTime = Date.now();

    try {
      // Get file metadata for processing decisions
      const fileMetadata = await this.getFileMetadata(fileId);

      if (!fileMetadata) {
        throw new Error(`File not found: ${fileId}`);
      }

      // Select optimal bucket
      const bucket = this.selectBucketByMetadata(fileMetadata);

      // Create download stream with range support
      const downloadOptions = this.createDownloadOptions(options, fileMetadata);
      const downloadStream = bucket.openDownloadStream(
        new ObjectId(fileId),
        downloadOptions
      );

      // Create decompression/decoding pipeline
      const { processedStream, streamMetadata } = this.createDownloadPipeline(
        downloadStream, fileMetadata, options
      );

      // Record performance metrics
      const setupTime = Date.now() - startTime;

      return {
        success: true,
        stream: processedStream,
        metadata: {
          ...fileMetadata,
          streamingOptions: streamMetadata,
          setupTime: setupTime
        }
      };

    } catch (error) {
      console.error('Streaming download failed:', error);
      return {
        success: false,
        error: error.message,
        setupTime: Date.now() - startTime
      };
    }
  }

  selectBucketByMetadata(fileMetadata) {
    const category = fileMetadata.metadata?.category;

    switch (category) {
      case 'media': return this.imageBucket;
      case 'multimedia': return this.videoBucket;
      case 'document':
      case 'text': return this.documentBucket;
      default: return this.bucket;
    }
  }
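
  // Note: getFileMetadata() is called by downloadFileWithStreaming() but is not
  // defined elsewhere in this listing; the sketch below assumes the file_registry
  // collection written by createFileRegistryEntry() records which bucket holds the file.
  async getFileMetadata(fileId) {
    const registryEntry = await this.db.collection('file_registry').findOne({
      fileId: new ObjectId(fileId)
    });

    // Map the registry category back to the owning bucket's files collection
    const bucketName =
      registryEntry?.category === 'media' ? 'images' :
      registryEntry?.category === 'multimedia' ? 'videos' :
      ['document', 'text'].includes(registryEntry?.category) ? 'documents' :
      'advanced_files';

    return this.db.collection(`${bucketName}.files`).findOne({
      _id: new ObjectId(fileId)
    });
  }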

  createDownloadOptions(options, fileMetadata) {
    const downloadOptions = {};

    // Range/partial content support
    if (options.start !== undefined || options.end !== undefined) {
      downloadOptions.start = options.start || 0;
      downloadOptions.end = options.end || fileMetadata.length - 1;
    }

    return downloadOptions;
  }

  createDownloadPipeline(downloadStream, fileMetadata, options) {
    const transforms = [];
    const streamMetadata = {
      originalSize: fileMetadata.length,
      compressed: fileMetadata.metadata?.compressed || false,
      encrypted: fileMetadata.metadata?.encrypted || false
    };

    // Add decryption if file is encrypted (assumes the IV and auth tag were
    // persisted in the file metadata at upload time)
    if (fileMetadata.metadata?.encrypted && options.decryptionKey) {
      const decryptionTransform = this.createDecryptionTransform(
        options.decryptionKey,
        fileMetadata.metadata?.encryptionIV,
        fileMetadata.metadata?.encryptionAuthTag
      );
      transforms.push(decryptionTransform);
      streamMetadata.decrypted = true;
    }

    // Add decompression if file is compressed
    if (fileMetadata.metadata?.compressed && options.decompress !== false) {
      const decompressionTransform = this.createDecompressionTransform();
      transforms.push(decompressionTransform);
      streamMetadata.decompressed = true;
    }

    // Add format conversion if requested
    if (options.convertTo && this.supportsConversion(fileMetadata, options.convertTo)) {
      const conversionTransform = this.createConversionTransform(
        fileMetadata, options.convertTo, options.conversionOptions || {}
      );
      transforms.push(conversionTransform);
      streamMetadata.converted = options.convertTo;
    }

    // Add bandwidth throttling if specified
    if (options.throttle) {
      const throttleTransform = this.createThrottleTransform(options.throttle);
      transforms.push(throttleTransform);
      streamMetadata.throttled = options.throttle;
    }

    // Build pipeline
    let processedStream = downloadStream;

    for (const transform of transforms) {
      processedStream = processedStream.pipe(transform);
    }

    return { processedStream, streamMetadata };
  }

  createDecryptionTransform(decryptionKey, iv, authTag) {
    const crypto = require('crypto');
    const algorithm = 'aes-256-gcm';

    // createDecipheriv is required; the IV (and GCM auth tag) captured during
    // encryption must be supplied to decrypt and verify the stream
    const decipher = crypto.createDecipheriv(algorithm, decryptionKey, iv);
    if (authTag) {
      decipher.setAuthTag(authTag);
    }

    return new Transform({
      transform(chunk, encoding, callback) {
        try {
          callback(null, decipher.update(chunk));
        } catch (error) {
          callback(error);
        }
      },
      flush(callback) {
        try {
          callback(null, decipher.final());
        } catch (error) {
          callback(error);
        }
      }
    });
  }

  createDecompressionTransform() {
    return zlib.createGunzip();
  }

  supportsConversion(fileMetadata, targetFormat) {
    const sourceType = fileMetadata.metadata?.contentType;

    if (!sourceType) return false;

    // Image conversions
    if (sourceType.startsWith('image/') && ['jpeg', 'png', 'webp'].includes(targetFormat)) {
      return true;
    }

    // Video conversions (basic)
    if (sourceType.startsWith('video/') && ['mp4', 'webm'].includes(targetFormat)) {
      return true;
    }

    return false;
  }

  createConversionTransform(fileMetadata, targetFormat, options) {
    const sourceType = fileMetadata.metadata?.contentType;

    if (sourceType?.startsWith('image/')) {
      return this.createImageConversionTransform(targetFormat, options);
    } else if (sourceType?.startsWith('video/')) {
      return this.createVideoConversionTransform(targetFormat, options);
    }

    throw new Error(`Unsupported conversion: ${sourceType} to ${targetFormat}`);
  }

  createImageConversionTransform(targetFormat, options) {
    const sharpInstance = sharp();

    switch (targetFormat) {
      case 'jpeg':
        return sharpInstance.jpeg({
          quality: options.quality || 85,
          progressive: options.progressive !== false
        });
      case 'png':
        return sharpInstance.png({
          compressionLevel: options.compressionLevel || 6
        });
      case 'webp':
        return sharpInstance.webp({
          quality: options.quality || 80,
          effort: options.effort || 4
        });
      default:
        throw new Error(`Unsupported image format: ${targetFormat}`);
    }
  }

  createVideoConversionTransform(targetFormat, options) {
    // Note: This is a simplified example. Real video conversion
    // would require more sophisticated stream handling
    const passThrough = new PassThrough();

    const command = ffmpeg()
      .input(passThrough)
      .format(targetFormat)
      .videoCodec(options.videoCodec || 'libx264')
      .audioCodec(options.audioCodec || 'aac');

    if (options.bitrate) {
      command.videoBitrate(options.bitrate);
    }

    const outputStream = new PassThrough();
    command.pipe(outputStream);

    return outputStream;
  }

  createThrottleTransform(bytesPerSecond) {
    const startTime = Date.now();
    let bytesWritten = 0;

    return new Transform({
      transform(chunk, encoding, callback) {
        bytesWritten += chunk.length;

        // Delay each chunk so cumulative throughput stays near the target rate
        const elapsedSeconds = (Date.now() - startTime) / 1000;
        const expectedSeconds = bytesWritten / bytesPerSecond;
        const delaySeconds = Math.max(0, expectedSeconds - elapsedSeconds);

        setTimeout(() => callback(null, chunk), delaySeconds * 1000);
      }
    });
  }

  async createFileRegistryEntry(uploadResult, metadata) {
    // Create searchable registry for advanced file management
    const registryEntry = {
      _id: new ObjectId(),
      fileId: uploadResult._id,
      filename: uploadResult.filename,

      // File attributes
      size: uploadResult.size,
      contentType: metadata.originalContentType || metadata.contentType,
      category: metadata.category,
      tags: metadata.tags || [],

      // Upload information
      uploadDate: uploadResult.uploadDate,
      uploadedBy: metadata.uploadedBy,
      uploadSource: metadata.uploadSource || 'api',

      // Processing information
      compressed: metadata.compressed || false,
      encrypted: metadata.encrypted || false,
      optimized: metadata.optimizations?.length > 0,
      processingPipeline: metadata.pipeline || [],
      compressionRatio: metadata.compressionRatio || 1.0,

      // Lifecycle management
      retentionPolicy: metadata.retentionDays || 365,
      expirationDate: metadata.expirationDate,
      accessCount: 0,
      lastAccessed: null,

      // Search optimization
      searchableText: this.generateSearchableText(metadata),

      // Audit trail
      auditLog: [{
        action: 'uploaded',
        timestamp: new Date(),
        user: metadata.uploadedBy,
        details: {
          originalSize: metadata.originalSize,
          finalSize: uploadResult.size,
          compressionRatio: metadata.compressionRatio
        }
      }]
    };

    await this.db.collection('file_registry').insertOne(registryEntry);

    // Create indexes for efficient searching
    await this.ensureRegistryIndexes();

    return registryEntry;
  }

  generateSearchableText(metadata) {
    const searchTerms = [];

    if (metadata.originalName) {
      searchTerms.push(metadata.originalName);
    }

    if (metadata.description) {
      searchTerms.push(metadata.description);
    }

    if (metadata.tags) {
      searchTerms.push(...metadata.tags);
    }

    if (metadata.category) {
      searchTerms.push(metadata.category);
    }

    return searchTerms.join(' ').toLowerCase();
  }

  async ensureRegistryIndexes() {
    const registryCollection = this.db.collection('file_registry');

    // Create text index for searching
    await registryCollection.createIndex({
      'searchableText': 'text',
      'filename': 'text',
      'contentType': 'text'
    }, { name: 'file_search_index' });

    // Create compound indexes for common queries
    await registryCollection.createIndex({
      'category': 1,
      'uploadDate': -1
    }, { name: 'category_date_index' });

    await registryCollection.createIndex({
      'tags': 1,
      'size': -1
    }, { name: 'tags_size_index' });

    await registryCollection.createIndex({
      'expirationDate': 1
    }, { 
      name: 'expiration_index',
      expireAfterSeconds: 0 // TTL: removes expired registry entries (the GridFS file itself must be deleted separately)
    });
  }

  updateMetrics(operation, bytes, duration) {
    if (operation === 'upload') {
      this.metrics.uploads.count++;
      this.metrics.uploads.totalBytes += bytes;
      this.metrics.uploads.totalTime += duration;
    } else if (operation === 'download') {
      this.metrics.downloads.count++;
      this.metrics.downloads.totalBytes += bytes;
      this.metrics.downloads.totalTime += duration;
    }
  }

  async getPerformanceMetrics() {
    const uploadStats = this.metrics.uploads;
    const downloadStats = this.metrics.downloads;

    return {
      uploads: {
        count: uploadStats.count,
        totalMB: Math.round(uploadStats.totalBytes / (1024 * 1024)),
        avgDurationMs: uploadStats.count > 0 ? uploadStats.totalTime / uploadStats.count : 0,
        throughputMBps: uploadStats.totalTime > 0 ? 
          (uploadStats.totalBytes / (1024 * 1024)) / (uploadStats.totalTime / 1000) : 0
      },
      downloads: {
        count: downloadStats.count,
        totalMB: Math.round(downloadStats.totalBytes / (1024 * 1024)),
        avgDurationMs: downloadStats.count > 0 ? downloadStats.totalTime / downloadStats.count : 0,
        throughputMBps: downloadStats.totalTime > 0 ? 
          (downloadStats.totalBytes / (1024 * 1024)) / (downloadStats.totalTime / 1000) : 0
      },
      compressionRatios: this.metrics.compressionRatios,
      averageCompressionRatio: this.metrics.compressionRatios.length > 0 ?
        this.metrics.compressionRatios.reduce((a, b) => a + b) / this.metrics.compressionRatios.length : 1.0
    };
  }
}

Advanced GridFS Streaming Patterns

Chunked Upload with Progress Tracking

Implement efficient chunked uploads for large files with real-time progress monitoring:

// Advanced chunked upload with progress tracking and resume capability
class ChunkedUploadManager {
  constructor(gridFSManager) {
    this.gridFS = gridFSManager;
    this.activeUploads = new Map();
    this.chunkSize = 1048576 * 5; // 5MB chunks
  }

  async initiateChunkedUpload(metadata) {
    const uploadId = new ObjectId();
    const uploadSession = {
      uploadId: uploadId,
      metadata: metadata,
      chunks: [],
      totalSize: metadata.totalSize || 0,
      uploadedSize: 0,
      status: 'initiated',
      createdAt: new Date(),
      lastActivity: new Date()
    };

    this.activeUploads.set(uploadId.toString(), uploadSession);

    // Store upload session in database for persistence
    await this.gridFS.db.collection('upload_sessions').insertOne({
      _id: uploadId,
      ...uploadSession,
      expiresAt: new Date(Date.now() + 24 * 60 * 60 * 1000) // 24 hour expiration
    });

    return {
      success: true,
      uploadId: uploadId,
      chunkSize: this.chunkSize,
      session: uploadSession
    };
  }

  async uploadChunk(uploadId, chunkIndex, chunkData) {
    const uploadSession = this.activeUploads.get(uploadId.toString()) ||
      await this.loadUploadSession(uploadId);

    if (!uploadSession) {
      throw new Error('Upload session not found');
    }

    try {
      // Validate chunk
      const validationResult = this.validateChunk(uploadSession, chunkIndex, chunkData);
      if (!validationResult.valid) {
        throw new Error(`Invalid chunk: ${validationResult.reason}`);
      }

      // Store chunk with metadata
      const chunkDocument = {
        _id: new ObjectId(),
        uploadId: new ObjectId(uploadId),
        chunkIndex: chunkIndex,
        size: chunkData.length,
        hash: this.calculateChunkHash(chunkData),
        data: chunkData,
        uploadedAt: new Date()
      };

      await this.gridFS.db.collection('upload_chunks').insertOne(chunkDocument);

      // Update upload session
      uploadSession.chunks[chunkIndex] = {
        chunkId: chunkDocument._id,
        size: chunkData.length,
        hash: chunkDocument.hash,
        uploadedAt: new Date()
      };

      uploadSession.uploadedSize += chunkData.length;
      uploadSession.lastActivity = new Date();

      // Calculate progress
      const progress = uploadSession.totalSize > 0 ? 
        (uploadSession.uploadedSize / uploadSession.totalSize) * 100 : 0;

      // Update session in database and memory
      await this.updateUploadSession(uploadId, uploadSession);
      this.activeUploads.set(uploadId.toString(), uploadSession);

      return {
        success: true,
        chunkIndex: chunkIndex,
        uploadedSize: uploadSession.uploadedSize,
        totalSize: uploadSession.totalSize,
        progress: Math.round(progress * 100) / 100,
        remainingChunks: this.calculateRemainingChunks(uploadSession)
      };

    } catch (error) {
      console.error(`Chunk upload failed for upload ${uploadId}, chunk ${chunkIndex}:`, error);
      throw error;
    }
  }

  validateChunk(uploadSession, chunkIndex, chunkData) {
    // Check chunk size
    if (chunkData.length > this.chunkSize) {
      return { valid: false, reason: 'Chunk too large' };
    }

    // Check for duplicate chunks
    if (uploadSession.chunks[chunkIndex]) {
      return { valid: false, reason: 'Chunk already exists' };
    }

    // Validate chunk index sequence (chunks must arrive in order, without gaps)
    const expectedIndex = uploadSession.chunks.filter(c => c !== undefined).length;
    if (chunkIndex < 0 || chunkIndex > expectedIndex) {
      return { valid: false, reason: 'Invalid chunk index sequence' };
    }

    return { valid: true };
  }

  calculateChunkHash(chunkData) {
    const crypto = require('crypto');
    return crypto.createHash('sha256').update(chunkData).digest('hex');
  }

  calculateRemainingChunks(uploadSession) {
    if (!uploadSession.totalSize) return null;

    const totalChunks = Math.ceil(uploadSession.totalSize / this.chunkSize);
    const uploadedChunks = uploadSession.chunks.filter(c => c !== undefined).length;

    return totalChunks - uploadedChunks;
  }

  async finalizeChunkedUpload(uploadId) {
    const uploadSession = this.activeUploads.get(uploadId.toString()) ||
      await this.loadUploadSession(uploadId);

    if (!uploadSession) {
      throw new Error('Upload session not found');
    }

    try {
      // Validate all chunks are present
      const missingChunks = this.findMissingChunks(uploadSession);
      if (missingChunks.length > 0) {
        throw new Error(`Missing chunks: ${missingChunks.join(', ')}`);
      }

      // Create combined file stream from chunks
      const combinedStream = await this.createCombinedStream(uploadId, uploadSession);

      // Upload to GridFS using the advanced manager
      const uploadResult = await this.gridFS.uploadFileWithAdvancedFeatures(
        combinedStream, 
        {
          ...uploadSession.metadata,
          originalUploadId: uploadId,
          uploadMethod: 'chunked',
          chunkCount: uploadSession.chunks.length,
          finalizedAt: new Date()
        }
      );

      // Cleanup temporary chunks
      await this.cleanupChunkedUpload(uploadId);

      return {
        success: true,
        fileId: uploadResult.fileId,
        filename: uploadResult.filename,
        size: uploadResult.size,
        compressionRatio: uploadResult.compressionRatio,
        uploadMethod: 'chunked',
        totalChunks: uploadSession.chunks.length
      };

    } catch (error) {
      console.error(`Finalization failed for upload ${uploadId}:`, error);
      throw error;
    }
  }

  findMissingChunks(uploadSession) {
    const missingChunks = [];
    const totalChunks = Math.ceil(uploadSession.totalSize / this.chunkSize);

    for (let i = 0; i < totalChunks; i++) {
      if (!uploadSession.chunks[i]) {
        missingChunks.push(i);
      }
    }

    return missingChunks;
  }

  async createCombinedStream(uploadId, uploadSession) {
    const { Readable } = require('stream');
    const chunksCollection = this.gridFS.db.collection('upload_chunks');
    const totalChunks = uploadSession.chunks.filter(c => c !== undefined).length;
    let nextIndex = 0;

    // Stream the stored chunks back in index order so GridFS receives one contiguous byte stream
    return new Readable({
      read() {
        if (nextIndex >= totalChunks) return this.push(null); // End of stream

        chunksCollection.findOne({ uploadId: new ObjectId(uploadId), chunkIndex: nextIndex++ })
          .then(chunkDoc => {
            if (!chunkDoc) return this.destroy(new Error(`Missing chunk ${nextIndex - 1}`));
            this.push(chunkDoc.data._bsontype === 'Binary' ? chunkDoc.data.buffer : chunkDoc.data);
          })
          .catch(error => this.destroy(error));
      }
    });
  }

  async cleanupChunkedUpload(uploadId) {
    // Remove chunks from database
    await this.gridFS.db.collection('upload_chunks').deleteMany({
      uploadId: new ObjectId(uploadId)
    });

    // Remove upload session
    await this.gridFS.db.collection('upload_sessions').deleteOne({
      _id: new ObjectId(uploadId)
    });

    // Remove from memory
    this.activeUploads.delete(uploadId.toString());
  }

  async loadUploadSession(uploadId) {
    const session = await this.gridFS.db.collection('upload_sessions').findOne({
      _id: new ObjectId(uploadId)
    });

    if (session) {
      this.activeUploads.set(uploadId.toString(), session);
      return session;
    }

    return null;
  }

  async updateUploadSession(uploadId, session) {
    await this.gridFS.db.collection('upload_sessions').updateOne(
      { _id: new ObjectId(uploadId) },
      { 
        $set: {
          chunks: session.chunks,
          uploadedSize: session.uploadedSize,
          lastActivity: session.lastActivity
        }
      }
    );
  }

  async getUploadProgress(uploadId) {
    const uploadSession = this.activeUploads.get(uploadId.toString()) ||
      await this.loadUploadSession(uploadId);

    if (!uploadSession) {
      return { found: false };
    }

    const progress = uploadSession.totalSize > 0 ? 
      (uploadSession.uploadedSize / uploadSession.totalSize) * 100 : 0;

    return {
      found: true,
      uploadId: uploadId,
      progress: Math.round(progress * 100) / 100,
      uploadedSize: uploadSession.uploadedSize,
      totalSize: uploadSession.totalSize,
      uploadedChunks: uploadSession.chunks.filter(c => c !== undefined).length,
      totalChunks: uploadSession.totalSize > 0 ? 
        Math.ceil(uploadSession.totalSize / this.chunkSize) : 0,
      status: uploadSession.status,
      createdAt: uploadSession.createdAt,
      lastActivity: uploadSession.lastActivity
    };
  }
}

SQL-Style GridFS Operations with QueryLeaf

QueryLeaf provides familiar SQL syntax for MongoDB GridFS operations:

-- QueryLeaf GridFS operations with SQL-familiar syntax

-- Upload file with advanced features
INSERT INTO gridfs_files (
  filename,
  content_type,
  metadata,
  data_stream,
  compression_enabled,
  optimization_level
)
VALUES (
  'large_video.mp4',
  'video/mp4',
  JSON_BUILD_OBJECT(
    'category', 'multimedia',
    'uploadedBy', 'user_123',
    'description', 'Training video content',
    'tags', ARRAY['training', 'video', 'multimedia'],
    'retentionDays', 730,
    'enableCompression', true,
    'qualityOptimization', true
  ),
  -- Stream data (handled by QueryLeaf GridFS interface)
  @video_stream,
  true,  -- Enable compression
  'high' -- Optimization level
);

-- Query files with advanced filtering
SELECT 
  file_id,
  filename,
  content_type,
  file_size_mb,
  upload_date,
  metadata.category,
  metadata.tags,
  metadata.uploadedBy,

  -- Calculated fields
  CASE 
    WHEN file_size_mb > 100 THEN 'large'
    WHEN file_size_mb > 10 THEN 'medium'
    ELSE 'small'
  END as size_category,

  -- Compression information
  metadata.compressed as is_compressed,
  metadata.compressionRatio as compression_ratio,
  ROUND((original_size_mb - file_size_mb) / original_size_mb * 100, 1) as space_saved_percent,

  -- Access statistics
  metadata.accessCount as total_downloads,
  metadata.lastAccessed,

  -- Lifecycle information
  metadata.expirationDate,
  CASE 
    WHEN metadata.expirationDate < CURRENT_TIMESTAMP THEN 'expired'
    WHEN metadata.expirationDate < CURRENT_TIMESTAMP + INTERVAL '30 days' THEN 'expiring_soon'
    ELSE 'active'
  END as lifecycle_status

FROM gridfs_files
WHERE metadata.category = 'multimedia'
  AND upload_date >= CURRENT_TIMESTAMP - INTERVAL '90 days'
  AND file_size_mb BETWEEN 1 AND 1000
ORDER BY upload_date DESC, file_size_mb DESC
LIMIT 50;

-- Advanced file search with full-text capabilities
SELECT 
  file_id,
  filename,
  content_type,
  file_size_mb,
  metadata.description,
  metadata.tags,

  -- Search relevance scoring
  TEXTRANK() as relevance_score,

  -- File categorization
  metadata.category,

  -- Performance metrics
  metadata.compressionRatio,
  metadata.optimizations

FROM gridfs_files
WHERE TEXTSEARCH('training video multimedia')
   OR filename ILIKE '%training%'
   OR metadata.tags && ARRAY['training', 'video']
   OR metadata.description ILIKE '%training%'
ORDER BY relevance_score DESC, upload_date DESC;

-- Aggregated file statistics by category
WITH file_analytics AS (
  SELECT 
    metadata.category,
    COUNT(*) as file_count,
    SUM(file_size_mb) as total_size_mb,
    AVG(file_size_mb) as avg_size_mb,
    MIN(file_size_mb) as min_size_mb,
    MAX(file_size_mb) as max_size_mb,

    -- Compression analysis
    COUNT(*) FILTER (WHERE metadata.compressed = true) as compressed_files,
    AVG(metadata.compressionRatio) FILTER (WHERE metadata.compressed = true) as avg_compression_ratio,

    -- Access patterns
    SUM(metadata.accessCount) as total_downloads,
    AVG(metadata.accessCount) as avg_downloads_per_file,

    -- Date ranges
    MIN(upload_date) as earliest_upload,
    MAX(upload_date) as latest_upload,

    -- Content type distribution
    array_agg(DISTINCT content_type) as content_types

  FROM gridfs_files
  WHERE upload_date >= CURRENT_TIMESTAMP - INTERVAL '1 year'
  GROUP BY metadata.category
),

storage_efficiency AS (
  SELECT 
    category,
    file_count,
    total_size_mb,
    compressed_files,

    -- Storage efficiency metrics
    ROUND((compressed_files::numeric / file_count * 100), 1) as compression_rate_percent,
    ROUND(avg_compression_ratio, 2) as avg_compression_ratio,
    ROUND((total_size_mb * (1 - avg_compression_ratio)), 1) as estimated_space_saved_mb,

    -- Performance insights
    ROUND(avg_size_mb, 1) as avg_file_size_mb,
    ROUND(total_downloads::numeric / file_count, 1) as avg_downloads_per_file,

    -- Category health score (repeats the rate expression since select-list aliases
    -- cannot be referenced within the same SELECT)
    CASE 
      WHEN (compressed_files::numeric / file_count * 100) > 80 AND avg_compression_ratio < 0.7 THEN 'excellent'
      WHEN (compressed_files::numeric / file_count * 100) > 60 AND avg_compression_ratio < 0.8 THEN 'good'
      WHEN (compressed_files::numeric / file_count * 100) > 40 THEN 'fair'
      ELSE 'poor'
    END as storage_efficiency_rating,

    content_types

  FROM file_analytics
)

SELECT 
  category,
  file_count,
  total_size_mb,
  avg_file_size_mb,
  compression_rate_percent,
  avg_compression_ratio,
  estimated_space_saved_mb,
  storage_efficiency_rating,
  avg_downloads_per_file,

  -- Recommendations
  CASE storage_efficiency_rating
    WHEN 'poor' THEN 'Consider enabling compression for more files in this category'
    WHEN 'fair' THEN 'Review compression settings to improve storage efficiency'
    WHEN 'good' THEN 'Storage efficiency is good, monitor for further optimization'
    ELSE 'Excellent storage efficiency - current settings are optimal'
  END as optimization_recommendation,

  ARRAY_LENGTH(content_types, 1) as content_type_variety,
  content_types

FROM storage_efficiency
ORDER BY total_size_mb DESC;

-- File lifecycle management with retention policies
WITH expiring_files AS (
  SELECT 
    file_id,
    filename,
    content_type,
    file_size_mb,
    upload_date,
    metadata.expirationDate,
    metadata.retentionDays,
    metadata.category,
    metadata.uploadedBy,
    metadata.accessCount,
    metadata.lastAccessed,

    -- Calculate time until expiration
    metadata.expirationDate - CURRENT_TIMESTAMP as time_until_expiration,

    -- Classify expiration urgency
    CASE 
      WHEN metadata.expirationDate < CURRENT_TIMESTAMP THEN 'expired'
      WHEN metadata.expirationDate < CURRENT_TIMESTAMP + INTERVAL '7 days' THEN 'expires_this_week'
      WHEN metadata.expirationDate < CURRENT_TIMESTAMP + INTERVAL '30 days' THEN 'expires_this_month'
      WHEN metadata.expirationDate < CURRENT_TIMESTAMP + INTERVAL '90 days' THEN 'expires_this_quarter'
      ELSE 'expires_later'
    END as expiration_urgency,

    -- Calculate retention recommendation
    CASE 
      WHEN metadata.accessCount = 0 THEN 'delete_unused'
      WHEN metadata.lastAccessed < CURRENT_TIMESTAMP - INTERVAL '180 days' THEN 'archive_old'
      WHEN metadata.accessCount > 10 AND metadata.lastAccessed > CURRENT_TIMESTAMP - INTERVAL '30 days' THEN 'extend_retention'
      ELSE 'maintain_current'
    END as retention_recommendation

  FROM gridfs_files
  WHERE metadata.expirationDate IS NOT NULL
),

retention_actions AS (
  SELECT 
    expiration_urgency,
    retention_recommendation,
    COUNT(*) as file_count,
    SUM(file_size_mb) as total_size_mb,

    -- Files by category
    array_agg(DISTINCT category) as categories_affected,

    -- Size distribution
    ROUND(AVG(file_size_mb), 1) as avg_file_size_mb,
    ROUND(PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY file_size_mb), 1) as median_file_size_mb,

    -- Access patterns
    ROUND(AVG(accessCount), 1) as avg_access_count,
    MIN(lastAccessed) as oldest_last_access,
    MAX(lastAccessed) as newest_last_access

  FROM expiring_files
  GROUP BY expiration_urgency, retention_recommendation
)

SELECT 
  expiration_urgency,
  retention_recommendation,
  file_count,
  total_size_mb,

  -- Action priority scoring
  CASE expiration_urgency
    WHEN 'expired' THEN 100
    WHEN 'expires_this_week' THEN 90
    WHEN 'expires_this_month' THEN 70
    WHEN 'expires_this_quarter' THEN 50
    ELSE 30
  END as action_priority_score,

  -- Recommended actions
  CASE retention_recommendation
    WHEN 'delete_unused' THEN 'DELETE - No access history, safe to remove'
    WHEN 'archive_old' THEN 'ARCHIVE - Move to cold storage or compress further'
    WHEN 'extend_retention' THEN 'EXTEND - Popular file, consider extending retention period'
    ELSE 'MONITOR - Continue with current retention policy'
  END as recommended_action,

  -- Resource impact
  CONCAT(ROUND((total_size_mb / 1024), 1), ' GB') as storage_impact,

  categories_affected,
  avg_file_size_mb,
  avg_access_count

FROM retention_actions
ORDER BY action_priority_score DESC, total_size_mb DESC;

-- Real-time file transfer monitoring
SELECT 
  transfer_id,
  operation_type, -- 'upload' or 'download'
  file_id,
  filename,

  -- Progress tracking
  bytes_transferred,
  total_bytes,
  ROUND((bytes_transferred::numeric / NULLIF(total_bytes, 0)) * 100, 1) as progress_percent,

  -- Performance metrics
  transfer_rate_mbps,
  estimated_time_remaining_seconds,

  -- Transfer details
  client_ip,
  user_agent,
  compression_enabled,
  encryption_enabled,

  -- Status and timing
  status, -- 'in_progress', 'completed', 'failed', 'paused'
  started_at,
  EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - started_at)) as duration_seconds,

  -- Quality metrics
  error_count,
  retry_count,

  CASE 
    WHEN transfer_rate_mbps > 10 THEN 'fast'
    WHEN transfer_rate_mbps > 1 THEN 'normal'
    ELSE 'slow'
  END as transfer_speed_rating

FROM active_file_transfers
WHERE status IN ('in_progress', 'paused')
  AND started_at >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
ORDER BY started_at DESC;

-- Performance optimization analysis
WITH transfer_performance AS (
  SELECT 
    DATE_TRUNC('hour', started_at) as time_bucket,
    operation_type,

    -- Volume metrics
    COUNT(*) as transfer_count,
    SUM(total_bytes) / (1024 * 1024 * 1024) as total_gb_transferred,

    -- Performance metrics
    AVG(transfer_rate_mbps) as avg_transfer_rate_mbps,
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY transfer_rate_mbps) as p95_transfer_rate_mbps,
    MIN(transfer_rate_mbps) as min_transfer_rate_mbps,

    -- Success rates
    COUNT(*) FILTER (WHERE status = 'completed') as successful_transfers,
    COUNT(*) FILTER (WHERE status = 'failed') as failed_transfers,
    COUNT(*) FILTER (WHERE retry_count > 0) as transfers_with_retries,

    -- Timing analysis
    AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) as avg_duration_seconds,
    MAX(EXTRACT(EPOCH FROM (completed_at - started_at))) as max_duration_seconds,

    -- Feature usage
    COUNT(*) FILTER (WHERE compression_enabled = true) as compressed_transfers,
    COUNT(*) FILTER (WHERE encryption_enabled = true) as encrypted_transfers

  FROM active_file_transfers
  WHERE started_at >= CURRENT_TIMESTAMP - INTERVAL '7 days'
    AND status IN ('completed', 'failed')
  GROUP BY DATE_TRUNC('hour', started_at), operation_type
)

SELECT 
  time_bucket,
  operation_type,
  transfer_count,
  total_gb_transferred,

  -- Performance indicators
  ROUND(avg_transfer_rate_mbps, 1) as avg_speed_mbps,
  ROUND(p95_transfer_rate_mbps, 1) as p95_speed_mbps,

  -- Success metrics
  ROUND((successful_transfers::numeric / transfer_count * 100), 1) as success_rate_percent,
  ROUND((transfers_with_retries::numeric / transfer_count * 100), 1) as retry_rate_percent,

  -- Duration insights
  ROUND(avg_duration_seconds / 60, 1) as avg_duration_minutes,
  ROUND(max_duration_seconds / 60, 1) as max_duration_minutes,

  -- Feature adoption
  ROUND((compressed_transfers::numeric / transfer_count * 100), 1) as compression_usage_percent,
  ROUND((encrypted_transfers::numeric / transfer_count * 100), 1) as encryption_usage_percent,

  -- Performance rating
  CASE 
    WHEN avg_transfer_rate_mbps > 50 THEN 'excellent'
    WHEN avg_transfer_rate_mbps > 20 THEN 'good'
    WHEN avg_transfer_rate_mbps > 5 THEN 'fair'
    ELSE 'poor'
  END as performance_rating,

  -- Optimization recommendations
  CASE 
    WHEN avg_transfer_rate_mbps < 5 AND (compressed_transfers::numeric / transfer_count * 100) < 50 THEN 'Enable compression to improve transfer speeds'
    WHEN (transfers_with_retries::numeric / transfer_count * 100) > 20 THEN 'High retry rate indicates network issues or oversized chunks'
    WHEN (successful_transfers::numeric / transfer_count * 100) < 95 THEN 'Investigate transfer failures and improve error handling'
    WHEN max_duration_seconds > 3600 THEN 'Consider chunked uploads for very large files'
    ELSE 'Performance within acceptable ranges'
  END as optimization_recommendation

FROM transfer_performance
ORDER BY time_bucket DESC;

-- QueryLeaf provides comprehensive GridFS capabilities:
-- 1. SQL-familiar file upload/download operations with streaming support
-- 2. Advanced compression and optimization through SQL parameters
-- 3. Full-text search capabilities for file metadata and content
-- 4. Comprehensive file analytics and storage optimization insights
-- 5. Automated lifecycle management with retention policy enforcement
-- 6. Real-time transfer monitoring and performance analysis
-- 7. Integration with MongoDB's native GridFS optimizations
-- 8. Familiar SQL patterns for complex file management operations

Best Practices for GridFS Production Deployment

Performance Optimization Strategies

Essential optimization techniques for high-throughput GridFS deployments:

  1. Chunk Size Optimization: Configure appropriate chunk sizes based on file types and access patterns (see the configuration sketch after this list)
  2. Index Strategy: Create compound indexes on file metadata for efficient queries
  3. Compression Algorithms: Choose optimal compression based on file type and performance requirements
  4. Connection Pooling: Implement appropriate connection pooling for concurrent file operations
  5. Caching Layer: Add a CDN or caching layer for frequently accessed files
  6. Monitoring Setup: Implement comprehensive monitoring for file operations and storage usage
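
A brief configuration sketch for the first two items above (the bucket name, chunk sizes, and index fields are assumptions to adapt to your own access patterns):

// Sketch: chunk size tuning and metadata indexing for a GridFS bucket
const { GridFSBucket } = require('mongodb');

function createTunedBucket(db) {
  // Larger chunks mean fewer round trips for big sequential reads;
  // smaller chunks suit random access and HTTP range requests
  return new GridFSBucket(db, {
    bucketName: 'media',
    chunkSizeBytes: 8 * 1024 * 1024 // 8MB for large video assets
  });
}

async function ensureFileIndexes(db) {
  // Compound index on the files collection for common metadata queries
  await db.collection('media.files').createIndex(
    { 'metadata.category': 1, uploadDate: -1 },
    { name: 'category_uploadDate' }
  );

  // The driver normally creates { files_id: 1, n: 1 } on the chunks collection,
  // but verifying it exists is cheap and protects bulk-loaded deployments
  await db.collection('media.chunks').createIndex(
    { files_id: 1, n: 1 },
    { unique: true }
  );
}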

Storage Architecture Design

Design principles for scalable GridFS storage systems:

  1. Sharding Strategy: Plan shard keys for optimal chunk distribution across cluster nodes (see the sketch after this list)
  2. Replica Configuration: Configure appropriate read preferences for file access patterns
  3. Storage Tiering: Implement hot/cold storage strategies for lifecycle management
  4. Backup Strategy: Design comprehensive backup and recovery procedures for file data
  5. Security Implementation: Implement encryption, access controls, and audit logging
  6. Capacity Planning: Plan storage growth and performance scaling requirements
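
A sketch of the sharding item above, run via mongosh against a sharded cluster (the database and bucket names are illustrative):

// Shard the chunks collection so chunk documents spread across shards while
// each file's chunks remain ordered for sequential reads
sh.enableSharding('files_demo');
sh.shardCollection('files_demo.media.chunks', { files_id: 1, n: 1 });

// The files collection is usually small enough to leave unsharded; if it grows
// very large it can be sharded separately (for example on { _id: 1 })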

Conclusion

MongoDB GridFS Advanced Streaming and Compression provides enterprise-grade file storage capabilities that eliminate the complexity and limitations of traditional file systems while delivering sophisticated streaming, optimization, and management features. The ability to store, process, and serve large files with built-in compression, encryption, and metadata management makes building robust file storage systems both powerful and straightforward.

Key GridFS Advanced Features include:

  • Streaming Optimization: Efficient chunked uploads and downloads with progress tracking
  • Advanced Compression: Intelligent compression strategies based on file type and content
  • Metadata Integration: Rich metadata storage with full-text search capabilities
  • Performance Monitoring: Real-time transfer monitoring and optimization insights
  • Lifecycle Management: Automated retention policies and storage optimization
  • SQL Accessibility: Familiar file operations through QueryLeaf's SQL interface

Whether you're building content management systems, media platforms, document repositories, or backup solutions, MongoDB GridFS with QueryLeaf's SQL interface provides the foundation for scalable file storage that integrates seamlessly with your application data while maintaining familiar database interaction patterns.

QueryLeaf Integration: QueryLeaf automatically translates SQL file operations into MongoDB GridFS commands, making advanced file storage accessible through familiar SQL patterns. Complex streaming scenarios, compression settings, and metadata queries are seamlessly handled through standard SQL syntax, enabling developers to build powerful file management features without learning GridFS-specific APIs.

The combination of MongoDB's robust file storage capabilities with SQL-familiar operations creates an ideal platform for applications requiring both sophisticated file management and familiar database interaction patterns, ensuring your file storage systems remain both scalable and maintainable as your data requirements evolve.