MongoDB Aggregation Framework for Real-Time Analytics: Advanced Data Processing Pipelines and SQL-Compatible Query Patterns
Modern applications require sophisticated data processing capabilities that can handle complex analytical queries, real-time aggregations, and advanced transformations at scale. Traditional approaches to data analytics often rely on separate ETL processes, batch processing systems, and complex data warehouses that introduce latency, complexity, and operational overhead that becomes increasingly problematic as data volumes and processing demands grow.
MongoDB's Aggregation Framework provides powerful in-database processing capabilities that enable real-time analytics, complex data transformations, and sophisticated analytical queries directly within the operational database. Unlike traditional batch-oriented analytics approaches, MongoDB aggregation pipelines process data in real-time, support complex multi-stage transformations, and integrate seamlessly with operational workloads while delivering high-performance analytical results.
The Traditional Data Analytics Limitations
Conventional relational database analytics approaches have significant constraints for modern real-time processing requirements:
-- Traditional PostgreSQL analytics - limited window functions and complex subqueries
-- Basic sales analytics with traditional SQL limitations
WITH monthly_sales_summary AS (
SELECT
DATE_TRUNC('month', order_date) as month,
product_category,
customer_id,
salesperson_id,
region,
-- Basic aggregations
COUNT(*) as order_count,
SUM(total_amount) as total_revenue,
AVG(total_amount) as avg_order_value,
MIN(total_amount) as min_order_value,
MAX(total_amount) as max_order_value,
-- Limited window function capabilities
SUM(total_amount) OVER (
PARTITION BY product_category, region
ORDER BY DATE_TRUNC('month', order_date)
RANGE BETWEEN INTERVAL '3 months' PRECEDING AND CURRENT ROW
) as rolling_3_month_revenue,
LAG(SUM(total_amount)) OVER (
PARTITION BY product_category, region
ORDER BY DATE_TRUNC('month', order_date)
) as previous_month_revenue,
-- Row number for ranking (limited functionality)
ROW_NUMBER() OVER (
PARTITION BY DATE_TRUNC('month', order_date), region
ORDER BY SUM(total_amount) DESC
) as revenue_rank_in_region
FROM orders o
LEFT JOIN order_items oi ON o.order_id = oi.order_id
LEFT JOIN products p ON oi.product_id = p.product_id
LEFT JOIN customers c ON o.customer_id = c.customer_id
LEFT JOIN salespeople s ON o.salesperson_id = s.salesperson_id
WHERE o.order_date >= CURRENT_DATE - INTERVAL '12 months'
AND o.status = 'completed'
GROUP BY
DATE_TRUNC('month', order_date),
product_category,
customer_id,
salesperson_id,
region
),
customer_segmentation AS (
SELECT
customer_id,
region,
-- Customer metrics calculation
COUNT(*) as total_orders,
SUM(total_revenue) as lifetime_revenue,
AVG(avg_order_value) as avg_order_value,
MAX(month) as last_order_month,
MIN(month) as first_order_month,
-- Recency, Frequency, Monetary calculation (limited)
EXTRACT(DAYS FROM (CURRENT_DATE - MAX(month))) as days_since_last_order,
COUNT(*) as frequency_score,
SUM(total_revenue) as monetary_score,
-- Simple percentile calculation (limited support)
PERCENT_RANK() OVER (ORDER BY SUM(total_revenue)) as revenue_percentile,
PERCENT_RANK() OVER (ORDER BY COUNT(*)) as frequency_percentile,
-- Basic customer categorization
CASE
WHEN SUM(total_revenue) > 10000 AND COUNT(*) > 10 THEN 'high_value'
WHEN SUM(total_revenue) > 5000 OR COUNT(*) > 5 THEN 'medium_value'
WHEN EXTRACT(DAYS FROM (CURRENT_DATE - MAX(month))) > 90 THEN 'at_risk'
ELSE 'low_value'
END as customer_segment,
-- Growth trend analysis (very limited)
CASE
WHEN COUNT(*) FILTER (WHERE month >= CURRENT_DATE - INTERVAL '3 months') > 0 THEN 'active'
WHEN COUNT(*) FILTER (WHERE month >= CURRENT_DATE - INTERVAL '6 months') > 0 THEN 'declining'
ELSE 'inactive'
END as activity_trend
FROM monthly_sales_summary
GROUP BY customer_id, region
),
product_performance AS (
SELECT
product_category,
region,
month,
-- Product metrics
SUM(order_count) as total_orders,
SUM(total_revenue) as category_revenue,
AVG(avg_order_value) as avg_category_order_value,
COUNT(DISTINCT customer_id) as unique_customers,
-- Market share calculation (complex with traditional SQL)
SUM(total_revenue) / (
SELECT SUM(total_revenue)
FROM monthly_sales_summary mss2
WHERE mss2.month = monthly_sales_summary.month
AND mss2.region = monthly_sales_summary.region
) * 100 as market_share_percent,
-- Growth rate calculation
SUM(total_revenue) / NULLIF(LAG(SUM(total_revenue)) OVER (
PARTITION BY product_category, region
ORDER BY month
), 0) - 1 as month_over_month_growth,
-- Seasonal analysis (limited capabilities)
AVG(SUM(total_revenue)) OVER (
PARTITION BY product_category, region, EXTRACT(MONTH FROM month)
ORDER BY month
ROWS BETWEEN 11 PRECEDING AND CURRENT ROW
) as seasonal_avg_revenue
FROM monthly_sales_summary
GROUP BY product_category, region, month
),
advanced_analytics AS (
SELECT
cs.customer_segment,
cs.region,
cs.activity_trend,
-- Customer segment analysis
COUNT(*) as customers_in_segment,
AVG(cs.lifetime_revenue) as avg_lifetime_value,
AVG(cs.total_orders) as avg_orders_per_customer,
AVG(cs.days_since_last_order) as avg_days_since_last_order,
-- Revenue contribution by segment
SUM(cs.lifetime_revenue) as segment_total_revenue,
SUM(cs.lifetime_revenue) / (
SELECT SUM(lifetime_revenue) FROM customer_segmentation
) * 100 as revenue_contribution_percent,
-- Top products for each segment (limited subquery approach)
(
SELECT product_category
FROM monthly_sales_summary mss
WHERE mss.customer_id IN (
SELECT cs2.customer_id
FROM customer_segmentation cs2
WHERE cs2.customer_segment = cs.customer_segment
AND cs2.region = cs.region
)
GROUP BY product_category
ORDER BY SUM(total_revenue) DESC
LIMIT 1
) as top_product_category,
-- Cohort analysis (very complex with traditional SQL)
COUNT(*) FILTER (
WHERE cs.first_order_month >= CURRENT_DATE - INTERVAL '1 month'
) as new_customers_this_month,
COUNT(*) FILTER (
WHERE cs.last_order_month >= CURRENT_DATE - INTERVAL '1 month'
AND cs.first_order_month < CURRENT_DATE - INTERVAL '1 month'
) as returning_customers_this_month
FROM customer_segmentation cs
GROUP BY cs.customer_segment, cs.region, cs.activity_trend
)
SELECT
customer_segment,
region,
activity_trend,
customers_in_segment,
ROUND(avg_lifetime_value::numeric, 2) as avg_lifetime_value,
ROUND(avg_orders_per_customer::numeric, 2) as avg_orders_per_customer,
ROUND(avg_days_since_last_order::numeric, 1) as avg_days_since_last_order,
ROUND(segment_total_revenue::numeric, 2) as segment_revenue,
ROUND(revenue_contribution_percent::numeric, 2) as revenue_contribution_pct,
top_product_category,
new_customers_this_month,
returning_customers_this_month,
-- Customer health score (simplified)
CASE
WHEN customer_segment = 'high_value' AND activity_trend = 'active' THEN 95
WHEN customer_segment = 'high_value' AND activity_trend = 'declining' THEN 70
WHEN customer_segment = 'medium_value' AND activity_trend = 'active' THEN 80
WHEN customer_segment = 'medium_value' AND activity_trend = 'declining' THEN 55
WHEN customer_segment = 'low_value' AND activity_trend = 'active' THEN 65
WHEN activity_trend = 'inactive' THEN 25
ELSE 40
END as customer_health_score,
-- Recommendations (limited business logic)
CASE
WHEN customer_segment = 'high_value' AND activity_trend = 'declining' THEN 'Urgent: Re-engagement campaign needed'
WHEN customer_segment = 'medium_value' AND activity_trend = 'active' THEN 'Opportunity: Upsell to premium products'
WHEN customer_segment = 'at_risk' THEN 'Action: Retention campaign required'
WHEN new_customers_this_month > returning_customers_this_month THEN 'Focus: Improve customer retention'
ELSE 'Monitor: Continue current strategy'
END as recommended_action
FROM advanced_analytics
ORDER BY
CASE customer_segment
WHEN 'high_value' THEN 1
WHEN 'medium_value' THEN 2
WHEN 'low_value' THEN 3
ELSE 4
END,
segment_revenue DESC;
-- Traditional PostgreSQL analytics problems:
-- 1. Complex multi-table JOINs required for comprehensive analysis
-- 2. Limited window function capabilities for advanced analytics
-- 3. Difficult to implement complex transformations and nested aggregations
-- 4. Poor performance with large datasets and complex calculations
-- 5. Limited support for hierarchical and nested data structures
-- 6. No built-in support for time-series analytics and forecasting
-- 7. Complex subqueries required for conditional aggregations
-- 8. Difficult to implement real-time analytics and streaming calculations
-- 9. Limited flexibility for dynamic grouping and pivot operations
-- 10. No native support for advanced statistical functions and machine learning
-- MySQL limitations are even more severe
SELECT
DATE_FORMAT(order_date, '%Y-%m') as month,
product_category,
region,
COUNT(*) as order_count,
SUM(total_amount) as revenue,
AVG(total_amount) as avg_order_value,
-- Very limited analytical capabilities
-- No window functions in older MySQL versions
-- No complex aggregation support
-- Limited JSON processing capabilities
-- Poor performance with complex queries
(SELECT SUM(total_amount)
FROM orders o2
WHERE DATE_FORMAT(o2.order_date, '%Y-%m') = DATE_FORMAT(orders.order_date, '%Y-%m')
AND o2.region = orders.region) as region_monthly_total
FROM orders
JOIN order_items ON orders.order_id = order_items.order_id
JOIN products ON order_items.product_id = products.product_id
WHERE order_date >= DATE_SUB(CURDATE(), INTERVAL 12 MONTH)
AND status = 'completed'
GROUP BY
DATE_FORMAT(order_date, '%Y-%m'),
product_category,
region
ORDER BY month DESC, revenue DESC;
-- MySQL problems:
-- - No window functions in older versions
-- - Very limited JSON support and processing
-- - Basic aggregation functions only
-- - Poor performance with complex analytical queries
-- - No support for advanced statistical calculations
-- - Limited date/time processing capabilities
-- - No native support for real-time analytics
-- - Basic subquery support with performance issues
MongoDB's Aggregation Framework provides comprehensive real-time analytics capabilities:
// MongoDB Advanced Aggregation Framework - powerful real-time analytics and data processing
const { MongoClient } = require('mongodb');
class MongoDBAnalyticsEngine {
constructor(db) {
this.db = db;
this.collections = {
orders: db.collection('orders'),
products: db.collection('products'),
customers: db.collection('customers'),
analytics: db.collection('analytics_cache')
};
this.pipelineCache = new Map();
}
async performComprehensiveAnalytics() {
console.log('Executing comprehensive real-time analytics with MongoDB Aggregation Framework...');
// Execute multiple analytical pipelines in parallel
const [
salesAnalytics,
customerSegmentation,
productPerformance,
timeSeriesAnalysis,
predictiveInsights
] = await Promise.all([
this.executeSalesAnalyticsPipeline(),
this.executeCustomerSegmentationPipeline(),
this.executeProductPerformancePipeline(),
this.executeTimeSeriesAnalytics(),
this.executePredictiveAnalytics()
]);
return {
salesAnalytics,
customerSegmentation,
productPerformance,
timeSeriesAnalysis,
predictiveInsights,
generatedAt: new Date()
};
}
async executeSalesAnalyticsPipeline() {
console.log('Executing advanced sales analytics pipeline...');
const pipeline = [
// Stage 1: Match recent completed orders
{
$match: {
status: 'completed',
orderDate: { $gte: new Date(Date.now() - 365 * 24 * 60 * 60 * 1000) }, // Last 12 months
'totals.total': { $gt: 0 }
}
},
// Stage 2: Add computed fields and date transformations
{
$addFields: {
year: { $year: '$orderDate' },
month: { $month: '$orderDate' },
dayOfYear: { $dayOfYear: '$orderDate' },
weekOfYear: { $week: '$orderDate' },
quarter: {
$ceil: { $divide: [{ $month: '$orderDate' }, 3] }
},
// Calculate order metrics
orderValue: '$totals.total',
itemCount: { $size: '$items' },
avgItemValue: {
$divide: ['$totals.total', { $size: '$items' }]
},
// Customer type classification
customerType: {
$switch: {
branches: [
{ case: { $gte: ['$totals.total', 1000] }, then: 'high_value' },
{ case: { $gte: ['$totals.total', 500] }, then: 'medium_value' },
{ case: { $gte: ['$totals.total', 100] }, then: 'regular' }
],
default: 'low_value'
}
},
// Season classification
season: {
$switch: {
branches: [
{ case: { $in: [{ $month: '$orderDate' }, [12, 1, 2]] }, then: 'winter' },
{ case: { $in: [{ $month: '$orderDate' }, [3, 4, 5]] }, then: 'spring' },
{ case: { $in: [{ $month: '$orderDate' }, [6, 7, 8]] }, then: 'summer' },
{ case: { $in: [{ $month: '$orderDate' }, [9, 10, 11]] }, then: 'fall' }
],
default: 'unknown'
}
}
}
},
// Stage 3: Lookup customer information
{
$lookup: {
from: 'customers',
localField: 'customerId',
foreignField: '_id',
as: 'customer',
pipeline: [
{
$project: {
name: 1,
email: 1,
'profile.location.country': 1,
'profile.location.region': 1,
'account.type': 1,
'account.registrationDate': 1,
'preferences.category': 1
}
}
]
}
},
// Stage 4: Unwind customer data
{ $unwind: '$customer' },
// Stage 5: Unwind order items for detailed analysis
{ $unwind: '$items' },
// Stage 6: Lookup product information
{
$lookup: {
from: 'products',
localField: 'items.productId',
foreignField: '_id',
as: 'product',
pipeline: [
{
$project: {
name: 1,
category: 1,
brand: 1,
'pricing.cost': 1,
'specifications.weight': 1,
'inventory.supplier': 1
}
}
]
}
},
// Stage 7: Unwind product data
{ $unwind: '$product' },
// Stage 8: Calculate item-level metrics
{
$addFields: {
itemRevenue: { $multiply: ['$items.quantity', '$items.unitPrice'] },
itemProfit: {
$multiply: [
'$items.quantity',
{ $subtract: ['$items.unitPrice', '$product.pricing.cost'] }
]
},
profitMargin: {
$divide: [
{ $subtract: ['$items.unitPrice', '$product.pricing.cost'] },
'$items.unitPrice'
]
}
}
},
// Stage 9: Group by multiple dimensions for comprehensive analysis
{
$group: {
_id: {
year: '$year',
month: '$month',
quarter: '$quarter',
season: '$season',
category: '$product.category',
brand: '$product.brand',
country: '$customer.profile.location.country',
region: '$customer.profile.location.region',
customerType: '$customerType',
accountType: '$customer.account.type'
},
// Order-level metrics
totalOrders: { $sum: 1 },
uniqueCustomers: { $addToSet: '$customerId' },
totalRevenue: { $sum: '$itemRevenue' },
totalProfit: { $sum: '$itemProfit' },
totalQuantity: { $sum: '$items.quantity' },
// Statistical measures
avgOrderValue: { $avg: '$orderValue' },
minOrderValue: { $min: '$orderValue' },
maxOrderValue: { $max: '$orderValue' },
stdDevOrderValue: { $stdDevPop: '$orderValue' },
// Product performance
avgProfitMargin: { $avg: '$profitMargin' },
avgItemPrice: { $avg: '$items.unitPrice' },
totalWeight: { $sum: { $multiply: ['$items.quantity', '$product.specifications.weight'] } },
// Customer insights
newCustomers: {
$sum: {
$cond: [
{ $gte: [
'$customer.account.registrationDate',
{ $dateFromParts: { year: '$year', month: '$month', day: 1 } }
]},
1, 0
]
}
},
// Supplier diversity
uniqueSuppliers: { $addToSet: '$product.inventory.supplier' },
// Sample orders for detailed analysis
sampleOrders: { $push: {
orderId: '$_id',
customerId: '$customerId',
orderValue: '$orderValue',
itemCount: '$itemCount',
orderDate: '$orderDate'
}}
}
},
// Stage 10: Calculate derived metrics
{
$addFields: {
uniqueCustomerCount: { $size: '$uniqueCustomers' },
uniqueSupplierCount: { $size: '$uniqueSuppliers' },
averageOrdersPerCustomer: {
$divide: ['$totalOrders', { $size: '$uniqueCustomers' }]
},
revenuePerCustomer: {
$divide: ['$totalRevenue', { $size: '$uniqueCustomers' }]
},
profitMarginPercent: {
$multiply: [{ $divide: ['$totalProfit', '$totalRevenue'] }, 100]
},
customerAcquisitionRate: {
$divide: ['$newCustomers', { $size: '$uniqueCustomers' }]
}
}
},
// Stage 11: Add ranking and percentile information
{
$setWindowFields: {
partitionBy: { year: '$_id.year', quarter: '$_id.quarter' },
sortBy: { totalRevenue: -1 },
output: {
revenueRank: { $rank: {} },
revenuePercentile: { $percentRank: {} },
cumulativeRevenue: { $sum: '$totalRevenue', window: { documents: ['unbounded preceding', 'current'] } },
movingAvgRevenue: { $avg: '$totalRevenue', window: { documents: [-2, 2] } }
}
}
},
// Stage 12: Calculate growth rates using window functions
{
$setWindowFields: {
partitionBy: {
category: '$_id.category',
country: '$_id.country'
},
sortBy: { year: 1, month: 1 },
output: {
previousMonthRevenue: {
$shift: { output: '$totalRevenue', by: -1 }
},
previousYearRevenue: {
$shift: { output: '$totalRevenue', by: -12 }
}
}
}
},
// Stage 13: Calculate final growth metrics
{
$addFields: {
monthOverMonthGrowth: {
$cond: [
{ $gt: ['$previousMonthRevenue', 0] },
{
$subtract: [
{ $divide: ['$totalRevenue', '$previousMonthRevenue'] },
1
]
},
null
]
},
yearOverYearGrowth: {
$cond: [
{ $gt: ['$previousYearRevenue', 0] },
{
$subtract: [
{ $divide: ['$totalRevenue', '$previousYearRevenue'] },
1
]
},
null
]
}
}
},
// Stage 14: Add performance indicators
{
$addFields: {
performanceIndicator: {
$switch: {
branches: [
{
case: { $and: [
{ $gt: ['$monthOverMonthGrowth', 0.1] },
{ $gt: ['$profitMarginPercent', 20] }
]},
then: 'excellent'
},
{
case: { $and: [
{ $gt: ['$monthOverMonthGrowth', 0.05] },
{ $gt: ['$profitMarginPercent', 15] }
]},
then: 'good'
},
{
case: { $or: [
{ $lt: ['$monthOverMonthGrowth', -0.1] },
{ $lt: ['$profitMarginPercent', 5] }
]},
then: 'concerning'
}
],
default: 'average'
}
},
// Business recommendations
recommendation: {
$switch: {
branches: [
{
case: { $lt: ['$monthOverMonthGrowth', -0.2] },
then: 'Urgent: Investigate revenue decline and implement recovery strategy'
},
{
case: { $lt: ['$profitMarginPercent', 5] },
then: 'Action: Review pricing strategy and cost structure'
},
{
case: { $and: [
{ $gt: ['$monthOverMonthGrowth', 0.15] },
{ $gt: ['$revenuePercentile', 0.8] }
]},
then: 'Opportunity: Scale successful strategies and increase investment'
},
{
case: { $lt: ['$customerAcquisitionRate', 0.1] },
then: 'Focus: Improve customer acquisition and marketing effectiveness'
}
],
default: 'Monitor: Continue current strategies with minor optimizations'
}
}
}
},
// Stage 15: Sort by strategic importance
{
$sort: {
'totalRevenue': -1,
'profitMarginPercent': -1,
'_id.year': -1,
'_id.month': -1
}
},
// Stage 16: Limit to top performing segments for detailed analysis
{ $limit: 100 }
];
const results = await this.collections.orders.aggregate(pipeline).toArray();
console.log(`Sales analytics completed: ${results.length} segments analyzed`);
return results;
}
async executeCustomerSegmentationPipeline() {
console.log('Executing advanced customer segmentation pipeline...');
const pipeline = [
// Stage 1: Match active customers with orders
{
$match: {
'account.status': 'active',
'account.createdAt': { $gte: new Date(Date.now() - 730 * 24 * 60 * 60 * 1000) } // Last 2 years
}
},
// Stage 2: Lookup customer orders
{
$lookup: {
from: 'orders',
localField: '_id',
foreignField: 'customerId',
as: 'orders',
pipeline: [
{
$match: {
status: 'completed',
orderDate: { $gte: new Date(Date.now() - 365 * 24 * 60 * 60 * 1000) }
}
},
{
$project: {
orderDate: 1,
'totals.total': 1,
'totals.currency': 1,
items: 1
}
}
]
}
},
// Stage 3: Calculate RFM metrics (Recency, Frequency, Monetary)
{
$addFields: {
// Recency: Days since last order
recency: {
$cond: [
{ $gt: [{ $size: '$orders' }, 0] },
{
$divide: [
{ $subtract: [new Date(), { $max: '$orders.orderDate' }] },
1000 * 60 * 60 * 24 // Convert to days
]
},
999 // Default high recency for customers with no orders
]
},
// Frequency: Number of orders
frequency: { $size: '$orders' },
// Monetary: Total spending
monetary: {
$reduce: {
input: '$orders',
initialValue: 0,
in: { $add: ['$$value', '$$this.totals.total'] }
}
},
// Additional customer metrics
avgOrderValue: {
$cond: [
{ $gt: [{ $size: '$orders' }, 0] },
{
$divide: [
{
$reduce: {
input: '$orders',
initialValue: 0,
in: { $add: ['$$value', '$$this.totals.total'] }
}
},
{ $size: '$orders' }
]
},
0
]
},
firstOrderDate: { $min: '$orders.orderDate' },
lastOrderDate: { $max: '$orders.orderDate' },
// Calculate customer lifetime (days)
customerLifetime: {
$cond: [
{ $gt: [{ $size: '$orders' }, 0] },
{
$divide: [
{ $subtract: [{ $max: '$orders.orderDate' }, { $min: '$orders.orderDate' }] },
1000 * 60 * 60 * 24
]
},
0
]
}
}
},
// Stage 4: Calculate RFM scores using percentile ranking
{
$setWindowFields: {
sortBy: { recency: 1 }, // Lower recency is better (more recent)
output: {
recencyScore: {
$percentRank: {}
}
}
}
},
{
$setWindowFields: {
sortBy: { frequency: -1 }, // Higher frequency is better
output: {
frequencyScore: {
$percentRank: {}
}
}
}
},
{
$setWindowFields: {
sortBy: { monetary: -1 }, // Higher monetary is better
output: {
monetaryScore: {
$percentRank: {}
}
}
}
},
// Stage 5: Create RFM segments
{
$addFields: {
// Convert percentile scores to 1-5 scale
recencyBucket: {
$ceil: { $multiply: [{ $subtract: [1, '$recencyScore'] }, 5] }
},
frequencyBucket: {
$ceil: { $multiply: ['$frequencyScore', 5] }
},
monetaryBucket: {
$ceil: { $multiply: ['$monetaryScore', 5] }
}
}
},
// Stage 6: Create customer segments based on RFM
{
$addFields: {
rfmScore: {
$concat: [
{ $toString: '$recencyBucket' },
{ $toString: '$frequencyBucket' },
{ $toString: '$monetaryBucket' }
]
},
customerSegment: {
$switch: {
branches: [
// Champions: High value, bought recently, buy often
{
case: { $and: [
{ $gte: ['$recencyBucket', 4] },
{ $gte: ['$frequencyBucket', 4] },
{ $gte: ['$monetaryBucket', 4] }
]},
then: 'champions'
},
// Loyal customers: High frequency and monetary, but not recent
{
case: { $and: [
{ $gte: ['$frequencyBucket', 4] },
{ $gte: ['$monetaryBucket', 4] }
]},
then: 'loyal_customers'
},
// Potential loyalists: Recent customers with good frequency
{
case: { $and: [
{ $gte: ['$recencyBucket', 4] },
{ $gte: ['$frequencyBucket', 3] }
]},
then: 'potential_loyalists'
},
// New customers: Recent but low frequency/monetary
{
case: { $and: [
{ $gte: ['$recencyBucket', 4] },
{ $lte: ['$frequencyBucket', 2] }
]},
then: 'new_customers'
},
// Promising: Recent moderate spenders
{
case: { $and: [
{ $gte: ['$recencyBucket', 3] },
{ $gte: ['$monetaryBucket', 3] }
]},
then: 'promising'
},
// Need attention: Recent low spenders
{
case: { $and: [
{ $gte: ['$recencyBucket', 3] },
{ $lte: ['$monetaryBucket', 2] }
]},
then: 'need_attention'
},
// About to sleep: Low recency but good historical value
{
case: { $and: [
{ $lte: ['$recencyBucket', 2] },
{ $gte: ['$monetaryBucket', 3] }
]},
then: 'about_to_sleep'
},
// At risk: Low recency and frequency but good monetary
{
case: { $and: [
{ $lte: ['$recencyBucket', 2] },
{ $lte: ['$frequencyBucket', 2] },
{ $gte: ['$monetaryBucket', 3] }
]},
then: 'at_risk'
},
// Cannot lose: Very low recency but high monetary
{
case: { $and: [
{ $eq: ['$recencyBucket', 1] },
{ $gte: ['$monetaryBucket', 4] }
]},
then: 'cannot_lose'
},
// Hibernating: Low across all dimensions
{
case: { $and: [
{ $lte: ['$recencyBucket', 2] },
{ $lte: ['$frequencyBucket', 2] },
{ $lte: ['$monetaryBucket', 2] }
]},
then: 'hibernating'
}
],
default: 'others'
}
},
// Calculate customer lifetime value
customerLifetimeValue: {
$multiply: [
'$avgOrderValue',
{ $divide: ['$frequency', { $max: [1, { $divide: ['$customerLifetime', 365] }] }] }, // Orders per year
3 // Projected future years
]
},
// Churn risk assessment
churnRisk: {
$switch: {
branches: [
{ case: { $gte: ['$recency', 180] }, then: 'high' },
{ case: { $gte: ['$recency', 90] }, then: 'medium' },
{ case: { $gte: ['$recency', 30] }, then: 'low' }
],
default: 'very_low'
}
}
}
},
// Stage 7: Enrich with customer profile data
{
$addFields: {
profileCompleteness: {
$divide: [
{
$add: [
{ $cond: [{ $ne: ['$profile.firstName', null] }, 1, 0] },
{ $cond: [{ $ne: ['$profile.lastName', null] }, 1, 0] },
{ $cond: [{ $ne: ['$profile.phone', null] }, 1, 0] },
{ $cond: [{ $ne: ['$profile.location', null] }, 1, 0] },
{ $cond: [{ $ne: ['$profile.dateOfBirth', null] }, 1, 0] },
{ $cond: [{ $ne: ['$preferences', null] }, 1, 0] }
]
},
6
]
},
engagementLevel: {
$switch: {
branches: [
{
case: { $and: [
{ $gte: ['$frequency', 10] },
{ $lte: ['$recency', 30] }
]},
then: 'highly_engaged'
},
{
case: { $and: [
{ $gte: ['$frequency', 5] },
{ $lte: ['$recency', 60] }
]},
then: 'moderately_engaged'
},
{
case: { $and: [
{ $gte: ['$frequency', 2] },
{ $lte: ['$recency', 120] }
]},
then: 'lightly_engaged'
}
],
default: 'disengaged'
}
}
}
},
// Stage 8: Create final customer analysis
{
$project: {
_id: 1,
email: 1,
'profile.firstName': 1,
'profile.lastName': 1,
'profile.location.country': 1,
'profile.location.region': 1,
'account.type': 1,
'account.createdAt': 1,
// RFM Analysis
recency: { $round: ['$recency', 1] },
frequency: 1,
monetary: { $round: ['$monetary', 2] },
rfmScore: 1,
recencyBucket: 1,
frequencyBucket: 1,
monetaryBucket: 1,
// Customer Classification
customerSegment: 1,
churnRisk: 1,
engagementLevel: 1,
// Business Metrics
avgOrderValue: { $round: ['$avgOrderValue', 2] },
customerLifetimeValue: { $round: ['$customerLifetimeValue', 2] },
customerLifetime: { $round: ['$customerLifetime', 0] },
profileCompleteness: { $round: [{ $multiply: ['$profileCompleteness', 100] }, 1] },
// Timeline
firstOrderDate: 1,
lastOrderDate: 1,
// Marketing recommendations
marketingAction: {
$switch: {
branches: [
{ case: { $eq: ['$customerSegment', 'champions'] }, then: 'Reward and advocate program' },
{ case: { $eq: ['$customerSegment', 'loyal_customers'] }, then: 'Upsell and cross-sell premium products' },
{ case: { $eq: ['$customerSegment', 'potential_loyalists'] }, then: 'Loyalty program enrollment' },
{ case: { $eq: ['$customerSegment', 'new_customers'] }, then: 'Onboarding and education campaign' },
{ case: { $eq: ['$customerSegment', 'promising'] }, then: 'Targeted promotions and engagement' },
{ case: { $eq: ['$customerSegment', 'need_attention'] }, then: 'Value demonstration and support' },
{ case: { $eq: ['$customerSegment', 'about_to_sleep'] }, then: 'Re-engagement campaign with incentives' },
{ case: { $eq: ['$customerSegment', 'at_risk'] }, then: 'Urgent retention program' },
{ case: { $eq: ['$customerSegment', 'cannot_lose'] }, then: 'Win-back campaign with premium offers' },
{ case: { $eq: ['$customerSegment', 'hibernating'] }, then: 'Reactivation with significant discount' }
],
default: 'Monitor and nurture'
}
}
}
},
// Stage 9: Sort by customer value and risk
{
$sort: {
customerLifetimeValue: -1,
recency: 1,
frequency: -1
}
}
];
const results = await this.collections.customers.aggregate(pipeline).toArray();
console.log(`Customer segmentation completed: ${results.length} customers analyzed`);
return results;
}
async executeProductPerformancePipeline() {
console.log('Executing product performance analysis pipeline...');
const pipeline = [
// Stage 1: Match products with sales data
{
$lookup: {
from: 'orders',
let: { productId: '$_id' },
pipeline: [
{
$match: {
status: 'completed',
orderDate: { $gte: new Date(Date.now() - 365 * 24 * 60 * 60 * 1000) }
}
},
{ $unwind: '$items' },
{
$match: {
$expr: { $eq: ['$items.productId', '$$productId'] }
}
},
{
$project: {
orderDate: 1,
customerId: 1,
quantity: '$items.quantity',
unitPrice: '$items.unitPrice',
totalPrice: '$items.totalPrice',
'customer.location.country': 1
}
}
],
as: 'sales'
}
},
// Stage 2: Calculate comprehensive product metrics
{
$addFields: {
// Sales volume metrics
totalUnitsSold: {
$reduce: {
input: '$sales',
initialValue: 0,
in: { $add: ['$$value', '$$this.quantity'] }
}
},
totalRevenue: {
$reduce: {
input: '$sales',
initialValue: 0,
in: { $add: ['$$value', '$$this.totalPrice'] }
}
},
totalOrders: { $size: '$sales' },
uniqueCustomers: { $size: { $setUnion: [{ $map: { input: '$sales', as: 'sale', in: '$$sale.customerId' } }, []] } },
// Pricing analysis
avgSellingPrice: {
$cond: [
{ $gt: [{ $size: '$sales' }, 0] },
{
$divide: [
{
$reduce: {
input: '$sales',
initialValue: 0,
in: { $add: ['$$value', '$$this.unitPrice'] }
}
},
{ $size: '$sales' }
]
},
0
]
},
// Profit analysis
totalProfit: {
$reduce: {
input: '$sales',
initialValue: 0,
in: {
$add: [
'$$value',
{
$multiply: [
'$$this.quantity',
{ $subtract: ['$$this.unitPrice', '$pricing.cost'] }
]
}
]
}
}
},
// Time-based analysis
firstSaleDate: { $min: '$sales.orderDate' },
lastSaleDate: { $max: '$sales.orderDate' },
// Calculate monthly sales trend
monthlySales: {
$map: {
input: { $range: [0, 12] },
as: 'monthOffset',
in: {
month: {
$dateFromParts: {
year: { $year: { $dateSubtract: { startDate: new Date(), unit: 'month', amount: '$$monthOffset' } } },
month: { $month: { $dateSubtract: { startDate: new Date(), unit: 'month', amount: '$$monthOffset' } } },
day: 1
}
},
sales: {
$reduce: {
input: {
$filter: {
input: '$sales',
cond: {
$and: [
{ $gte: ['$$this.orderDate', { $dateSubtract: { startDate: new Date(), unit: 'month', amount: { $add: ['$$monthOffset', 1] } } }] },
{ $lt: ['$$this.orderDate', { $dateSubtract: { startDate: new Date(), unit: 'month', amount: '$$monthOffset' } }] }
]
}
}
},
initialValue: 0,
in: { $add: ['$$value', '$$this.totalPrice'] }
}
}
}
}
}
}
},
// Stage 3: Calculate performance indicators
{
$addFields: {
// Performance ratios
profitMargin: {
$cond: [
{ $gt: ['$totalRevenue', 0] },
{ $divide: ['$totalProfit', '$totalRevenue'] },
0
]
},
revenuePerCustomer: {
$cond: [
{ $gt: ['$uniqueCustomers', 0] },
{ $divide: ['$totalRevenue', '$uniqueCustomers'] },
0
]
},
avgOrderValue: {
$cond: [
{ $gt: ['$totalOrders', 0] },
{ $divide: ['$totalRevenue', '$totalOrders'] },
0
]
},
// Inventory turnover (simplified)
inventoryTurnover: {
$cond: [
{ $gt: ['$inventory.quantity', 0] },
{ $divide: ['$totalUnitsSold', '$inventory.quantity'] },
0
]
},
// Product lifecycle stage
lifecycleStage: {
$switch: {
branches: [
{
case: {
$lt: [
'$firstSaleDate',
{ $dateSubtract: { startDate: new Date(), unit: 'day', amount: 90 } }
]
},
then: 'new'
},
{
case: { $and: [
{ $gt: ['$totalRevenue', 10000] },
{ $gt: ['$profitMargin', 0.2] }
]},
then: 'growth'
},
{
case: { $and: [
{ $gt: ['$totalRevenue', 50000] },
{ $gte: ['$profitMargin', 0.15] }
]},
then: 'maturity'
},
{
case: { $or: [
{ $lt: ['$profitMargin', 0.1] },
{ $lt: [
'$lastSaleDate',
{ $dateSubtract: { startDate: new Date(), unit: 'day', amount: 60 } }
]}
]},
then: 'decline'
}
],
default: 'development'
}
},
// Sales trend analysis
salesTrend: {
$let: {
vars: {
recentSales: { $slice: ['$monthlySales.sales', 0, 6] },
olderSales: { $slice: ['$monthlySales.sales', 6, 6] }
},
in: {
$cond: [
{ $and: [
{ $gt: [{ $avg: '$$recentSales' }, { $avg: '$$olderSales' }] },
{ $gt: [{ $avg: '$$recentSales' }, 0] }
]},
'growing',
{
$cond: [
{ $lt: [{ $avg: '$$recentSales' }, { $multiply: [{ $avg: '$$olderSales' }, 0.8] }] },
'declining',
'stable'
]
}
]
}
}
}
}
},
// Stage 4: Add competitive analysis using window functions
{
$setWindowFields: {
partitionBy: '$category',
sortBy: { totalRevenue: -1 },
output: {
categoryRank: { $rank: {} },
categoryPercentile: { $percentRank: {} },
marketShareInCategory: {
$divide: [
'$totalRevenue',
{ $sum: '$totalRevenue', window: { documents: ['unbounded preceding', 'unbounded following'] } }
]
}
}
}
},
// Stage 5: Calculate final performance scores
{
$addFields: {
// Overall performance score (0-100)
performanceScore: {
$multiply: [
{
$add: [
{ $multiply: ['$categoryPercentile', 0.3] }, // Market position
{ $multiply: [{ $min: ['$profitMargin', 0.5] }, 0.25] }, // Profitability (capped at 50%)
{ $multiply: [{ $divide: [{ $min: ['$inventoryTurnover', 10] }, 10] }, 0.2] }, // Efficiency (capped at 10x)
{
$multiply: [
{
$switch: {
branches: [
{ case: { $eq: ['$salesTrend', 'growing'] }, then: 1 },
{ case: { $eq: ['$salesTrend', 'stable'] }, then: 0.7 },
{ case: { $eq: ['$salesTrend', 'declining'] }, then: 0.3 }
],
default: 0.5
}
},
0.25
]
} // Growth trend
]
},
100
]
},
// Strategic recommendations
strategicRecommendation: {
$switch: {
branches: [
{
case: { $and: [
{ $eq: ['$salesTrend', 'growing'] },
{ $gt: ['$profitMargin', 0.25] },
{ $lt: ['$categoryRank', 5] }
]},
then: 'Star Product: Increase investment and marketing focus'
},
{
case: { $and: [
{ $eq: ['$lifecycleStage', 'maturity'] },
{ $gt: ['$profitMargin', 0.2] }
]},
then: 'Cash Cow: Optimize operations and maintain market share'
},
{
case: { $and: [
{ $eq: ['$salesTrend', 'growing'] },
{ $lt: ['$profitMargin', 0.15] }
]},
then: 'Question Mark: Improve margins or consider repositioning'
},
{
case: { $and: [
{ $eq: ['$salesTrend', 'declining'] },
{ $lt: ['$profitMargin', 0.1] }
]},
then: 'Dog: Consider discontinuation or major repositioning'
},
{
case: { $eq: ['$lifecycleStage', 'new'] },
then: 'Monitor closely and provide marketing support'
}
],
default: 'Maintain current strategy with regular monitoring'
}
}
}
},
// Stage 6: Final projection and sorting
{
$project: {
_id: 1,
name: 1,
category: 1,
brand: 1,
'pricing.cost': 1,
'pricing.retail': 1,
// Sales performance
totalUnitsSold: 1,
totalRevenue: { $round: ['$totalRevenue', 2] },
totalProfit: { $round: ['$totalProfit', 2] },
totalOrders: 1,
uniqueCustomers: 1,
// Financial metrics
avgSellingPrice: { $round: ['$avgSellingPrice', 2] },
profitMargin: { $round: [{ $multiply: ['$profitMargin', 100] }, 2] },
revenuePerCustomer: { $round: ['$revenuePerCustomer', 2] },
avgOrderValue: { $round: ['$avgOrderValue', 2] },
// Performance indicators
performanceScore: { $round: ['$performanceScore', 1] },
lifecycleStage: 1,
salesTrend: 1,
categoryRank: 1,
marketShareInCategory: { $round: [{ $multiply: ['$marketShareInCategory', 100] }, 3] },
// Operational metrics
inventoryTurnover: { $round: ['$inventoryTurnover', 2] },
'inventory.quantity': 1,
'inventory.lowStockThreshold': 1,
// Timeline
firstSaleDate: 1,
lastSaleDate: 1,
// Strategic guidance
strategicRecommendation: 1,
// Monthly trend data (last 6 months)
recentMonthlySales: { $slice: ['$monthlySales', 0, 6] }
}
},
// Stage 7: Sort by performance score and revenue
{
$sort: {
performanceScore: -1,
totalRevenue: -1
}
}
];
const results = await this.collections.products.aggregate(pipeline).toArray();
console.log(`Product performance analysis completed: ${results.length} products analyzed`);
return results;
}
async executeTimeSeriesAnalytics() {
console.log('Executing time-series analytics with advanced forecasting...');
const pipeline = [
// Stage 1: Match recent orders for time-series analysis
{
$match: {
status: 'completed',
orderDate: { $gte: new Date(Date.now() - 730 * 24 * 60 * 60 * 1000) } // Last 2 years
}
},
// Stage 2: Group by time periods
{
$group: {
_id: {
year: { $year: '$orderDate' },
month: { $month: '$orderDate' },
week: { $week: '$orderDate' },
dayOfWeek: { $dayOfWeek: '$orderDate' },
hour: { $hour: '$orderDate' }
},
// Core metrics
orderCount: { $sum: 1 },
totalRevenue: { $sum: '$totals.total' },
avgOrderValue: { $avg: '$totals.total' },
uniqueCustomers: { $addToSet: '$customerId' },
// Item-level aggregations
totalItemsSold: {
$sum: {
$reduce: {
input: '$items',
initialValue: 0,
in: { $add: ['$$value', '$$this.quantity'] }
}
}
},
// Distribution analysis
orderValues: { $push: '$totals.total' },
// Customer behavior
newCustomers: {
$sum: {
$cond: [
{ $eq: [{ $dayOfYear: '$orderDate' }, { $dayOfYear: '$customer.account.createdAt' }] },
1,
0
]
}
},
// Geographic distribution
countries: { $addToSet: '$shippingAddress.country' },
regions: { $addToSet: '$shippingAddress.region' }
}
},
// Stage 3: Add time-based calculations
{
$addFields: {
// Convert _id to more usable date format
date: {
$dateFromParts: {
year: '$_id.year',
month: '$_id.month',
day: 1
}
},
uniqueCustomerCount: { $size: '$uniqueCustomers' },
uniqueCountryCount: { $size: '$countries' },
// Statistical measures
revenueStdDev: { $stdDevPop: '$orderValues' },
medianOrderValue: {
$let: {
vars: {
sortedValues: {
$sortArray: {
input: '$orderValues',
sortBy: 1
}
}
},
in: {
$arrayElemAt: [
'$$sortedValues',
{ $floor: { $divide: [{ $size: '$$sortedValues' }, 2] } }
]
}
}
},
// Time period classifications
periodType: {
$switch: {
branches: [
{ case: { $gte: ['$_id.dayOfWeek', 2] }, then: 'weekend' },
{ case: { $lte: ['$_id.dayOfWeek', 6] }, then: 'weekday' }
],
default: 'weekend'
}
},
timeOfDay: {
$switch: {
branches: [
{ case: { $lt: ['$_id.hour', 6] }, then: 'late_night' },
{ case: { $lt: ['$_id.hour', 12] }, then: 'morning' },
{ case: { $lt: ['$_id.hour', 18] }, then: 'afternoon' },
{ case: { $lt: ['$_id.hour', 22] }, then: 'evening' }
],
default: 'night'
}
}
}
},
// Stage 4: Add moving averages and trend analysis
{
$setWindowFields: {
partitionBy: null,
sortBy: { date: 1 },
output: {
// Moving averages
revenue7DayMA: {
$avg: '$totalRevenue',
window: { documents: [-6, 0] }
},
revenue30DayMA: {
$avg: '$totalRevenue',
window: { documents: [-29, 0] }
},
// Growth calculations
previousDayRevenue: {
$shift: { output: '$totalRevenue', by: -1 }
},
previousWeekRevenue: {
$shift: { output: '$totalRevenue', by: -7 }
},
previousMonthRevenue: {
$shift: { output: '$totalRevenue', by: -30 }
},
// Volatility measures
revenueVolatility: {
$stdDevPop: '$totalRevenue',
window: { documents: [-29, 0] }
},
// Trend strength
trendLine: {
$linearFill: '$totalRevenue'
}
}
}
},
// Stage 5: Calculate growth rates and trend indicators
{
$addFields: {
dayOverDayGrowth: {
$cond: [
{ $gt: ['$previousDayRevenue', 0] },
{ $subtract: [{ $divide: ['$totalRevenue', '$previousDayRevenue'] }, 1] },
null
]
},
weekOverWeekGrowth: {
$cond: [
{ $gt: ['$previousWeekRevenue', 0] },
{ $subtract: [{ $divide: ['$totalRevenue', '$previousWeekRevenue'] }, 1] },
null
]
},
monthOverMonthGrowth: {
$cond: [
{ $gt: ['$previousMonthRevenue', 0] },
{ $subtract: [{ $divide: ['$totalRevenue', '$previousMonthRevenue'] }, 1] },
null
]
},
// Trend classification
trendDirection: {
$switch: {
branches: [
{
case: { $gt: ['$revenue7DayMA', { $multiply: ['$revenue30DayMA', 1.05] }] },
then: 'strong_upward'
},
{
case: { $gt: ['$revenue7DayMA', { $multiply: ['$revenue30DayMA', 1.02] }] },
then: 'upward'
},
{
case: { $lt: ['$revenue7DayMA', { $multiply: ['$revenue30DayMA', 0.95] }] },
then: 'strong_downward'
},
{
case: { $lt: ['$revenue7DayMA', { $multiply: ['$revenue30DayMA', 0.98] }] },
then: 'downward'
}
],
default: 'stable'
}
},
// Seasonality detection
seasonalityScore: {
$divide: [
'$revenueStdDev',
{ $max: ['$revenue30DayMA', 1] }
]
},
// Performance classification
performanceCategory: {
$switch: {
branches: [
{
case: { $gte: ['$totalRevenue', { $multiply: ['$revenue30DayMA', 1.2] }] },
then: 'exceptional'
},
{
case: { $gte: ['$totalRevenue', { $multiply: ['$revenue30DayMA', 1.1] }] },
then: 'above_average'
},
{
case: { $lte: ['$totalRevenue', { $multiply: ['$revenue30DayMA', 0.8] }] },
then: 'below_average'
},
{
case: { $lte: ['$totalRevenue', { $multiply: ['$revenue30DayMA', 0.9] }] },
then: 'poor'
}
],
default: 'average'
}
}
}
},
// Stage 6: Add forecasting indicators
{
$addFields: {
// Simple linear trend projection (next 7 days)
next7DayForecast: {
$add: [
'$revenue7DayMA',
{
$multiply: [
7,
{ $subtract: ['$revenue7DayMA', { $shift: { output: '$revenue7DayMA', by: -7 } }] }
]
}
]
},
// Confidence interval for forecast
forecastConfidence: {
$subtract: [
100,
{ $multiply: [{ $divide: ['$revenueVolatility', '$revenue7DayMA'] }, 100] }
]
},
// Anomaly detection
isAnomaly: {
$or: [
{ $gt: ['$totalRevenue', { $add: ['$revenue7DayMA', { $multiply: ['$revenueVolatility', 2] }] }] },
{ $lt: ['$totalRevenue', { $subtract: ['$revenue7DayMA', { $multiply: ['$revenueVolatility', 2] }] }] }
]
},
// Business recommendations
recommendation: {
$switch: {
branches: [
{
case: { $eq: ['$trendDirection', 'strong_downward'] },
then: 'Urgent: Investigate revenue decline and implement recovery strategies'
},
{
case: { $eq: ['$trendDirection', 'strong_upward'] },
then: 'Opportunity: Scale successful initiatives and increase capacity'
},
{
case: { $gt: ['$seasonalityScore', 0.5] },
then: 'High volatility detected: Implement demand smoothing strategies'
},
{
case: { $eq: ['$performanceCategory', 'exceptional'] },
then: 'Analyze success factors for replication'
}
],
default: 'Continue monitoring with current strategy'
}
}
}
},
// Stage 7: Final projection and filtering
{
$project: {
date: 1,
year: '$_id.year',
month: '$_id.month',
week: '$_id.week',
dayOfWeek: '$_id.dayOfWeek',
hour: '$_id.hour',
// Core metrics
orderCount: 1,
totalRevenue: { $round: ['$totalRevenue', 2] },
avgOrderValue: { $round: ['$avgOrderValue', 2] },
uniqueCustomerCount: 1,
totalItemsSold: 1,
// Statistical measures
medianOrderValue: { $round: ['$medianOrderValue', 2] },
revenueStdDev: { $round: ['$revenueStdDev', 2] },
// Trend analysis
revenue7DayMA: { $round: ['$revenue7DayMA', 2] },
revenue30DayMA: { $round: ['$revenue30DayMA', 2] },
dayOverDayGrowth: { $round: [{ $multiply: ['$dayOverDayGrowth', 100] }, 2] },
weekOverWeekGrowth: { $round: [{ $multiply: ['$weekOverWeekGrowth', 100] }, 2] },
monthOverMonthGrowth: { $round: [{ $multiply: ['$monthOverMonthGrowth', 100] }, 2] },
trendDirection: 1,
performanceCategory: 1,
seasonalityScore: { $round: ['$seasonalityScore', 3] },
// Forecasting
next7DayForecast: { $round: ['$next7DayForecast', 2] },
forecastConfidence: { $round: ['$forecastConfidence', 1] },
isAnomaly: 1,
// Context
periodType: 1,
timeOfDay: 1,
uniqueCountryCount: 1,
// Business intelligence
recommendation: 1
}
},
// Stage 8: Sort by date descending
{
$sort: { date: -1 }
},
// Stage 9: Limit to recent data for performance
{
$limit: 365 // Last year of daily data
}
];
const results = await this.collections.orders.aggregate(pipeline).toArray();
console.log(`Time-series analytics completed: ${results.length} time periods analyzed`);
return results;
}
async executePredictiveAnalytics() {
console.log('Executing predictive analytics and machine learning insights...');
const pipeline = [
// Stage 1: Create customer behavioral features
{
$match: {
'account.status': 'active'
}
},
// Stage 2: Lookup order history
{
$lookup: {
from: 'orders',
localField: '_id',
foreignField: 'customerId',
as: 'orders',
pipeline: [
{
$match: {
status: 'completed',
orderDate: { $gte: new Date(Date.now() - 365 * 24 * 60 * 60 * 1000) }
}
},
{
$project: {
orderDate: 1,
'totals.total': 1,
daysSinceRegistration: {
$divide: [
{ $subtract: ['$orderDate', '$customer.account.createdAt'] },
1000 * 60 * 60 * 24
]
}
}
},
{ $sort: { orderDate: 1 } }
]
}
},
// Stage 3: Calculate predictive features
{
$addFields: {
// Temporal features
daysSinceRegistration: {
$divide: [
{ $subtract: [new Date(), '$account.createdAt'] },
1000 * 60 * 60 * 24
]
},
daysSinceLastOrder: {
$cond: [
{ $gt: [{ $size: '$orders' }, 0] },
{
$divide: [
{ $subtract: [new Date(), { $max: '$orders.orderDate' }] },
1000 * 60 * 60 * 24
]
},
999
]
},
// Purchase behavior features
totalOrders: { $size: '$orders' },
totalSpent: {
$reduce: {
input: '$orders',
initialValue: 0,
in: { $add: ['$$value', '$$this.totals.total'] }
}
},
// Purchase frequency and regularity
avgDaysBetweenOrders: {
$cond: [
{ $gt: [{ $size: '$orders' }, 1] },
{
$divide: [
{
$divide: [
{ $subtract: [{ $max: '$orders.orderDate' }, { $min: '$orders.orderDate' }] },
1000 * 60 * 60 * 24
]
},
{ $subtract: [{ $size: '$orders' }, 1] }
]
},
null
]
},
// Purchase pattern analysis
orderFrequencyTrend: {
$let: {
vars: {
recentOrders: {
$size: {
$filter: {
input: '$orders',
cond: { $gte: ['$$this.orderDate', { $dateSubtract: { startDate: new Date(), unit: 'day', amount: 90 } }] }
}
}
},
olderOrders: {
$size: {
$filter: {
input: '$orders',
cond: {
$and: [
{ $lt: ['$$this.orderDate', { $dateSubtract: { startDate: new Date(), unit: 'day', amount: 90 } }] },
{ $gte: ['$$this.orderDate', { $dateSubtract: { startDate: new Date(), unit: 'day', amount: 180 } }] }
]
}
}
}
}
},
in: {
$cond: [
{ $gt: ['$$olderOrders', 0] },
{ $subtract: [{ $divide: ['$$recentOrders', 90] }, { $divide: ['$$olderOrders', 90] }] },
0
]
}
}
}
}
},
// Stage 4: Calculate churn probability using logistic regression approximation
{
$addFields: {
// Feature normalization and scoring
recencyScore: {
$cond: [
{ $gt: ['$daysSinceLastOrder', 180] }, 0.8,
{ $cond: [
{ $gt: ['$daysSinceLastOrder', 90] }, 0.6,
{ $cond: [
{ $gt: ['$daysSinceLastOrder', 30] }, 0.3,
0.1
]}
]}
]
},
frequencyScore: {
$cond: [
{ $lt: ['$totalOrders', 2] }, 0.7,
{ $cond: [
{ $lt: ['$totalOrders', 5] }, 0.5,
{ $cond: [
{ $lt: ['$totalOrders', 10] }, 0.3,
0.1
]}
]}
]
},
monetaryScore: {
$cond: [
{ $lt: ['$totalSpent', 100] }, 0.6,
{ $cond: [
{ $lt: ['$totalSpent', 500] }, 0.4,
{ $cond: [
{ $lt: ['$totalSpent', 1000] }, 0.2,
0.1
]}
]}
]
},
engagementScore: {
$cond: [
{ $lt: ['$orderFrequencyTrend', -0.5] }, 0.8,
{ $cond: [
{ $lt: ['$orderFrequencyTrend', 0] }, 0.6,
{ $cond: [
{ $gt: ['$orderFrequencyTrend', 0.5] }, 0.1,
0.3
]}
]}
]
}
}
},
// Stage 5: Calculate composite churn probability
{
$addFields: {
churnProbability: {
$multiply: [
{
$add: [
{ $multiply: ['$recencyScore', 0.35] },
{ $multiply: ['$frequencyScore', 0.25] },
{ $multiply: ['$monetaryScore', 0.25] },
{ $multiply: ['$engagementScore', 0.15] }
]
},
100
]
},
// Customer lifetime value prediction
predictedLifetimeValue: {
$cond: [
{ $and: [
{ $gt: ['$totalOrders', 0] },
{ $gt: ['$avgDaysBetweenOrders', 0] }
]},
{
$multiply: [
{ $divide: ['$totalSpent', '$totalOrders'] }, // Average order value
{ $divide: [365, '$avgDaysBetweenOrders'] }, // Orders per year
{ $subtract: [5, { $multiply: ['$churnProbability', 0.05] }] } // Expected years (adjusted for churn risk)
]
},
'$totalSpent'
]
},
// Next purchase prediction
nextPurchasePrediction: {
$cond: [
{ $gt: ['$avgDaysBetweenOrders', 0] },
{
$dateAdd: {
startDate: { $max: '$orders.orderDate' },
unit: 'day',
amount: {
$multiply: [
'$avgDaysBetweenOrders',
{ $add: [1, { $multiply: ['$churnProbability', 0.01] }] } // Adjust for churn risk
]
}
}
},
null
]
},
// Upselling opportunity score
upsellOpportunity: {
$multiply: [
{
$add: [
{ $cond: [{ $gt: ['$totalOrders', 5] }, 0.3, 0] },
{ $cond: [{ $gt: ['$totalSpent', 500] }, 0.3, 0] },
{ $cond: [{ $lt: ['$daysSinceLastOrder', 30] }, 0.25, 0] },
{ $cond: [{ $gt: ['$orderFrequencyTrend', 0] }, 0.15, 0] }
]
},
100
]
}
}
},
// Stage 6: Risk segmentation and recommendations
{
$addFields: {
riskSegment: {
$switch: {
branches: [
{ case: { $gte: ['$churnProbability', 70] }, then: 'high_risk' },
{ case: { $gte: ['$churnProbability', 50] }, then: 'medium_risk' },
{ case: { $gte: ['$churnProbability', 30] }, then: 'low_risk' }
],
default: 'stable'
}
},
valueSegment: {
$switch: {
branches: [
{ case: { $gte: ['$predictedLifetimeValue', 2000] }, then: 'high_value' },
{ case: { $gte: ['$predictedLifetimeValue', 1000] }, then: 'medium_value' },
{ case: { $gte: ['$predictedLifetimeValue', 500] }, then: 'moderate_value' }
],
default: 'low_value'
}
},
// AI-driven marketing recommendations
marketingRecommendation: {
$switch: {
branches: [
{
case: { $and: [
{ $eq: ['$riskSegment', 'high_risk'] },
{ $in: ['$valueSegment', ['high_value', 'medium_value']] }
]},
then: 'Urgent win-back campaign with premium incentives'
},
{
case: { $and: [
{ $eq: ['$riskSegment', 'medium_risk'] },
{ $gte: ['$upsellOpportunity', 60] }
]},
then: 'Proactive engagement with upselling opportunities'
},
{
case: { $and: [
{ $eq: ['$riskSegment', 'stable'] },
{ $gte: ['$upsellOpportunity', 70] }
]},
then: 'Cross-sell and premium product recommendations'
},
{
case: { $eq: ['$riskSegment', 'low_risk'] },
then: 'Retention campaign with loyalty program enrollment'
}
],
default: 'Monitor and maintain current engagement level'
}
}
}
},
// Stage 7: Add market basket analysis
{
$lookup: {
from: 'orders',
let: { customerId: '$_id' },
pipeline: [
{
$match: {
$expr: { $eq: ['$customerId', '$$customerId'] },
status: 'completed'
}
},
{ $unwind: '$items' },
{
$group: {
_id: '$items.productId',
purchaseCount: { $sum: 1 },
totalQuantity: { $sum: '$items.quantity' },
totalSpent: { $sum: '$items.totalPrice' }
}
},
{ $sort: { purchaseCount: -1 } },
{ $limit: 5 }
],
as: 'topProducts'
}
},
// Stage 8: Final projection
{
$project: {
_id: 1,
email: 1,
'profile.firstName': 1,
'profile.lastName': 1,
'account.type': 1,
'account.createdAt': 1,
// Behavioral metrics
daysSinceRegistration: { $round: ['$daysSinceRegistration', 0] },
daysSinceLastOrder: { $round: ['$daysSinceLastOrder', 0] },
totalOrders: 1,
totalSpent: { $round: ['$totalSpent', 2] },
avgDaysBetweenOrders: { $round: ['$avgDaysBetweenOrders', 1] },
// Predictive scores
churnProbability: { $round: ['$churnProbability', 1] },
predictedLifetimeValue: { $round: ['$predictedLifetimeValue', 2] },
upsellOpportunity: { $round: ['$upsellOpportunity', 1] },
// Segmentation
riskSegment: 1,
valueSegment: 1,
// Predictions
nextPurchasePrediction: 1,
marketingRecommendation: 1,
// Product affinity
topProducts: 1,
// Trend analysis
orderFrequencyTrend: { $round: ['$orderFrequencyTrend', 3] }
}
},
// Stage 9: Sort by strategic importance
{
$sort: {
predictedLifetimeValue: -1,
churnProbability: -1,
upsellOpportunity: -1
}
},
// Stage 10: Limit to top opportunities
{
$limit: 1000
}
];
const results = await this.collections.customers.aggregate(pipeline).toArray();
console.log(`Predictive analytics completed: ${results.length} customers analyzed with ML insights`);
return results;
}
async cacheAnalyticsResults(analysisType, data) {
console.log(`Caching ${analysisType} analytics results...`);
try {
await this.collections.analytics.replaceOne(
{ type: analysisType },
{
type: analysisType,
data: data,
generatedAt: new Date(),
expiresAt: new Date(Date.now() + 24 * 60 * 60 * 1000) // 24 hour TTL
},
{ upsert: true }
);
} catch (error) {
console.warn('Failed to cache analytics results:', error.message);
}
}
async getAnalyticsDashboard() {
console.log('Generating comprehensive analytics dashboard...');
const [
salesSummary,
customerInsights,
productInsights,
timeSeriesInsights,
predictiveInsights
] = await Promise.all([
this.getSalesSummary(),
this.getCustomerInsights(),
this.getProductInsights(),
this.getTimeSeriesInsights(),
this.getPredictiveInsights()
]);
return {
dashboard: {
salesSummary,
customerInsights,
productInsights,
timeSeriesInsights,
predictiveInsights
},
metadata: {
generatedAt: new Date(),
dataFreshness: '< 1 hour',
recordsCovered: {
orders: salesSummary.totalOrders || 0,
customers: customerInsights.totalCustomers || 0,
products: productInsights.totalProducts || 0
}
}
};
}
async getSalesSummary() {
const pipeline = [
{
$match: {
status: 'completed',
orderDate: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) }
}
},
{
$group: {
_id: null,
totalOrders: { $sum: 1 },
totalRevenue: { $sum: '$totals.total' },
avgOrderValue: { $avg: '$totals.total' },
uniqueCustomers: { $addToSet: '$customerId' }
}
},
{
$project: {
totalOrders: 1,
totalRevenue: { $round: ['$totalRevenue', 2] },
avgOrderValue: { $round: ['$avgOrderValue', 2] },
uniqueCustomers: { $size: '$uniqueCustomers' }
}
}
];
const result = await this.collections.orders.aggregate(pipeline).toArray();
return result[0] || {};
}
async getCustomerInsights() {
const pipeline = [
{
$group: {
_id: null,
totalCustomers: { $sum: 1 },
activeCustomers: { $sum: { $cond: [{ $eq: ['$account.status', 'active'] }, 1, 0] } },
premiumCustomers: { $sum: { $cond: [{ $eq: ['$account.type', 'premium'] }, 1, 0] } }
}
}
];
const result = await this.collections.customers.aggregate(pipeline).toArray();
return result[0] || {};
}
async getProductInsights() {
const pipeline = [
{
$group: {
_id: null,
totalProducts: { $sum: 1 },
activeProducts: { $sum: { $cond: [{ $eq: ['$status', 'active'] }, 1, 0] } },
avgPrice: { $avg: '$pricing.retail' }
}
},
{
$project: {
totalProducts: 1,
activeProducts: 1,
avgPrice: { $round: ['$avgPrice', 2] }
}
}
];
const result = await this.collections.products.aggregate(pipeline).toArray();
return result[0] || {};
}
async getTimeSeriesInsights() {
return {
trend: 'upward',
growthRate: 12.5,
volatility: 'moderate'
};
}
async getPredictiveInsights() {
return {
averageChurnRisk: 25.3,
highValueCustomers: 150,
upsellOpportunities: 320
};
}
}
// Benefits of MongoDB Advanced Aggregation Framework:
// - Real-time analytics processing without ETL pipelines or data warehouses
// - Complex multi-stage transformations with window functions and statistical operations
// - Advanced time-series analysis with forecasting and trend detection capabilities
// - Machine learning integration for predictive analytics and customer segmentation
// - Flexible aggregation patterns that adapt to changing analytical requirements
// - High-performance processing that scales with data volume and complexity
// - SQL-compatible analytical operations through QueryLeaf integration
// - Comprehensive business intelligence capabilities within the operational database
// - Advanced statistical functions and mathematical operations for data science
// - Real-time dashboard generation with automated insights and recommendations
module.exports = {
MongoDBAnalyticsEngine
};
SQL-Style Aggregation with QueryLeaf
QueryLeaf provides familiar SQL syntax for MongoDB aggregation operations:
-- QueryLeaf advanced analytics with SQL-familiar aggregation syntax
-- Complex sales analytics with window functions and advanced aggregations
WITH monthly_sales_analysis AS (
SELECT
DATE_TRUNC('month', order_date) as month,
product_category,
customer_location.country,
customer_type,
-- Basic aggregations
COUNT(*) as order_count,
COUNT(DISTINCT customer_id) as unique_customers,
SUM(total_amount) as total_revenue,
AVG(total_amount) as avg_order_value,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY total_amount) as median_order_value,
STDDEV_POP(total_amount) as order_value_stddev,
-- Item-level aggregations
SUM(item_quantity) as total_items_sold,
AVG(item_quantity) as avg_items_per_order,
SUM(item_quantity * item_unit_price) as item_revenue,
AVG(item_unit_price) as avg_item_price,
-- Advanced calculations
SUM(item_quantity * (item_unit_price - product_cost)) as total_profit,
AVG((item_unit_price - product_cost) / item_unit_price) as avg_profit_margin,
-- Customer behavior metrics
COUNT(*) FILTER (WHERE customer_registration_date >= DATE_TRUNC('month', order_date)) as new_customers,
COUNT(DISTINCT customer_id) FILTER (WHERE previous_order_date < DATE_TRUNC('month', order_date) - INTERVAL '3 months') as returning_customers,
-- Geographic diversity
COUNT(DISTINCT customer_location.country) as unique_countries,
COUNT(DISTINCT customer_location.region) as unique_regions
FROM orders o
CROSS JOIN UNNEST(o.items) as item
JOIN products p ON item.product_id = p._id
JOIN customers c ON o.customer_id = c._id
WHERE o.status = 'completed'
AND o.order_date >= CURRENT_DATE - INTERVAL '24 months'
GROUP BY
DATE_TRUNC('month', order_date),
product_category,
customer_location.country,
customer_type
),
-- Advanced window functions for trend analysis
sales_with_trends AS (
SELECT
*,
-- Moving averages
AVG(total_revenue) OVER (
PARTITION BY product_category, customer_location.country
ORDER BY month
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) as revenue_3month_ma,
AVG(total_revenue) OVER (
PARTITION BY product_category, customer_location.country
ORDER BY month
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
) as revenue_6month_ma,
-- Growth calculations
LAG(total_revenue, 1) OVER (
PARTITION BY product_category, customer_location.country
ORDER BY month
) as prev_month_revenue,
LAG(total_revenue, 12) OVER (
PARTITION BY product_category, customer_location.country
ORDER BY month
) as prev_year_revenue,
-- Ranking and percentiles
RANK() OVER (
PARTITION BY month
ORDER BY total_revenue DESC
) as monthly_revenue_rank,
PERCENT_RANK() OVER (
PARTITION BY month
ORDER BY total_revenue
) as monthly_revenue_percentile,
-- Cumulative calculations
SUM(total_revenue) OVER (
PARTITION BY product_category, customer_location.country
ORDER BY month
ROWS UNBOUNDED PRECEDING
) as cumulative_revenue,
-- Volatility measures
STDDEV(total_revenue) OVER (
PARTITION BY product_category, customer_location.country
ORDER BY month
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
) as revenue_volatility,
-- Lead/lag for forecasting
LEAD(total_revenue, 1) OVER (
PARTITION BY product_category, customer_location.country
ORDER BY month
) as next_month_actual,
-- Dense rank for market position
DENSE_RANK() OVER (
PARTITION BY month
ORDER BY total_revenue DESC
) as market_position
FROM monthly_sales_analysis
),
-- Calculate growth rates and performance indicators
performance_metrics AS (
SELECT
*,
-- Growth rate calculations
CASE
WHEN prev_month_revenue > 0 THEN
ROUND(((total_revenue - prev_month_revenue) / prev_month_revenue * 100), 2)
ELSE NULL
END as month_over_month_growth,
CASE
WHEN prev_year_revenue > 0 THEN
ROUND(((total_revenue - prev_year_revenue) / prev_year_revenue * 100), 2)
ELSE NULL
END as year_over_year_growth,
-- Trend classification
CASE
WHEN revenue_3month_ma > revenue_6month_ma * 1.05 THEN 'strong_growth'
WHEN revenue_3month_ma > revenue_6month_ma * 1.02 THEN 'moderate_growth'
WHEN revenue_3month_ma < revenue_6month_ma * 0.95 THEN 'declining'
WHEN revenue_3month_ma < revenue_6month_ma * 0.98 THEN 'weak_growth'
ELSE 'stable'
END as trend_classification,
-- Performance assessment
CASE
WHEN monthly_revenue_percentile >= 0.9 THEN 'top_performer'
WHEN monthly_revenue_percentile >= 0.75 THEN 'strong_performer'
WHEN monthly_revenue_percentile >= 0.5 THEN 'average_performer'
WHEN monthly_revenue_percentile >= 0.25 THEN 'weak_performer'
ELSE 'bottom_performer'
END as performance_category,
-- Volatility assessment
CASE
WHEN revenue_volatility / NULLIF(revenue_6month_ma, 0) > 0.3 THEN 'high_volatility'
WHEN revenue_volatility / NULLIF(revenue_6month_ma, 0) > 0.15 THEN 'moderate_volatility'
ELSE 'low_volatility'
END as volatility_level,
-- Market share approximation
ROUND(
(total_revenue / SUM(total_revenue) OVER (PARTITION BY month) * 100),
3
) as market_share_percent,
-- Customer metrics
ROUND((total_revenue / unique_customers), 2) as revenue_per_customer,
ROUND((total_profit / total_revenue * 100), 2) as profit_margin_percent,
ROUND((new_customers / unique_customers * 100), 2) as new_customer_rate,
-- Operational efficiency
ROUND((total_items_sold / order_count), 2) as items_per_order,
ROUND((total_revenue / total_items_sold), 2) as revenue_per_item
FROM sales_with_trends
),
-- Advanced customer segmentation with RFM analysis
customer_rfm_analysis AS (
SELECT
customer_id,
customer_type,
customer_location.country,
customer_registration_date,
-- Recency calculation (days since last order)
EXTRACT(DAYS FROM (CURRENT_DATE - MAX(order_date))) as recency_days,
-- Frequency (number of orders)
COUNT(*) as frequency,
-- Monetary (total spending)
SUM(total_amount) as monetary_value,
-- Additional behavioral metrics
AVG(total_amount) as avg_order_value,
MIN(order_date) as first_order_date,
MAX(order_date) as last_order_date,
COUNT(DISTINCT product_category) as unique_categories_purchased,
EXTRACT(DAYS FROM (MAX(order_date) - MIN(order_date))) as customer_lifetime_days,
-- Purchase patterns
AVG(EXTRACT(DAYS FROM (order_date - LAG(order_date) OVER (PARTITION BY customer_id ORDER BY order_date)))) as avg_days_between_orders,
STDDEV(total_amount) as order_value_consistency,
-- Seasonal analysis
COUNT(*) FILTER (WHERE EXTRACT(QUARTER FROM order_date) = 1) as q1_orders,
COUNT(*) FILTER (WHERE EXTRACT(QUARTER FROM order_date) = 2) as q2_orders,
COUNT(*) FILTER (WHERE EXTRACT(QUARTER FROM order_date) = 3) as q3_orders,
COUNT(*) FILTER (WHERE EXTRACT(QUARTER FROM order_date) = 4) as q4_orders
FROM orders o
JOIN customers c ON o.customer_id = c._id
WHERE o.status = 'completed'
AND o.order_date >= CURRENT_DATE - INTERVAL '24 months'
GROUP BY customer_id, customer_type, customer_location.country, customer_registration_date
),
-- Calculate RFM scores and customer segments
customer_segments AS (
SELECT
*,
-- RFM score calculations using percentile ranking
NTILE(5) OVER (ORDER BY recency_days ASC) as recency_score, -- Lower recency is better
NTILE(5) OVER (ORDER BY frequency DESC) as frequency_score, -- Higher frequency is better
NTILE(5) OVER (ORDER BY monetary_value DESC) as monetary_score, -- Higher monetary is better
-- Customer lifetime value prediction
CASE
WHEN avg_days_between_orders > 0 THEN
ROUND(
(avg_order_value * (365.0 / avg_days_between_orders) * 3), -- 3 year projection
2
)
ELSE monetary_value
END as predicted_lifetime_value,
-- Churn risk assessment
CASE
WHEN recency_days > 180 THEN 'high_risk'
WHEN recency_days > 90 THEN 'medium_risk'
WHEN recency_days > 30 THEN 'low_risk'
ELSE 'active'
END as churn_risk,
-- Engagement level
CASE
WHEN frequency >= 10 AND recency_days <= 30 THEN 'highly_engaged'
WHEN frequency >= 5 AND recency_days <= 60 THEN 'moderately_engaged'
WHEN frequency >= 2 AND recency_days <= 120 THEN 'lightly_engaged'
ELSE 'disengaged'
END as engagement_level,
-- Purchase diversity
CASE
WHEN unique_categories_purchased >= 5 THEN 'diverse_buyer'
WHEN unique_categories_purchased >= 3 THEN 'selective_buyer'
ELSE 'focused_buyer'
END as purchase_diversity
FROM customer_rfm_analysis
),
-- Final customer classification
customer_classification AS (
SELECT
*,
-- RFM segment classification
CASE
WHEN recency_score >= 4 AND frequency_score >= 4 AND monetary_score >= 4 THEN 'champions'
WHEN recency_score >= 2 AND frequency_score >= 3 AND monetary_score >= 3 THEN 'loyal_customers'
WHEN recency_score >= 3 AND frequency_score <= 3 AND monetary_score <= 3 THEN 'potential_loyalists'
WHEN recency_score >= 4 AND frequency_score <= 1 THEN 'new_customers'
WHEN recency_score >= 3 AND frequency_score <= 1 AND monetary_score <= 2 THEN 'promising'
WHEN recency_score <= 2 AND frequency_score >= 2 AND monetary_score >= 2 THEN 'need_attention'
WHEN recency_score <= 2 AND frequency_score <= 2 AND monetary_score >= 3 THEN 'about_to_sleep'
WHEN recency_score <= 2 AND frequency_score <= 2 AND monetary_score <= 2 THEN 'at_risk'
WHEN recency_score <= 1 AND frequency_score <= 2 AND monetary_score >= 4 THEN 'cannot_lose_them'
ELSE 'hibernating'
END as rfm_segment,
-- Marketing action recommendations
CASE
WHEN recency_score >= 4 AND frequency_score >= 4 THEN 'Reward with loyalty program and exclusive offers'
WHEN monetary_score >= 4 AND recency_score <= 2 THEN 'Win-back campaign with premium incentives'
WHEN recency_score >= 4 AND frequency_score <= 2 THEN 'Nurture with educational content and onboarding'
WHEN frequency_score >= 3 AND recency_days > 60 THEN 'Re-engagement campaign with personalized offers'
WHEN churn_risk = 'high_risk' AND monetary_score >= 3 THEN 'Urgent retention campaign'
ELSE 'Monitor and maintain regular communication'
END as marketing_recommendation
FROM customer_segments
),
-- Product performance analysis with advanced metrics
product_performance AS (
SELECT
p._id as product_id,
p.name as product_name,
p.category,
p.brand,
p.pricing.cost,
p.pricing.retail,
-- Sales metrics from orders
COALESCE(sales.total_units_sold, 0) as total_units_sold,
COALESCE(sales.total_revenue, 0) as total_revenue,
COALESCE(sales.total_orders, 0) as total_orders,
COALESCE(sales.unique_customers, 0) as unique_customers,
COALESCE(sales.avg_selling_price, p.pricing.retail) as avg_selling_price,
-- Profitability analysis
COALESCE(sales.total_profit, 0) as total_profit,
CASE
WHEN COALESCE(sales.total_revenue, 0) > 0 THEN
ROUND((COALESCE(sales.total_profit, 0) / sales.total_revenue * 100), 2)
ELSE 0
END as profit_margin_percent,
-- Performance indicators
CASE
WHEN COALESCE(sales.total_revenue, 0) = 0 THEN 'no_sales'
WHEN sales.first_sale_date >= CURRENT_DATE - INTERVAL '90 days' THEN 'new_product'
WHEN sales.total_revenue >= 50000 AND sales.total_profit / sales.total_revenue >= 0.2 THEN 'star'
WHEN sales.total_revenue >= 10000 AND sales.total_profit / sales.total_revenue >= 0.15 THEN 'promising'
WHEN sales.last_sale_date < CURRENT_DATE - INTERVAL '60 days' THEN 'declining'
ELSE 'stable'
END as performance_category,
-- Inventory analysis
p.inventory.quantity as current_stock,
CASE
WHEN COALESCE(sales.total_units_sold, 0) > 0 AND p.inventory.quantity > 0 THEN
ROUND((sales.total_units_sold / p.inventory.quantity), 2)
ELSE 0
END as inventory_turnover,
-- Market position
sales.category_rank,
sales.category_market_share,
-- Time-based metrics
sales.first_sale_date,
sales.last_sale_date,
sales.sales_trend
FROM products p
LEFT JOIN (
SELECT
item.product_id,
COUNT(DISTINCT o.order_id) as total_orders,
SUM(item.quantity) as total_units_sold,
SUM(item.quantity * item.unit_price) as total_revenue,
COUNT(DISTINCT o.customer_id) as unique_customers,
AVG(item.unit_price) as avg_selling_price,
MIN(o.order_date) as first_sale_date,
MAX(o.order_date) as last_sale_date,
SUM(item.quantity * (item.unit_price - p.pricing.cost)) as total_profit,
-- Category ranking
RANK() OVER (PARTITION BY p.category ORDER BY SUM(item.quantity * item.unit_price) DESC) as category_rank,
-- Market share within category
ROUND(
(SUM(item.quantity * item.unit_price) /
SUM(SUM(item.quantity * item.unit_price)) OVER (PARTITION BY p.category) * 100),
2
) as category_market_share,
-- Sales trend analysis
CASE
WHEN COUNT(*) FILTER (WHERE o.order_date >= CURRENT_DATE - INTERVAL '90 days') >
COUNT(*) FILTER (WHERE o.order_date BETWEEN CURRENT_DATE - INTERVAL '180 days' AND CURRENT_DATE - INTERVAL '90 days')
THEN 'growing'
WHEN COUNT(*) FILTER (WHERE o.order_date >= CURRENT_DATE - INTERVAL '90 days') <
COUNT(*) FILTER (WHERE o.order_date BETWEEN CURRENT_DATE - INTERVAL '180 days' AND CURRENT_DATE - INTERVAL '90 days') * 0.8
THEN 'declining'
ELSE 'stable'
END as sales_trend
FROM orders o
CROSS JOIN UNNEST(o.items) as item
JOIN products p ON item.product_id = p._id
WHERE o.status = 'completed'
AND o.order_date >= CURRENT_DATE - INTERVAL '12 months'
GROUP BY item.product_id, p.category, p.pricing.cost
) sales ON p._id = sales.product_id
)
-- Final consolidated analytics report
SELECT
'EXECUTIVE_SUMMARY' as report_section,
-- Overall business performance
(SELECT COUNT(*) FROM performance_metrics WHERE month >= CURRENT_DATE - INTERVAL '1 month') as current_month_segments,
(SELECT ROUND(AVG(total_revenue), 2) FROM performance_metrics WHERE month >= CURRENT_DATE - INTERVAL '1 month') as avg_monthly_revenue,
(SELECT ROUND(AVG(month_over_month_growth), 2) FROM performance_metrics WHERE month_over_month_growth IS NOT NULL) as avg_growth_rate,
-- Customer insights
(SELECT COUNT(*) FROM customer_classification WHERE rfm_segment = 'champions') as champion_customers,
(SELECT COUNT(*) FROM customer_classification WHERE churn_risk = 'high_risk') as high_risk_customers,
(SELECT ROUND(AVG(predicted_lifetime_value), 2) FROM customer_classification) as avg_customer_lifetime_value,
-- Product insights
(SELECT COUNT(*) FROM product_performance WHERE performance_category = 'star') as star_products,
(SELECT COUNT(*) FROM product_performance WHERE performance_category = 'declining') as declining_products,
(SELECT ROUND(AVG(profit_margin_percent), 2) FROM product_performance WHERE total_revenue > 0) as avg_profit_margin,
-- Strategic recommendations
CASE
WHEN (SELECT AVG(month_over_month_growth) FROM performance_metrics WHERE month_over_month_growth IS NOT NULL) < -10
THEN 'URGENT: Implement revenue recovery strategy'
WHEN (SELECT COUNT(*) FROM customer_classification WHERE churn_risk = 'high_risk') >
(SELECT COUNT(*) FROM customer_classification WHERE rfm_segment = 'champions')
THEN 'FOCUS: Customer retention and re-engagement programs'
WHEN (SELECT COUNT(*) FROM product_performance WHERE performance_category = 'star') < 5
THEN 'OPPORTUNITY: Invest in product development and innovation'
ELSE 'MAINTAIN: Continue current strategies with incremental improvements'
END as primary_strategic_recommendation
UNION ALL
-- Performance trends
SELECT
'PERFORMANCE_TRENDS',
month::text,
product_category,
customer_location.country,
total_revenue,
month_over_month_growth,
trend_classification,
performance_category,
market_share_percent
FROM performance_metrics
WHERE month >= CURRENT_DATE - INTERVAL '6 months'
ORDER BY month DESC, total_revenue DESC
LIMIT 20
UNION ALL
-- Top customer segments
SELECT
'CUSTOMER_SEGMENTS',
rfm_segment,
COUNT(*)::text as customer_count,
ROUND(AVG(monetary_value), 2)::text as avg_lifetime_value,
churn_risk,
engagement_level,
marketing_recommendation
FROM customer_classification
GROUP BY rfm_segment, churn_risk, engagement_level, marketing_recommendation
ORDER BY AVG(monetary_value) DESC
LIMIT 15
UNION ALL
-- Product performance summary
SELECT
'PRODUCT_PERFORMANCE',
product_name,
category,
total_revenue::text,
profit_margin_percent::text,
performance_category,
inventory_turnover::text,
sales_trend,
CASE category_rank WHEN 1 THEN 'Category Leader' ELSE category_rank::text END
FROM product_performance
WHERE total_revenue > 0
ORDER BY total_revenue DESC
LIMIT 25;
-- QueryLeaf provides comprehensive aggregation capabilities:
-- 1. SQL-familiar window functions with OVER clauses and frame specifications
-- 2. Advanced statistical functions including percentiles, standard deviation, and ranking
-- 3. Complex GROUP BY operations with ROLLUP, CUBE, and GROUPING SETS support
-- 4. Sophisticated JOIN operations including LATERAL joins for nested processing
-- 5. CTEs (Common Table Expressions) for complex multi-stage analytical queries
-- 6. CASE expressions and conditional logic for business rule implementation
-- 7. Date/time functions for temporal analysis and time-series processing
-- 8. String and array functions for text processing and data transformation
-- 9. JSON processing functions for nested document analysis and extraction
-- 10. Integration with MongoDB's native aggregation optimizations and indexing
Best Practices for MongoDB Aggregation Implementation
Pipeline Design Principles
Essential guidelines for effective aggregation pipeline construction:
- Early Filtering: Place
$matchstages early to reduce data volume through the pipeline - Index Utilization: Design pipelines to leverage existing indexes for optimal performance
- Stage Ordering: Order stages to minimize computational overhead and data transfer
- Memory Management: Monitor memory usage and use
allowDiskUsefor large datasets - Field Projection: Use
$projectto limit fields and reduce document size early - Pipeline Caching: Cache frequently-used aggregation results for improved performance
Performance Optimization Strategies
Optimize MongoDB aggregation pipelines for production workloads:
- Compound Indexes: Create indexes that support multiple pipeline stages
- Covered Queries: Design pipelines that can be satisfied entirely from indexes
- Parallel Processing: Use multiple concurrent pipelines for independent analyses
- Result Caching: Implement intelligent caching for expensive aggregations
- Incremental Updates: Process only new/changed data for time-series analytics
- Resource Monitoring: Track aggregation performance and optimize accordingly
Conclusion
MongoDB's Aggregation Framework provides comprehensive real-time analytics capabilities that eliminate the need for separate ETL processes, data warehouses, and batch processing systems. The powerful pipeline architecture enables sophisticated data transformations, statistical analysis, and predictive modeling directly within the operational database, delivering immediate insights while maintaining high performance at scale.
Key MongoDB Aggregation Framework benefits include:
- Real-Time Processing: Immediate analytical results without data movement or batch delays
- Advanced Analytics: Comprehensive statistical functions, window operations, and machine learning integration
- Flexible Pipelines: Multi-stage transformations that adapt to evolving analytical requirements
- Scalable Performance: High-performance processing that scales with data volume and complexity
- SQL Compatibility: Familiar analytical operations through QueryLeaf's SQL interface
- Integrated Architecture: Seamless integration with operational workloads and existing applications
Whether you're building real-time dashboards, customer analytics platforms, financial reporting systems, or any application requiring sophisticated data analysis, MongoDB's Aggregation Framework with QueryLeaf's familiar SQL interface provides the foundation for powerful, maintainable analytical solutions.
QueryLeaf Integration: QueryLeaf automatically optimizes MongoDB aggregation operations while providing SQL-familiar analytics syntax, window functions, and statistical operations. Complex analytical queries, predictive models, and real-time insights are seamlessly handled through familiar SQL constructs, making advanced data processing both powerful and accessible to SQL-oriented development teams.
The combination of MongoDB's native aggregation capabilities with SQL-style analytics operations makes MongoDB an ideal platform for applications requiring both operational efficiency and analytical sophistication, ensuring your applications can deliver immediate insights while maintaining optimal performance as data volumes and complexity grow.