MongoDB Data Archiving and Lifecycle Management: SQL-Style Data Retention with Automated Cleanup and Tiered Storage
As applications mature and data volumes grow, effective data lifecycle management becomes critical for maintaining database performance, controlling storage costs, and meeting compliance requirements. Without proper archiving strategies, databases become bloated with historical data that is rarely accessed yet continues to slow queries and drive up storage costs.
MongoDB's flexible document model and rich aggregation framework provide powerful tools for implementing sophisticated data archiving and lifecycle management strategies. Combined with SQL-familiar data retention patterns, MongoDB enables automated data lifecycle policies that maintain optimal performance while preserving historical data when needed.
The Data Lifecycle Challenge
Traditional approaches to data management often lack systematic lifecycle policies:
-- Traditional approach - no lifecycle management
-- Production table grows indefinitely
CREATE TABLE user_activities (
activity_id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL,
activity_type VARCHAR(50),
activity_data JSON,
ip_address INET,
user_agent TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Problems without lifecycle management:
-- - Table size grows indefinitely impacting performance
-- - Old data consumes expensive primary storage
-- - Backup and maintenance operations slow down
-- - Compliance requirements for data retention not met
-- - Query performance degrades over time
-- - No automated cleanup processes
-- Manual archival attempts are error-prone
-- Copy old data (risky operation)
INSERT INTO user_activities_archive
SELECT * FROM user_activities
WHERE created_at < CURRENT_DATE - INTERVAL '2 years';
-- Delete old data (point of no return)
DELETE FROM user_activities
WHERE created_at < CURRENT_DATE - INTERVAL '2 years';
-- Issues:
-- - Manual process prone to errors
-- - Risk of data loss during transfer
-- - No validation of archive integrity
-- - Downtime required for large operations
-- - No rollback capability
MongoDB with automated lifecycle management provides systematic solutions:
// MongoDB automated data lifecycle management
const lifecycleManager = new DataLifecycleManager(db);
// Define data retention policies
const retentionPolicies = [
{
collection: 'user_activities',
stages: [
{
name: 'hot',
duration: '90d',
storage_class: 'primary',
indexes: 'full'
},
{
name: 'warm',
duration: '1y',
storage_class: 'secondary',
indexes: 'minimal'
},
{
name: 'cold',
duration: '7y',
storage_class: 'archive',
indexes: 'none'
},
{
name: 'purge',
duration: null, // Delete after 7 years
action: 'delete'
}
],
criteria: {
date_field: 'created_at',
partition_field: 'user_id'
}
}
];
// Automated lifecycle execution
await lifecycleManager.executeRetentionPolicies(retentionPolicies);
// Benefits:
// - Automated data transitions between storage tiers
// - Performance optimization through data temperature management
// - Cost optimization with appropriate storage classes
// - Compliance through systematic retention policies
// - Data integrity validation during transitions
// - Rollback capabilities for each transition stage
Understanding Data Lifecycle Management
Data Temperature and Tiered Storage
Implement data temperature-based storage strategies:
// Data temperature classification and tiered storage
class DataTemperatureManager {
constructor(db) {
this.db = db;
this.temperatureConfig = {
hot: {
maxAge: 90, // days
storageClass: 'primary',
compressionLevel: 'fast',
indexStrategy: 'full',
replicationFactor: 3
},
warm: {
maxAge: 365, // 1 year
storageClass: 'secondary',
compressionLevel: 'standard',
indexStrategy: 'essential',
replicationFactor: 2
},
cold: {
maxAge: 2555, // 7 years
storageClass: 'archive',
compressionLevel: 'maximum',
indexStrategy: 'minimal',
replicationFactor: 1
}
};
}
async classifyDataTemperature(collection, options = {}) {
const pipeline = [
{
$addFields: {
age_days: {
$divide: [
{ $subtract: [new Date(), `$${options.dateField || 'created_at'}`] },
86400000 // milliseconds per day
]
}
}
},
{
$addFields: {
data_temperature: {
$switch: {
branches: [
{
case: { $lte: ['$age_days', this.temperatureConfig.hot.maxAge] },
then: 'hot'
},
{
case: { $lte: ['$age_days', this.temperatureConfig.warm.maxAge] },
then: 'warm'
},
{
case: { $lte: ['$age_days', this.temperatureConfig.cold.maxAge] },
then: 'cold'
}
],
default: 'expired'
}
}
}
},
{
$group: {
_id: '$data_temperature',
document_count: { $sum: 1 },
avg_size_bytes: { $avg: { $bsonSize: '$$ROOT' } },
total_size_bytes: { $sum: { $bsonSize: '$$ROOT' } },
oldest_document: { $min: `$${options.dateField || 'created_at'}` },
newest_document: { $max: `$${options.dateField || 'created_at'}` },
sample_ids: { $firstN: { input: '$_id', n: 5 } } // sample of document ids per tier (requires MongoDB 5.2+)
}
},
{
$project: {
temperature: '$_id',
document_count: 1,
avg_size_kb: { $round: [{ $divide: ['$avg_size_bytes', 1024] }, 2] },
total_size_mb: { $round: [{ $divide: ['$total_size_bytes', 1048576] }, 2] },
date_range: {
oldest: '$oldest_document',
newest: '$newest_document'
},
storage_recommendation: {
$switch: {
branches: [
{ case: { $eq: ['$_id', 'hot'] }, then: this.temperatureConfig.hot },
{ case: { $eq: ['$_id', 'warm'] }, then: this.temperatureConfig.warm },
{ case: { $eq: ['$_id', 'cold'] }, then: this.temperatureConfig.cold }
],
default: { action: 'archive_or_delete' }
}
},
sample_ids: 1,
_id: 0
}
},
{
$sort: {
document_count: -1
}
}
];
return await this.db.collection(collection).aggregate(pipeline).toArray();
}
async implementTieredStorage(collection, temperatureAnalysis) {
const results = [];
for (const tier of temperatureAnalysis) {
const { temperature, storage_recommendation } = tier;
if (temperature === 'expired') {
// Handle expired data according to retention policy
const result = await this.handleExpiredData(collection, tier);
results.push(result);
continue;
}
const targetCollection = this.getTargetCollection(collection, temperature);
// Move data to appropriate tier
const migrationResult = await this.migrateToTier(
collection,
targetCollection,
temperature,
storage_recommendation
);
results.push({
temperature,
collection: targetCollection,
migration_result: migrationResult
});
}
return results;
}
async migrateToTier(sourceCollection, targetCollection, temperature, config) {
const session = this.db.client.startSession();
try {
const result = await session.withTransaction(async () => {
// Find documents matching temperature criteria.
// NOTE: this assumes a prior step has materialized the computed
// data_temperature value onto each document (for example via a bulk
// update after classifyDataTemperature); otherwise filter on the date
// field against ageThreshold instead.
const ageThreshold = new Date();
ageThreshold.setDate(ageThreshold.getDate() - this.getMaxAge(temperature));
const documentsToMigrate = await this.db.collection(sourceCollection).find({
data_temperature: temperature,
migrated: { $ne: true }
}, { session }).toArray();
if (documentsToMigrate.length === 0) {
return { migrated: 0, message: 'No documents to migrate' };
}
// Prepare documents for target tier
const processedDocs = documentsToMigrate.map(doc => ({
...doc,
migrated_at: new Date(),
storage_tier: temperature,
compression_applied: config.compressionLevel,
source_collection: sourceCollection
}));
// Insert into target collection. Per-operation write concern cannot be
// set inside a transaction; it is applied at the transaction level in
// the withTransaction options below.
await this.db.collection(targetCollection).insertMany(processedDocs, { session });
// Mark original documents as migrated
const migratedIds = documentsToMigrate.map(doc => doc._id);
await this.db.collection(sourceCollection).updateMany(
{ _id: { $in: migratedIds } },
{
$set: {
migrated: true,
migrated_to: targetCollection,
migrated_at: new Date()
}
},
{ session }
);
// Index changes (createIndex/dropIndex) are not permitted inside a
// multi-document transaction, so tier index maintenance runs after
// the transaction commits (see below).
return {
migrated: documentsToMigrate.length,
target_collection: targetCollection,
storage_tier: temperature
};
}, {
writeConcern: { w: config.replicationFactor, j: true }
});
// Apply tier-appropriate indexes after the transaction commits
if (result.migrated > 0) {
await this.updateIndexesForTier(targetCollection, config.indexStrategy);
}
return result;
} catch (error) {
throw new Error(`Migration failed for ${temperature} tier: ${error.message}`);
} finally {
await session.endSession();
}
}
async updateIndexesForTier(collection, indexStrategy) {
const targetCollection = this.db.collection(collection);
// Drop existing indexes (except _id)
const existingIndexes = await targetCollection.indexes();
for (const index of existingIndexes) {
if (index.name !== '_id_') {
await targetCollection.dropIndex(index.name);
}
}
// Apply tier-appropriate indexes
switch (indexStrategy) {
case 'full':
await targetCollection.createIndexes([
{ key: { created_at: -1 } },
{ key: { user_id: 1 } },
{ key: { activity_type: 1 } },
{ key: { user_id: 1, created_at: -1 } },
{ key: { activity_type: 1, created_at: -1 } }
]);
break;
case 'essential':
await targetCollection.createIndexes([
{ key: { created_at: -1 } },
{ key: { user_id: 1 } }
]);
break;
case 'minimal':
await targetCollection.createIndexes([
{ key: { created_at: -1 } }
]);
break;
case 'none':
// Only _id index remains
break;
}
}
getTargetCollection(baseCollection, temperature) {
return `${baseCollection}_${temperature}`;
}
getMaxAge(temperature) {
return this.temperatureConfig[temperature]?.maxAge || 0;
}
async handleExpiredData(collection, expiredTier) {
// Apply the retention policy for expired data. getRetentionPolicy,
// archiveExpiredData, and deleteExpiredData are helper methods elided
// here for brevity.
const retentionPolicy = await this.getRetentionPolicy(collection);
switch (retentionPolicy.action) {
case 'archive':
return await this.archiveExpiredData(collection, expiredTier);
case 'delete':
return await this.deleteExpiredData(collection, expiredTier);
default:
return { action: 'no_action', expired_count: expiredTier.document_count };
}
}
}
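Putting the temperature manager to work is straightforward. The sketch below shows one way to drive it from a standalone script; the connection string, database name, and the `db.client` shim are illustrative assumptions rather than part of the manager itself, and the optional tiered-storage step requires the elided helper methods for expired data to be implemented.

// Minimal usage sketch for DataTemperatureManager (assumed names and URIs)
const { MongoClient } = require('mongodb');

async function runTemperatureAnalysis() {
  const client = new MongoClient('mongodb://localhost:27017'); // placeholder URI
  await client.connect();

  const db = client.db('app');          // placeholder database name
  db.client = client;                   // the managers expect db.client for sessions

  const manager = new DataTemperatureManager(db);

  // Classify documents by age and report size characteristics per tier
  const analysis = await manager.classifyDataTemperature('user_activities', {
    dateField: 'created_at'
  });
  console.log(analysis);

  // Optionally move each tier into its dedicated collection
  // (requires the elided expired-data helpers to be implemented)
  const migration = await manager.implementTieredStorage('user_activities', analysis);
  console.log(migration);

  await client.close();
}

runTemperatureAnalysis().catch(console.error);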
Automated Retention Policies
Implement automated data retention with policy-driven management:
// Automated retention policy engine
class RetentionPolicyEngine {
constructor(db) {
this.db = db;
this.policies = new Map();
this.executionLog = db.collection('retention_execution_log');
}
async defineRetentionPolicy(policyConfig) {
const policy = {
policy_id: policyConfig.policy_id,
collection: policyConfig.collection,
created_at: new Date(),
// Retention rules
retention_rules: {
hot_data: {
duration: policyConfig.hot_duration || '90d',
action: 'maintain',
storage_class: 'primary',
backup_frequency: 'daily'
},
warm_data: {
duration: policyConfig.warm_duration || '1y',
action: 'archive_to_secondary',
storage_class: 'secondary',
backup_frequency: 'weekly'
},
cold_data: {
duration: policyConfig.cold_duration || '7y',
action: 'archive_to_glacier',
storage_class: 'archive',
backup_frequency: 'monthly'
},
expired_data: {
duration: policyConfig.retention_period || '10y',
action: policyConfig.expired_action || 'delete',
compliance_hold: policyConfig.compliance_hold || false
}
},
// Selection criteria
selection_criteria: {
date_field: policyConfig.date_field || 'created_at',
partition_fields: policyConfig.partition_fields || [],
exclude_conditions: policyConfig.exclude_conditions || {},
include_conditions: policyConfig.include_conditions || {}
},
// Execution settings
execution_settings: {
batch_size: policyConfig.batch_size || 1000,
max_execution_time: policyConfig.max_execution_time || 3600000, // 1 hour
dry_run: policyConfig.dry_run || false,
notification_settings: policyConfig.notifications || {}
}
};
// Store policy definition
await this.db.collection('retention_policies').updateOne(
{ policy_id: policy.policy_id },
{ $set: policy },
{ upsert: true }
);
this.policies.set(policy.policy_id, policy);
return policy;
}
async executeRetentionPolicy(policyId, options = {}) {
const policy = this.policies.get(policyId) ||
await this.db.collection('retention_policies').findOne({ policy_id: policyId });
if (!policy) {
throw new Error(`Retention policy ${policyId} not found`);
}
const executionId = `exec_${policyId}_${Date.now()}`;
const startTime = new Date();
try {
// Log execution start
await this.executionLog.insertOne({
execution_id: executionId,
policy_id: policyId,
started_at: startTime,
status: 'running',
dry_run: options.dry_run || policy.execution_settings.dry_run
});
const results = await this.processRetentionRules(policy, executionId, options);
// Log successful completion
await this.executionLog.updateOne(
{ execution_id: executionId },
{
$set: {
status: 'completed',
completed_at: new Date(),
execution_time_ms: Date.now() - startTime.getTime(),
results: results
}
}
);
return results;
} catch (error) {
// Log execution failure
await this.executionLog.updateOne(
{ execution_id: executionId },
{
$set: {
status: 'failed',
failed_at: new Date(),
error: error.message,
execution_time_ms: Date.now() - startTime.getTime()
}
}
);
throw error;
}
}
async processRetentionRules(policy, executionId, options) {
const { collection, retention_rules, selection_criteria } = policy;
const results = {};
for (const [ruleName, rule] of Object.entries(retention_rules)) {
const ruleResult = await this.executeRetentionRule(
collection,
rule,
selection_criteria,
executionId,
options
);
results[ruleName] = ruleResult;
}
return results;
}
async executeRetentionRule(collection, rule, criteria, executionId, options) {
const targetCollection = this.db.collection(collection);
const dryRun = options.dry_run || false;
// Calculate age threshold for this rule
const ageThreshold = this.calculateAgeThreshold(rule.duration);
// Build selection query
const selectionQuery = {
[criteria.date_field]: { $lt: ageThreshold },
...criteria.include_conditions
};
// Apply exclusion conditions
if (Object.keys(criteria.exclude_conditions).length > 0) {
selectionQuery.$nor = [criteria.exclude_conditions];
}
// Get affected documents count
const affectedCount = await targetCollection.countDocuments(selectionQuery);
if (dryRun) {
return {
action: rule.action,
affected_documents: affectedCount,
age_threshold: ageThreshold,
dry_run: true,
message: `Would ${rule.action} ${affectedCount} documents`
};
}
// Execute the retention action
const actionResult = await this.executeRetentionAction(
targetCollection,
rule.action,
selectionQuery,
rule,
executionId
);
return {
action: rule.action,
affected_documents: affectedCount,
processed_documents: actionResult.processed,
age_threshold: ageThreshold,
execution_time_ms: actionResult.execution_time,
details: actionResult.details
};
}
async executeRetentionAction(collection, action, query, rule, executionId) {
const startTime = Date.now();
switch (action) {
case 'maintain':
return {
processed: 0,
execution_time: Date.now() - startTime,
details: { message: 'Data maintained in current tier' }
};
case 'archive_to_secondary':
return await this.archiveToSecondary(collection, query, rule, executionId);
case 'archive_to_glacier':
return await this.archiveToGlacier(collection, query, rule, executionId);
case 'delete':
return await this.deleteExpiredDocuments(collection, query, rule, executionId);
default:
throw new Error(`Unknown retention action: ${action}`);
}
}
async archiveToSecondary(collection, query, rule, executionId) {
const session = this.db.client.startSession();
const archiveCollection = `${collection.collectionName}_archive`;
const startTime = Date.now();
let processedCount = 0;
try {
await session.withTransaction(async () => {
const cursor = collection.find(query, { session }).batchSize(1000);
while (await cursor.hasNext()) {
const batch = [];
// Collect batch
for (let i = 0; i < 1000 && await cursor.hasNext(); i++) {
const doc = await cursor.next();
batch.push({
...doc,
archived_at: new Date(),
archived_by_policy: executionId,
storage_class: rule.storage_class,
original_collection: collection.collectionName
});
}
if (batch.length > 0) {
// Insert into archive collection
await this.db.collection(archiveCollection).insertMany(batch, { session });
// Mark documents as archived in original collection
const docIds = batch.map(doc => doc._id);
await collection.updateMany(
{ _id: { $in: docIds } },
{
$set: {
archived: true,
archived_at: new Date(),
archive_location: archiveCollection
}
},
{ session }
);
processedCount += batch.length;
}
}
});
return {
processed: processedCount,
execution_time: Date.now() - startTime,
details: {
archive_collection: archiveCollection,
storage_class: rule.storage_class
}
};
} finally {
await session.endSession();
}
}
async deleteExpiredDocuments(collection, query, rule, executionId) {
if (rule.compliance_hold) {
throw new Error('Cannot delete documents under compliance hold');
}
const session = this.db.client.startSession();
const startTime = Date.now();
let deletedCount = 0;
try {
await session.withTransaction(async () => {
// Create deletion log before deleting
const docsToDelete = await collection.find(query, {
projection: { _id: 1, created_at: 1 }, // keep the audit record small
session
}).toArray();
if (docsToDelete.length > 0) {
// Log deletion for audit purposes
await this.db.collection('deletion_log').insertOne({
execution_id: executionId,
collection: collection.collectionName,
deleted_at: new Date(),
deleted_count: docsToDelete.length,
deletion_criteria: query,
deleted_document_ids: docsToDelete.map(d => d._id)
}, { session });
// Execute deletion
const deleteResult = await collection.deleteMany(query, { session });
deletedCount = deleteResult.deletedCount;
}
});
return {
processed: deletedCount,
execution_time: Date.now() - startTime,
details: {
action: 'permanent_deletion',
audit_logged: true
}
};
} finally {
await session.endSession();
}
}
calculateAgeThreshold(duration) {
const now = new Date();
const match = duration.match(/^(\d+)([dwmy])$/);
if (!match) {
throw new Error(`Invalid duration format: ${duration}`);
}
const [, amount, unit] = match;
const num = parseInt(amount);
switch (unit) {
case 'd': // days
return new Date(now.getTime() - (num * 24 * 60 * 60 * 1000));
case 'w': // weeks
return new Date(now.getTime() - (num * 7 * 24 * 60 * 60 * 1000));
case 'm': // months (approximate)
return new Date(now.getTime() - (num * 30 * 24 * 60 * 60 * 1000));
case 'y': // years (approximate)
return new Date(now.getTime() - (num * 365 * 24 * 60 * 60 * 1000));
default:
throw new Error(`Unknown duration unit: ${unit}`);
}
}
async scheduleRetentionExecution(policyId, schedule) {
// Store scheduled execution configuration
await this.db.collection('retention_schedules').updateOne(
{ policy_id: policyId },
{
$set: {
policy_id: policyId,
schedule: schedule, // cron format
enabled: true,
created_at: new Date(),
next_execution: this.calculateNextExecution(schedule)
}
},
{ upsert: true }
);
}
calculateNextExecution(cronExpression) {
// Basic cron parsing - in production, use a cron library
const now = new Date();
// Simplified: assume daily at specified hour
if (cronExpression.match(/^0 (\d+) \* \* \*$/)) {
const hour = parseInt(cronExpression.match(/^0 (\d+) \* \* \*$/)[1]);
const nextExecution = new Date(now);
nextExecution.setHours(hour, 0, 0, 0);
if (nextExecution <= now) {
nextExecution.setDate(nextExecution.getDate() + 1);
}
return nextExecution;
}
return new Date(now.getTime() + 24 * 60 * 60 * 1000); // Default: next day
}
}
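The engine above can be exercised end to end by defining a policy, previewing it with a dry run, and then scheduling it. The sketch below is illustrative: policy IDs, durations, and the assumption that `db` exposes a `client` property (as in the earlier example) are placeholders.

// Usage sketch for RetentionPolicyEngine (illustrative values)
async function applyActivityRetention(db) {
  const engine = new RetentionPolicyEngine(db);

  // Define (or update) a policy for the user_activities collection
  await engine.defineRetentionPolicy({
    policy_id: 'user_activities_retention',
    collection: 'user_activities',
    hot_duration: '90d',
    warm_duration: '1y',
    cold_duration: '7y',
    retention_period: '10y',
    expired_action: 'delete',
    date_field: 'created_at',
    batch_size: 1000
  });

  // Dry run first: reports how many documents each rule would touch
  const preview = await engine.executeRetentionPolicy('user_activities_retention', {
    dry_run: true
  });
  console.log('Dry run results:', preview);

  // Then execute for real and schedule a daily 2 AM run
  const results = await engine.executeRetentionPolicy('user_activities_retention');
  await engine.scheduleRetentionExecution('user_activities_retention', '0 2 * * *');

  return results;
}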
Advanced Archival Patterns
Incremental Archival with Change Tracking
Implement incremental archival for efficient data movement:
// Incremental archival with change tracking
class IncrementalArchivalManager {
constructor(db) {
this.db = db;
this.changeTracker = db.collection('archival_change_log');
this.archivalState = db.collection('archival_state');
}
async setupIncrementalArchival(collection, archivalConfig) {
const config = {
source_collection: collection,
archive_collection: `${collection}_archive`,
incremental_field: archivalConfig.incremental_field || 'updated_at',
archival_criteria: archivalConfig.criteria || {},
batch_size: archivalConfig.batch_size || 1000,
schedule: archivalConfig.schedule || 'daily',
created_at: new Date()
};
// Initialize archival state tracking
await this.archivalState.updateOne(
{ collection: collection },
{
$set: {
...config,
last_archival_timestamp: new Date(0), // Start from beginning
total_archived: 0,
last_execution: null
}
},
{ upsert: true }
);
return config;
}
async executeIncrementalArchival(collection, options = {}) {
const state = await this.archivalState.findOne({ collection });
if (!state) {
throw new Error(`No archival configuration found for collection: ${collection}`);
}
const session = this.db.client.startSession();
const executionId = `incr_arch_${collection}_${Date.now()}`;
let archivedCount = 0;
try {
await session.withTransaction(async () => {
// Find documents modified since last archival
const query = {
[state.incremental_field]: { $gt: state.last_archival_timestamp },
...state.archival_criteria,
archived: { $ne: true }
};
const sourceCollection = this.db.collection(collection);
const cursor = sourceCollection
.find(query, { session })
.sort({ [state.incremental_field]: 1 })
.batchSize(state.batch_size);
let latestTimestamp = state.last_archival_timestamp;
while (await cursor.hasNext()) {
const batch = [];
for (let i = 0; i < state.batch_size && await cursor.hasNext(); i++) {
const doc = await cursor.next();
// Prepare document for archival
const archivalDoc = {
...doc,
archived_at: new Date(),
archived_by: executionId,
archive_method: 'incremental',
original_collection: collection
};
batch.push(archivalDoc);
// Track latest timestamp
if (doc[state.incremental_field] > latestTimestamp) {
latestTimestamp = doc[state.incremental_field];
}
}
if (batch.length > 0) {
// Insert into archive collection
await this.db.collection(state.archive_collection)
.insertMany(batch, { session });
// Mark originals as archived
const docIds = batch.map(doc => doc._id);
await sourceCollection.updateMany(
{ _id: { $in: docIds } },
{
$set: {
archived: true,
archived_at: new Date(),
archive_location: state.archive_collection
}
},
{ session }
);
archivedCount += batch.length;
// Log incremental change
await this.changeTracker.insertOne({
execution_id: executionId,
collection: collection,
batch_number: Math.floor(archivedCount / state.batch_size),
documents_archived: batch.length,
timestamp_range: {
start: batch[0][state.incremental_field],
end: batch[batch.length - 1][state.incremental_field]
},
processed_at: new Date()
}, { session });
}
}
// Update archival state
await this.archivalState.updateOne(
{ collection },
{
$set: {
last_archival_timestamp: latestTimestamp,
last_execution: new Date()
},
$inc: {
total_archived: archivedCount
}
},
{ session }
);
});
return {
success: true,
execution_id: executionId,
documents_archived: archivedCount,
last_timestamp: (await this.archivalState.findOne(
{ collection },
{ projection: { last_archival_timestamp: 1 } }
))?.last_archival_timestamp
};
} catch (error) {
return {
success: false,
execution_id: executionId,
error: error.message,
documents_archived: archivedCount
};
} finally {
await session.endSession();
}
}
async rollbackArchival(executionId, options = {}) {
const session = this.db.client.startSession();
try {
await session.withTransaction(async () => {
// Find all changes for this execution
const changes = await this.changeTracker
.find({ execution_id: executionId }, { session })
.toArray();
if (changes.length === 0) {
throw new Error(`No archival changes found for execution: ${executionId}`);
}
const collection = changes[0].collection;
const sourceCollection = this.db.collection(collection);
const archiveCollection = this.db.collection(`${collection}_archive`);
// Get archived documents for this execution
const archivedDocs = await archiveCollection
.find({ archived_by: executionId }, { session })
.toArray();
if (archivedDocs.length > 0) {
// Remove from archive
await archiveCollection.deleteMany(
{ archived_by: executionId },
{ session }
);
// Restore archived flag in source
const docIds = archivedDocs.map(doc => doc._id);
await sourceCollection.updateMany(
{ _id: { $in: docIds } },
{
$unset: {
archived: '',
archived_at: '',
archive_location: ''
}
},
{ session }
);
}
// Remove change tracking records
await this.changeTracker.deleteMany(
{ execution_id: executionId },
{ session }
);
// Update archival state if needed
if (options.updateState) {
await this.recalculateArchivalState(collection, session);
}
});
return { success: true, rolled_back_execution: executionId };
} catch (error) {
return { success: false, error: error.message };
} finally {
await session.endSession();
}
}
async recalculateArchivalState(collection, session) {
// Recalculate archival state from actual data
const sourceCollection = this.db.collection(collection);
const pipeline = [
{ $match: { archived: true } },
{
$group: {
_id: null,
total_archived: { $sum: 1 },
latest_archived: { $max: '$archived_at' }
}
}
];
const result = await sourceCollection.aggregate(pipeline, { session }).toArray();
const stats = result[0] || { total_archived: 0, latest_archived: new Date(0) };
await this.archivalState.updateOne(
{ collection },
{
$set: {
total_archived: stats.total_archived,
last_archival_timestamp: stats.latest_archived,
recalculated_at: new Date()
}
},
{ session, upsert: true }
);
}
}
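A typical incremental archival workflow pairs a one-time setup call with a recurring execution. The sketch below illustrates that flow under the same assumptions as the previous examples; the archival criteria and field names are examples only.

// Usage sketch for IncrementalArchivalManager (illustrative criteria)
async function runIncrementalArchival(db) {
  const archiver = new IncrementalArchivalManager(db);

  // One-time setup: only archive activities that are no longer changing
  await archiver.setupIncrementalArchival('user_activities', {
    incremental_field: 'updated_at',
    criteria: { status: { $in: ['completed', 'expired'] } },
    batch_size: 1000,
    schedule: 'daily'
  });

  // Each scheduled run only picks up documents changed since the last run
  const result = await archiver.executeIncrementalArchival('user_activities');
  console.log(`Archived ${result.documents_archived} documents`, result.execution_id);

  // A completed execution can later be reversed if needed:
  // await archiver.rollbackArchival(result.execution_id, { updateState: true });

  return result;
}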
Compliance and Audit Integration
Implement compliance-aware archival with audit trails:
// Compliance-aware archival with audit integration
class ComplianceArchivalManager {
constructor(db) {
this.db = db;
this.auditLog = db.collection('compliance_audit_log');
this.retentionPolicies = db.collection('compliance_policies');
this.legalHolds = db.collection('legal_holds');
}
async defineCompliancePolicy(policyConfig) {
const policy = {
policy_id: policyConfig.policy_id,
regulation_type: policyConfig.regulation_type, // GDPR, CCPA, SOX, HIPAA
data_classification: policyConfig.data_classification, // PII, PHI, Financial
retention_requirements: {
minimum_retention: policyConfig.minimum_retention,
maximum_retention: policyConfig.maximum_retention,
deletion_required: policyConfig.deletion_required || false
},
geographic_scope: policyConfig.geographic_scope || ['global'],
audit_requirements: {
audit_trail_required: policyConfig.audit_trail ?? true, // ?? preserves an explicit false, unlike ||
access_logging: policyConfig.access_logging ?? true,
deletion_approval: policyConfig.deletion_approval || false
},
created_at: new Date(),
effective_date: new Date(policyConfig.effective_date),
review_date: new Date(policyConfig.review_date)
};
await this.retentionPolicies.updateOne(
{ policy_id: policy.policy_id },
{ $set: policy },
{ upsert: true }
);
return policy;
}
async checkComplianceBeforeArchival(collection, documents, policyId) {
const policy = await this.retentionPolicies.findOne({ policy_id: policyId });
if (!policy) {
throw new Error(`Compliance policy ${policyId} not found`);
}
const complianceChecks = [];
// Check legal holds
const activeHolds = await this.legalHolds.find({
status: 'active',
collections: collection,
$or: [
{ expiry_date: { $gt: new Date() } },
{ expiry_date: null }
]
}).toArray();
if (activeHolds.length > 0) {
complianceChecks.push({
check: 'legal_hold',
status: 'blocked',
message: `Active legal holds prevent archival: ${activeHolds.map(h => h.hold_id).join(', ')}`,
holds: activeHolds
});
}
// Check minimum retention requirements
const now = new Date();
const minRetentionViolations = documents.filter(doc => {
const createdAt = doc.created_at || doc._id.getTimestamp();
const ageInDays = (now - createdAt) / (24 * 60 * 60 * 1000);
const minRetentionDays = this.parseDuration(policy.retention_requirements.minimum_retention);
return ageInDays < minRetentionDays;
});
if (minRetentionViolations.length > 0) {
complianceChecks.push({
check: 'minimum_retention',
status: 'blocked',
message: `${minRetentionViolations.length} documents violate minimum retention requirements`,
violation_count: minRetentionViolations.length
});
}
// Check maximum retention requirements
if (policy.retention_requirements.deletion_required) {
const maxRetentionDays = this.parseDuration(policy.retention_requirements.maximum_retention);
const maxRetentionViolations = documents.filter(doc => {
const createdAt = doc.created_at || doc._id.getTimestamp();
const ageInDays = (now - createdAt) / (24 * 60 * 60 * 1000);
return ageInDays > maxRetentionDays;
});
if (maxRetentionViolations.length > 0) {
complianceChecks.push({
check: 'maximum_retention',
status: 'warning',
message: `${maxRetentionViolations.length} documents exceed maximum retention and must be deleted`,
violation_count: maxRetentionViolations.length
});
}
}
const hasBlockingIssues = complianceChecks.some(check => check.status === 'blocked');
return {
compliant: !hasBlockingIssues,
checks: complianceChecks,
policy: policy,
checked_at: new Date()
};
}
async executeComplianceArchival(collection, policyId, options = {}) {
const session = this.db.client.startSession();
const executionId = `comp_arch_${collection}_${Date.now()}`;
try {
const result = await session.withTransaction(async () => {
// Get documents eligible for archival. buildArchivalQuery is assumed to
// derive the selection filter (date cutoff, include/exclude conditions)
// from the stored policy definition; its implementation is elided here.
const query = this.buildArchivalQuery(collection, policyId, options);
const documents = await this.db.collection(collection)
.find(query, { session })
.toArray();
if (documents.length === 0) {
return { documents_processed: 0, message: 'No documents eligible for archival' };
}
// Compliance check
const complianceResult = await this.checkComplianceBeforeArchival(
collection, documents, policyId
);
if (!complianceResult.compliant) {
throw new Error(`Compliance check failed: ${JSON.stringify(complianceResult.checks)}`);
}
// Log compliance check
await this.auditLog.insertOne({
execution_id: executionId,
action: 'compliance_check',
collection: collection,
policy_id: policyId,
documents_checked: documents.length,
compliance_result: complianceResult,
timestamp: new Date()
}, { session });
// Execute archival with audit trail
const archivalResult = await this.executeAuditedArchival(
collection, documents, policyId, executionId, session, options
);
return {
execution_id: executionId,
documents_processed: archivalResult.processed,
compliance_checks: complianceResult.checks,
archival_details: archivalResult
};
});
return { success: true, ...result };
} catch (error) {
// Log compliance violation or error
await this.auditLog.insertOne({
execution_id: executionId,
action: 'compliance_error',
collection: collection,
policy_id: policyId,
error: error.message,
timestamp: new Date()
});
return { success: false, error: error.message, execution_id: executionId };
} finally {
await session.endSession();
}
}
async executeAuditedArchival(collection, documents, policyId, executionId, session, options = {}) {
const policy = await this.retentionPolicies.findOne({ policy_id: policyId });
const archiveCollection = `${collection}_archive`;
// Prepare documents with compliance metadata
const archivalDocuments = documents.map(doc => ({
...doc,
compliance_metadata: {
archived_under_policy: policyId,
regulation_type: policy.regulation_type,
data_classification: policy.data_classification,
archived_at: new Date(),
archived_by: executionId,
retention_expiry: this.calculateRetentionExpiry(doc, policy),
audit_trail_id: `audit_${doc._id}_${Date.now()}`
}
}));
// Insert into archive with compliance metadata
await this.db.collection(archiveCollection).insertMany(archivalDocuments, { session });
// Create detailed audit entries
for (const doc of documents) {
await this.auditLog.insertOne({
execution_id: executionId,
action: 'document_archived',
document_id: doc._id,
collection: collection,
archive_collection: archiveCollection,
policy_id: policyId,
data_classification: policy.data_classification,
retention_expiry: this.calculateRetentionExpiry(doc, policy),
timestamp: new Date(),
user_context: options.user_context || 'system'
}, { session });
}
// Mark original documents
const docIds = documents.map(doc => doc._id);
await this.db.collection(collection).updateMany(
{ _id: { $in: docIds } },
{
$set: {
archived: true,
archived_at: new Date(),
archive_location: archiveCollection,
compliance_policy: policyId
}
},
{ session }
);
return {
processed: documents.length,
archive_collection: archiveCollection,
audit_entries_created: documents.length + 1 // +1 for the compliance check
};
}
calculateRetentionExpiry(document, policy) {
const createdAt = document.created_at || document._id.getTimestamp();
const maxRetentionDays = this.parseDuration(policy.retention_requirements.maximum_retention);
if (maxRetentionDays > 0) {
return new Date(createdAt.getTime() + (maxRetentionDays * 24 * 60 * 60 * 1000));
}
return null; // No expiry
}
parseDuration(duration) {
const match = duration.match(/^(\d+)([dwmy])$/);
if (!match) return 0;
const [, amount, unit] = match;
const num = parseInt(amount);
switch (unit) {
case 'd': return num;
case 'w': return num * 7;
case 'm': return num * 30;
case 'y': return num * 365;
default: return 0;
}
}
}
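Tying the pieces together, a compliance-checked archival run might look like the sketch below. The policy values shown (GDPR, PII, a 90-day minimum, a 7-year maximum) are illustrative rather than legal guidance, and the example assumes the buildArchivalQuery helper mentioned above has been implemented.

// Usage sketch for ComplianceArchivalManager (illustrative policy values)
async function runComplianceArchival(db) {
  const compliance = new ComplianceArchivalManager(db);

  await compliance.defineCompliancePolicy({
    policy_id: 'gdpr_user_activity',
    regulation_type: 'GDPR',
    data_classification: 'PII',
    minimum_retention: '90d',   // keep at least 90 days
    maximum_retention: '7y',    // must be deleted after 7 years
    deletion_required: true,
    geographic_scope: ['EU'],
    effective_date: '2024-01-01',
    review_date: '2025-01-01'
  });

  // Legal holds and retention windows are verified before any data moves
  const result = await compliance.executeComplianceArchival(
    'user_activities',
    'gdpr_user_activity'
  );

  if (!result.success) {
    console.error('Archival blocked or failed:', result.error);
  }
  return result;
}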
QueryLeaf Data Lifecycle Integration
QueryLeaf provides SQL-familiar syntax for data lifecycle management:
-- QueryLeaf data lifecycle management with SQL-style syntax
-- Create automated retention policy using DDL-style syntax
CREATE RETENTION POLICY user_data_lifecycle
ON user_activities
WITH (
HOT_RETENTION = '90 days',
WARM_RETENTION = '1 year',
COLD_RETENTION = '7 years',
PURGE_AFTER = '10 years',
DATE_COLUMN = 'created_at',
PARTITION_BY = 'user_id',
COMPLIANCE_POLICY = 'GDPR'
);
-- Execute retention policy manually
EXEC APPLY_RETENTION_POLICY 'user_data_lifecycle';
-- Archive data using SQL-style syntax with temperature-based storage
WITH data_temperature AS (
SELECT
*,
DATEDIFF(day, created_at, CURRENT_DATE) as age_days,
CASE
WHEN DATEDIFF(day, created_at, CURRENT_DATE) <= 90 THEN 'hot'
WHEN DATEDIFF(day, created_at, CURRENT_DATE) <= 365 THEN 'warm'
WHEN DATEDIFF(day, created_at, CURRENT_DATE) <= 2555 THEN 'cold'
ELSE 'expired'
END as data_temperature
FROM user_activities
WHERE archived IS NULL
)
-- Move warm data to secondary storage
INSERT INTO user_activities_warm
SELECT
*,
CURRENT_TIMESTAMP as archived_at,
'secondary_storage' as storage_class
FROM data_temperature
WHERE data_temperature = 'warm';
-- QueryLeaf automatically handles:
-- 1. MongoDB collection creation for each temperature tier
-- 2. Appropriate index strategy for each tier
-- 3. Compression settings based on access patterns
-- 4. Transaction management for data migration
-- Compliance-aware data deletion with audit trail
BEGIN TRANSACTION;
-- Check for legal holds before deletion
IF EXISTS(
SELECT 1 FROM legal_holds lh
WHERE lh.collection_name = 'user_activities'
AND lh.status = 'active'
AND (lh.expiry_date IS NULL OR lh.expiry_date > CURRENT_DATE)
)
BEGIN
ROLLBACK TRANSACTION;
RAISERROR('Cannot delete data - active legal hold exists', 16, 1);
RETURN;
END
-- Log deletion for audit purposes
INSERT INTO data_deletion_audit (
collection_name,
deletion_policy,
records_affected,
deletion_criteria,
deleted_by,
deletion_timestamp
)
SELECT
'user_activities' as collection_name,
'GDPR_RIGHT_TO_ERASURE' as deletion_policy,
COUNT(*) as records_affected,
'user_id = @user_id AND created_at < @retention_cutoff' as deletion_criteria,
SYSTEM_USER as deleted_by,
CURRENT_TIMESTAMP as deletion_timestamp
FROM user_activities
WHERE user_id = @user_id
AND created_at < DATEADD(year, -7, CURRENT_DATE);
-- Execute compliant deletion
DELETE FROM user_activities
WHERE user_id = @user_id
AND created_at < DATEADD(year, -7, CURRENT_DATE);
-- Also clean up related data across collections
DELETE FROM user_sessions WHERE user_id = @user_id;
DELETE FROM user_preferences WHERE user_id = @user_id;
DELETE FROM audit_trail WHERE subject_id = @user_id AND created_at < DATEADD(year, -7, CURRENT_DATE);
COMMIT TRANSACTION;
-- Automated lifecycle management with SQL scheduling
CREATE SCHEDULE retention_job_daily
FOR PROCEDURE apply_all_retention_policies()
EXECUTE DAILY AT '02:00:00';
-- Real-time archival monitoring with window functions
WITH archival_stats AS (
SELECT
collection_name,
data_temperature,
COUNT(*) as document_count,
AVG(BSON_SIZE(document)) as avg_size_bytes,
SUM(BSON_SIZE(document)) as total_size_bytes,
MIN(created_at) as oldest_document,
MAX(created_at) as newest_document,
-- Calculate storage cost based on temperature
CASE data_temperature
WHEN 'hot' THEN SUM(BSON_SIZE(document)) / 1073741824.0 * 0.10 -- $0.10 per GB
WHEN 'warm' THEN SUM(BSON_SIZE(document)) / 1073741824.0 * 0.05 -- $0.05 per GB
WHEN 'cold' THEN SUM(BSON_SIZE(document)) / 1073741824.0 * 0.01 -- $0.01 per GB
ELSE 0
END as estimated_monthly_cost,
-- Calculate archival urgency: everything selected here is still
-- unarchived, so warm data should move soon and cold/expired data urgently
CASE
WHEN data_temperature IN ('cold', 'expired') THEN 'URGENT_ARCHIVE_NEEDED'
WHEN data_temperature = 'warm' THEN 'ARCHIVE_RECOMMENDED'
ELSE 'NO_ACTION_NEEDED'
END as archival_recommendation
FROM (
SELECT
'user_activities' as collection_name,
CASE
WHEN DATEDIFF(day, created_at, CURRENT_DATE) <= 90 THEN 'hot'
WHEN DATEDIFF(day, created_at, CURRENT_DATE) <= 365 THEN 'warm'
WHEN DATEDIFF(day, created_at, CURRENT_DATE) <= 2555 THEN 'cold'
ELSE 'expired'
END as data_temperature,
*
FROM user_activities
WHERE archived IS NULL
) classified_data
GROUP BY collection_name, data_temperature
),
cost_analysis AS (
SELECT
SUM(estimated_monthly_cost) as total_monthly_cost,
SUM(CASE WHEN archival_recommendation != 'NO_ACTION_NEEDED' THEN estimated_monthly_cost ELSE 0 END) as potential_savings,
COUNT(CASE WHEN archival_recommendation = 'URGENT_ARCHIVE_NEEDED' THEN 1 END) as urgent_collections
FROM archival_stats
)
SELECT
a.*,
c.total_monthly_cost,
c.potential_savings,
ROUND((c.potential_savings / c.total_monthly_cost) * 100, 2) as potential_savings_percent
FROM archival_stats a
CROSS JOIN cost_analysis c
ORDER BY
CASE archival_recommendation
WHEN 'URGENT_ARCHIVE_NEEDED' THEN 1
WHEN 'ARCHIVE_RECOMMENDED' THEN 2
ELSE 3
END,
estimated_monthly_cost DESC;
-- QueryLeaf provides:
-- 1. Automatic lifecycle policy creation and management
-- 2. Temperature-based storage tier management
-- 3. Compliance-aware retention with audit trails
-- 4. Cost optimization through intelligent archival
-- 5. SQL-familiar syntax for complex lifecycle operations
-- 6. Integration with MongoDB's native archival capabilities
Best Practices for Data Lifecycle Management
Architecture Guidelines
Design scalable data lifecycle architectures:
- Temperature-Based Storage: Implement hot/warm/cold data tiers based on access patterns
- Automated Policies: Use policy-driven automation to reduce manual intervention
- Incremental Processing: Implement incremental archival to minimize performance impact
- Compliance Integration: Build compliance requirements into lifecycle policies
- Monitoring and Alerting: Monitor archival processes and set up alerting for issues (see the monitoring sketch after this list)
- Recovery Planning: Plan for archive recovery and rollback scenarios
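As a concrete example of the monitoring guideline, the sketch below polls the retention_execution_log collection maintained by the policy engine for failed or stuck executions. The sendAlert callback and the one-hour threshold are placeholders for whatever notification channel and limits you use.

// Monitoring sketch against the retention_execution_log collection
async function checkRetentionHealth(db, sendAlert) {
  const since = new Date(Date.now() - 24 * 60 * 60 * 1000); // last 24 hours

  // Executions that failed recently
  const failures = await db.collection('retention_execution_log')
    .find({ status: 'failed', failed_at: { $gte: since } })
    .toArray();

  // Executions still marked running after more than an hour are likely stuck
  const stuck = await db.collection('retention_execution_log')
    .find({ status: 'running', started_at: { $lt: new Date(Date.now() - 3600000) } })
    .toArray();

  if (failures.length > 0 || stuck.length > 0) {
    await sendAlert({
      failed_executions: failures.map(f => f.execution_id),
      stuck_executions: stuck.map(s => s.execution_id),
      checked_at: new Date()
    });
  }

  return { failures: failures.length, stuck: stuck.length };
}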
Performance Optimization
Optimize lifecycle operations for production environments:
- Batch Processing: Use appropriate batch sizes to balance performance and memory usage
- Index Strategy: Maintain different index strategies for each storage tier
- Compression: Apply appropriate compression based on data temperature
- Scheduling: Schedule intensive operations during low-traffic periods
- Resource Management: Monitor and limit resource usage during archival operations
- Query Optimization: Optimize queries across archived and active data (a $unionWith sketch follows this list)
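For the query-optimization guideline, MongoDB's $unionWith aggregation stage (available since 4.4) lets a single pipeline span the active collection and its archive counterpart. The sketch below assumes the collection naming convention used throughout this article; userId and startDate are hypothetical parameters.

// Query active and archived documents in one aggregation ($unionWith, MongoDB 4.4+)
async function recentActivityAcrossTiers(db, userId, startDate) {
  return db.collection('user_activities').aggregate([
    { $match: { user_id: userId, created_at: { $gte: startDate } } },
    {
      $unionWith: {
        coll: 'user_activities_archive',
        pipeline: [
          { $match: { user_id: userId, created_at: { $gte: startDate } } }
        ]
      }
    },
    { $sort: { created_at: -1 } },
    { $limit: 100 }
  ]).toArray();
}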
Conclusion
MongoDB data archiving and lifecycle management provide essential capabilities for maintaining database performance, controlling costs, and meeting compliance requirements as data volumes grow. Combined with SQL-familiar lifecycle management patterns, MongoDB enables systematic data lifecycle policies that scale with business needs.
Key lifecycle management benefits include:
- Performance Optimization: Keep active data performant by archiving historical data
- Cost Control: Optimize storage costs through tiered storage strategies
- Compliance Adherence: Meet regulatory requirements through systematic retention policies
- Automated Operations: Reduce manual intervention through policy-driven automation
- Data Integrity: Maintain data integrity and audit trails throughout the lifecycle
Whether you're managing user activity data, transaction records, audit logs, or analytical datasets, MongoDB lifecycle management with QueryLeaf's familiar SQL interface provides the tools for systematic data archiving at scale. This combination enables you to implement sophisticated lifecycle policies while preserving familiar development patterns.
QueryLeaf Integration: QueryLeaf automatically manages MongoDB lifecycle operations including temperature-based storage, compliance-aware archival, and automated policy execution while providing SQL-familiar DDL and DML syntax. Complex archival workflows, audit trail generation, and cost optimization strategies are seamlessly handled through familiar SQL patterns, making data lifecycle management both powerful and accessible.
The integration of automated lifecycle management with SQL-style policy definition makes MongoDB an ideal platform for applications requiring both systematic data archiving and familiar database administration patterns, ensuring your data lifecycle strategies remain both effective and maintainable as they scale and evolve.