MongoDB Data Pipeline Management and Stream Processing: Advanced Real-Time Data Processing and ETL Pipelines for Modern Applications
Modern data-driven applications require data processing pipelines that can handle real-time ingestion, complex transformations, and reliable delivery across multiple systems and formats. Traditional batch processing struggles with latency requirements, growing data volumes, and the complexity of coordinating distributed processing workflows. Effective data pipeline management demands real-time stream processing, incremental transformations, and intelligent error handling.
MongoDB provides advanced stream processing through Change Streams and the Aggregation Framework, enabling sophisticated real-time data processing workflows. Unlike traditional ETL systems that require separate infrastructure components and complex coordination mechanisms, MongoDB integrates stream processing directly into the database with optimized pipeline execution, resumable streams, and built-in fault tolerance.
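Before building a full pipeline framework, it helps to see the core primitive in isolation. The following is a minimal sketch of tailing a collection with a Change Stream in the Node.js driver; the connection string, database, and collection names are placeholders, and resume-token persistence is only indicated in a comment.
// Minimal Change Stream sketch (connection string and names are placeholders)
const { MongoClient } = require('mongodb');

async function watchOrders() {
  const client = new MongoClient('mongodb://localhost:27017');
  await client.connect();
  const orders = client.db('ecommerce').collection('orders');

  // React only to inserts and updates, fetching the full post-update document
  const pipeline = [{ $match: { operationType: { $in: ['insert', 'update'] } } }];
  const changeStream = orders.watch(pipeline, { fullDocument: 'updateLookup' });

  let resumeToken = null;
  changeStream.on('change', (event) => {
    resumeToken = event._id; // persist this token to resume after restarts via resumeAfter
    console.log(`${event.operationType} on`, event.documentKey._id);
  });

  changeStream.on('error', (err) => {
    console.error('Change stream error; reopen with { resumeAfter: resumeToken }', err);
  });
}

watchOrders().catch(console.error);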
The Traditional Data Pipeline Challenge
Conventional approaches to data pipeline management in relational systems face significant limitations in real-time processing:
-- Traditional PostgreSQL data pipeline management - complex batch processing with limited real-time capabilities
-- Basic ETL tracking table with limited functionality
CREATE TABLE etl_job_runs (
run_id SERIAL PRIMARY KEY,
job_name VARCHAR(255) NOT NULL,
job_type VARCHAR(100) NOT NULL,
source_system VARCHAR(100),
target_system VARCHAR(100),
-- Job execution tracking
start_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
end_time TIMESTAMP,
status VARCHAR(50) DEFAULT 'running',
-- Basic metrics (very limited)
records_processed INTEGER DEFAULT 0,
records_inserted INTEGER DEFAULT 0,
records_updated INTEGER DEFAULT 0,
records_deleted INTEGER DEFAULT 0,
records_failed INTEGER DEFAULT 0,
-- Error tracking (basic)
error_message TEXT,
error_count INTEGER DEFAULT 0,
-- Resource usage (manual tracking)
cpu_usage_percent DECIMAL(5,2),
memory_usage_mb INTEGER,
disk_io_mb INTEGER,
-- Basic configuration
batch_size INTEGER DEFAULT 1000,
parallel_workers INTEGER DEFAULT 1,
retry_attempts INTEGER DEFAULT 3
);
-- Data transformation rules (static and inflexible)
CREATE TABLE transformation_rules (
rule_id SERIAL PRIMARY KEY,
rule_name VARCHAR(255) NOT NULL,
source_table VARCHAR(255),
target_table VARCHAR(255),
transformation_type VARCHAR(100),
-- Transformation logic (limited SQL expressions)
source_columns TEXT[],
target_columns TEXT[],
transformation_sql TEXT,
-- Basic validation rules
validation_rules TEXT[],
data_quality_checks TEXT[],
-- Rule metadata
active BOOLEAN DEFAULT true,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
created_by VARCHAR(100)
);
-- Simple batch processing function (no real-time capabilities)
CREATE OR REPLACE FUNCTION execute_batch_etl(
job_name_param VARCHAR(255),
batch_size_param INTEGER DEFAULT 1000
) RETURNS TABLE (
run_id INTEGER,
records_processed INTEGER,
execution_time_seconds INTEGER,
status VARCHAR(50),
error_message TEXT
) AS $$
DECLARE
current_run_id INTEGER;
processing_start TIMESTAMP;
processing_end TIMESTAMP;
batch_count INTEGER := 0;
total_records INTEGER := 0;
error_msg TEXT := '';
processing_status VARCHAR(50) := 'completed';
BEGIN
-- Start new job run
INSERT INTO etl_job_runs (job_name, job_type, status)
VALUES (job_name_param, 'batch_etl', 'running')
RETURNING etl_job_runs.run_id INTO current_run_id;
processing_start := clock_timestamp();
BEGIN
-- Very basic batch processing loop
LOOP
-- Simulate batch processing (would be actual data transformation in reality)
PERFORM pg_sleep(0.1); -- Simulate processing time
batch_count := batch_count + 1;
total_records := total_records + batch_size_param;
-- Simple exit condition (no real data source integration)
EXIT WHEN batch_count >= 10; -- Process 10 batches maximum
END LOOP;
EXCEPTION WHEN OTHERS THEN
error_msg := SQLERRM;
processing_status := 'failed';
END;
processing_end := clock_timestamp();
-- Update job run status
UPDATE etl_job_runs
SET
end_time = processing_end,
status = processing_status,
records_processed = total_records,
records_inserted = total_records,
error_message = error_msg,
error_count = CASE WHEN processing_status = 'failed' THEN 1 ELSE 0 END
WHERE etl_job_runs.run_id = current_run_id;
-- Return execution results
RETURN QUERY SELECT
current_run_id,
total_records,
EXTRACT(EPOCH FROM (processing_end - processing_start))::INTEGER,
processing_status,
error_msg;
END;
$$ LANGUAGE plpgsql;
-- Execute batch ETL job (very basic functionality)
SELECT * FROM execute_batch_etl('customer_data_sync', 500);
-- Data quality monitoring (limited real-time capabilities)
WITH data_quality_metrics AS (
SELECT
ejr.job_name,
ejr.run_id,
ejr.start_time,
ejr.end_time,
ejr.records_processed,
ejr.records_failed,
-- Basic quality calculations
CASE
WHEN ejr.records_processed > 0 THEN
ROUND((ejr.records_processed - ejr.records_failed)::DECIMAL / ejr.records_processed * 100, 2)
ELSE 0
END as success_rate_percent,
-- Processing rate
CASE
WHEN EXTRACT(EPOCH FROM (ejr.end_time - ejr.start_time)) > 0 THEN
ROUND(ejr.records_processed::DECIMAL / EXTRACT(EPOCH FROM (ejr.end_time - ejr.start_time)), 2)
ELSE 0
END as records_per_second,
-- Basic status assessment
CASE ejr.status
WHEN 'completed' THEN 'success'
WHEN 'failed' THEN 'failure'
ELSE 'unknown'
END as quality_status
FROM etl_job_runs ejr
WHERE ejr.start_time >= CURRENT_DATE - INTERVAL '7 days'
),
quality_summary AS (
SELECT
job_name,
COUNT(*) as total_runs,
COUNT(*) FILTER (WHERE quality_status = 'success') as successful_runs,
COUNT(*) FILTER (WHERE quality_status = 'failure') as failed_runs,
-- Quality metrics
AVG(success_rate_percent) as avg_success_rate,
AVG(records_per_second) as avg_processing_rate,
SUM(records_processed) as total_records_processed,
SUM(records_failed) as total_records_failed,
-- Time-based analysis
AVG(EXTRACT(EPOCH FROM (end_time - start_time))) as avg_execution_seconds,
MAX(EXTRACT(EPOCH FROM (end_time - start_time))) as max_execution_seconds,
MIN(start_time) as first_run,
MAX(end_time) as last_run
FROM data_quality_metrics
GROUP BY job_name
)
SELECT
job_name,
total_runs,
successful_runs,
failed_runs,
-- Success rates
CASE
WHEN total_runs > 0 THEN
ROUND((successful_runs::DECIMAL / total_runs) * 100, 1)
ELSE 0
END as job_success_rate_percent,
-- Performance metrics
ROUND(avg_success_rate, 1) as avg_record_success_rate_percent,
ROUND(avg_processing_rate, 1) as avg_records_per_second,
total_records_processed,
total_records_failed,
-- Timing analysis
ROUND(avg_execution_seconds, 1) as avg_duration_seconds,
ROUND(max_execution_seconds, 1) as max_duration_seconds,
-- Data quality assessment
CASE
WHEN failed_runs = 0 AND avg_success_rate > 98 THEN 'excellent'
WHEN failed_runs <= total_runs * 0.05 AND avg_success_rate > 95 THEN 'good'
WHEN failed_runs <= total_runs * 0.1 AND avg_success_rate > 90 THEN 'acceptable'
ELSE 'poor'
END as data_quality_rating,
-- Recommendations
CASE
WHEN failed_runs > total_runs * 0.1 THEN 'investigate_failures'
WHEN avg_processing_rate < 100 THEN 'optimize_performance'
WHEN max_execution_seconds > avg_execution_seconds * 3 THEN 'check_consistency'
ELSE 'monitor_continued'
END as recommendation
FROM quality_summary
ORDER BY total_records_processed DESC;
-- Real-time data change tracking (very limited functionality)
CREATE TABLE data_changes (
change_id SERIAL PRIMARY KEY,
table_name VARCHAR(255) NOT NULL,
operation_type VARCHAR(10) NOT NULL, -- INSERT, UPDATE, DELETE
record_id VARCHAR(100),
-- Change tracking (basic)
old_values JSONB,
new_values JSONB,
changed_columns TEXT[],
-- Metadata
change_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
user_id VARCHAR(100),
application_name VARCHAR(100),
-- Processing status
processed BOOLEAN DEFAULT false,
processing_attempts INTEGER DEFAULT 0,
last_processing_attempt TIMESTAMP,
processing_error TEXT
);
-- Basic trigger function for change tracking
CREATE OR REPLACE FUNCTION track_data_changes()
RETURNS TRIGGER AS $$
BEGIN
-- Insert change record (very basic functionality)
INSERT INTO data_changes (
table_name,
operation_type,
record_id,
old_values,
new_values,
user_id
)
VALUES (
TG_TABLE_NAME,
TG_OP,
CASE
WHEN TG_OP = 'DELETE' THEN OLD.id::TEXT
ELSE NEW.id::TEXT
END,
CASE
WHEN TG_OP = 'DELETE' THEN to_jsonb(OLD)
WHEN TG_OP = 'UPDATE' THEN to_jsonb(OLD)
ELSE NULL
END,
CASE
WHEN TG_OP = 'DELETE' THEN NULL
ELSE to_jsonb(NEW)
END,
current_user
);
-- Return appropriate record
CASE TG_OP
WHEN 'DELETE' THEN RETURN OLD;
ELSE RETURN NEW;
END CASE;
EXCEPTION WHEN OTHERS THEN
-- Basic error handling (logs errors but doesn't stop operations)
RAISE WARNING 'Change tracking failed: %', SQLERRM;
CASE TG_OP
WHEN 'DELETE' THEN RETURN OLD;
ELSE RETURN NEW;
END CASE;
END;
$$ LANGUAGE plpgsql;
-- Process pending changes (batch processing only)
WITH pending_changes AS (
SELECT
change_id,
table_name,
operation_type,
new_values,
old_values,
change_timestamp,
-- Group changes by time windows for batch processing
DATE_TRUNC('minute', change_timestamp) as processing_window
FROM data_changes
WHERE processed = false
AND processing_attempts < 3
ORDER BY change_timestamp
LIMIT 1000
),
change_summary AS (
SELECT
processing_window,
table_name,
operation_type,
COUNT(*) as change_count,
MIN(change_timestamp) as first_change,
MAX(change_timestamp) as last_change,
-- Basic aggregations (very limited analysis)
COUNT(*) FILTER (WHERE operation_type = 'INSERT') as inserts,
COUNT(*) FILTER (WHERE operation_type = 'UPDATE') as updates,
COUNT(*) FILTER (WHERE operation_type = 'DELETE') as deletes
FROM pending_changes
GROUP BY processing_window, table_name, operation_type
)
SELECT
processing_window,
table_name,
operation_type,
change_count,
first_change,
last_change,
-- Change rate analysis
CASE
WHEN EXTRACT(EPOCH FROM (last_change - first_change)) > 0 THEN
ROUND(change_count::DECIMAL / EXTRACT(EPOCH FROM (last_change - first_change)), 2)
ELSE change_count
END as changes_per_second,
-- Processing recommendations (very basic)
CASE
WHEN change_count > 1000 THEN 'high_volume_batch'
WHEN change_count > 100 THEN 'medium_batch'
ELSE 'small_batch'
END as processing_strategy,
-- Simple priority assessment
CASE table_name
WHEN 'users' THEN 'high'
WHEN 'orders' THEN 'high'
WHEN 'products' THEN 'medium'
ELSE 'low'
END as processing_priority
FROM change_summary
ORDER BY processing_window DESC, change_count DESC;
-- Problems with traditional data pipeline approaches:
-- 1. No real-time processing - only batch operations with delays
-- 2. Limited transformation capabilities - basic SQL only
-- 3. Poor scalability - single-threaded processing
-- 4. Manual error handling and recovery
-- 5. No automatic schema evolution or data type handling
-- 6. Limited monitoring and observability
-- 7. Complex integration with external systems
-- 8. No built-in data quality validation
-- 9. Difficult to maintain and debug complex pipelines
-- 10. No support for stream processing or event-driven architectures
MongoDB provides comprehensive data pipeline management with advanced stream processing capabilities:
// MongoDB Advanced Data Pipeline Management and Stream Processing
const { MongoClient, ChangeStream } = require('mongodb');
const { EventEmitter } = require('events');
// Comprehensive MongoDB Data Pipeline Manager
class AdvancedDataPipelineManager extends EventEmitter {
constructor(mongoUri, pipelineConfig = {}) {
super();
this.mongoUri = mongoUri;
this.client = null;
this.db = null;
// Advanced pipeline configuration
this.config = {
// Processing configuration
enableRealTimeProcessing: pipelineConfig.enableRealTimeProcessing !== false,
enableBatchProcessing: pipelineConfig.enableBatchProcessing !== false,
enableStreamProcessing: pipelineConfig.enableStreamProcessing !== false,
// Performance settings
maxConcurrentPipelines: pipelineConfig.maxConcurrentPipelines || 10,
batchSize: pipelineConfig.batchSize || 1000,
maxRetries: pipelineConfig.maxRetries || 3,
retryDelay: pipelineConfig.retryDelay || 1000,
// Change stream configuration
enableChangeStreams: pipelineConfig.enableChangeStreams !== false,
changeStreamOptions: pipelineConfig.changeStreamOptions || {
fullDocument: 'updateLookup',
fullDocumentBeforeChange: 'whenAvailable'
},
// Data quality and validation
enableDataValidation: pipelineConfig.enableDataValidation !== false,
enableSchemaEvolution: pipelineConfig.enableSchemaEvolution || false,
enableDataLineage: pipelineConfig.enableDataLineage || false,
// Monitoring and observability
enableMetrics: pipelineConfig.enableMetrics !== false,
enablePipelineMonitoring: pipelineConfig.enablePipelineMonitoring !== false,
enableErrorTracking: pipelineConfig.enableErrorTracking !== false,
// Advanced features
enableIncrementalProcessing: pipelineConfig.enableIncrementalProcessing || false,
enableDataDeduplication: pipelineConfig.enableDataDeduplication || false,
enableDataEnrichment: pipelineConfig.enableDataEnrichment || false
};
// Pipeline registry and state management
this.pipelines = new Map();
this.changeStreams = new Map();
this.pipelineMetrics = new Map();
this.activeProcessing = new Map();
// Error tracking and recovery
this.errorHistory = [];
this.retryQueues = new Map();
this.initializeDataPipelines();
}
async initializeDataPipelines() {
console.log('Initializing advanced data pipeline management system...');
try {
// Connect to MongoDB
this.client = new MongoClient(this.mongoUri);
await this.client.connect();
this.db = this.client.db();
// Setup pipeline infrastructure
await this.setupPipelineInfrastructure();
// Initialize change streams if enabled
if (this.config.enableChangeStreams) {
await this.setupChangeStreams();
}
// Start pipeline monitoring
if (this.config.enablePipelineMonitoring) {
await this.startPipelineMonitoring();
}
console.log('Advanced data pipeline system initialized successfully');
} catch (error) {
console.error('Error initializing data pipeline system:', error);
throw error;
}
}
async setupPipelineInfrastructure() {
console.log('Setting up pipeline infrastructure...');
try {
// Create collections for pipeline management
const collections = {
pipelineDefinitions: this.db.collection('pipeline_definitions'),
pipelineRuns: this.db.collection('pipeline_runs'),
pipelineMetrics: this.db.collection('pipeline_metrics'),
dataLineage: this.db.collection('data_lineage'),
pipelineErrors: this.db.collection('pipeline_errors'),
transformationRules: this.db.collection('transformation_rules')
};
// Create indexes for optimal performance
await collections.pipelineRuns.createIndex(
{ pipelineId: 1, startTime: -1 },
{ background: true }
);
await collections.pipelineMetrics.createIndex(
{ pipelineId: 1, timestamp: -1 },
{ background: true }
);
await collections.dataLineage.createIndex(
{ sourceCollection: 1, targetCollection: 1, timestamp: -1 },
{ background: true }
);
this.collections = collections;
} catch (error) {
console.error('Error setting up pipeline infrastructure:', error);
throw error;
}
}
async registerDataPipeline(pipelineDefinition) {
console.log(`Registering data pipeline: ${pipelineDefinition.name}`);
try {
// Validate pipeline definition
const validatedDefinition = await this.validatePipelineDefinition(pipelineDefinition);
// Enhanced pipeline definition with metadata
const enhancedDefinition = {
...validatedDefinition,
pipelineId: this.generatePipelineId(validatedDefinition.name),
// Pipeline metadata
registeredAt: new Date(),
version: pipelineDefinition.version || '1.0.0',
status: 'registered',
// Processing configuration
processingMode: pipelineDefinition.processingMode || 'stream', // stream, batch, hybrid
triggerType: pipelineDefinition.triggerType || 'change_stream', // change_stream, schedule, batch
// Data transformation pipeline
transformationStages: pipelineDefinition.transformationStages || [],
// Data sources and targets
dataSources: pipelineDefinition.dataSources || [],
dataTargets: pipelineDefinition.dataTargets || [],
// Quality and validation rules
dataQualityRules: pipelineDefinition.dataQualityRules || [],
schemaValidationRules: pipelineDefinition.schemaValidationRules || [],
// Performance configuration
performance: {
batchSize: pipelineDefinition.batchSize || this.config.batchSize,
maxConcurrency: pipelineDefinition.maxConcurrency || 5,
timeoutMs: pipelineDefinition.timeoutMs || 300000,
// Resource limits
maxMemoryMB: pipelineDefinition.maxMemoryMB || 1024,
maxCpuPercent: pipelineDefinition.maxCpuPercent || 80
},
// Error handling configuration
errorHandling: {
retryStrategy: pipelineDefinition.retryStrategy || 'exponential_backoff',
maxRetries: pipelineDefinition.maxRetries || this.config.maxRetries,
deadLetterQueue: pipelineDefinition.deadLetterQueue !== false,
errorNotifications: pipelineDefinition.errorNotifications || []
},
// Monitoring configuration
monitoring: {
enableMetrics: pipelineDefinition.enableMetrics !== false,
metricsInterval: pipelineDefinition.metricsInterval || 60000,
alertThresholds: pipelineDefinition.alertThresholds || {}
}
};
// Store pipeline definition
await this.collections.pipelineDefinitions.replaceOne(
{ pipelineId: enhancedDefinition.pipelineId },
enhancedDefinition,
{ upsert: true }
);
// Register pipeline in memory
this.pipelines.set(enhancedDefinition.pipelineId, {
definition: enhancedDefinition,
status: 'registered',
lastRun: null,
statistics: {
totalRuns: 0,
successfulRuns: 0,
failedRuns: 0,
totalRecordsProcessed: 0,
averageProcessingTime: 0
}
});
console.log(`Pipeline '${enhancedDefinition.name}' registered successfully with ID: ${enhancedDefinition.pipelineId}`);
// Start pipeline if configured for automatic startup
if (enhancedDefinition.autoStart) {
await this.startPipeline(enhancedDefinition.pipelineId);
}
return {
success: true,
pipelineId: enhancedDefinition.pipelineId,
definition: enhancedDefinition
};
} catch (error) {
console.error(`Error registering pipeline '${pipelineDefinition.name}':`, error);
return {
success: false,
error: error.message,
pipelineDefinition: pipelineDefinition
};
}
}
async startPipeline(pipelineId) {
console.log(`Starting data pipeline: ${pipelineId}`);
try {
const pipeline = this.pipelines.get(pipelineId);
if (!pipeline) {
throw new Error(`Pipeline not found: ${pipelineId}`);
}
if (pipeline.status === 'running') {
console.log(`Pipeline ${pipelineId} is already running`);
return { success: true, status: 'already_running' };
}
const definition = pipeline.definition;
// Create pipeline run record
const runRecord = {
runId: this.generateRunId(),
pipelineId: pipelineId,
pipelineName: definition.name,
startTime: new Date(),
status: 'running',
// Processing metrics
recordsProcessed: 0,
recordsSuccessful: 0,
recordsFailed: 0,
// Performance tracking
processingTimeMs: 0,
throughputRecordsPerSecond: 0,
// Resource usage
memoryUsageMB: 0,
cpuUsagePercent: 0,
// Error tracking
errors: [],
retryAttempts: 0
};
await this.collections.pipelineRuns.insertOne(runRecord);
// Start processing based on trigger type
switch (definition.triggerType) {
case 'change_stream':
await this.startChangeStreamPipeline(pipelineId, definition, runRecord);
break;
case 'schedule':
await this.startScheduledPipeline(pipelineId, definition, runRecord);
break;
case 'batch':
await this.startBatchPipeline(pipelineId, definition, runRecord);
break;
default:
throw new Error(`Unsupported trigger type: ${definition.triggerType}`);
}
// Update pipeline status
pipeline.status = 'running';
pipeline.lastRun = runRecord;
this.emit('pipelineStarted', {
pipelineId: pipelineId,
runId: runRecord.runId,
startTime: runRecord.startTime
});
return {
success: true,
pipelineId: pipelineId,
runId: runRecord.runId,
status: 'running'
};
} catch (error) {
console.error(`Error starting pipeline ${pipelineId}:`, error);
// Update pipeline status to error
const pipeline = this.pipelines.get(pipelineId);
if (pipeline) {
pipeline.status = 'error';
}
return {
success: false,
pipelineId: pipelineId,
error: error.message
};
}
}
async startChangeStreamPipeline(pipelineId, definition, runRecord) {
console.log(`Starting change stream pipeline: ${pipelineId}`);
try {
const dataSources = definition.dataSources;
for (const dataSource of dataSources) {
const collection = this.db.collection(dataSource.collection);
// Configure change stream options
const changeStreamOptions = {
...this.config.changeStreamOptions,
...dataSource.changeStreamOptions
};
// Apply pipeline-specific filters as a $match stage in the change stream pipeline
const changeStreamPipeline = dataSource.filter ? [{ $match: dataSource.filter }] : [];
// Create change stream
const changeStream = collection.watch(changeStreamPipeline, changeStreamOptions);
// Store change stream reference
this.changeStreams.set(`${pipelineId}_${dataSource.collection}`, changeStream);
// Setup change stream event handlers
changeStream.on('change', async (changeEvent) => {
await this.processChangeEvent(pipelineId, definition, runRecord, changeEvent);
});
changeStream.on('error', async (error) => {
console.error(`Change stream error for pipeline ${pipelineId}:`, error);
await this.handlePipelineError(pipelineId, runRecord, error);
});
changeStream.on('close', () => {
console.log(`Change stream closed for pipeline ${pipelineId}`);
this.emit('pipelineStreamClosed', { pipelineId, collection: dataSource.collection });
});
}
} catch (error) {
console.error(`Error starting change stream pipeline ${pipelineId}:`, error);
throw error;
}
}
async processChangeEvent(pipelineId, definition, runRecord, changeEvent) {
try {
// Track processing start
const processingStart = Date.now();
// Apply transformation stages
let processedData = changeEvent;
for (const transformationStage of definition.transformationStages) {
processedData = await this.applyTransformation(
processedData,
transformationStage,
definition
);
}
// Apply data quality validation
if (this.config.enableDataValidation) {
const validationResult = await this.validateData(
processedData,
definition.dataQualityRules
);
if (!validationResult.isValid) {
await this.handleValidationError(pipelineId, runRecord, processedData, validationResult);
return;
}
}
// Write to target destinations
const writeResults = await this.writeToTargets(
processedData,
definition.dataTargets,
definition
);
// Update run metrics
const processingTime = Date.now() - processingStart;
await this.updateRunMetrics(runRecord, {
recordsProcessed: 1,
recordsSuccessful: writeResults.successCount,
recordsFailed: writeResults.failureCount,
processingTimeMs: processingTime
});
// Record data lineage if enabled
if (this.config.enableDataLineage) {
await this.recordDataLineage(pipelineId, changeEvent, processedData, definition);
}
this.emit('recordProcessed', {
pipelineId: pipelineId,
runId: runRecord.runId,
changeEvent: changeEvent,
processedData: processedData,
processingTime: processingTime
});
} catch (error) {
console.error(`Error processing change event for pipeline ${pipelineId}:`, error);
await this.handleProcessingError(pipelineId, runRecord, changeEvent, error);
}
}
async applyTransformation(data, transformationStage, pipelineDefinition) {
console.log(`Applying transformation: ${transformationStage.type}`);
try {
switch (transformationStage.type) {
case 'aggregation':
return await this.applyAggregationTransformation(data, transformationStage);
case 'field_mapping':
return await this.applyFieldMapping(data, transformationStage);
case 'data_enrichment':
return await this.applyDataEnrichment(data, transformationStage, pipelineDefinition);
case 'filtering':
return await this.applyFiltering(data, transformationStage);
case 'normalization':
return await this.applyNormalization(data, transformationStage);
case 'custom_function':
return await this.applyCustomFunction(data, transformationStage);
default:
console.warn(`Unknown transformation type: ${transformationStage.type}`);
return data;
}
} catch (error) {
console.error(`Error applying transformation ${transformationStage.type}:`, error);
throw error;
}
}
async applyAggregationTransformation(data, transformationStage) {
// Apply MongoDB aggregation pipeline to transform data
const pipeline = transformationStage.aggregationPipeline;
if (!Array.isArray(pipeline) || pipeline.length === 0) {
return data;
}
try {
// Execute aggregation on source data
// This would work with the actual data structure in a real implementation
let transformedData = data;
// Simulate aggregation operations
for (const stage of pipeline) {
if (stage.$project) {
transformedData = this.projectFields(transformedData, stage.$project);
} else if (stage.$match) {
transformedData = this.matchFilter(transformedData, stage.$match);
} else if (stage.$addFields) {
transformedData = this.addFields(transformedData, stage.$addFields);
}
// Add more aggregation operators as needed
}
return transformedData;
} catch (error) {
console.error('Error in aggregation transformation:', error);
throw error;
}
}
async applyFieldMapping(data, transformationStage) {
// Apply field mapping transformation
const mappings = transformationStage.fieldMappings;
if (!mappings || Object.keys(mappings).length === 0) {
return data;
}
try {
let mappedData = { ...data };
// Apply field mappings
Object.entries(mappings).forEach(([targetField, sourceField]) => {
const sourceValue = this.getNestedValue(data, sourceField);
this.setNestedValue(mappedData, targetField, sourceValue);
});
return mappedData;
} catch (error) {
console.error('Error in field mapping transformation:', error);
throw error;
}
}
async applyDataEnrichment(data, transformationStage, pipelineDefinition) {
// Apply data enrichment from external sources
const enrichmentConfig = transformationStage.enrichmentConfig;
try {
let enrichedData = { ...data };
for (const enrichment of enrichmentConfig.enrichments) {
switch (enrichment.type) {
case 'lookup':
enrichedData = await this.applyLookupEnrichment(enrichedData, enrichment);
break;
case 'calculation':
enrichedData = await this.applyCalculationEnrichment(enrichedData, enrichment);
break;
case 'external_api':
enrichedData = await this.applyExternalApiEnrichment(enrichedData, enrichment);
break;
}
}
return enrichedData;
} catch (error) {
console.error('Error in data enrichment transformation:', error);
throw error;
}
}
async writeToTargets(processedData, dataTargets, pipelineDefinition) {
console.log('Writing processed data to targets...');
const writeResults = {
successCount: 0,
failureCount: 0,
results: []
};
try {
const writePromises = dataTargets.map(async (target) => {
try {
const result = await this.writeToTarget(processedData, target, pipelineDefinition);
writeResults.successCount++;
writeResults.results.push({ target: target.name, success: true, result });
return result;
} catch (error) {
console.error(`Error writing to target ${target.name}:`, error);
writeResults.failureCount++;
writeResults.results.push({
target: target.name,
success: false,
error: error.message
});
throw error;
}
});
await Promise.allSettled(writePromises);
return writeResults;
} catch (error) {
console.error('Error writing to targets:', error);
throw error;
}
}
async writeToTarget(processedData, target, pipelineDefinition) {
console.log(`Writing to target: ${target.name} (${target.type})`);
try {
switch (target.type) {
case 'mongodb_collection':
return await this.writeToMongoDBCollection(processedData, target);
case 'file':
return await this.writeToFile(processedData, target);
case 'external_api':
return await this.writeToExternalAPI(processedData, target);
case 'message_queue':
return await this.writeToMessageQueue(processedData, target);
default:
throw new Error(`Unsupported target type: ${target.type}`);
}
} catch (error) {
console.error(`Error writing to target ${target.name}:`, error);
throw error;
}
}
async writeToMongoDBCollection(processedData, target) {
const collection = this.db.collection(target.collection);
try {
switch (target.writeMode || 'insert') {
case 'insert':
const insertResult = await collection.insertOne(processedData);
return { operation: 'insert', insertedId: insertResult.insertedId };
case 'upsert':
const upsertResult = await collection.replaceOne(
target.upsertFilter || { _id: processedData._id },
processedData,
{ upsert: true }
);
return {
operation: 'upsert',
modifiedCount: upsertResult.modifiedCount,
upsertedId: upsertResult.upsertedId
};
case 'update':
const updateResult = await collection.updateOne(
target.updateFilter || { _id: processedData._id },
{ $set: processedData }
);
return {
operation: 'update',
matchedCount: updateResult.matchedCount,
modifiedCount: updateResult.modifiedCount
};
default:
throw new Error(`Unsupported write mode: ${target.writeMode}`);
}
} catch (error) {
console.error('Error writing to MongoDB collection:', error);
throw error;
}
}
async getPipelineMetrics(pipelineId, timeRange = {}) {
console.log(`Getting metrics for pipeline: ${pipelineId}`);
try {
const pipeline = this.pipelines.get(pipelineId);
if (!pipeline) {
throw new Error(`Pipeline not found: ${pipelineId}`);
}
// Build time range filter
const timeFilter = {};
if (timeRange.startTime) {
timeFilter.$gte = new Date(timeRange.startTime);
}
if (timeRange.endTime) {
timeFilter.$lte = new Date(timeRange.endTime);
}
const matchStage = { pipelineId: pipelineId };
if (Object.keys(timeFilter).length > 0) {
matchStage.startTime = timeFilter;
}
// Aggregate pipeline metrics
const metricsAggregation = [
{ $match: matchStage },
{
$group: {
_id: '$pipelineId',
totalRuns: { $sum: 1 },
successfulRuns: {
$sum: { $cond: [{ $eq: ['$status', 'completed'] }, 1, 0] }
},
failedRuns: {
$sum: { $cond: [{ $eq: ['$status', 'failed'] }, 1, 0] }
},
totalRecordsProcessed: { $sum: '$recordsProcessed' },
totalRecordsSuccessful: { $sum: '$recordsSuccessful' },
totalRecordsFailed: { $sum: '$recordsFailed' },
// Performance metrics
averageProcessingTime: { $avg: '$processingTimeMs' },
maxProcessingTime: { $max: '$processingTimeMs' },
minProcessingTime: { $min: '$processingTimeMs' },
// Throughput metrics
averageThroughput: { $avg: '$throughputRecordsPerSecond' },
maxThroughput: { $max: '$throughputRecordsPerSecond' },
// Resource usage
averageMemoryUsage: { $avg: '$memoryUsageMB' },
maxMemoryUsage: { $max: '$memoryUsageMB' },
averageCpuUsage: { $avg: '$cpuUsagePercent' },
maxCpuUsage: { $max: '$cpuUsagePercent' },
// Time range
firstRun: { $min: '$startTime' },
lastRun: { $max: '$startTime' }
}
}
];
const metricsResult = await this.collections.pipelineRuns
.aggregate(metricsAggregation)
.toArray();
const metrics = metricsResult[0] || {
_id: pipelineId,
totalRuns: 0,
successfulRuns: 0,
failedRuns: 0,
totalRecordsProcessed: 0,
totalRecordsSuccessful: 0,
totalRecordsFailed: 0,
averageProcessingTime: 0,
averageThroughput: 0,
averageMemoryUsage: 0,
averageCpuUsage: 0
};
// Calculate additional derived metrics
const successRate = metrics.totalRuns > 0 ?
(metrics.successfulRuns / metrics.totalRuns) * 100 : 0;
const dataQualityRate = metrics.totalRecordsProcessed > 0 ?
(metrics.totalRecordsSuccessful / metrics.totalRecordsProcessed) * 100 : 0;
return {
success: true,
pipelineId: pipelineId,
timeRange: timeRange,
// Basic metrics
totalRuns: metrics.totalRuns,
successfulRuns: metrics.successfulRuns,
failedRuns: metrics.failedRuns,
successRate: Math.round(successRate * 100) / 100,
// Data processing metrics
totalRecordsProcessed: metrics.totalRecordsProcessed,
totalRecordsSuccessful: metrics.totalRecordsSuccessful,
totalRecordsFailed: metrics.totalRecordsFailed,
dataQualityRate: Math.round(dataQualityRate * 100) / 100,
// Performance metrics
performance: {
averageProcessingTimeMs: Math.round(metrics.averageProcessingTime || 0),
maxProcessingTimeMs: metrics.maxProcessingTime || 0,
minProcessingTimeMs: metrics.minProcessingTime || 0,
averageThroughputRps: Math.round((metrics.averageThroughput || 0) * 100) / 100,
maxThroughputRps: Math.round((metrics.maxThroughput || 0) * 100) / 100
},
// Resource usage
resourceUsage: {
averageMemoryMB: Math.round(metrics.averageMemoryUsage || 0),
maxMemoryMB: metrics.maxMemoryUsage || 0,
averageCpuPercent: Math.round((metrics.averageCpuUsage || 0) * 100) / 100,
maxCpuPercent: Math.round((metrics.maxCpuUsage || 0) * 100) / 100
},
// Time range
timeSpan: {
firstRun: metrics.firstRun,
lastRun: metrics.lastRun,
duration: metrics.firstRun && metrics.lastRun ?
metrics.lastRun.getTime() - metrics.firstRun.getTime() : 0
},
// Pipeline status
currentStatus: pipeline.status,
lastRunStatus: pipeline.lastRun ? pipeline.lastRun.status : null
};
} catch (error) {
console.error(`Error getting pipeline metrics for ${pipelineId}:`, error);
return {
success: false,
pipelineId: pipelineId,
error: error.message
};
}
}
async stopPipeline(pipelineId) {
console.log(`Stopping pipeline: ${pipelineId}`);
try {
const pipeline = this.pipelines.get(pipelineId);
if (!pipeline) {
throw new Error(`Pipeline not found: ${pipelineId}`);
}
// Stop change streams
for (const [streamKey, changeStream] of this.changeStreams.entries()) {
if (streamKey.startsWith(pipelineId)) {
await changeStream.close();
this.changeStreams.delete(streamKey);
}
}
// Update pipeline status
pipeline.status = 'stopped';
// Update current run if exists
if (pipeline.lastRun && pipeline.lastRun.status === 'running') {
await this.collections.pipelineRuns.updateOne(
{ runId: pipeline.lastRun.runId },
{
$set: {
status: 'stopped',
endTime: new Date(),
processingTimeMs: Date.now() - pipeline.lastRun.startTime.getTime()
}
}
);
}
this.emit('pipelineStopped', {
pipelineId: pipelineId,
stopTime: new Date()
});
return {
success: true,
pipelineId: pipelineId,
status: 'stopped'
};
} catch (error) {
console.error(`Error stopping pipeline ${pipelineId}:`, error);
return {
success: false,
pipelineId: pipelineId,
error: error.message
};
}
}
// Utility methods for data processing
getNestedValue(obj, path) {
return path.split('.').reduce((current, key) => current && current[key], obj);
}
setNestedValue(obj, path, value) {
const keys = path.split('.');
const lastKey = keys.pop();
const target = keys.reduce((current, key) => {
if (!current[key]) current[key] = {};
return current[key];
}, obj);
target[lastKey] = value;
}
projectFields(data, projection) {
const result = {};
Object.entries(projection).forEach(([field, include]) => {
if (include) {
const value = this.getNestedValue(data, field);
if (value !== undefined) {
this.setNestedValue(result, field, value);
}
}
});
return result;
}
matchFilter(data, filter) {
// Simplified match implementation
// In production, would implement full MongoDB query matching
for (const [field, condition] of Object.entries(filter)) {
const value = this.getNestedValue(data, field);
if (typeof condition === 'object' && condition !== null) {
// Handle operators like $eq, $ne, $gt, etc.
for (const [operator, operand] of Object.entries(condition)) {
switch (operator) {
case '$eq':
if (value !== operand) return null;
break;
case '$ne':
if (value === operand) return null;
break;
case '$gt':
if (value <= operand) return null;
break;
case '$gte':
if (value < operand) return null;
break;
case '$lt':
if (value >= operand) return null;
break;
case '$lte':
if (value > operand) return null;
break;
case '$in':
if (!operand.includes(value)) return null;
break;
case '$nin':
if (operand.includes(value)) return null;
break;
}
}
} else {
// Direct value comparison
if (value !== condition) return null;
}
}
return data;
}
addFields(data, fieldsToAdd) {
const result = { ...data };
Object.entries(fieldsToAdd).forEach(([field, expression]) => {
// Simplified field addition
// In production, would implement full MongoDB expression evaluation
if (typeof expression === 'string' && expression.startsWith('$')) {
// Reference to another field
const referencedValue = this.getNestedValue(data, expression.slice(1));
this.setNestedValue(result, field, referencedValue);
} else {
// Literal value
this.setNestedValue(result, field, expression);
}
});
return result;
}
generatePipelineId(name) {
return `pipeline_${name.toLowerCase().replace(/\s+/g, '_')}_${Date.now()}`;
}
generateRunId() {
return `run_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
}
async validatePipelineDefinition(definition) {
// Validate required fields
if (!definition.name) {
throw new Error('Pipeline name is required');
}
if (!definition.dataSources || definition.dataSources.length === 0) {
throw new Error('At least one data source is required');
}
if (!definition.dataTargets || definition.dataTargets.length === 0) {
throw new Error('At least one data target is required');
}
// Add more validation as needed
return definition;
}
async updateRunMetrics(runRecord, metrics) {
try {
const updateData = {};
if (metrics.recordsProcessed) {
updateData.$inc = { recordsProcessed: metrics.recordsProcessed };
}
if (metrics.recordsSuccessful) {
updateData.$inc = { ...updateData.$inc, recordsSuccessful: metrics.recordsSuccessful };
}
if (metrics.recordsFailed) {
updateData.$inc = { ...updateData.$inc, recordsFailed: metrics.recordsFailed };
}
if (metrics.processingTimeMs) {
updateData.$set = {
lastProcessingTime: metrics.processingTimeMs,
lastUpdateTime: new Date()
};
}
if (Object.keys(updateData).length > 0) {
await this.collections.pipelineRuns.updateOne(
{ runId: runRecord.runId },
updateData
);
}
} catch (error) {
console.error('Error updating run metrics:', error);
}
}
async handlePipelineError(pipelineId, runRecord, error) {
console.error(`Pipeline error for ${pipelineId}:`, error);
try {
// Record error
const errorRecord = {
pipelineId: pipelineId,
runId: runRecord.runId,
errorTime: new Date(),
errorType: error.constructor.name,
errorMessage: error.message,
errorStack: error.stack,
// Context information
processingContext: {
recordsProcessedBeforeError: runRecord.recordsProcessed,
runDuration: Date.now() - runRecord.startTime.getTime()
}
};
await this.collections.pipelineErrors.insertOne(errorRecord);
// Update run status
await this.collections.pipelineRuns.updateOne(
{ runId: runRecord.runId },
{
$set: {
status: 'failed',
endTime: new Date(),
errorMessage: error.message
},
$push: { errors: errorRecord }
}
);
// Update pipeline status
const pipeline = this.pipelines.get(pipelineId);
if (pipeline) {
pipeline.status = 'error';
pipeline.statistics.failedRuns++;
}
this.emit('pipelineError', {
pipelineId: pipelineId,
runId: runRecord.runId,
error: errorRecord
});
} catch (recordingError) {
console.error('Error recording pipeline error:', recordingError);
}
}
async validateData(data, qualityRules) {
// Implement data quality validation logic
const validationResult = {
isValid: true,
errors: [],
warnings: []
};
// Apply quality rules
for (const rule of qualityRules) {
try {
const ruleResult = await this.applyQualityRule(data, rule);
if (!ruleResult.passed) {
validationResult.isValid = false;
validationResult.errors.push({
rule: rule.name,
message: ruleResult.message,
field: rule.field,
value: this.getNestedValue(data, rule.field)
});
}
} catch (error) {
validationResult.warnings.push({
rule: rule.name,
message: `Rule validation failed: ${error.message}`
});
}
}
return validationResult;
}
async applyQualityRule(data, rule) {
// Implement specific quality rule logic
switch (rule.type) {
case 'required':
const value = this.getNestedValue(data, rule.field);
return {
passed: value !== null && value !== undefined && value !== '',
message: value ? 'Field is present' : `Required field '${rule.field}' is missing`
};
case 'type':
const fieldValue = this.getNestedValue(data, rule.field);
const actualType = typeof fieldValue;
return {
passed: actualType === rule.expectedType,
message: actualType === rule.expectedType ?
'Type validation passed' :
`Expected type '${rule.expectedType}' but got '${actualType}'`
};
case 'range':
const numericValue = this.getNestedValue(data, rule.field);
const inRange = numericValue >= rule.min && numericValue <= rule.max;
return {
passed: inRange,
message: inRange ?
'Value is within range' :
`Value ${numericValue} is outside range [${rule.min}, ${rule.max}]`
};
default:
return { passed: true, message: 'Unknown rule type' };
}
}
async recordDataLineage(pipelineId, originalData, processedData, definition) {
try {
const lineageRecord = {
pipelineId: pipelineId,
timestamp: new Date(),
// Data sources
dataSources: definition.dataSources.map(source => ({
collection: source.collection,
database: source.database || this.db.databaseName
})),
// Data targets
dataTargets: definition.dataTargets.map(target => ({
collection: target.collection,
database: target.database || this.db.databaseName,
type: target.type
})),
// Transformation metadata
transformations: definition.transformationStages.map(stage => ({
type: stage.type,
applied: true
})),
// Data checksums for integrity verification
originalDataChecksum: this.calculateChecksum(originalData),
processedDataChecksum: this.calculateChecksum(processedData),
// Record identifiers
originalRecordId: originalData._id || originalData.id,
processedRecordId: processedData._id || processedData.id
};
await this.collections.dataLineage.insertOne(lineageRecord);
} catch (error) {
console.error('Error recording data lineage:', error);
// Don't throw - lineage recording shouldn't stop pipeline execution
}
}
calculateChecksum(data) {
// Simple checksum calculation for demonstration
// In production, would use proper hashing algorithm
const dataString = JSON.stringify(data, Object.keys(data).sort());
let hash = 0;
for (let i = 0; i < dataString.length; i++) {
const char = dataString.charCodeAt(i);
hash = ((hash << 5) - hash) + char;
hash = hash & hash; // Convert to 32bit integer
}
return hash.toString(36);
}
async shutdown() {
console.log('Shutting down data pipeline manager...');
try {
// Stop all running pipelines
for (const [pipelineId, pipeline] of this.pipelines.entries()) {
if (pipeline.status === 'running') {
await this.stopPipeline(pipelineId);
}
}
// Close all change streams
for (const [streamKey, changeStream] of this.changeStreams.entries()) {
await changeStream.close();
}
// Close MongoDB connection
if (this.client) {
await this.client.close();
}
console.log('Data pipeline manager shutdown complete');
} catch (error) {
console.error('Error during shutdown:', error);
}
}
// Additional methods would include implementations for:
// - setupChangeStreams()
// - startPipelineMonitoring()
// - startScheduledPipeline()
// - startBatchPipeline()
// - applyLookupEnrichment()
// - applyCalculationEnrichment()
// - applyExternalApiEnrichment()
// - applyFiltering()
// - applyNormalization()
// - applyCustomFunction()
// - writeToFile()
// - writeToExternalAPI()
// - writeToMessageQueue()
// - handleValidationError()
// - handleProcessingError()
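// A minimal sketch of one method from the list above. Assumption of this sketch:
// a null return from applyFiltering() means "drop the event", so processChangeEvent()
// would need to stop the transformation chain when it sees null.
async applyFiltering(data, transformationStage) {
  // Reuse the simplified matchFilter() helper defined earlier in this class
  return this.matchFilter(data, transformationStage.filter || {});
}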
}
// Benefits of MongoDB Advanced Data Pipeline Management:
// - Real-time stream processing with Change Streams
// - Sophisticated data transformation and enrichment capabilities
// - Comprehensive error handling and recovery mechanisms
// - Built-in data quality validation and monitoring
// - Automatic scalability and performance optimization
// - Data lineage tracking and audit capabilities
// - Flexible pipeline orchestration and scheduling
// - SQL-compatible operations through QueryLeaf integration
// - Production-ready monitoring and observability features
// - Enterprise-grade reliability and fault tolerance
module.exports = {
AdvancedDataPipelineManager
};
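The following usage sketch shows how the manager above might be wired up. The module path, connection string, collection names, and pipeline definition are illustrative assumptions, and the short delay stands in for a proper readiness signal since the constructor starts initialization asynchronously.
// Usage sketch (module path, connection string, and pipeline definition are illustrative)
const { AdvancedDataPipelineManager } = require('./advanced-data-pipeline-manager');

async function main() {
  const manager = new AdvancedDataPipelineManager('mongodb://localhost:27017/ecommerce');

  // The constructor kicks off async initialization; wait briefly before registering
  // (a production setup would expose an explicit ready promise instead)
  await new Promise((resolve) => setTimeout(resolve, 2000));

  const result = await manager.registerDataPipeline({
    name: 'order_totals_sync',
    triggerType: 'change_stream',
    autoStart: true,
    dataSources: [{ collection: 'orders', filter: { operationType: 'insert' } }],
    transformationStages: [
      {
        type: 'field_mapping',
        fieldMappings: { order_id: 'fullDocument._id', total: 'fullDocument.total' }
      }
    ],
    dataTargets: [{ name: 'analytics_orders', type: 'mongodb_collection', collection: 'analytics_orders' }],
    dataQualityRules: [{ name: 'total_present', type: 'required', field: 'total' }]
  });

  manager.on('recordProcessed', (event) => {
    console.log(`Pipeline ${event.pipelineId} processed a record in ${event.processingTime}ms`);
  });

  console.log('Pipeline registered:', result.pipelineId);
}

main().catch(console.error);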
Advanced Stream Processing Patterns
Real-Time Data Transformation and Analytics
Implement sophisticated stream processing for real-time data analytics:
// Advanced real-time stream processing and analytics
class RealTimeStreamProcessor extends AdvancedDataPipelineManager {
constructor(mongoUri, streamConfig) {
super(mongoUri, streamConfig);
this.streamConfig = {
...streamConfig,
enableWindowedProcessing: true,
enableEventTimeProcessing: true,
enableComplexEventProcessing: true,
enableStreamAggregation: true
};
this.windowManager = new Map();
this.eventPatterns = new Map();
this.streamState = new Map();
this.setupStreamProcessing();
}
async processEventStream(streamDefinition) {
console.log('Setting up advanced event stream processing...');
try {
const streamProcessor = {
streamId: this.generateStreamId(streamDefinition.name),
definition: streamDefinition,
// Windowing configuration
windowConfig: {
type: streamDefinition.windowType || 'tumbling', // tumbling, hopping, sliding
size: streamDefinition.windowSize || 60000, // 1 minute
advance: streamDefinition.windowAdvance || 30000 // 30 seconds
},
// Processing configuration
processingConfig: {
enableLateEvents: streamDefinition.enableLateEvents || false, // handle late-arriving events
watermarkDelay: streamDefinition.watermarkDelay || 5000,
enableExactlyOnceProcessing: streamDefinition.enableExactlyOnceProcessing || false
},
// Analytics configuration
analyticsConfig: {
enableAggregation: streamDefinition.enableAggregation !== false,
enablePatternDetection: streamDefinition.enablePatternDetection || false,
enableAnomalyDetection: streamDefinition.enableAnomalyDetection || false,
enableTrendAnalysis: streamDefinition.enableTrendAnalysis || false
}
};
return await this.deployStreamProcessor(streamProcessor);
} catch (error) {
console.error('Error processing event stream:', error);
return {
success: false,
error: error.message
};
}
}
async deployStreamProcessor(streamProcessor) {
console.log(`Deploying stream processor: ${streamProcessor.streamId}`);
try {
// Setup windowed processing
if (this.streamConfig.enableWindowedProcessing) {
await this.setupWindowedProcessing(streamProcessor);
}
// Setup complex event processing
if (this.streamConfig.enableComplexEventProcessing) {
await this.setupComplexEventProcessing(streamProcessor);
}
// Setup stream aggregation
if (this.streamConfig.enableStreamAggregation) {
await this.setupStreamAggregation(streamProcessor);
}
return {
success: true,
streamId: streamProcessor.streamId,
processorConfig: streamProcessor
};
} catch (error) {
console.error(`Error deploying stream processor ${streamProcessor.streamId}:`, error);
return {
success: false,
streamId: streamProcessor.streamId,
error: error.message
};
}
}
}
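A stream definition for this processor might look like the sketch below. The names, window sizes, and thresholds are illustrative assumptions, and it presumes the helper methods referenced in the constructor (setupStreamProcessing, generateStreamId, and the setup* methods) are implemented.
// Illustrative stream definition (names, sizes, and thresholds are assumptions)
async function deployOrderEventStream() {
  const processor = new RealTimeStreamProcessor('mongodb://localhost:27017/ecommerce', {
    enableChangeStreams: true
  });

  const streamResult = await processor.processEventStream({
    name: 'order_events',
    windowType: 'tumbling',   // fixed-size, non-overlapping windows
    windowSize: 60000,        // 1-minute windows
    windowAdvance: 60000,     // advance equals size for tumbling windows
    watermarkDelay: 5000,     // tolerate 5 seconds of event-time lateness
    enableAggregation: true,
    enableAnomalyDetection: false
  });

  if (streamResult.success) {
    console.log(`Stream processor deployed: ${streamResult.streamId}`);
  } else {
    console.error('Stream deployment failed:', streamResult.error);
  }
}

deployOrderEventStream().catch(console.error);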
SQL-Style Data Pipeline Operations with QueryLeaf
QueryLeaf provides familiar SQL syntax for MongoDB data pipeline management:
-- QueryLeaf advanced data pipeline operations with SQL-familiar syntax for MongoDB
-- Pipeline definition and configuration
CREATE OR REPLACE PIPELINE customer_data_enrichment_pipeline
AS
WITH pipeline_config AS (
-- Pipeline metadata and configuration
SELECT
'customer_data_enrichment' as pipeline_name,
'stream' as processing_mode,
'change_stream' as trigger_type,
true as auto_start,
-- Performance configuration
1000 as batch_size,
5 as max_concurrency,
300000 as timeout_ms,
-- Quality configuration
true as enable_data_validation,
true as enable_schema_evolution,
true as enable_data_lineage,
-- Error handling
'exponential_backoff' as retry_strategy,
3 as max_retries,
true as dead_letter_queue
),
data_sources AS (
-- Define data sources for pipeline
SELECT ARRAY[
JSON_BUILD_OBJECT(
'name', 'customer_changes',
'collection', 'customers',
'database', 'ecommerce',
'filter', JSON_BUILD_OBJECT(
'operationType', JSON_BUILD_OBJECT('$in', ARRAY['insert', 'update'])
),
'change_stream_options', JSON_BUILD_OBJECT(
'fullDocument', 'updateLookup',
'fullDocumentBeforeChange', 'whenAvailable'
)
),
JSON_BUILD_OBJECT(
'name', 'order_changes',
'collection', 'orders',
'database', 'ecommerce',
'filter', JSON_BUILD_OBJECT(
'fullDocument.customer_id', JSON_BUILD_OBJECT('$exists', true)
)
)
] as sources
),
transformation_stages AS (
-- Define transformation pipeline stages
SELECT ARRAY[
-- Stage 1: Data enrichment with external lookups
JSON_BUILD_OBJECT(
'type', 'data_enrichment',
'name', 'customer_profile_enrichment',
'enrichment_config', JSON_BUILD_OBJECT(
'enrichments', ARRAY[
JSON_BUILD_OBJECT(
'type', 'lookup',
'lookup_collection', 'customer_profiles',
'lookup_field', 'customer_id',
'source_field', 'fullDocument.customer_id',
'target_field', 'customer_profile'
),
JSON_BUILD_OBJECT(
'type', 'calculation',
'calculations', ARRAY[
JSON_BUILD_OBJECT(
'field', 'customer_lifetime_value',
'expression', 'customer_profile.total_orders * customer_profile.avg_order_value'
),
JSON_BUILD_OBJECT(
'field', 'customer_segment',
'expression', 'CASE WHEN customer_lifetime_value > 1000 THEN "premium" WHEN customer_lifetime_value > 500 THEN "standard" ELSE "basic" END'
)
]
)
]
)
),
-- Stage 2: Field mapping and normalization
JSON_BUILD_OBJECT(
'type', 'field_mapping',
'name', 'customer_data_mapping',
'field_mappings', JSON_BUILD_OBJECT(
'customer_id', 'fullDocument.customer_id',
'customer_email', 'fullDocument.email',
'customer_name', 'fullDocument.full_name',
'customer_phone', 'fullDocument.phone_number',
'registration_date', 'fullDocument.created_at',
'last_login', 'fullDocument.last_login_at',
'profile_completion', 'customer_profile.completion_percentage',
'lifetime_value', 'customer_lifetime_value',
'segment', 'customer_segment',
'change_type', 'operationType',
'change_timestamp', 'clusterTime'
)
),
-- Stage 3: Data validation and quality checks
JSON_BUILD_OBJECT(
'type', 'data_validation',
'name', 'customer_data_validation',
'validation_rules', ARRAY[
JSON_BUILD_OBJECT(
'field', 'customer_email',
'type', 'email',
'required', true
),
JSON_BUILD_OBJECT(
'field', 'customer_phone',
'type', 'phone',
'required', false
),
JSON_BUILD_OBJECT(
'field', 'lifetime_value',
'type', 'numeric',
'min_value', 0,
'max_value', 100000
)
]
),
-- Stage 4: Aggregation for analytics
JSON_BUILD_OBJECT(
'type', 'aggregation',
'name', 'customer_analytics_aggregation',
'aggregation_pipeline', ARRAY[
JSON_BUILD_OBJECT(
'$addFields', JSON_BUILD_OBJECT(
'processing_date', '$$NOW',
'data_freshness_score', JSON_BUILD_OBJECT(
'$subtract', ARRAY[100, JSON_BUILD_OBJECT(
'$divide', ARRAY[
JSON_BUILD_OBJECT('$subtract', ARRAY['$$NOW', '$change_timestamp']),
3600000 -- Convert to hours
]
)]
),
'engagement_score', JSON_BUILD_OBJECT(
'$multiply', ARRAY[
'$profile_completion',
JSON_BUILD_OBJECT('$cond', ARRAY[
JSON_BUILD_OBJECT('$ne', ARRAY['$last_login', NULL]),
1.2, -- Boost for active users
1.0
])
]
)
)
),
JSON_BUILD_OBJECT(
'$addFields', JSON_BUILD_OBJECT(
'customer_score', JSON_BUILD_OBJECT(
'$add', ARRAY[
JSON_BUILD_OBJECT('$multiply', ARRAY['$lifetime_value', 0.4]),
JSON_BUILD_OBJECT('$multiply', ARRAY['$engagement_score', 0.3]),
JSON_BUILD_OBJECT('$multiply', ARRAY['$data_freshness_score', 0.3])
]
)
)
)
]
)
] as stages
),
data_targets AS (
-- Define output destinations
SELECT ARRAY[
JSON_BUILD_OBJECT(
'name', 'enriched_customers',
'type', 'mongodb_collection',
'collection', 'enriched_customers',
'database', 'analytics',
'write_mode', 'upsert',
'upsert_filter', JSON_BUILD_OBJECT('customer_id', '$customer_id')
),
JSON_BUILD_OBJECT(
'name', 'customer_analytics_stream',
'type', 'message_queue',
'queue_name', 'customer_analytics',
'format', 'json',
'partition_key', 'customer_segment'
),
JSON_BUILD_OBJECT(
'name', 'data_warehouse_export',
'type', 'file',
'file_path', '/data/exports/customer_enrichment',
'format', 'parquet',
'partition_by', ARRAY['segment', 'processing_date']
)
] as targets
),
data_quality_rules AS (
-- Define comprehensive data quality rules
SELECT ARRAY[
JSON_BUILD_OBJECT(
'name', 'required_customer_id',
'type', 'required',
'field', 'customer_id',
'severity', 'critical'
),
JSON_BUILD_OBJECT(
'name', 'valid_email_format',
'type', 'regex',
'field', 'customer_email',
'pattern', '^[\\w\\.-]+@[\\w\\.-]+\\.[a-zA-Z]{2,}$',
'severity', 'high'
),
JSON_BUILD_OBJECT(
'name', 'reasonable_lifetime_value',
'type', 'range',
'field', 'lifetime_value',
'min', 0,
'max', 50000,
'severity', 'medium'
),
JSON_BUILD_OBJECT(
'name', 'valid_customer_segment',
'type', 'enum',
'field', 'segment',
'allowed_values', ARRAY['premium', 'standard', 'basic'],
'severity', 'high'
)
] as rules
)
-- Create the pipeline with comprehensive configuration
SELECT
'customer_data_enrichment_pipeline' as pipeline_name,
pipeline_config.*,
data_sources.sources,
transformation_stages.stages,
data_targets.targets,
data_quality_rules.rules,
-- Pipeline scheduling
JSON_BUILD_OBJECT(
'schedule_type', 'real_time',
'trigger_conditions', ARRAY[
'customer_data_change',
'order_completion',
'profile_update'
]
) as scheduling_config,
-- Monitoring configuration
JSON_BUILD_OBJECT(
'enable_metrics', true,
'metrics_interval_seconds', 60,
'alert_thresholds', JSON_BUILD_OBJECT(
'error_rate_percent', 5,
'processing_latency_ms', 5000,
'throughput_records_per_second', 100
),
'notification_channels', ARRAY[
'email:[email protected]',
'slack:#data-pipelines',
'webhook:https://monitoring.company.com/alerts'
]
) as monitoring_config
FROM pipeline_config, data_sources, transformation_stages, data_targets, data_quality_rules;
-- Pipeline execution and monitoring queries
-- Real-time pipeline performance monitoring
WITH pipeline_performance AS (
SELECT
pipeline_id,
pipeline_name,
run_id,
start_time,
end_time,
status,
-- Processing metrics
records_processed,
records_successful,
records_failed,
-- Performance calculations
EXTRACT(EPOCH FROM (COALESCE(end_time, CURRENT_TIMESTAMP) - start_time)) * 1000 as duration_ms,
-- Throughput calculation
CASE
WHEN EXTRACT(EPOCH FROM (COALESCE(end_time, CURRENT_TIMESTAMP) - start_time)) > 0 THEN
records_processed / EXTRACT(EPOCH FROM (COALESCE(end_time, CURRENT_TIMESTAMP) - start_time))
ELSE 0
END as throughput_records_per_second,
-- Success rate
CASE
WHEN records_processed > 0 THEN
(records_successful * 100.0) / records_processed
ELSE 0
END as success_rate_percent,
-- Resource utilization
memory_usage_mb,
cpu_usage_percent,
-- Current processing lag
CASE
WHEN status = 'running' THEN
EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - last_processed_timestamp))
ELSE NULL
END as current_lag_seconds
FROM pipeline_runs
WHERE start_time >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
),
performance_summary AS (
SELECT
pipeline_name,
COUNT(*) as total_runs,
COUNT(*) FILTER (WHERE status = 'completed') as successful_runs,
COUNT(*) FILTER (WHERE status = 'failed') as failed_runs,
COUNT(*) FILTER (WHERE status = 'running') as active_runs,
-- Aggregate performance metrics
SUM(records_processed) as total_records_processed,
SUM(records_successful) as total_records_successful,
SUM(records_failed) as total_records_failed,
-- Performance statistics
AVG(duration_ms) as avg_duration_ms,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY duration_ms) as p95_duration_ms,
AVG(throughput_records_per_second) as avg_throughput_rps,
MAX(throughput_records_per_second) as max_throughput_rps,
-- Quality metrics
AVG(success_rate_percent) as avg_success_rate,
MIN(success_rate_percent) as min_success_rate,
-- Resource usage
AVG(memory_usage_mb) as avg_memory_usage_mb,
MAX(memory_usage_mb) as max_memory_usage_mb,
AVG(cpu_usage_percent) as avg_cpu_usage,
MAX(cpu_usage_percent) as max_cpu_usage,
-- Lag analysis
AVG(current_lag_seconds) as avg_processing_lag_seconds,
MAX(current_lag_seconds) as max_processing_lag_seconds
FROM pipeline_performance
GROUP BY pipeline_name
)
SELECT
pipeline_name,
total_runs,
successful_runs,
failed_runs,
active_runs,
-- Overall health assessment
CASE
WHEN failed_runs > total_runs * 0.1 THEN 'critical'
WHEN avg_success_rate < 95 THEN 'warning'
WHEN avg_processing_lag_seconds > 300 THEN 'warning' -- 5 minutes lag
WHEN max_cpu_usage > 90 OR max_memory_usage_mb > 4096 THEN 'warning'
ELSE 'healthy'
END as health_status,
-- Processing statistics
total_records_processed,
total_records_successful,
total_records_failed,
-- Performance metrics
ROUND(avg_duration_ms, 0) as avg_duration_ms,
ROUND(p95_duration_ms, 0) as p95_duration_ms,
ROUND(avg_throughput_rps, 2) as avg_throughput_rps,
ROUND(max_throughput_rps, 2) as max_throughput_rps,
-- Quality and reliability
ROUND(avg_success_rate, 2) as avg_success_rate_percent,
ROUND(min_success_rate, 2) as min_success_rate_percent,
-- Resource utilization
ROUND(avg_memory_usage_mb, 0) as avg_memory_usage_mb,
ROUND(max_memory_usage_mb, 0) as max_memory_usage_mb,
ROUND(avg_cpu_usage, 1) as avg_cpu_usage_percent,
ROUND(max_cpu_usage, 1) as max_cpu_usage_percent,
-- Processing lag indicators
COALESCE(ROUND(avg_processing_lag_seconds, 0), 0) as avg_lag_seconds,
COALESCE(ROUND(max_processing_lag_seconds, 0), 0) as max_lag_seconds,
-- Operational recommendations
CASE
WHEN failed_runs > total_runs * 0.05 THEN 'investigate_errors'
WHEN avg_throughput_rps < 50 THEN 'optimize_performance'
WHEN max_cpu_usage > 80 THEN 'scale_up_resources'
WHEN avg_processing_lag_seconds > 120 THEN 'reduce_processing_latency'
ELSE 'monitor_continued'
END as recommendation,
-- Capacity planning
CASE
WHEN max_throughput_rps / avg_throughput_rps < 1.5 THEN 'add_capacity'
WHEN max_memory_usage_mb > 3072 THEN 'increase_memory'
WHEN active_runs > 1 THEN 'check_concurrency_limits'
ELSE 'capacity_sufficient'
END as capacity_recommendation
FROM performance_summary
ORDER BY
CASE health_status
WHEN 'critical' THEN 1
WHEN 'warning' THEN 2
ELSE 3
END,
total_records_processed DESC;
-- Data lineage and quality tracking
WITH data_lineage_analysis AS (
SELECT
dl.pipeline_id,
DATE_TRUNC('hour', dl.timestamp) as processing_hour,
-- Source, target, and transformation tracking (expanded via LATERAL so set-returning
-- functions never appear in the aggregate SELECT list or GROUP BY clause)
src.value ->> 'collection' as source_collection,
tgt.value ->> 'collection' as target_collection,
tf.value ->> 'type' as transformation_type,
-- Data quality metrics
COUNT(*) as total_transformations,
COUNT(*) FILTER (WHERE dl.original_data_checksum != dl.processed_data_checksum) as data_modified,
COUNT(DISTINCT dl.original_record_id) as unique_source_records,
COUNT(DISTINCT dl.processed_record_id) as unique_target_records,
-- Transformation tracking
COUNT(*) FILTER (WHERE (tf.value ->> 'applied')::boolean = true) as transformations_applied,
-- Data integrity checks
COUNT(*) FILTER (WHERE dl.original_data_checksum IS NOT NULL AND dl.processed_data_checksum IS NOT NULL) as checksum_validations,
-- Processing metadata
MIN(dl.timestamp) as first_processing,
MAX(dl.timestamp) as last_processing
FROM data_lineage dl
CROSS JOIN LATERAL JSONB_ARRAY_ELEMENTS(dl.data_sources) AS src(value)
CROSS JOIN LATERAL JSONB_ARRAY_ELEMENTS(dl.data_targets) AS tgt(value)
CROSS JOIN LATERAL JSONB_ARRAY_ELEMENTS(dl.transformations) AS tf(value)
WHERE dl.timestamp >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY
dl.pipeline_id,
DATE_TRUNC('hour', dl.timestamp),
src.value ->> 'collection',
tgt.value ->> 'collection',
tf.value ->> 'type'
),
quality_summary AS (
SELECT
pipeline_id,
source_collection,
target_collection,
transformation_type,
-- Aggregated metrics
SUM(total_transformations) as total_transformations,
SUM(data_modified) as total_data_modified,
SUM(unique_source_records) as total_source_records,
SUM(unique_target_records) as total_target_records,
SUM(transformations_applied) as total_transformations_applied,
SUM(checksum_validations) as total_checksum_validations,
-- Data quality calculations
CASE
WHEN SUM(total_transformations) > 0 THEN
(SUM(transformations_applied) * 100.0) / SUM(total_transformations)
ELSE 0
END as transformation_success_rate,
CASE
WHEN SUM(unique_source_records) > 0 THEN
(SUM(unique_target_records) * 100.0) / SUM(unique_source_records)
ELSE 0
END as record_completeness_rate,
-- Data modification analysis
CASE
WHEN SUM(total_transformations) > 0 THEN
(SUM(data_modified) * 100.0) / SUM(total_transformations)
ELSE 0
END as data_modification_rate,
-- Processing consistency
COUNT(DISTINCT processing_hour) as processing_hours_active,
AVG(EXTRACT(EPOCH FROM (last_processing - first_processing)) / 60.0) as avg_processing_window_minutes
FROM data_lineage_analysis
GROUP BY pipeline_id, source_collection, target_collection, transformation_type
)
SELECT
pipeline_id,
source_collection,
target_collection,
transformation_type,
-- Volume metrics
total_source_records,
total_target_records,
total_transformations,
total_transformations_applied,
-- Quality scores
ROUND(transformation_success_rate, 2) as transformation_success_percent,
ROUND(record_completeness_rate, 2) as record_completeness_percent,
ROUND(data_modification_rate, 2) as data_modification_percent,
-- Data integrity assessment
total_checksum_validations,
CASE
WHEN total_checksum_validations > 0 AND transformation_success_rate > 98 THEN 'excellent'
WHEN total_checksum_validations > 0 AND transformation_success_rate > 95 THEN 'good'
WHEN total_checksum_validations > 0 AND transformation_success_rate > 90 THEN 'acceptable'
ELSE 'needs_attention'
END as data_quality_rating,
-- Processing consistency
processing_hours_active,
ROUND(avg_processing_window_minutes, 1) as avg_processing_window_minutes,
-- Operational insights
CASE
WHEN record_completeness_rate < 98 THEN 'investigate_data_loss'
WHEN transformation_success_rate < 95 THEN 'review_transformation_logic'
WHEN data_modification_rate > 80 THEN 'validate_transformation_accuracy'
WHEN avg_processing_window_minutes > 60 THEN 'optimize_processing_speed'
ELSE 'quality_acceptable'
END as quality_recommendation,
-- Data flow health
CASE
WHEN record_completeness_rate > 99 AND transformation_success_rate > 98 THEN 'healthy'
WHEN record_completeness_rate > 95 AND transformation_success_rate > 95 THEN 'stable'
WHEN record_completeness_rate > 90 AND transformation_success_rate > 90 THEN 'concerning'
ELSE 'critical'
END as data_flow_health
FROM quality_summary
WHERE total_transformations > 0
ORDER BY
CASE data_flow_health
WHEN 'critical' THEN 1
WHEN 'concerning' THEN 2
WHEN 'stable' THEN 3
ELSE 4
END,
total_source_records DESC;
-- Error analysis and troubleshooting
SELECT
pe.pipeline_id,
pe.run_id,
pe.error_time,
pe.error_type,
pe.error_message,
-- Error context
pe.processing_context ->> 'recordsProcessedBeforeError' as records_before_error,
pe.processing_context ->> 'runDuration' as run_duration_before_error,
-- Error frequency analysis
COUNT(*) OVER (
PARTITION BY pe.pipeline_id, pe.error_type
ORDER BY pe.error_time
RANGE BETWEEN INTERVAL '1 hour' PRECEDING AND CURRENT ROW
) as similar_errors_last_hour,
-- Error pattern detection
LAG(pe.error_time) OVER (
PARTITION BY pe.pipeline_id, pe.error_type
ORDER BY pe.error_time
) as previous_similar_error,
-- Pipeline run context
pr.start_time as run_start_time,
pr.records_processed as total_run_records,
pr.status as run_status,
-- Resolution tracking
CASE
WHEN pe.error_type IN ('ValidationError', 'SchemaError') THEN 'data_quality_issue'
WHEN pe.error_type IN ('ConnectionError', 'TimeoutError') THEN 'infrastructure_issue'
WHEN pe.error_type IN ('TransformationError', 'ProcessingError') THEN 'logic_issue'
ELSE 'unknown_category'
END as error_category,
-- Priority assessment
CASE
WHEN COUNT(*) OVER (PARTITION BY pe.pipeline_id, pe.error_type ORDER BY pe.error_time RANGE BETWEEN INTERVAL '1 hour' PRECEDING AND CURRENT ROW) > 10 THEN 'high'
WHEN pe.error_type IN ('ConnectionError', 'TimeoutError') THEN 'high'
WHEN pr.records_processed > 1000 THEN 'medium'
ELSE 'low'
END as error_priority,
-- Suggested resolution
CASE
WHEN pe.error_type = 'ValidationError' THEN 'Review data quality rules and source data format'
WHEN pe.error_type = 'ConnectionError' THEN 'Check database connectivity and network stability'
WHEN pe.error_type = 'TimeoutError' THEN 'Increase timeout values or optimize query performance'
WHEN pe.error_type = 'TransformationError' THEN 'Review transformation logic and test with sample data'
ELSE 'Investigate error stack trace and contact development team'
END as suggested_resolution
FROM pipeline_errors pe
JOIN pipeline_runs pr ON pe.run_id = pr.run_id
WHERE pe.error_time >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
ORDER BY
CASE error_priority
WHEN 'high' THEN 1
WHEN 'medium' THEN 2
ELSE 3
END,
pe.error_time DESC;
-- QueryLeaf provides comprehensive MongoDB data pipeline capabilities:
-- 1. Real-time change stream processing with SQL-familiar syntax
-- 2. Advanced data transformation and enrichment operations
-- 3. Comprehensive data quality validation and monitoring
-- 4. Pipeline orchestration and scheduling capabilities
-- 5. Data lineage tracking and audit functionality
-- 6. Error handling and troubleshooting tools
-- 7. Performance monitoring and optimization features
-- 8. Stream processing and windowed analytics (see the aggregation sketch below)
-- 9. SQL-style pipeline definition and management
-- 10. Enterprise-grade reliability and fault tolerance
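Item 8 above covers stream processing and windowed analytics. As a rough sketch of the kind of native MongoDB aggregation such SQL ultimately resolves to, the following TypeScript example computes a rolling one-hour throughput average per pipeline with $setWindowFields (MongoDB 5.0+). The pipeline_ops database, pipeline_runs collection, and field names are illustrative assumptions, not QueryLeaf's actual translation output.
// windowed-throughput.ts -- illustrative sketch; database, collection, and field names are assumptions
import { MongoClient } from "mongodb";

async function rollingThroughput(uri: string): Promise<void> {
  const client = new MongoClient(uri);
  try {
    await client.connect();
    const runs = client.db("pipeline_ops").collection("pipeline_runs");

    // Rolling one-hour average throughput per pipeline, computed server-side
    // with $setWindowFields (requires MongoDB 5.0 or later)
    const cursor = runs.aggregate([
      { $match: { completedAt: { $gte: new Date(Date.now() - 24 * 3600 * 1000) } } },
      {
        $setWindowFields: {
          partitionBy: "$pipelineName",
          sortBy: { completedAt: 1 },
          output: {
            rollingAvgThroughput: {
              $avg: "$throughputRecordsPerSecond",
              window: { range: [-1, 0], unit: "hour" },
            },
          },
        },
      },
      { $project: { _id: 0, pipelineName: 1, completedAt: 1, rollingAvgThroughput: 1 } },
    ]);

    for await (const doc of cursor) {
      console.log(doc);
    }
  } finally {
    await client.close();
  }
}
The same window can be expressed over a fixed number of preceding documents instead of a time range by using a documents bound in place of the range bound.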
Best Practices for Production Data Pipelines
Pipeline Architecture and Design Principles
Essential principles for effective MongoDB data pipeline deployment:
- Stream Processing Design: Implement real-time change stream processing for low-latency data operations (a minimal consumer sketch with retry and dead-letter handling follows this list)
- Data Quality Management: Establish comprehensive validation rules and monitoring for data integrity
- Error Handling Strategy: Design robust error handling with retry mechanisms and dead letter queues
- Performance Optimization: Optimize pipeline throughput with appropriate batching and concurrency settings
- Monitoring Integration: Implement comprehensive monitoring for pipeline health and data quality metrics
- Schema Evolution: Plan for schema changes and backward compatibility in data transformations
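The stream processing and error handling principles above come together in the pipeline's consumer loop. The sketch below, referenced from the first bullet, watches a source collection through a change stream, retries each event with exponential backoff, and parks unrecoverable events in a dead-letter collection. The appdb database, the orders and pipeline_dead_letter collections, the retry policy, and the processEvent placeholder are illustrative assumptions.
// change-stream-consumer.ts -- illustrative sketch; collection names and retry policy are assumptions
import { MongoClient, ChangeStreamDocument } from "mongodb";

const MAX_ATTEMPTS = 3;

async function processEvent(event: ChangeStreamDocument): Promise<void> {
  // Placeholder transformation/load step; replace with real pipeline logic
  if (event.operationType === "insert") {
    // ... enrich the document and write it to the target collection ...
  }
}

async function runConsumer(uri: string): Promise<void> {
  const client = new MongoClient(uri);
  await client.connect();
  const db = client.db("appdb");
  const deadLetter = db.collection("pipeline_dead_letter");

  // fullDocument: "updateLookup" delivers the current document for update events
  const stream = db.collection("orders").watch([], { fullDocument: "updateLookup" });

  for await (const event of stream) {
    let lastError: unknown;
    let delivered = false;

    for (let attempt = 1; attempt <= MAX_ATTEMPTS && !delivered; attempt++) {
      try {
        await processEvent(event);
        delivered = true;
      } catch (err) {
        lastError = err;
        // Simple exponential backoff between retries
        await new Promise((resolve) => setTimeout(resolve, 100 * 2 ** attempt));
      }
    }

    if (!delivered) {
      // Park the failed event for later inspection instead of blocking the stream
      await deadLetter.insertOne({
        event,
        error: String(lastError),
        failedAt: new Date(),
        attempts: MAX_ATTEMPTS,
      });
    }
  }
}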
Scalability and Production Operations
Optimize data pipeline operations for enterprise-scale requirements:
- Resource Management: Configure appropriate resource limits and scaling policies for pipeline execution
- Data Lineage: Track data transformations and dependencies for auditing and troubleshooting (see the metrics and lineage recording sketch after this list)
- Backup and Recovery: Implement pipeline state backup and recovery mechanisms
- Security Integration: Ensure pipeline operations meet security and compliance requirements
- Operational Integration: Integrate pipeline monitoring with existing alerting and operational workflows
- Cost Optimization: Monitor resource usage and optimize pipeline efficiency for cost-effective operations
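For the lineage and operational-integration points above, the pipeline itself has to emit the metrics and lineage documents that the monitoring and lineage queries earlier in this post report on. The sketch below, referenced from the Data Lineage bullet, upserts a run-metrics document and appends a lineage entry; the pipeline_ops database, the document shapes, and the example values are assumptions chosen to line up with the field names used in those queries.
// record-run-metrics.ts -- illustrative sketch; document shapes and example values are assumptions
import { MongoClient } from "mongodb";

interface RunMetrics {
  pipeline_name: string;
  run_id: string;
  status: "running" | "completed" | "failed";
  start_time: Date;
  end_time?: Date;
  duration_ms?: number;
  records_processed: number;
  records_successful: number;
  records_failed: number;
  memory_usage_mb: number;
  cpu_usage_percent: number;
  last_processed_timestamp: Date;
}

async function recordRun(uri: string, metrics: RunMetrics): Promise<void> {
  const client = new MongoClient(uri);
  try {
    await client.connect();
    const ops = client.db("pipeline_ops");

    // Upsert so a long-running pipeline can refresh the same run document incrementally
    await ops.collection("pipeline_runs").updateOne(
      { run_id: metrics.run_id },
      { $set: metrics },
      { upsert: true }
    );

    // One lineage entry per processed batch keeps the audit trail queryable
    await ops.collection("data_lineage").insertOne({
      pipeline_id: metrics.pipeline_name,
      timestamp: new Date(),
      data_sources: [{ collection: "orders" }],           // illustrative source
      data_targets: [{ collection: "order_analytics" }],  // illustrative target
      transformations: [{ type: "enrichment", applied: true }],
      original_record_id: "order_12345",                  // illustrative identifiers
      processed_record_id: "analytics_12345",
      original_data_checksum: "c3f9b2",                   // illustrative checksums
      processed_data_checksum: "a81e4d",
    });
  } finally {
    await client.close();
  }
}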
Conclusion
MongoDB data pipeline management provides sophisticated real-time data processing capabilities that enable modern applications to handle complex data transformation workflows, stream processing, and ETL operations with advanced monitoring, error handling, and scalability features. The native change stream support and aggregation framework ensure that data pipelines can process high-volume data streams efficiently while maintaining data quality and reliability.
Key MongoDB Data Pipeline benefits include:
- Real-Time Processing: Native change stream support for immediate data processing and transformation
- Advanced Transformations: Comprehensive data transformation capabilities with aggregation framework integration
- Data Quality Management: Built-in validation, monitoring, and quality assessment tools
- Stream Processing: Sophisticated stream processing patterns for complex event processing and analytics
- Pipeline Orchestration: Flexible pipeline scheduling and orchestration with error handling and recovery
- SQL Accessibility: Familiar SQL-style pipeline operations through QueryLeaf for accessible data pipeline management
Whether you're building real-time analytics systems, data warehousing pipelines, microservices data synchronization, or complex ETL workflows, MongoDB data pipeline management with QueryLeaf's familiar SQL interface provides the foundation for sophisticated, scalable data processing operations.
QueryLeaf Integration: QueryLeaf automatically translates SQL-style pipeline operations into MongoDB's native change streams and aggregation pipelines, making advanced data processing functionality accessible to SQL-oriented development teams. Complex data transformations, stream processing operations, and pipeline orchestration are seamlessly handled through familiar SQL constructs, enabling sophisticated data workflows without requiring deep MongoDB pipeline expertise.
The combination of MongoDB's robust data pipeline capabilities with SQL-style pipeline management operations makes it an ideal platform for applications that require both sophisticated real-time data processing and familiar database management patterns. It ensures your data pipelines can scale efficiently while maintaining reliability and performance as data volume and processing complexity grow.