MongoDB GridFS Advanced File Streaming and Compression: High-Performance Large File Management and Optimization
Modern applications require efficient handling of large files, from media assets and document repositories to backup systems and content delivery networks. Traditional file storage approaches struggle with distributed architectures, automatic failover, and efficient streaming, especially when dealing with multi-gigabyte files or high-throughput workloads.
MongoDB GridFS provides advanced file storage that integrates directly with your database infrastructure, offering streaming access, replication through replica sets, optional sharding for horizontal scale, and straightforward hooks for application-level compression. Unlike traditional file systems that require separate infrastructure and complex synchronization mechanisms, GridFS stores files as documents with built-in metadata, versioning, and query capabilities.
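Before looking at the traditional approach it replaces, here is a minimal GridFS round trip using the official Node.js driver. The connection string, database name, and file paths are placeholders; the point is that a stored file becomes ordinary documents in the bucket's .files (metadata) and .chunks (binary pieces) collections, queryable like any other data.
const fs = require('fs');
const { MongoClient, GridFSBucket } = require('mongodb');

async function minimalGridFSRoundTrip() {
  const client = await MongoClient.connect('mongodb://localhost:27017'); // placeholder URI
  const db = client.db('files_demo');
  const bucket = new GridFSBucket(db, { bucketName: 'demo_files' });

  // Upload: stream a local file into GridFS with custom metadata
  await new Promise((resolve, reject) => {
    fs.createReadStream('./report.pdf') // placeholder path
      .pipe(bucket.openUploadStream('report.pdf', {
        metadata: { contentType: 'application/pdf', uploadedBy: 'demo' }
      }))
      .on('finish', resolve)
      .on('error', reject);
  });

  // The stored file is now visible as a document in demo_files.files
  const fileDoc = await db.collection('demo_files.files').findOne({ filename: 'report.pdf' });
  console.log('Stored file:', fileDoc._id, fileDoc.length, fileDoc.metadata);

  // Download: stream it back by name
  await new Promise((resolve, reject) => {
    bucket.openDownloadStreamByName('report.pdf')
      .pipe(fs.createWriteStream('./report_copy.pdf'))
      .on('finish', resolve)
      .on('error', reject);
  });

  await client.close();
}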
The Large File Storage Challenge
Traditional file storage approaches have significant limitations for modern distributed applications:
// Traditional file system approach - limited scalability and integration
const fs = require('fs');
const path = require('path');
const crypto = require('crypto');
class TraditionalFileStorage {
constructor(baseDirectory) {
this.baseDirectory = baseDirectory;
this.metadata = new Map(); // In-memory metadata - lost on restart
}
async uploadFile(filename, fileStream, metadata = {}) {
try {
const filePath = path.join(this.baseDirectory, filename);
const writeStream = fs.createWriteStream(filePath);
// No built-in compression
fileStream.pipe(writeStream);
await new Promise((resolve, reject) => {
writeStream.on('finish', resolve);
writeStream.on('error', reject);
});
// Manual metadata management
const stats = fs.statSync(filePath);
this.metadata.set(filename, {
size: stats.size,
uploadDate: new Date(),
contentType: metadata.contentType || 'application/octet-stream',
...metadata
});
return { success: true, filename, size: stats.size };
} catch (error) {
console.error('Upload failed:', error);
return { success: false, error: error.message };
}
}
async downloadFile(filename) {
try {
const filePath = path.join(this.baseDirectory, filename);
// No streaming optimization
if (!fs.existsSync(filePath)) {
throw new Error('File not found');
}
const readStream = fs.createReadStream(filePath);
const metadata = this.metadata.get(filename) || {};
return {
success: true,
stream: readStream,
metadata: metadata
};
} catch (error) {
console.error('Download failed:', error);
return { success: false, error: error.message };
}
}
async getFileMetadata(filename) {
// Limited metadata capabilities
return this.metadata.get(filename) || null;
}
async deleteFile(filename) {
try {
const filePath = path.join(this.baseDirectory, filename);
fs.unlinkSync(filePath);
this.metadata.delete(filename);
return { success: true };
} catch (error) {
return { success: false, error: error.message };
}
}
// Problems with traditional file storage:
// 1. No automatic replication or high availability
// 2. No built-in compression or optimization
// 3. Limited metadata and search capabilities
// 4. No streaming optimization for large files
// 5. Manual synchronization across distributed systems
// 6. No versioning or audit trail capabilities
// 7. Limited concurrent access and locking mechanisms
// 8. No integration with database transactions
// 9. Complex backup and recovery procedures
// 10. No automatic sharding for very large files
}
MongoDB GridFS eliminates these limitations with comprehensive file storage features:
// MongoDB GridFS - comprehensive file storage with advanced features
const { MongoClient, GridFSBucket, ObjectId } = require('mongodb');
const { Transform, PassThrough } = require('stream');
const zlib = require('zlib');
const sharp = require('sharp'); // For image processing
const ffmpeg = require('fluent-ffmpeg'); // For video processing
class AdvancedGridFSManager {
constructor(db) {
this.db = db;
this.bucket = new GridFSBucket(db, {
bucketName: 'advanced_files',
chunkSizeBytes: 1048576 * 4 // 4MB chunks for optimal performance
});
// Specialized buckets for different file types
this.imageBucket = new GridFSBucket(db, {
bucketName: 'images',
chunkSizeBytes: 1048576 * 2 // 2MB for images
});
this.videoBucket = new GridFSBucket(db, {
bucketName: 'videos',
chunkSizeBytes: 1048576 * 8 // 8MB for videos
});
this.documentBucket = new GridFSBucket(db, {
bucketName: 'documents',
chunkSizeBytes: 1048576 * 1 // 1MB for documents
});
// Performance monitoring
this.metrics = {
uploads: { count: 0, totalBytes: 0, totalTime: 0 },
downloads: { count: 0, totalBytes: 0, totalTime: 0 },
compressionRatios: []
};
}
async uploadFileWithAdvancedFeatures(fileStream, metadata = {}) {
const startTime = Date.now();
try {
// Determine optimal bucket and processing pipeline
const fileType = this.detectFileType(metadata.contentType);
const bucket = this.selectOptimalBucket(fileType);
// Generate unique filename with collision prevention
const filename = this.generateUniqueFilename(metadata.originalName || 'file');
// Create processing pipeline based on file type
const { processedStream, finalMetadata } = await this.createProcessingPipeline(
fileStream, fileType, metadata
);
// Advanced upload with compression and optimization
const uploadResult = await this.performAdvancedUpload(
processedStream, filename, finalMetadata, bucket
);
// Record performance metrics
const duration = Date.now() - startTime;
this.updateMetrics('upload', uploadResult.size, duration);
// Create file registry entry for advanced querying
await this.createFileRegistryEntry(uploadResult, finalMetadata);
return {
success: true,
fileId: uploadResult._id,
filename: filename,
size: uploadResult.size,
processingTime: duration,
compressionRatio: finalMetadata.compressionRatio || 1.0,
optimizations: finalMetadata.optimizations || []
};
} catch (error) {
console.error('Advanced upload failed:', error);
return {
success: false,
error: error.message,
processingTime: Date.now() - startTime
};
}
}
detectFileType(contentType) {
if (!contentType) return 'unknown';
if (contentType.startsWith('image/')) return 'image';
if (contentType.startsWith('video/')) return 'video';
if (contentType.startsWith('audio/')) return 'audio';
if (contentType.includes('pdf')) return 'document';
if (contentType.includes('text/')) return 'text';
if (contentType.includes('application/json')) return 'data';
if (contentType.includes('zip') || contentType.includes('gzip')) return 'archive';
return 'binary';
}
selectOptimalBucket(fileType) {
switch (fileType) {
case 'image': return this.imageBucket;
case 'video':
case 'audio': return this.videoBucket;
case 'document':
case 'text': return this.documentBucket;
default: return this.bucket;
}
}
generateUniqueFilename(originalName) {
const timestamp = Date.now();
const random = Math.random().toString(36).substring(2, 15);
const extension = originalName.includes('.') ?
originalName.split('.').pop() : '';
return `${timestamp}_${random}${extension ? '.' + extension : ''}`;
}
async createProcessingPipeline(inputStream, fileType, metadata) {
const transforms = [];
const finalMetadata = { ...metadata };
let compressionRatio = 1.0;
// Add compression based on file type
if (this.shouldCompress(fileType, metadata)) {
const compressionLevel = this.getOptimalCompressionLevel(fileType);
const compressionTransform = this.createCompressionTransform(
fileType, compressionLevel
);
transforms.push(compressionTransform);
finalMetadata.compressed = true;
finalMetadata.compressionLevel = compressionLevel;
finalMetadata.originalContentType = metadata.contentType;
}
// Add file type specific optimizations
if (fileType === 'image' && metadata.enableImageOptimization !== false) {
const imageTransform = await this.createImageOptimizationTransform(metadata);
transforms.push(imageTransform);
finalMetadata.optimizations = ['image_optimization'];
}
// Add encryption if required
if (metadata.encrypt === true) {
const encryptionTransform = this.createEncryptionTransform(metadata.encryptionKey);
transforms.push(encryptionTransform);
finalMetadata.encrypted = true;
}
// Add integrity checking
const integrityTransform = this.createIntegrityTransform();
transforms.push(integrityTransform);
// Create processing pipeline
let processedStream = inputStream;
for (const transform of transforms) {
processedStream = processedStream.pipe(transform);
}
// Add size tracking
const sizeTracker = this.createSizeTrackingTransform();
processedStream = processedStream.pipe(sizeTracker);
// The compression ratio placeholder is finalized once the size tracker reports the output size
finalMetadata.compressionRatio = compressionRatio;
return {
processedStream,
finalMetadata: {
...finalMetadata,
processingTimestamp: new Date(),
pipeline: transforms.map(t => t.constructor.name)
}
};
}
shouldCompress(fileType, metadata) {
// Don't compress already compressed formats
const skipCompression = ['image/jpeg', 'image/png', 'video/', 'audio/', 'zip', 'gzip'];
if (skipCompression.some(type => metadata.contentType?.includes(type))) {
return false;
}
// Always compress text and data files
return ['text', 'document', 'data', 'binary'].includes(fileType);
}
getOptimalCompressionLevel(fileType) {
const compressionLevels = {
'text': 9, // Maximum compression for text
'document': 7, // High compression for documents
'data': 8, // High compression for data files
'binary': 6 // Moderate compression for binary
};
return compressionLevels[fileType] || 6;
}
createCompressionTransform(fileType, level) {
// Use gzip compression with optimal settings
return zlib.createGzip({
level: level,
windowBits: 15,
memLevel: 8,
// Z_TEXT is a data-type hint, not a compression strategy; the default strategy suits text and binary alike
strategy: zlib.constants.Z_DEFAULT_STRATEGY
});
}
async createImageOptimizationTransform(metadata) {
const quality = metadata.imageQuality || 85;
const pipeline = sharp();
// Chaining .jpeg().png().webp() would make the last format win;
// re-encode in the source format instead
if (metadata.contentType === 'image/png') {
return pipeline.png({ compressionLevel: 9, adaptiveFiltering: true });
}
if (metadata.contentType === 'image/webp') {
return pipeline.webp({ quality: quality, effort: 6 });
}
return pipeline.jpeg({ quality: quality, progressive: true });
}
createEncryptionTransform(encryptionKey) {
const crypto = require('crypto');
// crypto.createCipher is deprecated and cannot be used with an IV or GCM; derive a key
// and use createCipheriv with a streaming-friendly mode (the salt here is illustrative)
const algorithm = 'aes-256-ctr';
const key = crypto.scryptSync(encryptionKey, 'gridfs-file-salt', 32);
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipheriv(algorithm, key, iv);
let ivEmitted = false;
return new Transform({
transform(chunk, encoding, callback) {
try {
const encrypted = cipher.update(chunk);
// Prepend the IV once so the download pipeline can initialize its decipher
callback(null, ivEmitted ? encrypted : Buffer.concat([iv, encrypted]));
ivEmitted = true;
} catch (error) {
callback(error);
}
},
flush(callback) {
try {
callback(null, cipher.final());
} catch (error) {
callback(error);
}
}
});
}
createIntegrityTransform() {
const crypto = require('crypto');
const hash = crypto.createHash('sha256');
return new Transform({
transform(chunk, encoding, callback) {
hash.update(chunk);
callback(null, chunk); // Pass through while calculating hash
},
flush(callback) {
this.fileHash = hash.digest('hex');
callback();
}
});
}
createSizeTrackingTransform() {
let totalSize = 0;
return new Transform({
transform(chunk, encoding, callback) {
totalSize += chunk.length;
callback(null, chunk);
},
flush(callback) {
this.totalSize = totalSize;
callback();
}
});
}
async performAdvancedUpload(processedStream, filename, metadata, bucket) {
return new Promise((resolve, reject) => {
const uploadStream = bucket.openUploadStream(filename, {
metadata: {
...metadata,
uploadedAt: new Date(),
processingVersion: '2.0',
// Add searchable tags
tags: this.generateSearchTags(metadata),
// Add file categorization
category: this.categorizeFile(metadata),
// Add retention policy
retentionPolicy: metadata.retentionDays || 365,
expirationDate: metadata.retentionDays ?
new Date(Date.now() + metadata.retentionDays * 24 * 60 * 60 * 1000) : null
}
});
uploadStream.on('error', reject);
uploadStream.on('finish', () => {
// In MongoDB Node.js driver 4.x+ the 'finish' event carries no arguments;
// the written file document is exposed as uploadStream.gridFSFile
const file = uploadStream.gridFSFile;
resolve({
_id: file._id,
filename: file.filename,
size: file.length,
uploadDate: file.uploadDate,
metadata: file.metadata
});
});
// Start the upload
processedStream.pipe(uploadStream);
});
}
generateSearchTags(metadata) {
const tags = [];
if (metadata.contentType) {
tags.push(metadata.contentType.split('/')[0]); // e.g., 'image' from 'image/jpeg'
tags.push(metadata.contentType); // Full content type
}
if (metadata.originalName) {
const extension = metadata.originalName.split('.').pop()?.toLowerCase();
if (extension) tags.push(extension);
}
if (metadata.category) tags.push(metadata.category);
if (metadata.compressed) tags.push('compressed');
if (metadata.encrypted) tags.push('encrypted');
if (metadata.optimized) tags.push('optimized');
return tags;
}
categorizeFile(metadata) {
const contentType = metadata.contentType || '';
if (contentType.startsWith('image/')) {
return metadata.category || 'media';
} else if (contentType.startsWith('video/') || contentType.startsWith('audio/')) {
return metadata.category || 'multimedia';
} else if (contentType.includes('pdf') || contentType.includes('document')) {
return metadata.category || 'document';
} else if (contentType.includes('text/')) {
return metadata.category || 'text';
} else {
return metadata.category || 'data';
}
}
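// Note: getFileMetadata is used by the download path below but not defined
// elsewhere in this listing; a minimal assumed implementation that checks
// each bucket's files collection in turn
async getFileMetadata(fileId) {
const bucketNames = ['advanced_files', 'images', 'videos', 'documents'];
for (const bucketName of bucketNames) {
const fileDoc = await this.db.collection(`${bucketName}.files`).findOne({
_id: new ObjectId(fileId)
});
if (fileDoc) {
return fileDoc;
}
}
return null;
}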
async downloadFileWithStreaming(fileId, options = {}) {
const startTime = Date.now();
try {
// Get file metadata for processing decisions
const fileMetadata = await this.getFileMetadata(fileId);
if (!fileMetadata) {
throw new Error(`File not found: ${fileId}`);
}
// Select optimal bucket
const bucket = this.selectBucketByMetadata(fileMetadata);
// Create download stream with range support
const downloadOptions = this.createDownloadOptions(options, fileMetadata);
const downloadStream = bucket.openDownloadStream(
new ObjectId(fileId),
downloadOptions
);
// Create decompression/decoding pipeline
const { processedStream, streamMetadata } = this.createDownloadPipeline(
downloadStream, fileMetadata, options
);
// Record performance metrics
const setupTime = Date.now() - startTime;
return {
success: true,
stream: processedStream,
metadata: {
...fileMetadata,
streamingOptions: streamMetadata,
setupTime: setupTime
}
};
} catch (error) {
console.error('Streaming download failed:', error);
return {
success: false,
error: error.message,
setupTime: Date.now() - startTime
};
}
}
selectBucketByMetadata(fileMetadata) {
const category = fileMetadata.metadata?.category;
switch (category) {
case 'media': return this.imageBucket;
case 'multimedia': return this.videoBucket;
case 'document':
case 'text': return this.documentBucket;
default: return this.bucket;
}
}
createDownloadOptions(options, fileMetadata) {
const downloadOptions = {};
// Range/partial content support
if (options.start !== undefined || options.end !== undefined) {
downloadOptions.start = options.start || 0;
// The driver's 'end' option is an exclusive byte offset
downloadOptions.end = options.end !== undefined ? options.end : fileMetadata.length;
}
return downloadOptions;
}
createDownloadPipeline(downloadStream, fileMetadata, options) {
const transforms = [];
const streamMetadata = {
originalSize: fileMetadata.length,
compressed: fileMetadata.metadata?.compressed || false,
encrypted: fileMetadata.metadata?.encrypted || false
};
// Add decryption if file is encrypted
if (fileMetadata.metadata?.encrypted && options.decryptionKey) {
const decryptionTransform = this.createDecryptionTransform(options.decryptionKey);
transforms.push(decryptionTransform);
streamMetadata.decrypted = true;
}
// Add decompression if file is compressed
if (fileMetadata.metadata?.compressed && options.decompress !== false) {
const decompressionTransform = this.createDecompressionTransform();
transforms.push(decompressionTransform);
streamMetadata.decompressed = true;
}
// Add format conversion if requested
if (options.convertTo && this.supportsConversion(fileMetadata, options.convertTo)) {
const conversionTransform = this.createConversionTransform(
fileMetadata, options.convertTo, options.conversionOptions || {}
);
transforms.push(conversionTransform);
streamMetadata.converted = options.convertTo;
}
// Add bandwidth throttling if specified
if (options.throttle) {
const throttleTransform = this.createThrottleTransform(options.throttle);
transforms.push(throttleTransform);
streamMetadata.throttled = options.throttle;
}
// Build pipeline
let processedStream = downloadStream;
for (const transform of transforms) {
processedStream = processedStream.pipe(transform);
}
return { processedStream, streamMetadata };
}
createDecryptionTransform(decryptionKey) {
const crypto = require('crypto');
// Mirror of the upload-side transform: derive the key the same way and read
// the IV from the first 16 bytes of the stream (salt is illustrative)
const algorithm = 'aes-256-ctr';
const key = crypto.scryptSync(decryptionKey, 'gridfs-file-salt', 32);
let decipher = null;
let ivBuffer = Buffer.alloc(0);
return new Transform({
transform(chunk, encoding, callback) {
try {
if (!decipher) {
ivBuffer = Buffer.concat([ivBuffer, chunk]);
if (ivBuffer.length < 16) {
return callback();
}
decipher = crypto.createDecipheriv(algorithm, key, ivBuffer.subarray(0, 16));
chunk = ivBuffer.subarray(16);
}
callback(null, decipher.update(chunk));
} catch (error) {
callback(error);
}
},
flush(callback) {
try {
callback(null, decipher ? decipher.final() : Buffer.alloc(0));
} catch (error) {
callback(error);
}
}
});
}
createDecompressionTransform() {
return zlib.createGunzip();
}
supportsConversion(fileMetadata, targetFormat) {
const sourceType = fileMetadata.metadata?.contentType;
if (!sourceType) return false;
// Image conversions
if (sourceType.startsWith('image/') && ['jpeg', 'png', 'webp'].includes(targetFormat)) {
return true;
}
// Video conversions (basic)
if (sourceType.startsWith('video/') && ['mp4', 'webm'].includes(targetFormat)) {
return true;
}
return false;
}
createConversionTransform(fileMetadata, targetFormat, options) {
const sourceType = fileMetadata.metadata?.contentType;
if (sourceType?.startsWith('image/')) {
return this.createImageConversionTransform(targetFormat, options);
} else if (sourceType?.startsWith('video/')) {
return this.createVideoConversionTransform(targetFormat, options);
}
throw new Error(`Unsupported conversion: ${sourceType} to ${targetFormat}`);
}
createImageConversionTransform(targetFormat, options) {
const sharpInstance = sharp();
switch (targetFormat) {
case 'jpeg':
return sharpInstance.jpeg({
quality: options.quality || 85,
progressive: options.progressive !== false
});
case 'png':
return sharpInstance.png({
compressionLevel: options.compressionLevel || 6
});
case 'webp':
return sharpInstance.webp({
quality: options.quality || 80,
effort: options.effort || 4
});
default:
throw new Error(`Unsupported image format: ${targetFormat}`);
}
}
createVideoConversionTransform(targetFormat, options) {
// Note: This is a simplified example. Real video conversion
// would require more sophisticated stream handling
const passThrough = new PassThrough();
const command = ffmpeg()
.input(passThrough)
.format(targetFormat)
.videoCodec(options.videoCodec || 'libx264')
.audioCodec(options.audioCodec || 'aac');
if (options.bitrate) {
command.videoBitrate(options.bitrate);
}
const outputStream = new PassThrough();
command.pipe(outputStream);
return outputStream;
}
createThrottleTransform(bytesPerSecond) {
const startTime = Date.now();
let bytesWritten = 0;
return new Transform({
transform(chunk, encoding, callback) {
bytesWritten += chunk.length;
// Delay each chunk until the cumulative byte count matches the target rate
const elapsed = (Date.now() - startTime) / 1000;
const expectedTime = bytesWritten / bytesPerSecond;
const delayMs = Math.max(0, (expectedTime - elapsed) * 1000);
setTimeout(() => callback(null, chunk), delayMs);
}
});
}
async createFileRegistryEntry(uploadResult, metadata) {
// Create searchable registry for advanced file management
const registryEntry = {
_id: new ObjectId(),
fileId: uploadResult._id,
filename: uploadResult.filename,
// File attributes
size: uploadResult.size,
contentType: metadata.originalContentType || metadata.contentType,
category: metadata.category,
tags: metadata.tags || [],
// Upload information
uploadDate: uploadResult.uploadDate,
uploadedBy: metadata.uploadedBy,
uploadSource: metadata.uploadSource || 'api',
// Processing information
compressed: metadata.compressed || false,
encrypted: metadata.encrypted || false,
optimized: metadata.optimizations?.length > 0,
processingPipeline: metadata.pipeline || [],
compressionRatio: metadata.compressionRatio || 1.0,
// Lifecycle management
retentionPolicy: metadata.retentionDays || 365,
expirationDate: metadata.expirationDate,
accessCount: 0,
lastAccessed: null,
// Search optimization
searchableText: this.generateSearchableText(metadata),
// Audit trail
auditLog: [{
action: 'uploaded',
timestamp: new Date(),
user: metadata.uploadedBy,
details: {
originalSize: metadata.originalSize,
finalSize: uploadResult.size,
compressionRatio: metadata.compressionRatio
}
}]
};
await this.db.collection('file_registry').insertOne(registryEntry);
// Create indexes for efficient searching
await this.ensureRegistryIndexes();
return registryEntry;
}
generateSearchableText(metadata) {
const searchTerms = [];
if (metadata.originalName) {
searchTerms.push(metadata.originalName);
}
if (metadata.description) {
searchTerms.push(metadata.description);
}
if (metadata.tags) {
searchTerms.push(...metadata.tags);
}
if (metadata.category) {
searchTerms.push(metadata.category);
}
return searchTerms.join(' ').toLowerCase();
}
async ensureRegistryIndexes() {
const registryCollection = this.db.collection('file_registry');
// Create text index for searching
await registryCollection.createIndex({
'searchableText': 'text',
'filename': 'text',
'contentType': 'text'
}, { name: 'file_search_index' });
// Create compound indexes for common queries
await registryCollection.createIndex({
'category': 1,
'uploadDate': -1
}, { name: 'category_date_index' });
await registryCollection.createIndex({
'tags': 1,
'size': -1
}, { name: 'tags_size_index' });
await registryCollection.createIndex({
'expirationDate': 1
}, {
name: 'expiration_index',
// TTL index removes expired registry entries automatically; the underlying
// GridFS files still require a separate cleanup job
expireAfterSeconds: 0
});
}
updateMetrics(operation, bytes, duration) {
if (operation === 'upload') {
this.metrics.uploads.count++;
this.metrics.uploads.totalBytes += bytes;
this.metrics.uploads.totalTime += duration;
} else if (operation === 'download') {
this.metrics.downloads.count++;
this.metrics.downloads.totalBytes += bytes;
this.metrics.downloads.totalTime += duration;
}
}
async getPerformanceMetrics() {
const uploadStats = this.metrics.uploads;
const downloadStats = this.metrics.downloads;
return {
uploads: {
count: uploadStats.count,
totalMB: Math.round(uploadStats.totalBytes / (1024 * 1024)),
avgDurationMs: uploadStats.count > 0 ? uploadStats.totalTime / uploadStats.count : 0,
throughputMBps: uploadStats.totalTime > 0 ?
(uploadStats.totalBytes / (1024 * 1024)) / (uploadStats.totalTime / 1000) : 0
},
downloads: {
count: downloadStats.count,
totalMB: Math.round(downloadStats.totalBytes / (1024 * 1024)),
avgDurationMs: downloadStats.count > 0 ? downloadStats.totalTime / downloadStats.count : 0,
throughputMBps: downloadStats.totalTime > 0 ?
(downloadStats.totalBytes / (1024 * 1024)) / (downloadStats.totalTime / 1000) : 0
},
compressionRatios: this.metrics.compressionRatios,
averageCompressionRatio: this.metrics.compressionRatios.length > 0 ?
this.metrics.compressionRatios.reduce((a, b) => a + b) / this.metrics.compressionRatios.length : 1.0
};
}
}
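To show how the manager above might be wired into an application, here is a brief usage sketch exposing upload and range-aware download over HTTP. The Express setup, headers, and route paths are illustrative assumptions; the calls to uploadFileWithAdvancedFeatures and downloadFileWithStreaming follow the class defined above.
const express = require('express');
const { MongoClient } = require('mongodb');

async function startFileServer() {
  const client = await MongoClient.connect('mongodb://localhost:27017'); // placeholder URI
  const gridFS = new AdvancedGridFSManager(client.db('file_platform'));
  const app = express();

  // Upload: the raw request body is treated as the file stream
  app.post('/files', async (req, res) => {
    const result = await gridFS.uploadFileWithAdvancedFeatures(req, {
      contentType: req.headers['content-type'],
      originalName: req.headers['x-filename'] || 'upload.bin', // illustrative header
      uploadedBy: req.headers['x-user-id']
    });
    res.status(result.success ? 201 : 500).json(result);
  });

  // Download: stream the stored file back, honoring HTTP Range requests
  app.get('/files/:id', async (req, res) => {
    const options = {};
    const match = /bytes=(\d+)-(\d*)/.exec(req.headers.range || '');
    if (match) {
      options.start = parseInt(match[1], 10);
      if (match[2]) options.end = parseInt(match[2], 10) + 1; // GridFS end offset is exclusive
    }
    const result = await gridFS.downloadFileWithStreaming(req.params.id, options);
    if (!result.success) return res.status(404).json(result);
    res.status(match ? 206 : 200);
    result.stream.pipe(res);
  });

  app.listen(3000);
}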
Advanced GridFS Streaming Patterns
Chunked Upload with Progress Tracking
Implement efficient chunked uploads for large files with real-time progress monitoring:
// Advanced chunked upload with progress tracking and resume capability
class ChunkedUploadManager {
constructor(gridFSManager) {
this.gridFS = gridFSManager;
this.activeUploads = new Map();
this.chunkSize = 1048576 * 5; // 5MB chunks
}
async initiateChunkedUpload(metadata) {
const uploadId = new ObjectId();
const uploadSession = {
uploadId: uploadId,
metadata: metadata,
chunks: [],
totalSize: metadata.totalSize || 0,
uploadedSize: 0,
status: 'initiated',
createdAt: new Date(),
lastActivity: new Date()
};
this.activeUploads.set(uploadId.toString(), uploadSession);
// Store upload session in database for persistence
await this.gridFS.db.collection('upload_sessions').insertOne({
_id: uploadId,
...uploadSession,
expiresAt: new Date(Date.now() + 24 * 60 * 60 * 1000) // 24 hour expiration
});
return {
success: true,
uploadId: uploadId,
chunkSize: this.chunkSize,
session: uploadSession
};
}
async uploadChunk(uploadId, chunkIndex, chunkData) {
const uploadSession = this.activeUploads.get(uploadId.toString()) ||
await this.loadUploadSession(uploadId);
if (!uploadSession) {
throw new Error('Upload session not found');
}
try {
// Validate chunk
const validationResult = this.validateChunk(uploadSession, chunkIndex, chunkData);
if (!validationResult.valid) {
throw new Error(`Invalid chunk: ${validationResult.reason}`);
}
// Store chunk with metadata
const chunkDocument = {
_id: new ObjectId(),
uploadId: new ObjectId(uploadId),
chunkIndex: chunkIndex,
size: chunkData.length,
hash: this.calculateChunkHash(chunkData),
data: chunkData,
uploadedAt: new Date()
};
await this.gridFS.db.collection('upload_chunks').insertOne(chunkDocument);
// Update upload session
uploadSession.chunks[chunkIndex] = {
chunkId: chunkDocument._id,
size: chunkData.length,
hash: chunkDocument.hash,
uploadedAt: new Date()
};
uploadSession.uploadedSize += chunkData.length;
uploadSession.lastActivity = new Date();
// Calculate progress
const progress = uploadSession.totalSize > 0 ?
(uploadSession.uploadedSize / uploadSession.totalSize) * 100 : 0;
// Update session in database and memory
await this.updateUploadSession(uploadId, uploadSession);
this.activeUploads.set(uploadId.toString(), uploadSession);
return {
success: true,
chunkIndex: chunkIndex,
uploadedSize: uploadSession.uploadedSize,
totalSize: uploadSession.totalSize,
progress: Math.round(progress * 100) / 100,
remainingChunks: this.calculateRemainingChunks(uploadSession)
};
} catch (error) {
console.error(`Chunk upload failed for upload ${uploadId}, chunk ${chunkIndex}:`, error);
throw error;
}
}
validateChunk(uploadSession, chunkIndex, chunkData) {
// Check chunk size
if (chunkData.length > this.chunkSize) {
return { valid: false, reason: 'Chunk too large' };
}
// Check for duplicate chunks
if (uploadSession.chunks[chunkIndex]) {
return { valid: false, reason: 'Chunk already exists' };
}
// Validate chunk index sequence
const expectedIndex = uploadSession.chunks.filter(c => c !== undefined).length;
if (chunkIndex < 0 || chunkIndex > expectedIndex) {
return { valid: false, reason: 'Invalid chunk index sequence' };
}
return { valid: true };
}
calculateChunkHash(chunkData) {
const crypto = require('crypto');
return crypto.createHash('sha256').update(chunkData).digest('hex');
}
calculateRemainingChunks(uploadSession) {
if (!uploadSession.totalSize) return null;
const totalChunks = Math.ceil(uploadSession.totalSize / this.chunkSize);
const uploadedChunks = uploadSession.chunks.filter(c => c !== undefined).length;
return totalChunks - uploadedChunks;
}
async finalizeChunkedUpload(uploadId) {
const uploadSession = this.activeUploads.get(uploadId.toString()) ||
await this.loadUploadSession(uploadId);
if (!uploadSession) {
throw new Error('Upload session not found');
}
try {
// Validate all chunks are present
const missingChunks = this.findMissingChunks(uploadSession);
if (missingChunks.length > 0) {
throw new Error(`Missing chunks: ${missingChunks.join(', ')}`);
}
// Create combined file stream from chunks
const combinedStream = await this.createCombinedStream(uploadId, uploadSession);
// Upload to GridFS using the advanced manager
const uploadResult = await this.gridFS.uploadFileWithAdvancedFeatures(
combinedStream,
{
...uploadSession.metadata,
originalUploadId: uploadId,
uploadMethod: 'chunked',
chunkCount: uploadSession.chunks.length,
finalizedAt: new Date()
}
);
// Cleanup temporary chunks
await this.cleanupChunkedUpload(uploadId);
return {
success: true,
fileId: uploadResult.fileId,
filename: uploadResult.filename,
size: uploadResult.size,
compressionRatio: uploadResult.compressionRatio,
uploadMethod: 'chunked',
totalChunks: uploadSession.chunks.length
};
} catch (error) {
console.error(`Finalization failed for upload ${uploadId}:`, error);
throw error;
}
}
findMissingChunks(uploadSession) {
const missingChunks = [];
const totalChunks = Math.ceil(uploadSession.totalSize / this.chunkSize);
for (let i = 0; i < totalChunks; i++) {
if (!uploadSession.chunks[i]) {
missingChunks.push(i);
}
}
return missingChunks;
}
async createCombinedStream(uploadId, uploadSession) {
const { Readable } = require('stream');
const chunksCollection = this.gridFS.db.collection('upload_chunks');
const totalChunks = uploadSession.chunks.filter(c => c !== undefined).length;
let nextIndex = 0;
// Stream the stored chunks back in order, fetching each one lazily on demand
return new Readable({
async read() {
if (nextIndex >= totalChunks) return this.push(null); // End of stream
try {
const chunkDoc = await chunksCollection.findOne({ uploadId: new ObjectId(uploadId), chunkIndex: nextIndex });
nextIndex += 1;
// BSON Binary exposes its bytes via .buffer
this.push(Buffer.isBuffer(chunkDoc.data) ? chunkDoc.data : Buffer.from(chunkDoc.data.buffer));
} catch (error) {
this.destroy(error);
}
}
});
}
async cleanupChunkedUpload(uploadId) {
// Remove chunks from database
await this.gridFS.db.collection('upload_chunks').deleteMany({
uploadId: new ObjectId(uploadId)
});
// Remove upload session
await this.gridFS.db.collection('upload_sessions').deleteOne({
_id: new ObjectId(uploadId)
});
// Remove from memory
this.activeUploads.delete(uploadId.toString());
}
async loadUploadSession(uploadId) {
const session = await this.gridFS.db.collection('upload_sessions').findOne({
_id: new ObjectId(uploadId)
});
if (session) {
this.activeUploads.set(uploadId.toString(), session);
return session;
}
return null;
}
async updateUploadSession(uploadId, session) {
await this.gridFS.db.collection('upload_sessions').updateOne(
{ _id: new ObjectId(uploadId) },
{
$set: {
chunks: session.chunks,
uploadedSize: session.uploadedSize,
lastActivity: session.lastActivity
}
}
);
}
async getUploadProgress(uploadId) {
const uploadSession = this.activeUploads.get(uploadId.toString()) ||
await this.loadUploadSession(uploadId);
if (!uploadSession) {
return { found: false };
}
const progress = uploadSession.totalSize > 0 ?
(uploadSession.uploadedSize / uploadSession.totalSize) * 100 : 0;
return {
found: true,
uploadId: uploadId,
progress: Math.round(progress * 100) / 100,
uploadedSize: uploadSession.uploadedSize,
totalSize: uploadSession.totalSize,
uploadedChunks: uploadSession.chunks.filter(c => c !== undefined).length,
totalChunks: uploadSession.totalSize > 0 ?
Math.ceil(uploadSession.totalSize / this.chunkSize) : 0,
status: uploadSession.status,
createdAt: uploadSession.createdAt,
lastActivity: uploadSession.lastActivity
};
}
}
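A client-side driver loop for the manager above might look like the following sketch, which slices a local file into chunks and reports progress as each piece completes. The file path, metadata, and helper name uploadLargeFile are illustrative.
const fs = require('fs');

// Hypothetical driver loop feeding a local file through the ChunkedUploadManager defined above
async function uploadLargeFile(chunkedUploads, filePath, metadata) {
  const { size } = fs.statSync(filePath);
  const { uploadId, chunkSize } = await chunkedUploads.initiateChunkedUpload({
    ...metadata,
    totalSize: size
  });

  const fd = fs.openSync(filePath, 'r');
  const totalChunks = Math.ceil(size / chunkSize);
  try {
    for (let index = 0; index < totalChunks; index++) {
      const buffer = Buffer.alloc(Math.min(chunkSize, size - index * chunkSize));
      fs.readSync(fd, buffer, 0, buffer.length, index * chunkSize);
      const progress = await chunkedUploads.uploadChunk(uploadId, index, buffer);
      console.log(`chunk ${index + 1}/${totalChunks} uploaded (${progress.progress}%)`);
    }
  } finally {
    fs.closeSync(fd);
  }

  return chunkedUploads.finalizeChunkedUpload(uploadId);
}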
SQL-Style GridFS Operations with QueryLeaf
QueryLeaf provides familiar SQL syntax for MongoDB GridFS operations:
-- QueryLeaf GridFS operations with SQL-familiar syntax
-- Upload file with advanced features
INSERT INTO gridfs_files (
filename,
content_type,
metadata,
data_stream,
compression_enabled,
optimization_level
)
VALUES (
'large_video.mp4',
'video/mp4',
JSON_BUILD_OBJECT(
'category', 'multimedia',
'uploadedBy', 'user_123',
'description', 'Training video content',
'tags', ARRAY['training', 'video', 'multimedia'],
'retentionDays', 730,
'enableCompression', true,
'qualityOptimization', true
),
-- Stream data (handled by QueryLeaf GridFS interface)
@video_stream,
true, -- Enable compression
'high' -- Optimization level
);
-- Query files with advanced filtering
SELECT
file_id,
filename,
content_type,
file_size_mb,
upload_date,
metadata.category,
metadata.tags,
metadata.uploadedBy,
-- Calculated fields
CASE
WHEN file_size_mb > 100 THEN 'large'
WHEN file_size_mb > 10 THEN 'medium'
ELSE 'small'
END as size_category,
-- Compression information
metadata.compressed as is_compressed,
metadata.compressionRatio as compression_ratio,
ROUND((original_size_mb - file_size_mb) / original_size_mb * 100, 1) as space_saved_percent,
-- Access statistics
metadata.accessCount as total_downloads,
metadata.lastAccessed,
-- Lifecycle information
metadata.expirationDate,
CASE
WHEN metadata.expirationDate < CURRENT_TIMESTAMP THEN 'expired'
WHEN metadata.expirationDate < CURRENT_TIMESTAMP + INTERVAL '30 days' THEN 'expiring_soon'
ELSE 'active'
END as lifecycle_status
FROM gridfs_files
WHERE metadata.category = 'multimedia'
AND upload_date >= CURRENT_TIMESTAMP - INTERVAL '90 days'
AND file_size_mb BETWEEN 1 AND 1000
ORDER BY upload_date DESC, file_size_mb DESC
LIMIT 50;
-- Advanced file search with full-text capabilities
SELECT
file_id,
filename,
content_type,
file_size_mb,
metadata.description,
metadata.tags,
-- Search relevance scoring
TEXTRANK() as relevance_score,
-- File categorization
metadata.category,
-- Performance metrics
metadata.compressionRatio,
metadata.optimizations
FROM gridfs_files
WHERE TEXTSEARCH('training video multimedia')
OR filename ILIKE '%training%'
OR metadata.tags && ARRAY['training', 'video']
OR metadata.description ILIKE '%training%'
ORDER BY relevance_score DESC, upload_date DESC;
-- Aggregated file statistics by category
WITH file_analytics AS (
SELECT
metadata.category,
COUNT(*) as file_count,
SUM(file_size_mb) as total_size_mb,
AVG(file_size_mb) as avg_size_mb,
MIN(file_size_mb) as min_size_mb,
MAX(file_size_mb) as max_size_mb,
-- Compression analysis
COUNT(*) FILTER (WHERE metadata.compressed = true) as compressed_files,
AVG(metadata.compressionRatio) FILTER (WHERE metadata.compressed = true) as avg_compression_ratio,
-- Access patterns
SUM(metadata.accessCount) as total_downloads,
AVG(metadata.accessCount) as avg_downloads_per_file,
-- Date ranges
MIN(upload_date) as earliest_upload,
MAX(upload_date) as latest_upload,
-- Content type distribution
array_agg(DISTINCT content_type) as content_types
FROM gridfs_files
WHERE upload_date >= CURRENT_TIMESTAMP - INTERVAL '1 year'
GROUP BY metadata.category
),
storage_efficiency AS (
SELECT
category,
file_count,
total_size_mb,
compressed_files,
-- Storage efficiency metrics
ROUND((compressed_files::numeric / file_count * 100), 1) as compression_rate_percent,
ROUND(avg_compression_ratio, 2) as avg_compression_ratio,
ROUND((total_size_mb * (1 - avg_compression_ratio)), 1) as estimated_space_saved_mb,
-- Performance insights
ROUND(avg_size_mb, 1) as avg_file_size_mb,
ROUND(total_downloads::numeric / file_count, 1) as avg_downloads_per_file,
-- Category health score (repeat the rate expression; an alias cannot be referenced in the same SELECT)
CASE
WHEN (compressed_files::numeric / file_count * 100) > 80 AND avg_compression_ratio < 0.7 THEN 'excellent'
WHEN (compressed_files::numeric / file_count * 100) > 60 AND avg_compression_ratio < 0.8 THEN 'good'
WHEN (compressed_files::numeric / file_count * 100) > 40 THEN 'fair'
ELSE 'poor'
END as storage_efficiency_rating,
content_types
FROM file_analytics
)
SELECT
category,
file_count,
total_size_mb,
avg_file_size_mb,
compression_rate_percent,
avg_compression_ratio,
estimated_space_saved_mb,
storage_efficiency_rating,
avg_downloads_per_file,
-- Recommendations
CASE storage_efficiency_rating
WHEN 'poor' THEN 'Consider enabling compression for more files in this category'
WHEN 'fair' THEN 'Review compression settings to improve storage efficiency'
WHEN 'good' THEN 'Storage efficiency is good, monitor for further optimization'
ELSE 'Excellent storage efficiency - current settings are optimal'
END as optimization_recommendation,
ARRAY_LENGTH(content_types, 1) as content_type_variety,
content_types
FROM storage_efficiency
ORDER BY total_size_mb DESC;
-- File lifecycle management with retention policies
WITH expiring_files AS (
SELECT
file_id,
filename,
content_type,
file_size_mb,
upload_date,
metadata.expirationDate,
metadata.retentionDays,
metadata.category,
metadata.uploadedBy,
metadata.accessCount,
metadata.lastAccessed,
-- Calculate time until expiration
metadata.expirationDate - CURRENT_TIMESTAMP as time_until_expiration,
-- Classify expiration urgency
CASE
WHEN metadata.expirationDate < CURRENT_TIMESTAMP THEN 'expired'
WHEN metadata.expirationDate < CURRENT_TIMESTAMP + INTERVAL '7 days' THEN 'expires_this_week'
WHEN metadata.expirationDate < CURRENT_TIMESTAMP + INTERVAL '30 days' THEN 'expires_this_month'
WHEN metadata.expirationDate < CURRENT_TIMESTAMP + INTERVAL '90 days' THEN 'expires_this_quarter'
ELSE 'expires_later'
END as expiration_urgency,
-- Calculate retention recommendation
CASE
WHEN metadata.accessCount = 0 THEN 'delete_unused'
WHEN metadata.lastAccessed < CURRENT_TIMESTAMP - INTERVAL '180 days' THEN 'archive_old'
WHEN metadata.accessCount > 10 AND metadata.lastAccessed > CURRENT_TIMESTAMP - INTERVAL '30 days' THEN 'extend_retention'
ELSE 'maintain_current'
END as retention_recommendation
FROM gridfs_files
WHERE metadata.expirationDate IS NOT NULL
),
retention_actions AS (
SELECT
expiration_urgency,
retention_recommendation,
COUNT(*) as file_count,
SUM(file_size_mb) as total_size_mb,
-- Files by category
array_agg(DISTINCT category) as categories_affected,
-- Size distribution
ROUND(AVG(file_size_mb), 1) as avg_file_size_mb,
ROUND(PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY file_size_mb), 1) as median_file_size_mb,
-- Access patterns
ROUND(AVG(accessCount), 1) as avg_access_count,
MIN(lastAccessed) as oldest_last_access,
MAX(lastAccessed) as newest_last_access
FROM expiring_files
GROUP BY expiration_urgency, retention_recommendation
)
SELECT
expiration_urgency,
retention_recommendation,
file_count,
total_size_mb,
-- Action priority scoring
CASE expiration_urgency
WHEN 'expired' THEN 100
WHEN 'expires_this_week' THEN 90
WHEN 'expires_this_month' THEN 70
WHEN 'expires_this_quarter' THEN 50
ELSE 30
END as action_priority_score,
-- Recommended actions
CASE retention_recommendation
WHEN 'delete_unused' THEN 'DELETE - No access history, safe to remove'
WHEN 'archive_old' THEN 'ARCHIVE - Move to cold storage or compress further'
WHEN 'extend_retention' THEN 'EXTEND - Popular file, consider extending retention period'
ELSE 'MONITOR - Continue with current retention policy'
END as recommended_action,
-- Resource impact
CONCAT(ROUND((total_size_mb / 1024), 1), ' GB') as storage_impact,
categories_affected,
avg_file_size_mb,
avg_access_count
FROM retention_actions
ORDER BY action_priority_score DESC, total_size_mb DESC;
-- Real-time file transfer monitoring
SELECT
transfer_id,
operation_type, -- 'upload' or 'download'
file_id,
filename,
-- Progress tracking
bytes_transferred,
total_bytes,
ROUND((bytes_transferred::numeric / NULLIF(total_bytes, 0)) * 100, 1) as progress_percent,
-- Performance metrics
transfer_rate_mbps,
estimated_time_remaining_seconds,
-- Transfer details
client_ip,
user_agent,
compression_enabled,
encryption_enabled,
-- Status and timing
status, -- 'in_progress', 'completed', 'failed', 'paused'
started_at,
EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - started_at)) as duration_seconds,
-- Quality metrics
error_count,
retry_count,
CASE
WHEN transfer_rate_mbps > 10 THEN 'fast'
WHEN transfer_rate_mbps > 1 THEN 'normal'
ELSE 'slow'
END as transfer_speed_rating
FROM active_file_transfers
WHERE status IN ('in_progress', 'paused')
AND started_at >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
ORDER BY started_at DESC;
-- Performance optimization analysis
WITH transfer_performance AS (
SELECT
DATE_TRUNC('hour', started_at) as time_bucket,
operation_type,
-- Volume metrics
COUNT(*) as transfer_count,
SUM(total_bytes) / (1024 * 1024 * 1024) as total_gb_transferred,
-- Performance metrics
AVG(transfer_rate_mbps) as avg_transfer_rate_mbps,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY transfer_rate_mbps) as p95_transfer_rate_mbps,
MIN(transfer_rate_mbps) as min_transfer_rate_mbps,
-- Success rates
COUNT(*) FILTER (WHERE status = 'completed') as successful_transfers,
COUNT(*) FILTER (WHERE status = 'failed') as failed_transfers,
COUNT(*) FILTER (WHERE retry_count > 0) as transfers_with_retries,
-- Timing analysis
AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) as avg_duration_seconds,
MAX(EXTRACT(EPOCH FROM (completed_at - started_at))) as max_duration_seconds,
-- Feature usage
COUNT(*) FILTER (WHERE compression_enabled = true) as compressed_transfers,
COUNT(*) FILTER (WHERE encryption_enabled = true) as encrypted_transfers
FROM active_file_transfers
WHERE started_at >= CURRENT_TIMESTAMP - INTERVAL '7 days'
AND status IN ('completed', 'failed')
GROUP BY DATE_TRUNC('hour', started_at), operation_type
)
SELECT
time_bucket,
operation_type,
transfer_count,
total_gb_transferred,
-- Performance indicators
ROUND(avg_transfer_rate_mbps, 1) as avg_speed_mbps,
ROUND(p95_transfer_rate_mbps, 1) as p95_speed_mbps,
-- Success metrics
ROUND((successful_transfers::numeric / transfer_count * 100), 1) as success_rate_percent,
ROUND((transfers_with_retries::numeric / transfer_count * 100), 1) as retry_rate_percent,
-- Duration insights
ROUND(avg_duration_seconds / 60, 1) as avg_duration_minutes,
ROUND(max_duration_seconds / 60, 1) as max_duration_minutes,
-- Feature adoption
ROUND((compressed_transfers::numeric / transfer_count * 100), 1) as compression_usage_percent,
ROUND((encrypted_transfers::numeric / transfer_count * 100), 1) as encryption_usage_percent,
-- Performance rating
CASE
WHEN avg_transfer_rate_mbps > 50 THEN 'excellent'
WHEN avg_transfer_rate_mbps > 20 THEN 'good'
WHEN avg_transfer_rate_mbps > 5 THEN 'fair'
ELSE 'poor'
END as performance_rating,
-- Optimization recommendations
CASE
WHEN avg_transfer_rate_mbps < 5 AND compression_usage_percent < 50 THEN 'Enable compression to improve transfer speeds'
WHEN retry_rate_percent > 20 THEN 'High retry rate indicates network issues or oversized chunks'
WHEN success_rate_percent < 95 THEN 'Investigate transfer failures and improve error handling'
WHEN max_duration_minutes > 60 THEN 'Consider chunked uploads for very large files'
ELSE 'Performance within acceptable ranges'
END as optimization_recommendation
FROM transfer_performance
ORDER BY time_bucket DESC;
-- QueryLeaf provides comprehensive GridFS capabilities:
-- 1. SQL-familiar file upload/download operations with streaming support
-- 2. Advanced compression and optimization through SQL parameters
-- 3. Full-text search capabilities for file metadata and content
-- 4. Comprehensive file analytics and storage optimization insights
-- 5. Automated lifecycle management with retention policy enforcement
-- 6. Real-time transfer monitoring and performance analysis
-- 7. Integration with MongoDB's native GridFS optimizations
-- 8. Familiar SQL patterns for complex file management operations
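For orientation, the multimedia listing above corresponds roughly to a native query against the bucket's files collection. The sketch below is an approximate equivalent using the Node.js driver, not the exact translation QueryLeaf performs; collection and field names follow the earlier examples, and sizes are filtered in bytes rather than megabytes.
// Approximate native equivalent of the multimedia file listing above
async function listRecentMultimediaFiles(db) {
  const ninetyDaysAgo = new Date(Date.now() - 90 * 24 * 60 * 60 * 1000);
  return db.collection('videos.files')
    .find({
      'metadata.category': 'multimedia',
      uploadDate: { $gte: ninetyDaysAgo },
      length: { $gte: 1 * 1024 * 1024, $lte: 1000 * 1024 * 1024 } // 1 MB to 1000 MB
    })
    .project({ filename: 1, length: 1, uploadDate: 1, metadata: 1 })
    .sort({ uploadDate: -1, length: -1 })
    .limit(50)
    .toArray();
}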
Best Practices for GridFS Production Deployment
Performance Optimization Strategies
Essential optimization techniques for high-throughput GridFS deployments:
- Chunk Size Optimization: Configure appropriate chunk sizes based on file types and access patterns (a configuration sketch follows this list)
- Index Strategy: Create compound indexes on file metadata for efficient queries
- Compression Algorithms: Choose optimal compression based on file type and performance requirements
- Connection Pooling: Implement appropriate connection pooling for concurrent file operations
- Caching Layer: Add CDN or caching layer for frequently accessed files
- Monitoring Setup: Implement comprehensive monitoring for file operations and storage usage
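The sketch below illustrates the first two points, chunk-size tuning and metadata indexing, using the Node.js driver; bucket names and index definitions mirror the earlier examples and are otherwise assumptions.
const { GridFSBucket } = require('mongodb');

// A minimal sketch of chunk-size tuning and metadata indexing
async function configureBuckets(db) {
  // Larger chunks reduce round trips for big sequential reads (videos);
  // smaller chunks keep memory usage down for small or random reads (documents)
  const videoBucket = new GridFSBucket(db, { bucketName: 'videos', chunkSizeBytes: 8 * 1024 * 1024 });
  const documentBucket = new GridFSBucket(db, { bucketName: 'documents', chunkSizeBytes: 1024 * 1024 });

  // Compound indexes on the files collections support common metadata queries
  await db.collection('videos.files').createIndex(
    { 'metadata.category': 1, uploadDate: -1 },
    { name: 'category_date_idx' }
  );
  await db.collection('videos.files').createIndex(
    { 'metadata.tags': 1, length: -1 },
    { name: 'tags_size_idx' }
  );

  return { videoBucket, documentBucket };
}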
Storage Architecture Design
Design principles for scalable GridFS storage systems:
- Sharding Strategy: Plan sharding keys for optimal file distribution across cluster nodes (see the sketch after this list)
- Replica Configuration: Configure appropriate read preferences for file access patterns
- Storage Tiering: Implement hot/cold storage strategies for lifecycle management
- Backup Strategy: Design comprehensive backup and recovery procedures for file data
- Security Implementation: Implement encryption, access controls, and audit logging
- Capacity Planning: Plan storage growth and performance scaling requirements
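As a starting point for the sharding strategy above, the following sketch shards a bucket's chunks collection on { files_id: 1, n: 1 } so each file's chunks stay together and ordered. The database and bucket names are illustrative, and the commands must run against a sharded cluster through mongos.
// Minimal sharding sketch for GridFS collections
async function shardGridFSCollections(client) {
  const admin = client.db('admin');

  await admin.command({ enableSharding: 'file_platform' });

  // Shard the chunks collection on { files_id: 1, n: 1 }, which matches the
  // default unique index GridFS maintains on that collection
  await admin.command({
    shardCollection: 'file_platform.videos.chunks',
    key: { files_id: 1, n: 1 }
  });

  // The files (metadata) collection is usually small enough to stay unsharded,
  // but it can be sharded on _id if metadata volume demands it
  await admin.command({
    shardCollection: 'file_platform.videos.files',
    key: { _id: 1 }
  });
}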
Conclusion
MongoDB GridFS with advanced streaming and compression provides enterprise-grade file storage that eliminates the complexity and limitations of traditional file systems while delivering sophisticated streaming, optimization, and management features. The ability to store, process, and serve large files with built-in compression, encryption, and metadata management makes building robust file storage systems straightforward.
Key GridFS Advanced Features include:
- Streaming Optimization: Efficient chunked uploads and downloads with progress tracking
- Advanced Compression: Intelligent compression strategies based on file type and content
- Metadata Integration: Rich metadata storage with full-text search capabilities
- Performance Monitoring: Real-time transfer monitoring and optimization insights
- Lifecycle Management: Automated retention policies and storage optimization
- SQL Accessibility: Familiar file operations through QueryLeaf's SQL interface
Whether you're building content management systems, media platforms, document repositories, or backup solutions, MongoDB GridFS with QueryLeaf's SQL interface provides the foundation for scalable file storage that integrates seamlessly with your application data while maintaining familiar database interaction patterns.
QueryLeaf Integration: QueryLeaf automatically translates SQL file operations into MongoDB GridFS commands, making advanced file storage accessible through familiar SQL patterns. Complex streaming scenarios, compression settings, and metadata queries are seamlessly handled through standard SQL syntax, enabling developers to build powerful file management features without learning GridFS-specific APIs.
The combination of MongoDB's robust file storage capabilities with SQL-familiar operations creates an ideal platform for applications requiring both sophisticated file management and familiar database interaction patterns, ensuring your file storage systems remain both scalable and maintainable as your data requirements evolve.