backend: refactor migration transforms to return updated document instances and improve MigrationService to execute all transforms sequentially for each document

This commit is contained in:
juancarmore 2026-02-17 16:03:05 +01:00
parent 7378a8f53e
commit 90a1c6fde9
8 changed files with 160 additions and 141 deletions

View File

@ -114,19 +114,15 @@ import { SchemaTransform, generateSchemaMigrationName } from '../models/migratio
import { meetRoomCollectionName, MeetRoomDocument } from '../models/mongoose-schemas/room.schema.js';
const roomMigrationV1ToV2Name = generateSchemaMigrationName(meetRoomCollectionName, 1, 2);
const roomMigrationV1ToV2Transform: SchemaTransform<MeetRoomDocument> = () => ({
$set: {
maxParticipants: 100
}
});
const roomMigrationV1ToV2Transform: SchemaTransform<MeetRoomDocument> = (room) => {
room.maxParticipants = 100;
return room;
};
```
`transform` must return MongoDB update operators, so it can express any kind of change:
- `$set` to add/modify values
- `$unset` to remove properties
- `$rename` to rename fields
- Any other supported update operator
`transform` must return the updated document instance.
It can mutate the received document by adding, removing, or modifying fields as needed to conform to the new schema version.
### Step 5: Register Migration

View File

@ -8,11 +8,11 @@ import { MeetApiKeyDocument } from '../models/mongoose-schemas/api-key.schema.js
* Example:
*
* const apiKeyMigrationV1ToV2Name = generateSchemaMigrationName(meetApiKeyCollectionName, 1, 2);
* const apiKeyMigrationV1ToV2Transform: SchemaTransform<MeetApiKeyDocument> = () => ({
* $set: {
* expirationDate: undefined
* }
* });
*
* const apiKeyMigrationV1ToV2Transform: SchemaTransform<MeetApiKeyDocument> = (apiKey) => {
* apiKey.expirationDate = undefined;
* return apiKey;
* };
*/
export const apiKeyMigrations: SchemaMigrationMap<MeetApiKeyDocument> = new Map([
// [apiKeyMigrationV1ToV2Name, apiKeyMigrationV1ToV2Transform]

View File

@ -8,11 +8,11 @@ import { MeetGlobalConfigDocument } from '../models/mongoose-schemas/global-conf
* Example:
*
* const globalConfigMigrationV1ToV2Name = generateSchemaMigrationName(meetGlobalConfigCollectionName, 1, 2);
* const globalConfigMigrationV1ToV2Transform: SchemaTransform<MeetGlobalConfigDocument> = () => ({
* $set: {
* newField: 'default-value'
* }
* });
*
* const globalConfigMigrationV1ToV2Transform: SchemaTransform<MeetGlobalConfigDocument> = (globalConfig) => {
* globalConfig.newField = 'defaultValue';
* return globalConfig;
* };
*/
export const globalConfigMigrations: SchemaMigrationMap<MeetGlobalConfigDocument> = new Map([
// [globalConfigMigrationV1ToV2Name, globalConfigMigrationV1ToV2Transform]

View File

@ -3,12 +3,12 @@ import { generateSchemaMigrationName, SchemaMigrationMap, SchemaTransform } from
import { meetRecordingCollectionName, MeetRecordingDocument } from '../models/mongoose-schemas/recording.schema.js';
const recordingMigrationV1ToV2Name = generateSchemaMigrationName(meetRecordingCollectionName, 1, 2);
const recordingMigrationV1ToV2Transform: SchemaTransform<MeetRecordingDocument> = () => ({
$set: {
layout: MeetRecordingLayout.GRID,
encoding: MeetRecordingEncodingPreset.H264_720P_30
}
});
const recordingMigrationV1ToV2Transform: SchemaTransform<MeetRecordingDocument> = (recording) => {
recording.layout = MeetRecordingLayout.GRID;
recording.encoding = MeetRecordingEncodingPreset.H264_720P_30;
return recording;
};
/**
* Schema migrations for MeetRecording.

View File

@ -3,13 +3,13 @@ import { generateSchemaMigrationName, SchemaMigrationMap, SchemaTransform } from
import { meetRoomCollectionName, MeetRoomDocument } from '../models/mongoose-schemas/room.schema.js';
const roomMigrationV1ToV2Name = generateSchemaMigrationName(meetRoomCollectionName, 1, 2);
const roomMigrationV1ToV2Transform: SchemaTransform<MeetRoomDocument> = () => ({
$set: {
'config.captions': { enabled: true },
'config.recording.layout': MeetRecordingLayout.GRID,
'config.recording.encoding': MeetRecordingEncodingPreset.H264_720P_30
}
});
const roomMigrationV1ToV2Transform: SchemaTransform<MeetRoomDocument> = (room) => {
room.config.captions = { enabled: true };
room.config.recording.layout = MeetRecordingLayout.GRID;
room.config.recording.encoding = MeetRecordingEncodingPreset.H264_720P_30;
return room;
};
/**
* Schema migrations for MeetRoom.

View File

@ -8,11 +8,11 @@ import { MeetUserDocument } from '../models/mongoose-schemas/user.schema.js';
* Example:
*
* const userMigrationV1ToV2Name = generateSchemaMigrationName(meetUserCollectionName, 1, 2);
* const userMigrationV1ToV2Transform: SchemaTransform<MeetUserDocument> = () => ({
* $set: {
* email: undefined
* }
* });
*
* const userMigrationV1ToV2Transform: SchemaTransform<MeetUserDocument> = (user) => {
* user.newField = 'defaultValue';
* return user;
* };
*/
export const userMigrations: SchemaMigrationMap<MeetUserDocument> = new Map([
// [userMigrationV1ToV2Name, userMigrationV1ToV2Transform]

View File

@ -1,4 +1,4 @@
import { Document, Model, UpdateQuery } from 'mongoose';
import { Document, Model } from 'mongoose';
/**
* Interface representing a migration document in MongoDB.
@ -124,17 +124,9 @@ export interface SchemaMigratableDocument extends Document {
export type SchemaVersion = number;
/**
* MongoDB update operations generated by a migration transform.
* Supports full update operators like $set, $unset, $rename, etc.
* Function that transforms a document and returns the updated document.
*/
export type MigrationUpdate<TDocument extends SchemaMigratableDocument> = UpdateQuery<TDocument>;
/**
* Function that transforms a document and returns a MongoDB update operation.
*/
export type SchemaTransform<TDocument extends SchemaMigratableDocument> = (
document: TDocument
) => MigrationUpdate<TDocument>;
export type SchemaTransform<TDocument extends SchemaMigratableDocument> = (document: TDocument) => TDocument;
/**
* Map of schema migration names to transform functions.

View File

@ -7,7 +7,6 @@ import {
CollectionMigrationRegistry,
generateSchemaMigrationName,
MigrationResult,
MigrationUpdate,
SchemaMigratableDocument,
SchemaMigrationStep,
SchemaVersion
@ -95,36 +94,46 @@ export class MigrationService {
): Promise<number> {
this.logger.info(`Checking schema version for collection: ${registry.collectionName}`);
const minVersionInDb = await this.getMinSchemaVersion(registry.model);
const oldestSchemaVersionInDb = await this.getMinSchemaVersion(registry.model);
if (minVersionInDb === null) {
if (oldestSchemaVersionInDb === null) {
this.logger.info(`No documents found in ${registry.collectionName}, skipping migration`);
return 0;
}
const maxVersionInDb = await this.getMaxSchemaVersion(registry.model);
const latestSchemaVersionInDb = await this.getMaxSchemaVersion(registry.model);
if (maxVersionInDb && maxVersionInDb > registry.currentVersion) {
if (latestSchemaVersionInDb && latestSchemaVersionInDb > registry.currentVersion) {
throw new Error(
`Collection ${registry.collectionName} has schemaVersion ${maxVersionInDb}, ` +
`Collection ${registry.collectionName} has schemaVersion ${latestSchemaVersionInDb}, ` +
`which is higher than expected ${registry.currentVersion}. ` +
'Startup aborted to prevent inconsistent schema handling.'
);
}
if (minVersionInDb === registry.currentVersion) {
this.logger.info(`Collection ${registry.collectionName} is already at version ${registry.currentVersion}`);
if (oldestSchemaVersionInDb === registry.currentVersion) {
this.logger.info(
`Collection ${registry.collectionName} is already at version ${registry.currentVersion}, skipping migration`
);
return 0;
}
const migrationSteps = this.getRequiredMigrationSteps(registry, minVersionInDb);
let collectionMigrated = 0;
let migratedDocumentsInCollection = 0;
for (const migrationStep of migrationSteps) {
collectionMigrated += await this.executeCollectionMigrationStep(registry, migrationStep);
for (
let sourceSchemaVersion = oldestSchemaVersionInDb;
sourceSchemaVersion < registry.currentVersion;
sourceSchemaVersion++
) {
const migrationChain = this.getRequiredMigrationSteps(registry, sourceSchemaVersion);
migratedDocumentsInCollection += await this.executeMigrationChainForVersion(
registry,
sourceSchemaVersion,
migrationChain
);
}
return collectionMigrated;
return migratedDocumentsInCollection;
}
/**
@ -132,121 +141,139 @@ export class MigrationService {
* Validates that there are no missing migration steps in the chain.
*
* @param registry - The collection migration registry
* @param minVersionInDb - The minimum schema version currently present in the database
* @param sourceSchemaVersion - Source schema version whose migration chain must be executed
* @returns Array of migration steps that need to be executed in order
*/
protected getRequiredMigrationSteps<TDocument extends SchemaMigratableDocument>(
registry: CollectionMigrationRegistry<TDocument>,
minVersionInDb: SchemaVersion
sourceSchemaVersion: SchemaVersion
): SchemaMigrationStep<TDocument>[] {
const migrationSteps = this.findSchemaMigrationSteps(registry, minVersionInDb, registry.currentVersion);
const migrationSteps = this.findSchemaMigrationSteps(registry, sourceSchemaVersion, registry.currentVersion);
if (migrationSteps.length === 0) {
throw new Error(
`No migration steps found for ${registry.collectionName} ` +
`(v${minVersionInDb} -> v${registry.currentVersion}). Startup aborted.`
`(v${sourceSchemaVersion} -> v${registry.currentVersion}). Startup aborted.`
);
}
this.logger.info(
`Found ${migrationSteps.length} migration steps for ${registry.collectionName} ` +
`(v${minVersionInDb} -> v${registry.currentVersion})`
`(v${sourceSchemaVersion} -> v${registry.currentVersion})`
);
return migrationSteps;
}
/**
* Executes a single migration step for a collection, applying the transform to all documents at the fromVersion.
* Handles marking the migration as started, completed, or failed in the migration repository.
* Executes the migration chain for all documents currently at a specific source schema version.
* Handles marking the chain as started, completed, or failed in the migration repository.
*
* @param registry - The collection migration registry
* @param migrationStep - The specific migration step to execute
* @returns Number of documents migrated in this step
* @param sourceSchemaVersion - Source schema version to migrate from
* @param migrationChain - Ordered migration steps from source to current version
* @returns Number of migrated documents for this source version
*/
protected async executeCollectionMigrationStep<TDocument extends SchemaMigratableDocument>(
protected async executeMigrationChainForVersion<TDocument extends SchemaMigratableDocument>(
registry: CollectionMigrationRegistry<TDocument>,
migrationStep: SchemaMigrationStep<TDocument>
sourceSchemaVersion: SchemaVersion,
migrationChain: SchemaMigrationStep<TDocument>[]
): Promise<number> {
const pendingBefore = await this.countDocumentsAtSchemaVersion(registry.model, migrationStep.fromVersion);
const pendingDocumentsBefore = await this.countDocumentsAtSchemaVersion(registry.model, sourceSchemaVersion);
const migrationChainExecutionName = generateSchemaMigrationName(
registry.collectionName,
sourceSchemaVersion,
registry.currentVersion
);
if (pendingBefore === 0) {
this.logger.info(`Migration ${migrationStep.name} has no pending documents, skipping execution`);
if (pendingDocumentsBefore === 0) {
this.logger.info(`Migration ${migrationChainExecutionName} has no pending documents, skipping execution`);
return 0;
}
const isCompleted = await this.migrationRepository.isCompleted(migrationStep.name);
const isCompleted = await this.migrationRepository.isCompleted(migrationChainExecutionName);
if (isCompleted) {
this.logger.warn(
`Migration ${migrationStep.name} is marked as completed but still has ${pendingBefore} pending ` +
`documents at schemaVersion ${migrationStep.fromVersion}. Re-running migration step.`
`Migration ${migrationChainExecutionName} is marked as completed but still has ${pendingDocumentsBefore} pending ` +
`documents at schemaVersion ${sourceSchemaVersion}. Re-running migration chain.`
);
}
await this.migrationRepository.markAsStarted(migrationStep.name);
await this.migrationRepository.markAsStarted(migrationChainExecutionName);
try {
this.logger.info(`Executing migration: ${migrationStep.name}`);
const result = await this.runSchemaMigrationStep(migrationStep, registry.model);
const pendingAfter = await this.countDocumentsAtSchemaVersion(registry.model, migrationStep.fromVersion);
this.logger.info(`Executing migration: ${migrationChainExecutionName}`);
const result = await this.migrateDocumentsForSourceVersion(
registry.model,
sourceSchemaVersion,
registry.currentVersion,
migrationChain
);
const pendingDocumentsAfter = await this.countDocumentsAtSchemaVersion(registry.model, sourceSchemaVersion);
const metadata: Record<string, unknown> = {
const metadata = {
collectionName: registry.collectionName,
fromVersion: migrationStep.fromVersion,
toVersion: migrationStep.toVersion,
fromVersion: sourceSchemaVersion,
toVersion: registry.currentVersion,
chainLength: migrationChain.length,
chainStepNames: migrationChain.map((step) => step.name),
migratedCount: result.migratedCount,
failedCount: result.failedCount,
pendingBefore,
pendingAfter,
pendingBefore: pendingDocumentsBefore,
pendingAfter: pendingDocumentsAfter,
durationMs: result.durationMs
};
if (result.failedCount > 0 || pendingAfter > 0) {
if (result.failedCount > 0 || pendingDocumentsAfter > 0) {
const failureReason =
`Migration ${migrationStep.name} did not complete successfully. ` +
`failedCount=${result.failedCount}, pendingAfter=${pendingAfter}`;
`Migration ${migrationChainExecutionName} did not complete successfully. ` +
`failedCount=${result.failedCount}, pendingAfter=${pendingDocumentsAfter}`;
await this.migrationRepository.markAsFailed(migrationStep.name, failureReason);
await this.migrationRepository.markAsFailed(migrationChainExecutionName, failureReason);
throw new Error(failureReason);
}
await this.migrationRepository.markAsCompleted(migrationStep.name, metadata);
await this.migrationRepository.markAsCompleted(migrationChainExecutionName, metadata);
this.logger.info(
`Migration ${migrationStep.name} completed: ${result.migratedCount} documents migrated (${result.durationMs}ms)`
`Migration ${migrationChainExecutionName} completed: ${result.migratedCount} documents migrated (${result.durationMs}ms)`
);
return result.migratedCount;
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
await this.migrationRepository.markAsFailed(migrationStep.name, errorMessage);
await this.migrationRepository.markAsFailed(migrationChainExecutionName, errorMessage);
throw error;
}
}
/**
* Executes a single schema migration step on all documents that match the fromVersion.
* Applies the transform function to each document and updates it to the toVersion.
* Executes a migration chain on all documents that match the source version.
* Applies all transforms sequentially and saves each document once at the target version.
*
* @param migrationStep - The migration step to execute
* @param model - Mongoose model for the collection being migrated
* @param sourceSchemaVersion - The schema version to migrate from
* @param targetVersion - The schema version to migrate to
* @param migrationChain - Array of migration steps to apply in order
* @param batchSize - Number of documents to process in each batch. Default is 50.
* @returns Migration result with statistics about the execution
*/
protected async runSchemaMigrationStep<TDocument extends SchemaMigratableDocument>(
migrationStep: SchemaMigrationStep<TDocument>,
protected async migrateDocumentsForSourceVersion<TDocument extends SchemaMigratableDocument>(
model: Model<TDocument>,
sourceSchemaVersion: SchemaVersion,
targetVersion: SchemaVersion,
migrationChain: SchemaMigrationStep<TDocument>[],
batchSize = 50
): Promise<MigrationResult> {
const startTime = Date.now();
let migratedCount = 0;
let failedCount = 0;
const versionFilter = { schemaVersion: migrationStep.fromVersion };
const totalDocs = await model.countDocuments(versionFilter).exec();
const sourceVersionFilter = { schemaVersion: sourceSchemaVersion };
const totalSourceVersionDocuments = await model.countDocuments(sourceVersionFilter).exec();
if (totalDocs === 0) {
if (totalSourceVersionDocuments === 0) {
return {
migratedCount,
failedCount,
@ -254,17 +281,17 @@ export class MigrationService {
};
}
let processedCount = 0;
let lastProcessedId: TDocument['_id'] | null = null;
let hasMoreDocuments = true;
let processedDocumentsCount = 0;
let lastProcessedDocumentId: TDocument['_id'] | null = null;
let hasMoreBatches = true;
while (hasMoreDocuments) {
while (hasMoreBatches) {
const batchFilter =
lastProcessedId === null
? versionFilter
lastProcessedDocumentId === null
? sourceVersionFilter
: {
...versionFilter,
_id: { $gt: lastProcessedId }
...sourceVersionFilter,
_id: { $gt: lastProcessedDocumentId }
};
const documents = await model.find(batchFilter).sort({ _id: 1 }).limit(batchSize).exec();
@ -274,9 +301,8 @@ export class MigrationService {
const batchResults = await Promise.allSettled(
documents.map(async (doc) => {
const transformedUpdate = migrationStep.transform(doc);
const update = this.appendSchemaVersionUpdate(transformedUpdate, migrationStep.toVersion);
await model.updateOne({ _id: doc._id }, update).exec();
const migratedDocument = this.applyTransformChain(doc, migrationChain, targetVersion);
await migratedDocument.save();
return String(doc._id);
})
);
@ -293,10 +319,10 @@ export class MigrationService {
this.logger.warn(`Failed to migrate document ${String(documents[i]._id)}:`, batchResult.reason);
}
processedCount += documents.length;
lastProcessedId = documents[documents.length - 1]._id;
hasMoreDocuments = documents.length === batchSize;
this.logger.debug(`Processed ${processedCount}/${totalDocs} documents`);
processedDocumentsCount += documents.length;
lastProcessedDocumentId = documents[documents.length - 1]._id;
hasMoreBatches = documents.length === batchSize;
this.logger.debug(`Processed ${processedDocumentsCount}/${totalSourceVersionDocuments} documents`);
}
return {
@ -307,7 +333,8 @@ export class MigrationService {
}
/**
* Gets the minimum schema version present in the collection to detect the oldest pending version of documents.
* Gets the minimum schema version present in the collection.
* This is used to detect the oldest pending version of documents.
*
* @param model - Mongoose model for the collection
* @returns Current version or null if collection is empty
@ -331,7 +358,8 @@ export class MigrationService {
}
/**
* Gets the maximum schema version present in the collection to detect if there are any documents above expected version.
* Gets the maximum schema version present in the collection.
* This is used to detect if there are any documents above expected version.
*
* @param model - Mongoose model for the collection
* @returns Maximum version or null if collection is empty
@ -381,9 +409,9 @@ export class MigrationService {
fromVersion: SchemaVersion,
toVersion: SchemaVersion
): SchemaMigrationStep<TDocument>[] {
const needed: SchemaMigrationStep<TDocument>[] = [];
const steps: SchemaMigrationStep<TDocument>[] = [];
// Build a chain of migrations from fromVersion to toVersion
// Build a chain of migration steps from fromVersion to toVersion
let currentVersion = fromVersion;
while (currentVersion < toVersion) {
@ -402,7 +430,7 @@ export class MigrationService {
);
}
needed.push({
steps.push({
name: expectedMigrationName,
fromVersion: currentVersion,
toVersion: nextVersion,
@ -411,27 +439,30 @@ export class MigrationService {
currentVersion = nextVersion;
}
return needed;
return steps;
}
/**
* Appends a schemaVersion update to the migration update operation.
* Ensures that migrated documents are marked with the new version.
* Applies a chain of migration transforms to a document sequentially.
* Updates the document's schemaVersion to the target version after applying all transforms.
*
* @param update - Original migration update operation
* @param toVersion - Target schema version to set
* @returns Updated migration operation with schemaVersion set
* @param document - The document to transform
* @param migrationChain - Array of migration steps to apply in order
* @param targetVersion - The final schema version after applying the chain
* @returns The transformed document with updated schemaVersion
*/
protected appendSchemaVersionUpdate<TDocument extends SchemaMigratableDocument>(
update: MigrationUpdate<TDocument>,
toVersion: SchemaVersion
): MigrationUpdate<TDocument> {
return {
...update,
$set: {
...(update.$set ?? {}),
schemaVersion: toVersion
protected applyTransformChain<TDocument extends SchemaMigratableDocument>(
document: TDocument,
migrationChain: SchemaMigrationStep<TDocument>[],
targetVersion: SchemaVersion
): TDocument {
let transformedDocument = document;
for (const migrationStep of migrationChain) {
transformedDocument = migrationStep.transform(transformedDocument);
}
};
transformedDocument.schemaVersion = targetVersion;
return transformedDocument;
}
}