Skip to content

2025

MongoDB Text Search and Full-Text Indexing: SQL-Style Search Queries

Building search functionality in MongoDB can be complex when working with the native operators. While MongoDB's $text and $regex operators are powerful, implementing comprehensive search features often requires understanding multiple MongoDB-specific concepts and syntax patterns.

Using SQL-style search queries makes text search more intuitive and maintainable, especially for teams familiar with traditional database search patterns.

The Text Search Challenge

Consider a content management system with articles, products, and user profiles. Traditional MongoDB text search involves multiple operators and complex aggregation pipelines:

// Sample article document
{
  "_id": ObjectId("..."),
  "title": "Getting Started with MongoDB Indexing",
  "content": "MongoDB provides several types of indexes to optimize query performance. Understanding compound indexes, text indexes, and partial indexes is crucial for building scalable applications.",
  "author": "Jane Developer",
  "category": "Database",
  "tags": ["mongodb", "indexing", "performance", "databases"],
  "publishDate": ISODate("2025-08-15"),
  "status": "published",
  "wordCount": 1250,
  "readTime": 5
}

Native MongoDB search requires multiple approaches:

// Basic text search
db.articles.find({
  $text: {
    $search: "mongodb indexing performance"
  }
})

// Complex search with multiple conditions
db.articles.find({
  $and: [
    { $text: { $search: "mongodb indexing" } },
    { status: "published" },
    { category: "Database" },
    { publishDate: { $gte: ISODate("2025-01-01") } }
  ]
}).sort({ score: { $meta: "textScore" } })

// Regex-based partial matches
db.articles.find({
  $or: [
    { title: { $regex: "mongodb", $options: "i" } },
    { content: { $regex: "mongodb", $options: "i" } }
  ]
})

The same searches become much more readable with SQL syntax:

-- Basic full-text search
SELECT title, author, publishDate, 
       MATCH_SCORE(title, content) AS relevance
FROM articles
WHERE MATCH(title, content) AGAINST ('mongodb indexing performance')
  AND status = 'published'
ORDER BY relevance DESC

-- Advanced search with multiple criteria
SELECT title, author, category, readTime,
       MATCH_SCORE(title, content) AS score
FROM articles  
WHERE MATCH(title, content) AGAINST ('mongodb indexing')
  AND category = 'Database'
  AND publishDate >= '2025-01-01'
  AND status = 'published'
ORDER BY score DESC, publishDate DESC

Setting Up Text Indexes

Before performing text searches, you need appropriate indexes. Here's how to create them:

Basic Text Index

-- Create text index on multiple fields
CREATE TEXT INDEX idx_articles_search 
ON articles (title, content)

MongoDB equivalent:

db.articles.createIndex({ 
  title: "text", 
  content: "text" 
})

Weighted Text Index

Give different importance to various fields:

-- Create weighted text index
CREATE TEXT INDEX idx_articles_weighted_search 
ON articles (title, content, tags)
WITH WEIGHTS (title: 10, content: 5, tags: 1)

MongoDB syntax:

db.articles.createIndex(
  { title: "text", content: "text", tags: "text" },
  { weights: { title: 10, content: 5, tags: 1 } }
)

Language-Specific Text Index

-- Create text index with language specification
CREATE TEXT INDEX idx_articles_english_search 
ON articles (title, content)
WITH LANGUAGE 'english'

MongoDB equivalent:

db.articles.createIndex(
  { title: "text", content: "text" },
  { default_language: "english" }
)

Search Query Patterns

-- Search for exact phrases
SELECT title, author, MATCH_SCORE(title, content) AS score
FROM articles
WHERE MATCH(title, content) AGAINST ('"compound indexes"')
  AND status = 'published'
ORDER BY score DESC

Boolean Search Operations

-- Advanced boolean search
SELECT title, author, category
FROM articles
WHERE MATCH(title, content) AGAINST ('mongodb +indexing -aggregation')
  AND status = 'published'

-- Search with OR conditions
SELECT title, author
FROM articles  
WHERE MATCH(title, content) AGAINST ('indexing OR performance OR optimization')
  AND category IN ('Database', 'Performance')

Case-Insensitive Pattern Matching

-- Partial string matching
SELECT title, author, category
FROM articles
WHERE title ILIKE '%mongodb%'
   OR content ILIKE '%mongodb%'
   OR ARRAY_TO_STRING(tags, ' ') ILIKE '%mongodb%'

-- Using REGEX for complex patterns
SELECT title, author
FROM articles
WHERE title REGEX '(?i)mongo.*db'
   OR content REGEX '(?i)index(ing|es)?'

Advanced Search Features

Search with Aggregations

Combine text search with analytical queries:

-- Search results with category breakdown
SELECT 
  category,
  COUNT(*) AS articleCount,
  AVG(MATCH_SCORE(title, content)) AS avgRelevance,
  AVG(readTime) AS avgReadTime
FROM articles
WHERE MATCH(title, content) AGAINST ('mongodb performance')
  AND status = 'published'
  AND publishDate >= '2025-01-01'
GROUP BY category
ORDER BY avgRelevance DESC

Search with JOIN Operations

-- Search articles with author information
SELECT 
  a.title,
  a.publishDate,
  u.name AS authorName,
  u.expertise,
  MATCH_SCORE(a.title, a.content) AS relevance
FROM articles a
JOIN users u ON a.author = u.username
WHERE MATCH(a.title, a.content) AGAINST ('indexing strategies')
  AND a.status = 'published'
  AND u.isActive = true
ORDER BY relevance DESC, a.publishDate DESC

Faceted Search Results

-- Get search results with facet counts
WITH search_results AS (
  SELECT *,
         MATCH_SCORE(title, content) AS score
  FROM articles
  WHERE MATCH(title, content) AGAINST ('mongodb optimization')
    AND status = 'published'
)
SELECT 
  'results' AS type,
  COUNT(*) AS count,
  JSON_AGG(
    JSON_BUILD_OBJECT(
      'title', title,
      'author', author,
      'category', category,
      'score', score
    )
  ) AS data
FROM search_results
WHERE score > 0.5

UNION ALL

SELECT 
  'categories' AS type,
  COUNT(*) AS count,
  JSON_AGG(
    JSON_BUILD_OBJECT(
      'category', category,
      'count', category_count
    )
  ) AS data
FROM (
  SELECT category, COUNT(*) AS category_count
  FROM search_results
  GROUP BY category
) category_facets

Performance Optimization

Create compound indexes that support both search and filtering:

-- Compound index for search + filtering
CREATE INDEX idx_articles_search_filter 
ON articles (status, category, publishDate)

-- Combined with text index for optimal performance
CREATE TEXT INDEX idx_articles_content_search
ON articles (title, content)

Search Result Pagination

-- Efficient pagination for search results
SELECT title, author, publishDate,
       MATCH_SCORE(title, content) AS score
FROM articles
WHERE MATCH(title, content) AGAINST ('mongodb tutorial')
  AND status = 'published'
ORDER BY score DESC, _id ASC
LIMIT 20 OFFSET 40

Search Performance Analysis

-- Analyze search query performance
EXPLAIN ANALYZE
SELECT title, author, MATCH_SCORE(title, content) AS score
FROM articles
WHERE MATCH(title, content) AGAINST ('performance optimization')
  AND category = 'Database'
  AND publishDate >= '2025-01-01'
ORDER BY score DESC
LIMIT 10

Real-World Search Implementation

// Sample product document
{
  "_id": ObjectId("..."),
  "name": "MacBook Pro 16-inch M3",
  "description": "Powerful laptop with M3 chip, perfect for development and creative work",
  "brand": "Apple",
  "category": "Laptops",
  "subcategory": "Professional",
  "price": 2499.99,
  "features": ["M3 chip", "16GB RAM", "1TB SSD", "Liquid Retina Display"],
  "tags": ["laptop", "apple", "macbook", "professional", "development"],
  "inStock": true,
  "rating": 4.8,
  "reviewCount": 1247
}

Comprehensive product search query:

SELECT 
  p.name,
  p.brand,
  p.price,
  p.rating,
  p.reviewCount,
  MATCH_SCORE(p.name, p.description) AS textScore,
  -- Boost score based on rating and reviews
  (MATCH_SCORE(p.name, p.description) * 0.7 + 
   (p.rating / 5.0) * 0.2 + 
   LOG(p.reviewCount + 1) * 0.1) AS finalScore
FROM products p
WHERE MATCH(p.name, p.description) AGAINST ('macbook pro development')
  AND p.inStock = true
  AND p.price BETWEEN 1000 AND 5000
  AND p.rating >= 4.0
ORDER BY finalScore DESC, p.reviewCount DESC
LIMIT 20

Content Discovery System

-- Find related articles based on search terms and user preferences
WITH user_interests AS (
  SELECT UNNEST(interests) AS interest
  FROM users 
  WHERE _id = ?
),
search_matches AS (
  SELECT 
    a.*,
    MATCH_SCORE(a.title, a.content) AS textScore
  FROM articles a
  WHERE MATCH(a.title, a.content) AGAINST (?)
    AND a.status = 'published'
    AND a.publishDate >= CURRENT_DATE - INTERVAL '90 days'
)
SELECT 
  s.title,
  s.author,
  s.category,
  s.publishDate,
  s.readTime,
  s.textScore,
  -- Boost articles matching user interests
  CASE 
    WHEN s.category IN (SELECT interest FROM user_interests) THEN s.textScore * 1.5
    WHEN EXISTS (
      SELECT 1 FROM user_interests ui 
      WHERE s.tags @> ARRAY[ui.interest]
    ) THEN s.textScore * 1.2
    ELSE s.textScore
  END AS personalizedScore
FROM search_matches s
ORDER BY personalizedScore DESC, s.publishDate DESC
LIMIT 15

Multi-Language Search Support

Language Detection and Indexing

-- Create language-specific indexes
CREATE TEXT INDEX idx_articles_english 
ON articles (title, content) 
WHERE language = 'english'
WITH LANGUAGE 'english'

CREATE TEXT INDEX idx_articles_spanish 
ON articles (title, content) 
WHERE language = 'spanish'
WITH LANGUAGE 'spanish'

Multi-Language Search Query

-- Search across multiple languages
SELECT 
  title,
  author,
  language,
  MATCH_SCORE(title, content) AS score
FROM articles
WHERE (
  (language = 'english' AND MATCH(title, content) AGAINST ('database performance'))
  OR 
  (language = 'spanish' AND MATCH(title, content) AGAINST ('rendimiento base datos'))
)
AND status = 'published'
ORDER BY score DESC

Search Analytics and Insights

Search Term Analysis

-- Analyze popular search terms (from search logs)
SELECT 
  searchTerm,
  COUNT(*) AS searchCount,
  AVG(resultCount) AS avgResults,
  AVG(clickThroughRate) AS avgCTR
FROM search_logs
WHERE searchDate >= CURRENT_DATE - INTERVAL '30 days'
  AND resultCount > 0
GROUP BY searchTerm
HAVING COUNT(*) >= 10
ORDER BY searchCount DESC, avgCTR DESC
LIMIT 20

Content Gap Analysis

-- Find search terms with low result counts
SELECT 
  sl.searchTerm,
  COUNT(*) AS searchFrequency,
  AVG(sl.resultCount) AS avgResultCount
FROM search_logs sl
WHERE sl.searchDate >= CURRENT_DATE - INTERVAL '30 days'
  AND sl.resultCount < 5
GROUP BY sl.searchTerm
HAVING COUNT(*) >= 5
ORDER BY searchFrequency DESC

QueryLeaf Integration

When using QueryLeaf for MongoDB text search, you gain several advantages:

-- QueryLeaf automatically optimizes this complex search query
SELECT 
  a.title,
  a.author,
  a.publishDate,
  u.name AS authorFullName,
  u.expertise,
  MATCH_SCORE(a.title, a.content) AS relevance,
  -- Complex scoring with user engagement metrics
  (MATCH_SCORE(a.title, a.content) * 0.6 + 
   LOG(a.viewCount + 1) * 0.2 + 
   a.socialShares * 0.2) AS engagementScore
FROM articles a
JOIN users u ON a.author = u.username
WHERE MATCH(a.title, a.content) AGAINST ('mongodb indexing performance optimization')
  AND a.status = 'published'
  AND a.publishDate >= '2025-01-01'
  AND u.isActive = true
  AND a.category IN ('Database', 'Performance', 'Tutorial')
ORDER BY engagementScore DESC, a.publishDate DESC
LIMIT 25

QueryLeaf handles the complex MongoDB aggregation pipeline generation, text index utilization, and query optimization automatically.

  1. Index Strategy: Create appropriate text indexes for your search fields
  2. Query Optimization: Use compound indexes to support filtering alongside text search
  3. Result Ranking: Implement scoring algorithms that consider relevance and business metrics
  4. Performance Monitoring: Regularly analyze search query performance and user behavior
  5. Content Quality: Maintain good content structure to improve search effectiveness

Conclusion

MongoDB's text search capabilities are powerful, but SQL-style queries make them much more accessible and maintainable. By using familiar SQL patterns, you can build sophisticated search functionality that performs well and is easy to understand.

Key benefits of SQL-style text search: - Intuitive query syntax for complex search operations - Easy integration of search with business logic and analytics - Better performance through optimized query planning - Simplified maintenance and debugging of search functionality

Whether you're building content discovery systems, e-commerce product search, or knowledge management platforms, SQL-style text search queries provide the clarity and power needed to create effective search experiences.

With QueryLeaf, you can leverage MongoDB's document flexibility while maintaining the search query patterns your team already knows, creating the best of both worlds for modern applications.

MongoDB Schema Design Patterns: Building Scalable Document Structures

MongoDB's flexible document model offers freedom from rigid table schemas, but this flexibility can be overwhelming. Unlike SQL databases with normalized tables, MongoDB requires careful consideration of how to structure documents to balance query performance, data consistency, and application scalability.

Understanding proven schema design patterns helps you leverage MongoDB's strengths while avoiding common pitfalls that can hurt performance and maintainability.

The Schema Design Challenge

Consider an e-commerce application with users, orders, and products. In SQL, you'd normalize this into separate tables:

-- SQL normalized approach
CREATE TABLE users (
  id SERIAL PRIMARY KEY,
  email VARCHAR(255) UNIQUE,
  name VARCHAR(255),
  address_street VARCHAR(255),
  address_city VARCHAR(255),
  address_country VARCHAR(255)
);

CREATE TABLE orders (
  id SERIAL PRIMARY KEY,
  user_id INTEGER REFERENCES users(id),
  order_date TIMESTAMP,
  total_amount DECIMAL(10,2),
  status VARCHAR(50)
);

CREATE TABLE order_items (
  id SERIAL PRIMARY KEY,
  order_id INTEGER REFERENCES orders(id),
  product_id INTEGER REFERENCES products(id),
  quantity INTEGER,
  price DECIMAL(10,2)
);

In MongoDB, you have multiple design options, each with different tradeoffs. Let's explore the main patterns.

Pattern 1: Embedding (Denormalization)

Embedding stores related data within a single document, reducing the need for joins.

// Embedded approach - Order with items embedded
{
  "_id": ObjectId("..."),
  "userId": ObjectId("..."),
  "userEmail": "[email protected]",
  "userName": "John Smith",
  "orderDate": ISODate("2025-08-17"),
  "status": "completed",
  "shippingAddress": {
    "street": "123 Main St",
    "city": "Seattle",
    "state": "WA",
    "zipCode": "98101",
    "country": "USA"
  },
  "items": [
    {
      "productId": ObjectId("..."),
      "name": "MacBook Pro",
      "price": 1299.99,
      "quantity": 1,
      "category": "Electronics"
    },
    {
      "productId": ObjectId("..."),
      "name": "USB-C Cable",
      "price": 19.99,
      "quantity": 2,
      "category": "Accessories"
    }
  ],
  "totalAmount": 1339.97
}

Benefits of Embedding:

  • Single Query Performance: Retrieve all related data in one operation
  • Atomic Updates: MongoDB guarantees ACID properties within a single document
  • Reduced Network Round Trips: No need for multiple queries or joins

SQL-Style Queries for Embedded Data:

-- Find orders with expensive items
SELECT 
  _id,
  userId,
  orderDate,
  items[0].name AS primaryItem,
  totalAmount
FROM orders
WHERE items[0].price > 1000
  AND status = 'completed'

-- Analyze spending by product category
SELECT 
  i.category,
  COUNT(*) AS orderCount,
  SUM(i.price * i.quantity) AS totalRevenue
FROM orders o
CROSS JOIN UNNEST(o.items) AS i
WHERE o.status = 'completed'
  AND o.orderDate >= '2025-01-01'
GROUP BY i.category
ORDER BY totalRevenue DESC

When to Use Embedding:

  • One-to-few relationships (typically < 100 subdocuments)
  • Child documents are always accessed with the parent
  • Child documents don't need independent querying
  • Document size stays under 16MB limit
  • Update patterns favor atomic operations

Pattern 2: References (Normalization)

References store related data in separate collections, similar to SQL foreign keys.

// Users collection
{
  "_id": ObjectId("user123"),
  "email": "[email protected]", 
  "name": "John Smith",
  "addresses": [
    {
      "type": "shipping",
      "street": "123 Main St",
      "city": "Seattle",
      "state": "WA",
      "zipCode": "98101",
      "country": "USA"
    }
  ]
}

// Orders collection
{
  "_id": ObjectId("order456"),
  "userId": ObjectId("user123"),
  "orderDate": ISODate("2025-08-17"),
  "status": "completed",
  "itemIds": [
    ObjectId("item789"),
    ObjectId("item790")
  ],
  "totalAmount": 1339.97
}

// Order Items collection  
{
  "_id": ObjectId("item789"),
  "orderId": ObjectId("order456"),
  "productId": ObjectId("prod001"),
  "name": "MacBook Pro",
  "price": 1299.99,
  "quantity": 1,
  "category": "Electronics"
}

SQL-Style Queries with References:

-- Join orders with user information
SELECT 
  o._id AS orderId,
  o.orderDate,
  o.totalAmount,
  u.name AS userName,
  u.email
FROM orders o
JOIN users u ON o.userId = u._id
WHERE o.status = 'completed'
  AND o.orderDate >= '2025-08-01'

-- Get detailed order information with items
SELECT 
  o._id AS orderId,
  o.orderDate,
  u.name AS customerName,
  i.name AS itemName,
  i.price,
  i.quantity
FROM orders o
JOIN users u ON o.userId = u._id
JOIN order_items i ON o._id = i.orderId
WHERE o.status = 'completed'
ORDER BY o.orderDate DESC, i.name

When to Use References:

  • One-to-many relationships with many children
  • Child documents need independent querying
  • Child documents are frequently updated
  • Need to maintain data consistency across documents
  • Document size would exceed MongoDB's 16MB limit

Pattern 3: Hybrid Approach

Combines embedding and referencing based on access patterns and data characteristics.

// Order with embedded frequently-accessed data and references for detailed data
{
  "_id": ObjectId("order456"),
  "userId": ObjectId("user123"),

  // Embedded user snapshot for quick access
  "userSnapshot": {
    "name": "John Smith",
    "email": "[email protected]",
    "membershipLevel": "gold"
  },

  "orderDate": ISODate("2025-08-17"),
  "status": "completed",

  // Embedded order items for atomic updates
  "items": [
    {
      "productId": ObjectId("prod001"),
      "name": "MacBook Pro", 
      "price": 1299.99,
      "quantity": 1
    }
  ],

  // Reference to detailed shipping info
  "shippingAddressId": ObjectId("addr123"),

  // Reference to payment information
  "paymentId": ObjectId("payment456"),

  "totalAmount": 1339.97
}

Benefits of Hybrid Approach:

  • Optimized Queries: Fast access to commonly needed data
  • Reduced Duplication: Reference detailed data that changes infrequently
  • Flexible Updates: Update embedded snapshots as needed

Advanced Schema Patterns

1. Polymorphic Pattern

Store different document types in the same collection:

// Products collection with different product types
{
  "_id": ObjectId("..."),
  "type": "book",
  "name": "MongoDB Definitive Guide",
  "price": 39.99,
  "isbn": "978-1449344689",
  "author": "Kristina Chodorow",
  "pages": 432
}

{
  "_id": ObjectId("..."),
  "type": "electronics",
  "name": "iPhone 15",
  "price": 799.99,
  "brand": "Apple",
  "model": "iPhone 15",
  "storage": "128GB"
}

Query with type-specific logic:

SELECT 
  name,
  price,
  CASE type
    WHEN 'book' THEN CONCAT(author, ' - ', pages, ' pages')
    WHEN 'electronics' THEN CONCAT(brand, ' ', model)
    ELSE 'Unknown product type'
  END AS productDetails
FROM products
WHERE price BETWEEN 30 AND 100
ORDER BY price DESC

2. Bucket Pattern

Group related documents to optimize for time-series or IoT data:

// Sensor readings bucketed by hour
{
  "_id": ObjectId("..."),
  "sensorId": "temp_sensor_01",
  "bucketDate": ISODate("2025-08-17T10:00:00Z"),
  "readings": [
    { "timestamp": ISODate("2025-08-17T10:00:00Z"), "value": 22.1 },
    { "timestamp": ISODate("2025-08-17T10:01:00Z"), "value": 22.3 },
    { "timestamp": ISODate("2025-08-17T10:02:00Z"), "value": 22.0 }
  ],
  "readingCount": 3,
  "minValue": 22.0,
  "maxValue": 22.3,
  "avgValue": 22.13
}

3. Outlier Pattern

Separate frequently accessed data from rare edge cases:

// Normal product document
{
  "_id": ObjectId("prod001"),
  "name": "Standard Widget",
  "price": 19.99,
  "category": "Widgets",
  "inStock": true,
  "hasOutliers": false
}

// Product with outlier data stored separately  
{
  "_id": ObjectId("prod002"), 
  "name": "Premium Widget",
  "price": 199.99,
  "category": "Widgets",
  "inStock": true,
  "hasOutliers": true
}

// Separate outlier collection
{
  "_id": ObjectId("..."),
  "productId": ObjectId("prod002"),
  "detailedSpecs": { /* large technical specifications */ },
  "userManual": "http://example.com/manual.pdf",
  "warrantyInfo": { /* detailed warranty terms */ }
}

Schema Design Decision Framework

1. Analyze Access Patterns

-- Common query: Get user's recent orders
SELECT * FROM orders 
WHERE userId = ? 
ORDER BY orderDate DESC 
LIMIT 10

-- This suggests embedding user snapshot in orders
-- Or at least indexing userId + orderDate

2. Consider Update Frequency

  • High Update Frequency: Use references to avoid document growth
  • Low Update Frequency: Embedding may be optimal
  • Atomic Updates Needed: Embed related data

3. Evaluate Data Growth

  • Bounded Growth: Embedding works well
  • Unbounded Growth: Use references
  • Predictable Growth: Hybrid approach

4. Query Performance Requirements

-- If this query is critical:
SELECT o.*, u.name, u.email
FROM orders o
JOIN users u ON o.userId = u._id
WHERE o.status = 'pending'

-- Consider embedding user snapshot in orders:
-- { "userSnapshot": { "name": "...", "email": "..." } }

Indexing Strategy for Different Patterns

Embedded Documents

// Index embedded array elements
db.orders.createIndex({ "items.productId": 1 })
db.orders.createIndex({ "items.category": 1, "orderDate": -1 })

// Index nested object fields
db.orders.createIndex({ "shippingAddress.city": 1 })

Referenced Documents

// Standard foreign key indexes
db.orders.createIndex({ "userId": 1, "orderDate": -1 })
db.orderItems.createIndex({ "orderId": 1 })
db.orderItems.createIndex({ "productId": 1 })

Migration Strategies

When your schema needs to evolve:

1. Adding New Fields (Easy)

// Add versioning to handle schema changes
{
  "_id": ObjectId("..."),
  "schemaVersion": 2,
  "userId": ObjectId("..."),
  // ... existing fields
  "newField": "new value"  // Added in version 2
}

2. Restructuring Documents (Complex)

-- Use aggregation to transform documents
UPDATE orders 
SET items = [
  {
    "productId": productId,
    "name": productName, 
    "price": price,
    "quantity": quantity
  }
]
WHERE schemaVersion = 1

Performance Testing Your Schema

Test different patterns with realistic data volumes:

// Load test embedded approach
for (let i = 0; i < 100000; i++) {
  db.orders.insertOne({
    userId: ObjectId(),
    items: generateRandomItems(1, 10),
    // ... other fields
  })
}

// Compare query performance
db.orders.find({ "userId": userId }).explain("executionStats")

QueryLeaf Schema Optimization

When using QueryLeaf for SQL-to-MongoDB translation, your schema design becomes even more critical. QueryLeaf can analyze your SQL query patterns and suggest optimal schema structures:

-- QueryLeaf can detect this join pattern
SELECT 
  o.orderDate,
  o.totalAmount,
  u.name AS customerName,
  i.productName,
  i.price
FROM orders o
JOIN users u ON o.userId = u._id
JOIN order_items i ON o._id = i.orderId
WHERE o.orderDate >= '2025-01-01'

-- And recommend either:
-- 1. Embedding user snapshots in orders
-- 2. Creating specific indexes for join performance
-- 3. Hybrid approach based on query frequency

Conclusion

Effective MongoDB schema design requires balancing multiple factors: query patterns, data relationships, update frequency, and performance requirements. There's no one-size-fits-all solution – the best approach depends on your specific use case.

Key principles: - Start with your queries: Design schemas to support your most important access patterns - Consider data lifecycle: How your data grows and changes over time - Measure performance: Test different approaches with realistic data volumes - Plan for evolution: Build in flexibility for future schema changes - Use appropriate indexes: Support your chosen schema pattern with proper indexing

Whether you choose embedding, referencing, or a hybrid approach, understanding these patterns helps you build MongoDB applications that scale efficiently while maintaining data integrity and query performance.

The combination of thoughtful schema design with tools like QueryLeaf gives you the flexibility of MongoDB documents with the query power of SQL – letting you build applications that are both performant and maintainable.

MongoDB Indexing Strategies: Optimizing Queries with SQL-Driven Approaches

MongoDB's indexing system is powerful, but designing effective indexes can be challenging when you're thinking in SQL terms. Understanding how your SQL queries translate to MongoDB operations is crucial for creating indexes that actually improve performance.

This guide shows how to design MongoDB indexes that support SQL-style queries, ensuring your applications run efficiently while maintaining query readability.

Understanding Index Types in MongoDB

MongoDB supports several index types that map well to SQL concepts:

  1. Single Field Indexes - Similar to SQL column indexes
  2. Compound Indexes - Like SQL multi-column indexes
  3. Text Indexes - For full-text search capabilities
  4. Partial Indexes - Equivalent to SQL conditional indexes
  5. TTL Indexes - For automatic document expiration

Basic Indexing for SQL-Style Queries

Single Field Indexes

Consider this user query pattern:

SELECT name, email, registrationDate
FROM users
WHERE email = '[email protected]'

Create a supporting index:

CREATE INDEX idx_users_email ON users (email)

In MongoDB shell syntax:

db.users.createIndex({ email: 1 })

Compound Indexes for Complex Queries

For queries involving multiple fields:

SELECT productName, price, category, inStock
FROM products
WHERE category = 'Electronics'
  AND price BETWEEN 100 AND 500
  AND inStock = true
ORDER BY price ASC

Create an optimized compound index:

CREATE INDEX idx_products_category_instock_price 
ON products (category, inStock, price)

MongoDB equivalent:

db.products.createIndex({ 
  category: 1, 
  inStock: 1, 
  price: 1 
})

The index field order matters: equality filters first, range filters last, sort fields at the end.

Indexing for Array Operations

When working with embedded arrays, index specific array positions for known access patterns:

// Sample order document
{
  "customerId": ObjectId("..."),
  "items": [
    { "product": "iPhone", "price": 999, "category": "Electronics" },
    { "product": "Case", "price": 29, "category": "Accessories" }
  ],
  "orderDate": ISODate("2025-01-15")
}

For this SQL query accessing the first item:

SELECT customerId, orderDate, items[0].product
FROM orders
WHERE items[0].category = 'Electronics'
  AND items[0].price > 500
ORDER BY orderDate DESC

Create targeted indexes:

-- Index for first item queries
CREATE INDEX idx_orders_first_item 
ON orders (items[0].category, items[0].price, orderDate)

-- General array element index (covers any position)
CREATE INDEX idx_orders_items_category 
ON orders (items.category, items.price)

Advanced Indexing Patterns

Text Search Indexes

For content search across multiple fields:

SELECT title, content, author
FROM articles
WHERE MATCH(title, content) AGAINST ('mongodb indexing')
ORDER BY score DESC

Create a text index:

CREATE TEXT INDEX idx_articles_search 
ON articles (title, content) 
WITH WEIGHTS (title: 2, content: 1)

MongoDB syntax:

db.articles.createIndex(
  { title: "text", content: "text" },
  { weights: { title: 2, content: 1 } }
)

Partial Indexes for Conditional Data

Index only relevant documents to save space:

-- Only index active users for login queries
CREATE INDEX idx_users_active_email 
ON users (email)
WHERE status = 'active'

MongoDB equivalent:

db.users.createIndex(
  { email: 1 },
  { partialFilterExpression: { status: "active" } }
)

TTL Indexes for Time-Based Data

Automatically expire temporary data:

-- Sessions expire after 24 hours
CREATE TTL INDEX idx_sessions_expiry 
ON sessions (createdAt)
EXPIRE AFTER 86400 SECONDS

MongoDB syntax:

db.sessions.createIndex(
  { createdAt: 1 },
  { expireAfterSeconds: 86400 }
)

JOIN-Optimized Indexing

When using SQL JOINs, ensure both collections have appropriate indexes:

SELECT 
  o.orderDate,
  o.totalAmount,
  c.name,
  c.region
FROM orders o
JOIN customers c ON o.customerId = c._id
WHERE c.region = 'North America'
  AND o.orderDate >= '2025-01-01'
ORDER BY o.orderDate DESC

Required indexes:

-- Index foreign key field in orders
CREATE INDEX idx_orders_customer_date 
ON orders (customerId, orderDate)

-- Index join condition and filter in customers  
CREATE INDEX idx_customers_region_id 
ON customers (region, _id)

Index Performance Analysis

Monitoring Index Usage

Check if your indexes are being used effectively:

-- Analyze query performance
EXPLAIN SELECT name, email
FROM users  
WHERE email = '[email protected]'
  AND status = 'active'

This helps identify: - Which indexes are used - Query execution time - Documents examined vs returned - Whether sorts use indexes

Index Optimization Tips

  1. Use Covered Queries: Include all selected fields in the index

    -- This query can be fully satisfied by the index
    CREATE INDEX idx_users_covered 
    ON users (email, status, name)
    
    SELECT name FROM users 
    WHERE email = '[email protected]' AND status = 'active'
    

  2. Optimize Sort Operations: Include sort fields in compound indexes

    CREATE INDEX idx_orders_status_date 
    ON orders (status, orderDate)
    
    SELECT * FROM orders 
    WHERE status = 'pending'
    ORDER BY orderDate DESC
    

  3. Consider Index Intersection: Sometimes multiple single-field indexes work better than one compound index

Real-World Indexing Strategy

E-commerce Platform Example

For a typical e-commerce application, here's a comprehensive indexing strategy:

-- Product catalog queries
CREATE INDEX idx_products_category_price ON products (category, price)
CREATE INDEX idx_products_search ON products (name, description) -- text index
CREATE INDEX idx_products_instock ON products (inStock, category)

-- Order management  
CREATE INDEX idx_orders_customer_date ON orders (customerId, orderDate)
CREATE INDEX idx_orders_status_date ON orders (status, orderDate)
CREATE INDEX idx_orders_items_category ON orders (items.category, items.price)

-- User management
CREATE INDEX idx_users_email ON users (email) -- unique
CREATE INDEX idx_users_region_status ON users (region, status)

-- Analytics queries
CREATE INDEX idx_orders_analytics ON orders (orderDate, status, totalAmount)

Query Pattern Matching

Design indexes based on your most common query patterns:

-- Pattern 1: Customer order history
SELECT * FROM orders 
WHERE customerId = ? 
ORDER BY orderDate DESC

-- Supporting index:
CREATE INDEX idx_orders_customer_date ON orders (customerId, orderDate)

-- Pattern 2: Product search with filters  
SELECT * FROM products
WHERE category = ? AND price BETWEEN ? AND ?
ORDER BY price ASC

-- Supporting index:
CREATE INDEX idx_products_category_price ON products (category, price)

-- Pattern 3: Recent activity analytics
SELECT DATE(orderDate), COUNT(*), SUM(totalAmount)
FROM orders
WHERE orderDate >= ?
GROUP BY DATE(orderDate)

-- Supporting index:
CREATE INDEX idx_orders_date_amount ON orders (orderDate, totalAmount)

Index Maintenance and Monitoring

Identifying Missing Indexes

Use query analysis to find slow operations:

-- Queries scanning many documents suggest missing indexes
EXPLAIN ANALYZE SELECT * FROM orders 
WHERE status = 'pending' AND items[0].category = 'Electronics'

If the explain plan shows high totalDocsExamined relative to totalDocsReturned, you likely need better indexes.

Removing Unused Indexes

Monitor index usage and remove unnecessary ones:

// MongoDB command to see index usage stats
db.orders.aggregate([{ $indexStats: {} }])

Remove indexes that haven't been used:

DROP INDEX idx_orders_unused ON orders

Performance Best Practices

  1. Limit Index Count: Too many indexes slow down writes
  2. Use Ascending Order: Unless you specifically need descending sorts
  3. Index Selectivity: Put most selective fields first in compound indexes
  4. Monitor Index Size: Large indexes impact memory usage
  5. Regular Maintenance: Rebuild indexes periodically in busy systems

QueryLeaf Integration

When using QueryLeaf for SQL-to-MongoDB translation, your indexing strategy becomes even more important. QueryLeaf can provide index recommendations based on your SQL query patterns:

-- QueryLeaf can suggest optimal indexes for complex queries
SELECT 
  c.region,
  COUNT(DISTINCT o.customerId) AS uniqueCustomers,
  SUM(i.price * i.quantity) AS totalRevenue
FROM customers c
JOIN orders o ON c._id = o.customerId  
CROSS JOIN UNNEST(o.items) AS i
WHERE o.orderDate >= '2025-01-01'
  AND o.status = 'completed'
GROUP BY c.region
HAVING totalRevenue > 10000
ORDER BY totalRevenue DESC

QueryLeaf analyzes such queries and can recommend compound indexes that support the JOIN conditions, array operations, filtering, grouping, and sorting requirements.

Conclusion

Effective MongoDB indexing requires understanding how your SQL queries translate to document operations. By thinking about indexes in terms of your query patterns rather than just individual fields, you can create an indexing strategy that significantly improves application performance.

Key takeaways: - Design indexes to match your SQL query patterns - Use compound indexes for multi-field queries and sorts - Consider partial indexes for conditional data - Monitor and maintain indexes based on actual usage - Test index effectiveness with realistic data volumes

With proper indexing aligned to your SQL query patterns, MongoDB can deliver excellent performance while maintaining the query readability you're used to from SQL databases.

MongoDB Data Modeling: Managing Relationships with SQL-Style Queries

One of the biggest challenges when transitioning from relational databases to MongoDB is understanding how to model relationships between data. MongoDB's flexible document structure offers multiple ways to represent relationships, but choosing the right approach can be confusing.

This guide shows how to design and query MongoDB relationships using familiar SQL patterns, making data modeling decisions clearer and queries more intuitive.

Understanding MongoDB Relationship Patterns

MongoDB provides several ways to model relationships:

  1. Embedded Documents - Store related data within the same document
  2. References - Store ObjectId references to other documents
  3. Hybrid Approach - Combine embedding and referencing strategically

Let's explore each pattern with practical examples.

Pattern 1: Embedded Relationships

When to Embed

Use embedded documents when: - Related data is always accessed together - The embedded data has a clear ownership relationship - The embedded collection size is bounded and relatively small

Example: Blog Posts with Comments

// Embedded approach
{
  "_id": ObjectId("..."),
  "title": "Getting Started with MongoDB",
  "content": "MongoDB is a powerful NoSQL database...",
  "author": "Jane Developer",
  "publishDate": ISODate("2025-01-10"),
  "comments": [
    {
      "author": "John Reader",
      "text": "Great article!",
      "date": ISODate("2025-01-11")
    },
    {
      "author": "Alice Coder",
      "text": "Very helpful examples",
      "date": ISODate("2025-01-12")
    }
  ]
}

Querying embedded data with SQL is straightforward:

-- Find posts with comments containing specific text
SELECT title, author, publishDate
FROM posts
WHERE comments[0].text LIKE '%helpful%'
   OR comments[1].text LIKE '%helpful%'
   OR comments[2].text LIKE '%helpful%'

-- Get posts with recent comments
SELECT title, comments[0].author, comments[0].date
FROM posts  
WHERE comments[0].date >= '2025-01-01'
ORDER BY comments[0].date DESC

The equivalent MongoDB aggregation would be much more complex:

db.posts.aggregate([
  {
    $match: {
      "comments.text": { $regex: /helpful/i }
    }
  },
  {
    $project: {
      title: 1,
      author: 1, 
      publishDate: 1
    }
  }
])

Pattern 2: Referenced Relationships

When to Reference

Use references when: - Related documents are large or frequently updated independently - You need to avoid duplication across multiple parent documents - Relationship cardinality is one-to-many or many-to-many

Example: E-commerce with Separate Collections

// Orders collection
{
  "_id": ObjectId("..."),
  "customerId": ObjectId("507f1f77bcf86cd799439011"),
  "orderDate": ISODate("2025-01-15"),
  "totalAmount": 1299.97,
  "status": "processing"
}

// Customers collection  
{
  "_id": ObjectId("507f1f77bcf86cd799439011"),
  "name": "Sarah Johnson",
  "email": "[email protected]",
  "address": {
    "street": "123 Main St",
    "city": "Seattle", 
    "state": "WA"
  },
  "memberSince": ISODate("2024-03-15")
}

SQL JOINs make working with references intuitive:

-- Get order details with customer information
SELECT 
  o.orderDate,
  o.totalAmount,
  o.status,
  c.name AS customerName,
  c.email,
  c.address.city
FROM orders o
JOIN customers c ON o.customerId = c._id
WHERE o.orderDate >= '2025-01-01'
ORDER BY o.orderDate DESC

Advanced Reference Queries

-- Find customers with multiple high-value orders
SELECT 
  c.name,
  c.email,
  COUNT(o._id) AS orderCount,
  SUM(o.totalAmount) AS totalSpent
FROM customers c
JOIN orders o ON c._id = o.customerId
WHERE o.totalAmount > 500
GROUP BY c._id, c.name, c.email
HAVING COUNT(o._id) >= 3
ORDER BY totalSpent DESC

Pattern 3: Hybrid Approach

When to Use Hybrid Modeling

Combine embedding and referencing when: - You need both immediate access to summary data and detailed information - Some related data changes frequently while other parts remain stable - You want to optimize for different query patterns

Example: User Profiles with Activity History

// Users collection with embedded recent activity + references
{
  "_id": ObjectId("..."),
  "username": "developer_mike",
  "profile": {
    "name": "Mike Chen",
    "avatar": "/images/avatars/mike.jpg",
    "bio": "Full-stack developer"
  },
  "recentActivity": [
    {
      "type": "post_created",
      "title": "MongoDB Best Practices", 
      "date": ISODate("2025-01-14"),
      "postId": ObjectId("...")
    },
    {
      "type": "comment_added",
      "text": "Great point about indexing",
      "date": ISODate("2025-01-13"), 
      "postId": ObjectId("...")
    }
  ],
  "stats": {
    "totalPosts": 127,
    "totalComments": 892,
    "reputation": 2450
  }
}

// Separate Posts collection for full content
{
  "_id": ObjectId("..."),
  "authorId": ObjectId("..."),
  "title": "MongoDB Best Practices",
  "content": "When working with MongoDB...",
  "publishDate": ISODate("2025-01-14")
}

Query both embedded and referenced data:

-- Get user dashboard with recent activity and full post details
SELECT 
  u.username,
  u.profile.name,
  u.recentActivity[0].title AS latestActivityTitle,
  u.recentActivity[0].date AS latestActivityDate,
  u.stats.totalPosts,
  p.content AS latestPostContent
FROM users u
LEFT JOIN posts p ON u.recentActivity[0].postId = p._id
WHERE u.recentActivity[0].type = 'post_created'
  AND u.recentActivity[0].date >= '2025-01-01'
ORDER BY u.recentActivity[0].date DESC

Performance Optimization for Relationships

Indexing Strategies

-- Index embedded array fields for efficient queries
CREATE INDEX ON orders (items[0].category, items[0].price)

-- Index reference fields
CREATE INDEX ON orders (customerId, orderDate)

-- Compound indexes for complex queries
CREATE INDEX ON posts (authorId, publishDate, status)

Query Optimization Patterns

-- Efficient pagination with references
SELECT 
  o._id,
  o.orderDate,
  o.totalAmount,
  c.name
FROM orders o
JOIN customers c ON o.customerId = c._id
WHERE o.orderDate >= '2025-01-01'
ORDER BY o.orderDate DESC
LIMIT 20 OFFSET 0

Choosing the Right Pattern

Decision Matrix

Scenario Pattern Reason
User profiles with preferences Embedded Preferences are small and always accessed with user
Blog posts with comments Embedded Comments belong to post, bounded size
Orders with customer data Referenced Customer data is large and shared across orders
Products with inventory tracking Referenced Inventory changes frequently and independently
Shopping cart items Embedded Cart items are temporary and belong to session
Order items with product details Hybrid Embed order-specific data, reference product catalog

Performance Guidelines

-- Good: Query embedded data directly
SELECT customerId, items[0].name, items[0].price
FROM orders
WHERE items[0].category = 'Electronics'

-- Better: Use references for large related documents
SELECT o.orderDate, c.name, c.address.city
FROM orders o  
JOIN customers c ON o.customerId = c._id
WHERE c.address.state = 'CA'

-- Best: Hybrid approach for optimal queries
SELECT 
  u.username,
  u.stats.reputation,
  u.recentActivity[0].title,
  p.content
FROM users u
JOIN posts p ON u.recentActivity[0].postId = p._id
WHERE u.stats.reputation > 1000

Data Consistency Patterns

Maintaining Reference Integrity

-- Find orphaned records
SELECT o._id, o.customerId
FROM orders o
LEFT JOIN customers c ON o.customerId = c._id
WHERE c._id IS NULL

-- Update related documents atomically
UPDATE users
SET stats.totalPosts = stats.totalPosts + 1
WHERE _id = '507f1f77bcf86cd799439011'

Querying with QueryLeaf

All the SQL examples in this guide work seamlessly with QueryLeaf, which translates your familiar SQL syntax into optimized MongoDB operations. You get the modeling flexibility of MongoDB with the query clarity of SQL.

For more details on advanced relationship queries, see our guides on JOINs and nested field access.

Conclusion

MongoDB relationship modeling doesn't have to be complex. By understanding when to embed, reference, or use hybrid approaches, you can design schemas that are both performant and maintainable.

Using SQL syntax for relationship queries provides several advantages: - Familiar patterns for developers with SQL background - Clear expression of business logic and data relationships - Easier debugging and query optimization - Better collaboration across teams with mixed database experience

The key is choosing the right modeling pattern for your use case and then leveraging SQL's expressive power to query your MongoDB data effectively. With the right approach, you get MongoDB's document flexibility combined with SQL's query clarity.

MongoDB Aggregation Pipelines Simplified: From Complex Pipelines to Simple SQL

MongoDB's aggregation framework is powerful, but its multi-stage pipeline syntax can be overwhelming for developers coming from SQL backgrounds. Complex operations that would be straightforward in SQL often require lengthy aggregation pipelines with multiple stages, operators, and nested expressions.

What if you could achieve the same results using familiar SQL syntax? Let's explore how to transform complex MongoDB aggregations into readable SQL queries.

The Aggregation Pipeline Challenge

Consider an e-commerce database with orders and customers. A common business requirement is to analyze sales by region and product category. Here's what this looks like with MongoDB's native aggregation:

// Sample documents
// Orders collection:
{
  "_id": ObjectId("..."),
  "customerId": ObjectId("..."),
  "orderDate": ISODate("2025-07-15"),
  "items": [
    { "product": "iPhone 15", "category": "Electronics", "price": 999, "quantity": 1 },
    { "product": "Case", "category": "Accessories", "price": 29, "quantity": 2 }
  ],
  "status": "completed"
}

// Customers collection:
{
  "_id": ObjectId("..."),
  "name": "John Smith",
  "email": "[email protected]",
  "region": "North America",
  "registrationDate": ISODate("2024-03-10")
}

To get sales by region and category, you'd need this complex aggregation pipeline:

db.orders.aggregate([
  // Stage 1: Match completed orders from last 30 days
  {
    $match: {
      status: "completed",
      orderDate: { $gte: new Date(Date.now() - 30*24*60*60*1000) }
    }
  },

  // Stage 2: Unwind the items array
  { $unwind: "$items" },

  // Stage 3: Join with customers
  {
    $lookup: {
      from: "customers",
      localField: "customerId",
      foreignField: "_id",
      as: "customer"
    }
  },

  // Stage 4: Unwind customer (since lookup returns array)
  { $unwind: "$customer" },

  // Stage 5: Calculate item total and group by region/category
  {
    $group: {
      _id: {
        region: "$customer.region",
        category: "$items.category"
      },
      totalRevenue: { 
        $sum: { $multiply: ["$items.price", "$items.quantity"] }
      },
      orderCount: { $sum: 1 },
      avgOrderValue: { 
        $avg: { $multiply: ["$items.price", "$items.quantity"] }
      }
    }
  },

  // Stage 6: Sort by revenue descending
  { $sort: { totalRevenue: -1 } },

  // Stage 7: Format output
  {
    $project: {
      _id: 0,
      region: "$_id.region",
      category: "$_id.category",
      totalRevenue: 1,
      orderCount: 1,
      avgOrderValue: { $round: ["$avgOrderValue", 2] }
    }
  }
])

This pipeline has 7 stages and is difficult to read, modify, or debug. The logic is spread across multiple stages, making it hard to understand the business intent.

SQL: Clear and Concise

The same analysis becomes much more readable with SQL:

SELECT 
  c.region,
  i.category,
  SUM(i.price * i.quantity) AS totalRevenue,
  COUNT(*) AS orderCount,
  ROUND(AVG(i.price * i.quantity), 2) AS avgOrderValue
FROM orders o
JOIN customers c ON o.customerId = c._id
CROSS JOIN UNNEST(o.items) AS i
WHERE o.status = 'completed'
  AND o.orderDate >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY c.region, i.category
ORDER BY totalRevenue DESC

The SQL version is much more concise and follows a logical flow that matches how we think about the problem. Let's break down more examples.

Common Aggregation Patterns in SQL

1. Time-Based Analytics

MongoDB aggregation for daily sales trends:

db.orders.aggregate([
  {
    $match: {
      orderDate: { $gte: ISODate("2025-07-01") },
      status: "completed"
    }
  },
  {
    $group: {
      _id: {
        year: { $year: "$orderDate" },
        month: { $month: "$orderDate" },
        day: { $dayOfMonth: "$orderDate" }
      },
      dailySales: { $sum: "$totalAmount" },
      orderCount: { $sum: 1 }
    }
  },
  {
    $project: {
      _id: 0,
      date: {
        $dateFromParts: {
          year: "$_id.year",
          month: "$_id.month",
          day: "$_id.day"
        }
      },
      dailySales: 1,
      orderCount: 1
    }
  },
  { $sort: { date: 1 } }
])

SQL equivalent:

SELECT 
  DATE(orderDate) AS date,
  SUM(totalAmount) AS dailySales,
  COUNT(*) AS orderCount
FROM orders
WHERE orderDate >= '2025-07-01'
  AND status = 'completed'
GROUP BY DATE(orderDate)
ORDER BY date

2. Complex Filtering and Grouping

Finding top customers by spending in each region:

db.orders.aggregate([
  { $match: { status: "completed" } },
  {
    $lookup: {
      from: "customers",
      localField: "customerId", 
      foreignField: "_id",
      as: "customer"
    }
  },
  { $unwind: "$customer" },
  {
    $group: {
      _id: {
        customerId: "$customerId",
        region: "$customer.region"
      },
      customerName: { $first: "$customer.name" },
      totalSpent: { $sum: "$totalAmount" },
      orderCount: { $sum: 1 }
    }
  },
  { $sort: { "_id.region": 1, totalSpent: -1 } },
  {
    $group: {
      _id: "$_id.region",
      topCustomers: {
        $push: {
          customerId: "$_id.customerId",
          name: "$customerName",
          totalSpent: "$totalSpent",
          orderCount: "$orderCount"
        }
      }
    }
  }
])

SQL with window functions:

SELECT 
  region,
  customerId,
  customerName,
  totalSpent,
  orderCount,
  RANK() OVER (PARTITION BY region ORDER BY totalSpent DESC) as regionRank
FROM (
  SELECT 
    c.region,
    o.customerId,
    c.name AS customerName,
    SUM(o.totalAmount) AS totalSpent,
    COUNT(*) AS orderCount
  FROM orders o
  JOIN customers c ON o.customerId = c._id
  WHERE o.status = 'completed'
  GROUP BY c.region, o.customerId, c.name
) customer_totals
WHERE regionRank <= 5
ORDER BY region, totalSpent DESC

3. Advanced Array Processing

Analyzing product performance across all orders:

db.orders.aggregate([
  { $match: { status: "completed" } },
  { $unwind: "$items" },
  {
    $group: {
      _id: "$items.product",
      category: { $first: "$items.category" },
      totalQuantity: { $sum: "$items.quantity" },
      totalRevenue: { $sum: { $multiply: ["$items.price", "$items.quantity"] } },
      avgPrice: { $avg: "$items.price" },
      orderFrequency: { $sum: 1 }
    }
  },
  { $sort: { totalRevenue: -1 } },
  {
    $project: {
      _id: 0,
      product: "$_id",
      category: 1,
      totalQuantity: 1,
      totalRevenue: 1,
      avgPrice: { $round: ["$avgPrice", 2] },
      orderFrequency: 1
    }
  }
])

SQL equivalent:

SELECT 
  i.product,
  i.category,
  SUM(i.quantity) AS totalQuantity,
  SUM(i.price * i.quantity) AS totalRevenue,
  ROUND(AVG(i.price), 2) AS avgPrice,
  COUNT(*) AS orderFrequency
FROM orders o
CROSS JOIN UNNEST(o.items) AS i
WHERE o.status = 'completed'
GROUP BY i.product, i.category
ORDER BY totalRevenue DESC

Advanced SQL Features for MongoDB

Conditional Aggregations

Instead of multiple MongoDB pipeline stages for conditional logic:

SELECT 
  customerId,
  COUNT(*) AS totalOrders,
  COUNT(CASE WHEN totalAmount > 100 THEN 1 END) AS highValueOrders,
  COUNT(CASE WHEN status = 'completed' THEN 1 END) AS completedOrders,
  ROUND(
    COUNT(CASE WHEN totalAmount > 100 THEN 1 END) * 100.0 / COUNT(*), 
    2
  ) AS highValuePercentage
FROM orders
WHERE orderDate >= '2025-01-01'
GROUP BY customerId
HAVING COUNT(*) >= 5
ORDER BY highValuePercentage DESC

Window Functions for Rankings

-- Top 3 products in each category by revenue
SELECT *
FROM (
  SELECT 
    i.category,
    i.product,
    SUM(i.price * i.quantity) AS revenue,
    ROW_NUMBER() OVER (PARTITION BY i.category ORDER BY SUM(i.price * i.quantity) DESC) as rank
  FROM orders o
  CROSS JOIN UNNEST(o.items) AS i
  WHERE o.status = 'completed'
  GROUP BY i.category, i.product
) ranked_products
WHERE rank <= 3
ORDER BY category, rank

Performance Benefits

SQL queries often perform better because:

  1. Query Optimization: SQL engines optimize entire queries, while MongoDB processes each pipeline stage separately
  2. Index Usage: SQL can better utilize compound indexes across JOINs
  3. Memory Efficiency: No need to pass large intermediate result sets between pipeline stages
  4. Parallel Processing: SQL engines can parallelize operations more effectively

When to Use SQL vs Native Aggregation

Use SQL-style queries when: - Writing complex analytics and reporting queries - Team members are more familiar with SQL - You need readable, maintainable code - Working with multiple collections (JOINs)

Stick with MongoDB aggregation when: - Using MongoDB-specific features like $facet or $bucket - Need fine-grained control over pipeline stages - Working with highly specialized MongoDB operators - Performance testing shows aggregation pipeline is faster for your specific use case

Real-World Example: Customer Segmentation

Here's a practical customer segmentation analysis that would be complex in MongoDB but straightforward in SQL:

SELECT 
  CASE 
    WHEN totalSpent > 1000 THEN 'VIP'
    WHEN totalSpent > 500 THEN 'Premium'
    WHEN totalSpent > 100 THEN 'Regular'
    ELSE 'New'
  END AS customerSegment,
  COUNT(*) AS customerCount,
  AVG(totalSpent) AS avgSpending,
  AVG(orderCount) AS avgOrders,
  MIN(lastOrderDate) AS earliestLastOrder,
  MAX(lastOrderDate) AS latestLastOrder
FROM (
  SELECT 
    c._id,
    c.name,
    COALESCE(SUM(o.totalAmount), 0) AS totalSpent,
    COUNT(o._id) AS orderCount,
    MAX(o.orderDate) AS lastOrderDate
  FROM customers c
  LEFT JOIN orders o ON c._id = o.customerId AND o.status = 'completed'
  GROUP BY c._id, c.name
) customer_summary
GROUP BY customerSegment
ORDER BY 
  CASE customerSegment
    WHEN 'VIP' THEN 1
    WHEN 'Premium' THEN 2  
    WHEN 'Regular' THEN 3
    ELSE 4
  END

Getting Started with QueryLeaf

Ready to simplify your MongoDB aggregations? QueryLeaf allows you to write SQL queries that automatically compile to optimized MongoDB operations. You get the readability of SQL with the flexibility of MongoDB's document model.

For more information about advanced SQL features, check out our guides on GROUP BY operations and working with JOINs.

Conclusion

MongoDB aggregation pipelines are powerful but can become unwieldy for complex analytics. SQL provides a more intuitive way to express these operations, making your code more readable and maintainable.

By using SQL syntax for MongoDB operations, you can: - Reduce complexity in data analysis queries - Make code more accessible to SQL-familiar team members
- Improve query maintainability and debugging - Leverage familiar patterns for complex business logic

The combination of SQL's expressiveness with MongoDB's document flexibility gives you the best of both worlds – clear, concise queries that work with your existing MongoDB data structures.

MongoDB Array Operations Made Simple with SQL Syntax

Working with arrays in MongoDB can be challenging, especially if you come from a SQL background. MongoDB's native query syntax for arrays involves complex aggregation pipelines and operators that can be intimidating for developers used to straightforward SQL queries.

What if you could query MongoDB arrays using the SQL syntax you already know? Let's explore how to make MongoDB array operations intuitive and readable.

The Array Query Challenge in MongoDB

Consider a typical e-commerce scenario where you have orders with arrays of items:

{
  "_id": ObjectId("..."),
  "customerId": "user123",
  "orderDate": "2025-01-10",
  "items": [
    { "name": "Laptop", "price": 999.99, "category": "Electronics" },
    { "name": "Mouse", "price": 29.99, "category": "Electronics" },
    { "name": "Keyboard", "price": 79.99, "category": "Electronics" }
  ],
  "status": "shipped"
}

In native MongoDB, finding orders where the first item costs more than $500 requires this aggregation pipeline:

db.orders.aggregate([
  {
    $match: {
      "items.0.price": { $gt: 500 }
    }
  },
  {
    $project: {
      customerId: 1,
      orderDate: 1,
      firstItemName: "$items.0.name",
      firstItemPrice: "$items.0.price",
      status: 1
    }
  }
])

This works, but it's verbose and not intuitive for developers familiar with SQL.

SQL Array Access: Intuitive and Readable

With SQL syntax for MongoDB, the same query becomes straightforward:

SELECT 
  customerId,
  orderDate,
  items[0].name AS firstItemName,
  items[0].price AS firstItemPrice,
  status
FROM orders
WHERE items[0].price > 500

Much cleaner, right? Let's explore more array operations.

Common Array Operations with SQL

1. Accessing Specific Array Elements

Query orders where the second item is in the Electronics category:

SELECT customerId, orderDate, items[1].name, items[1].category
FROM orders
WHERE items[1].category = 'Electronics'

This translates to MongoDB's items.1.category field path, handling the zero-based indexing automatically.

2. Working with Nested Arrays

For documents with nested arrays, like product reviews with ratings arrays:

{
  "productId": "prod456",
  "reviews": [
    {
      "user": "alice",
      "rating": 5,
      "tags": ["excellent", "fast-delivery"]
    },
    {
      "user": "bob", 
      "rating": 4,
      "tags": ["good", "value-for-money"]
    }
  ]
}

Find products where the first review's second tag is "fast-delivery":

SELECT productId, reviews[0].user, reviews[0].rating
FROM products
WHERE reviews[0].tags[1] = 'fast-delivery'

3. Filtering and Projecting Array Elements

Get order details showing only the first two items:

SELECT 
  customerId,
  orderDate,
  items[0].name AS item1Name,
  items[0].price AS item1Price,
  items[1].name AS item2Name,
  items[1].price AS item2Price
FROM orders
WHERE status = 'shipped'

4. Array Operations in JOINs

When joining collections that contain arrays, SQL syntax makes relationships clear:

SELECT 
  u.name,
  u.email,
  o.orderDate,
  o.items[0].name AS primaryItem
FROM users u
JOIN orders o ON u._id = o.customerId
WHERE o.items[0].price > 100

This joins users with orders and filters by the first item's price, automatically handling ObjectId conversion.

Advanced Array Patterns

Working with Dynamic Array Access

While direct array indexing works well for known positions, you can also combine array access with other SQL features:

-- Get orders where any item exceeds $500
SELECT customerId, orderDate, status
FROM orders
WHERE items[0].price > 500 
   OR items[1].price > 500 
   OR items[2].price > 500

For more complex array queries that need to check all elements regardless of position, you'd still use MongoDB's native array operators, but for specific positional queries, SQL array access is perfect.

Updating Array Elements

Updating specific array positions is also intuitive with SQL syntax:

-- Update the price of the first item in an order
UPDATE orders
SET items[0].price = 899.99
WHERE _id = '507f1f77bcf86cd799439011'

-- Update nested array values
UPDATE products
SET reviews[0].tags[1] = 'super-fast-delivery'
WHERE productId = 'prod456'

Performance Considerations

When working with array operations:

  1. Index Array Elements: Create indexes on frequently queried array positions like items.0.price
  2. Limit Deep Nesting: Accessing deeply nested arrays (items[0].details[2].specs[1]) can be slow
  3. Consider Array Size: Operations on large arrays may impact performance
  4. Use Compound Indexes: For queries combining array access with other fields

Real-World Example: E-commerce Analytics

Here's a practical example analyzing order patterns:

-- Find high-value orders where the primary item is expensive
SELECT 
  customerId,
  orderDate,
  items[0].name AS primaryProduct,
  items[0].price AS primaryPrice,
  items[0].category,
  status
FROM orders
WHERE items[0].price > 200
  AND status IN ('shipped', 'delivered')
  AND orderDate >= '2025-01-01'
ORDER BY items[0].price DESC
LIMIT 50

This query helps identify customers who purchase high-value primary items, useful for marketing campaigns or inventory planning.

When to Use Array Indexing vs Native MongoDB Queries

Use SQL array indexing when: - Accessing specific, known array positions - Working with fixed-structure arrays - Writing readable queries for specific business logic - Team members are more comfortable with SQL

Use native MongoDB queries when: - Need to query all array elements regardless of position - Working with variable-length arrays where position doesn't matter - Requires complex array aggregations - Performance is critical and you need MongoDB's optimized array operators

Getting Started

To start using SQL syntax for MongoDB array operations, you can use tools that translate SQL to MongoDB queries. The key is having a system that understands both SQL array syntax and MongoDB's document structure.

For more information about working with nested document structures in SQL, check out our guide on working with nested fields which complements array operations perfectly.

Conclusion

MongoDB arrays don't have to be intimidating. With SQL syntax, you can leverage familiar patterns to query and manipulate array data effectively. This approach bridges the gap between SQL knowledge and MongoDB's document model, making your database operations more intuitive and maintainable.

Whether you're building e-commerce platforms, content management systems, or analytics dashboards, SQL-style array operations can simplify your MongoDB development workflow while keeping your queries readable and maintainable.

QueryLeaf Integration: QueryLeaf automatically translates SQL array syntax into MongoDB's native array operators, making complex array operations accessible through familiar SQL patterns. Array indexing, element filtering, and nested array queries are seamlessly handled through standard SQL syntax, enabling developers to work with MongoDB arrays using the SQL knowledge they already possess without learning MongoDB's aggregation pipeline syntax.

The combination of SQL's clarity with MongoDB's flexibility gives you the best of both worlds – familiar syntax with powerful document database capabilities.

MongoDB Full-Text Search for Intelligent Content Discovery: Advanced Text Indexing, Ranking, and Relevance Scoring with SQL-Compatible Operations

Modern applications require sophisticated full-text search capabilities to enable users to discover relevant content across large document collections, knowledge bases, and content management systems. Traditional database text search approaches often provide limited functionality, poor performance with large datasets, and insufficient relevance ranking mechanisms that fail to deliver the intelligent search experiences users expect.

MongoDB Full-Text Search provides native support for intelligent text indexing, advanced relevance scoring, and high-performance text queries with comprehensive linguistic features including stemming, stop word filtering, and multi-language support. Unlike basic SQL LIKE operations or external search engine integrations that require complex infrastructure management, MongoDB's Full-Text Search delivers enterprise-grade search capabilities while maintaining unified data access patterns and transactional consistency.

The Traditional Text Search Challenge

Building effective text search with conventional database approaches creates significant performance and functionality limitations:

-- Traditional PostgreSQL text search - limited functionality and performance issues

-- Basic text table with conventional indexing
CREATE TABLE documents (
    document_id BIGSERIAL PRIMARY KEY,
    title VARCHAR(500) NOT NULL,
    content TEXT NOT NULL,
    author VARCHAR(200),
    category VARCHAR(100),
    tags TEXT[],
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    -- Metadata for search optimization
    document_language VARCHAR(10) DEFAULT 'en',
    content_type VARCHAR(50),
    word_count INTEGER,

    -- Search optimization fields
    search_vector TSVECTOR,
    title_search_vector TSVECTOR
);

-- Traditional full-text indexes (limited configuration options)
CREATE INDEX idx_documents_search_vector ON documents USING GIN(search_vector);
CREATE INDEX idx_documents_title_search ON documents USING GIN(title_search_vector);
CREATE INDEX idx_documents_category_search ON documents (category, search_vector) USING GIN;

-- Triggers for maintaining search vectors
CREATE OR REPLACE FUNCTION update_document_search_vector()
RETURNS TRIGGER AS $$
BEGIN
    NEW.search_vector := 
        setweight(to_tsvector(coalesce(NEW.title, '')), 'A') ||
        setweight(to_tsvector(coalesce(NEW.content, '')), 'B') ||
        setweight(to_tsvector(coalesce(array_to_string(NEW.tags, ' '), '')), 'C');

    NEW.title_search_vector := to_tsvector(coalesce(NEW.title, ''));

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER tr_update_search_vector
    BEFORE INSERT OR UPDATE ON documents
    FOR EACH ROW EXECUTE FUNCTION update_document_search_vector();

-- Complex text search query with limited ranking capabilities
WITH search_query AS (
    SELECT 
        to_tsquery($1) as query_vector,  -- User search query
        $1 as original_query
),

basic_text_search AS (
    SELECT 
        d.document_id,
        d.title,
        d.content,
        d.author,
        d.category,
        d.tags,
        d.created_at,
        d.word_count,
        sq.original_query,

        -- Basic relevance scoring (limited functionality)
        ts_rank(d.search_vector, sq.query_vector) as basic_rank,
        ts_rank_cd(d.search_vector, sq.query_vector) as cover_density_rank,

        -- Title match scoring
        ts_rank(d.title_search_vector, sq.query_vector) as title_rank,

        -- Query matching analysis
        ts_headline(d.content, sq.query_vector, 'MaxWords=30, MinWords=5') as content_highlight,
        ts_headline(d.title, sq.query_vector) as title_highlight,

        -- Manual relevance factors (limited sophistication)
        CASE 
            WHEN d.category = $2 THEN 0.2  -- Category boost parameter
            ELSE 0.0 
        END as category_boost,

        CASE 
            WHEN d.created_at >= CURRENT_TIMESTAMP - INTERVAL '30 days' THEN 0.1
            WHEN d.created_at >= CURRENT_TIMESTAMP - INTERVAL '90 days' THEN 0.05
            ELSE 0.0
        END as recency_boost,

        -- Tag matching (manual implementation)
        CASE 
            WHEN d.tags && string_to_array($3, ',') THEN 0.1  -- Tag filter parameter
            ELSE 0.0
        END as tag_boost

    FROM documents d
    CROSS JOIN search_query sq
    WHERE d.search_vector @@ sq.query_vector
),

enhanced_ranking AS (
    SELECT 
        bts.*,

        -- Combined relevance scoring (manual calculation)
        (
            basic_rank * 0.4 +                    -- Primary text relevance
            title_rank * 0.3 +                    -- Title match importance
            category_boost +                      -- Category relevance
            recency_boost +                       -- Time decay factor
            tag_boost +                          -- Tag matching

            -- Word count normalization (rough approximation)
            CASE 
                WHEN word_count BETWEEN 300 AND 2000 THEN 0.1
                WHEN word_count BETWEEN 100 AND 300 THEN 0.05
                ELSE 0.0
            END
        ) as composite_relevance_score,

        -- Search quality indicators (limited analysis)
        CASE 
            WHEN basic_rank > 0.1 AND title_rank > 0.1 THEN 'high_relevance'
            WHEN basic_rank > 0.05 THEN 'medium_relevance'
            ELSE 'low_relevance'
        END as relevance_category,

        -- Content analysis (basic)
        LENGTH(content) as content_length,
        ROUND((LENGTH(content) / NULLIF(word_count, 0))::numeric, 1) as avg_word_length

    FROM basic_text_search bts
),

search_results AS (
    SELECT 
        er.*,

        -- Result ranking and grouping
        ROW_NUMBER() OVER (ORDER BY composite_relevance_score DESC) as search_rank,

        -- Diversity scoring (limited implementation)
        ROW_NUMBER() OVER (
            PARTITION BY category 
            ORDER BY composite_relevance_score DESC
        ) as category_rank,

        -- Search metadata
        CURRENT_TIMESTAMP as search_performed_at,
        original_query as search_query

    FROM enhanced_ranking er
    WHERE composite_relevance_score > 0.01  -- Minimum relevance threshold
)

SELECT 
    document_id,
    title,
    CASE 
        WHEN LENGTH(content) > 300 THEN LEFT(content, 300) || '...'
        ELSE content
    END as content_preview,
    author,
    category,
    tags,
    TO_CHAR(created_at, 'YYYY-MM-DD') as published_date,

    -- Relevance and ranking metrics
    ROUND(composite_relevance_score::numeric, 4) as relevance_score,
    search_rank,
    relevance_category,

    -- Search highlights (limited formatting)
    title_highlight,
    content_highlight,

    -- Content characteristics
    word_count,
    content_length,

    -- Quality indicators
    CASE 
        WHEN word_count >= 500 AND composite_relevance_score > 0.1 THEN 'comprehensive_match'
        WHEN title_rank > 0.2 THEN 'title_focused_match'
        WHEN basic_rank > 0.1 THEN 'content_focused_match'
        ELSE 'partial_match'
    END as match_quality,

    -- Search context
    search_performed_at,
    search_query

FROM search_results
WHERE 
    -- Result filtering and diversity constraints
    search_rank <= 50                           -- Limit total results
    AND category_rank <= 5                      -- Max per category for diversity
    AND composite_relevance_score >= 0.02       -- Quality threshold

ORDER BY composite_relevance_score DESC, search_rank
LIMIT 20;

-- Problems with traditional text search approaches:
-- 1. Limited linguistic processing and language-specific features
-- 2. Complex manual relevance scoring with poor ranking algorithms
-- 3. Expensive full-text index maintenance and update overhead
-- 4. Limited support for fuzzy matching and typo tolerance
-- 5. Poor performance with large document collections and complex queries
-- 6. No built-in support for search analytics and query optimization
-- 7. Difficult integration of business logic into ranking algorithms
-- 8. Limited faceted search and filtering capabilities
-- 9. Complex infrastructure required for distributed search scenarios
-- 10. Manual implementation of search features like auto-complete and suggestions

MongoDB Full-Text Search provides native intelligent text search capabilities:

// MongoDB Full-Text Search - native intelligent content discovery with advanced ranking
const { MongoClient } = require('mongodb');

const client = new MongoClient('mongodb://localhost:27017');
const db = client.db('content_management_platform');

// Advanced Full-Text Search Management System
class MongoFullTextSearchManager {
  constructor(db, searchConfig = {}) {
    this.db = db;
    this.config = {
      // Text search configuration
      enableFullTextSearch: searchConfig.enableFullTextSearch !== false,
      defaultLanguage: searchConfig.defaultLanguage || 'en',
      supportedLanguages: searchConfig.supportedLanguages || ['en', 'es', 'fr', 'de'],

      // Index configuration
      textIndexWeights: searchConfig.textIndexWeights || {
        title: 10,
        content: 5,
        tags: 8,
        category: 3,
        description: 6
      },

      // Search optimization
      enableFuzzyMatching: searchConfig.enableFuzzyMatching !== false,
      enableAutoComplete: searchConfig.enableAutoComplete !== false,
      defaultSearchLimit: searchConfig.defaultSearchLimit || 20,
      maxSearchResults: searchConfig.maxSearchResults || 100,

      // Relevance scoring
      enableAdvancedScoring: searchConfig.enableAdvancedScoring !== false,
      scoringWeights: searchConfig.scoringWeights || {
        textScore: 0.4,
        titleBoost: 0.3,
        recencyFactor: 0.1,
        popularityBoost: 0.1,
        categoryBoost: 0.1
      },

      // Performance optimization
      enableSearchCache: searchConfig.enableSearchCache !== false,
      enableSearchAnalytics: searchConfig.enableSearchAnalytics !== false,

      ...searchConfig
    };

    // Collection references
    this.collections = {
      documents: db.collection('documents'),
      searchQueries: db.collection('search_queries'),
      searchAnalytics: db.collection('search_analytics'),
      searchSuggestions: db.collection('search_suggestions')
    };

    this.initializeFullTextSearch();
  }

  async initializeFullTextSearch() {
    console.log('Initializing MongoDB Full-Text Search system...');

    try {
      // Create optimized text indexes
      await this.createTextIndexes();

      // Setup search analytics infrastructure
      await this.setupSearchAnalytics();

      // Initialize auto-complete and suggestions
      await this.setupSearchSuggestions();

      console.log('Full-Text Search system initialized successfully');

    } catch (error) {
      console.error('Error initializing full-text search:', error);
      throw error;
    }
  }

  async createTextIndexes() {
    console.log('Creating optimized full-text search indexes...');

    const documentsCollection = this.collections.documents;

    try {
      // Comprehensive text index with weighted fields
      await documentsCollection.createIndex(
        {
          title: 'text',
          content: 'text',
          tags: 'text',
          category: 'text',
          description: 'text',
          author: 'text'
        },
        {
          name: 'comprehensive_text_index',
          weights: this.config.textIndexWeights,
          default_language: this.config.defaultLanguage,
          language_override: 'language',
          background: true
        }
      );

      // Category-specific text index for targeted searches
      await documentsCollection.createIndex(
        {
          category: 1,
          title: 'text',
          content: 'text'
        },
        {
          name: 'category_text_index',
          weights: {
            title: 10,
            content: 5
          },
          background: true
        }
      );

      // Supporting indexes for search optimization
      await documentsCollection.createIndexes([
        {
          key: { category: 1, createdAt: -1 },
          name: 'category_date_index',
          background: true
        },
        {
          key: { author: 1, createdAt: -1 },
          name: 'author_date_index', 
          background: true
        },
        {
          key: { tags: 1, createdAt: -1 },
          name: 'tags_date_index',
          background: true
        },
        {
          key: { 'analytics.viewCount': -1 },
          name: 'popularity_index',
          background: true
        }
      ]);

      console.log('✅ Full-text search indexes created successfully');

    } catch (error) {
      console.error('Error creating text indexes:', error);
      throw error;
    }
  }

  async performIntelligentTextSearch(searchQuery, options = {}) {
    console.log(`Performing intelligent text search for: "${searchQuery}"`);

    const searchStartTime = Date.now();

    try {
      const searchConfig = {
        query: searchQuery,
        language: options.language || this.config.defaultLanguage,
        categoryFilter: options.categoryFilter,
        authorFilter: options.authorFilter,
        dateRange: options.dateRange,
        tagFilter: options.tagFilter,
        limit: Math.min(options.limit || this.config.defaultSearchLimit, this.config.maxSearchResults),
        enableFuzzy: options.enableFuzzy !== false,
        sortBy: options.sortBy || 'relevance',
        includeHighlights: options.includeHighlights !== false,
        enableFacets: options.enableFacets !== false
      };

      // Build comprehensive search aggregation pipeline
      const searchPipeline = [
        // Stage 1: Text search with scoring
        {
          $match: {
            $text: {
              $search: searchConfig.query,
              $language: searchConfig.language,
              ...(searchConfig.enableFuzzy && { 
                $caseSensitive: false,
                $diacriticSensitive: false 
              })
            },

            // Apply filters
            ...(searchConfig.categoryFilter && { 
              category: { $in: Array.isArray(searchConfig.categoryFilter) ? searchConfig.categoryFilter : [searchConfig.categoryFilter] }
            }),
            ...(searchConfig.authorFilter && { author: searchConfig.authorFilter }),
            ...(searchConfig.dateRange && {
              createdAt: {
                $gte: new Date(searchConfig.dateRange.start),
                $lte: new Date(searchConfig.dateRange.end)
              }
            }),
            ...(searchConfig.tagFilter && { 
              tags: { $in: Array.isArray(searchConfig.tagFilter) ? searchConfig.tagFilter : [searchConfig.tagFilter] }
            }),

            // Quality filters
            isPublished: true,
            isActive: { $ne: false }
          }
        },

        // Stage 2: Add text score and metadata
        {
          $addFields: {
            textScore: { $meta: 'textScore' },
            searchMetadata: {
              query: searchConfig.query,
              searchTime: new Date(),
              language: searchConfig.language
            }
          }
        },

        // Stage 3: Enhanced relevance scoring
        {
          $addFields: {
            // Advanced relevance calculation
            relevanceScore: {
              $add: [
                // Primary text score component
                {
                  $multiply: [
                    { $meta: 'textScore' },
                    this.config.scoringWeights.textScore
                  ]
                },

                // Title boost (if query terms appear in title)
                {
                  $cond: [
                    {
                      $regexMatch: {
                        input: { $toLower: '$title' },
                        regex: { $toLower: searchConfig.query },
                        options: 'i'
                      }
                    },
                    this.config.scoringWeights.titleBoost,
                    0
                  ]
                },

                // Recency factor (newer content gets boost)
                {
                  $multiply: [
                    {
                      $max: [
                        0,
                        {
                          $subtract: [
                            1,
                            {
                              $divide: [
                                { $subtract: [new Date(), '$createdAt'] },
                                365 * 24 * 60 * 60 * 1000  // 1 year in milliseconds
                              ]
                            }
                          ]
                        }
                      ]
                    },
                    this.config.scoringWeights.recencyFactor
                  ]
                },

                // Popularity boost (view count factor)
                {
                  $multiply: [
                    {
                      $min: [
                        1,
                        {
                          $divide: [
                            { $ifNull: ['$analytics.viewCount', 0] },
                            1000
                          ]
                        }
                      ]
                    },
                    this.config.scoringWeights.popularityBoost
                  ]
                },

                // Category match boost
                {
                  $cond: [
                    { $in: ['$category', searchConfig.categoryFilter || []] },
                    this.config.scoringWeights.categoryBoost,
                    0
                  ]
                }
              ]
            },

            // Content quality indicators
            contentQuality: {
              $switch: {
                branches: [
                  {
                    case: { 
                      $and: [
                        { $gte: ['$wordCount', 1000] },
                        { $gte: [{ $ifNull: ['$analytics.averageRating', 0] }, 4] }
                      ]
                    },
                    then: 'high_quality'
                  },
                  {
                    case: {
                      $and: [
                        { $gte: ['$wordCount', 300] },
                        { $gte: [{ $ifNull: ['$analytics.averageRating', 0] }, 3] }
                      ]
                    },
                    then: 'good_quality'
                  },
                  {
                    case: { $gte: ['$wordCount', 100] },
                    then: 'basic_quality'
                  }
                ],
                default: 'short_content'
              }
            },

            // Search match type classification
            matchType: {
              $switch: {
                branches: [
                  {
                    case: {
                      $regexMatch: {
                        input: { $toLower: '$title' },
                        regex: { $toLower: searchConfig.query },
                        options: 'i'
                      }
                    },
                    then: 'title_match'
                  },
                  {
                    case: { $gte: [{ $meta: 'textScore' }, 2.0] },
                    then: 'strong_content_match'
                  },
                  {
                    case: { $gte: [{ $meta: 'textScore' }, 1.0] },
                    then: 'good_content_match'
                  }
                ],
                default: 'weak_content_match'
              }
            }
          }
        },

        // Stage 4: Content analysis and enrichment
        {
          $lookup: {
            from: 'user_interactions',
            let: { docId: '$_id' },
            pipeline: [
              {
                $match: {
                  $expr: {
                    $and: [
                      { $eq: ['$documentId', '$$docId'] },
                      { $gte: ['$interactionDate', { $subtract: [new Date(), 30 * 24 * 60 * 60 * 1000] }] }
                    ]
                  }
                }
              },
              {
                $group: {
                  _id: null,
                  recentViews: { $sum: 1 },
                  averageRating: { $avg: '$rating' },
                  uniqueUsers: { $addToSet: '$userId' }
                }
              }
            ],
            as: 'recentEngagement'
          }
        },

        // Stage 5: Related content discovery
        {
          $lookup: {
            from: 'documents',
            let: {
              currentCategory: '$category',
              currentTags: '$tags',
              currentId: '$_id'
            },
            pipeline: [
              {
                $match: {
                  $expr: {
                    $and: [
                      { $ne: ['$_id', '$$currentId'] },
                      { $eq: ['$isPublished', true] },
                      {
                        $or: [
                          { $eq: ['$category', '$$currentCategory'] },
                          { $gt: [{ $size: { $setIntersection: ['$tags', '$$currentTags'] } }, 0] }
                        ]
                      }
                    ]
                  }
                }
              },
              {
                $sample: { size: 3 }
              },
              {
                $project: {
                  _id: 1,
                  title: 1,
                  category: 1
                }
              }
            ],
            as: 'relatedContent'
          }
        },

        // Stage 6: Generate search highlights
        ...(searchConfig.includeHighlights ? [
          {
            $addFields: {
              searchHighlights: {
                title: {
                  $regexFind: {
                    input: '$title',
                    regex: searchConfig.query,
                    options: 'i'
                  }
                },
                contentSnippet: {
                  $let: {
                    vars: {
                      contentLower: { $toLower: '$content' },
                      queryLower: { $toLower: searchConfig.query }
                    },
                    in: {
                      $cond: [
                        { $gt: [{ $indexOfCP: ['$$contentLower', '$$queryLower'] }, -1] },
                        {
                          $let: {
                            vars: {
                              matchIndex: { $indexOfCP: ['$$contentLower', '$$queryLower'] },
                              snippetStart: {
                                $max: [
                                  0,
                                  { $subtract: [{ $indexOfCP: ['$$contentLower', '$$queryLower'] }, 50] }
                                ]
                              }
                            },
                            in: {
                              $concat: [
                                { $cond: [{ $gt: ['$$snippetStart', 0] }, '...', ''] },
                                { $substrCP: ['$content', '$$snippetStart', 200] },
                                '...'
                              ]
                            }
                          }
                        },
                        { $substrCP: ['$content', 0, 200] }
                      ]
                    }
                  }
                }
              }
            }
          }
        ] : []),

        // Stage 7: Final enrichment and formatting
        {
          $addFields: {
            // Engagement metrics
            engagementMetrics: {
              $cond: [
                { $gt: [{ $size: '$recentEngagement' }, 0] },
                {
                  recentViews: { $arrayElemAt: ['$recentEngagement.recentViews', 0] },
                  averageRating: { $arrayElemAt: ['$recentEngagement.averageRating', 0] },
                  uniqueUsers: { $size: { $arrayElemAt: ['$recentEngagement.uniqueUsers', 0] } }
                },
                {
                  recentViews: 0,
                  averageRating: null,
                  uniqueUsers: 0
                }
              ]
            },

            // Search result metadata
            searchResultMetadata: {
              rank: 0,  // Will be set in sort stage
              resultType: 'standard',
              searchAlgorithm: 'mongodb_text_search_v1'
            }
          }
        },

        // Stage 8: Final projection and cleanup
        {
          $project: {
            // Core document information
            title: 1,
            author: 1,
            category: 1,
            tags: 1,
            createdAt: 1,
            updatedAt: 1,
            language: 1,

            // Content preview
            contentPreview: {
              $cond: [
                searchConfig.includeHighlights,
                '$searchHighlights.contentSnippet',
                { $substrCP: ['$content', 0, 200] }
              ]
            },

            // Search relevance metrics
            textScore: { $round: ['$textScore', 4] },
            relevanceScore: { $round: ['$relevanceScore', 4] },
            matchType: 1,
            contentQuality: 1,

            // Content metrics
            wordCount: 1,

            // Engagement and quality indicators
            engagementMetrics: 1,

            // Related content
            relatedContent: 1,

            // Search highlights
            ...(searchConfig.includeHighlights && { searchHighlights: 1 }),

            // Metadata
            searchMetadata: 1,
            searchResultMetadata: 1
          }
        },

        // Stage 9: Sorting based on configuration
        {
          $sort: {
            ...(searchConfig.sortBy === 'relevance' && { relevanceScore: -1, textScore: -1 }),
            ...(searchConfig.sortBy === 'date' && { createdAt: -1 }),
            ...(searchConfig.sortBy === 'popularity' && { 'analytics.viewCount': -1 }),
            ...(searchConfig.sortBy === 'rating' && { 'analytics.averageRating': -1 })
          }
        },

        // Stage 10: Add search ranking
        {
          $addFields: {
            'searchResultMetadata.rank': {
              $add: [{ $indexOfArray: [{ $map: { input: { $range: [0, searchConfig.limit] }, as: 'i', in: '$$i' } }, { $indexOfArray: [{ $map: { input: { $range: [0, searchConfig.limit] }, as: 'i', in: '$$i' } }, 0] }] }, 1]
            }
          }
        },

        // Stage 11: Limit results
        {
          $limit: searchConfig.limit
        }
      ];

      // Execute search pipeline
      const searchResults = await this.collections.documents
        .aggregate(searchPipeline, {
          allowDiskUse: true,
          maxTimeMS: 30000
        })
        .toArray();

      const searchLatency = Date.now() - searchStartTime;

      // Generate search facets if requested
      let facets = null;
      if (searchConfig.enableFacets) {
        facets = await this.generateSearchFacets(searchQuery, searchConfig);
      }

      // Log search analytics
      await this.logSearchAnalytics({
        query: searchConfig.query,
        resultsCount: searchResults.length,
        searchLatency: searchLatency,
        searchConfig: searchConfig,
        timestamp: new Date()
      });

      console.log(`✅ Text search completed: ${searchResults.length} results in ${searchLatency}ms`);

      return {
        success: true,
        query: searchConfig.query,
        results: searchResults,
        facets: facets,
        searchMetadata: {
          latency: searchLatency,
          resultsCount: searchResults.length,
          language: searchConfig.language,
          algorithm: 'mongodb_text_search_v1'
        }
      };

    } catch (error) {
      console.error('Error performing text search:', error);
      const searchLatency = Date.now() - searchStartTime;

      return {
        success: false,
        error: error.message,
        searchMetadata: {
          latency: searchLatency,
          resultsCount: 0
        }
      };
    }
  }

  async generateSearchFacets(searchQuery, searchConfig) {
    console.log('Generating search facets...');

    try {
      const facetPipeline = [
        // Match documents that would appear in search results
        {
          $match: {
            $text: {
              $search: searchQuery,
              $language: searchConfig.language
            },
            isPublished: true,
            isActive: { $ne: false }
          }
        },

        // Generate facet aggregations
        {
          $facet: {
            // Category distribution
            categories: [
              {
                $group: {
                  _id: '$category',
                  count: { $sum: 1 },
                  averageRelevance: { $avg: { $meta: 'textScore' } }
                }
              },
              {
                $sort: { count: -1 }
              },
              {
                $limit: 10
              }
            ],

            // Author distribution
            authors: [
              {
                $group: {
                  _id: '$author',
                  count: { $sum: 1 },
                  latestDocument: { $max: '$createdAt' }
                }
              },
              {
                $sort: { count: -1 }
              },
              {
                $limit: 10
              }
            ],

            // Tag distribution
            tags: [
              {
                $unwind: '$tags'
              },
              {
                $group: {
                  _id: '$tags',
                  count: { $sum: 1 }
                }
              },
              {
                $sort: { count: -1 }
              },
              {
                $limit: 15
              }
            ],

            // Date range distribution
            dateRanges: [
              {
                $group: {
                  _id: {
                    $switch: {
                      branches: [
                        {
                          case: { $gte: ['$createdAt', { $subtract: [new Date(), 7 * 24 * 60 * 60 * 1000] }] },
                          then: 'last_week'
                        },
                        {
                          case: { $gte: ['$createdAt', { $subtract: [new Date(), 30 * 24 * 60 * 60 * 1000] }] },
                          then: 'last_month'
                        },
                        {
                          case: { $gte: ['$createdAt', { $subtract: [new Date(), 90 * 24 * 60 * 60 * 1000] }] },
                          then: 'last_quarter'
                        },
                        {
                          case: { $gte: ['$createdAt', { $subtract: [new Date(), 365 * 24 * 60 * 60 * 1000] }] },
                          then: 'last_year'
                        }
                      ],
                      default: 'older'
                    }
                  },
                  count: { $sum: 1 }
                }
              }
            ],

            // Content quality distribution
            contentQuality: [
              {
                $group: {
                  _id: {
                    $switch: {
                      branches: [
                        {
                          case: { $gte: ['$wordCount', 1000] },
                          then: 'comprehensive'
                        },
                        {
                          case: { $gte: ['$wordCount', 500] },
                          then: 'detailed'
                        },
                        {
                          case: { $gte: ['$wordCount', 200] },
                          then: 'standard'
                        }
                      ],
                      default: 'brief'
                    }
                  },
                  count: { $sum: 1 }
                }
              }
            ]
          }
        }
      ];

      const facetResults = await this.collections.documents
        .aggregate(facetPipeline)
        .toArray();

      return facetResults[0];

    } catch (error) {
      console.error('Error generating search facets:', error);
      return null;
    }
  }

  async generateAutoCompleteData(query, limit = 10) {
    console.log(`Generating auto-complete suggestions for: "${query}"`);

    try {
      // Use text search with partial matching for auto-complete
      const autoCompletePipeline = [
        {
          $match: {
            $or: [
              { title: { $regex: query, $options: 'i' } },
              { tags: { $regex: query, $options: 'i' } },
              { category: { $regex: query, $options: 'i' } }
            ],
            isPublished: true
          }
        },
        {
          $group: {
            _id: null,
            titleSuggestions: {
              $addToSet: {
                $cond: [
                  { $regexMatch: { input: '$title', regex: query, options: 'i' } },
                  '$title',
                  null
                ]
              }
            },
            tagSuggestions: { $addToSet: '$tags' },
            categorySuggestions: { $addToSet: '$category' }
          }
        },
        {
          $project: {
            suggestions: {
              $setUnion: [
                { $filter: { input: '$titleSuggestions', cond: { $ne: ['$$this', null] } } },
                { $reduce: {
                  input: '$tagSuggestions',
                  initialValue: [],
                  in: { $concatArrays: ['$$value', '$$this'] }
                }},
                '$categorySuggestions'
              ]
            }
          }
        },
        {
          $unwind: '$suggestions'
        },
        {
          $match: {
            suggestions: { $regex: query, $options: 'i' }
          }
        },
        {
          $group: {
            _id: '$suggestions',
            relevance: { $sum: 1 }
          }
        },
        {
          $sort: { relevance: -1 }
        },
        {
          $limit: limit
        }
      ];

      const autoCompleteResults = await this.collections.documents
        .aggregate(autoCompletePipeline)
        .toArray();

      return {
        suggestions: autoCompleteResults.map(result => ({
          text: result._id,
          relevance: result.relevance
        }))
      };

    } catch (error) {
      console.error('Error generating auto-complete suggestions:', error);
      return { suggestions: [] };
    }
  }

  async setupSearchAnalytics() {
    console.log('Setting up search analytics infrastructure...');

    try {
      // Create indexes for search analytics
      await this.collections.searchAnalytics.createIndexes([
        {
          key: { timestamp: -1 },
          name: 'timestamp_index',
          background: true
        },
        {
          key: { 'query': 1, timestamp: -1 },
          name: 'query_time_index',
          background: true
        },
        {
          key: { resultsCount: 1, searchLatency: 1 },
          name: 'performance_index',
          background: true
        }
      ]);

      console.log('✅ Search analytics infrastructure setup completed');

    } catch (error) {
      console.error('Error setting up search analytics:', error);
      throw error;
    }
  }

  async setupSearchSuggestions() {
    console.log('Setting up search suggestions system...');

    try {
      // Create collection for search suggestions with text index
      await this.collections.searchSuggestions.createIndex(
        { suggestion: 'text' },
        {
          name: 'suggestion_text_index',
          background: true
        }
      );

      console.log('✅ Search suggestions system setup completed');

    } catch (error) {
      console.error('Error setting up search suggestions:', error);
    }
  }

  async logSearchAnalytics(searchData) {
    try {
      await this.collections.searchAnalytics.insertOne({
        ...searchData,
        createdAt: new Date()
      });
    } catch (error) {
      console.error('Error logging search analytics:', error);
    }
  }

  async getSearchPerformanceReport(timeRange = '24h') {
    console.log('Generating search performance report...');

    try {
      const timeRangeMs = this.parseTimeRange(timeRange);
      const startTime = new Date(Date.now() - timeRangeMs);

      const performanceReport = await this.collections.searchAnalytics.aggregate([
        {
          $match: {
            timestamp: { $gte: startTime }
          }
        },
        {
          $group: {
            _id: null,
            totalSearches: { $sum: 1 },
            averageLatency: { $avg: '$searchLatency' },
            medianLatency: { $percentile: { input: '$searchLatency', p: [0.5] } },
            p95Latency: { $percentile: { input: '$searchLatency', p: [0.95] } },
            averageResultsCount: { $avg: '$resultsCount' },
            uniqueQueries: { $addToSet: '$query' },
            zeroResultQueries: {
              $sum: {
                $cond: [{ $eq: ['$resultsCount', 0] }, 1, 0]
              }
            }
          }
        },
        {
          $project: {
            totalSearches: 1,
            averageLatency: { $round: ['$averageLatency', 2] },
            medianLatency: { $round: [{ $arrayElemAt: ['$medianLatency', 0] }, 2] },
            p95Latency: { $round: [{ $arrayElemAt: ['$p95Latency', 0] }, 2] },
            averageResultsCount: { $round: ['$averageResultsCount', 1] },
            uniqueQueryCount: { $size: '$uniqueQueries' },
            zeroResultRate: {
              $round: [
                { $multiply: [{ $divide: ['$zeroResultQueries', '$totalSearches'] }, 100] },
                2
              ]
            }
          }
        }
      ]).toArray();

      return performanceReport[0] || {};

    } catch (error) {
      console.error('Error generating search performance report:', error);
      return {};
    }
  }

  parseTimeRange(timeRange) {
    const timeMap = {
      '1h': 60 * 60 * 1000,
      '6h': 6 * 60 * 60 * 1000,
      '24h': 24 * 60 * 60 * 1000,
      '7d': 7 * 24 * 60 * 60 * 1000,
      '30d': 30 * 24 * 60 * 60 * 1000
    };
    return timeMap[timeRange] || timeMap['24h'];
  }

  async shutdown() {
    console.log('Shutting down Full-Text Search manager...');

    try {
      if (this.client) {
        await this.client.close();
      }
      console.log('Full-Text Search manager shutdown completed');
    } catch (error) {
      console.error('Error during shutdown:', error);
    }
  }
}

// Benefits of MongoDB Full-Text Search:
// - Native text indexing with advanced linguistic processing and language support
// - Intelligent relevance scoring with customizable ranking algorithms
// - High-performance text queries with automatic optimization and caching
// - Advanced search features including fuzzy matching, auto-complete, and faceted search
// - Seamless integration with existing MongoDB data and operations
// - Real-time search analytics and query performance monitoring
// - Flexible scoring mechanisms incorporating business logic and user behavior
// - Multi-language support with language-specific stemming and stop word processing
// - SQL-compatible text search operations through QueryLeaf integration
// - Enterprise-ready scalability with distributed search capabilities

module.exports = {
  MongoFullTextSearchManager
};

Understanding MongoDB Full-Text Search Architecture

Advanced Search Patterns for Content Management

MongoDB Full-Text Search enables sophisticated content discovery patterns for modern applications:

// Enterprise Content Discovery Platform with Advanced Search Capabilities
class EnterpriseContentSearchPlatform extends MongoFullTextSearchManager {
  constructor(db, enterpriseConfig) {
    super(db, enterpriseConfig);

    this.enterpriseConfig = {
      ...enterpriseConfig,
      enableSemanticSearch: true,
      enablePersonalization: true,
      enableSearchInsights: true,
      enableContentRecommendations: true
    };

    this.setupEnterpriseSearchCapabilities();
  }

  async implementAdvancedSearchPatterns() {
    console.log('Implementing enterprise search patterns...');

    const searchPatterns = {
      // Semantic search enhancement
      semanticSearchEnhancement: {
        enableConceptualMatching: true,
        enableEntityRecognition: true,
        enableTopicModeling: true,
        enableContextualSearch: true
      },

      // Personalized search results
      personalizedSearch: {
        userProfileIntegration: true,
        behaviorBasedRanking: true,
        preferenceBasedFiltering: true,
        collaborativeFiltering: true
      },

      // Content recommendation engine
      contentRecommendations: {
        similarContentDiscovery: true,
        trendingContentIdentification: true,
        relatedTopicSuggestions: true,
        expertiseBasedRecommendations: true
      },

      // Search quality optimization
      searchQualityOptimization: {
        queryUnderstandingEnhancement: true,
        resultDiversification: true,
        relevanceFeedbackIntegration: true,
        searchResultOptimization: true
      }
    };

    return await this.deployEnterpriseSearchPatterns(searchPatterns);
  }

  async implementPersonalizedSearch(userId, searchQuery, searchOptions = {}) {
    console.log(`Implementing personalized search for user: ${userId}`);

    // Get user search profile and preferences
    const userProfile = await this.getUserSearchProfile(userId);

    // Enhance search with personalization
    const personalizedSearchOptions = {
      ...searchOptions,
      userProfile: userProfile,
      personalizedScoring: true,
      contentPreferences: userProfile.contentPreferences,
      expertiseLevel: userProfile.expertiseLevel,
      preferredAuthors: userProfile.preferredAuthors,
      preferredCategories: userProfile.preferredCategories
    };

    // Execute personalized search
    const personalizedResults = await this.performIntelligentTextSearch(
      searchQuery, 
      personalizedSearchOptions
    );

    // Update user search behavior
    await this.updateUserSearchBehavior(userId, searchQuery, personalizedResults);

    return personalizedResults;
  }

  async getUserSearchProfile(userId) {
    // Aggregate user search behavior and preferences
    const userProfilePipeline = [
      {
        $match: {
          userId: userId,
          timestamp: {
            $gte: new Date(Date.now() - 90 * 24 * 60 * 60 * 1000) // Last 90 days
          }
        }
      },
      {
        $group: {
          _id: '$userId',
          searchQueries: { $push: '$query' },
          clickedResults: { $push: '$clickedDocuments' },
          preferredCategories: { $push: '$categoryInteractions' },
          searchPatterns: { $push: '$searchMetadata' }
        }
      }
    ];

    const profileData = await this.collections.searchAnalytics
      .aggregate(userProfilePipeline)
      .toArray();

    return this.buildUserSearchProfile(profileData[0] || {});
  }

  buildUserSearchProfile(rawProfileData) {
    return {
      contentPreferences: this.extractContentPreferences(rawProfileData),
      expertiseLevel: this.assessExpertiseLevel(rawProfileData),
      preferredAuthors: this.identifyPreferredAuthors(rawProfileData),
      preferredCategories: this.identifyPreferredCategories(rawProfileData),
      searchBehaviorPatterns: this.analyzeSearchPatterns(rawProfileData)
    };
  }
}

SQL-Style Full-Text Search Operations with QueryLeaf

QueryLeaf provides familiar SQL syntax for MongoDB Full-Text Search operations:

-- QueryLeaf full-text search operations with SQL-familiar syntax

-- Create full-text searchable table
CREATE TABLE content_documents (
    document_id UUID PRIMARY KEY,
    title TEXT NOT NULL,
    content TEXT NOT NULL,
    description TEXT,
    author VARCHAR(200),
    category VARCHAR(100),
    tags TEXT[],
    language VARCHAR(10) DEFAULT 'en',

    -- Content metadata
    word_count INTEGER,
    reading_time_minutes INTEGER,
    content_type VARCHAR(50),

    -- Publishing information
    published_date DATE,
    is_published BOOLEAN DEFAULT false,
    is_featured BOOLEAN DEFAULT false,

    -- Analytics
    view_count BIGINT DEFAULT 0,
    like_count BIGINT DEFAULT 0,
    share_count BIGINT DEFAULT 0,
    average_rating DECIMAL(3,2),

    -- Timestamps
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP

) WITH (
    -- MongoDB full-text search configuration
    text_indexes = [
        {
            name: 'comprehensive_text_search',
            fields: {
                title: 10,      -- Higher weight for title matches
                content: 5,     -- Standard weight for content
                description: 8, -- High weight for descriptions
                tags: 7,        -- High weight for tag matches
                category: 3,    -- Lower weight for category
                author: 2       -- Lowest weight for author name
            },
            language: 'en',
            language_override: 'language'
        },
        {
            name: 'category_focused_search',
            fields: {
                title: 'text',
                content: 'text'
            },
            filters: ['category']
        }
    ]
);

-- Advanced full-text search with relevance scoring
WITH intelligent_search_results AS (
    SELECT 
        document_id,
        title,
        content,
        description,
        author,
        category,
        tags,
        published_date,
        word_count,
        view_count,
        average_rating,

        -- MongoDB text search score
        TEXT_SEARCH_SCORE() as text_relevance_score,

        -- Enhanced relevance calculation
        (
            -- Base text relevance (40%)
            TEXT_SEARCH_SCORE() * 0.4 +

            -- Title match bonus (30%)
            CASE 
                WHEN LOWER(title) LIKE '%' || LOWER($1) || '%' THEN 0.3
                WHEN title ILIKE '%' || $1 || '%' THEN 0.2  -- Case insensitive partial match
                ELSE 0.0
            END +

            -- Recency factor (10%)
            CASE 
                WHEN published_date >= CURRENT_DATE - INTERVAL '30 days' THEN 0.1
                WHEN published_date >= CURRENT_DATE - INTERVAL '90 days' THEN 0.05
                ELSE 0.0
            END +

            -- Popularity boost (10%)
            LEAST(0.1, (view_count / 10000.0)) +

            -- Quality indicator (10%)
            CASE 
                WHEN average_rating >= 4.5 THEN 0.1
                WHEN average_rating >= 4.0 THEN 0.05
                WHEN average_rating >= 3.5 THEN 0.025
                ELSE 0.0
            END
        ) as enhanced_relevance_score,

        -- Content quality assessment
        CASE 
            WHEN word_count >= 1500 AND average_rating >= 4.0 THEN 'comprehensive_high_quality'
            WHEN word_count >= 800 AND average_rating >= 3.5 THEN 'detailed_good_quality'
            WHEN word_count >= 300 AND average_rating >= 3.0 THEN 'standard_quality'
            WHEN word_count >= 100 THEN 'brief_content'
            ELSE 'minimal_content'
        END as content_quality_tier,

        -- Match type classification
        CASE 
            WHEN LOWER(title) LIKE '%' || LOWER($1) || '%' THEN 'title_match'
            WHEN TEXT_SEARCH_SCORE() > 2.0 THEN 'strong_content_match'
            WHEN TEXT_SEARCH_SCORE() > 1.0 THEN 'good_content_match'
            ELSE 'weak_content_match'
        END as match_type

    FROM content_documents
    WHERE 
        -- Full-text search condition
        TEXT_SEARCH(title, content, description, tags, $1) -- $1 = search query

        -- Quality and availability filters
        AND is_published = true
        AND word_count >= 50  -- Minimum content length

        -- Optional category filter
        AND ($2 IS NULL OR category = $2)  -- $2 = optional category filter

        -- Optional date range filter  
        AND ($3 IS NULL OR published_date >= $3::DATE)  -- $3 = optional start date
        AND ($4 IS NULL OR published_date <= $4::DATE)  -- $4 = optional end date
),

search_with_highlights AS (
    SELECT 
        isr.*,

        -- Generate search highlights
        TEXT_HIGHLIGHT(title, $1, 'MaxWords=10') as title_highlight,
        TEXT_HIGHLIGHT(content, $1, 'MaxWords=30, MinWords=10') as content_highlight,
        TEXT_HIGHLIGHT(description, $1, 'MaxWords=20') as description_highlight,

        -- Content preview generation
        CASE 
            WHEN LENGTH(content) <= 200 THEN content
            WHEN POSITION(LOWER($1) IN LOWER(content)) > 0 THEN
                -- Extract snippet around search term
                SUBSTRING(
                    content, 
                    GREATEST(1, POSITION(LOWER($1) IN LOWER(content)) - 50), 
                    200
                ) || '...'
            ELSE 
                LEFT(content, 200) || '...'
        END as content_preview,

        -- Tag matching analysis
        ARRAY(
            SELECT tag 
            FROM UNNEST(tags) AS tag 
            WHERE tag ILIKE '%' || $1 || '%'
        ) as matching_tags,

        -- Related content scoring
        ARRAY_LENGTH(
            ARRAY(
                SELECT tag 
                FROM UNNEST(tags) AS tag 
                WHERE tag ILIKE '%' || $1 || '%'
            ),
            1
        ) as tag_match_count

    FROM intelligent_search_results isr
),

search_analytics AS (
    SELECT 
        swh.*,

        -- Search result ranking
        ROW_NUMBER() OVER (ORDER BY enhanced_relevance_score DESC, text_relevance_score DESC) as search_rank,

        -- Category diversity ranking
        ROW_NUMBER() OVER (PARTITION BY category ORDER BY enhanced_relevance_score DESC) as category_rank,

        -- Author diversity ranking  
        ROW_NUMBER() OVER (PARTITION BY author ORDER BY enhanced_relevance_score DESC) as author_rank,

        -- Quality tier ranking
        ROW_NUMBER() OVER (PARTITION BY content_quality_tier ORDER BY enhanced_relevance_score DESC) as quality_tier_rank,

        -- Engagement metrics
        (view_count + like_count * 2 + share_count * 3) as engagement_score,

        -- Search confidence scoring
        CASE 
            WHEN enhanced_relevance_score >= 1.5 THEN 'high_confidence'
            WHEN enhanced_relevance_score >= 0.8 THEN 'medium_confidence'
            WHEN enhanced_relevance_score >= 0.4 THEN 'low_confidence'
            ELSE 'very_low_confidence'
        END as search_confidence

    FROM search_with_highlights swh
)

SELECT 
    document_id,
    title,
    content_preview,
    description,
    author,
    category,
    tags,
    TO_CHAR(published_date, 'YYYY-MM-DD') as published_date,

    -- Search relevance metrics
    ROUND(text_relevance_score::NUMERIC, 4) as text_score,
    ROUND(enhanced_relevance_score::NUMERIC, 4) as relevance_score,
    search_rank,
    match_type,
    search_confidence,

    -- Content characteristics
    word_count,
    content_quality_tier,
    ROUND((word_count / NULLIF(reading_time_minutes, 0))::NUMERIC, 0) as reading_speed_wpm,

    -- Search highlights
    title_highlight,
    content_highlight,
    description_highlight,

    -- Tag analysis
    matching_tags,
    tag_match_count,

    -- Engagement and quality
    view_count,
    like_count,
    share_count,
    average_rating,
    engagement_score,

    -- Diversity indicators
    category_rank,
    author_rank,
    quality_tier_rank,

    -- Search metadata
    CASE 
        WHEN search_confidence = 'high_confidence' THEN 'Excellent match for your search'
        WHEN match_type = 'title_match' THEN 'Title contains your search terms'
        WHEN content_quality_tier = 'comprehensive_high_quality' THEN 'Comprehensive, high-quality content'
        WHEN engagement_score > 100 THEN 'Popular content with high engagement'
        ELSE 'Relevant match found'
    END as result_description,

    CURRENT_TIMESTAMP as search_performed_at

FROM search_analytics
WHERE 
    -- Result quality thresholds
    enhanced_relevance_score >= 0.2
    AND text_relevance_score >= 0.5

    -- Diversity constraints for better result variety
    AND category_rank <= 3          -- Max 3 results per category
    AND author_rank <= 2            -- Max 2 results per author  
    AND quality_tier_rank <= 5      -- Max 5 per quality tier

ORDER BY 
    enhanced_relevance_score DESC,
    search_rank ASC,
    engagement_score DESC
LIMIT 20;

-- Search faceting for advanced filtering
WITH search_facets AS (
    SELECT 
        category,
        author,
        content_quality_tier,
        EXTRACT(YEAR FROM published_date) as publication_year,
        COUNT(*) as result_count,
        AVG(enhanced_relevance_score) as avg_relevance,
        MAX(enhanced_relevance_score) as max_relevance

    FROM intelligent_search_results
    GROUP BY category, author, content_quality_tier, EXTRACT(YEAR FROM published_date)
    HAVING COUNT(*) >= 2  -- Minimum results threshold
)

SELECT 
    'category' as facet_type,
    category as facet_value,
    result_count,
    ROUND(avg_relevance::NUMERIC, 3) as avg_relevance_score,
    ROUND(max_relevance::NUMERIC, 3) as max_relevance_score
FROM search_facets
WHERE category IS NOT NULL

UNION ALL

SELECT 
    'author' as facet_type,
    author as facet_value,
    result_count,
    ROUND(avg_relevance::NUMERIC, 3) as avg_relevance_score,
    ROUND(max_relevance::NUMERIC, 3) as max_relevance_score
FROM search_facets  
WHERE author IS NOT NULL AND result_count >= 3

UNION ALL

SELECT 
    'content_quality' as facet_type,
    content_quality_tier as facet_value,
    result_count,
    ROUND(avg_relevance::NUMERIC, 3) as avg_relevance_score,
    ROUND(max_relevance::NUMERIC, 3) as max_relevance_score
FROM search_facets
WHERE content_quality_tier IS NOT NULL

UNION ALL

SELECT 
    'publication_year' as facet_type,
    publication_year::TEXT as facet_value,
    result_count,
    ROUND(avg_relevance::NUMERIC, 3) as avg_relevance_score,
    ROUND(max_relevance::NUMERIC, 3) as max_relevance_score
FROM search_facets
WHERE publication_year IS NOT NULL

ORDER BY facet_type, result_count DESC;

-- Search analytics and insights
CREATE VIEW search_performance_insights AS
WITH search_statistics AS (
    SELECT 
        DATE_TRUNC('day', search_performed_at) as search_date,
        search_query,
        COUNT(*) as search_frequency,
        AVG(results_count) as avg_results_returned,
        AVG(search_latency) as avg_search_latency,

        -- Query analysis
        LENGTH(search_query) as query_length,
        ARRAY_LENGTH(STRING_TO_ARRAY(search_query, ' '), 1) as query_word_count,

        -- Zero result tracking
        COUNT(*) FILTER (WHERE results_count = 0) as zero_result_searches,
        COUNT(*) FILTER (WHERE results_count > 0) as successful_searches,

        -- Performance classification
        COUNT(*) FILTER (WHERE search_latency <= 100) as fast_searches,
        COUNT(*) FILTER (WHERE search_latency <= 500) as acceptable_searches,
        COUNT(*) FILTER (WHERE search_latency > 500) as slow_searches

    FROM search_analytics_log
    WHERE search_performed_at >= CURRENT_DATE - INTERVAL '30 days'
    GROUP BY DATE_TRUNC('day', search_performed_at), search_query
    HAVING COUNT(*) >= 2  -- Focus on repeated queries
),

search_trends AS (
    SELECT 
        search_query,
        SUM(search_frequency) as total_searches,
        AVG(avg_results_returned) as overall_avg_results,
        AVG(avg_search_latency) as overall_avg_latency,

        -- Success rate calculation
        (SUM(successful_searches)::DECIMAL / NULLIF(SUM(search_frequency), 0)) * 100 as success_rate_percent,

        -- Performance score
        (SUM(fast_searches)::DECIMAL / NULLIF(SUM(search_frequency), 0)) * 100 as fast_search_percent,

        -- Query characteristics
        AVG(query_length) as avg_query_length,
        AVG(query_word_count) as avg_query_words,

        -- Trend analysis
        COUNT(DISTINCT search_date) as search_days,
        MIN(search_date) as first_search_date,
        MAX(search_date) as last_search_date

    FROM search_statistics
    GROUP BY search_query
),

query_insights AS (
    SELECT 
        st.*,

        -- Classification
        CASE 
            WHEN success_rate_percent >= 95 THEN 'high_performing_query'
            WHEN success_rate_percent >= 80 THEN 'good_performing_query'
            WHEN success_rate_percent >= 60 THEN 'fair_performing_query'
            ELSE 'poor_performing_query'
        END as query_performance_class,

        CASE 
            WHEN total_searches >= 100 THEN 'very_popular'
            WHEN total_searches >= 50 THEN 'popular'  
            WHEN total_searches >= 20 THEN 'moderately_popular'
            WHEN total_searches >= 10 THEN 'occasionally_used'
            ELSE 'rarely_used'
        END as query_popularity_class,

        CASE 
            WHEN fast_search_percent >= 90 THEN 'excellent_performance'
            WHEN fast_search_percent >= 70 THEN 'good_performance'
            WHEN fast_search_percent >= 50 THEN 'acceptable_performance'
            ELSE 'poor_performance'
        END as latency_performance_class,

        -- Recommendations
        CASE 
            WHEN success_rate_percent < 60 THEN 'Improve content coverage for this query'
            WHEN fast_search_percent < 50 THEN 'Optimize search performance'
            WHEN overall_avg_results < 5 THEN 'Expand content in this area'
            ELSE 'Query performing well'
        END as optimization_recommendation

    FROM search_trends st
)

SELECT 
    search_query,
    query_popularity_class,
    query_performance_class,  
    latency_performance_class,

    -- Key metrics
    total_searches,
    ROUND(success_rate_percent::NUMERIC, 1) as success_rate_pct,
    ROUND(overall_avg_results::NUMERIC, 1) as avg_results_returned,
    ROUND(overall_avg_latency::NUMERIC, 0) as avg_latency_ms,
    ROUND(fast_search_percent::NUMERIC, 1) as fast_searches_pct,

    -- Query characteristics
    ROUND(avg_query_length::NUMERIC, 1) as avg_character_length,
    ROUND(avg_query_words::NUMERIC, 1) as avg_word_count,

    -- Usage patterns
    search_days as days_active,
    TO_CHAR(first_search_date, 'YYYY-MM-DD') as first_seen,
    TO_CHAR(last_search_date, 'YYYY-MM-DD') as last_seen,

    -- Insights and recommendations
    optimization_recommendation,

    -- Priority scoring for optimization efforts
    CASE 
        WHEN query_popularity_class IN ('very_popular', 'popular') AND query_performance_class IN ('poor_performing_query', 'fair_performing_query') THEN 1
        WHEN query_popularity_class = 'very_popular' AND latency_performance_class = 'poor_performance' THEN 2
        WHEN query_performance_class = 'poor_performing_query' THEN 3
        ELSE 4
    END as optimization_priority

FROM query_insights
ORDER BY 
    optimization_priority ASC,
    total_searches DESC,
    success_rate_percent ASC;

-- QueryLeaf provides comprehensive full-text search capabilities:
-- 1. SQL-familiar TEXT_SEARCH function for complex text queries
-- 2. Advanced relevance scoring with customizable ranking algorithms  
-- 3. Built-in search highlighting and snippet generation
-- 4. Faceted search capabilities with aggregation-based filtering
-- 5. Search analytics and performance monitoring with SQL queries
-- 6. Auto-complete and suggestion generation using pattern matching
-- 7. Multi-language text search support with language-specific processing
-- 8. Enterprise search patterns with personalization and recommendations
-- 9. Native integration with MongoDB text indexing optimizations
-- 10. Familiar SQL patterns for complex search and content discovery requirements

Best Practices for Full-Text Search Implementation

Index Design and Optimization

Essential practices for production full-text search deployments:

  1. Weight Configuration: Assign appropriate weights to different fields based on their importance for relevance scoring
  2. Language Support: Configure language-specific processing for stemming, stop words, and linguistic analysis
  3. Index Maintenance: Monitor index performance and rebuild indexes when content patterns change significantly
  4. Query Optimization: Design search queries that leverage index capabilities while minimizing performance overhead
  5. Result Caching: Implement intelligent caching strategies for frequently executed search queries
  6. Performance Monitoring: Track search latency, relevance quality, and user satisfaction metrics

Enterprise Search Architecture

Design full-text search systems for enterprise-scale content discovery:

  1. Personalization Integration: Implement user behavior tracking and personalized ranking algorithms
  2. Content Quality Assessment: Develop metrics for content quality that influence search ranking
  3. Search Analytics: Establish comprehensive search analytics for query optimization and content gap analysis
  4. Faceted Navigation: Design intuitive faceted search interfaces that help users refine search results
  5. Auto-Complete Systems: Implement intelligent auto-complete that learns from user behavior and content
  6. Multi-Modal Search: Integrate text search with other search capabilities like vector similarity and geospatial queries

Conclusion

MongoDB Full-Text Search provides comprehensive intelligent content discovery capabilities that eliminate the complexity and limitations of traditional database text search approaches. The combination of advanced linguistic processing, intelligent relevance scoring, and sophisticated analytical capabilities enables modern applications to deliver the intelligent search experiences users expect while maintaining familiar database interaction patterns.

Key Full-Text Search benefits include:

  • Advanced Linguistic Processing: Native support for stemming, stop word filtering, and multi-language text analysis
  • Intelligent Relevance Scoring: Sophisticated ranking algorithms with customizable business logic integration
  • High-Performance Text Queries: Optimized text indexing with automatic query optimization and caching
  • Enterprise Search Features: Built-in support for faceted search, auto-complete, and search analytics
  • Seamless MongoDB Integration: Unified data access patterns with existing MongoDB operations and security
  • SQL Compatibility: Familiar search operations through QueryLeaf for accessible content discovery development

Whether you're building knowledge management systems, content discovery platforms, e-commerce search functionality, or any application requiring intelligent text search, MongoDB Full-Text Search with QueryLeaf's SQL-familiar interface provides the foundation for modern content discovery that scales efficiently while maintaining familiar interaction patterns.

QueryLeaf Integration: QueryLeaf automatically manages MongoDB Full-Text Search operations while providing SQL-familiar syntax for text search queries, relevance scoring, and search analytics. Advanced content discovery patterns including personalized search, faceted navigation, and search optimization are seamlessly accessible through familiar SQL constructs, making sophisticated search development both powerful and approachable for SQL-oriented development teams.

The combination of MongoDB's robust text search capabilities with SQL-style operations makes it an ideal platform for applications requiring both intelligent content discovery and familiar database query patterns, ensuring your search solutions remain both effective and maintainable as content volumes and user expectations evolve.

MongoDB Change Streams and Event-Driven Architecture: Real-Time Data Processing and Reactive Application Development with SQL-Compatible Operations

Modern applications increasingly require real-time responsiveness to data changes, enabling immediate updates across distributed systems, live dashboards, notification systems, and collaborative features. Traditional polling-based approaches create significant performance overhead, increase database load, and introduce unacceptable latency for responsive user experiences.

MongoDB Change Streams provide native event-driven capabilities that eliminate polling overhead through real-time change notifications, enabling sophisticated reactive architectures with guaranteed delivery, resumability, and comprehensive filtering. Unlike traditional database triggers or external message queues that require complex infrastructure management, Change Streams deliver enterprise-grade real-time data processing with automatic failover, distributed coordination, and seamless integration with MongoDB's operational model.

The Traditional Change Detection Challenge

Conventional approaches to detecting data changes involve significant complexity, performance penalties, and reliability issues:

-- Traditional PostgreSQL change detection - complex polling with performance overhead

-- Audit table approach with triggers (complex maintenance and performance impact)
CREATE TABLE product_audit (
    audit_id BIGSERIAL PRIMARY KEY,
    product_id BIGINT NOT NULL,
    operation_type VARCHAR(10) NOT NULL, -- INSERT, UPDATE, DELETE
    old_data JSONB,
    new_data JSONB,
    changed_fields TEXT[],
    change_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    user_id BIGINT,
    session_id VARCHAR(100),
    application_context JSONB,

    -- Performance indexes
    INDEX audit_product_time_idx (product_id, change_timestamp DESC),
    INDEX audit_operation_time_idx (operation_type, change_timestamp DESC)
);

-- Complex trigger function for change tracking
CREATE OR REPLACE FUNCTION track_product_changes()
RETURNS TRIGGER AS $$
DECLARE
    old_json JSONB;
    new_json JSONB;
    changed_fields TEXT[] := ARRAY[]::TEXT[];
    field_name TEXT;
    field_value_old TEXT;
    field_value_new TEXT;
BEGIN
    -- Handle different operation types
    CASE TG_OP
        WHEN 'INSERT' THEN
            new_json := row_to_json(NEW)::JSONB;
            INSERT INTO product_audit (
                product_id, operation_type, new_data, 
                changed_fields, user_id, session_id
            ) VALUES (
                NEW.product_id, 'INSERT', new_json,
                array(select key from jsonb_each(new_json)),
                NEW.last_modified_by, NEW.session_id
            );
            RETURN NEW;

        WHEN 'UPDATE' THEN
            old_json := row_to_json(OLD)::JSONB;
            new_json := row_to_json(NEW)::JSONB;

            -- Complex field-by-field comparison for change detection
            FOR field_name IN SELECT key FROM jsonb_each(new_json) LOOP
                field_value_old := COALESCE((old_json->>field_name), '');
                field_value_new := COALESCE((new_json->>field_name), '');

                IF field_value_old != field_value_new THEN
                    changed_fields := array_append(changed_fields, field_name);
                END IF;
            END LOOP;

            -- Only log if there are actual changes
            IF array_length(changed_fields, 1) > 0 THEN
                INSERT INTO product_audit (
                    product_id, operation_type, old_data, new_data,
                    changed_fields, user_id, session_id
                ) VALUES (
                    NEW.product_id, 'UPDATE', old_json, new_json,
                    changed_fields, NEW.last_modified_by, NEW.session_id
                );
            END IF;
            RETURN NEW;

        WHEN 'DELETE' THEN
            old_json := row_to_json(OLD)::JSONB;
            INSERT INTO product_audit (
                product_id, operation_type, old_data,
                changed_fields, user_id, session_id
            ) VALUES (
                OLD.product_id, 'DELETE', old_json,
                array(select key from jsonb_each(old_json)),
                OLD.last_modified_by, OLD.session_id
            );
            RETURN OLD;
    END CASE;

    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Create triggers on multiple tables (maintenance overhead)
CREATE TRIGGER product_changes_trigger
    AFTER INSERT OR UPDATE OR DELETE ON products
    FOR EACH ROW EXECUTE FUNCTION track_product_changes();

CREATE TRIGGER inventory_changes_trigger  
    AFTER INSERT OR UPDATE OR DELETE ON inventory
    FOR EACH ROW EXECUTE FUNCTION track_inventory_changes();

-- Polling-based change consumption (expensive and unreliable)
WITH recent_changes AS (
    SELECT 
        pa.audit_id,
        pa.product_id,
        pa.operation_type,
        pa.old_data,
        pa.new_data,
        pa.changed_fields,
        pa.change_timestamp,
        pa.user_id,
        pa.session_id,

        -- Product context enrichment (expensive joins)
        p.name as product_name,
        p.category_id,
        p.current_price,
        p.status,
        c.category_name,

        -- Change analysis
        CASE 
            WHEN pa.operation_type = 'INSERT' THEN 'Product Created'
            WHEN pa.operation_type = 'UPDATE' AND 'current_price' = ANY(pa.changed_fields) THEN 'Price Updated'
            WHEN pa.operation_type = 'UPDATE' AND 'status' = ANY(pa.changed_fields) THEN 'Status Changed'
            WHEN pa.operation_type = 'UPDATE' THEN 'Product Modified'
            WHEN pa.operation_type = 'DELETE' THEN 'Product Removed'
        END as change_description,

        -- Business impact assessment
        CASE 
            WHEN pa.operation_type = 'UPDATE' AND 'current_price' = ANY(pa.changed_fields) THEN
                CASE 
                    WHEN (pa.new_data->>'current_price')::DECIMAL > (pa.old_data->>'current_price')::DECIMAL 
                    THEN 'Price Increase'
                    ELSE 'Price Decrease'
                END
            WHEN pa.operation_type = 'UPDATE' AND 'inventory_count' = ANY(pa.changed_fields) THEN
                CASE 
                    WHEN (pa.new_data->>'inventory_count')::INTEGER <= 5 THEN 'Low Stock Alert'
                    WHEN (pa.new_data->>'inventory_count')::INTEGER = 0 THEN 'Out of Stock'
                    ELSE 'Inventory Updated'
                END
        END as business_impact,

        -- Notification targeting
        CASE 
            WHEN pa.operation_type = 'INSERT' THEN ARRAY['product_managers', 'inventory_team']
            WHEN pa.operation_type = 'UPDATE' AND 'current_price' = ANY(pa.changed_fields) 
                THEN ARRAY['pricing_team', 'sales_team', 'customers']
            WHEN pa.operation_type = 'UPDATE' AND 'inventory_count' = ANY(pa.changed_fields) 
                THEN ARRAY['inventory_team', 'fulfillment']
            WHEN pa.operation_type = 'DELETE' THEN ARRAY['product_managers', 'customers']
            ELSE ARRAY['general_subscribers']
        END as notification_targets

    FROM product_audit pa
    LEFT JOIN products p ON pa.product_id = p.product_id
    LEFT JOIN categories c ON p.category_id = c.category_id
    WHERE pa.change_timestamp > (
        -- Get last processed timestamp (requires external state management)
        SELECT COALESCE(last_processed_timestamp, CURRENT_TIMESTAMP - INTERVAL '5 minutes')
        FROM change_processing_checkpoint 
        WHERE processor_name = 'product_change_handler'
    )
    ORDER BY pa.change_timestamp ASC
),

change_aggregation AS (
    -- Complex aggregation for batch processing
    SELECT 
        rc.product_id,
        rc.product_name,
        rc.category_name,
        COUNT(*) as total_changes,

        -- Change type counts
        COUNT(*) FILTER (WHERE operation_type = 'INSERT') as creates,
        COUNT(*) FILTER (WHERE operation_type = 'UPDATE') as updates, 
        COUNT(*) FILTER (WHERE operation_type = 'DELETE') as deletes,

        -- Business impact analysis
        COUNT(*) FILTER (WHERE business_impact LIKE '%Price%') as price_changes,
        COUNT(*) FILTER (WHERE business_impact LIKE '%Stock%') as inventory_changes,

        -- Change timeline
        MIN(change_timestamp) as first_change,
        MAX(change_timestamp) as last_change,
        EXTRACT(SECONDS FROM (MAX(change_timestamp) - MIN(change_timestamp))) as change_window_seconds,

        -- Most recent change details
        (array_agg(rc.operation_type ORDER BY rc.change_timestamp DESC))[1] as latest_operation,
        (array_agg(rc.change_description ORDER BY rc.change_timestamp DESC))[1] as latest_description,
        (array_agg(rc.business_impact ORDER BY rc.change_timestamp DESC))[1] as latest_impact,

        -- Notification consolidation
        array_agg(DISTINCT unnest(rc.notification_targets)) as all_notification_targets,

        -- Change velocity (changes per minute)
        CASE 
            WHEN EXTRACT(SECONDS FROM (MAX(change_timestamp) - MIN(change_timestamp))) > 0 
            THEN COUNT(*)::DECIMAL / (EXTRACT(SECONDS FROM (MAX(change_timestamp) - MIN(change_timestamp))) / 60)
            ELSE COUNT(*)::DECIMAL
        END as changes_per_minute

    FROM recent_changes rc
    GROUP BY rc.product_id, rc.product_name, rc.category_name
),

notification_prioritization AS (
    SELECT 
        ca.*,

        -- Priority scoring
        (
            -- Change frequency component
            LEAST(changes_per_minute * 2, 10) +

            -- Business impact component  
            CASE 
                WHEN price_changes > 0 THEN 5
                WHEN inventory_changes > 0 THEN 4
                WHEN creates > 0 THEN 3
                WHEN deletes > 0 THEN 6
                ELSE 1
            END +

            -- Recency component
            CASE 
                WHEN change_window_seconds < 300 THEN 3  -- Within 5 minutes
                WHEN change_window_seconds < 3600 THEN 2 -- Within 1 hour
                ELSE 1
            END
        ) as priority_score,

        -- Alert classification
        CASE 
            WHEN deletes > 0 THEN 'critical'
            WHEN price_changes > 0 AND changes_per_minute > 1 THEN 'high'
            WHEN inventory_changes > 0 THEN 'medium'
            WHEN creates > 0 THEN 'low'
            ELSE 'informational'
        END as alert_level,

        -- Message formatting
        CASE 
            WHEN total_changes = 1 THEN latest_description
            ELSE CONCAT(total_changes, ' changes to ', product_name, ' (', latest_description, ')')
        END as notification_message

    FROM change_aggregation ca
)

-- Final change processing output (still requires external message queue)
SELECT 
    np.product_id,
    np.product_name, 
    np.category_name,
    np.total_changes,
    np.priority_score,
    np.alert_level,
    np.notification_message,
    np.all_notification_targets,
    np.last_change,

    -- Processing metadata
    CURRENT_TIMESTAMP as processed_at,
    'product_change_handler' as processor_name,

    -- External system integration requirements
    CASE alert_level
        WHEN 'critical' THEN 'immediate_push_notification'
        WHEN 'high' THEN 'priority_email_and_push' 
        WHEN 'medium' THEN 'email_notification'
        ELSE 'dashboard_update_only'
    END as delivery_method,

    -- Routing information for message queue
    CASE 
        WHEN 'customers' = ANY(all_notification_targets) THEN 'customer_notifications_queue'
        WHEN 'pricing_team' = ANY(all_notification_targets) THEN 'internal_alerts_queue'
        ELSE 'general_updates_queue'
    END as routing_key,

    -- Deduplication key (manual implementation required)
    MD5(CONCAT(product_id, ':', array_to_string(all_notification_targets, ','), ':', DATE_TRUNC('minute', last_change))) as deduplication_key

FROM notification_prioritization np
WHERE priority_score >= 3  -- Filter low-priority notifications
ORDER BY priority_score DESC, last_change DESC;

-- Update checkpoint after processing (manual transaction management)
UPDATE change_processing_checkpoint 
SET 
    last_processed_timestamp = CURRENT_TIMESTAMP,
    processed_count = processed_count + (SELECT COUNT(*) FROM recent_changes),
    last_updated = CURRENT_TIMESTAMP
WHERE processor_name = 'product_change_handler';

-- Traditional polling approach problems:
-- 1. Expensive polling operations creating unnecessary database load
-- 2. Complex trigger-based audit tables requiring extensive maintenance
-- 3. Race conditions and missed changes during high-concurrency periods
-- 4. Manual checkpoint management and external state tracking required
-- 5. Complex field-level change detection with performance overhead
-- 6. No guaranteed delivery or automatic failure recovery mechanisms
-- 7. Difficult horizontal scaling of change processing systems
-- 8. External message queue infrastructure required for reliability
-- 9. Manual deduplication and ordering logic implementation required
-- 10. Limited filtering capabilities and expensive context enrichment queries

MongoDB Change Streams eliminate polling complexity with native real-time change notifications:

// MongoDB Change Streams - native real-time change processing with comprehensive event handling
const { MongoClient, ObjectId } = require('mongodb');

const client = new MongoClient('mongodb://localhost:27017');
const db = client.db('ecommerce_platform');

// Advanced MongoDB Change Streams Manager
class MongoDBChangeStreamsManager {
  constructor(db, config = {}) {
    this.db = db;
    this.config = {
      // Change stream configuration
      enableChangeStreams: config.enableChangeStreams !== false,
      resumeAfterFailure: config.resumeAfterFailure !== false,
      batchSize: config.batchSize || 100,
      maxAwaitTimeMS: config.maxAwaitTimeMS || 1000,

      // Event processing configuration
      enableEventEnrichment: config.enableEventEnrichment !== false,
      enableEventFiltering: config.enableEventFiltering !== false,
      enableEventAggregation: config.enableEventAggregation !== false,
      enableEventRouting: config.enableEventRouting !== false,

      // Reliability and resilience  
      enableAutoResume: config.enableAutoResume !== false,
      enableDeadLetterQueue: config.enableDeadLetterQueue !== false,
      maxRetries: config.maxRetries || 3,
      retryDelayMs: config.retryDelayMs || 1000,

      // Performance optimization
      enableParallelProcessing: config.enableParallelProcessing !== false,
      processingConcurrency: config.processingConcurrency || 10,
      enableBatchProcessing: config.enableBatchProcessing !== false,
      batchProcessingWindowMs: config.batchProcessingWindowMs || 5000,

      // Monitoring and observability
      enableMetrics: config.enableMetrics !== false,
      enableLogging: config.enableLogging !== false,
      logLevel: config.logLevel || 'info',

      ...config
    };

    // Collection references
    this.collections = {
      products: db.collection('products'),
      inventory: db.collection('inventory'),
      orders: db.collection('orders'),
      customers: db.collection('customers'),

      // Event processing collections
      changeEvents: db.collection('change_events'),
      processingCheckpoints: db.collection('processing_checkpoints'),
      deadLetterQueue: db.collection('dead_letter_queue'),
      eventMetrics: db.collection('event_metrics')
    };

    // Change stream management
    this.changeStreams = new Map();
    this.eventProcessors = new Map();
    this.processingQueues = new Map();
    this.resumeTokens = new Map();

    // Performance metrics
    this.metrics = {
      eventsProcessed: 0,
      eventsFailured: 0,
      averageProcessingTime: 0,
      totalProcessingTime: 0,
      lastProcessedAt: null,
      processingErrors: []
    };

    this.initializeChangeStreams();
  }

  async initializeChangeStreams() {
    console.log('Initializing MongoDB Change Streams for real-time data processing...');

    try {
      // Setup change streams for different collections
      await this.setupProductChangeStream();
      await this.setupInventoryChangeStream();
      await this.setupOrderChangeStream();
      await this.setupCustomerChangeStream();

      // Setup cross-collection change aggregation
      await this.setupDatabaseChangeStream();

      // Initialize event processing infrastructure
      await this.setupEventProcessingInfrastructure();

      console.log('Change streams initialized successfully');

    } catch (error) {
      console.error('Error initializing change streams:', error);
      throw error;
    }
  }

  async setupProductChangeStream() {
    console.log('Setting up product change stream...');

    const productsCollection = this.collections.products;

    // Advanced change stream pipeline with filtering and enrichment
    const changeStreamPipeline = [
      // Stage 1: Filter relevant operations
      {
        $match: {
          $or: [
            { 'operationType': 'insert' },
            { 'operationType': 'update' },
            { 'operationType': 'delete' },
            { 'operationType': 'replace' }
          ],

          // Optional namespace filtering
          'ns.db': this.db.databaseName,
          'ns.coll': 'products'
        }
      },

      // Stage 2: Enrich change events with business context
      {
        $lookup: {
          from: 'categories',
          localField: 'fullDocument.categoryId',
          foreignField: '_id',
          as: 'categoryInfo'
        }
      },

      // Stage 3: Add computed fields and change analysis
      {
        $addFields: {
          // Event metadata
          eventId: { $toString: '$_id' },
          eventTimestamp: '$$NOW',
          collectionName: '$ns.coll',

          // Change analysis
          changedFields: {
            $cond: {
              if: { $eq: ['$operationType', 'update'] },
              then: { $objectToArray: '$updateDescription.updatedFields' },
              else: []
            }
          },

          // Business context
          categoryInfo: { $arrayElemAt: ['$categoryInfo', 0] },

          // Priority assessment
          eventPriority: {
            $switch: {
              branches: [
                {
                  case: { $eq: ['$operationType', 'delete'] },
                  then: 'critical'
                },
                {
                  case: {
                    $and: [
                      { $eq: ['$operationType', 'update'] },
                      { $ne: [{ $type: '$updateDescription.updatedFields.price' }, 'missing'] }
                    ]
                  },
                  then: 'high'
                },
                {
                  case: {
                    $and: [
                      { $eq: ['$operationType', 'update'] },
                      { $ne: [{ $type: '$updateDescription.updatedFields.inventory' }, 'missing'] }
                    ]
                  },
                  then: 'medium'
                },
                {
                  case: { $eq: ['$operationType', 'insert'] },
                  then: 'low'
                }
              ],
              default: 'informational'
            }
          },

          // Notification routing
          notificationTargets: {
            $switch: {
              branches: [
                {
                  case: { $eq: ['$operationType', 'insert'] },
                  then: ['product_managers', 'inventory_team']
                },
                {
                  case: {
                    $and: [
                      { $eq: ['$operationType', 'update'] },
                      { $ne: [{ $type: '$updateDescription.updatedFields.price' }, 'missing'] }
                    ]
                  },
                  then: ['pricing_team', 'sales_team', 'customers']
                },
                {
                  case: {
                    $and: [
                      { $eq: ['$operationType', 'update'] },
                      { $ne: [{ $type: '$updateDescription.updatedFields.inventory' }, 'missing'] }
                    ]
                  },
                  then: ['inventory_team', 'fulfillment']
                },
                {
                  case: { $eq: ['$operationType', 'delete'] },
                  then: ['product_managers', 'customers']
                }
              ],
              default: ['general_subscribers']
            }
          }
        }
      }
    ];

    // Create change stream with pipeline and options
    const productChangeStream = productsCollection.watch(changeStreamPipeline, {
      fullDocument: 'updateLookup',
      fullDocumentBeforeChange: 'whenAvailable',
      batchSize: this.config.batchSize,
      maxAwaitTimeMS: this.config.maxAwaitTimeMS,
      resumeAfter: this.resumeTokens.get('products')
    });

    // Event processing handler
    productChangeStream.on('change', async (changeEvent) => {
      await this.processProductChangeEvent(changeEvent);
    });

    // Error handling and resume token management
    productChangeStream.on('error', async (error) => {
      console.error('Product change stream error:', error);
      await this.handleChangeStreamError('products', error);
    });

    productChangeStream.on('resumeTokenChanged', (resumeToken) => {
      this.resumeTokens.set('products', resumeToken);
      this.persistResumeToken('products', resumeToken);
    });

    this.changeStreams.set('products', productChangeStream);
    console.log('Product change stream setup complete');
  }

  async processProductChangeEvent(changeEvent) {
    const startTime = Date.now();

    try {
      console.log(`Processing product change event: ${changeEvent.operationType} for product ${changeEvent.documentKey._id}`);

      // Enrich change event with additional context
      const enrichedEvent = await this.enrichProductChangeEvent(changeEvent);

      // Apply business logic and routing
      const processedEvent = await this.applyProductBusinessLogic(enrichedEvent);

      // Route to appropriate handlers
      await this.routeProductChangeEvent(processedEvent);

      // Store event for audit and analytics
      await this.storeChangeEvent(processedEvent);

      // Update metrics
      this.updateProcessingMetrics(startTime, 'success');

    } catch (error) {
      console.error('Error processing product change event:', error);

      // Handle processing failure
      await this.handleEventProcessingError(changeEvent, error);
      this.updateProcessingMetrics(startTime, 'error');
    }
  }

  async enrichProductChangeEvent(changeEvent) {
    console.log('Enriching product change event with business context...');

    try {
      const enrichedEvent = {
        ...changeEvent,

        // Processing metadata
        processingId: new ObjectId(),
        processingTimestamp: new Date(),
        processorVersion: '1.0',

        // Document context (current and previous state)
        currentDocument: changeEvent.fullDocument,
        previousDocument: changeEvent.fullDocumentBeforeChange,

        // Change analysis
        changeAnalysis: await this.analyzeProductChange(changeEvent),

        // Business impact assessment
        businessImpact: await this.assessProductBusinessImpact(changeEvent),

        // Related data enrichment
        relatedData: await this.getRelatedProductData(changeEvent.documentKey._id),

        // Notification configuration
        notificationConfig: await this.getProductNotificationConfig(changeEvent),

        // Processing context
        processingContext: {
          correlationId: changeEvent.eventId,
          sourceCollection: changeEvent.collectionName,
          processingPipeline: 'product_changes',
          retryCount: 0,
          maxRetries: this.config.maxRetries
        }
      };

      return enrichedEvent;

    } catch (error) {
      console.error('Error enriching product change event:', error);
      throw error;
    }
  }

  async analyzeProductChange(changeEvent) {
    const analysis = {
      operationType: changeEvent.operationType,
      affectedFields: [],
      fieldChanges: {},
      changeType: 'unknown',
      changeSignificance: 'low'
    };

    switch (changeEvent.operationType) {
      case 'insert':
        analysis.changeType = 'product_creation';
        analysis.changeSignificance = 'medium';
        analysis.affectedFields = Object.keys(changeEvent.fullDocument || {});
        break;

      case 'update':
        if (changeEvent.updateDescription && changeEvent.updateDescription.updatedFields) {
          analysis.affectedFields = Object.keys(changeEvent.updateDescription.updatedFields);

          // Analyze specific field changes
          const updatedFields = changeEvent.updateDescription.updatedFields;

          for (const [field, newValue] of Object.entries(updatedFields)) {
            const oldValue = changeEvent.fullDocumentBeforeChange?.[field];

            analysis.fieldChanges[field] = {
              oldValue,
              newValue,
              changeType: this.classifyFieldChange(field, oldValue, newValue)
            };
          }

          // Determine change type and significance
          if ('price' in updatedFields) {
            analysis.changeType = 'price_update';
            analysis.changeSignificance = 'high';
          } else if ('inventory' in updatedFields) {
            analysis.changeType = 'inventory_update';
            analysis.changeSignificance = 'medium';
          } else if ('status' in updatedFields) {
            analysis.changeType = 'status_change';
            analysis.changeSignificance = 'medium';
          } else {
            analysis.changeType = 'product_modification';
            analysis.changeSignificance = 'low';
          }
        }
        break;

      case 'delete':
        analysis.changeType = 'product_deletion';
        analysis.changeSignificance = 'critical';
        break;

      case 'replace':
        analysis.changeType = 'product_replacement';
        analysis.changeSignificance = 'high';
        break;
    }

    return analysis;
  }

  classifyFieldChange(fieldName, oldValue, newValue) {
    switch (fieldName) {
      case 'price':
        if (newValue > oldValue) return 'price_increase';
        if (newValue < oldValue) return 'price_decrease';
        return 'price_change';

      case 'inventory':
        if (newValue === 0) return 'out_of_stock';
        if (newValue <= 5) return 'low_stock';
        if (newValue > oldValue) return 'stock_increase';
        if (newValue < oldValue) return 'stock_decrease';
        return 'inventory_adjustment';

      case 'status':
        if (newValue === 'discontinued') return 'product_discontinued';
        if (newValue === 'active' && oldValue !== 'active') return 'product_activated';
        if (newValue !== 'active' && oldValue === 'active') return 'product_deactivated';
        return 'status_change';

      default:
        return 'field_update';
    }
  }

  async assessProductBusinessImpact(changeEvent) {
    const impact = {
      impactLevel: 'low',
      impactAreas: [],
      affectedSystems: [],
      businessMetrics: {},
      actionRequired: false,
      recommendations: []
    };

    const productId = changeEvent.documentKey._id;
    const analysis = await this.analyzeProductChange(changeEvent);

    // Assess impact based on change type
    switch (analysis.changeType) {
      case 'price_update':
        impact.impactLevel = 'high';
        impact.impactAreas = ['revenue', 'customer_experience', 'competitive_positioning'];
        impact.affectedSystems = ['pricing_engine', 'recommendation_system', 'customer_notifications'];
        impact.actionRequired = true;
        impact.recommendations = [
          'Notify customers of price changes',
          'Update marketing materials',
          'Review competitive pricing'
        ];

        // Calculate price change impact
        const priceChange = analysis.fieldChanges?.price;
        if (priceChange) {
          impact.businessMetrics.priceChangePercentage = 
            ((priceChange.newValue - priceChange.oldValue) / priceChange.oldValue * 100).toFixed(2);
        }
        break;

      case 'inventory_update':
        impact.impactLevel = 'medium';
        impact.impactAreas = ['fulfillment', 'customer_experience'];
        impact.affectedSystems = ['inventory_management', 'order_processing'];

        const inventoryChange = analysis.fieldChanges?.inventory;
        if (inventoryChange) {
          if (inventoryChange.newValue === 0) {
            impact.impactLevel = 'high';
            impact.actionRequired = true;
            impact.recommendations = ['Update product availability', 'Notify backordered customers'];
          } else if (inventoryChange.newValue <= 5) {
            impact.recommendations = ['Monitor inventory levels', 'Plan restocking'];
          }
        }
        break;

      case 'product_deletion':
        impact.impactLevel = 'critical';
        impact.impactAreas = ['customer_experience', 'revenue', 'data_integrity'];
        impact.affectedSystems = ['catalog_management', 'order_processing', 'recommendations'];
        impact.actionRequired = true;
        impact.recommendations = [
          'Handle existing orders',
          'Update customer wishlists',
          'Archive product data',
          'Redirect product URLs'
        ];
        break;

      case 'product_creation':
        impact.impactLevel = 'medium';
        impact.impactAreas = ['catalog_expansion', 'revenue_opportunity'];
        impact.affectedSystems = ['search_indexing', 'recommendation_system', 'inventory_tracking'];
        impact.recommendations = [
          'Index for search',
          'Generate recommendations',
          'Create marketing content'
        ];
        break;
    }

    return impact;
  }

  async getRelatedProductData(productId) {
    try {
      // Get product relationships and context
      const relatedData = await Promise.allSettled([
        // Category information
        this.collections.products.aggregate([
          { $match: { _id: productId } },
          {
            $lookup: {
              from: 'categories',
              localField: 'categoryId',
              foreignField: '_id',
              as: 'category'
            }
          },
          { $project: { category: { $arrayElemAt: ['$category', 0] } } }
        ]).toArray(),

        // Inventory information
        this.collections.inventory.findOne({ productId: productId }),

        // Recent orders for this product
        this.collections.orders.find({
          'items.productId': productId,
          createdAt: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) } // Last 30 days
        }).limit(10).toArray(),

        // Customer interest metrics
        this.collections.analytics.findOne({
          productId: productId,
          type: 'product_engagement'
        })
      ]);

      const [categoryResult, inventoryResult, ordersResult, analyticsResult] = relatedData;

      return {
        category: categoryResult.status === 'fulfilled' ? categoryResult.value[0]?.category : null,
        inventory: inventoryResult.status === 'fulfilled' ? inventoryResult.value : null,
        recentOrders: ordersResult.status === 'fulfilled' ? ordersResult.value : [],
        analytics: analyticsResult.status === 'fulfilled' ? analyticsResult.value : null,
        dataRetrievedAt: new Date()
      };

    } catch (error) {
      console.error('Error getting related product data:', error);
      return { error: error.message };
    }
  }

  async getProductNotificationConfig(changeEvent) {
    const config = {
      enableNotifications: true,
      notificationTargets: changeEvent.notificationTargets || [],
      deliveryMethods: ['push', 'email'],
      priority: changeEvent.eventPriority || 'low',
      batching: {
        enabled: true,
        windowMs: 60000, // 1 minute
        maxBatchSize: 10
      },
      filtering: {
        enabled: true,
        rules: []
      }
    };

    // Customize based on event type and priority
    switch (changeEvent.operationType) {
      case 'delete':
        config.deliveryMethods = ['push', 'email', 'sms'];
        config.batching.enabled = false; // Immediate delivery
        break;

      case 'update':
        if (changeEvent.eventPriority === 'high') {
          config.deliveryMethods = ['push', 'email'];
          config.batching.windowMs = 30000; // 30 seconds
        }
        break;
    }

    return config;
  }

  async applyProductBusinessLogic(enrichedEvent) {
    console.log('Applying business logic to product change event...');

    try {
      const processedEvent = {
        ...enrichedEvent,

        // Business rules execution results
        businessRules: await this.executeProductBusinessRules(enrichedEvent),

        // Workflow triggers
        workflowTriggers: await this.identifyWorkflowTriggers(enrichedEvent),

        // Integration requirements
        integrationRequirements: await this.identifyIntegrationRequirements(enrichedEvent),

        // Compliance and governance
        complianceChecks: await this.performComplianceChecks(enrichedEvent)
      };

      return processedEvent;

    } catch (error) {
      console.error('Error applying business logic:', error);
      throw error;
    }
  }

  async executeProductBusinessRules(enrichedEvent) {
    const rules = [];
    const analysis = enrichedEvent.changeAnalysis;

    // Price change rules
    if (analysis.changeType === 'price_update') {
      const priceChange = analysis.fieldChanges.price;
      const changePercent = Math.abs(
        ((priceChange.newValue - priceChange.oldValue) / priceChange.oldValue) * 100
      );

      if (changePercent > 20) {
        rules.push({
          rule: 'significant_price_change',
          triggered: true,
          severity: 'high',
          action: 'require_manager_approval',
          details: `Price change of ${changePercent.toFixed(2)}% requires approval`
        });
      }

      if (priceChange.newValue < priceChange.oldValue * 0.5) {
        rules.push({
          rule: 'deep_discount_alert',
          triggered: true,
          severity: 'medium',
          action: 'fraud_detection_review',
          details: 'Price reduced by more than 50%'
        });
      }
    }

    // Inventory rules
    if (analysis.changeType === 'inventory_update') {
      const inventoryChange = analysis.fieldChanges.inventory;

      if (inventoryChange?.newValue === 0) {
        rules.push({
          rule: 'out_of_stock',
          triggered: true,
          severity: 'high',
          action: 'update_product_availability',
          details: 'Product is now out of stock'
        });
      }

      if (inventoryChange?.newValue <= 5 && inventoryChange?.newValue > 0) {
        rules.push({
          rule: 'low_stock_warning',
          triggered: true,
          severity: 'medium',
          action: 'reorder_notification',
          details: `Low stock: ${inventoryChange.newValue} units remaining`
        });
      }
    }

    // Product lifecycle rules
    if (analysis.changeType === 'product_deletion') {
      rules.push({
        rule: 'product_deletion',
        triggered: true,
        severity: 'critical',
        action: 'cleanup_related_data',
        details: 'Product deleted - cleanup required'
      });
    }

    return rules;
  }

  async routeProductChangeEvent(processedEvent) {
    console.log('Routing product change event to appropriate handlers...');

    try {
      const routingTasks = [];

      // Real-time notification routing
      if (processedEvent.notificationConfig.enableNotifications) {
        routingTasks.push(this.routeToNotificationSystem(processedEvent));
      }

      // Search index updates
      if (['insert', 'update', 'replace'].includes(processedEvent.operationType)) {
        routingTasks.push(this.routeToSearchIndexing(processedEvent));
      }

      // Analytics and reporting
      routingTasks.push(this.routeToAnalytics(processedEvent));

      // Integration webhooks
      if (processedEvent.integrationRequirements?.webhooks?.length > 0) {
        routingTasks.push(this.routeToWebhooks(processedEvent));
      }

      // Workflow automation
      if (processedEvent.workflowTriggers?.length > 0) {
        routingTasks.push(this.routeToWorkflowEngine(processedEvent));
      }

      // Business intelligence
      routingTasks.push(this.routeToBusinessIntelligence(processedEvent));

      // Execute routing tasks concurrently
      await Promise.allSettled(routingTasks);

    } catch (error) {
      console.error('Error routing product change event:', error);
      throw error;
    }
  }

  async routeToNotificationSystem(processedEvent) {
    console.log('Routing to notification system...');

    const notification = {
      eventId: processedEvent.processingId,
      eventType: 'product_change',
      operationType: processedEvent.operationType,
      productId: processedEvent.documentKey._id,
      priority: processedEvent.eventPriority,
      targets: processedEvent.notificationTargets,
      deliveryMethods: processedEvent.notificationConfig.deliveryMethods,

      message: this.generateNotificationMessage(processedEvent),
      payload: {
        productDetails: processedEvent.currentDocument,
        changeAnalysis: processedEvent.changeAnalysis,
        businessImpact: processedEvent.businessImpact
      },

      routing: {
        immediate: processedEvent.eventPriority === 'critical',
        batchable: processedEvent.notificationConfig.batching.enabled,
        batchWindowMs: processedEvent.notificationConfig.batching.windowMs
      },

      createdAt: new Date()
    };

    // Route to notification queue (could be MongoDB collection, message queue, etc.)
    await this.collections.notifications.insertOne(notification);

    return notification;
  }

  generateNotificationMessage(processedEvent) {
    const analysis = processedEvent.changeAnalysis;
    const product = processedEvent.currentDocument;

    switch (analysis.changeType) {
      case 'product_creation':
        return `New product added: ${product.name}`;

      case 'price_update':
        const priceChange = analysis.fieldChanges.price;
        const direction = priceChange.newValue > priceChange.oldValue ? 'increased' : 'decreased';
        return `Price ${direction} for ${product.name}: $${priceChange.oldValue} → $${priceChange.newValue}`;

      case 'inventory_update':
        const inventoryChange = analysis.fieldChanges.inventory;
        if (inventoryChange.newValue === 0) {
          return `${product.name} is now out of stock`;
        } else if (inventoryChange.newValue <= 5) {
          return `Low stock alert: ${product.name} (${inventoryChange.newValue} remaining)`;
        } else {
          return `Inventory updated for ${product.name}: ${inventoryChange.newValue} units`;
        }

      case 'product_deletion':
        return `Product removed: ${product.name}`;

      default:
        return `Product updated: ${product.name}`;
    }
  }

  async routeToSearchIndexing(processedEvent) {
    console.log('Routing to search indexing system...');

    const indexUpdate = {
      eventId: processedEvent.processingId,
      operationType: processedEvent.operationType,
      documentId: processedEvent.documentKey._id,
      collection: 'products',

      document: processedEvent.currentDocument,
      priority: processedEvent.eventPriority === 'critical' ? 'immediate' : 'normal',

      indexingInstructions: {
        fullReindex: processedEvent.operationType === 'insert',
        partialUpdate: processedEvent.operationType === 'update',
        deleteFromIndex: processedEvent.operationType === 'delete',
        affectedFields: processedEvent.changeAnalysis.affectedFields
      },

      createdAt: new Date()
    };

    await this.collections.searchIndexUpdates.insertOne(indexUpdate);
    return indexUpdate;
  }

  async setupDatabaseChangeStream() {
    console.log('Setting up database-wide change stream for cross-collection analytics...');

    // Database-level change stream for comprehensive monitoring
    const databaseChangeStream = this.db.watch([
      {
        $match: {
          'operationType': { $in: ['insert', 'update', 'delete'] },
          'ns.db': this.db.databaseName,
          'ns.coll': { $in: ['products', 'orders', 'customers', 'inventory'] }
        }
      },
      {
        $addFields: {
          eventId: { $toString: '$_id' },
          eventTimestamp: '$$NOW',

          // Cross-collection correlation
          correlationContext: {
            $switch: {
              branches: [
                {
                  case: { $eq: ['$ns.coll', 'products'] },
                  then: {
                    type: 'product_event',
                    productId: '$documentKey._id',
                    correlationKey: '$documentKey._id'
                  }
                },
                {
                  case: { $eq: ['$ns.coll', 'orders'] },
                  then: {
                    type: 'order_event',
                    orderId: '$documentKey._id',
                    correlationKey: '$fullDocument.customerId'
                  }
                }
              ],
              default: { type: 'generic_event' }
            }
          }
        }
      }
    ], {
      fullDocument: 'updateLookup'
    });

    databaseChangeStream.on('change', async (changeEvent) => {
      await this.processDatabaseChangeEvent(changeEvent);
    });

    this.changeStreams.set('database', databaseChangeStream);
    console.log('Database change stream setup complete');
  }

  async processDatabaseChangeEvent(changeEvent) {
    try {
      // Cross-collection event correlation and analytics
      await this.performCrossCollectionAnalytics(changeEvent);

      // Real-time business metrics updates
      await this.updateRealTimeMetrics(changeEvent);

      // Event pattern detection
      await this.detectEventPatterns(changeEvent);

    } catch (error) {
      console.error('Error processing database change event:', error);
    }
  }

  async storeChangeEvent(processedEvent) {
    try {
      const changeEventRecord = {
        eventId: processedEvent.processingId,
        resumeToken: processedEvent._id,

        // Event identification
        operationType: processedEvent.operationType,
        collection: processedEvent.ns?.coll,
        documentId: processedEvent.documentKey._id,

        // Timing information
        clusterTime: processedEvent.clusterTime,
        eventTimestamp: processedEvent.eventTimestamp,
        processingTimestamp: processedEvent.processingTimestamp,

        // Change details
        changeAnalysis: processedEvent.changeAnalysis,
        businessImpact: processedEvent.businessImpact,

        // Processing results
        businessRules: processedEvent.businessRules,
        routingResults: processedEvent.routingResults,

        // Status and metadata
        processingStatus: 'completed',
        processingVersion: processedEvent.processorVersion,

        // Audit trail
        createdAt: new Date(),
        retentionPolicy: 'standard' // Keep for standard retention period
      };

      await this.collections.changeEvents.insertOne(changeEventRecord);

    } catch (error) {
      console.error('Error storing change event:', error);
      // Don't throw - storage failure shouldn't stop processing
    }
  }

  async handleEventProcessingError(changeEvent, error) {
    console.log('Handling event processing error...');

    try {
      const errorRecord = {
        eventId: new ObjectId(),
        originalEventId: changeEvent.eventId,

        // Error details
        error: {
          name: error.name,
          message: error.message,
          stack: error.stack,
          code: error.code
        },

        // Event context
        changeEvent: changeEvent,
        processingAttempt: (changeEvent.processingContext?.retryCount || 0) + 1,
        maxRetries: this.config.maxRetries,

        // Status
        status: 'pending_retry',
        nextRetryAt: new Date(Date.now() + this.config.retryDelayMs),

        createdAt: new Date()
      };

      // Store in dead letter queue if max retries exceeded
      if (errorRecord.processingAttempt >= this.config.maxRetries) {
        errorRecord.status = 'dead_letter';
        errorRecord.nextRetryAt = null;
      }

      await this.collections.deadLetterQueue.insertOne(errorRecord);

      // Schedule retry if applicable
      if (errorRecord.status === 'pending_retry') {
        setTimeout(() => {
          this.retryEventProcessing(errorRecord);
        }, this.config.retryDelayMs);
      }

    } catch (storeError) {
      console.error('Error storing failed event:', storeError);
    }
  }

  updateProcessingMetrics(startTime, status) {
    const processingTime = Date.now() - startTime;

    this.metrics.eventsProcessed++;
    this.metrics.totalProcessingTime += processingTime;
    this.metrics.averageProcessingTime = this.metrics.totalProcessingTime / this.metrics.eventsProcessed;
    this.metrics.lastProcessedAt = new Date();

    if (status === 'error') {
      this.metrics.eventsFailured++;
    }

    if (this.config.enableMetrics) {
      // Log metrics periodically
      if (this.metrics.eventsProcessed % 100 === 0) {
        console.log(`Processing metrics: ${this.metrics.eventsProcessed} events processed, ` +
                   `${this.metrics.averageProcessingTime.toFixed(2)}ms avg processing time, ` +
                   `${this.metrics.eventsFailured} failures`);
      }
    }
  }

  async persistResumeToken(streamName, resumeToken) {
    try {
      await this.collections.processingCheckpoints.updateOne(
        { streamName: streamName },
        {
          $set: {
            resumeToken: resumeToken,
            lastUpdated: new Date()
          }
        },
        { upsert: true }
      );
    } catch (error) {
      console.error(`Error persisting resume token for ${streamName}:`, error);
    }
  }

  async loadResumeTokens() {
    try {
      const checkpoints = await this.collections.processingCheckpoints.find({}).toArray();

      for (const checkpoint of checkpoints) {
        this.resumeTokens.set(checkpoint.streamName, checkpoint.resumeToken);
      }

      console.log(`Loaded ${checkpoints.length} resume tokens`);
    } catch (error) {
      console.error('Error loading resume tokens:', error);
    }
  }

  async getProcessingStatistics() {
    return {
      activeStreams: this.changeStreams.size,
      eventsProcessed: this.metrics.eventsProcessed,
      eventsFailured: this.metrics.eventsFailured,
      averageProcessingTime: this.metrics.averageProcessingTime,
      successRate: ((this.metrics.eventsProcessed - this.metrics.eventsFailured) / this.metrics.eventsProcessed * 100).toFixed(2),
      lastProcessedAt: this.metrics.lastProcessedAt,

      // Stream-specific metrics
      streamMetrics: Object.fromEntries(this.changeStreams.keys().map(name => [
        name, 
        { active: true, resumeToken: this.resumeTokens.has(name) }
      ]))
    };
  }

  async shutdown() {
    console.log('Shutting down Change Streams Manager...');

    // Close all change streams
    for (const [name, stream] of this.changeStreams) {
      try {
        await stream.close();
        console.log(`Closed change stream: ${name}`);
      } catch (error) {
        console.error(`Error closing change stream ${name}:`, error);
      }
    }

    // Final metrics log
    const stats = await this.getProcessingStatistics();
    console.log('Final processing statistics:', stats);

    console.log('Change Streams Manager shutdown complete');
  }
}

// Benefits of MongoDB Change Streams:
// - Real-time change notifications without polling overhead
// - Guaranteed delivery with automatic resume capability and failure recovery
// - Advanced filtering and aggregation pipelines for targeted event processing
// - Comprehensive change context including before/after document state
// - Native integration with MongoDB's replica set and sharding architecture
// - Atomic change detection with cluster-wide ordering guarantees
// - Efficient resource utilization with intelligent batching and buffering
// - Seamless integration with existing MongoDB operations and security
// - SQL-compatible event processing through QueryLeaf integration
// - Production-ready reliability with built-in error handling and retry logic

module.exports = {
  MongoDBChangeStreamsManager
};

Understanding MongoDB Change Streams Architecture

Advanced Event-Driven Patterns for Real-Time Applications

Implement sophisticated change stream patterns for production event-driven systems:

// Production-ready Change Streams with advanced event processing and routing
class ProductionChangeStreamsProcessor extends MongoDBChangeStreamsManager {
  constructor(db, productionConfig) {
    super(db, productionConfig);

    this.productionConfig = {
      ...productionConfig,
      enableEventSourcing: true,
      enableCQRS: true,
      enableEventStore: true,
      enableSagaOrchestration: true,
      enableEventProjections: true,
      enableSnapshotting: true
    };

    this.setupProductionEventProcessing();
    this.initializeEventSourcing();
    this.setupCQRSProjections();
    this.setupSagaOrchestration();
  }

  async implementEventSourcingPattern() {
    console.log('Implementing event sourcing pattern with Change Streams...');

    const eventSourcingStrategy = {
      // Event store management
      eventStore: {
        enableEventPersistence: true,
        enableEventReplay: true,
        enableSnapshotting: true,
        snapshotFrequency: 1000
      },

      // Command handling
      commandHandling: {
        enableCommandValidation: true,
        enableCommandProjections: true,
        enableCommandSagas: true
      },

      // Query projections
      queryProjections: {
        enableRealTimeProjections: true,
        enableMaterializedViews: true,
        enableProjectionRecovery: true
      }
    };

    return await this.deployEventSourcing(eventSourcingStrategy);
  }

  async setupAdvancedEventRouting() {
    console.log('Setting up advanced event routing and distribution...');

    const routingStrategy = {
      // Message routing
      messageRouting: {
        enableTopicRouting: true,
        enableContentRouting: true,
        enableGeographicRouting: true,
        enableLoadBalancing: true
      },

      // Event transformation
      eventTransformation: {
        enableEventEnrichment: true,
        enableEventFiltering: true,
        enableEventAggregation: true,
        enableEventSplitting: true
      },

      // Delivery guarantees
      deliveryGuarantees: {
        enableAtLeastOnceDelivery: true,
        enableExactlyOnceDelivery: true,
        enableOrderedDelivery: true,
        enableDuplicateDetection: true
      }
    };

    return await this.deployAdvancedRouting(routingStrategy);
  }

  async implementReactiveStreams() {
    console.log('Implementing reactive streams for backpressure management...');

    const reactiveConfig = {
      // Backpressure handling
      backpressure: {
        enableFlowControl: true,
        bufferStrategy: 'drop_oldest',
        maxBufferSize: 10000,
        backpressureThreshold: 0.8
      },

      // Stream processing
      streamProcessing: {
        enableParallelProcessing: true,
        parallelismLevel: 10,
        enableBatching: true,
        batchSize: 100
      },

      // Error handling
      errorHandling: {
        enableCircuitBreaker: true,
        enableRetryLogic: true,
        enableDeadLetterQueue: true,
        enableGracefulDegradation: true
      }
    };

    return await this.deployReactiveStreams(reactiveConfig);
  }
}

SQL-Style Change Stream Operations with QueryLeaf

QueryLeaf provides familiar SQL syntax for MongoDB Change Streams and event-driven operations:

-- QueryLeaf change stream operations with SQL-familiar syntax

-- Create change stream monitoring with SQL-style syntax
CREATE CHANGE_STREAM product_changes 
ON products 
WITH (
  -- Change stream configuration
  full_document = 'updateLookup',
  full_document_before_change = 'whenAvailable',
  batch_size = 100,
  max_await_time_ms = 1000,

  -- Event filtering
  FILTER (
    operation_type IN ('insert', 'update', 'delete') AND
    namespace.database = 'ecommerce' AND
    namespace.collection = 'products'
  ),

  -- Event enrichment pipeline
  ENRICH (
    -- Add business context
    category_info FROM categories USING fullDocument.categoryId,
    inventory_info FROM inventory USING documentKey._id,

    -- Compute derived fields
    event_priority = CASE 
      WHEN operation_type = 'delete' THEN 'critical'
      WHEN operation_type = 'update' AND updateDescription.updatedFields.price IS NOT NULL THEN 'high'
      WHEN operation_type = 'update' AND updateDescription.updatedFields.inventory IS NOT NULL THEN 'medium'
      ELSE 'low'
    END,

    -- Change analysis
    change_type = CASE
      WHEN operation_type = 'insert' THEN 'product_creation'
      WHEN operation_type = 'update' AND updateDescription.updatedFields.price IS NOT NULL THEN 'price_update'
      WHEN operation_type = 'update' AND updateDescription.updatedFields.inventory IS NOT NULL THEN 'inventory_update'
      WHEN operation_type = 'delete' THEN 'product_deletion'
      ELSE 'product_modification'
    END
  )
);

-- Monitor change events with SQL queries
SELECT 
  event_id,
  operation_type,
  document_key._id as product_id,
  full_document.name as product_name,
  full_document.price as current_price,

  -- Change analysis
  change_type,
  event_priority,
  cluster_time,

  -- Business context
  category_info.name as category_name,
  inventory_info.quantity as current_inventory,

  -- Change details for updates
  CASE 
    WHEN operation_type = 'update' THEN
      JSON_BUILD_OBJECT(
        'updated_fields', updateDescription.updatedFields,
        'removed_fields', updateDescription.removedFields,
        'truncated_arrays', updateDescription.truncatedArrays
      )
    ELSE NULL
  END as update_details,

  -- Price change analysis
  CASE 
    WHEN change_type = 'price_update' THEN
      JSON_BUILD_OBJECT(
        'old_price', fullDocumentBeforeChange.price,
        'new_price', fullDocument.price,
        'change_amount', fullDocument.price - fullDocumentBeforeChange.price,
        'change_percentage', 
          ROUND(
            ((fullDocument.price - fullDocumentBeforeChange.price) / 
             fullDocumentBeforeChange.price) * 100, 
            2
          )
      )
    ELSE NULL
  END as price_change_analysis,

  -- Inventory change analysis
  CASE 
    WHEN change_type = 'inventory_update' THEN
      JSON_BUILD_OBJECT(
        'old_inventory', fullDocumentBeforeChange.inventory,
        'new_inventory', fullDocument.inventory,
        'change_amount', fullDocument.inventory - fullDocumentBeforeChange.inventory,
        'stock_status', 
          CASE 
            WHEN fullDocument.inventory = 0 THEN 'out_of_stock'
            WHEN fullDocument.inventory <= 5 THEN 'low_stock'
            WHEN fullDocument.inventory > fullDocumentBeforeChange.inventory THEN 'restocked'
            ELSE 'normal'
          END
      )
    ELSE NULL
  END as inventory_change_analysis

FROM CHANGE_STREAM product_changes
WHERE cluster_time > TIMESTAMP '2025-01-05 00:00:00'
ORDER BY cluster_time DESC;

-- Event aggregation and analytics
WITH change_events AS (
  SELECT 
    *,
    DATE_TRUNC('hour', cluster_time) as hour_bucket,
    DATE_TRUNC('day', cluster_time) as day_bucket
  FROM CHANGE_STREAM product_changes
  WHERE cluster_time >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
),

hourly_change_metrics AS (
  SELECT 
    hour_bucket,

    -- Operation counts
    COUNT(*) as total_events,
    COUNT(*) FILTER (WHERE operation_type = 'insert') as product_creates,
    COUNT(*) FILTER (WHERE operation_type = 'update') as product_updates,
    COUNT(*) FILTER (WHERE operation_type = 'delete') as product_deletes,

    -- Change type analysis
    COUNT(*) FILTER (WHERE change_type = 'price_update') as price_changes,
    COUNT(*) FILTER (WHERE change_type = 'inventory_update') as inventory_changes,
    COUNT(*) FILTER (WHERE change_type = 'product_creation') as new_products,

    -- Priority distribution
    COUNT(*) FILTER (WHERE event_priority = 'critical') as critical_events,
    COUNT(*) FILTER (WHERE event_priority = 'high') as high_priority_events,
    COUNT(*) FILTER (WHERE event_priority = 'medium') as medium_priority_events,
    COUNT(*) FILTER (WHERE event_priority = 'low') as low_priority_events,

    -- Business impact metrics
    AVG(CAST(price_change_analysis->>'change_percentage' AS DECIMAL)) as avg_price_change_pct,
    COUNT(*) FILTER (WHERE inventory_change_analysis->>'stock_status' = 'out_of_stock') as out_of_stock_events,
    COUNT(*) FILTER (WHERE inventory_change_analysis->>'stock_status' = 'low_stock') as low_stock_events,

    -- Unique products affected
    COUNT(DISTINCT document_key._id) as unique_products_affected,
    COUNT(DISTINCT category_info.name) as categories_affected

  FROM change_events
  GROUP BY hour_bucket
),

change_velocity_analysis AS (
  SELECT 
    hcm.*,

    -- Change velocity metrics
    total_events / 60.0 as events_per_minute,
    unique_products_affected / 60.0 as products_changed_per_minute,

    -- Change intensity scoring
    CASE 
      WHEN critical_events > 10 THEN 'very_high_intensity'
      WHEN high_priority_events > 50 THEN 'high_intensity'
      WHEN total_events > 100 THEN 'moderate_intensity'
      ELSE 'normal_intensity'
    END as change_intensity,

    -- Business activity classification
    CASE 
      WHEN price_changes > total_events * 0.3 THEN 'pricing_focused'
      WHEN inventory_changes > total_events * 0.4 THEN 'inventory_focused'
      WHEN new_products > total_events * 0.2 THEN 'catalog_expansion'
      ELSE 'general_maintenance'
    END as activity_pattern,

    -- Alert thresholds
    CASE 
      WHEN critical_events > 5 OR out_of_stock_events > 20 THEN 'alert_required'
      WHEN high_priority_events > 30 OR events_per_minute > 5 THEN 'monitoring_required'
      ELSE 'normal_operations'
    END as operational_status

  FROM hourly_change_metrics hcm
)

SELECT 
  TO_CHAR(hour_bucket, 'YYYY-MM-DD HH24:00') as hour,

  -- Core metrics
  total_events,
  ROUND(events_per_minute, 2) as events_per_minute,
  unique_products_affected,
  categories_affected,

  -- Operation breakdown
  product_creates,
  product_updates,
  product_deletes,

  -- Change type breakdown  
  price_changes,
  inventory_changes,

  -- Priority breakdown
  critical_events,
  high_priority_events,
  medium_priority_events,
  low_priority_events,

  -- Business insights
  change_intensity,
  activity_pattern,
  operational_status,

  -- Impact metrics
  ROUND(COALESCE(avg_price_change_pct, 0), 2) as avg_price_change_pct,
  out_of_stock_events,
  low_stock_events,

  -- Health indicators
  ROUND((total_events - critical_events)::DECIMAL / total_events * 100, 1) as operational_health_pct,

  -- Recommendations
  CASE operational_status
    WHEN 'alert_required' THEN 'Immediate attention required - high critical event volume'
    WHEN 'monitoring_required' THEN 'Increased monitoring recommended'
    ELSE 'Normal operations - continue monitoring'
  END as recommendation

FROM change_velocity_analysis
ORDER BY hour_bucket DESC;

-- Real-time event routing and notifications
CREATE TRIGGER change_event_router
  ON CHANGE_STREAM product_changes
  FOR EACH CHANGE_EVENT
  EXECUTE FUNCTION (
    -- Route critical events immediately
    WHEN event_priority = 'critical' THEN
      NOTIFY 'critical_alerts' WITH PAYLOAD JSON_BUILD_OBJECT(
        'event_id', event_id,
        'product_id', document_key._id,
        'operation', operation_type,
        'priority', event_priority,
        'timestamp', cluster_time
      ),

    -- Batch medium/low priority events
    WHEN event_priority IN ('medium', 'low') THEN
      INSERT INTO event_batch_queue (
        event_id, event_priority, event_data, batch_window
      ) VALUES (
        event_id, 
        event_priority, 
        JSON_BUILD_OBJECT(
          'product_id', document_key._id,
          'operation', operation_type,
          'change_type', change_type,
          'details', full_document
        ),
        DATE_TRUNC('minute', CURRENT_TIMESTAMP, 5) -- 5-minute batching window
      ),

    -- Route to search indexing
    WHEN operation_type IN ('insert', 'update') THEN
      INSERT INTO search_index_updates (
        document_id, collection_name, operation_type, 
        document_data, priority, created_at
      ) VALUES (
        document_key._id, 
        'products', 
        operation_type,
        full_document,
        CASE WHEN event_priority = 'critical' THEN 'immediate' ELSE 'normal' END,
        CURRENT_TIMESTAMP
      ),

    -- Route to analytics pipeline
    ALWAYS THEN
      INSERT INTO analytics_events (
        event_id, event_type, collection_name, document_id,
        operation_type, event_data, processing_priority, created_at
      ) VALUES (
        event_id,
        'change_stream_event',
        'products',
        document_key._id,
        operation_type,
        JSON_BUILD_OBJECT(
          'change_type', change_type,
          'priority', event_priority,
          'business_context', JSON_BUILD_OBJECT(
            'category', category_info.name,
            'inventory', inventory_info.quantity
          )
        ),
        event_priority,
        CURRENT_TIMESTAMP
      )
  );

-- Event sourcing and audit trail queries
CREATE VIEW product_change_audit AS
SELECT 
  event_id,
  document_key._id as product_id,
  operation_type,
  cluster_time as event_time,

  -- Change details
  change_type,

  -- Document states
  full_document as current_state,
  full_document_before_change as previous_state,

  -- Change delta for updates
  CASE 
    WHEN operation_type = 'update' THEN
      JSON_BUILD_OBJECT(
        'updated_fields', updateDescription.updatedFields,
        'removed_fields', updateDescription.removedFields
      )
    ELSE NULL
  END as change_delta,

  -- Business impact
  event_priority,

  -- Audit metadata
  resume_token,
  wall_time,

  -- Computed audit fields
  ROW_NUMBER() OVER (
    PARTITION BY document_key._id 
    ORDER BY cluster_time
  ) as change_sequence,

  LAG(cluster_time) OVER (
    PARTITION BY document_key._id 
    ORDER BY cluster_time
  ) as previous_change_time,

  -- Time between changes
  EXTRACT(SECONDS FROM (
    cluster_time - LAG(cluster_time) OVER (
      PARTITION BY document_key._id 
      ORDER BY cluster_time
    )
  )) as seconds_since_last_change

FROM CHANGE_STREAM product_changes
ORDER BY document_key._id, cluster_time;

-- Advanced pattern detection
WITH product_lifecycle_events AS (
  SELECT 
    document_key._id as product_id,
    operation_type,
    change_type,
    cluster_time,

    -- Lifecycle stage detection
    CASE 
      WHEN operation_type = 'insert' THEN 'creation'
      WHEN operation_type = 'delete' THEN 'deletion'
      WHEN change_type = 'price_update' THEN 'pricing_management'
      WHEN change_type = 'inventory_update' THEN 'inventory_management'
      ELSE 'maintenance'
    END as lifecycle_stage,

    -- Change frequency analysis
    COUNT(*) OVER (
      PARTITION BY document_key._id 
      ORDER BY cluster_time 
      RANGE BETWEEN INTERVAL '1 hour' PRECEDING AND CURRENT ROW
    ) as changes_last_hour,

    -- Pattern detection
    LAG(change_type) OVER (
      PARTITION BY document_key._id 
      ORDER BY cluster_time
    ) as previous_change_type,

    LEAD(change_type) OVER (
      PARTITION BY document_key._id 
      ORDER BY cluster_time
    ) as next_change_type

  FROM CHANGE_STREAM product_changes
  WHERE cluster_time >= CURRENT_TIMESTAMP - INTERVAL '7 days'
),

change_patterns AS (
  SELECT 
    product_id,
    lifecycle_stage,
    change_type,
    previous_change_type,
    next_change_type,
    changes_last_hour,
    cluster_time,

    -- Pattern identification
    CASE 
      WHEN changes_last_hour > 10 THEN 'high_frequency_changes'
      WHEN change_type = 'price_update' AND previous_change_type = 'price_update' THEN 'price_oscillation'
      WHEN change_type = 'inventory_update' AND changes_last_hour > 5 THEN 'inventory_volatility'
      WHEN lifecycle_stage = 'creation' AND next_change_type = 'price_update' THEN 'immediate_pricing_adjustment'
      WHEN lifecycle_stage = 'deletion' AND previous_change_type = 'inventory_update' THEN 'clearance_deletion'
      ELSE 'normal_pattern'
    END as pattern_type,

    -- Anomaly scoring
    CASE 
      WHEN changes_last_hour > 20 THEN 5  -- Very high frequency
      WHEN changes_last_hour > 10 THEN 3  -- High frequency
      WHEN change_type = previous_change_type AND change_type = next_change_type THEN 2  -- Repetitive changes
      ELSE 0
    END as anomaly_score

  FROM product_lifecycle_events
),

pattern_alerts AS (
  SELECT 
    cp.*,

    -- Alert classification
    CASE 
      WHEN anomaly_score >= 5 THEN 'critical_pattern_anomaly'
      WHEN anomaly_score >= 3 THEN 'unusual_pattern_detected'
      WHEN pattern_type IN ('price_oscillation', 'inventory_volatility') THEN 'business_pattern_concern'
      ELSE 'normal_pattern'
    END as alert_level,

    -- Recommended actions
    CASE pattern_type
      WHEN 'high_frequency_changes' THEN 'Investigate automated system behavior'
      WHEN 'price_oscillation' THEN 'Review pricing strategy and rules'
      WHEN 'inventory_volatility' THEN 'Check inventory management system'
      WHEN 'clearance_deletion' THEN 'Verify clearance process completion'
      ELSE 'Continue monitoring'
    END as recommended_action

  FROM change_patterns cp
  WHERE anomaly_score > 0 OR pattern_type != 'normal_pattern'
)

SELECT 
  product_id,
  lifecycle_stage,
  pattern_type,
  alert_level,
  anomaly_score,
  changes_last_hour,
  TO_CHAR(cluster_time, 'YYYY-MM-DD HH24:MI:SS') as event_time,
  recommended_action,

  -- Pattern context
  CASE 
    WHEN alert_level = 'critical_pattern_anomaly' THEN 
      'CRITICAL: Unusual change frequency detected - immediate investigation required'
    WHEN alert_level = 'unusual_pattern_detected' THEN
      'WARNING: Pattern anomaly detected - monitoring recommended'
    WHEN alert_level = 'business_pattern_concern' THEN
      'BUSINESS ALERT: Review business process associated with detected pattern'
    ELSE 'INFO: Pattern identified for awareness'
  END as alert_description

FROM pattern_alerts
ORDER BY anomaly_score DESC, cluster_time DESC
LIMIT 100;

-- QueryLeaf provides comprehensive change stream capabilities:
-- 1. SQL-familiar change stream creation and monitoring syntax
-- 2. Advanced event filtering and enrichment with business context
-- 3. Real-time event routing and notification triggers
-- 4. Comprehensive change analytics and velocity analysis
-- 5. Pattern detection and anomaly identification
-- 6. Event sourcing and audit trail capabilities
-- 7. Business rule integration and automated responses  
-- 8. Cross-collection change correlation and analysis
-- 9. Production-ready error handling and resume capabilities
-- 10. Native integration with MongoDB Change Streams performance optimization

Best Practices for Change Streams Implementation

Event Processing Strategy and Performance Optimization

Essential principles for effective MongoDB Change Streams deployment:

  1. Resume Token Management: Implement robust resume token persistence and recovery strategies for guaranteed delivery
  2. Pipeline Optimization: Design change stream pipelines that minimize network traffic and processing overhead
  3. Error Handling: Implement comprehensive error handling with retry logic and dead letter queue management
  4. Filtering Strategy: Apply server-side filtering to reduce client processing load and network usage
  5. Batch Processing: Implement intelligent batching for high-volume event processing scenarios
  6. Performance Monitoring: Track change stream performance metrics and optimize based on usage patterns

Production Event-Driven Architecture

Optimize Change Streams for enterprise-scale event-driven systems:

  1. Scalability Design: Plan for horizontal scaling with appropriate sharding and replica set configurations
  2. Fault Tolerance: Implement automatic failover and recovery mechanisms for change stream processors
  3. Event Enrichment: Design efficient event enrichment patterns that balance context with performance
  4. Integration Patterns: Establish clear integration patterns with external systems and message queues
  5. Security Considerations: Implement proper authentication and authorization for change stream access
  6. Operational Monitoring: Deploy comprehensive monitoring and alerting for change stream health

Conclusion

MongoDB Change Streams provide comprehensive real-time event-driven capabilities that eliminate the complexity and performance overhead of traditional polling-based change detection. The combination of guaranteed delivery, automatic resume capabilities, and sophisticated filtering makes Change Streams ideal for building responsive, event-driven applications that scale efficiently with growing data volumes.

Key MongoDB Change Streams benefits include:

  • Real-Time Notifications: Native change notifications without polling overhead or database performance impact
  • Guaranteed Delivery: Automatic resume capability and failure recovery with cluster-wide ordering guarantees
  • Advanced Filtering: Server-side aggregation pipelines for targeted event processing and context enrichment
  • Production Ready: Built-in error handling, retry logic, and integration with MongoDB's operational model
  • Event-Driven Architecture: Native support for reactive patterns, event sourcing, and CQRS implementations
  • SQL Accessibility: Familiar SQL-style change stream operations through QueryLeaf for accessible event processing

Whether you're building real-time dashboards, notification systems, data synchronization services, or comprehensive event-driven architectures, MongoDB Change Streams with QueryLeaf's familiar SQL interface provide the foundation for scalable, responsive applications.

QueryLeaf Integration: QueryLeaf seamlessly manages MongoDB Change Streams while providing SQL-familiar syntax for change event monitoring, filtering, and processing. Advanced event-driven patterns including real-time analytics, pattern detection, and automated routing are elegantly handled through familiar SQL constructs, making sophisticated event processing both powerful and accessible to SQL-oriented development teams.

The combination of MongoDB's robust change stream capabilities with SQL-style event operations makes it an ideal platform for applications requiring both real-time responsiveness and familiar database interaction patterns, ensuring your event-driven systems can evolve with changing requirements while maintaining reliable, high-performance operation.

MongoDB Atlas Vector Search for AI Applications: Advanced Semantic Similarity and Machine Learning Integration

Modern AI applications require sophisticated vector similarity search capabilities to power recommendation systems, retrieval-augmented generation (RAG), content discovery, and semantic search experiences. Traditional database systems struggle with high-dimensional vector operations, requiring complex integration with specialized vector databases that add architectural complexity, operational overhead, and data consistency challenges across multiple systems.

MongoDB Atlas Vector Search provides native support for high-dimensional vector similarity operations, enabling AI-powered applications to store, index, and query vector embeddings at scale while maintaining transactional consistency and familiar database operations. Unlike standalone vector databases that require separate infrastructure and complex data synchronization, Atlas Vector Search integrates seamlessly with existing MongoDB deployments, providing unified data management for both structured data and AI vector embeddings.

The Traditional Vector Search Challenge

Building AI applications with conventional database architectures creates significant technical and operational complexity:

-- Traditional PostgreSQL vector search - requires extensions and complex setup

-- Install pgvector extension (complex setup and maintenance)
CREATE EXTENSION IF NOT EXISTS vector;

-- Vector storage table with limited optimization capabilities
CREATE TABLE document_embeddings (
    document_id BIGSERIAL PRIMARY KEY,
    document_title TEXT NOT NULL,
    document_content TEXT NOT NULL,
    document_category VARCHAR(100),
    document_metadata JSONB,

    -- Vector storage (limited to specific dimensions)
    content_embedding vector(1536),  -- OpenAI ada-002 dimensions
    title_embedding vector(1536),

    -- Metadata for AI processing
    embedding_model VARCHAR(100) DEFAULT 'text-embedding-ada-002',
    embedding_created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    -- Document processing
    word_count INTEGER,
    language_code VARCHAR(10),
    content_hash VARCHAR(64),

    -- System metadata
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    -- Constraints and indexing
    CONSTRAINT valid_content_length CHECK (LENGTH(document_content) > 0),
    CONSTRAINT valid_embedding_dimensions CHECK (vector_dims(content_embedding) = 1536)
);

-- Create vector indexes (limited optimization options)
CREATE INDEX idx_content_embedding_cosine ON document_embeddings 
USING ivfflat (content_embedding vector_cosine_ops) 
WITH (lists = 100);

CREATE INDEX idx_title_embedding_l2 ON document_embeddings 
USING ivfflat (title_embedding vector_l2_ops) 
WITH (lists = 100);

-- Standard indexes for hybrid search
CREATE INDEX idx_category_created ON document_embeddings (document_category, created_at DESC);
CREATE INDEX idx_metadata_gin ON document_embeddings USING GIN (document_metadata);
CREATE INDEX idx_content_hash ON document_embeddings (content_hash);

-- Vector similarity search with limited performance and scalability
WITH vector_search_results AS (
    SELECT 
        document_id,
        document_title,
        document_content,
        document_category,
        document_metadata,

        -- Similarity calculations (computationally expensive)
        1 - (content_embedding <=> $1::vector) as cosine_similarity,
        content_embedding <-> $1::vector as l2_distance,
        content_embedding <#> $1::vector as inner_product,

        -- Metadata matching
        word_count,
        language_code,
        created_at

    FROM document_embeddings
    WHERE 
        -- Pre-filtering to reduce vector search scope
        document_category = $2  -- Category filter
        AND language_code = $3  -- Language filter
        AND created_at >= $4    -- Date range filter

        -- Vector similarity threshold (rough filtering)
        AND content_embedding <=> $1::vector < 0.3  -- Cosine distance threshold

    ORDER BY content_embedding <=> $1::vector  -- Sort by similarity
    LIMIT 50  -- Limit to manage performance
),

enhanced_results AS (
    SELECT 
        vsr.*,

        -- Additional metadata enrichment (limited capabilities)
        CASE 
            WHEN cosine_similarity >= 0.8 THEN 'highly_relevant'
            WHEN cosine_similarity >= 0.6 THEN 'relevant' 
            WHEN cosine_similarity >= 0.4 THEN 'somewhat_relevant'
            ELSE 'low_relevance'
        END as relevance_category,

        -- Content analysis (basic text processing only)
        LENGTH(document_content) as content_length,
        array_length(string_to_array(document_content, ' '), 1) as estimated_word_count,

        -- Ranking score combination
        (cosine_similarity * 0.7 + 
         CASE WHEN document_metadata->>'priority' = 'high' THEN 0.3 ELSE 0.0 END) as combined_score,

        -- Query metadata
        CURRENT_TIMESTAMP as search_performed_at

    FROM vector_search_results vsr
)

SELECT 
    document_id,
    document_title,
    LEFT(document_content, 200) || '...' as content_preview,
    document_category,

    -- Similarity metrics
    ROUND(cosine_similarity::NUMERIC, 4) as similarity_score,
    relevance_category,
    ROUND(combined_score::NUMERIC, 4) as ranking_score,

    -- Document metadata
    word_count,
    content_length,
    language_code,
    TO_CHAR(created_at, 'YYYY-MM-DD HH24:MI') as document_created,

    -- Search metadata
    search_performed_at

FROM enhanced_results
WHERE cosine_similarity >= 0.3  -- Minimum relevance threshold
ORDER BY combined_score DESC, cosine_similarity DESC
LIMIT 20;

-- Complex RAG (Retrieval-Augmented Generation) implementation
CREATE OR REPLACE FUNCTION execute_rag_query(
    query_embedding vector(1536),
    query_text TEXT,
    context_limit INTEGER DEFAULT 5,
    similarity_threshold NUMERIC DEFAULT 0.4
) RETURNS TABLE (
    context_documents JSONB,
    total_context_length INTEGER,
    average_similarity NUMERIC,
    generated_response TEXT
) AS $$
DECLARE
    context_docs JSONB := '[]'::JSONB;
    total_length INTEGER := 0;
    avg_similarity NUMERIC;
    doc_record RECORD;
    context_text TEXT := '';
BEGIN
    -- Retrieve relevant documents for context
    FOR doc_record IN
        SELECT 
            document_title,
            document_content,
            1 - (content_embedding <=> query_embedding) as similarity,
            LENGTH(document_content) as content_length
        FROM document_embeddings
        WHERE 1 - (content_embedding <=> query_embedding) >= similarity_threshold
        ORDER BY content_embedding <=> query_embedding
        LIMIT context_limit
    LOOP
        -- Build context for generation
        context_docs := context_docs || jsonb_build_object(
            'title', doc_record.document_title,
            'content', LEFT(doc_record.document_content, 1000),
            'similarity', doc_record.similarity,
            'length', doc_record.content_length
        );

        context_text := context_text || E'\n\n' || doc_record.document_title || E':\n' || 
                        LEFT(doc_record.document_content, 1000);
        total_length := total_length + doc_record.content_length;
    END LOOP;

    -- Calculate average similarity
    SELECT AVG((doc->>'similarity')::NUMERIC) INTO avg_similarity
    FROM jsonb_array_elements(context_docs) as doc;

    -- Return context information (actual LLM generation would be external)
    RETURN QUERY SELECT 
        context_docs,
        total_length,
        COALESCE(avg_similarity, 0.0),
        'Generated response would be created by external LLM service using context: ' || 
        LEFT(context_text, 200) || '...' as generated_response;

END;
$$ LANGUAGE plpgsql;

-- Execute RAG query (requires external LLM integration)
SELECT * FROM execute_rag_query(
    $1::vector,  -- Query embedding
    'What are the best practices for machine learning?',  -- Original query
    5,  -- Context documents limit
    0.4 -- Similarity threshold
);

-- Problems with traditional vector search approaches:
-- 1. Limited vector dimensions and performance optimization
-- 2. Complex setup and maintenance of vector extensions
-- 3. Poor integration between vector search and document metadata
-- 4. Limited scaling capabilities for high-dimensional vectors
-- 5. No native support for multiple similarity metrics
-- 6. Complex hybrid search combining vector and traditional queries
-- 7. Limited machine learning pipeline integration
-- 8. Expensive computational overhead for similarity calculations
-- 9. No native support for embedding model versioning
-- 10. Difficult operational management of vector indexes

MongoDB Atlas Vector Search provides native, high-performance vector operations:

// MongoDB Atlas Vector Search - native AI-powered vector similarity with unified data management
const { MongoClient } = require('mongodb');

// Advanced Atlas Vector Search Manager
class AtlasVectorSearchManager {
  constructor(connectionString, vectorConfig = {}) {
    this.connectionString = connectionString;
    this.client = null;
    this.db = null;

    // Vector search configuration
    this.config = {
      // Embedding configuration
      defaultEmbeddingModel: vectorConfig.defaultEmbeddingModel || 'text-embedding-ada-002',
      embeddingDimensions: vectorConfig.embeddingDimensions || 1536,
      embeddingProvider: vectorConfig.embeddingProvider || 'openai',

      // Vector index configuration
      vectorIndexes: vectorConfig.vectorIndexes || {},
      similarityMetrics: vectorConfig.similarityMetrics || ['cosine', 'euclidean', 'dotProduct'],

      // Search optimization
      enableHybridSearch: vectorConfig.enableHybridSearch !== false,
      enableSemanticCaching: vectorConfig.enableSemanticCaching !== false,
      defaultSearchLimit: vectorConfig.defaultSearchLimit || 20,

      // Performance tuning
      numCandidates: vectorConfig.numCandidates || 100,
      searchThreads: vectorConfig.searchThreads || 4,

      // AI integration
      enableRAGPipeline: vectorConfig.enableRAGPipeline !== false,
      enableRecommendations: vectorConfig.enableRecommendations !== false,
      enableSemanticAnalytics: vectorConfig.enableSemanticAnalytics !== false
    };

    this.initializeVectorSearch();
  }

  async initializeVectorSearch() {
    console.log('Initializing Atlas Vector Search system...');

    try {
      // Connect to MongoDB Atlas
      this.client = new MongoClient(this.connectionString);
      await this.client.connect();
      this.db = this.client.db();

      // Setup collections and vector indexes
      await this.setupVectorInfrastructure();

      // Initialize AI integration services
      await this.setupAIIntegration();

      console.log('Atlas Vector Search system initialized successfully');

    } catch (error) {
      console.error('Error initializing vector search:', error);
      throw error;
    }
  }

  async setupVectorInfrastructure() {
    console.log('Setting up vector search infrastructure...');

    try {
      // Create collections with optimized configuration
      this.collections = {
        documents: this.db.collection('documents'),
        userInteractions: this.db.collection('user_interactions'),
        searchAnalytics: this.db.collection('search_analytics'),
        embeddingCache: this.db.collection('embedding_cache')
      };

      // Create vector search indexes
      await this.createVectorIndexes();

      // Create supporting indexes for hybrid search
      await this.createHybridSearchIndexes();

      console.log('Vector infrastructure setup completed');

    } catch (error) {
      console.error('Error setting up vector infrastructure:', error);
      throw error;
    }
  }

  async createVectorIndexes() {
    console.log('Creating optimized vector search indexes...');

    try {
      // Primary content vector index with multiple similarity metrics
      const contentVectorIndex = {
        name: "vector_index_content_embeddings",
        type: "vectorSearch",
        definition: {
          fields: [
            {
              type: "vector",
              path: "contentEmbedding",
              numDimensions: this.config.embeddingDimensions,
              similarity: "cosine"  // Primary similarity metric
            },
            {
              type: "filter",
              path: "category"
            },
            {
              type: "filter", 
              path: "language"
            },
            {
              type: "filter",
              path: "tags"
            },
            {
              type: "filter",
              path: "metadata.contentType"
            },
            {
              type: "filter",
              path: "isPublished"
            }
          ]
        }
      };

      // Title/summary vector index for heading-based search
      const titleVectorIndex = {
        name: "vector_index_title_embeddings",
        type: "vectorSearch",
        definition: {
          fields: [
            {
              type: "vector",
              path: "titleEmbedding", 
              numDimensions: this.config.embeddingDimensions,
              similarity: "cosine"
            },
            {
              type: "filter",
              path: "category"
            },
            {
              type: "filter",
              path: "metadata.priority"
            }
          ]
        }
      };

      // Multi-modal vector index for images and rich content
      const multiModalVectorIndex = {
        name: "vector_index_multimodal_embeddings",
        type: "vectorSearch", 
        definition: {
          fields: [
            {
              type: "vector",
              path: "imageEmbedding",
              numDimensions: 768,  // CLIP model dimensions
              similarity: "cosine"
            },
            {
              type: "vector",
              path: "textEmbedding",
              numDimensions: this.config.embeddingDimensions,
              similarity: "cosine"
            },
            {
              type: "filter",
              path: "mediaType"
            }
          ]
        }
      };

      // User behavior vector index for recommendations
      const userVectorIndex = {
        name: "vector_index_user_preferences",
        type: "vectorSearch",
        definition: {
          fields: [
            {
              type: "vector",
              path: "preferenceEmbedding",
              numDimensions: this.config.embeddingDimensions,
              similarity: "cosine"
            },
            {
              type: "filter",
              path: "userSegment"
            },
            {
              type: "filter",
              path: "isActive"
            }
          ]
        }
      };

      // Create the vector indexes
      const indexCreationTasks = [
        this.createCollectionIndex('documents', contentVectorIndex),
        this.createCollectionIndex('documents', titleVectorIndex),
        this.createCollectionIndex('documents', multiModalVectorIndex),
        this.createCollectionIndex('userInteractions', userVectorIndex)
      ];

      await Promise.all(indexCreationTasks);

      console.log('Vector indexes created successfully');

    } catch (error) {
      console.error('Error creating vector indexes:', error);
      throw error;
    }
  }

  async createCollectionIndex(collectionName, indexDefinition) {
    try {
      const collection = this.collections[collectionName];
      await collection.createIndex(
        indexDefinition.definition.fields.reduce((acc, field) => {
          if (field.type === 'vector') {
            acc[field.path] = 'vector';
          }
          return acc;
        }, {}),
        {
          name: indexDefinition.name,
          background: true
        }
      );
    } catch (error) {
      console.error(`Error creating index ${indexDefinition.name}:`, error);
    }
  }

  async performAdvancedVectorSearch(searchConfig) {
    console.log('Performing advanced vector search...');

    const searchStartTime = Date.now();

    try {
      // Build comprehensive vector search aggregation pipeline
      const vectorSearchPipeline = [
        // Stage 1: Vector similarity search
        {
          $vectorSearch: {
            index: searchConfig.indexName || "vector_index_content_embeddings",
            path: searchConfig.vectorPath || "contentEmbedding",
            queryVector: searchConfig.queryVector,
            numCandidates: searchConfig.numCandidates || this.config.numCandidates,
            limit: searchConfig.limit || this.config.defaultSearchLimit,

            // Advanced filtering for hybrid search
            filter: {
              $and: [
                ...(searchConfig.categoryFilter ? [{ category: { $in: searchConfig.categoryFilter } }] : []),
                ...(searchConfig.languageFilter ? [{ language: searchConfig.languageFilter }] : []),
                ...(searchConfig.dateRange ? [{
                  createdAt: {
                    $gte: searchConfig.dateRange.start,
                    $lte: searchConfig.dateRange.end
                  }
                }] : []),
                ...(searchConfig.tagsFilter ? [{ tags: { $in: searchConfig.tagsFilter } }] : []),
                ...(searchConfig.customFilters || [])
              ]
            }
          }
        },

        // Stage 2: Add similarity score and metadata enrichment
        {
          $addFields: {
            // Similarity scoring and ranking
            searchScore: { $meta: "vectorSearchScore" },
            searchRank: { $meta: "vectorSearchRank" },

            // Content analysis and metadata
            contentLength: { $strLenCP: "$content" },
            wordCount: {
              $size: {
                $split: ["$content", " "]
              }
            },

            // Relevance classification
            relevanceCategory: {
              $switch: {
                branches: [
                  {
                    case: { $gte: [{ $meta: "vectorSearchScore" }, 0.8] },
                    then: "highly_relevant"
                  },
                  {
                    case: { $gte: [{ $meta: "vectorSearchScore" }, 0.6] },
                    then: "relevant"
                  },
                  {
                    case: { $gte: [{ $meta: "vectorSearchScore" }, 0.4] },
                    then: "somewhat_relevant"
                  }
                ],
                default: "low_relevance"
              }
            },

            // Enhanced ranking with business logic
            enhancedScore: {
              $add: [
                { $meta: "vectorSearchScore" },

                // Boost for high-priority content
                {
                  $cond: [
                    { $eq: ["$metadata.priority", "high"] },
                    0.1,
                    0
                  ]
                },

                // Boost for recent content
                {
                  $cond: [
                    {
                      $gte: [
                        "$createdAt",
                        { $subtract: [new Date(), 7 * 24 * 60 * 60 * 1000] }
                      ]
                    },
                    0.05,
                    0
                  ]
                },

                // Boost for popular content
                {
                  $multiply: [
                    { $divide: [{ $ifNull: ["$analytics.viewCount", 0] }, 1000] },
                    0.02
                  ]
                }
              ]
            },

            // Search metadata
            searchMetadata: {
              queryProcessedAt: new Date(),
              indexUsed: searchConfig.indexName || "vector_index_content_embeddings",
              numCandidatesSearched: searchConfig.numCandidates || this.config.numCandidates
            }
          }
        },

        // Stage 3: Content enrichment and analysis
        {
          $lookup: {
            from: "user_interactions",
            let: { docId: "$_id" },
            pipeline: [
              {
                $match: {
                  $expr: {
                    $and: [
                      { $eq: ["$documentId", "$$docId"] },
                      { $gte: ["$interactionDate", { $subtract: [new Date(), 30 * 24 * 60 * 60 * 1000] }] }
                    ]
                  }
                }
              },
              {
                $group: {
                  _id: null,
                  totalInteractions: { $sum: 1 },
                  averageRating: { $avg: "$rating" },
                  interactionTypes: { $addToSet: "$interactionType" },
                  uniqueUsers: { $addToSet: "$userId" }
                }
              }
            ],
            as: "recentInteractions"
          }
        },

        // Stage 4: Related content discovery
        {
          $lookup: {
            from: "documents",
            let: { 
              currentCategory: "$category",
              currentTags: "$tags",
              currentId: "$_id"
            },
            pipeline: [
              {
                $match: {
                  $expr: {
                    $and: [
                      { $ne: ["$_id", "$$currentId"] },
                      { 
                        $or: [
                          { $eq: ["$category", "$$currentCategory"] },
                          { $in: ["$$currentTags", "$tags"] }
                        ]
                      }
                    ]
                  }
                }
              },
              {
                $sample: { size: 3 }
              },
              {
                $project: {
                  _id: 1,
                  title: 1,
                  category: 1,
                  tags: 1
                }
              }
            ],
            as: "relatedContent"
          }
        },

        // Stage 5: Final enrichment and formatting
        {
          $addFields: {
            // Interaction analytics
            interactionMetrics: {
              $cond: [
                { $gt: [{ $size: "$recentInteractions" }, 0] },
                {
                  totalInteractions: { $arrayElemAt: ["$recentInteractions.totalInteractions", 0] },
                  averageRating: { $arrayElemAt: ["$recentInteractions.averageRating", 0] },
                  uniqueUserCount: { $size: { $arrayElemAt: ["$recentInteractions.uniqueUsers", 0] } }
                },
                {
                  totalInteractions: 0,
                  averageRating: null,
                  uniqueUserCount: 0
                }
              ]
            },

            // Content summary for preview
            contentPreview: {
              $concat: [
                { $substr: ["$content", 0, 200] },
                "..."
              ]
            },

            // Final ranking score incorporating all factors
            finalScore: {
              $add: [
                "$enhancedScore",

                // Interaction quality boost
                {
                  $multiply: [
                    { $ifNull: ["$interactionMetrics.averageRating", 0] },
                    0.02
                  ]
                },

                // Engagement boost
                {
                  $multiply: [
                    { $divide: [{ $ifNull: ["$interactionMetrics.totalInteractions", 0] }, 100] },
                    0.03
                  ]
                }
              ]
            }
          }
        },

        // Stage 6: Final projection and cleanup
        {
          $project: {
            // Core content information
            title: 1,
            contentPreview: 1,
            category: 1,
            tags: 1,
            author: 1,
            createdAt: 1,
            language: 1,

            // Search relevance metrics
            searchScore: { $round: ["$searchScore", 4] },
            enhancedScore: { $round: ["$enhancedScore", 4] },
            finalScore: { $round: ["$finalScore", 4] },
            relevanceCategory: 1,
            searchRank: 1,

            // Content metrics
            contentLength: 1,
            wordCount: 1,

            // Engagement metrics
            interactionMetrics: 1,

            // Related content
            relatedContent: 1,

            // Metadata
            metadata: 1,
            searchMetadata: 1
          }
        },

        // Stage 7: Final sorting and ranking
        {
          $sort: {
            finalScore: -1,
            searchScore: -1,
            createdAt: -1
          }
        }
      ];

      // Execute the comprehensive vector search
      const searchResults = await this.collections.documents
        .aggregate(vectorSearchPipeline, {
          allowDiskUse: true,
          maxTimeMS: 30000
        })
        .toArray();

      const searchLatency = Date.now() - searchStartTime;

      // Log search analytics
      await this.logSearchAnalytics({
        queryVector: searchConfig.queryVector,
        resultsCount: searchResults.length,
        searchLatency: searchLatency,
        searchConfig: searchConfig,
        timestamp: new Date()
      });

      console.log(`Vector search completed: ${searchResults.length} results in ${searchLatency}ms`);

      return {
        success: true,
        results: searchResults,
        searchMetadata: {
          latency: searchLatency,
          resultsCount: searchResults.length,
          indexUsed: searchConfig.indexName || "vector_index_content_embeddings",
          numCandidatesSearched: searchConfig.numCandidates || this.config.numCandidates
        }
      };

    } catch (error) {
      console.error('Error performing vector search:', error);
      return {
        success: false,
        error: error.message,
        searchMetadata: {
          latency: Date.now() - searchStartTime
        }
      };
    }
  }

  async executeRAGPipeline(queryText, searchConfig = {}) {
    console.log('Executing RAG (Retrieval-Augmented Generation) pipeline...');

    try {
      // Generate query embedding (in production, this would call embedding API)
      const queryEmbedding = await this.generateEmbedding(queryText);

      // Perform vector search for relevant context
      const contextSearch = await this.performAdvancedVectorSearch({
        ...searchConfig,
        queryVector: queryEmbedding,
        limit: searchConfig.contextLimit || 5,
        numCandidates: searchConfig.numCandidates || 50
      });

      if (!contextSearch.success) {
        throw new Error(`Context retrieval failed: ${contextSearch.error}`);
      }

      // Build context for generation
      const contextDocuments = contextSearch.results;
      const contextText = contextDocuments
        .map((doc, index) => {
          return `Document ${index + 1} (Relevance: ${doc.relevanceCategory}):\nTitle: ${doc.title}\nContent: ${doc.contentPreview}`;
        })
        .join('\n\n');

      // Calculate context quality metrics
      const contextMetrics = {
        documentCount: contextDocuments.length,
        averageRelevance: contextDocuments.reduce((sum, doc) => sum + doc.searchScore, 0) / contextDocuments.length,
        totalContextLength: contextText.length,
        categories: [...new Set(contextDocuments.map(doc => doc.category))],
        languages: [...new Set(contextDocuments.map(doc => doc.language))]
      };

      // In production, this would call LLM API for generation
      const generatedResponse = await this.simulateResponseGeneration(queryText, contextText, contextMetrics);

      // Store RAG execution for analytics
      await this.logRAGExecution({
        query: queryText,
        contextMetrics: contextMetrics,
        responseGenerated: true,
        executionTime: Date.now(),
        contextDocuments: contextDocuments.map(doc => ({
          id: doc._id,
          title: doc.title,
          relevanceScore: doc.searchScore
        }))
      });

      return {
        success: true,
        query: queryText,
        contextDocuments: contextDocuments,
        contextMetrics: contextMetrics,
        generatedResponse: generatedResponse,
        searchMetadata: contextSearch.searchMetadata
      };

    } catch (error) {
      console.error('Error executing RAG pipeline:', error);
      return {
        success: false,
        error: error.message
      };
    }
  }

  async generateRecommendations(userId, recommendationConfig = {}) {
    console.log(`Generating recommendations for user: ${userId}`);

    try {
      // Get user interaction history and preferences
      const userProfile = await this.buildUserProfile(userId);

      if (!userProfile.success) {
        throw new Error(`Unable to build user profile: ${userProfile.error}`);
      }

      // Generate user preference embedding
      const userEmbedding = await this.generateUserPreferenceEmbedding(userProfile.data);

      // Find similar content based on user preferences
      const recommendationSearch = await this.performAdvancedVectorSearch({
        queryVector: userEmbedding,
        indexName: "vector_index_content_embeddings",
        limit: recommendationConfig.limit || 10,
        numCandidates: recommendationConfig.numCandidates || 100,

        // Filter out already interacted content
        customFilters: [
          {
            _id: {
              $nin: userProfile.data.interactedDocuments || []
            }
          }
        ]
      });

      // Find similar users for collaborative filtering
      const similarUsers = await this.findSimilarUsers(userId, userEmbedding);

      // Combine content-based and collaborative filtering
      const hybridRecommendations = await this.combineRecommendationStrategies(
        recommendationSearch.results,
        similarUsers,
        userProfile.data
      );

      return {
        success: true,
        userId: userId,
        recommendations: hybridRecommendations,
        recommendationMetadata: {
          contentBasedCount: recommendationSearch.results.length,
          collaborativeSignals: similarUsers.length,
          userProfileStrength: userProfile.data.profileStrength
        }
      };

    } catch (error) {
      console.error('Error generating recommendations:', error);
      return {
        success: false,
        error: error.message
      };
    }
  }

  async buildUserProfile(userId) {
    try {
      // Aggregate user interaction data
      const userInteractionPipeline = [
        {
          $match: {
            userId: userId,
            interactionDate: {
              $gte: new Date(Date.now() - 90 * 24 * 60 * 60 * 1000) // Last 90 days
            }
          }
        },
        {
          $lookup: {
            from: "documents",
            localField: "documentId",
            foreignField: "_id",
            as: "document"
          }
        },
        {
          $unwind: "$document"
        },
        {
          $group: {
            _id: "$userId",

            // Interaction patterns
            totalInteractions: { $sum: 1 },
            categories: { $addToSet: "$document.category" },
            tags: { $addToSet: "$document.tags" },
            languages: { $addToSet: "$document.language" },

            // Preference indicators
            averageRating: { $avg: "$rating" },
            favoriteCategories: {
              $push: {
                category: "$document.category",
                rating: "$rating",
                interactionType: "$interactionType"
              }
            },

            // Content characteristics
            preferredContentLength: { $avg: { $strLenCP: "$document.content" } },
            interactionTypes: { $addToSet: "$interactionType" },

            // Temporal patterns
            interactedDocuments: { $addToSet: "$documentId" },
            recentInteractions: {
              $push: {
                documentId: "$documentId",
                rating: "$rating",
                interactionDate: "$interactionDate"
              }
            }
          }
        }
      ];

      const userProfileData = await this.collections.userInteractions
        .aggregate(userInteractionPipeline)
        .toArray();

      if (userProfileData.length === 0) {
        return {
          success: false,
          error: 'No user interaction data found'
        };
      }

      const profile = userProfileData[0];

      // Calculate profile strength
      profile.profileStrength = Math.min(
        (profile.totalInteractions / 50) * 0.4 +
        (profile.categories.length / 10) * 0.3 +
        (profile.tags.flat().length / 20) * 0.3,
        1.0
      );

      return {
        success: true,
        data: profile
      };

    } catch (error) {
      return {
        success: false,
        error: error.message
      };
    }
  }

  async generateEmbedding(text) {
    // In production, this would call OpenAI or other embedding API
    // For demonstration, return a mock embedding
    return Array.from({ length: this.config.embeddingDimensions }, () => Math.random() - 0.5);
  }

  async generateUserPreferenceEmbedding(userProfile) {
    // In production, this would create embeddings based on user preferences
    // For demonstration, return a mock embedding based on user categories
    const categoryWeights = userProfile.favoriteCategories.reduce((acc, item) => {
      acc[item.category] = (acc[item.category] || 0) + (item.rating || 3);
      return acc;
    }, {});

    // Create weighted embedding (simplified approach)
    return Array.from({ length: this.config.embeddingDimensions }, () => Math.random() - 0.5);
  }

  async simulateResponseGeneration(queryText, contextText, contextMetrics) {
    // In production, this would call ChatGPT or another LLM
    return `Based on ${contextMetrics.documentCount} relevant documents with average relevance of ${contextMetrics.averageRelevance.toFixed(3)}, here's a comprehensive response to "${queryText}": [Generated response would appear here using the provided context from ${contextMetrics.categories.join(', ')} categories]`;
  }

  async logSearchAnalytics(searchData) {
    try {
      await this.collections.searchAnalytics.insertOne({
        ...searchData,
        createdAt: new Date()
      });
    } catch (error) {
      console.error('Error logging search analytics:', error);
    }
  }

  async logRAGExecution(ragData) {
    try {
      await this.collections.searchAnalytics.insertOne({
        type: 'rag_execution',
        ...ragData,
        createdAt: new Date()
      });
    } catch (error) {
      console.error('Error logging RAG execution:', error);
    }
  }

  async shutdown() {
    console.log('Shutting down Atlas Vector Search manager...');

    try {
      if (this.client) {
        await this.client.close();
      }
      console.log('Atlas Vector Search manager shutdown completed');
    } catch (error) {
      console.error('Error during shutdown:', error);
    }
  }
}

// Benefits of MongoDB Atlas Vector Search:
// - Native vector similarity search with automatic optimization
// - Seamless integration with existing MongoDB data and operations  
// - Advanced hybrid search combining vector similarity with traditional queries
// - Multiple similarity metrics (cosine, euclidean, dot product) in single platform
// - Automatic scaling and performance optimization for vector workloads
// - Built-in support for multi-modal embeddings and AI model integration
// - Real-time vector search with consistent results and ACID transactions
// - SQL-compatible vector operations through QueryLeaf integration
// - Enterprise-ready security, monitoring, and operational management
// - Native support for RAG pipelines and recommendation systems

module.exports = {
  AtlasVectorSearchManager
};

Understanding Atlas Vector Search Architecture

Advanced AI Integration Patterns

MongoDB Atlas Vector Search enables sophisticated AI application architectures with native vector operations:

// Enterprise AI Application Architecture with Atlas Vector Search
class EnterpriseAIVectorPlatform extends AtlasVectorSearchManager {
  constructor(connectionString, aiConfig) {
    super(connectionString, aiConfig);

    this.aiConfig = {
      ...aiConfig,
      enableMultiModalSearch: true,
      enableRealtimeRecommendations: true,
      enableSemanticAnalytics: true,
      enableContentGeneration: true,
      enableKnowledgeGraphs: true
    };

    this.setupEnterpriseAICapabilities();
  }

  async implementAdvancedAIWorkflows() {
    console.log('Implementing enterprise AI workflows...');

    const aiWorkflows = {
      // Multi-modal content processing
      multiModalProcessing: {
        textEmbeddings: true,
        imageEmbeddings: true,
        audioEmbeddings: true,
        videoEmbeddings: true
      },

      // Advanced recommendation engines
      recommendationSystems: {
        contentBasedFiltering: true,
        collaborativeFiltering: true,
        hybridRecommendations: true,
        realTimePersonalization: true
      },

      // Knowledge management and RAG
      knowledgeManagement: {
        documentRetrieval: true,
        contextGeneration: true,
        responseGeneration: true,
        knowledgeGraphIntegration: true
      },

      // Semantic search and discovery
      semanticCapabilities: {
        intentRecognition: true,
        entityExtraction: true,
        topicModeling: true,
        conceptualSearch: true
      }
    };

    return await this.deployAIWorkflows(aiWorkflows);
  }

  async setupRealtimePersonalization() {
    console.log('Setting up real-time personalization engine...');

    // Real-time user behavior processing
    const personalizationPipeline = [
      {
        $match: {
          timestamp: { $gte: new Date(Date.now() - 60000) } // Last minute
        }
      },
      {
        $lookup: {
          from: "user_profiles",
          localField: "userId", 
          foreignField: "userId",
          as: "userProfile"
        }
      },
      {
        $vectorSearch: {
          index: "vector_index_user_preferences",
          path: "userProfile.preferenceEmbedding",
          queryVector: "$behaviorEmbedding",
          numCandidates: 100,
          limit: 20
        }
      }
    ];

    return await this.deployPersonalizationEngine(personalizationPipeline);
  }
}

SQL-Style Vector Search Operations with QueryLeaf

QueryLeaf provides familiar SQL syntax for MongoDB Atlas Vector Search operations:

-- QueryLeaf advanced vector search operations with SQL-familiar syntax

-- Configure Atlas Vector Search capabilities  
CONFIGURE VECTOR_SEARCH
SET provider = 'mongodb_atlas',
    default_embedding_model = 'text-embedding-ada-002',
    embedding_dimensions = 1536,
    similarity_metrics = ['cosine', 'euclidean', 'dot_product'],
    enable_hybrid_search = true,
    enable_semantic_caching = true,
    default_num_candidates = 100,
    default_search_limit = 20;

-- Create vector-optimized table for AI applications
CREATE TABLE ai_documents (
    document_id UUID PRIMARY KEY,
    title TEXT NOT NULL,
    content TEXT NOT NULL,
    category VARCHAR(100),
    language VARCHAR(10) DEFAULT 'en',

    -- Vector embeddings for semantic search
    content_embedding VECTOR(1536),
    title_embedding VECTOR(1536),
    summary_embedding VECTOR(1536),

    -- Multi-modal embeddings
    image_embedding VECTOR(768),   -- CLIP embeddings
    audio_embedding VECTOR(512),   -- Audio model embeddings

    -- Metadata for hybrid search
    tags TEXT[],
    author VARCHAR(200),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    -- Content analysis
    word_count INTEGER,
    readability_score DECIMAL(5,2),
    sentiment_score DECIMAL(3,2),

    -- Engagement metrics
    view_count BIGINT DEFAULT 0,
    like_count BIGINT DEFAULT 0,
    share_count BIGINT DEFAULT 0,
    average_rating DECIMAL(3,2),

    -- AI metadata
    embedding_model VARCHAR(100) DEFAULT 'text-embedding-ada-002',
    embedding_version VARCHAR(20) DEFAULT 'v1',
    last_embedding_update TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    -- Vector search optimization
    is_published BOOLEAN DEFAULT true,
    content_type VARCHAR(50),
    priority_level INTEGER DEFAULT 1

) WITH (
    vector_indexes = [
        {
            name: 'idx_content_vector_search',
            path: 'content_embedding', 
            similarity: 'cosine',
            dimensions: 1536,
            filters: ['category', 'language', 'is_published', 'content_type']
        },
        {
            name: 'idx_title_vector_search',
            path: 'title_embedding',
            similarity: 'cosine', 
            dimensions: 1536,
            filters: ['category', 'priority_level']
        },
        {
            name: 'idx_multimodal_vector_search',
            path: ['content_embedding', 'image_embedding'],
            similarity: 'cosine',
            filters: ['content_type', 'language']
        }
    ]
);

-- Advanced semantic search with hybrid filtering
WITH semantic_search_results AS (
  SELECT 
    document_id,
    title,
    content,
    category,
    language,
    tags,
    author,
    created_at,
    word_count,
    view_count,
    average_rating,

    -- Vector similarity scoring
    VECTOR_SIMILARITY(content_embedding, $1, 'cosine') as content_similarity,
    VECTOR_SIMILARITY(title_embedding, $1, 'cosine') as title_similarity,
    VECTOR_SIMILARITY(summary_embedding, $1, 'cosine') as summary_similarity,

    -- Distance metrics for different use cases
    VECTOR_DISTANCE(content_embedding, $1, 'euclidean') as euclidean_distance,
    VECTOR_DISTANCE(content_embedding, $1, 'manhattan') as manhattan_distance,

    -- Hybrid ranking factors
    LOG(view_count + 1) as popularity_score,
    COALESCE(average_rating, 3.0) as quality_score,

    -- Temporal relevance
    EXTRACT(DAYS FROM (CURRENT_TIMESTAMP - created_at)) as days_old,
    CASE 
      WHEN created_at >= CURRENT_TIMESTAMP - INTERVAL '7 days' THEN 0.1
      WHEN created_at >= CURRENT_TIMESTAMP - INTERVAL '30 days' THEN 0.05
      ELSE 0.0
    END as recency_boost

  FROM ai_documents
  WHERE 
    -- Vector search with advanced filtering
    VECTOR_SEARCH(
      content_embedding,
      $1,  -- Query embedding vector
      similarity_metric => 'cosine',
      num_candidates => 100,
      filters => {
        'category': $2,        -- Category filter
        'language': $3,        -- Language filter  
        'is_published': true,
        'content_type': $4     -- Content type filter
      }
    )
    AND created_at >= $5       -- Date range filter
    AND word_count >= $6       -- Minimum content length
    AND (tags && $7 OR $7 IS NULL)  -- Tag overlap filter
),

enhanced_ranking AS (
  SELECT 
    ssr.*,

    -- Multi-factor ranking calculation
    (
      content_similarity * 0.4 +           -- Primary semantic similarity
      title_similarity * 0.2 +             -- Title relevance
      summary_similarity * 0.1 +           -- Summary relevance
      (popularity_score / 10.0) * 0.1 +    -- Engagement factor
      (quality_score / 5.0) * 0.1 +        -- Quality factor
      recency_boost +                       -- Temporal relevance
      CASE 
        WHEN priority_level >= 5 THEN 0.1   -- Priority boost
        ELSE 0.0 
      END
    ) as composite_relevance_score,

    -- Content analysis and categorization
    CASE 
      WHEN content_similarity >= 0.8 THEN 'highly_relevant'
      WHEN content_similarity >= 0.6 THEN 'relevant'
      WHEN content_similarity >= 0.4 THEN 'somewhat_relevant'
      ELSE 'marginally_relevant'
    END as relevance_category,

    -- Semantic clustering for diverse results
    NTILE(5) OVER (ORDER BY content_similarity DESC) as relevance_tier,

    -- Content quality indicators
    CASE 
      WHEN word_count >= 2000 AND average_rating >= 4.0 THEN 'comprehensive_high_quality'
      WHEN word_count >= 1000 AND average_rating >= 3.5 THEN 'detailed_good_quality'
      WHEN word_count >= 500 AND average_rating >= 3.0 THEN 'standard_quality'
      ELSE 'basic_content'
    END as content_quality_tier,

    -- Engagement performance metrics
    (view_count * 0.3 + like_count * 0.4 + share_count * 0.3) as engagement_score,

    -- Search result preview
    LEFT(content, 300) || CASE WHEN LENGTH(content) > 300 THEN '...' ELSE '' END as content_preview

  FROM semantic_search_results ssr
),

diversity_optimization AS (
  SELECT 
    er.*,

    -- Category diversity to prevent over-concentration
    ROW_NUMBER() OVER (PARTITION BY category ORDER BY composite_relevance_score DESC) as category_rank,

    -- Author diversity for varied perspectives  
    ROW_NUMBER() OVER (PARTITION BY author ORDER BY composite_relevance_score DESC) as author_rank,

    -- Temporal diversity for balanced timeline coverage
    ROW_NUMBER() OVER (
      PARTITION BY DATE_TRUNC('month', created_at) 
      ORDER BY composite_relevance_score DESC
    ) as temporal_rank,

    -- Content length diversity
    CASE 
      WHEN word_count <= 500 THEN 'short'
      WHEN word_count <= 1500 THEN 'medium' 
      WHEN word_count <= 3000 THEN 'long'
      ELSE 'comprehensive'
    END as content_length_category,

    -- Similarity to previous results (prevents near-duplicates)
    LAG(content_similarity, 1) OVER (ORDER BY composite_relevance_score DESC) as prev_result_similarity

  FROM enhanced_ranking er
)

-- Final search results with comprehensive analytics
SELECT 
  document_id,
  title,
  content_preview,
  category,
  language,
  author,
  TO_CHAR(created_at, 'YYYY-MM-DD HH24:MI') as published_date,

  -- Relevance and ranking metrics
  ROUND(content_similarity::NUMERIC, 4) as semantic_similarity,
  ROUND(composite_relevance_score::NUMERIC, 4) as final_relevance_score,
  relevance_category,
  relevance_tier,

  -- Content characteristics
  word_count,
  content_quality_tier,
  content_length_category,

  -- Engagement and quality indicators
  view_count,
  average_rating,
  ROUND(engagement_score::NUMERIC, 1) as engagement_score,

  -- Diversity indicators
  category_rank,
  author_rank,
  temporal_rank,

  -- Metadata
  tags,
  ROUND(euclidean_distance::NUMERIC, 4) as euclidean_distance,
  days_old,

  -- Search quality indicators
  CASE 
    WHEN ABS(content_similarity - COALESCE(prev_result_similarity, 0)) < 0.05 THEN 'potential_duplicate'
    WHEN composite_relevance_score >= 0.8 THEN 'excellent_match'
    WHEN composite_relevance_score >= 0.6 THEN 'good_match'
    WHEN composite_relevance_score >= 0.4 THEN 'fair_match'
    ELSE 'weak_match'
  END as search_quality_assessment,

  -- Performance metadata
  CURRENT_TIMESTAMP as search_executed_at

FROM diversity_optimization
WHERE 
  -- Quality thresholds
  composite_relevance_score >= 0.3
  AND content_similarity >= 0.2

  -- Diversity constraints (ensure balanced results)
  AND category_rank <= 3        -- Max 3 results per category
  AND author_rank <= 2          -- Max 2 results per author
  AND temporal_rank <= 2        -- Max 2 results per month

ORDER BY 
  composite_relevance_score DESC,
  content_similarity DESC,
  engagement_score DESC
LIMIT 20;

-- Real-time recommendation engine with collaborative filtering
CREATE MATERIALIZED VIEW user_recommendation_profiles AS
WITH user_interaction_patterns AS (
  SELECT 
    user_id,

    -- Interaction behavior analysis
    COUNT(*) as total_interactions,
    COUNT(DISTINCT document_id) as unique_documents_viewed,
    COUNT(DISTINCT category) as categories_explored,
    AVG(interaction_rating) as average_rating,

    -- Preference extraction from interactions
    ARRAY_AGG(DISTINCT category ORDER BY COUNT(*) DESC) as preferred_categories,
    ARRAY_AGG(DISTINCT tags) as interacted_tags,

    -- Temporal patterns
    AVG(EXTRACT(HOUR FROM interaction_timestamp)) as preferred_interaction_hour,
    MODE() WITHIN GROUP (ORDER BY EXTRACT(DOW FROM interaction_timestamp)) as preferred_day_of_week,

    -- Content preferences
    AVG(word_count) as preferred_content_length,
    AVG(readability_score) as preferred_readability,

    -- Engagement patterns
    SUM(CASE WHEN interaction_type = 'like' THEN 1 ELSE 0 END) as likes_given,
    SUM(CASE WHEN interaction_type = 'share' THEN 1 ELSE 0 END) as shares_made,
    SUM(CASE WHEN interaction_type = 'bookmark' THEN 1 ELSE 0 END) as bookmarks_created

  FROM user_interactions ui
  JOIN ai_documents ad ON ui.document_id = ad.document_id  
  WHERE ui.interaction_timestamp >= CURRENT_TIMESTAMP - INTERVAL '90 days'
    AND ui.interaction_rating IS NOT NULL
  GROUP BY user_id
  HAVING COUNT(*) >= 5  -- Minimum interaction threshold
),

user_preference_vectors AS (
  SELECT 
    uip.user_id,
    uip.total_interactions,
    uip.preferred_categories,
    uip.average_rating,

    -- Generate user preference embedding from interaction patterns
    VECTOR_AGGREGATE(
      ad.content_embedding,
      weights => ui.interaction_rating,
      aggregation_method => 'weighted_average'
    ) as preference_embedding,

    -- Category preference strengths
    JSONB_OBJECT_AGG(
      ad.category,
      AVG(ui.interaction_rating)
    ) as category_preference_scores,

    -- Content characteristics preferences
    uip.preferred_content_length,
    uip.preferred_readability,

    -- Profile completeness and reliability
    LEAST(uip.total_interactions / 50.0, 1.0) as profile_completeness,
    CURRENT_TIMESTAMP as profile_generated_at

  FROM user_interaction_patterns uip
  JOIN user_interactions ui ON uip.user_id = ui.user_id
  JOIN ai_documents ad ON ui.document_id = ad.document_id
  WHERE ui.interaction_timestamp >= CURRENT_TIMESTAMP - INTERVAL '90 days'
  GROUP BY 
    uip.user_id, uip.total_interactions, uip.preferred_categories, 
    uip.average_rating, uip.preferred_content_length, uip.preferred_readability
);

-- Advanced recommendation generation with multiple strategies
WITH target_user_profile AS (
  SELECT * FROM user_recommendation_profiles 
  WHERE user_id = $1  -- Target user for recommendations
),

content_based_recommendations AS (
  SELECT 
    ad.document_id,
    ad.title,
    ad.category,
    ad.content_preview,
    ad.author,
    ad.created_at,
    ad.average_rating,
    ad.view_count,

    -- Content similarity to user preferences
    VECTOR_SIMILARITY(
      ad.content_embedding, 
      tup.preference_embedding, 
      'cosine'
    ) as content_similarity,

    -- Category preference alignment
    COALESCE(
      (tup.category_preference_scores->>ad.category)::NUMERIC,
      tup.average_rating
    ) as category_preference_score,

    -- Content characteristic matching
    ABS(ad.word_count - tup.preferred_content_length) / 1000.0 as length_mismatch,
    ABS(ad.readability_score - tup.preferred_readability) as readability_mismatch,

    'content_based' as recommendation_strategy

  FROM ai_documents ad
  CROSS JOIN target_user_profile tup
  WHERE ad.is_published = true
    AND ad.document_id NOT IN (
      -- Exclude already interacted content
      SELECT DISTINCT document_id 
      FROM user_interactions 
      WHERE user_id = $1
      AND interaction_timestamp >= CURRENT_TIMESTAMP - INTERVAL '30 days'
    )
  ORDER BY 
    content_similarity DESC,
    category_preference_score DESC
  LIMIT 50
),

collaborative_filtering AS (
  -- Find similar users based on preference vectors
  WITH similar_users AS (
    SELECT 
      urp.user_id as similar_user_id,
      VECTOR_SIMILARITY(
        urp.preference_embedding,
        tup.preference_embedding,
        'cosine'
      ) as user_similarity,
      urp.profile_completeness

    FROM user_recommendation_profiles urp
    CROSS JOIN target_user_profile tup
    WHERE urp.user_id != tup.user_id
      AND urp.profile_completeness >= 0.3  -- Reliable profiles only
    ORDER BY user_similarity DESC
    LIMIT 20  -- Top similar users
  ),

  collaborative_recommendations AS (
    SELECT 
      ad.document_id,
      ad.title,
      ad.category,
      ad.content_preview,
      ad.author,
      ad.created_at,
      ad.average_rating,
      ad.view_count,

      -- Weighted recommendation score from similar users
      AVG(ui.interaction_rating * su.user_similarity) as collaborative_score,
      COUNT(*) as similar_user_interactions,
      'collaborative_filtering' as recommendation_strategy

    FROM similar_users su
    JOIN user_interactions ui ON su.similar_user_id = ui.user_id
    JOIN ai_documents ad ON ui.document_id = ad.document_id
    WHERE ad.is_published = true
      AND ui.interaction_rating >= 3  -- Positive interactions only
      AND ui.interaction_timestamp >= CURRENT_TIMESTAMP - INTERVAL '60 days'
      AND ad.document_id NOT IN (
        -- Exclude target user's interactions
        SELECT DISTINCT document_id 
        FROM user_interactions 
        WHERE user_id = $1
      )
    GROUP BY 
      ad.document_id, ad.title, ad.category, ad.content_preview,
      ad.author, ad.created_at, ad.average_rating, ad.view_count
    HAVING COUNT(*) >= 2  -- Multiple similar users recommended
    ORDER BY collaborative_score DESC
    LIMIT 30
  )

  SELECT * FROM collaborative_recommendations
),

hybrid_recommendations AS (
  -- Combine content-based and collaborative filtering
  SELECT 
    COALESCE(cb.document_id, cf.document_id) as document_id,
    COALESCE(cb.title, cf.title) as title,
    COALESCE(cb.category, cf.category) as category,
    COALESCE(cb.content_preview, cf.content_preview) as content_preview,
    COALESCE(cb.author, cf.author) as author,
    COALESCE(cb.created_at, cf.created_at) as created_at,
    COALESCE(cb.average_rating, cf.average_rating) as average_rating,
    COALESCE(cb.view_count, cf.view_count) as view_count,

    -- Hybrid scoring
    COALESCE(cb.content_similarity, 0.0) * 0.6 +
    COALESCE(cf.collaborative_score, 0.0) * 0.4 as hybrid_score,

    cb.content_similarity,
    cf.collaborative_score,
    cf.similar_user_interactions,

    -- Recommendation diversity factors
    ROW_NUMBER() OVER (PARTITION BY COALESCE(cb.category, cf.category) ORDER BY 
      (COALESCE(cb.content_similarity, 0.0) * 0.6 + COALESCE(cf.collaborative_score, 0.0) * 0.4) DESC
    ) as category_rank,

    -- Final recommendation strategy
    CASE 
      WHEN cb.document_id IS NOT NULL AND cf.document_id IS NOT NULL THEN 'hybrid'
      WHEN cb.document_id IS NOT NULL THEN 'content_based'
      WHEN cf.document_id IS NOT NULL THEN 'collaborative'
    END as recommendation_source

  FROM content_based_recommendations cb
  FULL OUTER JOIN collaborative_filtering cf ON cb.document_id = cf.document_id
)

-- Final personalized recommendations
SELECT 
  document_id,
  title,
  content_preview,
  category,
  author,
  TO_CHAR(created_at, 'YYYY-MM-DD') as published_date,

  -- Recommendation scoring
  ROUND(hybrid_score::NUMERIC, 4) as recommendation_score,
  ROUND(content_similarity::NUMERIC, 4) as content_match,
  ROUND(collaborative_score::NUMERIC, 4) as social_signal,
  recommendation_source,

  -- Content quality indicators
  average_rating,
  view_count,

  -- Diversity indicators
  category_rank,

  -- Confidence metrics
  CASE 
    WHEN recommendation_source = 'hybrid' AND hybrid_score >= 0.7 THEN 'high_confidence'
    WHEN hybrid_score >= 0.5 THEN 'medium_confidence'
    WHEN hybrid_score >= 0.3 THEN 'low_confidence'
    ELSE 'experimental'
  END as confidence_level,

  -- Recommendation explanation
  CASE recommendation_source
    WHEN 'content_based' THEN 'Recommended based on your content preferences'
    WHEN 'collaborative' THEN 'Recommended by users with similar interests'
    WHEN 'hybrid' THEN 'Recommended based on content analysis and user behavior'
  END as recommendation_explanation,

  CURRENT_TIMESTAMP as recommended_at

FROM hybrid_recommendations
WHERE 
  hybrid_score >= 0.2  -- Minimum recommendation threshold
  AND category_rank <= 2  -- Max 2 recommendations per category for diversity
ORDER BY 
  hybrid_score DESC,
  average_rating DESC NULLS LAST
LIMIT 15;

-- RAG (Retrieval-Augmented Generation) pipeline for question answering
CREATE FUNCTION execute_rag_pipeline(
    query_text TEXT,
    context_limit INTEGER DEFAULT 5,
    similarity_threshold DECIMAL DEFAULT 0.4,
    language_preference VARCHAR DEFAULT 'en'
) RETURNS TABLE (
    context_documents JSONB,
    context_summary TEXT,
    generated_response TEXT,
    confidence_score DECIMAL,
    sources_cited INTEGER,
    processing_metadata JSONB
) AS $$
DECLARE
    query_embedding VECTOR(1536);
    context_docs JSONB := '[]'::JSONB;
    context_text TEXT := '';
    total_context_length INTEGER := 0;
    avg_relevance DECIMAL;
    processing_start_time TIMESTAMP := CURRENT_TIMESTAMP;
BEGIN
    -- Generate embedding for the query (in production, call embedding API)
    query_embedding := GENERATE_EMBEDDING(query_text);

    -- Retrieve relevant context using vector search
    WITH context_retrieval AS (
        SELECT 
            document_id,
            title,
            content,
            category,
            author,
            created_at,
            average_rating,
            VECTOR_SIMILARITY(content_embedding, query_embedding, 'cosine') as relevance_score,
            word_count

        FROM ai_documents
        WHERE VECTOR_SEARCH(
            content_embedding,
            query_embedding,
            similarity_metric => 'cosine',
            num_candidates => 50,
            filters => {
                'language': language_preference,
                'is_published': true
            }
        )
        AND VECTOR_SIMILARITY(content_embedding, query_embedding, 'cosine') >= similarity_threshold
        ORDER BY relevance_score DESC
        LIMIT context_limit
    )

    -- Build context for generation
    SELECT 
        JSONB_AGG(
            JSONB_BUILD_OBJECT(
                'document_id', document_id,
                'title', title,
                'content', LEFT(content, 1000),
                'category', category,
                'author', author,
                'relevance_score', relevance_score,
                'word_count', word_count
            ) ORDER BY relevance_score DESC
        ),
        STRING_AGG(
            title || E':\n' || LEFT(content, 800) || E'\n\n',
            '' ORDER BY relevance_score DESC
        ),
        SUM(LENGTH(content)),
        AVG(relevance_score)
    INTO context_docs, context_text, total_context_length, avg_relevance
    FROM context_retrieval;

    -- Generate response (in production, call LLM API)
    -- For demonstration, return structured information
    RETURN QUERY SELECT 
        COALESCE(context_docs, '[]'::JSONB) as context_documents,
        CONCAT(
            'Based on ', COALESCE(JSONB_ARRAY_LENGTH(context_docs), 0), 
            ' relevant documents from categories: ',
            STRING_AGG(DISTINCT category, ', ')
        ) as context_summary,
        CONCAT(
            'Generated response to "', query_text, 
            '" based on the retrieved context. Average relevance: ',
            ROUND(COALESCE(avg_relevance, 0), 3)
        ) as generated_response,
        COALESCE(avg_relevance, 0.0) as confidence_score,
        COALESCE(JSONB_ARRAY_LENGTH(context_docs), 0) as sources_cited,
        JSONB_BUILD_OBJECT(
            'processing_time_ms', EXTRACT(MILLISECONDS FROM (CURRENT_TIMESTAMP - processing_start_time)),
            'context_length', total_context_length,
            'language_used', language_preference,
            'similarity_threshold', similarity_threshold,
            'timestamp', CURRENT_TIMESTAMP
        ) as processing_metadata
    FROM (
        SELECT DISTINCT category 
        FROM JSONB_TO_RECORDSET(context_docs) AS x(category TEXT)
    ) cat;

END;
$$ LANGUAGE plpgsql;

-- Execute RAG pipeline for question answering
SELECT * FROM execute_rag_pipeline(
    'What are the best practices for implementing microservices architecture?',
    5,    -- context_limit
    0.4,  -- similarity_threshold  
    'en'  -- language_preference
);

-- QueryLeaf provides comprehensive Atlas Vector Search capabilities:
-- 1. Native SQL syntax for vector similarity operations and embedding management
-- 2. Advanced hybrid search combining vector similarity with traditional filters
-- 3. Multi-modal vector search supporting text, image, and audio embeddings
-- 4. Intelligent recommendation systems with content-based and collaborative filtering
-- 5. Production-ready RAG pipeline implementation with context optimization
-- 6. Real-time personalization based on user behavior and preference vectors
-- 7. Comprehensive analytics and performance monitoring for AI applications
-- 8. Enterprise-ready vector indexing with automatic scaling and optimization
-- 9. SQL-familiar semantic search operations accessible to traditional database teams
-- 10. Native integration with MongoDB Atlas infrastructure and security features

Best Practices for Production Vector Search Implementation

Vector Index Design and Optimization

Essential practices for effective Atlas Vector Search deployment:

  1. Embedding Strategy: Choose appropriate embedding models and dimensions based on use case requirements and performance constraints
  2. Index Configuration: Design vector indexes with optimal similarity metrics, candidate limits, and filter combinations for query patterns
  3. Hybrid Search Architecture: Implement effective combination of vector similarity with traditional database filtering for comprehensive search experiences
  4. Performance Optimization: Monitor search latency, throughput, and resource utilization while optimizing for production workloads
  5. Embedding Management: Establish versioning, caching, and update strategies for embedding vectors and model evolution
  6. Quality Assurance: Implement relevance testing, search quality metrics, and continuous improvement processes

Enterprise AI Application Architecture

Design vector search systems for enterprise-scale AI applications:

  1. Multi-Modal Integration: Support diverse content types including text, images, audio, and video with appropriate embedding strategies
  2. Real-Time Personalization: Implement dynamic user preference modeling with real-time recommendation updates
  3. Knowledge Management: Design comprehensive RAG pipelines for question answering, document retrieval, and content generation
  4. Scalability Planning: Architecture design for growing vector collections, user bases, and query volumes
  5. Security and Governance: Implement access controls, data privacy, and audit capabilities for enterprise compliance
  6. Monitoring and Analytics: Establish comprehensive observability for search performance, user satisfaction, and business impact

Conclusion

MongoDB Atlas Vector Search provides comprehensive AI-powered semantic similarity capabilities that enable sophisticated recommendation systems, knowledge management platforms, and intelligent content discovery applications through native vector operations, advanced hybrid search, and seamless integration with existing MongoDB infrastructure. The unified platform approach eliminates the complexity of managing separate vector databases while delivering enterprise-ready performance, security, and operational simplicity.

Key Atlas Vector Search benefits include:

  • Native Vector Operations: High-performance similarity search with multiple metrics integrated directly into MongoDB Atlas infrastructure
  • Hybrid Search Excellence: Sophisticated combination of semantic similarity with traditional database filtering for comprehensive search experiences
  • AI Application Ready: Built-in support for RAG pipelines, recommendation engines, and real-time personalization systems
  • Multi-Modal Capability: Native support for text, image, audio, and video embeddings within unified document structures
  • Enterprise Integration: Seamless integration with existing MongoDB security, monitoring, and operational infrastructure
  • SQL Accessibility: Familiar SQL-style vector operations through QueryLeaf for accessible AI application development

Whether you're building intelligent search platforms, recommendation systems, knowledge management applications, or conversational AI interfaces, MongoDB Atlas Vector Search with QueryLeaf's familiar SQL interface provides the foundation for scalable, high-performance AI applications.

QueryLeaf Integration: QueryLeaf automatically optimizes MongoDB Atlas Vector Search operations while providing SQL-familiar syntax for semantic similarity, recommendation algorithms, and RAG pipeline construction. Advanced vector operations, hybrid search strategies, and AI application patterns are seamlessly accessible through familiar SQL constructs, making sophisticated AI development approachable for SQL-oriented development teams.

The combination of Atlas Vector Search's powerful similarity capabilities with SQL-style AI operations makes it an ideal platform for applications requiring both advanced semantic understanding and familiar database interaction patterns, ensuring your AI applications can scale efficiently while delivering intelligent, personalized user experiences.

MongoDB Geospatial Indexing and Location-Based Queries: Building High-Performance GIS Applications with Advanced Spatial Analysis and SQL-Compatible Operations

Modern location-aware applications require sophisticated geospatial data management capabilities to deliver real-time proximity searches, route optimization, geofencing, and spatial analytics at massive scale. Traditional relational databases struggle with the complex geometric calculations, multi-dimensional indexing requirements, and performance demands of location-based services, often requiring expensive third-party GIS extensions or external spatial processing systems.

MongoDB provides native geospatial indexing and query capabilities that enable applications to efficiently store, index, and query location data using industry-standard GeoJSON formats. Unlike traditional database approaches that require complex extensions or specialized spatial databases, MongoDB's built-in geospatial features deliver high-performance spatial operations, intelligent indexing strategies, and comprehensive query capabilities designed for modern mapping, logistics, and location-aware applications.

Traditional Geospatial Data Challenges

Managing location data with conventional database approaches creates significant performance, complexity, and scalability challenges:

-- Traditional PostgreSQL geospatial implementation (complex setup and limited performance)

-- Requires PostGIS extension for spatial capabilities
CREATE EXTENSION IF NOT EXISTS postgis;
CREATE EXTENSION IF NOT EXISTS postgis_topology;

-- Location-based application schema with complex spatial types
CREATE TABLE business_locations (
    business_id BIGSERIAL PRIMARY KEY,
    business_name VARCHAR(200) NOT NULL,
    category VARCHAR(100) NOT NULL,
    address TEXT NOT NULL,

    -- Complex spatial column requiring PostGIS
    location GEOMETRY(POINT, 4326) NOT NULL, -- WGS84 coordinate system
    service_area GEOMETRY(POLYGON, 4326),    -- Service boundary polygon

    -- Business metadata
    phone VARCHAR(20),
    email VARCHAR(100),
    website VARCHAR(200),
    operating_hours JSONB,
    rating NUMERIC(3,2),
    price_level INTEGER CHECK (price_level BETWEEN 1 AND 4),

    -- Operational data
    is_active BOOLEAN DEFAULT true,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Spatial indexes (PostGIS-specific syntax)
CREATE INDEX idx_business_location_gist ON business_locations 
    USING GIST (location);
CREATE INDEX idx_business_service_area_gist ON business_locations 
    USING GIST (service_area);

-- Customer location tracking table
CREATE TABLE customer_locations (
    location_id BIGSERIAL PRIMARY KEY,
    customer_id BIGINT NOT NULL,
    location GEOMETRY(POINT, 4326) NOT NULL,
    location_accuracy_meters NUMERIC(8,2),
    location_timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW(),

    -- Location context
    location_type VARCHAR(20) DEFAULT 'gps', -- 'gps', 'network', 'manual'
    address_geocoded TEXT,
    city VARCHAR(100),
    state VARCHAR(50),
    country VARCHAR(50),
    postal_code VARCHAR(20)
);

CREATE INDEX idx_customer_location_gist ON customer_locations 
    USING GIST (location);
CREATE INDEX idx_customer_location_timestamp ON customer_locations 
    (customer_id, location_timestamp DESC);

-- Delivery routes and logistics
CREATE TABLE delivery_routes (
    route_id BIGSERIAL PRIMARY KEY,
    driver_id INTEGER NOT NULL,
    vehicle_id INTEGER NOT NULL,
    route_date DATE NOT NULL,

    -- Route geometry as LineString
    route_path GEOMETRY(LINESTRING, 4326),
    planned_stops GEOMETRY(MULTIPOINT, 4326),

    -- Route metrics
    estimated_distance_km NUMERIC(10,3),
    estimated_duration_minutes INTEGER,
    actual_distance_km NUMERIC(10,3),
    actual_duration_minutes INTEGER,

    -- Route status
    status VARCHAR(20) DEFAULT 'planned', -- 'planned', 'in_progress', 'completed', 'cancelled'
    started_at TIMESTAMP WITH TIME ZONE,
    completed_at TIMESTAMP WITH TIME ZONE
);

CREATE INDEX idx_delivery_route_path_gist ON delivery_routes 
    USING GIST (route_path);

-- Complex proximity search with PostGIS functions
WITH nearby_businesses AS (
    SELECT 
        bl.business_id,
        bl.business_name,
        bl.category,
        bl.address,
        bl.rating,
        bl.price_level,

        -- Spatial calculations using PostGIS functions
        ST_Distance(
            bl.location::geography, 
            ST_SetSRID(ST_MakePoint(-122.4194, 37.7749), 4326)::geography
        ) as distance_meters,

        -- Convert geometry to GeoJSON for application consumption
        ST_AsGeoJSON(bl.location) as location_geojson,

        -- Additional spatial analysis
        ST_Within(
            ST_SetSRID(ST_MakePoint(-122.4194, 37.7749), 4326),
            bl.service_area
        ) as within_service_area,

        -- Bearing calculation
        ST_Azimuth(
            bl.location,
            ST_SetSRID(ST_MakePoint(-122.4194, 37.7749), 4326)
        ) * 180 / PI() as bearing_degrees

    FROM business_locations bl
    WHERE 
        bl.is_active = true

        -- Spatial filter using bounding box for initial filtering
        AND ST_DWithin(
            bl.location::geography,
            ST_SetSRID(ST_MakePoint(-122.4194, 37.7749), 4326)::geography,
            5000  -- 5km radius
        )
),

ranked_results AS (
    SELECT 
        nb.*,

        -- Complex scoring algorithm
        (
            -- Distance component (closer is better)
            (1.0 - (distance_meters / 5000.0)) * 0.4 +

            -- Rating component
            (COALESCE(rating, 0) / 5.0) * 0.3 +

            -- Service area bonus
            CASE WHEN within_service_area THEN 0.2 ELSE 0 END +

            -- Category relevance (hardcoded for example)
            CASE 
                WHEN category = 'restaurant' THEN 0.1
                WHEN category = 'retail' THEN 0.05
                ELSE 0
            END
        ) as relevance_score,

        -- Categorize distance for user display
        CASE 
            WHEN distance_meters <= 500 THEN 'Very Close'
            WHEN distance_meters <= 1000 THEN 'Walking Distance'
            WHEN distance_meters <= 2000 THEN 'Short Drive'
            ELSE 'Moderate Distance'
        END as distance_category

    FROM nearby_businesses nb
)

SELECT 
    business_id,
    business_name,
    category,
    address,
    ROUND(distance_meters::numeric, 0) as distance_meters,
    distance_category,
    rating,
    price_level,
    ROUND(relevance_score::numeric, 3) as relevance_score,
    ROUND(bearing_degrees::numeric, 1) as bearing_from_user,
    within_service_area,
    location_geojson

FROM ranked_results
WHERE distance_meters <= 5000  -- 5km maximum distance
ORDER BY relevance_score DESC, distance_meters ASC
LIMIT 20;

-- Problems with PostGIS approach:
-- 1. Complex extension setup and maintenance requirements
-- 2. Specialized spatial syntax different from standard SQL
-- 3. Performance challenges with complex spatial calculations
-- 4. Limited integration with application development workflows
-- 5. Complex data type management and coordinate system handling
-- 6. Difficult debugging and query optimization for spatial operations
-- 7. Expensive licensing and infrastructure requirements for enterprise features
-- 8. Limited support for modern GeoJSON standards and web mapping libraries
-- 9. Complex backup and replication handling for spatial indexes
-- 10. Steep learning curve for developers without GIS background

MongoDB provides native, high-performance geospatial capabilities:

// MongoDB native geospatial operations - powerful and developer-friendly
const { MongoClient } = require('mongodb');

const client = new MongoClient('mongodb://localhost:27017');
const db = client.db('geospatial_app');

// Advanced Geospatial Application Manager
class MongoGeospatialManager {
  constructor(db) {
    this.db = db;
    this.collections = new Map();
    this.spatialIndexes = new Map();
    this.geoQueryCache = new Map();
  }

  async initializeGeospatialCollections() {
    console.log('Initializing geospatial collections with spatial indexes...');

    // Business locations collection
    const businessCollection = this.db.collection('business_locations');
    await this.createBusinessLocationIndexes(businessCollection);

    // Customer tracking collection
    const customerCollection = this.db.collection('customer_locations');
    await this.createCustomerLocationIndexes(customerCollection);

    // Delivery routes collection
    const routesCollection = this.db.collection('delivery_routes');
    await this.createDeliveryRouteIndexes(routesCollection);

    // Geofences and zones collection
    const geofencesCollection = this.db.collection('geofences');
    await this.createGeofenceIndexes(geofencesCollection);

    this.collections.set('businesses', businessCollection);
    this.collections.set('customers', customerCollection);
    this.collections.set('routes', routesCollection);
    this.collections.set('geofences', geofencesCollection);

    console.log('✅ Geospatial collections initialized with optimized indexes');
    return this.collections;
  }

  async createBusinessLocationIndexes(collection) {
    console.log('Creating business location spatial indexes...');

    // 2dsphere index for location-based queries (GeoJSON format)
    await collection.createIndexes([
      {
        key: { "location": "2dsphere" },
        name: "idx_business_location_2dsphere",
        background: true
      },
      {
        key: { "service_area": "2dsphere" },
        name: "idx_business_service_area_2dsphere", 
        background: true
      },
      {
        key: { "category": 1, "location": "2dsphere" },
        name: "idx_category_location_compound",
        background: true
      },
      {
        key: { "rating": -1, "location": "2dsphere" },
        name: "idx_rating_location_compound",
        background: true
      },
      {
        key: { "price_level": 1, "category": 1, "location": "2dsphere" },
        name: "idx_price_category_location_compound",
        background: true
      }
    ]);

    console.log('✅ Business location indexes created');
  }

  async createCustomerLocationIndexes(collection) {
    console.log('Creating customer location tracking indexes...');

    await collection.createIndexes([
      {
        key: { "location": "2dsphere" },
        name: "idx_customer_location_2dsphere",
        background: true
      },
      {
        key: { "customer_id": 1, "location_timestamp": -1 },
        name: "idx_customer_timeline",
        background: true
      },
      {
        key: { "location_timestamp": -1, "location": "2dsphere" },
        name: "idx_timeline_location_compound",
        background: true
      }
    ]);

    console.log('✅ Customer location indexes created');
  }

  async createDeliveryRouteIndexes(collection) {
    console.log('Creating delivery route spatial indexes...');

    await collection.createIndexes([
      {
        key: { "route_path": "2dsphere" },
        name: "idx_route_path_2dsphere",
        background: true
      },
      {
        key: { "planned_stops": "2dsphere" },
        name: "idx_planned_stops_2dsphere",
        background: true
      },
      {
        key: { "driver_id": 1, "route_date": -1 },
        name: "idx_driver_date",
        background: true
      },
      {
        key: { "status": 1, "route_date": -1 },
        name: "idx_status_date",
        background: true
      }
    ]);

    console.log('✅ Delivery route indexes created');
  }

  async createGeofenceIndexes(collection) {
    console.log('Creating geofence spatial indexes...');

    await collection.createIndexes([
      {
        key: { "boundary": "2dsphere" },
        name: "idx_geofence_boundary_2dsphere",
        background: true
      },
      {
        key: { "fence_type": 1, "boundary": "2dsphere" },
        name: "idx_fence_type_boundary_compound",
        background: true
      }
    ]);

    console.log('✅ Geofence indexes created');
  }

  async insertBusinessLocations(businesses) {
    console.log(`Inserting ${businesses.length} business locations...`);

    const businessCollection = this.collections.get('businesses');
    const businessDocuments = businesses.map(business => ({
      business_name: business.name,
      category: business.category,
      address: business.address,

      // GeoJSON Point format for location
      location: {
        type: "Point",
        coordinates: [business.longitude, business.latitude]  // [lng, lat]
      },

      // Optional service area as GeoJSON Polygon
      service_area: business.service_radius ? this.createCirclePolygon(
        [business.longitude, business.latitude], 
        business.service_radius
      ) : null,

      // Business metadata
      contact: {
        phone: business.phone,
        email: business.email,
        website: business.website
      },

      operating_hours: business.hours || {},
      rating: business.rating || 0,
      price_level: business.price_level || 1,

      // Operational data
      is_active: business.is_active !== false,
      created_at: new Date(),
      updated_at: new Date(),

      // Additional location context
      location_metadata: {
        address_components: business.address_components || {},
        geocoding_accuracy: business.geocoding_accuracy || 'high',
        timezone: business.timezone,
        locale: business.locale || 'en-US'
      }
    }));

    const result = await businessCollection.insertMany(businessDocuments, {
      ordered: false
    });

    console.log(`✅ Inserted ${result.insertedCount} business locations`);
    return result;
  }

  async findNearbyBusinesses(userLocation, options = {}) {
    console.log(`Finding nearby businesses around [${userLocation.longitude}, ${userLocation.latitude}]...`);

    const {
      maxDistance = 5000,        // 5km default radius
      category = null,
      minRating = 0,
      priceLevel = null,
      limit = 20,
      sortBy = 'distance'        // 'distance', 'rating', 'relevance'
    } = options;

    const businessCollection = this.collections.get('businesses');
    const userPoint = [userLocation.longitude, userLocation.latitude];

    try {
      const searchPipeline = [
        // Stage 1: Geospatial proximity filter
        {
          $geoNear: {
            near: {
              type: "Point",
              coordinates: userPoint
            },
            distanceField: "distance_meters",
            maxDistance: maxDistance,
            spherical: true,
            query: {
              is_active: true,
              ...(category && { category: category }),
              ...(minRating > 0 && { rating: { $gte: minRating } }),
              ...(priceLevel && { price_level: priceLevel })
            }
          }
        },

        // Stage 2: Add computed fields for analysis
        {
          $addFields: {
            // Distance categorization
            distance_category: {
              $switch: {
                branches: [
                  { case: { $lte: ["$distance_meters", 500] }, then: "very_close" },
                  { case: { $lte: ["$distance_meters", 1000] }, then: "walking_distance" },
                  { case: { $lte: ["$distance_meters", 2000] }, then: "short_drive" },
                  { case: { $lte: ["$distance_meters", 5000] }, then: "moderate_distance" }
                ],
                default: "far"
              }
            },

            // Check if user is within business service area
            within_service_area: {
              $cond: {
                if: { $ne: ["$service_area", null] },
                then: {
                  $function: {
                    body: function(serviceArea, userPoint) {
                      // Simple point-in-polygon check (simplified for example)
                      return serviceArea != null;
                    },
                    args: ["$service_area", userPoint],
                    lang: "js"
                  }
                },
                else: false
              }
            },

            // Calculate bearing from user to business
            bearing_degrees: {
              $function: {
                body: function(businessCoords, userCoords) {
                  // Calculate bearing using geographic formulas
                  const lat1 = userCoords[1] * Math.PI / 180;
                  const lat2 = businessCoords[1] * Math.PI / 180;
                  const deltaLng = (businessCoords[0] - userCoords[0]) * Math.PI / 180;

                  const y = Math.sin(deltaLng) * Math.cos(lat2);
                  const x = Math.cos(lat1) * Math.sin(lat2) - 
                           Math.sin(lat1) * Math.cos(lat2) * Math.cos(deltaLng);

                  let bearing = Math.atan2(y, x) * 180 / Math.PI;
                  return (bearing + 360) % 360;
                },
                args: ["$location.coordinates", userPoint],
                lang: "js"
              }
            },

            // Relevance score calculation
            relevance_score: {
              $add: [
                // Distance component (closer is better) - 40% weight
                {
                  $multiply: [
                    { $subtract: [1, { $divide: ["$distance_meters", maxDistance] }] },
                    0.4
                  ]
                },

                // Rating component - 30% weight
                { $multiply: [{ $divide: [{ $ifNull: ["$rating", 0] }, 5] }, 0.3] },

                // Service area bonus - 20% weight
                { $cond: ["$within_service_area", 0.2, 0] },

                // Category relevance bonus - 10% weight
                {
                  $switch: {
                    branches: [
                      { case: { $eq: ["$category", "restaurant"] }, then: 0.1 },
                      { case: { $eq: ["$category", "retail"] }, then: 0.05 }
                    ],
                    default: 0
                  }
                }
              ]
            }
          }
        },

        // Stage 3: Add directional information
        {
          $addFields: {
            direction_compass: {
              $switch: {
                branches: [
                  { case: { $and: [{ $gte: ["$bearing_degrees", 337.5] }, { $lt: ["$bearing_degrees", 22.5] }] }, then: "N" },
                  { case: { $and: [{ $gte: ["$bearing_degrees", 22.5] }, { $lt: ["$bearing_degrees", 67.5] }] }, then: "NE" },
                  { case: { $and: [{ $gte: ["$bearing_degrees", 67.5] }, { $lt: ["$bearing_degrees", 112.5] }] }, then: "E" },
                  { case: { $and: [{ $gte: ["$bearing_degrees", 112.5] }, { $lt: ["$bearing_degrees", 157.5] }] }, then: "SE" },
                  { case: { $and: [{ $gte: ["$bearing_degrees", 157.5] }, { $lt: ["$bearing_degrees", 202.5] }] }, then: "S" },
                  { case: { $and: [{ $gte: ["$bearing_degrees", 202.5] }, { $lt: ["$bearing_degrees", 247.5] }] }, then: "SW" },
                  { case: { $and: [{ $gte: ["$bearing_degrees", 247.5] }, { $lt: ["$bearing_degrees", 292.5] }] }, then: "W" },
                  { case: { $and: [{ $gte: ["$bearing_degrees", 292.5] }, { $lt: ["$bearing_degrees", 337.5] }] }, then: "NW" }
                ],
                default: "N"
              }
            },

            // Human-readable distance
            distance_display: {
              $switch: {
                branches: [
                  { 
                    case: { $lt: ["$distance_meters", 1000] },
                    then: { $concat: [{ $toString: { $round: ["$distance_meters", 0] } }, "m"] }
                  },
                  {
                    case: { $lt: ["$distance_meters", 10000] },
                    then: { $concat: [{ $toString: { $round: [{ $divide: ["$distance_meters", 1000] }, 1] } }, "km"] }
                  }
                ],
                default: { $concat: [{ $toString: { $round: [{ $divide: ["$distance_meters", 1000] }, 0] } }, "km"] }
              }
            }
          }
        },

        // Stage 4: Sort based on user preference
        {
          $sort: sortBy === 'rating' ? { rating: -1, distance_meters: 1 } :
                 sortBy === 'relevance' ? { relevance_score: -1, distance_meters: 1 } :
                 { distance_meters: 1 }
        },

        // Stage 5: Limit results
        { $limit: limit },

        // Stage 6: Project final output
        {
          $project: {
            business_name: 1,
            category: 1,
            address: 1,
            location: 1,
            contact: 1,
            rating: 1,
            price_level: 1,

            // Calculated fields
            distance_meters: { $round: ["$distance_meters", 0] },
            distance_display: 1,
            distance_category: 1,
            bearing_degrees: { $round: ["$bearing_degrees", 1] },
            direction_compass: 1,
            relevance_score: { $round: ["$relevance_score", 3] },
            within_service_area: 1,

            // Metadata
            created_at: 1,
            location_metadata: 1
          }
        }
      ];

      const startTime = Date.now();
      const nearbyBusinesses = await businessCollection.aggregate(searchPipeline, {
        allowDiskUse: true
      }).toArray();
      const queryTime = Date.now() - startTime;

      console.log(`✅ Found ${nearbyBusinesses.length} nearby businesses in ${queryTime}ms`);

      return {
        user_location: userLocation,
        search_params: options,
        query_time_ms: queryTime,
        total_results: nearbyBusinesses.length,
        businesses: nearbyBusinesses
      };

    } catch (error) {
      console.error('Error finding nearby businesses:', error);
      throw error;
    }
  }

  async implementGeofencingSystem() {
    console.log('Setting up advanced geofencing system...');

    const geofencesCollection = this.collections.get('geofences');

    // Create various types of geofences
    const sampleGeofences = [
      {
        fence_id: 'delivery_zone_downtown',
        fence_name: 'Downtown Delivery Zone',
        fence_type: 'delivery_boundary',

        // GeoJSON Polygon for complex delivery zone
        boundary: {
          type: "Polygon",
          coordinates: [[
            [-122.4194, 37.7749],  // San Francisco downtown area
            [-122.4094, 37.7849],
            [-122.3994, 37.7849],
            [-122.3994, 37.7649],
            [-122.4194, 37.7649],
            [-122.4194, 37.7749]   // Close the polygon
          ]]
        },

        // Geofence properties
        properties: {
          delivery_fee: 2.99,
          estimated_delivery_time: 30,
          service_level: 'premium',
          operating_hours: {
            monday: { start: '08:00', end: '22:00' },
            tuesday: { start: '08:00', end: '22:00' },
            wednesday: { start: '08:00', end: '22:00' },
            thursday: { start: '08:00', end: '22:00' },
            friday: { start: '08:00', end: '23:00' },
            saturday: { start: '09:00', end: '23:00' },
            sunday: { start: '10:00', end: '21:00' }
          }
        },

        is_active: true,
        created_at: new Date()
      },

      {
        fence_id: 'high_demand_area_financial',
        fence_name: 'Financial District High Demand Zone',
        fence_type: 'pricing_zone',

        // Circular geofence using buffered point
        boundary: {
          type: "Polygon",
          coordinates: [this.createCirclePolygon([-122.4000, 37.7900], 1000).coordinates[0]]
        },

        properties: {
          surge_multiplier: 1.5,
          priority_processing: true,
          rush_hour_bonus: true
        },

        is_active: true,
        created_at: new Date()
      }
    ];

    await geofencesCollection.insertMany(sampleGeofences);
    console.log('✅ Geofencing system configured with sample zones');
  }

  async checkGeofenceEntries(location, customer_id) {
    console.log(`Checking geofence entries for customer ${customer_id}...`);

    const geofencesCollection = this.collections.get('geofences');
    const point = [location.longitude, location.latitude];

    try {
      // Find all geofences containing the location
      const containingGeofences = await geofencesCollection.find({
        is_active: true,
        boundary: {
          $geoIntersects: {
            $geometry: {
              type: "Point",
              coordinates: point
            }
          }
        }
      }).toArray();

      const geofenceEvents = [];

      for (const geofence of containingGeofences) {
        // Check if this is a new entry (simplified logic)
        const event = {
          customer_id: customer_id,
          geofence_id: geofence.fence_id,
          geofence_name: geofence.fence_name,
          fence_type: geofence.fence_type,
          event_type: 'entry',
          event_timestamp: new Date(),
          location: {
            type: "Point",
            coordinates: point
          },
          properties: geofence.properties
        };

        geofenceEvents.push(event);

        // Trigger appropriate business logic based on geofence type
        await this.handleGeofenceEvent(event);
      }

      console.log(`✅ Processed ${geofenceEvents.length} geofence events`);
      return geofenceEvents;

    } catch (error) {
      console.error('Error checking geofence entries:', error);
      throw error;
    }
  }

  async handleGeofenceEvent(event) {
    console.log(`Handling geofence event: ${event.event_type} for ${event.geofence_name}`);

    // Store geofence event
    await this.db.collection('geofence_events').insertOne(event);

    // Business logic based on geofence type
    switch (event.fence_type) {
      case 'delivery_boundary':
        await this.handleDeliveryZoneEntry(event);
        break;
      case 'pricing_zone':
        await this.handlePricingZoneEntry(event);
        break;
      default:
        console.log(`No specific handler for fence type: ${event.fence_type}`);
    }
  }

  async handleDeliveryZoneEntry(event) {
    console.log(`Customer entered delivery zone: ${event.geofence_name}`);

    // Update customer delivery preferences
    await this.db.collection('customer_profiles').updateOne(
      { customer_id: event.customer_id },
      {
        $set: {
          current_delivery_zone: event.geofence_id,
          delivery_fee: event.properties.delivery_fee,
          estimated_delivery_time: event.properties.estimated_delivery_time
        },
        $push: {
          zone_history: {
            zone_id: event.geofence_id,
            entered_at: event.event_timestamp,
            properties: event.properties
          }
        }
      },
      { upsert: true }
    );
  }

  async handlePricingZoneEntry(event) {
    console.log(`Customer entered high-demand pricing zone: ${event.geofence_name}`);

    // Apply dynamic pricing
    await this.db.collection('pricing_adjustments').insertOne({
      customer_id: event.customer_id,
      zone_id: event.geofence_id,
      surge_multiplier: event.properties.surge_multiplier,
      applied_at: event.event_timestamp,
      expires_at: new Date(Date.now() + 30 * 60 * 1000) // 30 minutes
    });
  }

  async optimizeDeliveryRoutes(deliveries, startLocation) {
    console.log(`Optimizing delivery route for ${deliveries.length} stops...`);

    const routesCollection = this.collections.get('routes');

    try {
      // Simple nearest-neighbor route optimization
      let currentLocation = startLocation;
      const optimizedRoute = [];
      const remainingDeliveries = [...deliveries];

      while (remainingDeliveries.length > 0) {
        // Find nearest delivery location
        let nearestIndex = 0;
        let shortestDistance = Number.MAX_VALUE;

        for (let i = 0; i < remainingDeliveries.length; i++) {
          const delivery = remainingDeliveries[i];
          const distance = this.calculateDistance(
            currentLocation,
            delivery.location.coordinates
          );

          if (distance < shortestDistance) {
            shortestDistance = distance;
            nearestIndex = i;
          }
        }

        // Add nearest delivery to route
        const nextDelivery = remainingDeliveries.splice(nearestIndex, 1)[0];
        optimizedRoute.push({
          ...nextDelivery,
          distance_from_previous: shortestDistance,
          estimated_travel_time: Math.ceil(shortestDistance / 30 * 60) // Assume 30 km/h average
        });

        currentLocation = nextDelivery.location.coordinates;
      }

      // Calculate total route metrics
      const totalDistance = optimizedRoute.reduce((sum, stop) => sum + stop.distance_from_previous, 0);
      const totalTime = optimizedRoute.reduce((sum, stop) => sum + stop.estimated_travel_time, 0);

      // Create route path as LineString
      const routePath = {
        type: "LineString",
        coordinates: [
          [startLocation[0], startLocation[1]], // Start point
          ...optimizedRoute.map(stop => stop.location.coordinates)
        ]
      };

      // Store optimized route
      const routeDocument = {
        route_id: `route_${Date.now()}`,
        driver_id: null, // To be assigned
        vehicle_id: null, // To be assigned
        route_date: new Date(),

        route_path: routePath,
        planned_stops: {
          type: "MultiPoint",
          coordinates: optimizedRoute.map(stop => stop.location.coordinates)
        },

        deliveries: optimizedRoute,

        metrics: {
          total_distance_km: Math.round(totalDistance / 1000 * 100) / 100,
          estimated_duration_minutes: totalTime,
          stop_count: optimizedRoute.length,
          optimization_algorithm: 'nearest_neighbor'
        },

        status: 'planned',
        created_at: new Date()
      };

      const result = await routesCollection.insertOne(routeDocument);

      console.log(`✅ Route optimized: ${optimizedRoute.length} stops, ${Math.round(totalDistance/1000*10)/10}km, ${totalTime}min`);

      return {
        route_id: result.insertedId,
        optimized_route: optimizedRoute,
        route_path: routePath,
        metrics: routeDocument.metrics
      };

    } catch (error) {
      console.error('Error optimizing delivery route:', error);
      throw error;
    }
  }

  async performSpatialAnalytics(analysisType, parameters = {}) {
    console.log(`Performing spatial analysis: ${analysisType}`);

    const businessCollection = this.collections.get('businesses');

    try {
      switch (analysisType) {
        case 'density_analysis':
          return await this.performDensityAnalysis(parameters);
        case 'coverage_analysis':
          return await this.performCoverageAnalysis(parameters);
        case 'competition_analysis':
          return await this.performCompetitionAnalysis(parameters);
        default:
          throw new Error(`Unknown analysis type: ${analysisType}`);
      }
    } catch (error) {
      console.error(`Error performing ${analysisType}:`, error);
      throw error;
    }
  }

  async performDensityAnalysis(parameters) {
    const { 
      center, 
      radius = 5000, 
      gridSize = 1000,
      category = null 
    } = parameters;

    const businessCollection = this.collections.get('businesses');

    // Create analysis grid around center point
    const densityPipeline = [
      // Find businesses within analysis area
      {
        $geoNear: {
          near: {
            type: "Point",
            coordinates: [center.longitude, center.latitude]
          },
          distanceField: "distance",
          maxDistance: radius,
          spherical: true,
          query: {
            is_active: true,
            ...(category && { category: category })
          }
        }
      },

      // Group businesses into grid cells
      {
        $group: {
          _id: {
            // Simple grid cell calculation
            grid_x: {
              $floor: {
                $divide: [
                  { $multiply: [
                    { $subtract: [{ $arrayElemAt: ["$location.coordinates", 0] }, center.longitude] },
                    111320  // Approximate meters per degree longitude
                  ]},
                  gridSize
                ]
              }
            },
            grid_y: {
              $floor: {
                $divide: [
                  { $multiply: [
                    { $subtract: [{ $arrayElemAt: ["$location.coordinates", 1] }, center.latitude] },
                    110540  // Approximate meters per degree latitude
                  ]},
                  gridSize
                ]
              }
            }
          },
          business_count: { $sum: 1 },
          avg_rating: { $avg: "$rating" },
          business_types: { $addToSet: "$category" },
          businesses: { $push: {
            name: "$business_name",
            rating: "$rating",
            location: "$location"
          }}
        }
      },

      // Calculate density metrics
      {
        $addFields: {
          density_per_km2: {
            $multiply: [
              "$business_count",
              { $divide: [1000000, { $multiply: [gridSize, gridSize] }] }
            ]
          },
          diversity_index: { $size: "$business_types" }
        }
      },

      // Sort by density
      {
        $sort: { business_count: -1 }
      }
    ];

    const densityResults = await businessCollection.aggregate(densityPipeline).toArray();

    return {
      analysis_type: 'density_analysis',
      parameters: parameters,
      grid_size_meters: gridSize,
      total_grid_cells: densityResults.length,
      density_results: densityResults
    };
  }

  // Utility methods
  createCirclePolygon(center, radiusMeters) {
    const points = 64; // Number of points in circle
    const coordinates = [];

    for (let i = 0; i <= points; i++) {
      const angle = (i * 2 * Math.PI) / points;
      const dx = radiusMeters * Math.cos(angle);
      const dy = radiusMeters * Math.sin(angle);

      // Convert meters to degrees (approximate)
      const deltaLat = dy / 110540;
      const deltaLng = dx / (111320 * Math.cos(center[1] * Math.PI / 180));

      coordinates.push([
        center[0] + deltaLng,
        center[1] + deltaLat
      ]);
    }

    return {
      type: "Polygon",
      coordinates: [coordinates]
    };
  }

  calculateDistance(point1, point2) {
    // Haversine formula for calculating distance between two points
    const R = 6371e3; // Earth's radius in meters
    const lat1 = point1[1] * Math.PI / 180;
    const lat2 = point2[1] * Math.PI / 180;
    const deltaLat = (point2[1] - point1[1]) * Math.PI / 180;
    const deltaLng = (point2[0] - point1[0]) * Math.PI / 180;

    const a = Math.sin(deltaLat/2) * Math.sin(deltaLat/2) +
              Math.cos(lat1) * Math.cos(lat2) *
              Math.sin(deltaLng/2) * Math.sin(deltaLng/2);
    const c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1-a));

    return R * c;
  }

  async generateSpatialReport() {
    console.log('Generating comprehensive spatial analytics report...');

    const report = {
      generated_at: new Date(),
      collections: {}
    };

    const collectionNames = ['business_locations', 'customer_locations', 'delivery_routes', 'geofences'];

    for (const collectionName of collectionNames) {
      try {
        const collection = this.db.collection(collectionName);

        // Get basic collection statistics
        const stats = await this.db.runCommand({ collStats: collectionName });

        // Get spatial index statistics
        const indexes = await collection.listIndexes().toArray();
        const spatialIndexes = indexes.filter(idx => 
          Object.values(idx.key || {}).includes('2dsphere') || 
          Object.values(idx.key || {}).includes('2d')
        );

        // Get document count and sample
        const documentCount = await collection.countDocuments();
        const sampleDocs = await collection.find({}).limit(3).toArray();

        report.collections[collectionName] = {
          document_count: documentCount,
          storage_size: stats.storageSize,
          avg_document_size: stats.avgObjSize,
          spatial_indexes: spatialIndexes.length,
          spatial_index_details: spatialIndexes.map(idx => ({
            name: idx.name,
            key: idx.key,
            sparse: idx.sparse || false
          })),
          sample_documents: sampleDocs.map(doc => {
            // Remove sensitive data for reporting
            const { _id, location, ...metadata } = doc;
            return { location, metadata: Object.keys(metadata) };
          })
        };
      } catch (error) {
        report.collections[collectionName] = { error: error.message };
      }
    }

    return report;
  }

  async shutdown() {
    console.log('Shutting down geospatial manager...');
    await this.client.close();
    console.log('Geospatial manager shutdown completed');
  }
}

// Export the geospatial manager
module.exports = { MongoGeospatialManager };

// MongoDB Geospatial Benefits:
// - Native GeoJSON support with industry-standard spatial data formats
// - High-performance 2dsphere indexes optimized for spherical geometry calculations
// - Comprehensive spatial query operators for proximity, intersection, and containment
// - Efficient geospatial aggregation pipelines for spatial analytics
// - Built-in support for complex geometries: Point, LineString, Polygon, MultiPolygon
// - Real-time geofencing capabilities with change streams integration
// - Seamless integration with mapping libraries and GIS applications
// - SQL-compatible spatial operations through QueryLeaf integration
// - Automatic spatial index optimization for query performance
// - Scalable architecture supporting massive location datasets

Understanding MongoDB Geospatial Architecture

Advanced Location-Based Query Patterns

MongoDB's geospatial capabilities enable sophisticated location-based application patterns:

// Advanced geospatial query patterns for real-world applications
class AdvancedGeospatialQueries {
  constructor(db) {
    this.db = db;
    this.queryCache = new Map();
  }

  async implementAdvancedSpatialQueries() {
    console.log('Demonstrating advanced geospatial query patterns...');

    // Pattern 1: Multi-criteria proximity search
    await this.multiCriteriaProximitySearch();

    // Pattern 2: Route intersection analysis
    await this.routeIntersectionAnalysis();

    // Pattern 3: Spatial clustering and heat map generation
    await this.spatialClusteringAnalysis();

    // Pattern 4: Dynamic geofence management
    await this.dynamicGeofenceManagement();

    console.log('Advanced geospatial patterns demonstrated');
  }

  async multiCriteriaProximitySearch() {
    console.log('Performing multi-criteria proximity search...');

    const businessCollection = this.db.collection('business_locations');

    // Complex search combining multiple spatial and business criteria
    const complexSearchPipeline = [
      {
        $geoNear: {
          near: {
            type: "Point",
            coordinates: [-122.4194, 37.7749] // San Francisco
          },
          distanceField: "distance_meters",
          maxDistance: 3000,
          spherical: true,
          query: {
            is_active: true,
            rating: { $gte: 4.0 }
          }
        }
      },

      // Add time-based availability filtering
      {
        $addFields: {
          is_currently_open: {
            $function: {
              body: function(operatingHours) {
                const now = new Date();
                const currentDay = ['sunday', 'monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday'][now.getDay()];
                const currentTime = now.getHours() * 100 + now.getMinutes();

                if (!operatingHours || !operatingHours[currentDay]) {
                  return false;
                }

                const dayHours = operatingHours[currentDay];
                const startTime = parseInt(dayHours.start.replace(':', ''));
                const endTime = parseInt(dayHours.end.replace(':', ''));

                return currentTime >= startTime && currentTime <= endTime;
              },
              args: ["$operating_hours"],
              lang: "js"
            }
          }
        }
      },

      // Service area intersection check
      {
        $addFields: {
          provides_delivery_to_user: {
            $cond: {
              if: { $ne: ["$service_area", null] },
              then: {
                $function: {
                  body: function(serviceArea, userLocation) {
                    // Simplified point-in-polygon check
                    // In production, use more sophisticated algorithms
                    return serviceArea != null;
                  },
                  args: ["$service_area", [-122.4194, 37.7749]],
                  lang: "js"
                }
              },
              else: false
            }
          }
        }
      },

      // Calculate composite score
      {
        $addFields: {
          composite_score: {
            $add: [
              // Distance component (40%)
              { $multiply: [
                { $subtract: [1, { $divide: ["$distance_meters", 3000] }] },
                0.4
              ]},

              // Rating component (30%)
              { $multiply: [{ $divide: ["$rating", 5] }, 0.3] },

              // Current availability bonus (20%)
              { $cond: ["$is_currently_open", 0.2, 0] },

              // Delivery service bonus (10%)
              { $cond: ["$provides_delivery_to_user", 0.1, 0] }
            ]
          }
        }
      },

      // Filter and sort by composite score
      {
        $match: {
          composite_score: { $gte: 0.5 } // Minimum quality threshold
        }
      },

      {
        $sort: { composite_score: -1, distance_meters: 1 }
      },

      { $limit: 15 },

      {
        $project: {
          business_name: 1,
          category: 1,
          rating: 1,
          distance_meters: { $round: ["$distance_meters", 0] },
          is_currently_open: 1,
          provides_delivery_to_user: 1,
          composite_score: { $round: ["$composite_score", 3] },
          location: 1
        }
      }
    ];

    const results = await businessCollection.aggregate(complexSearchPipeline).toArray();
    console.log(`✅ Found ${results.length} businesses matching complex criteria`);
    return results;
  }

  async routeIntersectionAnalysis() {
    console.log('Analyzing route intersections with geofences...');

    const routesCollection = this.db.collection('delivery_routes');
    const geofencesCollection = this.db.collection('geofences');

    // Find routes that intersect with specific geofences
    const intersectionPipeline = [
      {
        $match: {
          status: 'in_progress',
          route_path: { $exists: true }
        }
      },

      // Lookup intersecting geofences
      {
        $lookup: {
          from: 'geofences',
          let: { route_path: '$route_path' },
          pipeline: [
            {
              $match: {
                $expr: {
                  $and: [
                    { $eq: ['$is_active', true] },
                    {
                      $function: {
                        body: function(geofenceBoundary, routePath) {
                          // Simplified intersection logic
                          // In production, use proper geometric intersection algorithms
                          return geofenceBoundary && routePath;
                        },
                        args: ['$boundary', '$$route_path'],
                        lang: 'js'
                      }
                    }
                  ]
                }
              }
            }
          ],
          as: 'intersecting_geofences'
        }
      },

      // Filter routes with intersections
      {
        $match: {
          'intersecting_geofences.0': { $exists: true }
        }
      },

      // Calculate intersection impact
      {
        $addFields: {
          intersection_analysis: {
            $map: {
              input: '$intersecting_geofences',
              as: 'geofence',
              in: {
                fence_id: '$$geofence.fence_id',
                fence_type: '$$geofence.fence_type',
                impact_type: {
                  $switch: {
                    branches: [
                      { case: { $eq: ['$$geofence.fence_type', 'pricing_zone'] }, then: 'cost_increase' },
                      { case: { $eq: ['$$geofence.fence_type', 'restricted_zone'] }, then: 'route_restriction' },
                      { case: { $eq: ['$$geofence.fence_type', 'priority_zone'] }, then: 'priority_handling' }
                    ],
                    default: 'monitoring'
                  }
                },
                properties: '$$geofence.properties'
              }
            }
          }
        }
      },

      {
        $project: {
          route_id: 1,
          driver_id: 1,
          route_date: 1,
          status: 1,
          intersection_count: { $size: '$intersecting_geofences' },
          intersection_analysis: 1,
          estimated_impact: {
            $reduce: {
              input: '$intersection_analysis',
              initialValue: { cost_multiplier: 1.0, priority_boost: 0 },
              in: {
                cost_multiplier: {
                  $cond: [
                    { $eq: ['$$this.impact_type', 'cost_increase'] },
                    { $multiply: ['$$value.cost_multiplier', '$$this.properties.surge_multiplier'] },
                    '$$value.cost_multiplier'
                  ]
                },
                priority_boost: {
                  $cond: [
                    { $eq: ['$$this.impact_type', 'priority_handling'] },
                    { $add: ['$$value.priority_boost', 1] },
                    '$$value.priority_boost'
                  ]
                }
              }
            }
          }
        }
      }
    ];

    const intersectionResults = await routesCollection.aggregate(intersectionPipeline).toArray();
    console.log(`✅ Analyzed ${intersectionResults.length} routes with geofence intersections`);
    return intersectionResults;
  }

  async spatialClusteringAnalysis() {
    console.log('Performing spatial clustering analysis...');

    const businessCollection = this.db.collection('business_locations');

    // Density-based clustering for business locations
    const clusteringPipeline = [
      {
        $match: {
          is_active: true,
          location: { $exists: true }
        }
      },

      // Create spatial grid for clustering
      {
        $addFields: {
          grid_cell: {
            x: {
              $floor: {
                $multiply: [
                  { $arrayElemAt: ['$location.coordinates', 0] },
                  1000  // Grid precision
                ]
              }
            },
            y: {
              $floor: {
                $multiply: [
                  { $arrayElemAt: ['$location.coordinates', 1] },
                  1000  // Grid precision
                ]
              }
            }
          }
        }
      },

      // Group by grid cells
      {
        $group: {
          _id: '$grid_cell',
          business_count: { $sum: 1 },
          categories: { $addToSet: '$category' },
          avg_rating: { $avg: '$rating' },
          businesses: { $push: {
            business_id: '$_id',
            business_name: '$business_name',
            category: '$category',
            location: '$location',
            rating: '$rating'
          }},

          // Calculate cluster center
          center_longitude: { $avg: { $arrayElemAt: ['$location.coordinates', 0] } },
          center_latitude: { $avg: { $arrayElemAt: ['$location.coordinates', 1] } }
        }
      },

      // Filter significant clusters
      {
        $match: {
          business_count: { $gte: 3 }  // Minimum cluster size
        }
      },

      // Add cluster analysis
      {
        $addFields: {
          cluster_center: {
            type: 'Point',
            coordinates: ['$center_longitude', '$center_latitude']
          },
          diversity_index: { $size: '$categories' },
          cluster_density: '$business_count', // Simplified density metric

          cluster_characteristics: {
            $switch: {
              branches: [
                {
                  case: { $gte: ['$business_count', 10] },
                  then: 'high_density_commercial'
                },
                {
                  case: { $and: [
                    { $gte: ['$business_count', 5] },
                    { $gte: ['$diversity_index', 4] }
                  ]},
                  then: 'diverse_business_district'
                },
                {
                  case: { $eq: [{ $size: '$categories' }, 1] },
                  then: 'specialized_cluster'
                }
              ],
              default: 'mixed_commercial'
            }
          }
        }
      },

      // Sort by cluster significance
      {
        $sort: { business_count: -1, diversity_index: -1 }
      }
    ];

    const clusterResults = await businessCollection.aggregate(clusteringPipeline).toArray();

    // Generate heat map data
    const heatMapData = clusterResults.map(cluster => ({
      lat: cluster.center_latitude,
      lng: cluster.center_longitude,
      intensity: Math.min(cluster.business_count / 10, 1), // Normalized intensity
      business_count: cluster.business_count,
      characteristics: cluster.cluster_characteristics
    }));

    console.log(`✅ Identified ${clusterResults.length} business clusters`);
    return {
      clusters: clusterResults,
      heat_map_data: heatMapData
    };
  }

  async dynamicGeofenceManagement() {
    console.log('Implementing dynamic geofence management...');

    const geofencesCollection = this.db.collection('geofences');
    const eventsCollection = this.db.collection('geofence_events');

    // Analyze geofence performance and adjust boundaries
    const performancePipeline = [
      {
        $match: {
          event_timestamp: { 
            $gte: new Date(Date.now() - 24 * 60 * 60 * 1000) // Last 24 hours
          }
        }
      },

      // Group by geofence
      {
        $group: {
          _id: '$geofence_id',
          total_events: { $sum: 1 },
          unique_customers: { $addToSet: '$customer_id' },
          event_types: { $addToSet: '$event_type' },
          avg_dwell_time: { $avg: '$dwell_time_minutes' },

          // Collect event locations for boundary analysis
          event_locations: { $push: '$location' },

          latest_properties: { $last: '$properties' }
        }
      },

      // Calculate performance metrics
      {
        $addFields: {
          unique_customer_count: { $size: '$unique_customers' },
          event_rate_per_hour: { $divide: ['$total_events', 24] },

          // Analyze spatial distribution of events
          boundary_efficiency: {
            $function: {
              body: function(eventLocations) {
                // Simplified efficiency calculation
                // In production, analyze point distribution within geofence
                return eventLocations.length > 10 ? 0.8 : 0.6;
              },
              args: ['$event_locations'],
              lang: 'js'
            }
          }
        }
      },

      // Identify geofences needing adjustment
      {
        $addFields: {
          needs_adjustment: {
            $or: [
              { $lt: ['$boundary_efficiency', 0.7] },
              { $lt: ['$event_rate_per_hour', 1] },
              { $gt: ['$event_rate_per_hour', 20] }
            ]
          },

          adjustment_type: {
            $switch: {
              branches: [
                { 
                  case: { $lt: ['$event_rate_per_hour', 1] },
                  then: 'expand_boundary'
                },
                {
                  case: { $gt: ['$event_rate_per_hour', 20] },
                  then: 'contract_boundary'
                },
                {
                  case: { $lt: ['$boundary_efficiency', 0.7] },
                  then: 'reshape_boundary'
                }
              ],
              default: 'no_change'
            }
          }
        }
      },

      // Filter geofences that need updates
      {
        $match: {
          needs_adjustment: true
        }
      }
    ];

    const adjustmentCandidates = await eventsCollection.aggregate(performancePipeline).toArray();

    // Apply recommended adjustments
    for (const candidate of adjustmentCandidates) {
      await this.applyGeofenceAdjustment(candidate);
    }

    console.log(`✅ Analyzed ${adjustmentCandidates.length} geofences for dynamic adjustment`);
    return adjustmentCandidates;
  }

  async applyGeofenceAdjustment(adjustmentCandidate) {
    const geofencesCollection = this.db.collection('geofences');
    const geofenceId = adjustmentCandidate._id;

    console.log(`Applying ${adjustmentCandidate.adjustment_type} to geofence ${geofenceId}`);

    // Create adjustment record
    const adjustment = {
      geofence_id: geofenceId,
      adjustment_type: adjustmentCandidate.adjustment_type,
      reason: `Performance optimization - ${adjustmentCandidate.adjustment_type}`,
      applied_at: new Date(),
      previous_metrics: {
        event_rate_per_hour: adjustmentCandidate.event_rate_per_hour,
        boundary_efficiency: adjustmentCandidate.boundary_efficiency,
        unique_customer_count: adjustmentCandidate.unique_customer_count
      }
    };

    // Store adjustment history
    await this.db.collection('geofence_adjustments').insertOne(adjustment);

    // Update geofence properties based on adjustment type
    const updateDoc = {
      $set: {
        last_adjusted: new Date(),
        adjustment_history: adjustment
      }
    };

    switch (adjustmentCandidate.adjustment_type) {
      case 'expand_boundary':
        // Implement boundary expansion logic
        updateDoc.$inc = { 'properties.expansion_factor': 0.1 };
        break;
      case 'contract_boundary':
        // Implement boundary contraction logic
        updateDoc.$inc = { 'properties.contraction_factor': 0.1 };
        break;
      case 'reshape_boundary':
        // Implement boundary reshaping logic
        updateDoc.$set['properties.needs_manual_review'] = true;
        break;
    }

    await geofencesCollection.updateOne(
      { fence_id: geofenceId },
      updateDoc
    );
  }
}

// Export the advanced queries class
module.exports = { AdvancedGeospatialQueries };

SQL-Style Geospatial Operations with QueryLeaf

QueryLeaf enables familiar SQL syntax for MongoDB geospatial operations:

-- QueryLeaf geospatial operations with SQL-familiar syntax

-- Create geospatial table with spatial column
CREATE TABLE business_locations (
  business_id SERIAL PRIMARY KEY,
  business_name VARCHAR(200) NOT NULL,
  category VARCHAR(100) NOT NULL,
  address TEXT NOT NULL,
  location POINT NOT NULL,  -- GeoJSON Point stored as POINT type
  service_area POLYGON,     -- GeoJSON Polygon for service boundaries
  rating DECIMAL(3,2) DEFAULT 0,
  price_level INTEGER CHECK (price_level BETWEEN 1 AND 4),
  is_active BOOLEAN DEFAULT true,
  created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
) WITH (
  spatial_indexes = '{"location": "2dsphere", "service_area": "2dsphere"}'
);

-- Insert geospatial data using standard SQL syntax
INSERT INTO business_locations (
  business_name, category, address, location, service_area, rating, price_level
) VALUES 
  ('Downtown Cafe', 'restaurant', '123 Main St', ST_Point(-122.4194, 37.7749), ST_Buffer(ST_Point(-122.4194, 37.7749), 0.01), 4.5, 2),
  ('Tech Bookstore', 'retail', '456 Tech Ave', ST_Point(-122.4094, 37.7849), ST_Buffer(ST_Point(-122.4094, 37.7849), 0.015), 4.2, 3),
  ('Local Grocery', 'grocery', '789 Local Rd', ST_Point(-122.3994, 37.7649), ST_Buffer(ST_Point(-122.3994, 37.7649), 0.008), 3.8, 1);

-- Proximity-based queries with familiar SQL spatial functions
WITH nearby_businesses AS (
  SELECT 
    business_id,
    business_name,
    category,
    address,
    rating,
    price_level,

    -- Calculate distance using SQL spatial functions
    ST_Distance(location, ST_Point(-122.4150, 37.7750)) as distance_meters,

    -- Check if user location is within service area
    ST_Within(ST_Point(-122.4150, 37.7750), service_area) as within_service_area,

    -- Calculate bearing from user to business
    ST_Azimuth(ST_Point(-122.4150, 37.7750), location) * 180 / PI() as bearing_degrees,

    -- Convert geometry to GeoJSON for application use
    ST_AsGeoJSON(location) as location_geojson

  FROM business_locations
  WHERE 
    is_active = true

    -- Spatial proximity filter (5km radius)
    AND ST_DWithin(location, ST_Point(-122.4150, 37.7750), 5000)
),

scored_results AS (
  SELECT 
    nb.*,

    -- Multi-criteria scoring algorithm
    (
      -- Distance component (40% weight) - closer is better
      (1.0 - (distance_meters / 5000.0)) * 0.4 +

      -- Rating component (30% weight)
      (rating / 5.0) * 0.3 +

      -- Service area coverage bonus (20% weight)
      CASE WHEN within_service_area THEN 0.2 ELSE 0 END +

      -- Category preference bonus (10% weight)
      CASE 
        WHEN category = 'restaurant' THEN 0.1
        WHEN category = 'grocery' THEN 0.05
        ELSE 0
      END
    ) as relevance_score,

    -- Categorize distance for user-friendly display
    CASE 
      WHEN distance_meters <= 500 THEN 'Very Close'
      WHEN distance_meters <= 1000 THEN 'Walking Distance'
      WHEN distance_meters <= 2000 THEN 'Short Drive'
      WHEN distance_meters <= 5000 THEN 'Moderate Distance'
      ELSE 'Far'
    END as distance_category,

    -- Convert bearing to compass direction
    CASE 
      WHEN bearing_degrees >= 337.5 OR bearing_degrees < 22.5 THEN 'North'
      WHEN bearing_degrees >= 22.5 AND bearing_degrees < 67.5 THEN 'Northeast'
      WHEN bearing_degrees >= 67.5 AND bearing_degrees < 112.5 THEN 'East'
      WHEN bearing_degrees >= 112.5 AND bearing_degrees < 157.5 THEN 'Southeast'
      WHEN bearing_degrees >= 157.5 AND bearing_degrees < 202.5 THEN 'South'
      WHEN bearing_degrees >= 202.5 AND bearing_degrees < 247.5 THEN 'Southwest'
      WHEN bearing_degrees >= 247.5 AND bearing_degrees < 292.5 THEN 'West'
      ELSE 'Northwest'
    END as direction_compass

  FROM nearby_businesses nb
)

SELECT 
  business_id,
  business_name,
  category,
  address,
  ROUND(distance_meters::NUMERIC, 0) as distance_meters,
  distance_category,
  direction_compass,
  ROUND(bearing_degrees::NUMERIC, 1) as bearing_degrees,
  rating,
  price_level,
  within_service_area,
  ROUND(relevance_score::NUMERIC, 3) as relevance_score,
  location_geojson

FROM scored_results
WHERE 
  distance_meters <= 5000  -- 5km maximum distance
  AND relevance_score >= 0.3  -- Minimum relevance threshold

ORDER BY relevance_score DESC, distance_meters ASC
LIMIT 20;

-- Geofencing and spatial containment analysis
CREATE TABLE geofences (
  fence_id VARCHAR(50) PRIMARY KEY,
  fence_name VARCHAR(200) NOT NULL,
  fence_type VARCHAR(50) NOT NULL,  -- 'delivery_zone', 'pricing_zone', 'restricted_area'
  boundary POLYGON NOT NULL,
  properties JSONB DEFAULT '{}',
  is_active BOOLEAN DEFAULT true,
  created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
) WITH (
  spatial_indexes = '{"boundary": "2dsphere"}'
);

-- Insert geofence boundaries
INSERT INTO geofences (fence_id, fence_name, fence_type, boundary, properties) VALUES 
  ('downtown_delivery', 'Downtown Delivery Zone', 'delivery_zone', 
   ST_GeomFromGeoJSON('{"type":"Polygon","coordinates":[[[-122.42,-37.77],[-122.40,-37.78],[-122.39,-37.76],[-122.42,-37.77]]]}'),
   '{"delivery_fee": 2.99, "estimated_time": 30}'),
  ('high_demand_pricing', 'Financial District Surge Zone', 'pricing_zone',
   ST_Buffer(ST_Point(-122.4000, 37.7900), 0.01),
   '{"surge_multiplier": 1.5, "peak_hours": ["08:00-10:00", "17:00-19:00"]}');

-- Check which geofences contain a specific location
WITH location_analysis AS (
  SELECT 
    ST_Point(-122.4100, 37.7800) as user_location
),

geofence_containment AS (
  SELECT 
    gf.fence_id,
    gf.fence_name,
    gf.fence_type,
    gf.properties,

    -- Check if user location is within geofence
    ST_Within(la.user_location, gf.boundary) as user_inside_fence,

    -- Calculate distance to geofence boundary
    ST_Distance(la.user_location, ST_Boundary(gf.boundary)) as distance_to_boundary,

    -- Calculate area of geofence
    ST_Area(gf.boundary) as fence_area_sq_degrees

  FROM geofences gf
  CROSS JOIN location_analysis la
  WHERE gf.is_active = true
)

SELECT 
  fence_id,
  fence_name,
  fence_type,
  user_inside_fence,
  CASE 
    WHEN user_inside_fence THEN 'Inside geofence'
    WHEN distance_to_boundary <= 0.001 THEN 'Near boundary'
    ELSE 'Outside geofence'
  END as proximity_status,
  ROUND(distance_to_boundary::NUMERIC * 111000, 0) as distance_to_boundary_meters,
  properties

FROM geofence_containment
WHERE 
  user_inside_fence = true 
  OR distance_to_boundary <= 0.005  -- Within ~500m of boundary

ORDER BY distance_to_boundary ASC;

-- Route optimization and path analysis
CREATE TABLE delivery_routes (
  route_id VARCHAR(50) PRIMARY KEY,
  driver_id INTEGER NOT NULL,
  route_date DATE NOT NULL,
  route_path LINESTRING NOT NULL,  -- Path as LineString geometry
  planned_stops MULTIPOINT NOT NULL,  -- Stop locations as MultiPoint
  total_distance_km DECIMAL(10,3),
  estimated_duration_minutes INTEGER,
  status VARCHAR(20) DEFAULT 'planned',
  created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
) WITH (
  spatial_indexes = '{"route_path": "2dsphere", "planned_stops": "2dsphere"}'
);

-- Analyze route intersections with geofences
WITH route_geofence_analysis AS (
  SELECT 
    dr.route_id,
    dr.driver_id,
    dr.route_date,
    dr.status,

    -- Find intersecting geofences
    ARRAY_AGG(
      CASE 
        WHEN ST_Intersects(dr.route_path, gf.boundary) 
        THEN JSON_BUILD_OBJECT(
          'fence_id', gf.fence_id,
          'fence_type', gf.fence_type,
          'properties', gf.properties
        )
        ELSE NULL
      END
    ) FILTER (WHERE ST_Intersects(dr.route_path, gf.boundary)) as intersecting_geofences,

    -- Calculate route metrics
    ST_Length(dr.route_path) * 111000 as route_length_meters,  -- Convert to meters
    ST_NumPoints(dr.planned_stops) as stop_count,

    -- Check if route passes through restricted areas
    BOOL_OR(
      CASE 
        WHEN gf.fence_type = 'restricted_area' AND ST_Intersects(dr.route_path, gf.boundary)
        THEN true 
        ELSE false 
      END
    ) as passes_through_restricted_area

  FROM delivery_routes dr
  LEFT JOIN geofences gf ON ST_Intersects(dr.route_path, gf.boundary)
  WHERE 
    dr.route_date = CURRENT_DATE
    AND dr.status IN ('planned', 'in_progress')
  GROUP BY dr.route_id, dr.driver_id, dr.route_date, dr.status, dr.route_path, dr.planned_stops
),

route_impact_analysis AS (
  SELECT 
    rga.*,

    -- Calculate impact of geofence intersections
    CASE 
      WHEN passes_through_restricted_area THEN 'Route requires rerouting'
      WHEN ARRAY_LENGTH(intersecting_geofences, 1) > 0 THEN 'Route has cost/time implications'
      ELSE 'Route clear'
    END as route_status,

    -- Estimate cost impact
    COALESCE(
      (
        SELECT SUM(
          CASE 
            WHEN fence->>'fence_type' = 'pricing_zone' 
            THEN (fence->>'properties'->>'surge_multiplier')::NUMERIC - 1
            ELSE 0
          END
        )
        FROM UNNEST(intersecting_geofences) as fence
      ), 0
    ) as estimated_cost_increase_multiplier

  FROM route_geofence_analysis rga
)

SELECT 
  route_id,
  driver_id,
  route_date,
  status,
  ROUND(route_length_meters::NUMERIC, 0) as route_length_meters,
  stop_count,
  route_status,
  passes_through_restricted_area,
  ARRAY_LENGTH(intersecting_geofences, 1) as geofence_intersection_count,
  ROUND(estimated_cost_increase_multiplier::NUMERIC, 2) as cost_multiplier,
  intersecting_geofences

FROM route_impact_analysis
ORDER BY 
  passes_through_restricted_area DESC,
  estimated_cost_increase_multiplier DESC,
  route_length_meters ASC;

-- Spatial analytics and density analysis
CREATE VIEW business_density_analysis AS
WITH spatial_grid AS (
  -- Create analysis grid for density calculation
  SELECT 
    grid_x,
    grid_y,
    ST_MakeBox2D(
      ST_Point(grid_x * 0.01 - 122.5, grid_y * 0.01 + 37.7),
      ST_Point((grid_x + 1) * 0.01 - 122.5, (grid_y + 1) * 0.01 + 37.7)
    ) as grid_cell
  FROM 
    GENERATE_SERIES(0, 50) as grid_x,
    GENERATE_SERIES(0, 50) as grid_y
),

grid_business_counts AS (
  SELECT 
    sg.grid_x,
    sg.grid_y,
    sg.grid_cell,

    -- Count businesses in each grid cell
    COUNT(bl.business_id) as business_count,
    ARRAY_AGG(bl.category) as categories,
    AVG(bl.rating) as avg_rating,

    -- Calculate grid cell center point
    ST_Centroid(sg.grid_cell) as cell_center

  FROM spatial_grid sg
  LEFT JOIN business_locations bl ON ST_Within(bl.location, sg.grid_cell)
  WHERE bl.is_active = true OR bl.business_id IS NULL
  GROUP BY sg.grid_x, sg.grid_y, sg.grid_cell
),

density_analysis AS (
  SELECT 
    gbc.*,

    -- Calculate density metrics
    business_count * 100.0 as businesses_per_km2,  -- Approximate conversion
    ARRAY_LENGTH(ARRAY_REMOVE(categories, NULL), 1) as category_diversity,

    -- Classify density level
    CASE 
      WHEN business_count >= 10 THEN 'high_density'
      WHEN business_count >= 5 THEN 'medium_density'
      WHEN business_count >= 1 THEN 'low_density'
      ELSE 'no_businesses'
    END as density_classification,

    -- Generate GeoJSON for mapping
    ST_AsGeoJSON(cell_center) as center_geojson,
    ST_AsGeoJSON(grid_cell) as cell_boundary_geojson

  FROM grid_business_counts gbc
  WHERE business_count > 0  -- Only include cells with businesses
)

SELECT 
  grid_x,
  grid_y,
  business_count,
  ROUND(businesses_per_km2::NUMERIC, 1) as businesses_per_km2,
  category_diversity,
  density_classification,
  ROUND(avg_rating::NUMERIC, 2) as avg_rating,
  categories,
  center_geojson,
  cell_boundary_geojson

FROM density_analysis
ORDER BY business_count DESC, category_diversity DESC;

-- QueryLeaf provides comprehensive geospatial capabilities:
-- 1. Standard SQL spatial data types (POINT, POLYGON, LINESTRING)
-- 2. Familiar spatial functions (ST_Distance, ST_Within, ST_Buffer, etc.)
-- 3. Geospatial indexing with MongoDB's 2dsphere indexes
-- 4. Complex proximity searches with multi-criteria scoring
-- 5. Geofencing and spatial containment analysis
-- 6. Route optimization and intersection analysis
-- 7. Spatial analytics and density calculations
-- 8. Integration with GeoJSON for web mapping libraries
-- 9. Performance-optimized spatial queries
-- 10. Seamless conversion between SQL spatial syntax and MongoDB operations

Best Practices for Geospatial Implementation

Collection Design and Index Optimization

Essential practices for production geospatial deployments:

  1. Coordinate System: Use WGS84 (EPSG:4326) coordinate system for global compatibility
  2. GeoJSON Standards: Store location data in standard GeoJSON format for interoperability
  3. Index Strategy: Create 2dsphere indexes on location fields for optimal query performance
  4. Compound Indexes: Combine spatial indexes with business logic fields for efficient filtering
  5. Data Validation: Implement proper validation for coordinate ranges and geometry types
  6. Precision Management: Choose appropriate precision levels for coordinate storage and calculations

Performance and Scalability

Optimize geospatial operations for high-throughput location-based applications:

  1. Query Optimization: Use $geoNear for proximity searches with distance-based sorting
  2. Bounding Box Filtering: Apply initial bounding box filters before complex spatial calculations
  3. Aggregation Pipelines: Leverage aggregation frameworks for complex spatial analytics
  4. Caching Strategies: Implement intelligent caching for frequently accessed location data
  5. Data Modeling: Design schemas that align with common geospatial query patterns
  6. Sharding Considerations: Plan geospatial sharding strategies for global applications

Conclusion

MongoDB's native geospatial capabilities provide comprehensive location-based application development features that eliminate the complexity and overhead of traditional GIS database approaches. The combination of efficient spatial indexing, sophisticated query operators, and seamless GeoJSON integration enables high-performance location-aware applications that scale effectively with growing user bases and data volumes.

Key MongoDB Geospatial benefits include:

  • Native GeoJSON Support: Industry-standard spatial data formats with seamless web integration
  • High-Performance Indexing: 2dsphere indexes optimized for spherical geometry calculations
  • Comprehensive Query Operators: Complete set of spatial operations for proximity, intersection, and containment
  • Scalable Architecture: Efficient handling of massive location datasets with intelligent partitioning
  • Real-time Capabilities: Change streams enable immediate geofence and location event processing
  • SQL Compatibility: Familiar spatial query patterns for existing SQL development teams

Whether you're building ride-sharing platforms, delivery logistics systems, real estate applications, location-based social networks, or any geospatial application requiring sophisticated spatial analysis, MongoDB's geospatial features with QueryLeaf's SQL-familiar interface provide the foundation for modern location-based services that remain both powerful and approachable for traditional SQL development teams.

QueryLeaf Integration: QueryLeaf automatically leverages MongoDB's geospatial capabilities while providing familiar SQL spatial functions and syntax. Complex proximity searches, geofencing operations, and spatial analytics are seamlessly accessible through standard SQL spatial constructs, making sophisticated geospatial development both efficient and maintainable for SQL-oriented development teams.

The integration of enterprise-grade geospatial capabilities with SQL-style operations makes MongoDB an ideal platform for location-based applications that require both high-performance spatial processing and familiar development patterns, ensuring your geospatial solutions remain both effective and maintainable as they scale to global deployments.