MongoDB Time-Series Collections for IoT Data Processing: Edge Analytics and Real-Time Stream Processing
Modern IoT applications generate large volumes of time-stamped sensor data that must be stored efficiently, analyzed in real time, and retained for historical trend analysis. MongoDB's time-series collections provide storage optimizations and query capabilities designed for time-ordered workloads, so IoT data can be processed at high throughput using familiar SQL-style analytics patterns.
Time-series collections group measurements from the same source into time-ordered, compressed buckets, which reduces storage and I/O for IoT sensor readings, device telemetry, financial market data, and application metrics. Documents can still carry complex nested sensor data structures.
The IoT Data Processing Challenge
Consider a smart manufacturing facility with thousands of sensors generating continuous data streams:
// Traditional document storage - inefficient for time-series data
{
"_id": ObjectId("..."),
"device_id": "SENSOR_001_TEMP",
"device_type": "temperature",
"location": "assembly_line_1",
"timestamp": ISODate("2025-12-24T10:15:30.123Z"),
"temperature_celsius": 23.5,
"humidity_percent": 45.2,
"pressure_bar": 1.013,
"battery_level": 87,
"signal_strength": -65,
"metadata": {
"firmware_version": "2.1.4",
"calibration_date": ISODate("2025-11-15T00:00:00Z"),
"maintenance_status": "ok"
}
}
// Problems with traditional collections for IoT data:
// 1. Storage Overhead: Full document structure repeated for each reading
// 2. Index Inefficiency: Generic indexes not optimized for time-ordered queries
// 3. Query Performance: Time-range queries depend on hand-built compound indexes to avoid large scans
// 4. Memory Usage: Large working sets for time-based aggregations
// 5. Disk I/O: Scattered document layout reduces sequential read performance
// 6. Scaling Issues: Hot-spotting on insertion due to monotonic timestamps
// 7. Compression: Limited compression opportunities with varied document structures
// Example of inefficient time-range query performance:
db.sensor_data.find({
"device_id": "SENSOR_001_TEMP",
"timestamp": {
$gte: ISODate("2025-12-24T00:00:00Z"),
$lt: ISODate("2025-12-24T01:00:00Z")
}
}).explain("executionStats")
// Result without a compound index on { device_id, timestamp }: full collection scan, high disk I/O, poor cache utilization
MongoDB time-series collections solve these challenges through specialized optimizations:
// Create optimized time-series collection for IoT data
db.createCollection("iot_sensor_readings", {
timeseries: {
timeField: "timestamp", // Required: field containing timestamp
metaField: "device_info", // Optional: field containing metadata
// Set either granularity ("seconds", "minutes", or "hours") or the two
// custom bucketing parameters below (MongoDB 6.3+), not both
bucketMaxSpanSeconds: 3600, // Maximum time span per bucket (1 hour)
bucketRoundingSeconds: 3600 // Round bucket boundaries to hour marks; must equal bucketMaxSpanSeconds
},
expireAfterSeconds: 7776000 // TTL: expire data after 90 days
// No clusteredIndex option here: it only accepts key { _id: 1 } with unique: true,
// and time-series collections already manage their own time-ordered bucket storage
})
// Optimized IoT sensor data structure for time-series collections
{
"timestamp": ISODate("2025-12-24T10:15:30.123Z"), // Time field - always required
"device_info": { // Meta field - constant per device
"device_id": "SENSOR_001_TEMP",
"device_type": "temperature",
"location": "assembly_line_1",
"firmware_version": "2.1.4",
"calibration_date": ISODate("2025-11-15T00:00:00Z")
},
// Measurement fields - vary over time
"temperature_celsius": 23.5,
"humidity_percent": 45.2,
"pressure_bar": 1.013,
"battery_level": 87,
"signal_strength_dbm": -65,
"status": "operational"
}
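Moving existing flat documents into this shape is mostly a matter of relocating the per-device fields under the meta field. The following is a minimal sketch, assuming the flat layout shown at the start of this section; the function name `toTimeSeriesDocument` and the `META_KEYS` list are illustrative and not part of any MongoDB API.

```javascript
// Hypothetical helper: reshape a flat sensor document into the
// { timestamp, device_info, ...measurements } layout used above.
const META_KEYS = ["device_id", "device_type", "location"];

function toTimeSeriesDocument(flat) {
  // _id is dropped so the server can assign one; metadata is handled separately
  const { _id, timestamp, metadata = {}, ...rest } = flat;
  const device_info = {};
  const measurements = {};
  for (const [key, value] of Object.entries(rest)) {
    if (META_KEYS.includes(key)) device_info[key] = value;
    else measurements[key] = value;
  }
  // Slowly changing attributes such as the firmware version go into the meta field.
  // Other metadata keys are ignored in this sketch.
  if (metadata.firmware_version !== undefined) {
    device_info.firmware_version = metadata.firmware_version;
  }
  return { timestamp: new Date(timestamp), device_info, ...measurements };
}

const example = toTimeSeriesDocument({
  device_id: "SENSOR_001_TEMP",
  device_type: "temperature",
  location: "assembly_line_1",
  timestamp: "2025-12-24T10:15:30.123Z",
  temperature_celsius: 23.5,
  metadata: { firmware_version: "2.1.4", maintenance_status: "ok" }
});
console.log(example);
```

Which fields count as metadata is a modeling decision: values that rarely change and that queries filter on are the usual candidates, while anything that varies per reading should stay a measurement.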
IoT Data Ingestion and Streaming
High-Throughput Sensor Data Insertion
// Batch insertion for high-volume IoT data streams
const sensorReadings = [
{
timestamp: new Date("2025-12-24T10:15:00Z"),
device_info: {
device_id: "TEMP_001",
location: "warehouse_zone_a",
device_type: "environmental"
},
temperature_celsius: 22.1,
humidity_percent: 43.5,
battery_level: 89
},
{
timestamp: new Date("2025-12-24T10:15:30Z"),
device_info: {
device_id: "TEMP_001",
location: "warehouse_zone_a",
device_type: "environmental"
},
temperature_celsius: 22.3,
humidity_percent: 43.2,
battery_level: 89
},
{
timestamp: new Date("2025-12-24T10:15:00Z"),
device_info: {
device_id: "PRESS_002",
location: "hydraulic_system_1",
device_type: "pressure"
},
pressure_bar: 15.7,
flow_rate_lpm: 125.3,
valve_position_percent: 67
}
// ... thousands more readings
];
// Efficient bulk insertion with time-series optimizations
const result = await db.iot_sensor_readings.insertMany(sensorReadings, {
ordered: false, // Allow parallel inserts for better performance
writeConcern: { // Balance between performance and durability
w: 1, // Acknowledge from primary only
j: false // Don't wait for journal sync for high throughput
}
});
console.log(`Inserted ${result.insertedCount} sensor readings`);
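For streams larger than a single call should carry, readings are usually split into bounded batches first. The sketch below assumes a batch size of 1000 and groups readings by device before chunking, on the assumption that batches dominated by one device touch fewer buckets; `chunkReadings` and `sortForIngestion` are hypothetical helper names.

```javascript
// Hypothetical helper: split a large array of readings into fixed-size batches
function chunkReadings(readings, batchSize = 1000) {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new RangeError("batchSize must be a positive integer");
  }
  const batches = [];
  for (let i = 0; i < readings.length; i += batchSize) {
    batches.push(readings.slice(i, i + batchSize));
  }
  return batches;
}

// Hypothetical helper: order readings by device, then by time, without mutating the input
function sortForIngestion(readings) {
  return [...readings].sort((a, b) => {
    const byDevice = a.device_info.device_id.localeCompare(b.device_info.device_id);
    return byDevice !== 0 ? byDevice : a.timestamp - b.timestamp;
  });
}

const demo = [
  { timestamp: new Date("2025-12-24T10:15:30Z"), device_info: { device_id: "TEMP_001" }, temperature_celsius: 22.3 },
  { timestamp: new Date("2025-12-24T10:15:00Z"), device_info: { device_id: "PRESS_002" }, pressure_bar: 15.7 },
  { timestamp: new Date("2025-12-24T10:15:00Z"), device_info: { device_id: "TEMP_001" }, temperature_celsius: 22.1 }
];
const batches = chunkReadings(sortForIngestion(demo), 2);
console.log(batches.map(batch => batch.length));
```

Each batch can then be passed to `insertMany` with `ordered: false`, as in the example above.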
Real-Time Data Streaming Pipeline
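// MongoDB Change Streams for real-time IoT data processing
// Note: change streams cannot be opened on a time-series collection, so this
// pipeline watches a regular collection that receives the same inserts
// (iot_sensor_events is an assumed name for that collection)
const changeStream = db.iot_sensor_events.watch([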
{
$match: {
"operationType": "insert",
// Filter for specific device types or locations
"fullDocument.device_info.device_type": { $in: ["temperature", "pressure"] },
"fullDocument.device_info.location": { $regex: /^production_line/ }
}
},
{
$project: {
timestamp: "$fullDocument.timestamp",
device_id: "$fullDocument.device_info.device_id",
location: "$fullDocument.device_info.location",
temperature: "$fullDocument.temperature_celsius",
pressure: "$fullDocument.pressure_bar",
inserted_at: "$clusterTime" // clusterTime is a field of the change event, not a $$ system variable
}
}
], { fullDocument: 'updateLookup' });
// Real-time alert processing
changeStream.on('change', async (changeEvent) => {
const { timestamp, device_id, location, temperature, pressure } = changeEvent;
// Temperature threshold monitoring
if (temperature !== undefined && temperature > 35.0) {
await processTemperatureAlert({
device_id,
location,
temperature,
timestamp,
severity: temperature > 40.0 ? 'critical' : 'warning'
});
}
// Pressure threshold monitoring
if (pressure !== undefined && pressure > 20.0) {
await processPressureAlert({
device_id,
location,
pressure,
timestamp,
severity: pressure > 25.0 ? 'critical' : 'warning'
});
}
// Update real-time dashboard
await updateDashboardMetrics({
device_id,
location,
latest_reading: { temperature, pressure, timestamp }
});
});
async function processTemperatureAlert(alertData) {
// Check for sustained high temperature
const recentReadings = await db.iot_sensor_readings.aggregate([
{
$match: {
"device_info.device_id": alertData.device_id,
"timestamp": {
$gte: new Date(Date.now() - 5 * 60 * 1000) // Last 5 minutes
},
"temperature_celsius": { $gt: 35.0 }
}
},
{
$group: {
_id: null,
avg_temperature: { $avg: "$temperature_celsius" },
max_temperature: { $max: "$temperature_celsius" },
reading_count: { $sum: 1 }
}
}
]).next();
if (recentReadings && recentReadings.reading_count >= 3) {
// Sustained high temperature - trigger maintenance alert
await db.maintenance_alerts.insertOne({
alert_type: "temperature_sustained_high",
device_id: alertData.device_id,
location: alertData.location,
severity: alertData.severity,
current_temperature: alertData.temperature,
avg_temperature_5min: recentReadings.avg_temperature,
max_temperature_5min: recentReadings.max_temperature,
created_at: new Date(),
acknowledged: false
});
// Send notification to operations team
await sendAlert({
type: 'email',
recipients: ['[email protected]'],
subject: `High Temperature Alert - ${alertData.location}`,
body: `Device ${alertData.device_id} reporting sustained high temperature: ${alertData.temperature}°C`
});
}
}
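As written, the handler can raise a new alert for every reading that stays above the threshold. One common mitigation is a per-device cooldown. This is a minimal in-memory sketch under that assumption; the `AlertCooldown` class and its five-minute default are hypothetical, and a multi-process deployment would need shared state instead.

```javascript
// Hypothetical in-memory cooldown: suppress repeat alerts per device and alert type
class AlertCooldown {
  constructor(cooldownMs = 5 * 60 * 1000) {
    this.cooldownMs = cooldownMs;
    this.lastSent = new Map(); // "deviceId:alertType" -> time (ms) the last alert was sent
  }

  // Returns true if the alert should be sent now, and records the send time
  shouldSend(deviceId, alertType, nowMs = Date.now()) {
    const key = `${deviceId}:${alertType}`;
    const last = this.lastSent.get(key);
    if (last !== undefined && nowMs - last < this.cooldownMs) {
      return false; // still inside the cooldown window
    }
    this.lastSent.set(key, nowMs);
    return true;
  }
}

const cooldown = new AlertCooldown(60 * 1000);
const decisions = [
  cooldown.shouldSend("TEMP_001", "temperature_sustained_high", 0),
  cooldown.shouldSend("TEMP_001", "temperature_sustained_high", 30000),
  cooldown.shouldSend("TEMP_001", "temperature_sustained_high", 60000)
];
console.log(decisions); // [ true, false, true ]
```

A call to `shouldSend` could wrap the `insertOne` and `sendAlert` steps in `processTemperatureAlert`, so sustained conditions produce one alert per cooldown period rather than one per reading.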
Time-Series Analytics and Aggregations
SQL-Style Time-Based Analytics
// Advanced time-series aggregation for IoT analytics
db.iot_sensor_readings.aggregate([
// Stage 1: Filter recent sensor data
{
$match: {
"timestamp": {
$gte: ISODate("2025-12-24T00:00:00Z"),
$lt: ISODate("2025-12-25T00:00:00Z")
},
"device_info.location": { $regex: /^production_line/ }
}
},
// Stage 2: Time-based grouping (hourly buckets)
{
$group: {
_id: {
device_id: "$device_info.device_id",
location: "$device_info.location",
device_type: "$device_info.device_type",
hour: {
$dateToString: {
format: "%Y-%m-%d %H:00:00",
date: "$timestamp"
}
}
},
// Temperature analytics
avg_temperature: { $avg: "$temperature_celsius" },
min_temperature: { $min: "$temperature_celsius" },
max_temperature: { $max: "$temperature_celsius" },
// $isNumber is false for null and for missing fields; comparing a missing field with null is not
temperature_readings: { $sum: { $cond: [{ $isNumber: "$temperature_celsius" }, 1, 0] } },
// Pressure analytics
avg_pressure: { $avg: "$pressure_bar" },
min_pressure: { $min: "$pressure_bar" },
max_pressure: { $max: "$pressure_bar" },
pressure_readings: { $sum: { $cond: [{ $isNumber: "$pressure_bar" }, 1, 0] } },
// Humidity analytics
avg_humidity: { $avg: "$humidity_percent" },
min_humidity: { $min: "$humidity_percent" },
max_humidity: { $max: "$humidity_percent" },
// Battery level monitoring
avg_battery: { $avg: "$battery_level" },
min_battery: { $min: "$battery_level" },
low_battery_count: {
$sum: { $cond: [{ $and: [{ $isNumber: "$battery_level" }, { $lt: ["$battery_level", 20] }] }, 1, 0] }
},
// Data quality metrics
total_readings: { $sum: 1 },
missing_data_count: {
$sum: {
$cond: [
{
$and: [
{ $not: [{ $isNumber: "$temperature_celsius" }] },
{ $not: [{ $isNumber: "$pressure_bar" }] },
{ $not: [{ $isNumber: "$humidity_percent" }] }
]
},
1,
0
]
}
},
// Signal quality
avg_signal_strength: { $avg: "$signal_strength_dbm" },
weak_signal_count: {
$sum: { $cond: [{ $and: [{ $isNumber: "$signal_strength_dbm" }, { $lt: ["$signal_strength_dbm", -80] }] }, 1, 0] }
},
first_reading_time: { $min: "$timestamp" },
last_reading_time: { $max: "$timestamp" }
}
},
// Stage 3: Calculate derived metrics and data quality indicators
{
$addFields: {
// Temperature variation coefficient
temperature_variation_coefficient: {
$cond: [
{ $gt: ["$avg_temperature", 0] },
{
$divide: [
{ $subtract: ["$max_temperature", "$min_temperature"] },
"$avg_temperature"
]
},
null
]
},
// Pressure stability indicator
pressure_stability_score: {
$cond: [
{ $and: [{ $gt: ["$avg_pressure", 0] }, { $gt: ["$pressure_readings", 10] }] },
{
$subtract: [
1,
{
$divide: [
{ $subtract: ["$max_pressure", "$min_pressure"] },
{ $multiply: ["$avg_pressure", 2] }
]
}
]
},
null
]
},
// Data completeness percentage
data_completeness_percent: {
$multiply: [
{
$divide: [
{ $subtract: ["$total_readings", "$missing_data_count"] },
"$total_readings"
]
},
100
]
},
// Equipment health score (composite metric)
equipment_health_score: {
$multiply: [
{
$avg: [
// Battery health factor (0-1)
{ $divide: ["$avg_battery", 100] },
// Signal quality factor (0-1)
{
$cond: [
{ $ne: ["$avg_signal_strength", null] },
{ $divide: [{ $add: ["$avg_signal_strength", 100] }, 100] },
0.5
]
},
// Data quality factor (0-1)
{ $divide: ["$data_completeness_percent", 100] }
]
},
100
]
}
}
},
// Stage 4: Quality and threshold analysis
{
$addFields: {
temperature_status: {
$switch: {
branches: [
{ case: { $gt: ["$max_temperature", 40] }, then: "critical" },
{ case: { $gt: ["$avg_temperature", 35] }, then: "warning" },
{ case: { $and: [{ $isNumber: "$avg_temperature" }, { $lt: ["$avg_temperature", 15] }] }, then: "too_cold" }, // null sorts below numbers, so guard it
{ case: { $gt: ["$temperature_variation_coefficient", 0.3] }, then: "unstable" }
],
default: "normal"
}
},
pressure_status: {
$switch: {
branches: [
{ case: { $gt: ["$max_pressure", 25] }, then: "critical" },
{ case: { $gt: ["$avg_pressure", 20] }, then: "warning" },
{ case: { $and: [{ $isNumber: "$pressure_stability_score" }, { $lt: ["$pressure_stability_score", 0.7] }] }, then: "unstable" }
],
default: "normal"
}
},
battery_status: {
$switch: {
branches: [
{ case: { $and: [{ $isNumber: "$min_battery" }, { $lt: ["$min_battery", 10] }] }, then: "critical" },
{ case: { $and: [{ $isNumber: "$avg_battery" }, { $lt: ["$avg_battery", 20] }] }, then: "low" },
{ case: { $gt: ["$low_battery_count", 5] }, then: "degrading" }
],
default: "normal"
}
},
overall_status: {
$switch: {
branches: [
{
case: {
$or: [
{ $eq: ["$temperature_status", "critical"] },
{ $eq: ["$pressure_status", "critical"] },
{ $eq: ["$battery_status", "critical"] }
]
},
then: "critical"
},
{
case: {
$or: [
{ $eq: ["$temperature_status", "warning"] },
{ $eq: ["$pressure_status", "warning"] },
{ $eq: ["$battery_status", "low"] },
{ $lt: ["$data_completeness_percent", 90] }
]
},
then: "warning"
}
],
default: "normal"
}
}
}
},
// Stage 5: Sort and format results
{
$sort: {
"_id.location": 1,
"_id.device_id": 1,
"_id.hour": 1
}
},
// Stage 6: Project final analytics results
{
$project: {
device_id: "$_id.device_id",
location: "$_id.location",
device_type: "$_id.device_type",
hour: "$_id.hour",
// Environmental metrics
temperature_metrics: {
average: { $round: ["$avg_temperature", 1] },
minimum: { $round: ["$min_temperature", 1] },
maximum: { $round: ["$max_temperature", 1] },
variation_coefficient: { $round: ["$temperature_variation_coefficient", 3] },
reading_count: "$temperature_readings",
status: "$temperature_status"
},
pressure_metrics: {
average: { $round: ["$avg_pressure", 2] },
minimum: { $round: ["$min_pressure", 2] },
maximum: { $round: ["$max_pressure", 2] },
stability_score: { $round: ["$pressure_stability_score", 3] },
reading_count: "$pressure_readings",
status: "$pressure_status"
},
humidity_metrics: {
average: { $round: ["$avg_humidity", 1] },
minimum: { $round: ["$min_humidity", 1] },
maximum: { $round: ["$max_humidity", 1] }
},
// Equipment health
equipment_metrics: {
battery_average: { $round: ["$avg_battery", 1] },
battery_minimum: "$min_battery",
low_battery_incidents: "$low_battery_count",
battery_status: "$battery_status",
signal_strength_avg: { $round: ["$avg_signal_strength", 1] },
weak_signal_count: "$weak_signal_count",
health_score: { $round: ["$equipment_health_score", 1] }
},
// Data quality
data_quality: {
total_readings: "$total_readings",
completeness_percent: { $round: ["$data_completeness_percent", 1] },
missing_readings: "$missing_data_count",
time_span_minutes: {
$divide: [
{ $subtract: ["$last_reading_time", "$first_reading_time"] },
60000
]
}
},
overall_status: "$overall_status",
analysis_timestamp: "$$NOW"
}
}
])
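The composite health score in Stage 3 is easier to reason about with concrete numbers. The sketch below mirrors that expression in plain JavaScript so the weighting can be checked by hand; `equipmentHealthScore` is a hypothetical helper, and the signal factor assumes dBm values between -100 and 0, as the pipeline does.

```javascript
// Plain-JavaScript mirror of the equipment_health_score expression in Stage 3.
// Like $avg, it skips factors that are null (for example, a device with no battery data).
function equipmentHealthScore({ avgBattery, avgSignalDbm, completenessPercent }) {
  const factors = [
    avgBattery == null ? null : avgBattery / 100,                   // battery factor (0-1)
    avgSignalDbm == null ? 0.5 : (avgSignalDbm + 100) / 100,        // signal factor, 0.5 when unknown
    completenessPercent == null ? null : completenessPercent / 100  // data quality factor (0-1)
  ].filter(factor => factor !== null);
  if (factors.length === 0) return null;
  const mean = factors.reduce((sum, factor) => sum + factor, 0) / factors.length;
  return mean * 100;
}

const score = equipmentHealthScore({ avgBattery: 87, avgSignalDbm: -65, completenessPercent: 100 });
console.log(score.toFixed(1)); // "74.0"
```

Note that a signal of -65 dBm contributes only 0.35 under this mapping, so the signal factor pulls the score down even for links that are working well. Rescaling it to the range the deployment actually sees may give a more useful score.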
Moving Averages and Trend Analysis
// Calculate moving averages and trend detection for predictive maintenance
db.iot_sensor_readings.aggregate([
{
$match: {
"device_info.device_id": "MOTOR_PUMP_001",
"timestamp": {
$gte: ISODate("2025-12-20T00:00:00Z"),
$lt: ISODate("2025-12-25T00:00:00Z")
}
}
},
// Sort by timestamp for window functions
{ $sort: { "timestamp": 1 } },
// Calculate moving averages using sliding windows
{
$setWindowFields: {
partitionBy: "$device_info.device_id",
sortBy: { "timestamp": 1 },
output: {
// 5-minute moving average for vibration
vibration_ma_5min: {
$avg: "$vibration_amplitude_mm",
window: {
range: [-300, 0], // 5 minutes in seconds
unit: "second"
}
},
// 15-minute moving average for temperature
temperature_ma_15min: {
$avg: "$temperature_celsius",
window: {
range: [-900, 0], // 15 minutes in seconds
unit: "second"
}
},
// 1-hour moving average for pressure
pressure_ma_1hour: {
$avg: "$pressure_bar",
window: {
range: [-3600, 0], // 1 hour in seconds
unit: "second"
}
},
// Rolling standard deviation for anomaly detection
vibration_std_5min: {
$stdDevSamp: "$vibration_amplitude_mm",
window: {
range: [-300, 0],
unit: "second"
}
},
// Previous reading for trend calculation
prev_vibration: {
$shift: {
output: "$vibration_amplitude_mm",
by: -1
}
},
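// 5-minute moving average as of one minute earlier, for trend direction
// ($shift cannot wrap a windowed accumulator, and its "by" counts documents, not seconds)
prev_vibration_ma: {
$avg: "$vibration_amplitude_mm",
window: {
range: [-360, -60], // the 5-minute window that ends 60 seconds before this reading
unit: "second"
}
}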
}
}
},
// Calculate derived trend metrics
{
$addFields: {
// Vibration trend direction
vibration_trend: {
$cond: [
{ $and: [{ $ne: ["$vibration_ma_5min", null] }, { $ne: ["$prev_vibration_ma", null] }] },
{
$switch: {
branches: [
{
case: { $gt: [{ $subtract: ["$vibration_ma_5min", "$prev_vibration_ma"] }, 0.1] },
then: "increasing"
},
{
case: { $lt: [{ $subtract: ["$vibration_ma_5min", "$prev_vibration_ma"] }, -0.1] },
then: "decreasing"
}
],
default: "stable"
}
},
null
]
},
// Anomaly detection using z-score
vibration_anomaly_score: {
$cond: [
{ $and: [{ $gt: ["$vibration_std_5min", 0] }, { $ne: ["$vibration_ma_5min", null] }] },
{
$abs: {
$divide: [
{ $subtract: ["$vibration_amplitude_mm", "$vibration_ma_5min"] },
"$vibration_std_5min"
]
}
},
null
]
},
// Predictive maintenance indicators
maintenance_risk_score: {
$multiply: [
{
$add: [
// High vibration factor
{ $cond: [{ $gt: ["$vibration_ma_5min", 2.5] }, 25, 0] },
// Increasing vibration trend factor
{ $cond: [{ $eq: ["$vibration_trend", "increasing"] }, 15, 0] },
// High temperature factor
{ $cond: [{ $gt: ["$temperature_ma_15min", 75] }, 20, 0] },
// Anomaly factor
{ $cond: [{ $gt: ["$vibration_anomaly_score", 2] }, 30, 0] },
// Pressure variation factor
{ $cond: [{ $gt: [{ $abs: { $subtract: ["$pressure_bar", "$pressure_ma_1hour"] } }, 2] }, 10, 0] }
]
},
1 // Factor weights already sum to 100, so the score stays on a 0-100 scale
]
}
}
},
// Filter to significant readings and add maintenance recommendations
{
$match: {
$or: [
{ "vibration_anomaly_score": { $gt: 1.5 } },
{ "maintenance_risk_score": { $gt: 30 } },
{ "vibration_trend": "increasing" }
]
}
},
// Add maintenance recommendations
{
$addFields: {
maintenance_recommendation: {
$switch: {
branches: [
{
case: { $gt: ["$maintenance_risk_score", 70] },
then: {
priority: "immediate",
action: "schedule_emergency_inspection",
description: "High risk indicators detected - immediate inspection required"
}
},
{
case: { $gt: ["$maintenance_risk_score", 50] },
then: {
priority: "high",
action: "schedule_maintenance_window",
description: "Elevated risk indicators - schedule maintenance within 24 hours"
}
},
{
case: { $gt: ["$maintenance_risk_score", 30] },
then: {
priority: "medium",
action: "monitor_closely",
description: "Potential issues detected - increase monitoring frequency"
}
}
],
default: {
priority: "low",
action: "continue_monitoring",
description: "Minor anomalies detected - continue standard monitoring"
}
}
}
}
},
// Project final predictive maintenance report
{
$project: {
timestamp: 1,
device_id: "$device_info.device_id",
current_readings: {
vibration_amplitude: "$vibration_amplitude_mm",
temperature: "$temperature_celsius",
pressure: "$pressure_bar"
},
moving_averages: {
vibration_5min: { $round: ["$vibration_ma_5min", 2] },
temperature_15min: { $round: ["$temperature_ma_15min", 1] },
pressure_1hour: { $round: ["$pressure_ma_1hour", 2] }
},
trend_analysis: {
vibration_trend: "$vibration_trend",
anomaly_score: { $round: ["$vibration_anomaly_score", 2] },
risk_score: { $round: ["$maintenance_risk_score", 0] }
},
maintenance_recommendation: 1,
analysis_timestamp: "$$NOW"
}
},
{ $sort: { "timestamp": -1 } },
{ $limit: 100 }
])
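The time-range windows used above can be checked against a small in-memory version. This sketch computes a trailing mean, sample standard deviation, and z-score over the interval from `windowSeconds` before each point up to the point itself, which matches a `range: [-N, 0]` window when timestamps are distinct. The function name `rollingStats` and the `{ t, value }` point shape are assumptions for illustration.

```javascript
// Trailing time-window statistics over [t - windowSeconds, t] for each point.
// points: [{ t: Date, value: number }] sorted by ascending t.
function rollingStats(points, windowSeconds) {
  const windowMs = windowSeconds * 1000;
  let start = 0; // index of the oldest point still inside the window
  return points.map((point, i) => {
    while (point.t - points[start].t > windowMs) start++;
    const values = points.slice(start, i + 1).map(p => p.value);
    const mean = values.reduce((sum, v) => sum + v, 0) / values.length;
    // Sample standard deviation is undefined for a single value, like $stdDevSamp
    const std = values.length > 1
      ? Math.sqrt(values.reduce((sum, v) => sum + (v - mean) ** 2, 0) / (values.length - 1))
      : null;
    const zScore = std ? Math.abs(point.value - mean) / std : null;
    return { ...point, mean, std, zScore };
  });
}

const base = Date.parse("2025-12-24T10:00:00Z");
const stats = rollingStats(
  [
    { t: new Date(base), value: 1 },
    { t: new Date(base + 60 * 1000), value: 2 },
    { t: new Date(base + 120 * 1000), value: 3 },
    { t: new Date(base + 400 * 1000), value: 10 }
  ],
  300
);
console.log(stats.map(s => s.mean)); // [ 1, 1.5, 2, 6.5 ]
```

The last point shows why time-based windows differ from row-based ones: only the reading 280 seconds earlier is still inside its 5-minute window, so its mean covers two values rather than four.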
Edge Computing and Local Processing
Edge Analytics with Local Aggregation
// Edge device local aggregation before cloud synchronization
class IoTEdgeProcessor {
constructor(deviceConfig) {
this.deviceId = deviceConfig.deviceId;
this.location = deviceConfig.location;
this.aggregationWindow = deviceConfig.aggregationWindow || 60; // seconds
this.localBuffer = [];
this.thresholds = deviceConfig.thresholds || {};
this.localAlertsCount = 0; // Alerts raised since the last aggregation
}
// Process incoming sensor reading at edge
async processSensorReading(reading) {
const enhancedReading = {
...reading,
timestamp: new Date(),
device_info: {
device_id: this.deviceId,
location: this.location,
edge_processed: true
}
};
// Add to local buffer
this.localBuffer.push(enhancedReading);
// Check for immediate alerts
await this.checkAlertConditions(enhancedReading);
// Perform local aggregation if buffer is full
if (this.shouldAggregate()) {
await this.performLocalAggregation();
}
return enhancedReading;
}
shouldAggregate() {
if (this.localBuffer.length === 0) return false;
const oldestReading = this.localBuffer[0];
const currentTime = new Date();
const timeDiff = (currentTime - oldestReading.timestamp) / 1000;
return timeDiff >= this.aggregationWindow || this.localBuffer.length >= 100;
}
async performLocalAggregation() {
if (this.localBuffer.length === 0) return;
const aggregationPeriod = {
start: this.localBuffer[0].timestamp,
end: this.localBuffer[this.localBuffer.length - 1].timestamp
};
// Calculate edge aggregations
const aggregatedData = {
timestamp: aggregationPeriod.start,
device_info: {
device_id: this.deviceId,
location: this.location,
aggregation_type: "edge_local",
reading_count: this.localBuffer.length
},
// Temperature aggregations
temperature_metrics: this.calculateFieldMetrics(this.localBuffer, 'temperature_celsius'),
// Pressure aggregations
pressure_metrics: this.calculateFieldMetrics(this.localBuffer, 'pressure_bar'),
// Humidity aggregations
humidity_metrics: this.calculateFieldMetrics(this.localBuffer, 'humidity_percent'),
// Battery and signal quality
battery_level: this.calculateFieldMetrics(this.localBuffer, 'battery_level'),
signal_strength: this.calculateFieldMetrics(this.localBuffer, 'signal_strength_dbm'),
// Data quality indicators
data_quality: {
total_readings: this.localBuffer.length,
time_span_seconds: (aggregationPeriod.end - aggregationPeriod.start) / 1000,
missing_data_count: this.countMissingData(),
completeness_percent: this.calculateDataCompleteness()
},
// Edge-specific metadata
edge_metadata: {
aggregated_at: new Date(),
local_alerts_triggered: this.localAlertsCount,
network_quality: this.getNetworkQuality(),
processing_latency_ms: Date.now() - aggregationPeriod.end.getTime()
}
};
// Send to cloud database
await this.sendToCloud(aggregatedData);
// Keep recent raw data, clear older entries
this.localBuffer = this.localBuffer.slice(-10); // Keep last 10 readings
this.localAlertsCount = 0;
}
calculateFieldMetrics(buffer, fieldName) {
const values = buffer
.map(reading => reading[fieldName])
.filter(value => value !== null && value !== undefined);
if (values.length === 0) return null;
const sorted = [...values].sort((a, b) => a - b);
return {
average: values.reduce((sum, val) => sum + val, 0) / values.length,
minimum: Math.min(...values),
maximum: Math.max(...values),
median: sorted[Math.floor(sorted.length / 2)],
standard_deviation: this.calculateStandardDeviation(values),
reading_count: values.length,
trend: this.calculateTrend(values)
};
}
calculateStandardDeviation(values) {
const avg = values.reduce((sum, val) => sum + val, 0) / values.length;
const squaredDiffs = values.map(val => Math.pow(val - avg, 2));
const variance = squaredDiffs.reduce((sum, val) => sum + val, 0) / values.length;
return Math.sqrt(variance);
}
calculateTrend(values) {
if (values.length < 3) return "insufficient_data";
const firstHalf = values.slice(0, Math.floor(values.length / 2));
const secondHalf = values.slice(Math.floor(values.length / 2));
const firstAvg = firstHalf.reduce((sum, val) => sum + val, 0) / firstHalf.length;
const secondAvg = secondHalf.reduce((sum, val) => sum + val, 0) / secondHalf.length;
const difference = secondAvg - firstAvg;
const threshold = Math.abs(firstAvg) * 0.05; // 5% threshold
if (Math.abs(difference) < threshold) return "stable";
return difference > 0 ? "increasing" : "decreasing";
}
async checkAlertConditions(reading) {
const alerts = [];
// Temperature alerts (fall back to 40 / 35 degrees C when no thresholds are configured)
// Note: "value > threshold || 40" parses as "(value > threshold) || 40", which is always truthy
const criticalThreshold = this.thresholds.temperature_critical ?? 40;
const warningThreshold = this.thresholds.temperature_warning ?? 35;
if (reading.temperature_celsius !== undefined) {
if (reading.temperature_celsius > criticalThreshold) {
alerts.push({
type: "temperature_critical",
value: reading.temperature_celsius,
threshold: criticalThreshold,
severity: "critical"
});
} else if (reading.temperature_celsius > warningThreshold) {
alerts.push({
type: "temperature_warning",
value: reading.temperature_celsius,
threshold: warningThreshold,
severity: "warning"
});
}
}
// Battery alerts
if (reading.battery_level !== undefined && reading.battery_level < 15) {
alerts.push({
type: "battery_low",
value: reading.battery_level,
threshold: 15,
severity: "warning"
});
}
// Process alerts locally
for (const alert of alerts) {
await this.processEdgeAlert(alert, reading);
this.localAlertsCount = (this.localAlertsCount || 0) + 1;
}
}
async processEdgeAlert(alert, reading) {
const alertData = {
alert_id: `edge_${this.deviceId}_${Date.now()}`,
device_id: this.deviceId,
location: this.location,
alert_type: alert.type,
severity: alert.severity,
triggered_value: alert.value,
threshold_value: alert.threshold,
reading_timestamp: reading.timestamp,
processed_at_edge: new Date(),
raw_reading: reading
};
// Store alert locally for immediate action
await this.storeLocalAlert(alertData);
// If critical, try immediate cloud notification
if (alert.severity === "critical") {
await this.sendCriticalAlertToCloud(alertData);
}
}
async sendToCloud(aggregatedData) {
try {
await db.iot_edge_aggregations.insertOne(aggregatedData);
} catch (error) {
console.error('Failed to send aggregated data to cloud:', error);
// Store locally for later retry
await this.queueForRetry(aggregatedData);
}
}
getNetworkQuality() {
// Simulate network quality assessment
return {
signal_strength: Math.floor(Math.random() * 100),
latency_ms: Math.floor(Math.random() * 200) + 50,
bandwidth_mbps: Math.floor(Math.random() * 100) + 10
};
}
}
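The class above calls `queueForRetry` without showing it. One possible shape for that step is a bounded in-memory queue that is flushed once connectivity returns. This is a sketch under that assumption: `RetryQueue`, `backoffDelayMs`, and their default limits are hypothetical, and a real edge device would likely persist the queue to disk as well.

```javascript
// Hypothetical bounded retry queue for payloads that failed to reach the cloud
class RetryQueue {
  constructor(maxSize = 500) {
    this.maxSize = maxSize;
    this.items = [];
  }

  enqueue(payload) {
    // Drop the oldest entry when full so memory stays bounded on the device
    if (this.items.length >= this.maxSize) this.items.shift();
    this.items.push({ payload, attempts: 0 });
  }

  // Attempt each queued payload once; anything that fails again stays queued
  async flush(sendFn) {
    const pending = this.items;
    this.items = [];
    let sent = 0;
    for (const item of pending) {
      try {
        await sendFn(item.payload);
        sent += 1;
      } catch (error) {
        item.attempts += 1;
        this.items.push(item);
      }
    }
    return sent;
  }
}

// Exponential backoff delay between flush attempts, capped at maxMs
function backoffDelayMs(attempt, baseMs = 1000, maxMs = 60000) {
  return Math.min(baseMs * 2 ** attempt, maxMs);
}

console.log([0, 1, 2, 3].map(n => backoffDelayMs(n))); // [ 1000, 2000, 4000, 8000 ]
```

In the processor, `queueForRetry` could call `enqueue`, with a timer calling `flush` and passing a function that performs the same insert as `sendToCloud`.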
SQL Integration with QueryLeaf
QueryLeaf provides familiar SQL syntax for MongoDB time-series operations:
-- QueryLeaf SQL syntax for MongoDB time-series analytics
-- Basic time-series data selection with SQL syntax
SELECT
timestamp,
device_info.device_id,
device_info.location,
temperature_celsius,
pressure_bar,
humidity_percent,
battery_level
FROM iot_sensor_readings
WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
AND device_info.location LIKE 'production_line_%'
AND temperature_celsius IS NOT NULL
ORDER BY timestamp DESC
LIMIT 1000;
-- Time-based aggregation with GROUP BY and DATE_TRUNC
SELECT
device_info.device_id,
device_info.location,
DATE_TRUNC('hour', timestamp) AS hour,
-- Temperature analytics
AVG(temperature_celsius) AS avg_temperature,
MIN(temperature_celsius) AS min_temperature,
MAX(temperature_celsius) AS max_temperature,
STDDEV(temperature_celsius) AS temp_std_deviation,
-- Pressure analytics
AVG(pressure_bar) AS avg_pressure,
MIN(pressure_bar) AS min_pressure,
MAX(pressure_bar) AS max_pressure,
-- Data quality metrics
COUNT(*) AS total_readings,
COUNT(temperature_celsius) AS temp_reading_count,
COUNT(pressure_bar) AS pressure_reading_count,
(COUNT(temperature_celsius) * 100.0 / COUNT(*)) AS temp_data_completeness_pct
FROM iot_sensor_readings
WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '7 days'
AND device_info.device_type IN ('environmental', 'pressure')
GROUP BY device_info.device_id, device_info.location, DATE_TRUNC('hour', timestamp)
HAVING COUNT(*) >= 10 -- Ensure sufficient data points
ORDER BY device_info.location, device_info.device_id, hour;
-- Moving averages using SQL window functions
SELECT
timestamp,
device_info.device_id,
temperature_celsius,
pressure_bar,
-- Moving averages with time-based windows
AVG(temperature_celsius) OVER (
PARTITION BY device_info.device_id
ORDER BY timestamp
RANGE BETWEEN INTERVAL '5 minutes' PRECEDING AND CURRENT ROW
) AS temperature_ma_5min,
AVG(pressure_bar) OVER (
PARTITION BY device_info.device_id
ORDER BY timestamp
RANGE BETWEEN INTERVAL '15 minutes' PRECEDING AND CURRENT ROW
) AS pressure_ma_15min,
-- Standard deviation for anomaly detection
STDDEV(temperature_celsius) OVER (
PARTITION BY device_info.device_id
ORDER BY timestamp
RANGE BETWEEN INTERVAL '10 minutes' PRECEDING AND CURRENT ROW
) AS temperature_rolling_std,
-- Previous values for trend calculation
LAG(temperature_celsius, 1) OVER (
PARTITION BY device_info.device_id
ORDER BY timestamp
) AS prev_temperature,
-- Z-score calculation for anomaly detection
(temperature_celsius - AVG(temperature_celsius) OVER (
PARTITION BY device_info.device_id
ORDER BY timestamp
RANGE BETWEEN INTERVAL '1 hour' PRECEDING AND CURRENT ROW
)) / NULLIF(STDDEV(temperature_celsius) OVER (
PARTITION BY device_info.device_id
ORDER BY timestamp
RANGE BETWEEN INTERVAL '1 hour' PRECEDING AND CURRENT ROW
), 0) AS temperature_z_score
FROM iot_sensor_readings
WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '1 day'
AND device_info.device_id = 'TEMP_SENSOR_001'
ORDER BY timestamp;
-- Anomaly detection with CTEs and rolling z-scores
WITH sensor_analytics AS (
SELECT
timestamp,
device_info.device_id,
device_info.location,
temperature_celsius,
pressure_bar,
-- Calculate moving statistics
AVG(temperature_celsius) OVER w AS temp_avg,
STDDEV(temperature_celsius) OVER w AS temp_std,
AVG(pressure_bar) OVER w AS pressure_avg,
STDDEV(pressure_bar) OVER w AS pressure_std
FROM iot_sensor_readings
WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '2 days'
WINDOW w AS (
PARTITION BY device_info.device_id
ORDER BY timestamp
RANGE BETWEEN INTERVAL '1 hour' PRECEDING AND CURRENT ROW
)
),
anomaly_detection AS (
SELECT *,
-- Temperature anomaly score (Z-score)
ABS(temperature_celsius - temp_avg) / NULLIF(temp_std, 0) AS temp_anomaly_score,
-- Pressure anomaly score
ABS(pressure_bar - pressure_avg) / NULLIF(pressure_std, 0) AS pressure_anomaly_score,
-- Classify anomalies
CASE
WHEN ABS(temperature_celsius - temp_avg) / NULLIF(temp_std, 0) > 3 THEN 'severe'
WHEN ABS(temperature_celsius - temp_avg) / NULLIF(temp_std, 0) > 2 THEN 'moderate'
WHEN ABS(temperature_celsius - temp_avg) / NULLIF(temp_std, 0) > 1.5 THEN 'mild'
ELSE 'normal'
END AS temperature_anomaly_level,
CASE
WHEN ABS(pressure_bar - pressure_avg) / NULLIF(pressure_std, 0) > 3 THEN 'severe'
WHEN ABS(pressure_bar - pressure_avg) / NULLIF(pressure_std, 0) > 2 THEN 'moderate'
WHEN ABS(pressure_bar - pressure_avg) / NULLIF(pressure_std, 0) > 1.5 THEN 'mild'
ELSE 'normal'
END AS pressure_anomaly_level
FROM sensor_analytics
WHERE temp_std > 0 AND pressure_std > 0
)
SELECT
timestamp,
device_id,
location,
temperature_celsius,
pressure_bar,
temp_anomaly_score,
pressure_anomaly_score,
temperature_anomaly_level,
pressure_anomaly_level,
-- Combined risk assessment
CASE
WHEN temperature_anomaly_level IN ('severe', 'moderate')
OR pressure_anomaly_level IN ('severe', 'moderate') THEN 'high_risk'
WHEN temperature_anomaly_level = 'mild'
OR pressure_anomaly_level = 'mild' THEN 'medium_risk'
ELSE 'low_risk'
END AS overall_risk_level,
-- Maintenance recommendation
CASE
WHEN temperature_anomaly_level = 'severe' OR pressure_anomaly_level = 'severe'
THEN 'immediate_inspection_required'
WHEN temperature_anomaly_level = 'moderate' OR pressure_anomaly_level = 'moderate'
THEN 'schedule_maintenance_check'
WHEN temperature_anomaly_level = 'mild' OR pressure_anomaly_level = 'mild'
THEN 'monitor_closely'
ELSE 'continue_normal_monitoring'
END AS maintenance_action
FROM anomaly_detection
WHERE temperature_anomaly_level != 'normal' OR pressure_anomaly_level != 'normal'
ORDER BY timestamp DESC, temp_anomaly_score DESC;
-- Predictive maintenance analytics
WITH equipment_health_trends AS (
SELECT
device_info.device_id,
device_info.location,
DATE_TRUNC('day', timestamp) AS date,
-- Daily health metrics
AVG(temperature_celsius) AS avg_daily_temp,
MAX(temperature_celsius) AS max_daily_temp,
STDDEV(temperature_celsius) AS daily_temp_variation,
AVG(pressure_bar) AS avg_daily_pressure,
MAX(pressure_bar) AS max_daily_pressure,
STDDEV(pressure_bar) AS daily_pressure_variation,
AVG(battery_level) AS avg_daily_battery,
MIN(battery_level) AS min_daily_battery,
COUNT(*) AS daily_reading_count,
COUNT(CASE WHEN temperature_celsius > 35 THEN 1 END) AS high_temp_incidents,
COUNT(CASE WHEN pressure_bar > 20 THEN 1 END) AS high_pressure_incidents
FROM iot_sensor_readings
WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '30 days'
GROUP BY device_info.device_id, device_info.location, DATE_TRUNC('day', timestamp)
),
health_score_calculation AS (
SELECT *,
-- Temperature health factor (0-100)
LEAST(100, GREATEST(0, 100 - (max_daily_temp - 20) * 5)) AS temp_health_factor,
-- Pressure health factor (0-100)
LEAST(100, GREATEST(0, 100 - (max_daily_pressure - 15) * 10)) AS pressure_health_factor,
-- Battery health factor (0-100)
avg_daily_battery AS battery_health_factor,
-- Data quality factor (0-100)
LEAST(100, daily_reading_count / 1440.0 * 100) AS data_quality_factor, -- Assuming 1 reading per minute ideal
-- Stability factor (0-100) - lower variation is better
GREATEST(0, 100 - daily_temp_variation * 10) AS temp_stability_factor,
GREATEST(0, 100 - daily_pressure_variation * 20) AS pressure_stability_factor
FROM equipment_health_trends
),
predictive_scoring AS (
SELECT *,
-- Overall equipment health score
(temp_health_factor * 0.25 +
pressure_health_factor * 0.25 +
battery_health_factor * 0.20 +
data_quality_factor * 0.10 +
temp_stability_factor * 0.10 +
pressure_stability_factor * 0.10) AS daily_health_score,
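-- Trend analysis: moving average over the current and previous 6 daily rows
-- (this spans 7 calendar days only when the device reported on every day)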
AVG(temp_health_factor) OVER (
PARTITION BY device_id
ORDER BY date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) AS temp_health_trend_7day,
AVG(pressure_health_factor) OVER (
PARTITION BY device_id
ORDER BY date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) AS pressure_health_trend_7day
FROM health_score_calculation
)
SELECT
device_id,
location,
date,
daily_health_score,
temp_health_factor,
pressure_health_factor,
battery_health_factor,
-- Health trend indicators
temp_health_trend_7day,
pressure_health_trend_7day,
-- Predictive maintenance classification
CASE
WHEN daily_health_score < 30 THEN 'critical_maintenance_needed'
WHEN daily_health_score < 50 THEN 'maintenance_recommended'
WHEN daily_health_score < 70 THEN 'monitor_closely'
ELSE 'healthy'
END AS maintenance_status,
-- Failure risk prediction: a factor below its 7-day moving average means health is declining
CASE
WHEN daily_health_score < 40 AND temp_health_factor < temp_health_trend_7day THEN 'high_failure_risk'
WHEN daily_health_score < 60 AND (temp_health_factor < temp_health_trend_7day OR pressure_health_factor < pressure_health_trend_7day) THEN 'medium_failure_risk'
ELSE 'low_failure_risk'
END AS failure_risk_level,
-- Recommended actions
CASE
WHEN daily_health_score < 30 THEN 'schedule_immediate_inspection'
WHEN daily_health_score < 50 AND temp_health_trend_7day < 50 THEN 'schedule_preventive_maintenance'
WHEN daily_health_score < 70 THEN 'increase_monitoring_frequency'
ELSE 'continue_standard_monitoring'
END AS recommended_action
FROM predictive_scoring
WHERE date >= CURRENT_DATE - INTERVAL '7 days'
ORDER BY device_id, date DESC;
-- QueryLeaf automatically translates these SQL operations to optimized MongoDB time-series aggregations:
-- 1. DATE_TRUNC functions become MongoDB date aggregation operators
-- 2. Window functions translate to MongoDB $setWindowFields operations
-- 3. Statistical functions map to MongoDB aggregation operators
-- 4. Complex CASE statements become MongoDB $switch expressions
-- 5. Time-based WHERE clauses leverage time-series index optimizations
-- 6. Multi-table operations use MongoDB $lookup for cross-collection analytics
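As a rough illustration of items 1 to 3 above, the daily grouping and the 7-row moving average from the predictive maintenance query could be written by hand as an aggregation pipeline. This is a sketch, not QueryLeaf's actual output. The helper name buildHealthTrendPipeline and the output field max_temp_trend_7day are hypothetical, and the code assumes iot_sensor_readings stores device_info as its metaField and timestamp as its timeField, as the SQL above implies. It relies on $dateTrunc and $setWindowFields, which require MongoDB 5.0 or later.

```javascript
// Hypothetical helper: builds a pipeline similar in spirit to the
// equipment_health_trends CTE followed by a 7-row moving average.
function buildHealthTrendPipeline(now = new Date(), lookbackDays = 30) {
  const since = new Date(now.getTime() - lookbackDays * 24 * 60 * 60 * 1000);
  return [
    // WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '30 days'
    { $match: { timestamp: { $gte: since } } },
    // GROUP BY device_id, location, DATE_TRUNC('day', timestamp)
    {
      $group: {
        _id: {
          device_id: "$device_info.device_id",
          location: "$device_info.location",
          date: { $dateTrunc: { date: "$timestamp", unit: "day" } },
        },
        max_daily_temp: { $max: "$temperature_celsius" },
        daily_temp_variation: { $stdDevSamp: "$temperature_celsius" },
        daily_reading_count: { $sum: 1 },
      },
    },
    // AVG(...) OVER (PARTITION BY device_id ORDER BY date
    //                ROWS BETWEEN 6 PRECEDING AND CURRENT ROW)
    {
      $setWindowFields: {
        partitionBy: "$_id.device_id",
        sortBy: { "_id.date": 1 },
        output: {
          max_temp_trend_7day: {
            $avg: "$max_daily_temp",
            window: { documents: [-6, 0] },
          },
        },
      },
    },
  ];
}
```

With the Node.js driver, the returned array would be passed to db.collection("iot_sensor_readings").aggregate(...). The documents window counts rows rather than calendar days, which matches the ROWS BETWEEN clause in the SQL.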
Best Practices for Production IoT Systems
Performance Optimization for High-Volume IoT Data
- Collection Design: Use appropriate time-series collection settings for your data granularity and retention requirements
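- Index Strategy: Filter on metaField plus timeField so queries can use a compound index; MongoDB 6.3 and later create this index automatically for new time-series collections, so add secondary indexes only for other frequent filters
- Bucketing Configuration: Set granularity (seconds, minutes, or hours) to match the interval between readings from the same device; MongoDB 6.3 and later also accept custom bucketMaxSpanSeconds and bucketRoundingSeconds values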
- TTL Management: Implement data lifecycle policies with expireAfterSeconds for automatic data expiration
- Batch Processing: Insert readings in batches (for example, insertMany with ordered: false) rather than one document at a time in high-throughput scenarios
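The collection settings and batching points above can be sketched as two small helpers. Both function names are hypothetical, and the 90-day retention and batch size of 1000 used in the usage note are illustrative assumptions, not recommendations from this article. The option names timeField, metaField, granularity, and expireAfterSeconds are the ones MongoDB accepts when creating a time-series collection.

```javascript
// Hypothetical helper: assembles the options object for createCollection().
function buildTimeSeriesOptions({ timeField, metaField, granularity, retentionDays }) {
  const allowed = ["seconds", "minutes", "hours"];
  if (!allowed.includes(granularity)) {
    throw new Error(`granularity must be one of: ${allowed.join(", ")}`);
  }
  const options = { timeseries: { timeField, metaField, granularity } };
  if (retentionDays !== undefined) {
    // TTL for automatic expiration, expressed in seconds
    options.expireAfterSeconds = retentionDays * 24 * 60 * 60;
  }
  return options;
}

// Hypothetical helper: splits readings into fixed-size batches for insertMany().
function toBatches(readings, batchSize = 1000) {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new Error("batchSize must be a positive integer");
  }
  const batches = [];
  for (let i = 0; i < readings.length; i += batchSize) {
    batches.push(readings.slice(i, i + batchSize));
  }
  return batches;
}
```

With the Node.js driver, the result of buildTimeSeriesOptions({ timeField: "timestamp", metaField: "device_info", granularity: "minutes", retentionDays: 90 }) would be passed to db.createCollection("iot_sensor_readings", options), and each batch from toBatches would be written with collection.insertMany(batch, { ordered: false }).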
Data Quality and Monitoring
- Validation: Implement schema validation for IoT data structure consistency
- Anomaly Detection: Build real-time anomaly detection using statistical analysis and machine learning
- Data Completeness: Monitor and alert on missing data or device connectivity issues
- Performance Metrics: Track insertion rates, query performance, and storage utilization
- Alert Systems: Implement multi-level alerting for device health, data quality, and system performance
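For the data completeness point, one simple approach is to scan a device's reading timestamps for gaps longer than the expected reporting interval. The following is a minimal sketch under stated assumptions: the function name findReadingGaps is hypothetical, the default one-minute interval mirrors the "1 reading per minute" assumption in the health-score query, and the 1.5x tolerance is an arbitrary illustrative threshold.

```javascript
// Hypothetical helper: reports gaps in one device's readings.
// timestamps must be an array of Date objects sorted in ascending order.
function findReadingGaps(timestamps, expectedIntervalMs = 60000, tolerance = 1.5) {
  const gaps = [];
  for (let i = 1; i < timestamps.length; i++) {
    const delta = timestamps[i].getTime() - timestamps[i - 1].getTime();
    // Flag the gap only when it exceeds the expected interval by the tolerance
    if (delta > expectedIntervalMs * tolerance) {
      gaps.push({
        from: timestamps[i - 1],
        to: timestamps[i],
        // Estimated number of readings that should have arrived in between
        missedReadings: Math.round(delta / expectedIntervalMs) - 1,
      });
    }
  }
  return gaps;
}
```

A non-empty result could feed the alerting described above, for example by raising a connectivity warning when missedReadings exceeds a device-specific limit.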
Conclusion
MongoDB time-series collections pair storage optimized for time-ordered IoT data with flexible analytics. QueryLeaf adds familiar SQL-style queries on top of that optimized storage and its indexing strategies.
Key advantages of MongoDB time-series collections for IoT include:
- Storage Efficiency: Automatic compression and optimized storage layout for time-ordered data
- Query Performance: Specialized indexing and query optimization for temporal data patterns
- Real-Time Analytics: Aggregation pipelines and window functions run directly on time-series collections, supporting near-real-time aggregations over recent data
- Edge Integration: Seamless synchronization between edge devices and cloud databases
- SQL Accessibility: Familiar time-series analytics through QueryLeaf's SQL interface
- Scalable Architecture: Horizontal scaling capabilities for massive IoT data volumes
Whether you're building smart manufacturing systems, environmental monitoring networks, or industrial IoT platforms, MongoDB's time-series collections with SQL-familiar query patterns provide the foundation for building scalable, high-performance IoT analytics solutions.
QueryLeaf Integration: QueryLeaf translates SQL time-series operations into MongoDB time-series queries. Window functions, moving averages, and anomaly detection are written in familiar SQL syntax while still using MongoDB's time-series storage optimizations, which makes sophisticated IoT analytics approachable for SQL-oriented development teams.
The combination of MongoDB's time-series optimizations with SQL-familiar analytics patterns creates an ideal platform for IoT applications that require both high-performance data ingestion and sophisticated analytical capabilities at scale.