MongoDB ETL and Data Pipeline Processing: High-Performance Data Transformation and Stream Processing with SQL-Familiar Pipeline Architecture
Modern data-driven organizations require sophisticated ETL (Extract, Transform, Load) processes that can handle diverse data sources, perform complex transformations, and deliver processed data to multiple downstream systems in real time. Traditional ETL tools often struggle with the volume, variety, and velocity of contemporary data workflows, particularly when dealing with semi-structured data, real-time streaming sources, and the need for flexible schema evolution.
MongoDB's aggregation framework, combined with change streams and flexible document storage, provides a powerful foundation for building high-performance ETL pipelines that can process data at scale while maintaining the familiar SQL-style operations that development teams understand. This approach enables efficient data transformation, real-time processing capabilities, and seamless integration with existing data infrastructure.
The Traditional ETL Challenge
Conventional ETL architectures face significant limitations when dealing with modern data requirements:
-- Traditional PostgreSQL ETL limitations
-- Fixed schema constraints limit data source flexibility
CREATE TABLE raw_customer_data (
customer_id BIGINT PRIMARY KEY,
email VARCHAR(255),
first_name VARCHAR(100),
last_name VARCHAR(100),
registration_date DATE,
-- Static schema can't accommodate varying data structures
profile_data JSONB -- Limited JSON support
);
-- Complex transformation logic requires stored procedures
CREATE OR REPLACE FUNCTION transform_customer_data()
RETURNS TABLE(
customer_key BIGINT,
full_name VARCHAR(201),
email_domain VARCHAR(100),
registration_month VARCHAR(7),
profile_score INTEGER
) AS $$
BEGIN
RETURN QUERY
SELECT
c.customer_id,
CONCAT(c.first_name, ' ', c.last_name),
SUBSTRING(c.email FROM POSITION('@' IN c.email) + 1),
TO_CHAR(c.registration_date, 'YYYY-MM'),
CASE
WHEN c.profile_data->>'premium' = 'true' THEN 100
WHEN c.profile_data->>'verified' = 'true' THEN 50
ELSE 25
END
FROM raw_customer_data c
WHERE c.registration_date >= CURRENT_DATE - INTERVAL '30 days';
END;
$$ LANGUAGE plpgsql;
-- Batch processing limitations
INSERT INTO transformed_customers
SELECT * FROM transform_customer_data();
-- Problems:
-- 1. Rigid schema requirements
-- 2. Limited real-time processing
-- 3. Complex stored procedure logic
-- 4. Poor scaling for large datasets
-- 5. Limited support for nested/complex data structures
MongoDB ETL pipelines address these limitations with flexible aggregation-based transformations:
// MongoDB flexible ETL pipeline
const customerTransformPipeline = [
// Extract: Flexible data ingestion from multiple sources
{
$match: {
registration_date: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) },
status: { $ne: "deleted" }
}
},
// Transform: Complex data transformation with aggregation operators
{
$addFields: {
full_name: { $concat: ["$first_name", " ", "$last_name"] },
email_domain: {
$substrCP: [
"$email",
{ $add: [{ $indexOfCP: ["$email", "@"] }, 1] },
{ $strLenCP: "$email" }
]
},
registration_month: {
$dateToString: { format: "%Y-%m", date: "$registration_date" }
},
profile_score: {
$switch: {
branches: [
{ case: { $eq: ["$profile.premium", true] }, then: 100 },
{ case: { $eq: ["$profile.verified", true] }, then: 50 }
],
default: 25
}
},
// Handle complex nested transformations
preferences: {
$map: {
input: "$profile.preferences",
as: "pref",
in: {
category: "$$pref.type",
enabled: "$$pref.active",
weight: { $multiply: ["$$pref.priority", 10] }
}
}
}
}
},
// Load: Flexible output with computed fields
{
$project: {
customer_key: "$customer_id",
full_name: 1,
email_domain: 1,
registration_month: 1,
profile_score: 1,
preferences: 1,
transformation_timestamp: new Date(),
source_system: "customer_api"
}
}
];
// Process with high-performance aggregation
const transformedCustomers = await db.customers.aggregate(customerTransformPipeline).toArray();
// Benefits:
// - Flexible schema handling
// - Complex nested data transformation
// - High-performance parallel processing
// - Real-time processing capabilities
// - Rich transformation operators
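For large collections, buffering every transformed document in the application with toArray() can exhaust memory. A minimal variation, assuming a hypothetical target collection named customers_transformed, appends a $merge stage so MongoDB writes the results server-side instead:

// Sketch: materialize the transformation server-side with $merge.
// The "customers_transformed" target collection name is an assumption.
const materializedPipeline = [
  ...customerTransformPipeline,
  {
    $merge: {
      into: "customers_transformed",
      on: "_id",
      whenMatched: "replace",
      whenNotMatched: "insert"
    }
  }
];

// The cursor returns no documents; exhausting it triggers the server-side write
await db.customers.aggregate(materializedPipeline).toArray();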
ETL Pipeline Architecture
Data Extraction Patterns
Implement flexible data extraction from multiple sources:
// Comprehensive data extraction service
class DataExtractionService {
constructor(db, config) {
this.db = db;
this.config = config;
this.rawCollection = db.collection('raw_data');
this.metadataCollection = db.collection('etl_metadata');
}
async extractFromAPI(sourceConfig, extractionId) {
const extractionMetadata = {
extraction_id: extractionId,
source_type: 'api',
source_config: sourceConfig,
start_time: new Date(),
status: 'in_progress',
records_processed: 0,
errors: []
};
try {
// API data extraction with pagination
let hasMore = true;
let offset = 0;
const batchSize = sourceConfig.batch_size || 1000;
while (hasMore) {
const response = await this.fetchAPIData(sourceConfig, offset, batchSize);
if (response.data && response.data.length > 0) {
// Prepare documents for insertion
const documents = response.data.map(record => ({
...record,
_extraction_metadata: {
extraction_id: extractionId,
source_url: sourceConfig.url,
extracted_at: new Date(),
batch_offset: offset,
raw_record: true
}
}));
// Bulk insert with ordered operations
await this.rawCollection.insertMany(documents, {
ordered: false,
writeConcern: { w: 1, j: true }
});
extractionMetadata.records_processed += documents.length;
offset += batchSize;
hasMore = response.has_more;
} else {
hasMore = false;
}
// Update progress
await this.updateExtractionProgress(extractionId, extractionMetadata);
}
extractionMetadata.status = 'completed';
extractionMetadata.end_time = new Date();
} catch (error) {
extractionMetadata.status = 'failed';
extractionMetadata.error = error.message;
extractionMetadata.end_time = new Date();
throw error;
} finally {
await this.metadataCollection.replaceOne(
{ extraction_id: extractionId },
extractionMetadata,
{ upsert: true }
);
}
return extractionMetadata;
}
async extractFromDatabase(dbConfig, query, extractionId) {
// Database extraction with change tracking
const extractionMetadata = {
extraction_id: extractionId,
source_type: 'database',
source_config: dbConfig,
start_time: new Date(),
status: 'in_progress',
records_processed: 0
};
try {
// Get last extraction timestamp for incremental updates
const lastExtraction = await this.metadataCollection.findOne(
{
source_type: 'database',
'source_config.connection_string': dbConfig.connection_string,
status: 'completed'
},
{ sort: { end_time: -1 } }
);
// Build incremental query
let incrementalQuery = { ...query };
if (lastExtraction && dbConfig.incremental_field) {
incrementalQuery[dbConfig.incremental_field] = {
$gt: lastExtraction.end_time
};
}
// Extract with cursor for memory efficiency
const cursor = this.db.collection(dbConfig.collection).find(incrementalQuery);
const batchSize = 1000;
let batch = [];
let recordCount = 0;
for await (const doc of cursor) {
// Add extraction metadata
const enrichedDoc = {
...doc,
_extraction_metadata: {
extraction_id: extractionId,
source_database: dbConfig.database,
source_collection: dbConfig.collection,
extracted_at: new Date()
}
};
batch.push(enrichedDoc);
if (batch.length >= batchSize) {
await this.rawCollection.insertMany(batch, { ordered: false });
recordCount += batch.length;
batch = [];
// Update progress
extractionMetadata.records_processed = recordCount;
await this.updateExtractionProgress(extractionId, extractionMetadata);
}
}
// Insert final batch
if (batch.length > 0) {
await this.rawCollection.insertMany(batch, { ordered: false });
recordCount += batch.length;
}
extractionMetadata.records_processed = recordCount;
extractionMetadata.status = 'completed';
extractionMetadata.end_time = new Date();
} catch (error) {
extractionMetadata.status = 'failed';
extractionMetadata.error = error.message;
extractionMetadata.end_time = new Date();
throw error;
} finally {
await this.metadataCollection.replaceOne(
{ extraction_id: extractionId },
extractionMetadata,
{ upsert: true }
);
}
return extractionMetadata;
}
async extractFromFiles(fileConfig, extractionId) {
// File-based extraction (CSV, JSON, XML, etc.)
const extractionMetadata = {
extraction_id: extractionId,
source_type: 'file',
source_config: fileConfig,
start_time: new Date(),
status: 'in_progress',
files_processed: 0,
records_processed: 0
};
try {
const files = await this.getFilesFromSource(fileConfig);
for (const filePath of files) {
const fileData = await this.parseFile(filePath, fileConfig.format);
if (fileData && fileData.length > 0) {
const enrichedDocuments = fileData.map(record => ({
...record,
_extraction_metadata: {
extraction_id: extractionId,
source_file: filePath,
extracted_at: new Date(),
file_format: fileConfig.format
}
}));
// Batch insert file data
const batchSize = 1000;
for (let i = 0; i < enrichedDocuments.length; i += batchSize) {
const batch = enrichedDocuments.slice(i, i + batchSize);
await this.rawCollection.insertMany(batch, { ordered: false });
extractionMetadata.records_processed += batch.length;
}
}
extractionMetadata.files_processed++;
await this.updateExtractionProgress(extractionId, extractionMetadata);
}
extractionMetadata.status = 'completed';
extractionMetadata.end_time = new Date();
} catch (error) {
extractionMetadata.status = 'failed';
extractionMetadata.error = error.message;
extractionMetadata.end_time = new Date();
throw error;
} finally {
await this.metadataCollection.replaceOne(
{ extraction_id: extractionId },
extractionMetadata,
{ upsert: true }
);
}
return extractionMetadata;
}
async fetchAPIData(config, offset, limit) {
// Implement API-specific data fetching logic
// This would integrate with actual API clients
const url = `${config.url}?offset=${offset}&limit=${limit}`;
// Return { data: [...], has_more: boolean }
return { data: [], has_more: false }; // Placeholder
}
async parseFile(filePath, format) {
// Implement file parsing logic for various formats
// CSV, JSON, XML, Parquet, etc.
return []; // Placeholder
}
async getFilesFromSource(config) {
// Get file list from various sources (S3, FTP, local filesystem)
return []; // Placeholder
}
async updateExtractionProgress(extractionId, metadata) {
await this.metadataCollection.updateOne(
{ extraction_id: extractionId },
{ $set: metadata },
{ upsert: true }
);
}
}
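The fetchAPIData, parseFile, and getFilesFromSource methods above are left as placeholders. As one illustration, a concrete fetchAPIData built on Node.js 18+'s global fetch might look like the sketch below; the offset/limit query parameters and the { data, has_more } response shape simply follow the placeholder's contract, and the api_key bearer header is an assumption:

// Sketch of a concrete fetchAPIData replacing the placeholder above.
// Assumes the source API accepts ?offset=&limit= and responds with
// { data: [...], has_more: boolean }; the bearer-token header is hypothetical.
async function fetchAPIData(config, offset, limit) {
  const url = `${config.url}?offset=${offset}&limit=${limit}`;
  const headers = config.api_key
    ? { Authorization: `Bearer ${config.api_key}` }
    : {};

  const response = await fetch(url, { headers });
  if (!response.ok) {
    throw new Error(`API extraction failed: ${response.status} ${response.statusText}`);
  }

  const body = await response.json();
  return { data: body.data ?? [], has_more: Boolean(body.has_more) };
}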
Data Transformation Engine
Build sophisticated data transformation pipelines:
// Advanced data transformation service
class DataTransformationService {
constructor(db) {
this.db = db;
this.rawCollection = db.collection('raw_data');
this.transformedCollection = db.collection('transformed_data');
this.errorCollection = db.collection('transformation_errors');
}
async executeTransformationPipeline(pipelineConfig, transformationId) {
const transformationMetadata = {
transformation_id: transformationId,
pipeline_name: pipelineConfig.name,
start_time: new Date(),
status: 'in_progress',
records_processed: 0,
records_transformed: 0,
errors_encountered: 0,
stages: []
};
try {
// Build dynamic aggregation pipeline
const aggregationPipeline = this.buildAggregationPipeline(pipelineConfig);
// Execute transformation with error handling
const transformationResults = await this.executeWithErrorHandling(
aggregationPipeline,
transformationId,
transformationMetadata
);
transformationMetadata.records_transformed = transformationResults.successCount;
transformationMetadata.errors_encountered = transformationResults.errorCount;
transformationMetadata.status = 'completed';
transformationMetadata.end_time = new Date();
return transformationMetadata;
} catch (error) {
transformationMetadata.status = 'failed';
transformationMetadata.error = error.message;
transformationMetadata.end_time = new Date();
throw error;
}
}
buildAggregationPipeline(config) {
const pipeline = [];
// Stage 1: Data filtering and source selection
if (config.source_filter) {
pipeline.push({
$match: {
...config.source_filter,
'_extraction_metadata.extraction_id': { $exists: true }
}
});
}
// Stage 2: Data cleansing and validation
if (config.cleansing_rules) {
pipeline.push({
$addFields: {
// Clean and validate data
cleaned_data: {
$let: {
vars: {
cleaned: {
$switch: {
branches: config.cleansing_rules.map(rule => ({
case: rule.condition,
then: rule.transformation
})),
default: "$$ROOT"
}
}
},
in: "$$cleaned"
}
}
}
});
}
// Stage 3: Data enrichment and computed fields
if (config.enrichment_rules) {
const enrichmentFields = {};
config.enrichment_rules.forEach(rule => {
enrichmentFields[rule.target_field] = rule.computation;
});
pipeline.push({
$addFields: enrichmentFields
});
}
// Stage 4: Nested document transformations
if (config.nested_transformations) {
config.nested_transformations.forEach(transformation => {
if (transformation.type === 'array_processing') {
pipeline.push({
$addFields: {
[transformation.target_field]: {
$map: {
input: `$${transformation.source_field}`,
as: "item",
in: transformation.item_transformation
}
}
}
});
} else if (transformation.type === 'object_flattening') {
pipeline.push({
$addFields: this.buildFlatteningTransformation(transformation)
});
}
});
}
// Stage 5: Data aggregation and grouping
if (config.aggregation_rules) {
config.aggregation_rules.forEach(rule => {
if (rule.type === 'group') {
pipeline.push({
$group: {
_id: rule.group_by,
...rule.aggregations
}
});
} else if (rule.type === 'bucket') {
pipeline.push({
$bucket: {
groupBy: rule.group_by,
boundaries: rule.boundaries,
default: rule.default_bucket,
output: rule.output
}
});
}
});
}
// Stage 6: Output formatting and projection
if (config.output_format) {
pipeline.push({
$project: {
...config.output_format.fields,
_transformation_metadata: {
transformation_id: config.transformation_id,
pipeline_name: config.name,
transformed_at: new Date(),
source_extraction_id: "$_extraction_metadata.extraction_id"
}
}
});
}
return pipeline;
}
async executeWithErrorHandling(pipeline, transformationId, metadata) {
let successCount = 0;
let errorCount = 0;
const batchSize = 1000;
try {
// Use aggregation cursor for memory-efficient processing
const cursor = this.rawCollection.aggregate(pipeline, {
allowDiskUse: true,
cursor: { batchSize }
});
let batch = [];
for await (const document of cursor) {
try {
// Additional validation can be performed here
if (this.validateTransformedDocument(document)) {
batch.push(document);
successCount++;
} else {
await this.logTransformationError(
transformationId,
document,
'validation_failed',
'Document failed validation rules'
);
errorCount++;
}
// Process batch when full
if (batch.length >= batchSize) {
await this.insertTransformedBatch(batch, transformationId);
batch = [];
}
} catch (docError) {
await this.logTransformationError(
transformationId,
document,
'processing_error',
docError.message
);
errorCount++;
}
}
// Insert remaining documents
if (batch.length > 0) {
await this.insertTransformedBatch(batch, transformationId);
}
return { successCount, errorCount };
} catch (pipelineError) {
throw new Error(`Pipeline execution failed: ${pipelineError.message}`);
}
}
buildFlatteningTransformation(config) {
const flattenedFields = {};
// Flatten nested object structure
const flattenObject = (obj, prefix = '') => {
for (const [key, value] of Object.entries(obj)) {
const newKey = prefix ? `${prefix}.${key}` : key;
if (value !== null && typeof value === 'object' && !Array.isArray(value)) {
flattenObject(value, newKey);
} else {
flattenedFields[newKey.replace(/\./g, '_')] = `$${newKey}`;
}
}
};
flattenObject(config.source_structure);
return flattenedFields;
}
async insertTransformedBatch(batch, transformationId) {
try {
await this.transformedCollection.insertMany(batch, {
ordered: false,
writeConcern: { w: 1, j: true }
});
} catch (error) {
// Handle partial failures in batch
if (error.writeErrors) {
for (const writeError of error.writeErrors) {
await this.logTransformationError(
transformationId,
batch[writeError.index],
'insert_error',
writeError.errmsg
);
}
} else {
throw error;
}
}
}
validateTransformedDocument(doc) {
// Implement document validation logic
// Check required fields, data types, business rules, etc.
return true; // Placeholder
}
async logTransformationError(transformationId, document, errorType, errorMessage) {
const errorDoc = {
transformation_id: transformationId,
error_type: errorType,
error_message: errorMessage,
failed_document_id: document._id,
failed_document_sample: JSON.stringify(document).substring(0, 1000),
timestamp: new Date()
};
await this.errorCollection.insertOne(errorDoc);
}
// Complex transformation functions
async executeCustomTransformations(documents, transformationConfig) {
return documents.map(doc => {
let transformedDoc = { ...doc };
// Text processing transformations
if (transformationConfig.text_processing) {
transformedDoc = this.applyTextTransformations(transformedDoc, transformationConfig.text_processing);
}
// Date and time transformations
if (transformationConfig.date_processing) {
transformedDoc = this.applyDateTransformations(transformedDoc, transformationConfig.date_processing);
}
// Numerical computations
if (transformationConfig.numerical_processing) {
transformedDoc = this.applyNumericalTransformations(transformedDoc, transformationConfig.numerical_processing);
}
return transformedDoc;
});
}
applyTextTransformations(doc, config) {
// Text cleaning, normalization, extraction
config.forEach(rule => {
const fieldValue = this.getNestedValue(doc, rule.field);
if (fieldValue && typeof fieldValue === 'string') {
switch (rule.operation) {
case 'normalize':
this.setNestedValue(doc, rule.field, fieldValue.toLowerCase().trim());
break;
case 'extract_email':
const emailMatch = fieldValue.match(/[\w\.-]+@[\w\.-]+\.\w+/);
if (emailMatch) {
this.setNestedValue(doc, rule.target_field, emailMatch[0]);
}
break;
case 'extract_phone':
const phoneMatch = fieldValue.match(/(\+?\d{1,3}[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}/);
if (phoneMatch) {
this.setNestedValue(doc, rule.target_field, phoneMatch[0]);
}
break;
case 'split':
const parts = fieldValue.split(rule.delimiter);
this.setNestedValue(doc, rule.target_field, parts);
break;
}
}
});
return doc;
}
applyDateTransformations(doc, config) {
// Date parsing, formatting, calculations
config.forEach(rule => {
const fieldValue = this.getNestedValue(doc, rule.field);
if (fieldValue) {
switch (rule.operation) {
case 'parse_date':
const parsedDate = new Date(fieldValue);
if (!isNaN(parsedDate.getTime())) {
this.setNestedValue(doc, rule.target_field, parsedDate);
}
break;
case 'extract_components':
const date = new Date(fieldValue);
if (!isNaN(date.getTime())) {
this.setNestedValue(doc, `${rule.target_field}_year`, date.getFullYear());
this.setNestedValue(doc, `${rule.target_field}_month`, date.getMonth() + 1);
this.setNestedValue(doc, `${rule.target_field}_day`, date.getDate());
}
break;
case 'age_calculation':
const birthDate = new Date(fieldValue);
const age = Math.floor((Date.now() - birthDate.getTime()) / (1000 * 60 * 60 * 24 * 365));
this.setNestedValue(doc, rule.target_field, age);
break;
}
}
});
return doc;
}
applyNumericalTransformations(doc, config) {
// Numerical calculations, conversions, aggregations
config.forEach(rule => {
const fieldValue = this.getNestedValue(doc, rule.field);
if (typeof fieldValue === 'number') {
switch (rule.operation) {
case 'currency_conversion':
const convertedValue = fieldValue * rule.exchange_rate;
this.setNestedValue(doc, rule.target_field, Math.round(convertedValue * 100) / 100);
break;
case 'percentage_calculation':
const total = this.getNestedValue(doc, rule.total_field);
if (total && total !== 0) {
const percentage = (fieldValue / total) * 100;
this.setNestedValue(doc, rule.target_field, Math.round(percentage * 100) / 100);
}
break;
case 'range_classification':
let classification = 'unknown';
for (const range of rule.ranges) {
if (fieldValue >= range.min && fieldValue <= range.max) {
classification = range.label;
break;
}
}
this.setNestedValue(doc, rule.target_field, classification);
break;
}
}
});
return doc;
}
getNestedValue(obj, path) {
return path.split('.').reduce((current, prop) => current?.[prop], obj);
}
setNestedValue(obj, path, value) {
const props = path.split('.');
const lastProp = props.pop();
const target = props.reduce((current, prop) => {
if (!current[prop]) current[prop] = {};
return current[prop];
}, obj);
target[lastProp] = value;
}
}
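Putting the transformation service to work only requires a pipeline configuration. The example below is illustrative: the field names (source_filter, enrichment_rules, output_format) follow the buildAggregationPipeline contract above, while the collection contents and rule values are assumptions.

// Illustrative configuration; rule values and field names are assumptions
// that follow the buildAggregationPipeline contract above.
const transformationService = new DataTransformationService(db);

const orderPipelineConfig = {
  name: 'order_enrichment',
  transformation_id: `transform_${Date.now()}`,
  source_filter: { '_extraction_metadata.source_url': /orders/ },
  enrichment_rules: [
    { target_field: 'order_total', computation: { $sum: '$line_items.amount' } },
    { target_field: 'order_month', computation: { $dateToString: { format: '%Y-%m', date: '$created_at' } } }
  ],
  output_format: {
    fields: { order_id: 1, customer_id: 1, order_total: 1, order_month: 1 }
  }
};

const result = await transformationService.executeTransformationPipeline(
  orderPipelineConfig,
  orderPipelineConfig.transformation_id
);
console.log(`Transformed ${result.records_transformed} records with ${result.errors_encountered} errors`);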
Data Loading and Output Management
Implement flexible data loading strategies:
// Data loading and output service
class DataLoadingService {
constructor(db) {
this.db = db;
this.transformedCollection = db.collection('transformed_data');
this.outputMetadataCollection = db.collection('output_metadata');
}
async loadToMongoDB(targetConfig, loadId) {
const loadMetadata = {
load_id: loadId,
target_type: 'mongodb',
target_config: targetConfig,
start_time: new Date(),
status: 'in_progress',
records_loaded: 0,
errors: []
};
try {
const targetCollection = this.db.collection(targetConfig.collection);
// Configure loading strategy
const loadingStrategy = targetConfig.strategy || 'append';
if (loadingStrategy === 'replace') {
// Clear target collection before loading
await targetCollection.deleteMany({});
}
// Load data in batches
const batchSize = targetConfig.batch_size || 1000;
const pipeline = this.buildLoadingPipeline(targetConfig);
const cursor = this.transformedCollection.aggregate(pipeline, {
allowDiskUse: true,
cursor: { batchSize }
});
let batch = [];
let recordCount = 0;
for await (const doc of cursor) {
// Apply final transformations for target schema
const finalDoc = this.applyTargetTransformations(doc, targetConfig);
batch.push(finalDoc);
if (batch.length >= batchSize) {
await this.insertBatch(targetCollection, batch, loadingStrategy);
recordCount += batch.length;
batch = [];
// Update progress
loadMetadata.records_loaded = recordCount;
await this.updateLoadProgress(loadId, loadMetadata);
}
}
// Insert final batch
if (batch.length > 0) {
await this.insertBatch(targetCollection, batch, loadingStrategy);
recordCount += batch.length;
}
// Create indexes if specified
if (targetConfig.indexes) {
await this.createTargetIndexes(targetCollection, targetConfig.indexes);
}
loadMetadata.records_loaded = recordCount;
loadMetadata.status = 'completed';
loadMetadata.end_time = new Date();
} catch (error) {
loadMetadata.status = 'failed';
loadMetadata.error = error.message;
loadMetadata.end_time = new Date();
throw error;
} finally {
await this.outputMetadataCollection.replaceOne(
{ load_id: loadId },
loadMetadata,
{ upsert: true }
);
}
return loadMetadata;
}
async loadToWarehouse(warehouseConfig, loadId) {
// Load data to external data warehouse (Snowflake, Redshift, BigQuery)
const loadMetadata = {
load_id: loadId,
target_type: 'warehouse',
target_config: warehouseConfig,
start_time: new Date(),
status: 'in_progress'
};
try {
// Export data to format compatible with warehouse
const exportFormat = warehouseConfig.format || 'parquet';
const exportPath = await this.exportToFile(warehouseConfig, exportFormat);
// Upload to warehouse staging area
await this.uploadToWarehouse(exportPath, warehouseConfig);
// Execute warehouse loading commands
const loadResults = await this.executeWarehouseLoad(warehouseConfig);
loadMetadata.records_loaded = loadResults.recordCount;
loadMetadata.warehouse_table = loadResults.tableName;
loadMetadata.status = 'completed';
loadMetadata.end_time = new Date();
} catch (error) {
loadMetadata.status = 'failed';
loadMetadata.error = error.message;
loadMetadata.end_time = new Date();
throw error;
} finally {
await this.outputMetadataCollection.replaceOne(
{ load_id: loadId },
loadMetadata,
{ upsert: true }
);
}
return loadMetadata;
}
async loadToAPI(apiConfig, loadId) {
// Push data to external APIs
const loadMetadata = {
load_id: loadId,
target_type: 'api',
target_config: apiConfig,
start_time: new Date(),
status: 'in_progress',
api_calls_made: 0,
records_sent: 0
};
try {
const batchSize = apiConfig.batch_size || 100;
const pipeline = this.buildLoadingPipeline(apiConfig);
const cursor = this.transformedCollection.aggregate(pipeline);
let batch = [];
let apiCallCount = 0;
let recordCount = 0;
for await (const doc of cursor) {
const apiDoc = this.formatForAPI(doc, apiConfig);
batch.push(apiDoc);
if (batch.length >= batchSize) {
await this.sendToAPI(batch, apiConfig);
apiCallCount++;
recordCount += batch.length;
batch = [];
// Rate limiting
if (apiConfig.rate_limit_delay) {
await this.delay(apiConfig.rate_limit_delay);
}
// Update progress
loadMetadata.api_calls_made = apiCallCount;
loadMetadata.records_sent = recordCount;
await this.updateLoadProgress(loadId, loadMetadata);
}
}
// Send final batch
if (batch.length > 0) {
await this.sendToAPI(batch, apiConfig);
apiCallCount++;
recordCount += batch.length;
}
loadMetadata.api_calls_made = apiCallCount;
loadMetadata.records_sent = recordCount;
loadMetadata.status = 'completed';
loadMetadata.end_time = new Date();
} catch (error) {
loadMetadata.status = 'failed';
loadMetadata.error = error.message;
loadMetadata.end_time = new Date();
throw error;
} finally {
await this.outputMetadataCollection.replaceOne(
{ load_id: loadId },
loadMetadata,
{ upsert: true }
);
}
return loadMetadata;
}
buildLoadingPipeline(config) {
const pipeline = [];
// Filter data for loading
if (config.filter) {
pipeline.push({ $match: config.filter });
}
// Sort for consistent ordering
if (config.sort) {
pipeline.push({ $sort: config.sort });
}
// Limit if specified
if (config.limit) {
pipeline.push({ $limit: config.limit });
}
// Project fields for target format
if (config.projection) {
pipeline.push({ $project: config.projection });
}
return pipeline;
}
applyTargetTransformations(doc, config) {
let transformedDoc = { ...doc };
// Apply target-specific field mappings
if (config.field_mappings) {
const mappedDoc = {};
for (const [sourceField, targetField] of Object.entries(config.field_mappings)) {
const value = this.getNestedValue(transformedDoc, sourceField);
if (value !== undefined) {
this.setNestedValue(mappedDoc, targetField, value);
}
}
transformedDoc = { ...transformedDoc, ...mappedDoc };
}
// Apply data type conversions
if (config.type_conversions) {
config.type_conversions.forEach(conversion => {
const value = this.getNestedValue(transformedDoc, conversion.field);
if (value !== undefined) {
const convertedValue = this.convertDataType(value, conversion.target_type, conversion.options);
this.setNestedValue(transformedDoc, conversion.field, convertedValue);
}
});
}
return transformedDoc;
}
async insertBatch(collection, batch, strategy) {
if (strategy === 'upsert') {
// Perform upsert operations
const bulkOps = batch.map(doc => ({
replaceOne: {
filter: { [doc._id ? '_id' : 'unique_key']: doc._id || doc.unique_key },
replacement: doc,
upsert: true
}
}));
await collection.bulkWrite(bulkOps, { ordered: false });
} else {
// Regular insert
await collection.insertMany(batch, { ordered: false });
}
}
async createTargetIndexes(collection, indexDefinitions) {
for (const indexDef of indexDefinitions) {
try {
await collection.createIndex(indexDef.fields, indexDef.options || {});
} catch (error) {
console.warn(`Failed to create index: ${error.message}`);
}
}
}
convertDataType(value, targetType, options = {}) {
switch (targetType) {
case 'string':
return String(value);
case 'number':
return Number(value);
case 'date':
return new Date(value);
case 'boolean':
return Boolean(value);
case 'array':
return Array.isArray(value) ? value : [value];
case 'object':
return typeof value === 'object' ? value : { value };
default:
return value;
}
}
getNestedValue(obj, path) {
return path.split('.').reduce((current, prop) => current?.[prop], obj);
}
setNestedValue(obj, path, value) {
const props = path.split('.');
const lastProp = props.pop();
const target = props.reduce((current, prop) => {
if (!current[prop]) current[prop] = {};
return current[prop];
}, obj);
target[lastProp] = value;
}
async updateLoadProgress(loadId, metadata) {
await this.outputMetadataCollection.updateOne(
{ load_id: loadId },
{ $set: metadata },
{ upsert: true }
);
}
async exportToFile(config, format) {
// Implement file export logic
return '/tmp/export.parquet'; // Placeholder
}
async uploadToWarehouse(filePath, config) {
// Implement warehouse upload logic
}
async executeWarehouseLoad(config) {
// Execute warehouse-specific loading commands
return { recordCount: 0, tableName: config.table }; // Placeholder
}
formatForAPI(doc, config) {
// Format document according to API requirements
return doc; // Placeholder
}
async sendToAPI(batch, config) {
// Send data to external API
}
delay(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
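A load run is likewise driven by configuration. The example below targets a hypothetical customer_analytics collection with the upsert strategy; all field names follow the loadToMongoDB and buildLoadingPipeline contracts above.

// Illustrative load configuration; the target collection and projected
// fields are assumptions that follow the loadToMongoDB contract above.
const loadingService = new DataLoadingService(db);

const analyticsLoadConfig = {
  collection: 'customer_analytics',
  strategy: 'upsert',
  batch_size: 500,
  filter: { '_transformation_metadata.pipeline_name': 'order_enrichment' },
  sort: { customer_id: 1 },
  projection: { customer_id: 1, order_total: 1, order_month: 1 },
  indexes: [
    { fields: { customer_id: 1, order_month: 1 } }
  ]
};

const loadResult = await loadingService.loadToMongoDB(analyticsLoadConfig, `load_${Date.now()}`);
console.log(`Loaded ${loadResult.records_loaded} records into ${analyticsLoadConfig.collection}`);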
Real-Time Stream Processing
Change Stream ETL
Implement real-time ETL using MongoDB change streams:
// Real-time ETL using change streams
class StreamETLProcessor {
constructor(db, config) {
this.db = db;
this.config = config;
this.changeStreams = new Map();
this.transformationService = new DataTransformationService(db);
this.loadingService = new DataLoadingService(db);
}
async startStreamProcessing(streamConfig) {
const sourceCollection = this.db.collection(streamConfig.source_collection);
// Configure change stream options
const changeStreamOptions = {
fullDocument: 'updateLookup',
fullDocumentBeforeChange: 'whenAvailable', // requires pre/post images enabled on the source collection
maxAwaitTimeMS: 1000
};
if (streamConfig.resume_token) {
changeStreamOptions.resumeAfter = streamConfig.resume_token;
}
// Create change stream with pipeline filter
const pipeline = [];
if (streamConfig.operation_filter) {
pipeline.push({
$match: {
'operationType': { $in: streamConfig.operation_filter }
}
});
}
if (streamConfig.document_filter) {
// Prefix each filter field with "fullDocument." so conditions apply to the changed document
const documentMatch = {};
for (const [field, condition] of Object.entries(streamConfig.document_filter)) {
documentMatch[`fullDocument.${field}`] = condition;
}
pipeline.push({ $match: documentMatch });
}
const changeStream = sourceCollection.watch(pipeline, changeStreamOptions);
this.changeStreams.set(streamConfig.stream_id, changeStream);
// Process change events
changeStream.on('change', async (changeEvent) => {
try {
await this.processChangeEvent(changeEvent, streamConfig);
} catch (error) {
console.error('Error processing change event:', error);
await this.handleStreamError(error, changeEvent, streamConfig);
}
});
changeStream.on('error', async (error) => {
console.error('Change stream error:', error);
await this.handleStreamError(error, null, streamConfig);
});
console.log(`Started stream processing for ${streamConfig.stream_id}`);
return changeStream;
}
async processChangeEvent(changeEvent, streamConfig) {
const processingMetadata = {
stream_id: streamConfig.stream_id,
change_event_id: changeEvent._id,
operation_type: changeEvent.operationType,
processing_time: new Date(),
status: 'processing'
};
try {
let documentToProcess = null;
// Extract document based on operation type
switch (changeEvent.operationType) {
case 'insert':
case 'replace':
documentToProcess = changeEvent.fullDocument;
break;
case 'update':
documentToProcess = changeEvent.fullDocument;
// Include update description for delta processing
documentToProcess._updateDescription = changeEvent.updateDescription;
break;
case 'delete':
documentToProcess = changeEvent.fullDocumentBeforeChange ||
{ _id: changeEvent.documentKey._id, _deleted: true };
break;
}
if (documentToProcess) {
// Apply real-time transformations
const transformedDocument = await this.applyStreamTransformations(
documentToProcess,
streamConfig,
changeEvent
);
// Apply business rules and validations
if (streamConfig.business_rules) {
const validationResult = await this.validateBusinessRules(
transformedDocument,
streamConfig.business_rules,
changeEvent
);
if (!validationResult.valid) {
await this.handleValidationFailure(validationResult, transformedDocument, streamConfig);
return;
}
}
// Load to target systems
if (streamConfig.target_systems) {
await this.loadToTargetSystems(
transformedDocument,
streamConfig.target_systems,
changeEvent
);
}
// Update processing status
processingMetadata.status = 'completed';
processingMetadata.records_processed = 1;
}
await this.logStreamProcessing(processingMetadata);
} catch (error) {
processingMetadata.status = 'failed';
processingMetadata.error = error.message;
await this.logStreamProcessing(processingMetadata);
throw error;
}
}
async applyStreamTransformations(document, streamConfig, changeEvent) {
let transformedDoc = { ...document };
for (const transformation of streamConfig.transformations) {
switch (transformation.type) {
case 'field_mapping':
transformedDoc = this.applyFieldMapping(transformedDoc, transformation.mapping);
break;
case 'computed_fields':
transformedDoc = await this.applyComputedFields(transformedDoc, transformation.computations);
break;
case 'enrichment':
transformedDoc = await this.applyEnrichment(transformedDoc, transformation.enrichment_config);
break;
case 'aggregation':
transformedDoc = await this.applyStreamAggregation(transformedDoc, transformation.aggregation_config, changeEvent);
break;
case 'custom_function':
transformedDoc = await transformation.function(transformedDoc, changeEvent);
break;
}
}
// Add stream processing metadata
transformedDoc._stream_metadata = {
stream_id: streamConfig.stream_id,
change_event_id: changeEvent._id,
operation_type: changeEvent.operationType,
processed_at: new Date(),
cluster_time: changeEvent.clusterTime
};
return transformedDoc;
}
async applyComputedFields(document, computations) {
const enrichedDoc = { ...document };
for (const computation of computations) {
try {
let computedValue = null;
switch (computation.type) {
case 'lookup':
computedValue = await this.performLookup(document, computation.config);
break;
case 'calculation':
computedValue = this.performCalculation(document, computation.config);
break;
case 'text_processing':
computedValue = this.performTextProcessing(document, computation.config);
break;
case 'date_calculation':
computedValue = this.performDateCalculation(document, computation.config);
break;
}
if (computedValue !== null) {
this.setNestedValue(enrichedDoc, computation.target_field, computedValue);
}
} catch (error) {
console.warn(`Failed to compute field ${computation.target_field}:`, error);
}
}
return enrichedDoc;
}
async applyEnrichment(document, enrichmentConfig) {
const enrichedDoc = { ...document };
for (const enrichment of enrichmentConfig) {
try {
const lookupCollection = this.db.collection(enrichment.lookup_collection);
const lookupKey = this.getNestedValue(document, enrichment.source_field);
if (lookupKey) {
const lookupDoc = await lookupCollection.findOne({
[enrichment.lookup_field]: lookupKey
});
if (lookupDoc) {
// Merge lookup data
if (enrichment.merge_strategy === 'full') {
Object.assign(enrichedDoc, lookupDoc);
} else if (enrichment.merge_strategy === 'selective') {
enrichment.fields_to_merge.forEach(field => {
if (lookupDoc[field] !== undefined) {
this.setNestedValue(enrichedDoc, enrichment.target_prefix + field, lookupDoc[field]);
}
});
}
}
}
} catch (error) {
console.warn(`Failed to apply enrichment ${enrichment.name}:`, error);
}
}
return enrichedDoc;
}
async applyStreamAggregation(document, aggregationConfig, changeEvent) {
const aggregatedDoc = { ...document };
// Real-time aggregation using upsert operations
for (const aggregation of aggregationConfig) {
try {
const aggregationCollection = this.db.collection(aggregation.target_collection);
const groupingKey = this.buildGroupingKey(document, aggregation.group_by);
// Build update operations for real-time aggregation
const updateOps = {};
aggregation.aggregations.forEach(agg => {
const sourceValue = this.getNestedValue(document, agg.source_field);
switch (agg.operation) {
case 'sum':
updateOps.$inc = updateOps.$inc || {};
updateOps.$inc[agg.target_field] = sourceValue || 0;
break;
case 'count':
updateOps.$inc = updateOps.$inc || {};
updateOps.$inc[agg.target_field] = 1;
break;
case 'avg':
updateOps.$inc = updateOps.$inc || {};
updateOps.$inc[`${agg.target_field}_sum`] = sourceValue || 0;
updateOps.$inc[`${agg.target_field}_count`] = 1;
break;
case 'min':
updateOps.$min = updateOps.$min || {};
updateOps.$min[agg.target_field] = sourceValue;
break;
case 'max':
updateOps.$max = updateOps.$max || {};
updateOps.$max[agg.target_field] = sourceValue;
break;
case 'addToSet':
updateOps.$addToSet = updateOps.$addToSet || {};
updateOps.$addToSet[agg.target_field] = sourceValue;
break;
}
});
// Set metadata fields
updateOps.$set = updateOps.$set || {};
updateOps.$set.last_updated = new Date();
updateOps.$set.last_change_event = changeEvent._id;
// Perform upsert operation
await aggregationCollection.updateOne(
groupingKey,
updateOps,
{ upsert: true }
);
} catch (error) {
console.warn(`Failed to apply stream aggregation ${aggregation.name}:`, error);
}
}
return aggregatedDoc;
}
buildGroupingKey(document, groupByConfig) {
const groupingKey = {};
if (Array.isArray(groupByConfig)) {
groupByConfig.forEach(field => {
const value = this.getNestedValue(document, field);
groupingKey[field.replace('.', '_')] = value;
});
} else if (typeof groupByConfig === 'object') {
Object.entries(groupByConfig).forEach(([key, expression]) => {
// Support for complex grouping expressions
groupingKey[key] = this.evaluateExpression(document, expression);
});
}
return groupingKey;
}
async validateBusinessRules(document, businessRules, changeEvent) {
const validationResult = {
valid: true,
failures: [],
warnings: []
};
for (const rule of businessRules) {
try {
const ruleResult = await this.evaluateBusinessRule(document, rule, changeEvent);
if (!ruleResult.passed) {
if (rule.severity === 'error') {
validationResult.valid = false;
validationResult.failures.push({
rule: rule.name,
message: ruleResult.message
});
} else if (rule.severity === 'warning') {
validationResult.warnings.push({
rule: rule.name,
message: ruleResult.message
});
}
}
} catch (error) {
validationResult.valid = false;
validationResult.failures.push({
rule: rule.name,
message: `Rule evaluation failed: ${error.message}`
});
}
}
return validationResult;
}
async evaluateBusinessRule(document, rule, changeEvent) {
switch (rule.type) {
case 'field_validation':
return this.validateField(document, rule.config);
case 'cross_document_validation':
return await this.validateCrossDocument(document, rule.config);
case 'temporal_validation':
return this.validateTemporal(document, changeEvent, rule.config);
case 'custom_validation':
return await rule.config.validator(document, changeEvent);
default:
return { passed: true };
}
}
validateField(document, config) {
const fieldValue = this.getNestedValue(document, config.field);
// Check required fields
if (config.required && (fieldValue === undefined || fieldValue === null)) {
return {
passed: false,
message: `Required field '${config.field}' is missing`
};
}
// Check data type
if (fieldValue !== undefined && config.data_type) {
const actualType = typeof fieldValue;
if (actualType !== config.data_type) {
return {
passed: false,
message: `Field '${config.field}' expected type ${config.data_type}, got ${actualType}`
};
}
}
// Check value range
if (fieldValue !== undefined && config.range) {
if (fieldValue < config.range.min || fieldValue > config.range.max) {
return {
passed: false,
message: `Field '${config.field}' value ${fieldValue} is outside range [${config.range.min}, ${config.range.max}]`
};
}
}
// Check allowed values
if (fieldValue !== undefined && config.allowed_values) {
if (!config.allowed_values.includes(fieldValue)) {
return {
passed: false,
message: `Field '${config.field}' value '${fieldValue}' is not in allowed values: ${config.allowed_values.join(', ')}`
};
}
}
return { passed: true };
}
async validateCrossDocument(document, config) {
const referenceCollection = this.db.collection(config.reference_collection);
const referenceValue = this.getNestedValue(document, config.source_field);
if (referenceValue) {
const referenceDoc = await referenceCollection.findOne({
[config.reference_field]: referenceValue
});
if (!referenceDoc && config.required) {
return {
passed: false,
message: `Reference document not found for ${config.source_field}: ${referenceValue}`
};
}
// Additional cross-document validations
if (referenceDoc && config.additional_checks) {
for (const check of config.additional_checks) {
const refValue = this.getNestedValue(referenceDoc, check.reference_field);
const docValue = this.getNestedValue(document, check.document_field);
if (!this.compareValues(docValue, refValue, check.comparison)) {
return {
passed: false,
message: `Cross-document validation failed: ${check.message}`
};
}
}
}
}
return { passed: true };
}
validateTemporal(document, changeEvent, config) {
// changeEvent.clusterTime is a BSON Timestamp (seconds + increment), not a Date
const timestamp = changeEvent?.clusterTime
? new Date(changeEvent.clusterTime.getHighBits() * 1000)
: new Date();
// Check business hours
if (config.business_hours) {
const hour = timestamp.getHours();
if (hour < config.business_hours.start || hour > config.business_hours.end) {
return {
passed: false,
message: `Operation outside business hours: ${hour}`
};
}
}
// Check rate limits
if (config.rate_limit) {
// This would require additional state tracking
// Implementation depends on specific rate limiting strategy
}
return { passed: true };
}
compareValues(value1, value2, comparison) {
switch (comparison) {
case 'eq': return value1 === value2;
case 'ne': return value1 !== value2;
case 'gt': return value1 > value2;
case 'gte': return value1 >= value2;
case 'lt': return value1 < value2;
case 'lte': return value1 <= value2;
default: return false;
}
}
async loadToTargetSystems(document, targetSystems, changeEvent) {
for (const target of targetSystems) {
try {
switch (target.type) {
case 'mongodb':
await this.loadToMongoTarget(document, target.config);
break;
case 'elasticsearch':
await this.loadToElasticsearch(document, target.config);
break;
case 'kafka':
await this.loadToKafka(document, target.config, changeEvent);
break;
case 'webhook':
await this.loadToWebhook(document, target.config);
break;
}
} catch (error) {
console.error(`Failed to load to target system ${target.name}:`, error);
await this.handleTargetLoadError(error, document, target, changeEvent);
}
}
}
async loadToMongoTarget(document, config) {
const targetCollection = this.db.collection(config.collection);
if (config.strategy === 'upsert') {
const filter = {};
config.unique_fields.forEach(field => {
filter[field] = this.getNestedValue(document, field);
});
await targetCollection.replaceOne(filter, document, { upsert: true });
} else {
await targetCollection.insertOne(document);
}
}
async loadToKafka(document, config, changeEvent) {
// Send to Kafka topic
const message = {
key: this.getNestedValue(document, config.key_field),
value: JSON.stringify(document),
headers: {
operation_type: changeEvent.operationType,
timestamp: new Date().toISOString()
}
};
// This would use a Kafka producer client
console.log(`Would send to Kafka topic ${config.topic}:`, message);
}
async loadToWebhook(document, config) {
// Send HTTP request to webhook
const payload = {
data: document,
timestamp: new Date().toISOString(),
source: 'mongodb-etl'
};
// This would use an HTTP client
console.log(`Would send webhook to ${config.url}:`, payload);
}
async handleStreamError(error, changeEvent, streamConfig) {
// Log stream processing errors
const errorDoc = {
stream_id: streamConfig.stream_id,
error_type: 'stream_processing_error',
error_message: error.message,
change_event: changeEvent,
timestamp: new Date()
};
await this.db.collection('stream_errors').insertOne(errorDoc);
}
async handleValidationFailure(validationResult, document, streamConfig) {
// Handle business rule validation failures
const failureDoc = {
stream_id: streamConfig.stream_id,
failure_type: 'validation_failure',
validation_failures: validationResult.failures,
validation_warnings: validationResult.warnings,
document: document,
timestamp: new Date()
};
await this.db.collection('validation_failures').insertOne(failureDoc);
}
async handleTargetLoadError(error, document, target, changeEvent) {
// Handle target system loading errors
const errorDoc = {
target_system: target.name,
error_type: 'target_load_error',
error_message: error.message,
document: document,
change_event_id: changeEvent._id,
timestamp: new Date()
};
await this.db.collection('target_load_errors').insertOne(errorDoc);
}
performLookup(document, config) {
// Implement lookup logic
return null; // Placeholder
}
performCalculation(document, config) {
// Implement calculation logic
return null; // Placeholder
}
performTextProcessing(document, config) {
// Implement text processing logic
return null; // Placeholder
}
performDateCalculation(document, config) {
// Implement date calculation logic
return null; // Placeholder
}
evaluateExpression(document, expression) {
// Implement expression evaluation
return null; // Placeholder
}
applyFieldMapping(document, mapping) {
// Implement field mapping logic
return document; // Placeholder
}
getNestedValue(obj, path) {
return path.split('.').reduce((current, prop) => current?.[prop], obj);
}
setNestedValue(obj, path, value) {
const props = path.split('.');
const lastProp = props.pop();
const target = props.reduce((current, prop) => {
if (!current[prop]) current[prop] = {};
return current[prop];
}, obj);
target[lastProp] = value;
}
async logStreamProcessing(metadata) {
await this.db.collection('stream_processing_log').insertOne(metadata);
}
async stopStreamProcessing(streamId) {
const changeStream = this.changeStreams.get(streamId);
if (changeStream) {
await changeStream.close();
this.changeStreams.delete(streamId);
console.log(`Stopped stream processing for ${streamId}`);
}
}
}
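Starting a stream is then a matter of supplying a stream configuration. The entries below follow the contracts of startStreamProcessing, applyStreamTransformations, and loadToTargetSystems above; the collection names and rules themselves are illustrative.

// Illustrative stream configuration; collection names and rules are assumptions.
const streamProcessor = new StreamETLProcessor(db, {});

const orderStreamConfig = {
  stream_id: 'orders_realtime',
  source_collection: 'orders',
  operation_filter: ['insert', 'update'],
  document_filter: { status: { $ne: 'draft' } },
  transformations: [
    {
      type: 'custom_function',
      function: async (doc) => ({ ...doc, processed_by: 'orders_realtime' })
    }
  ],
  target_systems: [
    {
      name: 'orders_mirror',
      type: 'mongodb',
      config: { collection: 'orders_transformed', strategy: 'upsert', unique_fields: ['_id'] }
    }
  ]
};

await streamProcessor.startStreamProcessing(orderStreamConfig);
// ...later, during shutdown:
// await streamProcessor.stopStreamProcessing('orders_realtime');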
SQL-Style ETL with QueryLeaf
QueryLeaf provides familiar SQL-style ETL operations with MongoDB's powerful aggregation capabilities:
-- QueryLeaf ETL operations with SQL-familiar syntax
-- Data extraction with SQL-style filtering and projection
WITH extracted_data AS (
SELECT
customer_id,
email,
first_name,
last_name,
registration_date,
profile_data,
last_login_date,
CASE
WHEN profile_data->>'premium' = 'true' THEN 'premium'
WHEN profile_data->>'verified' = 'true' THEN 'verified'
ELSE 'basic'
END AS customer_tier
FROM raw_customers
WHERE registration_date >= CURRENT_DATE - INTERVAL '30 days'
AND email IS NOT NULL
AND email LIKE '%@%.%'
)
-- Data transformation with complex calculations
, transformed_data AS (
SELECT
customer_id,
CONCAT(first_name, ' ', last_name) AS full_name,
LOWER(email) AS normalized_email,
SUBSTRING(email FROM POSITION('@' IN email) + 1) AS email_domain,
DATE_TRUNC('month', registration_date) AS registration_month,
customer_tier,
-- Customer lifecycle calculations
CASE
WHEN last_login_date >= CURRENT_DATE - INTERVAL '7 days' THEN 'active'
WHEN last_login_date >= CURRENT_DATE - INTERVAL '30 days' THEN 'inactive'
ELSE 'dormant'
END AS activity_status,
-- Age calculation
EXTRACT(DAYS FROM (CURRENT_DATE - registration_date)) AS days_since_registration,
-- JSON processing and nested field extraction
JSON_EXTRACT(profile_data, '$.preferences.notifications') AS notification_preferences,
ARRAY_LENGTH(JSON_EXTRACT(profile_data, '$.purchase_history')) AS purchase_count,
-- Computed engagement score
CASE
WHEN customer_tier = 'premium' THEN 100
WHEN EXTRACT(DAYS FROM (CURRENT_DATE - registration_date)) < 30 AND last_login_date >= CURRENT_DATE - INTERVAL '7 days' THEN 75
WHEN customer_tier = 'verified' THEN 50
ELSE 25
END AS engagement_score,
CURRENT_TIMESTAMP AS transformation_timestamp
FROM extracted_data
)
-- Data aggregation and analytical computations
, aggregated_metrics AS (
SELECT
email_domain,
registration_month,
customer_tier,
activity_status,
COUNT(*) as customer_count,
AVG(engagement_score) as avg_engagement_score,
COUNT(CASE WHEN activity_status = 'active' THEN 1 END) as active_customers,
COUNT(CASE WHEN days_since_registration < 7 THEN 1 END) as new_customers,
-- Advanced aggregations
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY engagement_score) as median_engagement,
STDDEV(engagement_score) as engagement_variance,
-- Array aggregations
ARRAY_AGG(customer_id ORDER BY engagement_score DESC LIMIT 10) as top_customers,
-- JSON aggregation for complex data structures
JSON_OBJECT_AGG(
customer_tier,
JSON_OBJECT(
'count', COUNT(*),
'avg_score', AVG(engagement_score)
)
) as tier_metrics
FROM transformed_data
GROUP BY email_domain, registration_month, customer_tier, activity_status
HAVING COUNT(*) >= 5 -- Filter out small groups
)
-- Final data loading with upsert semantics
INSERT INTO customer_analytics (
email_domain,
registration_month,
customer_tier,
activity_status,
customer_count,
avg_engagement_score,
active_customers,
new_customers,
median_engagement,
engagement_variance,
top_customers,
tier_metrics,
last_updated,
etl_run_id
)
SELECT
email_domain,
registration_month,
customer_tier,
activity_status,
customer_count,
ROUND(avg_engagement_score, 2),
active_customers,
new_customers,
ROUND(median_engagement, 2),
ROUND(engagement_variance, 2),
top_customers,
tier_metrics,
CURRENT_TIMESTAMP,
'etl_run_' || EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)
FROM aggregated_metrics
ON CONFLICT (email_domain, registration_month, customer_tier, activity_status)
DO UPDATE SET
customer_count = EXCLUDED.customer_count,
avg_engagement_score = EXCLUDED.avg_engagement_score,
active_customers = EXCLUDED.active_customers,
new_customers = EXCLUDED.new_customers,
median_engagement = EXCLUDED.median_engagement,
engagement_variance = EXCLUDED.engagement_variance,
top_customers = EXCLUDED.top_customers,
tier_metrics = EXCLUDED.tier_metrics,
last_updated = EXCLUDED.last_updated,
etl_run_id = EXCLUDED.etl_run_id;
-- Real-time streaming ETL with change data capture
CREATE OR REPLACE TRIGGER customer_changes_trigger
AFTER INSERT OR UPDATE OR DELETE ON customers
FOR EACH ROW
BEGIN
-- Capture change event
INSERT INTO customer_change_stream (
change_type,
customer_id,
old_data,
new_data,
change_timestamp,
change_sequence
) VALUES (
TG_OP, -- INSERT, UPDATE, or DELETE
COALESCE(NEW.customer_id, OLD.customer_id),
CASE WHEN TG_OP = 'DELETE' THEN ROW_TO_JSON(OLD) ELSE NULL END,
CASE WHEN TG_OP != 'DELETE' THEN ROW_TO_JSON(NEW) ELSE NULL END,
CURRENT_TIMESTAMP,
nextval('change_sequence')
);
-- Trigger downstream processing
PERFORM pg_notify('customer_changed',
JSON_BUILD_OBJECT(
'operation', TG_OP,
'customer_id', COALESCE(NEW.customer_id, OLD.customer_id),
'timestamp', CURRENT_TIMESTAMP
)::TEXT
);
END;
-- Advanced window functions for trend analysis
WITH customer_trends AS (
SELECT
customer_id,
registration_date,
engagement_score,
-- Running totals and moving averages
SUM(engagement_score) OVER (
PARTITION BY email_domain
ORDER BY registration_date
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as cumulative_engagement,
AVG(engagement_score) OVER (
PARTITION BY customer_tier
ORDER BY registration_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) as moving_avg_engagement,
-- Ranking and percentile calculations
PERCENT_RANK() OVER (
PARTITION BY registration_month
ORDER BY engagement_score
) as engagement_percentile,
ROW_NUMBER() OVER (
PARTITION BY email_domain, customer_tier
ORDER BY engagement_score DESC
) as tier_rank,
-- Lead/Lag for sequential analysis
LAG(engagement_score, 1) OVER (
PARTITION BY customer_id
ORDER BY last_login_date
) as previous_engagement,
-- First/Last value analytics
FIRST_VALUE(engagement_score) OVER (
PARTITION BY customer_id
ORDER BY registration_date
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) as initial_engagement
FROM transformed_data
)
-- Pivot tables for cross-dimensional analysis
SELECT *
FROM (
SELECT
registration_month,
customer_tier,
activity_status,
customer_count
FROM aggregated_metrics
) AS source_data
PIVOT (
SUM(customer_count)
FOR activity_status IN ('active', 'inactive', 'dormant')
) AS pivoted_activity;
-- Time-series analysis for trend detection
WITH monthly_trends AS (
SELECT
DATE_TRUNC('month', registration_date) as month,
customer_tier,
COUNT(*) as registrations,
AVG(engagement_score) as avg_engagement,
-- Year-over-year comparisons
LAG(COUNT(*), 12) OVER (
PARTITION BY customer_tier
ORDER BY DATE_TRUNC('month', registration_date)
) as registrations_year_ago,
-- Growth rate calculations
CASE
WHEN LAG(COUNT(*), 1) OVER (
PARTITION BY customer_tier
ORDER BY DATE_TRUNC('month', registration_date)
) > 0 THEN
ROUND(
((COUNT(*)::FLOAT / LAG(COUNT(*), 1) OVER (
PARTITION BY customer_tier
ORDER BY DATE_TRUNC('month', registration_date)
)) - 1) * 100, 2
)
ELSE NULL
END as month_over_month_growth
FROM customers
WHERE registration_date >= CURRENT_DATE - INTERVAL '24 months'
GROUP BY DATE_TRUNC('month', registration_date), customer_tier
)
-- Data quality monitoring and validation
, quality_metrics AS (
SELECT
'customers' as table_name,
COUNT(*) as total_records,
COUNT(CASE WHEN email IS NULL OR email = '' THEN 1 END) as missing_email,
COUNT(CASE WHEN first_name IS NULL OR first_name = '' THEN 1 END) as missing_first_name,
COUNT(CASE WHEN email !~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$' THEN 1 END) as invalid_email,
COUNT(CASE WHEN registration_date > CURRENT_DATE THEN 1 END) as future_registration,
-- Data completeness percentage
ROUND(
(COUNT(*) - COUNT(CASE WHEN email IS NULL OR email = '' THEN 1 END)) * 100.0 / COUNT(*),
2
) as email_completeness_pct,
-- Data freshness
MAX(registration_date) as latest_registration,
MIN(registration_date) as earliest_registration,
EXTRACT(DAYS FROM (CURRENT_DATE - MAX(registration_date))) as days_since_last_record
FROM customers
)
-- Output final results with data lineage tracking
SELECT
m.*,
q.total_records,
q.email_completeness_pct,
q.days_since_last_record,
'etl_pipeline_v2.1' as pipeline_version,
CURRENT_TIMESTAMP as processed_at
FROM monthly_trends m
CROSS JOIN quality_metrics q
ORDER BY m.month DESC, m.customer_tier;
-- QueryLeaf automatically handles:
-- 1. MongoDB aggregation pipeline generation for complex SQL operations
-- 2. Nested document processing and JSON operations
-- 3. Change stream integration for real-time ETL
-- 4. Parallel processing and optimization for large datasets
-- 5. Error handling and data validation
-- 6. Integration with external systems and data formats
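For orientation, the aggregated_metrics CTE above corresponds roughly to the following hand-written $group stage (a sketch for comparison, not QueryLeaf's actual generated pipeline):

// Hand-written sketch of how the aggregated_metrics GROUP BY maps to $group;
// not QueryLeaf's actual generated pipeline.
const aggregatedMetricsStages = [
  {
    $group: {
      _id: {
        email_domain: "$email_domain",
        registration_month: "$registration_month",
        customer_tier: "$customer_tier",
        activity_status: "$activity_status"
      },
      customer_count: { $sum: 1 },
      avg_engagement_score: { $avg: "$engagement_score" },
      active_customers: {
        $sum: { $cond: [{ $eq: ["$activity_status", "active"] }, 1, 0] }
      },
      new_customers: {
        $sum: { $cond: [{ $lt: ["$days_since_registration", 7] }, 1, 0] }
      }
    }
  },
  // HAVING COUNT(*) >= 5 becomes a post-group $match
  { $match: { customer_count: { $gte: 5 } } }
];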
ETL Monitoring and Management
Performance Monitoring
Implement comprehensive ETL monitoring:
// ETL monitoring and performance tracking service
class ETLMonitoringService {
constructor(db) {
this.db = db;
this.metricsCollection = db.collection('etl_metrics');
this.alertsCollection = db.collection('etl_alerts');
this.performanceCollection = db.collection('etl_performance');
}
async trackPipelineExecution(pipelineId, executionMetadata) {
const metrics = {
pipeline_id: pipelineId,
execution_id: executionMetadata.execution_id,
start_time: executionMetadata.start_time,
end_time: executionMetadata.end_time,
duration_ms: executionMetadata.end_time - executionMetadata.start_time,
status: executionMetadata.status,
// Data volume metrics
records_extracted: executionMetadata.records_extracted || 0,
records_transformed: executionMetadata.records_transformed || 0,
records_loaded: executionMetadata.records_loaded || 0,
records_failed: executionMetadata.records_failed || 0,
// Performance metrics
extraction_duration_ms: executionMetadata.extraction_duration || 0,
transformation_duration_ms: executionMetadata.transformation_duration || 0,
loading_duration_ms: executionMetadata.loading_duration || 0,
// Resource utilization
memory_peak_mb: executionMetadata.memory_peak || 0,
cpu_usage_percent: executionMetadata.cpu_usage || 0,
disk_io_mb: executionMetadata.disk_io || 0,
// Error information
error_count: executionMetadata.errors?.length || 0,
error_details: executionMetadata.errors || [],
timestamp: new Date()
};
await this.metricsCollection.insertOne(metrics);
// Check for performance anomalies
await this.checkPerformanceAlerts(pipelineId, metrics);
return metrics;
}
async generatePipelineReport(pipelineId, timeRange = 7) {
const sinceDate = new Date(Date.now() - timeRange * 24 * 60 * 60 * 1000);
const reportPipeline = [
{
$match: {
pipeline_id: pipelineId,
timestamp: { $gte: sinceDate }
}
},
{
$group: {
_id: {
date: { $dateToString: { format: "%Y-%m-%d", date: "$timestamp" } }
},
// Execution metrics
total_executions: { $sum: 1 },
successful_executions: {
$sum: { $cond: [{ $eq: ["$status", "completed"] }, 1, 0] }
},
failed_executions: {
$sum: { $cond: [{ $eq: ["$status", "failed"] }, 1, 0] }
},
// Duration statistics
avg_duration_ms: { $avg: "$duration_ms" },
max_duration_ms: { $max: "$duration_ms" },
min_duration_ms: { $min: "$duration_ms" },
// Data volume statistics
total_records_processed: { $sum: "$records_transformed" },
avg_records_per_execution: { $avg: "$records_transformed" },
// Performance metrics
avg_memory_usage_mb: { $avg: "$memory_peak_mb" },
avg_cpu_usage: { $avg: "$cpu_usage_percent" },
// Error analysis
total_errors: { $sum: "$error_count" },
unique_error_types: { $addToSet: "$error_details.error_type" }
}
},
{
$addFields: {
success_rate: {
$round: [
{ $multiply: [
{ $divide: ["$successful_executions", "$total_executions"] },
100
]},
2
]
},
throughput_records_per_minute: {
$round: [
{ $divide: [
"$avg_records_per_execution",
{ $divide: ["$avg_duration_ms", 60000] }
]},
0
]
}
}
},
{
$sort: { "_id.date": -1 }
}
];
const dailyMetrics = await this.metricsCollection.aggregate(reportPipeline).toArray();
// Generate overall statistics
const overallStats = await this.generateOverallStatistics(pipelineId, sinceDate);
// Generate trend analysis
const trendAnalysis = await this.generateTrendAnalysis(pipelineId, sinceDate);
return {
pipeline_id: pipelineId,
report_period_days: timeRange,
generated_at: new Date(),
daily_metrics: dailyMetrics,
overall_statistics: overallStats,
trend_analysis: trendAnalysis,
recommendations: await this.generateRecommendations(pipelineId, dailyMetrics, overallStats)
};
}
async generateOverallStatistics(pipelineId, sinceDate) {
const statsPipeline = [
{
$match: {
pipeline_id: pipelineId,
timestamp: { $gte: sinceDate }
}
},
{
$group: {
_id: null,
total_executions: { $sum: 1 },
successful_executions: {
$sum: { $cond: [{ $eq: ["$status", "completed"] }, 1, 0] }
},
failed_executions: {
$sum: { $cond: [{ $eq: ["$status", "failed"] }, 1, 0] }
},
avg_duration_minutes: {
$avg: { $divide: ["$duration_ms", 60000] }
},
total_records_processed: { $sum: "$records_transformed" },
total_errors: { $sum: "$error_count" },
// Performance percentiles ($percentile requires MongoDB 7.0+ and returns an array with one value per requested percentile)
duration_p50: { $percentile: { input: "$duration_ms", p: [0.5], method: 'approximate' } },
duration_p95: { $percentile: { input: "$duration_ms", p: [0.95], method: 'approximate' } },
duration_p99: { $percentile: { input: "$duration_ms", p: [0.99], method: 'approximate' } },
memory_p95: { $percentile: { input: "$memory_peak_mb", p: [0.95], method: 'approximate' } },
// First and last execution
first_execution: { $min: "$timestamp" },
last_execution: { $max: "$timestamp" }
}
},
{
$addFields: {
// $percentile returns an array with one value per requested percentile; unwrap to scalars
duration_p50: { $arrayElemAt: ["$duration_p50", 0] },
duration_p95: { $arrayElemAt: ["$duration_p95", 0] },
duration_p99: { $arrayElemAt: ["$duration_p99", 0] },
memory_p95: { $arrayElemAt: ["$memory_p95", 0] },
success_rate: {
$round: [
{ $multiply: [
{ $divide: ["$successful_executions", "$total_executions"] },
100
]},
2
]
},
avg_throughput_records_per_hour: {
$round: [
{ $divide: [
"$total_records_processed",
{ $divide: [
{ $subtract: ["$last_execution", "$first_execution"] },
3600000 // Convert ms to hours
]}
]},
0
]
},
error_rate: {
$round: [
{ $multiply: [
{ $divide: ["$failed_executions", "$total_executions"] },
100
]},
2
]
}
}
}
];
const stats = await this.metricsCollection.aggregate(statsPipeline).toArray();
return stats[0] || {};
}
async generateTrendAnalysis(pipelineId, sinceDate) {
const trendPipeline = [
{
$match: {
pipeline_id: pipelineId,
timestamp: { $gte: sinceDate }
}
},
{
$group: {
_id: {
week: { $week: "$timestamp" },
year: { $year: "$timestamp" }
},
avg_duration: { $avg: "$duration_ms" },
avg_records_processed: { $avg: "$records_transformed" },
success_rate: {
$avg: { $cond: [{ $eq: ["$status", "completed"] }, 1, 0] }
},
execution_count: { $sum: 1 }
}
},
{
$sort: { "_id.year": 1, "_id.week": 1 }
},
{
$group: {
_id: null,
weekly_data: {
$push: {
week: "$_id.week",
year: "$_id.year",
avg_duration: "$avg_duration",
avg_records: "$avg_records_processed",
success_rate: "$success_rate",
execution_count: "$execution_count"
}
}
}
},
{
$addFields: {
// Calculate trends using linear regression approximation
duration_trend: {
$let: {
vars: {
n: { $size: "$weekly_data" },
data: "$weekly_data"
},
in: {
$cond: {
if: { $gte: ["$$n", 3] },
then: {
// Simple trend calculation (last value vs first value)
$subtract: [
{ $arrayElemAt: ["$$data.avg_duration", -1] },
{ $arrayElemAt: ["$$data.avg_duration", 0] }
]
},
else: null
}
}
}
},
performance_trend: {
$let: {
vars: {
n: { $size: "$weekly_data" },
data: "$weekly_data"
},
in: {
$cond: {
if: { $gte: ["$$n", 3] },
then: {
$subtract: [
{ $arrayElemAt: ["$$data.avg_records", -1] },
{ $arrayElemAt: ["$$data.avg_records", 0] }
]
},
else: null
}
}
}
}
}
}
];
const trendData = await this.metricsCollection.aggregate(trendPipeline).toArray();
return trendData[0] || { weekly_data: [], duration_trend: null, performance_trend: null };
}
async generateRecommendations(pipelineId, dailyMetrics, overallStats) {
const recommendations = [];
// Performance recommendations
if (overallStats.success_rate < 95) {
recommendations.push({
type: 'reliability',
priority: 'high',
message: `Pipeline success rate is ${overallStats.success_rate}%. Consider implementing error handling and retry logic.`,
suggested_actions: [
'Add retry mechanisms for transient failures',
'Implement circuit breakers for external dependencies',
'Add validation checks for input data quality'
]
});
}
if (overallStats.avg_duration_minutes > 60) {
recommendations.push({
type: 'performance',
priority: 'medium',
message: `Average execution time is ${Math.round(overallStats.avg_duration_minutes)} minutes. Consider optimization.`,
suggested_actions: [
'Implement parallel processing for transformation steps',
'Optimize database queries and indexing',
'Consider breaking pipeline into smaller, concurrent jobs'
]
});
}
// Resource utilization recommendations
if (overallStats.memory_p95 > 8000) { // 8GB
recommendations.push({
type: 'resource',
priority: 'medium',
message: `Memory usage is high (95th percentile: ${Math.round(overallStats.memory_p95)}MB).`,
suggested_actions: [
'Implement streaming processing for large datasets',
'Optimize data structures and reduce memory footprint',
'Consider increasing available memory resources'
]
});
}
// Data quality recommendations
if (overallStats.error_rate > 5) {
recommendations.push({
type: 'data_quality',
priority: 'high',
message: `High error rate detected (${overallStats.error_rate}%).`,
suggested_actions: [
'Implement comprehensive data validation',
'Add data profiling to identify quality issues',
'Set up data quality monitoring dashboards'
]
});
}
// Operational recommendations
const recentFailures = dailyMetrics.filter(day => day.failed_executions > 0).length;
if (recentFailures > dailyMetrics.length * 0.3) {
recommendations.push({
type: 'operational',
priority: 'high',
message: `Frequent failures detected in ${recentFailures} of the last ${dailyMetrics.length} days.`,
suggested_actions: [
'Set up real-time alerting for pipeline failures',
'Implement automated recovery procedures',
'Schedule regular pipeline health checks'
]
});
}
return recommendations;
}
async checkPerformanceAlerts(pipelineId, currentMetrics) {
// Define alert thresholds
const alertThresholds = await this.getAlertThresholds(pipelineId);
const alerts = [];
// Duration alert
if (currentMetrics.duration_ms > alertThresholds.max_duration_ms) {
alerts.push({
type: 'performance_degradation',
severity: 'warning',
message: `Pipeline execution took ${currentMetrics.duration_ms}ms, exceeding threshold of ${alertThresholds.max_duration_ms}ms`,
metric_value: currentMetrics.duration_ms,
threshold: alertThresholds.max_duration_ms
});
}
// Failure rate alert
if (currentMetrics.status === 'failed') {
const recentFailures = await this.getRecentFailureCount(pipelineId, 24); // Last 24 hours
if (recentFailures >= alertThresholds.max_failures_per_day) {
alerts.push({
type: 'high_failure_rate',
severity: 'critical',
message: `Pipeline has failed ${recentFailures} times in the last 24 hours`,
metric_value: recentFailures,
threshold: alertThresholds.max_failures_per_day
});
}
}
// Memory usage alert
if (currentMetrics.memory_peak_mb > alertThresholds.max_memory_mb) {
alerts.push({
type: 'high_memory_usage',
severity: 'warning',
message: `Memory usage peaked at ${currentMetrics.memory_peak_mb}MB, exceeding threshold`,
metric_value: currentMetrics.memory_peak_mb,
threshold: alertThresholds.max_memory_mb
});
}
// Data volume anomaly
if (currentMetrics.records_transformed < alertThresholds.min_records_expected * 0.5) {
alerts.push({
type: 'low_data_volume',
severity: 'warning',
message: `Processed only ${currentMetrics.records_transformed} records, significantly below expected ${alertThresholds.min_records_expected}`,
metric_value: currentMetrics.records_transformed,
threshold: alertThresholds.min_records_expected
});
}
// Store alerts
for (const alert of alerts) {
const alertDoc = {
...alert,
pipeline_id: pipelineId,
execution_id: currentMetrics.execution_id,
timestamp: new Date(),
status: 'active'
};
await this.alertsCollection.insertOne(alertDoc);
// Send notifications
await this.sendAlertNotification(alertDoc);
}
return alerts;
}
async getAlertThresholds(pipelineId) {
// Get baseline performance metrics for threshold calculation
const baseline = await this.metricsCollection.aggregate([
{
$match: {
pipeline_id: pipelineId,
status: 'completed',
timestamp: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) }
}
},
{
$group: {
_id: null,
avg_duration: { $avg: "$duration_ms" },
p95_duration: { $percentile: { input: "$duration_ms", p: [0.95], method: 'approximate' } },
avg_memory: { $avg: "$memory_peak_mb" },
p95_memory: { $percentile: { input: "$memory_peak_mb", p: [0.95], method: 'approximate' } },
avg_records: { $avg: "$records_transformed" }
}
}
]).toArray();
if (baseline.length === 0) {
// Default thresholds for new pipelines
return {
max_duration_ms: 3600000, // 1 hour
max_memory_mb: 4000, // 4GB
max_failures_per_day: 3,
min_records_expected: 1000
};
}
const stats = baseline[0];
// $percentile returns an array of values, so unwrap the single requested percentile
const p95Duration = stats.p95_duration?.[0] ?? stats.avg_duration;
const p95Memory = stats.p95_memory?.[0] ?? stats.avg_memory;
return {
max_duration_ms: Math.max(p95Duration * 1.5, stats.avg_duration * 2),
max_memory_mb: Math.max(p95Memory * 1.5, stats.avg_memory * 2),
max_failures_per_day: 3,
min_records_expected: Math.max(stats.avg_records * 0.1, 100)
};
}
async getRecentFailureCount(pipelineId, hours) {
const since = new Date(Date.now() - hours * 60 * 60 * 1000);
const result = await this.metricsCollection.countDocuments({
pipeline_id: pipelineId,
status: 'failed',
timestamp: { $gte: since }
});
return result;
}
async sendAlertNotification(alert) {
// Implement notification logic (email, Slack, PagerDuty, etc.)
console.log('ALERT:', alert.message);
// This would integrate with actual notification services
// Example: await this.slackService.sendAlert(alert);
// Example: await this.emailService.sendAlert(alert);
}
async getDashboardMetrics(pipelineIds = [], timeRange = 24) {
const sinceDate = new Date(Date.now() - timeRange * 60 * 60 * 1000);
const matchStage = {
timestamp: { $gte: sinceDate }
};
if (pipelineIds.length > 0) {
matchStage.pipeline_id = { $in: pipelineIds };
}
const dashboardPipeline = [
{ $match: matchStage },
{
$group: {
_id: {
pipeline_id: "$pipeline_id",
hour: { $dateToString: { format: "%Y-%m-%d %H:00", date: "$timestamp" } }
},
executions: { $sum: 1 },
successes: {
$sum: { $cond: [{ $eq: ["$status", "completed"] }, 1, 0] }
},
failures: {
$sum: { $cond: [{ $eq: ["$status", "failed"] }, 1, 0] }
},
avg_duration: { $avg: "$duration_ms" },
total_records: { $sum: "$records_transformed" },
total_errors: { $sum: "$error_count" }
}
},
{
$group: {
_id: "$_id.pipeline_id",
hourly_metrics: {
$push: {
hour: "$_id.hour",
executions: "$executions",
success_rate: {
$round: [
{ $multiply: [{ $divide: ["$successes", "$executions"] }, 100] },
1
]
},
avg_duration_minutes: { $round: [{ $divide: ["$avg_duration", 60000] }, 1] },
total_records: "$total_records",
total_errors: "$total_errors"
}
},
total_executions: { $sum: "$executions" },
// Accumulator operators must be the top-level operator of each $group field,
// so accumulate the totals here and derive the success rate in the next stage
total_successes: { $sum: "$successes" },
total_records_processed: { $sum: "$total_records" },
total_errors: { $sum: "$total_errors" }
}
},
{
$addFields: {
overall_success_rate: {
$round: [
{ $multiply: [{ $divide: ["$total_successes", "$total_executions"] }, 100] },
1
]
}
}
},
{ $sort: { "_id": 1 } }
];
const dashboardData = await this.metricsCollection.aggregate(dashboardPipeline).toArray();
return {
time_range_hours: timeRange,
generated_at: new Date(),
pipeline_metrics: dashboardData
};
}
}
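To see how the monitoring service slots into a pipeline run, the following is a minimal usage sketch. The connection string, database name, pipeline identifier, and record counts are illustrative assumptions, and the actual extract/transform/load work is elided:
// Hypothetical wiring of ETLMonitoringService around a single pipeline run
const { MongoClient } = require('mongodb');

async function runMonitoredPipeline() {
  const client = new MongoClient('mongodb://localhost:27017'); // assumed connection string
  await client.connect();
  const db = client.db('etl_platform');                        // assumed database name
  const monitor = new ETLMonitoringService(db);

  const startTime = new Date();
  // ... run the actual extract / transform / load steps here ...
  const endTime = new Date();

  await monitor.trackPipelineExecution('customer_daily_etl', {
    execution_id: `run-${startTime.getTime()}`,
    start_time: startTime,
    end_time: endTime,
    status: 'completed',
    records_extracted: 125000,   // illustrative counts
    records_transformed: 124500,
    records_loaded: 124500,
    records_failed: 500,
    errors: []
  });

  // Seven-day report with daily metrics, trends, and recommendations
  const report = await monitor.generatePipelineReport('customer_daily_etl', 7);
  console.log(report.recommendations);

  await client.close();
}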
Best Practices for ETL Implementation
ETL Design Principles
Essential guidelines for building robust ETL pipelines:
- Idempotency: Design pipelines that can be safely re-run without side effects (see the $merge sketch after this list)
- Error Handling: Implement comprehensive error handling and recovery mechanisms
- Data Validation: Validate data quality at every stage of the pipeline
- Monitoring: Track performance, data quality, and operational metrics
- Scalability: Design for horizontal scaling and parallel processing
- Documentation: Maintain clear documentation of data transformations and business logic
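One way to satisfy the idempotency guideline above is to key loads on a natural identifier so that a re-run overwrites earlier output instead of duplicating it. A minimal sketch using $merge, assuming a raw_customer_data source, a transformed_customers target with a unique index on customer_id, and an illustrative cutoff date:
// Idempotent load sketch: re-running replaces previously loaded documents
await db.collection('raw_customer_data').aggregate([
  { $match: { registration_date: { $gte: new Date('2024-01-01') } } },
  { $addFields: { full_name: { $concat: ['$first_name', ' ', '$last_name'] } } },
  {
    $merge: {
      into: 'transformed_customers',
      on: 'customer_id',        // $merge requires a unique index on this field in the target
      whenMatched: 'replace',   // overwrite on re-run instead of duplicating
      whenNotMatched: 'insert'
    }
  }
]).toArray();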
Performance Optimization
Optimize ETL pipeline performance:
- Parallel Processing: Use MongoDB's aggregation framework for concurrent data processing
- Incremental Loading: Process only changed data to reduce processing time (see the watermark sketch after this list)
- Index Optimization: Create appropriate indexes for extraction and lookup operations
- Batch Size Tuning: Optimize batch sizes for memory and throughput balance
- Resource Management: Monitor and optimize CPU, memory, and I/O utilization
- Caching: Cache frequently accessed reference data and transformation results
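Incremental loading is commonly implemented with a watermark: persist the timestamp of the last document processed and extract only documents modified since then. A sketch assuming a hypothetical etl_watermarks collection and an updated_at field on the source documents:
// Watermark-based incremental extraction sketch
const watermarks = db.collection('etl_watermarks');
const last = await watermarks.findOne({ pipeline_id: 'customer_daily_etl' });
const since = last?.last_processed_at ?? new Date(0);

const changedDocs = await db.collection('raw_customer_data')
  .find({ updated_at: { $gt: since } })   // an index on updated_at keeps this extraction cheap
  .sort({ updated_at: 1 })
  .toArray();

// ... transform and load changedDocs here ...

if (changedDocs.length > 0) {
  await watermarks.updateOne(
    { pipeline_id: 'customer_daily_etl' },
    { $set: { last_processed_at: changedDocs[changedDocs.length - 1].updated_at } },
    { upsert: true }
  );
}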
QueryLeaf ETL Integration
QueryLeaf enables familiar SQL-style ETL development while leveraging MongoDB's powerful aggregation capabilities. This integration provides teams with the flexibility to implement complex data transformations using familiar SQL patterns while benefiting from MongoDB's document-oriented storage and processing advantages.
Key QueryLeaf ETL benefits include:
- SQL Familiarity: Write ETL logic using familiar SQL syntax and patterns
- MongoDB Performance: Leverage MongoDB's high-performance aggregation pipeline
- Flexible Schema: Handle semi-structured and evolving data schemas effortlessly
- Real-Time Processing: Integrate change streams for real-time ETL processing (see the change stream sketch after this list)
- Scalable Architecture: Build ETL pipelines that scale horizontally with data growth
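For the real-time processing point, the change-stream side can be as small as the following sketch (plain MongoDB driver code shown only for illustration; collection names and the transform are assumptions):
// Change-stream ETL sketch: transform and upsert documents as they change at the source
const changeStream = db.collection('raw_customer_data').watch(
  [{ $match: { operationType: { $in: ['insert', 'update', 'replace'] } } }],
  { fullDocument: 'updateLookup' }   // deliver the full post-update document
);

changeStream.on('change', async (event) => {
  const doc = event.fullDocument;
  if (!doc) return;

  await db.collection('transformed_customers').updateOne(
    { customer_id: doc.customer_id },
    { $set: { full_name: `${doc.first_name} ${doc.last_name}`, synced_at: new Date() } },
    { upsert: true }
  );
});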
Whether you're migrating from traditional ETL tools or building new data processing workflows, MongoDB with QueryLeaf's SQL interface provides a powerful foundation for modern ETL architectures that can handle the complexity and scale requirements of contemporary data environments.
Conclusion
MongoDB ETL and data pipeline processing capabilities provide enterprise-grade data transformation and processing infrastructure that addresses the challenges of modern data workflows. Combined with QueryLeaf's familiar SQL interface, MongoDB enables teams to build sophisticated ETL pipelines while preserving development patterns and query approaches they already understand.
The combination of MongoDB's flexible document model, powerful aggregation framework, and real-time change streams creates an ideal platform for handling diverse data sources, complex transformations, and scalable processing requirements. QueryLeaf's SQL-style syntax makes these capabilities accessible to broader development teams while maintaining the performance and flexibility advantages of MongoDB's native architecture.
Whether you're building batch ETL processes, real-time streaming pipelines, or hybrid architectures, MongoDB with QueryLeaf provides the tools and patterns necessary to implement robust, scalable, and maintainable data processing solutions that can adapt and evolve with your organization's growing data requirements.