MongoDB Aggregation Pipeline Optimization: SQL-Style Performance Tuning for Complex Data Analytics
Modern applications generate vast amounts of data that require complex analytical processing: real-time reporting, business intelligence, data transformation, and advanced analytics. Traditional SQL databases handle such queries through sophisticated query planners and optimization engines, but they often struggle with unstructured data and horizontal scaling requirements.
MongoDB's aggregation pipeline provides powerful data processing capabilities that can handle complex analytics workloads at scale, but achieving good performance requires deliberate tuning. Unlike traditional SQL query optimization, which relies heavily on automatic query planning, MongoDB aggregation pipeline optimization requires an understanding of pipeline stage execution order, memory management, and strategic indexing.
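Before diving in, it helps to see how pipeline execution can be inspected. The following is a minimal sketch assuming a mongosh session, an orders collection, and a hypothetical totalAmount field; explain output is the primary tool used throughout this post to verify stage order and index usage:
// Inspect how MongoDB executes a pipeline before tuning it
const explainResult = db.orders.explain('executionStats').aggregate([
  { $match: { status: 'completed', orderDate: { $gte: ISODate('2024-01-01') } } },
  { $group: { _id: '$customerId', total: { $sum: '$totalAmount' } } }
]);
// Look for IXSCAN (index used) vs COLLSCAN (full scan) in the $match stage,
// and compare totalDocsExamined against the number of documents returned
printjson(explainResult);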
The Complex Analytics Performance Challenge
Traditional SQL analytics approaches face scalability and flexibility limitations:
-- Traditional SQL complex analytics - performance challenges at scale
WITH regional_sales AS (
SELECT
r.region_name,
p.category,
p.subcategory,
DATE_TRUNC('month', o.order_date) as month,
SUM(oi.quantity * oi.unit_price) as gross_revenue,
SUM(oi.quantity * p.cost_basis) as cost_of_goods,
COUNT(DISTINCT o.customer_id) as unique_customers,
COUNT(o.order_id) as total_orders
FROM orders o
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.product_id
JOIN customers c ON o.customer_id = c.customer_id
JOIN regions r ON c.region_id = r.region_id
WHERE o.order_date >= '2024-01-01'
AND o.status IN ('completed', 'shipped')
GROUP BY r.region_name, p.category, p.subcategory, DATE_TRUNC('month', o.order_date)
),
monthly_trends AS (
SELECT
region_name,
category,
month,
SUM(gross_revenue) as monthly_revenue,
SUM(cost_of_goods) as monthly_costs,
(SUM(gross_revenue) - SUM(cost_of_goods)) as monthly_profit,
SUM(unique_customers) as monthly_customers,
SUM(total_orders) as monthly_orders,
-- Window functions for trend analysis
LAG(SUM(gross_revenue), 1) OVER (
PARTITION BY region_name, category
ORDER BY month
) as previous_month_revenue,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY SUM(gross_revenue)) OVER (
PARTITION BY region_name, category
) as median_monthly_revenue
FROM regional_sales
GROUP BY region_name, category, month
)
SELECT
region_name,
category,
month,
monthly_revenue,
monthly_profit,
monthly_customers,
-- Growth calculations
ROUND(
((monthly_revenue - previous_month_revenue) / previous_month_revenue) * 100, 2
) as revenue_growth_percent,
-- Performance vs median
ROUND(
(monthly_revenue / median_monthly_revenue) * 100, 2
) as performance_vs_median,
-- Customer metrics
ROUND(monthly_revenue / monthly_customers, 2) as revenue_per_customer,
ROUND(monthly_orders / monthly_customers, 2) as orders_per_customer
FROM monthly_trends
WHERE month >= '2024-06-01'
ORDER BY region_name, category, month;
-- Problems with traditional approaches:
-- - Complex joins across multiple large tables
-- - Window functions require full data scanning
-- - Memory intensive for large datasets
-- - Limited horizontal scaling capabilities
-- - Rigid schema requirements
-- - Poor performance with nested/dynamic data structures
-- - Difficult to optimize for distributed processing
MongoDB aggregation pipelines provide optimized analytics processing:
// MongoDB optimized aggregation pipeline - high performance analytics
db.orders.aggregate([
// Stage 1: Early filtering with index support
{
$match: {
orderDate: { $gte: ISODate('2024-01-01') },
status: { $in: ['completed', 'shipped'] }
}
},
// Stage 2: Efficient lookup with optimized joins
{
$lookup: {
from: 'customers',
localField: 'customerId',
foreignField: '_id',
as: 'customer',
pipeline: [
{ $project: { regionId: 1, _id: 0 } } // Project only needed fields
]
}
},
  // Stage 3: Unwind the joined customer document and the embedded items array
{ $unwind: '$customer' },
{ $unwind: '$items' },
// Stage 4: Second lookup for product data
{
$lookup: {
from: 'products',
localField: 'items.productId',
foreignField: '_id',
as: 'product',
pipeline: [
{ $project: { category: 1, subcategory: 1, costBasis: 1, _id: 0 } }
]
}
},
{ $unwind: '$product' },
// Stage 5: Third lookup for region data
{
$lookup: {
from: 'regions',
localField: 'customer.regionId',
foreignField: '_id',
as: 'region',
pipeline: [
{ $project: { regionName: 1, _id: 0 } }
]
}
},
{ $unwind: '$region' },
// Stage 6: Add computed fields efficiently
{
$addFields: {
month: {
$dateFromParts: {
year: { $year: '$orderDate' },
month: { $month: '$orderDate' },
day: 1
}
},
itemRevenue: { $multiply: ['$items.quantity', '$items.unitPrice'] },
itemCost: { $multiply: ['$items.quantity', '$product.costBasis'] }
}
},
// Stage 7: Group for initial aggregation
{
$group: {
_id: {
region: '$region.regionName',
category: '$product.category',
subcategory: '$product.subcategory',
month: '$month'
},
grossRevenue: { $sum: '$itemRevenue' },
costOfGoods: { $sum: '$itemCost' },
uniqueCustomers: { $addToSet: '$customerId' },
totalOrders: { $sum: 1 }
}
},
// Stage 8: Transform unique customers to count
{
$addFields: {
uniqueCustomerCount: { $size: '$uniqueCustomers' }
}
},
// Stage 9: Project final structure
{
$project: {
region: '$_id.region',
category: '$_id.category',
subcategory: '$_id.subcategory',
month: '$_id.month',
grossRevenue: 1,
costOfGoods: 1,
profit: { $subtract: ['$grossRevenue', '$costOfGoods'] },
uniqueCustomerCount: 1,
totalOrders: 1,
revenuePerCustomer: {
$round: [
{ $divide: ['$grossRevenue', '$uniqueCustomerCount'] },
2
]
},
ordersPerCustomer: {
$round: [
{ $divide: ['$totalOrders', '$uniqueCustomerCount'] },
2
]
},
_id: 0
}
},
// Stage 10: Sort for consistent output
{
$sort: {
region: 1,
category: 1,
month: 1
}
},
// Stage 11: Add window functions for trend analysis
{
$setWindowFields: {
partitionBy: { region: '$region', category: '$category' },
sortBy: { month: 1 },
output: {
previousMonthRevenue: {
$shift: {
output: '$grossRevenue',
by: -1
}
},
        medianMonthlyRevenue: {
          $median: { input: '$grossRevenue', method: 'approximate' },
          window: {
            documents: ['unbounded', 'unbounded']
          }
        }
      }
    }
  },
  // Stage 12: Derive growth from the window output ($round/$divide are not
  // window operators, and fields computed in the same $setWindowFields stage
  // cannot reference each other)
  {
    $addFields: {
      revenueGrowthPercent: {
        $cond: {
          if: { $gt: ['$previousMonthRevenue', 0] },
          then: {
            $round: [
              {
                $multiply: [
                  {
                    $divide: [
                      { $subtract: ['$grossRevenue', '$previousMonthRevenue'] },
                      '$previousMonthRevenue'
                    ]
                  },
                  100
                ]
              },
              2
            ]
          },
          else: null
        }
      }
    }
  },
  // Stage 13: Final filtering for recent months
{
$match: {
month: { $gte: ISODate('2024-06-01') }
}
}
], {
// Pipeline options for optimization
allowDiskUse: true, // Allow spilling to disk for large datasets
maxTimeMS: 300000, // 5 minute timeout
  hint: { orderDate: 1, status: 1 }, // Force use of the (orderDate, status) compound index
readConcern: { level: 'majority' } // Consistency level
});
// Benefits of optimized aggregation pipelines:
// - Early filtering reduces data volume through pipeline
// - Efficient $lookup stages with projected fields
// - Strategic index utilization
// - Memory-efficient processing with disk spilling
// - Native support for complex analytical operations
// - Horizontal scaling across shards
// - Flexible handling of nested/dynamic data
// - Built-in window functions for trend analysis
Understanding MongoDB Aggregation Pipeline Performance
Pipeline Stage Optimization and Ordering
Implement strategic pipeline stage ordering for optimal performance:
// Advanced aggregation pipeline optimization patterns
class AggregationOptimizer {
constructor(db) {
this.db = db;
this.performanceMetrics = new Map();
this.indexRecommendations = [];
}
async optimizeEarlyFiltering(collection, pipeline) {
// Move filtering stages as early as possible
const optimizedPipeline = [];
const filterStages = [];
const nonFilterStages = [];
    // Separate $match stages from the rest. Note: this naive reordering is only
    // safe when the moved filters reference fields that already exist before the
    // stages they are hoisted over ($limit must keep its position, since moving
    // it changes which documents are processed).
    pipeline.forEach(stage => {
      const stageType = Object.keys(stage)[0];
      if (stageType === '$match') {
filterStages.push(stage);
} else {
nonFilterStages.push(stage);
}
});
// Early filtering reduces document flow through pipeline
optimizedPipeline.push(...filterStages);
optimizedPipeline.push(...nonFilterStages);
return optimizedPipeline;
}
async createProjectionOptimizedPipeline(baseCollection, lookupCollections, projections) {
// Optimize projections and lookups for minimal data transfer
return [
// Stage 1: Early projection to reduce document size
{
$project: {
// Only include fields needed for subsequent stages
...projections.baseFields,
// Include fields needed for lookups
...projections.lookupKeys
}
},
// Stage 2: Optimized lookups with sub-pipelines
...lookupCollections.map(lookup => ({
$lookup: {
from: lookup.collection,
localField: lookup.localField,
foreignField: lookup.foreignField,
as: lookup.as,
pipeline: [
// Project only needed fields in lookup
{ $project: lookup.projection },
// Add filters within lookup when possible
...(lookup.filters ? [{ $match: lookup.filters }] : [])
]
}
})),
// Stage 3: Unwind with null preservation for performance
...lookupCollections.map(lookup => ({
$unwind: {
path: `$${lookup.as}`,
preserveNullAndEmptyArrays: lookup.preserveNulls || false
}
})),
// Stage 4: Final projection after all joins
{
$project: projections.finalFields
}
];
}
async analyzeIndexUsage(collection, pipeline) {
// Analyze pipeline for index optimization opportunities
const explanation = await this.db.collection(collection).aggregate(
pipeline,
{ explain: true }
).toArray();
const indexAnalysis = {
stagesAnalyzed: [],
indexesUsed: [],
indexesRecommended: [],
performanceIssues: []
};
// Analyze each stage for index usage
explanation.forEach((stage, index) => {
const stageType = Object.keys(pipeline[index])[0];
const stageAnalysis = {
stage: index,
type: stageType,
indexUsed: false,
collectionScanned: false,
documentsExamined: 0,
documentsReturned: 0
};
if (stage.executionStats) {
        stageAnalysis.indexUsed = stage.executionStats.inputStage?.stage === 'IXSCAN';
stageAnalysis.documentsExamined = stage.executionStats.totalDocsExamined;
stageAnalysis.documentsReturned = stage.executionStats.totalDocsReturned;
// Identify inefficient stages
if (stageAnalysis.documentsExamined > stageAnalysis.documentsReturned * 10) {
indexAnalysis.performanceIssues.push({
stage: index,
issue: 'high_document_examination_ratio',
ratio: stageAnalysis.documentsExamined / stageAnalysis.documentsReturned,
recommendation: 'Consider adding index for this stage'
});
}
}
indexAnalysis.stagesAnalyzed.push(stageAnalysis);
});
return indexAnalysis;
}
async createPerformanceOptimizedPipeline(collection, analyticsQuery) {
// Create comprehensive performance-optimized pipeline
const pipeline = [
// Stage 1: Efficient date range filtering with index
{
$match: {
[analyticsQuery.dateField]: {
$gte: analyticsQuery.startDate,
$lte: analyticsQuery.endDate
},
// Add compound index filters
...analyticsQuery.filters
}
},
// Stage 2: Early sampling for large datasets (if needed)
...(analyticsQuery.sampleSize ? [{
$sample: { size: analyticsQuery.sampleSize }
}] : []),
// Stage 3: Efficient faceted search
{
$facet: {
// Main aggregation pipeline
data: [
// Lookup with optimized sub-pipeline
{
$lookup: {
from: analyticsQuery.lookupCollection,
localField: analyticsQuery.localField,
foreignField: analyticsQuery.foreignField,
as: 'lookupData',
pipeline: [
{ $project: analyticsQuery.lookupProjection },
{ $limit: 1 } // Limit lookup results when appropriate
]
}
},
{ $unwind: '$lookupData' },
// Grouping with efficient accumulators
{
$group: {
_id: analyticsQuery.groupBy,
// Use $sum for counting instead of $addToSet when possible
totalCount: { $sum: 1 },
totalValue: { $sum: analyticsQuery.valueField },
averageValue: { $avg: analyticsQuery.valueField },
// Efficient min/max calculations
minValue: { $min: analyticsQuery.valueField },
maxValue: { $max: analyticsQuery.valueField },
// Use $push only when needed for arrays
...(analyticsQuery.collectArrays ? {
samples: { $push: analyticsQuery.sampleField }
} : {})
}
},
// Add calculated fields
{
$addFields: {
efficiency: {
$round: [
{ $divide: ['$totalValue', '$totalCount'] },
2
]
},
valueRange: { $subtract: ['$maxValue', '$minValue'] }
}
},
// Sort for consistent results
{ $sort: { totalValue: -1 } },
// Limit results to prevent memory issues
{ $limit: analyticsQuery.maxResults || 1000 }
],
// Metadata pipeline for counts and statistics
metadata: [
{
$group: {
_id: null,
totalDocuments: { $sum: 1 },
totalValue: { $sum: analyticsQuery.valueField },
avgValue: { $avg: analyticsQuery.valueField }
}
}
]
}
},
// Stage 4: Combine faceted results
{
$project: {
data: 1,
metadata: { $arrayElemAt: ['$metadata', 0] },
processingTimestamp: new Date()
}
}
];
return pipeline;
}
async benchmarkPipeline(collection, pipeline, options = {}) {
// Comprehensive pipeline performance benchmarking
const benchmarkResults = {
pipelineName: options.name || 'unnamed_pipeline',
startTime: new Date(),
stages: [],
totalExecutionTime: 0,
documentsProcessed: 0,
memoryUsage: 0,
indexesUsed: [],
recommendations: []
};
try {
// Get execution statistics
const startTime = Date.now();
const explanation = await this.db.collection(collection).aggregate(
pipeline,
{
explain: true,
allowDiskUse: true,
...options
}
).toArray();
// Analyze execution plan
explanation.forEach((stageExplan, index) => {
const stageBenchmark = {
stageIndex: index,
stageType: Object.keys(pipeline[index])[0],
executionTimeMs: stageExplan.executionStats?.executionTimeMillisEstimate || 0,
documentsIn: stageExplan.executionStats?.totalDocsExamined || 0,
documentsOut: stageExplan.executionStats?.totalDocsReturned || 0,
indexUsed: stageExplan.executionStats?.inputStage?.stage === 'IXSCAN',
memoryUsageBytes: stageExplan.executionStats?.memUsage || 0
};
benchmarkResults.stages.push(stageBenchmark);
benchmarkResults.totalExecutionTime += stageBenchmark.executionTimeMs;
benchmarkResults.memoryUsage += stageBenchmark.memoryUsageBytes;
});
// Run actual pipeline for real-world timing
const realStartTime = Date.now();
const results = await this.db.collection(collection).aggregate(
pipeline,
{ allowDiskUse: true, ...options }
).toArray();
const realExecutionTime = Date.now() - realStartTime;
benchmarkResults.realExecutionTime = realExecutionTime;
benchmarkResults.documentsProcessed = results.length;
// Generate recommendations
benchmarkResults.recommendations = this.generateOptimizationRecommendations(
benchmarkResults
);
} catch (error) {
benchmarkResults.error = error.message;
} finally {
benchmarkResults.endTime = new Date();
}
// Store benchmark results for comparison
this.performanceMetrics.set(benchmarkResults.pipelineName, benchmarkResults);
return benchmarkResults;
}
generateOptimizationRecommendations(benchmarkResults) {
const recommendations = [];
// Check for stages without index usage
benchmarkResults.stages.forEach((stage, index) => {
if (!stage.indexUsed && stage.documentsIn > 1000) {
recommendations.push({
type: 'index_recommendation',
stage: index,
message: `Consider adding index for stage ${index} (${stage.stageType})`,
priority: 'high',
potentialImprovement: 'significant'
});
}
if (stage.documentsIn > stage.documentsOut * 100) {
recommendations.push({
type: 'filtering_recommendation',
stage: index,
message: `Move filtering earlier in pipeline for stage ${index}`,
priority: 'medium',
potentialImprovement: 'moderate'
});
}
});
// Memory usage recommendations
if (benchmarkResults.memoryUsage > 100 * 1024 * 1024) { // 100MB
recommendations.push({
type: 'memory_optimization',
message: 'High memory usage detected - consider using allowDiskUse: true',
priority: 'medium',
potentialImprovement: 'prevents memory errors'
});
}
// Execution time recommendations
if (benchmarkResults.totalExecutionTime > 30000) { // 30 seconds
recommendations.push({
type: 'performance_optimization',
message: 'Long execution time - review pipeline optimization opportunities',
priority: 'high',
potentialImprovement: 'significant'
});
}
return recommendations;
}
async createIndexRecommendations(collection, commonPipelines) {
// Generate index recommendations based on common pipeline patterns
const recommendations = [];
for (const pipeline of commonPipelines) {
const analysis = await this.analyzeIndexUsage(collection, pipeline.stages);
pipeline.stages.forEach((stage, index) => {
const stageType = Object.keys(stage)[0];
switch (stageType) {
case '$match':
const matchFields = Object.keys(stage.$match);
if (matchFields.length > 0) {
recommendations.push({
type: 'compound_index',
collection: collection,
fields: matchFields,
reason: `Optimize $match stage ${index}`,
estimatedImprovement: 'high'
});
}
break;
case '$sort':
const sortFields = Object.keys(stage.$sort);
recommendations.push({
type: 'sort_index',
collection: collection,
fields: sortFields,
reason: `Optimize $sort stage ${index}`,
estimatedImprovement: 'high'
});
break;
case '$group':
const groupField = stage.$group._id;
if (typeof groupField === 'string' && groupField.startsWith('$')) {
recommendations.push({
type: 'grouping_index',
collection: collection,
fields: [groupField.substring(1)],
reason: `Optimize $group stage ${index}`,
estimatedImprovement: 'medium'
});
}
break;
}
});
}
// Deduplicate and prioritize recommendations
return this.prioritizeIndexRecommendations(recommendations);
}
prioritizeIndexRecommendations(recommendations) {
// Remove duplicates and prioritize by impact
const uniqueRecommendations = new Map();
recommendations.forEach(rec => {
const key = `${rec.collection}_${rec.fields.join('_')}`;
const existing = uniqueRecommendations.get(key);
if (!existing || this.getImpactScore(rec) > this.getImpactScore(existing)) {
uniqueRecommendations.set(key, rec);
}
});
return Array.from(uniqueRecommendations.values())
.sort((a, b) => this.getImpactScore(b) - this.getImpactScore(a));
}
getImpactScore(recommendation) {
const impactScores = {
high: 3,
medium: 2,
low: 1
};
return impactScores[recommendation.estimatedImprovement] || 0;
}
async generatePerformanceReport() {
// Generate comprehensive performance analysis report
const report = {
generatedAt: new Date(),
totalPipelinesAnalyzed: this.performanceMetrics.size,
performanceSummary: {
fastPipelines: 0, // < 1 second
moderatePipelines: 0, // 1-10 seconds
slowPipelines: 0 // > 10 seconds
},
topPerformers: [],
performanceIssues: [],
indexRecommendations: [],
overallRecommendations: []
};
// Analyze all benchmarked pipelines
for (const [name, metrics] of this.performanceMetrics.entries()) {
const executionTime = metrics.realExecutionTime || metrics.totalExecutionTime;
if (executionTime < 1000) {
report.performanceSummary.fastPipelines++;
} else if (executionTime < 10000) {
report.performanceSummary.moderatePipelines++;
} else {
report.performanceSummary.slowPipelines++;
}
// Identify top performers and issues
if (executionTime < 500 && metrics.documentsProcessed > 1000) {
report.topPerformers.push({
name: name,
executionTime: executionTime,
documentsProcessed: metrics.documentsProcessed,
efficiency: metrics.documentsProcessed / executionTime
});
}
if (executionTime > 30000 || metrics.memoryUsage > 500 * 1024 * 1024) {
report.performanceIssues.push({
name: name,
executionTime: executionTime,
memoryUsage: metrics.memoryUsage,
recommendations: metrics.recommendations
});
}
}
// Sort top performers by efficiency
report.topPerformers.sort((a, b) => b.efficiency - a.efficiency);
// Generate overall recommendations
if (report.performanceSummary.slowPipelines > 0) {
report.overallRecommendations.push(
'Multiple slow pipelines detected - prioritize optimization efforts'
);
}
if (this.indexRecommendations.length > 5) {
report.overallRecommendations.push(
'Consider implementing recommended indexes to improve query performance'
);
}
return report;
}
}
Memory Management and Disk Spilling
Implement efficient memory management for large aggregations:
// Advanced memory management and optimization strategies
class AggregationMemoryManager {
constructor(db) {
this.db = db;
this.memoryThresholds = {
warning: 100 * 1024 * 1024, // 100MB
critical: 500 * 1024 * 1024, // 500MB
maximum: 1024 * 1024 * 1024 // 1GB
};
}
async createMemoryEfficientPipeline(collection, aggregationConfig) {
// Design pipeline with memory efficiency in mind
const memoryOptimizedPipeline = [
// Stage 1: Early filtering to reduce dataset size
{
$match: {
...aggregationConfig.filters,
// Add indexed filters first
[aggregationConfig.dateField]: {
$gte: aggregationConfig.startDate,
$lte: aggregationConfig.endDate
}
}
},
// Stage 2: Project only necessary fields early
{
        $project: {
          // Inclusion-only projection: MongoDB does not allow mixing inclusions
          // and exclusions (except _id), so large text fields are dropped simply
          // by not listing them among the required fields
          ...aggregationConfig.requiredFields
        }
},
// Stage 3: Use streaming-friendly operations
{
$group: {
_id: aggregationConfig.groupBy,
// Use memory-efficient accumulators
count: { $sum: 1 },
totalValue: { $sum: aggregationConfig.valueField },
          // Only collect distinct values when the expected set is small;
          // $addToSet on high-cardinality fields is memory intensive
...(aggregationConfig.collectSets && aggregationConfig.expectedSetSize < 1000 ? {
uniqueValues: { $addToSet: aggregationConfig.setField }
} : {}),
// Use $first/$last instead of $push for single values
firstValue: { $first: aggregationConfig.valueField },
lastValue: { $last: aggregationConfig.valueField },
// Calculated fields at group level to avoid later processing
averageValue: { $avg: aggregationConfig.valueField }
}
},
// Stage 4: Add computed fields efficiently
{
$addFields: {
efficiency: {
$cond: {
if: { $gt: ['$count', 0] },
then: { $divide: ['$totalValue', '$count'] },
else: 0
}
},
// Avoid complex calculations on large arrays
setSize: {
$cond: {
if: { $isArray: '$uniqueValues' },
then: { $size: '$uniqueValues' },
else: 0
}
}
}
},
// Stage 5: Sort with limit to prevent large result sets
{ $sort: { totalValue: -1 } },
{ $limit: aggregationConfig.maxResults || 10000 },
// Stage 6: Final projection to minimize output size
{
$project: {
groupKey: '$_id',
metrics: {
count: '$count',
totalValue: '$totalValue',
averageValue: '$averageValue',
efficiency: '$efficiency'
},
_id: 0
}
}
];
return memoryOptimizedPipeline;
}
async processLargeDatasetWithBatching(collection, pipeline, batchConfig) {
// Process large datasets in batches to manage memory
const results = [];
const batchSize = batchConfig.batchSize || 10000;
const totalBatches = Math.ceil(batchConfig.totalDocuments / batchSize);
console.log(`Processing ${batchConfig.totalDocuments} documents in ${totalBatches} batches`);
for (let batch = 0; batch < totalBatches; batch++) {
const skip = batch * batchSize;
const batchPipeline = [
// Add skip and limit for batching
{ $skip: skip },
{ $limit: batchSize },
// Original pipeline stages
...pipeline
];
try {
const batchResults = await this.db.collection(collection).aggregate(
batchPipeline,
{
allowDiskUse: true,
maxTimeMS: 60000, // 1 minute per batch
readConcern: { level: 'available' } // Use available for better performance
}
).toArray();
results.push(...batchResults);
console.log(`Completed batch ${batch + 1}/${totalBatches} (${batchResults.length} results)`);
// Optional: Add delay between batches to reduce load
if (batchConfig.delayMs && batch < totalBatches - 1) {
await new Promise(resolve => setTimeout(resolve, batchConfig.delayMs));
}
} catch (error) {
console.error(`Batch ${batch + 1} failed:`, error.message);
// Optionally continue with remaining batches
if (batchConfig.continueOnError) {
continue;
} else {
throw error;
}
}
}
return results;
}
async createStreamingAggregation(collection, pipeline, outputHandler) {
// Create streaming aggregation for real-time processing
const cursor = this.db.collection(collection).aggregate(pipeline, {
allowDiskUse: true,
batchSize: 1000, // Small batch size for streaming
readConcern: { level: 'available' }
});
const streamingStats = {
documentsProcessed: 0,
startTime: new Date(),
memoryPeakUsage: 0,
batchesProcessed: 0
};
try {
while (await cursor.hasNext()) {
const document = await cursor.next();
// Process document through handler
await outputHandler(document, streamingStats);
streamingStats.documentsProcessed++;
// Monitor memory usage (approximate)
if (streamingStats.documentsProcessed % 1000 === 0) {
const memoryUsage = process.memoryUsage();
streamingStats.memoryPeakUsage = Math.max(
streamingStats.memoryPeakUsage,
memoryUsage.heapUsed
);
console.log(`Processed ${streamingStats.documentsProcessed} documents, Memory: ${Math.round(memoryUsage.heapUsed / 1024 / 1024)}MB`);
}
}
} finally {
await cursor.close();
streamingStats.endTime = new Date();
streamingStats.totalProcessingTime = streamingStats.endTime - streamingStats.startTime;
}
return streamingStats;
}
async optimizePipelineForLargeArrays(collection, pipeline, arrayOptimizations) {
// Optimize pipelines that work with large arrays
const optimizedPipeline = [];
pipeline.forEach((stage, index) => {
const stageType = Object.keys(stage)[0];
switch (stageType) {
case '$unwind':
// Add preserveNullAndEmptyArrays and includeArrayIndex for efficiency
          const unwindPath = stage.$unwind.path || stage.$unwind;
          optimizedPipeline.push({
            $unwind: {
              path: unwindPath,
              preserveNullAndEmptyArrays: true,
              // includeArrayIndex must be a plain field name without the '$' prefix
              includeArrayIndex: `${unwindPath.replace(/^\$/, '')}_index`
            }
          });
break;
case '$group':
// Optimize group operations for array handling
const groupStage = { ...stage };
          // Optionally replace $addToSet with $push: it skips the per-element
          // uniqueness check, but callers must tolerate duplicate values
Object.keys(groupStage.$group).forEach(key => {
if (key !== '_id') {
const accumulator = groupStage.$group[key];
if (accumulator.$addToSet && arrayOptimizations.convertAddToSetToMerge) {
// Convert to more efficient operation when possible
groupStage.$group[key] = { $push: accumulator.$addToSet };
}
}
});
optimizedPipeline.push(groupStage);
break;
case '$project':
// Optimize array operations in projection
const projectStage = { ...stage };
Object.keys(projectStage.$project).forEach(key => {
const projection = projectStage.$project[key];
// Replace array operations with more efficient alternatives
if (projection && typeof projection === 'object' && projection.$size) {
// $size can be expensive on very large arrays
if (arrayOptimizations.approximateArraySizes) {
projectStage.$project[`${key}_approx`] = {
$cond: {
if: { $isArray: projection.$size },
then: { $min: [{ $size: projection.$size }, 10000] }, // Cap at 10k
else: 0
}
};
}
}
});
optimizedPipeline.push(projectStage);
break;
default:
optimizedPipeline.push(stage);
}
});
// Add array-specific optimizations
if (arrayOptimizations.limitArrayProcessing) {
// Add $limit stages after $unwind to prevent processing too many array elements
optimizedPipeline.forEach((stage, index) => {
if (stage.$unwind && index < optimizedPipeline.length - 1) {
optimizedPipeline.splice(index + 1, 0, {
$limit: arrayOptimizations.maxArrayElements || 100000
});
}
});
}
return optimizedPipeline;
}
async monitorAggregationPerformance(collection, pipeline, options = {}) {
// Comprehensive performance monitoring for aggregations
const performanceMonitor = {
startTime: new Date(),
memorySnapshots: [],
stageTimings: [],
resourceUsage: {
cpuStart: process.cpuUsage(),
memoryStart: process.memoryUsage()
}
};
// Function to take memory snapshots
const takeMemorySnapshot = () => {
const memoryUsage = process.memoryUsage();
performanceMonitor.memorySnapshots.push({
timestamp: new Date(),
heapUsed: memoryUsage.heapUsed,
heapTotal: memoryUsage.heapTotal,
external: memoryUsage.external,
rss: memoryUsage.rss
});
};
// Take initial snapshot
takeMemorySnapshot();
try {
let results;
if (options.explain) {
// Get execution plan with timing
results = await this.db.collection(collection).aggregate(
pipeline,
{
explain: true,
            allowDiskUse: options.allowDiskUse ?? true, // respect an explicit false
maxTimeMS: options.maxTimeMS || 300000
}
).toArray();
// Analyze execution plan
results.forEach((stageExplan, index) => {
performanceMonitor.stageTimings.push({
stage: index,
type: Object.keys(pipeline[index])[0],
executionTimeMs: stageExplan.executionStats?.executionTimeMillisEstimate || 0,
documentsIn: stageExplan.executionStats?.totalDocsExamined || 0,
documentsOut: stageExplan.executionStats?.totalDocsReturned || 0
});
});
} else {
// Execute actual pipeline with monitoring
const monitoringInterval = setInterval(takeMemorySnapshot, 5000); // Every 5 seconds
try {
results = await this.db.collection(collection).aggregate(
pipeline,
{
              allowDiskUse: options.allowDiskUse ?? true, // respect an explicit false
maxTimeMS: options.maxTimeMS || 300000,
batchSize: options.batchSize || 1000
}
).toArray();
} finally {
clearInterval(monitoringInterval);
}
}
// Take final snapshot
takeMemorySnapshot();
// Calculate performance metrics
const endTime = new Date();
const totalTime = endTime - performanceMonitor.startTime;
const finalCpuUsage = process.cpuUsage(performanceMonitor.resourceUsage.cpuStart);
const finalMemoryUsage = process.memoryUsage();
performanceMonitor.summary = {
totalExecutionTime: totalTime,
documentsReturned: results.length,
avgMemoryUsage: performanceMonitor.memorySnapshots.reduce(
(sum, snapshot) => sum + snapshot.heapUsed, 0
) / performanceMonitor.memorySnapshots.length,
peakMemoryUsage: Math.max(
...performanceMonitor.memorySnapshots.map(s => s.heapUsed)
),
cpuUserTime: finalCpuUsage.user / 1000, // Convert to milliseconds
cpuSystemTime: finalCpuUsage.system / 1000,
memoryDifference: finalMemoryUsage.heapUsed - performanceMonitor.resourceUsage.memoryStart.heapUsed
};
return {
results: results,
performanceData: performanceMonitor
};
} catch (error) {
performanceMonitor.error = error.message;
throw error;
}
}
async optimizeForShardedCollection(collection, pipeline, shardingConfig) {
// Optimize pipeline for sharded collections
const shardOptimizedPipeline = [];
// Add shard key filtering early if possible
if (shardingConfig.shardKey && shardingConfig.shardKeyValues) {
shardOptimizedPipeline.push({
$match: {
[shardingConfig.shardKey]: {
$in: shardingConfig.shardKeyValues
}
}
});
}
pipeline.forEach((stage, index) => {
const stageType = Object.keys(stage)[0];
switch (stageType) {
case '$group':
// Ensure group operations can be parallelized across shards
const groupStage = { ...stage };
// Add shard key to group _id when possible for better parallelization
if (typeof groupStage.$group._id === 'object' && shardingConfig.includeShardKeyInGroup) {
groupStage.$group._id[shardingConfig.shardKey] = `$${shardingConfig.shardKey}`;
}
shardOptimizedPipeline.push(groupStage);
break;
case '$sort':
// Optimize sort for sharded collections
const sortStage = { ...stage };
// Include shard key in sort to prevent scatter-gather when possible
if (shardingConfig.includeShardKeyInSort) {
sortStage.$sort = {
[shardingConfig.shardKey]: 1,
...sortStage.$sort
};
}
shardOptimizedPipeline.push(sortStage);
break;
case '$lookup':
// Optimize lookups for sharded collections
const lookupStage = { ...stage };
// Add hint to use shard key when doing lookups
if (shardingConfig.optimizeLookups) {
lookupStage.$lookup.pipeline = lookupStage.$lookup.pipeline || [];
lookupStage.$lookup.pipeline.unshift({
$match: {
// Add efficient filters in lookup pipeline
}
});
}
shardOptimizedPipeline.push(lookupStage);
break;
default:
shardOptimizedPipeline.push(stage);
}
});
return shardOptimizedPipeline;
}
}
Advanced Aggregation Patterns and Optimizations
Complex Analytics with Window Functions
Implement sophisticated analytics using MongoDB's window functions:
// Advanced analytics patterns with window functions and time-series analysis
class AdvancedAnalyticsEngine {
constructor(db) {
this.db = db;
this.analysisCache = new Map();
}
async createTimeSeriesAnalysisPipeline(collection, timeSeriesConfig) {
// Advanced time-series analysis with window functions
return [
// Stage 1: Filter and prepare time series data
{
$match: {
[timeSeriesConfig.timestampField]: {
$gte: timeSeriesConfig.startDate,
$lte: timeSeriesConfig.endDate
},
...timeSeriesConfig.filters
}
},
// Stage 2: Add time bucket fields for grouping
{
$addFields: {
timeBucket: {
$dateTrunc: {
date: `$${timeSeriesConfig.timestampField}`,
unit: timeSeriesConfig.timeUnit, // 'hour', 'day', 'week', 'month'
binSize: timeSeriesConfig.binSize || 1
}
},
// Extract time components for analysis
hour: { $hour: `$${timeSeriesConfig.timestampField}` },
dayOfWeek: { $dayOfWeek: `$${timeSeriesConfig.timestampField}` },
dayOfMonth: { $dayOfMonth: `$${timeSeriesConfig.timestampField}` },
month: { $month: `$${timeSeriesConfig.timestampField}` },
year: { $year: `$${timeSeriesConfig.timestampField}` }
}
},
// Stage 3: Group by time bucket and dimensions
{
$group: {
_id: {
timeBucket: '$timeBucket',
// Add dimensional grouping
...timeSeriesConfig.dimensions.reduce((acc, dim) => {
acc[dim] = `$${dim}`;
return acc;
}, {})
},
// Aggregate metrics
totalValue: { $sum: `$${timeSeriesConfig.valueField}` },
count: { $sum: 1 },
averageValue: { $avg: `$${timeSeriesConfig.valueField}` },
minValue: { $min: `$${timeSeriesConfig.valueField}` },
maxValue: { $max: `$${timeSeriesConfig.valueField}` },
// Collect samples for percentile calculations
values: { $push: `$${timeSeriesConfig.valueField}` },
// Time pattern analysis
hourDistribution: {
$push: {
hour: '$hour',
value: `$${timeSeriesConfig.valueField}`
}
}
}
},
// Stage 4: Add calculated fields and percentiles
{
$addFields: {
// Calculate percentiles from collected values
p50: { $percentile: { input: '$values', p: [0.5], method: 'approximate' } },
p90: { $percentile: { input: '$values', p: [0.9], method: 'approximate' } },
p95: { $percentile: { input: '$values', p: [0.95], method: 'approximate' } },
p99: { $percentile: { input: '$values', p: [0.99], method: 'approximate' } },
          // Population standard deviation across the bucket's values
          stdDev: { $stdDevPop: '$values' },
// Calculate value range
valueRange: { $subtract: ['$maxValue', '$minValue'] },
// Calculate coefficient of variation
coefficientOfVariation: {
$cond: {
if: { $gt: ['$averageValue', 0] },
then: {
$divide: [
{ $stdDevPop: '$values' },
'$averageValue'
]
},
else: 0
}
}
}
},
// Stage 5: Sort by time for window function processing
{
$sort: {
'_id.timeBucket': 1,
...timeSeriesConfig.dimensions.reduce((acc, dim) => {
acc[`_id.${dim}`] = 1;
return acc;
}, {})
}
},
// Stage 6: Apply window functions for trend analysis
{
$setWindowFields: {
partitionBy: timeSeriesConfig.dimensions.reduce((acc, dim) => {
acc[dim] = `$_id.${dim}`;
return acc;
}, {}),
sortBy: { '_id.timeBucket': 1 },
output: {
// Moving averages
movingAvg7: {
$avg: '$totalValue',
window: {
documents: [-6, 0] // 7-period moving average
}
},
movingAvg30: {
$avg: '$totalValue',
window: {
documents: [-29, 0] // 30-period moving average
}
},
// Growth calculations
previousPeriodValue: {
$shift: {
output: '$totalValue',
by: -1
}
},
// Cumulative calculations
cumulativeSum: {
$sum: '$totalValue',
window: {
              documents: ['unbounded', 'current']
}
},
          // Rank within the partition (rank-style operators take no window spec)
          valueRank: { $rank: {} },
// Min/Max within window
windowMin: {
$min: '$totalValue',
window: {
documents: [-6, 6] // 13-period window
}
},
windowMax: {
$max: '$totalValue',
window: {
documents: [-6, 6] // 13-period window
}
          },
// Volatility measures
volatility: {
$stdDevPop: '$totalValue',
window: {
documents: [-29, 0] // 30-period volatility
}
}
}
}
},
// Stage 7: Calculate derived metrics
{
$addFields: {
        // Period-over-period change (computed here because $subtract is not a
        // window operator and cannot appear in $setWindowFields output)
        periodChange: {
          $cond: {
            if: { $ne: ['$previousPeriodValue', null] },
            then: { $subtract: ['$totalValue', '$previousPeriodValue'] },
            else: null
          }
        },
        // Growth rates
        periodGrowthRate: {
          $cond: {
            if: { $gt: ['$previousPeriodValue', 0] },
            then: {
              $multiply: [
                {
                  $divide: [
                    { $subtract: ['$totalValue', '$previousPeriodValue'] },
                    '$previousPeriodValue'
                  ]
                },
                100
              ]
            },
            else: null
          }
        },
// Trend indicators
trendDirection: {
$cond: {
if: { $gt: ['$totalValue', '$movingAvg7'] },
then: 'up',
else: {
$cond: {
if: { $lt: ['$totalValue', '$movingAvg7'] },
then: 'down',
else: 'stable'
}
}
}
},
// Anomaly detection (simple z-score based)
zScore: {
$cond: {
if: { $gt: ['$volatility', 0] },
then: {
$divide: [
{ $subtract: ['$totalValue', '$movingAvg30'] },
'$volatility'
]
},
else: 0
}
},
// Position within window range
positionInRange: {
$cond: {
if: { $gt: [{ $subtract: ['$windowMax', '$windowMin'] }, 0] },
then: {
$multiply: [
{
$divide: [
{ $subtract: ['$totalValue', '$windowMin'] },
{ $subtract: ['$windowMax', '$windowMin'] }
]
},
100
]
},
else: 50
}
}
}
},
// Stage 8: Add anomaly flags
{
$addFields: {
isAnomaly: {
$or: [
{ $gt: ['$zScore', 2.5] }, // High anomaly
{ $lt: ['$zScore', -2.5] } // Low anomaly
]
},
anomalyLevel: {
$cond: {
if: { $gt: [{ $abs: '$zScore' }, 3] },
then: 'extreme',
else: {
$cond: {
if: { $gt: [{ $abs: '$zScore' }, 2] },
then: 'high',
else: 'normal'
}
}
}
}
}
},
// Stage 9: Final projection with clean structure
{
$project: {
// Time dimension
timeBucket: '$_id.timeBucket',
// Other dimensions
...timeSeriesConfig.dimensions.reduce((acc, dim) => {
acc[dim] = `$_id.${dim}`;
return acc;
}, {}),
// Core metrics
metrics: {
totalValue: { $round: ['$totalValue', 2] },
count: '$count',
averageValue: { $round: ['$averageValue', 2] },
minValue: '$minValue',
maxValue: '$maxValue',
valueRange: '$valueRange'
},
// Statistical measures
statistics: {
p50: { $arrayElemAt: ['$p50', 0] },
p90: { $arrayElemAt: ['$p90', 0] },
p95: { $arrayElemAt: ['$p95', 0] },
p99: { $arrayElemAt: ['$p99', 0] },
          stdDev: { $round: ['$stdDev', 2] },
coefficientOfVariation: { $round: ['$coefficientOfVariation', 4] }
},
// Trend analysis
trends: {
movingAvg7: { $round: ['$movingAvg7', 2] },
movingAvg30: { $round: ['$movingAvg30', 2] },
periodChange: { $round: ['$periodChange', 2] },
periodGrowthRate: { $round: ['$periodGrowthRate', 2] },
trendDirection: '$trendDirection',
cumulativeSum: { $round: ['$cumulativeSum', 2] }
},
// Anomaly detection
anomalies: {
zScore: { $round: ['$zScore', 3] },
isAnomaly: '$isAnomaly',
anomalyLevel: '$anomalyLevel',
positionInRange: { $round: ['$positionInRange', 1] }
},
// Rankings
rankings: {
valueRank: '$valueRank',
volatility: { $round: ['$volatility', 2] }
},
_id: 0
}
},
// Stage 10: Sort final results
{
$sort: {
timeBucket: 1,
...timeSeriesConfig.dimensions.reduce((acc, dim) => {
acc[dim] = 1;
return acc;
}, {})
}
}
];
}
async createCohortAnalysisPipeline(collection, cohortConfig) {
// Advanced cohort analysis for user behavior tracking
return [
// Stage 1: Filter and prepare user event data
{
$match: {
[cohortConfig.eventDateField]: {
$gte: cohortConfig.startDate,
$lte: cohortConfig.endDate
},
[cohortConfig.eventTypeField]: { $in: cohortConfig.eventTypes }
}
},
// Stage 2: Determine cohort assignment based on first event
{
$group: {
_id: `$${cohortConfig.userIdField}`,
firstEventDate: { $min: `$${cohortConfig.eventDateField}` },
allEvents: {
$push: {
eventDate: `$${cohortConfig.eventDateField}`,
eventType: `$${cohortConfig.eventTypeField}`,
eventValue: `$${cohortConfig.valueField}`
}
}
}
},
// Stage 3: Add cohort period (week/month of first event)
{
$addFields: {
cohortPeriod: {
$dateTrunc: {
date: '$firstEventDate',
unit: cohortConfig.cohortTimeUnit, // 'week' or 'month'
binSize: 1
}
}
}
},
// Stage 4: Unwind events for period analysis
{ $unwind: '$allEvents' },
// Stage 5: Calculate periods since cohort start
{
$addFields: {
periodsSinceCohort: {
$floor: {
$divide: [
{ $subtract: ['$allEvents.eventDate', '$firstEventDate'] },
cohortConfig.cohortTimeUnit === 'week' ? 604800000 : 2629746000 // ms in week/month
]
}
}
}
},
// Stage 6: Group by cohort and period for retention analysis
{
$group: {
_id: {
cohortPeriod: '$cohortPeriod',
periodNumber: '$periodsSinceCohort'
},
// Cohort metrics
activeUsers: { $addToSet: '$_id' }, // Unique users active in this period
totalEvents: { $sum: 1 },
totalValue: { $sum: '$allEvents.eventValue' },
// Event type breakdown
eventTypeBreakdown: {
$push: {
eventType: '$allEvents.eventType',
value: '$allEvents.eventValue'
}
}
}
},
// Stage 7: Calculate active user counts
{
$addFields: {
activeUserCount: { $size: '$activeUsers' }
}
},
      // Stage 8: Determine cohort size (period 0 active users) for retention.
      // A $lookup back to the raw events collection cannot see these grouped
      // documents, so a window function over the grouped results is used instead
      {
        $setWindowFields: {
          partitionBy: '$_id.cohortPeriod',
          sortBy: { '_id.periodNumber': 1 },
          output: {
            cohortSize: {
              $first: '$activeUserCount',
              window: { documents: ['unbounded', 'unbounded'] }
            }
          }
        }
      },
      // Stage 9: Calculate retention rates
{
$addFields: {
retentionRate: {
$cond: {
if: { $gt: ['$cohortSize', 0] },
then: {
$round: [
{ $multiply: [{ $divide: ['$activeUserCount', '$cohortSize'] }, 100] },
2
]
},
else: 0
}
}
}
},
// Stage 10: Add cohort analysis metrics
{
$addFields: {
// Average events per user
eventsPerUser: {
$cond: {
if: { $gt: ['$activeUserCount', 0] },
then: { $round: [{ $divide: ['$totalEvents', '$activeUserCount'] }, 2] },
else: 0
}
},
// Average value per user
valuePerUser: {
$cond: {
if: { $gt: ['$activeUserCount', 0] },
then: { $round: [{ $divide: ['$totalValue', '$activeUserCount'] }, 2] },
else: 0
}
},
// Average value per event
valuePerEvent: {
$cond: {
if: { $gt: ['$totalEvents', 0] },
then: { $round: [{ $divide: ['$totalValue', '$totalEvents'] }, 2] },
else: 0
}
}
}
},
// Stage 11: Group event types for analysis
{
$addFields: {
eventTypeSummary: {
$reduce: {
input: '$eventTypeBreakdown',
initialValue: {},
in: {
$mergeObjects: [
'$$value',
{
$arrayToObject: [{
k: '$$this.eventType',
v: {
$add: [
{ $ifNull: [{ $getField: { field: '$$this.eventType', input: '$$value' } }, 0] },
'$$this.value'
]
}
}]
}
]
}
}
}
}
},
// Stage 12: Final projection
{
$project: {
cohortPeriod: '$_id.cohortPeriod',
periodNumber: '$_id.periodNumber',
cohortSize: '$cohortSize',
activeUsers: '$activeUserCount',
retentionRate: '$retentionRate',
engagement: {
totalEvents: '$totalEvents',
eventsPerUser: '$eventsPerUser',
totalValue: { $round: ['$totalValue', 2] },
valuePerUser: '$valuePerUser',
valuePerEvent: '$valuePerEvent'
},
eventBreakdown: '$eventTypeSummary',
// Cohort health indicators
healthIndicators: {
isHealthyCohort: { $gte: ['$retentionRate', cohortConfig.healthyRetentionThreshold || 20] },
engagementLevel: {
$cond: {
if: { $gte: ['$eventsPerUser', cohortConfig.highEngagementThreshold || 5] },
then: 'high',
else: {
$cond: {
if: { $gte: ['$eventsPerUser', cohortConfig.mediumEngagementThreshold || 2] },
then: 'medium',
else: 'low'
}
}
}
}
},
_id: 0
}
},
// Stage 13: Sort results
{
$sort: {
cohortPeriod: 1,
periodNumber: 1
}
}
];
}
async createAdvancedRFMAnalysis(collection, rfmConfig) {
// RFM (Recency, Frequency, Monetary) analysis for customer segmentation
return [
// Stage 1: Filter customer transactions
{
$match: {
[rfmConfig.transactionDateField]: {
$gte: rfmConfig.analysisStartDate,
$lte: rfmConfig.analysisEndDate
},
[rfmConfig.amountField]: { $gt: 0 }
}
},
// Stage 2: Calculate RFM metrics per customer
{
$group: {
_id: `$${rfmConfig.customerIdField}`,
// Recency: Days since last transaction
lastTransactionDate: { $max: `$${rfmConfig.transactionDateField}` },
// Frequency: Number of transactions
transactionCount: { $sum: 1 },
// Monetary: Total transaction value
totalSpent: { $sum: `$${rfmConfig.amountField}` },
// Additional metrics
averageTransactionValue: { $avg: `$${rfmConfig.amountField}` },
firstTransactionDate: { $min: `$${rfmConfig.transactionDateField}` },
// Transaction patterns
transactions: {
$push: {
date: `$${rfmConfig.transactionDateField}`,
amount: `$${rfmConfig.amountField}`
}
}
}
},
// Stage 3: Calculate recency in days
{
$addFields: {
recencyDays: {
$floor: {
$divide: [
{ $subtract: [rfmConfig.currentDate, '$lastTransactionDate'] },
86400000 // milliseconds in a day
]
}
},
customerLifetimeDays: {
$floor: {
$divide: [
{ $subtract: ['$lastTransactionDate', '$firstTransactionDate'] },
86400000
]
}
}
}
},
      // Stage 4: Rank customers on each RFM dimension, then convert the ranks
      // to percentiles (MongoDB has no $percentRank window operator)
      {
        $setWindowFields: {
          sortBy: { recencyDays: 1 },
          output: {
            recencyRank: { $rank: {} },
            totalCustomers: {
              $count: {},
              window: { documents: ['unbounded', 'unbounded'] }
            }
          }
        }
      },
      {
        $setWindowFields: {
          sortBy: { transactionCount: 1 },
          output: { frequencyRank: { $rank: {} } }
        }
      },
      {
        $setWindowFields: {
          sortBy: { totalSpent: 1 },
          output: { monetaryRank: { $rank: {} } }
        }
      },
      {
        $addFields: {
          recencyPercentile: {
            $cond: {
              if: { $gt: ['$totalCustomers', 1] },
              then: {
                $divide: [
                  { $subtract: ['$recencyRank', 1] },
                  { $subtract: ['$totalCustomers', 1] }
                ]
              },
              else: 0
            }
          },
          frequencyPercentile: {
            $cond: {
              if: { $gt: ['$totalCustomers', 1] },
              then: {
                $divide: [
                  { $subtract: ['$frequencyRank', 1] },
                  { $subtract: ['$totalCustomers', 1] }
                ]
              },
              else: 0
            }
          },
          monetaryPercentile: {
            $cond: {
              if: { $gt: ['$totalCustomers', 1] },
              then: {
                $divide: [
                  { $subtract: ['$monetaryRank', 1] },
                  { $subtract: ['$totalCustomers', 1] }
                ]
              },
              else: 0
            }
          }
        }
      },
// Stage 5: Calculate RFM scores (1-5 scale)
{
$addFields: {
recencyScore: {
$cond: {
if: { $lte: ['$recencyPercentile', 0.2] },
then: 5, // Most recent customers get highest score
else: {
$cond: {
if: { $lte: ['$recencyPercentile', 0.4] },
then: 4,
else: {
$cond: {
if: { $lte: ['$recencyPercentile', 0.6] },
then: 3,
else: {
$cond: {
if: { $lte: ['$recencyPercentile', 0.8] },
then: 2,
else: 1
}
}
}
}
}
}
}
},
frequencyScore: {
$cond: {
if: { $gte: ['$frequencyPercentile', 0.8] },
then: 5,
else: {
$cond: {
if: { $gte: ['$frequencyPercentile', 0.6] },
then: 4,
else: {
$cond: {
if: { $gte: ['$frequencyPercentile', 0.4] },
then: 3,
else: {
$cond: {
if: { $gte: ['$frequencyPercentile', 0.2] },
then: 2,
else: 1
}
}
}
}
}
}
}
},
monetaryScore: {
$cond: {
if: { $gte: ['$monetaryPercentile', 0.8] },
then: 5,
else: {
$cond: {
if: { $gte: ['$monetaryPercentile', 0.6] },
then: 4,
else: {
$cond: {
if: { $gte: ['$monetaryPercentile', 0.4] },
then: 3,
else: {
$cond: {
if: { $gte: ['$monetaryPercentile', 0.2] },
then: 2,
else: 1
}
}
}
}
}
}
}
}
}
},
// Stage 6: Create combined RFM score and segment
{
$addFields: {
rfmScore: {
$concat: [
{ $toString: '$recencyScore' },
{ $toString: '$frequencyScore' },
{ $toString: '$monetaryScore' }
]
},
// Calculate overall customer value score
customerValueScore: {
$round: [
{
$add: [
{ $multiply: ['$recencyScore', rfmConfig.recencyWeight || 0.3] },
{ $multiply: ['$frequencyScore', rfmConfig.frequencyWeight || 0.3] },
{ $multiply: ['$monetaryScore', rfmConfig.monetaryWeight || 0.4] }
]
},
2
]
}
}
},
// Stage 7: Assign customer segments
{
$addFields: {
customerSegment: {
$switch: {
branches: [
{
case: {
$and: [
{ $gte: ['$recencyScore', 4] },
{ $gte: ['$frequencyScore', 4] },
{ $gte: ['$monetaryScore', 4] }
]
},
then: 'Champions'
},
{
case: {
$and: [
{ $gte: ['$recencyScore', 3] },
{ $gte: ['$frequencyScore', 3] },
{ $gte: ['$monetaryScore', 4] }
]
},
then: 'Loyal Customers'
},
{
case: {
$and: [
{ $gte: ['$recencyScore', 4] },
{ $lte: ['$frequencyScore', 2] },
{ $gte: ['$monetaryScore', 3] }
]
},
then: 'Potential Loyalists'
},
{
case: {
$and: [
{ $gte: ['$recencyScore', 4] },
{ $lte: ['$frequencyScore', 1] },
{ $lte: ['$monetaryScore', 1] }
]
},
then: 'New Customers'
},
{
case: {
$and: [
{ $gte: ['$recencyScore', 3] },
{ $lte: ['$frequencyScore', 3] },
{ $gte: ['$monetaryScore', 3] }
]
},
then: 'Promising'
},
{
case: {
$and: [
{ $lte: ['$recencyScore', 2] },
{ $gte: ['$frequencyScore', 3] },
{ $gte: ['$monetaryScore', 3] }
]
},
then: 'Need Attention'
},
{
case: {
$and: [
{ $lte: ['$recencyScore', 2] },
{ $lte: ['$frequencyScore', 2] },
{ $gte: ['$monetaryScore', 3] }
]
},
then: 'About to Sleep'
},
{
case: {
$and: [
{ $lte: ['$recencyScore', 2] },
{ $gte: ['$frequencyScore', 4] },
{ $lte: ['$monetaryScore', 2] }
]
},
then: 'At Risk'
},
{
case: {
$and: [
{ $lte: ['$recencyScore', 1] },
{ $gte: ['$frequencyScore', 4] },
{ $gte: ['$monetaryScore', 4] }
]
},
then: 'Cannot Lose Them'
},
{
case: {
$and: [
{ $eq: ['$recencyScore', 3] },
{ $eq: ['$frequencyScore', 1] },
{ $eq: ['$monetaryScore', 1] }
]
},
then: 'Hibernating'
}
],
default: 'Lost Customers'
}
}
}
},
// Stage 8: Add customer insights and recommendations
{
$addFields: {
insights: {
daysSinceLastPurchase: '$recencyDays',
lifetimeValue: { $round: ['$totalSpent', 2] },
averageOrderValue: { $round: ['$averageTransactionValue', 2] },
purchaseFrequency: {
$cond: {
if: { $gt: ['$customerLifetimeDays', 0] },
then: {
$round: [
{ $divide: ['$transactionCount', { $divide: ['$customerLifetimeDays', 30] }] },
2
]
},
else: 0
}
},
// Customer lifecycle stage
lifecycleStage: {
$cond: {
if: { $lte: ['$customerLifetimeDays', 30] },
then: 'New',
else: {
$cond: {
if: { $lte: ['$customerLifetimeDays', 180] },
then: 'Developing',
else: {
$cond: {
if: { $lte: ['$recencyDays', 90] },
then: 'Established',
else: 'Declining'
}
}
}
}
}
}
},
// Marketing recommendations
recommendations: {
$switch: {
branches: [
{
case: { $eq: ['$customerSegment', 'Champions'] },
then: ['Reward loyalty', 'VIP treatment', 'Brand advocacy program']
},
{
case: { $eq: ['$customerSegment', 'New Customers'] },
then: ['Onboarding campaign', 'Product education', 'Early engagement']
},
{
case: { $eq: ['$customerSegment', 'At Risk'] },
then: ['Win-back campaign', 'Special offers', 'Survey for feedback']
},
{
case: { $eq: ['$customerSegment', 'Lost Customers'] },
then: ['Aggressive win-back offers', 'Product updates', 'Reactivation campaign']
}
],
default: ['Standard marketing', 'Regular engagement']
}
}
}
},
// Stage 9: Final projection
{
$project: {
customerId: '$_id',
rfmScores: {
recency: '$recencyScore',
frequency: '$frequencyScore',
monetary: '$monetaryScore',
combined: '$rfmScore',
customerValue: '$customerValueScore'
},
segment: '$customerSegment',
insights: '$insights',
recommendations: '$recommendations',
rawMetrics: {
recencyDays: '$recencyDays',
transactionCount: '$transactionCount',
totalSpent: { $round: ['$totalSpent', 2] },
averageTransactionValue: { $round: ['$averageTransactionValue', 2] },
customerLifetimeDays: '$customerLifetimeDays'
},
_id: 0
}
},
// Stage 10: Sort by customer value score
{
$sort: {
'rfmScores.customerValue': -1,
'rawMetrics.totalSpent': -1
}
}
];
}
}
SQL-Style Aggregation Optimization with QueryLeaf
QueryLeaf provides familiar SQL approaches to MongoDB aggregation optimization:
-- QueryLeaf aggregation optimization with SQL-style syntax
-- Optimized complex analytics query with early filtering
WITH filtered_data AS (
SELECT *
FROM orders
WHERE order_date >= '2024-01-01'
AND order_date <= '2024-12-31'
AND status IN ('completed', 'shipped')
  -- QueryLeaf optimizes this to use a compound index on (order_date, status)
),
enriched_data AS (
SELECT
o.*,
c.region_id,
c.customer_segment,
r.region_name,
oi.product_id,
oi.quantity,
oi.unit_price,
p.category,
p.subcategory,
p.cost_basis,
-- Calculate metrics early in pipeline
(oi.quantity * oi.unit_price) as item_revenue,
(oi.quantity * p.cost_basis) as item_cost
FROM filtered_data o
-- QueryLeaf optimizes joins with $lookup sub-pipelines
JOIN customers c ON o.customer_id = c.customer_id
JOIN regions r ON c.region_id = r.region_id
CROSS JOIN UNNEST(o.items) AS oi
JOIN products p ON oi.product_id = p.product_id
),
monthly_aggregates AS (
SELECT
DATE_TRUNC('month', order_date) as month,
region_name,
category,
subcategory,
customer_segment,
-- Standard aggregations
COUNT(*) as order_count,
COUNT(DISTINCT customer_id) as unique_customers,
SUM(item_revenue) as total_revenue,
SUM(item_cost) as total_cost,
(SUM(item_revenue) - SUM(item_cost)) as profit,
AVG(item_revenue) as avg_item_revenue,
-- Statistical measures
STDDEV_POP(item_revenue) as revenue_stddev,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY item_revenue) as median_revenue,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY item_revenue) as p95_revenue,
-- Collect sample for detailed analysis
ARRAY_AGG(item_revenue ORDER BY item_revenue DESC LIMIT 100) as top_revenues
FROM enriched_data
GROUP BY
DATE_TRUNC('month', order_date),
region_name,
category,
subcategory,
customer_segment
-- QueryLeaf creates efficient $group stage with proper field projections
)
-- Advanced window functions for trend analysis
SELECT
month,
region_name,
category,
subcategory,
customer_segment,
-- Core metrics
order_count,
unique_customers,
total_revenue,
profit,
ROUND(profit / total_revenue * 100, 2) as profit_margin_pct,
ROUND(total_revenue / unique_customers, 2) as revenue_per_customer,
-- Trend analysis using window functions
LAG(total_revenue, 1) OVER (
PARTITION BY region_name, category, customer_segment
ORDER BY month
) as previous_month_revenue,
-- Growth calculations
ROUND(
((total_revenue - LAG(total_revenue, 1) OVER (
PARTITION BY region_name, category, customer_segment
ORDER BY month
)) / LAG(total_revenue, 1) OVER (
PARTITION BY region_name, category, customer_segment
ORDER BY month
)) * 100, 2
) as month_over_month_growth,
-- Moving averages
AVG(total_revenue) OVER (
PARTITION BY region_name, category, customer_segment
ORDER BY month
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) as moving_avg_3month,
AVG(total_revenue) OVER (
PARTITION BY region_name, category, customer_segment
ORDER BY month
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
) as moving_avg_6month,
-- Cumulative totals
SUM(total_revenue) OVER (
PARTITION BY region_name, category, customer_segment
ORDER BY month
ROWS UNBOUNDED PRECEDING
) as cumulative_revenue,
-- Rankings and percentiles
RANK() OVER (
PARTITION BY month
ORDER BY total_revenue DESC
) as revenue_rank,
PERCENT_RANK() OVER (
PARTITION BY month
ORDER BY total_revenue
) as revenue_percentile,
-- Volatility measures
STDDEV(total_revenue) OVER (
PARTITION BY region_name, category, customer_segment
ORDER BY month
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
) as revenue_volatility,
-- Min/Max within window
MIN(total_revenue) OVER (
PARTITION BY region_name, category, customer_segment
ORDER BY month
ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING
) as window_min,
MAX(total_revenue) OVER (
PARTITION BY region_name, category, customer_segment
ORDER BY month
ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING
) as window_max,
-- Position within range
CASE
WHEN MAX(total_revenue) OVER (...) - MIN(total_revenue) OVER (...) > 0
THEN ROUND(
((total_revenue - MIN(total_revenue) OVER (...)) /
(MAX(total_revenue) OVER (...) - MIN(total_revenue) OVER (...)
)) * 100, 1
)
ELSE 50.0
END as position_in_range_pct
FROM monthly_aggregates
WHERE month >= '2024-06-01' -- Filter for recent months
ORDER BY month, region_name, category, total_revenue DESC
-- QueryLeaf optimization features:
-- ALLOW_DISK_USE for large aggregations
-- MAX_TIME_MS for timeout control
-- HINT for index suggestions
-- READ_CONCERN for consistency control
WITH AGGREGATION_OPTIONS (
ALLOW_DISK_USE = true,
MAX_TIME_MS = 300000,
HINT = 'order_date_status_idx',
READ_CONCERN = 'majority'
);
-- Performance monitoring and optimization
SELECT
stage_name,
execution_time_ms,
documents_examined,
documents_returned,
index_used,
memory_usage_mb,
-- Efficiency metrics
ROUND(documents_returned::FLOAT / documents_examined, 4) as selectivity,
ROUND(documents_returned / (execution_time_ms / 1000.0), 0) as docs_per_second,
-- Performance flags
CASE
WHEN execution_time_ms > 30000 THEN 'SLOW_STAGE'
WHEN documents_examined > documents_returned * 100 THEN 'INEFFICIENT_FILTERING'
WHEN NOT index_used AND documents_examined > 10000 THEN 'MISSING_INDEX'
ELSE 'OPTIMAL'
END as performance_flag,
-- Optimization recommendations
CASE
WHEN NOT index_used AND documents_examined > 10000
THEN 'Add index for this stage'
WHEN documents_examined > documents_returned * 100
THEN 'Move filtering earlier in pipeline'
WHEN memory_usage_mb > 100
THEN 'Consider using allowDiskUse'
ELSE 'No optimization needed'
END as recommendation
FROM EXPLAIN_AGGREGATION_PIPELINE('orders', @pipeline_query)
ORDER BY execution_time_ms DESC;
-- Index recommendations based on aggregation patterns
WITH pipeline_analysis AS (
SELECT
collection_name,
stage_type,
stage_index,
field_name,
operation_type,
estimated_improvement
FROM ANALYZE_AGGREGATION_INDEXES(@common_pipelines)
),
index_recommendations AS (
SELECT
collection_name,
STRING_AGG(field_name, ', ' ORDER BY stage_index) as compound_index_fields,
COUNT(*) as stages_optimized,
MAX(estimated_improvement) as max_improvement,
STRING_AGG(DISTINCT operation_type, ', ') as optimization_types
FROM pipeline_analysis
GROUP BY collection_name
)
SELECT
collection_name,
'CREATE INDEX idx_' || REPLACE(compound_index_fields, ', ', '_') ||
' ON ' || collection_name || ' (' || compound_index_fields || ')' as create_index_statement,
stages_optimized,
max_improvement as estimated_improvement,
optimization_types,
-- Priority scoring
CASE
WHEN max_improvement = 'high' AND stages_optimized >= 3 THEN 1
WHEN max_improvement = 'high' AND stages_optimized >= 2 THEN 2
WHEN max_improvement = 'medium' AND stages_optimized >= 3 THEN 3
ELSE 4
END as priority_rank
FROM index_recommendations
ORDER BY priority_rank, stages_optimized DESC;
-- Memory usage optimization strategies
SELECT
pipeline_name,
total_memory_mb,
peak_memory_mb,
documents_processed,
-- Memory efficiency metrics
ROUND(peak_memory_mb / (documents_processed / 1000.0), 2) as mb_per_1k_docs,
-- Memory optimization recommendations
CASE
WHEN peak_memory_mb > 500 THEN 'Use allowDiskUse: true'
WHEN mb_per_1k_docs > 10 THEN 'Reduce projection fields early'
WHEN documents_processed > 1000000 THEN 'Consider batch processing'
ELSE 'Memory usage optimal'
END as memory_recommendation,
-- Suggested batch size for large datasets
CASE
WHEN peak_memory_mb > 1000 THEN 10000
WHEN peak_memory_mb > 500 THEN 25000
WHEN peak_memory_mb > 100 THEN 50000
ELSE NULL
END as suggested_batch_size
FROM PIPELINE_PERFORMANCE_METRICS()
WHERE total_memory_mb > 50 -- Focus on memory-intensive pipelines
ORDER BY peak_memory_mb DESC;
-- QueryLeaf aggregation optimization provides:
-- 1. Automatic pipeline stage reordering for optimal performance
-- 2. Index usage hints and recommendations
-- 3. Memory management with disk spilling controls
-- 4. Window function optimization with efficient partitioning
-- 5. Early filtering and projection optimization
-- 6. Compound index recommendations based on pipeline analysis
-- 7. Performance monitoring and bottleneck identification
-- 8. Batch processing strategies for large datasets
-- 9. SQL-familiar syntax for complex analytical operations
-- 10. Integration with MongoDB's native aggregation performance features
Best Practices for Aggregation Pipeline Optimization
Performance Design Guidelines
Essential practices for high-performance aggregation pipelines:
- Early Filtering: Move $match stages as early as possible to reduce data volume
- Index Utilization: Design compound indexes specifically for aggregation patterns
- Memory Management: Use allowDiskUse: true for large datasets
- Stage Ordering: Optimize stage sequence to minimize document flow
- Projection Optimization: Project only necessary fields at each stage
- Lookup Efficiency: Use sub-pipelines in $lookup to reduce data transfer (see the combined example after this list)
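The sketch below pulls several of these practices together in one pipeline. It uses assumed collection and field names (orders, customers, orderDate, status, customerId, items) rather than a specific schema:
// Create a compound index that supports the early $match
db.orders.createIndex({ orderDate: 1, status: 1 });

db.orders.aggregate([
  // Early filtering on indexed fields
  { $match: { orderDate: { $gte: ISODate('2024-01-01') }, status: 'completed' } },
  // Project only the fields later stages need
  { $project: { customerId: 1, items: 1, orderDate: 1 } },
  // Lookup with a sub-pipeline so only required customer fields are joined
  {
    $lookup: {
      from: 'customers',
      localField: 'customerId',
      foreignField: '_id',
      as: 'customer',
      pipeline: [{ $project: { regionId: 1, _id: 0 } }]
    }
  },
  { $unwind: '$customer' },
  { $unwind: '$items' },
  {
    $group: {
      _id: '$customer.regionId',
      revenue: { $sum: { $multiply: ['$items.quantity', '$items.unitPrice'] } }
    }
  }
], { allowDiskUse: true }); // spill to disk for large group stages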
Monitoring and Optimization
Implement comprehensive performance monitoring:
- Execution Analysis: Use explain() to identify bottlenecks and inefficiencies
- Memory Tracking: Monitor memory usage patterns and disk spilling
- Index Usage: Verify optimal index utilization across pipeline stages
- Performance Metrics: Track execution times and document processing rates
- Resource Utilization: Monitor CPU, memory, and I/O during aggregations
- Benchmark Comparison: Establish performance baselines and track improvements (a small profiling sketch follows this list)
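As a starting point, the helper below combines a few of these checks. It is a minimal sketch for the Node.js driver, assuming a connected db handle and a prepared pipeline array; the function name and return shape are illustrative:
// Capture the execution plan and wall-clock timing for a pipeline
async function profilePipeline(db, collectionName, pipeline) {
  // Execution plan with per-stage statistics
  const plan = await db.collection(collectionName)
    .aggregate(pipeline, { allowDiskUse: true })
    .explain('executionStats');

  // Real execution for end-to-end latency and result volume
  const start = Date.now();
  const docs = await db.collection(collectionName)
    .aggregate(pipeline, { allowDiskUse: true })
    .toArray();

  return {
    elapsedMs: Date.now() - start,
    documentsReturned: docs.length,
    plan // inspect for IXSCAN usage, docsExamined, and disk spilling
  };
}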
Conclusion
MongoDB aggregation pipeline optimization requires a strategic approach to stage ordering, memory management, and index design. Unlike traditional SQL query optimization, which relies on automated query planners, MongoDB aggregation optimization demands an understanding of pipeline execution, data flow patterns, and resource utilization characteristics.
Key optimization benefits include:
- Predictable Performance: Optimized pipelines deliver consistent execution times regardless of data growth
- Efficient Resource Usage: Strategic memory management and disk spilling prevent resource exhaustion
- Scalable Analytics: Proper optimization enables complex analytics on large datasets
- Index Integration: Strategic indexing dramatically improves pipeline performance
- Flexible Processing: Support for complex analytical operations with optimal resource usage
Whether you're building real-time analytics platforms, business intelligence systems, or complex data transformation pipelines, MongoDB aggregation optimization with QueryLeaf's familiar SQL interface provides the foundation for high-performance analytical processing. This combination enables you to implement sophisticated analytics solutions while preserving familiar query patterns and optimization approaches.
QueryLeaf Integration: QueryLeaf automatically optimizes MongoDB aggregation pipeline execution through intelligent stage reordering, index recommendations, and memory management while providing SQL-familiar syntax for complex analytical operations. Advanced window functions, statistical calculations, and performance monitoring are seamlessly handled through familiar SQL patterns, making high-performance analytics both powerful and accessible.
The integration of sophisticated aggregation optimization with SQL-style analytics makes MongoDB an ideal platform for applications requiring both complex analytical processing and familiar database interaction patterns, ensuring your analytics solutions remain both performant and maintainable as they scale and evolve.