Merge branch 'main' into feat/room-members-users
This commit is contained in:
commit
71c08dee8c
@ -51,12 +51,14 @@ export const INTERNAL_CONFIG = {
|
||||
// MongoDB Schema Versions
|
||||
// These define the current schema version for each collection
|
||||
// Increment when making breaking changes to the schema structure
|
||||
GLOBAL_CONFIG_SCHEMA_VERSION: 2 as SchemaVersion,
|
||||
USER_SCHEMA_VERSION: 2 as SchemaVersion,
|
||||
API_KEY_SCHEMA_VERSION: 1 as SchemaVersion,
|
||||
// IMPORTANT: whenever you increment a schema version, update the MIGRATION_REV timestamp too.
|
||||
// This helps surface merge conflicts when multiple branches create schema migrations concurrently.
|
||||
GLOBAL_CONFIG_SCHEMA_VERSION: 2 as SchemaVersion, // MIGRATION_REV: 1771328577054
|
||||
USER_SCHEMA_VERSION: 2 as SchemaVersion, // MIGRATION_REV: 1771328577054
|
||||
API_KEY_SCHEMA_VERSION: 1 as SchemaVersion, // MIGRATION_REV: 1771328577054
|
||||
ROOM_SCHEMA_VERSION: 2 as SchemaVersion,
|
||||
ROOM_MEMBER_SCHEMA_VERSION: 1 as SchemaVersion,
|
||||
RECORDING_SCHEMA_VERSION: 1 as SchemaVersion
|
||||
ROOM_MEMBER_SCHEMA_VERSION: 2 as SchemaVersion, // MIGRATION_REV: 1771328577054
|
||||
RECORDING_SCHEMA_VERSION: 2 as SchemaVersion // MIGRATION_REV: 1771328577054
|
||||
};
|
||||
|
||||
// This function is used to set private configuration values for testing purposes.
|
||||
|
||||
@ -21,7 +21,6 @@ The schema migration system enables safe evolution of MongoDB document structure
|
||||
- ✅ **HA-safe** (distributed locking prevents concurrent migrations)
|
||||
- ✅ **Batch processing** (efficient handling of large collections)
|
||||
- ✅ **Progress tracking** (migrations stored in `MeetMigration` collection)
|
||||
- ✅ **Version validation** (optional runtime checks in repositories)
|
||||
|
||||
---
|
||||
|
||||
@ -47,7 +46,6 @@ Each document includes a `schemaVersion` field:
|
||||
```
|
||||
src/
|
||||
├── migrations/
|
||||
│ ├── base-migration.ts # Base class for migrations
|
||||
│ ├── migration-registry.ts # Central registry of all collections
|
||||
│ ├── room-migrations.ts # Room-specific migrations
|
||||
│ ├── recording-migrations.ts # Recording-specific migrations
|
||||
@ -59,100 +57,13 @@ src/
|
||||
└── migration.model.ts # Migration types and interfaces
|
||||
```
|
||||
|
||||
**Note**: All migration types and interfaces (`ISchemaMigration`, `MigrationContext`, `MigrationResult`, `SchemaVersion`, `CollectionMigrationRegistry`) are defined in `src/models/migration.model.ts` for better code organization.
|
||||
|
||||
---
|
||||
|
||||
## Adding New Migrations
|
||||
|
||||
### Step 1: Update Schema Version in Configuration
|
||||
### Step 1: Update TypeScript Interface
|
||||
|
||||
In `src/config/internal-config.ts`, increment the version constant:
|
||||
|
||||
```typescript
|
||||
// internal-config.ts
|
||||
export const INTERNAL_CONFIG = {
|
||||
// ... other config
|
||||
ROOM_SCHEMA_VERSION: 2 // Was 1
|
||||
// ...
|
||||
};
|
||||
```
|
||||
|
||||
### Step 2: Create Migration Class
|
||||
|
||||
```typescript
|
||||
import { BaseSchemaMigration } from './base-migration.js';
|
||||
import { MeetRoomDocument } from '../repositories/schemas/room.schema.js';
|
||||
import { MigrationContext } from '../models/migration.model.js';
|
||||
import { Model } from 'mongoose';
|
||||
|
||||
class RoomMigrationV1ToV2 extends BaseSchemaMigration<MeetRoomDocument> {
|
||||
fromVersion = 1;
|
||||
toVersion = 2;
|
||||
description = 'Add maxParticipants field with default value of 100';
|
||||
|
||||
protected async transform(document: MeetRoomDocument): Promise<Partial<MeetRoomDocument>> {
|
||||
// Return fields to update (schemaVersion is handled automatically)
|
||||
return {
|
||||
maxParticipants: 100
|
||||
};
|
||||
}
|
||||
|
||||
// Optional: Add validation before migration runs
|
||||
async validate(model: Model<MeetRoomDocument>, context: MigrationContext): Promise<boolean> {
|
||||
// Check prerequisites, data integrity, etc.
|
||||
return true;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Step 3: Register Migration
|
||||
|
||||
Add the migration instance to the migrations array in `room-migrations.ts`:
|
||||
|
||||
```typescript
|
||||
import { ISchemaMigration } from '../models/migration.model.js';
|
||||
import { MeetRoomDocument } from '../repositories/schemas/room.schema.js';
|
||||
|
||||
export const roomMigrations: ISchemaMigration<MeetRoomDocument>[] = [
|
||||
new RoomMigrationV1ToV2()
|
||||
// Future migrations will be added here
|
||||
];
|
||||
```
|
||||
|
||||
### Step 4: Update Schema Definition
|
||||
|
||||
Update the Mongoose schema default version in `internal-config.ts`:
|
||||
|
||||
```typescript
|
||||
// config/internal-config.ts
|
||||
export const INTERNAL_CONFIG = {
|
||||
// ... other config
|
||||
ROOM_SCHEMA_VERSION: 2 // Updated from 1
|
||||
// ...
|
||||
};
|
||||
```
|
||||
|
||||
If adding new required fields, update the Mongoose schema:
|
||||
|
||||
```typescript
|
||||
// repositories/schemas/room.schema.ts
|
||||
import { INTERNAL_CONFIG } from '../../config/internal-config.js';
|
||||
|
||||
const MeetRoomSchema = new Schema<MeetRoomDocument>({
|
||||
schemaVersion: {
|
||||
type: Number,
|
||||
required: true,
|
||||
default: INTERNAL_CONFIG.ROOM_SCHEMA_VERSION // Uses config value (2)
|
||||
},
|
||||
// ... existing fields ...
|
||||
maxParticipants: { type: Number, required: true, default: 100 } // New field
|
||||
});
|
||||
```
|
||||
|
||||
### Step 5: Update TypeScript Interface
|
||||
|
||||
Update the domain interface to include new fields:
|
||||
Update the domain interface to include new fields or changes:
|
||||
|
||||
```typescript
|
||||
// typings/src/room.ts
|
||||
@ -163,6 +74,66 @@ export interface MeetRoom extends MeetRoomOptions {
|
||||
}
|
||||
```
|
||||
|
||||
### Step 2: Update Schema Version in Configuration
|
||||
|
||||
In `src/config/internal-config.ts`, increment the version constant and update the `MIGRATION_REV` timestamp comment on the same line:
|
||||
|
||||
```typescript
|
||||
// internal-config.ts
|
||||
export const INTERNAL_CONFIG = {
|
||||
// ... other config
|
||||
ROOM_SCHEMA_VERSION: 2 as SchemaVersion // MIGRATION_REV: 1771328577054
|
||||
// ...
|
||||
};
|
||||
```
|
||||
|
||||
`MIGRATION_REV` is a unique marker (current timestamp in milliseconds) used to make concurrent schema-version bumps more visible during Git merges.
|
||||
|
||||
If a merge conflict appears in that line, it means multiple migrations were created in parallel; resolve it by:
|
||||
|
||||
1. Keeping all migration code changes.
|
||||
2. Re-evaluating the final schema version number.
|
||||
3. Updating `MIGRATION_REV` again with a new timestamp.
|
||||
|
||||
### Step 3: Update Moongose Schema
|
||||
|
||||
Update the Mongoose schema to reflect the changes (new fields, etc.):
|
||||
|
||||
```typescript
|
||||
// models/mongoose-schemas/room.schema.ts
|
||||
const MeetRoomSchema = new Schema<MeetRoomDocument>({
|
||||
// ... existing fields ...
|
||||
maxParticipants: { type: Number, required: true, default: 100 } // New field
|
||||
});
|
||||
```
|
||||
|
||||
### Step 4: Create Migration Definition
|
||||
|
||||
```typescript
|
||||
import { SchemaTransform, generateSchemaMigrationName } from '../models/migration.model.js';
|
||||
import { meetRoomCollectionName, MeetRoomDocument } from '../models/mongoose-schemas/room.schema.js';
|
||||
|
||||
const roomMigrationV1ToV2Name = generateSchemaMigrationName(meetRoomCollectionName, 1, 2);
|
||||
|
||||
const roomMigrationV1ToV2Transform: SchemaTransform<MeetRoomDocument> = (room) => {
|
||||
room.maxParticipants = 100;
|
||||
return room;
|
||||
};
|
||||
```
|
||||
|
||||
`transform` must return the updated document instance.
|
||||
It can mutate the received document by adding, removing, or modifying fields as needed to conform to the new schema version.
|
||||
|
||||
### Step 5: Register Migration
|
||||
|
||||
Add the migration to the map initialization in `room-migrations.ts`:
|
||||
|
||||
```typescript
|
||||
export const roomMigrations: SchemaMigrationMap<MeetRoomDocument> = new Map([
|
||||
[roomMigrationV1ToV2Name, roomMigrationV1ToV2Transform]
|
||||
]);
|
||||
```
|
||||
|
||||
### Step 6: Test Migration
|
||||
|
||||
1. Start application - migration runs automatically
|
||||
@ -187,7 +158,6 @@ Each migration is tracked in the `MeetMigration` collection:
|
||||
"fromVersion": 1,
|
||||
"toVersion": 2,
|
||||
"migratedCount": 1523,
|
||||
"skippedCount": 0,
|
||||
"failedCount": 0,
|
||||
"durationMs": 123000
|
||||
}
|
||||
|
||||
@ -1,24 +1,19 @@
|
||||
import { ISchemaMigration } from '../models/migration.model.js';
|
||||
import { SchemaMigrationMap } from '../models/migration.model.js';
|
||||
import { MeetApiKeyDocument } from '../models/mongoose-schemas/api-key.schema.js';
|
||||
|
||||
/**
|
||||
* All migrations for the MeetApiKey collection in chronological order.
|
||||
* Add new migrations to this array as the schema evolves.
|
||||
* Schema migrations for MeetApiKey.
|
||||
* Key format: schema_{collection}_v{from}_to_v{to}
|
||||
*
|
||||
* Example migration (when needed in the future):
|
||||
* Example:
|
||||
*
|
||||
* class ApiKeyMigrationV1ToV2 extends BaseSchemaMigration<MeetApiKeyDocument> {
|
||||
* fromVersion = 1;
|
||||
* toVersion = 2;
|
||||
* description = 'Add expirationDate field for API key expiration';
|
||||
* const apiKeyMigrationV1ToV2Name = generateSchemaMigrationName(meetApiKeyCollectionName, 1, 2);
|
||||
*
|
||||
* protected async transform(document: MeetApiKeyDocument): Promise<Partial<MeetApiKeyDocument>> {
|
||||
* return {
|
||||
* expirationDate: undefined // No expiration for existing keys
|
||||
* };
|
||||
* }
|
||||
* }
|
||||
* const apiKeyMigrationV1ToV2Transform: SchemaTransform<MeetApiKeyDocument> = (apiKey) => {
|
||||
* apiKey.expirationDate = undefined;
|
||||
* return apiKey;
|
||||
* };
|
||||
*/
|
||||
export const apiKeyMigrations: ISchemaMigration<MeetApiKeyDocument>[] = [
|
||||
// Migrations will be added here as the schema evolves
|
||||
];
|
||||
export const apiKeyMigrations: SchemaMigrationMap<MeetApiKeyDocument> = new Map([
|
||||
// [apiKeyMigrationV1ToV2Name, apiKeyMigrationV1ToV2Transform]
|
||||
]);
|
||||
|
||||
@ -1,124 +0,0 @@
|
||||
import { Model } from 'mongoose';
|
||||
import { ISchemaMigration, MigrationContext, MigrationResult, SchemaVersion } from '../models/migration.model.js';
|
||||
|
||||
/**
|
||||
* Base class for schema migrations providing common functionality.
|
||||
* Extend this class to implement specific migrations for collections.
|
||||
*/
|
||||
export abstract class BaseSchemaMigration<TDocument> implements ISchemaMigration<TDocument> {
|
||||
abstract fromVersion: SchemaVersion;
|
||||
abstract toVersion: SchemaVersion;
|
||||
abstract description: string;
|
||||
|
||||
/**
|
||||
* Default batch size for processing documents.
|
||||
* Can be overridden in subclasses for collections with large documents.
|
||||
*/
|
||||
protected readonly defaultBatchSize = 50;
|
||||
|
||||
/**
|
||||
* Executes the migration in batches.
|
||||
* Processes all documents at fromVersion and upgrades them to toVersion.
|
||||
*/
|
||||
async execute(model: Model<TDocument>, context: MigrationContext): Promise<MigrationResult> {
|
||||
const startTime = Date.now();
|
||||
const batchSize = context.batchSize || this.defaultBatchSize;
|
||||
let migratedCount = 0;
|
||||
const skippedCount = 0;
|
||||
let failedCount = 0;
|
||||
|
||||
context.logger.info(
|
||||
`Starting schema migration: ${this.description} (v${this.fromVersion} -> v${this.toVersion})`
|
||||
);
|
||||
|
||||
try {
|
||||
// Find all documents at the source version
|
||||
const totalDocs = await model.countDocuments({ schemaVersion: this.fromVersion }).exec();
|
||||
|
||||
if (totalDocs === 0) {
|
||||
context.logger.info('No documents to migrate');
|
||||
return {
|
||||
migratedCount: 0,
|
||||
skippedCount: 0,
|
||||
failedCount: 0,
|
||||
durationMs: Date.now() - startTime
|
||||
};
|
||||
}
|
||||
|
||||
context.logger.info(`Found ${totalDocs} documents to migrate`);
|
||||
|
||||
// Process documents in batches
|
||||
let processedCount = 0;
|
||||
|
||||
while (processedCount < totalDocs) {
|
||||
const documents = await model.find({ schemaVersion: this.fromVersion }).limit(batchSize).exec();
|
||||
|
||||
if (documents.length === 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
// Transform and update each document
|
||||
for (const doc of documents) {
|
||||
try {
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
const updates = await this.transform(doc as any);
|
||||
|
||||
// Update the document with new fields and version
|
||||
await model
|
||||
.updateOne(
|
||||
{ _id: doc._id },
|
||||
{
|
||||
$set: {
|
||||
...updates,
|
||||
schemaVersion: this.toVersion
|
||||
}
|
||||
}
|
||||
)
|
||||
.exec();
|
||||
|
||||
migratedCount++;
|
||||
} catch (error) {
|
||||
failedCount++;
|
||||
context.logger.warn(`Failed to migrate document ${doc._id}:`, error);
|
||||
}
|
||||
}
|
||||
|
||||
processedCount += documents.length;
|
||||
context.logger.debug(`Processed ${processedCount}/${totalDocs} documents`);
|
||||
}
|
||||
|
||||
const durationMs = Date.now() - startTime;
|
||||
context.logger.info(
|
||||
`Migration completed: ${migratedCount} migrated, ${failedCount} failed (${durationMs}ms)`
|
||||
);
|
||||
|
||||
return {
|
||||
migratedCount,
|
||||
skippedCount,
|
||||
failedCount,
|
||||
durationMs
|
||||
};
|
||||
} catch (error) {
|
||||
context.logger.error('Migration failed:', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transform a single document from source version to target version.
|
||||
* Override this method to implement the specific transformation logic.
|
||||
*
|
||||
* @param document - The document to transform
|
||||
* @returns Object with fields to update (excluding schemaVersion which is handled automatically)
|
||||
*/
|
||||
protected abstract transform(document: TDocument): Promise<Partial<TDocument>>;
|
||||
|
||||
/**
|
||||
* Optional validation before running migration.
|
||||
* Default implementation always returns true.
|
||||
*/
|
||||
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
||||
async validate(_model: Model<TDocument>, _context: MigrationContext): Promise<boolean> {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@ -1,27 +1,19 @@
|
||||
import { ISchemaMigration } from '../models/migration.model.js';
|
||||
import { SchemaMigrationMap } from '../models/migration.model.js';
|
||||
import { MeetGlobalConfigDocument } from '../models/mongoose-schemas/global-config.schema.js';
|
||||
|
||||
/**
|
||||
* All migrations for the MeetGlobalConfig collection in chronological order.
|
||||
* Add new migrations to this array as the schema evolves.
|
||||
* Schema migrations for MeetGlobalConfig.
|
||||
* Key format: schema_{collection}_v{from}_to_v{to}
|
||||
*
|
||||
* Example migration (when needed in the future):
|
||||
* Example:
|
||||
*
|
||||
* class GlobalConfigMigrationV1ToV2 extends BaseSchemaMigration<MeetGlobalConfigDocument> {
|
||||
* fromVersion = 1;
|
||||
* toVersion = 2;
|
||||
* description = 'Add new branding configuration section';
|
||||
* const globalConfigMigrationV1ToV2Name = generateSchemaMigrationName(meetGlobalConfigCollectionName, 1, 2);
|
||||
*
|
||||
* protected async transform(document: MeetGlobalConfigDocument): Promise<Partial<MeetGlobalConfigDocument>> {
|
||||
* return {
|
||||
* brandingConfig: {
|
||||
* logoUrl: '',
|
||||
* companyName: 'OpenVidu Meet'
|
||||
* }
|
||||
* };
|
||||
* }
|
||||
* }
|
||||
* const globalConfigMigrationV1ToV2Transform: SchemaTransform<MeetGlobalConfigDocument> = (globalConfig) => {
|
||||
* globalConfig.newField = 'defaultValue';
|
||||
* return globalConfig;
|
||||
* };
|
||||
*/
|
||||
export const globalConfigMigrations: ISchemaMigration<MeetGlobalConfigDocument>[] = [
|
||||
// Migrations will be added here as the schema evolves
|
||||
];
|
||||
export const globalConfigMigrations: SchemaMigrationMap<MeetGlobalConfigDocument> = new Map([
|
||||
// [globalConfigMigrationV1ToV2Name, globalConfigMigrationV1ToV2Transform]
|
||||
]);
|
||||
|
||||
6
meet-ce/backend/src/migrations/index.ts
Normal file
6
meet-ce/backend/src/migrations/index.ts
Normal file
@ -0,0 +1,6 @@
|
||||
export * from './api-key-migrations.js';
|
||||
export * from './global-config-migrations.js';
|
||||
export * from './migration-registry.js';
|
||||
export * from './recording-migrations.js';
|
||||
export * from './room-migrations.js';
|
||||
export * from './user-migrations.js';
|
||||
@ -1,13 +1,22 @@
|
||||
import { INTERNAL_CONFIG } from '../config/internal-config.js';
|
||||
import { CollectionMigrationRegistry } from '../models/migration.model.js';
|
||||
import { meetApiKeyCollectionName, MeetApiKeyModel } from '../models/mongoose-schemas/api-key.schema.js';
|
||||
import { CollectionMigrationRegistry, SchemaMigratableDocument } from '../models/migration.model.js';
|
||||
import {
|
||||
meetApiKeyCollectionName,
|
||||
MeetApiKeyDocument,
|
||||
MeetApiKeyModel
|
||||
} from '../models/mongoose-schemas/api-key.schema.js';
|
||||
import {
|
||||
meetGlobalConfigCollectionName,
|
||||
MeetGlobalConfigDocument,
|
||||
MeetGlobalConfigModel
|
||||
} from '../models/mongoose-schemas/global-config.schema.js';
|
||||
import { meetRecordingCollectionName, MeetRecordingModel } from '../models/mongoose-schemas/recording.schema.js';
|
||||
import { meetRoomCollectionName, MeetRoomModel } from '../models/mongoose-schemas/room.schema.js';
|
||||
import { meetUserCollectionName, MeetUserModel } from '../models/mongoose-schemas/user.schema.js';
|
||||
import {
|
||||
meetRecordingCollectionName,
|
||||
MeetRecordingDocument,
|
||||
MeetRecordingModel
|
||||
} from '../models/mongoose-schemas/recording.schema.js';
|
||||
import { meetRoomCollectionName, MeetRoomDocument, MeetRoomModel } from '../models/mongoose-schemas/room.schema.js';
|
||||
import { meetUserCollectionName, MeetUserDocument, MeetUserModel } from '../models/mongoose-schemas/user.schema.js';
|
||||
import { apiKeyMigrations } from './api-key-migrations.js';
|
||||
import { globalConfigMigrations } from './global-config-migrations.js';
|
||||
import { recordingMigrations } from './recording-migrations.js';
|
||||
@ -16,12 +25,18 @@ import { userMigrations } from './user-migrations.js';
|
||||
|
||||
/**
|
||||
* Central registry of all collection migrations.
|
||||
* Defines the current version and migration path for each collection.
|
||||
* Defines the current version and migration map for each collection.
|
||||
*
|
||||
* Order matters: collections should be listed in dependency order.
|
||||
* For example, if recordings depend on rooms, rooms should come first.
|
||||
*/
|
||||
export const migrationRegistry: CollectionMigrationRegistry[] = [
|
||||
const migrationRegistry: [
|
||||
CollectionMigrationRegistry<MeetGlobalConfigDocument>,
|
||||
CollectionMigrationRegistry<MeetUserDocument>,
|
||||
CollectionMigrationRegistry<MeetApiKeyDocument>,
|
||||
CollectionMigrationRegistry<MeetRoomDocument>,
|
||||
CollectionMigrationRegistry<MeetRecordingDocument>
|
||||
] = [
|
||||
// GlobalConfig - no dependencies, can run first
|
||||
{
|
||||
collectionName: meetGlobalConfigCollectionName,
|
||||
@ -59,3 +74,10 @@ export const migrationRegistry: CollectionMigrationRegistry[] = [
|
||||
migrations: recordingMigrations
|
||||
}
|
||||
];
|
||||
|
||||
/**
|
||||
* Homogeneous runtime view of the migration registry.
|
||||
* Used by migration execution code that iterates over all collections.
|
||||
*/
|
||||
export const runtimeMigrationRegistry =
|
||||
migrationRegistry as unknown as CollectionMigrationRegistry<SchemaMigratableDocument>[];
|
||||
|
||||
@ -1,24 +1,19 @@
|
||||
import { ISchemaMigration } from '../models/migration.model.js';
|
||||
import { MeetRecordingDocument } from '../models/mongoose-schemas/recording.schema.js';
|
||||
import { MeetRecordingEncodingPreset, MeetRecordingLayout } from '@openvidu-meet/typings';
|
||||
import { generateSchemaMigrationName, SchemaMigrationMap, SchemaTransform } from '../models/migration.model.js';
|
||||
import { meetRecordingCollectionName, MeetRecordingDocument } from '../models/mongoose-schemas/recording.schema.js';
|
||||
|
||||
const recordingMigrationV1ToV2Name = generateSchemaMigrationName(meetRecordingCollectionName, 1, 2);
|
||||
|
||||
const recordingMigrationV1ToV2Transform: SchemaTransform<MeetRecordingDocument> = (recording) => {
|
||||
recording.layout = MeetRecordingLayout.GRID;
|
||||
recording.encoding = MeetRecordingEncodingPreset.H264_720P_30;
|
||||
return recording;
|
||||
};
|
||||
|
||||
/**
|
||||
* All migrations for the MeetRecording collection in chronological order.
|
||||
* Add new migrations to this array as the schema evolves.
|
||||
*
|
||||
* Example migration (when needed in the future):
|
||||
*
|
||||
* class RecordingMigrationV1ToV2 extends BaseSchemaMigration<MeetRecordingDocument> {
|
||||
* fromVersion = 1;
|
||||
* toVersion = 2;
|
||||
* description = 'Add new optional field "quality" for recording quality tracking';
|
||||
*
|
||||
* protected async transform(document: MeetRecordingDocument): Promise<Partial<MeetRecordingDocument>> {
|
||||
* return {
|
||||
* quality: 'standard' // Default quality for existing recordings
|
||||
* };
|
||||
* }
|
||||
* }
|
||||
* Schema migrations for MeetRecording.
|
||||
* Key format: schema_{collection}_v{from}_to_v{to}
|
||||
*/
|
||||
export const recordingMigrations: ISchemaMigration<MeetRecordingDocument>[] = [
|
||||
// Migrations will be added here as the schema evolves
|
||||
];
|
||||
export const recordingMigrations: SchemaMigrationMap<MeetRecordingDocument> = new Map([
|
||||
[recordingMigrationV1ToV2Name, recordingMigrationV1ToV2Transform]
|
||||
]);
|
||||
|
||||
@ -1,26 +1,20 @@
|
||||
import { ISchemaMigration } from '../models/migration.model.js';
|
||||
import { MeetRoomDocument } from '../models/mongoose-schemas/room.schema.js';
|
||||
import { MeetRecordingEncodingPreset, MeetRecordingLayout } from '@openvidu-meet/typings';
|
||||
import { generateSchemaMigrationName, SchemaMigrationMap, SchemaTransform } from '../models/migration.model.js';
|
||||
import { meetRoomCollectionName, MeetRoomDocument } from '../models/mongoose-schemas/room.schema.js';
|
||||
|
||||
const roomMigrationV1ToV2Name = generateSchemaMigrationName(meetRoomCollectionName, 1, 2);
|
||||
|
||||
const roomMigrationV1ToV2Transform: SchemaTransform<MeetRoomDocument> = (room) => {
|
||||
room.config.captions = { enabled: true };
|
||||
room.config.recording.layout = MeetRecordingLayout.GRID;
|
||||
room.config.recording.encoding = MeetRecordingEncodingPreset.H264_720P_30;
|
||||
return room;
|
||||
};
|
||||
|
||||
/**
|
||||
* All migrations for the MeetRoom collection in chronological order.
|
||||
* Add new migrations to this array as the schema evolves.
|
||||
*
|
||||
* Example migration (when needed in the future):
|
||||
*
|
||||
* class RoomMigrationV1ToV2 extends BaseSchemaMigration<MeetRoomDocument> {
|
||||
* fromVersion = 1;
|
||||
* toVersion = 2;
|
||||
* description = 'Add new required field "maxParticipants" with default value';
|
||||
*
|
||||
* protected async transform(document: MeetRoomDocument): Promise<Partial<MeetRoomDocument>> {
|
||||
* return {
|
||||
* maxParticipants: 100 // Add default value for existing rooms
|
||||
* };
|
||||
* }
|
||||
* }
|
||||
* Schema migrations for MeetRoom.
|
||||
* Key format: schema_{collection}_v{from}_to_v{to}
|
||||
*/
|
||||
export const roomMigrations: ISchemaMigration<MeetRoomDocument>[] = [
|
||||
// Migrations will be added here as the schema evolves
|
||||
// Example: new RoomMigrationV1ToV2(),
|
||||
// Example: new RoomMigrationV2ToV3(),
|
||||
];
|
||||
export const roomMigrations: SchemaMigrationMap<MeetRoomDocument> = new Map([
|
||||
[roomMigrationV1ToV2Name, roomMigrationV1ToV2Transform]
|
||||
]);
|
||||
|
||||
@ -1,24 +1,19 @@
|
||||
import { ISchemaMigration } from '../models/migration.model.js';
|
||||
import { SchemaMigrationMap } from '../models/migration.model.js';
|
||||
import { MeetUserDocument } from '../models/mongoose-schemas/user.schema.js';
|
||||
|
||||
/**
|
||||
* All migrations for the MeetUser collection in chronological order.
|
||||
* Add new migrations to this array as the schema evolves.
|
||||
* Schema migrations for MeetUser.
|
||||
* Key format: schema_{collection}_v{from}_to_v{to}
|
||||
*
|
||||
* Example migration (when needed in the future):
|
||||
* Example:
|
||||
*
|
||||
* class UserMigrationV1ToV2 extends BaseSchemaMigration<MeetUserDocument> {
|
||||
* fromVersion = 1;
|
||||
* toVersion = 2;
|
||||
* description = 'Add email field for user notifications';
|
||||
* const userMigrationV1ToV2Name = generateSchemaMigrationName(meetUserCollectionName, 1, 2);
|
||||
*
|
||||
* protected async transform(document: MeetUserDocument): Promise<Partial<MeetUserDocument>> {
|
||||
* return {
|
||||
* email: undefined // Email will be optional initially
|
||||
* };
|
||||
* }
|
||||
* }
|
||||
* const userMigrationV1ToV2Transform: SchemaTransform<MeetUserDocument> = (user) => {
|
||||
* user.newField = 'defaultValue';
|
||||
* return user;
|
||||
* };
|
||||
*/
|
||||
export const userMigrations: ISchemaMigration<MeetUserDocument>[] = [
|
||||
// Migrations will be added here as the schema evolves
|
||||
];
|
||||
export const userMigrations: SchemaMigrationMap<MeetUserDocument> = new Map([
|
||||
// [userMigrationV1ToV2Name, userMigrationV1ToV2Transform]
|
||||
]);
|
||||
|
||||
@ -1,5 +1,4 @@
|
||||
import { Model } from 'mongoose';
|
||||
import { LoggerService } from '../services/logger.service.js';
|
||||
import { Document, Model } from 'mongoose';
|
||||
|
||||
/**
|
||||
* Interface representing a migration document in MongoDB.
|
||||
@ -58,19 +57,11 @@ export enum MigrationStatus {
|
||||
}
|
||||
|
||||
/**
|
||||
* Enum defining all possible migration names in the system.
|
||||
* Each migration should have a unique identifier.
|
||||
*
|
||||
* Schema migrations follow the pattern: schema_{collection}_v{from}_to_v{to}
|
||||
* Example: 'schema_room_v1_to_v2', 'schema_recording_v2_to_v3'
|
||||
*/
|
||||
export enum MigrationName {
|
||||
/**
|
||||
* Migration from legacy storage (S3, ABS, GCS) to MongoDB.
|
||||
* Includes: GlobalConfig, Users, ApiKeys, Rooms, and Recordings.
|
||||
*/
|
||||
LEGACY_STORAGE_TO_MONGODB = 'legacy_storage_to_mongodb'
|
||||
}
|
||||
export type SchemaMigrationName = `schema_${string}_v${number}_to_v${number}`;
|
||||
export type MigrationName = SchemaMigrationName;
|
||||
|
||||
/**
|
||||
* Generates a migration name for schema version upgrades.
|
||||
@ -83,12 +74,49 @@ export enum MigrationName {
|
||||
* @example
|
||||
* generateSchemaMigrationName('MeetRoom', 1, 2) // Returns: 'schema_room_v1_to_v2'
|
||||
*/
|
||||
export function generateSchemaMigrationName(collectionName: string, fromVersion: number, toVersion: number): string {
|
||||
export function generateSchemaMigrationName(
|
||||
collectionName: string,
|
||||
fromVersion: number,
|
||||
toVersion: number
|
||||
): SchemaMigrationName {
|
||||
// Convert collection name to lowercase and remove 'Meet' prefix
|
||||
const simpleName = collectionName.replace(/^Meet/, '').toLowerCase();
|
||||
return `schema_${simpleName}_v${fromVersion}_to_v${toVersion}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether a string matches the schema migration naming convention.
|
||||
*/
|
||||
export function isSchemaMigrationName(name: string): name is SchemaMigrationName {
|
||||
return /^schema_[a-z0-9_]+_v\d+_to_v\d+$/.test(name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses a schema migration name and extracts entity and versions.
|
||||
*/
|
||||
export function parseSchemaMigrationName(
|
||||
name: string
|
||||
): { collectionName: string; fromVersion: SchemaVersion; toVersion: SchemaVersion } | null {
|
||||
const match = /^schema_([a-z0-9_]+)_v(\d+)_to_v(\d+)$/.exec(name);
|
||||
|
||||
if (!match) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
collectionName: match[1],
|
||||
fromVersion: Number(match[2]),
|
||||
toVersion: Number(match[3])
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Base document shape required for schema migrations.
|
||||
*/
|
||||
export interface SchemaMigratableDocument extends Document {
|
||||
schemaVersion?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents a schema version number.
|
||||
* Versions start at 1 and increment sequentially.
|
||||
@ -96,14 +124,41 @@ export function generateSchemaMigrationName(collectionName: string, fromVersion:
|
||||
export type SchemaVersion = number;
|
||||
|
||||
/**
|
||||
* Context provided to migration functions.
|
||||
* Contains utilities and services needed during migration.
|
||||
* Function that transforms a document and returns the updated document.
|
||||
*/
|
||||
export interface MigrationContext {
|
||||
/** Logger service for tracking migration progress */
|
||||
logger: LoggerService;
|
||||
/** Batch size for processing documents (default: 50) */
|
||||
batchSize?: number;
|
||||
export type SchemaTransform<TDocument extends SchemaMigratableDocument> = (document: TDocument) => TDocument;
|
||||
|
||||
/**
|
||||
* Map of schema migration names to transform functions.
|
||||
*/
|
||||
export type SchemaMigrationMap<TDocument extends SchemaMigratableDocument> = Map<
|
||||
SchemaMigrationName,
|
||||
SchemaTransform<TDocument>
|
||||
>;
|
||||
|
||||
/**
|
||||
* Resolved migration step ready to be executed.
|
||||
*/
|
||||
export interface SchemaMigrationStep<TDocument extends SchemaMigratableDocument> {
|
||||
name: SchemaMigrationName;
|
||||
fromVersion: SchemaVersion;
|
||||
toVersion: SchemaVersion;
|
||||
transform: SchemaTransform<TDocument>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Registry entry for a collection's migrations.
|
||||
* Groups all migrations for a specific collection.
|
||||
*/
|
||||
export interface CollectionMigrationRegistry<TDocument extends SchemaMigratableDocument> {
|
||||
/** Name of the collection */
|
||||
collectionName: string;
|
||||
/** Mongoose model for the collection */
|
||||
model: Model<TDocument>;
|
||||
/** Current schema version expected by the application */
|
||||
currentVersion: SchemaVersion;
|
||||
/** Map of migration names to their transform functions */
|
||||
migrations: SchemaMigrationMap<TDocument>;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -113,60 +168,8 @@ export interface MigrationContext {
|
||||
export interface MigrationResult {
|
||||
/** Number of documents successfully migrated */
|
||||
migratedCount: number;
|
||||
/** Number of documents skipped (already at target version) */
|
||||
skippedCount: number;
|
||||
/** Number of documents that failed migration */
|
||||
failedCount: number;
|
||||
/** Total time taken in milliseconds */
|
||||
durationMs: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Interface for a single schema migration handler.
|
||||
* Each migration transforms documents from one version to the next.
|
||||
*/
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
export interface ISchemaMigration<TDocument = any> {
|
||||
/** The source schema version this migration upgrades from */
|
||||
fromVersion: SchemaVersion;
|
||||
/** The target schema version this migration upgrades to */
|
||||
toVersion: SchemaVersion;
|
||||
/** Short description of what this migration does */
|
||||
description: string;
|
||||
|
||||
/**
|
||||
* Executes the migration on a batch of documents.
|
||||
* Should update documents using MongoDB bulk operations for efficiency.
|
||||
*
|
||||
* @param model - Mongoose model for the collection
|
||||
* @param context - Migration context with logger and configuration
|
||||
* @returns Migration result with statistics
|
||||
*/
|
||||
execute(model: Model<TDocument>, context: MigrationContext): Promise<MigrationResult>;
|
||||
|
||||
/**
|
||||
* Optional validation to check if migration is safe to run.
|
||||
* Can verify prerequisites or data integrity before migration starts.
|
||||
*
|
||||
* @param model - Mongoose model for the collection
|
||||
* @param context - Migration context with logger and configuration
|
||||
* @returns true if migration can proceed, false otherwise
|
||||
*/
|
||||
validate?(model: Model<TDocument>, context: MigrationContext): Promise<boolean>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Registry entry for a collection's migrations.
|
||||
* Groups all migrations for a specific collection.
|
||||
*/
|
||||
export interface CollectionMigrationRegistry {
|
||||
/** Name of the collection */
|
||||
collectionName: string;
|
||||
/** Mongoose model for the collection */
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
model: Model<any>;
|
||||
/** Current schema version expected by the application */
|
||||
currentVersion: SchemaVersion;
|
||||
/** Array of migrations in chronological order */
|
||||
migrations: ISchemaMigration[];
|
||||
}
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
import { Document, model, Schema } from 'mongoose';
|
||||
import { MeetMigration, MigrationName, MigrationStatus } from '../migration.model.js';
|
||||
import { isSchemaMigrationName, MeetMigration, MigrationStatus } from '../migration.model.js';
|
||||
|
||||
/**
|
||||
* Mongoose Document interface for MeetMigration.
|
||||
@ -16,7 +16,10 @@ const MigrationSchema = new Schema<MeetMigrationDocument>(
|
||||
name: {
|
||||
type: String,
|
||||
required: true,
|
||||
enum: Object.values(MigrationName)
|
||||
validate: {
|
||||
validator: (value: string) => isSchemaMigrationName(value),
|
||||
message: 'Invalid migration name format'
|
||||
}
|
||||
},
|
||||
status: {
|
||||
type: String,
|
||||
|
||||
@ -95,7 +95,7 @@ export class MigrationRepository extends BaseRepository<MeetMigration, MeetMigra
|
||||
{
|
||||
$set: {
|
||||
status: MigrationStatus.FAILED,
|
||||
completedAt: new Date(),
|
||||
completedAt: Date.now(),
|
||||
error
|
||||
}
|
||||
}
|
||||
@ -103,27 +103,6 @@ export class MigrationRepository extends BaseRepository<MeetMigration, MeetMigra
|
||||
return this.toDomain(document);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all migrations with their current status.
|
||||
*
|
||||
* @returns Array of all migration documents
|
||||
*/
|
||||
async getAllMigrations(): Promise<MeetMigration[]> {
|
||||
const documents = await this.findAll();
|
||||
return documents;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a specific migration by name.
|
||||
*
|
||||
* @param name - The name of the migration
|
||||
* @returns The migration document or null if not found
|
||||
*/
|
||||
async getMigration(name: MigrationName): Promise<MeetMigration | null> {
|
||||
const document = await this.findOne({ name });
|
||||
return document ? this.toDomain(document) : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a migration has been completed successfully.
|
||||
*
|
||||
|
||||
@ -85,7 +85,9 @@ const createApp = () => {
|
||||
if (process.env.NODE_ENV === 'development') {
|
||||
// Serve internal API docs only in development mode
|
||||
appRouter.use(`${INTERNAL_CONFIG.INTERNAL_API_BASE_PATH_V1}/docs`, (_req: Request, res: Response) =>
|
||||
res.type('html').send(getOpenApiHtmlWithBasePath(internalApiHtmlFilePath, INTERNAL_CONFIG.INTERNAL_API_BASE_PATH_V1))
|
||||
res
|
||||
.type('html')
|
||||
.send(getOpenApiHtmlWithBasePath(internalApiHtmlFilePath, INTERNAL_CONFIG.INTERNAL_API_BASE_PATH_V1))
|
||||
);
|
||||
}
|
||||
|
||||
@ -137,7 +139,9 @@ const startServer = (app: express.Application) => {
|
||||
|
||||
console.log(
|
||||
'REST API Docs: ',
|
||||
chalk.cyanBright(`http://localhost:${MEET_ENV.SERVER_PORT}${basePathDisplay}${INTERNAL_CONFIG.API_BASE_PATH_V1}/docs`)
|
||||
chalk.cyanBright(
|
||||
`http://localhost:${MEET_ENV.SERVER_PORT}${basePathDisplay}${INTERNAL_CONFIG.API_BASE_PATH_V1}/docs`
|
||||
)
|
||||
);
|
||||
logEnvVars();
|
||||
});
|
||||
@ -164,8 +168,8 @@ const isMainModule = (): boolean => {
|
||||
if (isMainModule()) {
|
||||
registerDependencies();
|
||||
const app = createApp();
|
||||
startServer(app);
|
||||
await initializeEagerServices();
|
||||
startServer(app);
|
||||
}
|
||||
|
||||
export { createApp, registerDependencies };
|
||||
|
||||
@ -2,20 +2,16 @@ import { inject, injectable } from 'inversify';
|
||||
import { Model } from 'mongoose';
|
||||
import ms from 'ms';
|
||||
import { MeetLock } from '../helpers/redis.helper.js';
|
||||
import { migrationRegistry } from '../migrations/migration-registry.js';
|
||||
import { runtimeMigrationRegistry } from '../migrations/migration-registry.js';
|
||||
import {
|
||||
CollectionMigrationRegistry,
|
||||
generateSchemaMigrationName,
|
||||
ISchemaMigration,
|
||||
MigrationContext,
|
||||
MigrationName
|
||||
MigrationResult,
|
||||
SchemaMigratableDocument,
|
||||
SchemaMigrationStep,
|
||||
SchemaVersion
|
||||
} from '../models/migration.model.js';
|
||||
import { ApiKeyRepository } from '../repositories/api-key.repository.js';
|
||||
import { GlobalConfigRepository } from '../repositories/global-config.repository.js';
|
||||
import { MigrationRepository } from '../repositories/migration.repository.js';
|
||||
import { RecordingRepository } from '../repositories/recording.repository.js';
|
||||
import { RoomRepository } from '../repositories/room.repository.js';
|
||||
import { UserRepository } from '../repositories/user.repository.js';
|
||||
import { LoggerService } from './logger.service.js';
|
||||
import { MutexService } from './mutex.service.js';
|
||||
|
||||
@ -24,11 +20,6 @@ export class MigrationService {
|
||||
constructor(
|
||||
@inject(LoggerService) protected logger: LoggerService,
|
||||
@inject(MutexService) protected mutexService: MutexService,
|
||||
@inject(GlobalConfigRepository) protected configRepository: GlobalConfigRepository,
|
||||
@inject(UserRepository) protected userRepository: UserRepository,
|
||||
@inject(ApiKeyRepository) protected apiKeyRepository: ApiKeyRepository,
|
||||
@inject(RoomRepository) protected roomRepository: RoomRepository,
|
||||
@inject(RecordingRepository) protected recordingRepository: RecordingRepository,
|
||||
@inject(MigrationRepository) protected migrationRepository: MigrationRepository
|
||||
) {}
|
||||
|
||||
@ -73,133 +64,19 @@ export class MigrationService {
|
||||
/**
|
||||
* Runs all schema migrations to upgrade document structures to the latest version.
|
||||
* Processes each collection in the registry and executes pending migrations.
|
||||
*
|
||||
* Schema migrations run after data migrations and upgrade existing documents
|
||||
* to match the current schema version expected by the application.
|
||||
*/
|
||||
protected async runSchemaMigrations(): Promise<void> {
|
||||
this.logger.info('Running schema migrations...');
|
||||
|
||||
try {
|
||||
let totalMigrated = 0;
|
||||
let totalSkipped = 0;
|
||||
|
||||
// Process each collection in the registry
|
||||
for (const registry of migrationRegistry) {
|
||||
this.logger.info(`Checking schema version for collection: ${registry.collectionName}`);
|
||||
|
||||
// Get the current version of documents in the collection
|
||||
const currentVersionInDb = await this.getCurrentSchemaVersion(registry.model);
|
||||
|
||||
if (currentVersionInDb === null) {
|
||||
this.logger.info(`No documents found in ${registry.collectionName}, skipping migration`);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (currentVersionInDb === registry.currentVersion) {
|
||||
this.logger.info(
|
||||
`Collection ${registry.collectionName} is already at version ${registry.currentVersion}`
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (currentVersionInDb > registry.currentVersion) {
|
||||
this.logger.warn(
|
||||
`Collection ${registry.collectionName} has version ${currentVersionInDb} ` +
|
||||
`but application expects ${registry.currentVersion}. ` +
|
||||
`This may indicate a downgrade or inconsistent deployment.`
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Find migrations needed to upgrade from current to target version
|
||||
const neededMigrations = this.findNeededMigrations(
|
||||
registry,
|
||||
currentVersionInDb,
|
||||
registry.currentVersion
|
||||
);
|
||||
|
||||
if (neededMigrations.length === 0) {
|
||||
this.logger.info(`No migrations needed for ${registry.collectionName}`);
|
||||
continue;
|
||||
}
|
||||
|
||||
this.logger.info(
|
||||
`Found ${neededMigrations.length} migrations for ${registry.collectionName} ` +
|
||||
`(v${currentVersionInDb} -> v${registry.currentVersion})`
|
||||
);
|
||||
|
||||
// Execute each migration in sequence
|
||||
for (const migration of neededMigrations) {
|
||||
const migrationName = generateSchemaMigrationName(
|
||||
registry.collectionName,
|
||||
migration.fromVersion,
|
||||
migration.toVersion
|
||||
);
|
||||
|
||||
// Check if this specific migration was already completed
|
||||
const isCompleted = await this.migrationRepository.isCompleted(migrationName as MigrationName);
|
||||
|
||||
if (isCompleted) {
|
||||
this.logger.info(`Migration ${migrationName} already completed, skipping`);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Mark migration as started
|
||||
await this.migrationRepository.markAsStarted(migrationName as MigrationName);
|
||||
|
||||
try {
|
||||
const migrationContext: MigrationContext = {
|
||||
logger: this.logger,
|
||||
batchSize: 50 // Default batch size
|
||||
};
|
||||
|
||||
// Validate migration if validation method is provided
|
||||
if (migration.validate) {
|
||||
const isValid = await migration.validate(registry.model, migrationContext);
|
||||
|
||||
if (!isValid) {
|
||||
throw new Error(`Validation failed for migration ${migrationName}`);
|
||||
}
|
||||
}
|
||||
|
||||
// Execute the migration
|
||||
this.logger.info(`Executing migration: ${migration.description}`);
|
||||
const result = await migration.execute(registry.model, migrationContext);
|
||||
|
||||
// Track statistics
|
||||
totalMigrated += result.migratedCount;
|
||||
totalSkipped += result.skippedCount;
|
||||
|
||||
// Mark migration as completed with metadata
|
||||
const metadata: Record<string, unknown> = {
|
||||
collectionName: registry.collectionName,
|
||||
fromVersion: migration.fromVersion,
|
||||
toVersion: migration.toVersion,
|
||||
migratedCount: result.migratedCount,
|
||||
skippedCount: result.skippedCount,
|
||||
failedCount: result.failedCount,
|
||||
durationMs: result.durationMs
|
||||
};
|
||||
|
||||
await this.migrationRepository.markAsCompleted(migrationName as MigrationName, metadata);
|
||||
|
||||
this.logger.info(
|
||||
`Migration ${migrationName} completed: ${result.migratedCount} migrated, ` +
|
||||
`${result.failedCount} failed (${result.durationMs}ms)`
|
||||
);
|
||||
} catch (error) {
|
||||
// Mark migration as failed
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
await this.migrationRepository.markAsFailed(migrationName as MigrationName, errorMessage);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
for (const registry of runtimeMigrationRegistry) {
|
||||
totalMigrated += await this.migrateCollectionSchemas(registry);
|
||||
}
|
||||
|
||||
this.logger.info(
|
||||
`Schema migrations completed successfully: ${totalMigrated} documents migrated, ${totalSkipped} skipped`
|
||||
);
|
||||
this.logger.info(`Schema migrations completed successfully: ${totalMigrated} documents migrated`);
|
||||
} catch (error) {
|
||||
this.logger.error('Error running schema migrations:', error);
|
||||
throw error;
|
||||
@ -207,17 +84,266 @@ export class MigrationService {
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the current schema version of documents in a collection.
|
||||
* Samples the database to determine the version.
|
||||
* Migrates documents in a collection through all required schema versions to reach the current version.
|
||||
*
|
||||
* @param registry - The collection migration registry containing migration steps and model
|
||||
* @returns Number of documents migrated in this collection
|
||||
*/
|
||||
protected async migrateCollectionSchemas<TDocument extends SchemaMigratableDocument>(
|
||||
registry: CollectionMigrationRegistry<TDocument>
|
||||
): Promise<number> {
|
||||
this.logger.info(`Checking schema version for collection: ${registry.collectionName}`);
|
||||
|
||||
const oldestSchemaVersionInDb = await this.getMinSchemaVersion(registry.model);
|
||||
|
||||
if (oldestSchemaVersionInDb === null) {
|
||||
this.logger.info(`No documents found in ${registry.collectionName}, skipping migration`);
|
||||
return 0;
|
||||
}
|
||||
|
||||
const latestSchemaVersionInDb = await this.getMaxSchemaVersion(registry.model);
|
||||
|
||||
if (latestSchemaVersionInDb && latestSchemaVersionInDb > registry.currentVersion) {
|
||||
throw new Error(
|
||||
`Collection ${registry.collectionName} has schemaVersion ${latestSchemaVersionInDb}, ` +
|
||||
`which is higher than expected ${registry.currentVersion}. ` +
|
||||
'Startup aborted to prevent inconsistent schema handling.'
|
||||
);
|
||||
}
|
||||
|
||||
if (oldestSchemaVersionInDb === registry.currentVersion) {
|
||||
this.logger.info(
|
||||
`Collection ${registry.collectionName} is already at version ${registry.currentVersion}, skipping migration`
|
||||
);
|
||||
return 0;
|
||||
}
|
||||
|
||||
let migratedDocumentsInCollection = 0;
|
||||
|
||||
for (
|
||||
let sourceSchemaVersion = oldestSchemaVersionInDb;
|
||||
sourceSchemaVersion < registry.currentVersion;
|
||||
sourceSchemaVersion++
|
||||
) {
|
||||
const migrationChain = this.getRequiredMigrationSteps(registry, sourceSchemaVersion);
|
||||
migratedDocumentsInCollection += await this.executeMigrationChainForVersion(
|
||||
registry,
|
||||
sourceSchemaVersion,
|
||||
migrationChain
|
||||
);
|
||||
}
|
||||
|
||||
return migratedDocumentsInCollection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the required migration steps to upgrade from the current version in the database to the target version.
|
||||
* Validates that there are no missing migration steps in the chain.
|
||||
*
|
||||
* @param registry - The collection migration registry
|
||||
* @param sourceSchemaVersion - Source schema version whose migration chain must be executed
|
||||
* @returns Array of migration steps that need to be executed in order
|
||||
*/
|
||||
protected getRequiredMigrationSteps<TDocument extends SchemaMigratableDocument>(
|
||||
registry: CollectionMigrationRegistry<TDocument>,
|
||||
sourceSchemaVersion: SchemaVersion
|
||||
): SchemaMigrationStep<TDocument>[] {
|
||||
const migrationSteps = this.findSchemaMigrationSteps(registry, sourceSchemaVersion, registry.currentVersion);
|
||||
|
||||
if (migrationSteps.length === 0) {
|
||||
throw new Error(
|
||||
`No migration steps found for ${registry.collectionName} ` +
|
||||
`(v${sourceSchemaVersion} -> v${registry.currentVersion}). Startup aborted.`
|
||||
);
|
||||
}
|
||||
|
||||
this.logger.info(
|
||||
`Found ${migrationSteps.length} migration steps for ${registry.collectionName} ` +
|
||||
`(v${sourceSchemaVersion} -> v${registry.currentVersion})`
|
||||
);
|
||||
|
||||
return migrationSteps;
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the migration chain for all documents currently at a specific source schema version.
|
||||
* Handles marking the chain as started, completed, or failed in the migration repository.
|
||||
*
|
||||
* @param registry - The collection migration registry
|
||||
* @param sourceSchemaVersion - Source schema version to migrate from
|
||||
* @param migrationChain - Ordered migration steps from source to current version
|
||||
* @returns Number of migrated documents for this source version
|
||||
*/
|
||||
protected async executeMigrationChainForVersion<TDocument extends SchemaMigratableDocument>(
|
||||
registry: CollectionMigrationRegistry<TDocument>,
|
||||
sourceSchemaVersion: SchemaVersion,
|
||||
migrationChain: SchemaMigrationStep<TDocument>[]
|
||||
): Promise<number> {
|
||||
const pendingDocumentsBefore = await this.countDocumentsAtSchemaVersion(registry.model, sourceSchemaVersion);
|
||||
const migrationChainExecutionName = generateSchemaMigrationName(
|
||||
registry.collectionName,
|
||||
sourceSchemaVersion,
|
||||
registry.currentVersion
|
||||
);
|
||||
|
||||
if (pendingDocumentsBefore === 0) {
|
||||
this.logger.info(`Migration ${migrationChainExecutionName} has no pending documents, skipping execution`);
|
||||
return 0;
|
||||
}
|
||||
|
||||
const isCompleted = await this.migrationRepository.isCompleted(migrationChainExecutionName);
|
||||
|
||||
if (isCompleted) {
|
||||
this.logger.warn(
|
||||
`Migration ${migrationChainExecutionName} is marked as completed but still has ${pendingDocumentsBefore} pending ` +
|
||||
`documents at schemaVersion ${sourceSchemaVersion}. Re-running migration chain.`
|
||||
);
|
||||
}
|
||||
|
||||
await this.migrationRepository.markAsStarted(migrationChainExecutionName);
|
||||
|
||||
try {
|
||||
this.logger.info(`Executing migration: ${migrationChainExecutionName}`);
|
||||
const result = await this.migrateDocumentsForSourceVersion(
|
||||
registry.model,
|
||||
sourceSchemaVersion,
|
||||
registry.currentVersion,
|
||||
migrationChain
|
||||
);
|
||||
const pendingDocumentsAfter = await this.countDocumentsAtSchemaVersion(registry.model, sourceSchemaVersion);
|
||||
|
||||
const metadata = {
|
||||
collectionName: registry.collectionName,
|
||||
fromVersion: sourceSchemaVersion,
|
||||
toVersion: registry.currentVersion,
|
||||
chainLength: migrationChain.length,
|
||||
chainStepNames: migrationChain.map((step) => step.name),
|
||||
migratedCount: result.migratedCount,
|
||||
failedCount: result.failedCount,
|
||||
pendingBefore: pendingDocumentsBefore,
|
||||
pendingAfter: pendingDocumentsAfter,
|
||||
durationMs: result.durationMs
|
||||
};
|
||||
|
||||
if (result.failedCount > 0 || pendingDocumentsAfter > 0) {
|
||||
const failureReason =
|
||||
`Migration ${migrationChainExecutionName} did not complete successfully. ` +
|
||||
`failedCount=${result.failedCount}, pendingAfter=${pendingDocumentsAfter}`;
|
||||
|
||||
await this.migrationRepository.markAsFailed(migrationChainExecutionName, failureReason);
|
||||
throw new Error(failureReason);
|
||||
}
|
||||
|
||||
await this.migrationRepository.markAsCompleted(migrationChainExecutionName, metadata);
|
||||
|
||||
this.logger.info(
|
||||
`Migration ${migrationChainExecutionName} completed: ${result.migratedCount} documents migrated (${result.durationMs}ms)`
|
||||
);
|
||||
|
||||
return result.migratedCount;
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
await this.migrationRepository.markAsFailed(migrationChainExecutionName, errorMessage);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a migration chain on all documents that match the source version.
|
||||
* Applies all transforms sequentially and saves each document once at the target version.
|
||||
*
|
||||
* @param model - Mongoose model for the collection being migrated
|
||||
* @param sourceSchemaVersion - The schema version to migrate from
|
||||
* @param targetVersion - The schema version to migrate to
|
||||
* @param migrationChain - Array of migration steps to apply in order
|
||||
* @param batchSize - Number of documents to process in each batch. Default is 50.
|
||||
* @returns Migration result with statistics about the execution
|
||||
*/
|
||||
protected async migrateDocumentsForSourceVersion<TDocument extends SchemaMigratableDocument>(
|
||||
model: Model<TDocument>,
|
||||
sourceSchemaVersion: SchemaVersion,
|
||||
targetVersion: SchemaVersion,
|
||||
migrationChain: SchemaMigrationStep<TDocument>[],
|
||||
batchSize = 50
|
||||
): Promise<MigrationResult> {
|
||||
const startTime = Date.now();
|
||||
let migratedCount = 0;
|
||||
let failedCount = 0;
|
||||
|
||||
const sourceVersionFilter = { schemaVersion: sourceSchemaVersion };
|
||||
const totalSourceVersionDocuments = await model.countDocuments(sourceVersionFilter).exec();
|
||||
|
||||
if (totalSourceVersionDocuments === 0) {
|
||||
return {
|
||||
migratedCount,
|
||||
failedCount,
|
||||
durationMs: Date.now() - startTime
|
||||
};
|
||||
}
|
||||
|
||||
let processedDocumentsCount = 0;
|
||||
let lastProcessedDocumentId: TDocument['_id'] | null = null;
|
||||
let hasMoreBatches = true;
|
||||
|
||||
while (hasMoreBatches) {
|
||||
const batchFilter =
|
||||
lastProcessedDocumentId === null
|
||||
? sourceVersionFilter
|
||||
: {
|
||||
...sourceVersionFilter,
|
||||
_id: { $gt: lastProcessedDocumentId }
|
||||
};
|
||||
const documents = await model.find(batchFilter).sort({ _id: 1 }).limit(batchSize).exec();
|
||||
|
||||
if (documents.length === 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
const batchResults = await Promise.allSettled(
|
||||
documents.map(async (doc) => {
|
||||
const migratedDocument = this.applyTransformChain(doc, migrationChain, targetVersion);
|
||||
await migratedDocument.save();
|
||||
return String(doc._id);
|
||||
})
|
||||
);
|
||||
|
||||
for (let i = 0; i < batchResults.length; i++) {
|
||||
const batchResult = batchResults[i];
|
||||
|
||||
if (batchResult.status === 'fulfilled') {
|
||||
migratedCount++;
|
||||
continue;
|
||||
}
|
||||
|
||||
failedCount++;
|
||||
this.logger.warn(`Failed to migrate document ${String(documents[i]._id)}:`, batchResult.reason);
|
||||
}
|
||||
|
||||
processedDocumentsCount += documents.length;
|
||||
lastProcessedDocumentId = documents[documents.length - 1]._id;
|
||||
hasMoreBatches = documents.length === batchSize;
|
||||
this.logger.debug(`Processed ${processedDocumentsCount}/${totalSourceVersionDocuments} documents`);
|
||||
}
|
||||
|
||||
return {
|
||||
migratedCount,
|
||||
failedCount,
|
||||
durationMs: Date.now() - startTime
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the minimum schema version present in the collection.
|
||||
* This is used to detect the oldest pending version of documents.
|
||||
*
|
||||
* @param model - Mongoose model for the collection
|
||||
* @returns Current version or null if collection is empty
|
||||
*/
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
protected async getCurrentSchemaVersion(model: Model<any>): Promise<number | null> {
|
||||
protected async getMinSchemaVersion<TDocument extends SchemaMigratableDocument>(
|
||||
model: Model<TDocument>
|
||||
): Promise<SchemaVersion | null> {
|
||||
try {
|
||||
// Get a sample document to check its version
|
||||
const sampleDoc = await model.findOne({}).select('schemaVersion').exec();
|
||||
const sampleDoc = await model.findOne({}).sort({ schemaVersion: 1 }).select('schemaVersion').exec();
|
||||
|
||||
if (!sampleDoc) {
|
||||
return null; // Collection is empty
|
||||
@ -232,39 +358,111 @@ export class MigrationService {
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds the migrations needed to upgrade from one version to another.
|
||||
* Gets the maximum schema version present in the collection.
|
||||
* This is used to detect if there are any documents above expected version.
|
||||
*
|
||||
* @param model - Mongoose model for the collection
|
||||
* @returns Maximum version or null if collection is empty
|
||||
*/
|
||||
protected async getMaxSchemaVersion<TDocument extends SchemaMigratableDocument>(
|
||||
model: Model<TDocument>
|
||||
): Promise<SchemaVersion | null> {
|
||||
try {
|
||||
const sampleDoc = await model.findOne({}).sort({ schemaVersion: -1 }).select('schemaVersion').exec();
|
||||
|
||||
if (!sampleDoc) {
|
||||
return null; // Collection is empty
|
||||
}
|
||||
|
||||
return sampleDoc.schemaVersion ?? 1;
|
||||
} catch (error) {
|
||||
this.logger.error('Error getting max schema version:', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Counts how many documents are at a specific schema version.
|
||||
*
|
||||
* @param model - Mongoose model for the collection
|
||||
* @param schemaVersion - Schema version to count
|
||||
* @returns Number of documents at the specified schema version
|
||||
*/
|
||||
protected async countDocumentsAtSchemaVersion<TDocument extends SchemaMigratableDocument>(
|
||||
model: Model<TDocument>,
|
||||
schemaVersion: SchemaVersion
|
||||
): Promise<number> {
|
||||
return model.countDocuments({ schemaVersion }).exec();
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds the schema migration steps needed to upgrade from one version to another.
|
||||
* Returns migrations in the correct order to apply.
|
||||
*
|
||||
* @param registry - Collection migration registry
|
||||
* @param fromVersion - Current version in database
|
||||
* @param toVersion - Target version from application
|
||||
* @returns Array of migrations to execute in order
|
||||
* @returns Array of schema migration steps to execute in order
|
||||
*/
|
||||
protected findNeededMigrations(
|
||||
registry: CollectionMigrationRegistry,
|
||||
fromVersion: number,
|
||||
toVersion: number
|
||||
): ISchemaMigration[] {
|
||||
const needed: ISchemaMigration[] = [];
|
||||
protected findSchemaMigrationSteps<TDocument extends SchemaMigratableDocument>(
|
||||
registry: CollectionMigrationRegistry<TDocument>,
|
||||
fromVersion: SchemaVersion,
|
||||
toVersion: SchemaVersion
|
||||
): SchemaMigrationStep<TDocument>[] {
|
||||
const steps: SchemaMigrationStep<TDocument>[] = [];
|
||||
|
||||
// Build a chain of migrations from fromVersion to toVersion
|
||||
// Build a chain of migration steps from fromVersion to toVersion
|
||||
let currentVersion = fromVersion;
|
||||
|
||||
while (currentVersion < toVersion) {
|
||||
const nextMigration = registry.migrations.find((m) => m.fromVersion === currentVersion);
|
||||
const nextVersion = currentVersion + 1;
|
||||
const expectedMigrationName = generateSchemaMigrationName(
|
||||
registry.collectionName,
|
||||
currentVersion,
|
||||
nextVersion
|
||||
);
|
||||
const transform = registry.migrations.get(expectedMigrationName);
|
||||
|
||||
if (!nextMigration) {
|
||||
this.logger.warn(
|
||||
`No migration found from version ${currentVersion} for ${registry.collectionName}. ` +
|
||||
`Migration chain is incomplete.`
|
||||
if (!transform) {
|
||||
throw new Error(
|
||||
`No migration found from version ${currentVersion} to ${nextVersion} for ` +
|
||||
`${registry.collectionName}. Migration chain is incomplete.`
|
||||
);
|
||||
break;
|
||||
}
|
||||
|
||||
needed.push(nextMigration);
|
||||
currentVersion = nextMigration.toVersion;
|
||||
steps.push({
|
||||
name: expectedMigrationName,
|
||||
fromVersion: currentVersion,
|
||||
toVersion: nextVersion,
|
||||
transform
|
||||
});
|
||||
currentVersion = nextVersion;
|
||||
}
|
||||
|
||||
return needed;
|
||||
return steps;
|
||||
}
|
||||
|
||||
/**
|
||||
* Applies a chain of migration transforms to a document sequentially.
|
||||
* Updates the document's schemaVersion to the target version after applying all transforms.
|
||||
*
|
||||
* @param document - The document to transform
|
||||
* @param migrationChain - Array of migration steps to apply in order
|
||||
* @param targetVersion - The final schema version after applying the chain
|
||||
* @returns The transformed document with updated schemaVersion
|
||||
*/
|
||||
protected applyTransformChain<TDocument extends SchemaMigratableDocument>(
|
||||
document: TDocument,
|
||||
migrationChain: SchemaMigrationStep<TDocument>[],
|
||||
targetVersion: SchemaVersion
|
||||
): TDocument {
|
||||
let transformedDocument = document;
|
||||
|
||||
for (const migrationStep of migrationChain) {
|
||||
transformedDocument = migrationStep.transform(transformedDocument);
|
||||
}
|
||||
|
||||
transformedDocument.schemaVersion = targetVersion;
|
||||
return transformedDocument;
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user