MongoDB Time Series Collections: High-Performance Analytics with SQL-Style Time Data Operations
Modern applications generate massive amounts of time-stamped data from IoT sensors, application metrics, financial trades, user activity logs, and monitoring systems. Whether you're tracking server performance, analyzing user behavior patterns, or processing real-time sensor data from industrial equipment, you'll find that traditional database approaches often struggle with the volume, velocity, and query patterns that time-series workloads demand.
Time-series data presents unique challenges: high write throughput, time-based queries, efficient storage compression, and analytics operations that span large time ranges. MongoDB's time series collections provide specialized optimizations for these workloads while maintaining the flexibility and query capabilities that make MongoDB powerful for application development.
The Time Series Data Challenge
Traditional approaches to storing time-series data have significant limitations:
-- SQL time series storage challenges
-- Basic table structure for metrics
CREATE TABLE server_metrics (
id SERIAL PRIMARY KEY,
server_id VARCHAR(50),
metric_name VARCHAR(100),
value DECIMAL(10,4),
timestamp TIMESTAMP,
tags JSONB
);
-- High insert volume creates index maintenance overhead
INSERT INTO server_metrics (server_id, metric_name, value, timestamp, tags)
VALUES
('web-01', 'cpu_usage', 85.2, '2025-09-03 10:15:00', '{"datacenter": "us-east", "env": "prod"}'),
('web-01', 'memory_usage', 72.1, '2025-09-03 10:15:00', '{"datacenter": "us-east", "env": "prod"}'),
('web-01', 'disk_io', 150.8, '2025-09-03 10:15:00', '{"datacenter": "us-east", "env": "prod"}');
-- Problems: Index bloat, storage inefficiency, slow inserts
-- Time-range queries require expensive scans
SELECT
server_id,
metric_name,
AVG(value) as avg_value,
MAX(value) as max_value
FROM server_metrics
WHERE timestamp BETWEEN '2025-09-03 00:00:00' AND '2025-09-03 23:59:59'
AND metric_name = 'cpu_usage'
GROUP BY server_id, metric_name;
-- Problems: Full table scans, no time-series optimization
-- Storage grows rapidly without compression
SELECT
pg_size_pretty(pg_total_relation_size('server_metrics')) AS table_size,
COUNT(*) as row_count,
MAX(timestamp) - MIN(timestamp) as time_span
FROM server_metrics;
-- Problems: No time-based compression, storage overhead
MongoDB time series collections address these challenges:
// MongoDB time series collection optimizations
db.createCollection('server_metrics', {
timeseries: {
timeField: 'timestamp',
metaField: 'metadata',
granularity: 'minutes'
// Alternative (MongoDB 6.3+): omit granularity and instead set the custom
// bucketing parameters bucketMaxSpanSeconds and bucketRoundingSeconds
// (both to the same value, e.g. 3600 for one-hour buckets)
}
});
// Optimized insertions for high-throughput scenarios
db.server_metrics.insertMany([
{
timestamp: ISODate("2025-09-03T10:15:00Z"),
cpu_usage: 85.2,
memory_usage: 72.1,
disk_io: 150.8,
metadata: {
server_id: "web-01",
datacenter: "us-east",
environment: "prod",
instance_type: "c5.large"
}
},
{
timestamp: ISODate("2025-09-03T10:16:00Z"),
cpu_usage: 87.5,
memory_usage: 74.3,
disk_io: 165.2,
metadata: {
server_id: "web-01",
datacenter: "us-east",
environment: "prod",
instance_type: "c5.large"
}
}
]);
// Benefits:
// - Automatic bucketing reduces storage overhead by 70%+
// - Time-based indexes optimized for range queries
// - Compression algorithms designed for time-series patterns
// - Query performance optimized for time-range operations
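As a small illustration of those benefits, a typical dashboard read filters on the metaField and a timestamp range, which the storage engine can satisfy by scanning only the relevant buckets. The 24-hour window, projection, and server ID below are arbitrary example values.
// Read the last 24 hours of metrics for one server (mongosh syntax)
const since = new Date(Date.now() - 24 * 60 * 60 * 1000);

db.server_metrics.find(
  {
    "metadata.server_id": "web-01",   // metaField filter
    timestamp: { $gte: since }        // timeField range
  },
  { _id: 0, timestamp: 1, cpu_usage: 1, memory_usage: 1 }
).sort({ timestamp: -1 }).limit(100);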
Creating Time Series Collections
Basic Time Series Setup
Configure time series collections for optimal performance:
// Time series collection configuration
class TimeSeriesManager {
constructor(db) {
this.db = db;
}
async createMetricsCollection(options = {}) {
// Server metrics time series collection
return await this.db.createCollection('server_metrics', {
timeseries: {
timeField: 'timestamp',
metaField: 'metadata',
// granularity and the custom bucketing parameters (bucketMaxSpanSeconds /
// bucketRoundingSeconds, MongoDB 6.3+) are mutually exclusive -- use one or the other
granularity: options.granularity || 'minutes'
}
// Note: time series collections manage their own clustered bucket storage,
// so no clusteredIndex option is passed here
});
}
async createIoTSensorCollection() {
// IoT sensor data with high-frequency measurements
return await this.db.createCollection('sensor_readings', {
timeseries: {
timeField: 'timestamp',
metaField: 'sensor_info',
// Custom bucketing for high-frequency data (MongoDB 6.3+): these parameters
// replace granularity and must both be set to the same value
bucketMaxSpanSeconds: 300, // 5 minute buckets
bucketRoundingSeconds: 300 // must equal bucketMaxSpanSeconds
}
});
}
async createFinancialDataCollection() {
// Financial market data (trades, prices)
return await this.db.createCollection('market_data', {
timeseries: {
timeField: 'trade_time',
metaField: 'instrument',
// Custom bucketing replaces granularity (MongoDB 6.3+); both parameters
// must be equal -- 1 minute buckets suit high-frequency market data
bucketMaxSpanSeconds: 60,
bucketRoundingSeconds: 60
},
// Expire old data automatically (regulatory requirements)
expireAfterSeconds: 7 * 365 * 24 * 60 * 60 // 7 years retention
});
}
async createUserActivityCollection() {
// User activity tracking (clicks, views, sessions)
return await this.db.createCollection('user_activity', {
timeseries: {
timeField: 'event_time',
metaField: 'user_context',
// granularity alone is used here; custom bucketing would require dropping
// granularity and setting equal bucketMaxSpanSeconds/bucketRoundingSeconds values
granularity: 'minutes'
},
// Data lifecycle management
expireAfterSeconds: 90 * 24 * 60 * 60 // 90 days retention
});
}
}
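A usage sketch for the manager above, assuming a connected Node.js driver client; the connection URI and database name are placeholders.
// Hypothetical bootstrap -- adjust the URI and database name for your environment
const { MongoClient } = require('mongodb');

async function bootstrapTimeSeriesCollections() {
  const client = new MongoClient('mongodb://localhost:27017');
  await client.connect();

  try {
    const manager = new TimeSeriesManager(client.db('monitoring'));

    // createCollection fails if the collection already exists, so guard accordingly
    await manager.createMetricsCollection({ granularity: 'minutes' });
    await manager.createIoTSensorCollection();
    await manager.createUserActivityCollection();
  } finally {
    await client.close();
  }
}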
SQL-style time series table creation concepts:
-- SQL time series table equivalent patterns
-- Specialized table for time-series data
CREATE TABLE server_metrics (
timestamp TIMESTAMPTZ NOT NULL,
server_id VARCHAR(50) NOT NULL,
datacenter VARCHAR(20),
environment VARCHAR(10),
cpu_usage DECIMAL(5,2),
memory_usage DECIMAL(5,2),
disk_io DECIMAL(8,2),
network_bytes_in BIGINT,
network_bytes_out BIGINT,
-- Time-series optimizations
CONSTRAINT pk_server_metrics PRIMARY KEY (server_id, timestamp),
CONSTRAINT check_timestamp_range
CHECK (timestamp >= '2024-01-01' AND timestamp < '2030-01-01')
) PARTITION BY RANGE (timestamp);
-- Time-series specific indexes
CREATE INDEX idx_server_metrics_time_range
ON server_metrics USING BRIN (timestamp);
-- Partitioning by time for performance
CREATE TABLE server_metrics_2025_09
PARTITION OF server_metrics
FOR VALUES FROM ('2025-09-01') TO ('2025-10-01');
-- Automatic data lifecycle with partitions
CREATE TABLE server_metrics_template (
LIKE server_metrics INCLUDING ALL
) WITH (
fillfactor = 100, -- Optimize for append-only data
parallel_workers = 8
);
-- Compression for historical data is limited in core PostgreSQL; heavier
-- time-series compression typically requires column-level TOAST settings
-- (PostgreSQL 14+) or extensions such as TimescaleDB
ALTER TABLE server_metrics_2025_08 SET (
parallel_workers = 4
);
High-Performance Time Series Queries
Time-Range Analytics
Implement efficient time-based analytics operations:
// Time series analytics implementation
class TimeSeriesAnalytics {
constructor(db) {
this.db = db;
this.metricsCollection = db.collection('server_metrics');
}
async getMetricSummary(serverId, metricName, startTime, endTime) {
// Basic time series aggregation with performance optimization
const pipeline = [
{
$match: {
'metadata.server_id': serverId,
timestamp: {
$gte: startTime,
$lte: endTime
}
}
},
{
$group: {
_id: null,
avg_value: { $avg: `$${metricName}` },
min_value: { $min: `$${metricName}` },
max_value: { $max: `$${metricName}` },
sample_count: { $sum: 1 },
first_timestamp: { $min: "$timestamp" },
last_timestamp: { $max: "$timestamp" }
}
},
{
$project: {
_id: 0,
server_id: serverId,
metric_name: metricName,
statistics: {
average: { $round: ["$avg_value", 2] },
minimum: "$min_value",
maximum: "$max_value",
sample_count: "$sample_count"
},
time_range: {
start: "$first_timestamp",
end: "$last_timestamp",
duration_minutes: {
$divide: [
{ $subtract: ["$last_timestamp", "$first_timestamp"] },
60000
]
}
}
}
}
];
const results = await this.metricsCollection.aggregate(pipeline).toArray();
return results[0];
}
async getTimeSeriesData(serverId, metricName, startTime, endTime, intervalMinutes = 5) {
// Time bucketed aggregation for charts and visualization
const intervalMs = intervalMinutes * 60 * 1000;
const pipeline = [
{
$match: {
'metadata.server_id': serverId,
timestamp: {
$gte: startTime,
$lte: endTime
}
}
},
{
$group: {
_id: {
// Create time buckets
time_bucket: {
$dateFromParts: {
year: { $year: "$timestamp" },
month: { $month: "$timestamp" },
day: { $dayOfMonth: "$timestamp" },
hour: { $hour: "$timestamp" },
minute: {
$multiply: [
{ $floor: { $divide: [{ $minute: "$timestamp" }, intervalMinutes] } },
intervalMinutes
]
}
}
}
},
avg_value: { $avg: `$${metricName}` },
min_value: { $min: `$${metricName}` },
max_value: { $max: `$${metricName}` },
sample_count: { $sum: 1 },
// Calculate percentiles
values: { $push: `$${metricName}` }
}
},
{
$addFields: {
// Approximate p95: sort the collected values first ($sortArray, MongoDB 5.2+);
// picking an element from the unsorted array would not be a percentile
p95_value: {
$arrayElemAt: [
{ $sortArray: { input: "$values", sortBy: 1 } },
{ $floor: { $multiply: [{ $size: "$values" }, 0.95] } }
]
}
}
},
{
$sort: { "_id.time_bucket": 1 }
},
{
$project: {
timestamp: "$_id.time_bucket",
metrics: {
average: { $round: ["$avg_value", 2] },
minimum: "$min_value",
maximum: "$max_value",
p95: "$p95_value",
sample_count: "$sample_count"
},
_id: 0
}
}
];
return await this.metricsCollection.aggregate(pipeline).toArray();
}
async detectAnomalies(serverId, metricName, windowHours = 24) {
// Statistical anomaly detection using moving averages
const windowStart = new Date(Date.now() - windowHours * 60 * 60 * 1000);
const pipeline = [
{
$match: {
'metadata.server_id': serverId,
timestamp: { $gte: windowStart }
}
},
{
$sort: { timestamp: 1 }
},
{
$setWindowFields: {
partitionBy: null,
sortBy: { timestamp: 1 },
output: {
// Moving average over last 10 points
moving_avg: {
$avg: `$${metricName}`,
window: {
documents: [-9, 0] // Current + 9 previous points
}
},
// Standard deviation
moving_std: {
$stdDevSamp: `$${metricName}`,
window: {
documents: [-19, 0] // Current + 19 previous points
}
}
}
}
},
{
$addFields: {
// Detect anomalies using 2-sigma rule
deviation: {
$abs: { $subtract: [`$${metricName}`, "$moving_avg"] }
},
threshold: { $multiply: ["$moving_std", 2] }
}
},
{
$addFields: {
is_anomaly: { $gt: ["$deviation", "$threshold"] },
anomaly_severity: {
$cond: {
if: { $gt: ["$deviation", { $multiply: ["$moving_std", 3] }] },
then: "high",
else: {
$cond: {
if: { $gt: ["$deviation", { $multiply: ["$moving_std", 2] }] },
then: "medium",
else: "low"
}
}
}
}
}
},
{
$match: {
is_anomaly: true
}
},
{
$project: {
timestamp: 1,
value: `$${metricName}`,
expected_value: { $round: ["$moving_avg", 2] },
deviation: { $round: ["$deviation", 2] },
severity: "$anomaly_severity",
metadata: 1
}
},
{
$sort: { timestamp: -1 }
},
{
$limit: 50
}
];
return await this.metricsCollection.aggregate(pipeline).toArray();
}
async calculateMetricCorrelations(serverIds, metrics, timeWindow) {
// Analyze correlations between different metrics
const pipeline = [
{
$match: {
'metadata.server_id': { $in: serverIds },
timestamp: {
$gte: new Date(Date.now() - timeWindow)
}
}
},
{
// Group by minute for correlation analysis
$group: {
_id: {
server: "$metadata.server_id",
minute: {
$dateFromParts: {
year: { $year: "$timestamp" },
month: { $month: "$timestamp" },
day: { $dayOfMonth: "$timestamp" },
hour: { $hour: "$timestamp" },
minute: { $minute: "$timestamp" }
}
}
},
// Average metrics within each minute bucket
cpu_avg: { $avg: "$cpu_usage" },
memory_avg: { $avg: "$memory_usage" },
disk_io_avg: { $avg: "$disk_io" },
network_in_avg: { $avg: "$network_bytes_in" },
network_out_avg: { $avg: "$network_bytes_out" }
}
},
{
$group: {
_id: "$_id.server",
data_points: {
$push: {
timestamp: "$_id.minute",
cpu: "$cpu_avg",
memory: "$memory_avg",
disk_io: "$disk_io_avg",
network_in: "$network_in_avg",
network_out: "$network_out_avg"
}
}
}
},
{
$addFields: {
// Calculate correlation between CPU and memory
cpu_memory_correlation: {
$function: {
body: function(dataPoints) {
const n = dataPoints.length;
if (n < 2) return 0;
const cpuValues = dataPoints.map(d => d.cpu);
const memValues = dataPoints.map(d => d.memory);
const cpuMean = cpuValues.reduce((a, b) => a + b, 0) / n;
const memMean = memValues.reduce((a, b) => a + b, 0) / n;
let numerator = 0, cpuSumSq = 0, memSumSq = 0;
for (let i = 0; i < n; i++) {
const cpuDiff = cpuValues[i] - cpuMean;
const memDiff = memValues[i] - memMean;
numerator += cpuDiff * memDiff;
cpuSumSq += cpuDiff * cpuDiff;
memSumSq += memDiff * memDiff;
}
const denominator = Math.sqrt(cpuSumSq * memSumSq);
return denominator === 0 ? 0 : numerator / denominator;
},
args: ["$data_points"],
lang: "js"
}
}
}
},
{
$project: {
server_id: "$_id",
correlation_analysis: {
cpu_memory: { $round: ["$cpu_memory_correlation", 3] },
data_points: { $size: "$data_points" },
analysis_period: { $literal: timeWindow } // $literal stops the number being read as a projection flag
},
_id: 0
}
}
];
return await this.metricsCollection.aggregate(pipeline).toArray();
}
async getTrendAnalysis(serverId, metricName, days = 7) {
// Trend analysis with growth rates and predictions
const daysAgo = new Date(Date.now() - days * 24 * 60 * 60 * 1000);
const pipeline = [
{
$match: {
'metadata.server_id': serverId,
timestamp: { $gte: daysAgo }
}
},
{
$group: {
_id: {
// Group by hour for trend analysis
date: { $dateToString: { format: "%Y-%m-%d", date: "$timestamp" } },
hour: { $hour: "$timestamp" }
},
avg_value: { $avg: `$${metricName}` },
min_value: { $min: `$${metricName}` },
max_value: { $max: `$${metricName}` },
sample_count: { $sum: 1 }
}
},
{
$sort: { "_id.date": 1, "_id.hour": 1 }
},
{
$setWindowFields: {
sortBy: { "_id.date": 1, "_id.hour": 1 },
output: {
// Calculate rate of change
previous_value: {
$shift: {
output: "$avg_value",
by: -1
}
},
// 24-hour moving average
daily_trend: {
$avg: "$avg_value",
window: {
documents: [-23, 0] // 24 hours
}
}
}
}
},
{
$addFields: {
hourly_change: {
$cond: {
if: { $ne: ["$previous_value", null] },
then: { $subtract: ["$avg_value", "$previous_value"] },
else: 0
}
},
change_percentage: {
$cond: {
if: { $and: [
{ $ne: ["$previous_value", null] },
{ $ne: ["$previous_value", 0] }
]},
then: {
$multiply: [
{ $divide: [
{ $subtract: ["$avg_value", "$previous_value"] },
"$previous_value"
]},
100
]
},
else: 0
}
}
}
},
{
$match: {
previous_value: { $ne: null } // Exclude first data point
}
},
{
$project: {
date: "$_id.date",
hour: "$_id.hour",
metric_value: { $round: ["$avg_value", 2] },
trend_value: { $round: ["$daily_trend", 2] },
hourly_change: { $round: ["$hourly_change", 2] },
change_percentage: { $round: ["$change_percentage", 1] },
volatility: {
$abs: { $subtract: ["$avg_value", "$daily_trend"] }
},
_id: 0
}
}
];
return await this.metricsCollection.aggregate(pipeline).toArray();
}
async getCapacityForecast(serverId, metricName, forecastDays = 30) {
// Simple linear regression for capacity planning
const historyDays = forecastDays * 2; // Use 2x history for prediction
const historyStart = new Date(Date.now() - historyDays * 24 * 60 * 60 * 1000);
const pipeline = [
{
$match: {
'metadata.server_id': serverId,
timestamp: { $gte: historyStart }
}
},
{
$group: {
_id: {
date: { $dateToString: { format: "%Y-%m-%d", date: "$timestamp" } }
},
daily_avg: { $avg: `$${metricName}` },
daily_max: { $max: `$${metricName}` },
sample_count: { $sum: 1 }
}
},
{
$sort: { "_id.date": 1 }
},
{
$group: {
_id: null,
daily_data: {
$push: {
date: "$_id.date",
avg_value: "$daily_avg",
max_value: "$daily_max"
}
}
}
},
{
$addFields: {
// Linear regression calculation
regression: {
$function: {
body: function(dailyData) {
const n = dailyData.length;
if (n < 7) return null; // Need minimum data points
// Convert dates to day numbers for regression
const baseDate = new Date(dailyData[0].date).getTime();
const points = dailyData.map((d, i) => ({
x: i, // Day number
y: d.avg_value
}));
// Calculate linear regression
const sumX = points.reduce((sum, p) => sum + p.x, 0);
const sumY = points.reduce((sum, p) => sum + p.y, 0);
const sumXY = points.reduce((sum, p) => sum + (p.x * p.y), 0);
const sumXX = points.reduce((sum, p) => sum + (p.x * p.x), 0);
const slope = (n * sumXY - sumX * sumY) / (n * sumXX - sumX * sumX);
const intercept = (sumY - slope * sumX) / n;
// Calculate R-squared
const meanY = sumY / n;
const totalSS = points.reduce((sum, p) => sum + Math.pow(p.y - meanY, 2), 0);
const residualSS = points.reduce((sum, p) => {
const predicted = slope * p.x + intercept;
return sum + Math.pow(p.y - predicted, 2);
}, 0);
const rSquared = 1 - (residualSS / totalSS);
return {
slope: slope,
intercept: intercept,
correlation: Math.sqrt(Math.max(0, rSquared)),
confidence: rSquared > 0.7 ? 'high' : rSquared > 0.4 ? 'medium' : 'low',
data_points: n // number of observed days, used to anchor forecast offsets
};
},
args: ["$daily_data"],
lang: "js"
}
}
}
},
{
$project: {
current_trend: "$regression",
forecast_days: { $literal: forecastDays }, // $literal stops the number being read as a projection flag
historical_data: { $slice: ["$daily_data", -7] }, // Last 7 days
_id: 0
}
}
];
const results = await this.metricsCollection.aggregate(pipeline).toArray();
if (results.length > 0 && results[0].current_trend) {
const trend = results[0].current_trend;
const forecastData = [];
// Generate forecast points
for (let day = 1; day <= forecastDays; day++) {
const futureDate = new Date(Date.now() + day * 24 * 60 * 60 * 1000);
const xValue = (trend.data_points - 1) + day; // continue from the last observed day of the regression
const predictedValue = trend.slope * xValue + trend.intercept;
forecastData.push({
date: futureDate.toISOString().split('T')[0],
predicted_value: Math.round(predictedValue * 100) / 100,
confidence: trend.confidence
});
}
results[0].forecast = forecastData;
}
return results[0];
}
async getMultiServerComparison(serverIds, metricName, hours = 24) {
// Compare metrics across multiple servers
const startTime = new Date(Date.now() - hours * 60 * 60 * 1000);
const pipeline = [
{
$match: {
'metadata.server_id': { $in: serverIds },
timestamp: { $gte: startTime }
}
},
{
$group: {
_id: {
server: "$metadata.server_id",
// Hourly buckets for comparison
hour: {
$dateFromParts: {
year: { $year: "$timestamp" },
month: { $month: "$timestamp" },
day: { $dayOfMonth: "$timestamp" },
hour: { $hour: "$timestamp" }
}
}
},
avg_value: { $avg: `$${metricName}` },
max_value: { $max: `$${metricName}` },
sample_count: { $sum: 1 }
}
},
{
$group: {
_id: "$_id.hour",
server_data: {
$push: {
server_id: "$_id.server",
avg_value: "$avg_value",
max_value: "$max_value",
sample_count: "$sample_count"
}
}
}
},
{
$addFields: {
// Calculate statistics across all servers for each hour
hourly_stats: {
avg_across_servers: { $avg: "$server_data.avg_value" },
max_across_servers: { $max: "$server_data.max_value" },
min_across_servers: { $min: "$server_data.avg_value" },
server_count: { $size: "$server_data" }
}
}
},
{
$sort: { "_id": 1 }
},
{
$project: {
timestamp: "$_id",
servers: "$server_data",
cluster_stats: "$hourly_stats",
_id: 0
}
}
];
return await this.metricsCollection.aggregate(pipeline).toArray();
}
}
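A brief usage sketch of the analytics class; the server ID, metric name, and time window are illustrative values, and `db` is assumed to be a connected Db handle from the Node.js driver.
// Hypothetical invocation of the analytics helpers defined above
async function reportCpuHealth(db) {
  const analytics = new TimeSeriesAnalytics(db);
  const dayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);

  const summary = await analytics.getMetricSummary('web-01', 'cpu_usage', dayAgo, new Date());
  const anomalies = await analytics.detectAnomalies('web-01', 'cpu_usage', 24);

  console.log('CPU summary:', summary?.statistics);
  console.log('Anomalies detected:', anomalies.length);
}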
IoT and Sensor Data Management
Real-Time Sensor Processing
Handle high-frequency IoT sensor data efficiently:
// IoT sensor data management for time series
class IoTTimeSeriesManager {
constructor(db) {
this.db = db;
this.sensorCollection = db.collection('sensor_readings');
}
async setupSensorIndexes() {
// Optimized indexes for sensor queries
await this.sensorCollection.createIndexes([
// The Node.js driver expects index descriptions with a `key` field
// Time range queries
{ key: { 'timestamp': 1, 'sensor_info.device_id': 1 } },
// Sensor type and location queries
{ key: { 'sensor_info.sensor_type': 1, 'timestamp': -1 } },
{ key: { 'sensor_info.location': '2dsphere', 'timestamp': -1 } },
// Multi-sensor aggregation queries
{ key: { 'sensor_info.facility_id': 1, 'sensor_info.sensor_type': 1, 'timestamp': -1 } }
]);
}
async processSensorBatch(sensorReadings) {
// High-performance batch insertion for IoT data
const documents = sensorReadings.map(reading => ({
timestamp: new Date(reading.timestamp),
temperature: reading.temperature,
humidity: reading.humidity,
pressure: reading.pressure,
vibration: reading.vibration,
sensor_info: {
device_id: reading.deviceId,
sensor_type: reading.sensorType,
location: {
type: "Point",
coordinates: [reading.longitude, reading.latitude]
},
facility_id: reading.facilityId,
installation_date: reading.installationDate,
firmware_version: reading.firmwareVersion
}
}));
try {
const result = await this.sensorCollection.insertMany(documents, {
ordered: false, // Allow partial success for high throughput
bypassDocumentValidation: false
});
return {
success: true,
insertedCount: result.insertedCount,
insertedIds: result.insertedIds
};
} catch (error) {
// Handle partial failures gracefully
return {
success: false,
error: error.message,
partialResults: error.writeErrors || []
};
}
}
async getSensorTelemetry(facilityId, sensorType, timeRange) {
// Real-time sensor monitoring dashboard
const pipeline = [
{
$match: {
'sensor_info.facility_id': facilityId,
'sensor_info.sensor_type': sensorType,
timestamp: {
$gte: timeRange.start,
$lte: timeRange.end
}
}
},
{
$group: {
_id: {
device_id: "$sensor_info.device_id",
// 15-minute intervals for real-time monitoring
interval: {
$dateFromParts: {
year: { $year: "$timestamp" },
month: { $month: "$timestamp" },
day: { $dayOfMonth: "$timestamp" },
hour: { $hour: "$timestamp" },
minute: {
$multiply: [
{ $floor: { $divide: [{ $minute: "$timestamp" }, 15] } },
15
]
}
}
}
},
// Aggregate sensor readings
avg_temperature: { $avg: "$temperature" },
avg_humidity: { $avg: "$humidity" },
avg_pressure: { $avg: "$pressure" },
max_vibration: { $max: "$vibration" },
reading_count: { $sum: 1 },
// Device metadata
device_location: { $first: "$sensor_info.location" },
firmware_version: { $first: "$sensor_info.firmware_version" }
}
},
{
$addFields: {
// Health indicators
health_score: {
$switch: {
branches: [
{
case: { $lt: ["$reading_count", 3] },
then: "poor" // Too few readings
},
{
case: { $gt: ["$max_vibration", 100] },
then: "critical" // High vibration
},
{
case: { $or: [
{ $lt: ["$avg_temperature", -10] },
{ $gt: ["$avg_temperature", 50] }
]},
then: "warning" // Temperature out of range
}
],
default: "normal"
}
}
}
},
{
$group: {
_id: "$_id.interval",
devices: {
$push: {
device_id: "$_id.device_id",
measurements: {
temperature: { $round: ["$avg_temperature", 1] },
humidity: { $round: ["$avg_humidity", 1] },
pressure: { $round: ["$avg_pressure", 1] },
vibration: { $round: ["$max_vibration", 1] }
},
health: "$health_score",
reading_count: "$reading_count",
location: "$device_location"
}
},
// $group accumulators must be top-level fields; the nested
// facility_summary document is assembled in the $project stage below
facility_avg_temp: { $avg: "$avg_temperature" },
facility_avg_humidity: { $avg: "$avg_humidity" },
total_devices: { $sum: 1 },
healthy_devices: {
$sum: {
$cond: {
if: { $eq: ["$health_score", "normal"] },
then: 1,
else: 0
}
}
}
}
},
{
$sort: { "_id": -1 }
},
{
$limit: 24 // Last 24 intervals (6 hours of 15-min intervals)
},
{
$project: {
timestamp: "$_id",
devices: 1,
facility_summary: {
avg_temperature: { $round: ["$facility_avg_temp", 1] },
avg_humidity: { $round: ["$facility_avg_humidity", 1] },
device_health_ratio: {
$round: [
{ $divide: ["$healthy_devices", "$total_devices"] },
2
]
}
},
_id: 0
}
}
];
return await this.sensorCollection.aggregate(pipeline).toArray();
}
async detectSensorFailures(facilityId, timeWindowHours = 2) {
// Identify potentially failed or malfunctioning sensors
const windowStart = new Date(Date.now() - timeWindowHours * 60 * 60 * 1000);
const pipeline = [
{
$match: {
'sensor_info.facility_id': facilityId,
timestamp: { $gte: windowStart }
}
},
{
$group: {
_id: "$sensor_info.device_id",
reading_count: { $sum: 1 },
last_reading: { $max: "$timestamp" },
avg_temperature: { $avg: "$temperature" },
temp_variance: { $stdDevSamp: "$temperature" },
max_vibration: { $max: "$vibration" },
location: { $first: "$sensor_info.location" },
sensor_type: { $first: "$sensor_info.sensor_type" }
}
},
{
$addFields: {
minutes_since_last_reading: {
$divide: [
{ $subtract: [new Date(), "$last_reading"] },
60000
]
},
expected_readings: timeWindowHours * 4, // Assuming 15-min intervals
reading_ratio: {
$divide: ["$reading_count", timeWindowHours * 4]
}
}
},
{
$addFields: {
failure_indicators: {
no_recent_data: { $gt: ["$minutes_since_last_reading", 30] },
insufficient_readings: { $lt: ["$reading_ratio", 0.5] },
temperature_anomaly: { $gt: ["$temp_variance", 20] },
vibration_alert: { $gt: ["$max_vibration", 150] }
}
}
},
{
$addFields: {
failure_score: {
$add: [
{ $cond: { if: "$failure_indicators.no_recent_data", then: 3, else: 0 } },
{ $cond: { if: "$failure_indicators.insufficient_readings", then: 2, else: 0 } },
{ $cond: { if: "$failure_indicators.temperature_anomaly", then: 2, else: 0 } },
{ $cond: { if: "$failure_indicators.vibration_alert", then: 1, else: 0 } }
]
}
}
},
{
$match: {
failure_score: { $gte: 2 } // Devices with significant failure indicators
}
},
{
$sort: { failure_score: -1, minutes_since_last_reading: -1 }
},
{
$project: {
device_id: "$_id",
sensor_type: 1,
location: 1,
failure_score: 1,
failure_indicators: 1,
last_reading: 1,
minutes_since_last_reading: { $round: ["$minutes_since_last_reading", 1] },
reading_count: 1,
expected_readings: 1,
_id: 0
}
}
];
return await this.sensorCollection.aggregate(pipeline).toArray();
}
}
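A short usage sketch, assuming readings arrive as parsed JSON from an ingestion consumer; the field names mirror the mapping in processSensorBatch and the facility ID is a placeholder.
// Hypothetical ingestion path for a batch of sensor readings
async function ingestSensorReadings(db, readings) {
  const iot = new IoTTimeSeriesManager(db);

  const result = await iot.processSensorBatch(readings);
  if (!result.success) {
    console.warn('Partial batch failure:', result.error);
  }

  // Periodic health sweep over the last two hours
  const failures = await iot.detectSensorFailures('FACILITY_001', 2);
  return { inserted: result.insertedCount || 0, suspectedFailures: failures.length };
}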
SQL-style sensor data analytics concepts:
-- SQL time series sensor analytics equivalent
-- IoT sensor data table with time partitioning
CREATE TABLE sensor_readings (
timestamp TIMESTAMPTZ NOT NULL,
device_id VARCHAR(50) NOT NULL,
sensor_type VARCHAR(20),
temperature DECIMAL(5,2),
humidity DECIMAL(5,2),
pressure DECIMAL(7,2),
vibration DECIMAL(6,2),
location POINT,
facility_id VARCHAR(20),
PRIMARY KEY (device_id, timestamp)
) PARTITION BY RANGE (timestamp);
-- Real-time sensor monitoring query
WITH recent_readings AS (
SELECT
device_id,
sensor_type,
AVG(temperature) as avg_temp,
AVG(humidity) as avg_humidity,
MAX(vibration) as max_vibration,
COUNT(*) as reading_count,
MAX(timestamp) as last_reading
FROM sensor_readings
WHERE timestamp >= NOW() - INTERVAL '15 minutes'
AND facility_id = 'FACILITY_001'
GROUP BY device_id, sensor_type
)
SELECT
device_id,
sensor_type,
ROUND(avg_temp, 1) as current_temperature,
ROUND(avg_humidity, 1) as current_humidity,
ROUND(max_vibration, 1) as peak_vibration,
reading_count,
CASE
WHEN EXTRACT(EPOCH FROM (NOW() - last_reading)) / 60 > 30 THEN 'OFFLINE'
WHEN max_vibration > 150 THEN 'CRITICAL'
WHEN avg_temp < -10 OR avg_temp > 50 THEN 'WARNING'
ELSE 'NORMAL'
END as device_status
FROM recent_readings
ORDER BY
-- PostgreSQL does not allow the device_status alias inside an ORDER BY
-- expression, so the underlying conditions are repeated here
CASE
WHEN EXTRACT(EPOCH FROM (NOW() - last_reading)) / 60 > 30 THEN 3 -- OFFLINE
WHEN max_vibration > 150 THEN 1                                  -- CRITICAL
WHEN avg_temp < -10 OR avg_temp > 50 THEN 2                      -- WARNING
ELSE 4
END,
device_id;
Financial Time Series Analytics
Market Data Processing
Process high-frequency financial data with time series collections:
// Financial market data time series processing
class FinancialTimeSeriesProcessor {
constructor(db) {
this.db = db;
this.marketDataCollection = db.collection('market_data');
}
async processTradeData(trades) {
// Process high-frequency trade data
const documents = trades.map(trade => ({
trade_time: new Date(trade.timestamp),
price: parseFloat(trade.price),
volume: parseInt(trade.volume),
bid_price: parseFloat(trade.bidPrice),
ask_price: parseFloat(trade.askPrice),
trade_type: trade.tradeType, // 'buy' or 'sell'
instrument: {
symbol: trade.symbol,
exchange: trade.exchange,
market_sector: trade.sector,
currency: trade.currency
}
}));
return await this.marketDataCollection.insertMany(documents, {
ordered: false
});
}
async calculateOHLCData(symbol, intervalMinutes = 5, days = 1) {
// Calculate OHLC (Open, High, Low, Close) data for charting
const startTime = new Date(Date.now() - days * 24 * 60 * 60 * 1000);
const pipeline = [
{
$match: {
'instrument.symbol': symbol,
trade_time: { $gte: startTime }
}
},
{
// $first/$last depend on document order, so sort by trade time before grouping
$sort: { trade_time: 1 }
},
{
$group: {
_id: {
// Create time buckets for OHLC intervals
interval_start: {
$dateFromParts: {
year: { $year: "$trade_time" },
month: { $month: "$trade_time" },
day: { $dayOfMonth: "$trade_time" },
hour: { $hour: "$trade_time" },
minute: {
$multiply: [
{ $floor: { $divide: [{ $minute: "$trade_time" }, intervalMinutes] } },
intervalMinutes
]
}
}
}
},
// OHLC calculations
open_price: { $first: "$price" }, // First trade in interval
high_price: { $max: "$price" }, // Highest trade price
low_price: { $min: "$price" }, // Lowest trade price
close_price: { $last: "$price" }, // Last trade in interval
total_volume: { $sum: "$volume" },
trade_count: { $sum: 1 },
// VWAP components: $divide is not a $group accumulator, so the
// ratio itself is computed in the $addFields stage below
price_volume_sum: { $sum: { $multiply: ["$price", "$volume"] } },
// Bid-ask spread analysis
avg_bid_ask_spread: {
$avg: { $subtract: ["$ask_price", "$bid_price"] }
}
}
},
{
$addFields: {
// Volume-weighted average price, price movement, and volatility
volume_weighted_price: {
$cond: {
if: { $gt: ["$total_volume", 0] },
then: { $divide: ["$price_volume_sum", "$total_volume"] },
else: null
}
},
price_change: { $subtract: ["$close_price", "$open_price"] },
price_range: { $subtract: ["$high_price", "$low_price"] },
volatility_ratio: {
$divide: [
{ $subtract: ["$high_price", "$low_price"] },
"$open_price"
]
}
}
},
{
$sort: { "_id.interval_start": 1 }
},
{
$project: {
timestamp: "$_id.interval_start",
ohlc: {
open: { $round: ["$open_price", 4] },
high: { $round: ["$high_price", 4] },
low: { $round: ["$low_price", 4] },
close: { $round: ["$close_price", 4] }
},
volume: "$total_volume",
trades: "$trade_count",
analytics: {
vwap: { $round: ["$volume_weighted_price", 4] },
price_change: { $round: ["$price_change", 4] },
volatility: { $round: ["$volatility_ratio", 6] },
avg_spread: { $round: ["$avg_bid_ask_spread", 4] }
},
_id: 0
}
}
];
return await this.marketDataCollection.aggregate(pipeline).toArray();
}
async detectTradingPatterns(symbol, lookbackHours = 4) {
// Pattern recognition for algorithmic trading
const startTime = new Date(Date.now() - lookbackHours * 60 * 60 * 1000);
const pipeline = [
{
$match: {
'instrument.symbol': symbol,
trade_time: { $gte: startTime }
}
},
{
$sort: { trade_time: 1 }
},
{
$setWindowFields: {
sortBy: { trade_time: 1 },
output: {
// Moving averages for technical analysis
sma_5: {
$avg: "$price",
window: { documents: [-4, 0] } // 5-point simple moving average
},
sma_20: {
$avg: "$price",
window: { documents: [-19, 0] } // 20-point simple moving average
},
// Price momentum indicators
price_change_1: {
$subtract: [
"$price",
{ $shift: { output: "$price", by: -1 } }
]
},
// Volume analysis
volume_ratio: {
$divide: [
"$volume",
{
$avg: "$volume",
window: { documents: [-9, 0] } // 10-period volume average
}
]
}
}
}
},
{
$addFields: {
// Technical indicators
trend_signal: {
$cond: {
if: { $gt: ["$sma_5", "$sma_20"] },
then: "bullish",
else: "bearish"
}
},
momentum_signal: {
$switch: {
branches: [
{ case: { $gt: ["$price_change_1", 0.01] }, then: "strong_buy" },
{ case: { $gt: ["$price_change_1", 0] }, then: "buy" },
{ case: { $lt: ["$price_change_1", -0.01] }, then: "strong_sell" },
{ case: { $lt: ["$price_change_1", 0] }, then: "sell" }
],
default: "hold"
}
},
volume_signal: {
$cond: {
if: { $gt: ["$volume_ratio", 1.5] },
then: "high_volume",
else: "normal_volume"
}
}
}
},
{
$match: {
sma_5: { $ne: null }, // Exclude initial points without moving averages
sma_20: { $ne: null }
}
},
{
$project: {
trade_time: 1,
price: { $round: ["$price", 4] },
volume: 1,
technical_indicators: {
sma_5: { $round: ["$sma_5", 4] },
sma_20: { $round: ["$sma_20", 4] },
trend: "$trend_signal",
momentum: "$momentum_signal",
volume: "$volume_signal"
},
_id: 0
}
},
{
$sort: { trade_time: -1 }
},
{
$limit: 100
}
];
return await this.marketDataCollection.aggregate(pipeline).toArray();
}
}
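A usage sketch for the processor above; the symbol and intervals are illustrative and `db` is assumed to be a connected Db handle.
// Hypothetical charting/signal workflow built on the processor above
async function buildMarketSnapshot(db) {
  const fin = new FinancialTimeSeriesProcessor(db);

  const candles = await fin.calculateOHLCData('AAPL', 5, 1);   // 5-minute bars over 1 day
  const signals = await fin.detectTradingPatterns('AAPL', 4);  // indicators for the last 4 hours

  return {
    latestCandle: candles[candles.length - 1],
    latestSignal: signals[0]   // results are sorted newest first
  };
}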
QueryLeaf Time Series Integration
QueryLeaf provides SQL-familiar syntax for time series operations with MongoDB's optimized storage:
-- QueryLeaf time series operations with SQL-style syntax
-- Time range queries with familiar SQL date functions
SELECT
sensor_info.device_id,
sensor_info.facility_id,
AVG(temperature) as avg_temperature,
MAX(humidity) as max_humidity,
COUNT(*) as reading_count
FROM sensor_readings
WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
AND sensor_info.sensor_type = 'environmental'
GROUP BY sensor_info.device_id, sensor_info.facility_id
ORDER BY avg_temperature DESC;
-- Time bucketing using SQL date functions
SELECT
DATE_TRUNC('hour', timestamp) as hour_bucket,
instrument.symbol,
FIRST(price ORDER BY trade_time) as open_price,
MAX(price) as high_price,
MIN(price) as low_price,
LAST(price ORDER BY trade_time) as close_price,
SUM(volume) as total_volume,
COUNT(*) as trade_count
FROM market_data
WHERE trade_time >= CURRENT_DATE - INTERVAL '7 days'
AND instrument.symbol IN ('AAPL', 'GOOGL', 'MSFT')
GROUP BY hour_bucket, instrument.symbol
ORDER BY hour_bucket DESC, instrument.symbol;
-- Window functions for technical analysis
SELECT
trade_time,
instrument.symbol,
price,
volume,
AVG(price) OVER (
PARTITION BY instrument.symbol
ORDER BY trade_time
ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
) as sma_5,
AVG(price) OVER (
PARTITION BY instrument.symbol
ORDER BY trade_time
ROWS BETWEEN 19 PRECEDING AND CURRENT ROW
) as sma_20
FROM market_data
WHERE trade_time >= CURRENT_TIMESTAMP - INTERVAL '4 hours'
AND instrument.symbol = 'BTC-USD'
ORDER BY trade_time DESC;
-- Sensor anomaly detection using SQL analytics
WITH sensor_stats AS (
SELECT
sensor_info.device_id,
timestamp,
temperature,
AVG(temperature) OVER (
PARTITION BY sensor_info.device_id
ORDER BY timestamp
ROWS BETWEEN 19 PRECEDING AND CURRENT ROW
) as rolling_avg,
STDDEV(temperature) OVER (
PARTITION BY sensor_info.device_id
ORDER BY timestamp
ROWS BETWEEN 19 PRECEDING AND CURRENT ROW
) as rolling_std
FROM sensor_readings
WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
AND sensor_info.facility_id = 'PLANT_001'
)
SELECT
device_id,
timestamp,
temperature,
rolling_avg,
ABS(temperature - rolling_avg) as deviation,
rolling_std * 2 as anomaly_threshold,
CASE
WHEN ABS(temperature - rolling_avg) > rolling_std * 3 THEN 'CRITICAL'
WHEN ABS(temperature - rolling_avg) > rolling_std * 2 THEN 'WARNING'
ELSE 'NORMAL'
END as anomaly_level
FROM sensor_stats
WHERE ABS(temperature - rolling_avg) > rolling_std * 2
ORDER BY timestamp DESC;
-- QueryLeaf automatically optimizes for:
-- 1. Time series collection bucketing and compression
-- 2. Time-based index utilization for range queries
-- 3. Efficient aggregation pipelines for time bucketing
-- 4. Window function translation to MongoDB analytics
-- 5. Date/time function mapping to MongoDB operators
-- 6. Automatic data lifecycle management
-- Capacity planning with growth analysis
WITH daily_metrics AS (
SELECT
DATE_TRUNC('day', timestamp) as metric_date,
metadata.server_id,
AVG(cpu_usage) as daily_avg_cpu,
MAX(memory_usage) as daily_peak_memory
FROM server_metrics
WHERE timestamp >= CURRENT_DATE - INTERVAL '90 days'
GROUP BY metric_date, metadata.server_id
),
growth_analysis AS (
SELECT
server_id,
metric_date,
daily_avg_cpu,
daily_peak_memory,
LAG(daily_avg_cpu, 7) OVER (PARTITION BY server_id ORDER BY metric_date) as cpu_week_ago,
AVG(daily_avg_cpu) OVER (
PARTITION BY server_id
ORDER BY metric_date
ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
) as cpu_30_day_avg
FROM daily_metrics
)
SELECT
server_id,
daily_avg_cpu as current_cpu,
cpu_30_day_avg,
CASE
WHEN cpu_week_ago IS NOT NULL
THEN ((daily_avg_cpu - cpu_week_ago) / cpu_week_ago) * 100
ELSE NULL
END as weekly_growth_percent,
CASE
WHEN daily_avg_cpu > cpu_30_day_avg * 1.2 THEN 'SCALING_NEEDED'
WHEN daily_avg_cpu > cpu_30_day_avg * 1.1 THEN 'MONITOR_CLOSELY'
ELSE 'NORMAL_CAPACITY'
END as capacity_status
FROM growth_analysis
WHERE metric_date = CURRENT_DATE - INTERVAL '1 day'
ORDER BY weekly_growth_percent DESC NULLS LAST;
Data Lifecycle and Retention
Automated Data Management
Implement intelligent data lifecycle policies:
// Time series data lifecycle management
class TimeSeriesLifecycleManager {
constructor(db) {
this.db = db;
this.retentionPolicies = new Map();
}
defineRetentionPolicy(collection, policy) {
this.retentionPolicies.set(collection, {
hotDataDays: policy.hotDataDays || 7, // High-frequency access
warmDataDays: policy.warmDataDays || 90, // Moderate access
coldDataDays: policy.coldDataDays || 365, // Archive access
deleteAfterDays: policy.deleteAfterDays || 2555 // 7 years
});
}
async applyDataLifecycle(collection) {
const policy = this.retentionPolicies.get(collection);
if (!policy) return;
const now = new Date();
const hotCutoff = new Date(now.getTime() - policy.hotDataDays * 24 * 60 * 60 * 1000);
const warmCutoff = new Date(now.getTime() - policy.warmDataDays * 24 * 60 * 60 * 1000);
const coldCutoff = new Date(now.getTime() - policy.coldDataDays * 24 * 60 * 60 * 1000);
const deleteCutoff = new Date(now.getTime() - policy.deleteAfterDays * 24 * 60 * 60 * 1000);
// Archive warm data (compress and move to separate collection)
await this.archiveWarmData(collection, warmCutoff, coldCutoff);
// Move cold data to archive storage
await this.moveColdData(collection, coldCutoff, deleteCutoff);
// Delete expired data
await this.deleteExpiredData(collection, deleteCutoff);
return {
hotDataCutoff: hotCutoff,
warmDataCutoff: warmCutoff,
coldDataCutoff: coldCutoff,
deleteCutoff: deleteCutoff
};
}
async archiveWarmData(collection, startTime, endTime) {
const archiveCollection = `${collection}_archive`;
// Aggregate and compress warm data
const pipeline = [
{
$match: {
timestamp: { $gte: startTime, $lt: endTime }
}
},
{
$group: {
_id: {
// Compress to hourly aggregates
hour: {
$dateFromParts: {
year: { $year: "$timestamp" },
month: { $month: "$timestamp" },
day: { $dayOfMonth: "$timestamp" },
hour: { $hour: "$timestamp" }
}
},
metadata: "$metadata"
},
// Statistical aggregates preserve essential information; $group fields
// must be single accumulators, so the nested documents are rebuilt below
avg_cpu_usage: { $avg: "$cpu_usage" },
avg_memory_usage: { $avg: "$memory_usage" },
avg_disk_io: { $avg: "$disk_io" },
max_cpu_usage: { $max: "$cpu_usage" },
max_memory_usage: { $max: "$memory_usage" },
max_disk_io: { $max: "$disk_io" },
min_cpu_usage: { $min: "$cpu_usage" },
min_memory_usage: { $min: "$memory_usage" },
min_disk_io: { $min: "$disk_io" },
sample_count: { $sum: 1 },
first_reading: { $min: "$timestamp" },
last_reading: { $max: "$timestamp" }
}
},
{
$addFields: {
archived_at: new Date(),
data_type: "hourly_aggregate",
original_collection: collection,
// Reassemble the nested aggregate documents from the flattened accumulators
avg_values: { cpu_usage: "$avg_cpu_usage", memory_usage: "$avg_memory_usage", disk_io: "$avg_disk_io" },
max_values: { cpu_usage: "$max_cpu_usage", memory_usage: "$max_memory_usage", disk_io: "$max_disk_io" },
min_values: { cpu_usage: "$min_cpu_usage", memory_usage: "$min_memory_usage", disk_io: "$min_disk_io" }
}
},
{
$out: archiveCollection
}
];
await this.db.collection(collection).aggregate(pipeline).toArray();
// Remove original data after successful archival
const deleteResult = await this.db.collection(collection).deleteMany({
timestamp: { $gte: startTime, $lt: endTime }
});
return {
archivedDocuments: deleteResult.deletedCount,
archiveCollection: archiveCollection
};
}
}
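A wiring sketch for the lifecycle manager; the collection name and retention numbers are example values, and the call would normally run from a scheduled job.
// Hypothetical scheduled lifecycle run (e.g. nightly cron)
async function runMetricsLifecycle(db) {
  const lifecycle = new TimeSeriesLifecycleManager(db);

  lifecycle.defineRetentionPolicy('server_metrics', {
    hotDataDays: 7,
    warmDataDays: 90,
    coldDataDays: 365,
    deleteAfterDays: 7 * 365
  });

  const cutoffs = await lifecycle.applyDataLifecycle('server_metrics');
  console.log('Lifecycle cutoffs applied:', cutoffs);
}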
Advanced Time Series Analytics
Complex Time-Based Aggregations
Implement sophisticated analytics operations:
// Advanced time series analytics operations
class TimeSeriesAnalyticsEngine {
constructor(db) {
this.db = db;
}
async generateTimeSeriesForecast(collection, field, options = {}) {
// Time series forecasting using exponential smoothing
const days = options.historyDays || 30;
const forecastDays = options.forecastDays || 7;
const startTime = new Date(Date.now() - days * 24 * 60 * 60 * 1000);
const pipeline = [
{
$match: {
timestamp: { $gte: startTime },
[field]: { $exists: true, $ne: null }
}
},
{
$group: {
_id: {
date: { $dateToString: { format: "%Y-%m-%d", date: "$timestamp" } }
},
daily_avg: { $avg: `$${field}` },
daily_count: { $sum: 1 }
}
},
{
$sort: { "_id.date": 1 }
},
{
$group: {
_id: null,
daily_series: {
$push: {
date: "$_id.date",
value: "$daily_avg",
sample_size: "$daily_count"
}
}
}
},
{
$addFields: {
// Calculate exponential smoothing forecast
forecast: {
$function: {
body: function(dailySeries, forecastDays) {
if (dailySeries.length < 7) return null;
// Exponential smoothing parameters
const alpha = 0.3; // Smoothing factor
const beta = 0.1; // Trend factor
let level = dailySeries[0].value;
let trend = 0;
// Calculate initial trend
if (dailySeries.length >= 2) {
trend = dailySeries[1].value - dailySeries[0].value;
}
const smoothed = [];
const forecasts = [];
// Apply exponential smoothing to historical data
for (let i = 0; i < dailySeries.length; i++) {
const actual = dailySeries[i].value;
if (i > 0) {
const forecast = level + trend;
const error = actual - forecast;
// Update level and trend
const newLevel = alpha * actual + (1 - alpha) * (level + trend);
const newTrend = beta * (newLevel - level) + (1 - beta) * trend;
level = newLevel;
trend = newTrend;
}
smoothed.push({
date: dailySeries[i].date,
actual: actual,
smoothed: level,
trend: trend
});
}
// Generate future forecasts
for (let i = 1; i <= forecastDays; i++) {
const forecastValue = level + (trend * i);
const futureDate = new Date(new Date(dailySeries[dailySeries.length - 1].date).getTime() + i * 24 * 60 * 60 * 1000);
forecasts.push({
date: futureDate.toISOString().split('T')[0],
forecast_value: Math.round(forecastValue * 100) / 100,
confidence: Math.max(0.1, 1 - (i * 0.1)) // Decreasing confidence
});
}
return {
historical_smoothing: smoothed,
forecasts: forecasts,
model_parameters: {
alpha: alpha,
beta: beta,
final_level: level,
final_trend: trend
}
};
},
args: ["$daily_series", forecastDays],
lang: "js"
}
}
}
},
{
$project: {
field_name: field,
forecast_analysis: "$forecast",
data_points: { $size: "$daily_series" },
forecast_period_days: { $literal: forecastDays }, // $literal stops the number being read as a projection flag
_id: 0
}
}
];
const results = await this.db.collection(collection).aggregate(pipeline).toArray();
return results[0];
}
async correlateTimeSeriesMetrics(collection, metrics, timeWindow) {
// Cross-metric correlation analysis
const startTime = new Date(Date.now() - timeWindow);
const pipeline = [
{
$match: {
timestamp: { $gte: startTime }
}
},
{
$group: {
_id: {
// Hourly buckets for correlation
hour: {
$dateFromParts: {
year: { $year: "$timestamp" },
month: { $month: "$timestamp" },
day: { $dayOfMonth: "$timestamp" },
hour: { $hour: "$timestamp" }
}
},
server: "$metadata.server_id"
},
// Average metrics for each hour/server combination
hourly_metrics: {
$push: metrics.reduce((obj, metric) => {
obj[metric] = { $avg: `$${metric}` };
return obj;
}, {})
}
}
},
{
$group: {
_id: "$_id.server",
metric_series: { $push: "$hourly_metrics" }
}
},
{
$addFields: {
correlations: {
$function: {
body: function(metricSeries, metricNames) {
const correlations = {};
// Calculate pairwise correlations
for (let i = 0; i < metricNames.length; i++) {
for (let j = i + 1; j < metricNames.length; j++) {
const metric1 = metricNames[i];
const metric2 = metricNames[j];
const values1 = metricSeries.map(s => s[0][metric1]);
const values2 = metricSeries.map(s => s[0][metric2]);
const correlation = calculateCorrelation(values1, values2);
correlations[`${metric1}_${metric2}`] = Math.round(correlation * 1000) / 1000;
}
}
function calculateCorrelation(x, y) {
const n = x.length;
if (n !== y.length || n < 2) return 0;
const sumX = x.reduce((a, b) => a + b, 0);
const sumY = y.reduce((a, b) => a + b, 0);
const sumXY = x.reduce((sum, xi, i) => sum + xi * y[i], 0);
const sumXX = x.reduce((sum, xi) => sum + xi * xi, 0);
const sumYY = y.reduce((sum, yi) => sum + yi * yi, 0);
const numerator = n * sumXY - sumX * sumY;
const denominator = Math.sqrt((n * sumXX - sumX * sumX) * (n * sumYY - sumY * sumY));
return denominator === 0 ? 0 : numerator / denominator;
}
return correlations;
},
args: ["$metric_series", metrics],
lang: "js"
}
}
}
},
{
$project: {
server_id: "$_id",
metric_correlations: "$correlations",
analysis_period: { $literal: timeWindow },
_id: 0
}
}
];
return await this.db.collection(collection).aggregate(pipeline).toArray();
}
}
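A usage sketch for the forecasting engine; the collection, field, and horizon are example values.
// Hypothetical capacity-planning call using the engine above
async function forecastCpu(db) {
  const engine = new TimeSeriesAnalyticsEngine(db);

  const forecast = await engine.generateTimeSeriesForecast('server_metrics', 'cpu_usage', {
    historyDays: 30,
    forecastDays: 7
  });

  return forecast?.forecast_analysis?.forecasts || [];
}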
Best Practices for Time Series Collections
Design Guidelines
Essential practices for MongoDB time series implementations:
- Time Field Selection: Choose appropriate time field granularity based on data frequency
- Metadata Organization: Structure metadata for efficient querying and aggregation
- Index Strategy: Create time-based compound indexes for common query patterns, as sketched after this list
- Bucket Configuration: Optimize bucket sizes based on data insertion patterns
- Retention Policies: Implement automatic data lifecycle management
- Compression Strategy: Use MongoDB's time series compression for storage efficiency
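As a concrete sketch of the index-strategy and retention guidelines above (collection and field names carry over from earlier examples; adjust to your schema):
// Compound secondary index: metaField subfield first, then the time field,
// matching the common "one server over a time range" query shape (mongosh syntax)
db.server_metrics.createIndex({ 'metadata.server_id': 1, timestamp: -1 });

// Retention can be adjusted after collection creation with collMod
db.runCommand({
  collMod: 'server_metrics',
  expireAfterSeconds: 90 * 24 * 60 * 60   // 90 days
});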
Performance Optimization
Optimize time series collection performance:
- Write Optimization: Use batch inserts and order insertions by timestamp, as sketched after this list
- Query Patterns: Design queries to leverage time series optimizations and indexes
- Aggregation Efficiency: Use time bucketing and window functions for analytics
- Memory Management: Monitor working set size and adjust based on query patterns
- Sharding Strategy: Implement time-based sharding for horizontal scaling
- Cache Strategy: Cache frequently accessed time ranges and aggregations
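A minimal sketch of the write-optimization guideline: sort each batch by timestamp before an unordered insertMany so writes cluster into as few buckets as possible. The write concern and batching policy shown are assumptions to tune for your workload.
// Batch writer sketch for high-throughput metric ingestion
async function flushMetricsBatch(collection, batch) {
  // In-order inserts keep bucket churn low
  batch.sort((a, b) => a.timestamp - b.timestamp);

  return collection.insertMany(batch, {
    ordered: false,           // keep inserting past individual failures
    writeConcern: { w: 1 }    // trade durability for throughput as appropriate
  });
}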
Conclusion
MongoDB time series collections provide specialized optimizations for time-stamped data workloads, delivering high-performance storage, querying, and analytics capabilities. Combined with SQL-style query patterns, time series collections enable familiar database operations while leveraging MongoDB's optimization advantages for temporal data.
Key time series benefits include:
- Storage Efficiency: Automatic bucketing and compression reduce storage overhead by 70%+
- Write Performance: Optimized insertion patterns for high-frequency data streams
- Query Optimization: Time-based indexes and aggregation pipelines designed for temporal queries
- Analytics Integration: Built-in support for windowing functions and statistical operations
- Lifecycle Management: Automated data aging and retention policy enforcement
Whether you're building IoT monitoring systems, financial analytics platforms, or application performance dashboards, MongoDB time series collections with QueryLeaf's familiar SQL interface provide the foundation for scalable time-based data processing. This combination enables you to implement powerful temporal analytics while preserving the development patterns and query approaches your team already knows.
QueryLeaf Integration: QueryLeaf automatically detects time series collections and optimizes SQL queries to leverage MongoDB's time series storage and indexing optimizations. Window functions, date operations, and time-based grouping are seamlessly translated to efficient MongoDB aggregation pipelines designed for temporal data patterns.
The integration of specialized time series storage with SQL-style temporal analytics makes MongoDB an ideal platform for applications requiring both high-performance time data processing and familiar database interaction patterns, ensuring your time series analytics remain both comprehensive and maintainable as data volumes scale.