MongoDB NoSQL Migration and Relational Database Modernization: Strategic Database Evolution for Modern Applications
Modern applications increasingly demand the flexibility, scalability, and performance that NoSQL databases provide, driving organizations to migrate from traditional relational architectures to MongoDB's document-oriented model. This migration, however, presents complex challenges: data transformation, schema evolution, application refactoring, and maintaining data consistency throughout the transition.
Migrating from a relational database to MongoDB requires strategies that preserve data integrity while unlocking the benefits of document-oriented storage: flexible schemas, horizontal scaling, and improved developer productivity. Unlike a simple data export, a successful migration involves careful planning of document structure, relationship modeling, performance optimization, and a gradual transition that minimizes application downtime and business disruption.
Traditional Relational Database Migration Challenges
Conventional database migration approaches often fail to leverage MongoDB's unique capabilities and struggle with complex data transformation requirements:
-- Traditional PostgreSQL schema - complex normalization causing migration challenges
-- Customer management with excessive normalization
CREATE TABLE customers (
customer_id SERIAL PRIMARY KEY,
customer_uuid UUID UNIQUE NOT NULL DEFAULT uuid_generate_v4(),
company_name VARCHAR(255) NOT NULL,
industry_code VARCHAR(10) REFERENCES industry_lookup(code),
company_size VARCHAR(20) CHECK (company_size IN ('startup', 'small', 'medium', 'large', 'enterprise')),
-- Address normalization requiring complex joins
billing_address_id INTEGER REFERENCES addresses(address_id),
shipping_address_id INTEGER REFERENCES addresses(address_id),
-- Contact information spread across multiple tables
primary_contact_id INTEGER REFERENCES contacts(contact_id),
billing_contact_id INTEGER REFERENCES contacts(contact_id),
technical_contact_id INTEGER REFERENCES contacts(contact_id),
-- Account management
account_status VARCHAR(20) DEFAULT 'active',
subscription_tier VARCHAR(50) DEFAULT 'basic',
annual_contract_value DECIMAL(12,2) DEFAULT 0,
-- Audit fields
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
created_by INTEGER REFERENCES users(user_id),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_by INTEGER REFERENCES users(user_id)
);
-- Separate address table creating complex relationships
CREATE TABLE addresses (
address_id SERIAL PRIMARY KEY,
address_type VARCHAR(20) NOT NULL CHECK (address_type IN ('billing', 'shipping', 'corporate')),
street_address_1 VARCHAR(200) NOT NULL,
street_address_2 VARCHAR(200),
city VARCHAR(100) NOT NULL,
state_province VARCHAR(100),
postal_code VARCHAR(20),
country_code VARCHAR(3) NOT NULL REFERENCES countries(code),
-- Geographic data
latitude DECIMAL(10,8),
longitude DECIMAL(11,8),
timezone VARCHAR(50),
-- Validation and verification
address_verified BOOLEAN DEFAULT FALSE,
verification_date TIMESTAMP WITH TIME ZONE,
verification_service VARCHAR(50),
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
-- Contact information table with complex relationships
CREATE TABLE contacts (
contact_id SERIAL PRIMARY KEY,
contact_type VARCHAR(30) NOT NULL,
title VARCHAR(100),
first_name VARCHAR(100) NOT NULL,
last_name VARCHAR(100) NOT NULL,
middle_name VARCHAR(100),
suffix VARCHAR(20),
-- Communication preferences
preferred_communication VARCHAR(20) DEFAULT 'email',
email_address VARCHAR(255) UNIQUE,
phone_number VARCHAR(20),
mobile_number VARCHAR(20),
fax_number VARCHAR(20),
-- Professional information
job_title VARCHAR(200),
department VARCHAR(100),
manager_contact_id INTEGER REFERENCES contacts(contact_id),
-- Social and professional networks
linkedin_url VARCHAR(500),
twitter_handle VARCHAR(100),
-- Communication preferences and consent
email_opt_in BOOLEAN DEFAULT TRUE,
sms_opt_in BOOLEAN DEFAULT FALSE,
marketing_consent BOOLEAN DEFAULT FALSE,
marketing_consent_date TIMESTAMP WITH TIME ZONE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
-- Product catalog with complex category hierarchy
CREATE TABLE products (
product_id SERIAL PRIMARY KEY,
product_uuid UUID UNIQUE NOT NULL DEFAULT uuid_generate_v4(),
product_name VARCHAR(255) NOT NULL,
product_sku VARCHAR(100) UNIQUE NOT NULL,
product_description TEXT,
-- Category management through separate table
primary_category_id INTEGER REFERENCES product_categories(category_id),
-- Pricing structure
base_price DECIMAL(10,2) NOT NULL,
currency_code VARCHAR(3) DEFAULT 'USD',
-- Inventory management
inventory_tracked BOOLEAN DEFAULT TRUE,
current_stock_level INTEGER DEFAULT 0,
minimum_stock_level INTEGER DEFAULT 10,
maximum_stock_level INTEGER DEFAULT 1000,
-- Product status and lifecycle
product_status VARCHAR(20) DEFAULT 'active' CHECK (product_status IN ('active', 'discontinued', 'draft', 'archived')),
launch_date DATE,
discontinuation_date DATE,
-- SEO and marketing
seo_title VARCHAR(200),
seo_description TEXT,
meta_keywords TEXT,
-- Physical characteristics
weight_kg DECIMAL(8,3),
length_cm DECIMAL(8,2),
width_cm DECIMAL(8,2),
height_cm DECIMAL(8,2),
-- Shipping and fulfillment
requires_shipping BOOLEAN DEFAULT TRUE,
shipping_class VARCHAR(50),
hazardous_material BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
-- Product categories with complex hierarchy
CREATE TABLE product_categories (
category_id SERIAL PRIMARY KEY,
category_name VARCHAR(200) NOT NULL,
category_slug VARCHAR(200) UNIQUE NOT NULL,
category_description TEXT,
-- Hierarchical structure
parent_category_id INTEGER REFERENCES product_categories(category_id),
category_level INTEGER NOT NULL DEFAULT 1,
category_path TEXT, -- Computed field like '/electronics/computers/laptops'
-- Display and ordering
display_order INTEGER DEFAULT 0,
is_featured BOOLEAN DEFAULT FALSE,
is_visible BOOLEAN DEFAULT TRUE,
-- SEO optimization
seo_title VARCHAR(200),
seo_description TEXT,
category_image_url VARCHAR(500),
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
-- Order management with complex structure
CREATE TABLE orders (
order_id SERIAL PRIMARY KEY,
order_uuid UUID UNIQUE NOT NULL DEFAULT uuid_generate_v4(),
order_number VARCHAR(50) UNIQUE NOT NULL,
customer_id INTEGER REFERENCES customers(customer_id),
-- Order timing
order_date TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
required_date DATE,
shipped_date TIMESTAMP WITH TIME ZONE,
delivered_date TIMESTAMP WITH TIME ZONE,
-- Financial information
subtotal DECIMAL(12,2) NOT NULL DEFAULT 0,
tax_amount DECIMAL(12,2) NOT NULL DEFAULT 0,
shipping_amount DECIMAL(12,2) NOT NULL DEFAULT 0,
discount_amount DECIMAL(12,2) NOT NULL DEFAULT 0,
total_amount DECIMAL(12,2) NOT NULL DEFAULT 0,
currency_code VARCHAR(3) DEFAULT 'USD',
-- Order status management
order_status VARCHAR(30) DEFAULT 'pending' CHECK (
order_status IN ('pending', 'confirmed', 'processing', 'shipped', 'delivered', 'cancelled', 'returned')
),
payment_status VARCHAR(30) DEFAULT 'pending' CHECK (
payment_status IN ('pending', 'authorized', 'captured', 'failed', 'refunded', 'partially_refunded')
),
fulfillment_status VARCHAR(30) DEFAULT 'unfulfilled' CHECK (
fulfillment_status IN ('unfulfilled', 'partially_fulfilled', 'fulfilled', 'restocked')
),
-- Shipping information
shipping_address_id INTEGER REFERENCES addresses(address_id),
billing_address_id INTEGER REFERENCES addresses(address_id),
shipping_method VARCHAR(100),
tracking_number VARCHAR(200),
shipping_carrier VARCHAR(100),
-- Customer service and notes
order_notes TEXT,
internal_notes TEXT,
customer_service_notes TEXT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
-- Order line items with product relationships
CREATE TABLE order_items (
order_item_id SERIAL PRIMARY KEY,
order_id INTEGER REFERENCES orders(order_id) ON DELETE CASCADE,
product_id INTEGER REFERENCES products(product_id),
-- Product information at time of order
product_name VARCHAR(255) NOT NULL,
product_sku VARCHAR(100) NOT NULL,
product_description TEXT,
-- Pricing and quantity
unit_price DECIMAL(10,2) NOT NULL,
quantity INTEGER NOT NULL DEFAULT 1,
line_total DECIMAL(12,2) NOT NULL,
-- Discounts and promotions
discount_percentage DECIMAL(5,2) DEFAULT 0,
discount_amount DECIMAL(10,2) DEFAULT 0,
promotion_code VARCHAR(100),
-- Fulfillment tracking
fulfillment_status VARCHAR(30) DEFAULT 'pending',
shipped_quantity INTEGER DEFAULT 0,
returned_quantity INTEGER DEFAULT 0,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
-- Migration challenges with traditional relational approach:
-- Complex join requirements for simple operations
SELECT
c.company_name,
c.account_status,
c.annual_contract_value,
-- Billing address requires join
ba.street_address_1 as billing_address,
ba.city as billing_city,
ba.state_province as billing_state,
ba.country_code as billing_country,
-- Shipping address requires separate join
sa.street_address_1 as shipping_address,
sa.city as shipping_city,
sa.state_province as shipping_state,
sa.country_code as shipping_country,
-- Primary contact requires join
pc.first_name || ' ' || pc.last_name as primary_contact_name,
pc.email_address as primary_contact_email,
pc.phone_number as primary_contact_phone,
-- Billing contact requires separate join
bc.first_name || ' ' || bc.last_name as billing_contact_name,
bc.email_address as billing_contact_email,
-- Order summary requires complex aggregation
COUNT(o.order_id) as total_orders,
SUM(o.total_amount) as total_order_value,
MAX(o.order_date) as last_order_date
FROM customers c
LEFT JOIN addresses ba ON c.billing_address_id = ba.address_id
LEFT JOIN addresses sa ON c.shipping_address_id = sa.address_id
LEFT JOIN contacts pc ON c.primary_contact_id = pc.contact_id
LEFT JOIN contacts bc ON c.billing_contact_id = bc.contact_id
LEFT JOIN orders o ON c.customer_id = o.customer_id
WHERE c.account_status = 'active'
AND c.annual_contract_value > 50000
GROUP BY
c.customer_id, c.company_name, c.account_status, c.annual_contract_value,
ba.street_address_1, ba.city, ba.state_province, ba.country_code,
sa.street_address_1, sa.city, sa.state_province, sa.country_code,
pc.first_name, pc.last_name, pc.email_address, pc.phone_number,
bc.first_name, bc.last_name, bc.email_address
ORDER BY c.annual_contract_value DESC;
-- Product catalog with category hierarchy complexity
WITH RECURSIVE category_hierarchy AS (
-- Base case: root categories
SELECT
category_id,
category_name,
category_slug,
parent_category_id,
category_level,
category_name as category_path,
ARRAY[category_id] as path_ids
FROM product_categories
WHERE parent_category_id IS NULL
UNION ALL
-- Recursive case: child categories
SELECT
pc.category_id,
pc.category_name,
pc.category_slug,
pc.parent_category_id,
pc.category_level,
ch.category_path || ' > ' || pc.category_name,
ch.path_ids || pc.category_id
FROM product_categories pc
INNER JOIN category_hierarchy ch ON pc.parent_category_id = ch.category_id
)
SELECT
p.product_name,
p.product_sku,
p.base_price,
p.current_stock_level,
p.product_status,
-- Category hierarchy information
ch.category_path as full_category_path,
ch.category_level as category_depth,
-- Recent order information
COUNT(oi.order_item_id) as times_ordered,
SUM(oi.quantity) as total_quantity_sold,
SUM(oi.line_total) as total_revenue,
MAX(o.order_date) as last_ordered_date
FROM products p
LEFT JOIN category_hierarchy ch ON p.primary_category_id = ch.category_id
LEFT JOIN order_items oi ON p.product_id = oi.product_id
LEFT JOIN orders o ON oi.order_id = o.order_id
WHERE p.product_status = 'active'
AND o.order_date >= CURRENT_DATE - INTERVAL '6 months'
GROUP BY
p.product_id, p.product_name, p.product_sku, p.base_price,
p.current_stock_level, p.product_status,
ch.category_path, ch.category_level
ORDER BY total_revenue DESC;
-- Problems with relational schema for migration:
-- 1. Excessive normalization creates complex join requirements
-- 2. Rigid schema makes it difficult to add new fields
-- 3. Foreign key constraints complicate data transformation
-- 4. Complex hierarchical relationships require recursive queries
-- 5. Performance degradation with increasing data volume
-- 6. Difficult to represent nested or array data structures
-- 7. Schema changes require coordinated migrations
-- 8. Poor fit for document-oriented access patterns
-- 9. Limited flexibility for varying data structures
-- 10. Complex application logic to reconstruct related data
-- Traditional migration approach (problematic):
-- Attempting direct table-to-collection mapping without optimization
-- Naive migration - maintains relational structure (inefficient)
INSERT INTO mongodb.customers
SELECT
customer_id,
customer_uuid,
company_name,
industry_code,
company_size,
billing_address_id, -- Foreign key becomes meaningless
shipping_address_id, -- Foreign key becomes meaningless
primary_contact_id, -- Foreign key becomes meaningless
account_status,
subscription_tier,
annual_contract_value,
created_at,
updated_at
FROM postgresql_customers;
-- This approach fails to leverage MongoDB's strengths:
-- - Maintains relational thinking
-- - Requires separate queries to reconstruct relationships
-- - Doesn't utilize document embedding opportunities
-- - Misses schema flexibility benefits
-- - Poor query performance due to separate collections
-- - Complex application logic to manage references
-- - No improvement in developer experience
-- - Limited scalability improvements
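For contrast, here is a minimal sketch of the kind of embedded customer document such a migration targets. Field names mirror the transformation code later in this article; the sample values, database name, and connection URI are illustrative only:
// Sketch of the target embedded customer document and a typical single-collection query.
// Assumes the official Node.js driver and a MongoDB instance at the example URI.
const { MongoClient } = require('mongodb');
async function insertAndQuerySampleCustomer() {
  const client = new MongoClient('mongodb://localhost:27017');
  try {
    await client.connect();
    const customers = client.db('modernized_application').collection('customers');
    // One document holds what previously spanned the customers, addresses, and contacts tables.
    await customers.insertOne({
      customer_uuid: 'a3c1f0e2-1111-2222-3333-444455556666', // example value
      company_name: 'Acme Industrial',
      account: { status: 'active', subscription_tier: 'professional', annual_contract_value: 75000 },
      billing_address: { street_address_1: '100 Main St', city: 'Springfield', country_code: 'US' },
      contacts: {
        primary_contact: {
          name: { first: 'Jane', last: 'Doe', full: 'Jane Doe' },
          communication: { email: 'jane.doe@example.com', phone: '+1-555-0100' }
        }
      }
    });
    // The multi-join "active high-value customers" report becomes a single query.
    const topCustomers = await customers
      .find({ 'account.status': 'active', 'account.annual_contract_value': { $gt: 50000 } })
      .sort({ 'account.annual_contract_value': -1 })
      .toArray();
    console.log(`${topCustomers.length} active high-value customers`);
  } finally {
    await client.close();
  }
}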
MongoDB's document model enables migration strategies that transform relational schemas into optimized document structures; the migration manager below illustrates one such approach:
// MongoDB Advanced Migration and Database Modernization Manager
const { MongoClient, ObjectId } = require('mongodb');
class MongoMigrationManager {
constructor(sourceDb, targetMongoUri, migrationConfig = {}) {
this.sourceDb = sourceDb; // Source database connection (PostgreSQL, MySQL, etc.)
this.mongoClient = new MongoClient(targetMongoUri);
this.targetDb = null;
this.migrationConfig = {
// Migration strategy configuration
migrationStrategy: migrationConfig.strategy || 'staged_migration', // Options: bulk_migration, staged_migration, live_migration
batchSize: migrationConfig.batchSize || 1000,
enableDataValidation: migrationConfig.enableDataValidation !== false,
enablePerformanceOptimization: migrationConfig.enablePerformanceOptimization !== false,
// Schema transformation settings
schemaTransformation: {
enableDocumentEmbedding: migrationConfig.enableDocumentEmbedding !== false,
enableArrayFields: migrationConfig.enableArrayFields !== false,
enableDenormalization: migrationConfig.enableDenormalization !== false,
optimizeForQueryPatterns: migrationConfig.optimizeForQueryPatterns !== false
},
// Data quality and validation
dataQuality: {
enableDataCleaning: migrationConfig.enableDataCleaning !== false,
enableReferentialIntegrityChecks: migrationConfig.enableReferentialIntegrityChecks !== false,
enableDuplicateDetection: migrationConfig.enableDuplicateDetection !== false,
enableDataTypeConversion: migrationConfig.enableDataTypeConversion !== false
},
// Performance and monitoring
performance: {
enableProgressTracking: migrationConfig.enableProgressTracking !== false,
enablePerformanceMetrics: migrationConfig.enablePerformanceMetrics !== false,
enableRollbackCapability: migrationConfig.enableRollbackCapability !== false,
maxConcurrentOperations: migrationConfig.maxConcurrentOperations || 4
}
};
this.migrationMetrics = {
startTime: null,
endTime: null,
recordsProcessed: 0,
recordsMigrated: 0,
errorsEncountered: 0,
performanceStats: {}
};
}
async initializeMigration() {
console.log('Initializing MongoDB migration system...');
try {
this.migrationMetrics.startTime = Date.now(); // record migration start for reporting
await this.mongoClient.connect();
this.targetDb = this.mongoClient.db('modernized_application');
// Setup target collections with optimized schemas
await this.setupOptimizedCollections();
// Initialize migration tracking
await this.initializeMigrationTracking();
// Setup performance monitoring
await this.setupPerformanceMonitoring();
console.log('Migration system initialized successfully');
} catch (error) {
console.error('Error initializing migration system:', error);
throw error;
}
}
async setupOptimizedCollections() {
console.log('Setting up optimized MongoDB collections...');
// Create customers collection with embedded documents
const customers = this.targetDb.collection('customers');
// Create optimized indexes for customer operations
await customers.createIndex({ customer_uuid: 1 }, { unique: true });
await customers.createIndex({ company_name: 1 });
await customers.createIndex({ "billing_contact.email": 1 });
await customers.createIndex({ account_status: 1, annual_contract_value: -1 });
await customers.createIndex({ industry_code: 1, company_size: 1 });
await customers.createIndex({ created_at: -1 });
// Create products collection with optimized structure
const products = this.targetDb.collection('products');
await products.createIndex({ product_uuid: 1 }, { unique: true });
await products.createIndex({ product_sku: 1 }, { unique: true });
await products.createIndex({ "categories.primary": 1 });
await products.createIndex({ "pricing.base_price": 1 });
await products.createIndex({ product_status: 1 });
await products.createIndex({ "inventory.current_stock": 1 });
// Create orders collection with embedded line items
const orders = this.targetDb.collection('orders');
await orders.createIndex({ order_uuid: 1 }, { unique: true });
await orders.createIndex({ order_number: 1 }, { unique: true });
await orders.createIndex({ customer_uuid: 1, order_date: -1 });
await orders.createIndex({ order_status: 1 });
await orders.createIndex({ order_date: -1 });
await orders.createIndex({ "financial.total_amount": -1 });
console.log('Optimized collections setup completed');
}
async migrateCustomerData() {
console.log('Migrating customer data with document optimization...');
const startTime = Date.now();
try {
const customersCollection = this.targetDb.collection('customers');
// Fetch customer data with all related information in batches
const customerQuery = `
SELECT
c.customer_id,
c.customer_uuid,
c.company_name,
c.industry_code,
c.company_size,
c.account_status,
c.subscription_tier,
c.annual_contract_value,
c.created_at,
c.updated_at,
-- Billing address information
ba.street_address_1 as billing_street_1,
ba.street_address_2 as billing_street_2,
ba.city as billing_city,
ba.state_province as billing_state,
ba.postal_code as billing_postal_code,
ba.country_code as billing_country,
ba.latitude as billing_lat,
ba.longitude as billing_lng,
ba.timezone as billing_timezone,
-- Shipping address information
sa.street_address_1 as shipping_street_1,
sa.street_address_2 as shipping_street_2,
sa.city as shipping_city,
sa.state_province as shipping_state,
sa.postal_code as shipping_postal_code,
sa.country_code as shipping_country,
sa.latitude as shipping_lat,
sa.longitude as shipping_lng,
sa.timezone as shipping_timezone,
-- Primary contact information
pc.contact_id as primary_contact_id,
pc.first_name as primary_first_name,
pc.last_name as primary_last_name,
pc.email_address as primary_email,
pc.phone_number as primary_phone,
pc.mobile_number as primary_mobile,
pc.job_title as primary_job_title,
pc.department as primary_department,
-- Billing contact information
bc.contact_id as billing_contact_id,
bc.first_name as billing_first_name,
bc.last_name as billing_last_name,
bc.email_address as billing_email,
bc.phone_number as billing_phone,
bc.job_title as billing_job_title
FROM customers c
LEFT JOIN addresses ba ON c.billing_address_id = ba.address_id
LEFT JOIN addresses sa ON c.shipping_address_id = sa.address_id
LEFT JOIN contacts pc ON c.primary_contact_id = pc.contact_id
LEFT JOIN contacts bc ON c.billing_contact_id = bc.contact_id
ORDER BY c.customer_id
`;
const customerResults = await this.sourceDb.query(customerQuery);
const batchSize = this.migrationConfig.batchSize;
for (let i = 0; i < customerResults.rows.length; i += batchSize) {
const batch = customerResults.rows.slice(i, i + batchSize);
const mongoDocuments = batch.map(customer => this.transformCustomerDocument(customer));
// Validate documents before insertion
const validatedDocuments = await this.validateDocuments(mongoDocuments);
this.migrationMetrics.recordsProcessed += batch.length; // count every source row processed
// Insert batch with error handling
try {
await customersCollection.insertMany(validatedDocuments, { ordered: false });
this.migrationMetrics.recordsMigrated += validatedDocuments.length;
} catch (error) {
console.error(`Error inserting customer batch:`, error);
this.migrationMetrics.errorsEncountered += 1;
}
// Update progress tracking
await this.updateMigrationProgress('customers', i + batch.length, customerResults.rows.length);
}
const endTime = Date.now();
console.log(`Customer migration completed in ${endTime - startTime}ms`);
} catch (error) {
console.error('Error migrating customer data:', error);
throw error;
}
}
transformCustomerDocument(customerRow) {
// Transform relational data into optimized MongoDB document
const customer = {
_id: new ObjectId(),
customer_uuid: customerRow.customer_uuid,
company_name: customerRow.company_name,
industry_code: customerRow.industry_code,
company_size: customerRow.company_size,
// Account information
account: {
status: customerRow.account_status,
subscription_tier: customerRow.subscription_tier,
annual_contract_value: customerRow.annual_contract_value,
created_at: customerRow.created_at,
updated_at: customerRow.updated_at
},
// Embedded billing address
billing_address: customerRow.billing_street_1 ? {
street_address_1: customerRow.billing_street_1,
street_address_2: customerRow.billing_street_2,
city: customerRow.billing_city,
state_province: customerRow.billing_state,
postal_code: customerRow.billing_postal_code,
country_code: customerRow.billing_country,
coordinates: customerRow.billing_lat && customerRow.billing_lng ? {
latitude: customerRow.billing_lat,
longitude: customerRow.billing_lng
} : null,
timezone: customerRow.billing_timezone
} : null,
// Embedded shipping address
shipping_address: customerRow.shipping_street_1 ? {
street_address_1: customerRow.shipping_street_1,
street_address_2: customerRow.shipping_street_2,
city: customerRow.shipping_city,
state_province: customerRow.shipping_state,
postal_code: customerRow.shipping_postal_code,
country_code: customerRow.shipping_country,
coordinates: customerRow.shipping_lat && customerRow.shipping_lng ? {
latitude: customerRow.shipping_lat,
longitude: customerRow.shipping_lng
} : null,
timezone: customerRow.shipping_timezone
} : null,
// Embedded contact information
contacts: {
primary_contact: customerRow.primary_contact_id ? {
contact_id: customerRow.primary_contact_id,
name: {
first: customerRow.primary_first_name,
last: customerRow.primary_last_name,
full: `${customerRow.primary_first_name} ${customerRow.primary_last_name}`
},
communication: {
email: customerRow.primary_email,
phone: customerRow.primary_phone,
mobile: customerRow.primary_mobile
},
professional: {
job_title: customerRow.primary_job_title,
department: customerRow.primary_department
}
} : null,
billing_contact: customerRow.billing_contact_id ? {
contact_id: customerRow.billing_contact_id,
name: {
first: customerRow.billing_first_name,
last: customerRow.billing_last_name,
full: `${customerRow.billing_first_name} ${customerRow.billing_last_name}`
},
communication: {
email: customerRow.billing_email,
phone: customerRow.billing_phone
},
professional: {
job_title: customerRow.billing_job_title
}
} : null
},
// Migration metadata
migration_info: {
migrated_at: new Date(),
source_customer_id: customerRow.customer_id,
migration_version: '1.0',
data_validated: true
}
};
return customer;
}
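// With the embedded structure above, the multi-join "active high-value customers"
// report from the relational example reduces to a single-collection query, e.g. in mongosh:
//
//   db.customers.find(
//     { "account.status": "active", "account.annual_contract_value": { $gt: 50000 } },
//     { company_name: 1, "contacts.primary_contact.communication.email": 1, billing_address: 1 }
//   ).sort({ "account.annual_contract_value": -1 })
//
// (Illustrative query; field paths match transformCustomerDocument above.)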
async migrateProductData() {
console.log('Migrating product catalog with optimized structure...');
const startTime = Date.now();
try {
const productsCollection = this.targetDb.collection('products');
// Fetch products with category hierarchy and recent sales data
const productQuery = `
WITH RECURSIVE category_hierarchy AS (
SELECT
category_id,
category_name,
category_slug,
parent_category_id,
category_level,
category_name as category_path,
ARRAY[category_id] as path_ids
FROM product_categories
WHERE parent_category_id IS NULL
UNION ALL
SELECT
pc.category_id,
pc.category_name,
pc.category_slug,
pc.parent_category_id,
pc.category_level,
ch.category_path || ' > ' || pc.category_name,
ch.path_ids || pc.category_id
FROM product_categories pc
INNER JOIN category_hierarchy ch ON pc.parent_category_id = ch.category_id
),
recent_sales AS (
SELECT
oi.product_id,
COUNT(oi.order_item_id) as times_ordered,
SUM(oi.quantity) as total_quantity_sold,
SUM(oi.line_total) as total_revenue,
AVG(oi.unit_price) as avg_selling_price,
MAX(o.order_date) as last_ordered_date
FROM order_items oi
INNER JOIN orders o ON oi.order_id = o.order_id
WHERE o.order_date >= CURRENT_DATE - INTERVAL '12 months'
GROUP BY oi.product_id
)
SELECT
p.product_id,
p.product_uuid,
p.product_name,
p.product_sku,
p.product_description,
p.base_price,
p.currency_code,
p.current_stock_level,
p.minimum_stock_level,
p.maximum_stock_level,
p.product_status,
p.launch_date,
p.discontinuation_date,
p.weight_kg,
p.length_cm,
p.width_cm,
p.height_cm,
p.requires_shipping,
p.shipping_class,
p.hazardous_material,
p.seo_title,
p.seo_description,
p.meta_keywords,
p.created_at,
p.updated_at,
-- Category information
ch.category_path,
ch.category_level,
ch.path_ids,
-- Sales analytics
COALESCE(rs.times_ordered, 0) as times_ordered,
COALESCE(rs.total_quantity_sold, 0) as total_quantity_sold,
COALESCE(rs.total_revenue, 0) as total_revenue,
COALESCE(rs.avg_selling_price, 0) as avg_selling_price,
rs.last_ordered_date
FROM products p
LEFT JOIN category_hierarchy ch ON p.primary_category_id = ch.category_id
LEFT JOIN recent_sales rs ON p.product_id = rs.product_id
ORDER BY p.product_id
`;
const productResults = await this.sourceDb.query(productQuery);
const batchSize = this.migrationConfig.batchSize;
for (let i = 0; i < productResults.rows.length; i += batchSize) {
const batch = productResults.rows.slice(i, i + batchSize);
const mongoDocuments = batch.map(product => this.transformProductDocument(product));
const validatedDocuments = await this.validateDocuments(mongoDocuments);
this.migrationMetrics.recordsProcessed += batch.length; // count every source row processed
try {
await productsCollection.insertMany(validatedDocuments, { ordered: false });
this.migrationMetrics.recordsMigrated += validatedDocuments.length;
} catch (error) {
console.error(`Error inserting product batch:`, error);
this.migrationMetrics.errorsEncountered += 1;
}
await this.updateMigrationProgress('products', i + batch.length, productResults.rows.length);
}
const endTime = Date.now();
console.log(`Product migration completed in ${endTime - startTime}ms`);
} catch (error) {
console.error('Error migrating product data:', error);
throw error;
}
}
transformProductDocument(productRow) {
// Transform relational product data into optimized MongoDB document
const product = {
_id: new ObjectId(),
product_uuid: productRow.product_uuid,
product_name: productRow.product_name,
product_sku: productRow.product_sku,
product_description: productRow.product_description,
product_status: productRow.product_status,
// Category information with full hierarchy
categories: {
primary: productRow.category_path ? productRow.category_path.split(' > ')[0] : null,
hierarchy: productRow.category_path ? productRow.category_path.split(' > ') : [],
level: productRow.category_level || 1,
path_ids: productRow.path_ids || []
},
// Pricing information
pricing: {
base_price: productRow.base_price,
currency_code: productRow.currency_code,
avg_selling_price: productRow.avg_selling_price || productRow.base_price
},
// Inventory management
inventory: {
current_stock: productRow.current_stock_level,
minimum_stock: productRow.minimum_stock_level,
maximum_stock: productRow.maximum_stock_level,
tracked: productRow.current_stock_level !== null
},
// Physical characteristics
physical: {
dimensions: {
weight_kg: productRow.weight_kg,
length_cm: productRow.length_cm,
width_cm: productRow.width_cm,
height_cm: productRow.height_cm
},
shipping: {
requires_shipping: productRow.requires_shipping,
shipping_class: productRow.shipping_class,
hazardous_material: productRow.hazardous_material
}
},
// SEO and marketing
seo: {
title: productRow.seo_title,
description: productRow.seo_description,
keywords: productRow.meta_keywords ? productRow.meta_keywords.split(',').map(k => k.trim()) : []
},
// Lifecycle information
lifecycle: {
launch_date: productRow.launch_date,
discontinuation_date: productRow.discontinuation_date,
created_at: productRow.created_at,
updated_at: productRow.updated_at
},
// Sales analytics embedded for performance
analytics: {
times_ordered: productRow.times_ordered || 0,
total_quantity_sold: productRow.total_quantity_sold || 0,
total_revenue: productRow.total_revenue || 0,
last_ordered_date: productRow.last_ordered_date,
performance_tier: this.calculatePerformanceTier(productRow)
},
// Migration tracking
migration_info: {
migrated_at: new Date(),
source_product_id: productRow.product_id,
migration_version: '1.0',
data_validated: true
}
};
return product;
}
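// Illustrative query against the migrated structure: category, inventory, and sales
// analytics are embedded, so cross-cutting filters need no joins, e.g. in mongosh:
//
//   db.products.find({
//     product_status: "active",
//     "analytics.performance_tier": "high_performer",
//     "inventory.current_stock": { $lte: 10 }
//   })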
calculatePerformanceTier(productRow) {
const revenue = productRow.total_revenue || 0;
const timesOrdered = productRow.times_ordered || 0;
if (revenue > 50000 && timesOrdered > 100) return 'high_performer';
if (revenue > 10000 && timesOrdered > 20) return 'good_performer';
if (revenue > 1000 && timesOrdered > 5) return 'average_performer';
if (timesOrdered > 0) return 'low_performer';
return 'no_sales';
}
async migrateOrderData() {
console.log('Migrating order data with embedded line items...');
const startTime = Date.now();
try {
const ordersCollection = this.targetDb.collection('orders');
// Fetch orders with embedded line items and customer information
const orderQuery = `
SELECT
o.order_id,
o.order_uuid,
o.order_number,
c.customer_uuid,
c.company_name,
o.order_date,
o.required_date,
o.shipped_date,
o.delivered_date,
o.subtotal,
o.tax_amount,
o.shipping_amount,
o.discount_amount,
o.total_amount,
o.currency_code,
o.order_status,
o.payment_status,
o.fulfillment_status,
o.shipping_method,
o.tracking_number,
o.shipping_carrier,
o.order_notes,
o.internal_notes,
o.created_at,
o.updated_at,
-- Shipping address
sa.street_address_1 as shipping_street_1,
sa.street_address_2 as shipping_street_2,
sa.city as shipping_city,
sa.state_province as shipping_state,
sa.postal_code as shipping_postal_code,
sa.country_code as shipping_country,
-- Billing address
ba.street_address_1 as billing_street_1,
ba.street_address_2 as billing_street_2,
ba.city as billing_city,
ba.state_province as billing_state,
ba.postal_code as billing_postal_code,
ba.country_code as billing_country,
-- Aggregated order items as JSON
json_agg(
json_build_object(
'order_item_id', oi.order_item_id,
'product_uuid', p.product_uuid,
'product_name', oi.product_name,
'product_sku', oi.product_sku,
'unit_price', oi.unit_price,
'quantity', oi.quantity,
'line_total', oi.line_total,
'discount_percentage', oi.discount_percentage,
'discount_amount', oi.discount_amount,
'promotion_code', oi.promotion_code,
'fulfillment_status', oi.fulfillment_status
) ORDER BY oi.order_item_id
) as order_items
FROM orders o
INNER JOIN customers c ON o.customer_id = c.customer_id
LEFT JOIN addresses sa ON o.shipping_address_id = sa.address_id
LEFT JOIN addresses ba ON o.billing_address_id = ba.address_id
INNER JOIN order_items oi ON o.order_id = oi.order_id
LEFT JOIN products p ON oi.product_id = p.product_id
GROUP BY
o.order_id, o.order_uuid, o.order_number, c.customer_uuid, c.company_name,
o.order_date, o.required_date, o.shipped_date, o.delivered_date,
o.subtotal, o.tax_amount, o.shipping_amount, o.discount_amount, o.total_amount,
o.currency_code, o.order_status, o.payment_status, o.fulfillment_status,
o.shipping_method, o.tracking_number, o.shipping_carrier,
o.order_notes, o.internal_notes, o.created_at, o.updated_at,
sa.street_address_1, sa.street_address_2, sa.city, sa.state_province, sa.postal_code, sa.country_code,
ba.street_address_1, ba.street_address_2, ba.city, ba.state_province, ba.postal_code, ba.country_code
ORDER BY o.order_id
`;
const orderResults = await this.sourceDb.query(orderQuery);
const batchSize = this.migrationConfig.batchSize;
for (let i = 0; i < orderResults.rows.length; i += batchSize) {
const batch = orderResults.rows.slice(i, i + batchSize);
const mongoDocuments = batch.map(order => this.transformOrderDocument(order));
const validatedDocuments = await this.validateDocuments(mongoDocuments);
this.migrationMetrics.recordsProcessed += batch.length; // count every source row processed
try {
await ordersCollection.insertMany(validatedDocuments, { ordered: false });
this.migrationMetrics.recordsMigrated += validatedDocuments.length;
} catch (error) {
console.error(`Error inserting order batch:`, error);
this.migrationMetrics.errorsEncountered += 1;
}
await this.updateMigrationProgress('orders', i + batch.length, orderResults.rows.length);
}
const endTime = Date.now();
console.log(`Order migration completed in ${endTime - startTime}ms`);
} catch (error) {
console.error('Error migrating order data:', error);
throw error;
}
}
transformOrderDocument(orderRow) {
// Transform relational order data into optimized MongoDB document with embedded items
const order = {
_id: new ObjectId(),
order_uuid: orderRow.order_uuid,
order_number: orderRow.order_number,
// Customer reference
customer: {
customer_uuid: orderRow.customer_uuid,
company_name: orderRow.company_name
},
// Order timing
dates: {
order_date: orderRow.order_date,
required_date: orderRow.required_date,
shipped_date: orderRow.shipped_date,
delivered_date: orderRow.delivered_date,
created_at: orderRow.created_at,
updated_at: orderRow.updated_at
},
// Financial information
financial: {
subtotal: orderRow.subtotal,
tax_amount: orderRow.tax_amount,
shipping_amount: orderRow.shipping_amount,
discount_amount: orderRow.discount_amount,
total_amount: orderRow.total_amount,
currency_code: orderRow.currency_code
},
// Status tracking
status: {
order_status: orderRow.order_status,
payment_status: orderRow.payment_status,
fulfillment_status: orderRow.fulfillment_status
},
// Shipping information
shipping: {
method: orderRow.shipping_method,
tracking_number: orderRow.tracking_number,
carrier: orderRow.shipping_carrier,
address: orderRow.shipping_street_1 ? {
street_address_1: orderRow.shipping_street_1,
street_address_2: orderRow.shipping_street_2,
city: orderRow.shipping_city,
state_province: orderRow.shipping_state,
postal_code: orderRow.shipping_postal_code,
country_code: orderRow.shipping_country
} : null
},
// Billing address
billing_address: orderRow.billing_street_1 ? {
street_address_1: orderRow.billing_street_1,
street_address_2: orderRow.billing_street_2,
city: orderRow.billing_city,
state_province: orderRow.billing_state,
postal_code: orderRow.billing_postal_code,
country_code: orderRow.billing_country
} : null,
// Embedded order items for optimal query performance
items: orderRow.order_items.map(item => ({
product_uuid: item.product_uuid,
product_name: item.product_name,
product_sku: item.product_sku,
pricing: {
unit_price: item.unit_price,
quantity: item.quantity,
line_total: item.line_total,
discount_percentage: item.discount_percentage,
discount_amount: item.discount_amount
},
promotion_code: item.promotion_code,
fulfillment_status: item.fulfillment_status
})),
// Order summary for quick access
summary: {
total_items: orderRow.order_items.length,
total_quantity: orderRow.order_items.reduce((sum, item) => sum + item.quantity, 0),
unique_products: new Set(orderRow.order_items.map(item => item.product_uuid)).size
},
// Notes and comments
notes: {
order_notes: orderRow.order_notes,
internal_notes: orderRow.internal_notes
},
// Migration tracking
migration_info: {
migrated_at: new Date(),
source_order_id: orderRow.order_id,
migration_version: '1.0',
data_validated: true
}
};
return order;
}
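// Because line items are embedded, order-level and item-level questions are answered
// from a single collection, e.g. all orders containing a given SKU (illustrative value):
//
//   db.orders.find({ "items.product_sku": "WIDGET-001" })
//
// with no join against a separate order_items table.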
async validateDocuments(documents) {
if (!this.migrationConfig.dataQuality.enableDataCleaning) {
return documents;
}
// Validate and clean documents before insertion
return documents.filter(doc => {
// Basic validation
if (!doc._id) return false;
// Remove null/undefined fields
this.removeEmptyFields(doc);
return true;
});
}
removeEmptyFields(obj) {
Object.keys(obj).forEach(key => {
if (obj[key] && typeof obj[key] === 'object') {
this.removeEmptyFields(obj[key]);
} else if (obj[key] === null || obj[key] === undefined) {
delete obj[key];
}
});
}
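// Note: removeEmptyFields strips null/undefined leaves recursively but does not prune
// sub-objects that become empty, and it also descends into Date/ObjectId instances
// (harmless here, since only null/undefined values are deleted, though a type guard
// would be safer in production code).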
async updateMigrationProgress(collection, processed, total) {
if (this.migrationConfig.performance.enableProgressTracking) {
const progress = {
collection: collection,
processed: processed,
total: total,
percentage: ((processed / total) * 100).toFixed(2),
timestamp: new Date()
};
console.log(`Migration progress - ${collection}: ${progress.percentage}% (${processed}/${total})`);
// Store progress in monitoring collection
await this.targetDb.collection('migration_progress').replaceOne(
{ collection: collection },
progress,
{ upsert: true }
);
}
}
async generateMigrationReport() {
console.log('Generating comprehensive migration report...');
this.migrationMetrics.endTime = Date.now(); // record completion time for reporting
const report = {
migration_summary: {
start_time: this.migrationMetrics.startTime,
end_time: this.migrationMetrics.endTime,
total_duration: this.migrationMetrics.endTime - this.migrationMetrics.startTime,
records_processed: this.migrationMetrics.recordsProcessed,
records_migrated: this.migrationMetrics.recordsMigrated,
errors_encountered: this.migrationMetrics.errorsEncountered,
success_rate: ((this.migrationMetrics.recordsMigrated / this.migrationMetrics.recordsProcessed) * 100).toFixed(2)
},
// Collection-specific statistics
collection_statistics: await this.generateCollectionStatistics(),
// Data quality assessment
data_quality_report: await this.generateDataQualityReport(),
// Performance metrics
performance_analysis: await this.generatePerformanceAnalysis(),
// Recommendations for optimization
optimization_recommendations: await this.generateOptimizationRecommendations()
};
// Store migration report
await this.targetDb.collection('migration_reports').insertOne({
...report,
generated_at: new Date(),
migration_id: new ObjectId()
});
return report;
}
async generateCollectionStatistics() {
const collections = ['customers', 'products', 'orders'];
const stats = {};
for (const collectionName of collections) {
const collection = this.targetDb.collection(collectionName);
stats[collectionName] = {
document_count: await collection.countDocuments(),
average_document_size: await this.calculateAverageDocumentSize(collection),
index_count: (await collection.indexes()).length,
storage_metrics: await this.getStorageMetrics(collection)
};
}
return stats;
}
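// Note: several helpers referenced above (initializeMigrationTracking, setupPerformanceMonitoring,
// calculateAverageDocumentSize, getStorageMetrics, generateDataQualityReport,
// generatePerformanceAnalysis, generateOptimizationRecommendations) are assumed to be
// implemented elsewhere in the class and are omitted here for brevity.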
}
// Benefits of MongoDB Advanced Migration:
// - Intelligent document transformation preserving relational relationships
// - Optimized schema design leveraging MongoDB's document model strengths
// - Embedded documents eliminating complex joins and improving performance
// - Flexible schema evolution supporting future application requirements
// - Comprehensive data validation and quality assurance during migration
// - Real-time migration progress tracking and performance monitoring
// - Automated optimization recommendations based on data patterns
// - Seamless integration with existing application architectures
// - Scalable migration strategies supporting large datasets
// - SQL-compatible operations through QueryLeaf for familiar database interactions
module.exports = {
MongoMigrationManager
};
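A minimal sketch of driving the migration end to end, assuming a node-postgres Pool as the source connection (any client exposing an async query() method that returns rows works with the manager above); the module path and connection strings are placeholders:
// Hypothetical driver script for MongoMigrationManager (paths and URIs are examples).
const { Pool } = require('pg');
const { MongoMigrationManager } = require('./mongo-migration-manager');
async function runMigration() {
  const sourceDb = new Pool({ connectionString: 'postgresql://localhost:5432/legacy_app' });
  const migration = new MongoMigrationManager(sourceDb, 'mongodb://localhost:27017', {
    strategy: 'staged_migration',
    batchSize: 1000,
    enableDataValidation: true
  });
  try {
    await migration.initializeMigration();
    // Migrate in dependency order: customers, then products, then orders.
    await migration.migrateCustomerData();
    await migration.migrateProductData();
    await migration.migrateOrderData();
    const report = await migration.generateMigrationReport();
    console.log('Migration summary:', report.migration_summary);
  } finally {
    await sourceDb.end();
  }
}
runMigration().catch(err => {
  console.error('Migration failed:', err);
  process.exit(1);
});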
Advanced Migration Patterns and Data Transformation
Complex Relationship Modeling and Document Design
Transform complex relational structures into optimized MongoDB documents:
// Advanced migration patterns for complex enterprise scenarios
class EnterpriseMigrationManager extends MongoMigrationManager {
constructor(sourceDb, targetMongoUri, enterpriseConfig) {
super(sourceDb, targetMongoUri, enterpriseConfig);
this.enterpriseConfig = {
...enterpriseConfig,
// Enterprise migration features
enableHierarchicalDataMigration: true,
enablePolymorphicDocumentSupport: true,
enableAdvancedRelationshipModeling: true,
enableDataGovernanceCompliance: true,
enableRealTimeMigrationMonitoring: true
};
}
async migrateHierarchicalData() {
console.log('Migrating hierarchical data with optimized tree structures...');
// Migrate organizational hierarchies, category trees, and nested structures
const organizationQuery = `
WITH RECURSIVE org_hierarchy AS (
SELECT
org_id,
org_name,
parent_org_id,
org_level,
org_type,
ARRAY[org_id] as path,
org_name as full_path
FROM organizations
WHERE parent_org_id IS NULL
UNION ALL
SELECT
o.org_id,
o.org_name,
o.parent_org_id,
o.org_level,
o.org_type,
oh.path || o.org_id,
oh.full_path || ' > ' || o.org_name
FROM organizations o
INNER JOIN org_hierarchy oh ON o.parent_org_id = oh.org_id
)
SELECT * FROM org_hierarchy ORDER BY path
`;
const orgResults = await this.sourceDb.query(organizationQuery);
const hierarchicalDocuments = this.transformHierarchicalStructure(orgResults.rows);
await this.targetDb.collection('organizations').insertMany(hierarchicalDocuments);
}
transformHierarchicalStructure(hierarchicalData) {
// Transform tree structures into MongoDB-optimized format
const rootNodes = hierarchicalData.filter(node => !node.parent_org_id);
return rootNodes.map(root => this.buildHierarchicalDocument(root, hierarchicalData));
}
buildHierarchicalDocument(node, allNodes) {
const children = allNodes.filter(n => n.parent_org_id === node.org_id);
return {
_id: new ObjectId(),
org_id: node.org_id,
org_name: node.org_name,
org_type: node.org_type,
org_level: node.org_level,
// Hierarchical metadata for efficient queries
hierarchy: {
path: node.path,
full_path: node.full_path,
level: node.org_level,
is_leaf: children.length === 0,
child_count: children.length
},
// Embedded children for complete tree access
children: children.map(child => this.buildHierarchicalDocument(child, allNodes)),
// Flattened descendant references for efficient searching
all_descendants: this.getAllDescendants(node.org_id, allNodes),
migration_info: {
migrated_at: new Date(),
source_org_id: node.org_id,
hierarchy_complete: true
}
};
}
}
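With the organizational tree stored as embedded children plus a materialized path and flattened descendant references, subtree lookups that previously required recursive CTEs become single reads or simple filters. A brief sketch (collection and field names follow buildHierarchicalDocument above; all_descendants is assumed to hold descendant org_ids, and the org name is an example):
// Illustrative queries against the migrated 'organizations' collection.
const { MongoClient } = require('mongodb');
async function queryOrgHierarchy() {
  const client = new MongoClient('mongodb://localhost:27017');
  try {
    await client.connect();
    const orgs = client.db('modernized_application').collection('organizations');
    // Fetch a root organization together with its entire embedded subtree in one read.
    const tree = await orgs.findOne({ org_name: 'Global Operations' });
    // Find the root document whose subtree contains a given org_id using the
    // flattened descendant references instead of a recursive CTE.
    const containing = await orgs.find({ all_descendants: 42 }).toArray();
    console.log(tree ? tree.hierarchy.child_count : 0, containing.length);
  } finally {
    await client.close();
  }
}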
SQL-Style Migration with QueryLeaf
QueryLeaf provides familiar SQL syntax for MongoDB migration and database modernization operations:
-- QueryLeaf advanced migration and database modernization with SQL-familiar syntax
-- Set migration context and configuration
SET migration_mode = 'staged_migration';
SET migration_batch_size = 1000;
SET enable_data_validation = true;
SET enable_performance_optimization = true;
-- Configure target MongoDB collections with optimized schemas
CREATE COLLECTION customers WITH (
-- Document structure optimization
enable_embedded_documents = true,
enable_array_fields = true,
enable_flexible_schema = true,
-- Performance optimization
default_write_concern = 'majority',
read_preference = 'primary',
-- Storage optimization
compression_algorithm = 'snappy',
enable_sharding = false
) AS (
customer_uuid UUID PRIMARY KEY,
company_name VARCHAR(255) NOT NULL,
industry_code VARCHAR(10),
company_size VARCHAR(20),
-- Embedded account information
account JSONB NOT NULL,
-- Embedded address information
billing_address JSONB,
shipping_address JSONB,
-- Embedded contact information
contacts JSONB NOT NULL,
-- Migration tracking
migration_info JSONB NOT NULL,
-- Optimized indexes
INDEX customer_uuid_idx (customer_uuid) UNIQUE,
INDEX company_name_idx (company_name),
INDEX account_status_idx ((account.status)),
INDEX billing_email_idx ((contacts.billing_contact.communication.email)),
TEXT INDEX company_search_idx (company_name, (contacts.primary_contact.name.full))
);
-- Advanced migration with data transformation and optimization
WITH source_customer_data AS (
SELECT
c.customer_uuid,
c.company_name,
c.industry_code,
c.company_size,
c.account_status,
c.subscription_tier,
c.annual_contract_value,
c.created_at,
c.updated_at,
-- Billing address aggregation
JSON_OBJECT(
'street_address_1', ba.street_address_1,
'street_address_2', ba.street_address_2,
'city', ba.city,
'state_province', ba.state_province,
'postal_code', ba.postal_code,
'country_code', ba.country_code,
'coordinates',
CASE
WHEN ba.latitude IS NOT NULL AND ba.longitude IS NOT NULL
THEN JSON_OBJECT('latitude', ba.latitude, 'longitude', ba.longitude)
ELSE NULL
END,
'timezone', ba.timezone
) as billing_address_json,
-- Shipping address aggregation
JSON_OBJECT(
'street_address_1', sa.street_address_1,
'street_address_2', sa.street_address_2,
'city', sa.city,
'state_province', sa.state_province,
'postal_code', sa.postal_code,
'country_code', sa.country_code,
'coordinates',
CASE
WHEN sa.latitude IS NOT NULL AND sa.longitude IS NOT NULL
THEN JSON_OBJECT('latitude', sa.latitude, 'longitude', sa.longitude)
ELSE NULL
END,
'timezone', sa.timezone
) as shipping_address_json,
-- Primary contact aggregation
JSON_OBJECT(
'contact_id', pc.contact_id,
'name', JSON_OBJECT(
'first', pc.first_name,
'last', pc.last_name,
'full', CONCAT(pc.first_name, ' ', pc.last_name)
),
'communication', JSON_OBJECT(
'email', pc.email_address,
'phone', pc.phone_number,
'mobile', pc.mobile_number
),
'professional', JSON_OBJECT(
'job_title', pc.job_title,
'department', pc.department
),
'preferences', JSON_OBJECT(
'preferred_communication', pc.preferred_communication,
'email_opt_in', pc.email_opt_in,
'sms_opt_in', pc.sms_opt_in
)
) as primary_contact_json,
-- Billing contact aggregation
JSON_OBJECT(
'contact_id', bc.contact_id,
'name', JSON_OBJECT(
'first', bc.first_name,
'last', bc.last_name,
'full', CONCAT(bc.first_name, ' ', bc.last_name)
),
'communication', JSON_OBJECT(
'email', bc.email_address,
'phone', bc.phone_number
),
'professional', JSON_OBJECT(
'job_title', bc.job_title
)
) as billing_contact_json
FROM postgresql_customers c
LEFT JOIN postgresql_addresses ba ON c.billing_address_id = ba.address_id
LEFT JOIN postgresql_addresses sa ON c.shipping_address_id = sa.address_id
LEFT JOIN postgresql_contacts pc ON c.primary_contact_id = pc.contact_id
LEFT JOIN postgresql_contacts bc ON c.billing_contact_id = bc.contact_id
WHERE c.account_status IN ('active', 'inactive', 'trial')
),
optimized_customer_documents AS (
SELECT
scd.customer_uuid,
scd.company_name,
scd.industry_code,
scd.company_size,
-- Optimized account information structure
JSON_OBJECT(
'status', scd.account_status,
'subscription_tier', scd.subscription_tier,
'annual_contract_value', scd.annual_contract_value,
'created_at', scd.created_at,
'updated_at', scd.updated_at,
-- Calculated account metrics
'customer_tier',
CASE
WHEN scd.annual_contract_value >= 100000 THEN 'enterprise'
WHEN scd.annual_contract_value >= 50000 THEN 'professional'
WHEN scd.annual_contract_value >= 10000 THEN 'standard'
ELSE 'basic'
END,
'account_age_days', DATE_PART('days', CURRENT_TIMESTAMP - scd.created_at),
'lifecycle_stage',
CASE
WHEN scd.account_status = 'trial' THEN 'prospect'
WHEN scd.account_status = 'active' AND DATE_PART('days', CURRENT_TIMESTAMP - scd.created_at) <= 90 THEN 'new_customer'
WHEN scd.account_status = 'active' THEN 'established_customer'
WHEN scd.account_status = 'inactive' THEN 'at_risk'
ELSE 'unknown'
END
) as account,
-- Optimized address structures
scd.billing_address_json as billing_address,
scd.shipping_address_json as shipping_address,
-- Optimized contact structure
JSON_OBJECT(
'primary_contact', scd.primary_contact_json,
'billing_contact', scd.billing_contact_json,
'contact_count',
CASE
WHEN scd.primary_contact_json IS NOT NULL AND scd.billing_contact_json IS NOT NULL THEN 2
WHEN scd.primary_contact_json IS NOT NULL OR scd.billing_contact_json IS NOT NULL THEN 1
ELSE 0
END
) as contacts,
-- Migration metadata
JSON_OBJECT(
'migrated_at', CURRENT_TIMESTAMP,
'migration_version', '1.0',
'data_validated', true,
'transformation_applied', true,
'optimization_level', 'advanced'
) as migration_info
FROM source_customer_data scd
WHERE scd.company_name IS NOT NULL
AND scd.customer_uuid IS NOT NULL
)
-- Execute optimized bulk migration with data validation
INSERT INTO customers (
customer_uuid,
company_name,
industry_code,
company_size,
account,
billing_address,
shipping_address,
contacts,
migration_info
)
SELECT
ocd.customer_uuid,
ocd.company_name,
ocd.industry_code,
ocd.company_size,
ocd.account,
-- Conditional address insertion (only if data exists)
CASE
WHEN ocd.billing_address->>'street_address_1' IS NOT NULL
THEN ocd.billing_address
ELSE NULL
END,
CASE
WHEN ocd.shipping_address->>'street_address_1' IS NOT NULL
THEN ocd.shipping_address
ELSE NULL
END,
ocd.contacts,
ocd.migration_info
FROM optimized_customer_documents ocd
-- Migration execution options
WITH (
migration_strategy = 'staged_migration',
batch_size = 1000,
parallel_batches = 4,
enable_progress_tracking = true,
enable_data_validation = true,
enable_rollback_capability = true,
-- Performance optimization
write_concern = 'majority',
write_timeout = 30000,
-- Error handling
continue_on_error = true,
max_errors_allowed = 100
);
-- Advanced product catalog migration with category optimization
WITH recursive_category_hierarchy AS (
WITH RECURSIVE category_tree AS (
SELECT
pc.category_id,
pc.category_name,
pc.category_slug,
pc.parent_category_id,
pc.category_level,
pc.category_description,
pc.display_order,
pc.is_featured,
ARRAY[pc.category_id] as path_ids,
pc.category_name as path_names,
1 as depth
FROM postgresql_product_categories pc
WHERE pc.parent_category_id IS NULL
UNION ALL
SELECT
pc.category_id,
pc.category_name,
pc.category_slug,
pc.parent_category_id,
pc.category_level,
pc.category_description,
pc.display_order,
pc.is_featured,
ct.path_ids || pc.category_id,
ct.path_names || ' > ' || pc.category_name,
ct.depth + 1
FROM postgresql_product_categories pc
INNER JOIN category_tree ct ON pc.parent_category_id = ct.category_id
WHERE ct.depth < 10 -- Prevent infinite recursion
)
SELECT * FROM category_tree
),
enhanced_product_data AS (
SELECT
p.product_uuid,
p.product_name,
p.product_sku,
p.product_description,
p.base_price,
p.currency_code,
p.current_stock_level,
p.minimum_stock_level,
p.maximum_stock_level,
p.product_status,
p.launch_date,
p.discontinuation_date,
p.weight_kg,
p.length_cm,
p.width_cm,
p.height_cm,
p.requires_shipping,
p.shipping_class,
p.hazardous_material,
p.seo_title,
p.seo_description,
p.meta_keywords,
p.created_at,
p.updated_at,
-- Enhanced category information
rch.category_name as primary_category,
rch.path_names as category_hierarchy,
rch.path_ids as category_path_ids,
rch.depth as category_depth,
rch.is_featured as category_featured,
-- Recent sales analytics
COALESCE(sales.times_ordered, 0) as times_ordered,
COALESCE(sales.total_quantity_sold, 0) as total_quantity_sold,
COALESCE(sales.total_revenue, 0) as total_revenue,
COALESCE(sales.avg_selling_price, p.base_price) as avg_selling_price,
sales.last_ordered_date,
-- Inventory analytics
CASE
WHEN p.current_stock_level <= p.minimum_stock_level THEN 'low_stock'
WHEN p.current_stock_level >= p.maximum_stock_level THEN 'overstock'
ELSE 'normal'
END as stock_status,
-- Product performance classification
CASE
WHEN COALESCE(sales.total_revenue, 0) >= 50000 AND COALESCE(sales.times_ordered, 0) >= 100 THEN 'high_performer'
WHEN COALESCE(sales.total_revenue, 0) >= 10000 AND COALESCE(sales.times_ordered, 0) >= 20 THEN 'good_performer'
WHEN COALESCE(sales.total_revenue, 0) >= 1000 AND COALESCE(sales.times_ordered, 0) >= 5 THEN 'average_performer'
WHEN COALESCE(sales.times_ordered, 0) > 0 THEN 'low_performer'
ELSE 'no_sales'
END as performance_tier
FROM postgresql_products p
LEFT JOIN recursive_category_hierarchy rch ON p.primary_category_id = rch.category_id
LEFT JOIN (
SELECT
oi.product_id,
COUNT(oi.order_item_id) as times_ordered,
SUM(oi.quantity) as total_quantity_sold,
SUM(oi.line_total) as total_revenue,
AVG(oi.unit_price) as avg_selling_price,
MAX(o.order_date) as last_ordered_date
FROM postgresql_order_items oi
INNER JOIN postgresql_orders o ON oi.order_id = o.order_id
WHERE o.order_date >= CURRENT_DATE - INTERVAL '12 months'
AND o.order_status NOT IN ('cancelled', 'returned')
GROUP BY oi.product_id
) sales ON p.product_id = sales.product_id
WHERE p.product_status IN ('active', 'discontinued')
)
-- Create optimized products collection
CREATE COLLECTION products WITH (
enable_embedded_documents = true,
enable_array_fields = true,
compression_algorithm = 'snappy'
) AS (
product_uuid UUID PRIMARY KEY,
product_name VARCHAR(255) NOT NULL,
product_sku VARCHAR(100) UNIQUE NOT NULL,
product_description TEXT,
product_status VARCHAR(20) NOT NULL,
-- Embedded category information
categories JSONB NOT NULL,
-- Embedded pricing information
pricing JSONB NOT NULL,
-- Embedded inventory information
inventory JSONB NOT NULL,
-- Embedded physical characteristics
physical JSONB,
-- Embedded SEO information
seo JSONB,
-- Embedded lifecycle information
lifecycle JSONB NOT NULL,
-- Embedded analytics information
analytics JSONB NOT NULL,
-- Migration tracking
migration_info JSONB NOT NULL,
-- Optimized indexes
INDEX product_uuid_idx (product_uuid) UNIQUE,
INDEX product_sku_idx (product_sku) UNIQUE,
INDEX primary_category_idx ((categories.primary)),
INDEX performance_tier_idx ((analytics.performance_tier)),
INDEX product_status_idx (product_status),
INDEX stock_status_idx ((inventory.stock_status)),
TEXT INDEX product_search_idx (product_name, product_description, product_sku)
);
-- Execute advanced product migration with comprehensive optimization
INSERT INTO products (
product_uuid,
product_name,
product_sku,
product_description,
product_status,
categories,
pricing,
inventory,
physical,
seo,
lifecycle,
analytics,
migration_info
)
SELECT
epd.product_uuid,
epd.product_name,
epd.product_sku,
epd.product_description,
epd.product_status,
-- Optimized category structure
JSON_OBJECT(
'primary', epd.primary_category,
'hierarchy', STRING_TO_ARRAY(epd.category_hierarchy, ' > '),
'path_ids', epd.category_path_ids,
'depth', epd.category_depth,
'is_featured', epd.category_featured
),
-- Comprehensive pricing information
JSON_OBJECT(
'base_price', epd.base_price,
'currency_code', epd.currency_code,
'avg_selling_price', epd.avg_selling_price,
'pricing_tier',
CASE
WHEN epd.base_price >= 1000 THEN 'premium'
WHEN epd.base_price >= 100 THEN 'standard'
ELSE 'economy'
END,
'last_price_update', epd.updated_at
),
-- Advanced inventory management
JSON_OBJECT(
'current_stock', epd.current_stock_level,
'minimum_stock', epd.minimum_stock_level,
'maximum_stock', epd.maximum_stock_level,
'stock_status', epd.stock_status,
'tracked', epd.current_stock_level IS NOT NULL,
'reorder_point', epd.minimum_stock_level * 1.2,
'stock_turnover_estimate',
CASE
WHEN epd.total_quantity_sold > 0 AND epd.current_stock_level > 0
THEN ROUND(epd.total_quantity_sold::float / epd.current_stock_level, 2)
ELSE 0
END
),
-- Physical characteristics and shipping
JSON_OBJECT(
'dimensions', JSON_OBJECT(
'weight_kg', epd.weight_kg,
'length_cm', epd.length_cm,
'width_cm', epd.width_cm,
'height_cm', epd.height_cm,
'volume_cubic_cm',
CASE
WHEN epd.length_cm IS NOT NULL AND epd.width_cm IS NOT NULL AND epd.height_cm IS NOT NULL
THEN epd.length_cm * epd.width_cm * epd.height_cm
ELSE NULL
END
),
'shipping', JSON_OBJECT(
'requires_shipping', epd.requires_shipping,
'shipping_class', epd.shipping_class,
'hazardous_material', epd.hazardous_material,
'dimensional_weight',
CASE
WHEN epd.length_cm IS NOT NULL AND epd.width_cm IS NOT NULL AND epd.height_cm IS NOT NULL
THEN (epd.length_cm * epd.width_cm * epd.height_cm) / 5000 -- Standard dimensional weight formula
ELSE NULL
END
)
),
-- SEO and marketing optimization
JSON_OBJECT(
'title', epd.seo_title,
'description', epd.seo_description,
'keywords',
CASE
WHEN epd.meta_keywords IS NOT NULL
THEN STRING_TO_ARRAY(epd.meta_keywords, ',')
ELSE ARRAY[]::text[]
END,
'url_slug', LOWER(REGEXP_REPLACE(epd.product_name, '[^a-zA-Z0-9]+', '-', 'g')),
'search_terms', ARRAY[epd.product_name, epd.product_sku, epd.primary_category]
),
-- Product lifecycle management
JSON_OBJECT(
'launch_date', epd.launch_date,
'discontinuation_date', epd.discontinuation_date,
'created_at', epd.created_at,
'updated_at', epd.updated_at,
'days_since_launch',
CASE
WHEN epd.launch_date IS NOT NULL
THEN DATE_PART('days', CURRENT_DATE - epd.launch_date)
ELSE DATE_PART('days', CURRENT_DATE - epd.created_at)
END,
'lifecycle_stage',
CASE
WHEN epd.discontinuation_date IS NOT NULL THEN 'discontinued'
WHEN epd.launch_date > CURRENT_DATE - INTERVAL '90 days' THEN 'new'
WHEN epd.launch_date > CURRENT_DATE - INTERVAL '1 year' THEN 'growing'
ELSE 'mature'
END
),
-- Comprehensive analytics and performance metrics
JSON_OBJECT(
'times_ordered', epd.times_ordered,
'total_quantity_sold', epd.total_quantity_sold,
'total_revenue', epd.total_revenue,
'last_ordered_date', epd.last_ordered_date,
'performance_tier', epd.performance_tier,
'revenue_per_order',
CASE
WHEN epd.times_ordered > 0
THEN ROUND(epd.total_revenue / epd.times_ordered, 2)
ELSE 0
END,
'average_order_quantity',
CASE
WHEN epd.times_ordered > 0
THEN ROUND(epd.total_quantity_sold::float / epd.times_ordered, 2)
ELSE 0
END,
'days_since_last_order',
CASE
WHEN epd.last_ordered_date IS NOT NULL
THEN DATE_PART('days', CURRENT_DATE - epd.last_ordered_date)
ELSE NULL
END,
'popularity_score',
CASE
WHEN epd.times_ordered >= 100 THEN 'very_high'
WHEN epd.times_ordered >= 50 THEN 'high'
WHEN epd.times_ordered >= 10 THEN 'medium'
WHEN epd.times_ordered > 0 THEN 'low'
ELSE 'none'
END
),
-- Migration tracking and validation
JSON_OBJECT(
'migrated_at', CURRENT_TIMESTAMP,
'migration_version', '1.0',
'source_product_id', p.product_id,
'data_validated', true,
'transformation_applied', true,
'analytics_computed', true,
'optimization_level', 'comprehensive'
)
FROM enhanced_product_data epd
JOIN postgresql_products p ON epd.product_uuid = p.product_uuid
WHERE epd.product_name IS NOT NULL
AND epd.product_sku IS NOT NULL
AND epd.product_status IN ('active', 'discontinued')
-- Migration execution with comprehensive options
WITH (
migration_strategy = 'staged_migration',
batch_size = 500, -- Smaller batches for complex documents
parallel_batches = 2,
enable_progress_tracking = true,
enable_data_validation = true,
enable_performance_optimization = true,
enable_rollback_capability = true,
-- Advanced optimization
enable_document_optimization = true,
enable_index_optimization = true,
enable_compression = true,
-- Quality assurance
validate_document_structure = true,
validate_data_relationships = true,
validate_business_rules = true,
-- Monitoring
enable_migration_metrics = true,
enable_performance_tracking = true,
track_optimization_effectiveness = true
);
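At the driver level, the staged batch options above correspond to a simple batched-write loop. The sketch below illustrates that pattern; fetch_enhanced_product_rows and build_product_document are hypothetical placeholders for the source extraction and the row-to-document transformation that the SELECT above performs.
from pymongo import MongoClient
from pymongo.errors import BulkWriteError

BATCH_SIZE = 500  # smaller batches suit large, deeply nested documents

def migrate_products(fetch_enhanced_product_rows, build_product_document):
    # Placeholder connection details
    products = MongoClient("mongodb://localhost:27017")["ecommerce"]["products"]
    batch, migrated = [], 0
    for row in fetch_enhanced_product_rows():       # source rows (hypothetical helper)
        batch.append(build_product_document(row))   # relational row -> document (hypothetical helper)
        if len(batch) >= BATCH_SIZE:
            migrated += flush(products, batch)
            batch = []
    if batch:
        migrated += flush(products, batch)
    return migrated

def flush(collection, batch):
    try:
        # ordered=False lets the remaining documents in a batch succeed even
        # when individual inserts fail; failures surface in the exception details
        result = collection.insert_many(batch, ordered=False)
        return len(result.inserted_ids)
    except BulkWriteError as exc:
        return exc.details.get("nInserted", 0)
Keeping the flush step isolated makes it straightforward to add retry logic, progress checkpoints, or a second worker for parallel batches.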
-- Migration monitoring and analytics
CREATE VIEW migration_progress_dashboard AS
WITH migration_statistics AS (
SELECT
'customers' as collection_name,
COUNT(*) as migrated_documents,
AVG(LENGTH(BSON(document))) as avg_document_size,
MIN((migration_info->>'migrated_at')::timestamp) as migration_start,
MAX((migration_info->>'migrated_at')::timestamp) as migration_end
FROM customers
WHERE migration_info->>'migrated_at' IS NOT NULL
UNION ALL
SELECT
'products' as collection_name,
COUNT(*) as migrated_documents,
AVG(LENGTH(BSON(document))) as avg_document_size,
MIN((migration_info->>'migrated_at')::timestamp) as migration_start,
MAX((migration_info->>'migrated_at')::timestamp) as migration_end
FROM products
WHERE migration_info->>'migrated_at' IS NOT NULL
),
performance_metrics AS (
SELECT
collection_name,
migrated_documents,
avg_document_size,
migration_start,
migration_end,
-- Calculate migration performance
EXTRACT(EPOCH FROM (migration_end - migration_start)) as migration_duration_seconds,
ROUND(migrated_documents::float / NULLIF(EXTRACT(EPOCH FROM (migration_end - migration_start)), 0) * 60, 2) as documents_per_minute,
-- Document optimization metrics
CASE
WHEN avg_document_size < 16000 THEN 'optimal' -- < 16KB
WHEN avg_document_size < 100000 THEN 'good' -- < 100KB
WHEN avg_document_size < 1000000 THEN 'large' -- < 1MB
ELSE 'very_large'
END as document_size_classification,
-- Migration efficiency assessment
CASE
WHEN migrated_documents / NULLIF(EXTRACT(EPOCH FROM (migration_end - migration_start)), 0) > 100 THEN 'excellent'
WHEN migrated_documents / NULLIF(EXTRACT(EPOCH FROM (migration_end - migration_start)), 0) > 50 THEN 'good'
WHEN migrated_documents / NULLIF(EXTRACT(EPOCH FROM (migration_end - migration_start)), 0) > 10 THEN 'acceptable'
ELSE 'needs_optimization'
END as migration_efficiency
FROM migration_statistics
)
SELECT
pm.collection_name,
pm.migrated_documents,
ROUND(pm.avg_document_size / 1024.0, 2) || ' KB' as avg_document_size,
pm.migration_start,
pm.migration_end,
ROUND(pm.migration_duration_seconds / 60.0, 2) || ' minutes' as migration_duration,
pm.documents_per_minute || ' docs/min' as migration_throughput,
pm.document_size_classification,
pm.migration_efficiency,
-- Success metrics
'Migration completed successfully with optimized document structure' as status,
-- Recommendations
CASE pm.migration_efficiency
WHEN 'needs_optimization' THEN 'Consider increasing batch size and parallel processing'
WHEN 'acceptable' THEN 'Performance adequate, monitor for optimization opportunities'
WHEN 'good' THEN 'Good performance, continue with current configuration'
WHEN 'excellent' THEN 'Excellent performance, configuration optimized'
END as recommendation
FROM performance_metrics pm
ORDER BY pm.collection_name;
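From the MongoDB side, roughly the same progress statistics can be computed with a native aggregation pipeline. A sketch, assuming the migration_info subdocument written during migration and MongoDB 4.4 or newer for the $bsonSize operator; the connection string is a placeholder.
from pymongo import MongoClient

db = MongoClient("mongodb://localhost:27017")["ecommerce"]

def migration_stats(collection_name):
    pipeline = [
        {"$match": {"migration_info.migrated_at": {"$exists": True}}},
        {"$group": {
            "_id": None,
            "migrated_documents": {"$sum": 1},
            "avg_document_size": {"$avg": {"$bsonSize": "$$ROOT"}},
            "migration_start": {"$min": "$migration_info.migrated_at"},
            "migration_end": {"$max": "$migration_info.migrated_at"},
        }},
    ]
    results = list(db[collection_name].aggregate(pipeline))
    return results[0] if results else None

for name in ("customers", "products"):
    stats = migration_stats(name)
    if stats:
        print(name, stats["migrated_documents"], "docs,",
              round(stats["avg_document_size"] / 1024, 2), "KB avg")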
-- QueryLeaf provides comprehensive migration capabilities:
-- 1. Advanced data transformation with embedded documents and optimized structures
-- 2. Intelligent relationship modeling preserving data integrity
-- 3. Performance-optimized document design for MongoDB query patterns
-- 4. Comprehensive data validation and quality assurance
-- 5. Real-time migration monitoring and progress tracking
-- 6. Automated optimization recommendations based on data patterns
-- 7. Flexible schema evolution supporting future application requirements
-- 8. SQL-familiar syntax for MongoDB migration operations
-- 9. Enterprise-grade migration strategies with rollback capabilities
-- 10. Seamless integration with existing database infrastructure
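Before the source system is decommissioned, an independent cross-database validation pass is worth automating: compare filtered source row counts against migrated document counts and flag any mismatch. A minimal sketch, assuming psycopg2 for PostgreSQL access; the connection strings are placeholders, and the count filters should mirror the WHERE clauses used during migration.
import psycopg2
from pymongo import MongoClient

# Placeholder connection details
pg = psycopg2.connect("dbname=ecommerce user=postgres")
mongo = MongoClient("mongodb://localhost:27017")["ecommerce"]

# Source-side count queries; filters should match the migration's WHERE clauses
SOURCE_COUNTS = {
    "customers": "SELECT COUNT(*) FROM customers",
    "products": "SELECT COUNT(*) FROM products WHERE product_status IN ('active', 'discontinued')",
}

with pg.cursor() as cur:
    for collection, query in SOURCE_COUNTS.items():
        cur.execute(query)
        source_count = cur.fetchone()[0]
        target_count = mongo[collection].count_documents(
            {"migration_info.migrated_at": {"$exists": True}}
        )
        status = "OK" if source_count == target_count else "MISMATCH"
        print(f"{collection}: source={source_count} target={target_count} [{status}]")
pg.close()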
Best Practices for MongoDB Migration
Migration Strategy and Planning
Essential principles for successful database modernization:
- Assessment and Planning: Thoroughly analyze source schemas, data relationships, and application access patterns before migration
- Document Design: Design MongoDB documents to optimize for application query patterns rather than maintaining relational structures
- Staged Migration: Implement gradual migration strategies to minimize downtime and enable thorough testing; a resumable, checkpoint-based sketch follows this list
- Data Validation: Implement comprehensive validation processes to ensure data integrity throughout the migration
- Performance Testing: Test migration performance and application performance against migrated data structures
- Rollback Capabilities: Maintain rollback strategies and data synchronization during transition periods
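A staged, resumable migration with a rollback path can be built from two small pieces: a checkpoint document that records the last migrated source key, and a version tag on every migrated document. The sketch below illustrates the idea; load_batch and transform are hypothetical placeholders for the source read and the row-to-document transformation, and collection names are illustrative.
from pymongo import MongoClient

db = MongoClient("mongodb://localhost:27017")["ecommerce"]   # placeholder connection
checkpoints = db["migration_checkpoints"]
MIGRATION_VERSION = "1.0"

def migrate_stage(stage_name, load_batch, transform, target_collection):
    """Migrate one stage in batches, resuming from the last recorded checkpoint."""
    state = checkpoints.find_one({"_id": stage_name}) or {"last_key": None}
    while True:
        rows = load_batch(after_key=state["last_key"])   # next source batch (hypothetical helper)
        if not rows:
            break
        docs = [dict(transform(row), migration_version=MIGRATION_VERSION) for row in rows]
        db[target_collection].insert_many(docs, ordered=False)
        state["last_key"] = rows[-1]["id"]               # advance the checkpoint
        checkpoints.update_one(
            {"_id": stage_name},
            {"$set": {"last_key": state["last_key"]}},
            upsert=True,
        )

def rollback_stage(target_collection):
    """Remove every document written under this migration version."""
    result = db[target_collection].delete_many({"migration_version": MIGRATION_VERSION})
    return result.deleted_count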
Data Transformation and Optimization
Optimize migration strategies for production-scale requirements:
- Relationship Analysis: Analyze relational access patterns to decide between embedding and referencing for each relationship (illustrated after this list)
- Index Planning: Design MongoDB indexes based on application query patterns and performance requirements
- Schema Flexibility: Leverage MongoDB's flexible schema capabilities while maintaining data consistency
- Batch Processing: Optimize batch sizes and parallel processing for large dataset migrations
- Monitoring Integration: Implement comprehensive monitoring for migration progress and post-migration performance
- Application Evolution: Plan application refactoring to leverage MongoDB's document model benefits
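The embedding-versus-referencing decision deserves a concrete illustration. The hypothetical document shapes below contrast the two approaches for customer data: bounded information that is always read with its parent (addresses) is embedded, while unbounded, independently updated data (orders) lives in its own collection and references the customer.
# Embedded: addresses travel inside the customer document, so a single read
# returns everything and updates to the customer stay atomic
customer_embedded = {
    "company_name": "Acme Corp",
    "addresses": {
        "billing":  {"city": "Austin", "state_province": "TX", "country_code": "USA"},
        "shipping": {"city": "Dallas", "state_province": "TX", "country_code": "USA"},
    },
}

# Referenced: orders grow without bound and change independently, so they get
# their own collection and point back to the customer by _id
customer_referenced = {"_id": "cust-123", "company_name": "Acme Corp"}
order_document = {
    "customer_id": "cust-123",   # resolved with a $lookup or a follow-up query
    "order_total": 249.00,
    "line_items": [{"product_sku": "SKU-1", "quantity": 2}],
}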
Conclusion
MongoDB NoSQL migration provides a practical path for modernizing relational database architectures while preserving data integrity and improving application performance through optimized document design, flexible schemas, and deliberate data transformation strategies. The document-oriented approach models complex relationships directly within documents, reducing the performance bottlenecks and query complexity associated with heavily normalized relational joins.
Key MongoDB migration benefits include:
- Intelligent Data Transformation: Sophisticated migration tools that preserve relationships while optimizing for document-oriented access patterns
- Flexible Schema Evolution: Native support for schema changes without complex migrations or downtime requirements
- Performance Optimization: Improved query performance through embedded documents and data structures shaped around application access patterns
- Scalability Enhancement: Horizontal scaling capabilities that support modern application growth requirements
- Developer Productivity: Simplified data modeling that aligns with object-oriented application architectures
- SQL Accessibility: Familiar SQL-style migration operations through QueryLeaf for accessible database modernization
Whether you're modernizing legacy applications, building new cloud-native systems, or optimizing existing database performance, MongoDB migration with QueryLeaf's familiar SQL interface provides the foundation for successful database evolution.
QueryLeaf Integration: QueryLeaf automatically optimizes MongoDB migration operations while providing familiar SQL syntax for data transformation, validation, and optimization. Advanced migration strategies, relationship modeling, and performance optimization techniques are seamlessly accessible through familiar SQL constructs, making sophisticated database modernization approachable for SQL-oriented development teams.
Combining MongoDB's migration capabilities with SQL-style database operations gives organizations modern database performance alongside familiar management patterns, so a migration project can succeed without sacrificing operational continuity or developer productivity.