refactor(migrations): overhaul migration system to use schema transformation maps

- Removed the BaseSchemaMigration class and replaced it with a more flexible schema migration approach using transformation functions.
- Updated global-config, recording, room, and user migrations to utilize the new schema migration map structure.
- Introduced runtime migration registry for better management of migration execution.
- Enhanced migration service to handle schema migrations more efficiently, including improved error handling and logging.
- Added utility functions for generating and validating schema migration names.
- Updated migration repository methods to streamline migration status tracking.
This commit is contained in:
juancarmore 2026-02-17 11:32:34 +01:00
parent a853aa02a2
commit 96e441726c
16 changed files with 1588 additions and 629 deletions

View File

@ -21,7 +21,6 @@ The schema migration system enables safe evolution of MongoDB document structure
- ✅ **HA-safe** (distributed locking prevents concurrent migrations)
- ✅ **Batch processing** (efficient handling of large collections)
- ✅ **Progress tracking** (migrations stored in `MeetMigration` collection)
- ✅ **Version validation** (optional runtime checks in repositories)
---
@ -47,7 +46,6 @@ Each document includes a `schemaVersion` field:
```
src/
├── migrations/
│ ├── base-migration.ts # Base class for migrations
│ ├── migration-registry.ts # Central registry of all collections
│ ├── room-migrations.ts # Room-specific migrations
│ ├── recording-migrations.ts # Recording-specific migrations
@ -59,13 +57,24 @@ src/
└── migration.model.ts # Migration types and interfaces
```
**Note**: All migration types and interfaces (`ISchemaMigration`, `MigrationContext`, `MigrationResult`, `SchemaVersion`, `CollectionMigrationRegistry`) are defined in `src/models/migration.model.ts` for better code organization.
---
## Adding New Migrations
### Step 1: Update Schema Version in Configuration
### Step 1: Update TypeScript Interface
Update the domain interface to include new fields or changes:
```typescript
// typings/src/room.ts
export interface MeetRoom extends MeetRoomOptions {
roomId: string;
// ... existing fields ...
maxParticipants: number; // New field
}
```
### Step 2: Update Schema Version in Configuration
In `src/config/internal-config.ts`, increment the version constant:
@ -78,89 +87,48 @@ export const INTERNAL_CONFIG = {
};
```
### Step 2: Create Migration Class
### Step 3: Update Moongose Schema
Update the Mongoose schema to reflect the changes (new fields, etc.):
```typescript
import { BaseSchemaMigration } from './base-migration.js';
import { MeetRoomDocument } from '../repositories/schemas/room.schema.js';
import { MigrationContext } from '../models/migration.model.js';
import { Model } from 'mongoose';
class RoomMigrationV1ToV2 extends BaseSchemaMigration<MeetRoomDocument> {
fromVersion = 1;
toVersion = 2;
description = 'Add maxParticipants field with default value of 100';
protected async transform(document: MeetRoomDocument): Promise<Partial<MeetRoomDocument>> {
// Return fields to update (schemaVersion is handled automatically)
return {
maxParticipants: 100
};
}
// Optional: Add validation before migration runs
async validate(model: Model<MeetRoomDocument>, context: MigrationContext): Promise<boolean> {
// Check prerequisites, data integrity, etc.
return true;
}
}
```
### Step 3: Register Migration
Add the migration instance to the migrations array in `room-migrations.ts`:
```typescript
import { ISchemaMigration } from '../models/migration.model.js';
import { MeetRoomDocument } from '../repositories/schemas/room.schema.js';
export const roomMigrations: ISchemaMigration<MeetRoomDocument>[] = [
new RoomMigrationV1ToV2()
// Future migrations will be added here
];
```
### Step 4: Update Schema Definition
Update the Mongoose schema default version in `internal-config.ts`:
```typescript
// config/internal-config.ts
export const INTERNAL_CONFIG = {
// ... other config
ROOM_SCHEMA_VERSION: 2 // Updated from 1
// ...
};
```
If adding new required fields, update the Mongoose schema:
```typescript
// repositories/schemas/room.schema.ts
import { INTERNAL_CONFIG } from '../../config/internal-config.js';
// models/mongoose-schemas/room.schema.ts
const MeetRoomSchema = new Schema<MeetRoomDocument>({
schemaVersion: {
type: Number,
required: true,
default: INTERNAL_CONFIG.ROOM_SCHEMA_VERSION // Uses config value (2)
},
// ... existing fields ...
maxParticipants: { type: Number, required: true, default: 100 } // New field
});
```
### Step 5: Update TypeScript Interface
Update the domain interface to include new fields:
### Step 4: Create Migration Definition
```typescript
// typings/src/room.ts
export interface MeetRoom extends MeetRoomOptions {
roomId: string;
// ... existing fields ...
maxParticipants: number; // New field
}
import { SchemaTransform, generateSchemaMigrationName } from '../models/migration.model.js';
import { MeetRoomDocument } from '../models/mongoose-schemas/room.schema.js';
const roomMigrationV1ToV2Name = generateSchemaMigrationName('MeetRoom', 1, 2);
const roomMigrationV1ToV2Transform: SchemaTransform<MeetRoomDocument> = () => ({
$set: {
maxParticipants: 100
}
});
```
`transform` must return MongoDB update operators, so it can express any kind of change:
- `$set` to add/modify values
- `$unset` to remove properties
- `$rename` to rename fields
- Any other supported update operator
### Step 5: Register Migration
Add the migration to the map initialization in `room-migrations.ts`:
```typescript
export const roomMigrations: SchemaMigrationMap<MeetRoomDocument> = new Map([
[roomMigrationV1ToV2Name, roomMigrationV1ToV2Transform]
]);
```
### Step 6: Test Migration
@ -187,7 +155,6 @@ Each migration is tracked in the `MeetMigration` collection:
"fromVersion": 1,
"toVersion": 2,
"migratedCount": 1523,
"skippedCount": 0,
"failedCount": 0,
"durationMs": 123000
}

View File

@ -1,24 +1,20 @@
import { ISchemaMigration } from '../models/migration.model.js';
import { SchemaMigrationMap } from '../models/migration.model.js';
import { MeetApiKeyDocument } from '../models/mongoose-schemas/api-key.schema.js';
/**
* All migrations for the MeetApiKey collection in chronological order.
* Add new migrations to this array as the schema evolves.
* Schema migrations for MeetApiKey.
* Key format: schema_{collection}_v{from}_to_v{to}
*
* Example migration (when needed in the future):
* Example:
*
* class ApiKeyMigrationV1ToV2 extends BaseSchemaMigration<MeetApiKeyDocument> {
* fromVersion = 1;
* toVersion = 2;
* description = 'Add expirationDate field for API key expiration';
* const apiKeyMigrationV1ToV2Name = generateSchemaMigrationName('MeetApiKey', 1, 2);
*
* protected async transform(document: MeetApiKeyDocument): Promise<Partial<MeetApiKeyDocument>> {
* return {
* expirationDate: undefined // No expiration for existing keys
* };
* }
* }
* const apiKeyMigrationV1ToV2Transform: SchemaTransform<MeetApiKeyDocument> = () => ({
* $set: {
* expirationDate: undefined
* }
* });
*/
export const apiKeyMigrations: ISchemaMigration<MeetApiKeyDocument>[] = [
// Migrations will be added here as the schema evolves
];
export const apiKeyMigrations: SchemaMigrationMap<MeetApiKeyDocument> = new Map([
// [apiKeyMigrationV1ToV2Name, apiKeyMigrationV1ToV2Transform]
]);

View File

@ -1,124 +0,0 @@
import { Model } from 'mongoose';
import { ISchemaMigration, MigrationContext, MigrationResult, SchemaVersion } from '../models/migration.model.js';
/**
* Base class for schema migrations providing common functionality.
* Extend this class to implement specific migrations for collections.
*/
export abstract class BaseSchemaMigration<TDocument> implements ISchemaMigration<TDocument> {
abstract fromVersion: SchemaVersion;
abstract toVersion: SchemaVersion;
abstract description: string;
/**
* Default batch size for processing documents.
* Can be overridden in subclasses for collections with large documents.
*/
protected readonly defaultBatchSize = 50;
/**
* Executes the migration in batches.
* Processes all documents at fromVersion and upgrades them to toVersion.
*/
async execute(model: Model<TDocument>, context: MigrationContext): Promise<MigrationResult> {
const startTime = Date.now();
const batchSize = context.batchSize || this.defaultBatchSize;
let migratedCount = 0;
const skippedCount = 0;
let failedCount = 0;
context.logger.info(
`Starting schema migration: ${this.description} (v${this.fromVersion} -> v${this.toVersion})`
);
try {
// Find all documents at the source version
const totalDocs = await model.countDocuments({ schemaVersion: this.fromVersion }).exec();
if (totalDocs === 0) {
context.logger.info('No documents to migrate');
return {
migratedCount: 0,
skippedCount: 0,
failedCount: 0,
durationMs: Date.now() - startTime
};
}
context.logger.info(`Found ${totalDocs} documents to migrate`);
// Process documents in batches
let processedCount = 0;
while (processedCount < totalDocs) {
const documents = await model.find({ schemaVersion: this.fromVersion }).limit(batchSize).exec();
if (documents.length === 0) {
break;
}
// Transform and update each document
for (const doc of documents) {
try {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const updates = await this.transform(doc as any);
// Update the document with new fields and version
await model
.updateOne(
{ _id: doc._id },
{
$set: {
...updates,
schemaVersion: this.toVersion
}
}
)
.exec();
migratedCount++;
} catch (error) {
failedCount++;
context.logger.warn(`Failed to migrate document ${doc._id}:`, error);
}
}
processedCount += documents.length;
context.logger.debug(`Processed ${processedCount}/${totalDocs} documents`);
}
const durationMs = Date.now() - startTime;
context.logger.info(
`Migration completed: ${migratedCount} migrated, ${failedCount} failed (${durationMs}ms)`
);
return {
migratedCount,
skippedCount,
failedCount,
durationMs
};
} catch (error) {
context.logger.error('Migration failed:', error);
throw error;
}
}
/**
* Transform a single document from source version to target version.
* Override this method to implement the specific transformation logic.
*
* @param document - The document to transform
* @returns Object with fields to update (excluding schemaVersion which is handled automatically)
*/
protected abstract transform(document: TDocument): Promise<Partial<TDocument>>;
/**
* Optional validation before running migration.
* Default implementation always returns true.
*/
// eslint-disable-next-line @typescript-eslint/no-unused-vars
async validate(_model: Model<TDocument>, _context: MigrationContext): Promise<boolean> {
return true;
}
}

View File

@ -1,27 +1,20 @@
import { ISchemaMigration } from '../models/migration.model.js';
import { SchemaMigrationMap } from '../models/migration.model.js';
import { MeetGlobalConfigDocument } from '../models/mongoose-schemas/global-config.schema.js';
/**
* All migrations for the MeetGlobalConfig collection in chronological order.
* Add new migrations to this array as the schema evolves.
* Schema migrations for MeetGlobalConfig.
* Key format: schema_{collection}_v{from}_to_v{to}
*
* Example migration (when needed in the future):
* Example:
*
* class GlobalConfigMigrationV1ToV2 extends BaseSchemaMigration<MeetGlobalConfigDocument> {
* fromVersion = 1;
* toVersion = 2;
* description = 'Add new branding configuration section';
* const globalConfigMigrationV1ToV2Name = generateSchemaMigrationName('MeetGlobalConfig', 1, 2);
*
* protected async transform(document: MeetGlobalConfigDocument): Promise<Partial<MeetGlobalConfigDocument>> {
* return {
* brandingConfig: {
* logoUrl: '',
* companyName: 'OpenVidu Meet'
* }
* };
* }
* }
* const globalConfigMigrationV1ToV2Transform: SchemaTransform<MeetGlobalConfigDocument> = () => ({
* $set: {
* newField: 'default-value'
* }
* });
*/
export const globalConfigMigrations: ISchemaMigration<MeetGlobalConfigDocument>[] = [
// Migrations will be added here as the schema evolves
];
export const globalConfigMigrations: SchemaMigrationMap<MeetGlobalConfigDocument> = new Map([
// [globalConfigMigrationV1ToV2Name, globalConfigMigrationV1ToV2Transform]
]);

View File

@ -0,0 +1,6 @@
export * from './api-key-migrations.js';
export * from './global-config-migrations.js';
export * from './migration-registry.js';
export * from './recording-migrations.js';
export * from './room-migrations.js';
export * from './user-migrations.js';

View File

@ -1,13 +1,22 @@
import { INTERNAL_CONFIG } from '../config/internal-config.js';
import { CollectionMigrationRegistry } from '../models/migration.model.js';
import { meetApiKeyCollectionName, MeetApiKeyModel } from '../models/mongoose-schemas/api-key.schema.js';
import { CollectionMigrationRegistry, SchemaMigratableDocument } from '../models/migration.model.js';
import {
meetApiKeyCollectionName,
MeetApiKeyDocument,
MeetApiKeyModel
} from '../models/mongoose-schemas/api-key.schema.js';
import {
meetGlobalConfigCollectionName,
MeetGlobalConfigDocument,
MeetGlobalConfigModel
} from '../models/mongoose-schemas/global-config.schema.js';
import { meetRecordingCollectionName, MeetRecordingModel } from '../models/mongoose-schemas/recording.schema.js';
import { meetRoomCollectionName, MeetRoomModel } from '../models/mongoose-schemas/room.schema.js';
import { meetUserCollectionName, MeetUserModel } from '../models/mongoose-schemas/user.schema.js';
import {
meetRecordingCollectionName,
MeetRecordingDocument,
MeetRecordingModel
} from '../models/mongoose-schemas/recording.schema.js';
import { meetRoomCollectionName, MeetRoomDocument, MeetRoomModel } from '../models/mongoose-schemas/room.schema.js';
import { meetUserCollectionName, MeetUserDocument, MeetUserModel } from '../models/mongoose-schemas/user.schema.js';
import { apiKeyMigrations } from './api-key-migrations.js';
import { globalConfigMigrations } from './global-config-migrations.js';
import { recordingMigrations } from './recording-migrations.js';
@ -16,12 +25,18 @@ import { userMigrations } from './user-migrations.js';
/**
* Central registry of all collection migrations.
* Defines the current version and migration path for each collection.
* Defines the current version and migration map for each collection.
*
* Order matters: collections should be listed in dependency order.
* For example, if recordings depend on rooms, rooms should come first.
*/
export const migrationRegistry: CollectionMigrationRegistry[] = [
const migrationRegistry: [
CollectionMigrationRegistry<MeetGlobalConfigDocument>,
CollectionMigrationRegistry<MeetUserDocument>,
CollectionMigrationRegistry<MeetApiKeyDocument>,
CollectionMigrationRegistry<MeetRoomDocument>,
CollectionMigrationRegistry<MeetRecordingDocument>
] = [
// GlobalConfig - no dependencies, can run first
{
collectionName: meetGlobalConfigCollectionName,
@ -59,3 +74,10 @@ export const migrationRegistry: CollectionMigrationRegistry[] = [
migrations: recordingMigrations
}
];
/**
* Homogeneous runtime view of the migration registry.
* Used by migration execution code that iterates over all collections.
*/
export const runtimeMigrationRegistry =
migrationRegistry as unknown as CollectionMigrationRegistry<SchemaMigratableDocument>[];

View File

@ -1,24 +1,20 @@
import { ISchemaMigration } from '../models/migration.model.js';
import { SchemaMigrationMap } from '../models/migration.model.js';
import { MeetRecordingDocument } from '../models/mongoose-schemas/recording.schema.js';
/**
* All migrations for the MeetRecording collection in chronological order.
* Add new migrations to this array as the schema evolves.
* Schema migrations for MeetRecording.
* Key format: schema_{collection}_v{from}_to_v{to}
*
* Example migration (when needed in the future):
* Example:
*
* class RecordingMigrationV1ToV2 extends BaseSchemaMigration<MeetRecordingDocument> {
* fromVersion = 1;
* toVersion = 2;
* description = 'Add new optional field "quality" for recording quality tracking';
* const recordingMigrationV1ToV2Name = generateSchemaMigrationName('MeetRecording', 1, 2);
*
* protected async transform(document: MeetRecordingDocument): Promise<Partial<MeetRecordingDocument>> {
* return {
* quality: 'standard' // Default quality for existing recordings
* };
* }
* }
* const recordingMigrationV1ToV2Transform: SchemaTransform<MeetRecordingDocument> = () => ({
* $set: {
* quality: 'standard'
* }
* });
*/
export const recordingMigrations: ISchemaMigration<MeetRecordingDocument>[] = [
// Migrations will be added here as the schema evolves
];
export const recordingMigrations: SchemaMigrationMap<MeetRecordingDocument> = new Map([
// [recordingMigrationV1ToV2Name, recordingMigrationV1ToV2Transform]
]);

View File

@ -1,26 +1,20 @@
import { ISchemaMigration } from '../models/migration.model.js';
import { SchemaMigrationMap } from '../models/migration.model.js';
import { MeetRoomDocument } from '../models/mongoose-schemas/room.schema.js';
/**
* All migrations for the MeetRoom collection in chronological order.
* Add new migrations to this array as the schema evolves.
* Schema migrations for MeetRoom.
* Key format: schema_{collection}_v{from}_to_v{to}
*
* Example migration (when needed in the future):
* Example:
*
* class RoomMigrationV1ToV2 extends BaseSchemaMigration<MeetRoomDocument> {
* fromVersion = 1;
* toVersion = 2;
* description = 'Add new required field "maxParticipants" with default value';
* const roomMigrationV1ToV2Name = generateSchemaMigrationName('MeetRoom', 1, 2);
*
* protected async transform(document: MeetRoomDocument): Promise<Partial<MeetRoomDocument>> {
* return {
* maxParticipants: 100 // Add default value for existing rooms
* };
* }
* }
* const roomMigrationV1ToV2Transform: SchemaTransform<MeetRoomDocument> = () => ({
* $set: {
* maxParticipants: 100
* }
* });
*/
export const roomMigrations: ISchemaMigration<MeetRoomDocument>[] = [
// Migrations will be added here as the schema evolves
// Example: new RoomMigrationV1ToV2(),
// Example: new RoomMigrationV2ToV3(),
];
export const roomMigrations: SchemaMigrationMap<MeetRoomDocument> = new Map([
// [roomMigrationV1ToV2Name, roomMigrationV1ToV2Transform]
]);

View File

@ -1,24 +1,20 @@
import { ISchemaMigration } from '../models/migration.model.js';
import { SchemaMigrationMap } from '../models/migration.model.js';
import { MeetUserDocument } from '../models/mongoose-schemas/user.schema.js';
/**
* All migrations for the MeetUser collection in chronological order.
* Add new migrations to this array as the schema evolves.
* Schema migrations for MeetUser.
* Key format: schema_{collection}_v{from}_to_v{to}
*
* Example migration (when needed in the future):
* Example:
*
* class UserMigrationV1ToV2 extends BaseSchemaMigration<MeetUserDocument> {
* fromVersion = 1;
* toVersion = 2;
* description = 'Add email field for user notifications';
* const userMigrationV1ToV2Name = generateSchemaMigrationName('MeetUser', 1, 2);
*
* protected async transform(document: MeetUserDocument): Promise<Partial<MeetUserDocument>> {
* return {
* email: undefined // Email will be optional initially
* };
* }
* }
* const userMigrationV1ToV2Transform: SchemaTransform<MeetUserDocument> = () => ({
* $set: {
* email: undefined
* }
* });
*/
export const userMigrations: ISchemaMigration<MeetUserDocument>[] = [
// Migrations will be added here as the schema evolves
];
export const userMigrations: SchemaMigrationMap<MeetUserDocument> = new Map([
// [userMigrationV1ToV2Name, userMigrationV1ToV2Transform]
]);

View File

@ -1,5 +1,4 @@
import { Model } from 'mongoose';
import { LoggerService } from '../services/logger.service.js';
import { Document, Model, UpdateQuery } from 'mongoose';
/**
* Interface representing a migration document in MongoDB.
@ -58,19 +57,11 @@ export enum MigrationStatus {
}
/**
* Enum defining all possible migration names in the system.
* Each migration should have a unique identifier.
*
* Schema migrations follow the pattern: schema_{collection}_v{from}_to_v{to}
* Example: 'schema_room_v1_to_v2', 'schema_recording_v2_to_v3'
*/
export enum MigrationName {
/**
* Migration from legacy storage (S3, ABS, GCS) to MongoDB.
* Includes: GlobalConfig, Users, ApiKeys, Rooms, and Recordings.
*/
LEGACY_STORAGE_TO_MONGODB = 'legacy_storage_to_mongodb'
}
export type SchemaMigrationName = `schema_${string}_v${number}_to_v${number}`;
export type MigrationName = SchemaMigrationName;
/**
* Generates a migration name for schema version upgrades.
@ -83,12 +74,49 @@ export enum MigrationName {
* @example
* generateSchemaMigrationName('MeetRoom', 1, 2) // Returns: 'schema_room_v1_to_v2'
*/
export function generateSchemaMigrationName(collectionName: string, fromVersion: number, toVersion: number): string {
export function generateSchemaMigrationName(
collectionName: string,
fromVersion: number,
toVersion: number
): SchemaMigrationName {
// Convert collection name to lowercase and remove 'Meet' prefix
const simpleName = collectionName.replace(/^Meet/, '').toLowerCase();
return `schema_${simpleName}_v${fromVersion}_to_v${toVersion}`;
}
/**
* Checks whether a string matches the schema migration naming convention.
*/
export function isSchemaMigrationName(name: string): name is SchemaMigrationName {
return /^schema_[a-z0-9_]+_v\d+_to_v\d+$/.test(name);
}
/**
* Parses a schema migration name and extracts entity and versions.
*/
export function parseSchemaMigrationName(
name: string
): { collectionName: string; fromVersion: SchemaVersion; toVersion: SchemaVersion } | null {
const match = /^schema_([a-z0-9_]+)_v(\d+)_to_v(\d+)$/.exec(name);
if (!match) {
return null;
}
return {
collectionName: match[1],
fromVersion: Number(match[2]),
toVersion: Number(match[3])
};
}
/**
* Base document shape required for schema migrations.
*/
export interface SchemaMigratableDocument extends Document {
schemaVersion?: number;
}
/**
* Represents a schema version number.
* Versions start at 1 and increment sequentially.
@ -96,14 +124,49 @@ export function generateSchemaMigrationName(collectionName: string, fromVersion:
export type SchemaVersion = number;
/**
* Context provided to migration functions.
* Contains utilities and services needed during migration.
* MongoDB update operations generated by a migration transform.
* Supports full update operators like $set, $unset, $rename, etc.
*/
export interface MigrationContext {
/** Logger service for tracking migration progress */
logger: LoggerService;
/** Batch size for processing documents (default: 50) */
batchSize?: number;
export type MigrationUpdate<TDocument extends SchemaMigratableDocument> = UpdateQuery<TDocument>;
/**
* Function that transforms a document and returns a MongoDB update operation.
*/
export type SchemaTransform<TDocument extends SchemaMigratableDocument> = (
document: TDocument
) => MigrationUpdate<TDocument>;
/**
* Map of schema migration names to transform functions.
*/
export type SchemaMigrationMap<TDocument extends SchemaMigratableDocument> = Map<
SchemaMigrationName,
SchemaTransform<TDocument>
>;
/**
* Resolved migration step ready to be executed.
*/
export interface SchemaMigrationStep<TDocument extends SchemaMigratableDocument> {
name: SchemaMigrationName;
fromVersion: SchemaVersion;
toVersion: SchemaVersion;
transform: SchemaTransform<TDocument>;
}
/**
* Registry entry for a collection's migrations.
* Groups all migrations for a specific collection.
*/
export interface CollectionMigrationRegistry<TDocument extends SchemaMigratableDocument> {
/** Name of the collection */
collectionName: string;
/** Mongoose model for the collection */
model: Model<TDocument>;
/** Current schema version expected by the application */
currentVersion: SchemaVersion;
/** Map of migration names to their transform functions */
migrations: SchemaMigrationMap<TDocument>;
}
/**
@ -113,60 +176,8 @@ export interface MigrationContext {
export interface MigrationResult {
/** Number of documents successfully migrated */
migratedCount: number;
/** Number of documents skipped (already at target version) */
skippedCount: number;
/** Number of documents that failed migration */
failedCount: number;
/** Total time taken in milliseconds */
durationMs: number;
}
/**
* Interface for a single schema migration handler.
* Each migration transforms documents from one version to the next.
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export interface ISchemaMigration<TDocument = any> {
/** The source schema version this migration upgrades from */
fromVersion: SchemaVersion;
/** The target schema version this migration upgrades to */
toVersion: SchemaVersion;
/** Short description of what this migration does */
description: string;
/**
* Executes the migration on a batch of documents.
* Should update documents using MongoDB bulk operations for efficiency.
*
* @param model - Mongoose model for the collection
* @param context - Migration context with logger and configuration
* @returns Migration result with statistics
*/
execute(model: Model<TDocument>, context: MigrationContext): Promise<MigrationResult>;
/**
* Optional validation to check if migration is safe to run.
* Can verify prerequisites or data integrity before migration starts.
*
* @param model - Mongoose model for the collection
* @param context - Migration context with logger and configuration
* @returns true if migration can proceed, false otherwise
*/
validate?(model: Model<TDocument>, context: MigrationContext): Promise<boolean>;
}
/**
* Registry entry for a collection's migrations.
* Groups all migrations for a specific collection.
*/
export interface CollectionMigrationRegistry {
/** Name of the collection */
collectionName: string;
/** Mongoose model for the collection */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
model: Model<any>;
/** Current schema version expected by the application */
currentVersion: SchemaVersion;
/** Array of migrations in chronological order */
migrations: ISchemaMigration[];
}

View File

@ -1,5 +1,5 @@
import { Document, model, Schema } from 'mongoose';
import { MeetMigration, MigrationName, MigrationStatus } from '../migration.model.js';
import { isSchemaMigrationName, MeetMigration, MigrationStatus } from '../migration.model.js';
/**
* Mongoose Document interface for MeetMigration.
@ -16,7 +16,10 @@ const MigrationSchema = new Schema<MeetMigrationDocument>(
name: {
type: String,
required: true,
enum: Object.values(MigrationName)
validate: {
validator: (value: string) => isSchemaMigrationName(value),
message: 'Invalid migration name format'
}
},
status: {
type: String,

View File

@ -1,6 +1,5 @@
import {
MeetRecordingAccess,
MeetRecordingEncodingPreset,
MeetRecordingLayout,
MeetRoom,
MeetRoomDeletionPolicyWithMeeting,
@ -54,34 +53,16 @@ const MeetRecordingConfigSchema = new Schema(
layout: {
type: String,
enum: Object.values(MeetRecordingLayout),
required: true,
default: MeetRecordingLayout.GRID
required: true
},
encoding: {
type: Schema.Types.Mixed,
required: false,
encoding: {
type: Schema.Types.Mixed,
required: true,
default: MeetRecordingEncodingPreset.H264_720P_30,
validate: {
validator: (value: any) => {
if (!value) return true;
if (typeof value === 'string') return true;
if (typeof value === 'object') return value.video || value.audio;
return false;
},
message: 'Encoding must be a preset string or options object'
}
}
required: true
},
allowAccessTo: {
type: String,
enum: Object.values(MeetRecordingAccess),
required: false
required: true
}
},
{ _id: false }
@ -120,8 +101,7 @@ const MeetE2EEConfigSchema = new Schema(
{
enabled: {
type: Boolean,
required: true,
default: false
required: true
}
},
{ _id: false }
@ -134,8 +114,7 @@ const MeetCaptionsConfigSchema = new Schema(
{
enabled: {
type: Boolean,
required: true,
default: true
required: true
}
},
{ _id: false }
@ -215,13 +194,11 @@ const MeetRoomConfigSchema = new Schema(
},
e2ee: {
type: MeetE2EEConfigSchema,
required: true,
default: { enabled: false }
required: true
},
captions: {
type: MeetCaptionsConfigSchema,
required: true,
default: { enabled: false }
required: true
}
},
{ _id: false }
@ -273,14 +250,12 @@ const MeetRoomSchema = new Schema<MeetRoomDocument>(
status: {
type: String,
enum: Object.values(MeetRoomStatus),
required: true,
default: MeetRoomStatus.OPEN
required: true
},
meetingEndAction: {
type: String,
enum: Object.values(MeetingEndAction),
required: true,
default: MeetingEndAction.NONE
required: true
}
},
{

View File

@ -95,7 +95,7 @@ export class MigrationRepository extends BaseRepository<MeetMigration, MeetMigra
{
$set: {
status: MigrationStatus.FAILED,
completedAt: new Date(),
completedAt: Date.now(),
error
}
}
@ -103,27 +103,6 @@ export class MigrationRepository extends BaseRepository<MeetMigration, MeetMigra
return this.toDomain(document);
}
/**
* Get all migrations with their current status.
*
* @returns Array of all migration documents
*/
async getAllMigrations(): Promise<MeetMigration[]> {
const documents = await this.findAll();
return documents;
}
/**
* Get a specific migration by name.
*
* @param name - The name of the migration
* @returns The migration document or null if not found
*/
async getMigration(name: MigrationName): Promise<MeetMigration | null> {
const document = await this.findOne({ name });
return document ? this.toDomain(document) : null;
}
/**
* Check if a migration has been completed successfully.
*

View File

@ -85,7 +85,9 @@ const createApp = () => {
if (process.env.NODE_ENV === 'development') {
// Serve internal API docs only in development mode
appRouter.use(`${INTERNAL_CONFIG.INTERNAL_API_BASE_PATH_V1}/docs`, (_req: Request, res: Response) =>
res.type('html').send(getOpenApiHtmlWithBasePath(internalApiHtmlFilePath, INTERNAL_CONFIG.INTERNAL_API_BASE_PATH_V1))
res
.type('html')
.send(getOpenApiHtmlWithBasePath(internalApiHtmlFilePath, INTERNAL_CONFIG.INTERNAL_API_BASE_PATH_V1))
);
}
@ -137,7 +139,9 @@ const startServer = (app: express.Application) => {
console.log(
'REST API Docs: ',
chalk.cyanBright(`http://localhost:${MEET_ENV.SERVER_PORT}${basePathDisplay}${INTERNAL_CONFIG.API_BASE_PATH_V1}/docs`)
chalk.cyanBright(
`http://localhost:${MEET_ENV.SERVER_PORT}${basePathDisplay}${INTERNAL_CONFIG.API_BASE_PATH_V1}/docs`
)
);
logEnvVars();
});
@ -164,8 +168,8 @@ const isMainModule = (): boolean => {
if (isMainModule()) {
registerDependencies();
const app = createApp();
startServer(app);
await initializeEagerServices();
startServer(app);
}
export { createApp, registerDependencies };

View File

@ -2,13 +2,15 @@ import { inject, injectable } from 'inversify';
import { Model } from 'mongoose';
import ms from 'ms';
import { MeetLock } from '../helpers/redis.helper.js';
import { migrationRegistry } from '../migrations/migration-registry.js';
import { runtimeMigrationRegistry } from '../migrations/migration-registry.js';
import {
CollectionMigrationRegistry,
generateSchemaMigrationName,
ISchemaMigration,
MigrationContext,
MigrationName
MigrationResult,
MigrationUpdate,
SchemaMigratableDocument,
SchemaMigrationStep,
SchemaVersion
} from '../models/migration.model.js';
import { ApiKeyRepository } from '../repositories/api-key.repository.js';
import { GlobalConfigRepository } from '../repositories/global-config.repository.js';
@ -73,133 +75,19 @@ export class MigrationService {
/**
* Runs all schema migrations to upgrade document structures to the latest version.
* Processes each collection in the registry and executes pending migrations.
*
* Schema migrations run after data migrations and upgrade existing documents
* to match the current schema version expected by the application.
*/
protected async runSchemaMigrations(): Promise<void> {
this.logger.info('Running schema migrations...');
try {
let totalMigrated = 0;
let totalSkipped = 0;
// Process each collection in the registry
for (const registry of migrationRegistry) {
this.logger.info(`Checking schema version for collection: ${registry.collectionName}`);
// Get the current version of documents in the collection
const currentVersionInDb = await this.getCurrentSchemaVersion(registry.model);
if (currentVersionInDb === null) {
this.logger.info(`No documents found in ${registry.collectionName}, skipping migration`);
continue;
}
if (currentVersionInDb === registry.currentVersion) {
this.logger.info(
`Collection ${registry.collectionName} is already at version ${registry.currentVersion}`
);
continue;
}
if (currentVersionInDb > registry.currentVersion) {
this.logger.warn(
`Collection ${registry.collectionName} has version ${currentVersionInDb} ` +
`but application expects ${registry.currentVersion}. ` +
`This may indicate a downgrade or inconsistent deployment.`
);
continue;
}
// Find migrations needed to upgrade from current to target version
const neededMigrations = this.findNeededMigrations(
registry,
currentVersionInDb,
registry.currentVersion
);
if (neededMigrations.length === 0) {
this.logger.info(`No migrations needed for ${registry.collectionName}`);
continue;
}
this.logger.info(
`Found ${neededMigrations.length} migrations for ${registry.collectionName} ` +
`(v${currentVersionInDb} -> v${registry.currentVersion})`
);
// Execute each migration in sequence
for (const migration of neededMigrations) {
const migrationName = generateSchemaMigrationName(
registry.collectionName,
migration.fromVersion,
migration.toVersion
);
// Check if this specific migration was already completed
const isCompleted = await this.migrationRepository.isCompleted(migrationName as MigrationName);
if (isCompleted) {
this.logger.info(`Migration ${migrationName} already completed, skipping`);
continue;
}
// Mark migration as started
await this.migrationRepository.markAsStarted(migrationName as MigrationName);
try {
const migrationContext: MigrationContext = {
logger: this.logger,
batchSize: 50 // Default batch size
};
// Validate migration if validation method is provided
if (migration.validate) {
const isValid = await migration.validate(registry.model, migrationContext);
if (!isValid) {
throw new Error(`Validation failed for migration ${migrationName}`);
}
}
// Execute the migration
this.logger.info(`Executing migration: ${migration.description}`);
const result = await migration.execute(registry.model, migrationContext);
// Track statistics
totalMigrated += result.migratedCount;
totalSkipped += result.skippedCount;
// Mark migration as completed with metadata
const metadata: Record<string, unknown> = {
collectionName: registry.collectionName,
fromVersion: migration.fromVersion,
toVersion: migration.toVersion,
migratedCount: result.migratedCount,
skippedCount: result.skippedCount,
failedCount: result.failedCount,
durationMs: result.durationMs
};
await this.migrationRepository.markAsCompleted(migrationName as MigrationName, metadata);
this.logger.info(
`Migration ${migrationName} completed: ${result.migratedCount} migrated, ` +
`${result.failedCount} failed (${result.durationMs}ms)`
);
} catch (error) {
// Mark migration as failed
const errorMessage = error instanceof Error ? error.message : String(error);
await this.migrationRepository.markAsFailed(migrationName as MigrationName, errorMessage);
throw error;
}
}
for (const registry of runtimeMigrationRegistry) {
totalMigrated += await this.migrateCollectionSchemas(registry);
}
this.logger.info(
`Schema migrations completed successfully: ${totalMigrated} documents migrated, ${totalSkipped} skipped`
);
this.logger.info(`Schema migrations completed successfully: ${totalMigrated} documents migrated`);
} catch (error) {
this.logger.error('Error running schema migrations:', error);
throw error;
@ -207,17 +95,238 @@ export class MigrationService {
}
/**
* Gets the current schema version of documents in a collection.
* Samples the database to determine the version.
* Migrates documents in a collection through all required schema versions to reach the current version.
*
* @param registry - The collection migration registry containing migration steps and model
* @returns Number of documents migrated in this collection
*/
protected async migrateCollectionSchemas<TDocument extends SchemaMigratableDocument>(
registry: CollectionMigrationRegistry<TDocument>
): Promise<number> {
this.logger.info(`Checking schema version for collection: ${registry.collectionName}`);
const minVersionInDb = await this.getMinSchemaVersion(registry.model);
if (minVersionInDb === null) {
this.logger.info(`No documents found in ${registry.collectionName}, skipping migration`);
return 0;
}
const maxVersionInDb = await this.getMaxSchemaVersion(registry.model);
if (maxVersionInDb && maxVersionInDb > registry.currentVersion) {
throw new Error(
`Collection ${registry.collectionName} has schemaVersion ${maxVersionInDb}, ` +
`which is higher than expected ${registry.currentVersion}. ` +
'Startup aborted to prevent inconsistent schema handling.'
);
}
if (minVersionInDb === registry.currentVersion) {
this.logger.info(`Collection ${registry.collectionName} is already at version ${registry.currentVersion}`);
return 0;
}
const migrationSteps = this.getRequiredMigrationSteps(registry, minVersionInDb);
let collectionMigrated = 0;
for (const migrationStep of migrationSteps) {
collectionMigrated += await this.executeCollectionMigrationStep(registry, migrationStep);
}
return collectionMigrated;
}
/**
* Gets the required migration steps to upgrade from the current version in the database to the target version.
* Validates that there are no missing migration steps in the chain.
*
* @param registry - The collection migration registry
* @param minVersionInDb - The minimum schema version currently present in the database
* @returns Array of migration steps that need to be executed in order
*/
protected getRequiredMigrationSteps<TDocument extends SchemaMigratableDocument>(
registry: CollectionMigrationRegistry<TDocument>,
minVersionInDb: SchemaVersion
): SchemaMigrationStep<TDocument>[] {
const migrationSteps = this.findSchemaMigrationSteps(registry, minVersionInDb, registry.currentVersion);
if (migrationSteps.length === 0) {
throw new Error(
`No migration steps found for ${registry.collectionName} ` +
`(v${minVersionInDb} -> v${registry.currentVersion}). Startup aborted.`
);
}
this.logger.info(
`Found ${migrationSteps.length} migration steps for ${registry.collectionName} ` +
`(v${minVersionInDb} -> v${registry.currentVersion})`
);
return migrationSteps;
}
/**
* Executes a single migration step for a collection, applying the transform to all documents at the fromVersion.
* Handles marking the migration as started, completed, or failed in the migration repository.
*
* @param registry - The collection migration registry
* @param migrationStep - The specific migration step to execute
* @returns Number of documents migrated in this step
*/
protected async executeCollectionMigrationStep<TDocument extends SchemaMigratableDocument>(
registry: CollectionMigrationRegistry<TDocument>,
migrationStep: SchemaMigrationStep<TDocument>
): Promise<number> {
const pendingBefore = await this.countDocumentsAtSchemaVersion(registry.model, migrationStep.fromVersion);
if (pendingBefore === 0) {
this.logger.info(`Migration ${migrationStep.name} has no pending documents, skipping execution`);
return 0;
}
const isCompleted = await this.migrationRepository.isCompleted(migrationStep.name);
if (isCompleted) {
this.logger.warn(
`Migration ${migrationStep.name} is marked as completed but still has ${pendingBefore} pending ` +
`documents at schemaVersion ${migrationStep.fromVersion}. Re-running migration step.`
);
}
await this.migrationRepository.markAsStarted(migrationStep.name);
try {
this.logger.info(`Executing migration: ${migrationStep.name}`);
const result = await this.runSchemaMigrationStep(migrationStep, registry.model);
const pendingAfter = await this.countDocumentsAtSchemaVersion(registry.model, migrationStep.fromVersion);
const metadata: Record<string, unknown> = {
collectionName: registry.collectionName,
fromVersion: migrationStep.fromVersion,
toVersion: migrationStep.toVersion,
migratedCount: result.migratedCount,
failedCount: result.failedCount,
pendingBefore,
pendingAfter,
durationMs: result.durationMs
};
if (result.failedCount > 0 || pendingAfter > 0) {
const failureReason =
`Migration ${migrationStep.name} did not complete successfully. ` +
`failedCount=${result.failedCount}, pendingAfter=${pendingAfter}`;
await this.migrationRepository.markAsFailed(migrationStep.name, failureReason);
throw new Error(failureReason);
}
await this.migrationRepository.markAsCompleted(migrationStep.name, metadata);
this.logger.info(
`Migration ${migrationStep.name} completed: ${result.migratedCount} documents migrated (${result.durationMs}ms)`
);
return result.migratedCount;
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
await this.migrationRepository.markAsFailed(migrationStep.name, errorMessage);
throw error;
}
}
/**
* Executes a single schema migration step on all documents that match the fromVersion.
* Applies the transform function to each document and updates it to the toVersion.
*
* @param migrationStep - The migration step to execute
* @param model - Mongoose model for the collection being migrated
* @param batchSize - Number of documents to process in each batch. Default is 50.
* @returns Migration result with statistics about the execution
*/
protected async runSchemaMigrationStep<TDocument extends SchemaMigratableDocument>(
migrationStep: SchemaMigrationStep<TDocument>,
model: Model<TDocument>,
batchSize = 50
): Promise<MigrationResult> {
const startTime = Date.now();
let migratedCount = 0;
let failedCount = 0;
const versionFilter = { schemaVersion: migrationStep.fromVersion };
const totalDocs = await model.countDocuments(versionFilter).exec();
if (totalDocs === 0) {
return {
migratedCount,
failedCount,
durationMs: Date.now() - startTime
};
}
let processedCount = 0;
let lastProcessedId: TDocument['_id'] | null = null;
let hasMoreDocuments = true;
while (hasMoreDocuments) {
const batchFilter =
lastProcessedId === null
? versionFilter
: {
...versionFilter,
_id: { $gt: lastProcessedId }
};
const documents = await model.find(batchFilter).sort({ _id: 1 }).limit(batchSize).exec();
if (documents.length === 0) {
break;
}
const batchResults = await Promise.allSettled(
documents.map(async (doc) => {
const transformedUpdate = migrationStep.transform(doc);
const update = this.appendSchemaVersionUpdate(transformedUpdate, migrationStep.toVersion);
await model.updateOne({ _id: doc._id }, update).exec();
return String(doc._id);
})
);
for (let i = 0; i < batchResults.length; i++) {
const batchResult = batchResults[i];
if (batchResult.status === 'fulfilled') {
migratedCount++;
continue;
}
failedCount++;
this.logger.warn(`Failed to migrate document ${String(documents[i]._id)}:`, batchResult.reason);
}
processedCount += documents.length;
lastProcessedId = documents[documents.length - 1]._id;
hasMoreDocuments = documents.length === batchSize;
this.logger.debug(`Processed ${processedCount}/${totalDocs} documents`);
}
return {
migratedCount,
failedCount,
durationMs: Date.now() - startTime
};
}
/**
* Gets the minimum schema version present in the collection to detect the oldest pending version of documents.
*
* @param model - Mongoose model for the collection
* @returns Current version or null if collection is empty
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
protected async getCurrentSchemaVersion(model: Model<any>): Promise<number | null> {
protected async getMinSchemaVersion<TDocument extends SchemaMigratableDocument>(
model: Model<TDocument>
): Promise<SchemaVersion | null> {
try {
// Get a sample document to check its version
const sampleDoc = await model.findOne({}).select('schemaVersion').exec();
const sampleDoc = await model.findOne({}).sort({ schemaVersion: 1 }).select('schemaVersion').exec();
if (!sampleDoc) {
return null; // Collection is empty
@ -232,39 +341,107 @@ export class MigrationService {
}
/**
* Finds the migrations needed to upgrade from one version to another.
* Gets the maximum schema version present in the collection to detect if there are any documents above expected version.
*
* @param model - Mongoose model for the collection
* @returns Maximum version or null if collection is empty
*/
protected async getMaxSchemaVersion<TDocument extends SchemaMigratableDocument>(
model: Model<TDocument>
): Promise<SchemaVersion | null> {
try {
const sampleDoc = await model.findOne({}).sort({ schemaVersion: -1 }).select('schemaVersion').exec();
if (!sampleDoc) {
return null; // Collection is empty
}
return sampleDoc.schemaVersion ?? 1;
} catch (error) {
this.logger.error('Error getting max schema version:', error);
throw error;
}
}
/**
* Counts how many documents are at a specific schema version.
*
* @param model - Mongoose model for the collection
* @param schemaVersion - Schema version to count
* @returns Number of documents at the specified schema version
*/
protected async countDocumentsAtSchemaVersion<TDocument extends SchemaMigratableDocument>(
model: Model<TDocument>,
schemaVersion: SchemaVersion
): Promise<number> {
return model.countDocuments({ schemaVersion }).exec();
}
/**
* Finds the schema migration steps needed to upgrade from one version to another.
* Returns migrations in the correct order to apply.
*
* @param registry - Collection migration registry
* @param fromVersion - Current version in database
* @param toVersion - Target version from application
* @returns Array of migrations to execute in order
* @returns Array of schema migration steps to execute in order
*/
protected findNeededMigrations(
registry: CollectionMigrationRegistry,
fromVersion: number,
toVersion: number
): ISchemaMigration[] {
const needed: ISchemaMigration[] = [];
protected findSchemaMigrationSteps<TDocument extends SchemaMigratableDocument>(
registry: CollectionMigrationRegistry<TDocument>,
fromVersion: SchemaVersion,
toVersion: SchemaVersion
): SchemaMigrationStep<TDocument>[] {
const needed: SchemaMigrationStep<TDocument>[] = [];
// Build a chain of migrations from fromVersion to toVersion
let currentVersion = fromVersion;
while (currentVersion < toVersion) {
const nextMigration = registry.migrations.find((m) => m.fromVersion === currentVersion);
const nextVersion = currentVersion + 1;
const expectedMigrationName = generateSchemaMigrationName(
registry.collectionName,
currentVersion,
nextVersion
);
const transform = registry.migrations.get(expectedMigrationName);
if (!nextMigration) {
this.logger.warn(
`No migration found from version ${currentVersion} for ${registry.collectionName}. ` +
`Migration chain is incomplete.`
if (!transform) {
throw new Error(
`No migration found from version ${currentVersion} to ${nextVersion} for ` +
`${registry.collectionName}. Migration chain is incomplete.`
);
break;
}
needed.push(nextMigration);
currentVersion = nextMigration.toVersion;
needed.push({
name: expectedMigrationName,
fromVersion: currentVersion,
toVersion: nextVersion,
transform
});
currentVersion = nextVersion;
}
return needed;
}
/**
* Appends a schemaVersion update to the migration update operation.
* Ensures that migrated documents are marked with the new version.
*
* @param update - Original migration update operation
* @param toVersion - Target schema version to set
* @returns Updated migration operation with schemaVersion set
*/
protected appendSchemaVersionUpdate<TDocument extends SchemaMigratableDocument>(
update: MigrationUpdate<TDocument>,
toVersion: SchemaVersion
): MigrationUpdate<TDocument> {
return {
...update,
$set: {
...(update.$set ?? {}),
schemaVersion: toVersion
}
};
}
}

1070
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff