MongoDB Aggregation Framework: Advanced Analytics and Real-Time Data Transformations for Enterprise Applications
Modern enterprise applications require sophisticated data processing capabilities that can handle complex transformations, real-time analytics, and multi-stage data aggregations with high performance and scalability. Traditional database approaches often struggle with complex analytical queries, requiring expensive joins, subqueries, and multiple round trips that create performance bottlenecks and operational complexity in production environments.
MongoDB's Aggregation Framework expresses these analytics as a pipeline of stages, such as $match, $lookup, $group, and $setWindowFields, that filter, join, reshape, and summarize documents inside the database. The server's pipeline optimizer can reorder and combine stages. For example, it can move a $match ahead of an $addFields so the filter can use an index. On a sharded cluster, the early stages run on each shard in parallel before a merge step combines the results. Because related data is often embedded in one document, many analyses need fewer joins than their relational equivalents. When data lives in separate collections, $lookup still performs a join.
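Here is a minimal sketch of a monthly revenue report that shows the stage model. It assumes the order fields used later in this article (orderDate, status, financial.total). buildMonthlyRevenuePipeline is a hypothetical helper. The pipeline itself is plain data that the Node.js driver sends to the server unchanged.

```javascript
// Hypothetical helper; field names are assumptions taken from the later examples.
function buildMonthlyRevenuePipeline(startDate) {
  return [
    // Filter first so later stages (and any index on orderDate) see fewer documents
    {
      $match: {
        orderDate: { $gte: startDate },
        status: { $in: ['completed', 'shipped', 'delivered'] }
      }
    },
    // Group by calendar month; $dateTrunc requires MongoDB 5.0+
    {
      $group: {
        _id: { $dateTrunc: { date: '$orderDate', unit: 'month' } },
        revenue: { $sum: '$financial.total' },
        orders: { $sum: 1 }
      }
    },
    // Newest month first
    { $sort: { _id: -1 } }
  ];
}

// Running it needs a connected Db from the official driver, for example:
//   const rows = await db.collection('orders')
//     .aggregate(buildMonthlyRevenuePipeline(start), { allowDiskUse: true })
//     .toArray();
// Calling .explain('queryPlanner') on the cursor instead of .toArray()
// shows how the server planned the pipeline.

const pipeline = buildMonthlyRevenuePipeline(new Date('2024-01-01T00:00:00Z'));
console.log(pipeline.map((stage) => Object.keys(stage)[0]).join(' -> '));
// prints: $match -> $group -> $sort
```

Putting $match first lets the server use an index on orderDate. The optimizer can move some filters forward on its own, but writing them first keeps the intent obvious.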
The Traditional Analytics Challenge
Conventional relational database approaches to complex analytics face significant performance and scalability limitations:
-- Traditional PostgreSQL analytics - complex joins and expensive operations
-- Multi-table sales analytics with complex aggregations
WITH customer_segments AS (
SELECT
c.customer_id,
c.customer_name,
c.email,
c.registration_date,
c.country,
c.state,
-- Customer segmentation logic
CASE
WHEN c.registration_date >= CURRENT_DATE - INTERVAL '90 days' THEN 'new_customer'
WHEN c.last_order_date >= CURRENT_DATE - INTERVAL '30 days' THEN 'active_customer'
WHEN c.last_order_date >= CURRENT_DATE - INTERVAL '180 days' THEN 'returning_customer'
ELSE 'dormant_customer'
END as customer_segment,
-- Calculate customer lifetime metrics
c.total_orders,
c.total_spent,
c.average_order_value,
c.last_order_date,
-- Geographic classification
CASE
WHEN c.country = 'US' THEN 'domestic'
WHEN c.country IN ('CA', 'MX') THEN 'north_america'
WHEN c.country IN ('GB', 'DE', 'FR', 'IT', 'ES') THEN 'europe'
ELSE 'international'
END as geographic_segment
FROM customers c
WHERE c.is_active = true
),
order_analytics AS (
SELECT
o.order_id,
o.customer_id,
o.order_date,
o.order_status,
o.total_amount,
o.discount_amount,
o.tax_amount,
o.shipping_amount,
-- Time-based analytics
DATE_TRUNC('month', o.order_date) as order_month,
DATE_TRUNC('quarter', o.order_date) as order_quarter,
DATE_TRUNC('year', o.order_date) as order_year,
EXTRACT(dow FROM o.order_date) as day_of_week,
EXTRACT(hour FROM o.order_date) as hour_of_day,
-- Order categorization
CASE
WHEN o.total_amount >= 1000 THEN 'high_value'
WHEN o.total_amount >= 500 THEN 'medium_value'
WHEN o.total_amount >= 100 THEN 'low_value'
ELSE 'micro_transaction'
END as order_value_segment,
-- Seasonal analysis
CASE
WHEN EXTRACT(month FROM o.order_date) IN (12, 1, 2) THEN 'winter'
WHEN EXTRACT(month FROM o.order_date) IN (3, 4, 5) THEN 'spring'
WHEN EXTRACT(month FROM o.order_date) IN (6, 7, 8) THEN 'summer'
ELSE 'fall'
END as season,
-- Payment method analysis
o.payment_method,
o.payment_processor,
-- Fulfillment metrics
o.shipping_method,
o.warehouse_id,
EXTRACT(EPOCH FROM (o.shipped_date - o.order_date)) / 86400 as fulfillment_days
FROM orders o
WHERE o.order_date >= CURRENT_DATE - INTERVAL '2 years'
AND o.order_status IN ('completed', 'shipped', 'delivered')
),
product_analytics AS (
SELECT
oi.order_id,
oi.product_id,
p.product_name,
p.category,
p.subcategory,
p.brand,
p.supplier_id,
oi.quantity,
oi.unit_price,
oi.total_price,
oi.discount_amount as item_discount,
-- Product performance metrics
p.cost_per_unit,
(oi.unit_price - p.cost_per_unit) as unit_margin,
(oi.unit_price - p.cost_per_unit) * oi.quantity as total_margin,
-- Product categorization
CASE
WHEN p.category = 'Electronics' THEN 'tech'
WHEN p.category IN ('Clothing', 'Shoes', 'Accessories') THEN 'fashion'
WHEN p.category IN ('Home', 'Garden', 'Furniture') THEN 'home'
ELSE 'other'
END as product_group,
-- Inventory and supply chain
p.current_stock,
p.reorder_level,
CASE
WHEN p.current_stock <= p.reorder_level THEN 'low_stock'
WHEN p.current_stock <= p.reorder_level * 2 THEN 'medium_stock'
ELSE 'high_stock'
END as stock_status,
-- Supplier performance
s.supplier_name,
s.supplier_rating,
s.average_lead_time
FROM order_items oi
JOIN products p ON oi.product_id = p.product_id
JOIN suppliers s ON p.supplier_id = s.supplier_id
WHERE p.is_active = true
),
comprehensive_sales_analytics AS (
SELECT
cs.customer_id,
cs.customer_name,
cs.customer_segment,
cs.geographic_segment,
oa.order_id,
oa.order_date,
oa.order_month,
oa.order_quarter,
oa.order_value_segment,
oa.season,
oa.payment_method,
oa.shipping_method,
oa.fulfillment_days,
pa.product_id,
pa.product_name,
pa.category,
pa.brand,
pa.product_group,
pa.quantity,
pa.unit_price,
pa.total_price,
pa.total_margin,
pa.stock_status,
pa.supplier_name,
-- Advanced calculations requiring window functions
SUM(pa.total_price) OVER (
PARTITION BY cs.customer_id, oa.order_month
) as customer_monthly_spend,
AVG(pa.unit_price) OVER (
PARTITION BY pa.category, oa.order_quarter
) as category_avg_price_quarterly,
ROW_NUMBER() OVER (
PARTITION BY cs.customer_id
ORDER BY oa.order_date DESC
) as customer_order_recency,
RANK() OVER (
PARTITION BY oa.order_month
ORDER BY pa.total_margin DESC
) as product_margin_rank_monthly,
-- Complex aggregations with multiple groupings
COUNT(*) OVER (
PARTITION BY cs.geographic_segment, oa.season
) as segment_seasonal_order_count,
SUM(pa.total_price) OVER (
PARTITION BY pa.brand, oa.order_quarter
) as brand_quarterly_revenue
FROM customer_segments cs
JOIN order_analytics oa ON cs.customer_id = oa.customer_id
JOIN product_analytics pa ON oa.order_id = pa.order_id
),
performance_metrics AS (
SELECT
csa.*,
-- Customer behavior analysis
CASE
WHEN customer_order_recency <= 3 THEN 'frequent_buyer'
WHEN customer_order_recency <= 10 THEN 'regular_buyer'
ELSE 'occasional_buyer'
END as buying_frequency,
-- Product performance analysis
CASE
WHEN product_margin_rank_monthly <= 10 THEN 'top_margin_product'
WHEN product_margin_rank_monthly <= 50 THEN 'good_margin_product'
ELSE 'low_margin_product'
END as margin_performance,
-- Market analysis
ROUND(
(customer_monthly_spend / NULLIF(segment_seasonal_order_count::DECIMAL, 0)) * 100,
2
) as customer_segment_contribution_pct,
ROUND(
(brand_quarterly_revenue / SUM(brand_quarterly_revenue) OVER ()) * 100,
2
) as brand_market_share_pct
FROM comprehensive_sales_analytics csa
)
SELECT
-- Dimensional attributes
customer_segment,
geographic_segment,
order_quarter,
season,
product_group,
category,
brand,
payment_method,
shipping_method,
-- Aggregated metrics
COUNT(DISTINCT customer_id) as unique_customers,
COUNT(DISTINCT order_id) as total_orders,
COUNT(DISTINCT product_id) as unique_products,
-- Revenue metrics
SUM(total_price) as total_revenue,
AVG(total_price) as avg_order_value,
SUM(total_margin) as total_margin,
ROUND(AVG(total_margin), 2) as avg_margin_per_item,
ROUND((SUM(total_margin) / SUM(total_price)) * 100, 1) as margin_percentage,
-- Customer metrics
AVG(customer_monthly_spend) as avg_customer_monthly_spend,
COUNT(DISTINCT CASE WHEN buying_frequency = 'frequent_buyer' THEN customer_id END) as frequent_buyers,
COUNT(DISTINCT CASE WHEN buying_frequency = 'regular_buyer' THEN customer_id END) as regular_buyers,
-- Product performance
COUNT(CASE WHEN margin_performance = 'top_margin_product' THEN 1 END) as top_margin_products,
AVG(category_avg_price_quarterly) as avg_category_price,
-- Operational metrics
AVG(fulfillment_days) as avg_fulfillment_days,
COUNT(CASE WHEN stock_status = 'low_stock' THEN 1 END) as low_stock_items,
COUNT(DISTINCT supplier_name) as unique_suppliers,
-- Time-based trends
AVG(brand_market_share_pct) as avg_brand_market_share,
ROUND(AVG(customer_segment_contribution_pct), 1) as avg_segment_contribution,
-- Growth indicators (comparing to previous period)
LAG(SUM(total_price)) OVER (
PARTITION BY customer_segment, geographic_segment, product_group
ORDER BY order_quarter
) as prev_quarter_revenue,
ROUND(
((SUM(total_price) - LAG(SUM(total_price)) OVER (
PARTITION BY customer_segment, geographic_segment, product_group
ORDER BY order_quarter
)) / NULLIF(LAG(SUM(total_price)) OVER (
PARTITION BY customer_segment, geographic_segment, product_group
ORDER BY order_quarter
), 0)) * 100,
1
) as revenue_growth_pct
FROM performance_metrics
GROUP BY
customer_segment, geographic_segment, order_quarter, season,
product_group, category, brand, payment_method, shipping_method
HAVING
COUNT(DISTINCT customer_id) >= 10 -- Filter for statistical significance
AND SUM(total_price) >= 1000 -- Minimum revenue threshold
ORDER BY
order_quarter DESC,
total_revenue DESC,
unique_customers DESC
LIMIT 1000;
-- Problems with traditional SQL analytics approach:
-- 1. Extremely complex query structure with multiple CTEs and window functions
-- 2. Expensive JOIN operations across multiple large tables
-- 3. Poor performance due to multiple aggregation passes
-- 4. Nested data and arrays require JSONB or array columns plus extra unnesting logic
-- 5. Difficult to maintain and modify complex analytical logic
-- 6. Memory-intensive operations with large intermediate result sets
-- 7. Document-style reshaping relies on JSONB functions outside the core relational model
-- 8. Complex indexing requirements for optimal performance
-- 9. Difficult real-time processing due to query complexity
-- 10. Limited horizontal scaling for large analytical workloads
-- MySQL analytical limitations (even more restrictive)
SELECT
c.customer_segment,
DATE_FORMAT(o.order_date, '%Y-%m') as order_month,
COUNT(DISTINCT o.customer_id) as customers,
SUM(o.total_amount) as revenue,
AVG(o.total_amount) as avg_order_value
FROM (
SELECT
customer_id,
CASE
WHEN registration_date >= DATE_SUB(NOW(), INTERVAL 90 DAY) THEN 'new'
WHEN last_order_date >= DATE_SUB(NOW(), INTERVAL 30 DAY) THEN 'active'
ELSE 'dormant'
END as customer_segment
FROM customers
) c
JOIN orders o ON c.customer_id = o.customer_id
WHERE o.order_date >= DATE_SUB(NOW(), INTERVAL 1 YEAR)
GROUP BY c.customer_segment, DATE_FORMAT(o.order_date, '%Y-%m')
ORDER BY order_month DESC, revenue DESC;
-- MySQL limitations for analytics:
-- - No window functions or CTEs before MySQL 8.0
-- - JSON type (5.7+) but awkward for reshaping deeply nested data
-- - Fewer built-in statistical aggregates than PostgreSQL
-- - Complex transformations often end up in application code
-- - Large analytical queries are slow without careful indexing and tuning
-- - No native support for real-time streaming analytics
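For comparison, the MySQL segment report above can be written as a pipeline. This is a sketch, not a drop-in replacement. It assumes orders carry customerId, orderDate, and financial.total, and customers carry customerId, registrationDate, and lastOrderDate (the same names the Node.js code below uses). buildSegmentMonthlyPipeline is a hypothetical helper, and each stage is annotated with the SQL clause it stands in for.

```javascript
// Hypothetical helper mirroring the MySQL query; schema names are assumptions.
const DAY_MS = 24 * 60 * 60 * 1000;

function buildSegmentMonthlyPipeline(now = new Date()) {
  const daysAgo = (n) => new Date(now.getTime() - n * DAY_MS);
  return [
    // WHERE o.order_date >= NOW() - INTERVAL 1 YEAR (approximated as 365 days)
    { $match: { orderDate: { $gte: daysAgo(365) } } },
    // JOIN customers c ON c.customer_id = o.customer_id
    {
      $lookup: {
        from: 'customers',
        localField: 'customerId',
        foreignField: 'customerId',
        as: 'customer'
      }
    },
    // $unwind drops orders without a matching customer, like an inner join
    { $unwind: '$customer' },
    // CASE ... END AS customer_segment, plus DATE_FORMAT(order_date, '%Y-%m')
    {
      $addFields: {
        customerSegment: {
          $switch: {
            branches: [
              { case: { $gte: ['$customer.registrationDate', daysAgo(90)] }, then: 'new' },
              { case: { $gte: ['$customer.lastOrderDate', daysAgo(30)] }, then: 'active' }
            ],
            default: 'dormant'
          }
        },
        orderMonth: { $dateToString: { date: '$orderDate', format: '%Y-%m' } }
      }
    },
    // GROUP BY customer_segment, order_month
    {
      $group: {
        _id: { segment: '$customerSegment', month: '$orderMonth' },
        customerIds: { $addToSet: '$customerId' },
        revenue: { $sum: '$financial.total' },
        avgOrderValue: { $avg: '$financial.total' }
      }
    },
    // COUNT(DISTINCT customer_id) becomes the size of the collected set
    { $addFields: { customers: { $size: '$customerIds' } } },
    { $project: { customerIds: 0 } },
    // ORDER BY order_month DESC, revenue DESC
    { $sort: { '_id.month': -1, revenue: -1 } }
  ];
}

const stages = buildSegmentMonthlyPipeline(new Date('2025-01-01T00:00:00Z'));
console.log(stages.map((s) => Object.keys(s)[0]).join(' -> '));
// prints: $match -> $lookup -> $unwind -> $addFields -> $group -> $addFields -> $project -> $sort
```

$group has no single COUNT(DISTINCT ...) accumulator, so the sketch collects ids with $addToSet and takes $size afterwards. For groups with very many distinct customers, that collected array can run into the 16 MB document size limit.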
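The MongoDB Aggregation Framework handles the same kind of analysis with pipelines that run inside the database. The class below builds two of them with the Node.js driver: a customer analytics report and a real-time product report.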
// MongoDB Aggregation Framework - Comprehensive analytics and data transformation
const { MongoClient, ObjectId } = require('mongodb');
const client = new MongoClient('mongodb://localhost:27017');
const db = client.db('enterprise_analytics');
// Comprehensive Enterprise Analytics with MongoDB Aggregation Framework
class AdvancedAnalyticsProcessor {
constructor(db) {
this.db = db;
this.collections = {
customers: db.collection('customers'),
orders: db.collection('orders'),
products: db.collection('products'),
analytics: db.collection('analytics_results'),
realTimeMetrics: db.collection('real_time_metrics')
};
// Performance optimization settings
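this.aggregationOptions = {
allowDiskUse: true, // let large $group/$sort stages spill to disk
maxTimeMS: 300000, // 5 minutes timeout
// explain and hint are deliberately omitted: an explicit explain option
// (even false) can switch the driver into explain mode, and hint: null
// is not a valid index hint
comment: 'enterprise_analytics_query'
};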
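// Not awaited (constructors cannot be async); call and await
// setupAnalyticsIndexes() yourself if queries must wait for the indexes
this.setupAnalyticsIndexes();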
}
async setupAnalyticsIndexes() {
console.log('Setting up optimized indexes for analytics...');
try {
// Customer collection indexes
await this.collections.customers.createIndexes([
{ key: { customerId: 1 }, background: true, name: 'customer_id_idx' },
{ key: { registrationDate: -1, customerSegment: 1 }, background: true, name: 'registration_segment_idx' },
{ key: { 'address.country': 1, 'address.state': 1 }, background: true, name: 'geographic_idx' },
{ key: { loyaltyTier: 1, totalSpent: -1 }, background: true, name: 'loyalty_spending_idx' },
{ key: { lastOrderDate: -1, isActive: 1 }, background: true, name: 'activity_idx' }
]);
// Orders collection indexes
await this.collections.orders.createIndexes([
{ key: { customerId: 1, orderDate: -1 }, background: true, name: 'customer_date_idx' },
{ key: { orderDate: -1, status: 1 }, background: true, name: 'date_status_idx' },
{ key: { 'financial.total': -1, orderDate: -1 }, background: true, name: 'value_date_idx' },
{ key: { 'items.productId': 1, orderDate: -1 }, background: true, name: 'product_date_idx' },
{ key: { 'shipping.region': 1, orderDate: -1 }, background: true, name: 'region_date_idx' }
]);
// Products collection indexes
await this.collections.products.createIndexes([
{ key: { productId: 1 }, background: true, name: 'product_id_idx' },
{ key: { category: 1, subcategory: 1 }, background: true, name: 'category_idx' },
{ key: { brand: 1, 'pricing.currentPrice': -1 }, background: true, name: 'brand_price_idx' },
{ key: { 'inventory.currentStock': 1, 'inventory.reorderLevel': 1 }, background: true, name: 'inventory_idx' },
{ key: { supplierId: 1, isActive: 1 }, background: true, name: 'supplier_active_idx' }
]);
console.log('Analytics indexes created successfully');
} catch (error) {
console.error('Error creating analytics indexes:', error);
}
}
async performComprehensiveCustomerAnalytics(timeRange = 'last_12_months', customerSegments = null) {
console.log(`Performing comprehensive customer analytics for ${timeRange}...`);
const startTime = Date.now();
// Calculate date range
const dateRanges = {
'last_30_days': new Date(Date.now() - 30 * 24 * 60 * 60 * 1000),
'last_90_days': new Date(Date.now() - 90 * 24 * 60 * 60 * 1000),
'last_6_months': new Date(Date.now() - 6 * 30 * 24 * 60 * 60 * 1000),
'last_12_months': new Date(Date.now() - 12 * 30 * 24 * 60 * 60 * 1000),
'last_2_years': new Date(Date.now() - 2 * 365 * 24 * 60 * 60 * 1000)
};
const startDate = dateRanges[timeRange] || dateRanges['last_12_months'];
const pipeline = [
// Stage 1: Match orders within time range
{
$match: {
orderDate: { $gte: startDate },
status: { $in: ['completed', 'shipped', 'delivered'] }
}
},
// Stage 2: Lookup customer information
{
$lookup: {
from: 'customers',
localField: 'customerId',
foreignField: 'customerId',
as: 'customer'
}
},
// Stage 3: Unwind customer array (should be single document)
{
$unwind: '$customer'
},
// Stage 4: Filter by customer segments if specified
...(customerSegments ? [{
$match: {
'customer.segment': { $in: customerSegments }
}
}] : []),
// Stage 5: Lookup product information for each order item
{
$lookup: {
from: 'products',
localField: 'items.productId',
foreignField: 'productId',
as: 'productDetails'
}
},
// Stage 6: Add comprehensive calculated fields
{
$addFields: {
// Time-based dimensions
orderMonth: {
$dateTrunc: {
date: '$orderDate',
unit: 'month'
}
},
orderQuarter: {
$concat: [
{ $toString: { $year: '$orderDate' } },
'-Q',
{ $toString: {
$ceil: { $divide: [{ $month: '$orderDate' }, 3] }
}}
]
},
orderYear: { $year: '$orderDate' },
dayOfWeek: { $dayOfWeek: '$orderDate' },
hourOfDay: { $hour: '$orderDate' },
// Seasonal classification
season: {
$switch: {
branches: [
{
case: { $in: [{ $month: '$orderDate' }, [12, 1, 2]] },
then: 'winter'
},
{
case: { $in: [{ $month: '$orderDate' }, [3, 4, 5]] },
then: 'spring'
},
{
case: { $in: [{ $month: '$orderDate' }, [6, 7, 8]] },
then: 'summer'
}
],
default: 'fall'
}
},
// Customer segmentation
customerSegment: {
$switch: {
branches: [
{
case: {
$gte: [
'$customer.registrationDate',
new Date(Date.now() - 90 * 24 * 60 * 60 * 1000)
]
},
then: 'new_customer'
},
{
case: {
$gte: [
'$customer.lastOrderDate',
new Date(Date.now() - 30 * 24 * 60 * 60 * 1000)
]
},
then: 'active_customer'
},
{
case: {
$gte: [
'$customer.lastOrderDate',
new Date(Date.now() - 180 * 24 * 60 * 60 * 1000)
]
},
then: 'returning_customer'
}
],
default: 'dormant_customer'
}
},
// Geographic classification
geographicSegment: {
$switch: {
branches: [
{
case: { $eq: ['$customer.address.country', 'US'] },
then: 'domestic'
},
{
case: { $in: ['$customer.address.country', ['CA', 'MX']] },
then: 'north_america'
},
{
case: { $in: ['$customer.address.country', ['GB', 'DE', 'FR', 'IT', 'ES']] },
then: 'europe'
}
],
default: 'international'
}
},
// Order value classification
orderValueSegment: {
$switch: {
branches: [
{
case: { $gte: ['$financial.total', 1000] },
then: 'high_value'
},
{
case: { $gte: ['$financial.total', 500] },
then: 'medium_value'
},
{
case: { $gte: ['$financial.total', 100] },
then: 'low_value'
}
],
default: 'micro_transaction'
}
},
// Enhanced item analysis with product details
enrichedItems: {
$map: {
input: '$items',
as: 'item',
in: {
$mergeObjects: [
'$$item',
{
productDetails: {
$arrayElemAt: [
{
$filter: {
input: '$productDetails',
cond: { $eq: ['$$this.productId', '$$item.productId'] }
}
},
0
]
}
},
{
// Calculate margins and performance metrics
unitMargin: {
$subtract: [
'$$item.unitPrice',
{
$arrayElemAt: [
{
$map: {
input: {
$filter: {
input: '$productDetails',
cond: { $eq: ['$$this.productId', '$$item.productId'] }
}
},
in: '$$this.costPerUnit'
}
},
0
]
}
]
},
categoryGroup: {
$let: {
vars: {
category: {
$arrayElemAt: [
{
$map: {
input: {
$filter: {
input: '$productDetails',
cond: { $eq: ['$$this.productId', '$$item.productId'] }
}
},
in: '$$this.category'
}
},
0
]
}
},
in: {
$switch: {
branches: [
{ case: { $eq: ['$$category', 'Electronics'] }, then: 'tech' },
{ case: { $in: ['$$category', ['Clothing', 'Shoes', 'Accessories']] }, then: 'fashion' },
{ case: { $in: ['$$category', ['Home', 'Garden', 'Furniture']] }, then: 'home' }
],
default: 'other'
}
}
}
}
}
]
}
}
},
// Customer lifetime metrics (approximation)
estimatedCustomerValue: {
$multiply: [
'$financial.total',
{ $add: ['$customer.averageOrdersPerYear', 1] }
]
},
// Fulfillment performance
fulfillmentDays: {
$cond: {
if: { $and: ['$fulfillment.shippedAt', '$orderDate'] },
then: {
$divide: [
{ $subtract: ['$fulfillment.shippedAt', '$orderDate'] },
86400000 // Convert milliseconds to days
]
},
else: null
}
}
}
},
// Stage 7: Group by multiple dimensions for comprehensive analytics
{
$group: {
_id: {
customerSegment: '$customerSegment',
geographicSegment: '$geographicSegment',
orderMonth: '$orderMonth',
orderQuarter: '$orderQuarter',
season: '$season',
orderValueSegment: '$orderValueSegment'
},
// Customer metrics
uniqueCustomers: { $addToSet: '$customerId' },
totalOrders: { $sum: 1 },
// Financial metrics
totalRevenue: { $sum: '$financial.total' },
totalDiscount: { $sum: '$financial.discount' },
totalTax: { $sum: '$financial.tax' },
totalShipping: { $sum: '$financial.shipping' },
// Order value statistics
avgOrderValue: { $avg: '$financial.total' },
maxOrderValue: { $max: '$financial.total' },
minOrderValue: { $min: '$financial.total' },
// Product and item metrics
totalItems: { $sum: { $size: '$items' } },
avgItemsPerOrder: { $avg: { $size: '$items' } },
uniqueProducts: {
$addToSet: {
$reduce: {
input: '$items',
initialValue: [],
in: { $concatArrays: ['$$value', ['$$this.productId']] }
}
}
},
// Category distribution
categoryBreakdown: {
$push: {
$map: {
input: '$enrichedItems',
in: '$$this.categoryGroup'
}
}
},
// Customer behavior metrics
avgCustomerValue: { $avg: '$estimatedCustomerValue' },
loyaltyTierDistribution: { $push: '$customer.loyaltyTier' },
// Operational metrics
avgFulfillmentDays: { $avg: '$fulfillmentDays' },
paymentMethodDistribution: { $push: '$payment.method' },
shippingMethodDistribution: { $push: '$shipping.method' },
// Geographic insights
stateDistribution: { $push: '$customer.address.state' },
countryDistribution: { $push: '$customer.address.country' },
// Time-based patterns
dayOfWeekDistribution: { $push: '$dayOfWeek' },
hourOfDayDistribution: { $push: '$hourOfDay' },
// Customer acquisition and retention
newCustomersCount: {
$sum: {
$cond: [{ $eq: ['$customerSegment', 'new_customer'] }, 1, 0]
}
},
returningCustomersCount: {
$sum: {
$cond: [{ $eq: ['$customerSegment', 'returning_customer'] }, 1, 0]
}
},
// First and last order dates for trend analysis
firstOrderDate: { $min: '$orderDate' },
lastOrderDate: { $max: '$orderDate' }
}
},
// Stage 8: Calculate derived metrics and insights
{
$addFields: {
// Calculate actual unique counts
uniqueCustomerCount: { $size: '$uniqueCustomers' },
uniqueProductCount: {
$size: {
$reduce: {
input: '$uniqueProducts',
initialValue: [],
in: { $setUnion: ['$$value', '$$this'] }
}
}
},
// Revenue per customer
revenuePerCustomer: {
$cond: {
if: { $gt: [{ $size: '$uniqueCustomers' }, 0] },
then: { $divide: ['$totalRevenue', { $size: '$uniqueCustomers' }] },
else: 0
}
},
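// Revenue net of discounts (product cost is not available at this stage,
// so this is not a true cost-based margin)
grossMargin: { $subtract: ['$totalRevenue', '$totalDiscount'] },
marginPercentage: {
$cond: {
if: { $gt: ['$totalRevenue', 0] },
then: {
$multiply: [
{ $divide: [{ $subtract: ['$totalRevenue', '$totalDiscount'] }, '$totalRevenue'] },
100
]
},
else: 0
}
},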
// Category insights: count how often each category group appears
// and keep the five most frequent ($sortArray requires MongoDB 5.2+)
topCategories: {
$let: {
vars: {
allCategories: {
$reduce: {
input: '$categoryBreakdown',
initialValue: [],
in: { $concatArrays: ['$$value', '$$this'] }
}
}
},
in: {
$slice: [
{
$sortArray: {
input: {
$map: {
input: { $setUnion: ['$$allCategories', []] },
as: 'cat',
in: {
category: '$$cat',
count: {
$size: {
$filter: {
input: '$$allCategories',
cond: { $eq: ['$$this', '$$cat'] }
}
}
}
}
}
},
sortBy: { count: -1 }
}
},
5 // Top 5 categories
]
}
}
},
// Customer distribution insights
customerSegmentMetrics: {
newCustomerPercentage: {
$multiply: [
{ $divide: ['$newCustomersCount', '$totalOrders'] },
100
]
},
returningCustomerPercentage: {
$multiply: [
{ $divide: ['$returningCustomersCount', '$totalOrders'] },
100
]
}
},
// Time range analysis
analysisPeriodDays: {
$divide: [
{ $subtract: ['$lastOrderDate', '$firstOrderDate'] },
86400000 // Convert to days
]
},
// Performance indicators (the period is floored at one day so a group whose
// orders share a single timestamp does not divide by zero)
performanceMetrics: {
ordersPerDay: {
$divide: [
'$totalOrders',
{ $max: [{ $divide: [{ $subtract: ['$lastOrderDate', '$firstOrderDate'] }, 86400000] }, 1] }
]
},
avgRevenuePerDay: {
$divide: [
'$totalRevenue',
{ $max: [{ $divide: [{ $subtract: ['$lastOrderDate', '$firstOrderDate'] }, 86400000] }, 1] }
]
}
}
}
},
// Stage 9: Project final results with clean structure
{
$project: {
_id: 0,
// Dimensions
dimensions: '$_id',
// Core metrics
metrics: {
customers: {
total: '$uniqueCustomerCount',
new: '$newCustomersCount',
returning: '$returningCustomersCount',
newPercentage: '$customerSegmentMetrics.newCustomerPercentage',
returningPercentage: '$customerSegmentMetrics.returningCustomerPercentage'
},
orders: {
total: '$totalOrders',
averageValue: '$avgOrderValue',
maxValue: '$maxOrderValue',
minValue: '$minOrderValue',
itemsPerOrder: '$avgItemsPerOrder'
},
revenue: {
total: '$totalRevenue',
gross: '$grossMargin',
marginPercentage: '$marginPercentage',
revenuePerCustomer: '$revenuePerCustomer',
totalDiscount: '$totalDiscount',
totalTax: '$totalTax',
totalShipping: '$totalShipping'
},
products: {
uniqueCount: '$uniqueProductCount',
totalItems: '$totalItems',
topCategories: '$topCategories'
},
operations: {
avgFulfillmentDays: '$avgFulfillmentDays',
analysisPeriodDays: '$analysisPeriodDays'
},
performance: '$performanceMetrics'
},
// Insights and distributions
insights: {
loyaltyTiers: '$loyaltyTierDistribution',
paymentMethods: '$paymentMethodDistribution',
shippingMethods: '$shippingMethodDistribution',
geographic: {
states: '$stateDistribution',
countries: '$countryDistribution'
},
temporal: {
daysOfWeek: '$dayOfWeekDistribution',
hoursOfDay: '$hourOfDayDistribution'
}
},
// Time range
timeRange: {
startDate: '$firstOrderDate',
endDate: '$lastOrderDate'
}
}
},
// Stage 10: Sort results by significance
{
$sort: {
'metrics.revenue.total': -1,
'metrics.customers.total': -1
}
}
];
try {
const results = await this.collections.orders.aggregate(pipeline, this.aggregationOptions).toArray();
const processingTime = Date.now() - startTime;
console.log(`Customer analytics completed in ${processingTime}ms, found ${results.length} result groups`);
return {
success: true,
processingTimeMs: processingTime,
timeRange: timeRange,
resultCount: results.length,
analytics: results,
metadata: {
queryComplexity: 'high',
stagesCount: pipeline.length,
indexesUsed: 'multiple_compound_indexes', // static label; check explain() output for actual index usage
aggregationFeatures: [
'lookup_joins',
'complex_expressions',
'grouping_aggregations',
'conditional_logic',
'array_operations',
'date_functions',
'mathematical_operations'
]
}
};
} catch (error) {
console.error('Error performing customer analytics:', error);
return {
success: false,
error: error.message,
processingTimeMs: Date.now() - startTime
};
}
}
async performRealTimeProductAnalytics(refreshInterval = 60000) {
console.log('Starting real-time product performance analytics...');
const pipeline = [
// Stage 1: Match recent orders (last 24 hours)
{
$match: {
orderDate: {
$gte: new Date(Date.now() - 24 * 60 * 60 * 1000)
},
status: { $in: ['completed', 'processing', 'shipped'] }
}
},
// Stage 2: Unwind order items for item-level analysis
{
$unwind: '$items'
},
// Stage 3: Lookup product details
{
$lookup: {
from: 'products',
localField: 'items.productId',
foreignField: 'productId',
as: 'product'
}
},
// Stage 4: Unwind product (should be single document)
{
$unwind: '$product'
},
// Stage 5: Lookup current inventory levels
{
$lookup: {
from: 'inventory',
localField: 'items.productId',
foreignField: 'productId',
as: 'inventory'
}
},
// Stage 6: Add calculated fields for real-time metrics
{
$addFields: {
// Time buckets for real-time analysis
hourBucket: {
$dateTrunc: {
date: '$orderDate',
unit: 'hour'
}
},
// Product performance metrics
itemRevenue: { $multiply: ['$items.quantity', '$items.unitPrice'] },
itemMargin: {
$multiply: [
'$items.quantity',
{ $subtract: ['$items.unitPrice', '$product.costPerUnit'] }
]
},
// Inventory status
currentStock: { $arrayElemAt: ['$inventory.currentStock', 0] },
reorderLevel: { $arrayElemAt: ['$inventory.reorderLevel', 0] },
// Product categorization
categoryGroup: {
$switch: {
branches: [
{ case: { $eq: ['$product.category', 'Electronics'] }, then: 'tech' },
{ case: { $in: ['$product.category', ['Clothing', 'Shoes']] }, then: 'fashion' },
{ case: { $in: ['$product.category', ['Home', 'Garden']] }, then: 'home' }
],
default: 'other'
}
},
// Price performance
pricePoint: {
$switch: {
branches: [
{ case: { $gte: ['$items.unitPrice', 500] }, then: 'premium' },
{ case: { $gte: ['$items.unitPrice', 100] }, then: 'mid_range' },
{ case: { $gte: ['$items.unitPrice', 25] }, then: 'budget' }
],
default: 'economy'
}
},
// Velocity indicators
orderRecency: {
$divide: [
{ $subtract: [new Date(), '$orderDate'] },
3600000 // Convert to hours
]
}
}
},
// Stage 7: Group by product and time buckets for real-time aggregation
{
$group: {
_id: {
productId: '$items.productId',
productName: '$product.name',
category: '$product.category',
categoryGroup: '$categoryGroup',
brand: '$product.brand',
pricePoint: '$pricePoint',
hourBucket: '$hourBucket'
},
// Sales metrics
totalQuantitySold: { $sum: '$items.quantity' },
totalRevenue: { $sum: '$itemRevenue' },
totalMargin: { $sum: '$itemMargin' },
uniqueOrders: { $addToSet: '$_id' },
avgOrderQuantity: { $avg: '$items.quantity' },
// Pricing metrics
avgSellingPrice: { $avg: '$items.unitPrice' },
maxSellingPrice: { $max: '$items.unitPrice' },
minSellingPrice: { $min: '$items.unitPrice' },
// Inventory insights
currentStockLevel: { $first: '$currentStock' },
reorderThreshold: { $first: '$reorderLevel' },
// Time-based insights
avgOrderRecency: { $avg: '$orderRecency' },
latestOrderTime: { $max: '$orderDate' },
earliestOrderTime: { $min: '$orderDate' },
// Customer insights
uniqueCustomers: { $addToSet: '$customerId' },
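// Geographic distribution (this pipeline has no customer $lookup, so use
// the order's own shipping region, which region_date_idx also indexes)
regions: { $addToSet: '$shipping.region' },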
// Order characteristics
avgOrderValue: { $avg: '$financial.total' },
shippingMethodsUsed: { $addToSet: '$shipping.method' }
}
},
// Stage 8: Calculate performance indicators and rankings
{
$addFields: {
// Performance calculations
marginPercentage: {
$cond: {
if: { $gt: ['$totalRevenue', 0] },
then: { $multiply: [{ $divide: ['$totalMargin', '$totalRevenue'] }, 100] },
else: 0
}
},
uniqueOrderCount: { $size: '$uniqueOrders' },
uniqueCustomerCount: { $size: '$uniqueCustomers' },
// Inventory health
stockStatus: {
$switch: {
branches: [
{
case: { $lte: ['$currentStockLevel', 0] },
then: 'out_of_stock'
},
{
case: { $lte: ['$currentStockLevel', '$reorderThreshold'] },
then: 'low_stock'
},
{
case: { $lte: ['$currentStockLevel', { $multiply: ['$reorderThreshold', 2] }] },
then: 'medium_stock'
}
],
default: 'high_stock'
}
},
// Velocity metrics
salesVelocity: {
$divide: ['$totalQuantitySold', { $max: ['$avgOrderRecency', 1] }]
},
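// Customer engagement: customers per order within the group (1 means no repeat buyers).
// Uses $size directly because fields added in this same $addFields stage
// are not visible to its other expressions
customerRetention: {
$divide: [{ $size: '$uniqueCustomers' }, { $size: '$uniqueOrders' }]
},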
// Regional penetration
regionalReach: { $size: '$regions' }
}
},
// Stage 9: Rank by revenue overall and by quantity within each category group ($percentRank requires MongoDB 7.0+)
{
$setWindowFields: {
sortBy: { totalRevenue: -1 },
output: {
revenueRank: { $rank: {} },
revenuePercentile: { $percentRank: {} }
}
}
},
{
$setWindowFields: {
partitionBy: '$_id.categoryGroup',
sortBy: { totalQuantitySold: -1 },
output: {
categoryRank: { $rank: {} }
}
}
},
// Stage 10: Add performance classification
{
$addFields: {
performanceClassification: {
$switch: {
branches: [
{
case: {
$and: [
{ $lte: ['$revenueRank', 10] },
{ $gt: ['$marginPercentage', 20] },
{ $gt: ['$uniqueCustomerCount', 5] }
]
},
then: 'star_performer'
},
{
case: {
$and: [
{ $lte: ['$revenueRank', 50] },
{ $gt: ['$marginPercentage', 15] }
]
},
then: 'strong_performer'
},
{
case: {
$and: [
{ $gt: ['$totalRevenue', 100] },
{ $gt: ['$marginPercentage', 10] }
]
},
then: 'solid_performer'
},
{
case: { $lte: ['$totalRevenue', 50] },
then: 'low_performer'
}
],
default: 'average_performer'
}
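}
}
},
// Stage 10b: recommendations and alerts go in a separate $addFields stage
// because expressions cannot read fields added earlier in the same stage
{
$addFields: {
// Action recommendations
recommendations: {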
$switch: {
branches: [
{
case: { $eq: ['$stockStatus', 'out_of_stock'] },
then: ['urgent_restock', 'review_demand_forecast']
},
{
case: { $eq: ['$stockStatus', 'low_stock'] },
then: ['schedule_restock', 'monitor_sales_velocity']
},
{
case: {
$and: [
{ $eq: ['$performanceClassification', 'star_performer'] },
{ $gt: ['$currentStockLevel', '$reorderThreshold'] }
]
},
then: ['increase_marketing', 'optimize_pricing', 'expand_availability']
},
{
case: { $eq: ['$performanceClassification', 'low_performer'] },
then: ['review_pricing', 'improve_marketing', 'consider_discontinuation']
}
],
default: ['monitor_performance', 'optimize_inventory_levels']
}
},
// Real-time alerts
alerts: {
$filter: {
input: [
{
$cond: {
if: { $eq: ['$stockStatus', 'out_of_stock'] },
then: {
type: 'critical',
message: 'Product is out of stock with active sales',
priority: 'high'
},
else: null
}
},
{
$cond: {
if: {
$and: [
{ $eq: ['$performanceClassification', 'star_performer'] },
{ $eq: ['$stockStatus', 'low_stock'] }
]
},
then: {
type: 'opportunity',
message: 'High-performing product running low on stock',
priority: 'medium'
},
else: null
}
},
{
$cond: {
if: { $lt: ['$marginPercentage', 5] },
then: {
type: 'margin_concern',
message: 'Product margin below threshold',
priority: 'low'
},
else: null
}
}
],
cond: { $ne: ['$$this', null] }
}
}
}
},
// Stage 11: Final projection with structured output
{
$project: {
_id: 0,
// Product identification
product: {
id: '$_id.productId',
name: '$_id.productName',
category: '$_id.category',
categoryGroup: '$_id.categoryGroup',
brand: '$_id.brand',
pricePoint: '$_id.pricePoint'
},
// Time context
timeContext: {
hourBucket: '$_id.hourBucket',
latestOrder: '$latestOrderTime',
earliestOrder: '$earliestOrderTime',
avgOrderRecencyHours: '$avgOrderRecency'
},
// Performance metrics
performance: {
totalQuantitySold: '$totalQuantitySold',
totalRevenue: { $round: ['$totalRevenue', 2] },
totalMargin: { $round: ['$totalMargin', 2] },
marginPercentage: { $round: ['$marginPercentage', 1] },
uniqueOrders: '$uniqueOrderCount',
uniqueCustomers: '$uniqueCustomerCount',
avgOrderQuantity: { $round: ['$avgOrderQuantity', 2] },
salesVelocity: { $round: ['$salesVelocity', 3] },
customerRetention: { $round: ['$customerRetention', 3] }
},
// Pricing insights
pricing: {
avgSellingPrice: { $round: ['$avgSellingPrice', 2] },
maxSellingPrice: '$maxSellingPrice',
minSellingPrice: '$minSellingPrice',
priceVariation: { $subtract: ['$maxSellingPrice', '$minSellingPrice'] }
},
// Inventory status
inventory: {
currentStock: '$currentStockLevel',
reorderLevel: '$reorderThreshold',
stockStatus: '$stockStatus',
stockTurnover: {
$cond: {
if: { $gt: ['$currentStockLevel', 0] },
then: { $divide: ['$totalQuantitySold', '$currentStockLevel'] },
else: null
}
}
},
// Market position
marketPosition: {
revenueRank: '$revenueRank',
revenuePercentile: { $round: ['$revenuePercentile', 3] },
categoryRank: '$categoryRank',
performanceClass: '$performanceClassification'
},
// Geographic and market reach
marketReach: {
regionalReach: '$regionalReach',
regions: '$regions',
avgOrderValue: { $round: ['$avgOrderValue', 2] },
shippingMethods: '$shippingMethodsUsed'
},
// Actionable insights
insights: {
recommendations: '$recommendations',
alerts: '$alerts',
// Key insights derived from data
keyInsights: {
$filter: {
input: [
{
$cond: {
if: { $gt: ['$uniqueCustomerCount', 10] },
then: 'High customer engagement - good repeat purchase potential',
else: null
}
},
{
$cond: {
if: { $gt: ['$regionalReach', 5] },
then: 'Strong geographic distribution - consider expanding marketing',
else: null
}
},
{
$cond: {
if: { $gt: ['$salesVelocity', 1] },
then: 'Fast-moving product - ensure adequate inventory levels',
else: null
}
}
],
cond: { $ne: ['$$this', null] }
}
}
}
}
},
// Stage 12: Sort by performance and significance
{
$sort: {
'performance.totalRevenue': -1,
'performance.uniqueCustomers': -1,
'inventory.stockTurnover': -1
}
},
// Stage 13: Limit to top performers for real-time display
{
$limit: 100
}
];
try {
const results = await this.collections.orders.aggregate(pipeline, {
...this.aggregationOptions,
maxTimeMS: 30000 // Shorter timeout for real-time queries
}).toArray();
// Store results for real-time dashboard
await this.collections.realTimeMetrics.replaceOne(
{ type: 'product_performance' },
{
type: 'product_performance',
timestamp: new Date(),
refreshInterval: refreshInterval,
dataCount: results.length,
data: results
},
{ upsert: true }
);
console.log(`Real-time product analytics completed: ${results.length} products analyzed`);
return {
success: true,
timestamp: new Date(),
productCount: results.length,
analytics: results,
summary: {
totalRevenue: results.reduce((sum, product) => sum + product.performance.totalRevenue, 0),
totalQuantitySold: results.reduce((sum, product) => sum + product.performance.totalQuantitySold, 0),
// Guard against division by zero when no products are returned
avgMarginPercentage: results.length > 0
? results.reduce((sum, product) => sum + product.performance.marginPercentage, 0) / results.length
: 0,
outOfStockProducts: results.filter(product => product.inventory.stockStatus === 'out_of_stock').length,
starPerformers: results.filter(product => product.marketPosition.performanceClass === 'star_performer').length,
criticalAlerts: results.reduce((sum, product) =>
sum + product.insights.alerts.filter(alert => alert.priority === 'high').length, 0
)
}
};
} catch (error) {
console.error('Error performing real-time product analytics:', error);
return {
success: false,
error: error.message,
timestamp: new Date()
};
}
}
async performAdvancedCohortAnalysis(cohortType = 'monthly', lookbackPeriods = 12) {
console.log(`Performing ${cohortType} cohort analysis for ${lookbackPeriods} periods...`);
const startTime = Date.now();
// Calculate cohort periods based on type
const cohortConfig = {
'weekly': { unit: 'week', periodMs: 7 * 24 * 60 * 60 * 1000 },
'monthly': { unit: 'month', periodMs: 30 * 24 * 60 * 60 * 1000 },
'quarterly': { unit: 'quarter', periodMs: 90 * 24 * 60 * 60 * 1000 }
};
const config = cohortConfig[cohortType] || cohortConfig['monthly'];
const startDate = new Date(Date.now() - lookbackPeriods * config.periodMs);
const pipeline = [
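// Stage 1: Keep fulfilled orders inside the lookback window (a customer whose true first order predates startDate is assigned to the cohort of their first in-window order)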
{
$match: {
orderDate: { $gte: startDate },
status: { $in: ['completed', 'shipped', 'delivered'] }
}
},
// Stage 2: Get customer first order dates
{
$group: {
_id: '$customerId',
firstOrderDate: { $min: '$orderDate' },
allOrders: {
$push: {
orderId: '$_id',
orderDate: '$orderDate',
total: '$financial.total',
items: '$items'
}
},
totalOrders: { $sum: 1 },
totalSpent: { $sum: '$financial.total' },
lastOrderDate: { $max: '$orderDate' }
}
},
// Stage 3: Calculate cohort membership and period analysis
{
$addFields: {
// Determine which cohort this customer belongs to
cohortPeriod: {
$dateTrunc: {
date: '$firstOrderDate',
unit: config.unit
}
},
// Calculate customer lifetime span
lifetimeSpanDays: {
$divide: [
{ $subtract: ['$lastOrderDate', '$firstOrderDate'] },
86400000
]
},
// Analyze orders by period
ordersByPeriod: {
$map: {
input: '$allOrders',
as: 'order',
in: {
$mergeObjects: [
'$$order',
{
orderPeriod: {
$dateTrunc: {
date: '$$order.orderDate',
unit: config.unit
}
},
// $dateDiff (MongoDB 5.0+) counts calendar unit boundaries crossed, so the
// index matches the $dateTrunc buckets instead of drifting like a
// fixed-length periodMs divisor
periodsFromFirstOrder: {
$dateDiff: {
startDate: '$firstOrderDate',
endDate: '$$order.orderDate',
unit: config.unit
}
}
}
]
}
}
}
}
},
// Stage 4: Unwind orders to analyze period-by-period behavior
{
$unwind: '$ordersByPeriod'
},
// Stage 5: Group by cohort and period for retention analysis
{
$group: {
_id: {
cohortPeriod: '$cohortPeriod',
orderPeriod: '$ordersByPeriod.orderPeriod',
periodsFromFirst: {
$floor: '$ordersByPeriod.periodsFromFirstOrder'
}
},
// Customer retention metrics
activeCustomers: { $addToSet: '$_id' },
totalOrders: { $sum: 1 },
totalRevenue: { $sum: '$ordersByPeriod.total' },
avgOrderValue: { $avg: '$ordersByPeriod.total' },
// Customer behavior metrics
avgLifetimeSpan: { $avg: '$lifetimeSpanDays' },
totalCustomerLifetimeValue: { $avg: '$totalSpent' },
avgOrdersPerCustomer: { $avg: '$totalOrders' },
// Period-specific insights
newCustomersInPeriod: {
$sum: {
$cond: [
{ $eq: ['$ordersByPeriod.periodsFromFirstOrder', 0] },
1,
0
]
}
},
// Revenue distribution
revenueDistribution: {
$push: '$ordersByPeriod.total'
},
// Order frequency analysis
orderFrequencyDistribution: {
$push: '$totalOrders'
}
}
},
// Stage 6: Calculate cohort size (initial customers in each cohort)
{
$lookup: {
from: 'orders',
pipeline: [
{
$match: {
orderDate: { $gte: startDate },
status: { $in: ['completed', 'shipped', 'delivered'] }
}
},
{
$group: {
_id: '$customerId',
firstOrderDate: { $min: '$orderDate' }
}
},
{
$addFields: {
cohortPeriod: {
$dateTrunc: {
date: '$firstOrderDate',
unit: config.unit
}
}
}
},
{
$group: {
_id: '$cohortPeriod',
cohortSize: { $sum: 1 }
}
}
],
as: 'cohortSizes'
}
},
// Stage 7: Add cohort size information
{
$addFields: {
cohortSize: {
$let: {
vars: {
matchingCohort: {
$arrayElemAt: [
{
$filter: {
input: '$cohortSizes',
cond: { $eq: ['$$this._id', '$_id.cohortPeriod'] }
}
},
0
]
}
},
in: '$$matchingCohort.cohortSize'
}
},
activeCustomerCount: { $size: '$activeCustomers' },
// Calculate retention rate
retentionRate: {
$let: {
vars: {
cohortSize: {
$arrayElemAt: [
{
$map: {
input: {
$filter: {
input: '$cohortSizes',
cond: { $eq: ['$$this._id', '$_id.cohortPeriod'] }
}
},
in: '$$this.cohortSize'
}
},
0
]
}
},
in: {
$multiply: [
{ $divide: [{ $size: '$activeCustomers' }, '$$cohortSize'] },
100
]
}
}
}
}
},
// Stage 8: Calculate advanced cohort metrics
{
$addFields: {
// Revenue per customer in this period
revenuePerCustomer: {
$divide: ['$totalRevenue', '$activeCustomerCount']
},
// Customer engagement score
engagementScore: {
$multiply: [
{ $divide: ['$totalOrders', '$activeCustomerCount'] },
{ $divide: ['$retentionRate', 100] }
]
},
// Revenue distribution analysis
revenueMetrics: {
median: {
$arrayElemAt: [
{
$sortArray: {
input: '$revenueDistribution',
sortBy: 1
}
},
{ $floor: { $divide: [{ $size: '$revenueDistribution' }, 2] } }
]
},
total: '$totalRevenue',
average: '$avgOrderValue',
max: { $max: '$revenueDistribution' },
min: { $min: '$revenueDistribution' }
},
// Period classification
periodClassification: {
$switch: {
branches: [
{ case: { $eq: ['$_id.periodsFromFirst', 0] }, then: 'acquisition' },
{ case: { $lte: ['$_id.periodsFromFirst', 3] }, then: 'early_engagement' },
{ case: { $lte: ['$_id.periodsFromFirst', 12] }, then: 'mature_relationship' }
],
default: 'long_term_loyalty'
}
}
}
},
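// Stage 9a: Order periods chronologically so that $last in the next stage returns the latest period
{
$sort: { '_id.cohortPeriod': 1, '_id.periodsFromFirst': 1 }
},
// Stage 9b: Group by cohort for final analysis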
{
$group: {
_id: '$_id.cohortPeriod',
cohortSize: { $first: '$cohortSize' },
// Retention analysis by period
retentionByPeriod: {
$push: {
period: '$_id.periodsFromFirst',
orderPeriod: '$_id.orderPeriod',
activeCustomers: '$activeCustomerCount',
retentionRate: '$retentionRate',
totalRevenue: '$totalRevenue',
revenuePerCustomer: '$revenuePerCustomer',
avgOrderValue: '$avgOrderValue',
totalOrders: '$totalOrders',
engagementScore: '$engagementScore',
periodClassification: '$periodClassification',
revenueMetrics: '$revenueMetrics'
}
},
// Aggregate cohort metrics
totalLifetimeRevenue: { $sum: '$totalRevenue' },
avgLifetimeValue: { $avg: '$totalCustomerLifetimeValue' },
peakRetentionRate: { $max: '$retentionRate' },
finalRetentionRate: { $last: '$retentionRate' },
avgEngagementScore: { $avg: '$engagementScore' },
// Cohort performance classification
cohortHealth: {
$avg: {
$cond: [
{ $gte: ['$retentionRate', 30] }, // 30% retention considered healthy
1,
0
]
}
}
}
},
// Stage 10: Calculate cohort performance indicators
{
$addFields: {
// Lifetime value per customer in cohort
lifetimeValuePerCustomer: {
$divide: ['$totalLifetimeRevenue', '$cohortSize']
},
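// Retention curve analysis: period 0 retention is 100% by construction (every
// cohort member orders in their first period), so the trend compares the final
// period against the first repeat-purchase period (period 1)
retentionTrend: {
$let: {
vars: {
firstPeriodRetention: {
$arrayElemAt: [
{
$map: {
input: {
$filter: {
input: '$retentionByPeriod',
cond: { $eq: ['$$this.period', 1] }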
}
},
in: '$$this.retentionRate'
}
},
0
]
},
lastPeriodRetention: '$finalRetentionRate'
},
in: {
$subtract: ['$$lastPeriodRetention', '$$firstPeriodRetention']
}
}
},
// Cohort quality classification
cohortQuality: {
$switch: {
branches: [
{
case: {
$and: [
{ $gte: ['$peakRetentionRate', 50] },
{ $gte: ['$avgEngagementScore', 1.5] },
{ $gte: ['$lifetimeValuePerCustomer', 500] }
]
},
then: 'excellent'
},
{
case: {
$and: [
{ $gte: ['$peakRetentionRate', 35] },
{ $gte: ['$avgEngagementScore', 1.0] },
{ $gte: ['$lifetimeValuePerCustomer', 250] }
]
},
then: 'good'
},
{
case: {
$and: [
{ $gte: ['$peakRetentionRate', 20] },
{ $gte: ['$avgEngagementScore', 0.5] }
]
},
then: 'fair'
}
],
default: 'poor'
}
},
// Strategic recommendations
recommendations: {
$switch: {
branches: [
{
case: { $lt: ['$peakRetentionRate', 20] },
then: ['improve_onboarding', 'enhance_early_engagement', 'review_product_fit']
},
{
case: { $lt: ['$finalRetentionRate', 10] },
then: ['develop_loyalty_program', 'improve_long_term_value', 'increase_engagement']
},
{
case: { $lt: ['$avgEngagementScore', 0.5] },
then: ['enhance_customer_experience', 'increase_purchase_frequency', 'improve_product_recommendations']
}
],
default: ['maintain_excellence', 'scale_successful_strategies', 'explore_expansion_opportunities']
}
}
}
},
// Stage 11: Final projection and formatting
{
$project: {
_id: 0,
// Cohort identification
cohortPeriod: '$_id',
cohortSize: 1,
cohortQuality: 1,
// Key performance metrics
performance: {
lifetimeValuePerCustomer: { $round: ['$lifetimeValuePerCustomer', 2] },
avgLifetimeValue: { $round: ['$avgLifetimeValue', 2] },
totalLifetimeRevenue: { $round: ['$totalLifetimeRevenue', 2] },
peakRetentionRate: { $round: ['$peakRetentionRate', 1] },
finalRetentionRate: { $round: ['$finalRetentionRate', 1] },
retentionTrend: { $round: ['$retentionTrend', 1] },
avgEngagementScore: { $round: ['$avgEngagementScore', 2] },
cohortHealth: { $round: ['$cohortHealth', 2] }
},
// Detailed retention analysis
retentionAnalysis: {
$map: {
input: { $sortArray: { input: '$retentionByPeriod', sortBy: { period: 1 } } },
in: {
period: '$$this.period',
orderPeriod: '$$this.orderPeriod',
activeCustomers: '$$this.activeCustomers',
retentionRate: { $round: ['$$this.retentionRate', 1] },
revenuePerCustomer: { $round: ['$$this.revenuePerCustomer', 2] },
avgOrderValue: { $round: ['$$this.avgOrderValue', 2] },
totalRevenue: { $round: ['$$this.totalRevenue', 2] },
totalOrders: '$$this.totalOrders',
engagementScore: { $round: ['$$this.engagementScore', 2] },
periodClassification: '$$this.periodClassification',
revenueMetrics: {
median: { $round: ['$$this.revenueMetrics.median', 2] },
average: { $round: ['$$this.revenueMetrics.average', 2] },
max: '$$this.revenueMetrics.max',
min: '$$this.revenueMetrics.min'
}
}
}
},
// Strategic insights
insights: {
recommendations: '$recommendations',
keyInsights: {
$filter: {
input: [
{
$cond: {
if: { $gt: ['$peakRetentionRate', 40] },
then: 'High-quality cohort with strong initial engagement',
else: null
}
},
{
$cond: {
if: { $gt: ['$retentionTrend', 0] },
then: 'Retention improving over time - successful loyalty building',
else: null
}
},
{
$cond: {
if: { $gt: ['$avgEngagementScore', 2] },
then: 'Highly engaged cohort with frequent repeat purchases',
else: null
}
},
{
$cond: {
if: { $gt: ['$lifetimeValuePerCustomer', 1000] },
then: 'High-value cohort - focus on retention and expansion',
else: null
}
}
],
cond: { $ne: ['$$this', null] }
}
}
}
}
},
// Stage 12: Sort by cohort period (most recent first)
{
$sort: { cohortPeriod: -1 }
}
];
try {
const results = await this.collections.orders.aggregate(pipeline, this.aggregationOptions).toArray();
const processingTime = Date.now() - startTime;
console.log(`Cohort analysis completed in ${processingTime}ms, analyzed ${results.length} cohorts`);
// Calculate cross-cohort insights (averages fall back to 0 when no cohorts are returned)
const cohortCount = results.length;
const averageOf = (selector) => (cohortCount > 0
? results.reduce((sum, cohort) => sum + (selector(cohort) ?? 0), 0) / cohortCount
: 0);
const crossCohortInsights = {
totalCohorts: cohortCount,
avgCohortSize: averageOf(cohort => cohort.cohortSize),
avgLifetimeValue: averageOf(cohort => cohort.performance.lifetimeValuePerCustomer),
bestPerformingCohort: results.reduce((best, current) =>
current.performance.lifetimeValuePerCustomer > best.performance.lifetimeValuePerCustomer ? current : best, results[0]
),
retentionTrendAvg: averageOf(cohort => cohort.performance.retentionTrend),
excellentCohorts: results.filter(cohort => cohort.cohortQuality === 'excellent').length,
improvingCohorts: results.filter(cohort => cohort.performance.retentionTrend > 0).length
};
return {
success: true,
processingTimeMs: processingTime,
cohortType: cohortType,
lookbackPeriods: lookbackPeriods,
analysisDate: new Date(),
cohortCount: results.length,
cohorts: results,
crossCohortInsights: crossCohortInsights,
metadata: {
aggregationComplexity: 'very_high',
stagesCount: pipeline.length,
analyticsFeatures: [
'customer_lifetime_value',
'retention_analysis',
'cohort_segmentation',
'behavioral_analysis',
'revenue_attribution',
'trend_analysis',
'performance_classification',
'strategic_recommendations'
]
}
};
} catch (error) {
console.error('Error performing cohort analysis:', error);
return {
success: false,
error: error.message,
processingTimeMs: Date.now() - startTime
};
}
}
}
// Benefits of MongoDB Aggregation Framework:
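// - Multi-stage analytics run inside the database, avoiding round trips to the application
// - Embedded documents and arrays can be reshaped without relational JOINs ($lookup is available when needed)
// - Built-in pipeline optimizations (for example, moving $match ahead of $sort) and index use for leading $match/$sort stages
// - On sharded clusters, eligible stages run in parallel on each shard before results are merged
// - Change streams can trigger re-aggregation for near-real-time dashboards
// - Rich expression language for complex calculations
// - Built-in statistical and date operators ($avg, $stdDevPop, $dateTrunc, $setWindowFields)
// - Blocking stages such as $sort and $group that exceed their 100 MB memory limit can spill to disk (allowDiskUse)
// - Integration with MongoDB's native features (GeoSpatial, Text Search, etc.)
// - SQL-style access to these operations through QueryLeaf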
module.exports = {
AdvancedAnalyticsProcessor
};
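The cohort pipeline converts the gap between a customer's first order and each later order into a period index. The plain JavaScript sketch below is a minimal illustration that assumes UTC month boundaries. The helper names are hypothetical and not part of `AdvancedAnalyticsProcessor`. It contrasts two approaches. The first is a calendar-aware month count, which matches what `$dateDiff` with `unit: 'month'` returns because that operator counts unit boundaries crossed. The second divides the gap between truncated months by a fixed 30-day period:

```javascript
// Hypothetical helpers (not part of AdvancedAnalyticsProcessor) comparing two
// ways to compute "months since first order", using UTC month boundaries.
const DAY_MS = 24 * 60 * 60 * 1000;

// First instant of the UTC month containing `date` (like $dateTrunc with unit 'month').
function truncToUtcMonth(date) {
  return new Date(Date.UTC(date.getUTCFullYear(), date.getUTCMonth(), 1));
}

// Calendar-aware: number of month boundaries crossed between the two dates.
function calendarMonthIndex(firstOrder, order) {
  return (order.getUTCFullYear() - firstOrder.getUTCFullYear()) * 12 +
    (order.getUTCMonth() - firstOrder.getUTCMonth());
}

// Approximation: gap between truncated months divided by a fixed 30-day period.
function approxMonthIndex(firstOrder, order) {
  const gapMs = truncToUtcMonth(order).getTime() - truncToUtcMonth(firstOrder).getTime();
  return Math.floor(gapMs / (30 * DAY_MS));
}

const first = new Date(Date.UTC(2021, 0, 15)); // 2021-01-15
const later = new Date(Date.UTC(2026, 11, 3)); // 2026-12-03

console.log(calendarMonthIndex(first, later)); // 71
console.log(approxMonthIndex(first, later)); // 72
```

Over short spans the two approaches agree. However, an average month is about 30.44 days, so the fixed divisor gains roughly 0.44 days per month. Over a multi-year lookback, that drift can push an order into the wrong cohort period.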
Understanding MongoDB Aggregation Framework Architecture
Advanced Pipeline Optimization and Performance Patterns
Implement sophisticated aggregation strategies for enterprise MongoDB deployments:
// Production-optimized MongoDB Aggregation with advanced performance tuning
class EnterpriseAggregationOptimizer {
constructor(db, optimizationConfig) {
this.db = db;
this.config = {
// Defaults come first so that caller-supplied optimizationConfig values override them
enableQueryPlanCache: true,
enableParallelProcessing: true,
enableIncrementalProcessing: true,
maxMemoryUsage: '2GB',
enableIndexHints: true,
enableResultCaching: true,
...optimizationConfig
};
this.queryPlanCache = new Map();
this.resultCache = new Map();
this.performanceMetrics = new Map();
}
async optimizeAggregationPipeline(pipeline, collectionName, options = {}) {
console.log(`Optimizing aggregation pipeline for ${collectionName}...`);
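// Each strategy is an instance method with the signature
// (pipeline, collectionName, options) => Promise<pipeline>. These methods (and
// calculatePerformanceGain) are not defined in this class as shown and must be implemented before use
const optimizationStrategies = [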
this.moveMatchToBeginning,
this.optimizeIndexUsage,
this.enableEarlyFiltering,
this.minimizeDataMovement,
this.optimizeGroupingOperations,
this.enableParallelExecution
];
let optimizedPipeline = [...pipeline];
for (const strategy of optimizationStrategies) {
optimizedPipeline = await strategy.call(this, optimizedPipeline, collectionName, options);
}
return {
originalStages: pipeline.length,
optimizedStages: optimizedPipeline.length,
optimizedPipeline: optimizedPipeline,
estimatedPerformanceGain: this.calculatePerformanceGain(pipeline, optimizedPipeline)
};
}
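async enableRealTimeAggregation(pipeline, collectionName, refreshInterval = 5000) {
console.log(`Setting up real-time aggregation for ${collectionName}...`);
// Capture the collection here: inside realTimeProcessor's methods, `this` is the
// processor object, not the optimizer, so `this.db` would be undefined there
const collection = this.db.collection(collectionName);
// Change streams require a replica set or sharded cluster
const changeStream = collection.watch([
{
$match: {
operationType: { $in: ['insert', 'update', 'replace', 'delete'] }
}
}
]);
const realTimeProcessor = {
pipeline: pipeline,
lastResults: null,
isProcessing: false,
hasPendingChanges: false,
lastRunAt: 0,
subscribers: new Set(),
// Register a callback for fresh results; returns an unsubscribe function
subscribe(callback) {
this.subscribers.add(callback);
return () => this.subscribers.delete(callback);
},
emitResults(results) {
for (const callback of this.subscribers) {
callback(results);
}
},
async processChanges() {
// If a run is already in flight, remember to run again when it finishes
if (this.isProcessing) {
this.hasPendingChanges = true;
return;
}
this.isProcessing = true;
try {
// Throttle: at most one recomputation per refreshInterval milliseconds
const waitMs = this.lastRunAt + refreshInterval - Date.now();
if (waitMs > 0) {
await new Promise(resolve => setTimeout(resolve, waitMs));
}
this.hasPendingChanges = false;
this.lastRunAt = Date.now();
const results = await collection
.aggregate(this.pipeline, { allowDiskUse: true })
.toArray();
this.lastResults = results;
// Emit real-time results to subscribers
this.emitResults(results);
} catch (error) {
console.error('Real-time aggregation error:', error);
} finally {
this.isProcessing = false;
}
// Changes arrived during the run: recompute once more
if (this.hasPendingChanges) {
this.processChanges();
}
}
};
// Process changes as they occur
changeStream.on('change', () => {
realTimeProcessor.processChanges();
});
changeStream.on('error', error => console.error('Change stream error:', error));
return realTimeProcessor;
}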
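async implementIncrementalAggregation(pipeline, collectionName, incrementField = 'updatedAt') {
console.log(`Setting up incremental aggregation for ${collectionName}...`);
// Track last processed timestamp (getLastProcessedTime and updateLastProcessedTime
// are persistence helpers not shown in this class)
const lastProcessedTime = await this.getLastProcessedTime(collectionName);
// Fix the upper bound before querying, so documents written while this run
// executes are picked up by the next run instead of being skipped
const runStartedAt = new Date();
const incrementalPipeline = [
// Only process documents created or updated in (lastProcessedTime, runStartedAt]
{
$match: {
[incrementField]: { $gt: lastProcessedTime, $lte: runStartedAt }
}
},
...pipeline
];
// If the pipeline groups data, these are partial aggregates for the new window
// and must be merged with earlier results (for example with a $merge stage)
const results = await this.db.collection(collectionName)
.aggregate(incrementalPipeline, { allowDiskUse: true })
.toArray();
// Advance the watermark to the bound used in this run
await this.updateLastProcessedTime(collectionName, runStartedAt);
return {
incrementalResults: results,
lastProcessedTime: lastProcessedTime,
newProcessedTime: runStartedAt,
// Number of result documents returned, not the number of source documents scanned
resultCount: results.length
};
}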
}
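`optimizeAggregationPipeline` references a `moveMatchToBeginning` strategy, but the class does not show an implementation. The sketch below illustrates one safe rewrite such a strategy could apply. The helper name `moveMatchBeforeSort` is hypothetical. The function moves each `$match` ahead of any `$sort` stages that directly precede it. MongoDB's server-side optimizer already performs this reordering, so a client-side version is mainly useful for inspecting or normalizing pipelines before sending them:

```javascript
// Hypothetical helper: returns a new pipeline array in which every $match stage
// has been moved ahead of the $sort stages immediately preceding it.
function moveMatchBeforeSort(pipeline) {
  const result = [...pipeline]; // shallow copy: the caller's array is not modified
  for (let i = 1; i < result.length; i++) {
    let j = i;
    // Bubble the $match left while the stage before it is a $sort; a filter
    // does not depend on document order and $sort does not change fields
    while (j > 0 && '$match' in result[j] && '$sort' in result[j - 1]) {
      [result[j - 1], result[j]] = [result[j], result[j - 1]];
      j--;
    }
  }
  return result;
}

const pipeline = [
  { $sort: { orderDate: -1 } },
  { $match: { status: 'completed' } },
  { $limit: 10 }
];

console.log(JSON.stringify(moveMatchBeforeSort(pipeline)));
// [{"$match":{"status":"completed"}},{"$sort":{"orderDate":-1}},{"$limit":10}]
```

The rule deliberately stops at stages such as `$limit`, `$group`, or `$project`. Moving a filter past those stages can change which documents are kept, or which fields the filter sees.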
SQL-Style Aggregation Operations with QueryLeaf
QueryLeaf provides familiar SQL syntax for MongoDB Aggregation Framework operations:
-- QueryLeaf aggregation operations with SQL-familiar syntax
-- Complex customer analytics with CTEs and window functions
WITH customer_segments AS (
SELECT
customer_id,
customer_name,
registration_date,
-- Customer segmentation using CASE expressions
CASE
WHEN registration_date >= CURRENT_TIMESTAMP - INTERVAL '90 days' THEN 'new_customer'
WHEN last_order_date >= CURRENT_TIMESTAMP - INTERVAL '30 days' THEN 'active_customer'
WHEN last_order_date >= CURRENT_TIMESTAMP - INTERVAL '180 days' THEN 'returning_customer'
ELSE 'dormant_customer'
END as customer_segment,
-- Geographic classification using nested CASE
CASE
WHEN JSON_EXTRACT(address, '$.country') = 'US' THEN 'domestic'
WHEN JSON_EXTRACT(address, '$.country') IN ('CA', 'MX') THEN 'north_america'
WHEN JSON_EXTRACT(address, '$.country') IN ('GB', 'DE', 'FR', 'IT', 'ES') THEN 'europe'
ELSE 'international'
END as geographic_segment,
-- Customer value classification
total_spent,
total_orders,
average_order_value,
loyalty_tier
FROM customers
WHERE is_active = true
),
order_analytics AS (
SELECT
o._id as order_id,
o.customer_id,
o.order_date,
-- Time-based dimensions using date functions
DATE_TRUNC('month', o.order_date) as order_month,
DATE_TRUNC('quarter', o.order_date) as order_quarter,
EXTRACT(year FROM o.order_date) as order_year,
EXTRACT(dow FROM o.order_date) as day_of_week,
EXTRACT(hour FROM o.order_date) as hour_of_day,
-- Seasonal analysis
CASE
WHEN EXTRACT(month FROM o.order_date) IN (12, 1, 2) THEN 'winter'
WHEN EXTRACT(month FROM o.order_date) IN (3, 4, 5) THEN 'spring'
WHEN EXTRACT(month FROM o.order_date) IN (6, 7, 8) THEN 'summer'
ELSE 'fall'
END as season,
-- Financial metrics
JSON_EXTRACT(financial, '$.total') as order_total,
JSON_EXTRACT(financial, '$.discount') as discount_amount,
JSON_EXTRACT(financial, '$.tax') as tax_amount,
JSON_EXTRACT(financial, '$.shipping') as shipping_amount,
-- Order classification
CASE
WHEN JSON_EXTRACT(financial, '$.total') >= 1000 THEN 'high_value'
WHEN JSON_EXTRACT(financial, '$.total') >= 500 THEN 'medium_value'
WHEN JSON_EXTRACT(financial, '$.total') >= 100 THEN 'low_value'
ELSE 'micro_transaction'
END as order_value_segment,
-- Item analysis using JSON functions (items is kept so product_performance can unnest it)
o.items,
JSON_ARRAY_LENGTH(o.items) as item_count,
-- Payment and shipping insights
JSON_EXTRACT(payment, '$.method') as payment_method,
JSON_EXTRACT(shipping, '$.method') as shipping_method,
JSON_EXTRACT(shipping, '$.region') as shipping_region
FROM orders o
WHERE o.order_date >= CURRENT_TIMESTAMP - INTERVAL '12 months'
AND o.status IN ('completed', 'shipped', 'delivered')
),
product_performance AS (
SELECT
oa.order_id,
-- Unnest items array for item-level analysis
JSON_EXTRACT(item, '$.product_id') as product_id,
JSON_EXTRACT(item, '$.quantity') as quantity,
JSON_EXTRACT(item, '$.unit_price') as unit_price,
JSON_EXTRACT(item, '$.total_price') as item_total,
-- Product details from JOIN
p.product_name,
p.category,
p.brand,
p.cost_per_unit,
-- Calculate margins
(JSON_EXTRACT(item, '$.unit_price') - p.cost_per_unit) as unit_margin,
(JSON_EXTRACT(item, '$.unit_price') - p.cost_per_unit) * JSON_EXTRACT(item, '$.quantity') as total_margin,
-- Product categorization
CASE
WHEN p.category = 'Electronics' THEN 'tech'
WHEN p.category IN ('Clothing', 'Shoes', 'Accessories') THEN 'fashion'
WHEN p.category IN ('Home', 'Garden', 'Furniture') THEN 'home'
ELSE 'other'
END as product_group
FROM order_analytics oa
CROSS JOIN JSON_TABLE(
oa.items, '$[*]' COLUMNS (
item JSON PATH '$'
)
) AS items_table
JOIN products p ON JSON_EXTRACT(items_table.item, '$.product_id') = p.product_id
WHERE p.is_active = true
),
customer_lifetime AS (
SELECT
customer_id,
-- Calculate customer lifetime metrics once per customer
MIN(order_date) as customer_first_order,
MAX(order_date) as customer_last_order,
COUNT(*) as customer_total_orders,
SUM(order_total) as customer_lifetime_value
FROM order_analytics
GROUP BY customer_id
),
comprehensive_analytics AS (
SELECT
-- Dimensional attributes
cs.customer_segment,
cs.geographic_segment,
oa.order_month,
oa.order_quarter,
oa.season,
oa.order_value_segment,
pp.product_group,
pp.category,
pp.brand,
-- Aggregated metrics per dimensional combination
COUNT(DISTINCT cs.customer_id) as unique_customers,
COUNT(DISTINCT oa.order_id) as total_orders,
COUNT(DISTINCT pp.product_id) as unique_products,
-- Revenue is summed at item level: the join repeats each order once per item,
-- so SUM(oa.order_total) would double-count multi-item orders
SUM(pp.item_total) as total_revenue,
SUM(pp.item_total) / COUNT(DISTINCT oa.order_id) as avg_order_value,
SUM(pp.total_margin) as total_margin,
-- Average spend per customer within this slice
SUM(pp.item_total) / COUNT(DISTINCT cs.customer_id) as avg_customer_monthly_spend,
-- Margin ranking within each month (window functions run after GROUP BY)
RANK() OVER (
PARTITION BY oa.order_month
ORDER BY SUM(pp.total_margin) DESC
) as product_margin_rank,
-- Orders per geographic segment and season (an order spanning several
-- product groups is counted once per group)
SUM(COUNT(DISTINCT oa.order_id)) OVER (
PARTITION BY cs.geographic_segment, oa.season
) as segment_seasonal_orders,
-- Statistical functions (order totals repeat once per item, so these are item-weighted)
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY oa.order_total) as median_order_value,
STDDEV_POP(oa.order_total) as order_value_stddev,
-- Customer lifetime metrics averaged over the joined item rows in this slice
AVG(EXTRACT(days FROM (cl.customer_last_order - cl.customer_first_order))) as avg_customer_lifespan_days,
AVG(cl.customer_total_orders) as avg_orders_per_customer,
AVG(cl.customer_lifetime_value) as avg_customer_lifetime_value
FROM customer_segments cs
JOIN order_analytics oa ON cs.customer_id = oa.customer_id
JOIN product_performance pp ON oa.order_id = pp.order_id
JOIN customer_lifetime cl ON cs.customer_id = cl.customer_id
GROUP BY
cs.customer_segment,
cs.geographic_segment,
oa.order_month,
oa.order_quarter,
oa.season,
oa.order_value_segment,
pp.product_group,
pp.category,
pp.brand
),
final_analytics AS (
SELECT
customer_segment,
geographic_segment,
order_quarter,
season,
product_group,
category,
brand,
-- Core metrics
unique_customers,
total_orders,
unique_products,
ROUND(total_revenue, 2) as total_revenue,
ROUND(avg_order_value, 2) as avg_order_value,
ROUND(total_margin, 2) as total_margin,
ROUND((total_margin / NULLIF(total_revenue, 0)) * 100, 1) as margin_percentage,
-- Customer insights
ROUND(avg_customer_monthly_spend, 2) as avg_customer_monthly_spend,
ROUND(median_order_value, 2) as median_order_value,
ROUND(order_value_stddev, 2) as order_value_stddev,
-- Performance indicators
CASE
WHEN product_margin_rank <= 10 THEN 'top_performer'
WHEN product_margin_rank <= 50 THEN 'good_performer'
ELSE 'average_performer'
END as performance_tier,
-- Customer behavior analysis (already aggregated per slice above)
ROUND(avg_customer_lifespan_days, 1) as avg_customer_lifespan_days,
ROUND(avg_orders_per_customer, 2) as avg_orders_per_customer,
ROUND(avg_customer_lifetime_value, 2) as avg_customer_lifetime_value,
-- Growth analysis using LAG function
LAG(total_revenue) OVER (
PARTITION BY customer_segment, geographic_segment, product_group
ORDER BY order_quarter
) as prev_quarter_revenue,
-- Calculate growth rate
ROUND(
((total_revenue - LAG(total_revenue) OVER (
PARTITION BY customer_segment, geographic_segment, product_group
ORDER BY order_quarter
)) / NULLIF(LAG(total_revenue) OVER (
PARTITION BY customer_segment, geographic_segment, product_group
ORDER BY order_quarter
), 0)) * 100,
1
) as revenue_growth_pct,
-- Market share analysis
ROUND(
(total_revenue / SUM(total_revenue) OVER (PARTITION BY order_quarter)) * 100,
2
) as market_share_pct,
-- Seasonal performance indexing
ROUND(
total_revenue / AVG(total_revenue) OVER (
PARTITION BY customer_segment, geographic_segment, product_group
) * 100,
1
) as seasonal_index
FROM comprehensive_analytics
)
SELECT
-- Dimensional attributes
customer_segment,
geographic_segment,
order_quarter,
season,
product_group,
category,
brand,
-- Core metrics
unique_customers,
total_orders,
unique_products,
total_revenue,
avg_order_value,
total_margin,
margin_percentage,
-- Customer insights
avg_customer_monthly_spend,
median_order_value,
order_value_stddev,
avg_customer_lifespan_days,
avg_orders_per_customer,
avg_customer_lifetime_value,
-- Performance classification
performance_tier,
-- Growth metrics
prev_quarter_revenue,
revenue_growth_pct,
market_share_pct,
seasonal_index,
-- Business insights and recommendations
CASE
WHEN revenue_growth_pct > 25 THEN 'high_growth_opportunity'
WHEN revenue_growth_pct > 10 THEN 'steady_growth'
WHEN revenue_growth_pct > 0 THEN 'slow_growth'
WHEN revenue_growth_pct IS NULL THEN 'new_segment'
ELSE 'declining_segment'
END as growth_classification,
CASE
WHEN margin_percentage > 30 AND revenue_growth_pct > 15 THEN 'invest_and_expand'
WHEN margin_percentage > 30 AND revenue_growth_pct < 0 THEN 'optimize_and_retain'
WHEN margin_percentage < 15 AND revenue_growth_pct > 15 THEN 'improve_margins'
WHEN margin_percentage < 15 AND revenue_growth_pct < 0 THEN 'consider_exit'
ELSE 'monitor_and_optimize'
END as strategic_recommendation,
-- Key performance indicators
CASE
WHEN avg_customer_lifetime_value > 1000 AND avg_orders_per_customer > 5 THEN 'high_value_loyal'
WHEN avg_customer_lifetime_value > 500 THEN 'high_value'
WHEN avg_orders_per_customer > 3 THEN 'loyal_customers'
ELSE 'acquisition_focus'
END as customer_strategy
FROM final_analytics
WHERE total_revenue > 1000 -- Filter for statistical significance
ORDER BY
total_revenue DESC,
revenue_growth_pct DESC NULLS LAST,
margin_percentage DESC
LIMIT 500;
-- Real-time product performance dashboard
CREATE VIEW real_time_product_performance AS
WITH hourly_product_metrics AS (
SELECT
JSON_EXTRACT(item, '$.product_id') as product_id,
DATE_TRUNC('hour', order_date) as hour_bucket,
-- Sales metrics
SUM(JSON_EXTRACT(item, '$.quantity')) as total_quantity_sold,
SUM(JSON_EXTRACT(item, '$.total_price')) as total_revenue,
COUNT(DISTINCT o._id) as unique_orders,
COUNT(DISTINCT o.customer_id) as unique_customers,
-- Pricing analysis
AVG(JSON_EXTRACT(item, '$.unit_price')) as avg_selling_price,
MAX(JSON_EXTRACT(item, '$.unit_price')) as max_selling_price,
MIN(JSON_EXTRACT(item, '$.unit_price')) as min_selling_price,
-- Performance indicators (multiplying by 1.0 avoids integer division)
AVG(JSON_EXTRACT(o.financial, '$.total')) as avg_order_value,
SUM(JSON_EXTRACT(item, '$.quantity')) * 1.0 / COUNT(DISTINCT o._id) as avg_quantity_per_order
FROM orders o
CROSS JOIN JSON_TABLE(
o.items, '$[*]' COLUMNS (
item JSON PATH '$'
)
) AS items_unnested
WHERE o.order_date >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
AND o.status IN ('completed', 'processing', 'shipped')
GROUP BY
JSON_EXTRACT(item, '$.product_id'),
DATE_TRUNC('hour', order_date)
),
product_rankings AS (
SELECT
hpm.*,
p.product_name,
p.category,
p.brand,
p.cost_per_unit,
-- Calculate margins
(hpm.avg_selling_price - p.cost_per_unit) as unit_margin,
((hpm.avg_selling_price - p.cost_per_unit) * hpm.total_quantity_sold) as total_margin,
-- Performance rankings using window functions
RANK() OVER (ORDER BY total_revenue DESC) as revenue_rank,
RANK() OVER (ORDER BY total_quantity_sold DESC) as quantity_rank,
RANK() OVER (PARTITION BY p.category ORDER BY total_revenue DESC) as category_rank,
-- Percentile rankings
PERCENT_RANK() OVER (ORDER BY total_revenue) as revenue_percentile,
PERCENT_RANK() OVER (ORDER BY total_quantity_sold) as quantity_percentile,
-- Growth analysis (comparing to the previous hour with sales; product_id is
-- qualified because products also has a product_id column)
LAG(total_revenue) OVER (
PARTITION BY hpm.product_id
ORDER BY hpm.hour_bucket
) as prev_hour_revenue,
LAG(total_quantity_sold) OVER (
PARTITION BY hpm.product_id
ORDER BY hpm.hour_bucket
) as prev_hour_quantity
FROM hourly_product_metrics hpm
JOIN products p ON hpm.product_id = p.product_id
WHERE p.is_active = true
)
SELECT
product_id,
product_name,
category,
brand,
hour_bucket,
-- Sales performance
total_quantity_sold,
ROUND(total_revenue, 2) as total_revenue,
unique_orders,
unique_customers,
-- Pricing metrics
ROUND(avg_selling_price, 2) as avg_selling_price,
max_selling_price,
min_selling_price,
ROUND(unit_margin, 2) as unit_margin,
ROUND(total_margin, 2) as total_margin,
ROUND((total_margin / total_revenue) * 100, 1) as margin_percentage,
-- Performance rankings
revenue_rank,
quantity_rank,
category_rank,
ROUND(revenue_percentile * 100, 1) as revenue_percentile_score,
-- Growth metrics
ROUND(
CASE
WHEN prev_hour_revenue > 0 THEN
((total_revenue - prev_hour_revenue) / prev_hour_revenue) * 100
ELSE NULL
END,
1
) as hourly_revenue_growth_pct,
ROUND(
CASE
WHEN prev_hour_quantity > 0 THEN
((total_quantity_sold - prev_hour_quantity) / prev_hour_quantity::DECIMAL) * 100
ELSE NULL
END,
1
) as hourly_quantity_growth_pct,
-- Customer metrics
ROUND(avg_order_value, 2) as avg_order_value,
ROUND(avg_quantity_per_order, 2) as avg_quantity_per_order,
ROUND(total_revenue / unique_customers, 2) as revenue_per_customer,
-- Output aliases such as margin_percentage cannot be referenced in the same SELECT list,
-- so the underlying expressions are repeated in the CASE expressions below
-- Performance classification
CASE
WHEN revenue_rank <= 10 AND (total_margin / total_revenue) * 100 > 20 THEN 'star_performer'
WHEN revenue_rank <= 50 AND (total_margin / total_revenue) * 100 > 15 THEN 'strong_performer'
WHEN revenue_rank <= 100 THEN 'solid_performer'
ELSE 'monitor_performance'
END as performance_classification,
-- Alert indicators
CASE
WHEN prev_hour_revenue > 0 AND (total_revenue - prev_hour_revenue) / prev_hour_revenue > 0.5 THEN 'trending_up'
WHEN prev_hour_revenue > 0 AND (total_revenue - prev_hour_revenue) / prev_hour_revenue < -0.3 THEN 'trending_down'
WHEN revenue_rank <= 20 AND (total_margin / total_revenue) * 100 < 10 THEN 'margin_concern'
ELSE 'normal'
END as alert_status,
-- Recommendations
CASE
WHEN revenue_rank <= 10 AND (total_margin / total_revenue) * 100 > 20 THEN 'increase_inventory_and_marketing'
WHEN prev_hour_revenue > 0 AND (total_revenue - prev_hour_revenue) / prev_hour_revenue < -0.3 THEN 'investigate_declining_performance'
WHEN (total_margin / total_revenue) * 100 < 10 THEN 'review_pricing_strategy'
WHEN revenue_rank > 100 THEN 'consider_promotion_or_discontinuation'
ELSE 'maintain_current_strategy'
END as recommendation
FROM product_rankings
WHERE hour_bucket >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
ORDER BY
hour_bucket DESC,
revenue_rank ASC,
margin_percentage DESC;
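For readers mapping the hourly product query back to MongoDB, the previous-hour and ranking columns correspond to window functions in the $setWindowFields stage (MongoDB 5.0+). The sketch below is hand-written for comparison, not the pipeline QueryLeaf generates; the order_items collection and its productId, quantity, unitPrice and orderDate fields are assumptions for illustration.

```javascript
// Hand-written sketch (not QueryLeaf output). Assumes a hypothetical "order_items"
// collection with productId, quantity, unitPrice and orderDate fields.
// Requires MongoDB 5.0+ for $dateTrunc and $setWindowFields.
function buildHourlyGrowthPipeline(since) {
  return [
    // Filter first so an index on orderDate can be used
    { $match: { orderDate: { $gte: since } } },
    // One document per product per hour
    {
      $group: {
        _id: {
          productId: "$productId",
          hourBucket: { $dateTrunc: { date: "$orderDate", unit: "hour" } }
        },
        totalQuantitySold: { $sum: "$quantity" },
        totalRevenue: { $sum: { $multiply: ["$quantity", "$unitPrice"] } }
      }
    },
    // Previous hour's revenue for the same product (null for its first hour), like SQL LAG()
    {
      $setWindowFields: {
        partitionBy: "$_id.productId",
        sortBy: { "_id.hourBucket": 1 },
        output: {
          prevHourRevenue: { $shift: { output: "$totalRevenue", by: -1, default: null } }
        }
      }
    },
    // Rank products by revenue within each hour, like SQL RANK()
    {
      $setWindowFields: {
        partitionBy: "$_id.hourBucket",
        sortBy: { totalRevenue: -1 },
        output: { revenueRank: { $rank: {} } }
      }
    },
    // Growth percentage; null when there is no positive baseline, as in the SQL CASE
    {
      $addFields: {
        hourlyRevenueGrowthPct: {
          $cond: [
            { $gt: ["$prevHourRevenue", 0] },
            {
              $multiply: [
                { $divide: [{ $subtract: ["$totalRevenue", "$prevHourRevenue"] }, "$prevHourRevenue"] },
                100
              ]
            },
            null
          ]
        }
      }
    },
    { $sort: { "_id.hourBucket": -1, revenueRank: 1 } }
  ];
}

// Usage with the Node.js driver (not executed here):
//   const since = new Date(Date.now() - 24 * 60 * 60 * 1000);
//   const rows = await db.collection("order_items").aggregate(buildHourlyGrowthPipeline(since)).toArray();
```

Note that $shift returns the revenue of the product's previous hour that had sales, which matches a LAG over grouped rows rather than the literal previous clock hour.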
-- Advanced cohort analysis with SQL window functions
WITH customer_cohorts AS (
SELECT
customer_id,
DATE_TRUNC('month', MIN(order_date)) as cohort_month,
MIN(order_date) as first_order_date,
COUNT(*) as total_orders,
SUM(JSON_EXTRACT(financial, '$.total')) as total_spent
FROM orders
WHERE status IN ('completed', 'delivered')
AND order_date >= CURRENT_TIMESTAMP - INTERVAL '24 months'
GROUP BY customer_id
),
cohort_periods AS (
SELECT
cc.customer_id,
cc.cohort_month,
cc.first_order_date,
cc.total_orders,
cc.total_spent,
o.order_date,
o._id as order_id,
JSON_EXTRACT(o.financial, '$.total') as order_value,
-- Calculate periods since first order
FLOOR(
MONTHS_BETWEEN(DATE_TRUNC('month', o.order_date), cc.cohort_month)
) as periods_since_first_order,
DATE_TRUNC('month', o.order_date) as order_month
FROM customer_cohorts cc
JOIN orders o ON cc.customer_id = o.customer_id
WHERE o.status IN ('completed', 'delivered')
AND o.order_date >= cc.first_order_date
),
cohort_analysis AS (
SELECT
cohort_month,
periods_since_first_order,
order_month,
-- Cohort metrics
COUNT(DISTINCT customer_id) as active_customers,
COUNT(DISTINCT order_id) as total_orders,
SUM(order_value) as total_revenue,
AVG(order_value) as avg_order_value,
-- Customer behavior
AVG(total_orders) as avg_lifetime_orders,
AVG(total_spent) as avg_lifetime_value,
-- Period-specific insights (cast to DECIMAL so the count ratio is not truncated by integer division)
COUNT(DISTINCT customer_id)::DECIMAL /
FIRST_VALUE(COUNT(DISTINCT customer_id)) OVER (
PARTITION BY cohort_month
ORDER BY periods_since_first_order
ROWS UNBOUNDED PRECEDING
) as retention_rate
FROM cohort_periods
GROUP BY cohort_month, periods_since_first_order, order_month
),
cohort_summary AS (
SELECT
cohort_month,
-- Cohort size (customers who made first purchase in this month)
MAX(CASE WHEN periods_since_first_order = 0 THEN active_customers END) as cohort_size,
-- Retention rates by period
MAX(CASE WHEN periods_since_first_order = 1 THEN retention_rate END) as month_1_retention,
MAX(CASE WHEN periods_since_first_order = 3 THEN retention_rate END) as month_3_retention,
MAX(CASE WHEN periods_since_first_order = 6 THEN retention_rate END) as month_6_retention,
MAX(CASE WHEN periods_since_first_order = 12 THEN retention_rate END) as month_12_retention,
-- Revenue metrics
SUM(total_revenue) as cohort_total_revenue,
AVG(avg_lifetime_value) as avg_customer_ltv,
-- Performance indicators
MAX(periods_since_first_order) as max_observed_periods,
AVG(avg_order_value) as cohort_avg_order_value
FROM cohort_analysis
GROUP BY cohort_month
)
SELECT
cohort_month,
cohort_size,
-- Retention analysis
ROUND(month_1_retention * 100, 1) as month_1_retention_pct,
ROUND(month_3_retention * 100, 1) as month_3_retention_pct,
ROUND(month_6_retention * 100, 1) as month_6_retention_pct,
ROUND(month_12_retention * 100, 1) as month_12_retention_pct,
-- Financial metrics
ROUND(cohort_total_revenue, 2) as cohort_total_revenue,
ROUND(avg_customer_ltv, 2) as avg_customer_ltv,
ROUND(cohort_avg_order_value, 2) as avg_order_value,
ROUND(cohort_total_revenue / cohort_size, 2) as revenue_per_customer,
-- Cohort performance classification
CASE
WHEN month_3_retention >= 0.4 AND avg_customer_ltv >= 500 THEN 'excellent'
WHEN month_3_retention >= 0.3 AND avg_customer_ltv >= 300 THEN 'good'
WHEN month_3_retention >= 0.2 OR avg_customer_ltv >= 200 THEN 'fair'
ELSE 'poor'
END as cohort_quality,
-- Retention trend: percentage-point change from month 1 to month 6
ROUND(
(month_6_retention - month_1_retention) * 100,
1
) as retention_trend,
-- Business insights
CASE
WHEN month_1_retention < 0.2 THEN 'improve_onboarding'
WHEN month_12_retention < 0.1 THEN 'enhance_loyalty_program'
WHEN avg_customer_ltv < 100 THEN 'increase_customer_value'
ELSE 'maintain_performance'
END as recommendation,
max_observed_periods
FROM cohort_summary
WHERE cohort_size >= 10 -- Filter for statistical significance
ORDER BY cohort_month DESC;
-- QueryLeaf provides comprehensive aggregation capabilities:
-- 1. SQL-familiar syntax for complex MongoDB aggregation pipelines
-- 2. Advanced analytics with CTEs, window functions, and statistical operations
-- 3. Real-time processing with familiar SQL patterns and aggregation functions
-- 4. Complex customer segmentation and behavioral analysis using SQL constructs
-- 5. Product performance analytics with rankings and growth calculations
-- 6. Cohort analysis with retention rates and lifetime value calculations
-- 7. Integration with MongoDB's native aggregation optimizations
-- 8. Familiar SQL data types, functions, and expression syntax
-- 9. Advanced time-series analysis and trend detection capabilities
-- 10. Enterprise-ready analytics with performance optimization and scalability
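For comparison, a hand-written MongoDB pipeline producing similar monthly retention figures is sketched below; it is not the pipeline QueryLeaf generates. It assumes an orders collection with customerId, status, orderDate and an embedded financial.total (mirroring the fields the SQL reads) and MongoDB 5.0+ for $setWindowFields, $dateTrunc and $dateDiff. Because $divide does not truncate, the retention ratio cannot collapse to 0 or 1 the way an integer division of two counts can.

```javascript
// Hand-written sketch (not QueryLeaf output). Assumes an "orders" collection with
// customerId, status, orderDate and financial.total; requires MongoDB 5.0+.
function buildCohortRetentionPipeline(since) {
  return [
    { $match: { status: { $in: ["completed", "delivered"] }, orderDate: { $gte: since } } },
    // Attach each customer's first order date (within the window) to every order
    {
      $setWindowFields: {
        partitionBy: "$customerId",
        sortBy: { orderDate: 1 },
        output: {
          firstOrderDate: { $min: "$orderDate", window: { documents: ["unbounded", "unbounded"] } }
        }
      }
    },
    {
      $addFields: {
        cohortMonth: { $dateTrunc: { date: "$firstOrderDate", unit: "month" } },
        // $dateDiff counts month boundaries crossed, i.e. calendar months since the cohort month
        period: { $dateDiff: { startDate: "$firstOrderDate", endDate: "$orderDate", unit: "month" } }
      }
    },
    // Collapse to one row per customer, cohort and period, then count customers per period
    {
      $group: {
        _id: { cohortMonth: "$cohortMonth", period: "$period", customerId: "$customerId" },
        revenue: { $sum: "$financial.total" }
      }
    },
    {
      $group: {
        _id: { cohortMonth: "$_id.cohortMonth", period: "$_id.period" },
        activeCustomers: { $sum: 1 },
        revenue: { $sum: "$revenue" }
      }
    },
    // Cohort size is the active-customer count of period 0, the first row of each partition
    {
      $setWindowFields: {
        partitionBy: "$_id.cohortMonth",
        sortBy: { "_id.period": 1 },
        output: {
          cohortSize: { $first: "$activeCustomers", window: { documents: ["unbounded", "current"] } }
        }
      }
    },
    { $addFields: { retentionRate: { $divide: ["$activeCustomers", "$cohortSize"] } } },
    { $sort: { "_id.cohortMonth": -1, "_id.period": 1 } }
  ];
}
```

Appending a stage such as { $match: { cohortSize: { $gte: 10 } } } would reproduce the SQL query's minimum cohort size filter.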
Best Practices for Aggregation Framework Implementation
Performance Optimization and Pipeline Design
Essential strategies for effective MongoDB Aggregation Framework usage:
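- Early Stage Filtering: Place $match stages as early as possible to reduce the volume of data processed by later stages
- Index Utilization: Design compound indexes that support the pipeline's leading $match and $sort stages; indexes are used mainly by stages at the start of the pipeline, before documents are reshaped
- Memory Management: Individual stages are limited to 100 megabytes of RAM; use allowDiskUse: true for large aggregations (the default since MongoDB 6.0) so stages such as $group and $sort can spill to temporary files, and monitor memory usage
- Pipeline Ordering: Arrange stages to minimize data movement and intermediate result sizes
- Expression Optimization: Use efficient expressions and avoid complex nested operations when possible
- Result Set Limiting: Apply $limit stages strategically to control output size; a $limit placed immediately after a $sort lets the server keep only the top n documents while sorting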
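The sketch below combines several of these practices in one pipeline: an index-friendly leading $match, a $sort immediately followed by $limit, and allowDiskUse passed as an aggregate option. The orders fields and the suggested { status: 1, orderDate: 1 } index are assumptions for illustration.

```javascript
// Sketch only. Assumes a hypothetical "orders" collection with status, orderDate,
// customerId and financial.total, plus an index such as { status: 1, orderDate: 1 }.
function buildTopCustomersPipeline(since, n) {
  return [
    // Filter first: a leading $match (equality on status, range on orderDate) can use the index
    { $match: { status: "completed", orderDate: { $gte: since } } },
    // Aggregate only the filtered documents
    {
      $group: {
        _id: "$customerId",
        revenue: { $sum: "$financial.total" },
        orders: { $sum: 1 }
      }
    },
    // $sort immediately followed by $limit: only the top n documents are kept while sorting
    { $sort: { revenue: -1 } },
    { $limit: n }
  ];
}

// Options passed alongside the pipeline so large $group/$sort stages may spill to disk
const aggregateOptions = { allowDiskUse: true };

// Usage with the Node.js driver (not executed here):
//   const top = await db.collection("orders")
//     .aggregate(buildTopCustomersPipeline(since, 10), aggregateOptions)
//     .toArray();
```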
Enterprise Analytics Architecture
Design scalable aggregation systems for production deployments:
- Distributed Processing: Leverage MongoDB's sharding to distribute aggregation workloads
- Caching Strategies: Implement result caching for frequently accessed aggregations
- Real-time Processing: Combine aggregation pipelines with Change Streams for live analytics
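- Incremental Updates: Design incremental aggregation patterns for large, frequently updated datasets, for example by processing only new documents and writing the results into a collection with $merge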
- Performance Monitoring: Track aggregation performance and optimize based on usage patterns
- Resource Planning: Size clusters appropriately for expected aggregation workloads and data volumes
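One way to implement incremental updates is an on-demand materialized view maintained with $merge (MongoDB 4.2+). The sketch below assumes an orders collection and a daily_revenue target collection, both hypothetical; each run processes only the half-open window [from, to) and adds its totals to any existing day. Windows must not overlap between runs, otherwise revenue is counted twice.

```javascript
// Sketch only. Assumes hypothetical "orders" (status, orderDate, financial.total) and
// "daily_revenue" collections. $merge needs MongoDB 4.2+, $dateTrunc needs 5.0+.
function buildIncrementalRollupPipeline(from, to) {
  return [
    // Process only the new, non-overlapping window [from, to)
    { $match: { status: "completed", orderDate: { $gte: from, $lt: to } } },
    {
      $group: {
        _id: { $dateTrunc: { date: "$orderDate", unit: "day" } },
        revenue: { $sum: "$financial.total" },
        orders: { $sum: 1 }
      }
    },
    {
      $merge: {
        into: "daily_revenue",
        on: "_id",
        // Add this window's totals to an existing day instead of overwriting it;
        // $$new refers to the incoming document from this pipeline
        whenMatched: [
          {
            $set: {
              revenue: { $add: ["$revenue", "$$new.revenue"] },
              orders: { $add: ["$orders", "$$new.orders"] }
            }
          }
        ],
        whenNotMatched: "insert"
      }
    }
  ];
}
```

Scheduling the job and persisting the last processed to value for the next run is left to the application.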
Conclusion
MongoDB's Aggregation Framework provides comprehensive data processing capabilities that reduce the complexity and round trips of traditional SQL analytics approaches by running multi-stage pipelines inside the database, with native document transformations and distributed execution on sharded clusters. The rich expression language and extensive operator library enable sophisticated analytics while maintaining high performance and operational simplicity.
Key MongoDB Aggregation Framework benefits include:
- Unified Processing: Multi-stage analytics in a single pipeline, without multiple query round trips
- Rich Expressions: Comprehensive mathematical, statistical, and analytical operations
- Document-Native: Native handling of nested documents, arrays, and complex data structures
- Performance Optimization: Automatic pipeline optimization (stage reordering and coalescence) with index use for leading $match and $sort stages
- Horizontal Scaling: Distributed aggregation processing across sharded MongoDB clusters
- Real-time Capabilities: Integration with Change Streams for live analytical processing
Whether you're building business intelligence platforms, real-time analytics systems, customer segmentation tools, or complex reporting solutions, MongoDB's Aggregation Framework with QueryLeaf's familiar SQL interface provides the foundation for powerful, scalable, and maintainable analytical processing.
QueryLeaf Integration: QueryLeaf seamlessly translates SQL analytics queries into optimized MongoDB aggregation pipelines while providing familiar SQL syntax for complex analytics, statistical functions, and reporting operations. Advanced aggregation patterns including cohort analysis, customer segmentation, and real-time analytics are elegantly handled through familiar SQL constructs, making sophisticated data processing both powerful and accessible to SQL-oriented analytics teams.
The combination of MongoDB's robust aggregation capabilities with SQL-style analytical operations makes it an ideal platform for applications requiring both advanced analytics functionality and familiar database interaction patterns, ensuring your analytical infrastructure can deliver insights efficiently while maintaining developer productivity and operational excellence.