MongoDB Bulk Operations and Performance Optimization: Advanced Batch Processing and High-Throughput Data Management
High-performance data processing applications require sophisticated bulk operation strategies that can handle large volumes of data efficiently while maintaining consistency and performance under varying load conditions. Traditional row-by-row database operations become prohibitively slow when processing thousands or millions of records, leading to application bottlenecks, extended processing times, and resource exhaustion in production environments.
MongoDB provides comprehensive bulk operation capabilities that enable high-throughput batch processing for insertions, updates, and deletions through optimized write strategies and intelligent batching mechanisms. Unlike traditional databases that require complex stored procedures or application-level batching logic, MongoDB's bulk operations leverage server-side optimization, write concern management, and atomic operation guarantees to deliver superior performance for large-scale data processing scenarios.
The Traditional Batch Processing Challenge
Conventional database batch processing approaches often struggle with performance and complexity:
-- Traditional PostgreSQL batch processing - limited throughput and complex error handling
-- Basic batch insert approach with poor performance characteristics
CREATE OR REPLACE FUNCTION batch_insert_products(
product_data JSONB[]
) RETURNS TABLE(
inserted_count INTEGER,
failed_count INTEGER,
processing_time_ms INTEGER,
error_details JSONB
) AS $$
DECLARE
product_record JSONB;
insert_count INTEGER := 0;
error_count INTEGER := 0;
start_time TIMESTAMP := clock_timestamp();
current_error TEXT;
error_list JSONB := '[]'::JSONB;
BEGIN
-- Individual row processing (extremely inefficient for large datasets)
FOREACH product_record IN ARRAY product_data
LOOP
BEGIN
INSERT INTO products (
product_name,
category,
price,
stock_quantity,
supplier_id,
created_at,
updated_at,
-- Basic validation during insertion
sku,
description,
weight_kg,
dimensions_cm,
-- Limited metadata support
tags,
attributes
)
VALUES (
product_record->>'product_name',
product_record->>'category',
(product_record->>'price')::DECIMAL(10,2),
(product_record->>'stock_quantity')::INTEGER,
(product_record->>'supplier_id')::UUID,
CURRENT_TIMESTAMP,
CURRENT_TIMESTAMP,
-- Manual data extraction and validation
product_record->>'sku',
product_record->>'description',
(product_record->>'weight_kg')::DECIMAL(8,3),
product_record->>'dimensions_cm',
-- Limited JSON processing capabilities
string_to_array(product_record->>'tags', ','),
product_record->'attributes'
);
insert_count := insert_count + 1;
EXCEPTION
WHEN unique_violation THEN
error_count := error_count + 1;
error_list := error_list || jsonb_build_object(
'sku', product_record->>'sku',
'error', 'Duplicate SKU violation',
'error_code', 'UNIQUE_VIOLATION'
);
WHEN check_violation THEN
error_count := error_count + 1;
error_list := error_list || jsonb_build_object(
'sku', product_record->>'sku',
'error', 'Data validation failed',
'error_code', 'CHECK_VIOLATION'
);
WHEN OTHERS THEN
error_count := error_count + 1;
GET STACKED DIAGNOSTICS current_error = MESSAGE_TEXT;
error_list := error_list || jsonb_build_object(
'sku', product_record->>'sku',
'error', current_error,
'error_code', 'GENERAL_ERROR'
);
END;
END LOOP;
RETURN QUERY SELECT
insert_count,
error_count,
(EXTRACT(EPOCH FROM clock_timestamp() - start_time) * 1000)::INTEGER,
error_list;
END;
$$ LANGUAGE plpgsql;
-- Batch update operation with limited optimization
CREATE OR REPLACE FUNCTION batch_update_inventory(
updates JSONB[]
) RETURNS TABLE(
updated_count INTEGER,
not_found_count INTEGER,
error_count INTEGER,
processing_details JSONB
) AS $$
DECLARE
update_record JSONB;
updated_rows INTEGER := 0;
not_found_rows INTEGER := 0;
error_rows INTEGER := 0;
temp_table_name TEXT := 'temp_inventory_updates_' || extract(epoch from now())::INTEGER;
processing_stats JSONB := '{}'::JSONB;
BEGIN
-- Create temporary table for batch processing (complex setup)
EXECUTE format('
CREATE TEMP TABLE %I (
sku VARCHAR(100),
stock_adjustment INTEGER,
price_adjustment DECIMAL(10,2),
update_reason VARCHAR(200),
batch_id UUID DEFAULT gen_random_uuid()
)', temp_table_name);
-- Insert updates into temporary table
FOREACH update_record IN ARRAY updates
LOOP
EXECUTE format('
INSERT INTO %I (sku, stock_adjustment, price_adjustment, update_reason)
VALUES ($1, $2, $3, $4)',
temp_table_name
) USING
update_record->>'sku',
(update_record->>'stock_adjustment')::INTEGER,
(update_record->>'price_adjustment')::DECIMAL(10,2),
update_record->>'update_reason';
END LOOP;
-- Perform batch update with limited atomicity
EXECUTE format('
WITH update_results AS (
UPDATE products p
SET
stock_quantity = p.stock_quantity + t.stock_adjustment,
price = CASE
WHEN t.price_adjustment IS NOT NULL THEN p.price + t.price_adjustment
ELSE p.price
END,
updated_at = CURRENT_TIMESTAMP,
last_update_reason = t.update_reason
FROM %I t
WHERE p.sku = t.sku
RETURNING p.sku, p.stock_quantity, p.price
),
stats AS (
SELECT COUNT(*) as updated_count FROM update_results
)
SELECT updated_count FROM stats',
temp_table_name
) INTO updated_rows;
-- Calculate not found items (complex logic)
EXECUTE format('
SELECT COUNT(*)
FROM %I t
WHERE NOT EXISTS (
SELECT 1 FROM products p WHERE p.sku = t.sku
)', temp_table_name
) INTO not_found_rows;
-- Cleanup temporary table
EXECUTE format('DROP TABLE %I', temp_table_name);
processing_stats := jsonb_build_object(
'total_processed', array_length(updates, 1),
'success_rate', CASE
WHEN array_length(updates, 1) > 0 THEN
ROUND((updated_rows::DECIMAL / array_length(updates, 1)) * 100, 2)
ELSE 0
END
);
RETURN QUERY SELECT
updated_rows,
not_found_rows,
error_rows,
processing_stats;
END;
$$ LANGUAGE plpgsql;
-- Complex batch delete with limited performance optimization
WITH batch_delete_products AS (
-- Identify products to delete based on complex criteria
SELECT
product_id,
sku,
category,
last_sold_date,
stock_quantity,
-- Complex deletion logic
CASE
WHEN stock_quantity = 0 AND last_sold_date < CURRENT_DATE - INTERVAL '365 days' THEN 'discontinued'
WHEN category = 'seasonal' AND EXTRACT(MONTH FROM CURRENT_DATE) NOT BETWEEN 6 AND 8 THEN 'seasonal_cleanup'
WHEN supplier_id IN (
SELECT supplier_id FROM suppliers WHERE status = 'inactive'
) THEN 'supplier_inactive'
ELSE 'no_delete'
END as delete_reason
FROM products
WHERE
-- Multi-condition filtering
(stock_quantity = 0 AND last_sold_date < CURRENT_DATE - INTERVAL '365 days')
OR (category = 'seasonal' AND EXTRACT(MONTH FROM CURRENT_DATE) NOT BETWEEN 6 AND 8)
OR supplier_id IN (
SELECT supplier_id FROM suppliers WHERE status = 'inactive'
)
),
deletion_validation AS (
-- Validate deletion constraints (complex dependency checking)
SELECT
bdp.*,
CASE
WHEN EXISTS (
SELECT 1 FROM order_items oi
WHERE oi.product_id = bdp.product_id
AND oi.order_date > CURRENT_DATE - INTERVAL '90 days'
) THEN 'recent_orders_exist'
WHEN EXISTS (
SELECT 1 FROM shopping_carts sc
WHERE sc.product_id = bdp.product_id
) THEN 'in_shopping_carts'
WHEN EXISTS (
SELECT 1 FROM wishlists w
WHERE w.product_id = bdp.product_id
) THEN 'in_wishlists'
ELSE 'safe_to_delete'
END as validation_status
FROM batch_delete_products bdp
WHERE bdp.delete_reason != 'no_delete'
),
safe_deletions AS (
-- Only proceed with safe deletions
SELECT product_id, sku, delete_reason
FROM deletion_validation
WHERE validation_status = 'safe_to_delete'
),
delete_execution AS (
-- Perform the actual deletion (limited batch efficiency)
DELETE FROM products
WHERE product_id IN (
SELECT product_id FROM safe_deletions
)
RETURNING product_id, sku
)
SELECT
COUNT(*) as deleted_count,
-- Limited statistics and reporting
json_agg(
json_build_object(
'sku', de.sku,
'delete_reason', sd.delete_reason
)
) as deleted_items,
-- Processing summary
(
SELECT COUNT(*)
FROM batch_delete_products
WHERE delete_reason != 'no_delete'
) as candidates_identified,
(
SELECT COUNT(*)
FROM deletion_validation
WHERE validation_status != 'safe_to_delete'
) as unsafe_deletions_blocked
FROM delete_execution de
JOIN safe_deletions sd ON de.product_id = sd.product_id;
-- Problems with traditional batch processing approaches:
-- 1. Poor performance due to row-by-row processing instead of set-based operations
-- 2. Complex error handling that doesn't scale with data volume
-- 3. Limited transaction management and rollback capabilities for batch operations
-- 4. No built-in support for partial failures and retry mechanisms
-- 5. Difficulty in maintaining data consistency during large batch operations
-- 6. Complex temporary table management and cleanup requirements
-- 7. Limited monitoring and progress tracking capabilities
-- 8. No native support for ordered vs unordered bulk operations
-- 9. Inefficient memory usage and connection management for large batches
-- 10. Lack of automatic optimization based on operation types and data patterns
-- Attempt at optimized bulk insert (still limited)
INSERT INTO products (
product_name, category, price, stock_quantity,
supplier_id, sku, description, created_at, updated_at
)
SELECT
batch_data.product_name,
batch_data.category,
batch_data.price::DECIMAL(10,2),
batch_data.stock_quantity::INTEGER,
batch_data.supplier_id::UUID,
batch_data.sku,
batch_data.description,
CURRENT_TIMESTAMP,
CURRENT_TIMESTAMP
FROM (
VALUES
('Product A', 'Electronics', '299.99', '100', '123e4567-e89b-12d3-a456-426614174000', 'SKU001', 'Description A'),
('Product B', 'Electronics', '199.99', '50', '123e4567-e89b-12d3-a456-426614174000', 'SKU002', 'Description B')
-- Limited to small static datasets
) AS batch_data(product_name, category, price, stock_quantity, supplier_id, sku, description)
ON CONFLICT (sku) DO UPDATE SET
stock_quantity = products.stock_quantity + EXCLUDED.stock_quantity,
price = EXCLUDED.price,
updated_at = CURRENT_TIMESTAMP;
-- Traditional approach limitations:
-- 1. No dynamic batch size optimization based on system resources
-- 2. Limited support for complex document structures and nested data
-- 3. Poor error reporting and partial failure handling
-- 4. No built-in retry logic for transient failures
-- 5. Complex application logic required for batch orchestration
-- 6. Limited write concern and consistency level management
-- 7. No automatic performance monitoring and optimization
-- 8. Difficulty in handling mixed operation types (insert, update, delete) efficiently
-- 9. No native support for bulk operations with custom validation logic
-- 10. Limited scalability for distributed database deployments
MongoDB provides comprehensive bulk operation capabilities with advanced optimization:
// MongoDB Advanced Bulk Operations - high-performance batch processing with intelligent optimization
const { MongoClient, ObjectId } = require('mongodb');
const client = new MongoClient('mongodb://localhost:27017');
const db = client.db('advanced_bulk_operations');
// Comprehensive MongoDB Bulk Operations Manager
class AdvancedBulkOperationsManager {
constructor(db, config = {}) {
this.db = db;
this.collections = {
products: db.collection('products'),
inventory: db.collection('inventory'),
orders: db.collection('orders'),
customers: db.collection('customers'),
bulkOperationLogs: db.collection('bulk_operation_logs'),
performanceMetrics: db.collection('performance_metrics')
};
// Advanced bulk operation configuration
this.config = {
defaultBatchSize: config.defaultBatchSize || 1000,
maxBatchSize: config.maxBatchSize || 10000,
maxRetries: config.maxRetries || 3,
retryDelay: config.retryDelay || 1000,
// Performance optimization settings
enableOptimisticBatching: config.enableOptimisticBatching !== false,
enableAdaptiveBatchSize: config.enableAdaptiveBatchSize !== false,
enablePerformanceMonitoring: config.enablePerformanceMonitoring !== false,
enableErrorAggregation: config.enableErrorAggregation !== false,
// Write concern and consistency settings
writeConcern: config.writeConcern || {
w: 'majority',
j: true,
wtimeout: 30000
},
// Bulk operation strategies
unorderedOperations: config.unorderedOperations !== false,
enablePartialFailures: config.enablePartialFailures !== false,
enableTransactionalBulk: config.enableTransactionalBulk || false,
// Memory and resource management
maxMemoryUsage: config.maxMemoryUsage || '1GB',
enableGarbageCollection: config.enableGarbageCollection !== false,
parallelOperations: config.parallelOperations || 4
};
// Performance tracking
this.performanceMetrics = {
operationsPerSecond: new Map(),
averageBatchTime: new Map(),
errorRates: new Map(),
throughputHistory: []
};
this.initializeBulkOperations();
}
async initializeBulkOperations() {
console.log('Initializing advanced bulk operations system...');
try {
// Create optimized indexes for bulk operations
await this.setupOptimizedIndexes();
// Initialize performance monitoring
if (this.config.enablePerformanceMonitoring) {
await this.setupPerformanceMonitoring();
}
// Setup bulk operation logging
await this.setupBulkOperationLogging();
console.log('Bulk operations system initialized successfully');
} catch (error) {
console.error('Error initializing bulk operations:', error);
throw error;
}
}
async setupOptimizedIndexes() {
console.log('Setting up indexes optimized for bulk operations...');
try {
// Product collection indexes for efficient bulk operations
await this.collections.products.createIndexes([
{ key: { sku: 1 }, unique: true, background: true },
{ key: { category: 1, createdAt: -1 }, background: true },
{ key: { supplier_id: 1, status: 1 }, background: true },
{ key: { 'pricing.lastUpdated': -1 }, background: true, sparse: true },
{ key: { tags: 1 }, background: true },
{ key: { 'inventory.lastStockUpdate': -1 }, background: true, sparse: true }
]);
// Inventory collection indexes
await this.collections.inventory.createIndexes([
{ key: { product_id: 1, warehouse_id: 1 }, unique: true, background: true },
{ key: { lastUpdated: -1 }, background: true },
{ key: { quantity: 1, status: 1 }, background: true }
]);
console.log('Bulk operation indexes created successfully');
} catch (error) {
console.error('Error creating bulk operation indexes:', error);
throw error;
}
}
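  // setupPerformanceMonitoring() and setupBulkOperationLogging() are invoked
  // during initialization but were not shown above; minimal sketches follow.
  // The 30-day TTL window below is an illustrative assumption.
  async setupPerformanceMonitoring() {
    // Reset the in-memory metric maps; per-batch results populate them later
    this.performanceMetrics.operationsPerSecond.clear();
    this.performanceMetrics.averageBatchTime.clear();
    this.performanceMetrics.errorRates.clear();
    this.performanceMetrics.throughputHistory = [];
  }
  async setupBulkOperationLogging() {
    // TTL index so bulk operation logs age out automatically after 30 days
    await this.collections.bulkOperationLogs.createIndex(
      { timestamp: 1 },
      { expireAfterSeconds: 30 * 24 * 60 * 60 }
    );
  }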
async performAdvancedBulkInsert(documents, options = {}) {
console.log(`Performing advanced bulk insert for ${documents.length} documents...`);
const startTime = Date.now();
try {
// Validate and prepare documents for bulk insertion
const preparedDocuments = await this.prepareDocumentsForInsertion(documents, options);
// Determine optimal batch configuration
const batchConfig = this.calculateOptimalBatchConfiguration(preparedDocuments, 'insert');
// Execute bulk insert with advanced error handling
const insertResults = await this.executeBulkInsertBatches(preparedDocuments, batchConfig, options);
// Process and aggregate results
const aggregatedResults = await this.aggregateBulkResults(insertResults, 'insert');
// Log operation performance
await this.logBulkOperation('bulk_insert', {
documentCount: documents.length,
batchConfiguration: batchConfig,
results: aggregatedResults,
processingTime: Date.now() - startTime
});
return {
operation: 'bulk_insert',
totalDocuments: documents.length,
successful: aggregatedResults.successfulInserts,
failed: aggregatedResults.failedInserts,
// Detailed results
insertedIds: aggregatedResults.insertedIds,
errors: aggregatedResults.errors,
duplicates: aggregatedResults.duplicateErrors,
// Performance metrics
processingTime: Date.now() - startTime,
documentsPerSecond: Math.round((aggregatedResults.successfulInserts / (Date.now() - startTime)) * 1000),
batchesProcessed: insertResults.length,
averageBatchTime: insertResults.reduce((sum, r) => sum + r.processingTime, 0) / insertResults.length,
// Configuration used
batchConfiguration: batchConfig,
// Quality metrics
successRate: (aggregatedResults.successfulInserts / documents.length) * 100,
errorRate: (aggregatedResults.failedInserts / documents.length) * 100
};
} catch (error) {
console.error('Bulk insert operation failed:', error);
// Log failed operation
await this.logBulkOperation('bulk_insert_failed', {
documentCount: documents.length,
error: error.message,
processingTime: Date.now() - startTime
});
throw error;
}
}
async prepareDocumentsForInsertion(documents, options = {}) {
console.log('Preparing documents for bulk insertion with validation and enhancement...');
const preparedDocuments = [];
const validationErrors = [];
for (let i = 0; i < documents.length; i++) {
const document = documents[i];
try {
// Document validation and standardization
const preparedDoc = {
...document,
// Ensure consistent ObjectId handling
_id: document._id || new ObjectId(),
// Standardize timestamps
createdAt: document.createdAt || new Date(),
updatedAt: document.updatedAt || new Date(),
// Add bulk operation metadata
bulkOperationMetadata: {
batchId: options.batchId || new ObjectId(),
sourceOperation: 'bulk_insert',
insertionIndex: i,
processingTimestamp: new Date()
}
};
        // Enhanced document preparation for specific collections (applied
        // before any custom transform so the transform sees the final shape)
        if (options.collection === 'products') {
          preparedDoc.searchKeywords = this.generateSearchKeywords(preparedDoc);
          preparedDoc.categoryHierarchy = this.buildCategoryHierarchy(preparedDoc.category);
          preparedDoc.pricingTiers = this.calculatePricingTiers(preparedDoc.price);
        }
        // Apply custom document transformations if provided
        if (options.documentTransform) {
          preparedDocuments.push(await options.documentTransform(preparedDoc, i));
        } else {
          preparedDocuments.push(preparedDoc);
        }
} catch (validationError) {
validationErrors.push({
index: i,
document: document,
error: validationError.message
});
}
}
if (validationErrors.length > 0 && !options.allowPartialFailures) {
throw new Error(`Document validation failed for ${validationErrors.length} documents`);
}
return {
documents: preparedDocuments,
validationErrors: validationErrors
};
}
calculateOptimalBatchConfiguration(preparedDocuments, operationType) {
console.log(`Calculating optimal batch configuration for ${operationType}...`);
    // Unwrap prepared payloads ({ documents } for inserts, { operations } for
    // deletes) so sizing always works on the underlying array
    const items = preparedDocuments.documents || preparedDocuments.operations || preparedDocuments;
    const documentCount = items.length;
    const avgDocumentSize = this.estimateAverageDocumentSize(items);
// Adaptive batch sizing based on document characteristics
let optimalBatchSize = this.config.defaultBatchSize;
// Adjust based on document size
if (avgDocumentSize > 100000) { // Large documents (>100KB)
optimalBatchSize = Math.min(100, this.config.defaultBatchSize);
} else if (avgDocumentSize > 10000) { // Medium documents (>10KB)
optimalBatchSize = Math.min(500, this.config.defaultBatchSize);
} else { // Small documents
optimalBatchSize = Math.min(this.config.maxBatchSize, documentCount);
}
// Adjust based on operation type
const operationMultiplier = {
'insert': 1.0,
'update': 0.8,
'delete': 1.2,
'upsert': 0.7
};
optimalBatchSize = Math.round(optimalBatchSize * (operationMultiplier[operationType] || 1.0));
// Calculate number of batches
const numberOfBatches = Math.ceil(documentCount / optimalBatchSize);
return {
batchSize: optimalBatchSize,
numberOfBatches: numberOfBatches,
estimatedDocumentSize: avgDocumentSize,
operationType: operationType,
// Advanced configuration
unordered: this.config.unorderedOperations,
writeConcern: this.config.writeConcern,
maxTimeMS: 30000,
// Parallel processing configuration
parallelBatches: Math.min(this.config.parallelOperations, numberOfBatches)
};
}
async executeBulkInsertBatches(preparedDocuments, batchConfig, options = {}) {
console.log(`Executing ${batchConfig.numberOfBatches} bulk insert batches...`);
const documents = preparedDocuments.documents || preparedDocuments;
const batchResults = [];
const batches = this.createBatches(documents, batchConfig.batchSize);
// Execute batches with parallel processing
if (batchConfig.parallelBatches > 1) {
const batchGroups = this.createBatchGroups(batches, batchConfig.parallelBatches);
for (const batchGroup of batchGroups) {
const groupResults = await Promise.all(
batchGroup.map(batch => this.executeSingleInsertBatch(batch, batchConfig, options))
);
batchResults.push(...groupResults);
}
} else {
// Sequential execution for ordered operations
for (const batch of batches) {
const result = await this.executeSingleInsertBatch(batch, batchConfig, options);
batchResults.push(result);
}
}
return batchResults;
}
async executeSingleInsertBatch(batchDocuments, batchConfig, options = {}) {
const batchStartTime = Date.now();
try {
// Create collection reference
const collection = options.collection ? this.db.collection(options.collection) : this.collections.products;
// Configure bulk insert operation
const insertOptions = {
ordered: !batchConfig.unordered,
writeConcern: batchConfig.writeConcern,
maxTimeMS: batchConfig.maxTimeMS,
bypassDocumentValidation: options.bypassValidation || false
};
// Execute bulk insert
const insertResult = await collection.insertMany(batchDocuments, insertOptions);
return {
success: true,
batchSize: batchDocuments.length,
insertedCount: insertResult.insertedCount,
insertedIds: insertResult.insertedIds,
processingTime: Date.now() - batchStartTime,
errors: [],
// Performance metrics
documentsPerSecond: Math.round((insertResult.insertedCount / (Date.now() - batchStartTime)) * 1000),
avgDocumentProcessingTime: (Date.now() - batchStartTime) / batchDocuments.length
};
} catch (error) {
console.error('Batch insert failed:', error);
// Handle bulk write errors with detailed analysis
if (error.name === 'BulkWriteError' || error.name === 'MongoBulkWriteError') {
return this.processBulkWriteError(error, batchDocuments, batchStartTime);
}
return {
success: false,
batchSize: batchDocuments.length,
insertedCount: 0,
insertedIds: {},
processingTime: Date.now() - batchStartTime,
errors: [{
error: error.message,
errorCode: error.code,
batchIndex: 0
}]
};
}
}
processBulkWriteError(bulkError, batchDocuments, startTime) {
console.log('Processing bulk write error with detailed analysis...');
const processedResults = {
success: false,
batchSize: batchDocuments.length,
insertedCount: bulkError.result?.insertedCount || 0,
insertedIds: bulkError.result?.insertedIds || {},
processingTime: Date.now() - startTime,
errors: []
};
// Process individual write errors
if (bulkError.writeErrors) {
for (const writeError of bulkError.writeErrors) {
processedResults.errors.push({
index: writeError.index,
error: writeError.errmsg,
errorCode: writeError.code,
document: batchDocuments[writeError.index]
});
}
}
// Process write concern errors
if (bulkError.writeConcernErrors) {
for (const wcError of bulkError.writeConcernErrors) {
processedResults.errors.push({
error: wcError.errmsg,
errorCode: wcError.code,
type: 'write_concern_error'
});
}
}
return processedResults;
}
async performAdvancedBulkUpdate(updates, options = {}) {
console.log(`Performing advanced bulk update for ${updates.length} operations...`);
const startTime = Date.now();
try {
// Prepare update operations
const preparedUpdates = await this.prepareUpdateOperations(updates, options);
// Calculate optimal batching strategy
const batchConfig = this.calculateOptimalBatchConfiguration(preparedUpdates, 'update');
// Execute bulk updates with error handling
const updateResults = await this.executeBulkUpdateBatches(preparedUpdates, batchConfig, options);
// Aggregate and analyze results
const aggregatedResults = await this.aggregateBulkResults(updateResults, 'update');
return {
operation: 'bulk_update',
totalOperations: updates.length,
successful: aggregatedResults.successfulUpdates,
failed: aggregatedResults.failedUpdates,
modified: aggregatedResults.modifiedCount,
matched: aggregatedResults.matchedCount,
upserted: aggregatedResults.upsertedCount,
// Detailed results
errors: aggregatedResults.errors,
upsertedIds: aggregatedResults.upsertedIds,
// Performance metrics
processingTime: Date.now() - startTime,
operationsPerSecond: Math.round((aggregatedResults.successfulUpdates / (Date.now() - startTime)) * 1000),
batchesProcessed: updateResults.length,
// Update-specific metrics
updateEfficiency: aggregatedResults.modifiedCount / Math.max(aggregatedResults.matchedCount, 1),
batchConfiguration: batchConfig
};
} catch (error) {
console.error('Bulk update operation failed:', error);
throw error;
}
}
async prepareUpdateOperations(updates, options = {}) {
console.log('Preparing update operations with validation and optimization...');
const preparedOperations = [];
for (let i = 0; i < updates.length; i++) {
const update = updates[i];
// Standardize update operation structure
const preparedOperation = {
updateOne: {
filter: update.filter || { _id: update._id },
update: {
$set: {
...update.$set,
updatedAt: new Date(),
'bulkOperationMetadata.lastBulkUpdate': new Date(),
'bulkOperationMetadata.updateIndex': i
},
...(update.$inc && { $inc: update.$inc }),
...(update.$unset && { $unset: update.$unset }),
...(update.$push && { $push: update.$push }),
...(update.$pull && { $pull: update.$pull })
},
          upsert: update.upsert || options.upsert || false,
          // Only include optional fields when supplied, to avoid serializing
          // undefined values into the operation
          ...(update.arrayFilters && { arrayFilters: update.arrayFilters }),
          ...(update.hint && { hint: update.hint })
}
};
      // Add conditional updates based on operation type
      if (options.operationType === 'inventory_update') {
        preparedOperation.updateOne.update.$set.lastStockUpdate = new Date();
        // Prevent negative inventory: $inc and $max cannot both target the
        // quantity field in a single update, so constrain the filter instead
        // and let the operation become a no-op when stock would go negative
        if (update.$inc && update.$inc.quantity < 0) {
          preparedOperation.updateOne.filter.quantity = {
            $gte: Math.abs(update.$inc.quantity)
          };
        }
      }
preparedOperations.push(preparedOperation);
}
return preparedOperations;
}
async executeBulkUpdateBatches(operations, batchConfig, options = {}) {
console.log(`Executing ${batchConfig.numberOfBatches} bulk update batches...`);
const collection = options.collection ? this.db.collection(options.collection) : this.collections.products;
const batches = this.createBatches(operations, batchConfig.batchSize);
const batchResults = [];
for (const batch of batches) {
const batchStartTime = Date.now();
try {
// Execute bulk write operations
const bulkResult = await collection.bulkWrite(batch, {
ordered: !batchConfig.unordered,
writeConcern: batchConfig.writeConcern,
maxTimeMS: batchConfig.maxTimeMS
});
batchResults.push({
success: true,
batchSize: batch.length,
matchedCount: bulkResult.matchedCount,
modifiedCount: bulkResult.modifiedCount,
upsertedCount: bulkResult.upsertedCount,
upsertedIds: bulkResult.upsertedIds,
processingTime: Date.now() - batchStartTime,
errors: []
});
} catch (error) {
console.error('Bulk update batch failed:', error);
if (error.name === 'BulkWriteError') {
batchResults.push(this.processBulkWriteError(error, batch, batchStartTime));
} else {
batchResults.push({
success: false,
batchSize: batch.length,
matchedCount: 0,
modifiedCount: 0,
processingTime: Date.now() - batchStartTime,
errors: [{ error: error.message, errorCode: error.code }]
});
}
}
}
return batchResults;
}
async performAdvancedBulkDelete(deletions, options = {}) {
console.log(`Performing advanced bulk delete for ${deletions.length} operations...`);
const startTime = Date.now();
try {
// Prepare deletion operations with safety checks
const preparedDeletions = await this.prepareDeletionOperations(deletions, options);
// Calculate optimal batching
const batchConfig = this.calculateOptimalBatchConfiguration(preparedDeletions, 'delete');
// Execute bulk deletions
const deleteResults = await this.executeBulkDeleteBatches(preparedDeletions, batchConfig, options);
// Aggregate results
const aggregatedResults = await this.aggregateBulkResults(deleteResults, 'delete');
return {
operation: 'bulk_delete',
totalOperations: deletions.length,
successful: aggregatedResults.successfulDeletes,
failed: aggregatedResults.failedDeletes,
deletedCount: aggregatedResults.deletedCount,
// Safety and audit information
safeguardsApplied: preparedDeletions.safeguards || [],
blockedDeletions: preparedDeletions.blocked || [],
// Performance metrics
processingTime: Date.now() - startTime,
operationsPerSecond: Math.round((aggregatedResults.successfulDeletes / (Date.now() - startTime)) * 1000),
errors: aggregatedResults.errors,
batchConfiguration: batchConfig
};
} catch (error) {
console.error('Bulk delete operation failed:', error);
throw error;
}
}
async prepareDeletionOperations(deletions, options = {}) {
console.log('Preparing deletion operations with safety validations...');
const preparedOperations = [];
const blockedDeletions = [];
const appliedSafeguards = [];
for (const deletion of deletions) {
// Apply safety checks for deletion operations
const safetyCheck = await this.validateDeletionSafety(deletion, options);
if (safetyCheck.safe) {
preparedOperations.push({
deleteOne: {
filter: deletion.filter || { _id: deletion._id },
hint: deletion.hint,
collation: deletion.collation
}
});
} else {
blockedDeletions.push({
operation: deletion,
reason: safetyCheck.reason,
dependencies: safetyCheck.dependencies
});
}
if (safetyCheck.safeguards) {
appliedSafeguards.push(...safetyCheck.safeguards);
}
}
return {
operations: preparedOperations,
blocked: blockedDeletions,
safeguards: appliedSafeguards
};
}
async validateDeletionSafety(deletion, options = {}) {
// Implement comprehensive safety checks for deletion operations
const safeguards = [];
const dependencies = [];
// Check for referential integrity
if (options.checkReferences !== false) {
const refCheck = await this.checkReferentialIntegrity(deletion.filter);
if (refCheck.hasReferences) {
dependencies.push(...refCheck.references);
}
}
// Check for recent activity
if (options.checkRecentActivity !== false) {
const activityCheck = await this.checkRecentActivity(deletion.filter);
if (activityCheck.hasRecentActivity) {
safeguards.push('recent_activity_detected');
}
}
// Determine if deletion is safe
const safe = dependencies.length === 0 && (!options.requireConfirmation || deletion.confirmed);
return {
safe: safe,
reason: safe ? null : `Dependencies found: ${dependencies.join(', ')}`,
dependencies: dependencies,
safeguards: safeguards
};
}
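  // executeBulkDeleteBatches() is called by performAdvancedBulkDelete() but was
  // not shown above; a minimal sketch mirroring executeBulkUpdateBatches()
  async executeBulkDeleteBatches(preparedDeletions, batchConfig, options = {}) {
    const collection = options.collection ? this.db.collection(options.collection) : this.collections.products;
    const batches = this.createBatches(preparedDeletions.operations, batchConfig.batchSize);
    const batchResults = [];
    for (const batch of batches) {
      const batchStartTime = Date.now();
      try {
        const bulkResult = await collection.bulkWrite(batch, {
          ordered: !batchConfig.unordered,
          writeConcern: batchConfig.writeConcern,
          maxTimeMS: batchConfig.maxTimeMS
        });
        batchResults.push({
          success: true,
          batchSize: batch.length,
          deletedCount: bulkResult.deletedCount,
          processingTime: Date.now() - batchStartTime,
          errors: []
        });
      } catch (error) {
        batchResults.push(this.processBulkWriteError(error, batch, batchStartTime));
      }
    }
    return batchResults;
  }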
// Utility methods for batch processing and optimization
createBatches(items, batchSize) {
const batches = [];
for (let i = 0; i < items.length; i += batchSize) {
batches.push(items.slice(i, i + batchSize));
}
return batches;
}
createBatchGroups(batches, groupSize) {
const groups = [];
for (let i = 0; i < batches.length; i += groupSize) {
groups.push(batches.slice(i, i + groupSize));
}
return groups;
}
estimateAverageDocumentSize(documents) {
if (!documents || documents.length === 0) return 1000; // Default estimate
const sampleSize = Math.min(10, documents.length);
const sample = documents.slice(0, sampleSize);
const totalSize = sample.reduce((size, doc) => {
return size + JSON.stringify(doc).length;
}, 0);
return Math.round(totalSize / sampleSize);
}
async aggregateBulkResults(batchResults, operationType) {
console.log(`Aggregating results for ${batchResults.length} batches...`);
const aggregated = {
successfulOperations: 0,
failedOperations: 0,
errors: [],
totalProcessingTime: 0
};
// Operation-specific aggregation
switch (operationType) {
case 'insert':
aggregated.successfulInserts = 0;
aggregated.failedInserts = 0;
aggregated.insertedIds = {};
aggregated.duplicateErrors = [];
break;
case 'update':
aggregated.successfulUpdates = 0;
aggregated.failedUpdates = 0;
aggregated.matchedCount = 0;
aggregated.modifiedCount = 0;
aggregated.upsertedCount = 0;
aggregated.upsertedIds = {};
break;
case 'delete':
aggregated.successfulDeletes = 0;
aggregated.failedDeletes = 0;
aggregated.deletedCount = 0;
break;
}
// Aggregate results from all batches
for (const batchResult of batchResults) {
aggregated.totalProcessingTime += batchResult.processingTime;
if (batchResult.success) {
switch (operationType) {
case 'insert':
aggregated.successfulInserts += batchResult.insertedCount;
Object.assign(aggregated.insertedIds, batchResult.insertedIds);
break;
case 'update':
aggregated.successfulUpdates += batchResult.batchSize;
aggregated.matchedCount += batchResult.matchedCount;
aggregated.modifiedCount += batchResult.modifiedCount;
aggregated.upsertedCount += batchResult.upsertedCount || 0;
Object.assign(aggregated.upsertedIds, batchResult.upsertedIds || {});
break;
case 'delete':
aggregated.successfulDeletes += batchResult.batchSize;
aggregated.deletedCount += batchResult.deletedCount || batchResult.batchSize;
break;
}
} else {
switch (operationType) {
case 'insert':
aggregated.failedInserts += batchResult.batchSize - (batchResult.insertedCount || 0);
break;
case 'update':
aggregated.failedUpdates += batchResult.batchSize - (batchResult.matchedCount || 0);
break;
case 'delete':
aggregated.failedDeletes += batchResult.batchSize;
break;
}
}
// Aggregate errors
if (batchResult.errors && batchResult.errors.length > 0) {
aggregated.errors.push(...batchResult.errors);
}
}
return aggregated;
}
async logBulkOperation(operationType, operationData) {
try {
const logEntry = {
operationType: operationType,
timestamp: new Date(),
...operationData,
// System context
systemMetrics: {
memoryUsage: process.memoryUsage(),
nodeVersion: process.version
}
};
await this.collections.bulkOperationLogs.insertOne(logEntry);
} catch (error) {
console.error('Error logging bulk operation:', error);
// Don't throw - logging shouldn't break bulk operations
}
}
// Additional utility methods for comprehensive bulk operations
generateSearchKeywords(document) {
const keywords = [];
if (document.title) {
keywords.push(...document.title.toLowerCase().split(/\s+/));
}
if (document.description) {
keywords.push(...document.description.toLowerCase().split(/\s+/));
}
if (document.tags) {
keywords.push(...document.tags.map(tag => tag.toLowerCase()));
}
// Remove duplicates and filter short words
return [...new Set(keywords)].filter(word => word.length > 2);
}
buildCategoryHierarchy(category) {
if (!category) return [];
const hierarchy = category.split('/');
const hierarchyPath = [];
for (let i = 0; i < hierarchy.length; i++) {
hierarchyPath.push(hierarchy.slice(0, i + 1).join('/'));
}
return hierarchyPath;
}
calculatePricingTiers(price) {
if (!price) return {};
return {
tier: price < 50 ? 'budget' : price < 200 ? 'mid-range' : 'premium',
priceRange: {
min: Math.floor(price / 50) * 50,
max: Math.ceil(price / 50) * 50
}
};
}
async checkReferentialIntegrity(filter) {
// Simplified referential integrity check
// In production, implement comprehensive relationship checking
return {
hasReferences: false,
references: []
};
}
async checkRecentActivity(filter) {
// Simplified activity check
// In production, check recent orders, updates, etc.
return {
hasRecentActivity: false,
lastActivity: null
};
}
}
// Benefits of MongoDB Advanced Bulk Operations:
// - High-performance batch processing with intelligent batch size optimization
// - Comprehensive error handling and partial failure recovery
// - Advanced write concern and consistency management
// - Optimized memory usage and resource management
// - Built-in performance monitoring and metrics collection
// - Sophisticated validation and safety checks for data integrity
// - Parallel processing capabilities for maximum throughput
// - Transaction support for atomic multi-document operations
// - Automatic retry logic with exponential backoff
// - SQL-compatible bulk operations through QueryLeaf integration
module.exports = {
AdvancedBulkOperationsManager
};
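A short usage sketch shows how the manager is typically driven (illustrative: it assumes the client and class defined above and a products collection on a local MongoDB instance):
// Example usage of AdvancedBulkOperationsManager (illustrative)
async function runBulkImport(documents) {
  await client.connect();
  const manager = new AdvancedBulkOperationsManager(db, {
    defaultBatchSize: 2000,
    parallelOperations: 4
  });
  const result = await manager.performAdvancedBulkInsert(documents, {
    collection: 'products',
    allowPartialFailures: true
  });
  console.log(
    `Inserted ${result.successful}/${result.totalDocuments} documents ` +
    `at ${result.documentsPerSecond} docs/sec (${result.failed} failed)`
  );
  await client.close();
}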
Understanding MongoDB Bulk Operations Architecture
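All of the patterns above ultimately reduce to the Node.js driver's native bulkWrite() API, which accepts a heterogeneous array of write models and sends them to the server in batched round trips. A minimal sketch of that underlying primitive (field names and values are illustrative):
// The native driver primitive behind the manager above: one bulkWrite() call
// can mix inserts, updates, and deletes; ordered: false lets the server
// continue past individual failures rather than stopping at the first error
async function mixedWriteExample(db) {
  const result = await db.collection('products').bulkWrite([
    { insertOne: { document: { sku: 'SKU100', product_name: 'Widget', price: 9.99 } } },
    { updateOne: {
        filter: { sku: 'SKU200' },
        update: { $inc: { stock_quantity: 25 }, $set: { updatedAt: new Date() } },
        upsert: true
    } },
    { deleteMany: { filter: { status: 'discontinued', stock_quantity: 0 } } }
  ], { ordered: false, writeConcern: { w: 'majority' } });
  console.log(result.insertedCount, result.modifiedCount, result.upsertedCount, result.deletedCount);
  return result;
}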
Advanced Batch Processing and Performance Optimization Strategies
Implement sophisticated bulk operation patterns for production MongoDB deployments:
// Production-ready MongoDB bulk operations with advanced optimization and monitoring
class ProductionBulkProcessor extends AdvancedBulkOperationsManager {
constructor(db, productionConfig) {
super(db, productionConfig);
this.productionConfig = {
...productionConfig,
enableDistributedProcessing: true,
enableLoadBalancing: true,
enableFailoverHandling: true,
enableCapacityPlanning: true,
enableAutomaticOptimization: true,
enableComplianceAuditing: true
};
this.setupProductionOptimizations();
this.initializeDistributedProcessing();
this.setupCapacityPlanning();
}
async implementDistributedBulkProcessing(operations, distributionStrategy) {
console.log('Implementing distributed bulk processing across multiple nodes...');
const distributedStrategy = {
// Sharding-aware distribution
shardAwareDistribution: {
enableShardKeyOptimization: true,
balanceAcrossShards: true,
minimizeCrossShardOperations: true,
optimizeForShardKey: distributionStrategy.shardKey
},
// Load balancing strategies
loadBalancing: {
dynamicBatchSizing: true,
nodeCapacityAware: true,
latencyOptimized: true,
throughputMaximization: true
},
// Fault tolerance and recovery
faultTolerance: {
automaticFailover: true,
retryFailedBatches: true,
partialFailureRecovery: true,
deadlockDetection: true
}
};
return await this.executeDistributedBulkOperations(operations, distributedStrategy);
}
async setupAdvancedBulkOptimization() {
console.log('Setting up advanced bulk operation optimization...');
const optimizationStrategies = {
// Write optimization patterns
writeOptimization: {
journalingSyncOptimization: true,
writeBufferOptimization: true,
concurrencyControlOptimization: true,
lockMinimizationStrategies: true
},
// Memory management optimization
memoryOptimization: {
documentBatching: true,
memoryPooling: true,
garbageCollectionOptimization: true,
cacheOptimization: true
},
// Network optimization
networkOptimization: {
compressionOptimization: true,
connectionPoolingOptimization: true,
batchTransmissionOptimization: true,
networkLatencyMinimization: true
}
};
return await this.deployOptimizationStrategies(optimizationStrategies);
}
async implementAdvancedErrorHandlingAndRecovery() {
console.log('Implementing advanced error handling and recovery mechanisms...');
const errorHandlingStrategy = {
// Error classification and handling
errorClassification: {
transientErrors: ['NetworkTimeout', 'TemporaryUnavailable'],
permanentErrors: ['ValidationError', 'DuplicateKey'],
retriableErrors: ['WriteConflict', 'LockTimeout'],
fatalErrors: ['OutOfMemory', 'DiskFull']
},
// Recovery strategies
recoveryStrategies: {
automaticRetry: {
maxRetries: 5,
exponentialBackoff: true,
jitterRandomization: true
},
partialFailureHandling: {
isolateFailedOperations: true,
continueWithSuccessful: true,
generateFailureReport: true
},
circuitBreaker: {
failureThreshold: 10,
recoveryTimeout: 60000,
halfOpenRetryCount: 3
}
}
};
return await this.deployErrorHandlingStrategy(errorHandlingStrategy);
}
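  // The helper hooks referenced above (setupProductionOptimizations,
  // initializeDistributedProcessing, executeDistributedBulkOperations, and the
  // deploy* methods) are deployment-specific; placeholder sketches are shown
  // here so the class outline is loadable, not as real implementations
  setupProductionOptimizations() { /* tune pools, compression, write buffers */ }
  initializeDistributedProcessing() { /* discover shard topology and node capacity */ }
  setupCapacityPlanning() { /* schedule periodic capacity sampling */ }
  async executeDistributedBulkOperations(operations, strategy) {
    // Sketch: fall back to single-node batching from the base class
    return await this.executeBulkUpdateBatches(
      operations,
      this.calculateOptimalBatchConfiguration(operations, 'update'),
      { strategy }
    );
  }
  async deployOptimizationStrategies(strategies) { return strategies; }
  async deployErrorHandlingStrategy(strategy) { return strategy; }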
}
SQL-Style Bulk Operations with QueryLeaf
QueryLeaf provides familiar SQL syntax for MongoDB bulk operations and high-throughput data processing:
-- QueryLeaf advanced bulk operations with SQL-familiar syntax for MongoDB
-- Configure bulk operation settings
SET bulk_operation_batch_size = 1000;
SET bulk_operation_parallel_batches = 4;
SET bulk_operation_write_concern = 'majority';
SET bulk_operation_ordered = false;
SET bulk_operation_bypass_validation = false;
-- Advanced bulk insert with comprehensive error handling and performance optimization
WITH product_data_preparation AS (
SELECT
-- Prepare product data with validation and enhancement
product_id,
product_name,
category,
description,
CAST(price AS DECIMAL(10,2)) as validated_price,
CAST(stock_quantity AS INTEGER) as validated_stock,
supplier_id,
-- Generate enhanced metadata for optimal MongoDB storage
ARRAY[
LOWER(product_name),
LOWER(category),
LOWER(supplier_name)
] as search_keywords,
-- Build category hierarchy for efficient querying
STRING_TO_ARRAY(category, '/') as category_hierarchy,
-- Calculate pricing tiers for analytics
CASE
WHEN price < 50 THEN 'budget'
WHEN price < 200 THEN 'mid-range'
ELSE 'premium'
END as pricing_tier,
-- Add bulk operation metadata
JSON_OBJECT(
'batch_id', GENERATE_UUID(),
'source_system', 'inventory_import',
'import_timestamp', CURRENT_TIMESTAMP,
'validation_status', 'passed'
) as bulk_metadata,
-- Standard timestamps
CURRENT_TIMESTAMP as created_at,
CURRENT_TIMESTAMP as updated_at,
-- Data quality scoring
(
CASE WHEN product_name IS NOT NULL AND LENGTH(TRIM(product_name)) > 0 THEN 1 ELSE 0 END +
CASE WHEN category IS NOT NULL AND LENGTH(TRIM(category)) > 0 THEN 1 ELSE 0 END +
CASE WHEN price > 0 THEN 1 ELSE 0 END +
CASE WHEN stock_quantity >= 0 THEN 1 ELSE 0 END +
CASE WHEN supplier_id IS NOT NULL THEN 1 ELSE 0 END
) / 5.0 as data_quality_score
FROM staging_products sp
JOIN suppliers s ON sp.supplier_id = s.supplier_id
WHERE
-- Data validation filters
sp.product_name IS NOT NULL
AND TRIM(sp.product_name) != ''
AND sp.price > 0
AND sp.stock_quantity >= 0
AND s.status = 'active'
),
bulk_insert_configuration AS (
SELECT
COUNT(*) as total_documents,
-- Calculate optimal batch configuration
CASE
WHEN AVG(LENGTH(product_name::TEXT) + LENGTH(COALESCE(description, '')::TEXT)) > 10000 THEN 500
WHEN AVG(LENGTH(product_name::TEXT) + LENGTH(COALESCE(description, '')::TEXT)) > 1000 THEN 1000
ELSE 2000
END as optimal_batch_size,
-- Parallel processing configuration
LEAST(4, CEIL(COUNT(*) / 1000.0)) as parallel_batches,
-- Performance prediction
CASE
WHEN COUNT(*) < 1000 THEN 'fast'
WHEN COUNT(*) < 10000 THEN 'moderate'
ELSE 'extended'
END as expected_processing_time
FROM product_data_preparation
WHERE data_quality_score >= 0.8
)
-- Execute advanced bulk insert operation
INSERT INTO products (
product_id,
product_name,
category,
category_hierarchy,
price,
pricing_tier,
stock_quantity,
supplier_id,
search_keywords,
bulk_operation_metadata,
created_at,
updated_at,
data_quality_score
)
SELECT
pdp.product_id,
pdp.product_name,
pdp.category,
pdp.category_hierarchy,
pdp.validated_price,
pdp.pricing_tier,
pdp.validated_stock,
pdp.supplier_id,
pdp.search_keywords,
pdp.bulk_metadata,
pdp.created_at,
pdp.updated_at,
pdp.data_quality_score
FROM product_data_preparation pdp
CROSS JOIN bulk_insert_configuration bic
WHERE pdp.data_quality_score >= 0.8
-- Advanced bulk insert configuration
WITH (
batch_size = (SELECT optimal_batch_size FROM bulk_insert_configuration),
parallel_batches = (SELECT parallel_batches FROM bulk_insert_configuration),
write_concern = 'majority',
ordered_operations = false,
-- Error handling configuration
continue_on_error = true,
duplicate_key_handling = 'skip',
validation_bypass = false,
-- Performance optimization
enable_compression = true,
connection_pooling = true,
write_buffer_size = '64MB',
-- Monitoring and logging
enable_performance_monitoring = true,
log_detailed_errors = true,
track_operation_metrics = true
);
-- Advanced bulk update with intelligent batching and conflict resolution
WITH inventory_updates AS (
SELECT
product_id,
warehouse_id,
quantity_adjustment,
price_adjustment,
update_reason,
source_system,
-- Calculate update priority
CASE
WHEN ABS(quantity_adjustment) > 1000 THEN 'high'
WHEN ABS(quantity_adjustment) > 100 THEN 'medium'
ELSE 'low'
END as update_priority,
-- Validate adjustments
CASE
WHEN quantity_adjustment < 0 THEN
-- Ensure we don't create negative inventory
GREATEST(quantity_adjustment, -ci.stock_quantity)
ELSE quantity_adjustment
END as safe_quantity_adjustment,
-- Add update metadata
JSON_OBJECT(
'update_batch_id', GENERATE_UUID(),
'update_timestamp', CURRENT_TIMESTAMP,
'update_source', source_system,
'validation_status', 'approved'
) as update_metadata
FROM staging_inventory_updates siu
JOIN current_inventory ci ON siu.product_id = ci.product_id
AND siu.warehouse_id = ci.warehouse_id
WHERE
-- Update validation
ABS(siu.quantity_adjustment) <= 10000 -- Prevent massive adjustments
AND (siu.price_adjustment IS NULL OR ABS(siu.price_adjustment) <= ci.price * 0.5) -- Max 50% price change
),
conflict_resolution AS (
-- Handle potential update conflicts
SELECT
iu.*,
-- Detect conflicting updates
CASE
WHEN EXISTS (
SELECT 1 FROM recent_inventory_updates riu
WHERE riu.product_id = iu.product_id
AND riu.warehouse_id = iu.warehouse_id
AND riu.update_timestamp > CURRENT_TIMESTAMP - INTERVAL '5 minutes'
) THEN 'potential_conflict'
ELSE 'safe_to_update'
END as conflict_status,
-- Calculate final values
ci.stock_quantity + iu.safe_quantity_adjustment as final_stock_quantity,
COALESCE(ci.price + iu.price_adjustment, ci.price) as final_price
FROM inventory_updates iu
JOIN current_inventory ci ON iu.product_id = ci.product_id
AND iu.warehouse_id = ci.warehouse_id
)
-- Execute bulk update with advanced error handling
UPDATE products
SET
-- Core field updates
stock_quantity = cr.final_stock_quantity,
price = cr.final_price,
updated_at = CURRENT_TIMESTAMP,
-- Audit trail updates
last_inventory_update = CURRENT_TIMESTAMP,
inventory_update_reason = cr.update_reason,
inventory_update_source = cr.source_system,
-- Metadata updates
bulk_operation_metadata = JSON_SET(
COALESCE(bulk_operation_metadata, '{}'),
'$.last_bulk_update', CURRENT_TIMESTAMP,
'$.update_batch_info', cr.update_metadata
),
-- Analytics updates
total_adjustments = COALESCE(total_adjustments, 0) + 1,
cumulative_quantity_adjustments = COALESCE(cumulative_quantity_adjustments, 0) + cr.safe_quantity_adjustment
FROM conflict_resolution cr
WHERE products.product_id = cr.product_id
AND cr.conflict_status = 'safe_to_update'
AND cr.final_stock_quantity >= 0 -- Additional safety check
-- Bulk update configuration
WITH (
batch_size = 1500,
parallel_batches = 3,
write_concern = 'majority',
max_time_ms = 30000,
-- Conflict handling
retry_on_conflict = true,
max_retries = 3,
backoff_strategy = 'exponential',
-- Validation and safety
enable_pre_update_validation = true,
enable_post_update_validation = true,
rollback_on_validation_failure = true,
-- Performance optimization
hint_index = 'product_warehouse_compound',
bypass_document_validation = false
);
-- Advanced bulk upsert operation combining insert and update logic
WITH product_sync_data AS (
SELECT
external_product_id,
product_name,
category,
price,
stock_quantity,
supplier_code,
last_modified_external,
-- Determine if this should be insert or update
CASE
WHEN EXISTS (
SELECT 1 FROM products p
WHERE p.external_product_id = spd.external_product_id
) THEN 'update'
ELSE 'insert'
END as operation_type,
-- Calculate data freshness
EXTRACT(DAY FROM CURRENT_TIMESTAMP - last_modified_external) as days_since_modified,
-- Prepare upsert metadata
JSON_OBJECT(
'sync_batch_id', GENERATE_UUID(),
'sync_timestamp', CURRENT_TIMESTAMP,
'source_system', 'external_catalog',
'operation_type', 'upsert',
'data_freshness_days', EXTRACT(DAY FROM CURRENT_TIMESTAMP - last_modified_external)
) as upsert_metadata
FROM staging_product_data spd
WHERE spd.last_modified_external > CURRENT_TIMESTAMP - INTERVAL '7 days' -- Only sync recent changes
),
upsert_validation AS (
SELECT
psd.*,
-- Validate data quality for upsert
(
CASE WHEN product_name IS NOT NULL AND LENGTH(TRIM(product_name)) > 0 THEN 1 ELSE 0 END +
CASE WHEN category IS NOT NULL THEN 1 ELSE 0 END +
CASE WHEN price > 0 THEN 1 ELSE 0 END +
CASE WHEN supplier_code IS NOT NULL THEN 1 ELSE 0 END
) / 4.0 as validation_score,
-- Check for significant changes (for updates)
CASE
WHEN psd.operation_type = 'update' THEN
COALESCE(
(SELECT
CASE
WHEN ABS(p.price - psd.price) > p.price * 0.1 OR -- 10% price change
ABS(p.stock_quantity - psd.stock_quantity) > 10 OR -- Stock change > 10
p.product_name != psd.product_name -- Name change
THEN 'significant_changes'
ELSE 'minor_changes'
END
FROM products p
WHERE p.external_product_id = psd.external_product_id),
'new_record'
)
ELSE 'new_record'
END as change_significance
FROM product_sync_data psd
)
-- Execute bulk upsert operation
INSERT INTO products (
external_product_id,
product_name,
category,
price,
stock_quantity,
supplier_code,
bulk_operation_metadata,
created_at,
updated_at,
data_validation_score,
sync_status,
change_significance
)
SELECT
uv.external_product_id,
uv.product_name,
uv.category,
uv.price,
uv.stock_quantity,
uv.supplier_code,
uv.upsert_metadata,
CASE WHEN uv.operation_type = 'insert' THEN CURRENT_TIMESTAMP ELSE NULL END,
CURRENT_TIMESTAMP,
uv.validation_score,
'synchronized',
uv.change_significance
FROM upsert_validation uv
WHERE uv.validation_score >= 0.75
-- Handle conflicts with upsert logic
ON CONFLICT (external_product_id)
DO UPDATE SET
product_name = CASE
WHEN EXCLUDED.change_significance = 'significant_changes' THEN EXCLUDED.product_name
ELSE products.product_name
END,
category = EXCLUDED.category,
price = CASE
WHEN ABS(EXCLUDED.price - products.price) > products.price * 0.05 -- 5% threshold
THEN EXCLUDED.price
ELSE products.price
END,
stock_quantity = EXCLUDED.stock_quantity,
updated_at = CURRENT_TIMESTAMP,
last_sync_timestamp = CURRENT_TIMESTAMP,
sync_status = 'synchronized',
-- Update metadata with merge information
bulk_operation_metadata = JSON_SET(
COALESCE(products.bulk_operation_metadata, '{}'),
'$.last_upsert_operation', EXCLUDED.bulk_operation_metadata,
'$.upsert_history', JSON_ARRAY_APPEND(
COALESCE(JSON_EXTRACT(products.bulk_operation_metadata, '$.upsert_history'), '[]'),
'$', JSON_OBJECT(
'timestamp', CURRENT_TIMESTAMP,
'changes_applied', EXCLUDED.change_significance
)
)
)
-- Upsert operation configuration
WITH (
batch_size = 800, -- Smaller batches for upsert complexity
parallel_batches = 2,
write_concern = 'majority',
-- Upsert-specific configuration
conflict_resolution = 'merge_strategy',
enable_change_detection = true,
preserve_existing_metadata = true,
-- Performance optimization for upsert
enable_index_hints = true,
optimize_for_update_heavy = true
);
-- Advanced bulk delete with comprehensive safety checks and audit trail
WITH deletion_candidates AS (
SELECT
product_id,
product_name,
category,
created_at,
last_sold_date,
stock_quantity,
-- Determine deletion reason and safety
CASE
WHEN stock_quantity = 0 AND last_sold_date < CURRENT_DATE - INTERVAL '2 years' THEN 'discontinued_product'
WHEN category IN ('seasonal', 'limited_edition') AND created_at < CURRENT_DATE - INTERVAL '1 year' THEN 'seasonal_cleanup'
WHEN supplier_id IN (SELECT supplier_id FROM suppliers WHERE status = 'inactive') THEN 'inactive_supplier'
ELSE 'no_deletion'
END as deletion_reason,
-- Safety checks
NOT EXISTS (
SELECT 1 FROM order_items oi
WHERE oi.product_id = p.product_id
AND oi.order_date > CURRENT_DATE - INTERVAL '6 months'
) as no_recent_orders,
NOT EXISTS (
SELECT 1 FROM shopping_carts sc
WHERE sc.product_id = p.product_id
) as not_in_carts,
NOT EXISTS (
SELECT 1 FROM pending_shipments ps
WHERE ps.product_id = p.product_id
) as no_pending_shipments
FROM products p
WHERE p.status IN ('discontinued', 'inactive', 'marked_for_deletion')
),
safe_deletions AS (
SELECT
dc.*,
-- Overall safety assessment
(dc.no_recent_orders AND dc.not_in_carts AND dc.no_pending_shipments) as safe_to_delete,
-- Create audit record
JSON_OBJECT(
'deletion_batch_id', GENERATE_UUID(),
'deletion_timestamp', CURRENT_TIMESTAMP,
'deletion_reason', dc.deletion_reason,
'safety_checks_passed', (dc.no_recent_orders AND dc.not_in_carts AND dc.no_pending_shipments),
'product_snapshot', JSON_OBJECT(
'product_id', dc.product_id,
'product_name', dc.product_name,
'category', dc.category,
'last_sold_date', dc.last_sold_date,
'stock_quantity', dc.stock_quantity
)
) as audit_record
FROM deletion_candidates dc
WHERE dc.deletion_reason != 'no_deletion'
)
-- Create audit trail before deletion
INSERT INTO product_deletion_audit (
product_id,
deletion_reason,
audit_record,
deleted_at
)
SELECT
sd.product_id,
sd.deletion_reason,
sd.audit_record,
CURRENT_TIMESTAMP
FROM safe_deletions sd
WHERE sd.safe_to_delete = true;
-- Execute bulk delete operation
DELETE FROM products
WHERE product_id IN (
SELECT sd.product_id
FROM safe_deletions sd
WHERE sd.safe_to_delete = true
)
-- Bulk delete configuration
WITH (
batch_size = 500, -- Conservative batch size for deletes
parallel_batches = 2,
write_concern = 'majority',
-- Safety configuration
enable_referential_integrity_check = true,
enable_audit_trail = true,
require_confirmation = true,
-- Performance and safety balance
max_deletions_per_batch = 500,
enable_soft_delete = false, -- True deletion for cleanup
create_backup_before_delete = true
);
-- Comprehensive bulk operation monitoring and analytics
WITH bulk_operation_performance AS (
SELECT
operation_type,
DATE_TRUNC('hour', operation_timestamp) as hour_bucket,
-- Volume metrics
COUNT(*) as total_operations,
SUM(documents_processed) as total_documents_processed,
SUM(successful_operations) as total_successful,
SUM(failed_operations) as total_failed,
-- Performance metrics
AVG(processing_time_ms) as avg_processing_time,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY processing_time_ms) as p95_processing_time,
AVG(documents_per_second) as avg_throughput,
MAX(documents_per_second) as peak_throughput,
-- Error analysis
AVG(CASE WHEN failed_operations > 0 THEN (failed_operations * 100.0 / NULLIF(documents_processed, 0)) ELSE 0 END) as avg_error_rate,
-- Resource utilization
AVG(batch_size_used) as avg_batch_size,
AVG(parallel_batches_used) as avg_parallel_batches,
AVG(memory_usage_mb) as avg_memory_usage,
-- Configuration analysis
MODE() WITHIN GROUP (ORDER BY write_concern) as most_common_write_concern,
AVG(CASE WHEN ordered_operations THEN 1 ELSE 0 END) as ordered_operations_ratio
FROM bulk_operation_logs
WHERE operation_timestamp >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY operation_type, DATE_TRUNC('hour', operation_timestamp)
),
performance_trends AS (
SELECT
bop.*,
-- Trend analysis
LAG(avg_throughput) OVER (
PARTITION BY operation_type
ORDER BY hour_bucket
) as prev_hour_throughput,
LAG(avg_error_rate) OVER (
PARTITION BY operation_type
ORDER BY hour_bucket
) as prev_hour_error_rate,
-- Performance classification
CASE
WHEN avg_throughput > 1000 THEN 'high_performance'
WHEN avg_throughput > 500 THEN 'good_performance'
WHEN avg_throughput > 100 THEN 'adequate_performance'
ELSE 'low_performance'
END as performance_classification,
-- Optimization recommendations
CASE
WHEN avg_error_rate > 5 THEN 'investigate_error_patterns'
WHEN p95_processing_time > avg_processing_time * 2 THEN 'optimize_batch_sizing'
WHEN avg_memory_usage > 500 THEN 'optimize_memory_usage'
WHEN avg_throughput < 100 THEN 'review_indexing_strategy'
ELSE 'performance_optimal'
END as optimization_recommendation
FROM bulk_operation_performance bop
)
SELECT
operation_type,
hour_bucket,
-- Volume summary
total_operations,
total_documents_processed,
ROUND((total_successful * 100.0 / NULLIF(total_documents_processed, 0)), 2) as success_rate_percent,
-- Performance summary
ROUND(avg_processing_time, 1) as avg_processing_time_ms,
ROUND(p95_processing_time, 1) as p95_processing_time_ms,
ROUND(avg_throughput, 0) as avg_documents_per_second,
ROUND(peak_throughput, 0) as peak_documents_per_second,
-- Trend indicators
CASE
WHEN prev_hour_throughput IS NOT NULL THEN
ROUND(((avg_throughput - prev_hour_throughput) / prev_hour_throughput * 100), 1)
ELSE NULL
END as throughput_change_percent,
CASE
WHEN prev_hour_error_rate IS NOT NULL THEN
ROUND((avg_error_rate - prev_hour_error_rate), 2)
ELSE NULL
END as error_rate_change,
-- Configuration insights
ROUND(avg_batch_size, 0) as optimal_batch_size,
ROUND(avg_parallel_batches, 1) as avg_parallelization,
most_common_write_concern,
-- Performance assessment
performance_classification,
optimization_recommendation,
-- Detailed recommendations
CASE optimization_recommendation
WHEN 'investigate_error_patterns' THEN 'Review error logs and implement better validation'
WHEN 'optimize_batch_sizing' THEN 'Reduce batch size or increase timeout thresholds'
WHEN 'optimize_memory_usage' THEN 'Implement memory pooling and document streaming'
WHEN 'review_indexing_strategy' THEN 'Add missing indexes for bulk operation filters'
ELSE 'Continue current configuration - performance is optimal'
END as detailed_recommendation
FROM performance_trends
WHERE total_operations > 0
ORDER BY operation_type, hour_bucket DESC;
-- QueryLeaf provides comprehensive bulk operation capabilities:
-- 1. Advanced batch processing with intelligent sizing and parallelization
-- 2. Sophisticated error handling and partial failure recovery
-- 3. Comprehensive data validation and quality scoring
-- 4. Built-in audit trails and compliance tracking
-- 5. Performance monitoring and optimization recommendations
-- 6. Advanced conflict resolution and upsert strategies
-- 7. Safety checks and referential integrity validation
-- 8. Production-ready bulk operations with monitoring and alerting
-- 9. SQL-familiar syntax for complex bulk operation workflows
-- 10. Integration with MongoDB's native bulk operation optimizations
Best Practices for Production Bulk Operations
Performance Optimization and Batch Strategy
Essential principles for effective MongoDB bulk operation deployment:
- Batch Size Optimization: Calculate optimal batch sizes based on document size, operation type, and system resources (a minimal sizing heuristic is sketched after this list)
- Write Concern Management: Configure appropriate write concerns balancing performance with durability requirements
- Error Handling Strategy: Implement comprehensive error classification and recovery mechanisms for production resilience
- Validation and Safety: Design robust validation pipelines to ensure data quality and prevent harmful operations
- Performance Monitoring: Track operation metrics, throughput, and resource utilization for continuous optimization
- Resource Management: Monitor memory usage, connection pooling, and system resources during bulk operations
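As a concrete starting point for the batch-sizing principle above, a minimal heuristic might look like the following (the thresholds are illustrative assumptions, not driver constants):
// Illustrative batch-size heuristic: shrink batches as documents grow so each
// batch stays comfortably within server message-size limits and bounded memory
function chooseBatchSize(avgDocBytes, maxBatch = 10000) {
  if (avgDocBytes > 100 * 1024) return Math.min(100, maxBatch); // large documents
  if (avgDocBytes > 10 * 1024) return Math.min(500, maxBatch);  // medium documents
  return maxBatch;                                              // small documents
}

// Example: ~2KB documents fill full batches; ~150KB documents drop to 100
console.log(chooseBatchSize(2 * 1024));   // 10000
console.log(chooseBatchSize(150 * 1024)); // 100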
Scalability and Production Deployment
Optimize bulk operations for enterprise-scale requirements:
- Distributed Processing: Implement shard-aware batch distribution for optimal performance across MongoDB clusters
- Load Balancing: Design intelligent load balancing strategies that consider node capacity and network latency
- Fault Tolerance: Implement automatic failover and retry mechanisms for resilient bulk operation processing (see the retry sketch after this list)
- Capacity Planning: Monitor historical patterns and predict resource requirements for bulk operation scaling
- Compliance Integration: Ensure bulk operations meet audit, security, and compliance requirements
- Operational Integration: Integrate bulk operations with existing monitoring, alerting, and operational workflows
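A retry wrapper with exponential backoff and jitter, as recommended above, can be sketched as follows (the transient-error test is an assumption; match it to your driver's error labels and codes):
// Sketch: retry a batch-level operation on transient failures with
// exponential backoff plus jitter to avoid synchronized retry storms
async function withRetry(operation, { maxRetries = 3, baseDelayMs = 1000 } = {}) {
  for (let attempt = 0; ; attempt++) {
    try {
      return await operation();
    } catch (error) {
      const transient = error.hasErrorLabel?.('RetryableWriteError') ||
        ['WriteConflict', 'LockTimeout', 'NetworkTimeout'].includes(error.codeName);
      if (!transient || attempt >= maxRetries) throw error;
      const delayMs = baseDelayMs * 2 ** attempt * (0.5 + Math.random() / 2);
      await new Promise(resolve => setTimeout(resolve, delayMs));
    }
  }
}

// Usage: wrap any single-batch write
// await withRetry(() => collection.bulkWrite(batch, { ordered: false }));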
Conclusion
MongoDB bulk operations provide comprehensive high-performance batch processing capabilities that enable efficient handling of large-scale data operations through intelligent batching, advanced error handling, and sophisticated optimization strategies. The native bulk operation support ensures that batch processing benefits from MongoDB's write optimization, consistency guarantees, and scalability features.
Key MongoDB Bulk Operations benefits include:
- High-Performance Processing: Optimized batch processing with intelligent sizing and parallel execution capabilities
- Advanced Error Management: Comprehensive error handling with partial failure recovery and retry mechanisms
- Data Quality Assurance: Built-in validation and safety checks to ensure data integrity during bulk operations
- Resource Optimization: Intelligent memory management and resource utilization for optimal system performance
- Production Readiness: Enterprise-ready bulk operations with monitoring, auditing, and compliance features
- SQL Accessibility: Familiar SQL-style bulk operations through QueryLeaf for accessible high-throughput data management
Whether you're handling data imports, batch updates, inventory synchronization, or large-scale data cleanup operations, MongoDB bulk operations with QueryLeaf's familiar SQL interface provide the foundation for efficient, reliable, and scalable batch processing.
QueryLeaf Integration: QueryLeaf automatically optimizes MongoDB bulk operations while providing SQL-familiar syntax for batch processing, error handling, and performance monitoring. Advanced bulk operation patterns, validation strategies, and optimization techniques are seamlessly handled through familiar SQL constructs, making high-performance batch processing accessible to SQL-oriented development teams.
The combination of MongoDB's robust bulk operation capabilities with SQL-style batch processing operations makes it an ideal platform for applications requiring both high-throughput data processing and familiar database management patterns, ensuring your bulk operations can scale efficiently while maintaining data quality and operational reliability.