MongoDB Time-Series Data Management and IoT Analytics: Advanced Optimization Patterns for Real-Time Sensor Data Processing
Internet of Things (IoT) applications generate massive volumes of time-stamped sensor data that require specialized database architectures capable of handling high-velocity data ingestion, efficient storage optimization, and real-time analytical processing. Traditional relational databases struggle with the unique characteristics of time-series data: irregular intervals, high write throughput, temporal query patterns, and the need for real-time aggregation across millions of data points from distributed sensors and devices.
MongoDB's time-series collections are purpose-built for this kind of workload. They improve storage efficiency and query performance through automatic bucketing (readings that share the same metadata and fall close together in time are stored together), compression of the bucketed data, and indexing suited to time-range access. Rather than treating each reading as an independent row in a regular table, MongoDB's native time-series storage is designed around temporal access patterns, which makes it a good fit for IoT sensor data and real-time analytics.
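To make the bucketing idea more concrete, here is a minimal sketch in plain JavaScript that needs no database. It groups readings by device and hour so that device metadata is kept once per bucket instead of once per reading. It is a conceptual illustration only: MongoDB's internal bucket format differs, and `bucketReadings`, the sample readings, and the one-hour window are assumptions made for this example.

```javascript
// Conceptual sketch only: group readings into per-device, per-window buckets so
// device metadata is stored once per bucket rather than once per reading.
// `bucketReadings` is a hypothetical helper, not part of any MongoDB API.
function bucketReadings(readings, windowMs = 60 * 60 * 1000) {
  const buckets = new Map();

  for (const { device, timestamp, ...fields } of readings) {
    // Align the reading to the start of its time window (UTC-based epoch math)
    const windowStart = Math.floor(timestamp.getTime() / windowMs) * windowMs;
    const key = `${device.device_id}|${windowStart}`;

    if (!buckets.has(key)) {
      buckets.set(key, {
        meta: device,
        windowStart: new Date(windowStart),
        timestamps: [],
        columns: {}
      });
    }

    // Store measurements column by column, which tends to compress well
    const bucket = buckets.get(key);
    bucket.timestamps.push(timestamp);
    for (const [name, value] of Object.entries(fields)) {
      if (!bucket.columns[name]) bucket.columns[name] = [];
      bucket.columns[name].push(value);
    }
  }

  return [...buckets.values()];
}

// Made-up sample data for illustration
const sampleReadings = [
  { device: { device_id: 'sensor-1' }, timestamp: new Date('2024-12-01T10:00:00Z'), temperature: 22.5 },
  { device: { device_id: 'sensor-1' }, timestamp: new Date('2024-12-01T10:00:30Z'), temperature: 22.6 },
  { device: { device_id: 'sensor-1' }, timestamp: new Date('2024-12-01T11:00:00Z'), temperature: 22.4 },
  { device: { device_id: 'sensor-2' }, timestamp: new Date('2024-12-01T10:00:10Z'), temperature: 19.8 }
];

const sampleBuckets = bucketReadings(sampleReadings);
console.log(`${sampleReadings.length} readings stored in ${sampleBuckets.length} buckets`);
```

The point of the sketch is the shape of the result: metadata appears once per bucket, and each measurement becomes an array of similar values, which is what makes compression effective.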
The Traditional IoT Data Management Challenge
Conventional relational database approaches for IoT time-series data face significant performance and scalability limitations:
-- Traditional PostgreSQL approach for IoT sensor data - inefficient and problematic
-- Typical IoT sensor data table with poor time-series optimization
CREATE TABLE sensor_readings (
reading_id BIGSERIAL PRIMARY KEY,
device_id VARCHAR(100) NOT NULL,
sensor_type VARCHAR(50) NOT NULL,
location VARCHAR(200) NOT NULL,
timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
-- Multiple sensor measurements (inefficient denormalized wide-row structure)
temperature DECIMAL(5,2),
humidity DECIMAL(5,2),
pressure DECIMAL(7,2),
voltage DECIMAL(4,2),
current DECIMAL(6,3),
power_consumption DECIMAL(8,2),
-- Device metadata (repetitive storage)
device_model VARCHAR(100),
firmware_version VARCHAR(50),
installation_date DATE,
-- Location data (redundant storage)
building_id VARCHAR(50),
floor_number INTEGER,
room_number VARCHAR(20),
latitude DECIMAL(10,8),
longitude DECIMAL(11,8),
-- Quality and status indicators
signal_strength INTEGER,
battery_level DECIMAL(5,2),
reading_quality VARCHAR(20),
data_source VARCHAR(50),
-- Audit and tracking fields
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
processed_at TIMESTAMP,
validation_status VARCHAR(20) DEFAULT 'pending'
);
-- Attempt at optimization with basic indexes (insufficient for time-series workloads)
CREATE INDEX sensor_readings_device_time_idx ON sensor_readings (device_id, timestamp DESC);
CREATE INDEX sensor_readings_timestamp_idx ON sensor_readings (timestamp DESC);
CREATE INDEX sensor_readings_sensor_type_idx ON sensor_readings (sensor_type, timestamp DESC);
CREATE INDEX sensor_readings_location_idx ON sensor_readings (building_id, floor_number, timestamp DESC);
-- Example of problematic IoT data insertion (individual row inserts are slow)
INSERT INTO sensor_readings (
device_id, sensor_type, location, timestamp,
temperature, humidity, pressure, voltage, current, power_consumption,
device_model, firmware_version, installation_date,
building_id, floor_number, room_number, latitude, longitude,
signal_strength, battery_level, reading_quality, data_source
) VALUES (
'TEMP_SENSOR_001_FLOOR_1_ROOM_A',
'Environmental Sensor',
'Building A - Floor 1 - Room A - North Wall',
CURRENT_TIMESTAMP,
22.5, 65.3, 1013.25, 3.3, 0.125, 0.4125,
'SensorTech ST-2000 Environmental Monitor',
'v2.1.5',
'2024-01-15',
'BUILDING_A_MAIN_CAMPUS',
1,
'A101',
40.7128,
-74.0060,
85,
94.2,
'excellent',
'real_time_telemetry'
);
-- This approach requires individual inserts for each sensor reading:
-- - Device TEMP_SENSOR_001 sends reading every 30 seconds = 2,880 inserts/day
-- - Device HUMIDITY_SENSOR_002 sends reading every 60 seconds = 1,440 inserts/day
-- - Device PRESSURE_SENSOR_003 sends reading every 15 seconds = 5,760 inserts/day
-- - For 1,000 devices = millions of individual INSERT operations per day
-- Example of inefficient time-series queries
-- Get hourly averages for temperature sensors (slow aggregation)
SELECT
device_id,
DATE_TRUNC('hour', timestamp) as hour_bucket,
-- Basic aggregations (computationally expensive)
AVG(temperature) as avg_temperature,
MIN(temperature) as min_temperature,
MAX(temperature) as max_temperature,
STDDEV(temperature) as temperature_stddev,
-- Count and quality metrics
COUNT(*) as reading_count,
COUNT(*) FILTER (WHERE reading_quality = 'excellent') as excellent_readings,
AVG(signal_strength) as avg_signal_strength,
-- Power consumption analysis
AVG(power_consumption) as avg_power,
SUM(power_consumption) as total_power
FROM sensor_readings
WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
AND sensor_type = 'Environmental Sensor'
AND temperature IS NOT NULL
GROUP BY device_id, DATE_TRUNC('hour', timestamp)
ORDER BY device_id, hour_bucket DESC;
-- Problems with traditional time-series queries:
-- 1. Full table scans despite indexes when dealing with large time ranges
-- 2. Expensive GROUP BY operations on timestamp truncation
-- 3. Multiple aggregation calculations are computationally intensive
-- 4. Poor performance with concurrent analytical queries
-- 5. No pre-aggregated rollups unless materialized views are built and refreshed manually
-- 6. Inefficient storage with repeated device metadata in every row
-- Attempting to get device performance trends (extremely slow)
WITH daily_device_metrics AS (
SELECT
device_id,
DATE_TRUNC('day', timestamp) as date_bucket,
-- Performance indicators
AVG(temperature) as avg_temp,
AVG(humidity) as avg_humidity,
AVG(pressure) as avg_pressure,
AVG(power_consumption) as avg_power,
AVG(battery_level) as avg_battery,
AVG(signal_strength) as avg_signal,
-- Operational metrics
COUNT(*) as total_readings,
COUNT(*) FILTER (WHERE reading_quality = 'poor') as poor_quality_readings,
-- Calculate uptime percentage
(COUNT(*)::DECIMAL / (24 * 60 * 2)) * 100 as uptime_percentage, -- Assuming 30-second intervals
-- Data quality assessment
(COUNT(*) FILTER (WHERE reading_quality IN ('excellent', 'good'))::DECIMAL / COUNT(*)) * 100 as quality_percentage
FROM sensor_readings
WHERE timestamp >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY device_id, DATE_TRUNC('day', timestamp)
),
device_trend_analysis AS (
SELECT
device_id,
-- Performance trends using regression aggregates (expensive on large inputs)
AVG(avg_temp) as overall_avg_temp,
STDDEV(avg_temp) as temp_stability,
-- Power consumption trends
AVG(avg_power) as overall_avg_power,
-- Calculate linear regression slope for temperature trend.
-- Epoch seconds are divided by 86400 so slopes are expressed per day, not per second.
REGR_SLOPE(avg_temp, EXTRACT(EPOCH FROM date_bucket) / 86400.0) as temp_trend_slope,
REGR_R2(avg_temp, EXTRACT(EPOCH FROM date_bucket) / 86400.0) as temp_trend_r2,
-- Battery degradation analysis (percentage points per day)
REGR_SLOPE(avg_battery, EXTRACT(EPOCH FROM date_bucket) / 86400.0) as battery_degradation_slope,
-- Reliability metrics
AVG(uptime_percentage) as avg_uptime,
MIN(quality_percentage) as min_quality_percentage,
-- Trend classification
CASE
WHEN REGR_SLOPE(avg_temp, EXTRACT(EPOCH FROM date_bucket) / 86400.0) > 0.01 THEN 'temperature_increasing'
WHEN REGR_SLOPE(avg_temp, EXTRACT(EPOCH FROM date_bucket) / 86400.0) < -0.01 THEN 'temperature_decreasing'
ELSE 'temperature_stable'
END as temperature_trend,
CASE
WHEN REGR_SLOPE(avg_battery, EXTRACT(EPOCH FROM date_bucket) / 86400.0) < -0.1 THEN 'battery_degrading'
WHEN REGR_SLOPE(avg_battery, EXTRACT(EPOCH FROM date_bucket) / 86400.0) > 0.1 THEN 'battery_improving'
ELSE 'battery_stable'
END as battery_trend
FROM daily_device_metrics
GROUP BY device_id
)
-- This query would take minutes or hours on large datasets
SELECT
device_id,
overall_avg_temp,
temp_stability,
overall_avg_power,
temp_trend_slope,
battery_degradation_slope,
avg_uptime,
temperature_trend,
battery_trend,
-- Device health scoring
CASE
WHEN avg_uptime > 95 AND min_quality_percentage > 90 AND battery_degradation_slope > -0.05 THEN 'excellent'
WHEN avg_uptime > 90 AND min_quality_percentage > 80 AND battery_degradation_slope > -0.1 THEN 'good'
WHEN avg_uptime > 80 AND min_quality_percentage > 70 THEN 'fair'
ELSE 'poor'
END as device_health_grade
FROM device_trend_analysis
ORDER BY overall_avg_power DESC;
-- Storage analysis showing massive inefficiency
SELECT
schemaname,
relname as tablename, -- pg_stat_user_tables exposes the table name as relname
-- Table size metrics showing bloat
pg_size_pretty(pg_total_relation_size(relid)) as total_size,
pg_size_pretty(pg_relation_size(relid)) as table_size,
pg_size_pretty(pg_indexes_size(relid)) as indexes_size,
-- Row statistics (n_live_tup is an estimate of live rows; n_tup_ins counts every insert ever made)
n_live_tup as total_rows,
CASE
WHEN n_live_tup > 0 THEN
pg_relation_size(relid) / n_live_tup
ELSE 0
END as avg_row_size_bytes,
-- Storage footprint in GB (inflated by repetitive data)
ROUND((pg_relation_size(relid)::DECIMAL / 1024 / 1024 / 1024), 2) as storage_size_gb
FROM pg_stat_user_tables
WHERE relname = 'sensor_readings'
ORDER BY pg_total_relation_size(relid) DESC;
-- Traditional approach problems:
-- 1. Massive storage bloat due to repetitive device metadata in every row
-- 2. Poor compression due to mixed data types and variable-length fields
-- 3. Inefficient indexing strategies that don't leverage time-series patterns
-- 4. Slow aggregation queries that require full scans and expensive calculations
-- 5. No built-in support for time-based data retention and archival
-- 6. Complex partitioning strategies required for reasonable performance
-- 7. Poor concurrent read/write performance for mixed analytical and operational workloads
-- 8. Difficult real-time analytics due to lack of incremental aggregation support
-- 9. No specialized compression for time-series data patterns
-- 10. Manual optimization required for downsampling and rollup operations
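-- PostgreSQL time partitioning attempt (complex and limited)
-- Manual partitioning by month (operational overhead).
-- This only works if sensor_readings is declared with PARTITION BY RANGE (timestamp)
-- and its primary key includes the timestamp column.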
CREATE TABLE sensor_readings_2024_12 PARTITION OF sensor_readings
FOR VALUES FROM ('2024-12-01') TO ('2025-01-01');
CREATE TABLE sensor_readings_2024_11 PARTITION OF sensor_readings
FOR VALUES FROM ('2024-11-01') TO ('2024-12-01');
-- Even with partitioning:
-- 1. Manual partition management overhead
-- 2. Limited optimization for time-series specific operations
-- 3. No automatic data retention and archival policies
-- 4. Complex cross-partition analytics and aggregation
-- 5. Poor storage compression compared to purpose-built time-series systems
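The ingestion figures quoted in the SQL comments above follow from simple arithmetic, and it can help to see how batching changes the number of write round trips. The sketch below reproduces those per-device numbers and then estimates writes per day for 1,000 devices. The helper names and the batch size of 500 are assumptions chosen for illustration, not values from any particular system.

```javascript
const SECONDS_PER_DAY = 24 * 60 * 60;

// Readings produced per day by `deviceCount` devices reporting every `intervalSeconds`.
function readingsPerDay(intervalSeconds, deviceCount = 1) {
  return Math.floor(SECONDS_PER_DAY / intervalSeconds) * deviceCount;
}

// Number of write round trips when readings are grouped into batches of `batchSize`.
function writeOperationsPerDay(totalReadings, batchSize = 1) {
  return Math.ceil(totalReadings / batchSize);
}

// Per-device volumes for the three sampling intervals mentioned earlier
const perDevice = {
  every30s: readingsPerDay(30),
  every60s: readingsPerDay(60),
  every15s: readingsPerDay(15)
};

// 1,000 devices at a 30-second interval, written one at a time vs. in batches of 500
const fleetReadings = readingsPerDay(30, 1000);
const fleetSingleWrites = writeOperationsPerDay(fleetReadings);
const fleetBatchedWrites = writeOperationsPerDay(fleetReadings, 500); // hypothetical batch size

console.log(perDevice);
console.log({ fleetReadings, fleetSingleWrites, fleetBatchedWrites });
```

The number of readings is the same either way; only the number of round trips changes, which is why batching matters for both the relational approach and the document-based one.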
MongoDB provides powerful time-series collection capabilities designed specifically for IoT data management:
// MongoDB Advanced Time-Series Data Management for IoT Analytics
const { MongoClient, ObjectId } = require('mongodb');
// Comprehensive IoT Time-Series Data Management System
class MongoDBIoTTimeSeriesManager {
constructor(connectionString, config = {}) {
this.client = new MongoClient(connectionString);
this.db = null;
this.timeSeriesCollections = new Map();
// Advanced time-series configuration for IoT workloads
this.config = {
// Time-series collection optimization
timeSeries: {
metaField: 'device', // Device metadata field
timeField: 'timestamp', // Time field
granularity: 'seconds', // Data granularity: seconds, minutes, hours
enableOptimization: true,
enableCompression: true
},
// IoT-specific optimization settings
iotOptimization: {
enableDeviceBatching: config.enableDeviceBatching !== false,
enableSensorGrouping: config.enableSensorGrouping !== false,
enableDataValidation: config.enableDataValidation !== false,
enableRealTimeAggregation: config.enableRealTimeAggregation !== false,
enableAnomalyDetection: config.enableAnomalyDetection || false
},
// Data retention and archival
dataRetention: {
enableAutoArchival: config.enableAutoArchival || false,
hotDataRetentionDays: config.hotDataRetentionDays || 30,
warmDataRetentionDays: config.warmDataRetentionDays || 90,
coldDataRetentionDays: config.coldDataRetentionDays || 365,
enableCompression: config.enableRetentionCompression !== false
},
// Performance and monitoring
performance: {
enableMetrics: config.enableMetrics !== false,
enableAlerts: config.enableAlerts || false,
enablePerformanceOptimization: config.enablePerformanceOptimization !== false,
maxConcurrentReads: config.maxConcurrentReads || 10,
maxConcurrentWrites: config.maxConcurrentWrites || 5
},
// Real-time analytics
analytics: {
enableRealTimeAggregation: config.enableRealTimeAggregation !== false,
enableTrendAnalysis: config.enableTrendAnalysis || false,
enablePredictiveAnalytics: config.enablePredictiveAnalytics || false,
aggregationWindowSizes: config.aggregationWindowSizes || ['1m', '5m', '15m', '1h', '1d']
}
};
this.deviceRegistry = new Map();
this.metricsCollector = null;
this.alertManager = null;
}
async initialize() {
console.log('Initializing MongoDB IoT Time-Series management system...');
try {
await this.client.connect();
this.db = this.client.db('iot_time_series_platform');
// Setup time-series collections for different sensor types
await this.setupTimeSeriesCollections();
// Initialize device registry and metadata management
await this.setupDeviceManagement();
// Setup real-time aggregation pipelines
await this.setupRealTimeAggregation();
// Initialize monitoring and alerting
await this.setupMonitoringAndAlerting();
console.log('IoT Time-Series system initialized successfully');
} catch (error) {
console.error('Error initializing IoT Time-Series system:', error);
throw error;
}
}
async setupTimeSeriesCollections() {
console.log('Setting up optimized time-series collections for IoT data...');
try {
// Environmental sensor data collection
await this.createTimeSeriesCollection('environmental_sensors', {
metaField: 'device',
timeField: 'timestamp',
granularity: 'seconds'
});
// Power monitoring collection
await this.createTimeSeriesCollection('power_monitoring', {
metaField: 'device',
timeField: 'timestamp',
granularity: 'seconds'
});
// Network performance collection
await this.createTimeSeriesCollection('network_performance', {
metaField: 'device',
timeField: 'timestamp',
granularity: 'seconds'
});
// Device health and diagnostics collection
await this.createTimeSeriesCollection('device_diagnostics', {
metaField: 'device',
timeField: 'timestamp',
granularity: 'minutes'
});
// Security events collection
await this.createTimeSeriesCollection('security_events', {
metaField: 'device',
timeField: 'timestamp',
granularity: 'seconds'
});
} catch (error) {
console.error('Error setting up time-series collections:', error);
throw error;
}
}
async createTimeSeriesCollection(collectionName, timeSeriesOptions) {
console.log(`Creating optimized time-series collection: ${collectionName}`);
try {
// Create time-series collection with MongoDB native optimization
await this.db.createCollection(collectionName, {
timeseries: {
timeField: timeSeriesOptions.timeField,
metaField: timeSeriesOptions.metaField,
granularity: timeSeriesOptions.granularity
},
// zstd block compression (already the default for time-series collections; set explicitly here)
storageEngine: {
wiredTiger: {
configString: 'block_compressor=zstd'
}
}
});
const collection = this.db.collection(collectionName);
// Create optimized indexes for time-series access patterns
await collection.createIndex({
[timeSeriesOptions.metaField + '.device_id']: 1,
[timeSeriesOptions.timeField]: -1
});
await collection.createIndex({
[timeSeriesOptions.metaField + '.location']: 1,
[timeSeriesOptions.timeField]: -1
});
await collection.createIndex({
[timeSeriesOptions.metaField + '.sensor_type']: 1,
[timeSeriesOptions.timeField]: -1
});
// Store collection reference
this.timeSeriesCollections.set(collectionName, {
collection: collection,
options: timeSeriesOptions
});
console.log(`Time-series collection ${collectionName} created successfully`);
} catch (error) {
console.error(`Error creating time-series collection ${collectionName}:`, error);
throw error;
}
}
async setupDeviceManagement() {
console.log('Setting up device registry and metadata management...');
const deviceRegistry = this.db.collection('device_registry');
// Create indexes for device management
await deviceRegistry.createIndex({ device_id: 1 }, { unique: true });
await deviceRegistry.createIndex({ location: 1 });
await deviceRegistry.createIndex({ sensor_type: 1 });
await deviceRegistry.createIndex({ status: 1, last_seen: -1 });
// Create device groups collection for hierarchical organization
const deviceGroups = this.db.collection('device_groups');
await deviceGroups.createIndex({ group_id: 1 }, { unique: true });
await deviceGroups.createIndex({ parent_group: 1 });
}
async registerIoTDevice(deviceData) {
console.log(`Registering IoT device: ${deviceData.device_id}`);
try {
const deviceRegistry = this.db.collection('device_registry');
// Check for existing device
const existingDevice = await deviceRegistry.findOne({ device_id: deviceData.device_id });
if (existingDevice) {
throw new Error(`Device already registered: ${deviceData.device_id}`);
}
const deviceRegistration = {
device_id: deviceData.device_id,
device_name: deviceData.device_name,
sensor_type: deviceData.sensor_type,
// Location and installation information
location: {
building: deviceData.building,
floor: deviceData.floor,
room: deviceData.room,
coordinates: {
latitude: deviceData.latitude || null,
longitude: deviceData.longitude || null
},
zone: deviceData.zone || null
},
// Technical specifications
specifications: {
model: deviceData.model,
manufacturer: deviceData.manufacturer,
firmware_version: deviceData.firmware_version,
hardware_revision: deviceData.hardware_revision,
communication_protocol: deviceData.communication_protocol || 'mqtt',
power_source: deviceData.power_source || 'battery',
sampling_rate: deviceData.sampling_rate || 60 // seconds
},
// Operational configuration
configuration: {
data_retention_days: deviceData.data_retention_days || 365,
alert_thresholds: deviceData.alert_thresholds || {},
measurement_units: deviceData.measurement_units || {},
calibration_data: deviceData.calibration_data || {},
quality_settings: deviceData.quality_settings || {}
},
// Status and monitoring
status: 'active',
health_status: 'unknown',
last_seen: null,
last_reading: null,
// Registration metadata
registered_at: new Date(),
updated_at: new Date(),
registered_by: deviceData.registered_by || 'system'
};
const result = await deviceRegistry.insertOne(deviceRegistration);
// Add to in-memory registry for quick access
this.deviceRegistry.set(deviceData.device_id, deviceRegistration);
console.log(`Device ${deviceData.device_id} registered successfully`);
return {
success: true,
device_id: deviceData.device_id,
registration_id: result.insertedId
};
} catch (error) {
console.error(`Error registering device ${deviceData.device_id}:`, error);
throw error;
}
}
async insertSensorData(collectionName, deviceId, sensorData, options = {}) {
console.log(`Inserting sensor data for device ${deviceId} into ${collectionName}`);
try {
const timeSeriesInfo = this.timeSeriesCollections.get(collectionName);
if (!timeSeriesInfo) {
throw new Error(`Time-series collection not found: ${collectionName}`);
}
const collection = timeSeriesInfo.collection;
// Get device metadata
const deviceInfo = await this.getDeviceInfo(deviceId);
if (!deviceInfo) {
throw new Error(`Device not registered: ${deviceId}`);
}
// Prepare optimized time-series document
const timeSeriesDocument = {
timestamp: sensorData.timestamp || new Date(),
// Device metadata (stored efficiently in metaField)
device: {
device_id: deviceId,
sensor_type: deviceInfo.sensor_type,
location: deviceInfo.location,
model: deviceInfo.specifications.model
},
// Sensor measurements (optimized structure)
measurements: this.optimizeMeasurements(sensorData.measurements),
// Data quality indicators
quality: {
signal_strength: sensorData.signal_strength,
battery_level: sensorData.battery_level,
reading_quality: sensorData.reading_quality || 'good',
data_source: sensorData.data_source || 'sensor'
},
// Operational metadata
metadata: {
firmware_version: deviceInfo.specifications.firmware_version,
sampling_interval: deviceInfo.specifications.sampling_rate,
processing_flags: sensorData.processing_flags || []
}
};
// Apply data validation if enabled
if (this.config.iotOptimization.enableDataValidation) {
await this.validateSensorData(deviceId, timeSeriesDocument);
}
// Single-document insert; for high-volume ingestion, batching readings with insertMany reduces round trips
const result = await collection.insertOne(timeSeriesDocument, {
writeConcern: { w: 'majority', j: true }
});
// Update device last seen timestamp
await this.updateDeviceLastSeen(deviceId);
// Trigger real-time aggregation if enabled
if (this.config.analytics.enableRealTimeAggregation) {
await this.triggerRealTimeAggregation(collectionName, deviceId, timeSeriesDocument);
}
return {
success: true,
inserted_id: result.insertedId,
collection: collectionName,
device_id: deviceId,
timestamp: timeSeriesDocument.timestamp
};
} catch (error) {
console.error(`Error inserting sensor data for device ${deviceId}:`, error);
throw error;
}
}
optimizeMeasurements(measurements) {
if (!measurements || typeof measurements !== 'object') {
return measurements;
}
// Optimize measurement structure for compression and query performance
const optimized = {};
Object.entries(measurements).forEach(([key, value]) => {
if (typeof value === 'number') {
// Round to reasonable precision to improve compression
optimized[key] = Math.round(value * 100) / 100;
} else {
optimized[key] = value;
}
});
return optimized;
}
async validateSensorData(deviceId, document) {
const deviceInfo = this.deviceRegistry.get(deviceId);
if (!deviceInfo) return;
// Apply device-specific validation rules
const thresholds = deviceInfo.configuration.alert_thresholds;
// Guard against readings that arrive without a measurements object
Object.entries(document.measurements || {}).forEach(([measurement, value]) => {
if (thresholds[measurement]) {
const threshold = thresholds[measurement];
if (value < threshold.min || value > threshold.max) {
// Log anomaly but don't reject data
console.warn(`Measurement ${measurement} out of range for device ${deviceId}: ${value}`);
document.quality.anomaly_detected = true;
document.quality.anomaly_type = `${measurement}_out_of_range`;
}
}
});
}
async getDeviceInfo(deviceId) {
if (this.deviceRegistry.has(deviceId)) {
return this.deviceRegistry.get(deviceId);
}
const deviceRegistry = this.db.collection('device_registry');
const device = await deviceRegistry.findOne({ device_id: deviceId });
if (device) {
this.deviceRegistry.set(deviceId, device);
}
return device;
}
async updateDeviceLastSeen(deviceId) {
try {
const deviceRegistry = this.db.collection('device_registry');
await deviceRegistry.updateOne(
{ device_id: deviceId },
{
$set: {
last_seen: new Date(),
updated_at: new Date()
}
}
);
// Update in-memory cache
if (this.deviceRegistry.has(deviceId)) {
const device = this.deviceRegistry.get(deviceId);
device.last_seen = new Date();
device.updated_at = new Date();
}
} catch (error) {
console.error(`Error updating last seen for device ${deviceId}:`, error);
// Don't throw - last seen updates shouldn't break data insertion
}
}
async queryTimeSeriesData(collectionName, query = {}, options = {}) {
console.log(`Querying time-series data from ${collectionName}`);
try {
const timeSeriesInfo = this.timeSeriesCollections.get(collectionName);
if (!timeSeriesInfo) {
throw new Error(`Time-series collection not found: ${collectionName}`);
}
const collection = timeSeriesInfo.collection;
const startTime = Date.now();
// Apply time-series optimized query patterns
const optimizedQuery = this.optimizeTimeSeriesQuery(query);
const optimizedOptions = this.optimizeQueryOptions(options);
const results = await collection.find(optimizedQuery, optimizedOptions).toArray();
const queryTime = Date.now() - startTime;
console.log(`Time-series query completed in ${queryTime}ms, returned ${results.length} documents`);
return {
data: results,
metadata: {
collection: collectionName,
query_time_ms: queryTime,
document_count: results.length,
query: optimizedQuery
}
};
} catch (error) {
console.error(`Error querying time-series data from ${collectionName}:`, error);
throw error;
}
}
optimizeTimeSeriesQuery(query) {
const optimized = { ...query };
// Optimize time range queries for time-series performance
if (query.timestamp) {
optimized.timestamp = query.timestamp;
} else if (query.timeRange) {
optimized.timestamp = {
$gte: new Date(query.timeRange.start),
$lte: new Date(query.timeRange.end)
};
delete optimized.timeRange;
}
// Optimize device filtering for metadata field
if (query.device_id) {
optimized['device.device_id'] = query.device_id;
delete optimized.device_id;
}
if (query.sensor_type) {
optimized['device.sensor_type'] = query.sensor_type;
delete optimized.sensor_type;
}
return optimized;
}
optimizeQueryOptions(options) {
const optimized = {
...options,
// Default sort by timestamp descending for time-series efficiency
sort: options.sort || { timestamp: -1 },
// Limit results for performance unless explicitly requested
limit: options.limit || 1000
};
return optimized;
}
// Advanced time-series aggregation operations
async generateDeviceAnalytics(deviceId, timeRange, granularity = '1h') {
console.log(`Generating analytics for device ${deviceId} with ${granularity} granularity`);
try {
const pipeline = [
{
$match: {
'device.device_id': deviceId,
timestamp: {
$gte: new Date(timeRange.start),
$lte: new Date(timeRange.end)
}
}
},
{
$group: {
_id: {
$dateTrunc: {
date: '$timestamp',
unit: this.getTimeUnit(granularity),
binSize: this.getBinSize(granularity)
}
},
// Basic statistics
reading_count: { $sum: 1 },
avg_signal_strength: { $avg: '$quality.signal_strength' },
avg_battery_level: { $avg: '$quality.battery_level' },
// Measurement aggregations (dynamic based on sensor type)
measurements: {
$push: '$measurements'
}
}
},
{
$addFields: {
timestamp: '$_id',
// Calculate measurement statistics
measurement_stats: {
$reduce: {
input: '$measurements',
initialValue: {},
in: this.createMeasurementReducer()
}
}
}
},
{
$project: {
_id: 0,
timestamp: 1,
reading_count: 1,
avg_signal_strength: 1,
avg_battery_level: 1,
measurement_stats: 1
}
},
{
$sort: { timestamp: 1 }
}
];
// Execute aggregation on appropriate collection
const collection = this.getCollectionForDevice(deviceId);
const results = await collection.aggregate(pipeline).toArray();
return {
device_id: deviceId,
time_range: timeRange,
granularity: granularity,
analytics: results
};
} catch (error) {
console.error(`Error generating analytics for device ${deviceId}:`, error);
throw error;
}
}
async generateFleetAnalytics(fleetQuery, timeRange, granularity = '1h') {
console.log(`Generating fleet analytics with ${granularity} granularity`);
try {
const pipeline = [
{
$match: {
...this.optimizeTimeSeriesQuery(fleetQuery),
timestamp: {
$gte: new Date(timeRange.start),
$lte: new Date(timeRange.end)
}
}
},
{
$group: {
_id: {
time_bucket: {
$dateTrunc: {
date: '$timestamp',
unit: this.getTimeUnit(granularity),
binSize: this.getBinSize(granularity)
}
},
device_id: '$device.device_id',
location: '$device.location.building'
},
// Device-level aggregations
reading_count: { $sum: 1 },
avg_battery: { $avg: '$quality.battery_level' },
avg_signal: { $avg: '$quality.signal_strength' },
// Measurement aggregations
measurements: { $push: '$measurements' }
}
},
{
$group: {
_id: '$_id.time_bucket',
// Fleet-level metrics
total_devices: { $sum: 1 },
total_readings: { $sum: '$reading_count' },
avg_fleet_battery: { $avg: '$avg_battery' },
avg_fleet_signal: { $avg: '$avg_signal' },
// Device health distribution
healthy_devices: {
$sum: {
$cond: [
{ $and: [
{ $gte: ['$avg_battery', 20] },
{ $gte: ['$avg_signal', 50] }
]},
1,
0
]
}
},
// Location distribution
locations: {
$addToSet: '$_id.location'
}
}
},
{
$addFields: {
timestamp: '$_id',
device_health_percentage: {
$multiply: [
{ $divide: ['$healthy_devices', '$total_devices'] },
100
]
}
}
},
{
$project: {
_id: 0,
timestamp: 1,
total_devices: 1,
total_readings: 1,
avg_fleet_battery: { $round: ['$avg_fleet_battery', 2] },
avg_fleet_signal: { $round: ['$avg_fleet_signal', 2] },
device_health_percentage: { $round: ['$device_health_percentage', 2] },
healthy_devices: 1,
unique_locations: { $size: '$locations' }
}
},
{
$sort: { timestamp: 1 }
}
];
const collection = this.db.collection('environmental_sensors'); // Default collection
const results = await collection.aggregate(pipeline).toArray();
return {
fleet_query: fleetQuery,
time_range: timeRange,
granularity: granularity,
analytics: results
};
} catch (error) {
console.error(`Error generating fleet analytics:`, error);
throw error;
}
}
getTimeUnit(granularity) {
const unitMap = {
'1s': 'second',
'1m': 'minute',
'5m': 'minute',
'15m': 'minute',
'1h': 'hour',
'1d': 'day'
};
return unitMap[granularity] || 'hour';
}
getBinSize(granularity) {
const binMap = {
'1s': 1,
'1m': 1,
'5m': 5,
'15m': 15,
'1h': 1,
'1d': 1
};
return binMap[granularity] || 1;
}
createMeasurementReducer() {
return {
$function: {
body: `function(accumulator, measurement) {
for (let key in measurement) {
if (typeof measurement[key] === 'number') {
if (!accumulator[key]) {
accumulator[key] = { sum: 0, count: 0, min: measurement[key], max: measurement[key] };
}
accumulator[key].sum += measurement[key];
accumulator[key].count++;
accumulator[key].min = Math.min(accumulator[key].min, measurement[key]);
accumulator[key].max = Math.max(accumulator[key].max, measurement[key]);
accumulator[key].avg = accumulator[key].sum / accumulator[key].count;
}
}
return accumulator;
}`,
args: ['$$value', '$$this'],
lang: 'js'
}
};
}
getCollectionForDevice(deviceId) {
// Simple mapping - in production, this would use device registry
return this.timeSeriesCollections.get('environmental_sensors').collection;
}
async setupRealTimeAggregation() {
if (!this.config.analytics.enableRealTimeAggregation) {
return;
}
console.log('Setting up real-time aggregation pipelines...');
// Set up rollup collections that serve as manually maintained materialized views
await this.createMaterializedViews();
// Setup change streams for real-time updates
await this.setupChangeStreams();
}
async createMaterializedViews() {
console.log('Creating materialized views for real-time analytics...');
// Hourly device summaries
await this.db.createCollection('device_hourly_summary');
// Daily fleet metrics
await this.db.createCollection('fleet_daily_metrics');
// Real-time alerts collection
await this.db.createCollection('real_time_alerts');
}
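async setupChangeStreams() {
console.log('Setting up change streams for real-time processing...');
// NOTE: MongoDB does not support change streams on time-series collections, so
// watch() on the collections registered here is expected to fail. The error handler
// below logs that failure instead of crashing the process; insertSensorData()
// already calls triggerRealTimeAggregation() after each write.
for (const [collectionName, collectionInfo] of this.timeSeriesCollections) {
const collection = collectionInfo.collection;
const changeStream = collection.watch([], { fullDocument: 'updateLookup' });
changeStream.on('change', async (change) => {
if (change.operationType === 'insert') {
await this.processRealTimeInsert(collectionName, change.fullDocument);
}
});
changeStream.on('error', (error) => {
console.error(`Change stream unavailable for ${collectionName}:`, error.message);
});
}
}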
async processRealTimeInsert(collectionName, document) {
try {
// Update real-time aggregations
await this.updateRealTimeAggregations(collectionName, document);
// Check for alerts
if (this.config.performance.enableAlerts) {
await this.checkAlertConditions(document);
}
} catch (error) {
console.error(`Error processing real-time insert:`, error);
// Don't throw - real-time processing shouldn't break inserts
}
}
async updateRealTimeAggregations(collectionName, document) {
const deviceId = document.device.device_id;
const timestamp = document.timestamp;
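// Truncate to the hour in UTC so buckets line up with $dateTrunc, which defaults to UTC
const hourBucket = new Date(Date.UTC(timestamp.getUTCFullYear(), timestamp.getUTCMonth(), timestamp.getUTCDate(), timestamp.getUTCHours()));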
// Update hourly device summary
const deviceHourlySummary = this.db.collection('device_hourly_summary');
await deviceHourlySummary.updateOne(
{
device_id: deviceId,
hour_bucket: hourBucket
},
{
$inc: {
reading_count: 1,
'quality.signal_strength_sum': document.quality.signal_strength || 0,
'quality.battery_level_sum': document.quality.battery_level || 0
},
$min: {
'timestamp.first_reading': timestamp
},
$max: {
'timestamp.last_reading': timestamp
},
$set: {
device: document.device,
updated_at: new Date()
}
},
{ upsert: true }
);
}
async checkAlertConditions(document) {
const deviceId = document.device.device_id;
const deviceInfo = await this.getDeviceInfo(deviceId);
if (!deviceInfo || !deviceInfo.configuration.alert_thresholds) {
return;
}
const thresholds = deviceInfo.configuration.alert_thresholds;
const alerts = [];
// Check measurement thresholds
Object.entries(document.measurements).forEach(([measurement, value]) => {
const threshold = thresholds[measurement];
if (threshold) {
if (value < threshold.critical_min || value > threshold.critical_max) {
alerts.push({
type: 'critical_threshold',
measurement: measurement,
value: value,
threshold: threshold,
severity: 'critical'
});
} else if (value < threshold.warning_min || value > threshold.warning_max) {
alerts.push({
type: 'warning_threshold',
measurement: measurement,
value: value,
threshold: threshold,
severity: 'warning'
});
}
}
});
// Check device health
if (document.quality.battery_level < 20) {
alerts.push({
type: 'low_battery',
value: document.quality.battery_level,
severity: 'warning'
});
}
if (document.quality.signal_strength < 30) {
alerts.push({
type: 'poor_signal',
value: document.quality.signal_strength,
severity: 'warning'
});
}
// Store alerts if any
if (alerts.length > 0) {
await this.storeAlerts(deviceId, document.timestamp, alerts);
}
}
async storeAlerts(deviceId, timestamp, alerts) {
const realTimeAlerts = this.db.collection('real_time_alerts');
const alertDocument = {
device_id: deviceId,
timestamp: timestamp,
alerts: alerts,
status: 'active',
created_at: new Date()
};
await realTimeAlerts.insertOne(alertDocument);
console.log(`Stored ${alerts.length} alerts for device ${deviceId}`);
}
async setupMonitoringAndAlerting() {
if (this.config.performance.enableMetrics) {
console.log('Setting up performance monitoring and alerting...');
// Create metrics collection
const metricsCollection = this.db.collection('system_metrics');
await metricsCollection.createIndex({ timestamp: -1 });
await metricsCollection.createIndex({ metric_type: 1, timestamp: -1 });
// Start metrics collection
this.startMetricsCollection();
}
}
startMetricsCollection() {
// Keep the timer handle so it can be cleared with clearInterval() on shutdown
this.metricsInterval = setInterval(async () => {
try {
await this.collectSystemMetrics();
} catch (error) {
console.error('Error collecting system metrics:', error);
}
}, 60000); // Collect metrics every minute
}
async collectSystemMetrics() {
const timestamp = new Date();
// Collect performance metrics for each time-series collection
for (const [collectionName, collectionInfo] of this.timeSeriesCollections) {
const collection = collectionInfo.collection;
// Get collection stats
// The Node.js driver exposes db.command(); runCommand() is the mongosh helper
const stats = await this.db.command({ collStats: collectionName });
// Calculate recent insertion rate
const oneMinuteAgo = new Date(timestamp.getTime() - 60000);
const recentInserts = await collection.countDocuments({
timestamp: { $gte: oneMinuteAgo }
});
const metrics = {
timestamp: timestamp,
metric_type: 'collection_performance',
collection_name: collectionName,
metrics: {
document_count: stats.count,
storage_size: stats.storageSize,
index_size: stats.totalIndexSize,
recent_inserts_per_minute: recentInserts,
avg_object_size: stats.avgObjSize
}
};
await this.db.collection('system_metrics').insertOne(metrics);
}
}
}
// Example usage: Comprehensive IoT time-series platform
async function demonstrateIoTTimeSeriesManagement() {
const iotManager = new MongoDBIoTTimeSeriesManager('mongodb://localhost:27017', {
enableRealTimeAggregation: true,
enableAnomalyDetection: true,
enableMetrics: true,
hotDataRetentionDays: 30,
warmDataRetentionDays: 90
});
await iotManager.initialize();
// Register IoT devices
await iotManager.registerIoTDevice({
device_id: 'ENV_SENSOR_001',
device_name: 'Environmental Sensor - Conference Room A',
sensor_type: 'environmental',
building: 'Main Office',
floor: 2,
room: 'Conference Room A',
model: 'SensorTech ST-ENV-2000',
manufacturer: 'SensorTech Corp',
firmware_version: 'v2.1.3',
communication_protocol: 'mqtt',
sampling_rate: 60,
alert_thresholds: {
temperature: { min: 18, max: 26, warning_min: 20, warning_max: 24, critical_min: 15, critical_max: 30 },
humidity: { min: 30, max: 70, warning_min: 40, warning_max: 60, critical_min: 20, critical_max: 80 }
}
});
// Insert time-series sensor data
await iotManager.insertSensorData('environmental_sensors', 'ENV_SENSOR_001', {
measurements: {
temperature: 22.5,
humidity: 55.2,
pressure: 1013.25,
light_level: 450
},
signal_strength: 85,
battery_level: 92,
reading_quality: 'excellent'
});
// Generate device analytics
const deviceAnalytics = await iotManager.generateDeviceAnalytics(
'ENV_SENSOR_001',
{
start: new Date(Date.now() - 24 * 60 * 60 * 1000), // 24 hours ago
end: new Date()
},
'1h'
);
console.log('Device Analytics:', deviceAnalytics);
// Generate fleet analytics
const fleetAnalytics = await iotManager.generateFleetAnalytics(
{ 'device.sensor_type': 'environmental' },
{
start: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000), // 7 days ago
end: new Date()
},
'1d'
);
console.log('Fleet Analytics:', fleetAnalytics);
}
module.exports = {
MongoDBIoTTimeSeriesManager
};
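The threshold checks in checkAlertConditions are easier to unit-test when they are separated from database access. The following is a minimal sketch of that idea, not part of the manager class above: the function name evaluateThresholds is hypothetical, and the threshold shape (warning_min, warning_max, critical_min, critical_max) is taken from the alert_thresholds object in the registration example.

```javascript
// Sketch: pure threshold evaluation with no database access.
// evaluateThresholds is a hypothetical helper, not an existing method.
function evaluateThresholds(measurements, thresholds) {
  const alerts = [];
  for (const [measurement, value] of Object.entries(measurements)) {
    const threshold = thresholds[measurement];
    // Skip measurements without a configured threshold or a numeric value
    if (!threshold || typeof value !== 'number' || Number.isNaN(value)) continue;

    if (value < threshold.critical_min || value > threshold.critical_max) {
      alerts.push({ type: 'critical_threshold', measurement, value, severity: 'critical' });
    } else if (value < threshold.warning_min || value > threshold.warning_max) {
      alerts.push({ type: 'warning_threshold', measurement, value, severity: 'warning' });
    }
  }
  return alerts;
}

const thresholds = {
  temperature: { warning_min: 20, warning_max: 24, critical_min: 15, critical_max: 30 },
  humidity: { warning_min: 40, warning_max: 60, critical_min: 20, critical_max: 80 }
};

// pressure has no threshold configured, so it is ignored
const sampleAlerts = evaluateThresholds(
  { temperature: 25.1, humidity: 85, pressure: 1013.25 },
  thresholds
);
console.log(sampleAlerts);
```

With these inputs the temperature reading falls in the warning band and the humidity reading exceeds the critical maximum, so two alerts are produced.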
Understanding MongoDB Time-Series Architecture for IoT
Native Time-Series Optimization and Storage Efficiency
MongoDB's time-series collections provide specialized optimization for IoT data management:
// Enterprise-grade IoT analytics with advanced time-series optimization
class EnterpriseIoTAnalyticsManager extends MongoDBIoTTimeSeriesManager {
constructor(connectionString, enterpriseConfig) {
super(connectionString, enterpriseConfig);
this.enterpriseConfig = {
...enterpriseConfig,
// Advanced analytics capabilities
enablePredictiveAnalytics: true,
enableMachineLearning: true,
enableAnomalyDetection: true,
enableTrendForecasting: true,
// Performance optimization
enableDataPreAggregation: true,
enableIntelligentArchival: true,
enableAdaptiveIndexing: true,
enableQueryOptimization: true,
// Enterprise features
enableDataLineage: true,
enableComplianceTracking: true,
enableDataGovernance: true,
enableAuditTrails: true
};
}
async setupAdvancedAnalytics() {
console.log('Setting up enterprise-grade IoT analytics pipelines...');
// Setup predictive analytics collections
await this.setupPredictiveAnalytics();
// Initialize machine learning pipelines
await this.setupMachineLearningPipelines();
// Configure intelligent data lifecycle management
await this.setupIntelligentDataLifecycle();
}
async performPredictiveAnalysis(deviceId, predictionHorizon = '7d') {
console.log(`Performing predictive analysis for device ${deviceId}...`);
const pipeline = [
{
$match: {
'device.device_id': deviceId,
timestamp: {
$gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) // 30 days historical
}
}
},
{
$group: {
_id: {
$dateTrunc: {
date: '$timestamp',
unit: 'hour'
}
},
avg_temperature: { $avg: '$measurements.temperature' },
avg_humidity: { $avg: '$measurements.humidity' },
avg_battery: { $avg: '$quality.battery_level' },
reading_count: { $sum: 1 }
}
},
{
$sort: { _id: 1 }
},
{
$setWindowFields: {
sortBy: { _id: 1 },
output: {
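// Centered 7-point moving averages (3 buckets before, current, 3 after).
// Position-based windows use `documents`; `range` with `unit` is for time units only.
temp_7h_ma: {
$avg: '$avg_temperature',
window: { documents: [-3, 3] }
},
humidity_7h_ma: {
$avg: '$avg_humidity',
window: { documents: [-3, 3] }
},
// Linearly interpolate null hourly averages (input series for trend prediction)
temp_trend: {
$linearFill: '$avg_temperature'
},
// Battery change per hour between consecutive buckets; $derivative requires an explicit window
battery_degradation: {
$derivative: {
input: '$avg_battery',
unit: 'hour'
},
window: { documents: [-1, 0] }
}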
}
}
}
];
const collection = this.getCollectionForDevice(deviceId);
const results = await collection.aggregate(pipeline).toArray();
return this.generatePredictions(results, predictionHorizon);
}
generatePredictions(historicalData, horizon) {
// Simplified prediction logic - in production, use advanced ML models
const predictions = {
temperature: this.predictTrend(historicalData, 'temp_trend', horizon),
humidity: this.predictTrend(historicalData, 'humidity_7h_ma', horizon),
battery_life: this.predictBatteryLife(historicalData),
maintenance_needed: this.predictMaintenanceWindow(historicalData),
anomaly_probability: this.calculateAnomalyProbability(historicalData)
};
return predictions;
}
async performFleetOptimizationAnalysis(fleetQuery) {
console.log('Performing fleet optimization analysis...');
const pipeline = [
{
$match: this.optimizeTimeSeriesQuery(fleetQuery)
},
{
$group: {
_id: '$device.device_id',
// Performance metrics
avg_efficiency: { $avg: '$measurements.efficiency' },
avg_power_consumption: { $avg: '$measurements.power_consumption' },
avg_uptime: { $avg: { $cond: [{ $gt: ['$quality.signal_strength', 0] }, 1, 0] } },
// Maintenance indicators
maintenance_score: {
$avg: {
$add: [
{ $cond: [{ $lt: ['$quality.battery_level', 30] }, 0.3, 0] },
{ $cond: [{ $lt: ['$quality.signal_strength', 50] }, 0.2, 0] },
{ $cond: [{ $gt: ['$measurements.temperature', 25] }, 0.2, 0] }
]
}
},
// Location and deployment data
location: { $first: '$device.location' },
last_seen: { $max: '$timestamp' }
}
},
{
$addFields: {
// Calculate optimization recommendations
optimization_priority: {
$cond: [
{ $gt: ['$maintenance_score', 0.5] }, 'high',
{ $cond: [
{ $gt: ['$maintenance_score', 0.3] }, 'medium', 'low'
]}
]
},
efficiency_grade: {
$cond: [
{ $gt: ['$avg_efficiency', 0.8] }, 'A',
{ $cond: [
{ $gt: ['$avg_efficiency', 0.6] }, 'B',
{ $cond: [
{ $gt: ['$avg_efficiency', 0.4] }, 'C', 'D'
]}
]}
]
}
}
},
{
$sort: { maintenance_score: -1, avg_efficiency: 1 }
}
];
const collection = this.db.collection('environmental_sensors');
const results = await collection.aggregate(pipeline).toArray();
return this.generateFleetOptimizationReport(results);
}
generateFleetOptimizationReport(analysisResults) {
const report = {
generated_at: new Date(),
total_devices: analysisResults.length,
// Fleet health summary
health_distribution: {
high_priority: analysisResults.filter(d => d.optimization_priority === 'high').length,
medium_priority: analysisResults.filter(d => d.optimization_priority === 'medium').length,
low_priority: analysisResults.filter(d => d.optimization_priority === 'low').length
},
// Efficiency distribution
efficiency_grades: {
grade_a: analysisResults.filter(d => d.efficiency_grade === 'A').length,
grade_b: analysisResults.filter(d => d.efficiency_grade === 'B').length,
grade_c: analysisResults.filter(d => d.efficiency_grade === 'C').length,
grade_d: analysisResults.filter(d => d.efficiency_grade === 'D').length
},
// Detailed device recommendations
device_recommendations: analysisResults.slice(0, 10), // Top 10 priority devices
// Fleet-wide recommendations
fleet_recommendations: this.generateFleetRecommendations(analysisResults)
};
return report;
}
generateFleetRecommendations(devices) {
const recommendations = [];
const highMaintenanceDevices = devices.filter(d => d.maintenance_score > 0.5);
if (highMaintenanceDevices.length > devices.length * 0.2) {
recommendations.push({
type: 'maintenance_schedule',
priority: 'high',
description: `${highMaintenanceDevices.length} devices require immediate maintenance attention`,
action: 'Schedule preventive maintenance for high-priority devices'
});
}
const lowEfficiencyDevices = devices.filter(d => d.efficiency_grade === 'D');
if (lowEfficiencyDevices.length > 0) {
recommendations.push({
type: 'efficiency_improvement',
priority: 'medium',
description: `${lowEfficiencyDevices.length} devices operating at low efficiency`,
action: 'Review device configuration and environmental conditions'
});
}
return recommendations;
}
}
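The predictive pipeline above smooths hourly averages with a 7-point centered moving average. As a rough illustration of what such a window computes, here is the same calculation in plain JavaScript. The function name centeredMovingAverage is hypothetical, and the edge handling (the window shrinks to the points that exist) is an assumption chosen to mirror how a position-based window is truncated at the start and end of a series.

```javascript
// Sketch: centered moving average over `radius` neighbours on each side.
// At the edges the window shrinks to the values that actually exist.
function centeredMovingAverage(values, radius) {
  return values.map((_, i) => {
    const start = Math.max(0, i - radius);
    const end = Math.min(values.length - 1, i + radius);
    let sum = 0;
    for (let j = start; j <= end; j++) sum += values[j];
    return sum / (end - start + 1);
  });
}

// Five hourly temperature averages, smoothed with one neighbour on each side
const hourlyTemps = [20, 22, 24, 26, 28];
const smoothed = centeredMovingAverage(hourlyTemps, 1);
console.log(smoothed); // [ 21, 22, 24, 26, 27 ]
```

A radius of 3 gives the 7-point window used for temp_7h_ma and humidity_7h_ma.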
QueryLeaf Time-Series Operations
QueryLeaf provides familiar SQL syntax for MongoDB time-series operations and IoT analytics:
-- QueryLeaf time-series operations with SQL-familiar syntax for MongoDB IoT analytics
-- Create time-series enabled tables for IoT sensor data
CREATE TABLE environmental_sensors (
timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
device_id VARCHAR(100) NOT NULL,
sensor_type VARCHAR(50) NOT NULL,
-- Device metadata (automatically optimized in MongoDB metaField)
location JSONB NOT NULL,
device_model VARCHAR(100),
firmware_version VARCHAR(50),
-- Sensor measurements (stored efficiently in MongoDB measurements)
temperature DECIMAL(5,2),
humidity DECIMAL(5,2),
pressure DECIMAL(7,2),
light_level INTEGER,
air_quality_index INTEGER,
-- Device health indicators
battery_level DECIMAL(5,2),
signal_strength INTEGER,
reading_quality VARCHAR(20) DEFAULT 'good',
-- Quality and operational metadata
data_source VARCHAR(50) DEFAULT 'sensor',
processing_flags TEXT[],
-- Time-series optimization (QueryLeaf automatically configures MongoDB time-series collection)
PRIMARY KEY (device_id, timestamp),
-- Time-series specific indexes (optimized for temporal queries)
INDEX ts_device_time_idx (device_id, timestamp DESC),
INDEX ts_sensor_type_time_idx (sensor_type, timestamp DESC),
INDEX ts_location_time_idx ((location->>'building'), (location->>'floor'), timestamp DESC)
) WITH (
-- MongoDB time-series collection configuration
timeseries_timefield = 'timestamp',
timeseries_metafield = 'device_metadata',
timeseries_granularity = 'seconds',
-- Storage optimization
compression_algorithm = 'zstd',
enable_compression = true,
-- Performance settings
enable_time_series_optimization = true,
enable_automatic_bucketing = true,
bucket_max_span_seconds = 3600 -- 1 hour buckets
);
-- Power monitoring time-series table
CREATE TABLE power_monitoring (
timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
device_id VARCHAR(100) NOT NULL,
meter_type VARCHAR(50) NOT NULL,
-- Device and location metadata
location JSONB NOT NULL,
circuit_id VARCHAR(50),
panel_id VARCHAR(50),
-- Power measurements
voltage DECIMAL(6,2),
current DECIMAL(8,3),
power_active DECIMAL(10,2),
power_reactive DECIMAL(10,2),
power_factor DECIMAL(4,3),
frequency DECIMAL(5,2),
energy_consumed_kwh DECIMAL(12,3),
-- Power quality metrics
voltage_thd DECIMAL(5,2), -- Total Harmonic Distortion
current_thd DECIMAL(5,2),
power_interruptions INTEGER DEFAULT 0,
-- Device status
meter_status VARCHAR(20) DEFAULT 'operational',
communication_status VARCHAR(20) DEFAULT 'online',
last_calibration_date DATE,
PRIMARY KEY (device_id, timestamp),
INDEX ts_power_device_time_idx (device_id, timestamp DESC),
INDEX ts_power_circuit_time_idx (circuit_id, timestamp DESC),
INDEX ts_power_consumption_idx (energy_consumed_kwh, timestamp DESC)
) WITH (
timeseries_timefield = 'timestamp',
timeseries_metafield = 'meter_metadata',
timeseries_granularity = 'seconds',
compression_algorithm = 'zstd',
enable_time_series_optimization = true
);
-- Advanced time-series data insertion with automatic optimization
INSERT INTO environmental_sensors (
timestamp, device_id, sensor_type, location, device_model, firmware_version,
temperature, humidity, pressure, light_level, air_quality_index,
battery_level, signal_strength, reading_quality
)
SELECT
-- Generate realistic time-series data for demonstration
CURRENT_TIMESTAMP - (generate_series * INTERVAL '1 minute'),
'ENV_SENSOR_' || LPAD(device_num::text, 3, '0'),
'environmental',
-- Location data as JSONB
JSON_BUILD_OBJECT(
'building', 'Building ' || CHR(64 + (device_num % 3) + 1),
'floor', (device_num % 5) + 1,
'room', 'Room ' || LPAD(((device_num % 20) + 1)::text, 3, '0'),
'zone', 'Zone ' || CHR(64 + (device_num % 4) + 1),
'coordinates', JSON_BUILD_OBJECT(
'latitude', 40.7128 + (RANDOM() - 0.5) * 0.01,
'longitude', -74.0060 + (RANDOM() - 0.5) * 0.01
)
),
'SensorTech ST-ENV-2000',
'v2.1.' || (3 + (device_num % 5))::text,
-- Environmental measurements with realistic variations
ROUND((20 + 5 * SIN(generate_series * 0.1) + (RANDOM() - 0.5) * 2)::numeric, 2) as temperature,
ROUND((50 + 10 * COS(generate_series * 0.05) + (RANDOM() - 0.5) * 5)::numeric, 2) as humidity,
ROUND((1013 + 2 * SIN(generate_series * 0.02) + (RANDOM() - 0.5) * 1)::numeric, 2) as pressure,
(400 + (100 * SIN(generate_series * 0.3)) + (RANDOM() * 50))::integer as light_level,
(50 + (20 * COS(generate_series * 0.1)) + (RANDOM() * 10))::integer as air_quality_index,
-- Device health with degradation over time
GREATEST(20, 100 - (generate_series * 0.001) + (RANDOM() - 0.5) * 5)::numeric(5,2) as battery_level,
(70 + (RANDOM() * 30))::integer as signal_strength,
CASE
WHEN RANDOM() < 0.85 THEN 'excellent'
WHEN RANDOM() < 0.95 THEN 'good'
WHEN RANDOM() < 0.99 THEN 'fair'
ELSE 'poor'
END as reading_quality
FROM generate_series(1, 1440) as generate_series, -- 24 hours of minute-by-minute data
generate_series(1, 50) as device_num; -- 50 devices (the series bound above already limits data to 24 hours)
-- QueryLeaf automatically optimizes this bulk insert for MongoDB time-series collections
-- Advanced time-series analytics with SQL window functions
WITH hourly_environmental_metrics AS (
SELECT
device_id,
location->>'building' as building,
location->>'floor' as floor,
-- Time bucketing for hourly aggregation
DATE_TRUNC('hour', timestamp) as hour_bucket,
-- Basic statistical aggregations
COUNT(*) as reading_count,
AVG(temperature) as avg_temperature,
MIN(temperature) as min_temperature,
MAX(temperature) as max_temperature,
STDDEV(temperature) as temp_stddev,
AVG(humidity) as avg_humidity,
MIN(humidity) as min_humidity,
MAX(humidity) as max_humidity,
AVG(pressure) as avg_pressure,
AVG(light_level) as avg_light_level,
AVG(air_quality_index) as avg_air_quality,
-- Device health metrics
AVG(battery_level) as avg_battery_level,
AVG(signal_strength) as avg_signal_strength,
-- Data quality assessment
COUNT(*) FILTER (WHERE reading_quality = 'excellent') as excellent_readings,
COUNT(*) FILTER (WHERE reading_quality IN ('excellent', 'good')) as good_readings,
(COUNT(*) FILTER (WHERE reading_quality IN ('excellent', 'good'))::DECIMAL / COUNT(*)) * 100 as quality_percentage
FROM environmental_sensors
WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '7 days'
GROUP BY device_id, location->>'building', location->>'floor', DATE_TRUNC('hour', timestamp)
),
environmental_trends AS (
SELECT
hem.*,
-- Time-series trend analysis using window functions
LAG(avg_temperature, 1) OVER (
PARTITION BY device_id
ORDER BY hour_bucket
) as prev_hour_temp,
LAG(avg_temperature, 24) OVER (
PARTITION BY device_id
ORDER BY hour_bucket
) as same_hour_yesterday_temp,
-- Calculate temperature trends
avg_temperature - LAG(avg_temperature, 1) OVER (
PARTITION BY device_id
ORDER BY hour_bucket
) as temp_hourly_change,
avg_temperature - LAG(avg_temperature, 24) OVER (
PARTITION BY device_id
ORDER BY hour_bucket
) as temp_daily_change,
-- Moving averages for smoothing
AVG(avg_temperature) OVER (
PARTITION BY device_id
ORDER BY hour_bucket
ROWS BETWEEN 5 PRECEDING AND 1 FOLLOWING
) as temp_7h_moving_avg,
AVG(avg_humidity) OVER (
PARTITION BY device_id
ORDER BY hour_bucket
ROWS BETWEEN 5 PRECEDING AND 1 FOLLOWING
) as humidity_7h_moving_avg,
-- Battery degradation analysis
FIRST_VALUE(avg_battery_level) OVER (
PARTITION BY device_id
ORDER BY hour_bucket
ROWS UNBOUNDED PRECEDING
) as initial_battery_level,
-- Calculate linear regression slope for trends
REGR_SLOPE(avg_temperature, EXTRACT(EPOCH FROM hour_bucket)) OVER (
PARTITION BY device_id
ORDER BY hour_bucket
ROWS BETWEEN 23 PRECEDING AND CURRENT ROW -- 24-hour window
) as temp_trend_slope,
REGR_R2(avg_temperature, EXTRACT(EPOCH FROM hour_bucket)) OVER (
PARTITION BY device_id
ORDER BY hour_bucket
ROWS BETWEEN 23 PRECEDING AND CURRENT ROW
) as temp_trend_r2,
-- Device performance ranking
DENSE_RANK() OVER (
PARTITION BY building, floor, hour_bucket
ORDER BY quality_percentage DESC, avg_signal_strength DESC
) as device_performance_rank
FROM hourly_environmental_metrics hem
),
anomaly_detection AS (
SELECT
et.*,
-- Statistical anomaly detection
CASE
WHEN ABS(avg_temperature - temp_7h_moving_avg) > (2 * temp_stddev) THEN 'temperature_anomaly'
WHEN temp_hourly_change > 5 OR temp_hourly_change < -5 THEN 'rapid_temperature_change'
WHEN avg_battery_level < 20 THEN 'low_battery'
WHEN avg_signal_strength < 30 THEN 'poor_connectivity'
WHEN quality_percentage < 80 THEN 'poor_data_quality'
ELSE 'normal'
END as anomaly_type,
-- Severity assessment
CASE
WHEN avg_battery_level < 10 OR avg_signal_strength < 20 THEN 'critical'
WHEN avg_battery_level < 20 OR avg_signal_strength < 30 OR quality_percentage < 70 THEN 'high'
WHEN ABS(temp_hourly_change) > 3 OR quality_percentage < 90 THEN 'medium'
ELSE 'low'
END as anomaly_severity,
-- Trend classification
CASE
WHEN temp_trend_slope > 0.01 AND temp_trend_r2 > 0.7 THEN 'increasing_trend'
WHEN temp_trend_slope < -0.01 AND temp_trend_r2 > 0.7 THEN 'decreasing_trend'
WHEN ABS(temp_trend_slope) <= 0.01 THEN 'stable'
ELSE 'irregular'
END as temperature_trend_classification,
-- Battery health assessment
CASE
WHEN (initial_battery_level - avg_battery_level) / GREATEST(EXTRACT(DAYS FROM (hour_bucket - (SELECT MIN(hour_bucket) FROM environmental_trends WHERE device_id = et.device_id))), 1) > 2 THEN 'fast_degradation'
WHEN (initial_battery_level - avg_battery_level) / GREATEST(EXTRACT(DAYS FROM (hour_bucket - (SELECT MIN(hour_bucket) FROM environmental_trends WHERE device_id = et.device_id))), 1) > 0.5 THEN 'normal_degradation'
ELSE 'stable_battery'
END as battery_health_status
FROM environmental_trends et
)
-- Generate comprehensive IoT device analytics report
SELECT
ad.device_id,
ad.building,
ad.floor,
ad.hour_bucket,
-- Current measurements
ROUND(ad.avg_temperature, 2) as current_avg_temperature,
ROUND(ad.avg_humidity, 2) as current_avg_humidity,
ROUND(ad.avg_pressure, 2) as current_avg_pressure,
ad.avg_light_level,
ad.avg_air_quality,
-- Trend analysis
ROUND(ad.temp_7h_moving_avg, 2) as temperature_trend,
ROUND(ad.temp_hourly_change, 2) as hourly_temp_change,
ROUND(ad.temp_daily_change, 2) as daily_temp_change,
ad.temperature_trend_classification,
-- Device health
ROUND(ad.avg_battery_level, 1) as battery_level,
ad.avg_signal_strength,
ROUND(ad.quality_percentage, 1) as data_quality_pct,
ad.battery_health_status,
-- Performance metrics
ad.reading_count,
ad.device_performance_rank,
-- Anomaly and alert information
ad.anomaly_type,
ad.anomaly_severity,
-- Operational insights
CASE
WHEN ad.anomaly_severity = 'critical' THEN 'immediate_maintenance_required'
WHEN ad.anomaly_severity = 'high' AND ad.battery_health_status = 'fast_degradation' THEN 'schedule_battery_replacement'
WHEN ad.temperature_trend_classification = 'irregular' AND ad.quality_percentage < 85 THEN 'investigate_sensor_calibration'
WHEN ad.avg_signal_strength < 50 THEN 'improve_network_connectivity'
WHEN ad.device_performance_rank > 3 THEN 'investigate_environmental_factors'
ELSE 'normal_operation'
END as recommended_action,
-- Predictive maintenance scoring
ROUND(
(
CASE WHEN ad.avg_battery_level < 30 THEN 0.3 ELSE 0 END +
CASE WHEN ad.avg_signal_strength < 50 THEN 0.2 ELSE 0 END +
CASE WHEN ad.quality_percentage < 85 THEN 0.2 ELSE 0 END +
CASE WHEN ad.temperature_trend_classification = 'irregular' THEN 0.15 ELSE 0 END +
CASE WHEN ABS(ad.temp_hourly_change) > 3 THEN 0.15 ELSE 0 END
) * 100, 1
) as maintenance_priority_score,
-- Time context
ad.hour_bucket as analysis_time,
CURRENT_TIMESTAMP as report_generated_at
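FROM anomaly_detection ad
WHERE ad.hour_bucket >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
AND (
ad.anomaly_type != 'normal'
-- A SELECT alias cannot be referenced in WHERE, so the score expression is repeated here
OR (
CASE WHEN ad.avg_battery_level < 30 THEN 0.3 ELSE 0 END +
CASE WHEN ad.avg_signal_strength < 50 THEN 0.2 ELSE 0 END +
CASE WHEN ad.quality_percentage < 85 THEN 0.2 ELSE 0 END +
CASE WHEN ad.temperature_trend_classification = 'irregular' THEN 0.15 ELSE 0 END +
CASE WHEN ABS(ad.temp_hourly_change) > 3 THEN 0.15 ELSE 0 END
) * 100 > 30
)
-- Output-column aliases are allowed in ORDER BY, but only unqualified
ORDER BY maintenance_priority_score DESC, ad.device_id, ad.hour_bucket DESC
LIMIT 100;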
-- Advanced fleet-wide analytics with cross-device correlations
WITH fleet_performance_metrics AS (
SELECT
DATE_TRUNC('hour', timestamp) as hour_bucket,
location->>'building' as building,
location->>'floor' as floor,
-- Fleet-wide aggregations
COUNT(DISTINCT device_id) as active_devices,
COUNT(*) as total_readings,
-- Environmental averages across fleet
AVG(temperature) as fleet_avg_temperature,
STDDEV(temperature) as fleet_temp_stddev,
MIN(temperature) as fleet_min_temperature,
MAX(temperature) as fleet_max_temperature,
AVG(humidity) as fleet_avg_humidity,
AVG(pressure) as fleet_avg_pressure,
AVG(air_quality_index) as fleet_avg_air_quality,
-- Fleet health metrics
AVG(battery_level) as fleet_avg_battery,
MIN(battery_level) as fleet_min_battery,
AVG(signal_strength) as fleet_avg_signal,
MIN(signal_strength) as fleet_min_signal,
-- Data quality across fleet
COUNT(*) FILTER (WHERE reading_quality IN ('excellent', 'good')) as quality_readings,
(COUNT(*) FILTER (WHERE reading_quality IN ('excellent', 'good'))::DECIMAL / COUNT(*)) * 100 as fleet_quality_percentage,
-- Device health distribution
COUNT(DISTINCT device_id) FILTER (WHERE battery_level > 50 AND signal_strength > 70) as healthy_devices,
COUNT(DISTINCT device_id) FILTER (WHERE battery_level < 20 OR signal_strength < 30) as critical_devices,
-- Environmental comfort assessment
COUNT(*) FILTER (WHERE temperature BETWEEN 20 AND 24 AND humidity BETWEEN 40 AND 60) as comfort_readings,
(COUNT(*) FILTER (WHERE temperature BETWEEN 20 AND 24 AND humidity BETWEEN 40 AND 60)::DECIMAL / COUNT(*)) * 100 as comfort_percentage
FROM environmental_sensors
WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '7 days'
GROUP BY DATE_TRUNC('hour', timestamp), location->>'building', location->>'floor'
),
fleet_trends AS (
SELECT
fpm.*,
-- Fleet efficiency trends
LAG(fleet_avg_temperature, 1) OVER (
PARTITION BY building, floor
ORDER BY hour_bucket
) as prev_hour_fleet_temp,
LAG(fleet_quality_percentage, 24) OVER (
PARTITION BY building, floor
ORDER BY hour_bucket
) as same_hour_yesterday_quality,
-- Calculate fleet-wide trends
fleet_quality_percentage - LAG(fleet_quality_percentage, 24) OVER (
PARTITION BY building, floor
ORDER BY hour_bucket
) as daily_quality_change,
-- Fleet health scoring
ROUND(
(
(fleet_avg_battery / 100.0) * 0.3 +
(fleet_avg_signal / 100.0) * 0.2 +
(fleet_quality_percentage / 100.0) * 0.3 +
(comfort_percentage / 100.0) * 0.2
) * 100, 1
) as fleet_health_score
FROM fleet_performance_metrics fpm
)
-- Generate fleet optimization and management dashboard
SELECT
ft.building,
ft.floor,
ft.hour_bucket,
-- Fleet operational metrics
ft.active_devices,
ft.total_readings,
ROUND(ft.fleet_avg_temperature, 1) as avg_temperature,
ROUND(ft.fleet_avg_humidity, 1) as avg_humidity,
ft.fleet_avg_air_quality,
-- Fleet health assessment
ft.healthy_devices,
ft.critical_devices,
ROUND(ft.fleet_avg_battery, 1) as avg_battery_level,
ROUND(ft.fleet_avg_signal, 1) as avg_signal_strength,
ROUND(ft.fleet_quality_percentage, 1) as data_quality_pct,
ft.fleet_health_score,
-- Environmental management
ROUND(ft.comfort_percentage, 1) as comfort_zone_percentage,
-- Trend analysis
ROUND(ft.daily_quality_change, 2) as quality_trend_24h,
-- Fleet status classification
CASE
WHEN ft.fleet_health_score >= 85 THEN 'optimal'
WHEN ft.fleet_health_score >= 75 THEN 'good'
WHEN ft.fleet_health_score >= 65 THEN 'fair'
ELSE 'attention_required'
END as fleet_status,
-- Management recommendations
CASE
WHEN ft.critical_devices > ft.active_devices * 0.2 THEN 'urgent_maintenance_needed'
WHEN ft.fleet_avg_battery < 30 THEN 'battery_replacement_program'
WHEN ft.fleet_quality_percentage < 80 THEN 'calibration_review_required'
WHEN ft.comfort_percentage < 70 THEN 'environmental_adjustment_needed'
WHEN ft.fleet_avg_signal < 60 THEN 'network_infrastructure_upgrade'
ELSE 'continue_monitoring'
END as fleet_recommendation,
-- Efficiency metrics
ROUND(ft.total_readings::DECIMAL / ft.active_devices, 1) as avg_readings_per_device,
-- Time context
ft.hour_bucket as analysis_time
FROM fleet_trends ft
WHERE ft.hour_bucket >= CURRENT_TIMESTAMP - INTERVAL '48 hours'
ORDER BY ft.building, ft.floor, ft.hour_bucket DESC;
-- Real-time alerting and monitoring setup
CREATE OR REPLACE VIEW real_time_device_alerts AS
WITH recent_readings AS (
SELECT
device_id,
timestamp,
temperature,
humidity,
battery_level,
signal_strength,
reading_quality,
location,
-- Calculate recent trends
LAG(temperature, 1) OVER (PARTITION BY device_id ORDER BY timestamp) as prev_temperature,
LAG(battery_level, 1) OVER (PARTITION BY device_id ORDER BY timestamp) as prev_battery
FROM environmental_sensors
WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
),
alert_conditions AS (
SELECT
rr.*,
-- Temperature alerts
CASE
WHEN temperature > 30 THEN 'high_temperature_critical'
WHEN temperature < 15 THEN 'low_temperature_critical'
WHEN temperature > 26 THEN 'high_temperature_warning'
WHEN temperature < 18 THEN 'low_temperature_warning'
WHEN ABS(temperature - prev_temperature) > 5 THEN 'rapid_temperature_change'
ELSE NULL
END as temperature_alert,
-- Device health alerts
CASE
WHEN battery_level < 10 THEN 'battery_critical'
WHEN battery_level < 20 THEN 'battery_low'
WHEN signal_strength < 20 THEN 'connectivity_critical'
WHEN signal_strength < 40 THEN 'connectivity_poor'
WHEN reading_quality = 'poor' THEN 'data_quality_poor'
ELSE NULL
END as device_alert,
-- Environmental comfort alerts
CASE
WHEN temperature > 26 AND humidity > 70 THEN 'comfort_poor_hot_humid'
WHEN temperature < 20 AND humidity < 30 THEN 'comfort_poor_cold_dry'
WHEN humidity > 80 THEN 'humidity_excessive'
WHEN humidity < 20 THEN 'humidity_insufficient'
ELSE NULL
END as comfort_alert
FROM recent_readings rr
)
SELECT
device_id,
timestamp,
location->>'building' as building,
location->>'floor' as floor,
location->>'room' as room,
-- Current measurements
temperature,
humidity,
battery_level,
signal_strength,
reading_quality,
-- Alert information
COALESCE(temperature_alert, device_alert, comfort_alert) as alert_type,
CASE
WHEN temperature_alert LIKE '%critical' OR device_alert LIKE '%critical' THEN 'critical'
-- battery_low is listed explicitly because it matches neither '%warning' nor '%poor'
WHEN temperature_alert LIKE '%warning' OR device_alert LIKE '%poor' OR device_alert = 'battery_low' OR comfort_alert IS NOT NULL THEN 'warning'
ELSE 'info'
END as alert_severity,
-- Alert context
CASE
WHEN temperature_alert IS NOT NULL THEN 'Environmental conditions outside normal range'
WHEN device_alert LIKE '%battery%' THEN 'Device requires battery maintenance'
WHEN device_alert LIKE '%connectivity%' THEN 'Device has communication issues'
WHEN comfort_alert IS NOT NULL THEN 'Environmental comfort affected'
ELSE 'Device operating normally'
END as alert_message,
timestamp as alert_timestamp
FROM alert_conditions
WHERE temperature_alert IS NOT NULL
OR device_alert IS NOT NULL
OR comfort_alert IS NOT NULL
ORDER BY timestamp DESC,
CASE
WHEN temperature_alert LIKE '%critical' OR device_alert LIKE '%critical' THEN 1
WHEN temperature_alert LIKE '%warning' OR device_alert LIKE '%poor' THEN 2
ELSE 3
END;
-- QueryLeaf provides comprehensive time-series capabilities:
-- 1. Native MongoDB time-series collection optimization with SQL syntax
-- 2. Advanced temporal analytics with window functions and trend analysis
-- 3. Real-time IoT data processing with automatic bucketing and compression
-- 4. Fleet-wide analytics and cross-device correlation analysis
-- 5. Predictive maintenance and anomaly detection capabilities
-- 6. Automated alerting and monitoring with configurable thresholds
-- 7. Efficient storage and query optimization for high-velocity sensor data
-- 8. Familiar SQL interface for complex time-series operations and analytics
Best Practices for MongoDB Time-Series IoT Architecture
Performance Optimization and Storage Efficiency
Essential principles for scalable IoT time-series database design:
- Time-Series Collection Design: Configure appropriate granularity and metaField organization for optimal bucketing and compression
- Indexing Strategy: Create time-based indexes optimized for temporal query patterns and device-specific access
- Data Retention Policies: Implement intelligent data lifecycle management with hot, warm, and cold storage tiers
- Batch Insert Optimization: Use bulk operations and batch processing for high-throughput sensor data ingestion
- Query Optimization: Leverage MongoDB's time-series query optimization features for aggregation and analytical workloads
- Compression Configuration: Enable appropriate compression algorithms for time-series data patterns and storage efficiency
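Several of these principles map onto a handful of driver calls. The sketch below is one possible arrangement rather than a definitive setup: the helper names (buildTimeSeriesOptions, chunk, insertInBatches) are hypothetical, the timeField and metaField values assume the document shape used earlier (timestamp, device), and the batch size of 1000 is an arbitrary starting point. It only assumes a collection object that exposes insertMany(docs, options), as the Node.js driver's Collection does.

```javascript
// Sketch: createCollection options for a time-series collection with
// optional automatic expiry. Field names are assumptions based on the
// documents used earlier in this article.
function buildTimeSeriesOptions({ granularity = 'seconds', retentionDays } = {}) {
  const options = {
    timeseries: { timeField: 'timestamp', metaField: 'device', granularity }
  };
  if (retentionDays) {
    // expireAfterSeconds lets the server remove expired data automatically
    options.expireAfterSeconds = retentionDays * 24 * 60 * 60;
  }
  return options;
}

// Split an array into consecutive slices of at most `size` items
function* chunk(items, size) {
  for (let i = 0; i < items.length; i += size) {
    yield items.slice(i, i + size);
  }
}

// Insert readings in unordered batches so one bad document does not
// stop the rest of its batch from being written.
async function insertInBatches(collection, readings, batchSize = 1000) {
  let inserted = 0;
  for (const batch of chunk(readings, batchSize)) {
    const result = await collection.insertMany(batch, { ordered: false });
    inserted += result.insertedCount;
  }
  return inserted;
}

// Usage against a real database (not executed here):
//   await db.createCollection('environmental_sensors', buildTimeSeriesOptions({ retentionDays: 30 }));
//   await db.collection('environmental_sensors').createIndex({ 'device.device_id': 1, timestamp: -1 });
//   await insertInBatches(db.collection('environmental_sensors'), readings);
```

Choosing a granularity close to the real sampling interval and keeping slowly changing device attributes in the metaField is what lets the server group readings into well-filled buckets.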
Real-Time Analytics and Monitoring
Optimize time-series architectures for real-time IoT analytics:
- Aggregation Pipelines: Design efficient aggregation pipelines for real-time metrics and trend calculation
- Change Stream Processing: Implement change streams for real-time data processing and alert generation
- Materialized Views: Create pre-computed aggregations for common analytical queries and dashboard requirements
- Anomaly Detection: Implement statistical and machine learning-based anomaly detection for sensor data
- Predictive Analytics: Design predictive models for maintenance scheduling and operational optimization
- Alert Management: Configure intelligent alerting systems with severity classification and escalation policies
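To make the aggregation, materialized view, and anomaly detection points more concrete, here is a minimal sketch. The first function builds an hourly rollup pipeline that writes into a regular collection with `$merge`; the second flags outliers with a simple z-score test, which stands in for whatever statistical method a real deployment would choose. The field names (`timestamp`, `temperature`, `device.deviceId`), the target collection name `sensor_hourly`, and the threshold of 3 are all assumptions for illustration.

```javascript
// Build an aggregation pipeline that rolls readings up per device per hour
// and upserts the result into a regular (non-time-series) summary collection.
// Field and collection names are hypothetical.
function buildHourlyRollupPipeline(since, targetCollection = "sensor_hourly") {
  return [
    // Restrict the scan to recent data
    { $match: { timestamp: { $gte: since } } },
    {
      $group: {
        _id: {
          deviceId: "$device.deviceId",
          // $dateTrunc requires MongoDB 5.0 or later
          hour: { $dateTrunc: { date: "$timestamp", unit: "hour" } },
        },
        avgTemperature: { $avg: "$temperature" },
        maxTemperature: { $max: "$temperature" },
        readingCount: { $sum: 1 },
      },
    },
    // Replace existing hourly rows, insert new ones
    {
      $merge: {
        into: targetCollection,
        on: "_id",
        whenMatched: "replace",
        whenNotMatched: "insert",
      },
    },
  ];
}

// Flag values whose z-score (population standard deviation) exceeds the threshold.
function findAnomalies(values, threshold = 3) {
  const n = values.length;
  if (n < 2) return [];
  const mean = values.reduce((sum, v) => sum + v, 0) / n;
  const variance = values.reduce((sum, v) => sum + (v - mean) ** 2, 0) / n;
  const std = Math.sqrt(variance);
  if (std === 0) return []; // all values identical, nothing to flag
  return values
    .map((value, index) => ({ index, value, zScore: (value - mean) / std }))
    .filter((r) => Math.abs(r.zScore) > threshold);
}

// Possible usage with the Node.js driver (requires a connected `db` handle):
//   const since = new Date(Date.now() - 24 * 60 * 60 * 1000);
//   await db.collection("sensor_readings")
//     .aggregate(buildHourlyRollupPipeline(since))
//     .toArray();
```

Because the summary collection is a regular collection, it can also be watched with a change stream to drive dashboards or alerts as new hourly rows arrive.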
Conclusion
MongoDB time-series collections provide purpose-built capabilities for IoT data management: automatic bucketing, built-in compression, and aggregation support designed for temporal workloads. Together these improve storage efficiency, query performance, and real-time analytics, so IoT applications can handle large sensor data volumes while keeping query response times fast and storage costs under control.
Key MongoDB Time-Series IoT benefits include:
- Optimized Storage Efficiency: Native time-series compression and bucketing can reduce storage requirements by 60-90% compared to traditional approaches, with actual savings depending on data shape, metaField cardinality, and granularity settings
- High-Velocity Data Ingestion: Purpose-built for high-throughput sensor data with automatic optimization for temporal access patterns
- Real-Time Analytics: Built-in aggregation optimization enables real-time analytics and monitoring across massive sensor fleets
- Intelligent Data Lifecycle: Automated data retention and archival policies optimize costs while maintaining compliance requirements
- Scalable Performance: Time-series collections scale seamlessly from thousands to millions of sensors with consistent performance
- SQL Accessibility: Familiar SQL-style time-series operations through QueryLeaf for accessible IoT analytics and management
Whether you're building smart building management systems, industrial IoT monitoring platforms, or large-scale sensor networks, MongoDB time-series collections with QueryLeaf's familiar SQL interface provide the foundation for efficient, scalable IoT data management.
QueryLeaf Integration: QueryLeaf automatically optimizes MongoDB time-series operations while providing familiar SQL syntax for complex temporal analytics, fleet management, and predictive maintenance workflows. Advanced time-series aggregations, real-time monitoring, and anomaly detection are seamlessly accessible through familiar SQL constructs, making sophisticated IoT analytics approachable for SQL-oriented development teams.
Combining MongoDB's time-series capabilities with SQL-style temporal operations suits IoT applications that need both efficient sensor data management and familiar analytical database patterns. The result is an IoT infrastructure that can scale while remaining performant and simple to operate.