MongoDB Time Series Collections for IoT and Real-Time Analytics: High-Performance Sensor Data Management and Stream Processing
Modern IoT applications generate massive volumes of time-stamped sensor data that require specialized storage and query optimization strategies. Traditional relational databases struggle with the volume, velocity, and analytical demands of these workloads, particularly when they must ingest millions of data points per second from distributed sensor networks while simultaneously serving real-time alerting and complex analytical queries across historical time ranges.
MongoDB time series collections provide storage and query optimization purpose-built for time-stamped data. They automatically organize measurements into time-based buckets, apply compression suited to time series patterns, and optimize aggregation pipelines so that high-velocity IoT ingestion, real-time analytics, and historical trend analysis can all run at scale.
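As a quick illustration of what that looks like in practice, a time series collection can be created with a few lines of Node.js driver code; the database, collection, and field names below are placeholders for the full example developed later in this article:
// Minimal sketch: creating a time series collection with the Node.js driver
// (database, collection, and field names are placeholders)
const { MongoClient } = require('mongodb');

async function createSensorCollection() {
  const client = new MongoClient('mongodb://localhost:27017');
  await client.connect();

  await client.db('iot_platform').createCollection('sensor_readings', {
    timeseries: {
      timeField: 'timestamp',  // when the measurement was taken
      metaField: 'device',     // per-device metadata used to organize buckets
      granularity: 'seconds'   // sizing hint for high-frequency sensor data
    },
    expireAfterSeconds: 60 * 60 * 24 * 365 // automatic expiry after one year
  });

  await client.close();
}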
The IoT Data Challenge
Traditional approaches to storing and analyzing time series data face significant scalability and performance limitations:
-- Traditional PostgreSQL time series approach - performance bottlenecks
CREATE TABLE sensor_readings (
reading_id BIGSERIAL,
device_id VARCHAR(50) NOT NULL,
sensor_type VARCHAR(50) NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
value DECIMAL(15,4) NOT NULL,
unit VARCHAR(20),
location_lat DECIMAL(10,8),
location_lng DECIMAL(11,8),
-- Basic metadata
device_status VARCHAR(20) DEFAULT 'online',
data_quality INTEGER DEFAULT 100,
-- Partitioned tables must include the partition key in the primary key
PRIMARY KEY (reading_id, timestamp),
CONSTRAINT valid_quality CHECK (data_quality BETWEEN 0 AND 100)
) PARTITION BY RANGE (timestamp);
-- Partitioning by time (complex maintenance)
CREATE TABLE sensor_readings_2025_01 PARTITION OF sensor_readings
FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
CREATE TABLE sensor_readings_2025_02 PARTITION OF sensor_readings
FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');
-- Multiple indexes required for different query patterns
CREATE INDEX idx_sensor_device_time ON sensor_readings (device_id, timestamp);
CREATE INDEX idx_sensor_type_time ON sensor_readings (sensor_type, timestamp);
CREATE INDEX idx_sensor_location ON sensor_readings (location_lat, location_lng);
CREATE INDEX idx_sensor_timestamp ON sensor_readings (timestamp);
-- Complex aggregation queries for analytics
WITH hourly_averages AS (
SELECT
device_id,
sensor_type,
DATE_TRUNC('hour', timestamp) as hour_bucket,
AVG(value) as avg_value,
COUNT(*) as reading_count,
MIN(value) as min_value,
MAX(value) as max_value,
STDDEV(value) as stddev_value
FROM sensor_readings
WHERE timestamp >= NOW() - INTERVAL '24 hours'
AND device_status = 'online'
AND data_quality > 80
GROUP BY device_id, sensor_type, DATE_TRUNC('hour', timestamp)
),
device_statistics AS (
SELECT
device_id,
sensor_type,
COUNT(*) as total_hours,
AVG(avg_value) as daily_average,
MAX(max_value) as daily_peak,
MIN(min_value) as daily_low,
AVG(reading_count) as avg_readings_per_hour,
-- Calculate trend using linear regression approximation
CASE
WHEN COUNT(*) > 1 THEN
(COUNT(*) * SUM(EXTRACT(EPOCH FROM hour_bucket) * avg_value) -
SUM(EXTRACT(EPOCH FROM hour_bucket)) * SUM(avg_value)) /
(COUNT(*) * SUM(POWER(EXTRACT(EPOCH FROM hour_bucket), 2)) -
POWER(SUM(EXTRACT(EPOCH FROM hour_bucket)), 2))
ELSE 0
END as trend_slope
FROM hourly_averages
GROUP BY device_id, sensor_type
)
-- Final aggregation (performance intensive)
SELECT
ds.device_id,
ds.sensor_type,
ROUND(ds.daily_average, 2) as avg_24h,
ROUND(ds.daily_peak, 2) as peak_24h,
ROUND(ds.daily_low, 2) as low_24h,
ROUND(ds.avg_readings_per_hour, 0) as readings_per_hour,
-- Trend analysis
CASE
WHEN ds.trend_slope > 0.1 THEN 'rising'
WHEN ds.trend_slope < -0.1 THEN 'falling'
ELSE 'stable'
END as trend_direction,
-- Alert conditions
CASE
WHEN ds.daily_peak > 100 THEN 'high_alert'
WHEN ds.daily_low < 10 THEN 'low_alert'
WHEN ds.avg_readings_per_hour < 5 THEN 'connectivity_alert'
ELSE 'normal'
END as alert_status,
ds.total_hours
FROM device_statistics ds
WHERE ds.total_hours >= 20 -- At least 20 hours of data
ORDER BY ds.device_id, ds.sensor_type;
-- Challenges with traditional time series approaches:
-- 1. Storage overhead - separate tables and partition management
-- 2. Index explosion - multiple indexes needed for various query patterns
-- 3. Query complexity - complex CTEs and window functions for basic analytics
-- 4. Maintenance burden - manual partition creation and cleanup
-- 5. Limited compression - basic storage compression insufficient for time series patterns
-- 6. Scaling bottlenecks - horizontal scaling requires complex sharding strategies
-- 7. Real-time constraints - difficult to optimize for both writes and analytics
-- 8. Data lifecycle management - complex procedures for archiving and cleanup
-- Real-time ingestion performance issues
INSERT INTO sensor_readings (
device_id, sensor_type, timestamp, value, unit,
location_lat, location_lng, device_status, data_quality
)
SELECT
'device_' || (random() * 1000)::int,
CASE (random() * 5)::int
WHEN 0 THEN 'temperature'
WHEN 1 THEN 'humidity'
WHEN 2 THEN 'pressure'
WHEN 3 THEN 'light'
ELSE 'motion'
END,
NOW() - (random() * interval '1 hour'),
random() * 100,
CASE (random() * 5)::int
WHEN 0 THEN 'celsius'
WHEN 1 THEN 'percent'
WHEN 2 THEN 'pascal'
WHEN 3 THEN 'lux'
ELSE 'boolean'
END,
40.7128 + (random() - 0.5) * 0.1,
-74.0060 + (random() - 0.5) * 0.1,
CASE WHEN random() > 0.1 THEN 'online' ELSE 'offline' END,
80 + (random() * 20)::int
FROM generate_series(1, 10000) -- 10K inserts - already showing performance issues
ON CONFLICT DO NOTHING;
-- Problems:
-- 1. Linear performance degradation with data volume
-- 2. Index maintenance overhead during high-velocity writes
-- 3. Lock contention during concurrent analytics and writes
-- 4. Complex query optimization required for time-range queries
-- 5. Storage bloat due to lack of time-series specific compression
-- 6. Difficult to implement real-time alerting efficiently
-- 7. Complex setup for distributed deployments across geographic regions
-- 8. Limited built-in support for time-series specific operations
MongoDB Time Series Collections eliminate these limitations with purpose-built time series capabilities:
// MongoDB Time Series Collections - optimized for IoT and analytics workloads
const { MongoClient, ObjectId } = require('mongodb');
const client = new MongoClient('mongodb://localhost:27017/?replicaSet=rs0');
const db = client.db('iot_platform');
class MongoDBTimeSeriesManager {
constructor(db) {
this.db = db;
// Time series collections with automatic optimization
this.sensorData = null;
this.deviceMetrics = null;
this.analyticsCache = null;
// Keep the initialization promise so callers can await collection/index setup
this.ready = this.initializeTimeSeriesCollections();
}
async initializeTimeSeriesCollections() {
console.log('Setting up optimized time series collections...');
try {
// Primary sensor data collection
await this.db.createCollection('sensor_readings', {
timeseries: {
timeField: 'timestamp',
metaField: 'device',
// Use either granularity or the custom bucketing parameters
// (bucketMaxSpanSeconds/bucketRoundingSeconds), not both
granularity: 'seconds' // Optimal for high-frequency IoT sensor data
},
expireAfterSeconds: 31536000, // 1 year retention
storageEngine: { wiredTiger: { configString: 'block_compressor=zstd' } }
});
// Device analytics and metrics collection
await this.db.createCollection('device_metrics', {
timeseries: {
timeField: 'timestamp',
metaField: 'device_info',
granularity: 'minutes' // Aggregated data points; custom bucketing parameters could be used instead
},
expireAfterSeconds: 94608000 // 3 year retention for analytics
});
// Real-time analytics cache for dashboard queries
await this.db.createCollection('analytics_cache', {
timeseries: {
timeField: 'computed_at',
metaField: 'computation_type',
granularity: 'minutes'
},
expireAfterSeconds: 604800 // 1 week cache retention
});
this.sensorData = this.db.collection('sensor_readings');
this.deviceMetrics = this.db.collection('device_metrics');
this.analyticsCache = this.db.collection('analytics_cache');
// Create specialized indexes for time series queries
await this.createOptimizedIndexes();
console.log('Time series collections initialized successfully');
} catch (error) {
console.error('Error initializing time series collections:', error);
throw error;
}
}
async createOptimizedIndexes() {
console.log('Creating optimized time series indexes...');
// Sensor data indexes
await this.sensorData.createIndexes([
// Compound index for device + time range queries
{
key: { 'device.id': 1, timestamp: 1 },
name: 'device_time_optimal'
},
// Sensor type + time for analytics
{
key: { 'device.sensor_type': 1, timestamp: 1 },
name: 'sensor_type_time'
},
// Geospatial index for location-based queries
{
key: { 'device.location': '2dsphere' },
name: 'device_location_geo'
},
// Value range index for threshold queries
{
key: { 'measurements.value': 1, timestamp: 1 },
name: 'value_threshold_time',
partialFilterExpression: { 'measurements.value': { $exists: true } }
}
]);
// Device metrics indexes
await this.deviceMetrics.createIndexes([
{
key: { 'device_info.id': 1, timestamp: -1 },
name: 'device_metrics_latest'
},
{
key: { 'device_info.facility': 1, timestamp: 1 },
name: 'facility_time_series'
}
]);
console.log('Time series indexes created successfully');
}
async ingestSensorData(deviceId, sensorType, measurements, metadata = {}) {
const timestamp = new Date();
try {
const document = {
timestamp: timestamp,
// Metadata field for efficient bucketing
device: {
id: deviceId,
sensor_type: sensorType,
facility: metadata.facility || 'default',
zone: metadata.zone || 'unspecified',
location: metadata.location ? {
type: 'Point',
coordinates: [metadata.location.lng, metadata.location.lat]
} : null,
// Device characteristics
model: metadata.model || 'unknown',
firmware_version: metadata.firmware_version || '1.0',
installation_date: metadata.installation_date,
// Network information
network_info: {
connection_type: metadata.connection_type || 'wifi',
signal_strength: metadata.signal_strength || -50,
gateway_id: metadata.gateway_id
}
},
// Time series measurements
measurements: this.normalizeMeasurements(measurements),
// Data quality and status
quality_metrics: {
data_quality_score: this.calculateDataQuality(measurements, metadata),
sensor_health: metadata.sensor_health || 'normal',
calibration_status: metadata.calibration_status || 'valid',
measurement_accuracy: metadata.measurement_accuracy || 0.95
},
// Processing metadata
processing_info: {
ingestion_timestamp: timestamp,
processing_latency_ms: 0,
source: metadata.source || 'sensor_direct',
batch_id: metadata.batch_id,
schema_version: '2.0'
}
};
const result = await this.sensorData.insertOne(document);
// Trigger real-time processing if enabled
if (metadata.enable_realtime_processing !== false) {
await this.processRealTimeAnalytics(document);
}
return {
success: true,
documentId: result.insertedId,
timestamp: timestamp,
bucketed: true, // Time series collections automatically bucket
processing_time_ms: Date.now() - timestamp.getTime()
};
} catch (error) {
console.error('Sensor data ingestion failed:', error);
return {
success: false,
error: error.message,
timestamp: timestamp
};
}
}
normalizeMeasurements(rawMeasurements) {
// Normalize different measurement formats into consistent structure
const normalized = {};
if (Array.isArray(rawMeasurements)) {
// Handle array of measurement objects
rawMeasurements.forEach(measurement => {
if (measurement.type && measurement.value !== undefined) {
normalized[measurement.type] = {
value: Number(measurement.value),
unit: measurement.unit || '',
precision: measurement.precision || 2,
range: measurement.range || { min: null, max: null }
};
}
});
} else if (typeof rawMeasurements === 'object') {
// Handle object with measurement properties
Object.entries(rawMeasurements).forEach(([key, value]) => {
if (typeof value === 'number') {
normalized[key] = {
value: value,
unit: '',
precision: 2,
range: { min: null, max: null }
};
} else if (typeof value === 'object' && value.value !== undefined) {
normalized[key] = {
value: Number(value.value),
unit: value.unit || '',
precision: value.precision || 2,
range: value.range || { min: null, max: null }
};
}
});
}
return normalized;
}
calculateDataQuality(measurements, metadata) {
let qualityScore = 100;
// Check signal strength impact
if (metadata.signal_strength < -80) {
qualityScore -= 20;
} else if (metadata.signal_strength < -70) {
qualityScore -= 10;
}
// Check measurement consistency
Object.values(measurements).forEach(measurement => {
if (typeof measurement === 'object' && measurement.value !== undefined) {
const value = Number(measurement.value);
const range = measurement.range;
if (range && range.min !== null && range.max !== null) {
if (value < range.min || value > range.max) {
qualityScore -= 15; // Out of expected range
}
}
// Check for anomalous readings
if (isNaN(value) || !isFinite(value)) {
qualityScore -= 30;
}
}
});
return Math.max(0, qualityScore);
}
async processRealTimeAnalytics(document) {
const deviceId = document.device.id;
const timestamp = document.timestamp;
// Real-time threshold monitoring
await this.checkAlertThresholds(document);
// Update device status and health metrics
await this.updateDeviceHealthMetrics(deviceId, document);
// Calculate rolling averages for dashboard
await this.updateRollingAverages(deviceId, document);
}
async checkAlertThresholds(document) {
const measurements = document.measurements;
const deviceId = document.device.id;
const sensorType = document.device.sensor_type;
// Define threshold rules (could be stored in configuration collection)
const thresholds = {
temperature: { min: -10, max: 60, critical: 80 },
humidity: { min: 0, max: 100, critical: 95 },
pressure: { min: 900, max: 1100, critical: 1200 },
light: { min: 0, max: 100000, critical: 120000 },
motion: { min: 0, max: 1, critical: null }
};
const sensorThresholds = thresholds[sensorType];
if (!sensorThresholds) return;
// Use for...of so each alert insert is properly awaited (forEach ignores async callbacks)
for (const [measurementType, measurement] of Object.entries(measurements)) {
const value = measurement.value;
const threshold = sensorThresholds;
let alertLevel = null;
let alertMessage = null;
if (threshold.critical && value > threshold.critical) {
alertLevel = 'critical';
alertMessage = `Critical ${measurementType} level: ${value} (threshold: ${threshold.critical})`;
} else if (value > threshold.max) {
alertLevel = 'high';
alertMessage = `High ${measurementType} level: ${value} (max: ${threshold.max})`;
} else if (value < threshold.min) {
alertLevel = 'low';
alertMessage = `Low ${measurementType} level: ${value} (min: ${threshold.min})`;
}
if (alertLevel) {
await this.createAlert({
device_id: deviceId,
sensor_type: sensorType,
measurement_type: measurementType,
alert_level: alertLevel,
message: alertMessage,
value: value,
threshold: threshold,
timestamp: document.timestamp,
location: document.device.location
});
}
}
}
async createAlert(alertData) {
const alertsCollection = this.db.collection('alerts');
const alert = {
_id: new ObjectId(),
...alertData,
created_at: new Date(),
status: 'active',
acknowledged: false,
acknowledged_by: null,
acknowledged_at: null,
resolved: false,
resolved_at: null,
escalation_level: 0,
// Alert metadata
correlation_id: `${alertData.device_id}_${alertData.sensor_type}_${alertData.measurement_type}`,
alert_hash: this.calculateAlertHash(alertData)
};
// Check for duplicate recent alerts (deduplication)
const recentAlert = await alertsCollection.findOne({
alert_hash: alert.alert_hash,
created_at: { $gte: new Date(Date.now() - 300000) }, // Last 5 minutes
status: 'active'
});
if (!recentAlert) {
await alertsCollection.insertOne(alert);
// Trigger real-time notifications
await this.sendAlertNotification(alert);
} else {
// Update escalation level for repeated alerts
await alertsCollection.updateOne(
{ _id: recentAlert._id },
{
$inc: { escalation_level: 1 },
$set: { last_occurrence: new Date() }
}
);
}
}
calculateAlertHash(alertData) {
const crypto = require('crypto');
const hashString = `${alertData.device_id}:${alertData.sensor_type}:${alertData.measurement_type}:${alertData.alert_level}`;
return crypto.createHash('md5').update(hashString).digest('hex');
}
async sendAlertNotification(alert) {
// Implementation would integrate with notification systems
console.log(`ALERT [${alert.alert_level.toUpperCase()}]: ${alert.message}`);
// Here you would integrate with:
// - Email/SMS services
// - Slack/Teams webhooks
// - PagerDuty/OpsGenie
// - Custom notification APIs
}
async updateDeviceHealthMetrics(deviceId, document) {
const now = new Date();
// Calculate device health score based on multiple factors
const healthMetrics = {
timestamp: now,
device_info: {
id: deviceId,
facility: document.device.facility,
zone: document.device.zone
},
health_indicators: {
data_quality_score: document.quality_metrics.data_quality_score,
signal_strength: document.device.network_info.signal_strength,
sensor_health: document.quality_metrics.sensor_health,
measurement_frequency: await this.calculateMeasurementFrequency(deviceId),
last_communication: now,
// Calculated health score
overall_health_score: this.calculateOverallHealthScore(document),
// Status indicators
is_online: true,
is_responsive: true,
calibration_valid: document.quality_metrics.calibration_status === 'valid'
},
performance_metrics: {
uptime_percentage: await this.calculateUptimePercentage(deviceId),
average_response_time_ms: document.processing_info.processing_latency_ms,
data_completeness_percentage: 100, // Could be calculated based on expected vs actual measurements
error_rate_percentage: 0 // Could be calculated from failed measurements
}
};
await this.deviceMetrics.insertOne(healthMetrics);
}
calculateOverallHealthScore(document) {
let score = 100;
// Factor in data quality
score = score * (document.quality_metrics.data_quality_score / 100);
// Factor in signal strength
const signalStrength = document.device.network_info.signal_strength;
if (signalStrength < -80) {
score *= 0.8;
} else if (signalStrength < -70) {
score *= 0.9;
}
// Factor in sensor health
if (document.quality_metrics.sensor_health !== 'normal') {
score *= 0.7;
}
return Math.round(score);
}
async calculateMeasurementFrequency(deviceId, windowMinutes = 60) {
const windowStart = new Date(Date.now() - windowMinutes * 60 * 1000);
const count = await this.sensorData.countDocuments({
'device.id': deviceId,
timestamp: { $gte: windowStart }
});
return count / windowMinutes; // Measurements per minute
}
async calculateUptimePercentage(deviceId, windowHours = 24) {
const windowStart = new Date(Date.now() - windowHours * 60 * 60 * 1000);
// Get expected measurement intervals (assuming every minute)
const expectedMeasurements = windowHours * 60;
const actualMeasurements = await this.sensorData.countDocuments({
'device.id': deviceId,
timestamp: { $gte: windowStart }
});
return Math.min(100, (actualMeasurements / expectedMeasurements) * 100);
}
async updateRollingAverages(deviceId, document) {
// Update cached analytics for dashboard performance
const measurementTypes = Object.keys(document.measurements);
for (const measurementType of measurementTypes) {
const value = document.measurements[measurementType].value;
// Calculate rolling averages for different time windows
const timeWindows = [
{ name: '5min', minutes: 5 },
{ name: '1hour', minutes: 60 },
{ name: '24hour', minutes: 1440 }
];
for (const window of timeWindows) {
await this.updateWindowAverage(deviceId, measurementType, value, window);
}
}
}
async updateWindowAverage(deviceId, measurementType, currentValue, window) {
const windowStart = new Date(Date.now() - window.minutes * 60 * 1000);
// Calculate average for the time window using aggregation
const pipeline = [
{
$match: {
'device.id': deviceId,
timestamp: { $gte: windowStart },
[`measurements.${measurementType}`]: { $exists: true }
}
},
{
$group: {
_id: null,
average: { $avg: `$measurements.${measurementType}.value` },
count: { $sum: 1 },
min: { $min: `$measurements.${measurementType}.value` },
max: { $max: `$measurements.${measurementType}.value` },
stddev: { $stdDevPop: `$measurements.${measurementType}.value` }
}
}
];
const result = await this.sensorData.aggregate(pipeline).next();
if (result) {
const cacheDocument = {
computed_at: new Date(),
computation_type: {
type: 'rolling_average',
device_id: deviceId,
measurement_type: measurementType,
window: window.name
},
statistics: {
average: result.average,
count: result.count,
min: result.min,
max: result.max,
stddev: result.stddev || 0,
current_value: currentValue,
// Trend calculation
trend: currentValue > result.average ? 'rising' :
currentValue < result.average ? 'falling' : 'stable',
deviation_percentage: Math.abs((currentValue - result.average) / result.average * 100)
}
};
await this.analyticsCache.replaceOne(
{
'computation_type.type': 'rolling_average',
'computation_type.device_id': deviceId,
'computation_type.measurement_type': measurementType,
'computation_type.window': window.name
},
cacheDocument,
{ upsert: true }
);
}
}
async getDeviceAnalytics(deviceId, options = {}) {
const timeRange = options.timeRange || '24h';
const measurementTypes = options.measurementTypes || null;
const includeAggregations = options.includeAggregations !== false;
try {
// Parse time range
const timeRangeMs = this.parseTimeRange(timeRange);
const startTime = new Date(Date.now() - timeRangeMs);
// Build aggregation pipeline
const pipeline = [
{
$match: {
'device.id': deviceId,
timestamp: { $gte: startTime }
}
}
];
// Add measurement type filtering if specified
if (measurementTypes && measurementTypes.length > 0) {
const measurementFilters = {};
measurementTypes.forEach(type => {
measurementFilters[`measurements.${type}`] = { $exists: true };
});
pipeline.push({ $match: { $or: Object.entries(measurementFilters).map(([key, value]) => ({ [key]: value })) } });
}
if (includeAggregations) {
// Add aggregation stages for comprehensive analytics
pipeline.push(
{
$addFields: {
hour_bucket: {
$dateTrunc: { date: '$timestamp', unit: 'hour' }
}
}
},
{
$group: {
_id: {
hour: '$hour_bucket',
sensor_type: '$device.sensor_type'
},
// Time and count metrics
measurement_count: { $sum: 1 },
first_measurement: { $min: '$timestamp' },
last_measurement: { $max: '$timestamp' },
// Data quality metrics
avg_quality_score: { $avg: '$quality_metrics.data_quality_score' },
min_quality_score: { $min: '$quality_metrics.data_quality_score' },
// Network metrics
avg_signal_strength: { $avg: '$device.network_info.signal_strength' },
// Measurement statistics (dynamic based on available measurements)
measurements: { $push: '$measurements' }
}
},
{
$addFields: {
// Process measurements to calculate statistics for each type
measurement_stats: {
$reduce: {
input: '$measurements',
initialValue: {},
in: {
$mergeObjects: [
'$$value',
{
$arrayToObject: {
$map: {
input: { $objectToArray: '$$this' },
in: {
k: '$$this.k',
v: {
values: { $concatArrays: [{ $ifNull: [{ $getField: { field: 'values', input: { $getField: { field: '$$this.k', input: '$$value' } } } }, []] }, ['$$this.v.value']] },
unit: '$$this.v.unit'
}
}
}
}
}
]
}
}
}
}
},
{
$addFields: {
// Calculate final statistics for each measurement type
final_measurement_stats: {
$arrayToObject: {
$map: {
input: { $objectToArray: '$measurement_stats' },
in: {
k: '$$this.k',
v: {
count: { $size: '$$this.v.values' },
average: { $avg: '$$this.v.values' },
min: { $min: '$$this.v.values' },
max: { $max: '$$this.v.values' },
stddev: { $stdDevPop: '$$this.v.values' },
unit: '$$this.v.unit'
}
}
}
}
}
}
},
{
$sort: { '_id.hour': 1 }
}
);
} else {
// Simple data retrieval without aggregations
pipeline.push(
{
$sort: { timestamp: -1 }
},
{
$limit: options.limit || 1000
}
);
}
const results = await this.sensorData.aggregate(pipeline).toArray();
// Get cached analytics for quick dashboard metrics
const cachedAnalytics = await this.getCachedAnalytics(deviceId, measurementTypes);
return {
success: true,
device_id: deviceId,
time_range: timeRange,
query_timestamp: new Date(),
data_points: results,
cached_analytics: cachedAnalytics,
summary: {
total_measurements: results.reduce((sum, item) => sum + (item.measurement_count || 1), 0),
time_span: {
start: startTime,
end: new Date()
},
measurement_types: measurementTypes || 'all'
}
};
} catch (error) {
console.error('Error retrieving device analytics:', error);
return {
success: false,
error: error.message,
device_id: deviceId
};
}
}
async getCachedAnalytics(deviceId, measurementTypes = null) {
const query = {
'computation_type.device_id': deviceId
};
if (measurementTypes && measurementTypes.length > 0) {
query['computation_type.measurement_type'] = { $in: measurementTypes };
}
const cachedResults = await this.analyticsCache.find(query)
.sort({ computed_at: -1 })
.toArray();
// Organize cached results by measurement type and window
const organized = {};
cachedResults.forEach(result => {
const measurementType = result.computation_type.measurement_type;
const window = result.computation_type.window;
if (!organized[measurementType]) {
organized[measurementType] = {};
}
organized[measurementType][window] = {
...result.statistics,
computed_at: result.computed_at
};
});
return organized;
}
parseTimeRange(timeRange) {
const ranges = {
'5m': 5 * 60 * 1000,
'15m': 15 * 60 * 1000,
'1h': 60 * 60 * 1000,
'6h': 6 * 60 * 60 * 1000,
'24h': 24 * 60 * 60 * 1000,
'7d': 7 * 24 * 60 * 60 * 1000,
'30d': 30 * 24 * 60 * 60 * 1000
};
return ranges[timeRange] || ranges['24h'];
}
async getFacilityOverview(facility, options = {}) {
const timeRange = options.timeRange || '24h';
const timeRangeMs = this.parseTimeRange(timeRange);
const startTime = new Date(Date.now() - timeRangeMs);
try {
const pipeline = [
{
$match: {
'device.facility': facility,
timestamp: { $gte: startTime }
}
},
{
$group: {
_id: {
device_id: '$device.id',
zone: '$device.zone',
sensor_type: '$device.sensor_type'
},
latest_timestamp: { $max: '$timestamp' },
measurement_count: { $sum: 1 },
avg_data_quality: { $avg: '$quality_metrics.data_quality_score' },
avg_signal_strength: { $avg: '$device.network_info.signal_strength' },
// Latest measurements for current values
latest_measurements: { $last: '$measurements' },
// Device info
device_info: { $last: '$device' },
latest_quality_metrics: { $last: '$quality_metrics' }
}
},
{
$addFields: {
// Calculate device status
device_status: {
$switch: {
branches: [
{
case: { $lt: ['$latest_timestamp', { $subtract: [new Date(), 300000] }] }, // 5 minutes
then: 'offline'
},
{
case: { $lt: ['$avg_data_quality', 50] },
then: 'degraded'
},
{
case: { $lt: ['$avg_signal_strength', -80] },
then: 'poor_connectivity'
}
],
default: 'online'
}
},
// Time since last measurement
minutes_since_last_measurement: {
$divide: [
{ $subtract: [new Date(), '$latest_timestamp'] },
60000
]
}
}
},
{
$group: {
_id: '$_id.zone',
device_count: { $sum: 1 },
// Status distribution
online_devices: {
$sum: { $cond: [{ $eq: ['$device_status', 'online'] }, 1, 0] }
},
offline_devices: {
$sum: { $cond: [{ $eq: ['$device_status', 'offline'] }, 1, 0] }
},
degraded_devices: {
$sum: { $cond: [{ $eq: ['$device_status', 'degraded'] }, 1, 0] }
},
// Performance metrics
avg_data_quality: { $avg: '$avg_data_quality' },
avg_signal_strength: { $avg: '$avg_signal_strength' },
total_measurements: { $sum: '$measurement_count' },
// Sensor type distribution
sensor_types: { $addToSet: '$_id.sensor_type' },
// Device details
devices: {
$push: {
device_id: '$_id.device_id',
sensor_type: '$_id.sensor_type',
status: '$device_status',
last_seen: '$latest_timestamp',
data_quality: '$avg_data_quality',
signal_strength: '$avg_signal_strength',
measurement_count: '$measurement_count',
latest_measurements: '$latest_measurements'
}
}
}
},
{
$sort: { '_id': 1 }
}
];
const results = await this.sensorData.aggregate(pipeline).toArray();
// Calculate facility-wide statistics
const facilityStats = results.reduce((stats, zone) => {
stats.total_devices += zone.device_count;
stats.total_online += zone.online_devices;
stats.total_offline += zone.offline_devices;
stats.total_degraded += zone.degraded_devices;
stats.total_measurements += zone.total_measurements;
stats.avg_data_quality = (stats.avg_data_quality * stats.zones_processed + zone.avg_data_quality) / (stats.zones_processed + 1);
stats.avg_signal_strength = (stats.avg_signal_strength * stats.zones_processed + zone.avg_signal_strength) / (stats.zones_processed + 1);
stats.zones_processed += 1;
// Collect unique sensor types
zone.sensor_types.forEach(type => {
if (!stats.sensor_types.includes(type)) {
stats.sensor_types.push(type);
}
});
return stats;
}, {
total_devices: 0,
total_online: 0,
total_offline: 0,
total_degraded: 0,
total_measurements: 0,
avg_data_quality: 0,
avg_signal_strength: 0,
sensor_types: [],
zones_processed: 0
});
// Calculate facility health score on a 0-100 scale (weighted uptime, data quality, connectivity)
const healthScore = Math.round(
(
(facilityStats.total_online / Math.max(facilityStats.total_devices, 1)) * 0.6 +
(facilityStats.avg_data_quality / 100) * 0.3 +
Math.min(1, (facilityStats.avg_signal_strength + 100) / 50) * 0.1
) * 100
);
return {
success: true,
facility: facility,
time_range: timeRange,
generated_at: new Date(),
facility_overview: {
total_devices: facilityStats.total_devices,
online_devices: facilityStats.total_online,
offline_devices: facilityStats.total_offline,
degraded_devices: facilityStats.total_degraded,
uptime_percentage: Math.round((facilityStats.total_online / Math.max(facilityStats.total_devices, 1)) * 100),
avg_data_quality: Math.round(facilityStats.avg_data_quality),
avg_signal_strength: Math.round(facilityStats.avg_signal_strength),
facility_health_score: healthScore,
sensor_types: facilityStats.sensor_types,
total_measurements_today: facilityStats.total_measurements,
zones: results
}
};
} catch (error) {
console.error('Error generating facility overview:', error);
return {
success: false,
error: error.message,
facility: facility
};
}
}
async performBatchIngestion(batchData, options = {}) {
const batchSize = options.batchSize || 1000;
const enableValidation = options.enableValidation !== false;
const startTime = Date.now();
console.log(`Starting batch ingestion of ${batchData.length} records...`);
try {
const results = {
total_records: batchData.length,
processed_records: 0,
failed_records: 0,
batches_processed: 0,
processing_time_ms: 0,
errors: []
};
// Process in batches for optimal performance
for (let i = 0; i < batchData.length; i += batchSize) {
const batch = batchData.slice(i, i + batchSize);
const batchStartTime = Date.now();
// Prepare documents for insertion
const documents = batch.map(record => {
try {
if (enableValidation) {
this.validateBatchRecord(record);
}
return {
timestamp: new Date(record.timestamp),
device: {
id: record.device_id,
sensor_type: record.sensor_type,
facility: record.facility || 'unknown',
zone: record.zone || 'unspecified',
location: record.location ? {
type: 'Point',
coordinates: [record.location.lng, record.location.lat]
} : null,
model: record.device_model || 'unknown',
firmware_version: record.firmware_version || '1.0',
network_info: {
connection_type: record.connection_type || 'unknown',
signal_strength: record.signal_strength || -50,
gateway_id: record.gateway_id
}
},
measurements: this.normalizeMeasurements(record.measurements || record.values),
quality_metrics: {
data_quality_score: record.data_quality_score || 95,
sensor_health: record.sensor_health || 'normal',
calibration_status: record.calibration_status || 'valid',
measurement_accuracy: record.measurement_accuracy || 0.95
},
processing_info: {
ingestion_timestamp: new Date(),
processing_latency_ms: 0,
source: 'batch_import',
batch_id: options.batchId || `batch_${Date.now()}`,
schema_version: '2.0'
}
};
} catch (validationError) {
results.errors.push({
record_index: i + batch.indexOf(record),
error: validationError.message,
record: record
});
return null;
}
}).filter(doc => doc !== null);
if (documents.length > 0) {
try {
await this.sensorData.insertMany(documents, {
ordered: false,
writeConcern: { w: 'majority', j: true }
});
results.processed_records += documents.length;
} catch (insertError) {
results.failed_records += documents.length;
results.errors.push({
batch_index: results.batches_processed,
error: insertError.message,
documents_count: documents.length
});
}
}
results.batches_processed += 1;
const batchTime = Date.now() - batchStartTime;
console.log(`Batch ${results.batches_processed} processed: ${documents.length} records in ${batchTime}ms`);
}
results.processing_time_ms = Date.now() - startTime;
results.success_rate = (results.processed_records / results.total_records) * 100;
results.throughput_records_per_second = Math.round(results.processed_records / (results.processing_time_ms / 1000));
console.log(`Batch ingestion completed: ${results.processed_records}/${results.total_records} records processed in ${results.processing_time_ms}ms`);
return {
success: true,
results: results
};
} catch (error) {
console.error('Batch ingestion failed:', error);
return {
success: false,
error: error.message,
processing_time_ms: Date.now() - startTime
};
}
}
validateBatchRecord(record) {
if (!record.device_id) {
throw new Error('device_id is required');
}
if (!record.sensor_type) {
throw new Error('sensor_type is required');
}
if (!record.timestamp) {
throw new Error('timestamp is required');
}
if (!record.measurements && !record.values) {
throw new Error('measurements or values are required');
}
// Validate timestamp format
const timestamp = new Date(record.timestamp);
if (isNaN(timestamp.getTime())) {
throw new Error('Invalid timestamp format');
}
// Validate timestamp is not in the future
if (timestamp > new Date()) {
throw new Error('Timestamp cannot be in the future');
}
// Validate timestamp is not too old (more than 1 year)
const oneYearAgo = new Date(Date.now() - 365 * 24 * 60 * 60 * 1000);
if (timestamp < oneYearAgo) {
throw new Error('Timestamp too old (more than 1 year)');
}
}
}
module.exports = { MongoDBTimeSeriesManager };
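A minimal usage sketch of the manager above might look like the following. The connection string, file path, and device names are illustrative, and the example assumes a first run where the time series collections do not already exist (initialization as written fails if they do):
// Hypothetical usage sketch for MongoDBTimeSeriesManager (paths and names are illustrative)
const { MongoClient } = require('mongodb');
const { MongoDBTimeSeriesManager } = require('./mongodb-timeseries-manager');

async function main() {
  const client = new MongoClient('mongodb://localhost:27017/?replicaSet=rs0');
  await client.connect();

  const manager = new MongoDBTimeSeriesManager(client.db('iot_platform'));
  await manager.ready; // wait for collection and index creation started in the constructor

  // Ingest a single temperature/humidity reading with device metadata
  const ingestResult = await manager.ingestSensorData('device_001', 'temperature', {
    temperature: { value: 23.5, unit: 'celsius' },
    humidity: { value: 65.2, unit: 'percent' }
  }, {
    facility: 'warehouse_east',
    zone: 'loading_dock',
    location: { lat: 40.7128, lng: -74.0060 },
    signal_strength: -45
  });
  console.log('Ingested document:', ingestResult.documentId);

  // Retrieve hourly analytics for the last 6 hours
  const analytics = await manager.getDeviceAnalytics('device_001', { timeRange: '6h' });
  console.log('Hourly buckets returned:', analytics.data_points.length);

  await client.close();
}

main().catch(console.error);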
Advanced Time Series Analytics Patterns
Real-Time Aggregation Pipelines
Implement sophisticated real-time analytics using the MongoDB aggregation framework:
// Advanced analytics and alerting system
const { ObjectId } = require('mongodb'); // used when constructing analytics alert documents
class TimeSeriesAnalyticsEngine {
constructor(timeSeriesManager) {
this.tsManager = timeSeriesManager;
this.db = timeSeriesManager.db;
this.alertRules = new Map();
this.analyticsCache = new Map();
}
async createAdvancedAnalyticsPipeline(analysisConfig) {
const {
deviceFilter = {},
timeRange = '24h',
aggregationLevel = 'hour',
analysisTypes = ['trend', 'anomaly', 'correlation'],
realTimeEnabled = true
} = analysisConfig;
try {
const timeRangeMs = this.tsManager.parseTimeRange(timeRange);
const startTime = new Date(Date.now() - timeRangeMs);
// Build comprehensive analytics pipeline
const pipeline = [
// Stage 1: Filter data by time and device criteria
{
$match: {
timestamp: { $gte: startTime },
...this.buildDeviceFilter(deviceFilter)
}
},
// Stage 2: Add time bucketing for aggregation
{
$addFields: {
time_bucket: this.getTimeBucketExpression(aggregationLevel),
hour_of_day: { $hour: '$timestamp' },
day_of_week: { $dayOfWeek: '$timestamp' },
is_business_hours: {
$and: [
{ $gte: [{ $hour: '$timestamp' }, 8] },
{ $lte: [{ $hour: '$timestamp' }, 18] }
]
}
}
},
// Stage 3: Unwind measurements for individual analysis
{
$addFields: {
measurement_array: {
$objectToArray: '$measurements'
}
}
},
{
$unwind: '$measurement_array'
},
// Stage 4: Group by time bucket, device, and measurement type
{
$group: {
_id: {
time_bucket: '$time_bucket',
device_id: '$device.id',
measurement_type: '$measurement_array.k',
facility: '$device.facility',
zone: '$device.zone'
},
// Statistical aggregations
count: { $sum: 1 },
avg_value: { $avg: '$measurement_array.v.value' },
min_value: { $min: '$measurement_array.v.value' },
max_value: { $max: '$measurement_array.v.value' },
sum_value: { $sum: '$measurement_array.v.value' },
stddev_value: { $stdDevPop: '$measurement_array.v.value' },
// Data quality metrics
avg_data_quality: { $avg: '$quality_metrics.data_quality_score' },
min_data_quality: { $min: '$quality_metrics.data_quality_score' },
// Network performance
avg_signal_strength: { $avg: '$device.network_info.signal_strength' },
// Time-based metrics
first_timestamp: { $min: '$timestamp' },
last_timestamp: { $max: '$timestamp' },
// Business context
business_hours_readings: {
$sum: { $cond: ['$is_business_hours', 1, 0] }
},
// Value arrays for advanced calculations
values: { $push: '$measurement_array.v.value' },
timestamps: { $push: '$timestamp' },
// Metadata
unit: { $last: '$measurement_array.v.unit' },
device_info: { $last: '$device' }
}
},
// Stage 5: Calculate advanced metrics
{
$addFields: {
// Variance and coefficient of variation
variance: { $pow: ['$stddev_value', 2] },
coefficient_of_variation: {
$cond: [
{ $ne: ['$avg_value', 0] },
{ $divide: ['$stddev_value', '$avg_value'] },
0
]
},
// Range and percentiles (approximated)
value_range: { $subtract: ['$max_value', '$min_value'] },
// Data completeness
expected_readings: {
$divide: [
{ $subtract: ['$last_timestamp', '$first_timestamp'] },
{ $multiply: [this.getExpectedInterval(aggregationLevel), 1000] }
]
},
// Time span coverage
time_span_hours: {
$divide: [
{ $subtract: ['$last_timestamp', '$first_timestamp'] },
3600000
]
},
// Business hours coverage
business_hours_percentage: {
$multiply: [
{ $divide: ['$business_hours_readings', '$count'] },
100
]
}
}
},
// Stage 6: Calculate trend indicators
{
$addFields: {
// Simple trend approximation
trend_direction: {
$switch: {
branches: [
{
case: { $gt: ['$coefficient_of_variation', 0.5] },
then: 'highly_variable'
},
{
case: { $gt: ['$max_value', { $multiply: ['$avg_value', 1.2] }] },
then: 'trending_high'
},
{
case: { $lt: ['$min_value', { $multiply: ['$avg_value', 0.8] }] },
then: 'trending_low'
}
],
default: 'stable'
}
},
// Anomaly detection flags
anomaly_indicators: {
$let: {
vars: {
three_sigma_upper: { $add: ['$avg_value', { $multiply: ['$stddev_value', 3] }] },
three_sigma_lower: { $subtract: ['$avg_value', { $multiply: ['$stddev_value', 3] }] }
},
in: {
has_outliers: {
$or: [
{ $gt: ['$max_value', '$$three_sigma_upper'] },
{ $lt: ['$min_value', '$$three_sigma_lower'] }
]
},
outlier_percentage: {
$multiply: [
{
$divide: [
{
$size: {
$filter: {
input: '$values',
cond: {
$or: [
{ $gt: ['$$this', '$$three_sigma_upper'] },
{ $lt: ['$$this', '$$three_sigma_lower'] }
]
}
}
}
},
'$count'
]
},
100
]
}
}
}
},
// Performance indicators: weighted sum of quality, connectivity, and completeness, scaled to 0-100
performance_score: {
$multiply: [
{
$add: [
// Data quality component (40%)
{ $multiply: [{ $divide: ['$avg_data_quality', 100] }, 0.4] },
// Connectivity component (30%)
{ $multiply: [{ $divide: [{ $add: ['$avg_signal_strength', 100] }, 50] }, 0.3] },
// Completeness component (30%)
{ $multiply: [{ $min: [{ $divide: ['$count', { $max: ['$expected_readings', 1] }] }, 1] }, 0.3] }
]
},
100
]
}
}
},
// Stage 7: Add comparative context
{
$lookup: {
from: 'sensor_readings',
let: {
device_id: '$_id.device_id',
measurement_type: '$_id.measurement_type',
current_start: '$first_timestamp'
},
pipeline: [
{
$match: {
$expr: {
$and: [
{ $eq: ['$device.id', '$$device_id'] },
{ $lt: ['$timestamp', '$$current_start'] },
{ $gte: ['$timestamp', { $subtract: ['$$current_start', timeRangeMs] }] }
]
}
}
},
{
$group: {
_id: null,
// Average the stored measurement's numeric value (measurements.<type>.value)
historical_avg: {
$avg: {
$getField: {
field: 'value',
input: { $getField: { field: '$$measurement_type', input: '$measurements' } }
}
}
}
}
}
],
as: 'historical_context'
}
},
// Stage 8: Final calculations and categorization
{
$addFields: {
// Historical comparison
historical_avg: {
$ifNull: [
{ $arrayElemAt: ['$historical_context.historical_avg', 0] },
'$avg_value'
]
},
// Calculate change from historical baseline
historical_change_percentage: {
$let: {
vars: {
historical: {
$ifNull: [
{ $arrayElemAt: ['$historical_context.historical_avg', 0] },
'$avg_value'
]
}
},
in: {
$cond: [
{ $ne: ['$$historical', 0] },
{
$multiply: [
{ $divide: [{ $subtract: ['$avg_value', '$$historical'] }, '$$historical'] },
100
]
},
0
]
}
}
},
// Overall health assessment
health_status: {
$switch: {
branches: [
{
case: { $lt: ['$performance_score', 50] },
then: 'critical'
},
{
case: { $lt: ['$performance_score', 70] },
then: 'warning'
},
{
case: { $gt: ['$anomaly_indicators.outlier_percentage', 10] },
then: 'anomalous'
}
],
default: 'healthy'
}
},
// Analysis timestamp
analyzed_at: new Date(),
analysis_duration_ms: { $subtract: [new Date(), startTime] }
}
},
// Stage 9: Sort by relevance
{
$sort: {
performance_score: 1, // Worst performers first
'anomaly_indicators.outlier_percentage': -1,
'_id.time_bucket': -1
}
}
];
const results = await this.tsManager.sensorData.aggregate(pipeline).toArray();
// Process results for real-time actions
if (realTimeEnabled) {
await this.processAnalyticsForAlerts(results);
}
// Cache results for dashboard performance
const cacheKey = this.generateAnalyticsCacheKey(analysisConfig);
this.analyticsCache.set(cacheKey, {
results: results,
generated_at: new Date(),
config: analysisConfig
});
return {
success: true,
analysis_config: analysisConfig,
results: results,
summary: this.generateAnalyticsSummary(results),
generated_at: new Date(),
cache_key: cacheKey
};
} catch (error) {
console.error('Advanced analytics pipeline failed:', error);
return {
success: false,
error: error.message,
analysis_config: analysisConfig
};
}
}
buildDeviceFilter(deviceFilter) {
const filter = {};
if (deviceFilter.device_ids && deviceFilter.device_ids.length > 0) {
filter['device.id'] = { $in: deviceFilter.device_ids };
}
if (deviceFilter.facilities && deviceFilter.facilities.length > 0) {
filter['device.facility'] = { $in: deviceFilter.facilities };
}
if (deviceFilter.zones && deviceFilter.zones.length > 0) {
filter['device.zone'] = { $in: deviceFilter.zones };
}
if (deviceFilter.sensor_types && deviceFilter.sensor_types.length > 0) {
filter['device.sensor_type'] = { $in: deviceFilter.sensor_types };
}
if (deviceFilter.location_radius) {
const { center, radius_meters } = deviceFilter.location_radius;
filter['device.location'] = {
$geoWithin: {
$centerSphere: [[center.lng, center.lat], radius_meters / 6378137] // Earth radius in meters
}
};
}
return filter;
}
getTimeBucketExpression(aggregationLevel) {
const buckets = {
minute: { $dateTrunc: { date: '$timestamp', unit: 'minute' } },
hour: { $dateTrunc: { date: '$timestamp', unit: 'hour' } },
day: { $dateTrunc: { date: '$timestamp', unit: 'day' } },
week: { $dateTrunc: { date: '$timestamp', unit: 'week', startOfWeek: 'monday' } },
month: { $dateTrunc: { date: '$timestamp', unit: 'month' } }
};
return buckets[aggregationLevel] || buckets.hour;
}
getExpectedInterval(aggregationLevel) {
const intervals = {
minute: 60, // 60 seconds
hour: 3600, // 3600 seconds
day: 86400, // 86400 seconds
week: 604800, // 604800 seconds
month: 2592000 // ~30 days in seconds
};
return intervals[aggregationLevel] || intervals.hour;
}
async processAnalyticsForAlerts(analyticsResults) {
for (const result of analyticsResults) {
// Check for alert conditions
if (result.health_status === 'critical') {
await this.createAnalyticsAlert('critical_performance', result);
}
if (result.anomaly_indicators.outlier_percentage > 15) {
await this.createAnalyticsAlert('anomaly_detected', result);
}
if (Math.abs(result.historical_change_percentage) > 50) {
await this.createAnalyticsAlert('significant_trend_change', result);
}
if (result.performance_score < 30) {
await this.createAnalyticsAlert('poor_performance', result);
}
}
}
async createAnalyticsAlert(alertType, analyticsData) {
const alertsCollection = this.db.collection('analytics_alerts');
const alert = {
_id: new ObjectId(),
alert_type: alertType,
device_id: analyticsData._id.device_id,
measurement_type: analyticsData._id.measurement_type,
facility: analyticsData._id.facility,
zone: analyticsData._id.zone,
// Alert details
severity: this.calculateAlertSeverity(alertType, analyticsData),
description: this.generateAlertDescription(alertType, analyticsData),
// Analytics context
analytics_data: {
time_bucket: analyticsData._id.time_bucket,
performance_score: analyticsData.performance_score,
health_status: analyticsData.health_status,
anomaly_indicators: analyticsData.anomaly_indicators,
historical_change_percentage: analyticsData.historical_change_percentage,
avg_value: analyticsData.avg_value,
trend_direction: analyticsData.trend_direction
},
// Timestamps
created_at: new Date(),
acknowledged: false,
resolved: false
};
await alertsCollection.insertOne(alert);
console.log(`Analytics Alert Created: ${alertType} for device ${analyticsData._id.device_id}`);
}
calculateAlertSeverity(alertType, analyticsData) {
switch (alertType) {
case 'critical_performance':
return analyticsData.performance_score < 20 ? 'critical' : 'high';
case 'anomaly_detected':
return analyticsData.anomaly_indicators.outlier_percentage > 25 ? 'high' : 'medium';
case 'significant_trend_change':
return Math.abs(analyticsData.historical_change_percentage) > 100 ? 'high' : 'medium';
case 'poor_performance':
return analyticsData.performance_score < 20 ? 'high' : 'medium';
default:
return 'medium';
}
}
generateAlertDescription(alertType, analyticsData) {
const device = analyticsData._id.device_id;
const measurement = analyticsData._id.measurement_type;
switch (alertType) {
case 'critical_performance':
return `Critical performance degradation detected for ${device} ${measurement} sensor. Performance score: ${Math.round(analyticsData.performance_score)}%`;
case 'anomaly_detected':
return `Anomalous readings detected for ${device} ${measurement}. ${Math.round(analyticsData.anomaly_indicators.outlier_percentage)}% of readings are outliers`;
case 'significant_trend_change':
return `Significant trend change for ${device} ${measurement}. ${Math.round(analyticsData.historical_change_percentage)}% change from historical baseline`;
case 'poor_performance':
return `Poor performance detected for ${device} ${measurement}. Performance score: ${Math.round(analyticsData.performance_score)}%`;
default:
return `Analytics alert for ${device} ${measurement}`;
}
}
generateAnalyticsSummary(results) {
if (results.length === 0) {
return { total_devices: 0, total_measurements: 0 };
}
const summary = {
total_analyses: results.length,
unique_devices: new Set(results.map(r => r._id.device_id)).size,
unique_measurements: new Set(results.map(r => r._id.measurement_type)).size,
unique_facilities: new Set(results.map(r => r._id.facility)).size,
// Health distribution
health_status_distribution: {},
// Performance metrics
avg_performance_score: 0,
min_performance_score: 100,
max_performance_score: 0,
// Anomaly statistics
anomalous_analyses: 0,
avg_outlier_percentage: 0,
// Trend distribution
trend_distribution: {},
// Time range
earliest_bucket: null,
latest_bucket: null
};
// Calculate distributions and averages
results.forEach(result => {
// Health status distribution
const status = result.health_status;
summary.health_status_distribution[status] = (summary.health_status_distribution[status] || 0) + 1;
// Performance metrics
summary.avg_performance_score += result.performance_score;
summary.min_performance_score = Math.min(summary.min_performance_score, result.performance_score);
summary.max_performance_score = Math.max(summary.max_performance_score, result.performance_score);
// Anomaly tracking
if (result.anomaly_indicators.has_outliers) {
summary.anomalous_analyses++;
}
summary.avg_outlier_percentage += result.anomaly_indicators.outlier_percentage;
// Trend distribution
const trend = result.trend_direction;
summary.trend_distribution[trend] = (summary.trend_distribution[trend] || 0) + 1;
// Time range
const bucket = result._id.time_bucket;
if (!summary.earliest_bucket || bucket < summary.earliest_bucket) {
summary.earliest_bucket = bucket;
}
if (!summary.latest_bucket || bucket > summary.latest_bucket) {
summary.latest_bucket = bucket;
}
});
// Calculate averages
summary.avg_performance_score = Math.round(summary.avg_performance_score / results.length);
summary.avg_outlier_percentage = Math.round(summary.avg_outlier_percentage / results.length);
summary.anomaly_rate = Math.round((summary.anomalous_analyses / results.length) * 100);
return summary;
}
generateAnalyticsCacheKey(analysisConfig) {
const keyData = {
devices: JSON.stringify(analysisConfig.deviceFilter || {}),
timeRange: analysisConfig.timeRange,
aggregationLevel: analysisConfig.aggregationLevel,
analysisTypes: JSON.stringify(analysisConfig.analysisTypes || [])
};
const crypto = require('crypto');
return crypto.createHash('md5').update(JSON.stringify(keyData)).digest('hex');
}
}
module.exports = { TimeSeriesAnalyticsEngine };
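As with the manager, a short sketch shows how the analytics engine might be driven; the facility and sensor type filters below are illustrative and assume the schema used throughout this article:
// Hypothetical usage sketch for TimeSeriesAnalyticsEngine (filter values are illustrative)
const { MongoDBTimeSeriesManager } = require('./mongodb-timeseries-manager');
const { TimeSeriesAnalyticsEngine } = require('./timeseries-analytics-engine');

async function runFacilityAnalysis(db) {
  const manager = new MongoDBTimeSeriesManager(db);
  await manager.ready; // collections and indexes must exist before analysis

  const engine = new TimeSeriesAnalyticsEngine(manager);
  const analysis = await engine.createAdvancedAnalyticsPipeline({
    deviceFilter: { facilities: ['warehouse_east'], sensor_types: ['temperature'] },
    timeRange: '24h',
    aggregationLevel: 'hour',
    analysisTypes: ['trend', 'anomaly'],
    realTimeEnabled: true // flagged devices also generate analytics_alerts documents
  });

  if (analysis.success) {
    console.log(`Analyzed ${analysis.results.length} device/hour buckets`);
    console.log('Summary:', analysis.summary);
  }
  return analysis;
}

module.exports = { runFacilityAnalysis };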
SQL-Style Time Series Operations with QueryLeaf
QueryLeaf provides familiar SQL syntax for MongoDB time series operations:
-- QueryLeaf Time Series operations with SQL-familiar syntax
-- Insert time series data with automatic optimization
INSERT INTO sensor_readings (
timestamp,
device_id,
sensor_type,
measurements,
location,
quality_metrics
)
VALUES (
CURRENT_TIMESTAMP,
'device_001',
'temperature',
JSON_BUILD_OBJECT(
'temperature', JSON_BUILD_OBJECT('value', 23.5, 'unit', 'celsius'),
'humidity', JSON_BUILD_OBJECT('value', 65.2, 'unit', 'percent')
),
ST_GeomFromText('POINT(-74.0060 40.7128)', 4326),
JSON_BUILD_OBJECT(
'data_quality_score', 95,
'sensor_health', 'normal',
'signal_strength', -45
)
);
-- Advanced time series analytics with window functions
WITH hourly_analytics AS (
SELECT
device_id,
sensor_type,
DATE_TRUNC('hour', timestamp) as hour_bucket,
-- Basic statistics
COUNT(*) as reading_count,
AVG(measurements->>'temperature'->>'value') as avg_temperature,
MIN(measurements->>'temperature'->>'value') as min_temperature,
MAX(measurements->>'temperature'->>'value') as max_temperature,
STDDEV(measurements->>'temperature'->>'value') as temp_stddev,
-- Time series specific aggregations
FIRST_VALUE(measurements->>'temperature'->>'value') OVER (
PARTITION BY device_id, DATE_TRUNC('hour', timestamp)
ORDER BY timestamp ASC
ROWS UNBOUNDED PRECEDING
) as first_reading,
LAST_VALUE(measurements->>'temperature'->>'value') OVER (
PARTITION BY device_id, DATE_TRUNC('hour', timestamp)
ORDER BY timestamp ASC
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) as last_reading,
-- Moving averages
AVG(measurements->>'temperature'->>'value') OVER (
PARTITION BY device_id
ORDER BY timestamp
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
) as moving_avg_6_readings,
-- Data quality metrics
AVG(quality_metrics->>'data_quality_score') as avg_data_quality,
MIN(quality_metrics->>'signal_strength') as min_signal_strength,
-- Geographical aggregation
device.facility,
device.zone,
ST_AsText(AVG(device.location)) as avg_location
FROM sensor_readings
WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
AND device.sensor_type = 'temperature'
GROUP BY
device_id,
sensor_type,
DATE_TRUNC('hour', timestamp),
device.facility,
device.zone
),
trend_analysis AS (
-- Calculate trends and changes over time
SELECT
*,
-- Hour-over-hour trend calculation
LAG(avg_temperature, 1) OVER (
PARTITION BY device_id
ORDER BY hour_bucket
) as prev_hour_temp,
-- Trend direction
CASE
WHEN avg_temperature > LAG(avg_temperature, 1) OVER (
PARTITION BY device_id ORDER BY hour_bucket
) + temp_stddev THEN 'rising_fast'
WHEN avg_temperature > LAG(avg_temperature, 1) OVER (
PARTITION BY device_id ORDER BY hour_bucket
) THEN 'rising'
WHEN avg_temperature < LAG(avg_temperature, 1) OVER (
PARTITION BY device_id ORDER BY hour_bucket
) - temp_stddev THEN 'falling_fast'
WHEN avg_temperature < LAG(avg_temperature, 1) OVER (
PARTITION BY device_id ORDER BY hour_bucket
) THEN 'falling'
ELSE 'stable'
END as trend_direction,
-- Anomaly detection using statistical boundaries
CASE
WHEN ABS(avg_temperature - AVG(avg_temperature) OVER (
PARTITION BY device_id
ORDER BY hour_bucket
ROWS BETWEEN 23 PRECEDING AND CURRENT ROW
)) > 3 * STDDEV(avg_temperature) OVER (
PARTITION BY device_id
ORDER BY hour_bucket
ROWS BETWEEN 23 PRECEDING AND CURRENT ROW
) THEN true
ELSE false
END as is_anomaly,
-- Performance scoring
CASE
WHEN avg_data_quality >= 90 AND min_signal_strength >= -60 THEN 'excellent'
WHEN avg_data_quality >= 80 AND min_signal_strength >= -70 THEN 'good'
WHEN avg_data_quality >= 70 AND min_signal_strength >= -80 THEN 'fair'
ELSE 'poor'
END as performance_rating,
-- Operational status
CASE
WHEN reading_count < 50 THEN 'low_frequency' -- Expected: 60 readings per hour
WHEN reading_count > 70 THEN 'high_frequency'
ELSE 'normal_frequency'
END as operational_status
FROM hourly_analytics
),
facility_overview AS (
-- Facility-level aggregations and insights
SELECT
facility,
zone,
hour_bucket,
-- Device and measurement counts
COUNT(DISTINCT device_id) as active_devices,
SUM(reading_count) as total_readings,
-- Temperature analytics
AVG(avg_temperature) as facility_avg_temp,
MIN(min_temperature) as facility_min_temp,
MAX(max_temperature) as facility_max_temp,
-- Performance metrics
AVG(avg_data_quality) as facility_data_quality,
AVG(min_signal_strength) as facility_avg_signal,
-- Status distribution
COUNT(*) FILTER (WHERE performance_rating = 'excellent') as excellent_devices,
COUNT(*) FILTER (WHERE performance_rating = 'good') as good_devices,
COUNT(*) FILTER (WHERE performance_rating = 'fair') as fair_devices,
COUNT(*) FILTER (WHERE performance_rating = 'poor') as poor_devices,
-- Anomaly and trend insights
COUNT(*) FILTER (WHERE is_anomaly = true) as anomalous_devices,
COUNT(*) FILTER (WHERE trend_direction LIKE '%rising%') as rising_trend_devices,
COUNT(*) FILTER (WHERE trend_direction LIKE '%falling%') as falling_trend_devices,
-- Operational health
COUNT(*) FILTER (WHERE operational_status = 'normal_frequency') as normal_operation_devices,
COUNT(*) FILTER (WHERE operational_status = 'low_frequency') as low_frequency_devices,
-- Geographic insights (if location data available)
COUNT(DISTINCT avg_location) as location_diversity
FROM trend_analysis
GROUP BY facility, zone, hour_bucket
)
-- Final comprehensive time series analytics dashboard
SELECT
f.facility,
f.zone,
f.hour_bucket,
-- Device and data summary
f.active_devices,
f.total_readings,
ROUND(f.total_readings::numeric / NULLIF(f.active_devices, 0), 0) as avg_readings_per_device,
-- Environmental metrics
ROUND(f.facility_avg_temp, 2) as avg_temperature,
ROUND(f.facility_min_temp, 2) as min_temperature,
ROUND(f.facility_max_temp, 2) as max_temperature,
ROUND(f.facility_max_temp - f.facility_min_temp, 2) as temperature_range,
-- Performance assessment
ROUND(f.facility_data_quality, 1) as data_quality_percentage,
ROUND(f.facility_avg_signal, 0) as avg_signal_strength,
-- Health score calculation
ROUND(
(f.excellent_devices * 100 + f.good_devices * 80 + f.fair_devices * 60 + f.poor_devices * 40)
/ NULLIF(f.active_devices, 0),
1
) as facility_health_score,
-- Status distribution percentages
ROUND((f.excellent_devices::numeric / NULLIF(f.active_devices, 0)) * 100, 1) as excellent_percentage,
ROUND((f.good_devices::numeric / NULLIF(f.active_devices, 0)) * 100, 1) as good_percentage,
ROUND((f.fair_devices::numeric / NULLIF(f.active_devices, 0)) * 100, 1) as fair_percentage,
ROUND((f.poor_devices::numeric / NULLIF(f.active_devices, 0)) * 100, 1) as poor_percentage,
-- Trend and anomaly insights
f.anomalous_devices,
ROUND((f.anomalous_devices::numeric / NULLIF(f.active_devices, 0)) * 100, 1) as anomaly_percentage,
f.rising_trend_devices,
f.falling_trend_devices,
-- Operational status
f.normal_operation_devices,
f.low_frequency_devices,
ROUND((f.normal_operation_devices::numeric / NULLIF(f.active_devices, 0)) * 100, 1) as operational_health_percentage,
-- Alert conditions
CASE
WHEN f.anomalous_devices > (f.active_devices * 0.2) THEN 'high_anomaly_alert'
WHEN f.poor_devices > (f.active_devices * 0.3) THEN 'poor_performance_alert'
WHEN f.low_frequency_devices > (f.active_devices * 0.4) THEN 'connectivity_alert'
WHEN f.facility_avg_temp > 40 OR f.facility_avg_temp < 0 THEN 'environmental_alert'
ELSE 'normal'
END as alert_status,
-- Recommendations
CASE
WHEN f.poor_devices > (f.active_devices * 0.2) THEN 'Investigate device performance issues'
WHEN f.anomalous_devices > (f.active_devices * 0.1) THEN 'Review anomalous readings for pattern analysis'
WHEN f.facility_data_quality < 80 THEN 'Improve data quality monitoring and sensor calibration'
WHEN f.facility_avg_signal < -70 THEN 'Consider network infrastructure improvements'
ELSE 'System operating within normal parameters'
END as recommendation,
-- Metadata
CURRENT_TIMESTAMP as report_generated_at,
'24h_analysis' as analysis_type
FROM facility_overview f
WHERE f.hour_bucket >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
ORDER BY
f.facility,
f.zone,
f.hour_bucket DESC;
-- Real-time alerting with time series patterns
WITH real_time_thresholds AS (
SELECT
device_id,
sensor_type,
-- Current reading
measurements->>'temperature'->>'value' as current_temp,
measurements->>'humidity'->>'value' as current_humidity,
quality_metrics->>'data_quality_score' as current_quality,
-- Historical context (last hour average)
AVG(measurements->'temperature'->>'value') OVER (
PARTITION BY device_id
ORDER BY timestamp
RANGE BETWEEN INTERVAL '1 hour' PRECEDING AND INTERVAL '1 minute' PRECEDING
) as historical_avg_temp,
-- Recent trend (last 5 readings)
AVG(measurements->'temperature'->>'value') OVER (
PARTITION BY device_id
ORDER BY timestamp
ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
) as recent_avg_temp,
-- Device metadata
device.facility,
device.zone,
device.location,
timestamp,
-- Network health
quality_metrics->>'signal_strength' as signal_strength
FROM sensor_readings
WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '1 hour' -- pull a full hour so the hourly baseline window has data
AND sensor_type = 'temperature'
),
alert_conditions AS (
SELECT
*,
-- Threshold breaches
CASE
WHEN current_temp > 80 THEN 'critical_high_temperature'
WHEN current_temp < -10 THEN 'critical_low_temperature'
WHEN current_temp > 60 THEN 'high_temperature_warning'
WHEN current_temp < 5 THEN 'low_temperature_warning'
ELSE null
END as temperature_alert,
-- Rapid changes
CASE
WHEN ABS(current_temp - recent_avg_temp) > 10 THEN 'rapid_temperature_change'
WHEN ABS(current_temp - historical_avg_temp) > 15 THEN 'significant_deviation'
ELSE null
END as change_alert,
-- Data quality issues
CASE
WHEN current_quality < 50 THEN 'critical_data_quality'
WHEN current_quality < 70 THEN 'poor_data_quality'
WHEN signal_strength < -90 THEN 'poor_connectivity'
ELSE null
END as quality_alert,
-- Combined severity assessment
CASE
WHEN current_temp > 80 OR current_temp < -10 OR current_quality < 50 THEN 'critical'
WHEN current_temp > 60 OR current_temp < 5 OR current_quality < 70 OR ABS(current_temp - recent_avg_temp) > 10 THEN 'warning'
WHEN ABS(current_temp - historical_avg_temp) > 15 OR signal_strength < -80 THEN 'info'
ELSE 'normal'
END as overall_severity
FROM real_time_thresholds
)
-- Generate active alerts with context
SELECT
device_id,
facility,
zone,
ST_AsText(location) as device_location,
timestamp as alert_timestamp,
overall_severity,
-- Primary alert
COALESCE(temperature_alert, change_alert, quality_alert, 'normal') as primary_alert_type,
-- Alert message
CASE
WHEN temperature_alert IS NOT NULL THEN
CONCAT('Temperature alert: ', current_temp, '°C detected')
WHEN change_alert IS NOT NULL THEN
CONCAT('Temperature change alert: ', ROUND(ABS(current_temp - recent_avg_temp), 1), '°C change detected')
WHEN quality_alert IS NOT NULL THEN
CONCAT('Data quality alert: ', current_quality, '% quality score')
ELSE 'System normal'
END as alert_message,
-- Current readings
ROUND(current_temp, 2) as current_temperature,
ROUND(current_humidity, 1) as current_humidity,
current_quality as data_quality_percentage,
signal_strength as signal_strength_dbm,
-- Context
ROUND(historical_avg_temp, 2) as hourly_avg_temperature,
ROUND(recent_avg_temp, 2) as recent_avg_temperature,
ROUND(ABS(current_temp - historical_avg_temp), 2) as deviation_from_hourly_avg,
-- Action required
CASE overall_severity
WHEN 'critical' THEN 'IMMEDIATE ACTION REQUIRED'
WHEN 'warning' THEN 'Investigation recommended'
WHEN 'info' THEN 'Monitor for trends'
ELSE 'No action required'
END as recommended_action,
-- Contact priority
CASE overall_severity
WHEN 'critical' THEN 'Notify operations team immediately'
WHEN 'warning' THEN 'Escalate to facility manager'
ELSE 'Log for review'
END as escalation_level
FROM alert_conditions
WHERE overall_severity IN ('critical', 'warning', 'info')
AND timestamp >= CURRENT_TIMESTAMP - INTERVAL '5 minutes' -- alert only on the most recent readings
ORDER BY
CASE overall_severity
WHEN 'critical' THEN 1
WHEN 'warning' THEN 2
WHEN 'info' THEN 3
END,
timestamp DESC;
-- Time series data lifecycle management
WITH data_lifecycle_analysis AS (
SELECT
DATE_TRUNC('day', timestamp) as date_bucket,
device.facility,
COUNT(*) as daily_record_count,
AVG(quality_metrics->>'data_quality_score') as avg_daily_quality,
-- Data size estimation (approximate)
COUNT(*) * 1024 as estimated_bytes_per_day, -- Rough estimate assuming ~1 KB per stored reading
-- Retention category
CASE
WHEN DATE_TRUNC('day', timestamp) >= CURRENT_DATE - INTERVAL '7 days' THEN 'hot_data'
WHEN DATE_TRUNC('day', timestamp) >= CURRENT_DATE - INTERVAL '90 days' THEN 'warm_data'
WHEN DATE_TRUNC('day', timestamp) >= CURRENT_DATE - INTERVAL '365 days' THEN 'cold_data'
ELSE 'archive_data'
END as retention_category,
-- Archive recommendations
CASE
WHEN DATE_TRUNC('day', timestamp) < CURRENT_DATE - INTERVAL '2 years' THEN 'candidate_for_deletion'
WHEN DATE_TRUNC('day', timestamp) < CURRENT_DATE - INTERVAL '1 year'
AND AVG(quality_metrics->>'data_quality_score') < 70 THEN 'candidate_for_archive'
ELSE 'keep_active'
END as lifecycle_recommendation
FROM sensor_readings
WHERE timestamp >= CURRENT_DATE - INTERVAL '2 years'
GROUP BY DATE_TRUNC('day', timestamp), device.facility
)
SELECT
retention_category,
COUNT(*) as day_buckets,
SUM(daily_record_count) as total_records,
ROUND(AVG(avg_daily_quality), 1) as avg_quality_score,
-- Storage estimates
ROUND(SUM(estimated_bytes_per_day) / (1024 * 1024), 1) as estimated_mb,
ROUND(SUM(estimated_bytes_per_day) / (1024 * 1024 * 1024), 2) as estimated_gb,
-- Lifecycle recommendations
SUM(CASE WHEN lifecycle_recommendation = 'candidate_for_deletion' THEN daily_record_count ELSE 0 END) as records_for_deletion,
SUM(CASE WHEN lifecycle_recommendation = 'candidate_for_archive' THEN daily_record_count ELSE 0 END) as records_for_archive,
SUM(CASE WHEN lifecycle_recommendation = 'keep_active' THEN daily_record_count ELSE 0 END) as records_to_keep,
-- Storage optimization potential
ROUND(
SUM(CASE WHEN lifecycle_recommendation IN ('candidate_for_deletion', 'candidate_for_archive')
THEN estimated_bytes_per_day ELSE 0 END) / (1024 * 1024), 1
) as potential_storage_savings_mb
FROM data_lifecycle_analysis
GROUP BY retention_category
ORDER BY
CASE retention_category
WHEN 'hot_data' THEN 1
WHEN 'warm_data' THEN 2
WHEN 'cold_data' THEN 3
WHEN 'archive_data' THEN 4
END;
-- QueryLeaf provides comprehensive time series capabilities:
-- 1. High-performance data ingestion with automatic time series optimization
-- 2. Advanced analytics using familiar SQL window functions and aggregations
-- 3. Real-time alerting and threshold monitoring with SQL expressions
-- 4. Facility and device-level dashboards using complex analytical queries
-- 5. Trend analysis and anomaly detection through statistical SQL functions
-- 6. Data lifecycle management with retention and archiving recommendations
-- 7. Geospatial analytics for location-aware IoT deployments
-- 8. Integration with MongoDB's native time series compression and bucketing
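For readers who want to see the MongoDB primitives these SQL capabilities ultimately compile down to, the following pymongo sketch expresses the core of the facility rollup above as a native aggregation pipeline. It is illustrative only: the connection string and database name are placeholders, and the field paths (device.facility, measurements.temperature.value, quality_metrics.data_quality_score) simply mirror the document shape implied by the SQL in this article.
# Illustrative sketch: a native MongoDB aggregation roughly equivalent to the
# hourly facility rollup above. Field names mirror the assumed document shape.
from datetime import datetime, timedelta, timezone
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")  # placeholder connection string
db = client.iot_platform                            # hypothetical database name
readings = db.sensor_readings

since = datetime.now(timezone.utc) - timedelta(hours=24)
pipeline = [
    {"$match": {"timestamp": {"$gte": since}}},
    {"$group": {
        "_id": {
            "facility": "$device.facility",
            "zone": "$device.zone",
            "hour": {"$dateTrunc": {"date": "$timestamp", "unit": "hour"}},
        },
        "active_devices": {"$addToSet": "$device_id"},
        "total_readings": {"$sum": 1},
        "avg_temp": {"$avg": "$measurements.temperature.value"},
        "min_temp": {"$min": "$measurements.temperature.value"},
        "max_temp": {"$max": "$measurements.temperature.value"},
        "avg_quality": {"$avg": "$quality_metrics.data_quality_score"},
    }},
    # Convert the device set into a count after grouping.
    {"$addFields": {"active_devices": {"$size": "$active_devices"}}},
    {"$sort": {"_id.facility": 1, "_id.zone": 1, "_id.hour": -1}},
]

for bucket in readings.aggregate(pipeline):
    print(bucket)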
Best Practices for Production Time Series Deployments
Performance Optimization Strategies
Essential optimization techniques for high-throughput time series workloads (a pymongo configuration sketch follows the list):
- Time Series Collection Configuration: Choose optimal granularity and bucket settings based on data patterns
- Index Strategy: Create compound indexes optimized for time-range and device queries
- Data Retention: Implement automated lifecycle policies for different data temperatures
- Aggregation Performance: Design materialized views for frequently accessed analytics
- Real-Time Processing: Optimize change streams and triggers for low-latency analytics
- Compression Settings: Configure appropriate compression algorithms for time series data patterns
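The following pymongo sketch illustrates the collection configuration, index strategy, and retention items in concrete terms. It is a minimal example rather than a canonical setup: the collection name, metaField layout, granularity, and retention period are assumptions chosen to match the document shape used in this article, and the right values depend on your actual reporting intervals and query patterns.
# Minimal sketch: time series collection configuration, retention, and a
# compound index for device + time-range queries. Names are assumptions.
from pymongo import MongoClient, ASCENDING, DESCENDING

client = MongoClient("mongodb://localhost:27017")  # placeholder connection string
db = client.iot_platform

# metaField groups readings per device, granularity controls bucket sizing,
# expireAfterSeconds enforces raw-data retention at the collection level.
db.create_collection(
    "sensor_readings",
    timeseries={
        "timeField": "timestamp",
        "metaField": "device",
        "granularity": "minutes",      # align with the typical reporting interval
    },
    expireAfterSeconds=90 * 24 * 3600,  # drop raw readings after roughly 90 days
)

# Compound secondary index for the dominant access pattern:
# "all readings for one device over a time range".
db.sensor_readings.create_index(
    [("device.device_id", ASCENDING), ("timestamp", DESCENDING)]
)
For the materialized-view item, one common pattern is to run a scheduled aggregation whose final $merge stage writes hourly or daily summaries into a separate collection that dashboards query instead of the raw readings.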
IoT Architecture Design
Design principles for scalable IoT time series systems (a device-registry sketch follows the list):
- Device Management: Implement device registration, health monitoring, and metadata management
- Network Optimization: Design efficient data transmission protocols for IoT constraints
- Edge Processing: Implement edge analytics to reduce data transmission and latency
- Fault Tolerance: Design robust error handling and offline data synchronization
- Security Implementation: Implement device authentication, encryption, and access controls
- Scalability Planning: Plan for horizontal scaling across geographic regions and device growth
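One way to approach the device-management principle is a separate, regular (non-time-series) registry collection that tracks device metadata and last-seen heartbeats alongside the time series data. The sketch below is hypothetical: the devices collection, its field names, and the helper functions are illustrations rather than part of the schema used elsewhere in this article.
# Hypothetical device registry: idempotent registration plus heartbeat updates
# so stale or offline devices can be flagged by monitoring jobs.
from datetime import datetime, timezone
from pymongo import MongoClient, ASCENDING

client = MongoClient("mongodb://localhost:27017")  # placeholder connection string
db = client.iot_platform
devices = db.devices

devices.create_index([("device_id", ASCENDING)], unique=True)

def register_device(device_id, facility, zone, sensor_types):
    """Create or update the device's metadata (safe to call repeatedly)."""
    devices.update_one(
        {"device_id": device_id},
        {
            "$set": {
                "facility": facility,
                "zone": zone,
                "sensor_types": sensor_types,
                "status": "provisioned",
            },
            "$setOnInsert": {"registered_at": datetime.now(timezone.utc)},
        },
        upsert=True,
    )

def heartbeat(device_id, battery_pct, signal_dbm):
    """Record the latest check-in so health monitors can detect stale devices."""
    devices.update_one(
        {"device_id": device_id},
        {"$set": {
            "last_seen": datetime.now(timezone.utc),
            "battery_pct": battery_pct,
            "signal_dbm": signal_dbm,
            "status": "online",
        }},
    )

register_device("env-sensor-042", "plant_a", "assembly", ["temperature", "humidity"])
heartbeat("env-sensor-042", battery_pct=87, signal_dbm=-62)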
Conclusion
MongoDB Time Series Collections provide enterprise-grade IoT data management capabilities that address the unique challenges of sensor data ingestion, real-time analytics, and long-term historical analysis. The purpose-built time series optimizations eliminate the complexity and performance limitations of traditional database approaches while delivering sophisticated analytics and monitoring capabilities at IoT scale.
Key MongoDB Time Series advantages include:
- Optimized Storage: Automatic bucketing and compression specifically designed for time-stamped data
- High-Velocity Ingestion: Purpose-built write optimization for high-frequency sensor data streams
- Advanced Analytics: Sophisticated aggregation pipelines for real-time and historical analytics
- Automatic Lifecycle Management: Built-in data retention and archiving capabilities
- Scalable Architecture: Horizontal scaling optimized for time series query patterns
- SQL Accessibility: Familiar time series operations through QueryLeaf's SQL interface
Whether you're building IoT monitoring systems, industrial sensor networks, environmental tracking applications, or real-time analytics platforms, MongoDB Time Series Collections with QueryLeaf's SQL interface provide the foundation for efficient, scalable, and maintainable time series data management. The approach adapts to evolving IoT requirements while preserving familiar database interaction patterns.
QueryLeaf Integration: QueryLeaf automatically optimizes SQL-style time series operations for MongoDB's specialized time series collections, enabling developers to leverage advanced IoT analytics through familiar SQL syntax. Complex sensor data aggregations, real-time alerting logic, and trend analysis queries are seamlessly translated into MongoDB's high-performance time series operations, making sophisticated IoT analytics accessible without requiring specialized time series expertise.
The combination of MongoDB's time series optimizations with SQL-familiar operations creates an ideal platform for IoT applications that demand both high-performance data processing and approachable query interfaces, ensuring your IoT systems can scale efficiently while keeping data accessible and analyzable.