Skip to content

MongoDB NoSQL Migration and Relational Database Modernization: Strategic Database Evolution for Modern Applications

Modern applications increasingly demand the flexibility, scalability, and performance characteristics that NoSQL databases provide, driving organizations to migrate from traditional relational database architectures to MongoDB's document-oriented model. However, this migration presents complex challenges including data transformation, schema evolution, application refactoring, and maintaining data consistency during the transition period.

MongoDB migration from relational databases requires sophisticated strategies that preserve data integrity while unlocking the benefits of document-oriented storage, flexible schemas, horizontal scaling capabilities, and improved developer productivity. Unlike simple data exports, successful MongoDB migrations involve careful planning of document structure design, relationship modeling, performance optimization, and gradual transition strategies that minimize application downtime and business disruption.

Traditional Relational Database Migration Challenges

Conventional database migration approaches often fail to leverage MongoDB's unique capabilities and struggle with complex data transformation requirements:

-- Traditional PostgreSQL schema - complex normalization causing migration challenges

-- Customer management with excessive normalization
CREATE TABLE customers (
    customer_id SERIAL PRIMARY KEY,
    customer_uuid UUID UNIQUE NOT NULL DEFAULT uuid_generate_v4(),
    company_name VARCHAR(255) NOT NULL,
    industry_code VARCHAR(10) REFERENCES industry_lookup(code),
    company_size VARCHAR(20) CHECK (company_size IN ('startup', 'small', 'medium', 'large', 'enterprise')),

    -- Address normalization requiring complex joins
    billing_address_id INTEGER REFERENCES addresses(address_id),
    shipping_address_id INTEGER REFERENCES addresses(address_id),

    -- Contact information spread across multiple tables
    primary_contact_id INTEGER REFERENCES contacts(contact_id),
    billing_contact_id INTEGER REFERENCES contacts(contact_id),
    technical_contact_id INTEGER REFERENCES contacts(contact_id),

    -- Account management
    account_status VARCHAR(20) DEFAULT 'active',
    subscription_tier VARCHAR(50) DEFAULT 'basic',
    annual_contract_value DECIMAL(12,2) DEFAULT 0,

    -- Audit fields
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    created_by INTEGER REFERENCES users(user_id),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_by INTEGER REFERENCES users(user_id)
);

-- Separate address table creating complex relationships
CREATE TABLE addresses (
    address_id SERIAL PRIMARY KEY,
    address_type VARCHAR(20) NOT NULL CHECK (address_type IN ('billing', 'shipping', 'corporate')),
    street_address_1 VARCHAR(200) NOT NULL,
    street_address_2 VARCHAR(200),
    city VARCHAR(100) NOT NULL,
    state_province VARCHAR(100),
    postal_code VARCHAR(20),
    country_code VARCHAR(3) NOT NULL REFERENCES countries(code),

    -- Geographic data
    latitude DECIMAL(10,8),
    longitude DECIMAL(11,8),
    timezone VARCHAR(50),

    -- Validation and verification
    address_verified BOOLEAN DEFAULT FALSE,
    verification_date TIMESTAMP WITH TIME ZONE,
    verification_service VARCHAR(50),

    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

-- Contact information table with complex relationships
CREATE TABLE contacts (
    contact_id SERIAL PRIMARY KEY,
    contact_type VARCHAR(30) NOT NULL,
    title VARCHAR(100),
    first_name VARCHAR(100) NOT NULL,
    last_name VARCHAR(100) NOT NULL,
    middle_name VARCHAR(100),
    suffix VARCHAR(20),

    -- Communication preferences
    preferred_communication VARCHAR(20) DEFAULT 'email',
    email_address VARCHAR(255) UNIQUE,
    phone_number VARCHAR(20),
    mobile_number VARCHAR(20),
    fax_number VARCHAR(20),

    -- Professional information
    job_title VARCHAR(200),
    department VARCHAR(100),
    manager_contact_id INTEGER REFERENCES contacts(contact_id),

    -- Social and professional networks
    linkedin_url VARCHAR(500),
    twitter_handle VARCHAR(100),

    -- Communication preferences and consent
    email_opt_in BOOLEAN DEFAULT TRUE,
    sms_opt_in BOOLEAN DEFAULT FALSE,
    marketing_consent BOOLEAN DEFAULT FALSE,
    marketing_consent_date TIMESTAMP WITH TIME ZONE,

    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

-- Product catalog with complex category hierarchy
CREATE TABLE products (
    product_id SERIAL PRIMARY KEY,
    product_uuid UUID UNIQUE NOT NULL DEFAULT uuid_generate_v4(),
    product_name VARCHAR(255) NOT NULL,
    product_sku VARCHAR(100) UNIQUE NOT NULL,
    product_description TEXT,

    -- Category management through separate table
    primary_category_id INTEGER REFERENCES product_categories(category_id),

    -- Pricing structure
    base_price DECIMAL(10,2) NOT NULL,
    currency_code VARCHAR(3) DEFAULT 'USD',

    -- Inventory management
    inventory_tracked BOOLEAN DEFAULT TRUE,
    current_stock_level INTEGER DEFAULT 0,
    minimum_stock_level INTEGER DEFAULT 10,
    maximum_stock_level INTEGER DEFAULT 1000,

    -- Product status and lifecycle
    product_status VARCHAR(20) DEFAULT 'active' CHECK (product_status IN ('active', 'discontinued', 'draft', 'archived')),
    launch_date DATE,
    discontinuation_date DATE,

    -- SEO and marketing
    seo_title VARCHAR(200),
    seo_description TEXT,
    meta_keywords TEXT,

    -- Physical characteristics
    weight_kg DECIMAL(8,3),
    length_cm DECIMAL(8,2),
    width_cm DECIMAL(8,2),
    height_cm DECIMAL(8,2),

    -- Shipping and fulfillment
    requires_shipping BOOLEAN DEFAULT TRUE,
    shipping_class VARCHAR(50),
    hazardous_material BOOLEAN DEFAULT FALSE,

    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

-- Product categories with complex hierarchy
CREATE TABLE product_categories (
    category_id SERIAL PRIMARY KEY,
    category_name VARCHAR(200) NOT NULL,
    category_slug VARCHAR(200) UNIQUE NOT NULL,
    category_description TEXT,

    -- Hierarchical structure
    parent_category_id INTEGER REFERENCES product_categories(category_id),
    category_level INTEGER NOT NULL DEFAULT 1,
    category_path TEXT, -- Computed field like '/electronics/computers/laptops'

    -- Display and ordering
    display_order INTEGER DEFAULT 0,
    is_featured BOOLEAN DEFAULT FALSE,
    is_visible BOOLEAN DEFAULT TRUE,

    -- SEO optimization
    seo_title VARCHAR(200),
    seo_description TEXT,
    category_image_url VARCHAR(500),

    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

-- Order management with complex structure
CREATE TABLE orders (
    order_id SERIAL PRIMARY KEY,
    order_uuid UUID UNIQUE NOT NULL DEFAULT uuid_generate_v4(),
    order_number VARCHAR(50) UNIQUE NOT NULL,
    customer_id INTEGER REFERENCES customers(customer_id),

    -- Order timing
    order_date TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    required_date DATE,
    shipped_date TIMESTAMP WITH TIME ZONE,
    delivered_date TIMESTAMP WITH TIME ZONE,

    -- Financial information
    subtotal DECIMAL(12,2) NOT NULL DEFAULT 0,
    tax_amount DECIMAL(12,2) NOT NULL DEFAULT 0,
    shipping_amount DECIMAL(12,2) NOT NULL DEFAULT 0,
    discount_amount DECIMAL(12,2) NOT NULL DEFAULT 0,
    total_amount DECIMAL(12,2) NOT NULL DEFAULT 0,
    currency_code VARCHAR(3) DEFAULT 'USD',

    -- Order status management
    order_status VARCHAR(30) DEFAULT 'pending' CHECK (
        order_status IN ('pending', 'confirmed', 'processing', 'shipped', 'delivered', 'cancelled', 'returned')
    ),
    payment_status VARCHAR(30) DEFAULT 'pending' CHECK (
        payment_status IN ('pending', 'authorized', 'captured', 'failed', 'refunded', 'partially_refunded')
    ),
    fulfillment_status VARCHAR(30) DEFAULT 'unfulfilled' CHECK (
        fulfillment_status IN ('unfulfilled', 'partially_fulfilled', 'fulfilled', 'restocked')
    ),

    -- Shipping information
    shipping_address_id INTEGER REFERENCES addresses(address_id),
    billing_address_id INTEGER REFERENCES addresses(address_id),
    shipping_method VARCHAR(100),
    tracking_number VARCHAR(200),
    shipping_carrier VARCHAR(100),

    -- Customer service and notes
    order_notes TEXT,
    internal_notes TEXT,
    customer_service_notes TEXT,

    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

-- Order line items with product relationships
CREATE TABLE order_items (
    order_item_id SERIAL PRIMARY KEY,
    order_id INTEGER REFERENCES orders(order_id) ON DELETE CASCADE,
    product_id INTEGER REFERENCES products(product_id),

    -- Product information at time of order
    product_name VARCHAR(255) NOT NULL,
    product_sku VARCHAR(100) NOT NULL,
    product_description TEXT,

    -- Pricing and quantity
    unit_price DECIMAL(10,2) NOT NULL,
    quantity INTEGER NOT NULL DEFAULT 1,
    line_total DECIMAL(12,2) NOT NULL,

    -- Discounts and promotions
    discount_percentage DECIMAL(5,2) DEFAULT 0,
    discount_amount DECIMAL(10,2) DEFAULT 0,
    promotion_code VARCHAR(100),

    -- Fulfillment tracking
    fulfillment_status VARCHAR(30) DEFAULT 'pending',
    shipped_quantity INTEGER DEFAULT 0,
    returned_quantity INTEGER DEFAULT 0,

    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

-- Migration challenges with traditional relational approach:

-- Complex join requirements for simple operations
SELECT 
    c.company_name,
    c.account_status,
    c.annual_contract_value,

    -- Billing address requires join
    ba.street_address_1 as billing_address,
    ba.city as billing_city,
    ba.state_province as billing_state,
    ba.country_code as billing_country,

    -- Shipping address requires separate join
    sa.street_address_1 as shipping_address,
    sa.city as shipping_city,
    sa.state_province as shipping_state,
    sa.country_code as shipping_country,

    -- Primary contact requires join
    pc.first_name || ' ' || pc.last_name as primary_contact_name,
    pc.email_address as primary_contact_email,
    pc.phone_number as primary_contact_phone,

    -- Billing contact requires separate join
    bc.first_name || ' ' || bc.last_name as billing_contact_name,
    bc.email_address as billing_contact_email,

    -- Order summary requires complex aggregation
    COUNT(o.order_id) as total_orders,
    SUM(o.total_amount) as total_order_value,
    MAX(o.order_date) as last_order_date

FROM customers c
LEFT JOIN addresses ba ON c.billing_address_id = ba.address_id
LEFT JOIN addresses sa ON c.shipping_address_id = sa.address_id
LEFT JOIN contacts pc ON c.primary_contact_id = pc.contact_id
LEFT JOIN contacts bc ON c.billing_contact_id = bc.contact_id
LEFT JOIN orders o ON c.customer_id = o.customer_id
WHERE c.account_status = 'active'
  AND c.annual_contract_value > 50000
GROUP BY 
    c.customer_id, c.company_name, c.account_status, c.annual_contract_value,
    ba.street_address_1, ba.city, ba.state_province, ba.country_code,
    sa.street_address_1, sa.city, sa.state_province, sa.country_code,
    pc.first_name, pc.last_name, pc.email_address, pc.phone_number,
    bc.first_name, bc.last_name, bc.email_address
ORDER BY c.annual_contract_value DESC;

-- Product catalog with category hierarchy complexity
WITH RECURSIVE category_hierarchy AS (
    -- Base case: root categories
    SELECT 
        category_id,
        category_name,
        category_slug,
        parent_category_id,
        category_level,
        category_name as category_path,
        ARRAY[category_id] as path_ids
    FROM product_categories
    WHERE parent_category_id IS NULL

    UNION ALL

    -- Recursive case: child categories
    SELECT 
        pc.category_id,
        pc.category_name,
        pc.category_slug,
        pc.parent_category_id,
        pc.category_level,
        ch.category_path || ' > ' || pc.category_name,
        ch.path_ids || pc.category_id
    FROM product_categories pc
    INNER JOIN category_hierarchy ch ON pc.parent_category_id = ch.category_id
)
SELECT 
    p.product_name,
    p.product_sku,
    p.base_price,
    p.current_stock_level,
    p.product_status,

    -- Category hierarchy information
    ch.category_path as full_category_path,
    ch.category_level as category_depth,

    -- Recent order information
    COUNT(oi.order_item_id) as times_ordered,
    SUM(oi.quantity) as total_quantity_sold,
    SUM(oi.line_total) as total_revenue,
    MAX(o.order_date) as last_ordered_date

FROM products p
LEFT JOIN category_hierarchy ch ON p.primary_category_id = ch.category_id
LEFT JOIN order_items oi ON p.product_id = oi.product_id
LEFT JOIN orders o ON oi.order_id = o.order_id
WHERE p.product_status = 'active'
  AND o.order_date >= CURRENT_DATE - INTERVAL '6 months'
GROUP BY 
    p.product_id, p.product_name, p.product_sku, p.base_price,
    p.current_stock_level, p.product_status,
    ch.category_path, ch.category_level
ORDER BY total_revenue DESC;

-- Problems with relational schema for migration:
-- 1. Excessive normalization creates complex join requirements
-- 2. Rigid schema makes it difficult to add new fields
-- 3. Foreign key constraints complicate data transformation
-- 4. Complex hierarchical relationships require recursive queries
-- 5. Performance degradation with increasing data volume
-- 6. Difficult to represent nested or array data structures
-- 7. Schema changes require coordinated migrations
-- 8. Poor fit for document-oriented access patterns
-- 9. Limited flexibility for varying data structures
-- 10. Complex application logic to reconstruct related data

-- Traditional migration approach (problematic):
-- Attempting direct table-to-collection mapping without optimization

-- Naive migration - maintains relational structure (inefficient)
INSERT INTO mongodb.customers 
SELECT 
    customer_id,
    customer_uuid,
    company_name,
    industry_code,
    company_size,
    billing_address_id,  -- Foreign key becomes meaningless
    shipping_address_id, -- Foreign key becomes meaningless
    primary_contact_id,  -- Foreign key becomes meaningless
    account_status,
    subscription_tier,
    annual_contract_value,
    created_at,
    updated_at
FROM postgresql_customers;

-- This approach fails to leverage MongoDB's strengths:
-- - Maintains relational thinking
-- - Requires separate queries to reconstruct relationships
-- - Doesn't utilize document embedding opportunities
-- - Misses schema flexibility benefits
-- - Poor query performance due to separate collections
-- - Complex application logic to manage references
-- - No improvement in developer experience
-- - Limited scalability improvements

MongoDB provides powerful migration capabilities that transform relational schemas into optimized document structures:

// MongoDB Advanced Migration and Database Modernization Manager
const { MongoClient, ObjectId } = require('mongodb');

class MongoMigrationManager {
  constructor(sourceDb, targetMongoUri, migrationConfig = {}) {
    this.sourceDb = sourceDb; // Source database connection (PostgreSQL, MySQL, etc.)
    this.mongoClient = new MongoClient(targetMongoUri);
    this.targetDb = null;

    this.migrationConfig = {
      // Migration strategy configuration
      migrationStrategy: migrationConfig.strategy || 'staged_migration', // Options: bulk_migration, staged_migration, live_migration
      batchSize: migrationConfig.batchSize || 1000,
      enableDataValidation: migrationConfig.enableDataValidation !== false,
      enablePerformanceOptimization: migrationConfig.enablePerformanceOptimization !== false,

      // Schema transformation settings
      schemaTransformation: {
        enableDocumentEmbedding: migrationConfig.enableDocumentEmbedding !== false,
        enableArrayFields: migrationConfig.enableArrayFields !== false,
        enableDenormalization: migrationConfig.enableDenormalization !== false,
        optimizeForQueryPatterns: migrationConfig.optimizeForQueryPatterns !== false
      },

      // Data quality and validation
      dataQuality: {
        enableDataCleaning: migrationConfig.enableDataCleaning !== false,
        enableReferentialIntegrityChecks: migrationConfig.enableReferentialIntegrityChecks !== false,
        enableDuplicateDetection: migrationConfig.enableDuplicateDetection !== false,
        enableDataTypeConversion: migrationConfig.enableDataTypeConversion !== false
      },

      // Performance and monitoring
      performance: {
        enableProgressTracking: migrationConfig.enableProgressTracking !== false,
        enablePerformanceMetrics: migrationConfig.enablePerformanceMetrics !== false,
        enableRollbackCapability: migrationConfig.enableRollbackCapability !== false,
        maxConcurrentOperations: migrationConfig.maxConcurrentOperations || 4
      }
    };

    this.migrationMetrics = {
      startTime: null,
      endTime: null,
      recordsProcessed: 0,
      recordsMigrated: 0,
      errorsEncountered: 0,
      performanceStats: {}
    };
  }

  async initializeMigration() {
    console.log('Initializing MongoDB migration system...');

    try {
      await this.mongoClient.connect();
      this.targetDb = this.mongoClient.db('modernized_application');

      // Setup target collections with optimized schemas
      await this.setupOptimizedCollections();

      // Initialize migration tracking
      await this.initializeMigrationTracking();

      // Setup performance monitoring
      await this.setupPerformanceMonitoring();

      console.log('Migration system initialized successfully');

    } catch (error) {
      console.error('Error initializing migration system:', error);
      throw error;
    }
  }

  async setupOptimizedCollections() {
    console.log('Setting up optimized MongoDB collections...');

    // Create customers collection with embedded documents
    const customers = this.targetDb.collection('customers');

    // Create optimized indexes for customer operations
    await customers.createIndex({ customer_uuid: 1 }, { unique: true });
    await customers.createIndex({ company_name: 1 });
    await customers.createIndex({ "billing_contact.email": 1 });
    await customers.createIndex({ account_status: 1, annual_contract_value: -1 });
    await customers.createIndex({ industry_code: 1, company_size: 1 });
    await customers.createIndex({ created_at: -1 });

    // Create products collection with optimized structure
    const products = this.targetDb.collection('products');
    await products.createIndex({ product_uuid: 1 }, { unique: true });
    await products.createIndex({ product_sku: 1 }, { unique: true });
    await products.createIndex({ "categories.primary": 1 });
    await products.createIndex({ "pricing.base_price": 1 });
    await products.createIndex({ product_status: 1 });
    await products.createIndex({ "inventory.current_stock": 1 });

    // Create orders collection with embedded line items
    const orders = this.targetDb.collection('orders');
    await orders.createIndex({ order_uuid: 1 }, { unique: true });
    await orders.createIndex({ order_number: 1 }, { unique: true });
    await orders.createIndex({ customer_uuid: 1, order_date: -1 });
    await orders.createIndex({ order_status: 1 });
    await orders.createIndex({ order_date: -1 });
    await orders.createIndex({ "financial.total_amount": -1 });

    console.log('Optimized collections setup completed');
  }

  async migrateCustomerData() {
    console.log('Migrating customer data with document optimization...');
    const startTime = Date.now();

    try {
      const customersCollection = this.targetDb.collection('customers');

      // Fetch customer data with all related information in batches
      const customerQuery = `
        SELECT 
          c.customer_id,
          c.customer_uuid,
          c.company_name,
          c.industry_code,
          c.company_size,
          c.account_status,
          c.subscription_tier,
          c.annual_contract_value,
          c.created_at,
          c.updated_at,

          -- Billing address information
          ba.street_address_1 as billing_street_1,
          ba.street_address_2 as billing_street_2,
          ba.city as billing_city,
          ba.state_province as billing_state,
          ba.postal_code as billing_postal_code,
          ba.country_code as billing_country,
          ba.latitude as billing_lat,
          ba.longitude as billing_lng,
          ba.timezone as billing_timezone,

          -- Shipping address information
          sa.street_address_1 as shipping_street_1,
          sa.street_address_2 as shipping_street_2,
          sa.city as shipping_city,
          sa.state_province as shipping_state,
          sa.postal_code as shipping_postal_code,
          sa.country_code as shipping_country,
          sa.latitude as shipping_lat,
          sa.longitude as shipping_lng,
          sa.timezone as shipping_timezone,

          -- Primary contact information
          pc.contact_id as primary_contact_id,
          pc.first_name as primary_first_name,
          pc.last_name as primary_last_name,
          pc.email_address as primary_email,
          pc.phone_number as primary_phone,
          pc.mobile_number as primary_mobile,
          pc.job_title as primary_job_title,
          pc.department as primary_department,

          -- Billing contact information
          bc.contact_id as billing_contact_id,
          bc.first_name as billing_first_name,
          bc.last_name as billing_last_name,
          bc.email_address as billing_email,
          bc.phone_number as billing_phone,
          bc.job_title as billing_job_title

        FROM customers c
        LEFT JOIN addresses ba ON c.billing_address_id = ba.address_id
        LEFT JOIN addresses sa ON c.shipping_address_id = sa.address_id
        LEFT JOIN contacts pc ON c.primary_contact_id = pc.contact_id
        LEFT JOIN contacts bc ON c.billing_contact_id = bc.contact_id
        ORDER BY c.customer_id
      `;

      const customerResults = await this.sourceDb.query(customerQuery);
      const batchSize = this.migrationConfig.batchSize;

      for (let i = 0; i < customerResults.rows.length; i += batchSize) {
        const batch = customerResults.rows.slice(i, i + batchSize);
        const mongoDocuments = batch.map(customer => this.transformCustomerDocument(customer));

        // Validate documents before insertion
        const validatedDocuments = await this.validateDocuments(mongoDocuments);

        // Insert batch with error handling
        try {
          await customersCollection.insertMany(validatedDocuments, { ordered: false });
          this.migrationMetrics.recordsMigrated += validatedDocuments.length;
        } catch (error) {
          console.error(`Error inserting customer batch:`, error);
          this.migrationMetrics.errorsEncountered += 1;
        }

        // Update progress tracking
        await this.updateMigrationProgress('customers', i + batch.length, customerResults.rows.length);
      }

      const endTime = Date.now();
      console.log(`Customer migration completed in ${endTime - startTime}ms`);

    } catch (error) {
      console.error('Error migrating customer data:', error);
      throw error;
    }
  }

  transformCustomerDocument(customerRow) {
    // Transform relational data into optimized MongoDB document
    const customer = {
      _id: new ObjectId(),
      customer_uuid: customerRow.customer_uuid,
      company_name: customerRow.company_name,
      industry_code: customerRow.industry_code,
      company_size: customerRow.company_size,

      // Account information
      account: {
        status: customerRow.account_status,
        subscription_tier: customerRow.subscription_tier,
        annual_contract_value: customerRow.annual_contract_value,
        created_at: customerRow.created_at,
        updated_at: customerRow.updated_at
      },

      // Embedded billing address
      billing_address: customerRow.billing_street_1 ? {
        street_address_1: customerRow.billing_street_1,
        street_address_2: customerRow.billing_street_2,
        city: customerRow.billing_city,
        state_province: customerRow.billing_state,
        postal_code: customerRow.billing_postal_code,
        country_code: customerRow.billing_country,
        coordinates: customerRow.billing_lat && customerRow.billing_lng ? {
          latitude: customerRow.billing_lat,
          longitude: customerRow.billing_lng
        } : null,
        timezone: customerRow.billing_timezone
      } : null,

      // Embedded shipping address
      shipping_address: customerRow.shipping_street_1 ? {
        street_address_1: customerRow.shipping_street_1,
        street_address_2: customerRow.shipping_street_2,
        city: customerRow.shipping_city,
        state_province: customerRow.shipping_state,
        postal_code: customerRow.shipping_postal_code,
        country_code: customerRow.shipping_country,
        coordinates: customerRow.shipping_lat && customerRow.shipping_lng ? {
          latitude: customerRow.shipping_lat,
          longitude: customerRow.shipping_lng
        } : null,
        timezone: customerRow.shipping_timezone
      } : null,

      // Embedded contact information
      contacts: {
        primary_contact: customerRow.primary_contact_id ? {
          contact_id: customerRow.primary_contact_id,
          name: {
            first: customerRow.primary_first_name,
            last: customerRow.primary_last_name,
            full: `${customerRow.primary_first_name} ${customerRow.primary_last_name}`
          },
          communication: {
            email: customerRow.primary_email,
            phone: customerRow.primary_phone,
            mobile: customerRow.primary_mobile
          },
          professional: {
            job_title: customerRow.primary_job_title,
            department: customerRow.primary_department
          }
        } : null,

        billing_contact: customerRow.billing_contact_id ? {
          contact_id: customerRow.billing_contact_id,
          name: {
            first: customerRow.billing_first_name,
            last: customerRow.billing_last_name,
            full: `${customerRow.billing_first_name} ${customerRow.billing_last_name}`
          },
          communication: {
            email: customerRow.billing_email,
            phone: customerRow.billing_phone
          },
          professional: {
            job_title: customerRow.billing_job_title
          }
        } : null
      },

      // Migration metadata
      migration_info: {
        migrated_at: new Date(),
        source_customer_id: customerRow.customer_id,
        migration_version: '1.0',
        data_validated: true
      }
    };

    return customer;
  }

  async migrateProductData() {
    console.log('Migrating product catalog with optimized structure...');
    const startTime = Date.now();

    try {
      const productsCollection = this.targetDb.collection('products');

      // Fetch products with category hierarchy and recent sales data
      const productQuery = `
        WITH RECURSIVE category_hierarchy AS (
          SELECT 
            category_id,
            category_name,
            category_slug,
            parent_category_id,
            category_level,
            category_name as category_path,
            ARRAY[category_id] as path_ids
          FROM product_categories
          WHERE parent_category_id IS NULL

          UNION ALL

          SELECT 
            pc.category_id,
            pc.category_name,
            pc.category_slug,
            pc.parent_category_id,
            pc.category_level,
            ch.category_path || ' > ' || pc.category_name,
            ch.path_ids || pc.category_id
          FROM product_categories pc
          INNER JOIN category_hierarchy ch ON pc.parent_category_id = ch.category_id
        ),

        recent_sales AS (
          SELECT 
            oi.product_id,
            COUNT(oi.order_item_id) as times_ordered,
            SUM(oi.quantity) as total_quantity_sold,
            SUM(oi.line_total) as total_revenue,
            AVG(oi.unit_price) as avg_selling_price,
            MAX(o.order_date) as last_ordered_date
          FROM order_items oi
          INNER JOIN orders o ON oi.order_id = o.order_id
          WHERE o.order_date >= CURRENT_DATE - INTERVAL '12 months'
          GROUP BY oi.product_id
        )

        SELECT 
          p.product_id,
          p.product_uuid,
          p.product_name,
          p.product_sku,
          p.product_description,
          p.base_price,
          p.currency_code,
          p.current_stock_level,
          p.minimum_stock_level,
          p.maximum_stock_level,
          p.product_status,
          p.launch_date,
          p.discontinuation_date,
          p.weight_kg,
          p.length_cm,
          p.width_cm,
          p.height_cm,
          p.requires_shipping,
          p.shipping_class,
          p.hazardous_material,
          p.seo_title,
          p.seo_description,
          p.meta_keywords,
          p.created_at,
          p.updated_at,

          -- Category information
          ch.category_path,
          ch.category_level,
          ch.path_ids,

          -- Sales analytics
          COALESCE(rs.times_ordered, 0) as times_ordered,
          COALESCE(rs.total_quantity_sold, 0) as total_quantity_sold,
          COALESCE(rs.total_revenue, 0) as total_revenue,
          COALESCE(rs.avg_selling_price, 0) as avg_selling_price,
          rs.last_ordered_date

        FROM products p
        LEFT JOIN category_hierarchy ch ON p.primary_category_id = ch.category_id
        LEFT JOIN recent_sales rs ON p.product_id = rs.product_id
        ORDER BY p.product_id
      `;

      const productResults = await this.sourceDb.query(productQuery);
      const batchSize = this.migrationConfig.batchSize;

      for (let i = 0; i < productResults.rows.length; i += batchSize) {
        const batch = productResults.rows.slice(i, i + batchSize);
        const mongoDocuments = batch.map(product => this.transformProductDocument(product));

        const validatedDocuments = await this.validateDocuments(mongoDocuments);

        try {
          await productsCollection.insertMany(validatedDocuments, { ordered: false });
          this.migrationMetrics.recordsMigrated += validatedDocuments.length;
        } catch (error) {
          console.error(`Error inserting product batch:`, error);
          this.migrationMetrics.errorsEncountered += 1;
        }

        await this.updateMigrationProgress('products', i + batch.length, productResults.rows.length);
      }

      const endTime = Date.now();
      console.log(`Product migration completed in ${endTime - startTime}ms`);

    } catch (error) {
      console.error('Error migrating product data:', error);
      throw error;
    }
  }

  transformProductDocument(productRow) {
    // Transform relational product data into optimized MongoDB document
    const product = {
      _id: new ObjectId(),
      product_uuid: productRow.product_uuid,
      product_name: productRow.product_name,
      product_sku: productRow.product_sku,
      product_description: productRow.product_description,
      product_status: productRow.product_status,

      // Category information with full hierarchy
      categories: {
        primary: productRow.category_path ? productRow.category_path.split(' > ')[0] : null,
        hierarchy: productRow.category_path ? productRow.category_path.split(' > ') : [],
        level: productRow.category_level || 1,
        path_ids: productRow.path_ids || []
      },

      // Pricing information
      pricing: {
        base_price: productRow.base_price,
        currency_code: productRow.currency_code,
        avg_selling_price: productRow.avg_selling_price || productRow.base_price
      },

      // Inventory management
      inventory: {
        current_stock: productRow.current_stock_level,
        minimum_stock: productRow.minimum_stock_level,
        maximum_stock: productRow.maximum_stock_level,
        tracked: productRow.current_stock_level !== null
      },

      // Physical characteristics
      physical: {
        dimensions: {
          weight_kg: productRow.weight_kg,
          length_cm: productRow.length_cm,
          width_cm: productRow.width_cm,
          height_cm: productRow.height_cm
        },
        shipping: {
          requires_shipping: productRow.requires_shipping,
          shipping_class: productRow.shipping_class,
          hazardous_material: productRow.hazardous_material
        }
      },

      // SEO and marketing
      seo: {
        title: productRow.seo_title,
        description: productRow.seo_description,
        keywords: productRow.meta_keywords ? productRow.meta_keywords.split(',').map(k => k.trim()) : []
      },

      // Lifecycle information
      lifecycle: {
        launch_date: productRow.launch_date,
        discontinuation_date: productRow.discontinuation_date,
        created_at: productRow.created_at,
        updated_at: productRow.updated_at
      },

      // Sales analytics embedded for performance
      analytics: {
        times_ordered: productRow.times_ordered || 0,
        total_quantity_sold: productRow.total_quantity_sold || 0,
        total_revenue: productRow.total_revenue || 0,
        last_ordered_date: productRow.last_ordered_date,
        performance_tier: this.calculatePerformanceTier(productRow)
      },

      // Migration tracking
      migration_info: {
        migrated_at: new Date(),
        source_product_id: productRow.product_id,
        migration_version: '1.0',
        data_validated: true
      }
    };

    return product;
  }

  calculatePerformanceTier(productRow) {
    const revenue = productRow.total_revenue || 0;
    const timesOrdered = productRow.times_ordered || 0;

    if (revenue > 50000 && timesOrdered > 100) return 'high_performer';
    if (revenue > 10000 && timesOrdered > 20) return 'good_performer';
    if (revenue > 1000 && timesOrdered > 5) return 'average_performer';
    if (timesOrdered > 0) return 'low_performer';
    return 'no_sales';
  }

  async migrateOrderData() {
    console.log('Migrating order data with embedded line items...');
    const startTime = Date.now();

    try {
      const ordersCollection = this.targetDb.collection('orders');

      // Fetch orders with embedded line items and customer information
      const orderQuery = `
        SELECT 
          o.order_id,
          o.order_uuid,
          o.order_number,
          c.customer_uuid,
          c.company_name,
          o.order_date,
          o.required_date,
          o.shipped_date,
          o.delivered_date,
          o.subtotal,
          o.tax_amount,
          o.shipping_amount,
          o.discount_amount,
          o.total_amount,
          o.currency_code,
          o.order_status,
          o.payment_status,
          o.fulfillment_status,
          o.shipping_method,
          o.tracking_number,
          o.shipping_carrier,
          o.order_notes,
          o.internal_notes,
          o.created_at,
          o.updated_at,

          -- Shipping address
          sa.street_address_1 as shipping_street_1,
          sa.street_address_2 as shipping_street_2,
          sa.city as shipping_city,
          sa.state_province as shipping_state,
          sa.postal_code as shipping_postal_code,
          sa.country_code as shipping_country,

          -- Billing address
          ba.street_address_1 as billing_street_1,
          ba.street_address_2 as billing_street_2,
          ba.city as billing_city,
          ba.state_province as billing_state,
          ba.postal_code as billing_postal_code,
          ba.country_code as billing_country,

          -- Aggregated order items as JSON
          json_agg(
            json_build_object(
              'order_item_id', oi.order_item_id,
              'product_uuid', p.product_uuid,
              'product_name', oi.product_name,
              'product_sku', oi.product_sku,
              'unit_price', oi.unit_price,
              'quantity', oi.quantity,
              'line_total', oi.line_total,
              'discount_percentage', oi.discount_percentage,
              'discount_amount', oi.discount_amount,
              'promotion_code', oi.promotion_code,
              'fulfillment_status', oi.fulfillment_status
            ) ORDER BY oi.order_item_id
          ) as order_items

        FROM orders o
        INNER JOIN customers c ON o.customer_id = c.customer_id
        LEFT JOIN addresses sa ON o.shipping_address_id = sa.address_id
        LEFT JOIN addresses ba ON o.billing_address_id = ba.address_id
        INNER JOIN order_items oi ON o.order_id = oi.order_id
        LEFT JOIN products p ON oi.product_id = p.product_id
        GROUP BY 
          o.order_id, o.order_uuid, o.order_number, c.customer_uuid, c.company_name,
          o.order_date, o.required_date, o.shipped_date, o.delivered_date,
          o.subtotal, o.tax_amount, o.shipping_amount, o.discount_amount, o.total_amount,
          o.currency_code, o.order_status, o.payment_status, o.fulfillment_status,
          o.shipping_method, o.tracking_number, o.shipping_carrier,
          o.order_notes, o.internal_notes, o.created_at, o.updated_at,
          sa.street_address_1, sa.street_address_2, sa.city, sa.state_province, sa.postal_code, sa.country_code,
          ba.street_address_1, ba.street_address_2, ba.city, ba.state_province, ba.postal_code, ba.country_code
        ORDER BY o.order_id
      `;

      const orderResults = await this.sourceDb.query(orderQuery);
      const batchSize = this.migrationConfig.batchSize;

      for (let i = 0; i < orderResults.rows.length; i += batchSize) {
        const batch = orderResults.rows.slice(i, i + batchSize);
        const mongoDocuments = batch.map(order => this.transformOrderDocument(order));

        const validatedDocuments = await this.validateDocuments(mongoDocuments);

        try {
          await ordersCollection.insertMany(validatedDocuments, { ordered: false });
          this.migrationMetrics.recordsMigrated += validatedDocuments.length;
        } catch (error) {
          console.error(`Error inserting order batch:`, error);
          this.migrationMetrics.errorsEncountered += 1;
        }

        await this.updateMigrationProgress('orders', i + batch.length, orderResults.rows.length);
      }

      const endTime = Date.now();
      console.log(`Order migration completed in ${endTime - startTime}ms`);

    } catch (error) {
      console.error('Error migrating order data:', error);
      throw error;
    }
  }

  transformOrderDocument(orderRow) {
    // Transform relational order data into optimized MongoDB document with embedded items
    const order = {
      _id: new ObjectId(),
      order_uuid: orderRow.order_uuid,
      order_number: orderRow.order_number,

      // Customer reference
      customer: {
        customer_uuid: orderRow.customer_uuid,
        company_name: orderRow.company_name
      },

      // Order timing
      dates: {
        order_date: orderRow.order_date,
        required_date: orderRow.required_date,
        shipped_date: orderRow.shipped_date,
        delivered_date: orderRow.delivered_date,
        created_at: orderRow.created_at,
        updated_at: orderRow.updated_at
      },

      // Financial information
      financial: {
        subtotal: orderRow.subtotal,
        tax_amount: orderRow.tax_amount,
        shipping_amount: orderRow.shipping_amount,
        discount_amount: orderRow.discount_amount,
        total_amount: orderRow.total_amount,
        currency_code: orderRow.currency_code
      },

      // Status tracking
      status: {
        order_status: orderRow.order_status,
        payment_status: orderRow.payment_status,
        fulfillment_status: orderRow.fulfillment_status
      },

      // Shipping information
      shipping: {
        method: orderRow.shipping_method,
        tracking_number: orderRow.tracking_number,
        carrier: orderRow.shipping_carrier,
        address: orderRow.shipping_street_1 ? {
          street_address_1: orderRow.shipping_street_1,
          street_address_2: orderRow.shipping_street_2,
          city: orderRow.shipping_city,
          state_province: orderRow.shipping_state,
          postal_code: orderRow.shipping_postal_code,
          country_code: orderRow.shipping_country
        } : null
      },

      // Billing address
      billing_address: orderRow.billing_street_1 ? {
        street_address_1: orderRow.billing_street_1,
        street_address_2: orderRow.billing_street_2,
        city: orderRow.billing_city,
        state_province: orderRow.billing_state,
        postal_code: orderRow.billing_postal_code,
        country_code: orderRow.billing_country
      } : null,

      // Embedded order items for optimal query performance
      items: orderRow.order_items.map(item => ({
        product_uuid: item.product_uuid,
        product_name: item.product_name,
        product_sku: item.product_sku,
        pricing: {
          unit_price: item.unit_price,
          quantity: item.quantity,
          line_total: item.line_total,
          discount_percentage: item.discount_percentage,
          discount_amount: item.discount_amount
        },
        promotion_code: item.promotion_code,
        fulfillment_status: item.fulfillment_status
      })),

      // Order summary for quick access
      summary: {
        total_items: orderRow.order_items.length,
        total_quantity: orderRow.order_items.reduce((sum, item) => sum + item.quantity, 0),
        unique_products: new Set(orderRow.order_items.map(item => item.product_uuid)).size
      },

      // Notes and comments
      notes: {
        order_notes: orderRow.order_notes,
        internal_notes: orderRow.internal_notes
      },

      // Migration tracking
      migration_info: {
        migrated_at: new Date(),
        source_order_id: orderRow.order_id,
        migration_version: '1.0',
        data_validated: true
      }
    };

    return order;
  }

  async validateDocuments(documents) {
    if (!this.migrationConfig.dataQuality.enableDataCleaning) {
      return documents;
    }

    // Validate and clean documents before insertion
    return documents.filter(doc => {
      // Basic validation
      if (!doc._id) return false;

      // Remove null/undefined fields
      this.removeEmptyFields(doc);

      return true;
    });
  }

  removeEmptyFields(obj) {
    Object.keys(obj).forEach(key => {
      if (obj[key] && typeof obj[key] === 'object') {
        this.removeEmptyFields(obj[key]);
      } else if (obj[key] === null || obj[key] === undefined) {
        delete obj[key];
      }
    });
  }

  async updateMigrationProgress(collection, processed, total) {
    if (this.migrationConfig.performance.enableProgressTracking) {
      const progress = {
        collection: collection,
        processed: processed,
        total: total,
        percentage: ((processed / total) * 100).toFixed(2),
        timestamp: new Date()
      };

      console.log(`Migration progress - ${collection}: ${progress.percentage}% (${processed}/${total})`);

      // Store progress in monitoring collection
      await this.targetDb.collection('migration_progress').replaceOne(
        { collection: collection },
        progress,
        { upsert: true }
      );
    }
  }

  async generateMigrationReport() {
    console.log('Generating comprehensive migration report...');

    const report = {
      migration_summary: {
        start_time: this.migrationMetrics.startTime,
        end_time: this.migrationMetrics.endTime,
        total_duration: this.migrationMetrics.endTime - this.migrationMetrics.startTime,
        records_processed: this.migrationMetrics.recordsProcessed,
        records_migrated: this.migrationMetrics.recordsMigrated,
        errors_encountered: this.migrationMetrics.errorsEncountered,
        success_rate: ((this.migrationMetrics.recordsMigrated / this.migrationMetrics.recordsProcessed) * 100).toFixed(2)
      },

      // Collection-specific statistics
      collection_statistics: await this.generateCollectionStatistics(),

      // Data quality assessment
      data_quality_report: await this.generateDataQualityReport(),

      // Performance metrics
      performance_analysis: await this.generatePerformanceAnalysis(),

      // Recommendations for optimization
      optimization_recommendations: await this.generateOptimizationRecommendations()
    };

    // Store migration report
    await this.targetDb.collection('migration_reports').insertOne({
      ...report,
      generated_at: new Date(),
      migration_id: new ObjectId()
    });

    return report;
  }

  async generateCollectionStatistics() {
    const collections = ['customers', 'products', 'orders'];
    const stats = {};

    for (const collectionName of collections) {
      const collection = this.targetDb.collection(collectionName);

      stats[collectionName] = {
        document_count: await collection.countDocuments(),
        average_document_size: await this.calculateAverageDocumentSize(collection),
        index_count: (await collection.indexes()).length,
        storage_metrics: await this.getStorageMetrics(collection)
      };
    }

    return stats;
  }
}

// Benefits of MongoDB Advanced Migration:
// - Intelligent document transformation preserving relational relationships
// - Optimized schema design leveraging MongoDB's document model strengths  
// - Embedded documents eliminating complex joins and improving performance
// - Flexible schema evolution supporting future application requirements
// - Comprehensive data validation and quality assurance during migration
// - Real-time migration progress tracking and performance monitoring
// - Automated optimization recommendations based on data patterns
// - Seamless integration with existing application architectures
// - Scalable migration strategies supporting large datasets
// - SQL-compatible operations through QueryLeaf for familiar database interactions

module.exports = {
  MongoMigrationManager
};

Advanced Migration Patterns and Data Transformation

Complex Relationship Modeling and Document Design

Transform complex relational structures into optimized MongoDB documents:

// Advanced migration patterns for complex enterprise scenarios
class EnterpriseMigrationManager extends MongoMigrationManager {
  constructor(sourceDb, targetMongoUri, enterpriseConfig) {
    super(sourceDb, targetMongoUri, enterpriseConfig);

    this.enterpriseConfig = {
      ...enterpriseConfig,

      // Enterprise migration features
      enableHierarchicalDataMigration: true,
      enablePolymorphicDocumentSupport: true,
      enableAdvancedRelationshipModeling: true,
      enableDataGovernanceCompliance: true,
      enableRealTimeMigrationMonitoring: true
    };
  }

  async migrateHierarchicalData() {
    console.log('Migrating hierarchical data with optimized tree structures...');

    // Migrate organizational hierarchies, category trees, and nested structures
    const organizationQuery = `
      WITH RECURSIVE org_hierarchy AS (
        SELECT 
          org_id,
          org_name,
          parent_org_id,
          org_level,
          org_type,
          ARRAY[org_id] as path,
          org_name as full_path
        FROM organizations
        WHERE parent_org_id IS NULL

        UNION ALL

        SELECT 
          o.org_id,
          o.org_name,
          o.parent_org_id,
          o.org_level,
          o.org_type,
          oh.path || o.org_id,
          oh.full_path || ' > ' || o.org_name
        FROM organizations o
        INNER JOIN org_hierarchy oh ON o.parent_org_id = oh.org_id
      )
      SELECT * FROM org_hierarchy ORDER BY path
    `;

    const orgResults = await this.sourceDb.query(organizationQuery);
    const hierarchicalDocuments = this.transformHierarchicalStructure(orgResults.rows);

    await this.targetDb.collection('organizations').insertMany(hierarchicalDocuments);
  }

  transformHierarchicalStructure(hierarchicalData) {
    // Transform tree structures into MongoDB-optimized format
    const rootNodes = hierarchicalData.filter(node => !node.parent_org_id);

    return rootNodes.map(root => this.buildHierarchicalDocument(root, hierarchicalData));
  }

  buildHierarchicalDocument(node, allNodes) {
    const children = allNodes.filter(n => n.parent_org_id === node.org_id);

    return {
      _id: new ObjectId(),
      org_id: node.org_id,
      org_name: node.org_name,
      org_type: node.org_type,
      org_level: node.org_level,

      // Hierarchical metadata for efficient queries
      hierarchy: {
        path: node.path,
        full_path: node.full_path,
        level: node.org_level,
        is_leaf: children.length === 0,
        child_count: children.length
      },

      // Embedded children for complete tree access
      children: children.map(child => this.buildHierarchicalDocument(child, allNodes)),

      // Flattened descendant references for efficient searching
      all_descendants: this.getAllDescendants(node.org_id, allNodes),

      migration_info: {
        migrated_at: new Date(),
        source_org_id: node.org_id,
        hierarchy_complete: true
      }
    };
  }
}

SQL-Style Migration with QueryLeaf

QueryLeaf provides familiar SQL syntax for MongoDB migration and database modernization operations:

-- QueryLeaf advanced migration and database modernization with SQL-familiar syntax

-- Set migration context and configuration
SET migration_mode = 'staged_migration';
SET migration_batch_size = 1000;
SET enable_data_validation = true;
SET enable_performance_optimization = true;

-- Configure target MongoDB collections with optimized schemas
CREATE COLLECTION customers WITH (
    -- Document structure optimization
    enable_embedded_documents = true,
    enable_array_fields = true,
    enable_flexible_schema = true,

    -- Performance optimization
    default_write_concern = 'majority',
    read_preference = 'primary',

    -- Storage optimization
    compression_algorithm = 'snappy',
    enable_sharding = false
) AS (
    customer_uuid UUID PRIMARY KEY,
    company_name VARCHAR(255) NOT NULL,
    industry_code VARCHAR(10),
    company_size VARCHAR(20),

    -- Embedded account information
    account JSONB NOT NULL,

    -- Embedded address information
    billing_address JSONB,
    shipping_address JSONB,

    -- Embedded contact information
    contacts JSONB NOT NULL,

    -- Migration tracking
    migration_info JSONB NOT NULL,

    -- Optimized indexes
    INDEX customer_uuid_idx (customer_uuid) UNIQUE,
    INDEX company_name_idx (company_name),
    INDEX account_status_idx ((account.status)),
    INDEX billing_email_idx ((contacts.billing_contact.communication.email)),
    TEXT INDEX company_search_idx (company_name, (contacts.primary_contact.name.full))
);

-- Advanced migration with data transformation and optimization
WITH source_customer_data AS (
    SELECT 
        c.customer_uuid,
        c.company_name,
        c.industry_code,
        c.company_size,
        c.account_status,
        c.subscription_tier,
        c.annual_contract_value,
        c.created_at,
        c.updated_at,

        -- Billing address aggregation
        JSON_OBJECT(
            'street_address_1', ba.street_address_1,
            'street_address_2', ba.street_address_2,
            'city', ba.city,
            'state_province', ba.state_province,
            'postal_code', ba.postal_code,
            'country_code', ba.country_code,
            'coordinates', 
                CASE 
                    WHEN ba.latitude IS NOT NULL AND ba.longitude IS NOT NULL 
                    THEN JSON_OBJECT('latitude', ba.latitude, 'longitude', ba.longitude)
                    ELSE NULL
                END,
            'timezone', ba.timezone
        ) as billing_address_json,

        -- Shipping address aggregation  
        JSON_OBJECT(
            'street_address_1', sa.street_address_1,
            'street_address_2', sa.street_address_2,
            'city', sa.city,
            'state_province', sa.state_province,
            'postal_code', sa.postal_code,
            'country_code', sa.country_code,
            'coordinates',
                CASE 
                    WHEN sa.latitude IS NOT NULL AND sa.longitude IS NOT NULL 
                    THEN JSON_OBJECT('latitude', sa.latitude, 'longitude', sa.longitude)
                    ELSE NULL
                END,
            'timezone', sa.timezone
        ) as shipping_address_json,

        -- Primary contact aggregation
        JSON_OBJECT(
            'contact_id', pc.contact_id,
            'name', JSON_OBJECT(
                'first', pc.first_name,
                'last', pc.last_name,
                'full', CONCAT(pc.first_name, ' ', pc.last_name)
            ),
            'communication', JSON_OBJECT(
                'email', pc.email_address,
                'phone', pc.phone_number,
                'mobile', pc.mobile_number
            ),
            'professional', JSON_OBJECT(
                'job_title', pc.job_title,
                'department', pc.department
            ),
            'preferences', JSON_OBJECT(
                'preferred_communication', pc.preferred_communication,
                'email_opt_in', pc.email_opt_in,
                'sms_opt_in', pc.sms_opt_in
            )
        ) as primary_contact_json,

        -- Billing contact aggregation
        JSON_OBJECT(
            'contact_id', bc.contact_id,
            'name', JSON_OBJECT(
                'first', bc.first_name,
                'last', bc.last_name,
                'full', CONCAT(bc.first_name, ' ', bc.last_name)
            ),
            'communication', JSON_OBJECT(
                'email', bc.email_address,
                'phone', bc.phone_number
            ),
            'professional', JSON_OBJECT(
                'job_title', bc.job_title
            )
        ) as billing_contact_json

    FROM postgresql_customers c
    LEFT JOIN postgresql_addresses ba ON c.billing_address_id = ba.address_id
    LEFT JOIN postgresql_addresses sa ON c.shipping_address_id = sa.address_id  
    LEFT JOIN postgresql_contacts pc ON c.primary_contact_id = pc.contact_id
    LEFT JOIN postgresql_contacts bc ON c.billing_contact_id = bc.contact_id
    WHERE c.account_status IN ('active', 'inactive', 'trial')
),

optimized_customer_documents AS (
    SELECT 
        scd.customer_uuid,
        scd.company_name,
        scd.industry_code,
        scd.company_size,

        -- Optimized account information structure
        JSON_OBJECT(
            'status', scd.account_status,
            'subscription_tier', scd.subscription_tier,
            'annual_contract_value', scd.annual_contract_value,
            'created_at', scd.created_at,
            'updated_at', scd.updated_at,

            -- Calculated account metrics
            'customer_tier', 
                CASE 
                    WHEN scd.annual_contract_value >= 100000 THEN 'enterprise'
                    WHEN scd.annual_contract_value >= 50000 THEN 'professional'
                    WHEN scd.annual_contract_value >= 10000 THEN 'standard'
                    ELSE 'basic'
                END,

            'account_age_days', DATE_PART('days', CURRENT_TIMESTAMP - scd.created_at),

            'lifecycle_stage',
                CASE 
                    WHEN scd.account_status = 'trial' THEN 'prospect'
                    WHEN scd.account_status = 'active' AND DATE_PART('days', CURRENT_TIMESTAMP - scd.created_at) <= 90 THEN 'new_customer'
                    WHEN scd.account_status = 'active' THEN 'established_customer'
                    WHEN scd.account_status = 'inactive' THEN 'at_risk'
                    ELSE 'unknown'
                END
        ) as account,

        -- Optimized address structures
        scd.billing_address_json as billing_address,
        scd.shipping_address_json as shipping_address,

        -- Optimized contact structure
        JSON_OBJECT(
            'primary_contact', scd.primary_contact_json,
            'billing_contact', scd.billing_contact_json,
            'contact_count', 
                CASE 
                    WHEN scd.primary_contact_json IS NOT NULL AND scd.billing_contact_json IS NOT NULL THEN 2
                    WHEN scd.primary_contact_json IS NOT NULL OR scd.billing_contact_json IS NOT NULL THEN 1
                    ELSE 0
                END
        ) as contacts,

        -- Migration metadata
        JSON_OBJECT(
            'migrated_at', CURRENT_TIMESTAMP,
            'migration_version', '1.0',
            'data_validated', true,
            'transformation_applied', true,
            'optimization_level', 'advanced'
        ) as migration_info

    FROM source_customer_data scd
    WHERE scd.company_name IS NOT NULL
      AND scd.customer_uuid IS NOT NULL
)

-- Execute optimized bulk migration with data validation
INSERT INTO customers (
    customer_uuid,
    company_name,
    industry_code,
    company_size,
    account,
    billing_address,
    shipping_address,
    contacts,
    migration_info
)
SELECT 
    ocd.customer_uuid,
    ocd.company_name,
    ocd.industry_code,
    ocd.company_size,
    ocd.account,

    -- Conditional address insertion (only if data exists)
    CASE 
        WHEN ocd.billing_address->>'street_address_1' IS NOT NULL 
        THEN ocd.billing_address 
        ELSE NULL 
    END,

    CASE 
        WHEN ocd.shipping_address->>'street_address_1' IS NOT NULL 
        THEN ocd.shipping_address 
        ELSE NULL 
    END,

    ocd.contacts,
    ocd.migration_info

FROM optimized_customer_documents ocd

-- Migration execution options
WITH (
    migration_strategy = 'staged_migration',
    batch_size = 1000,
    parallel_batches = 4,
    enable_progress_tracking = true,
    enable_data_validation = true,
    enable_rollback_capability = true,

    -- Performance optimization
    write_concern = 'majority',
    write_timeout = 30000,

    -- Error handling
    continue_on_error = true,
    max_errors_allowed = 100
);

-- Advanced product catalog migration with category optimization
WITH recursive_category_hierarchy AS (
    WITH RECURSIVE category_tree AS (
        SELECT 
            pc.category_id,
            pc.category_name,
            pc.category_slug,
            pc.parent_category_id,
            pc.category_level,
            pc.category_description,
            pc.display_order,
            pc.is_featured,
            ARRAY[pc.category_id] as path_ids,
            pc.category_name as path_names,
            1 as depth
        FROM postgresql_product_categories pc
        WHERE pc.parent_category_id IS NULL

        UNION ALL

        SELECT 
            pc.category_id,
            pc.category_name,
            pc.category_slug,
            pc.parent_category_id,
            pc.category_level,
            pc.category_description,
            pc.display_order,
            pc.is_featured,
            ct.path_ids || pc.category_id,
            ct.path_names || ' > ' || pc.category_name,
            ct.depth + 1
        FROM postgresql_product_categories pc
        INNER JOIN category_tree ct ON pc.parent_category_id = ct.category_id
        WHERE ct.depth < 10  -- Prevent infinite recursion
    )
    SELECT * FROM category_tree
),

enhanced_product_data AS (
    SELECT 
        p.product_uuid,
        p.product_name,
        p.product_sku,
        p.product_description,
        p.base_price,
        p.currency_code,
        p.current_stock_level,
        p.minimum_stock_level,
        p.maximum_stock_level,
        p.product_status,
        p.launch_date,
        p.discontinuation_date,
        p.weight_kg,
        p.length_cm,
        p.width_cm,
        p.height_cm,
        p.requires_shipping,
        p.shipping_class,
        p.hazardous_material,
        p.seo_title,
        p.seo_description,
        p.meta_keywords,
        p.created_at,
        p.updated_at,

        -- Enhanced category information
        rch.category_name as primary_category,
        rch.path_names as category_hierarchy,
        rch.path_ids as category_path_ids,
        rch.depth as category_depth,
        rch.is_featured as category_featured,

        -- Recent sales analytics
        COALESCE(sales.times_ordered, 0) as times_ordered,
        COALESCE(sales.total_quantity_sold, 0) as total_quantity_sold,
        COALESCE(sales.total_revenue, 0) as total_revenue,
        COALESCE(sales.avg_selling_price, p.base_price) as avg_selling_price,
        sales.last_ordered_date,

        -- Inventory analytics
        CASE 
            WHEN p.current_stock_level <= p.minimum_stock_level THEN 'low_stock'
            WHEN p.current_stock_level >= p.maximum_stock_level THEN 'overstock'
            ELSE 'normal'
        END as stock_status,

        -- Product performance classification
        CASE 
            WHEN COALESCE(sales.total_revenue, 0) >= 50000 AND COALESCE(sales.times_ordered, 0) >= 100 THEN 'high_performer'
            WHEN COALESCE(sales.total_revenue, 0) >= 10000 AND COALESCE(sales.times_ordered, 0) >= 20 THEN 'good_performer'
            WHEN COALESCE(sales.total_revenue, 0) >= 1000 AND COALESCE(sales.times_ordered, 0) >= 5 THEN 'average_performer'
            WHEN COALESCE(sales.times_ordered, 0) > 0 THEN 'low_performer'
            ELSE 'no_sales'
        END as performance_tier

    FROM postgresql_products p
    LEFT JOIN recursive_category_hierarchy rch ON p.primary_category_id = rch.category_id
    LEFT JOIN (
        SELECT 
            oi.product_id,
            COUNT(oi.order_item_id) as times_ordered,
            SUM(oi.quantity) as total_quantity_sold,
            SUM(oi.line_total) as total_revenue,
            AVG(oi.unit_price) as avg_selling_price,
            MAX(o.order_date) as last_ordered_date
        FROM postgresql_order_items oi
        INNER JOIN postgresql_orders o ON oi.order_id = o.order_id
        WHERE o.order_date >= CURRENT_DATE - INTERVAL '12 months'
          AND o.order_status NOT IN ('cancelled', 'returned')
        GROUP BY oi.product_id
    ) sales ON p.product_id = sales.product_id
    WHERE p.product_status IN ('active', 'discontinued')
)

-- Create optimized products collection
CREATE COLLECTION products WITH (
    enable_embedded_documents = true,
    enable_array_fields = true,
    compression_algorithm = 'snappy'
) AS (
    product_uuid UUID PRIMARY KEY,
    product_name VARCHAR(255) NOT NULL,
    product_sku VARCHAR(100) UNIQUE NOT NULL,
    product_description TEXT,
    product_status VARCHAR(20) NOT NULL,

    -- Embedded category information
    categories JSONB NOT NULL,

    -- Embedded pricing information
    pricing JSONB NOT NULL,

    -- Embedded inventory information
    inventory JSONB NOT NULL,

    -- Embedded physical characteristics
    physical JSONB,

    -- Embedded SEO information
    seo JSONB,

    -- Embedded lifecycle information
    lifecycle JSONB NOT NULL,

    -- Embedded analytics information
    analytics JSONB NOT NULL,

    -- Migration tracking
    migration_info JSONB NOT NULL,

    -- Optimized indexes
    INDEX product_uuid_idx (product_uuid) UNIQUE,
    INDEX product_sku_idx (product_sku) UNIQUE,
    INDEX primary_category_idx ((categories.primary)),
    INDEX performance_tier_idx ((analytics.performance_tier)),
    INDEX product_status_idx (product_status),
    INDEX stock_status_idx ((inventory.stock_status)),
    TEXT INDEX product_search_idx (product_name, product_description, product_sku)
);

-- Execute advanced product migration with comprehensive optimization
INSERT INTO products (
    product_uuid,
    product_name,
    product_sku,
    product_description,
    product_status,
    categories,
    pricing,
    inventory,
    physical,
    seo,
    lifecycle,
    analytics,
    migration_info
)
SELECT 
    epd.product_uuid,
    epd.product_name,
    epd.product_sku,
    epd.product_description,
    epd.product_status,

    -- Optimized category structure
    JSON_OBJECT(
        'primary', epd.primary_category,
        'hierarchy', STRING_TO_ARRAY(epd.category_hierarchy, ' > '),
        'path_ids', epd.category_path_ids,
        'depth', epd.category_depth,
        'is_featured', epd.category_featured
    ),

    -- Comprehensive pricing information
    JSON_OBJECT(
        'base_price', epd.base_price,
        'currency_code', epd.currency_code,
        'avg_selling_price', epd.avg_selling_price,
        'pricing_tier',
            CASE 
                WHEN epd.base_price >= 1000 THEN 'premium'
                WHEN epd.base_price >= 100 THEN 'standard'
                ELSE 'economy'
            END,
        'last_price_update', epd.updated_at
    ),

    -- Advanced inventory management
    JSON_OBJECT(
        'current_stock', epd.current_stock_level,
        'minimum_stock', epd.minimum_stock_level,
        'maximum_stock', epd.maximum_stock_level,
        'stock_status', epd.stock_status,
        'tracked', epd.current_stock_level IS NOT NULL,
        'reorder_point', epd.minimum_stock_level * 1.2,
        'stock_turnover_estimate',
            CASE 
                WHEN epd.total_quantity_sold > 0 AND epd.current_stock_level > 0 
                THEN ROUND(epd.total_quantity_sold::float / epd.current_stock_level, 2)
                ELSE 0
            END
    ),

    -- Physical characteristics and shipping
    JSON_OBJECT(
        'dimensions', JSON_OBJECT(
            'weight_kg', epd.weight_kg,
            'length_cm', epd.length_cm,
            'width_cm', epd.width_cm,
            'height_cm', epd.height_cm,
            'volume_cubic_cm', 
                CASE 
                    WHEN epd.length_cm IS NOT NULL AND epd.width_cm IS NOT NULL AND epd.height_cm IS NOT NULL 
                    THEN epd.length_cm * epd.width_cm * epd.height_cm
                    ELSE NULL
                END
        ),
        'shipping', JSON_OBJECT(
            'requires_shipping', epd.requires_shipping,
            'shipping_class', epd.shipping_class,
            'hazardous_material', epd.hazardous_material,
            'dimensional_weight',
                CASE 
                    WHEN epd.length_cm IS NOT NULL AND epd.width_cm IS NOT NULL AND epd.height_cm IS NOT NULL 
                    THEN (epd.length_cm * epd.width_cm * epd.height_cm) / 5000  -- Standard dimensional weight formula
                    ELSE NULL
                END
        )
    ),

    -- SEO and marketing optimization
    JSON_OBJECT(
        'title', epd.seo_title,
        'description', epd.seo_description,
        'keywords', 
            CASE 
                WHEN epd.meta_keywords IS NOT NULL 
                THEN STRING_TO_ARRAY(epd.meta_keywords, ',')
                ELSE ARRAY[]::text[]
            END,
        'url_slug', LOWER(REGEXP_REPLACE(epd.product_name, '[^a-zA-Z0-9]+', '-', 'g')),
        'search_terms', ARRAY[epd.product_name, epd.product_sku, epd.primary_category]
    ),

    -- Product lifecycle management
    JSON_OBJECT(
        'launch_date', epd.launch_date,
        'discontinuation_date', epd.discontinuation_date,
        'created_at', epd.created_at,
        'updated_at', epd.updated_at,
        'days_since_launch', 
            CASE 
                WHEN epd.launch_date IS NOT NULL 
                THEN DATE_PART('days', CURRENT_DATE - epd.launch_date)
                ELSE DATE_PART('days', CURRENT_DATE - epd.created_at)
            END,
        'lifecycle_stage',
            CASE 
                WHEN epd.discontinuation_date IS NOT NULL THEN 'discontinued'
                WHEN epd.launch_date > CURRENT_DATE - INTERVAL '90 days' THEN 'new'
                WHEN epd.launch_date > CURRENT_DATE - INTERVAL '1 year' THEN 'growing'
                ELSE 'mature'
            END
    ),

    -- Comprehensive analytics and performance metrics
    JSON_OBJECT(
        'times_ordered', epd.times_ordered,
        'total_quantity_sold', epd.total_quantity_sold,
        'total_revenue', epd.total_revenue,
        'last_ordered_date', epd.last_ordered_date,
        'performance_tier', epd.performance_tier,
        'revenue_per_order',
            CASE 
                WHEN epd.times_ordered > 0 
                THEN ROUND(epd.total_revenue / epd.times_ordered, 2)
                ELSE 0
            END,
        'average_order_quantity',
            CASE 
                WHEN epd.times_ordered > 0 
                THEN ROUND(epd.total_quantity_sold::float / epd.times_ordered, 2)
                ELSE 0
            END,
        'days_since_last_order',
            CASE 
                WHEN epd.last_ordered_date IS NOT NULL 
                THEN DATE_PART('days', CURRENT_DATE - epd.last_ordered_date)
                ELSE NULL
            END,
        'popularity_score',
            CASE 
                WHEN epd.times_ordered >= 100 THEN 'very_high'
                WHEN epd.times_ordered >= 50 THEN 'high'
                WHEN epd.times_ordered >= 10 THEN 'medium'
                WHEN epd.times_ordered > 0 THEN 'low'
                ELSE 'none'
            END
    ),

    -- Migration tracking and validation
    JSON_OBJECT(
        'migrated_at', CURRENT_TIMESTAMP,
        'migration_version', '1.0',
        'source_product_id', p.product_id,
        'data_validated', true,
        'transformation_applied', true,
        'analytics_computed', true,
        'optimization_level', 'comprehensive'
    )

FROM enhanced_product_data epd
JOIN postgresql_products p ON epd.product_uuid = p.product_uuid
WHERE epd.product_name IS NOT NULL
  AND epd.product_sku IS NOT NULL
  AND epd.product_status IN ('active', 'discontinued')

-- Migration execution with comprehensive options
WITH (
    migration_strategy = 'staged_migration',
    batch_size = 500,  -- Smaller batches for complex documents
    parallel_batches = 2,
    enable_progress_tracking = true,
    enable_data_validation = true,
    enable_performance_optimization = true,
    enable_rollback_capability = true,

    -- Advanced optimization
    enable_document_optimization = true,
    enable_index_optimization = true,
    enable_compression = true,

    -- Quality assurance
    validate_document_structure = true,
    validate_data_relationships = true,
    validate_business_rules = true,

    -- Monitoring
    enable_migration_metrics = true,
    enable_performance_tracking = true,
    track_optimization_effectiveness = true
);

-- Migration monitoring and analytics
CREATE VIEW migration_progress_dashboard AS
WITH migration_statistics AS (
    SELECT 
        'customers' as collection_name,
        COUNT(*) as migrated_documents,
        AVG(LENGTH(BSON(document))) as avg_document_size,
        MIN((migration_info->>'migrated_at')::timestamp) as migration_start,
        MAX((migration_info->>'migrated_at')::timestamp) as migration_end
    FROM customers
    WHERE migration_info->>'migrated_at' IS NOT NULL

    UNION ALL

    SELECT 
        'products' as collection_name,
        COUNT(*) as migrated_documents,
        AVG(LENGTH(BSON(document))) as avg_document_size,
        MIN((migration_info->>'migrated_at')::timestamp) as migration_start,
        MAX((migration_info->>'migrated_at')::timestamp) as migration_end
    FROM products
    WHERE migration_info->>'migrated_at' IS NOT NULL
),

performance_metrics AS (
    SELECT 
        collection_name,
        migrated_documents,
        avg_document_size,
        migration_start,
        migration_end,

        -- Calculate migration performance
        EXTRACT(EPOCH FROM (migration_end - migration_start)) as migration_duration_seconds,
        ROUND(migrated_documents::float / EXTRACT(EPOCH FROM (migration_end - migration_start)) * 60, 2) as documents_per_minute,

        -- Document optimization metrics
        CASE 
            WHEN avg_document_size < 16000 THEN 'optimal'  -- < 16KB
            WHEN avg_document_size < 100000 THEN 'good'    -- < 100KB  
            WHEN avg_document_size < 1000000 THEN 'large'  -- < 1MB
            ELSE 'very_large'
        END as document_size_classification,

        -- Migration efficiency assessment
        CASE 
            WHEN migrated_documents / EXTRACT(EPOCH FROM (migration_end - migration_start)) > 100 THEN 'excellent'
            WHEN migrated_documents / EXTRACT(EPOCH FROM (migration_end - migration_start)) > 50 THEN 'good'
            WHEN migrated_documents / EXTRACT(EPOCH FROM (migration_end - migration_start)) > 10 THEN 'acceptable'
            ELSE 'needs_optimization'
        END as migration_efficiency

    FROM migration_statistics
)

SELECT 
    pm.collection_name,
    pm.migrated_documents,
    ROUND(pm.avg_document_size / 1024.0, 2) || ' KB' as avg_document_size,
    pm.migration_start,
    pm.migration_end,
    ROUND(pm.migration_duration_seconds / 60.0, 2) || ' minutes' as migration_duration,
    pm.documents_per_minute || ' docs/min' as migration_throughput,
    pm.document_size_classification,
    pm.migration_efficiency,

    -- Success metrics
    'Migration completed successfully with optimized document structure' as status,

    -- Recommendations
    CASE pm.migration_efficiency
        WHEN 'needs_optimization' THEN 'Consider increasing batch size and parallel processing'
        WHEN 'acceptable' THEN 'Performance adequate, monitor for optimization opportunities'  
        WHEN 'good' THEN 'Good performance, continue with current configuration'
        WHEN 'excellent' THEN 'Excellent performance, configuration optimized'
    END as recommendation

FROM performance_metrics pm
ORDER BY pm.collection_name;

-- QueryLeaf provides comprehensive migration capabilities:
-- 1. Advanced data transformation with embedded documents and optimized structures
-- 2. Intelligent relationship modeling preserving data integrity
-- 3. Performance-optimized document design for MongoDB query patterns
-- 4. Comprehensive data validation and quality assurance
-- 5. Real-time migration monitoring and progress tracking
-- 6. Automated optimization recommendations based on data patterns
-- 7. Flexible schema evolution supporting future application requirements
-- 8. SQL-familiar syntax for MongoDB migration operations
-- 9. Enterprise-grade migration strategies with rollback capabilities
-- 10. Seamless integration with existing database infrastructure

Best Practices for MongoDB Migration

Migration Strategy and Planning

Essential principles for successful database modernization:

  1. Assessment and Planning: Thoroughly analyze source schemas, data relationships, and application access patterns before migration
  2. Document Design: Design MongoDB documents to optimize for application query patterns rather than maintaining relational structures
  3. Staged Migration: Implement gradual migration strategies to minimize downtime and enable thorough testing
  4. Data Validation: Implement comprehensive validation processes to ensure data integrity throughout the migration
  5. Performance Testing: Test migration performance and application performance against migrated data structures
  6. Rollback Capabilities: Maintain rollback strategies and data synchronization during transition periods

Data Transformation and Optimization

Optimize migration strategies for production-scale requirements:

  1. Relationship Analysis: Analyze relational patterns to determine optimal embedding vs. referencing strategies
  2. Index Planning: Design MongoDB indexes based on application query patterns and performance requirements
  3. Schema Flexibility: Leverage MongoDB's flexible schema capabilities while maintaining data consistency
  4. Batch Processing: Optimize batch sizes and parallel processing for large dataset migrations
  5. Monitoring Integration: Implement comprehensive monitoring for migration progress and post-migration performance
  6. Application Evolution: Plan application refactoring to leverage MongoDB's document model benefits

Conclusion

MongoDB NoSQL migration provides powerful capabilities for modernizing relational database architectures while preserving data integrity and improving application performance through optimized document design, flexible schemas, and intelligent data transformation strategies. The document-oriented approach naturally supports complex data relationships while eliminating the performance bottlenecks and complexity associated with traditional relational joins.

Key MongoDB Migration benefits include:

  • Intelligent Data Transformation: Sophisticated migration tools that preserve relationships while optimizing for document-oriented access patterns
  • Flexible Schema Evolution: Native support for schema changes without complex migrations or downtime requirements
  • Performance Optimization: Dramatic improvements in query performance through embedded documents and optimized data structures
  • Scalability Enhancement: Horizontal scaling capabilities that support modern application growth requirements
  • Developer Productivity: Simplified data modeling that aligns with object-oriented application architectures
  • SQL Accessibility: Familiar SQL-style migration operations through QueryLeaf for accessible database modernization

Whether you're modernizing legacy applications, building new cloud-native systems, or optimizing existing database performance, MongoDB migration with QueryLeaf's familiar SQL interface provides the foundation for successful database evolution.

QueryLeaf Integration: QueryLeaf automatically optimizes MongoDB migration operations while providing familiar SQL syntax for data transformation, validation, and optimization. Advanced migration strategies, relationship modeling, and performance optimization techniques are seamlessly accessible through familiar SQL constructs, making sophisticated database modernization approachable for SQL-oriented development teams.

The combination of MongoDB's robust migration capabilities with SQL-style database operations makes it an ideal platform for organizations requiring both modern database performance and familiar database management patterns, ensuring your migration project can succeed while maintaining operational continuity and developer productivity.