MongoDB Time Series Collections for IoT Data Management: Real-Time Analytics and High-Performance Data Processing
Modern IoT applications generate massive volumes of time-stamped sensor data that require specialized storage and processing capabilities to handle millions of data points per second while enabling real-time analytics and efficient historical data queries. Traditional database approaches struggle with the scale, write-heavy workloads, and time-based query patterns characteristic of IoT systems, often requiring complex partitioning schemes, multiple storage tiers, and custom optimization strategies that increase operational complexity and development overhead.
MongoDB Time Series Collections provide purpose-built storage optimization for time-stamped data with automatic bucketing, compression, and query optimization specifically designed for IoT workloads. Unlike traditional approaches that require manual time-based partitioning and complex indexing strategies, Time Series Collections automatically organize data by time ranges, apply intelligent compression, and optimize queries for time-based access patterns while maintaining MongoDB's flexible document model and powerful aggregation capabilities.
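At its core, all of this is enabled by a single option at collection-creation time. The following is a minimal sketch of the API shape, assuming a local deployment and illustrative database and field names:
// Minimal sketch: create a time series collection and insert one reading.
// Connection string, database, and field names are illustrative.
const { MongoClient } = require('mongodb');

async function main() {
  const client = new MongoClient('mongodb://localhost:27017');
  await client.connect();
  const db = client.db('iot');

  // timeField is required; metaField groups readings from the same source
  // into shared internal buckets for compression and query locality
  await db.createCollection('readings', {
    timeseries: {
      timeField: 'timestamp',
      metaField: 'sensor',
      granularity: 'seconds'
    }
  });

  // Writes and reads use the ordinary collection API
  await db.collection('readings').insertOne({
    timestamp: new Date(),
    sensor: { deviceId: 'dev-001', type: 'temperature' },
    value: 21.7
  });

  await client.close();
}

main().catch(console.error);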
The Traditional IoT Data Storage Challenge
Conventional approaches to storing and processing IoT time series data face significant scalability and performance limitations:
-- Traditional PostgreSQL time series approach - complex partitioning and limited scalability
-- IoT sensor data with traditional table design
CREATE TABLE sensor_readings (
reading_id BIGSERIAL,
device_id VARCHAR(100) NOT NULL,
sensor_type VARCHAR(50) NOT NULL,
location VARCHAR(200),
-- Time series data
timestamp TIMESTAMPTZ NOT NULL,
value DECIMAL(15,6) NOT NULL,
unit VARCHAR(20),
quality_score DECIMAL(3,2) DEFAULT 1.0,
-- Device and context metadata
device_metadata JSONB,
environmental_conditions JSONB,
-- Data processing flags
processed BOOLEAN DEFAULT FALSE,
anomaly_detected BOOLEAN DEFAULT FALSE,
data_source VARCHAR(100),
-- Partitioning helper columns (AT TIME ZONE keeps the expressions
-- immutable, which generated columns require)
year_month INTEGER GENERATED ALWAYS AS (
EXTRACT(YEAR FROM timestamp AT TIME ZONE 'UTC') * 100 +
EXTRACT(MONTH FROM timestamp AT TIME ZONE 'UTC')
) STORED,
date_partition DATE GENERATED ALWAYS AS ((timestamp AT TIME ZONE 'UTC')::DATE) STORED,
-- The partition key must be part of the primary key on a partitioned table
PRIMARY KEY (reading_id, timestamp)
) PARTITION BY RANGE (timestamp);
-- Complex time-based partitioning (manual maintenance required)
CREATE TABLE sensor_readings_2024_01 PARTITION OF sensor_readings
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE sensor_readings_2024_02 PARTITION OF sensor_readings
FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
CREATE TABLE sensor_readings_2024_03 PARTITION OF sensor_readings
FOR VALUES FROM ('2024-03-01') TO ('2024-04-01');
-- Additional partitions must be created manually each month
-- Automation required to prevent partition overflow
-- Indexing strategy for time series queries (expensive maintenance)
CREATE INDEX idx_sensor_readings_device_time ON sensor_readings (device_id, timestamp DESC);
CREATE INDEX idx_sensor_readings_sensor_type_time ON sensor_readings (sensor_type, timestamp DESC);
CREATE INDEX idx_sensor_readings_location_time ON sensor_readings (location, timestamp DESC);
CREATE INDEX idx_sensor_readings_timestamp_only ON sensor_readings (timestamp DESC);
CREATE INDEX idx_sensor_readings_processed_flag ON sensor_readings (processed, timestamp DESC);
-- Additional indexes for different query patterns
CREATE INDEX idx_sensor_readings_anomaly_time ON sensor_readings (anomaly_detected, timestamp DESC) WHERE anomaly_detected = TRUE;
CREATE INDEX idx_sensor_readings_device_type_time ON sensor_readings (device_id, sensor_type, timestamp DESC);
-- Materialized view for real-time aggregations (complex maintenance)
CREATE MATERIALIZED VIEW sensor_readings_hourly_summary AS
WITH hourly_aggregations AS (
SELECT
device_id,
sensor_type,
location,
DATE_TRUNC('hour', timestamp) as hour_bucket,
-- Statistical aggregations
COUNT(*) as reading_count,
AVG(value) as avg_value,
MIN(value) as min_value,
MAX(value) as max_value,
STDDEV(value) as stddev_value,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY value) as median_value,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY value) as p95_value,
-- Data quality metrics
AVG(quality_score) as avg_quality,
COUNT(*) FILTER (WHERE quality_score < 0.8) as low_quality_readings,
COUNT(*) FILTER (WHERE anomaly_detected = true) as anomaly_count,
-- Value change analysis
(MAX(value) - MIN(value)) as value_range,
-- Net change across the hour (window functions cannot be combined with
-- GROUP BY here, so ordered ARRAY_AGG is used instead)
CASE
WHEN COUNT(*) > 1 THEN
(ARRAY_AGG(value ORDER BY timestamp DESC))[1] -
(ARRAY_AGG(value ORDER BY timestamp ASC))[1]
ELSE 0
END as value_change_in_hour,
-- Processing statistics
COUNT(*) FILTER (WHERE processed = true) as processed_readings,
(COUNT(*) FILTER (WHERE processed = true)::DECIMAL / COUNT(*)) * 100 as processing_rate_percent,
-- Time coverage analysis
(EXTRACT(EPOCH FROM MAX(timestamp) - MIN(timestamp)) / 3600) as time_coverage_hours,
COUNT(*)::DECIMAL / GREATEST(EXTRACT(EPOCH FROM MAX(timestamp) - MIN(timestamp)) / 60, 1) as readings_per_minute
FROM sensor_readings
WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY device_id, sensor_type, location, DATE_TRUNC('hour', timestamp)
)
SELECT
ha.*,
-- Additional calculated metrics
CASE
WHEN ha.reading_count < 50 THEN 'sparse'
WHEN ha.reading_count < 200 THEN 'normal'
WHEN ha.reading_count < 500 THEN 'dense'
ELSE 'very_dense'
END as data_density_category,
CASE
WHEN ha.avg_quality >= 0.95 THEN 'excellent'
WHEN ha.avg_quality >= 0.8 THEN 'good'
WHEN ha.avg_quality >= 0.6 THEN 'fair'
ELSE 'poor'
END as quality_category,
-- Anomaly rate analysis
CASE
WHEN ha.anomaly_count = 0 THEN 'normal'
WHEN (ha.anomaly_count::DECIMAL / ha.reading_count) < 0.01 THEN 'low_anomalies'
WHEN (ha.anomaly_count::DECIMAL / ha.reading_count) < 0.05 THEN 'moderate_anomalies'
ELSE 'high_anomalies'
END as anomaly_level,
-- Performance indicators
CASE
WHEN ha.readings_per_minute >= 10 THEN 'high_frequency'
WHEN ha.readings_per_minute >= 1 THEN 'medium_frequency'
WHEN ha.readings_per_minute >= 0.1 THEN 'low_frequency'
ELSE 'very_low_frequency'
END as sampling_frequency_category
FROM hourly_aggregations ha;
-- Must be refreshed periodically (expensive operation)
CREATE UNIQUE INDEX idx_sensor_hourly_unique ON sensor_readings_hourly_summary (device_id, sensor_type, location, hour_bucket);
-- Complex query for real-time analytics (resource-intensive)
WITH device_performance AS (
SELECT
sr.device_id,
sr.sensor_type,
sr.location,
DATE_TRUNC('minute', sr.timestamp) as minute_bucket,
-- Real-time aggregations (expensive on large datasets)
COUNT(*) as readings_per_minute,
AVG(sr.value) as avg_value,
STDDEV(sr.value) as value_stability,
-- Change detection (requires window functions)
LAG(AVG(sr.value)) OVER (
PARTITION BY sr.device_id, sr.sensor_type
ORDER BY DATE_TRUNC('minute', sr.timestamp)
) as prev_minute_avg,
-- Quality assessment
AVG(sr.quality_score) as avg_quality,
COUNT(*) FILTER (WHERE sr.anomaly_detected) as anomaly_count,
-- Processing lag calculation
AVG(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP - sr.timestamp)) as avg_processing_lag_seconds
FROM sensor_readings sr
WHERE
sr.timestamp >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
AND sr.processed = true
GROUP BY sr.device_id, sr.sensor_type, sr.location, DATE_TRUNC('minute', sr.timestamp)
),
real_time_alerts AS (
SELECT
dp.*,
-- Alert conditions
CASE
WHEN ABS(dp.avg_value - dp.prev_minute_avg) > (dp.value_stability * 3) THEN 'value_spike'
WHEN dp.avg_quality < 0.7 THEN 'quality_degradation'
WHEN dp.anomaly_count > 0 THEN 'anomalies_detected'
WHEN dp.avg_processing_lag_seconds > 300 THEN 'processing_delay'
WHEN dp.readings_per_minute < 0.5 THEN 'data_gap'
ELSE 'normal'
END as alert_type,
-- Severity calculation
CASE
WHEN dp.anomaly_count > 10 OR dp.avg_quality < 0.5 THEN 'critical'
WHEN dp.anomaly_count > 5 OR dp.avg_quality < 0.7 OR dp.avg_processing_lag_seconds > 600 THEN 'high'
WHEN dp.anomaly_count > 2 OR dp.avg_quality < 0.8 OR dp.avg_processing_lag_seconds > 300 THEN 'medium'
ELSE 'low'
END as alert_severity,
-- Performance assessment
CASE
WHEN dp.readings_per_minute >= 30 AND dp.avg_quality >= 0.9 THEN 'optimal'
WHEN dp.readings_per_minute >= 10 AND dp.avg_quality >= 0.8 THEN 'good'
WHEN dp.readings_per_minute >= 1 AND dp.avg_quality >= 0.6 THEN 'acceptable'
ELSE 'poor'
END as performance_status
FROM device_performance dp
WHERE dp.minute_bucket >= CURRENT_TIMESTAMP - INTERVAL '15 minutes'
),
device_health_summary AS (
SELECT
rta.device_id,
COUNT(*) as total_minutes_analyzed,
-- Health metrics
COUNT(*) FILTER (WHERE rta.alert_type != 'normal') as minutes_with_alerts,
COUNT(*) FILTER (WHERE rta.alert_severity IN ('critical', 'high')) as critical_minutes,
COUNT(*) FILTER (WHERE rta.performance_status IN ('optimal', 'good')) as good_performance_minutes,
-- Overall device status
AVG(rta.avg_quality) as overall_quality,
AVG(rta.readings_per_minute) as avg_data_rate,
SUM(rta.anomaly_count) as total_anomalies,
-- Most recent status (ordered ARRAY_AGG rather than window functions,
-- which cannot reference non-grouped columns in this aggregate query)
(ARRAY_AGG(rta.performance_status ORDER BY rta.minute_bucket DESC))[1] as current_status,
(ARRAY_AGG(rta.alert_type ORDER BY rta.minute_bucket DESC))[1] as current_alert_type
FROM real_time_alerts rta
GROUP BY rta.device_id
)
-- Final real-time dashboard query
SELECT
dhs.device_id,
dhs.current_status,
dhs.current_alert_type,
-- Health indicators
ROUND(dhs.overall_quality, 3) as quality_score,
ROUND(dhs.avg_data_rate, 1) as data_rate_per_minute,
dhs.total_anomalies,
-- Alert summary
dhs.minutes_with_alerts,
dhs.critical_minutes,
dhs.good_performance_minutes,
-- Performance assessment
ROUND((dhs.good_performance_minutes::DECIMAL / dhs.total_minutes_analyzed) * 100, 1) as uptime_percentage,
ROUND((dhs.minutes_with_alerts::DECIMAL / dhs.total_minutes_analyzed) * 100, 1) as alert_percentage,
-- Device health classification
CASE
WHEN dhs.critical_minutes > 2 OR dhs.overall_quality < 0.6 THEN 'unhealthy'
WHEN dhs.minutes_with_alerts > 5 OR dhs.overall_quality < 0.8 THEN 'degraded'
WHEN dhs.good_performance_minutes >= (dhs.total_minutes_analyzed * 0.8) THEN 'healthy'
ELSE 'monitoring'
END as device_health_status,
-- Recommendations
CASE
WHEN dhs.total_anomalies > 20 THEN 'investigate_sensor_calibration'
WHEN dhs.avg_data_rate < 1 THEN 'check_connectivity'
WHEN dhs.overall_quality < 0.7 THEN 'review_sensor_maintenance'
WHEN dhs.critical_minutes > 0 THEN 'immediate_attention_required'
ELSE 'operating_normally'
END as recommended_action
FROM device_health_summary dhs
ORDER BY
CASE dhs.current_status
WHEN 'poor' THEN 1
WHEN 'acceptable' THEN 2
WHEN 'good' THEN 3
WHEN 'optimal' THEN 4
END,
dhs.critical_minutes DESC,
dhs.total_anomalies DESC;
-- Traditional time series problems:
-- 1. Complex manual partitioning and maintenance overhead
-- 2. Expensive materialized view refreshes for real-time analytics
-- 3. Limited compression and storage optimization for time series data
-- 4. Complex indexing strategies with high maintenance costs
-- 5. Poor write performance under high-volume IoT workloads
-- 6. Difficult horizontal scaling for time series data
-- 7. Limited time-based query optimization
-- 8. Complex time window and rollup aggregations
-- 9. Expensive historical data archiving and cleanup operations
-- 10. No built-in time series specific features and optimizations
MongoDB Time Series Collections provide comprehensive IoT data management with automatic optimization and intelligent compression:
// MongoDB Time Series Collections - Optimized IoT data storage and analytics
const { MongoClient, ObjectId } = require('mongodb');
// Comprehensive IoT Time Series Data Manager
class IoTTimeSeriesManager {
constructor(connectionString, iotConfig = {}) {
this.connectionString = connectionString;
this.client = null;
this.db = null;
this.config = {
// Time series configuration
defaultGranularity: iotConfig.defaultGranularity || 'seconds',
enableAutomaticIndexing: iotConfig.enableAutomaticIndexing !== false,
enableCompression: iotConfig.enableCompression !== false,
// IoT-specific features
enableRealTimeAlerts: iotConfig.enableRealTimeAlerts !== false,
enableAnomalyDetection: iotConfig.enableAnomalyDetection !== false,
enablePredictiveAnalytics: iotConfig.enablePredictiveAnalytics !== false,
enableDataQualityMonitoring: iotConfig.enableDataQualityMonitoring !== false,
// Performance optimization
batchWriteSize: iotConfig.batchWriteSize || 1000,
writeConcern: iotConfig.writeConcern || { w: 1, j: true },
readPreference: iotConfig.readPreference || 'primaryPreferred',
maxConnectionPoolSize: iotConfig.maxConnectionPoolSize || 100,
// Data retention and archival
enableDataLifecycleManagement: iotConfig.enableDataLifecycleManagement !== false,
defaultRetentionDays: iotConfig.defaultRetentionDays || 365,
enableAutomaticArchiving: iotConfig.enableAutomaticArchiving !== false,
// Analytics and processing
enableStreamProcessing: iotConfig.enableStreamProcessing !== false,
enableRealTimeAggregation: iotConfig.enableRealTimeAggregation !== false,
aggregationWindowSize: iotConfig.aggregationWindowSize || '1 minute',
...iotConfig
};
// Time series collections for different data types
this.timeSeriesCollections = new Map();
this.aggregationCollections = new Map();
this.alertCollections = new Map();
// Real-time processing components
this.changeStreams = new Map();
this.processingPipelines = new Map();
this.alertRules = new Map();
// Performance metrics
this.performanceMetrics = {
totalDataPoints: 0,
writeOperationsPerSecond: 0,
queryOperationsPerSecond: 0,
averageLatency: 0,
compressionRatio: 0,
alertsTriggered: 0
};
}
async initializeIoTTimeSeriesSystem() {
console.log('Initializing MongoDB IoT Time Series system...');
try {
// Connect to MongoDB
this.client = new MongoClient(this.connectionString, {
maxPoolSize: this.config.maxConnectionPoolSize,
writeConcern: this.config.writeConcern,
readPreference: this.config.readPreference
});
await this.client.connect();
this.db = this.client.db();
// Create time series collections for different sensor types
await this.createTimeSeriesCollections();
// Setup real-time processing pipelines
if (this.config.enableStreamProcessing) {
await this.setupStreamProcessing();
}
// Initialize real-time aggregations
if (this.config.enableRealTimeAggregation) {
await this.setupRealTimeAggregations();
}
// Setup anomaly detection
if (this.config.enableAnomalyDetection) {
await this.setupAnomalyDetection();
}
// Initialize data lifecycle management
if (this.config.enableDataLifecycleManagement) {
await this.setupDataLifecycleManagement();
}
console.log('IoT Time Series system initialized successfully');
} catch (error) {
console.error('Error initializing IoT Time Series system:', error);
throw error;
}
}
async createTimeSeriesCollections() {
console.log('Creating optimized time series collections...');
// Sensor readings time series collection with automatic bucketing
await this.createOptimizedTimeSeriesCollection('sensor_readings', {
timeField: 'timestamp',
metaField: 'device',
granularity: this.config.defaultGranularity,
bucketMaxSpanSeconds: 3600, // 1 hour buckets (used instead of granularity; see below)
// Optimize for IoT data patterns
expireAfterSeconds: this.config.defaultRetentionDays * 24 * 60 * 60,
// Index optimization for common IoT queries
additionalIndexes: [
{ 'device.id': 1, 'timestamp': -1 },
{ 'device.type': 1, 'timestamp': -1 },
{ 'device.location': 1, 'timestamp': -1 },
{ 'sensor.type': 1, 'timestamp': -1 }
]
});
// Environmental monitoring time series
await this.createOptimizedTimeSeriesCollection('environmental_data', {
timeField: 'timestamp',
metaField: 'location',
granularity: 'minutes',
bucketMaxSpanSeconds: 7200, // 2 hour buckets for slower changing data
additionalIndexes: [
{ 'location.facility': 1, 'location.zone': 1, 'timestamp': -1 },
{ 'sensor_type': 1, 'timestamp': -1 }
]
});
// Equipment performance monitoring
await this.createOptimizedTimeSeriesCollection('equipment_metrics', {
timeField: 'timestamp',
metaField: 'equipment',
granularity: 'seconds',
bucketMaxSpanSeconds: 1800, // 30 minute buckets for high-frequency data
additionalIndexes: [
{ 'equipment.id': 1, 'equipment.type': 1, 'timestamp': -1 },
{ 'metric_type': 1, 'timestamp': -1 }
]
});
// Energy consumption tracking
await this.createOptimizedTimeSeriesCollection('energy_consumption', {
timeField: 'timestamp',
metaField: 'meter',
granularity: 'minutes',
bucketMaxSpanSeconds: 3600, // 1 hour buckets
additionalIndexes: [
{ 'meter.id': 1, 'timestamp': -1 },
{ 'meter.building': 1, 'meter.floor': 1, 'timestamp': -1 }
]
});
// Vehicle telemetry data
await this.createOptimizedTimeSeriesCollection('vehicle_telemetry', {
timeField: 'timestamp',
metaField: 'vehicle',
granularity: 'seconds',
bucketMaxSpanSeconds: 900, // 15 minute buckets for mobile data
additionalIndexes: [
{ 'vehicle.id': 1, 'timestamp': -1 },
{ 'vehicle.route': 1, 'timestamp': -1 },
{ 'telemetry_type': 1, 'timestamp': -1 }
]
});
console.log('Time series collections created successfully');
}
async createOptimizedTimeSeriesCollection(collectionName, config) {
console.log(`Creating time series collection: ${collectionName}`);
try {
// Create time series collection with MongoDB's native optimization.
// granularity cannot be combined with the custom bucketing parameters,
// and bucketMaxSpanSeconds/bucketRoundingSeconds must be set to the
// same value (MongoDB 6.3+), so choose one scheme per collection
const timeseriesOptions = {
timeField: config.timeField,
metaField: config.metaField
};
if (config.bucketMaxSpanSeconds) {
timeseriesOptions.bucketMaxSpanSeconds = config.bucketMaxSpanSeconds;
timeseriesOptions.bucketRoundingSeconds = config.bucketMaxSpanSeconds;
} else {
timeseriesOptions.granularity = config.granularity;
}
const collection = await this.db.createCollection(collectionName, {
timeseries: timeseriesOptions,
// Set TTL for automatic data expiration
...(config.expireAfterSeconds && {
expireAfterSeconds: config.expireAfterSeconds
}),
// Enable compression for storage optimization
storageEngine: {
wiredTiger: {
configString: 'block_compressor=zstd'
}
}
});
// Create additional indexes for query optimization
if (config.additionalIndexes) {
await collection.createIndexes(
config.additionalIndexes.map(indexSpec => ({
key: indexSpec,
background: true,
name: `idx_${Object.keys(indexSpec).join('_')}`
}))
);
}
// Store collection reference and configuration
this.timeSeriesCollections.set(collectionName, {
collection: collection,
config: config,
stats: {
documentsInserted: 0,
bytesStored: 0,
compressionRatio: 0,
lastInsertTime: null
}
});
console.log(`Time series collection ${collectionName} created successfully`);
} catch (error) {
console.error(`Error creating time series collection ${collectionName}:`, error);
throw error;
}
}
async insertSensorData(collectionName, sensorDataPoints) {
const startTime = Date.now();
try {
const collectionInfo = this.timeSeriesCollections.get(collectionName);
if (!collectionInfo) {
throw new Error(`Time series collection ${collectionName} not found`);
}
const collection = collectionInfo.collection;
// Prepare data points with enhanced metadata
const enhancedDataPoints = sensorDataPoints.map(dataPoint => ({
// Time series fields
timestamp: dataPoint.timestamp || new Date(),
// Device/sensor metadata (automatically indexed as metaField)
device: {
id: dataPoint.deviceId,
type: dataPoint.deviceType || 'generic_sensor',
location: {
facility: dataPoint.facility || 'unknown',
zone: dataPoint.zone || 'default',
coordinates: dataPoint.coordinates || null,
floor: dataPoint.floor || null,
room: dataPoint.room || null
},
firmware: dataPoint.firmwareVersion || null,
manufacturer: dataPoint.manufacturer || null,
model: dataPoint.model || null,
installDate: dataPoint.installDate || null
},
// Sensor information
sensor: {
type: dataPoint.sensorType,
unit: dataPoint.unit || null,
precision: dataPoint.precision || null,
calibrationDate: dataPoint.calibrationDate || null,
maintenanceSchedule: dataPoint.maintenanceSchedule || null
},
// Measurement data
value: dataPoint.value,
rawValue: dataPoint.rawValue || dataPoint.value,
// Data quality indicators
quality: {
score: dataPoint.qualityScore || 1.0,
flags: dataPoint.qualityFlags || [],
confidence: dataPoint.confidence || 1.0,
calibrationStatus: dataPoint.calibrationStatus || 'valid',
sensorHealth: dataPoint.sensorHealth || 'healthy'
},
// Environmental context
environmentalConditions: {
temperature: dataPoint.ambientTemperature || null,
humidity: dataPoint.ambientHumidity || null,
pressure: dataPoint.atmosphericPressure || null,
vibration: dataPoint.vibrationLevel || null,
electricalNoise: dataPoint.electricalNoise || null
},
// Processing metadata
processing: {
receivedAt: new Date(),
source: dataPoint.dataSource || 'direct',
protocol: dataPoint.protocol || 'unknown',
gateway: dataPoint.gatewayId || null,
processingLatency: dataPoint.processingLatency || null,
networkLatency: dataPoint.networkLatency || null
},
// Alert and anomaly flags
alerts: {
anomalyDetected: dataPoint.anomalyDetected || false,
thresholdViolation: dataPoint.thresholdViolation || null,
alertLevel: dataPoint.alertLevel || 'normal',
alertReason: dataPoint.alertReason || null
},
// Business context
businessContext: {
assetId: dataPoint.assetId || null,
processId: dataPoint.processId || null,
operationalMode: dataPoint.operationalMode || 'normal',
shiftId: dataPoint.shiftId || null,
operatorId: dataPoint.operatorId || null
},
// Additional custom metadata
customMetadata: dataPoint.customMetadata || {}
}));
// Perform batch insert with write concern
const result = await collection.insertMany(enhancedDataPoints, {
writeConcern: this.config.writeConcern,
ordered: false // Allow partial success for better performance
});
const insertTime = Date.now() - startTime;
// Update collection statistics
collectionInfo.stats.documentsInserted += result.insertedCount;
collectionInfo.stats.lastInsertTime = new Date();
// Update performance metrics
this.updatePerformanceMetrics('insert', result.insertedCount, insertTime);
// Trigger real-time processing if enabled
if (this.config.enableStreamProcessing) {
await this.processRealTimeData(collectionName, enhancedDataPoints);
}
// Check for alerts if enabled
if (this.config.enableRealTimeAlerts) {
await this.checkAlertConditions(collectionName, enhancedDataPoints);
}
console.log(`Inserted ${result.insertedCount} sensor data points into ${collectionName} in ${insertTime}ms`);
return {
success: true,
collection: collectionName,
insertedCount: result.insertedCount,
insertTime: insertTime,
insertedIds: result.insertedIds
};
} catch (error) {
console.error(`Error inserting sensor data into ${collectionName}:`, error);
return {
success: false,
collection: collectionName,
error: error.message,
insertTime: Date.now() - startTime
};
}
}
async queryTimeSeriesData(collectionName, query) {
const startTime = Date.now();
try {
const collectionInfo = this.timeSeriesCollections.get(collectionName);
if (!collectionInfo) {
throw new Error(`Time series collection ${collectionName} not found`);
}
const collection = collectionInfo.collection;
// Build comprehensive aggregation pipeline for time series analysis
const pipeline = [
// Time range filtering (optimized for time series collections)
{
$match: {
timestamp: {
$gte: query.startTime,
$lte: query.endTime || new Date()
},
...(query.deviceIds && { 'device.id': { $in: query.deviceIds } }),
...(query.deviceTypes && { 'device.type': { $in: query.deviceTypes } }),
...(query.sensorTypes && { 'sensor.type': { $in: query.sensorTypes } }),
...(query.locations && { 'device.location.facility': { $in: query.locations } }),
...(query.minQualityScore && { 'quality.score': { $gte: query.minQualityScore } }),
...(query.alertLevel && { 'alerts.alertLevel': query.alertLevel })
}
},
// Time-based grouping and aggregation
{
$group: {
_id: {
deviceId: '$device.id',
deviceType: '$device.type',
sensorType: '$sensor.type',
location: '$device.location',
// Time bucketing based on query granularity
timeBucket: query.granularity === 'hour'
? { $dateTrunc: { date: '$timestamp', unit: 'hour' } }
: query.granularity === 'minute'
? { $dateTrunc: { date: '$timestamp', unit: 'minute' } }
: query.granularity === 'day'
? { $dateTrunc: { date: '$timestamp', unit: 'day' } }
: '$timestamp' // Raw timestamp for second-level granularity
},
// Statistical aggregations
count: { $sum: 1 },
avgValue: { $avg: '$value' },
minValue: { $min: '$value' },
maxValue: { $max: '$value' },
sumValue: { $sum: '$value' },
// Advanced statistical measures ($group fields must be top-level
// accumulators, so variance and range are derived in $addFields below)
stdDevValue: { $stdDevPop: '$value' },
// Percentile calculations using $percentile (MongoDB 7.0+)
percentiles: {
$percentile: {
input: '$value',
p: [0.25, 0.5, 0.75, 0.9, 0.95, 0.99],
method: 'approximate'
}
},
// Data quality metrics
avgQualityScore: { $avg: '$quality.score' },
minQualityScore: { $min: '$quality.score' },
lowQualityCount: {
$sum: { $cond: [{ $lt: ['$quality.score', 0.8] }, 1, 0] }
},
// Alert and anomaly statistics
anomalyCount: {
$sum: { $cond: ['$alerts.anomalyDetected', 1, 0] }
},
alertCounts: {
$push: {
$cond: [
{ $ne: ['$alerts.alertLevel', 'normal'] },
'$alerts.alertLevel',
'$$REMOVE'
]
}
},
// Time-based metrics
firstReading: { $min: '$timestamp' },
lastReading: { $max: '$timestamp' },
// Value change analysis (valueRange is derived from minValue/maxValue
// in the $addFields stage below)
// Environmental conditions (if available)
avgAmbientTemp: { $avg: '$environmentalConditions.temperature' },
avgAmbientHumidity: { $avg: '$environmentalConditions.humidity' },
// Processing performance
avgProcessingLatency: { $avg: '$processing.processingLatency' },
maxProcessingLatency: { $max: '$processing.processingLatency' },
// Raw data points (if requested for detailed analysis)
...(query.includeRawData && {
rawDataPoints: {
$push: {
timestamp: '$timestamp',
value: '$value',
quality: '$quality.score',
anomaly: '$alerts.anomalyDetected'
}
}
})
}
},
// Calculate additional derived metrics
{
$addFields: {
// Time coverage and sampling rate analysis
timeCoverageSeconds: {
$divide: [
{ $subtract: ['$lastReading', '$firstReading'] },
1000
]
},
// Data completeness analysis
expectedReadings: {
$cond: [
{ $eq: [query.granularity, 'minute'] },
{ $divide: [{ $subtract: ['$lastReading', '$firstReading'] }, 60000] },
{ $cond: [
{ $eq: [query.granularity, 'hour'] },
{ $divide: [{ $subtract: ['$lastReading', '$firstReading'] }, 3600000] },
'$count'
]}
]
},
// Statistical analysis
coefficientOfVariation: {
$cond: [
{ $ne: ['$avgValue', 0] },
{ $divide: ['$stdDevValue', '$avgValue'] },
0
]
},
// Data quality percentage
qualityPercentage: {
$multiply: [
{ $divide: [
{ $subtract: ['$count', '$lowQualityCount'] },
'$count'
]},
100
]
},
// Anomaly rate
anomalyRate: {
$multiply: [
{ $divide: ['$anomalyCount', '$count'] },
100
]
},
// Alert level distribution (dynamic keys need $arrayToObject; computed
// object keys are not valid inside aggregation expressions)
alertDistribution: {
$arrayToObject: {
$map: {
input: { $setUnion: ['$alertCounts', []] },
as: 'level',
in: {
k: '$$level',
v: {
$size: {
$filter: {
input: '$alertCounts',
cond: { $eq: ['$$this', '$$level'] }
}
}
}
}
}
}
},
// Statistics derived directly from grouped values
varianceValue: { $pow: ['$stdDevValue', 2] },
valueRange: { $subtract: ['$maxValue', '$minValue'] }
}
},
// Classification fields reference values computed above, so they live in
// a separate $addFields stage ($addFields cannot read fields it defines
// in the same stage)
{
$addFields: {
// Performance classification
performanceCategory: {
$switch: {
branches: [
{
case: {
$and: [
{ $gte: ['$qualityPercentage', 95] },
{ $lt: ['$anomalyRate', 1] },
{ $lte: ['$avgProcessingLatency', 100] }
]
},
then: 'excellent'
},
{
case: {
$and: [
{ $gte: ['$qualityPercentage', 85] },
{ $lt: ['$anomalyRate', 5] },
{ $lte: ['$avgProcessingLatency', 300] }
]
},
then: 'good'
},
{
case: {
$and: [
{ $gte: ['$qualityPercentage', 70] },
{ $lt: ['$anomalyRate', 10] }
]
},
then: 'fair'
}
],
default: 'poor'
}
},
// Trending analysis (basic)
valueTrend: {
$cond: [
{ $and: [
{ $ne: ['$minValue', '$maxValue'] },
{ $gt: ['$count', 1] }
]},
{
$switch: {
branches: [
{ case: { $gt: ['$valueRange', { $multiply: ['$avgValue', 0.2] }] }, then: 'volatile' },
{ case: { $gt: ['$coefficientOfVariation', 0.3] }, then: 'variable' },
{ case: { $lt: ['$coefficientOfVariation', 0.1] }, then: 'stable' }
],
default: 'moderate'
}
},
'insufficient_data'
]
}
}
},
// Data completeness analysis
{
$addFields: {
dataCompleteness: {
$multiply: [
{ $divide: ['$count', { $max: ['$expectedReadings', 1] }] },
100
]
},
// Sampling rate (readings per minute)
samplingRate: {
$cond: [
{ $gt: ['$timeCoverageSeconds', 0] },
{ $divide: ['$count', { $divide: ['$timeCoverageSeconds', 60] }] },
0
]
}
}
},
// Final projection and organization
{
$project: {
// Identity fields
deviceId: '$_id.deviceId',
deviceType: '$_id.deviceType',
sensorType: '$_id.sensorType',
location: '$_id.location',
timeBucket: '$_id.timeBucket',
// Basic statistics
dataPoints: '$count',
statistics: {
avg: { $round: ['$avgValue', 4] },
min: '$minValue',
max: '$maxValue',
sum: { $round: ['$sumValue', 2] },
stdDev: { $round: ['$stdDevValue', 4] },
variance: { $round: ['$varianceValue', 4] },
coefficientOfVariation: { $round: ['$coefficientOfVariation', 4] },
valueRange: { $round: ['$valueRange', 4] },
percentiles: '$percentiles'
},
// Data quality metrics
dataQuality: {
avgScore: { $round: ['$avgQualityScore', 3] },
minScore: { $round: ['$minQualityScore', 3] },
qualityPercentage: { $round: ['$qualityPercentage', 1] },
lowQualityCount: '$lowQualityCount'
},
// Alert and anomaly information
alerts: {
anomalyCount: '$anomalyCount',
anomalyRate: { $round: ['$anomalyRate', 2] },
alertDistribution: '$alertDistribution'
},
// Time-based analysis
temporal: {
firstReading: '$firstReading',
lastReading: '$lastReading',
timeCoverageSeconds: { $round: ['$timeCoverageSeconds', 0] },
dataCompleteness: { $round: ['$dataCompleteness', 1] },
samplingRate: { $round: ['$samplingRate', 2] }
},
// Environmental context
environment: {
avgTemperature: { $round: ['$avgAmbientTemp', 1] },
avgHumidity: { $round: ['$avgAmbientHumidity', 1] }
},
// Performance metrics
performance: {
avgProcessingLatency: { $round: ['$avgProcessingLatency', 0] },
maxProcessingLatency: { $round: ['$maxProcessingLatency', 0] },
performanceCategory: '$performanceCategory'
},
// Analysis results
analysis: {
valueTrend: '$valueTrend',
overallAssessment: {
$switch: {
branches: [
{
case: {
$and: [
{ $eq: ['$performanceCategory', 'excellent'] },
{ $gte: ['$dataCompleteness', 95] }
]
},
then: 'optimal_performance'
},
{
case: {
$and: [
{ $in: ['$performanceCategory', ['good', 'excellent']] },
{ $gte: ['$dataCompleteness', 80] }
]
},
then: 'good_performance'
},
{
case: { $lt: ['$dataCompleteness', 50] },
then: 'data_gaps_detected'
},
{
case: { $gt: ['$anomalyRate', 15] },
then: 'high_anomaly_rate'
},
{
case: { $lt: ['$qualityPercentage', 70] },
then: 'quality_issues'
}
],
default: 'acceptable_performance'
}
}
},
// Include raw data if requested
...(query.includeRawData && { rawDataPoints: 1 })
}
},
// Sort results
{ $sort: { deviceId: 1, timeBucket: 1 } },
// Apply result limits
...(query.limit ? [{ $limit: query.limit }] : [])
];
// Execute aggregation pipeline
const results = await collection.aggregate(pipeline, {
allowDiskUse: true,
maxTimeMS: 30000
}).toArray();
const queryTime = Date.now() - startTime;
// Update performance metrics
this.updatePerformanceMetrics('query', results.length, queryTime);
console.log(`Time series query completed: ${results.length} results in ${queryTime}ms`);
return {
success: true,
collection: collectionName,
results: results,
resultCount: results.length,
queryTime: queryTime,
queryMetadata: {
timeRange: {
start: query.startTime,
end: query.endTime || new Date()
},
granularity: query.granularity || 'raw',
filters: {
deviceIds: query.deviceIds?.length || 0,
deviceTypes: query.deviceTypes?.length || 0,
sensorTypes: query.sensorTypes?.length || 0,
locations: query.locations?.length || 0
},
optimizationsApplied: ['time_series_bucketing', 'statistical_aggregation', 'index_optimization']
}
};
} catch (error) {
console.error(`Error querying time series data from ${collectionName}:`, error);
return {
success: false,
collection: collectionName,
error: error.message,
queryTime: Date.now() - startTime
};
}
}
async setupRealTimeAggregations() {
console.log('Setting up real-time aggregation pipelines...');
// Create aggregation collections for different time windows
const aggregationConfigs = [
{
name: 'sensor_readings_1min',
sourceCollection: 'sensor_readings',
windowSize: '1 minute',
retentionDays: 7
},
{
name: 'sensor_readings_5min',
sourceCollection: 'sensor_readings',
windowSize: '5 minutes',
retentionDays: 30
},
{
name: 'sensor_readings_1hour',
sourceCollection: 'sensor_readings',
windowSize: '1 hour',
retentionDays: 365
},
{
name: 'sensor_readings_1day',
sourceCollection: 'sensor_readings',
windowSize: '1 day',
retentionDays: 1825 // 5 years
}
];
for (const config of aggregationConfigs) {
await this.createAggregationPipeline(config);
}
console.log('Real-time aggregation pipelines setup completed');
}
async createAggregationPipeline(config) {
console.log(`Creating aggregation pipeline: ${config.name}`);
// Create collection for storing aggregated data
const aggregationCollection = await this.db.createCollection(config.name, {
timeseries: {
timeField: 'timestamp',
metaField: 'device',
// Valid granularities are 'seconds', 'minutes', and 'hours';
// daily rollups still use 'hours'
granularity: config.windowSize.includes('minute') ? 'minutes' : 'hours'
},
expireAfterSeconds: config.retentionDays * 24 * 60 * 60
});
this.aggregationCollections.set(config.name, {
collection: aggregationCollection,
config: config
});
}
async processRealTimeData(collectionName, dataPoints) {
console.log(`Processing real-time data for ${collectionName}: ${dataPoints.length} points`);
// Update real-time aggregations
for (const [aggName, aggInfo] of this.aggregationCollections.entries()) {
if (aggInfo.config.sourceCollection === collectionName) {
await this.updateRealTimeAggregation(aggName, dataPoints);
}
}
// Process data through ML pipelines if enabled
if (this.config.enablePredictiveAnalytics) {
await this.processPredictiveAnalytics(dataPoints);
}
}
async checkAlertConditions(collectionName, dataPoints) {
console.log(`Checking alert conditions for ${dataPoints.length} data points`);
const alertsTriggered = [];
for (const dataPoint of dataPoints) {
// Check various alert conditions
const alerts = [];
// Value threshold alerts
if (dataPoint.sensor.type === 'temperature' && dataPoint.value > 80) {
alerts.push({
type: 'threshold_violation',
severity: 'high',
message: `Temperature ${dataPoint.value}°C exceeds threshold`,
deviceId: dataPoint.device.id
});
}
// Quality score alerts
if (dataPoint.quality.score < 0.7) {
alerts.push({
type: 'quality_degradation',
severity: 'medium',
message: `Quality score ${dataPoint.quality.score} below acceptable level`,
deviceId: dataPoint.device.id
});
}
// Anomaly alerts
if (dataPoint.alerts.anomalyDetected) {
alerts.push({
type: 'anomaly_detected',
severity: 'high',
message: `Anomaly detected in sensor reading`,
deviceId: dataPoint.device.id
});
}
// Processing latency alerts
if (dataPoint.processing.processingLatency > 5000) { // 5 seconds
alerts.push({
type: 'processing_delay',
severity: 'medium',
message: `Processing latency ${dataPoint.processing.processingLatency}ms exceeds threshold`,
deviceId: dataPoint.device.id
});
}
if (alerts.length > 0) {
alertsTriggered.push(...alerts);
this.performanceMetrics.alertsTriggered += alerts.length;
}
}
// Store alerts if any were triggered
if (alertsTriggered.length > 0) {
await this.storeAlerts(alertsTriggered);
}
return alertsTriggered;
}
async storeAlerts(alerts) {
try {
// Create alerts collection if it doesn't exist
if (!this.alertCollections.has('iot_alerts')) {
const alertsCollection = await this.db.createCollection('iot_alerts');
await alertsCollection.createIndexes([
{ key: { deviceId: 1, timestamp: -1 }, background: true },
{ key: { severity: 1, timestamp: -1 }, background: true },
{ key: { type: 1, timestamp: -1 }, background: true }
]);
this.alertCollections.set('iot_alerts', alertsCollection);
}
const alertsCollection = this.alertCollections.get('iot_alerts');
const alertDocuments = alerts.map(alert => ({
...alert,
timestamp: new Date(),
acknowledged: false,
resolvedAt: null
}));
await alertsCollection.insertMany(alertDocuments);
console.log(`Stored ${alertDocuments.length} alerts`);
} catch (error) {
console.error('Error storing alerts:', error);
}
}
updatePerformanceMetrics(operation, count, duration) {
if (operation === 'insert') {
this.performanceMetrics.totalDataPoints += count;
this.performanceMetrics.writeOperationsPerSecond =
(count / Math.max(duration, 1)) * 1000;
} else if (operation === 'query') {
this.performanceMetrics.queryOperationsPerSecond =
(count / Math.max(duration, 1)) * 1000;
}
// Simple running smoothing of observed operation latency
this.performanceMetrics.averageLatency =
(this.performanceMetrics.averageLatency + duration) / 2;
}
async getSystemStatistics() {
console.log('Gathering IoT Time Series system statistics...');
const stats = {
collections: {},
performance: this.performanceMetrics,
aggregations: {},
systemHealth: 'healthy'
};
// Gather statistics from each time series collection
for (const [collectionName, collectionInfo] of this.timeSeriesCollections.entries()) {
try {
const collection = collectionInfo.collection;
// collStats via db.command (Collection.stats() is deprecated in
// recent Node.js drivers)
const [collectionStats, sampleData] = await Promise.all([
this.db.command({ collStats: collectionName }),
collection.find().sort({ timestamp: -1 }).limit(1).toArray()
]);
stats.collections[collectionName] = {
documentCount: collectionStats.count || 0,
storageSize: collectionStats.size || 0,
indexSize: collectionStats.totalIndexSize || 0,
avgDocumentSize: collectionStats.avgObjSize || 0,
compressionRatio: collectionStats.size > 0 ?
(collectionStats.storageSize / collectionStats.size) : 1,
lastDataPoint: sampleData[0]?.timestamp || null,
configuration: collectionInfo.config,
performance: collectionInfo.stats
};
} catch (error) {
stats.collections[collectionName] = {
error: error.message,
available: false
};
}
}
return stats;
}
async shutdown() {
console.log('Shutting down IoT Time Series Manager...');
// Close change streams
for (const [streamName, changeStream] of this.changeStreams.entries()) {
try {
await changeStream.close();
console.log(`Closed change stream: ${streamName}`);
} catch (error) {
console.error(`Error closing change stream ${streamName}:`, error);
}
}
// Close MongoDB connection
if (this.client) {
await this.client.close();
}
console.log('IoT Time Series Manager shutdown complete');
}
}
// Benefits of MongoDB Time Series Collections:
// - Native time series optimization with automatic bucketing and compression
// - Specialized indexing and query optimization for time-based data patterns
// - Efficient storage with automatic data lifecycle management
// - Real-time aggregation pipelines for IoT analytics
// - Built-in support for high-volume write workloads
// - Intelligent compression reducing storage costs by up to 90%
// - Seamless integration with MongoDB's distributed architecture
// - SQL-compatible time series operations through QueryLeaf integration
// - Native support for IoT-specific query patterns and analytics
// - Automatic data archiving and retention management
module.exports = {
IoTTimeSeriesManager
};
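To show how the pieces fit together, here is a minimal end-to-end usage sketch for the manager above; the connection string, module path, and sample reading values are all illustrative:
// Usage sketch for IoTTimeSeriesManager (illustrative values throughout)
const { IoTTimeSeriesManager } = require('./iot-timeseries-manager');

async function run() {
  const manager = new IoTTimeSeriesManager('mongodb://localhost:27017/iot', {
    defaultGranularity: 'seconds',
    batchWriteSize: 500
  });

  await manager.initializeIoTTimeSeriesSystem();

  // Batch-insert readings (one shown here)
  await manager.insertSensorData('sensor_readings', [{
    deviceId: 'pump-17',
    deviceType: 'industrial_pump',
    sensorType: 'temperature',
    value: 74.2,
    unit: 'celsius',
    qualityScore: 0.98,
    facility: 'plant-a',
    zone: 'line-3'
  }]);

  // Aggregate the last hour at minute granularity for one device
  const result = await manager.queryTimeSeriesData('sensor_readings', {
    startTime: new Date(Date.now() - 60 * 60 * 1000),
    granularity: 'minute',
    deviceIds: ['pump-17']
  });
  console.log(`Retrieved ${result.resultCount} aggregated buckets`);

  await manager.shutdown();
}

run().catch(console.error);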
Understanding MongoDB Time Series Collections Architecture
IoT Data Patterns and Optimization Strategies
MongoDB Time Series Collections are specifically designed for the unique characteristics of IoT and time-stamped data:
// Advanced IoT Time Series Processing with Enterprise Features
class EnterpriseIoTProcessor extends IoTTimeSeriesManager {
constructor(connectionString, enterpriseConfig) {
super(connectionString, enterpriseConfig);
this.enterpriseConfig = {
...enterpriseConfig,
enableEdgeComputing: true,
enablePredictiveAnalytics: true,
enableDigitalTwins: true,
enableMLPipelines: true,
enableAdvancedVisualization: true,
enableMultiTenancy: true
};
// Extension hooks assumed to be implemented by the deployment; shown
// here as illustrative initialization points
this.setupEnterpriseFeatures();
this.initializeMLPipelines();
this.setupDigitalTwins();
}
async implementAdvancedIoTStrategies() {
console.log('Implementing enterprise IoT data strategies...');
const strategies = {
// Edge computing integration
edgeComputing: {
edgeDataAggregation: true,
intelligentFiltering: true,
localAnomalyDetection: true,
bandwidthOptimization: true
},
// Predictive analytics
predictiveAnalytics: {
equipmentFailurePrediction: true,
energyOptimization: true,
maintenanceScheduling: true,
capacityPlanning: true
},
// Digital twin implementation
digitalTwins: {
realTimeSimulation: true,
processOptimization: true,
scenarioModeling: true,
performanceAnalytics: true
}
};
return await this.deployEnterpriseIoTStrategies(strategies);
}
async setupAdvancedAnalytics() {
console.log('Setting up advanced IoT analytics capabilities...');
const analyticsConfig = {
// Real-time processing
realTimeProcessing: {
streamProcessing: true,
complexEventProcessing: true,
patternRecognition: true,
correlationAnalysis: true
},
// Machine learning integration
machineLearning: {
anomalyDetection: true,
predictiveModeling: true,
classificationModels: true,
reinforcementLearning: true
},
// Advanced visualization
visualization: {
realTimeDashboards: true,
historicalAnalytics: true,
geospatialVisualization: true,
threeDimensionalModeling: true
}
};
return await this.deployAdvancedAnalytics(analyticsConfig);
}
}
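A useful way to see the storage optimization concretely is to inspect how the server lays out a time series collection. The sketch below assumes the sensor_readings collection from the earlier examples; note that the internal system.buckets namespace is an implementation detail whose layout varies by server version:
// Inspection sketch: how a time series collection is represented on the
// server (assumes the 'sensor_readings' collection from earlier examples)
const { MongoClient } = require('mongodb');

async function inspect() {
  const client = new MongoClient('mongodb://localhost:27017');
  await client.connect();
  const db = client.db('iot');

  // The listing reports type 'timeseries' along with its creation options
  const [info] = await db.listCollections({ name: 'sensor_readings' }).toArray();
  console.log(JSON.stringify(info, null, 2));

  // Readings are packed into bucket documents, one per metaField value and
  // time window, which is where the compression win comes from
  const buckets = await db
    .collection('system.buckets.sensor_readings')
    .estimatedDocumentCount();
  console.log(`Internal bucket documents: ${buckets}`);

  await client.close();
}

inspect().catch(console.error);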
SQL-Style Time Series Operations with QueryLeaf
QueryLeaf provides familiar SQL syntax for MongoDB Time Series operations and IoT data management. To ground the mapping, here is a rough native-driver equivalent of the first CREATE TIME_SERIES_COLLECTION statement below; the option translation is an approximation, and granularity stands in for the custom bucket parameters because the native API treats the two bucketing schemes as mutually exclusive:
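// Approximate native equivalent of the CREATE TIME_SERIES_COLLECTION
// statement that follows (assumes an existing `db` handle)
await db.createCollection('sensor_readings', {
  timeseries: {
    timeField: 'timestamp',
    metaField: 'device_metadata',
    granularity: 'seconds'
  },
  expireAfterSeconds: 31536000, // 1 year retention
  storageEngine: {
    wiredTiger: { configString: 'block_compressor=zstd' }
  }
});
With that correspondence in mind, the SQL-familiar operations: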
-- QueryLeaf Time Series operations with SQL-familiar syntax for IoT data
-- Create optimized time series collections for different IoT data types
CREATE TIME_SERIES_COLLECTION sensor_readings (
timestamp TIMESTAMPTZ,
device_id STRING,
sensor_type STRING,
value DECIMAL,
quality_score DECIMAL,
location OBJECT,
metadata OBJECT
)
WITH (
time_field = 'timestamp',
meta_field = 'device_metadata',
granularity = 'seconds',
bucket_max_span_seconds = 3600,
bucket_rounding_seconds = 60,
expire_after_seconds = 31536000, -- 1 year retention
enable_compression = true,
compression_algorithm = 'zstd'
);
-- Create specialized collections for different IoT use cases
CREATE TIME_SERIES_COLLECTION equipment_telemetry (
timestamp TIMESTAMPTZ,
equipment_id STRING,
metric_type STRING,
value DECIMAL,
operational_status STRING,
maintenance_flags ARRAY,
performance_indicators OBJECT
)
WITH (
granularity = 'seconds',
bucket_max_span_seconds = 1800, -- 30 minute buckets for high-frequency data
enable_automatic_indexing = true
);
CREATE TIME_SERIES_COLLECTION environmental_monitoring (
timestamp TIMESTAMPTZ,
location_id STRING,
sensor_network STRING,
measurements OBJECT,
weather_conditions OBJECT,
air_quality_index DECIMAL
)
WITH (
granularity = 'minutes',
bucket_max_span_seconds = 7200, -- 2 hour buckets for environmental data
enable_predictive_analytics = true
);
-- Advanced IoT data insertion with comprehensive metadata
INSERT INTO sensor_readings (
timestamp, device_metadata, sensor_info, measurements, quality_metrics, context, processing_metadata
)
WITH iot_data_enrichment AS (
SELECT
reading_timestamp as timestamp,
-- Device and location metadata (optimized as metaField)
JSON_OBJECT(
'device_id', device_identifier,
'device_type', equipment_type,
'firmware_version', firmware_ver,
'location', JSON_OBJECT(
'facility', facility_name,
'zone', zone_identifier,
'coordinates', JSON_OBJECT('lat', latitude, 'lng', longitude),
'floor', floor_number,
'room', room_identifier
),
'network_info', JSON_OBJECT(
'gateway_id', gateway_identifier,
'signal_strength', rssi_value,
'protocol', communication_protocol,
'network_latency', network_delay_ms
)
) as device_metadata,
-- Sensor information and calibration data
JSON_OBJECT(
'sensor_type', sensor_category,
'model_number', sensor_model,
'serial_number', sensor_serial,
'calibration_date', last_calibration,
'maintenance_schedule', maintenance_interval,
'measurement_unit', measurement_units,
'precision', sensor_precision,
'accuracy', sensor_accuracy_percent,
'operating_range', JSON_OBJECT(
'min_value', minimum_measurable,
'max_value', maximum_measurable,
'optimal_range', optimal_operating_range
)
) as sensor_info,
-- Measurement data with statistical context
JSON_OBJECT(
'primary_value', sensor_reading,
'raw_value', unprocessed_reading,
'calibrated_value', calibration_adjusted_value,
'statistical_context', JSON_OBJECT(
'recent_average', rolling_average_10min,
'recent_min', rolling_min_10min,
'recent_max', rolling_max_10min,
'trend_indicator', trend_direction,
'volatility_index', measurement_volatility
),
'related_measurements', JSON_OBJECT(
'secondary_sensors', related_sensor_readings,
'environmental_factors', ambient_conditions,
'operational_context', equipment_operating_mode
)
) as measurements,
-- Comprehensive quality assessment
JSON_OBJECT(
'overall_score', data_quality_score,
'confidence_level', measurement_confidence,
'quality_factors', JSON_OBJECT(
'sensor_health', sensor_status_indicator,
'calibration_validity', calibration_status,
'environmental_conditions', environmental_suitability,
'signal_integrity', signal_quality_assessment,
'power_status', power_supply_stability
),
'quality_flags', quality_warning_flags,
'anomaly_indicators', JSON_OBJECT(
'statistical_anomaly', statistical_outlier_flag,
'temporal_anomaly', temporal_pattern_anomaly,
'contextual_anomaly', contextual_deviation_flag,
'severity_level', anomaly_severity_rating
)
) as quality_metrics,
-- Business and operational context
JSON_OBJECT(
'business_context', JSON_OBJECT(
'asset_id', primary_asset_identifier,
'process_id', manufacturing_process_id,
'production_batch', current_batch_identifier,
'shift_information', JSON_OBJECT(
'shift_id', current_shift,
'operator_id', responsible_operator,
'supervisor_id', shift_supervisor
)
),
'operational_context', JSON_OBJECT(
'equipment_mode', current_operational_mode,
'production_rate', current_production_speed,
'efficiency_metrics', operational_efficiency_data,
'maintenance_status', equipment_maintenance_state,
'compliance_flags', regulatory_compliance_status
),
'alert_configuration', JSON_OBJECT(
'threshold_settings', alert_threshold_values,
'notification_rules', alert_notification_config,
'escalation_procedures', alert_escalation_rules,
'suppression_conditions', alert_suppression_rules
)
) as context
FROM raw_iot_data_stream
WHERE
data_timestamp >= CURRENT_TIMESTAMP - INTERVAL '5 minutes'
AND data_quality_preliminary >= 0.5
AND device_status != 'maintenance_mode'
)
SELECT
timestamp,
device_metadata,
sensor_info,
measurements,
quality_metrics,
context,
-- Processing metadata
JSON_OBJECT(
'ingestion_timestamp', CURRENT_TIMESTAMP,
'processing_latency_ms',
EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - timestamp)) * 1000,
'data_pipeline_version', '2.1.0',
'enrichment_applied', JSON_ARRAY(
'metadata_enhancement',
'quality_assessment',
'anomaly_detection',
'contextual_enrichment'
)
) as processing_metadata
FROM iot_data_enrichment
WHERE
-- Final data quality validation
JSON_EXTRACT(quality_metrics, '$.overall_score') >= 0.6
AND JSON_EXTRACT(measurements, '$.primary_value') IS NOT NULL
ORDER BY timestamp;
-- Real-time IoT analytics with time-based aggregations
WITH real_time_sensor_analytics AS (
SELECT
DATE_TRUNC('minute', timestamp) as time_bucket,
JSON_EXTRACT(device_metadata, '$.device_id') as device_id,
JSON_EXTRACT(device_metadata, '$.device_type') as device_type,
JSON_EXTRACT(device_metadata, '$.location.facility') as facility,
JSON_EXTRACT(device_metadata, '$.location.zone') as zone,
JSON_EXTRACT(sensor_info, '$.sensor_type') as sensor_type,
-- Statistical aggregations optimized for time series
COUNT(*) as reading_count,
AVG(JSON_EXTRACT(measurements, '$.primary_value')::DECIMAL) as avg_value,
MIN(JSON_EXTRACT(measurements, '$.primary_value')::DECIMAL) as min_value,
MAX(JSON_EXTRACT(measurements, '$.primary_value')::DECIMAL) as max_value,
STDDEV(JSON_EXTRACT(measurements, '$.primary_value')::DECIMAL) as stddev_value,
-- Percentile calculations for distribution analysis
PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY JSON_EXTRACT(measurements, '$.primary_value')::DECIMAL) as p25_value,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY JSON_EXTRACT(measurements, '$.primary_value')::DECIMAL) as median_value,
PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY JSON_EXTRACT(measurements, '$.primary_value')::DECIMAL) as p75_value,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY JSON_EXTRACT(measurements, '$.primary_value')::DECIMAL) as p95_value,
-- Data quality aggregations
AVG(JSON_EXTRACT(quality_metrics, '$.overall_score')::DECIMAL) as avg_quality_score,
MIN(JSON_EXTRACT(quality_metrics, '$.overall_score')::DECIMAL) as min_quality_score,
COUNT(*) FILTER (WHERE JSON_EXTRACT(quality_metrics, '$.overall_score')::DECIMAL < 0.8) as low_quality_readings,
-- Anomaly detection aggregations
COUNT(*) FILTER (WHERE JSON_EXTRACT(quality_metrics, '$.anomaly_indicators.statistical_anomaly')::BOOLEAN = true) as statistical_anomalies,
COUNT(*) FILTER (WHERE JSON_EXTRACT(quality_metrics, '$.anomaly_indicators.temporal_anomaly')::BOOLEAN = true) as temporal_anomalies,
COUNT(*) FILTER (WHERE JSON_EXTRACT(quality_metrics, '$.anomaly_indicators.contextual_anomaly')::BOOLEAN = true) as contextual_anomalies,
-- Value change and trend analysis
(MAX(JSON_EXTRACT(measurements, '$.primary_value')::DECIMAL) -
MIN(JSON_EXTRACT(measurements, '$.primary_value')::DECIMAL)) as value_range,
-- Time coverage and sampling analysis
(EXTRACT(EPOCH FROM MAX(timestamp) - MIN(timestamp))) as time_span_seconds,
COUNT(*)::DECIMAL / GREATEST(1, EXTRACT(EPOCH FROM MAX(timestamp) - MIN(timestamp)) / 60) as readings_per_minute,
-- Processing performance metrics
AVG(JSON_EXTRACT(processing_metadata, '$.processing_latency_ms')::DECIMAL) as avg_processing_latency,
MAX(JSON_EXTRACT(processing_metadata, '$.processing_latency_ms')::DECIMAL) as max_processing_latency,
-- Network performance indicators
AVG(JSON_EXTRACT(device_metadata, '$.network_info.network_latency')::DECIMAL) as avg_network_latency,
AVG(JSON_EXTRACT(device_metadata, '$.network_info.signal_strength')::DECIMAL) as avg_signal_strength,
-- Environmental context aggregations
AVG(JSON_EXTRACT(measurements, '$.related_measurements.environmental_factors.temperature')::DECIMAL) as avg_ambient_temp,
AVG(JSON_EXTRACT(measurements, '$.related_measurements.environmental_factors.humidity')::DECIMAL) as avg_ambient_humidity,
-- Operational context
MODE() WITHIN GROUP (ORDER BY JSON_EXTRACT(context, '$.operational_context.equipment_mode')::STRING) as primary_equipment_mode,
AVG(JSON_EXTRACT(context, '$.operational_context.production_rate')::DECIMAL) as avg_production_rate
FROM sensor_readings
WHERE
timestamp >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
AND JSON_EXTRACT(quality_metrics, '$.overall_score')::DECIMAL >= 0.5
GROUP BY
time_bucket, device_id, device_type, facility, zone, sensor_type
),
derived_sensor_metrics AS (
SELECT
rtsa.*,
-- Data quality assessment
ROUND((rtsa.reading_count - rtsa.low_quality_readings)::DECIMAL / rtsa.reading_count * 100, 2) as quality_percentage,
-- Anomaly rate calculations
ROUND((rtsa.statistical_anomalies + rtsa.temporal_anomalies + rtsa.contextual_anomalies)::DECIMAL / rtsa.reading_count * 100, 2) as total_anomaly_rate,
-- Statistical analysis
CASE
WHEN rtsa.avg_value != 0 THEN ROUND(rtsa.stddev_value / ABS(rtsa.avg_value), 4)
ELSE 0
END as coefficient_of_variation,
-- Data completeness analysis (expected vs actual readings)
ROUND(rtsa.reading_count / GREATEST(1, rtsa.time_span_seconds / 60) * 100, 1) as data_completeness_percent
FROM real_time_sensor_analytics rtsa
),
performance_analysis AS (
-- Classifications reference the derived metrics above, so they live in a
-- separate CTE: column aliases cannot be reused within the same SELECT
SELECT
dsm.*,
-- Performance classification
CASE
WHEN dsm.avg_quality_score >= 0.95 AND dsm.total_anomaly_rate <= 1 THEN 'excellent'
WHEN dsm.avg_quality_score >= 0.85 AND dsm.total_anomaly_rate <= 5 THEN 'good'
WHEN dsm.avg_quality_score >= 0.70 AND dsm.total_anomaly_rate <= 10 THEN 'acceptable'
ELSE 'poor'
END as performance_category,
-- Trend analysis
CASE
WHEN dsm.coefficient_of_variation > 0.5 THEN 'highly_variable'
WHEN dsm.coefficient_of_variation > 0.3 THEN 'variable'
WHEN dsm.coefficient_of_variation > 0.1 THEN 'moderate'
ELSE 'stable'
END as stability_classification,
-- Alert conditions (signal strength in dBm, so more negative is weaker)
CASE
WHEN dsm.avg_quality_score < 0.7 THEN 'quality_alert'
WHEN dsm.total_anomaly_rate > 15 THEN 'anomaly_alert'
WHEN dsm.avg_processing_latency > 5000 THEN 'latency_alert'
WHEN dsm.data_completeness_percent < 80 THEN 'data_gap_alert'
WHEN dsm.avg_signal_strength < -80 THEN 'connectivity_alert'
ELSE 'normal'
END as alert_status,
-- Operational efficiency indicators
CASE
WHEN dsm.primary_equipment_mode = 'production' AND dsm.avg_production_rate >= 95 THEN 'optimal_efficiency'
WHEN dsm.primary_equipment_mode = 'production' AND dsm.avg_production_rate >= 80 THEN 'good_efficiency'
WHEN dsm.primary_equipment_mode = 'production' AND dsm.avg_production_rate >= 60 THEN 'reduced_efficiency'
WHEN dsm.primary_equipment_mode = 'maintenance' THEN 'maintenance_mode'
ELSE 'unknown_efficiency'
END as operational_efficiency,
-- Time-based patterns
EXTRACT(HOUR FROM dsm.time_bucket) as hour_of_day,
EXTRACT(DOW FROM dsm.time_bucket) as day_of_week
FROM derived_sensor_metrics dsm
),
device_health_assessment AS (
SELECT
pa.device_id,
pa.device_type,
pa.facility,
pa.zone,
pa.sensor_type,
-- Current status indicators (ordered ARRAY_AGG rather than window
-- functions, which cannot reference non-grouped columns here)
(ARRAY_AGG(pa.performance_category ORDER BY pa.time_bucket DESC))[1] as current_performance_status,
(ARRAY_AGG(pa.alert_status ORDER BY pa.time_bucket DESC))[1] as current_alert_status,
-- Performance trends over the analysis window
COUNT(*) as analysis_periods,
COUNT(*) FILTER (WHERE pa.performance_category IN ('excellent', 'good')) as good_periods,
COUNT(*) FILTER (WHERE pa.alert_status != 'normal') as alert_periods,
-- Average performance metrics
ROUND(AVG(pa.avg_quality_score), 3) as overall_avg_quality,
ROUND(AVG(pa.total_anomaly_rate), 2) as overall_anomaly_rate,
ROUND(AVG(pa.readings_per_minute), 2) as overall_data_rate,
ROUND(AVG(pa.avg_processing_latency), 0) as overall_processing_latency,
-- Stability and consistency
ROUND(AVG(pa.coefficient_of_variation), 4) as average_stability_index,
ROUND(AVG(pa.data_completeness_percent), 1) as average_data_completeness,
-- Network and connectivity
ROUND(AVG(pa.avg_network_latency), 0) as average_network_latency,
ROUND(AVG(pa.avg_signal_strength), 1) as average_signal_strength,
-- Environmental context
ROUND(AVG(pa.avg_ambient_temp), 1) as average_ambient_temperature,
ROUND(AVG(pa.avg_ambient_humidity), 1) as average_ambient_humidity,
-- Operational efficiency
MODE() WITHIN GROUP (ORDER BY pa.operational_efficiency) as predominant_efficiency_level,
-- Value statistics across all time periods
ROUND(AVG(pa.avg_value), 4) as overall_average_value,
ROUND(AVG(pa.stddev_value), 4) as overall_value_variability,
MIN(pa.min_value) as absolute_minimum_value,
MAX(pa.max_value) as absolute_maximum_value
FROM performance_analysis pa
GROUP BY
pa.device_id, pa.device_type, pa.facility, pa.zone, pa.sensor_type
)
-- Comprehensive IoT device health and performance report
SELECT
dha.device_id,
dha.device_type,
dha.sensor_type,
dha.facility,
dha.zone,
-- Current status
dha.current_performance_status,
dha.current_alert_status,
-- Performance summary
dha.overall_avg_quality as quality_score,
dha.overall_anomaly_rate as anomaly_rate_percent,
dha.overall_data_rate as readings_per_minute,
dha.overall_processing_latency as avg_latency_ms,
-- Reliability indicators
ROUND((dha.good_periods::DECIMAL / dha.analysis_periods) * 100, 1) as uptime_percentage,
ROUND((dha.alert_periods::DECIMAL / dha.analysis_periods) * 100, 1) as alert_percentage,
dha.average_data_completeness as data_completeness_percent,
-- Performance classification
CASE
WHEN dha.overall_avg_quality >= 0.9 AND dha.overall_anomaly_rate <= 2 AND dha.uptime_percentage >= 95 THEN 'optimal'
WHEN dha.overall_avg_quality >= 0.8 AND dha.overall_anomaly_rate <= 5 AND dha.uptime_percentage >= 90 THEN 'good'
WHEN dha.overall_avg_quality >= 0.6 AND dha.overall_anomaly_rate <= 10 AND dha.uptime_percentage >= 80 THEN 'acceptable'
WHEN dha.overall_avg_quality < 0.5 OR dha.overall_anomaly_rate > 20 THEN 'critical'
ELSE 'needs_attention'
END as device_health_classification,
-- Operational context
dha.predominant_efficiency_level,
dha.overall_average_value as typical_reading_value,
dha.overall_value_variability as measurement_stability,
-- Environmental factors
dha.average_ambient_temperature,
dha.average_ambient_humidity,
-- Connectivity and infrastructure
dha.average_network_latency as network_latency_ms,
dha.average_signal_strength as signal_strength_dbm,
-- Recommendations and next actions
CASE
WHEN dha.current_alert_status = 'quality_alert' THEN 'calibrate_sensor_immediate'
WHEN dha.current_alert_status = 'anomaly_alert' THEN 'investigate_anomaly_patterns'
WHEN dha.current_alert_status = 'latency_alert' THEN 'optimize_data_pipeline'
WHEN dha.current_alert_status = 'connectivity_alert' THEN 'check_network_infrastructure'
WHEN dha.current_alert_status = 'data_gap_alert' THEN 'verify_sensor_connectivity'
WHEN dha.overall_avg_quality < 0.8 THEN 'schedule_maintenance'
WHEN dha.overall_anomaly_rate > 10 THEN 'review_operating_conditions'
WHEN dha.uptime_percentage < 90 THEN 'improve_system_reliability'
ELSE 'continue_monitoring'
END as recommended_action,
-- Priority level for action
-- conditions from device_health_classification are inlined because that alias cannot be referenced in the same SELECT
CASE
WHEN dha.overall_avg_quality < 0.5 OR dha.overall_anomaly_rate > 20 THEN 'immediate'
WHEN dha.overall_avg_quality < 0.6 OR dha.overall_anomaly_rate > 10 OR (dha.good_periods::DECIMAL / dha.analysis_periods) * 100 < 80 THEN 'high'
WHEN dha.current_alert_status != 'normal' THEN 'medium'
WHEN dha.overall_avg_quality < 0.8 OR dha.overall_anomaly_rate > 5 OR (dha.good_periods::DECIMAL / dha.analysis_periods) * 100 < 90 THEN 'low'
ELSE 'routine'
END as action_priority
FROM device_health_assessment dha
ORDER BY
CASE action_priority
WHEN 'immediate' THEN 1
WHEN 'high' THEN 2
WHEN 'medium' THEN 3
WHEN 'low' THEN 4
ELSE 5
END,
dha.overall_anomaly_rate DESC,
dha.overall_avg_quality ASC;
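Under the hood, a rollup like this runs as a MongoDB aggregation pipeline that benefits from time series bucket pruning on the time predicate. The following is a minimal pymongo sketch of the native equivalent, not the exact translation QueryLeaf produces; the iot.sensor_readings namespace and field paths are illustrative assumptions:
# Minimal pymongo sketch of an hourly per-device health rollup.
# The iot.sensor_readings namespace and field paths are illustrative assumptions.
from datetime import datetime, timedelta, timezone
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
readings = client.iot.sensor_readings

cutoff = datetime.now(timezone.utc) - timedelta(days=7)
pipeline = [
    # The time predicate lets the server prune whole time series buckets
    {"$match": {"timestamp": {"$gte": cutoff}}},
    {"$group": {
        "_id": {
            "device_id": "$device_metadata.device_id",
            "hour": {"$dateTrunc": {"date": "$timestamp", "unit": "hour"}},
        },
        "avg_quality": {"$avg": "$quality_metrics.overall_score"},
        "avg_value": {"$avg": "$measurements.primary_value"},
        "readings": {"$sum": 1},
    }},
    {"$sort": {"_id.device_id": 1, "_id.hour": 1}},
]
for row in readings.aggregate(pipeline):
    print(row["_id"]["device_id"], row["_id"]["hour"], row["avg_quality"])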
-- Time series forecasting and predictive analytics
WITH historical_patterns AS (
SELECT
JSON_EXTRACT(device_metadata, '$.device_id') as device_id,
JSON_EXTRACT(sensor_info, '$.sensor_type') as sensor_type,
DATE_TRUNC('hour', timestamp) as hour_bucket,
AVG(JSON_EXTRACT(measurements, '$.primary_value')::DECIMAL) as hourly_avg_value,
COUNT(*) as readings_count,
MIN(JSON_EXTRACT(measurements, '$.primary_value')::DECIMAL) as hourly_min,
MAX(JSON_EXTRACT(measurements, '$.primary_value')::DECIMAL) as hourly_max,
-- Time-based features for forecasting
EXTRACT(HOUR FROM timestamp) as hour_of_day,
EXTRACT(DOW FROM timestamp) as day_of_week,
EXTRACT(DAY FROM timestamp) as day_of_month,
-- Seasonal indicators
CASE
WHEN EXTRACT(MONTH FROM timestamp) IN (12, 1, 2) THEN 'winter'
WHEN EXTRACT(MONTH FROM timestamp) IN (3, 4, 5) THEN 'spring'
WHEN EXTRACT(MONTH FROM timestamp) IN (6, 7, 8) THEN 'summer'
ELSE 'autumn'
END as season,
-- Operational context features
MODE() WITHIN GROUP (ORDER BY JSON_EXTRACT(context, '$.operational_context.equipment_mode')) as predominant_mode
FROM sensor_readings
WHERE
timestamp >= CURRENT_TIMESTAMP - INTERVAL '30 days'
AND JSON_EXTRACT(quality_metrics, '$.overall_score')::DECIMAL >= 0.8
GROUP BY
device_id, sensor_type, hour_bucket, hour_of_day, day_of_week, day_of_month, season
),
trend_analysis AS (
SELECT
hp.*,
-- Moving averages for trend analysis
AVG(hp.hourly_avg_value) OVER (
PARTITION BY hp.device_id, hp.sensor_type
ORDER BY hp.hour_bucket
ROWS BETWEEN 23 PRECEDING AND CURRENT ROW
) as moving_avg_24h,
AVG(hp.hourly_avg_value) OVER (
PARTITION BY hp.device_id, hp.sensor_type
ORDER BY hp.hour_bucket
ROWS BETWEEN 167 PRECEDING AND CURRENT ROW -- 7 days * 24 hours
) as moving_avg_7d,
-- Lag values for change detection
LAG(hp.hourly_avg_value, 1) OVER (
PARTITION BY hp.device_id, hp.sensor_type
ORDER BY hp.hour_bucket
) as prev_hour_value,
LAG(hp.hourly_avg_value, 24) OVER (
PARTITION BY hp.device_id, hp.sensor_type
ORDER BY hp.hour_bucket
) as prev_day_same_hour_value,
LAG(hp.hourly_avg_value, 168) OVER (
PARTITION BY hp.device_id, hp.sensor_type
ORDER BY hp.hour_bucket
) as prev_week_same_hour_value,
-- Seasonal comparison
AVG(hp.hourly_avg_value) OVER (
PARTITION BY hp.device_id, hp.sensor_type, hp.hour_of_day, hp.day_of_week
ORDER BY hp.hour_bucket
ROWS BETWEEN 4 PRECEDING AND 1 PRECEDING -- prior 4 weeks; this partition holds one row per week (same hour and weekday)
) as seasonal_baseline
FROM historical_patterns hp
),
predictive_indicators AS (
SELECT
ta.*,
-- Change calculations
COALESCE(ta.hourly_avg_value - ta.prev_hour_value, 0) as hourly_change,
COALESCE(ta.hourly_avg_value - ta.prev_day_same_hour_value, 0) as daily_change,
COALESCE(ta.hourly_avg_value - ta.prev_week_same_hour_value, 0) as weekly_change,
COALESCE(ta.hourly_avg_value - ta.seasonal_baseline, 0) as seasonal_deviation,
-- Trend direction indicators
CASE
WHEN ta.hourly_avg_value > ta.moving_avg_24h * 1.05 THEN 'upward'
WHEN ta.hourly_avg_value < ta.moving_avg_24h * 0.95 THEN 'downward'
ELSE 'stable'
END as short_term_trend,
CASE
WHEN ta.moving_avg_24h > ta.moving_avg_7d * 1.02 THEN 'increasing'
WHEN ta.moving_avg_24h < ta.moving_avg_7d * 0.98 THEN 'decreasing'
ELSE 'steady'
END as long_term_trend,
-- Volatility measures
ABS(ta.hourly_avg_value - ta.moving_avg_24h) / NULLIF(ta.moving_avg_24h, 0) as relative_volatility,
-- Anomaly scoring
CASE
WHEN ABS(ta.hourly_avg_value - ta.seasonal_baseline) > (ta.moving_avg_7d * 0.3) THEN 'high_anomaly'
WHEN ABS(ta.hourly_avg_value - ta.seasonal_baseline) > (ta.moving_avg_7d * 0.15) THEN 'moderate_anomaly'
WHEN ABS(ta.hourly_avg_value - ta.seasonal_baseline) > (ta.moving_avg_7d * 0.05) THEN 'low_anomaly'
ELSE 'normal'
END as anomaly_level
FROM trend_analysis ta
WHERE ta.hour_bucket >= CURRENT_TIMESTAMP - INTERVAL '7 days'
),
risk_assessment AS (
-- derived trend columns cannot be referenced in the SELECT that defines them,
-- so the risk score is computed in a follow-on CTE
SELECT
pid.*,
-- Predictive risk assessment
CASE
WHEN pid.short_term_trend = 'upward' AND pid.long_term_trend = 'increasing' AND pid.relative_volatility > 0.2 THEN 'high_risk'
WHEN pid.short_term_trend = 'downward' AND pid.long_term_trend = 'decreasing' AND pid.relative_volatility > 0.15 THEN 'high_risk'
WHEN pid.relative_volatility > 0.25 THEN 'moderate_risk'
WHEN pid.anomaly_level IN ('high_anomaly', 'moderate_anomaly') THEN 'moderate_risk'
ELSE 'low_risk'
END as predictive_risk_level
FROM predictive_indicators pid
)
-- Predictive analytics and forecasting results
SELECT
pi.device_id,
pi.sensor_type,
pi.hour_bucket,
-- Current values and trends
ROUND(pi.hourly_avg_value, 4) as current_value,
ROUND(pi.moving_avg_24h, 4) as trend_24h,
ROUND(pi.moving_avg_7d, 4) as trend_7d,
-- Change analysis
ROUND(pi.hourly_change, 4) as hour_to_hour_change,
ROUND(pi.daily_change, 4) as day_to_day_change,
ROUND(pi.weekly_change, 4) as week_to_week_change,
ROUND(pi.seasonal_deviation, 4) as seasonal_variance,
-- Trend classification
pi.short_term_trend,
pi.long_term_trend,
pi.anomaly_level,
pi.predictive_risk_level,
-- Risk indicators
ROUND(pi.relative_volatility * 100, 2) as volatility_percent,
-- Simple linear forecast (next hour prediction)
ROUND(
pi.hourly_avg_value +
(COALESCE(pi.hourly_change, 0) * 0.7) +
(COALESCE(pi.daily_change, 0) * 0.2) +
(COALESCE(pi.weekly_change, 0) * 0.1),
4
) as predicted_next_hour_value,
-- Confidence level for prediction
CASE
WHEN pi.relative_volatility < 0.05 AND pi.anomaly_level = 'normal' THEN 'high'
WHEN pi.relative_volatility < 0.15 AND pi.anomaly_level IN ('normal', 'low_anomaly') THEN 'medium'
WHEN pi.relative_volatility < 0.30 THEN 'low'
ELSE 'very_low'
END as prediction_confidence,
-- Maintenance and operational recommendations
CASE
WHEN pi.predictive_risk_level = 'high_risk' THEN 'schedule_immediate_inspection'
WHEN pi.anomaly_level = 'high_anomaly' THEN 'investigate_root_cause'
WHEN pi.long_term_trend = 'decreasing' AND pi.sensor_type = 'efficiency' THEN 'schedule_maintenance'
WHEN pi.relative_volatility > 0.2 THEN 'check_sensor_calibration'
WHEN pi.short_term_trend != pi.long_term_trend THEN 'monitor_closely'
ELSE 'continue_routine_monitoring'
END as maintenance_recommendation
FROM risk_assessment pi
WHERE pi.hour_bucket >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
ORDER BY
CASE pi.predictive_risk_level
WHEN 'high_risk' THEN 1
WHEN 'moderate_risk' THEN 2
ELSE 3
END,
pi.relative_volatility DESC,
pi.device_id,
pi.hour_bucket DESC;
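The moving averages and LAG() lookups in this statement correspond to MongoDB's native $setWindowFields stage (MongoDB 5.0+). A minimal pymongo sketch, assuming a pre-aggregated hourly_rollups collection shaped like the historical_patterns CTE above:
# Sketch: trailing 24-hour moving average and one-hour lag with $setWindowFields.
# Assumes an hourly_rollups collection shaped like the historical_patterns CTE.
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
rollups = client.iot.hourly_rollups

pipeline = [
    {"$setWindowFields": {
        "partitionBy": {"device_id": "$device_id", "sensor_type": "$sensor_type"},
        "sortBy": {"hour_bucket": 1},
        "output": {
            # Current row plus the 23 preceding rows = trailing 24 hours
            "moving_avg_24h": {
                "$avg": "$hourly_avg_value",
                "window": {"documents": [-23, 0]},
            },
            # Equivalent of SQL LAG(hourly_avg_value, 1)
            "prev_hour_value": {
                "$shift": {"output": "$hourly_avg_value", "by": -1, "default": None}
            },
        },
    }},
]
trended = list(rollups.aggregate(pipeline))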
-- Real-time alerting and notification system
WITH real_time_monitoring AS (
SELECT
JSON_EXTRACT(device_metadata, '$.device_id') as device_id,
JSON_EXTRACT(device_metadata, '$.device_type') as device_type,
JSON_EXTRACT(device_metadata, '$.location.facility') as facility,
JSON_EXTRACT(sensor_info, '$.sensor_type') as sensor_type,
timestamp,
JSON_EXTRACT(measurements, '$.primary_value')::DECIMAL as current_value,
JSON_EXTRACT(quality_metrics, '$.overall_score')::DECIMAL as quality_score,
-- Alert thresholds from configuration
JSON_EXTRACT(context, '$.alert_configuration.threshold_settings.critical_high')::DECIMAL as critical_high_threshold,
JSON_EXTRACT(context, '$.alert_configuration.threshold_settings.critical_low')::DECIMAL as critical_low_threshold,
JSON_EXTRACT(context, '$.alert_configuration.threshold_settings.warning_high')::DECIMAL as warning_high_threshold,
JSON_EXTRACT(context, '$.alert_configuration.threshold_settings.warning_low')::DECIMAL as warning_low_threshold,
-- Quality thresholds
JSON_EXTRACT(context, '$.alert_configuration.threshold_settings.min_quality_score')::DECIMAL as min_quality_threshold,
-- Anomaly flags
JSON_EXTRACT(quality_metrics, '$.anomaly_indicators.statistical_anomaly')::BOOLEAN as statistical_anomaly,
JSON_EXTRACT(quality_metrics, '$.anomaly_indicators.temporal_anomaly')::BOOLEAN as temporal_anomaly,
JSON_EXTRACT(quality_metrics, '$.anomaly_indicators.contextual_anomaly')::BOOLEAN as contextual_anomaly,
-- Processing performance
JSON_EXTRACT(processing_metadata, '$.processing_latency_ms')::DECIMAL as processing_latency,
JSON_EXTRACT(device_metadata, '$.network_info.network_latency')::DECIMAL as network_latency
FROM sensor_readings
WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '5 minutes'
),
alert_evaluation AS (
SELECT
rtm.*,
-- Value-based alerts
CASE
WHEN rtm.current_value >= rtm.critical_high_threshold THEN 'critical_high_value'
WHEN rtm.current_value <= rtm.critical_low_threshold THEN 'critical_low_value'
WHEN rtm.current_value >= rtm.warning_high_threshold THEN 'warning_high_value'
WHEN rtm.current_value <= rtm.warning_low_threshold THEN 'warning_low_value'
ELSE null
END as value_alert_type,
-- Quality-based alerts
CASE
WHEN rtm.quality_score < rtm.min_quality_threshold THEN 'quality_degradation'
ELSE null
END as quality_alert_type,
-- Anomaly-based alerts
CASE
WHEN rtm.statistical_anomaly = true THEN 'statistical_anomaly_detected'
WHEN rtm.temporal_anomaly = true THEN 'temporal_pattern_anomaly'
WHEN rtm.contextual_anomaly = true THEN 'contextual_anomaly_detected'
ELSE null
END as anomaly_alert_type,
-- Performance-based alerts
CASE
WHEN rtm.processing_latency > 5000 THEN 'high_processing_latency'
WHEN rtm.network_latency > 2000 THEN 'high_network_latency'
ELSE null
END as performance_alert_type,
-- Severity calculation
CASE
WHEN rtm.current_value >= rtm.critical_high_threshold OR rtm.current_value <= rtm.critical_low_threshold THEN 'critical'
WHEN rtm.quality_score < (rtm.min_quality_threshold * 0.7) THEN 'critical'
WHEN rtm.statistical_anomaly = true OR rtm.temporal_anomaly = true THEN 'high'
WHEN rtm.current_value >= rtm.warning_high_threshold OR rtm.current_value <= rtm.warning_low_threshold THEN 'medium'
WHEN rtm.quality_score < rtm.min_quality_threshold THEN 'medium'
WHEN rtm.contextual_anomaly = true OR rtm.processing_latency > 5000 THEN 'low'
ELSE null
END as alert_severity
FROM real_time_monitoring rtm
),
active_alerts AS (
SELECT
ae.device_id,
ae.device_type,
ae.facility,
ae.sensor_type,
ae.timestamp as alert_timestamp,
ae.current_value,
ae.quality_score,
-- Consolidate all alert types
COALESCE(ae.value_alert_type, ae.quality_alert_type, ae.anomaly_alert_type, ae.performance_alert_type) as alert_type,
ae.alert_severity,
-- Alert context
JSON_OBJECT(
'current_reading', ae.current_value,
'quality_score', ae.quality_score,
'thresholds', JSON_OBJECT(
'critical_high', ae.critical_high_threshold,
'critical_low', ae.critical_low_threshold,
'warning_high', ae.warning_high_threshold,
'warning_low', ae.warning_low_threshold
),
'anomaly_indicators', JSON_OBJECT(
'statistical_anomaly', ae.statistical_anomaly,
'temporal_anomaly', ae.temporal_anomaly,
'contextual_anomaly', ae.contextual_anomaly
),
'performance_metrics', JSON_OBJECT(
'processing_latency_ms', ae.processing_latency,
'network_latency_ms', ae.network_latency
)
) as alert_context,
-- Notification urgency
CASE
WHEN ae.alert_severity = 'critical' THEN 'immediate'
WHEN ae.alert_severity = 'high' THEN 'within_15_minutes'
WHEN ae.alert_severity = 'medium' THEN 'within_1_hour'
ELSE 'next_business_day'
END as notification_urgency,
-- Recommended actions
CASE
WHEN ae.value_alert_type IN ('critical_high_value', 'critical_low_value') THEN 'emergency_shutdown_consider'
WHEN ae.quality_alert_type = 'quality_degradation' THEN 'sensor_maintenance_required'
WHEN ae.anomaly_alert_type IN ('statistical_anomaly_detected', 'temporal_pattern_anomaly') THEN 'investigate_anomaly_cause'
WHEN ae.performance_alert_type = 'high_processing_latency' THEN 'check_system_resources'
WHEN ae.performance_alert_type = 'high_network_latency' THEN 'check_network_connectivity'
ELSE 'standard_investigation'
END as recommended_action
FROM alert_evaluation ae
WHERE ae.alert_severity IS NOT NULL
)
-- Active alerts requiring immediate attention
SELECT
aa.alert_timestamp,
aa.device_id,
aa.device_type,
aa.sensor_type,
aa.facility,
-- Alert details
aa.alert_type,
aa.alert_severity,
aa.notification_urgency,
aa.recommended_action,
-- Current status
aa.current_value as current_reading,
aa.quality_score as current_quality,
-- Alert context for operators
aa.alert_context,
-- Time since alert
EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - aa.alert_timestamp)) / 60 as minutes_since_alert,
-- Business impact assessment
CASE
WHEN aa.alert_severity = 'critical' AND aa.device_type = 'safety_system' THEN 'safety_risk'
WHEN aa.alert_severity = 'critical' AND aa.device_type = 'production_equipment' THEN 'production_impact'
WHEN aa.alert_severity IN ('critical', 'high') AND aa.sensor_type = 'environmental' THEN 'compliance_risk'
WHEN aa.alert_severity IN ('critical', 'high') THEN 'operational_impact'
ELSE 'monitoring_required'
END as business_impact_level,
-- Next steps for operators
JSON_OBJECT(
'immediate_action', aa.recommended_action,
'escalation_required',
CASE aa.alert_severity
WHEN 'critical' THEN true
ELSE false
END,
'estimated_resolution_time',
CASE aa.alert_type
WHEN 'quality_degradation' THEN '30-60 minutes'
WHEN 'statistical_anomaly_detected' THEN '1-4 hours'
WHEN 'critical_high_value' THEN '15-30 minutes'
WHEN 'critical_low_value' THEN '15-30 minutes'
ELSE '1-2 hours'
END,
'required_expertise',
CASE aa.alert_type
WHEN 'quality_degradation' THEN 'maintenance_technician'
WHEN 'statistical_anomaly_detected' THEN 'process_engineer'
WHEN 'high_processing_latency' THEN 'it_support'
WHEN 'high_network_latency' THEN 'network_administrator'
ELSE 'operations_supervisor'
END
) as operational_guidance
FROM active_alerts aa
ORDER BY
CASE aa.alert_severity
WHEN 'critical' THEN 1
WHEN 'high' THEN 2
WHEN 'medium' THEN 3
ELSE 4
END,
aa.alert_timestamp DESC;
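One operational caveat: change streams cannot be opened directly on a time series collection, so application-side alerting loops typically either watch a staging collection or poll a recent window, as this query's 5-minute filter suggests. A minimal polling sketch, with field paths and threshold layout assumed to match the documents used throughout this article:
# Sketch: poll the most recent five minutes and evaluate value thresholds.
from datetime import datetime, timedelta, timezone
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
readings = client.iot.sensor_readings

window_start = datetime.now(timezone.utc) - timedelta(minutes=5)
for doc in readings.find({"timestamp": {"$gte": window_start}}):
    value = doc["measurements"]["primary_value"]
    thresholds = doc["context"]["alert_configuration"]["threshold_settings"]
    if value >= thresholds["critical_high"] or value <= thresholds["critical_low"]:
        severity = "critical"
    elif value >= thresholds["warning_high"] or value <= thresholds["warning_low"]:
        severity = "medium"
    else:
        continue
    # Hand off to a notifier (queue, pager, webhook) in a real system
    print(doc["device_metadata"]["device_id"], severity, value)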
-- QueryLeaf provides comprehensive IoT time series capabilities:
-- 1. SQL-familiar time series collection creation and optimization
-- 2. High-performance IoT data ingestion with automatic bucketing
-- 3. Real-time analytics and aggregation for sensor data
-- 4. Predictive analytics and trend analysis
-- 5. Comprehensive anomaly detection and alerting
-- 6. Performance monitoring and health assessment
-- 7. Integration with MongoDB's native time series optimizations
-- 8. Enterprise-ready IoT data management with familiar SQL syntax
-- 9. Automatic data lifecycle management and archiving
-- 10. Production-ready scalability for high-volume IoT workloads
Best Practices for Time Series Implementation
IoT Data Architecture and Performance Optimization
Essential principles for effective MongoDB Time Series deployment in IoT environments:
- Collection Design: Create purpose-built time series collections with optimal bucketing strategies for different sensor types and data frequencies (see the sketch after this list)
- Metadata Strategy: Design comprehensive metadata schemas that enable efficient filtering and provide rich context for analytics
- Ingestion Optimization: Implement batch ingestion patterns and write concern configurations optimized for IoT write workloads
- Query Patterns: Design aggregation pipelines that leverage time series optimizations for common IoT analytics patterns
- Real-Time Processing: Implement change streams and real-time processing pipelines for immediate anomaly detection and alerting
- Data Lifecycle: Establish automated data retention and archiving strategies to manage long-term storage costs
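As a concrete starting point for the first three items, the following minimal pymongo sketch creates a time series collection with a metadata field and a retention policy, then performs an unordered batch insert; the namespace, granularity, and field names are illustrative choices:
# Sketch: create a time series collection with retention, then batch-insert readings.
from datetime import datetime, timezone
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
db = client.iot

db.create_collection(
    "sensor_readings",
    timeseries={
        "timeField": "timestamp",        # required time-stamped field
        "metaField": "device_metadata",  # co-locates readings from one source
        "granularity": "seconds",        # bucketing hint matching ingest rate
    },
    expireAfterSeconds=90 * 24 * 60 * 60,  # automatic 90-day retention
)

# Unordered bulk inserts maximize throughput for write-heavy IoT ingestion
batch = [
    {
        "timestamp": datetime.now(timezone.utc),
        "device_metadata": {"device_id": f"sensor-{i:03d}", "device_type": "thermometer"},
        "measurements": {"primary_value": 21.5 + i * 0.1},
    }
    for i in range(100)
]
db.sensor_readings.insert_many(batch, ordered=False)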
Production IoT Systems and Operational Excellence
Design time series systems for enterprise IoT requirements:
- Scalable Architecture: Implement horizontally scalable time series infrastructure with proper sharding and distribution strategies
- Performance Monitoring: Establish comprehensive monitoring for write performance, query latency, and storage utilization (see the sketch after this list)
- Alert Management: Create intelligent alerting systems that reduce noise while ensuring critical issues are detected promptly
- Edge Integration: Design systems that work efficiently with edge computing environments and intermittent connectivity
- Security Implementation: Implement device authentication, data encryption, and access controls appropriate for IoT environments
- Compliance Features: Build in data governance, audit trails, and regulatory compliance capabilities for industrial applications
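For the performance monitoring item above, storage utilization and compression effectiveness can be checked with the $collStats aggregation stage; a minimal sketch against the assumed iot.sensor_readings namespace:
# Sketch: inspect logical size versus on-disk (compressed) size for a collection.
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
readings = client.iot.sensor_readings

stats = next(readings.aggregate([{"$collStats": {"storageStats": {}}}]))
storage = stats["storageStats"]
print("logical bytes:", storage["size"])
print("bytes on disk (after compression):", storage["storageSize"])
print("average object size:", storage.get("avgObjSize"))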
Conclusion
MongoDB Time Series Collections provide comprehensive IoT data management capabilities that eliminate the complexity of traditional time-based partitioning and manual optimization through automatic bucketing, intelligent compression, and purpose-built query optimization. The native support for high-volume writes, real-time aggregations, and time-based analytics makes Time Series Collections ideal for modern IoT applications requiring both scale and performance.
Key benefits of Time Series Collections include:
- Automatic Optimization: Native bucketing and compression eliminate manual partitioning and maintenance overhead
- High-Performance Writes: Optimized storage engine designed for high-volume, time-stamped data ingestion
- Intelligent Compression: Automatic compression reduces storage costs by up to 90% compared to traditional approaches
- Real-Time Analytics: Built-in aggregation optimization for time-based queries and real-time processing
- Flexible Data Models: Rich document structure accommodates complex IoT metadata alongside time series measurements
- SQL Accessibility: Familiar SQL-style time series operations through QueryLeaf for accessible IoT data management
Whether you're building industrial monitoring systems, smart city infrastructure, environmental sensors, or enterprise IoT platforms, MongoDB Time Series Collections with QueryLeaf's familiar SQL interface provides the foundation for scalable, efficient IoT data management.
QueryLeaf Integration: QueryLeaf seamlessly manages MongoDB Time Series Collections while providing SQL-familiar syntax for time series data operations, real-time analytics, and IoT-specific query patterns. Advanced time series capabilities including automatic bucketing, predictive analytics, and enterprise alerting are elegantly handled through familiar SQL constructs, making sophisticated IoT data management both powerful and accessible to SQL-oriented development teams.
The combination of MongoDB's robust time series capabilities with SQL-style data operations makes it an ideal platform for applications requiring both high-performance IoT data storage and familiar database interaction patterns, ensuring your time series infrastructure can scale efficiently while maintaining operational simplicity and developer productivity.