backend: remove legacy storage service and migration process
This commit is contained in:
parent
450aa85b88
commit
caad4bc550
@ -41,7 +41,6 @@ import { StorageInitService } from '../services/storage/storage-init.service.js'
|
||||
import { StorageKeyBuilder, StorageProvider } from '../services/storage/storage.interface.js';
|
||||
import { StorageFactory } from '../services/storage/storage.factory.js';
|
||||
import { BlobStorageService } from '../services/storage/blob-storage.service.js';
|
||||
import { LegacyStorageService } from '../services/storage/legacy-storage.service.js';
|
||||
|
||||
import { MigrationService } from '../services/migration.service.js';
|
||||
import { LiveKitService } from '../services/livekit.service.js';
|
||||
@ -101,7 +100,6 @@ export const registerDependencies = () => {
|
||||
container.bind(StorageFactory).toSelf().inSingletonScope();
|
||||
container.bind(BlobStorageService).toSelf().inSingletonScope();
|
||||
container.bind(StorageInitService).toSelf().inSingletonScope();
|
||||
container.bind(LegacyStorageService).toSelf().inSingletonScope();
|
||||
container.bind(MigrationService).toSelf().inSingletonScope();
|
||||
|
||||
container.bind(FrontendEventService).toSelf().inSingletonScope();
|
||||
|
||||
@ -1,13 +1,6 @@
|
||||
export const REDIS_KEY_PREFIX = 'ov_meet:';
|
||||
|
||||
export const enum RedisKeyName {
|
||||
GLOBAL_CONFIG = `${REDIS_KEY_PREFIX}global_config`,
|
||||
ROOM = `${REDIS_KEY_PREFIX}room:`,
|
||||
RECORDING = `${REDIS_KEY_PREFIX}recording:`,
|
||||
RECORDING_SECRETS = `${REDIS_KEY_PREFIX}recording_secrets:`,
|
||||
ARCHIVED_ROOM = `${REDIS_KEY_PREFIX}archived_room:`,
|
||||
USER = `${REDIS_KEY_PREFIX}user:`,
|
||||
API_KEYS = `${REDIS_KEY_PREFIX}api_keys:`,
|
||||
//Tracks all currently reserved participant names per room (with TTL for auto-expiration).
|
||||
ROOM_PARTICIPANTS = `${REDIS_KEY_PREFIX}room_participants:`,
|
||||
// Stores released numeric suffixes (per base name) in a sorted set, so that freed numbers
|
||||
|
||||
@ -18,14 +18,12 @@ import { RoomRepository } from '../repositories/room.repository.js';
|
||||
import { UserRepository } from '../repositories/user.repository.js';
|
||||
import { LoggerService } from './logger.service.js';
|
||||
import { MutexService } from './mutex.service.js';
|
||||
import { LegacyStorageService } from './storage/legacy-storage.service.js';
|
||||
|
||||
@injectable()
|
||||
export class MigrationService {
|
||||
constructor(
|
||||
@inject(LoggerService) protected logger: LoggerService,
|
||||
@inject(MutexService) protected mutexService: MutexService,
|
||||
@inject(LegacyStorageService) protected legacyStorageService: LegacyStorageService,
|
||||
@inject(GlobalConfigRepository) protected configRepository: GlobalConfigRepository,
|
||||
@inject(UserRepository) protected userRepository: UserRepository,
|
||||
@inject(ApiKeyRepository) protected apiKeyRepository: ApiKeyRepository,
|
||||
@ -56,9 +54,6 @@ export class MigrationService {
|
||||
|
||||
lockAcquired = true;
|
||||
|
||||
// Migrate data from legacy storage to MongoDB if needed
|
||||
await this.runMigrationsFromLegacyStorageToMongoDB();
|
||||
|
||||
// Run schema migrations to upgrade document structures
|
||||
await this.runSchemaMigrations();
|
||||
|
||||
@ -75,348 +70,6 @@ export class MigrationService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Orchestrates the migration from legacy storage to MongoDB.
|
||||
* Calls individual migration methods in the correct order.
|
||||
*/
|
||||
protected async runMigrationsFromLegacyStorageToMongoDB(): Promise<void> {
|
||||
const migrationName = MigrationName.LEGACY_STORAGE_TO_MONGODB;
|
||||
|
||||
// Check if legacy storage migration has already been completed
|
||||
const isLegacyMigrationCompleted = await this.migrationRepository.isCompleted(migrationName);
|
||||
|
||||
if (isLegacyMigrationCompleted) {
|
||||
this.logger.info('Legacy storage migration already completed. Skipping...');
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.info('Running migrations from legacy storage to MongoDB...');
|
||||
|
||||
try {
|
||||
// Mark migration as started
|
||||
await this.migrationRepository.markAsStarted(migrationName);
|
||||
|
||||
// Run the actual migrations
|
||||
await Promise.all([
|
||||
this.migrateLegacyGlobalConfig(),
|
||||
this.migrateLegacyUsers(),
|
||||
this.migrateLegacyApiKeys()
|
||||
]);
|
||||
await this.migrateLegacyRooms();
|
||||
await this.migrateLegacyRecordings();
|
||||
|
||||
// Mark migration as completed
|
||||
await this.migrationRepository.markAsCompleted(migrationName);
|
||||
|
||||
this.logger.info('Legacy storage migration completed successfully');
|
||||
} catch (error) {
|
||||
this.logger.error('Error running migrations from legacy storage to MongoDB:', error);
|
||||
|
||||
// Mark migration as failed
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
await this.migrationRepository.markAsFailed(migrationName, errorMessage);
|
||||
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Migrates global configuration from legacy storage to MongoDB.
|
||||
* Applies any missing fields for backwards compatibility.
|
||||
*/
|
||||
protected async migrateLegacyGlobalConfig(): Promise<void> {
|
||||
this.logger.info('Migrating global configuration from legacy storage to MongoDB...');
|
||||
|
||||
try {
|
||||
// Check if config already exists in MongoDB
|
||||
const existingConfig = await this.configRepository.get();
|
||||
|
||||
if (existingConfig) {
|
||||
this.logger.info('Global config already exists in MongoDB, skipping migration');
|
||||
return;
|
||||
}
|
||||
|
||||
// Try to get config from legacy storage
|
||||
const legacyConfig = await this.legacyStorageService.getGlobalConfig();
|
||||
|
||||
if (!legacyConfig) {
|
||||
this.logger.info('No global config found in legacy storage, skipping migration');
|
||||
return;
|
||||
}
|
||||
|
||||
// Save to MongoDB
|
||||
await this.configRepository.create(legacyConfig);
|
||||
this.logger.info('Global config migrated successfully');
|
||||
|
||||
// Delete from legacy storage
|
||||
await this.legacyStorageService.deleteGlobalConfig();
|
||||
this.logger.info('Legacy global config deleted');
|
||||
} catch (error) {
|
||||
this.logger.error('Error migrating global config from legacy storage to MongoDB:', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Migrates users from legacy storage to MongoDB.
|
||||
*/
|
||||
protected async migrateLegacyUsers(): Promise<void> {
|
||||
this.logger.info('Migrating users from legacy storage to MongoDB...');
|
||||
|
||||
try {
|
||||
// Legacy storage only had one user (admin)
|
||||
// We need to check for the default admin username
|
||||
const adminUsername = 'admin'; // Default username in legacy systems
|
||||
|
||||
const legacyUser = await this.legacyStorageService.getUser(adminUsername);
|
||||
|
||||
if (!legacyUser) {
|
||||
this.logger.info('No users found in legacy storage, skipping migration');
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if user already exists in MongoDB
|
||||
const existingUser = await this.userRepository.findByUsername(legacyUser.username);
|
||||
|
||||
if (existingUser) {
|
||||
this.logger.info(`User '${legacyUser.username}' already exists in MongoDB, skipping`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Save to MongoDB
|
||||
await this.userRepository.create(legacyUser);
|
||||
this.logger.info(`User '${legacyUser.username}' migrated successfully`);
|
||||
|
||||
// Delete from legacy storage
|
||||
await this.legacyStorageService.deleteUser(legacyUser.username);
|
||||
this.logger.info(`Legacy user '${legacyUser.username}' deleted`);
|
||||
} catch (error) {
|
||||
this.logger.error('Error migrating users from legacy storage to MongoDB:', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Migrates API keys from legacy storage to MongoDB.
|
||||
*/
|
||||
protected async migrateLegacyApiKeys(): Promise<void> {
|
||||
this.logger.info('Migrating API key from legacy storage to MongoDB...');
|
||||
|
||||
try {
|
||||
const legacyApiKeys = await this.legacyStorageService.getApiKeys();
|
||||
|
||||
if (!legacyApiKeys || legacyApiKeys.length === 0) {
|
||||
this.logger.info('No API key found in legacy storage, skipping migration');
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if an API key already exists in MongoDB
|
||||
const existingApiKeys = await this.apiKeyRepository.findAll();
|
||||
|
||||
if (existingApiKeys.length > 0) {
|
||||
this.logger.info('API key already exists in MongoDB, skipping migration');
|
||||
return;
|
||||
}
|
||||
|
||||
// Save to MongoDB
|
||||
// Only one API key existed in legacy storage
|
||||
await this.apiKeyRepository.create(legacyApiKeys[0]);
|
||||
this.logger.info(`API key migrated successfully`);
|
||||
|
||||
// Delete from legacy storage
|
||||
await this.legacyStorageService.deleteApiKeys();
|
||||
this.logger.info('Legacy API key deleted');
|
||||
} catch (error) {
|
||||
this.logger.error('Error migrating API keys from legacy storage to MongoDB:', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Migrates rooms from legacy storage to MongoDB.
|
||||
* Processes rooms in batches for better performance.
|
||||
*/
|
||||
protected async migrateLegacyRooms(): Promise<void> {
|
||||
this.logger.info('Migrating rooms from legacy storage to MongoDB...');
|
||||
|
||||
try {
|
||||
let migratedCount = 0;
|
||||
let skippedCount = 0;
|
||||
let failedCount = 0;
|
||||
let nextPageToken: string | undefined;
|
||||
const batchSize = 50; // Process rooms in batches
|
||||
|
||||
do {
|
||||
// Get batch of rooms from legacy storage
|
||||
const { rooms, nextPageToken: nextToken } = await this.legacyStorageService.getRooms(
|
||||
undefined,
|
||||
batchSize,
|
||||
nextPageToken
|
||||
);
|
||||
|
||||
if (rooms.length === 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
const roomIdsToDelete: string[] = [];
|
||||
|
||||
for (const room of rooms) {
|
||||
try {
|
||||
// Check if room already exists in MongoDB
|
||||
const existingRoom = await this.roomRepository.findByRoomId(room.roomId);
|
||||
|
||||
if (existingRoom) {
|
||||
this.logger.debug(`Room '${room.roomId}' already exists in MongoDB, skipping`);
|
||||
skippedCount++;
|
||||
roomIdsToDelete.push(room.roomId);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Save to MongoDB
|
||||
await this.roomRepository.create(room);
|
||||
migratedCount++;
|
||||
roomIdsToDelete.push(room.roomId);
|
||||
this.logger.debug(`Room '${room.roomId}' migrated successfully`);
|
||||
} catch (error) {
|
||||
this.logger.warn(`Failed to migrate room '${room.roomId}':`, error);
|
||||
failedCount++;
|
||||
}
|
||||
}
|
||||
|
||||
// Delete migrated rooms from legacy storage
|
||||
if (roomIdsToDelete.length > 0) {
|
||||
await this.legacyStorageService.deleteRooms(roomIdsToDelete);
|
||||
this.logger.debug(`Deleted ${roomIdsToDelete.length} rooms from legacy storage`);
|
||||
|
||||
// Try to delete archived room metadata in parallel for better performance
|
||||
// No need to check if exists first - just attempt deletion
|
||||
const archivedMetadataPromises = roomIdsToDelete.map(async (roomId) => {
|
||||
try {
|
||||
await this.legacyStorageService.deleteArchivedRoomMetadata(roomId);
|
||||
this.logger.debug(`Deleted archived metadata for room '${roomId}'`);
|
||||
} catch (error) {
|
||||
// Silently ignore if archived metadata doesn't exist
|
||||
// Only log if it's an unexpected error
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
|
||||
if (!errorMessage.includes('not found') && !errorMessage.includes('does not exist')) {
|
||||
this.logger.warn(`Failed to delete archived metadata for room '${roomId}':`, error);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
await Promise.allSettled(archivedMetadataPromises);
|
||||
}
|
||||
|
||||
nextPageToken = nextToken;
|
||||
} while (nextPageToken);
|
||||
|
||||
this.logger.info(
|
||||
`Rooms migration completed: ${migratedCount} migrated, ${skippedCount} skipped, ${failedCount} failed`
|
||||
);
|
||||
|
||||
if (failedCount > 0) {
|
||||
throw new Error(`Failed to migrate ${failedCount} room(s) from legacy storage`);
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error('Error migrating rooms from legacy storage to MongoDB:', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Migrates recordings from legacy storage to MongoDB.
|
||||
* Processes recordings in batches and includes access secrets.
|
||||
*/
|
||||
protected async migrateLegacyRecordings(): Promise<void> {
|
||||
this.logger.info('Migrating recordings from legacy storage to MongoDB...');
|
||||
|
||||
try {
|
||||
let migratedCount = 0;
|
||||
let skippedCount = 0;
|
||||
let failedCount = 0;
|
||||
let nextPageToken: string | undefined;
|
||||
const batchSize = 50; // Process recordings in batches
|
||||
|
||||
do {
|
||||
// Get batch of recordings from legacy storage
|
||||
const { recordings, nextContinuationToken } = await this.legacyStorageService.getRecordings(
|
||||
undefined,
|
||||
batchSize,
|
||||
nextPageToken
|
||||
);
|
||||
|
||||
if (recordings.length === 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
const recordingIdsToDelete: string[] = [];
|
||||
|
||||
for (const recording of recordings) {
|
||||
try {
|
||||
// Check if recording already exists in MongoDB
|
||||
const existingRecording = await this.recordingRepository.findByRecordingId(
|
||||
recording.recordingId
|
||||
);
|
||||
|
||||
if (existingRecording) {
|
||||
this.logger.debug(
|
||||
`Recording '${recording.recordingId}' already exists in MongoDB, skipping`
|
||||
);
|
||||
skippedCount++;
|
||||
recordingIdsToDelete.push(recording.recordingId);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Get access secrets from legacy storage
|
||||
const secrets = await this.legacyStorageService.getRecordingAccessSecrets(
|
||||
recording.recordingId
|
||||
);
|
||||
|
||||
// Prepare recording document with access secrets
|
||||
const recordingWithSecrets = {
|
||||
...recording,
|
||||
accessSecrets: secrets
|
||||
? {
|
||||
public: secrets.publicAccessSecret,
|
||||
private: secrets.privateAccessSecret
|
||||
}
|
||||
: undefined
|
||||
};
|
||||
|
||||
// Save to MongoDB (will generate new secrets if not provided)
|
||||
await this.recordingRepository.create(recordingWithSecrets);
|
||||
migratedCount++;
|
||||
recordingIdsToDelete.push(recording.recordingId);
|
||||
this.logger.debug(`Recording '${recording.recordingId}' migrated successfully`);
|
||||
} catch (error) {
|
||||
this.logger.warn(`Failed to migrate recording '${recording.recordingId}':`, error);
|
||||
failedCount++;
|
||||
}
|
||||
}
|
||||
|
||||
// Delete migrated recordings from legacy storage (includes metadata and secrets)
|
||||
if (recordingIdsToDelete.length > 0) {
|
||||
await this.legacyStorageService.deleteRecordings(recordingIdsToDelete);
|
||||
this.logger.debug(`Deleted ${recordingIdsToDelete.length} recordings from legacy storage`);
|
||||
}
|
||||
|
||||
nextPageToken = nextContinuationToken;
|
||||
} while (nextPageToken);
|
||||
|
||||
this.logger.info(
|
||||
`Recordings migration completed: ${migratedCount} migrated, ${skippedCount} skipped, ${failedCount} failed`
|
||||
);
|
||||
|
||||
if (failedCount > 0) {
|
||||
throw new Error(`Failed to migrate ${failedCount} recording(s) from legacy storage`);
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error('Error migrating recordings from legacy storage to MongoDB:', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs all schema migrations to upgrade document structures to the latest version.
|
||||
* Processes each collection in the registry and executes pending migrations.
|
||||
|
||||
@ -1,474 +0,0 @@
|
||||
import { GlobalConfig, MeetApiKey, MeetRecordingInfo, MeetRoom, MeetUser } from '@openvidu-meet/typings';
|
||||
import { inject, injectable } from 'inversify';
|
||||
import { OpenViduMeetError } from '../../models/error.model.js';
|
||||
import { RedisKeyName } from '../../models/redis.model.js';
|
||||
import { LoggerService } from '../logger.service.js';
|
||||
import { RedisService } from '../redis.service.js';
|
||||
import { StorageFactory } from './storage.factory.js';
|
||||
import { StorageKeyBuilder, StorageProvider } from './storage.interface.js';
|
||||
|
||||
/**
|
||||
* Legacy storage service for reading and migrating data from S3/ABS/GCS to MongoDB.
|
||||
*
|
||||
* This service is used during the migration process to:
|
||||
* - Read existing data from legacy storage (S3/Azure Blob Storage/Google Cloud Storage)
|
||||
* - Access data cached in Redis that originated from legacy storage
|
||||
* - Clean up legacy data after successful migration to MongoDB
|
||||
*
|
||||
* **Important**: This service is read-only for migration purposes. New data should be
|
||||
* created directly in MongoDB using the appropriate repositories (RoomRepository,
|
||||
* RecordingRepository, UserRepository, etc.).
|
||||
*
|
||||
* Legacy storage structure:
|
||||
* - Rooms: Stored as JSON files in blob storage with Redis cache
|
||||
* - Recordings: Metadata as JSON files, binary media as separate blob files
|
||||
* - Users: Stored as JSON files with Redis cache
|
||||
* - API Keys: Stored as JSON files with Redis cache
|
||||
* - Global Config: Stored as JSON files with Redis cache
|
||||
*/
|
||||
@injectable()
|
||||
export class LegacyStorageService {
|
||||
protected storageProvider: StorageProvider;
|
||||
protected keyBuilder: StorageKeyBuilder;
|
||||
|
||||
constructor(
|
||||
@inject(LoggerService) protected logger: LoggerService,
|
||||
@inject(StorageFactory) protected storageFactory: StorageFactory,
|
||||
@inject(RedisService) protected redisService: RedisService
|
||||
) {
|
||||
const { provider, keyBuilder } = this.storageFactory.create();
|
||||
this.storageProvider = provider;
|
||||
this.keyBuilder = keyBuilder;
|
||||
}
|
||||
|
||||
// ==========================================
|
||||
// GLOBAL CONFIG DOMAIN LOGIC
|
||||
// ==========================================
|
||||
|
||||
/**
|
||||
* Retrieves the global configuration from legacy storage.
|
||||
*
|
||||
* @returns A promise that resolves to the global configuration, or null if not found
|
||||
*/
|
||||
async getGlobalConfig(): Promise<GlobalConfig | null> {
|
||||
const redisKey = RedisKeyName.GLOBAL_CONFIG;
|
||||
const storageKey = this.keyBuilder.buildGlobalConfigKey();
|
||||
|
||||
const config = await this.getFromCacheAndStorage<GlobalConfig>(redisKey, storageKey);
|
||||
return config;
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes the global configuration from legacy storage.
|
||||
*/
|
||||
async deleteGlobalConfig(): Promise<void> {
|
||||
const redisKey = RedisKeyName.GLOBAL_CONFIG;
|
||||
const storageKey = this.keyBuilder.buildGlobalConfigKey();
|
||||
|
||||
await this.deleteFromCacheAndStorage(redisKey, storageKey);
|
||||
}
|
||||
|
||||
// ==========================================
|
||||
// ROOM DOMAIN LOGIC
|
||||
// ==========================================
|
||||
|
||||
/**
|
||||
* Retrieves a paginated list of rooms from legacy storage.
|
||||
*
|
||||
* @param maxItems - Optional maximum number of rooms to retrieve per page
|
||||
* @param nextPageToken - Optional token for pagination to get the next set of results
|
||||
* @returns Promise that resolves to an object containing:
|
||||
* - rooms: Array of MRoom objects retrieved from storage
|
||||
* - isTruncated: Boolean indicating if there are more results available
|
||||
* - nextPageToken: Optional token for retrieving the next page of results
|
||||
*/
|
||||
async getRooms(
|
||||
roomName?: string,
|
||||
maxItems?: number,
|
||||
nextPageToken?: string
|
||||
): Promise<{
|
||||
rooms: MeetRoom[];
|
||||
isTruncated: boolean;
|
||||
nextPageToken?: string;
|
||||
}> {
|
||||
try {
|
||||
const searchKey = this.keyBuilder.buildAllMeetRoomsKey(roomName);
|
||||
const { Contents, IsTruncated, NextContinuationToken } = await this.storageProvider.listObjects(
|
||||
searchKey,
|
||||
maxItems,
|
||||
nextPageToken
|
||||
);
|
||||
|
||||
const rooms: MeetRoom[] = [];
|
||||
|
||||
if (Contents && Contents.length > 0) {
|
||||
const roomPromises = Contents.map(async (item) => {
|
||||
if (item.Key && item.Key.endsWith('.json')) {
|
||||
try {
|
||||
const room = await this.storageProvider.getObject<MeetRoom>(item.Key);
|
||||
return room;
|
||||
} catch (error) {
|
||||
this.logger.warn(`Failed to load room from ${item.Key}: ${error}`);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
});
|
||||
|
||||
const roomResults = await Promise.all(roomPromises);
|
||||
rooms.push(...roomResults.filter((room): room is Awaited<MeetRoom> => room !== null));
|
||||
}
|
||||
|
||||
return {
|
||||
rooms,
|
||||
isTruncated: IsTruncated || false,
|
||||
nextPageToken: NextContinuationToken
|
||||
};
|
||||
} catch (error) {
|
||||
this.handleError(error, 'Error retrieving rooms');
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes multiple rooms by roomIds from legacy storage.
|
||||
*
|
||||
* @param roomIds - Array of room identifiers to delete
|
||||
*/
|
||||
async deleteRooms(roomIds: string[]): Promise<void> {
|
||||
const roomKeys = roomIds.map((roomId) => this.keyBuilder.buildMeetRoomKey(roomId));
|
||||
const redisKeys = roomIds.map((roomId) => RedisKeyName.ROOM + roomId);
|
||||
|
||||
await this.deleteFromCacheAndStorageBatch(redisKeys, roomKeys);
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes archived room metadata for a given roomId from legacy storage.
|
||||
*
|
||||
* @param roomId - The unique room identifier
|
||||
*/
|
||||
async deleteArchivedRoomMetadata(roomId: string): Promise<void> {
|
||||
const redisKey = RedisKeyName.ARCHIVED_ROOM + roomId;
|
||||
const storageKey = this.keyBuilder.buildArchivedMeetRoomKey(roomId);
|
||||
|
||||
await this.deleteFromCacheAndStorage(redisKey, storageKey);
|
||||
}
|
||||
|
||||
// ==========================================
|
||||
// RECORDING DOMAIN LOGIC
|
||||
// ==========================================
|
||||
|
||||
/**
|
||||
* Retrieves a paginated list of recordings from legacy storage
|
||||
*
|
||||
* @param maxItems - Optional maximum number of items to return per page for pagination.
|
||||
* @param nextPageToken - Optional token for pagination to retrieve the next page of results.
|
||||
*
|
||||
* @returns A promise that resolves to an object containing:
|
||||
* - `recordings`: Array of recording metadata objects (MRec)
|
||||
* - `isTruncated`: Optional boolean indicating if there are more results available
|
||||
* - `nextContinuationToken`: Optional token to retrieve the next page of results
|
||||
*/
|
||||
async getRecordings(
|
||||
roomId?: string,
|
||||
maxItems?: number,
|
||||
nextPageToken?: string
|
||||
): Promise<{ recordings: MeetRecordingInfo[]; isTruncated?: boolean; nextContinuationToken?: string }> {
|
||||
try {
|
||||
const searchKey = this.keyBuilder.buildAllMeetRecordingsKey(roomId);
|
||||
const { Contents, IsTruncated, NextContinuationToken } = await this.storageProvider.listObjects(
|
||||
searchKey,
|
||||
maxItems,
|
||||
nextPageToken
|
||||
);
|
||||
|
||||
const recordings: MeetRecordingInfo[] = [];
|
||||
|
||||
if (Contents && Contents.length > 0) {
|
||||
const recordingPromises = Contents.map(async (item) => {
|
||||
if (!item.Key || !item.Key.endsWith('.json')) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
const recording = await this.storageProvider.getObject<MeetRecordingInfo>(item.Key!);
|
||||
return recording;
|
||||
} catch (error) {
|
||||
this.logger.warn(`Failed to load recording metadata from ${item.Key}: ${error}`);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
const recordingResults = await Promise.all(recordingPromises);
|
||||
recordings.push(
|
||||
...recordingResults.filter(
|
||||
(recording): recording is Awaited<MeetRecordingInfo> => recording !== null
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
return {
|
||||
recordings: recordings,
|
||||
isTruncated: Boolean(IsTruncated),
|
||||
nextContinuationToken: NextContinuationToken
|
||||
};
|
||||
} catch (error) {
|
||||
this.handleError(error, 'Error retrieving recordings');
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves access secrets for a specific recording from legacy storage.
|
||||
*
|
||||
* @param recordingId - The unique identifier of the recording
|
||||
* @returns A promise that resolves to an object containing public and private access secrets,
|
||||
* or null if no secrets are found for the given recordingId
|
||||
*/
|
||||
async getRecordingAccessSecrets(
|
||||
recordingId: string
|
||||
): Promise<{ publicAccessSecret: string; privateAccessSecret: string } | null> {
|
||||
try {
|
||||
const redisKey = RedisKeyName.RECORDING_SECRETS + recordingId;
|
||||
const secretsKey = this.keyBuilder.buildAccessRecordingSecretsKey(recordingId);
|
||||
|
||||
const secrets = await this.getFromCacheAndStorage<{
|
||||
publicAccessSecret: string;
|
||||
privateAccessSecret: string;
|
||||
}>(redisKey, secretsKey);
|
||||
|
||||
if (!secrets) {
|
||||
this.logger.warn(`No access secrets found for recording ${recordingId}`);
|
||||
return null;
|
||||
}
|
||||
|
||||
return secrets;
|
||||
} catch (error) {
|
||||
this.handleError(error, `Error fetching access secrets for recording ${recordingId}`);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes multiple recordings by recordingIds from legacy storage.
|
||||
*
|
||||
* @param recordingIds - Array of recording identifiers to delete
|
||||
*/
|
||||
async deleteRecordings(recordingIds: string[]): Promise<void> {
|
||||
if (recordingIds.length === 0) {
|
||||
this.logger.debug('No recordings to delete');
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// Build all paths from recordingIds
|
||||
const redisKeys: string[] = [];
|
||||
const storageKeys: string[] = [];
|
||||
|
||||
for (const recordingId of recordingIds) {
|
||||
redisKeys.push(RedisKeyName.RECORDING + recordingId);
|
||||
redisKeys.push(RedisKeyName.RECORDING_SECRETS + recordingId);
|
||||
|
||||
storageKeys.push(this.keyBuilder.buildMeetRecordingKey(recordingId));
|
||||
storageKeys.push(this.keyBuilder.buildAccessRecordingSecretsKey(recordingId));
|
||||
}
|
||||
|
||||
await this.deleteFromCacheAndStorageBatch(redisKeys, storageKeys);
|
||||
} catch (error) {
|
||||
this.handleError(error, `Error deleting recordings: ${recordingIds.join(', ')}`);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
// ==========================================
|
||||
// USER DOMAIN LOGIC
|
||||
// ==========================================
|
||||
|
||||
/**
|
||||
* Retrieves user data for a specific username from legacy storage.
|
||||
*
|
||||
* @param username - The username of the user to retrieve
|
||||
* @returns A promise that resolves to the user data, or null if not found
|
||||
*/
|
||||
async getUser(username: string): Promise<MeetUser | null> {
|
||||
const redisKey = RedisKeyName.USER + username;
|
||||
const storageKey = this.keyBuilder.buildUserKey(username);
|
||||
|
||||
const user = await this.getFromCacheAndStorage<MeetUser>(redisKey, storageKey);
|
||||
return user;
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes user data for a specific username from legacy storage.
|
||||
*
|
||||
* @param username - The username of the user to delete
|
||||
*/
|
||||
async deleteUser(username: string): Promise<void> {
|
||||
const redisKey = RedisKeyName.USER + username;
|
||||
const storageKey = this.keyBuilder.buildUserKey(username);
|
||||
|
||||
await this.deleteFromCacheAndStorage(redisKey, storageKey);
|
||||
}
|
||||
|
||||
// ==========================================
|
||||
// API KEY DOMAIN LOGIC
|
||||
// ==========================================
|
||||
|
||||
/**
|
||||
* Retrieves all API keys from legacy storage.
|
||||
*
|
||||
* @returns A promise that resolves to an array of MeetApiKey objects
|
||||
*/
|
||||
async getApiKeys(): Promise<MeetApiKey[]> {
|
||||
const redisKey = RedisKeyName.API_KEYS;
|
||||
const storageKey = this.keyBuilder.buildApiKeysKey();
|
||||
|
||||
const apiKeys = await this.getFromCacheAndStorage<MeetApiKey[]>(redisKey, storageKey);
|
||||
|
||||
if (!apiKeys) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return apiKeys;
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes all API keys from legacy storage.
|
||||
*/
|
||||
async deleteApiKeys(): Promise<void> {
|
||||
const redisKey = RedisKeyName.API_KEYS;
|
||||
const storageKey = this.keyBuilder.buildApiKeysKey();
|
||||
|
||||
await this.deleteFromCacheAndStorage(redisKey, storageKey);
|
||||
}
|
||||
|
||||
// ==========================================
|
||||
// PRIVATE HYBRID CACHE METHODS (Redis + Storage)
|
||||
// ==========================================
|
||||
|
||||
/**
|
||||
* Retrieves data from Redis cache first, falls back to storage if not found.
|
||||
*
|
||||
* @param redisKey - The Redis key to check first
|
||||
* @param storageKey - The storage key/path as fallback
|
||||
* @returns Promise that resolves with the data or null if not found
|
||||
*/
|
||||
protected async getFromCacheAndStorage<T>(redisKey: string, storageKey: string): Promise<T | null> {
|
||||
try {
|
||||
// 1. Try Redis first (fast cache)
|
||||
this.logger.debug(`Attempting to get data from Redis cache: ${redisKey}`);
|
||||
const cachedData = await this.redisService.get(redisKey);
|
||||
|
||||
if (cachedData) {
|
||||
this.logger.debug(`Cache HIT for key: ${redisKey}`);
|
||||
|
||||
try {
|
||||
return JSON.parse(cachedData) as T;
|
||||
} catch (parseError) {
|
||||
this.logger.warn(`Failed to parse cached data for key ${redisKey}: ${parseError}`);
|
||||
// Continue to storage fallback
|
||||
}
|
||||
} else {
|
||||
this.logger.debug(`Cache MISS for key: ${redisKey}`);
|
||||
}
|
||||
|
||||
// 2. Fallback to persistent storage
|
||||
this.logger.debug(`Attempting to get data from storage: ${storageKey}`);
|
||||
const storageData = await this.storageProvider.getObject<T>(storageKey);
|
||||
|
||||
if (!storageData) {
|
||||
this.logger.debug(`Data not found in storage for key: ${storageKey}`);
|
||||
}
|
||||
|
||||
return storageData;
|
||||
} catch (error) {
|
||||
this.handleError(error, `Error in hybrid cache get for keys: ${redisKey}, ${storageKey}`);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes data from both Redis cache and persistent storage.
|
||||
*
|
||||
* @param redisKey - The Redis key to delete
|
||||
* @param storageKey - The storage key to delete
|
||||
*/
|
||||
protected async deleteFromCacheAndStorage(redisKey: string, storageKey: string): Promise<void> {
|
||||
return await this.deleteFromCacheAndStorageBatch([redisKey], [storageKey]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes data from both Redis cache and persistent storage in batch.
|
||||
*
|
||||
* @param redisKeys - Array of Redis keys to delete
|
||||
* @param storageKeys - Array of storage keys to delete
|
||||
*/
|
||||
protected async deleteFromCacheAndStorageBatch(redisKeys: string[], storageKeys: string[]): Promise<void> {
|
||||
if (redisKeys.length === 0 && storageKeys.length === 0) {
|
||||
this.logger.debug('No keys to delete in batch');
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.debug(`Batch deleting ${redisKeys.length} Redis keys and ${storageKeys.length} storage keys`);
|
||||
const operations = [
|
||||
// Batch delete from Redis (only if there are keys to delete)
|
||||
redisKeys.length > 0
|
||||
? this.redisService.delete(redisKeys).catch((error) => {
|
||||
this.logger.warn(`Redis batch delete failed: ${error}`);
|
||||
return Promise.reject({ type: 'redis', error, affectedKeys: redisKeys });
|
||||
})
|
||||
: Promise.resolve(0),
|
||||
|
||||
// Batch delete from storage (only if there are keys to delete)
|
||||
storageKeys.length > 0
|
||||
? this.storageProvider.deleteObjects(storageKeys).catch((error) => {
|
||||
this.logger.warn(`Storage batch delete failed: ${error}`);
|
||||
return Promise.reject({ type: 'storage', error, affectedKeys: storageKeys });
|
||||
})
|
||||
: Promise.resolve()
|
||||
];
|
||||
|
||||
try {
|
||||
const results = await Promise.allSettled(operations);
|
||||
|
||||
const redisResult = results[0];
|
||||
const storageResult = results[1];
|
||||
|
||||
const redisSuccess = redisResult.status === 'fulfilled';
|
||||
const storageSuccess = storageResult.status === 'fulfilled';
|
||||
|
||||
if (redisKeys.length > 0) {
|
||||
if (redisSuccess) {
|
||||
const deletedCount = (redisResult as PromiseFulfilledResult<number>).value;
|
||||
this.logger.debug(`Redis batch delete succeeded: ${deletedCount} keys deleted`);
|
||||
} else {
|
||||
const redisError = (redisResult as PromiseRejectedResult).reason;
|
||||
this.logger.warn(`Redis batch delete failed:`, redisError.error);
|
||||
}
|
||||
}
|
||||
|
||||
if (storageKeys.length > 0) {
|
||||
if (storageSuccess) {
|
||||
this.logger.debug(`Storage batch delete succeeded: ${storageKeys.length} keys deleted`);
|
||||
} else {
|
||||
const storageError = (storageResult as PromiseRejectedResult).reason;
|
||||
this.logger.warn(`Storage batch delete failed:`, storageError.error);
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.debug(`Batch delete completed: Redis=${redisSuccess}, Storage=${storageSuccess}`);
|
||||
} catch (error) {
|
||||
this.handleError(error, `Error in batch delete operation`);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
protected handleError(error: unknown, context: string): void {
|
||||
if (error instanceof OpenViduMeetError) {
|
||||
this.logger.error(`${context}: ${error.message}`);
|
||||
} else {
|
||||
this.logger.error(`${context}: ${error}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2,48 +2,8 @@ import { RecordingHelper } from '../../../../helpers/recording.helper.js';
|
||||
import { StorageKeyBuilder } from '../../storage.interface.js';
|
||||
|
||||
export class S3KeyBuilder implements StorageKeyBuilder {
|
||||
buildGlobalConfigKey(): string {
|
||||
return `global-config.json`;
|
||||
}
|
||||
|
||||
buildMeetRoomKey(roomId: string): string {
|
||||
return `rooms/${roomId}/${roomId}.json`;
|
||||
}
|
||||
|
||||
buildAllMeetRoomsKey(roomName?: string): string {
|
||||
const roomSegment = roomName ? `/${roomName}` : '';
|
||||
return `rooms${roomSegment}`;
|
||||
}
|
||||
|
||||
buildArchivedMeetRoomKey(roomId: string): string {
|
||||
return `recordings/.room_metadata/${roomId}/room_metadata.json`;
|
||||
}
|
||||
|
||||
buildMeetRecordingKey(recordingId: string): string {
|
||||
const { roomId, egressId, uid } = RecordingHelper.extractInfoFromRecordingId(recordingId);
|
||||
return `recordings/.metadata/${roomId}/${egressId}/${uid}.json`;
|
||||
}
|
||||
|
||||
buildBinaryRecordingKey(recordingId: string): string {
|
||||
const { roomId, uid } = RecordingHelper.extractInfoFromRecordingId(recordingId);
|
||||
return `recordings/${roomId}/${roomId}--${uid}.mp4`;
|
||||
}
|
||||
|
||||
buildAllMeetRecordingsKey(roomId?: string): string {
|
||||
const roomSegment = roomId ? `/${roomId}` : '';
|
||||
return `recordings/.metadata${roomSegment}`;
|
||||
}
|
||||
|
||||
buildAccessRecordingSecretsKey(recordingId: string): string {
|
||||
const { roomId, egressId, uid } = RecordingHelper.extractInfoFromRecordingId(recordingId);
|
||||
return `recordings/.secrets/${roomId}/${egressId}/${uid}.json`;
|
||||
}
|
||||
|
||||
buildUserKey(userId: string): string {
|
||||
return `users/${userId}.json`;
|
||||
}
|
||||
|
||||
buildApiKeysKey(): string {
|
||||
return `api_keys.json`;
|
||||
}
|
||||
}
|
||||
|
||||
@ -108,69 +108,10 @@ export interface StorageProvider {
|
||||
* Provides methods to generate standardized keys for different types of data storage operations.
|
||||
*/
|
||||
export interface StorageKeyBuilder {
|
||||
/**
|
||||
* Builds the key for global config storage.
|
||||
*/
|
||||
buildGlobalConfigKey(): string;
|
||||
|
||||
/**
|
||||
* Builds the key for a specific room.
|
||||
*
|
||||
* @param roomId - The unique identifier of the meeting room
|
||||
*/
|
||||
buildMeetRoomKey(roomId: string): string;
|
||||
|
||||
/**
|
||||
* Builds the key for all meeting rooms.
|
||||
*
|
||||
* @param roomName - Optional name of the meeting room to filter by
|
||||
*/
|
||||
buildAllMeetRoomsKey(roomName?: string): string;
|
||||
|
||||
/**
|
||||
* Builds the key for archived room metadata.
|
||||
*
|
||||
* @param roomId - The unique identifier of the meeting room
|
||||
*/
|
||||
buildArchivedMeetRoomKey(roomId: string): string;
|
||||
|
||||
/**
|
||||
* Builds the key for a specific recording.
|
||||
*
|
||||
* @param recordingId - The unique identifier of the recording
|
||||
*/
|
||||
buildBinaryRecordingKey(recordingId: string): string;
|
||||
|
||||
/**
|
||||
* Builds the key for a specific recording metadata.
|
||||
*
|
||||
* @param recordingId - The unique identifier of the recording
|
||||
*/
|
||||
buildMeetRecordingKey(recordingId: string): string;
|
||||
|
||||
/**
|
||||
* Builds the key for all recordings in a room or globally.
|
||||
*
|
||||
* @param roomId - Optional room identifier to filter recordings by room
|
||||
*/
|
||||
buildAllMeetRecordingsKey(roomId?: string): string;
|
||||
|
||||
/**
|
||||
* Builds the key for access recording secrets.
|
||||
*
|
||||
* @param recordingId - The unique identifier of the recording
|
||||
*/
|
||||
buildAccessRecordingSecretsKey(recordingId: string): string;
|
||||
|
||||
/**
|
||||
* Builds the key for a specific user
|
||||
*
|
||||
* @param userId - The unique identifier of the user
|
||||
*/
|
||||
buildUserKey(userId: string): string;
|
||||
|
||||
/**
|
||||
* Builds Api Key
|
||||
*/
|
||||
buildApiKeysKey(): string;
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user