Refactor storage service and interfaces for improved separation of concerns

- Updated StorageFactory to create basic storage providers and key builders.
- Simplified StorageProvider interface to focus on basic CRUD operations.
- Enhanced MeetStorageService to handle domain-specific logic while delegating storage operations.
- Implemented Redis caching for room data to improve performance.
- Added error handling and logging improvements throughout the service.
- Removed deprecated methods and streamlined object retrieval processes.
refactor: update storage service and interfaces to include user key handling and improve initialization logic

refactor: update beforeAll hooks in recording tests to clear rooms and recordings

refactor: optimize integration recordings test command

Revert "refactor: optimize integration recordings test command"

This reverts commit d517a44fa282b91613f8c55130916c2af5f07267.

refactor: enhance Redis cache storage operations

refactor: streamline test setup and teardown for security and recordings APIs
This commit is contained in:
Carlos Santos 2025-06-02 15:41:37 +02:00
parent b53092f2f6
commit 8aa1bbc64b
31 changed files with 1195 additions and 1368 deletions

View File

@ -14,14 +14,25 @@ import {
S3Service, S3Service,
S3StorageProvider, S3StorageProvider,
StorageFactory, StorageFactory,
StorageKeyBuilder,
StorageProvider,
SystemEventService, SystemEventService,
TaskSchedulerService, TaskSchedulerService,
TokenService, TokenService,
UserService UserService
} from '../services/index.js'; } from '../services/index.js';
import { MEET_PREFERENCES_STORAGE_MODE } from '../environment.js';
import { S3KeyBuilder } from '../services/storage/providers/s3/s3-storage-key.builder.js';
export const container: Container = new Container(); export const container: Container = new Container();
export const STORAGE_TYPES = {
StorageProvider: Symbol.for('StorageProvider'),
KeyBuilder: Symbol.for('KeyBuilder'),
S3StorageProvider: Symbol.for('S3StorageProvider'),
S3KeyBuilder: Symbol.for('S3KeyBuilder')
};
/** /**
* Registers all necessary dependencies in the container. * Registers all necessary dependencies in the container.
* *
@ -38,6 +49,7 @@ export const registerDependencies = () => {
container.bind(MutexService).toSelf().inSingletonScope(); container.bind(MutexService).toSelf().inSingletonScope();
container.bind(TaskSchedulerService).toSelf().inSingletonScope(); container.bind(TaskSchedulerService).toSelf().inSingletonScope();
configureStorage(MEET_PREFERENCES_STORAGE_MODE);
container.bind(S3Service).toSelf().inSingletonScope(); container.bind(S3Service).toSelf().inSingletonScope();
container.bind(S3StorageProvider).toSelf().inSingletonScope(); container.bind(S3StorageProvider).toSelf().inSingletonScope();
container.bind(StorageFactory).toSelf().inSingletonScope(); container.bind(StorageFactory).toSelf().inSingletonScope();
@ -55,8 +67,20 @@ export const registerDependencies = () => {
container.bind(LivekitWebhookService).toSelf().inSingletonScope(); container.bind(LivekitWebhookService).toSelf().inSingletonScope();
}; };
const configureStorage = (storageMode: string) => {
container.get(LoggerService).info(`Creating ${storageMode} storage provider`);
switch (storageMode) {
default:
case 's3':
container.bind<StorageProvider>(STORAGE_TYPES.StorageProvider).to(S3StorageProvider).inSingletonScope();
container.bind<StorageKeyBuilder>(STORAGE_TYPES.KeyBuilder).to(S3KeyBuilder).inSingletonScope();
break;
}
};
export const initializeEagerServices = async () => { export const initializeEagerServices = async () => {
// Force the creation of services that need to be initialized at startup // Force the creation of services that need to be initialized at startup
container.get(RecordingService); container.get(RecordingService);
await container.get(MeetStorageService).initialize(); await container.get(MeetStorageService).initializeGlobalPreferences();
}; };

View File

@ -186,12 +186,6 @@ export class RecordingHelper {
return size !== 0 ? size : undefined; return size !== 0 ? size : undefined;
} }
static buildMetadataFilePath(recordingId: string): string {
const { roomId, egressId, uid } = RecordingHelper.extractInfoFromRecordingId(recordingId);
return `${INTERNAL_CONFIG.S3_RECORDINGS_PREFIX}/.metadata/${roomId}/${egressId}/${uid}.json`;
}
private static toSeconds(nanoseconds: number): number { private static toSeconds(nanoseconds: number): number {
const nanosecondsToSeconds = 1 / 1_000_000_000; const nanosecondsToSeconds = 1 / 1_000_000_000;
return nanoseconds * nanosecondsToSeconds; return nanoseconds * nanosecondsToSeconds;

View File

@ -5,6 +5,8 @@ export const enum RedisKeyPrefix {
export const enum RedisKeyName { export const enum RedisKeyName {
GLOBAL_PREFERENCES = `${RedisKeyPrefix.BASE}global_preferences`, GLOBAL_PREFERENCES = `${RedisKeyPrefix.BASE}global_preferences`,
ROOM = `${RedisKeyPrefix.BASE}room:`, ROOM = `${RedisKeyPrefix.BASE}room:`,
RECORDING = `${RedisKeyPrefix.BASE}recording:`,
ARCHIVED_ROOM = `${RedisKeyPrefix.BASE}archived_room:`,
USER = `${RedisKeyPrefix.BASE}user:`, USER = `${RedisKeyPrefix.BASE}user:`,
} }

View File

@ -4,7 +4,7 @@ export * from './system-event.service.js';
export * from './mutex.service.js'; export * from './mutex.service.js';
export * from './task-scheduler.service.js'; export * from './task-scheduler.service.js';
export * from './s3.service.js'; export * from './storage/providers/s3/s3.service.js';
export * from './storage/index.js'; export * from './storage/index.js';
export * from './token.service.js'; export * from './token.service.js';

View File

@ -12,7 +12,6 @@ import {
OpenViduWebhookService, OpenViduWebhookService,
RecordingService, RecordingService,
RoomService, RoomService,
S3Service,
SystemEventService SystemEventService
} from './index.js'; } from './index.js';
@ -20,7 +19,6 @@ import {
export class LivekitWebhookService { export class LivekitWebhookService {
protected webhookReceiver: WebhookReceiver; protected webhookReceiver: WebhookReceiver;
constructor( constructor(
@inject(S3Service) protected s3Service: S3Service,
@inject(RecordingService) protected recordingService: RecordingService, @inject(RecordingService) protected recordingService: RecordingService,
@inject(LiveKitService) protected livekitService: LiveKitService, @inject(LiveKitService) protected livekitService: LiveKitService,
@inject(RoomService) protected roomService: RoomService, @inject(RoomService) protected roomService: RoomService,

View File

@ -13,11 +13,9 @@ import {
errorRecordingCannotBeStoppedWhileStarting, errorRecordingCannotBeStoppedWhileStarting,
errorRecordingNotFound, errorRecordingNotFound,
errorRecordingNotStopped, errorRecordingNotStopped,
errorRecordingRangeNotSatisfiable,
errorRecordingStartTimeout, errorRecordingStartTimeout,
errorRoomHasNoParticipants, errorRoomHasNoParticipants,
errorRoomNotFound, errorRoomNotFound,
internalError,
isErrorRecordingAlreadyStopped, isErrorRecordingAlreadyStopped,
isErrorRecordingCannotBeStoppedWhileStarting, isErrorRecordingCannotBeStoppedWhileStarting,
isErrorRecordingNotFound, isErrorRecordingNotFound,
@ -206,29 +204,16 @@ export class RecordingService {
async deleteRecording(recordingId: string): Promise<MeetRecordingInfo> { async deleteRecording(recordingId: string): Promise<MeetRecordingInfo> {
try { try {
// Get the recording metada and recording info from the S3 bucket // Get the recording metada and recording info from the S3 bucket
const { binaryFilesToDelete, metadataFilesToDelete, recordingInfo } = const { recordingInfo } = await this.storageService.getRecordingMetadata(recordingId);
await this.getDeletableRecordingFiles(recordingId);
const { roomId } = RecordingHelper.extractInfoFromRecordingId(recordingId);
const deleteRecordingTasks: Promise<unknown>[] = [];
if (binaryFilesToDelete.size > 0) { // Validate the recording status
// Delete video files from S3 if (!RecordingHelper.canBeDeleted(recordingInfo)) throw errorRecordingNotStopped(recordingId);
deleteRecordingTasks.push(
this.storageService.deleteRecordingBinaryFilesByPaths(Array.from(binaryFilesToDelete))
);
}
if (metadataFilesToDelete.size > 0) { await this.storageService.deleteRecording(recordingId);
// Delete metadata files from storage provider
deleteRecordingTasks.push(
this.storageService.deleteRecordingMetadataByPaths(Array.from(metadataFilesToDelete))
);
}
await Promise.all(deleteRecordingTasks); this.logger.info(`Successfully deleted recording ${recordingId}`);
this.logger.info(`Successfully deleted ${recordingId}`);
const { roomId } = recordingInfo;
const shouldDeleteRoomMetadata = await this.shouldDeleteRoomMetadata(roomId); const shouldDeleteRoomMetadata = await this.shouldDeleteRoomMetadata(roomId);
if (shouldDeleteRoomMetadata) { if (shouldDeleteRoomMetadata) {
@ -253,8 +238,7 @@ export class RecordingService {
async bulkDeleteRecordingsAndAssociatedFiles( async bulkDeleteRecordingsAndAssociatedFiles(
recordingIds: string[] recordingIds: string[]
): Promise<{ deleted: string[]; notDeleted: { recordingId: string; error: string }[] }> { ): Promise<{ deleted: string[]; notDeleted: { recordingId: string; error: string }[] }> {
let allMetadataFilesToDelete: Set<string> = new Set<string>(); const validRecordingIds: Set<string> = new Set<string>();
let allBinaryFilesToDelete: Set<string> = new Set<string>();
const deletedRecordings: Set<string> = new Set<string>(); const deletedRecordings: Set<string> = new Set<string>();
const notDeletedRecordings: Set<{ recordingId: string; error: string }> = new Set(); const notDeletedRecordings: Set<{ recordingId: string; error: string }> = new Set();
const roomsToCheck: Set<string> = new Set(); const roomsToCheck: Set<string> = new Set();
@ -262,35 +246,32 @@ export class RecordingService {
// Check if the recording is in progress // Check if the recording is in progress
for (const recordingId of recordingIds) { for (const recordingId of recordingIds) {
try { try {
const { binaryFilesToDelete, metadataFilesToDelete } = const { recordingInfo } = await this.storageService.getRecordingMetadata(recordingId);
await this.getDeletableRecordingFiles(recordingId);
// Add files to the set of files to delete
allBinaryFilesToDelete = new Set([...allBinaryFilesToDelete, ...binaryFilesToDelete]);
allMetadataFilesToDelete = new Set([...allMetadataFilesToDelete, ...metadataFilesToDelete]);
if (!RecordingHelper.canBeDeleted(recordingInfo)) {
throw errorRecordingNotStopped(recordingId);
}
validRecordingIds.add(recordingId);
deletedRecordings.add(recordingId); deletedRecordings.add(recordingId);
// Track the roomId for checking if the room metadata file should be deleted // Track room for metadata cleanup
const { roomId } = RecordingHelper.extractInfoFromRecordingId(recordingId); roomsToCheck.add(recordingInfo.roomId);
roomsToCheck.add(roomId);
} catch (error) { } catch (error) {
this.logger.error(`BulkDelete: Error processing recording ${recordingId}: ${error}`); this.logger.error(`BulkDelete: Error processing recording ${recordingId}: ${error}`);
notDeletedRecordings.add({ recordingId, error: (error as OpenViduMeetError).message }); notDeletedRecordings.add({ recordingId, error: (error as OpenViduMeetError).message });
} }
} }
if (allBinaryFilesToDelete.size === 0) { if (validRecordingIds.size === 0) {
this.logger.warn(`BulkDelete: No eligible recordings found for deletion.`); this.logger.warn(`BulkDelete: No eligible recordings found for deletion.`);
return { deleted: Array.from(deletedRecordings), notDeleted: Array.from(notDeletedRecordings) }; return { deleted: Array.from(deletedRecordings), notDeleted: Array.from(notDeletedRecordings) };
} }
// Delete recordings and its metadata from S3 // Delete recordings and its metadata from S3
try { try {
await Promise.all([ await this.storageService.deleteRecordings(Array.from(validRecordingIds));
this.storageService.deleteRecordingBinaryFilesByPaths(Array.from(allBinaryFilesToDelete)), this.logger.info(`BulkDelete: Successfully deleted ${validRecordingIds.size} recordings.`);
this.storageService.deleteRecordingMetadataByPaths(Array.from(allMetadataFilesToDelete))
]);
this.logger.info(`BulkDelete: Successfully deleted ${allBinaryFilesToDelete.size} recordings.`);
} catch (error) { } catch (error) {
this.logger.error(`BulkDelete: Error performing bulk deletion: ${error}`); this.logger.error(`BulkDelete: Error performing bulk deletion: ${error}`);
throw error; throw error;
@ -298,27 +279,35 @@ export class RecordingService {
// Check if the room metadata file should be deleted // Check if the room metadata file should be deleted
const roomMetadataToDelete: string[] = []; const roomMetadataToDelete: string[] = [];
const deleteTasks: Promise<void>[] = [];
for (const roomId of roomsToCheck) { for (const roomId of roomsToCheck) {
const shouldDeleteRoomMetadata = await this.shouldDeleteRoomMetadata(roomId); const shouldDeleteRoomMetadata = await this.shouldDeleteRoomMetadata(roomId);
if (shouldDeleteRoomMetadata) { if (shouldDeleteRoomMetadata) {
deleteTasks.push(this.storageService.deleteArchivedRoomMetadata(roomId));
roomMetadataToDelete.push(roomId); roomMetadataToDelete.push(roomId);
} }
} }
if (roomMetadataToDelete.length === 0) {
this.logger.verbose(`BulkDelete: No room metadata files to delete.`);
return { deleted: Array.from(deletedRecordings), notDeleted: Array.from(notDeletedRecordings) };
}
// Perform bulk deletion of room metadata files
try { try {
this.logger.verbose(`Deleting room_metadata.json for rooms: ${roomMetadataToDelete.join(', ')}`); await Promise.all(
await Promise.all(deleteTasks); roomMetadataToDelete.map((roomId) => this.storageService.deleteArchivedRoomMetadata(roomId))
);
this.logger.verbose(`BulkDelete: Successfully deleted ${roomMetadataToDelete.length} room metadata files.`); this.logger.verbose(`BulkDelete: Successfully deleted ${roomMetadataToDelete.length} room metadata files.`);
} catch (error) { } catch (error) {
this.logger.error(`BulkDelete: Error performing bulk deletion: ${error}`); this.logger.error(`BulkDelete: Error performing bulk deletion: ${error}`);
throw error; throw error;
} }
return { deleted: Array.from(deletedRecordings), notDeleted: Array.from(notDeletedRecordings) }; return {
deleted: Array.from(deletedRecordings),
notDeleted: Array.from(notDeletedRecordings)
};
} }
/** /**
@ -330,11 +319,10 @@ export class RecordingService {
*/ */
protected async shouldDeleteRoomMetadata(roomId: string): Promise<boolean | null> { protected async shouldDeleteRoomMetadata(roomId: string): Promise<boolean | null> {
try { try {
const metadataPrefix = `${INTERNAL_CONFIG.S3_RECORDINGS_PREFIX}/.metadata/${roomId}`; const { recordings } = await this.storageService.getAllRecordings(roomId, 1);
const { Contents } = await this.storageService.listObjects(metadataPrefix, 1);
// If no metadata files exist or the list is empty, the room metadata should be deleted // If no recordings exist or the list is empty, the room metadata should be deleted
return !Contents || Contents.length === 0; return !recordings || recordings.length === 0;
} catch (error) { } catch (error) {
this.logger.warn(`Error checking room metadata for deletion (room ${roomId}): ${error}`); this.logger.warn(`Error checking room metadata for deletion (room ${roomId}): ${error}`);
return null; return null;
@ -363,45 +351,32 @@ export class RecordingService {
* - `nextPageToken`: (Optional) A token to retrieve the next page of results, if available. * - `nextPageToken`: (Optional) A token to retrieve the next page of results, if available.
* @throws Will throw an error if there is an issue retrieving the recordings. * @throws Will throw an error if there is an issue retrieving the recordings.
*/ */
async getAllRecordings({ maxItems, nextPageToken, roomId, fields }: MeetRecordingFilters): Promise<{ async getAllRecordings(filters: MeetRecordingFilters): Promise<{
recordings: MeetRecordingInfo[]; recordings: MeetRecordingInfo[];
isTruncated: boolean; isTruncated: boolean;
nextPageToken?: string; nextPageToken?: string;
}> { }> {
try { try {
// Construct the room prefix if a room ID is provided const { maxItems, nextPageToken, roomId, fields } = filters;
const roomPrefix = roomId ? `/${roomId}` : '';
const recordingPrefix = `${INTERNAL_CONFIG.S3_RECORDINGS_PREFIX}/.metadata${roomPrefix}`;
// Retrieve the recordings from the S3 bucket const response = await this.storageService.getAllRecordings(roomId, maxItems, nextPageToken);
const { Contents, IsTruncated, NextContinuationToken } = await this.storageService.listObjects(
recordingPrefix,
maxItems,
nextPageToken
);
if (!Contents) { // Apply field filtering if specified
this.logger.verbose('No recordings found. Returning an empty array.'); if (fields) {
return { recordings: [], isTruncated: false }; response.recordings = response.recordings.map((rec) =>
UtilsHelper.filterObjectFields(rec, fields)
) as MeetRecordingInfo[];
} }
const promises: Promise<MeetRecordingInfo>[] = []; const { recordings, isTruncated, nextContinuationToken } = response;
// Retrieve the metadata for each recording
Contents.forEach((item) => {
if (item?.Key && item.Key.endsWith('.json') && !item.Key.endsWith('secrets.json')) {
promises.push(
this.storageService.getRecordingMetadataByPath(item.Key) as Promise<MeetRecordingInfo>
);
}
});
let recordings = await Promise.all(promises);
recordings = recordings.map((rec) => UtilsHelper.filterObjectFields(rec, fields)) as MeetRecordingInfo[];
this.logger.info(`Retrieved ${recordings.length} recordings.`); this.logger.info(`Retrieved ${recordings.length} recordings.`);
// Return the paginated list of recordings // Return the paginated list of recordings
return { recordings, isTruncated: !!IsTruncated, nextPageToken: NextContinuationToken }; return {
recordings,
isTruncated: Boolean(isTruncated),
nextPageToken: nextContinuationToken
};
} catch (error) { } catch (error) {
this.logger.error(`Error getting recordings: ${error}`); this.logger.error(`Error getting recordings: ${error}`);
throw error; throw error;
@ -410,64 +385,33 @@ export class RecordingService {
async getRecordingAsStream( async getRecordingAsStream(
recordingId: string, recordingId: string,
range?: string rangeHeader?: string
): Promise<{ fileSize: number | undefined; fileStream: Readable; start?: number; end?: number }> { ): Promise<{ fileSize: number | undefined; fileStream: Readable; start?: number; end?: number }> {
const DEFAULT_RECORDING_FILE_PORTION_SIZE = 5 * 1024 * 1024; // 5MB const DEFAULT_CHUNK_SIZE = 5 * 1024 * 1024; // 5MB
// Ensure the recording is streamable
const recordingInfo: MeetRecordingInfo = await this.getRecording(recordingId); const recordingInfo: MeetRecordingInfo = await this.getRecording(recordingId);
if (recordingInfo.status !== MeetRecordingStatus.COMPLETE) { if (recordingInfo.status !== MeetRecordingStatus.COMPLETE) {
throw errorRecordingNotStopped(recordingId); throw errorRecordingNotStopped(recordingId);
} }
const recordingPath = `${INTERNAL_CONFIG.S3_RECORDINGS_PREFIX}/${RecordingHelper.extractFilename(recordingInfo)}`; let validatedRange = undefined;
if (!recordingPath) throw new Error(`Error extracting path from recording ${recordingId}`); // Parse the range header if provided
if (rangeHeader) {
const match = rangeHeader.match(/^bytes=(\d+)-(\d*)$/)!;
const endStr = match[2];
const { contentLength: fileSize } = await this.storageService.getObjectHeaders(recordingPath); const start = parseInt(match[1], 10);
const end = endStr ? parseInt(endStr, 10) : start + DEFAULT_CHUNK_SIZE - 1;
if (!fileSize) { validatedRange = { start, end };
this.logger.error(`Error getting file size for recording ${recordingId}`); this.logger.debug(`Streaming partial content for recording '${recordingId}' from ${start} to ${end}.`);
throw internalError(`getting file size for recording '${recordingId}'`);
}
if (range) {
// Parse the range header
const matches = range.match(/^bytes=(\d+)-(\d*)$/)!;
const start = parseInt(matches[1], 10);
let end = matches[2] ? parseInt(matches[2], 10) : start + DEFAULT_RECORDING_FILE_PORTION_SIZE;
// Validate the range values
if (isNaN(start) || isNaN(end) || start < 0) {
this.logger.warn(`Invalid range values for recording ${recordingId}: start=${start}, end=${end}`);
this.logger.warn(`Returning full stream for recording ${recordingId}`);
return this.getFullStreamResponse(recordingPath, fileSize);
}
if (start >= fileSize) {
this.logger.error(
`Invalid range values for recording ${recordingId}: start=${start}, end=${end}, fileSize=${fileSize}`
);
throw errorRecordingRangeNotSatisfiable(recordingId, fileSize);
}
// Adjust the end value to ensure it doesn't exceed the file size
end = Math.min(end, fileSize - 1);
// If the start is greater than the end, return the full stream
if (start > end) {
this.logger.warn(`Invalid range values after adjustment: start=${start}, end=${end}`);
return this.getFullStreamResponse(recordingPath, fileSize);
}
const fileStream = await this.storageService.getRecordingMedia(recordingPath, {
start,
end
});
return { fileSize, fileStream, start, end };
} else { } else {
return this.getFullStreamResponse(recordingPath, fileSize); this.logger.debug(`Streaming full content for recording '${recordingId}'.`);
} }
return this.storageService.getRecordingMedia(recordingId, validatedRange);
} }
protected async validateRoomForStartRecording(roomId: string): Promise<void> { protected async validateRoomForStartRecording(roomId: string): Promise<void> {
@ -486,14 +430,6 @@ export class RecordingService {
if (!hasParticipants) throw errorRoomHasNoParticipants(roomId); if (!hasParticipants) throw errorRoomHasNoParticipants(roomId);
} }
protected async getFullStreamResponse(
recordingPath: string,
fileSize: number
): Promise<{ fileSize: number; fileStream: Readable }> {
const fileStream = await this.storageService.getRecordingMedia(recordingPath);
return { fileSize, fileStream };
}
/** /**
* Acquires a Redis-based lock to indicate that a recording is active for a specific room. * Acquires a Redis-based lock to indicate that a recording is active for a specific room.
* *
@ -563,37 +499,6 @@ export class RecordingService {
} }
} }
/**
* Retrieves the data required to delete a recording, including the file paths
* to be deleted and the recording's metadata information.
*
* @param recordingId - The unique identifier of the recording egress.
*/
protected async getDeletableRecordingFiles(recordingId: string): Promise<{
binaryFilesToDelete: Set<string>;
metadataFilesToDelete: Set<string>;
recordingInfo: MeetRecordingInfo;
}> {
const { metadataFilePath, recordingInfo } = await this.storageService.getRecordingMetadata(recordingId);
const binaryFilesToDelete: Set<string> = new Set();
const metadataFilesToDelete: Set<string> = new Set();
// Validate the recording status
if (!RecordingHelper.canBeDeleted(recordingInfo)) throw errorRecordingNotStopped(recordingId);
const filename = RecordingHelper.extractFilename(recordingInfo);
if (!filename) {
throw internalError(`extracting path from recording '${recordingId}'`);
}
const recordingPath = `${INTERNAL_CONFIG.S3_RECORDINGS_PREFIX}/${filename}`;
binaryFilesToDelete.add(recordingPath);
metadataFilesToDelete.add(metadataFilePath);
return { binaryFilesToDelete, metadataFilesToDelete, recordingInfo };
}
protected generateCompositeOptionsFromRequest(layout = 'grid'): RoomCompositeOptions { protected generateCompositeOptionsFromRequest(layout = 'grid'): RoomCompositeOptions {
return { return {
layout: layout layout: layout

View File

@ -222,9 +222,11 @@ export class RedisService extends EventEmitter {
} }
/** /**
* Deletes a key from Redis. * Deletes one or more keys from Redis.
* @param key - The key to delete. *
* @returns A promise that resolves to the number of keys deleted. * @param keys - A single key string or an array of key strings to delete from Redis
* @returns A Promise that resolves to the number of keys that were successfully deleted
* @throws {Error} Throws an internal error if the deletion operation fails
*/ */
delete(keys: string | string[]): Promise<number> { delete(keys: string | string[]): Promise<number> {
try { try {
@ -238,8 +240,19 @@ export class RedisService extends EventEmitter {
} }
} }
quit() { cleanup() {
this.logger.verbose('Cleaning up Redis connections');
this.redisPublisher.quit(); this.redisPublisher.quit();
this.redisSubscriber.quit();
this.removeAllListeners();
if (this.eventHandler) {
this.off('systemEvent', this.eventHandler);
this.eventHandler = undefined;
}
this.isConnected = false;
this.logger.verbose('Redis connections cleaned up');
} }
async checkHealth() { async checkHealth() {

View File

@ -13,7 +13,12 @@ import { uid as secureUid } from 'uid/secure';
import { uid } from 'uid/single'; import { uid } from 'uid/single';
import INTERNAL_CONFIG from '../config/internal-config.js'; import INTERNAL_CONFIG from '../config/internal-config.js';
import { MeetRoomHelper, OpenViduComponentsAdapterHelper, UtilsHelper } from '../helpers/index.js'; import { MeetRoomHelper, OpenViduComponentsAdapterHelper, UtilsHelper } from '../helpers/index.js';
import { errorInvalidRoomSecret, errorRoomMetadataNotFound, internalError } from '../models/error.model.js'; import {
errorInvalidRoomSecret,
errorRoomMetadataNotFound,
errorRoomNotFound,
internalError
} from '../models/error.model.js';
import { import {
IScheduledTask, IScheduledTask,
LiveKitService, LiveKitService,
@ -126,7 +131,7 @@ export class RoomService {
await this.storageService.saveMeetRoom(room); await this.storageService.saveMeetRoom(room);
// Update the archived room metadata if it exists // Update the archived room metadata if it exists
await this.storageService.updateArchivedRoomMetadata(roomId); await this.storageService.archiveRoomMetadata(roomId);
return room; return room;
} }
@ -160,8 +165,10 @@ export class RoomService {
}> { }> {
const response = await this.storageService.getMeetRooms(maxItems, nextPageToken); const response = await this.storageService.getMeetRooms(maxItems, nextPageToken);
const filteredRooms = response.rooms.map((room) => UtilsHelper.filterObjectFields(room, fields)); if (fields) {
const filteredRooms = response.rooms.map((room: MeetRoom) => UtilsHelper.filterObjectFields(room, fields));
response.rooms = filteredRooms as MeetRoom[]; response.rooms = filteredRooms as MeetRoom[];
}
return response; return response;
} }
@ -175,6 +182,11 @@ export class RoomService {
async getMeetRoom(roomId: string, fields?: string): Promise<MeetRoom> { async getMeetRoom(roomId: string, fields?: string): Promise<MeetRoom> {
const meetRoom = await this.storageService.getMeetRoom(roomId); const meetRoom = await this.storageService.getMeetRoom(roomId);
if (!meetRoom) {
this.logger.error(`Meet room with ID ${roomId} not found.`);
throw errorRoomNotFound(roomId);
}
return UtilsHelper.filterObjectFields(meetRoom, fields) as MeetRoom; return UtilsHelper.filterObjectFields(meetRoom, fields) as MeetRoom;
} }
@ -251,6 +263,12 @@ export class RoomService {
*/ */
protected async markRoomAsDeleted(roomId: string): Promise<void> { protected async markRoomAsDeleted(roomId: string): Promise<void> {
const room = await this.storageService.getMeetRoom(roomId); const room = await this.storageService.getMeetRoom(roomId);
if (!room) {
this.logger.error(`Room with ID ${roomId} not found for deletion.`);
throw errorRoomNotFound(roomId);
}
room.markedForDeletion = true; room.markedForDeletion = true;
await this.storageService.saveMeetRoom(room); await this.storageService.saveMeetRoom(room);
} }

View File

@ -1,4 +1,5 @@
export * from './storage.interface.js'; export * from './storage.interface.js';
export * from './providers/s3-storage.provider.js';
export * from './storage.factory.js'; export * from './storage.factory.js';
export * from './storage.service.js'; export * from './storage.service.js';
export * from './providers/s3/s3-storage.provider.js';

View File

@ -1,699 +0,0 @@
import { PutObjectCommandOutput } from '@aws-sdk/client-s3';
import { GlobalPreferences, MeetRecordingInfo, MeetRoom, User } from '@typings-ce';
import { inject, injectable } from 'inversify';
import INTERNAL_CONFIG from '../../../config/internal-config.js';
import { errorRecordingNotFound, OpenViduMeetError, RedisKeyName } from '../../../models/index.js';
import { LoggerService, RedisService, S3Service, StorageProvider } from '../../index.js';
import { RecordingHelper } from '../../../helpers/recording.helper.js';
import { Readable } from 'stream';
/**
* Implementation of the StorageProvider interface using AWS S3 for persistent storage
* with Redis caching for improved performance.
*
* This class provides operations for storing and retrieving application preferences,
* rooms, recordings metadata and users with a two-tiered storage approach:
* - Redis is used as a primary cache for fast access
* - S3 serves as the persistent storage layer and fallback when data is not in Redis
*
* The storage operations are performed in parallel to both systems when writing data,
* with transaction-like rollback behavior if one operation fails.
*
* @template GPrefs - Type for global preferences data, defaults to GlobalPreferences
* @template MRoom - Type for room data, defaults to MeetRoom
* @template MRec - Type for recording metadata, defaults to MeetRecordingInfo
* @template MUser - Type for user data, defaults to User
*
* @implements {StorageProvider}
*/
@injectable()
export class S3StorageProvider<
GPrefs extends GlobalPreferences = GlobalPreferences,
MRoom extends MeetRoom = MeetRoom,
MRec extends MeetRecordingInfo = MeetRecordingInfo,
MUser extends User = User
> implements StorageProvider
{
protected readonly S3_GLOBAL_PREFERENCES_KEY = `global-preferences.json`;
constructor(
@inject(LoggerService) protected logger: LoggerService,
@inject(S3Service) protected s3Service: S3Service,
@inject(RedisService) protected redisService: RedisService
) {}
/**
* Retrieves metadata headers for an object stored in S3.
*
* @param filePath - The path/key of the file in the S3 bucket
* @returns A promise that resolves to an object containing the content length and content type of the file
* @throws Will throw an error if the S3 operation fails or the file doesn't exist
*/
async getObjectHeaders(filePath: string): Promise<{ contentLength?: number; contentType?: string }> {
try {
const data = await this.s3Service.getHeaderObject(filePath);
return {
contentLength: data.ContentLength,
contentType: data.ContentType
};
} catch (error) {
this.logger.error(`Error fetching object headers for ${filePath}: ${error}`);
throw error;
}
}
/**
* Lists objects in the storage with optional pagination support.
*
* @param prefix - The prefix to filter objects by (acts as a folder path)
* @param maxItems - Maximum number of items to return (optional)
* @param nextPageToken - Token for pagination to get the next page (optional)
* @returns Promise resolving to paginated list of objects with metadata
*/
async listObjects(
prefix: string,
maxItems?: number,
nextPageToken?: string
): Promise<{
Contents?: Array<{
Key?: string;
LastModified?: Date;
Size?: number;
ETag?: string;
}>;
IsTruncated?: boolean;
NextContinuationToken?: string;
}> {
try {
this.logger.debug(
`Listing objects with prefix: ${prefix}, maxItems: ${maxItems}, nextPageToken: ${nextPageToken}`
);
return await this.s3Service.listObjectsPaginated(prefix, maxItems, nextPageToken);
} catch (error) {
this.handleError(error, `Error listing objects with prefix ${prefix}`);
throw error;
}
}
/**
* Initializes global preferences. If no preferences exist, persists the provided defaults.
* If preferences exist but belong to a different project, they are replaced.
*
* @param defaultPreferences - The default preferences to initialize with.
*/
async initialize(defaultPreferences: GPrefs): Promise<void> {
try {
const existingPreferences = await this.getGlobalPreferences();
if (!existingPreferences) {
this.logger.info('No existing preferences found. Saving default preferences to S3.');
await this.saveGlobalPreferences(defaultPreferences);
return;
}
this.logger.verbose('Global preferences found. Checking project association...');
const isDifferentProject = existingPreferences.projectId !== defaultPreferences.projectId;
if (isDifferentProject) {
this.logger.warn(
`Existing global preferences belong to project [${existingPreferences.projectId}], ` +
`which differs from current project [${defaultPreferences.projectId}]. Replacing preferences.`
);
await this.saveGlobalPreferences(defaultPreferences);
return;
}
this.logger.verbose(
'Global preferences for the current project are already initialized. No action needed.'
);
} catch (error) {
this.logger.error('Error during global preferences initialization:', error);
}
}
/**
* Retrieves the global preferences.
* First attempts to retrieve from Redis; if not available, falls back to S3.
* If fetched from S3, caches the result in Redis.
*
* @returns A promise that resolves to the global preferences or null if not found.
*/
async getGlobalPreferences(): Promise<GPrefs | null> {
try {
// Try to get preferences from Redis cache
let preferences: GPrefs | null = await this.getFromRedis<GPrefs>(RedisKeyName.GLOBAL_PREFERENCES);
if (!preferences) {
this.logger.debug('Global preferences not found in Redis. Fetching from S3...');
preferences = await this.getFromS3<GPrefs>(this.S3_GLOBAL_PREFERENCES_KEY);
if (preferences) {
this.logger.verbose('Fetched global preferences from S3. Caching them in Redis.');
const redisPayload = JSON.stringify(preferences);
await this.redisService.set(RedisKeyName.GLOBAL_PREFERENCES, redisPayload, false);
} else {
this.logger.warn('No global preferences found in S3.');
}
} else {
this.logger.verbose('Global preferences retrieved from Redis.');
}
return preferences;
} catch (error) {
this.handleError(error, 'Error fetching preferences');
return null;
}
}
/**
* Persists the global preferences to both S3 and Redis in parallel.
* Uses Promise.all to execute both operations concurrently.
*
* @param preferences - Global preferences to store.
* @returns The saved preferences.
* @throws Rethrows any error if saving fails.
*/
async saveGlobalPreferences(preferences: GPrefs): Promise<GPrefs> {
try {
const redisPayload = JSON.stringify(preferences);
await Promise.all([
this.s3Service.saveObject(this.S3_GLOBAL_PREFERENCES_KEY, preferences),
this.redisService.set(RedisKeyName.GLOBAL_PREFERENCES, redisPayload, false)
]);
this.logger.info('Global preferences saved successfully');
return preferences;
} catch (error) {
this.handleError(error, 'Error saving global preferences');
throw error;
}
}
/**
* Persists a room object to S3 and Redis concurrently.
* If at least one operation fails, performs a rollback by deleting the successfully saved object.
*
* @param meetRoom - The room object to save.
* @returns The saved room if both operations succeed.
* @throws The error from the first failed operation.
*/
async saveMeetRoom(meetRoom: MRoom): Promise<MRoom> {
const { roomId } = meetRoom;
const s3Path = `${INTERNAL_CONFIG.S3_ROOMS_PREFIX}/${roomId}/${roomId}.json`;
const redisPayload = JSON.stringify(meetRoom);
const redisKey = RedisKeyName.ROOM + roomId;
const [s3Result, redisResult] = await Promise.allSettled([
this.s3Service.saveObject(s3Path, meetRoom),
this.redisService.set(redisKey, redisPayload, false)
]);
if (s3Result.status === 'fulfilled' && redisResult.status === 'fulfilled') {
return meetRoom;
}
// Rollback any changes made by the successful operation
await this.rollbackRoomSave(roomId, s3Result, redisResult, s3Path, redisKey);
// Return the error that occurred first
const failedOperation: PromiseRejectedResult =
s3Result.status === 'rejected' ? s3Result : (redisResult as PromiseRejectedResult);
const error = failedOperation.reason;
this.handleError(error, `Error saving Room preferences for room ${roomId}`);
throw error;
}
/**
* Retrieves the list of Meet rooms from S3.
*
* @param maxItems - Maximum number of items to retrieve.
* @param nextPageToken - Continuation token for pagination.
* @returns An object containing the list of rooms, a flag indicating whether the list is truncated, and, if available, the next page token.
*/
async getMeetRooms(
maxItems: number,
nextPageToken?: string
): Promise<{
rooms: MRoom[];
isTruncated: boolean;
nextPageToken?: string;
}> {
try {
const {
Contents: roomFiles,
IsTruncated,
NextContinuationToken
} = await this.s3Service.listObjectsPaginated(INTERNAL_CONFIG.S3_ROOMS_PREFIX, maxItems, nextPageToken);
if (!roomFiles || roomFiles.length === 0) {
this.logger.verbose('No room files found in S3.');
return { rooms: [], isTruncated: false };
}
// Extract room IDs directly and filter out invalid values
const roomIds = roomFiles
.map((file) => this.extractRoomId(file.Key))
.filter((id): id is string => Boolean(id));
// Fetch and log any room lookup errors individually
// Fetch room preferences in parallel
const rooms = await Promise.all(
roomIds.map(async (roomId) => {
try {
return await this.getMeetRoom(roomId);
} catch (error: unknown) {
this.logger.warn(`Failed to fetch room "${roomId}": ${error}`);
return null;
}
})
);
// Filter out null values
const validRooms = rooms.filter((room) => room !== null) as MRoom[];
return { rooms: validRooms, isTruncated: !!IsTruncated, nextPageToken: NextContinuationToken };
} catch (error) {
this.handleError(error, 'Error fetching Room preferences');
return { rooms: [], isTruncated: false };
}
}
async getMeetRoom(roomId: string): Promise<MRoom | null> {
try {
// Try to get room preferences from Redis cache
const room: MRoom | null = await this.getFromRedis<MRoom>(roomId);
if (!room) {
const s3RoomPath = `${INTERNAL_CONFIG.S3_ROOMS_PREFIX}/${roomId}/${roomId}.json`;
this.logger.debug(`Room ${roomId} not found in Redis. Fetching from S3 at ${s3RoomPath}...`);
return await this.getFromS3<MRoom>(s3RoomPath);
}
this.logger.debug(`Room ${roomId} verified in Redis`);
return room;
} catch (error) {
this.handleError(error, `Error fetching Room preferences for room ${roomId}`);
return null;
}
}
async deleteMeetRooms(roomIds: string[]): Promise<void> {
const roomsToDelete = roomIds.map((id) => `${INTERNAL_CONFIG.S3_ROOMS_PREFIX}/${id}/${id}.json`);
const redisKeysToDelete = roomIds.map((id) => RedisKeyName.ROOM + id);
try {
await Promise.all([
this.s3Service.deleteObjects(roomsToDelete),
this.redisService.delete(redisKeysToDelete)
]);
this.logger.verbose(`Rooms deleted successfully: ${roomIds.join(', ')}`);
} catch (error) {
this.handleError(error, `Error deleting rooms: ${roomIds.join(', ')}`);
}
}
async getArchivedRoomMetadata(roomId: string): Promise<Partial<MRoom> | null> {
try {
const filePath = `${INTERNAL_CONFIG.S3_RECORDINGS_PREFIX}/.room_metadata/${roomId}/room_metadata.json`;
const roomMetadata = await this.getFromS3<Partial<MRoom>>(filePath);
if (!roomMetadata) {
this.logger.warn(`Room metadata not found for room ${roomId} in recordings bucket`);
return null;
}
return roomMetadata;
} catch (error) {
this.handleError(error, `Error fetching archived room metadata for room ${roomId}`);
return null;
}
}
/**
* Saves room metadata to a JSON file in the S3 bucket if it doesn't already exist.
*
* This method checks if the metadata file for the given room already exists in the
* S3 bucket. If not, it retrieves the room information, extracts the necessary
* secrets and preferences, and saves them to a metadata JSON file in the
* .room_metadata/{roomId}/ directory of the S3 bucket.
*
* @param roomId - The unique identifier of the room
*/
async archiveRoomMetadata(roomId: string): Promise<void> {
try {
const filePath = `${INTERNAL_CONFIG.S3_RECORDINGS_PREFIX}/.room_metadata/${roomId}/room_metadata.json`;
const fileExists = await this.s3Service.exists(filePath);
if (fileExists) {
this.logger.debug(`Room metadata already saved for room ${roomId} in recordings bucket`);
return;
}
const room = await this.getMeetRoom(roomId);
if (room) {
const roomMetadata = {
moderatorRoomUrl: room.moderatorRoomUrl,
publisherRoomUrl: room.publisherRoomUrl,
preferences: {
recordingPreferences: room.preferences?.recordingPreferences
}
};
await this.s3Service.saveObject(filePath, roomMetadata);
this.logger.debug(`Room metadata saved for room ${roomId} in recordings bucket`);
return;
}
this.logger.error(`Error saving room metadata for room ${roomId} in recordings bucket`);
} catch (error) {
this.logger.error(`Error saving room metadata for room ${roomId} in recordings bucket: ${error}`);
}
}
/**
* Updates the archived room metadata for a given room in the S3 recordings bucket if it exists.
*
* @param roomId - The unique identifier of the room whose metadata needs to be updated.
*/
async updateArchivedRoomMetadata(roomId: string): Promise<void> {
try {
const filePath = `${INTERNAL_CONFIG.S3_RECORDINGS_PREFIX}/.room_metadata/${roomId}/room_metadata.json`;
const fileExists = await this.s3Service.exists(filePath);
if (!fileExists) {
this.logger.warn(`Room metadata not found for room ${roomId} in recordings bucket`);
return;
}
const room = await this.getMeetRoom(roomId);
if (room) {
const roomMetadata = {
moderatorRoomUrl: room.moderatorRoomUrl,
publisherRoomUrl: room.publisherRoomUrl,
preferences: {
recordingPreferences: room.preferences?.recordingPreferences
}
};
await this.s3Service.saveObject(filePath, roomMetadata);
this.logger.debug(`Room metadata updated for room ${roomId} in recordings bucket`);
return;
}
this.logger.error(`Error updating room metadata for room ${roomId} in recordings bucket`);
} catch (error) {
this.logger.error(`Error updating room metadata for room ${roomId} in recordings bucket: ${error}`);
}
}
async deleteArchivedRoomMetadata(roomId: string): Promise<void> {
const archivedRoomMetadataPath = `${INTERNAL_CONFIG.S3_RECORDINGS_PREFIX}/.room_metadata/${roomId}/room_metadata.json`;
try {
await this.s3Service.deleteObjects([archivedRoomMetadataPath]);
this.logger.verbose(`Archived room metadata deleted for room ${roomId} in recordings bucket`);
} catch (error) {
this.logger.error(
`Error deleting archived room metadata for room ${roomId} in recordings bucket: ${error}`
);
this.handleError(error, `Error deleting archived room metadata for room ${roomId}`);
throw error;
}
}
async getRecordingMedia(
recordingPath: string,
range?: {
end: number;
start: number;
}
): Promise<Readable> {
try {
this.logger.debug(`Retrieving recording media from S3 at path: ${recordingPath}`);
return await this.s3Service.getObjectAsStream(recordingPath, range);
} catch (error) {
this.handleError(error, `Error fetching recording media for path ${recordingPath}`);
throw error;
}
}
/**
* Deletes multiple recording binary files from S3 storage using their file paths.
*
* @param recordingPaths - Array of file paths/keys identifying the recording files to delete from S3
* @returns A Promise that resolves when all files have been successfully deleted
* @throws Will throw an error if the S3 delete operation fails
*/
async deleteRecordingBinaryFilesByPaths(recordingPaths: string[]): Promise<void> {
try {
await this.s3Service.deleteObjects(recordingPaths);
this.logger.verbose(`Deleted recording binary files: ${recordingPaths.join(', ')}`);
} catch (error) {
this.handleError(error, `Error deleting recording binary files: ${recordingPaths.join(', ')}`);
throw error;
}
}
async getRecordingMetadata(recordingId: string): Promise<{ recordingInfo: MRec; metadataFilePath: string }> {
try {
const metadataPath = RecordingHelper.buildMetadataFilePath(recordingId);
this.logger.debug(`Retrieving metadata for recording ${recordingId} from ${metadataPath}`);
const recordingInfo = (await this.s3Service.getObjectAsJson(metadataPath)) as MRec;
if (!recordingInfo) {
throw errorRecordingNotFound(recordingId);
}
this.logger.verbose(`Retrieved metadata for recording ${recordingId} from ${metadataPath}`);
return { recordingInfo, metadataFilePath: metadataPath };
} catch (error) {
this.handleError(error, `Error fetching recording metadata for recording ${recordingId}`);
throw error;
}
}
/**
* Retrieves recording metadata from S3 storage by the specified path.
*
* @param recordingPath - The S3 path where the recording metadata is stored
* @returns A promise that resolves to the recording metadata object
* @throws Will throw an error if the S3 object retrieval fails or if the path is invalid
*/
async getRecordingMetadataByPath(recordingPath: string): Promise<MRec> {
try {
return (await this.s3Service.getObjectAsJson(recordingPath)) as MRec;
} catch (error) {
this.handleError(error, `Error fetching recording metadata for path ${recordingPath}`);
throw error;
}
}
async saveRecordingMetadata(recordingInfo: MRec): Promise<MRec> {
try {
const metadataPath = RecordingHelper.buildMetadataFilePath(recordingInfo.recordingId);
await this.s3Service.saveObject(metadataPath, recordingInfo);
return recordingInfo;
} catch (error) {
this.handleError(error, `Error saving recording metadata for recording ${recordingInfo.recordingId}`);
throw error;
}
}
/**
* Deletes multiple recording metadata files from S3 storage based on their file paths.
*
* @param metadataPaths - Array of file paths pointing to the metadata files to be deleted
* @returns A promise that resolves when all metadata files have been successfully deleted
* @throws May throw an error if any of the deletion operations fail
*/
async deleteRecordingMetadataByPaths(metadataPaths: string[]): Promise<void> {
try {
await this.s3Service.deleteObjects(metadataPaths);
this.logger.verbose(`Deleted multiple recording metadata files: ${metadataPaths.join(', ')}`);
} catch (error) {
this.handleError(error, `Error deleting multiple recording metadata files: ${metadataPaths.join(', ')}`);
throw error;
}
}
async getUser(username: string): Promise<MUser | null> {
try {
const userKey = RedisKeyName.USER + username;
const user: MUser | null = await this.getFromRedis<MUser>(userKey);
if (!user) {
this.logger.debug(`User ${username} not found in Redis. Fetching from S3...`);
const s3Path = `${INTERNAL_CONFIG.S3_USERS_PREFIX}/${username}.json`;
return await this.getFromS3<MUser>(s3Path);
}
this.logger.debug(`User ${username} retrieved from Redis`);
return user;
} catch (error) {
this.handleError(error, `Error fetching user ${username}`);
return null;
}
}
/**
* Saves a user object to both S3 storage and Redis cache atomically.
*
* This method attempts to persist the user data in S3 (as a JSON file) and in Redis (as a serialized string).
* If both operations succeed, the user is considered saved and the method returns the user object.
* If either operation fails, the method attempts to roll back any successful operation to maintain consistency.
* In case of failure, the error is logged and rethrown after rollback attempts.
*
* @param user - The user object to be saved.
* @returns A promise that resolves to the saved user object if both operations succeed.
* @throws An error if either the S3 or Redis operation fails, after attempting rollback.
*/
async saveUser(user: MUser): Promise<MUser> {
const userKey = RedisKeyName.USER + user.username;
const redisPayload = JSON.stringify(user);
const s3Path = `${INTERNAL_CONFIG.S3_USERS_PREFIX}/${user.username}.json`;
const [s3Result, redisResult] = await Promise.allSettled([
this.s3Service.saveObject(s3Path, user),
this.redisService.set(userKey, redisPayload, false)
]);
if (s3Result.status === 'fulfilled' && redisResult.status === 'fulfilled') {
this.logger.info(`User ${user.username} saved successfully`);
return user;
}
// Rollback any changes made by the successful operation
if (s3Result.status === 'fulfilled') {
try {
await this.s3Service.deleteObjects([s3Path]);
} catch (rollbackError) {
this.logger.error(`Error rolling back S3 save for user ${user.username}: ${rollbackError}`);
}
}
if (redisResult.status === 'fulfilled') {
try {
await this.redisService.delete(userKey);
} catch (rollbackError) {
this.logger.error(`Error rolling back Redis set for user ${user.username}: ${rollbackError}`);
}
}
// Return the error that occurred first
const failedOperation: PromiseRejectedResult =
s3Result.status === 'rejected' ? s3Result : (redisResult as PromiseRejectedResult);
const error = failedOperation.reason;
this.handleError(error, `Error saving user ${user.username}`);
throw error;
}
/**
* Retrieves an object of type U from Redis by the given key.
* Returns null if the key is not found or an error occurs.
*
* @param key - The Redis key to fetch.
* @returns A promise that resolves to an object of type U or null.
*/
protected async getFromRedis<U>(key: string): Promise<U | null> {
try {
const response = await this.redisService.get(key);
if (response) {
return JSON.parse(response) as U;
}
return null;
} catch (error) {
this.logger.error(`Error fetching from Redis for key ${key}: ${error}`);
return null;
}
}
/**
* Retrieves an object of type U from S3 at the specified path.
* Returns null if the object is not found.
*
* @param path - The S3 key or path to fetch.
* @returns A promise that resolves to an object of type U or null.
*/
protected async getFromS3<U>(path: string): Promise<U | null> {
try {
const response = await this.s3Service.getObjectAsJson(path);
if (response) {
this.logger.verbose(`Object found in S3 at path: ${path}`);
return response as U;
}
return null;
} catch (error) {
this.logger.error(`Error fetching from S3 for path ${path}: ${error}`);
return null;
}
}
/**
* Extracts the room ID from the given S3 file path.
* Assumes the room ID is the directory name immediately preceding the file name.
* Example: 'path/to/roomId/file.json' -> 'roomId'
*
* @param filePath - The S3 object key representing the file path.
* @returns The extracted room ID or null if extraction fails.
*/
protected extractRoomId(filePath?: string): string | null {
if (!filePath) return null;
const parts = filePath.split('/');
const roomId = parts.slice(-2, -1)[0];
if (!roomId) {
this.logger.warn(`Invalid room file path: ${filePath}`);
return null;
}
return roomId;
}
/**
* Performs rollback of saved room data.
*
* @param roomId - The room identifier.
* @param s3Result - The result of the S3 save operation.
* @param redisResult - The result of the Redis set operation.
* @param s3Path - The S3 key used to save the room data.
* @param redisKey - The Redis key used to cache the room data.
*/
protected async rollbackRoomSave(
roomId: string,
s3Result: PromiseSettledResult<PutObjectCommandOutput>,
redisResult: PromiseSettledResult<string>,
s3Path: string,
redisKey: string
): Promise<void> {
if (s3Result.status === 'fulfilled') {
try {
await this.s3Service.deleteObjects([s3Path]);
} catch (rollbackError) {
this.logger.error(`Error rolling back S3 save for room ${roomId}: ${rollbackError}`);
}
}
if (redisResult.status === 'fulfilled') {
try {
await this.redisService.delete(redisKey);
} catch (rollbackError) {
this.logger.error(`Error rolling back Redis set for room ${roomId}: ${rollbackError}`);
}
}
}
protected handleError(error: unknown, message: string) {
if (error instanceof OpenViduMeetError) {
this.logger.error(`${message}: ${error.message}`);
} else {
this.logger.error(`${message}: Unexpected error`);
}
}
}

View File

@ -0,0 +1,41 @@
import INTERNAL_CONFIG from '../../../../config/internal-config.js';
import { RecordingHelper } from '../../../../helpers/recording.helper.js';
import { StorageKeyBuilder } from '../../storage.interface.js';
export class S3KeyBuilder implements StorageKeyBuilder {
buildGlobalPreferencesKey(): string {
return `global-preferences.json`;
}
buildMeetRoomKey(roomId: string): string {
return `${INTERNAL_CONFIG.S3_ROOMS_PREFIX}/${roomId}/${roomId}.json`;
}
buildAllMeetRoomsKey(): string {
return `${INTERNAL_CONFIG.S3_ROOMS_PREFIX}`;
}
buildArchivedMeetRoomKey(roomId: string): string {
return `${INTERNAL_CONFIG.S3_RECORDINGS_PREFIX}/.room_metadata/${roomId}/room_metadata.json`;
}
buildMeetRecordingKey(recordingId: string): string {
const { roomId, egressId, uid } = RecordingHelper.extractInfoFromRecordingId(recordingId);
return `${INTERNAL_CONFIG.S3_RECORDINGS_PREFIX}/.metadata/${roomId}/${egressId}/${uid}.json`;
}
buildBinaryRecordingKey(recordingId: string): string {
const { roomId, uid } = RecordingHelper.extractInfoFromRecordingId(recordingId);
return `${INTERNAL_CONFIG.S3_RECORDINGS_PREFIX}/${roomId}/${roomId}--${uid}.mp4`;
}
buildAllMeetRecordingsKey(roomId?: string): string {
const roomSegment = roomId ? `/${roomId}` : '';
return `${INTERNAL_CONFIG.S3_RECORDINGS_PREFIX}/.metadata${roomSegment}`;
}
buildUserKey(userId: string): string {
return `${INTERNAL_CONFIG.S3_USERS_PREFIX}/${userId}.json`;
}
}

View File

@ -0,0 +1,140 @@
import { inject, injectable } from 'inversify';
import { Readable } from 'stream';
import { LoggerService, S3Service } from '../../../index.js';
import { StorageProvider } from '../../storage.interface.js';
/**
* Basic S3 storage provider that implements only primitive storage operations.
*/
@injectable()
export class S3StorageProvider implements StorageProvider {
constructor(
@inject(LoggerService) protected logger: LoggerService,
@inject(S3Service) protected s3Service: S3Service
) {}
/**
* Retrieves an object from S3 as a JSON object.
*/
async getObject<T = Record<string, unknown>>(key: string): Promise<T | null> {
try {
this.logger.debug(`Getting object from S3: ${key}`);
const result = await this.s3Service.getObjectAsJson(key);
return result as T;
} catch (error) {
this.logger.debug(`Object not found in S3: ${key}`);
return null;
}
}
/**
* Stores an object in S3 as JSON.
*/
async putObject<T = Record<string, unknown>>(key: string, data: T): Promise<void> {
try {
this.logger.debug(`Storing object in S3: ${key}`);
await this.s3Service.saveObject(key, data as Record<string, unknown>);
this.logger.verbose(`Successfully stored object in S3: ${key}`);
} catch (error) {
this.logger.error(`Error storing object in S3 ${key}: ${error}`);
throw error;
}
}
/**
* Deletes a single object from S3.
*/
async deleteObject(key: string): Promise<void> {
try {
this.logger.debug(`Deleting object from S3: ${key}`);
await this.s3Service.deleteObjects([key]);
this.logger.verbose(`Successfully deleted object from S3: ${key}`);
} catch (error) {
this.logger.error(`Error deleting object from S3 ${key}: ${error}`);
throw error;
}
}
/**
* Deletes multiple objects from S3.
*/
async deleteObjects(keys: string[]): Promise<void> {
try {
this.logger.debug(`Deleting ${keys.length} objects from S3`);
await this.s3Service.deleteObjects(keys);
this.logger.verbose(`Successfully deleted ${keys.length} objects from S3`);
} catch (error) {
this.logger.error(`Error deleting objects from S3: ${error}`);
throw error;
}
}
/**
* Checks if an object exists in S3.
*/
async exists(key: string): Promise<boolean> {
try {
this.logger.debug(`Checking if object exists in S3: ${key}`);
return await this.s3Service.exists(key);
} catch (error) {
this.logger.debug(`Error checking object existence in S3 ${key}: ${error}`);
return false;
}
}
/**
* Lists objects in S3 with a given prefix.
*/
async listObjects(
prefix: string,
maxItems?: number,
continuationToken?: string
): Promise<{
Contents?: Array<{
Key?: string;
LastModified?: Date;
Size?: number;
ETag?: string;
}>;
IsTruncated?: boolean;
NextContinuationToken?: string;
}> {
try {
this.logger.debug(`Listing objects in S3 with prefix: ${prefix}`);
return await this.s3Service.listObjectsPaginated(prefix, maxItems, continuationToken);
} catch (error) {
this.logger.error(`Error listing objects in S3 with prefix ${prefix}: ${error}`);
throw error;
}
}
/**
* Retrieves metadata headers for an object in S3.
*/
async getObjectHeaders(key: string): Promise<{ contentLength?: number; contentType?: string }> {
try {
this.logger.debug(`Getting object headers from S3: ${key}`);
const data = await this.s3Service.getHeaderObject(key);
return {
contentLength: data.ContentLength,
contentType: data.ContentType
};
} catch (error) {
this.logger.error(`Error fetching object headers from S3 ${key}: ${error}`);
throw error;
}
}
/**
* Retrieves an object from S3 as a readable stream.
*/
async getObjectAsStream(key: string, range?: { start: number; end: number }): Promise<Readable> {
try {
this.logger.debug(`Getting object stream from S3: ${key}`);
return await this.s3Service.getObjectAsStream(key, range);
} catch (error) {
this.logger.error(`Error fetching object stream from S3 ${key}: ${error}`);
throw error;
}
}
}

View File

@ -22,10 +22,10 @@ import {
MEET_S3_SERVICE_ENDPOINT, MEET_S3_SERVICE_ENDPOINT,
MEET_S3_SUBBUCKET, MEET_S3_SUBBUCKET,
MEET_S3_WITH_PATH_STYLE_ACCESS MEET_S3_WITH_PATH_STYLE_ACCESS
} from '../environment.js'; } from '../../../../environment.js';
import { errorS3NotAvailable, internalError } from '../models/error.model.js'; import { errorS3NotAvailable, internalError } from '../../../../models/error.model.js';
import { LoggerService } from './index.js'; import { LoggerService } from '../../../index.js';
import INTERNAL_CONFIG from '../config/internal-config.js'; import INTERNAL_CONFIG from '../../../../config/internal-config.js';
@injectable() @injectable()
export class S3Service { export class S3Service {
@ -64,7 +64,11 @@ export class S3Service {
* Saves an object to a S3 bucket. * Saves an object to a S3 bucket.
* Uses an internal retry mechanism in case of errors. * Uses an internal retry mechanism in case of errors.
*/ */
async saveObject(name: string, body: any, bucket: string = MEET_S3_BUCKET): Promise<PutObjectCommandOutput> { async saveObject(
name: string,
body: Record<string, unknown>,
bucket: string = MEET_S3_BUCKET
): Promise<PutObjectCommandOutput> {
const fullKey = this.getFullKey(name); const fullKey = this.getFullKey(name);
try { try {
@ -76,10 +80,10 @@ export class S3Service {
const result = await this.retryOperation<PutObjectCommandOutput>(() => this.run(command)); const result = await this.retryOperation<PutObjectCommandOutput>(() => this.run(command));
this.logger.verbose(`S3: successfully saved object '${fullKey}' in bucket '${bucket}'`); this.logger.verbose(`S3: successfully saved object '${fullKey}' in bucket '${bucket}'`);
return result; return result;
} catch (error: any) { } catch (error: unknown) {
this.logger.error(`S3: error saving object '${fullKey}' in bucket '${bucket}': ${error}`); this.logger.error(`S3: error saving object '${fullKey}' in bucket '${bucket}': ${error}`);
if (error.code === 'ECONNREFUSED') { if (error && typeof error === 'object' && 'code' in error && error.code === 'ECONNREFUSED') {
throw errorS3NotAvailable(error); throw errorS3NotAvailable(error);
} }

View File

@ -1,30 +1,33 @@
import { inject, injectable } from 'inversify'; import { inject, injectable } from 'inversify';
import { MEET_PREFERENCES_STORAGE_MODE } from '../../environment.js'; import { LoggerService } from '../index.js';
import { LoggerService, S3StorageProvider, StorageProvider } from '../index.js'; import { StorageKeyBuilder, StorageProvider } from './storage.interface.js';
import { container, STORAGE_TYPES } from '../../config/dependency-injector.config.js';
/** /**
* Factory class responsible for creating the appropriate storage provider based on configuration. * Factory class responsible for creating the appropriate basic storage provider
* based on configuration.
* *
* This factory determines which storage implementation to use based on the `MEET_PREFERENCES_STORAGE_MODE` * This factory determines which basic storage implementation to use based on the
* environment variable. Currently supports S3 storage, with more providers potentially added in the future. * `MEET_PREFERENCES_STORAGE_MODE` environment variable. It creates providers that
* handle only basic CRUD operations, following the Single Responsibility Principle.
*
* Domain-specific logic should be handled in the MeetStorageService layer.
*/ */
@injectable() @injectable()
export class StorageFactory { export class StorageFactory {
constructor( constructor(@inject(LoggerService) protected logger: LoggerService) {}
@inject(S3StorageProvider) protected s3StorageProvider: S3StorageProvider,
@inject(LoggerService) protected logger: LoggerService
) {}
create(): StorageProvider { /**
const storageMode = MEET_PREFERENCES_STORAGE_MODE; * Creates a basic storage provider based on the configured storage mode.
*
switch (storageMode) { * @returns StorageProvider instance configured for the specified storage backend
case 's3': */
return this.s3StorageProvider; create(): { provider: StorageProvider; keyBuilder: StorageKeyBuilder } {
// The actual binding is handled in the DI configuration
default: // This factory just returns the pre-configured instances
this.logger.info('No preferences storage mode specified. Defaulting to S3.'); return {
return this.s3StorageProvider; provider: container.get<StorageProvider>(STORAGE_TYPES.StorageProvider),
} keyBuilder: container.get<StorageKeyBuilder>(STORAGE_TYPES.KeyBuilder)
};
} }
} }

View File

@ -1,53 +1,67 @@
import { GlobalPreferences, MeetRecordingInfo, MeetRoom, User } from '@typings-ce';
import { Readable } from 'stream'; import { Readable } from 'stream';
/** /**
* An interface that defines the contract for storage providers in the OpenVidu Meet application. * Basic storage interface that defines primitive storage operations.
* Storage providers handle persistence of global application preferences, rooms, recordings metadata and users. * This interface follows the Single Responsibility Principle by focusing
* only on basic CRUD operations for object storage.
* *
* @template GPrefs - The type of global preferences, extending GlobalPreferences * This allows easy integration of different storage backends (S3, PostgreSQL,
* @template MRoom - The type of room data, extending MeetRoom * FileSystem, etc.) without mixing domain-specific business logic.
* @template MRec - The type of recording metadata, extending MeetRecordingInfo
* @template MUser - The type of user data, extending User
*
* Implementations of this interface should handle the persistent storage
* of application settings, room information, recording metadata, and user data,
* which could be backed by various storage solutions (database, file system, cloud storage, etc.).
*/ */
export interface StorageProvider< export interface StorageProvider {
GPrefs extends GlobalPreferences = GlobalPreferences,
MRoom extends MeetRoom = MeetRoom,
MRec extends MeetRecordingInfo = MeetRecordingInfo,
MUser extends User = User
> {
/** /**
* Initializes the storage with default preferences if they are not already set. * Retrieves an object from storage as a JSON object.
* *
* @param defaultPreferences - The default preferences to initialize with. * @param key - The storage key/path of the object
* @returns A promise that resolves when the initialization is complete. * @returns A promise that resolves to the parsed JSON object, or null if not found
*/ */
initialize(defaultPreferences: GPrefs): Promise<void>; getObject<T = Record<string, unknown>>(key: string): Promise<T | null>;
/** /**
* Retrives the headers of an object stored in the storage provider. * Stores an object in storage as JSON.
* This is useful to get the content length and content type of the object without downloading it.
* *
* @param filePath - The path of the file to retrieve headers for. * @param key - The storage key/path where the object should be stored
* @param data - The object to store (will be serialized to JSON)
* @returns A promise that resolves when the object is successfully stored
*/ */
getObjectHeaders(filePath: string): Promise<{ contentLength?: number; contentType?: string }>; putObject<T = Record<string, unknown>>(key: string, data: T): Promise<void>;
/** /**
* Lists objects in the storage with optional pagination support. * Deletes a single object from storage.
* *
* @param prefix - The prefix to filter objects by (acts as a folder path) * @param key - The storage key/path of the object to delete
* @returns A promise that resolves when the object is successfully deleted
*/
deleteObject(key: string): Promise<void>;
/**
* Deletes multiple objects from storage.
*
* @param keys - Array of storage keys/paths of the objects to delete
* @returns A promise that resolves when all objects are successfully deleted
*/
deleteObjects(keys: string[]): Promise<void>;
/**
* Checks if an object exists in storage.
*
* @param key - The storage key/path to check
* @returns A promise that resolves to true if the object exists, false otherwise
*/
exists(key: string): Promise<boolean>;
/**
* Lists objects in storage with a given prefix (acts like a folder).
*
* @param prefix - The prefix to filter objects by
* @param maxItems - Maximum number of items to return (optional) * @param maxItems - Maximum number of items to return (optional)
* @param nextPageToken - Token for pagination to get the next page (optional) * @param continuationToken - Token for pagination (optional)
* @returns Promise resolving to paginated list of objects with metadata * @returns A promise that resolves to a paginated list of objects
*/ */
listObjects( listObjects(
prefix: string, prefix: string,
maxItems?: number, maxItems?: number,
nextPageToken?: string continuationToken?: string
): Promise<{ ): Promise<{
Contents?: Array<{ Contents?: Array<{
Key?: string; Key?: string;
@ -60,165 +74,80 @@ export interface StorageProvider<
}>; }>;
/** /**
* Retrieves the global preferences of Openvidu Meet. * Retrieves metadata headers for an object without downloading the content.
* *
* @returns A promise that resolves to the global preferences, or null if not set. * @param key - The storage key/path of the object
* @returns A promise that resolves to object metadata
*/ */
getGlobalPreferences(): Promise<GPrefs | null>; getObjectHeaders(key: string): Promise<{
contentLength?: number;
/** contentType?: string;
* Saves the given preferences.
*
* @param preferences - The preferences to save.
* @returns A promise that resolves to the saved preferences.
*/
saveGlobalPreferences(preferences: GPrefs): Promise<GPrefs>;
/**
*
* Retrieves the OpenVidu Meet Rooms.
*
* @param maxItems - The maximum number of items to retrieve. If not provided, all items will be retrieved.
* @param nextPageToken - The token for the next page of results. If not provided, the first page will be retrieved.
* @returns A promise that resolves to an object containing:
* - the retrieved rooms.
* - a boolean indicating if there are more items to retrieve.
* - an optional next page token.
*/
getMeetRooms(
maxItems?: number,
nextPageToken?: string
): Promise<{
rooms: MRoom[];
isTruncated: boolean;
nextPageToken?: string;
}>; }>;
/** /**
* Retrieves the {@link MeetRoom}. * Retrieves an object as a readable stream.
* Useful for large files or when you need streaming access.
* *
* @param roomId - The identifier of the room to retrieve. * @param key - The storage key/path of the object
* @returns A promise that resolves to the OpenVidu Room, or null if not found. * @param range - Optional byte range for partial content retrieval
**/ * @returns A promise that resolves to a readable stream of the object content
getMeetRoom(roomId: string): Promise<MRoom | null>;
/**
* Saves the OpenVidu Meet Room.
*
* @param meetRoom - The OpenVidu Room to save.
* @returns A promise that resolves to the saved OpenVidu Room.
**/
saveMeetRoom(meetRoom: MRoom): Promise<MRoom>;
/**
* Deletes OpenVidu Meet Rooms.
*
* @param roomIds - The room IDs to delete.
* @returns A promise that resolves when the room have been deleted.
**/
deleteMeetRooms(roomIds: string[]): Promise<void>;
/**
* Gets the archived metadata for a specific room.
*
* The archived metadata is necessary for checking the permissions of the recording viewer when the room is deleted.
*
* @param roomId - The name of the room to retrieve.
*/ */
getArchivedRoomMetadata(roomId: string): Promise<Partial<MRoom> | null>; getObjectAsStream(key: string, range?: { start: number; end: number }): Promise<Readable>;
}
/**
* Archives the metadata for a specific room. /**
* * Interface for building storage keys used throughout the application.
* This is necessary for persisting the metadata of a room although it is deleted. * Provides methods to generate standardized keys for different types of data storage operations.
* The metadata will be used to check the permissions of the recording viewer. */
* export interface StorageKeyBuilder {
* @param roomId: The room ID to archive. /**
*/ * Builds the key for global preferences storage.
archiveRoomMetadata(roomId: string): Promise<void>; */
buildGlobalPreferencesKey(): string;
/** /**
* Updates the archived metadata for a specific room. * Builds the key for a specific room.
* *
* This is necessary for keeping the metadata of a room up to date. * @param roomId - The unique identifier of the meeting room
* */
* @param roomId: The room ID to update. buildMeetRoomKey(roomId: string): string;
*/
updateArchivedRoomMetadata(roomId: string): Promise<void>; /**
* Builds the key for all meeting rooms.
/** */
* Deletes the archived metadata for a specific room. buildAllMeetRoomsKey(): string;
*
* @param roomId - The room ID to delete the archived metadata for. /**
*/ * Builds the key for archived room metadata.
deleteArchivedRoomMetadata(roomId: string): Promise<void>; *
* @param roomId - The unique identifier of the meeting room
/** */
* Saves the recording metadata. buildArchivedMeetRoomKey(roomId: string): string;
*
* @param recordingInfo - The recording information to save. /**
* @returns A promise that resolves to the saved recording information. * Builds the key for a specific recording.
*/ *
saveRecordingMetadata(recordingInfo: MRec): Promise<MRec>; * @param recordingId - The unique identifier of the recording
*/
/** buildBinaryRecordingKey(recordingId: string): string;
* Retrieves the recording metadata for a specific recording ID.
* /**
* @param recordingId - The unique identifier of the recording. * Builds the key for a specific recording metadata.
* @returns A promise that resolves to the recording metadata, or null if not found. *
*/ * @param recordingId - The unique identifier of the recording
getRecordingMetadata(recordingId: string): Promise<{ recordingInfo: MRec; metadataFilePath: string }>; */
buildMeetRecordingKey(recordingId: string): string;
/**
* Retrieves the recording metadata for multiple recording IDs. /**
* * Builds the key for all recordings in a room or globally.
* @param recordingPath - The path of the recording file to retrieve metadata for. *
* @returns A promise that resolves to the recording metadata, or null if not found. * @param roomId - Optional room identifier to filter recordings by room
*/ */
getRecordingMetadataByPath(recordingPath: string): Promise<MRec | undefined>; buildAllMeetRecordingsKey(roomId?: string): string;
/** /**
* Deletes multiple recording metadata files by their paths. * Builds the key for a specific user
* *
* @param metadataPaths - An array of metadata file paths to delete. * @param userId - The unique identifier of the user
*/ */
deleteRecordingMetadataByPaths(metadataPaths: string[]): Promise<void>; buildUserKey(userId: string): string;
/**
* Retrieves the media content of a recording file.
*
* @param recordingPath - The path of the recording file to retrieve.
* @param range - An optional range object specifying the start and end byte positions to retrieve.
*/
getRecordingMedia(
recordingPath: string,
range?: {
end: number;
start: number;
}
): Promise<Readable>;
/**
* Deletes multiple recording binary files by their paths.
*
* @param recordingPaths - An array of recording file paths to delete.
* @returns A promise that resolves when the recording binary files have been deleted.
*/
deleteRecordingBinaryFilesByPaths(recordingPaths: string[]): Promise<void>;
/**
* Retrieves the user data for a specific username.
*
* @param username - The username of the user to retrieve.
* @returns A promise that resolves to the user data, or null if not found.
*/
getUser(username: string): Promise<MUser | null>;
/**
* Saves the user data.
*
* @param user - The user data to save.
* @returns A promise that resolves to the saved user data.
*/
saveUser(user: MUser): Promise<MUser>;
} }

View File

@ -10,17 +10,31 @@ import {
MEET_WEBHOOK_URL MEET_WEBHOOK_URL
} from '../../environment.js'; } from '../../environment.js';
import { MeetLock, PasswordHelper } from '../../helpers/index.js'; import { MeetLock, PasswordHelper } from '../../helpers/index.js';
import { errorRoomNotFound, internalError, OpenViduMeetError } from '../../models/error.model.js'; import {
import { LoggerService, MutexService, StorageFactory, StorageProvider } from '../index.js'; errorRecordingNotFound,
errorRecordingRangeNotSatisfiable,
errorRoomNotFound,
internalError,
OpenViduMeetError,
RedisKeyName
} from '../../models/index.js';
import { LoggerService, MutexService, RedisService } from '../index.js';
import { StorageFactory } from './storage.factory.js';
import { StorageKeyBuilder, StorageProvider } from './storage.interface.js';
/** /**
* A service for managing storage operations related to OpenVidu Meet rooms and preferences. * Domain-specific storage service for OpenVidu Meet.
* *
* This service provides an abstraction layer over the underlying storage implementation, * This service handles all domain-specific logic for rooms, recordings, and preferences,
* handling initialization, retrieval, and persistence of global preferences and room data. * while delegating basic storage operations to the StorageProvider.
*
* This architecture follows the Single Responsibility Principle:
* - StorageProvider: Handles only basic CRUD operations
* - MeetStorageService: Handles domain-specific business logic
* *
* @template GPrefs - Type for global preferences, extends GlobalPreferences * @template GPrefs - Type for global preferences, extends GlobalPreferences
* @template MRoom - Type for room data, extends MeetRoom * @template MRoom - Type for room data, extends MeetRoom
* @template MRec - Type for recording data, extends MeetRecordingInfo
*/ */
@injectable() @injectable()
export class MeetStorageService< export class MeetStorageService<
@ -30,56 +44,28 @@ export class MeetStorageService<
MUser extends User = User MUser extends User = User
> { > {
protected storageProvider: StorageProvider; protected storageProvider: StorageProvider;
protected keyBuilder: StorageKeyBuilder;
constructor( constructor(
@inject(LoggerService) protected logger: LoggerService, @inject(LoggerService) protected logger: LoggerService,
@inject(StorageFactory) protected storageFactory: StorageFactory, @inject(StorageFactory) protected storageFactory: StorageFactory,
@inject(MutexService) protected mutexService: MutexService @inject(MutexService) protected mutexService: MutexService,
@inject(RedisService) protected redisService: RedisService
) { ) {
this.storageProvider = this.storageFactory.create(); const { provider, keyBuilder } = this.storageFactory.create();
this.storageProvider = provider;
this.keyBuilder = keyBuilder;
} }
async getObjectHeaders(filePath: string): Promise<{ contentLength?: number; contentType?: string }> { // ==========================================
try { // GLOBAL PREFERENCES DOMAIN LOGIC
const headers = await this.storageProvider.getObjectHeaders(filePath); // ==========================================
this.logger.verbose(`Object headers retrieved: ${JSON.stringify(headers)}`);
return headers;
} catch (error) {
this.handleError(error, 'Error retrieving object headers');
throw internalError('Getting object headers');
}
}
/** /**
* Lists objects in the storage with optional pagination support. * Initializes default preferences if not already initialized.
*
* @param prefix - The prefix to filter objects by (acts as a folder path)
* @param maxItems - Maximum number of items to return (optional)
* @param nextPageToken - Token for pagination to get the next page (optional)
* @returns Promise resolving to paginated list of objects with metadata
*/
listObjects(
prefix: string,
maxItems?: number,
nextPageToken?: string
): Promise<{
Contents?: Array<{
Key?: string;
LastModified?: Date;
Size?: number;
ETag?: string;
}>;
IsTruncated?: boolean;
NextContinuationToken?: string;
}> {
return this.storageProvider.listObjects(prefix, maxItems, nextPageToken);
}
/**
* Initializes default preferences if not already initialized and saves the admin user.
* @returns {Promise<GPrefs>} Default global preferences. * @returns {Promise<GPrefs>} Default global preferences.
*/ */
async initialize(): Promise<void> { async initializeGlobalPreferences(): Promise<void> {
try { try {
// Acquire a global lock to prevent multiple initializations at the same time when running in HA mode // Acquire a global lock to prevent multiple initializations at the same time when running in HA mode
const lock = await this.mutexService.acquire(MeetLock.getGlobalPreferencesLock(), ms('30s')); const lock = await this.mutexService.acquire(MeetLock.getGlobalPreferencesLock(), ms('30s'));
@ -92,8 +78,26 @@ export class MeetStorageService<
} }
this.logger.verbose('Initializing global preferences with default values'); this.logger.verbose('Initializing global preferences with default values');
const preferences = await this.getDefaultPreferences(); const redisKey = RedisKeyName.GLOBAL_PREFERENCES;
await this.storageProvider.initialize(preferences); const storageKey = this.keyBuilder.buildGlobalPreferencesKey();
const preferences = this.buildDefaultPreferences();
this.logger.verbose('Initializing global preferences with default values');
const existing = await this.getFromCacheAndStorage<GPrefs>(redisKey, storageKey);
if (!existing) {
await this.saveCacheAndStorage(redisKey, storageKey, preferences);
this.logger.info('Global preferences initialized with default values');
} else {
// Check if it's from a different project
const existingProjectId = (existing as GlobalPreferences)?.projectId;
const newProjectId = (preferences as GlobalPreferences)?.projectId;
if (existingProjectId !== newProjectId) {
this.logger.info('Different project detected, overwriting global preferences');
await this.saveCacheAndStorage(redisKey, storageKey, preferences);
}
}
// Save the default admin user // Save the default admin user
const admin = { const admin = {
@ -104,59 +108,59 @@ export class MeetStorageService<
await this.saveUser(admin); await this.saveUser(admin);
} catch (error) { } catch (error) {
this.handleError(error, 'Error initializing default preferences'); this.handleError(error, 'Error initializing default preferences');
throw internalError('Failed to initialize global preferences');
} }
} }
/**
* Retrieves the global preferences, initializing them if necessary.
* @returns {Promise<GPrefs>}
*/
async getGlobalPreferences(): Promise<GPrefs> { async getGlobalPreferences(): Promise<GPrefs> {
let preferences = await this.storageProvider.getGlobalPreferences(); const redisKey = RedisKeyName.GLOBAL_PREFERENCES;
const storageKey = this.keyBuilder.buildGlobalPreferencesKey();
if (preferences) return preferences as GPrefs; const preferences = await this.getFromCacheAndStorage<GPrefs>(redisKey, storageKey);
await this.initialize(); if (preferences) return preferences;
preferences = await this.storageProvider.getGlobalPreferences();
if (!preferences) { // Build and save default preferences if not found in cache or storage
this.logger.error('Global preferences not found after initialization'); await this.initializeGlobalPreferences();
throw internalError('getting global preferences');
}
return preferences as GPrefs; return this.buildDefaultPreferences();
} }
/** /**
* Saves the global preferences to the storage provider. * Saves global preferences to the storage provider.
* @param {GPrefs} preferences * @param {GPrefs} preferences - The global preferences to save.
* @returns {Promise<GPrefs>} * @returns {Promise<GPrefs>} The saved global preferences.
*/ */
async saveGlobalPreferences(preferences: GPrefs): Promise<GPrefs> { async saveGlobalPreferences(preferences: GPrefs): Promise<GPrefs> {
this.logger.info('Saving global preferences'); this.logger.info('Saving global preferences');
return this.storageProvider.saveGlobalPreferences(preferences) as Promise<GPrefs>; const redisKey = RedisKeyName.GLOBAL_PREFERENCES;
const storageKey = this.keyBuilder.buildGlobalPreferencesKey();
return await this.saveCacheAndStorage<GPrefs>(redisKey, storageKey, preferences);
} }
/** // ==========================================
* Saves the meet room to the storage provider. // ROOM DOMAIN LOGIC
* // ==========================================
* @param meetRoom - The room object to be saved
* @returns A promise that resolves to the saved room object
*/
async saveMeetRoom(meetRoom: MRoom): Promise<MRoom> { async saveMeetRoom(meetRoom: MRoom): Promise<MRoom> {
this.logger.info(`Saving OpenVidu room ${meetRoom.roomId}`); const { roomId } = meetRoom;
return this.storageProvider.saveMeetRoom(meetRoom) as Promise<MRoom>; this.logger.info(`Saving OpenVidu room ${roomId}`);
const redisKey = RedisKeyName.ROOM + roomId;
const storageKey = this.keyBuilder.buildMeetRoomKey(roomId);
return await this.saveCacheAndStorage<MRoom>(redisKey, storageKey, meetRoom);
} }
/** /**
* Retrieves a paginated list of rooms from the storage provider. * Retrieves a paginated list of meeting rooms from storage.
* *
* @param maxItems - Optional maximum number of rooms to retrieve in a single request * @param maxItems - Optional maximum number of rooms to retrieve per page
* @param nextPageToken - Optional token for pagination to get the next page of results * @param nextPageToken - Optional token for pagination to get the next set of results
* @returns A promise that resolves to an object containing: * @returns Promise that resolves to an object containing:
* - rooms: Array of MRoom objects representing the rooms * - rooms: Array of MRoom objects retrieved from storage
* - isTruncated: Boolean indicating if there are more results available * - isTruncated: Boolean indicating if there are more results available
* - nextPageToken: Optional token for retrieving the next page of results * - nextPageToken: Optional token for retrieving the next page of results
* @throws Error if the storage operation fails or encounters an unexpected error
*/ */
async getMeetRooms( async getMeetRooms(
maxItems?: number, maxItems?: number,
@ -166,159 +170,313 @@ export class MeetStorageService<
isTruncated: boolean; isTruncated: boolean;
nextPageToken?: string; nextPageToken?: string;
}> { }> {
return this.storageProvider.getMeetRooms(maxItems, nextPageToken) as Promise<{ try {
rooms: MRoom[]; const allRoomsKey = this.keyBuilder.buildAllMeetRoomsKey();
isTruncated: boolean; const { Contents, IsTruncated, NextContinuationToken } = await this.storageProvider.listObjects(
nextPageToken?: string; allRoomsKey,
}>; maxItems,
nextPageToken
);
const rooms: MRoom[] = [];
if (Contents && Contents.length > 0) {
const roomPromises = Contents.map(async (item) => {
if (item.Key && item.Key.endsWith('.json')) {
try {
const room = await this.storageProvider.getObject<MRoom>(item.Key);
return room;
} catch (error) {
this.logger.warn(`Failed to load room from ${item.Key}: ${error}`);
return null;
}
}
return null;
});
const roomResults = await Promise.all(roomPromises);
rooms.push(...roomResults.filter((room): room is Awaited<MRoom> => room !== null));
}
return {
rooms,
isTruncated: IsTruncated || false,
nextPageToken: NextContinuationToken
};
} catch (error) {
this.handleError(error, 'Error retrieving rooms');
throw error;
}
}
async getMeetRoom(roomId: string): Promise<MRoom | null> {
const redisKey = RedisKeyName.ROOM + roomId;
const storageKey = this.keyBuilder.buildMeetRoomKey(roomId);
return await this.getFromCacheAndStorage<MRoom>(redisKey, storageKey);
}
async deleteMeetRooms(roomIds: string[]): Promise<void> {
const roomKeys = roomIds.map((roomId) => this.keyBuilder.buildMeetRoomKey(roomId));
const redisKeys = roomIds.map((roomId) => RedisKeyName.ROOM + roomId);
await this.deleteFromCacheAndStorageBatch(redisKeys, roomKeys);
}
// ==========================================
// ARCHIVED ROOM METADATA DOMAIN LOGIC
// ==========================================
async getArchivedRoomMetadata(roomId: string): Promise<Partial<MRoom> | null> {
const redisKey = RedisKeyName.ARCHIVED_ROOM + roomId;
const storageKey = this.keyBuilder.buildArchivedMeetRoomKey(roomId);
return await this.getFromCacheAndStorage<Partial<MRoom>>(redisKey, storageKey);
} }
/** /**
* Retrieves the room by its unique identifier. * Archives room metadata by storing essential room information in both cache and persistent storage.
* *
* @param roomId - The unique identifier for the room. * This method retrieves the room data, extracts key metadata (moderator/publisher URLs and
* @returns A promise that resolves to the room's preferences. * recording preferences), and saves it to an archived location for future reference.
* @throws Error if the room preferences are not found. *
* If an archived metadata for the room already exists, it will be overwritten.
*
* @param roomId - The unique identifier of the room to archive
* @throws {Error} When the room with the specified ID is not found
* @returns A promise that resolves when the archiving operation completes successfully
*/ */
async getMeetRoom(roomId: string): Promise<MRoom> { async archiveRoomMetadata(roomId: string): Promise<void> {
const meetRoom = await this.storageProvider.getMeetRoom(roomId); const redisKey = RedisKeyName.ARCHIVED_ROOM + roomId;
const storageKey = this.keyBuilder.buildArchivedMeetRoomKey(roomId);
if (!meetRoom) { const room = await this.getMeetRoom(roomId);
this.logger.error(`Room not found for room ${roomId}`);
if (!room) {
this.logger.warn(`Room ${roomId} not found, cannot archive metadata`);
throw errorRoomNotFound(roomId); throw errorRoomNotFound(roomId);
} }
return meetRoom as MRoom; const archivedRoom: Partial<MRoom> = {
moderatorRoomUrl: room.moderatorRoomUrl,
publisherRoomUrl: room.publisherRoomUrl,
preferences: {
recordingPreferences: room.preferences?.recordingPreferences
}
} as Partial<MRoom>;
await this.saveCacheAndStorage<Partial<MRoom>>(redisKey, storageKey, archivedRoom);
} }
/**
* Deletes multiple rooms from storage.
*
* @param roomIds - Array of room identifiers to be deleted
* @returns A promise that resolves when all rooms have been successfully deleted
* @throws May throw an error if the deletion operation fails for any of the rooms
*/
async deleteMeetRooms(roomIds: string[]): Promise<void> {
return this.storageProvider.deleteMeetRooms(roomIds);
}
/**
* Retrieves metadata for an archived room by its ID.
*
* @param roomId - The unique identifier of the room to retrieve metadata for
* @returns A promise that resolves to partial room metadata if found, or null if not found
*/
async getArchivedRoomMetadata(roomId: string): Promise<Partial<MRoom> | null> {
return this.storageProvider.getArchivedRoomMetadata(roomId) as Promise<Partial<MRoom> | null>;
}
/**
* Archives the metadata for a specific room.
*
* @param roomId - The unique identifier of the room whose metadata should be archived
* @returns A Promise that resolves when the archival operation is complete
* @throws May throw an error if the archival operation fails or if the room ID is invalid
*/
async archiveRoomMetadata(roomId: string): Promise<void> {
return this.storageProvider.archiveRoomMetadata(roomId);
}
/**
* Updates the metadata of an archived room.
*
* @param roomId - The unique identifier of the room whose archived metadata should be updated
* @returns A promise that resolves when the archived room metadata has been successfully updated
* @throws May throw an error if the room ID is invalid or if the storage operation fails
*/
async updateArchivedRoomMetadata(roomId: string): Promise<void> {
return this.storageProvider.updateArchivedRoomMetadata(roomId);
}
/**
* Deletes the archived metadata for a specific room.
*
* @param roomId - The unique identifier of the room whose archived metadata should be deleted
* @returns A promise that resolves when the archived room metadata has been successfully deleted
* @throws May throw an error if the deletion operation fails or if the room ID is invalid
*/
async deleteArchivedRoomMetadata(roomId: string): Promise<void> { async deleteArchivedRoomMetadata(roomId: string): Promise<void> {
return this.storageProvider.deleteArchivedRoomMetadata(roomId); const redisKey = RedisKeyName.ARCHIVED_ROOM + roomId;
const storageKey = this.keyBuilder.buildArchivedMeetRoomKey(roomId);
await this.deleteFromCacheAndStorage(redisKey, storageKey);
this.logger.verbose(`Archived room metadata deleted for room ${roomId} in recordings bucket`);
} }
/** // ==========================================
* Saves recording metadata to the storage provider. // RECORDING DOMAIN LOGIC
* // ==========================================
* @param recordingInfo - The recording metadata object to be saved
* @returns A promise that resolves to the saved recording metadata object
*/
async saveRecordingMetadata(recordingInfo: MRec): Promise<MRec> { async saveRecordingMetadata(recordingInfo: MRec): Promise<MRec> {
return this.storageProvider.saveRecordingMetadata(recordingInfo) as Promise<MRec>; const redisKey = RedisKeyName.RECORDING + recordingInfo.recordingId;
const storageKey = this.keyBuilder.buildMeetRecordingKey(recordingInfo.recordingId);
return await this.saveCacheAndStorage<MRec>(redisKey, storageKey, recordingInfo);
} }
/** /**
* Retrieves the metadata for a specific recording. * Retrieves all recordings from storage, optionally filtered by room ID.
* *
* @param recordingId - The unique identifier of the recording * @param roomId - Optional room identifier to filter recordings. If not provided, retrieves all recordings.
* @returns A promise that resolves to an object containing the recording information and metadata file path * @param maxItems - Optional maximum number of items to return per page for pagination.
* @throws May throw an error if the recording is not found or if there's an issue accessing the storage provider * @param nextPageToken - Optional token for pagination to retrieve the next page of results.
*
* @returns A promise that resolves to an object containing:
* - `recordings`: Array of recording metadata objects (MRec)
* - `isTruncated`: Optional boolean indicating if there are more results available
* - `nextContinuationToken`: Optional token to retrieve the next page of results
*
* @throws Will throw an error if storage retrieval fails or if there's an issue processing the recordings
*
* @remarks
* This method handles pagination and filters out any recordings that fail to load.
* Failed recordings are logged as warnings but don't cause the entire operation to fail.
* The method logs debug information about the retrieval process and summary statistics.
*/ */
async getAllRecordings(
roomId?: string,
maxItems?: number,
nextPageToken?: string
): Promise<{ recordings: MRec[]; isTruncated?: boolean; nextContinuationToken?: string }> {
try {
const searchKey = this.keyBuilder.buildAllMeetRecordingsKey(roomId);
const scope = roomId ? ` for room ${roomId}` : '';
this.logger.debug(`Retrieving recordings${scope} with key: ${searchKey}`);
const { Contents, IsTruncated, NextContinuationToken } = await this.storageProvider.listObjects(
searchKey,
maxItems,
nextPageToken
);
if (!Contents || Contents.length === 0) {
this.logger.verbose(`No recordings found${scope}`);
return { recordings: [], isTruncated: false };
}
const metadataFiles = Contents; //Contents.filter((item) => item.Key && item.Key.endsWith('.json'));
const recordingPromises = metadataFiles.map(async (item) => {
try {
const recording = await this.storageProvider.getObject<MRec>(item.Key!);
return recording;
} catch (error) {
this.logger.warn(`Failed to load recording metadata from ${item.Key}: ${error}`);
return null; // Return null for failed loads, filter out later
}
});
// Wait for all recordings to load and filter out failures
const recordingResults = await Promise.all(recordingPromises);
const validRecordings = recordingResults.filter(
(recording): recording is Awaited<MRec> => recording !== null && recording !== undefined
);
// Log results summary
const failedCount = recordingResults.length - validRecordings.length;
if (failedCount > 0) {
this.logger.warn(`Failed to load ${failedCount} out of ${recordingResults.length} recordings${scope}`);
}
this.logger.verbose(`Successfully retrieved ${validRecordings.length} recordings${scope}`);
return {
recordings: validRecordings,
isTruncated: Boolean(IsTruncated),
nextContinuationToken: NextContinuationToken
};
} catch (error) {
this.handleError(error, 'Error retrieving all recordings');
throw error;
}
}
async getRecordingMetadata(recordingId: string): Promise<{ recordingInfo: MRec; metadataFilePath: string }> { async getRecordingMetadata(recordingId: string): Promise<{ recordingInfo: MRec; metadataFilePath: string }> {
return this.storageProvider.getRecordingMetadata(recordingId) as Promise<{ try {
recordingInfo: MRec; const redisKey = RedisKeyName.RECORDING + recordingId;
metadataFilePath: string; const storageKey = this.keyBuilder.buildMeetRecordingKey(recordingId);
}>;
const recordingInfo = await this.getFromCacheAndStorage<MRec>(redisKey, storageKey);
if (!recordingInfo) {
throw errorRecordingNotFound(recordingId);
}
this.logger.debug(`Retrieved recording for ${recordingId}`);
return { recordingInfo, metadataFilePath: storageKey };
} catch (error) {
this.logger.error(`Error fetching recording metadata for recording ${recordingId}: ${error}`);
throw error;
}
} }
/** /**
* Retrieves metadata for recordings by their file path. * Deletes a recording and its metadata by recordingId.
* This method handles the path building internally, making it agnostic to storage backend.
* *
* @param recordingPath - The path of the recording file to retrieve metadata for * @param recordingId - The unique identifier of the recording to delete
* @returns A promise that resolves to * @returns Promise that resolves when both binary files and metadata are deleted
*/ */
async getRecordingMetadataByPath(recordingPath: string): Promise<MRec | undefined> { async deleteRecording(recordingId: string): Promise<void> {
return this.storageProvider.getRecordingMetadataByPath(recordingPath) as Promise<MRec>; try {
const redisMetadataKey = RedisKeyName.RECORDING + recordingId;
const storageMetadataKey = this.keyBuilder.buildMeetRecordingKey(recordingId);
const binaryRecordingKey = this.keyBuilder.buildBinaryRecordingKey(recordingId);
this.logger.info(`Deleting recording ${recordingId} with metadata key ${storageMetadataKey}`);
// Delete both metadata and binary files
await Promise.all([
this.deleteFromCacheAndStorage(redisMetadataKey, storageMetadataKey),
this.storageProvider.deleteObject(binaryRecordingKey)
]);
this.logger.verbose(`Successfully deleted recording ${recordingId}`);
} catch (error) {
this.handleError(error, `Error deleting recording ${recordingId}`);
throw error;
}
} }
/** /**
* Retrieves recording media as a readable stream from the storage provider. * Deletes multiple recordings by recordingIds.
* *
* @param recordingPath - The path to the recording file in storage * @param recordingIds - Array of recording identifiers to delete
* @param range - Optional byte range for partial content retrieval * @returns Promise that resolves when all recordings are deleted
* @param range.start - Starting byte position
* @param range.end - Ending byte position
* @returns A Promise that resolves to a Readable stream of the recording media
*/ */
async deleteRecordings(recordingIds: string[]): Promise<void> {
if (recordingIds.length === 0) {
this.logger.debug('No recordings to delete');
return;
}
try {
// Build all paths from recordingIds
const metadataKeys: string[] = [];
const redisKeys: string[] = [];
const binaryKeys: string[] = [];
for (const recordingId of recordingIds) {
redisKeys.push(RedisKeyName.RECORDING + recordingId);
metadataKeys.push(this.keyBuilder.buildMeetRecordingKey(recordingId));
binaryKeys.push(this.keyBuilder.buildBinaryRecordingKey(recordingId));
}
this.logger.debug(`Bulk deleting ${recordingIds.length} recordings`);
// Delete all files in parallel using batch operations
await Promise.all([
this.deleteFromCacheAndStorageBatch(redisKeys, metadataKeys),
this.storageProvider.deleteObjects(binaryKeys)
]);
this.logger.verbose(`Successfully bulk deleted ${recordingIds.length} recordings`);
} catch (error) {
this.handleError(error, `Error deleting recordings: ${recordingIds.join(', ')}`);
throw error;
}
}
async getRecordingMedia( async getRecordingMedia(
recordingPath: string, recordingId: string,
range?: { range?: { end: number; start: number }
end: number; ): Promise<{ fileSize: number | undefined; fileStream: Readable; start?: number; end?: number }> {
start: number; try {
const binaryRecordingKey = this.keyBuilder.buildBinaryRecordingKey(recordingId);
this.logger.debug(`Retrieving recording media for recording ${recordingId} from ${binaryRecordingKey}`);
const fileSize = await this.getRecordingFileSize(binaryRecordingKey, recordingId);
const validatedRange = this.validateAndAdjustRange(range, fileSize, recordingId);
const fileStream = await this.storageProvider.getObjectAsStream(binaryRecordingKey, validatedRange);
return {
fileSize,
fileStream,
start: validatedRange?.start,
end: validatedRange?.end
};
} catch (error) {
this.logger.error(`Error fetching recording media for recording ${recordingId}: ${error}`);
throw error;
} }
): Promise<Readable> {
return this.storageProvider.getRecordingMedia(recordingPath, range) as Promise<Readable>;
} }
/** // ==========================================
* Deletes multiple recording metadata files by their paths. // USER DOMAIN LOGIC
* // ==========================================
* @param metadataPaths - Array of file paths to the recording metadata files to be deleted
* @returns A Promise that resolves when all metadata files have been successfully deleted
* @throws May throw an error if any of the deletion operations fail
*/
async deleteRecordingMetadataByPaths(metadataPaths: string[]): Promise<void> {
return this.storageProvider.deleteRecordingMetadataByPaths(metadataPaths);
}
/**
* Deletes recording binary files from storage using the provided file paths.
*
* @param recordingPaths - Array of file paths pointing to the recording binary files to be deleted
* @returns A Promise that resolves when all specified recording files have been successfully deleted
* @throws May throw an error if any of the file deletion operations fail
*/
async deleteRecordingBinaryFilesByPaths(recordingPaths: string[]): Promise<void> {
return this.storageProvider.deleteRecordingBinaryFilesByPaths(recordingPaths);
}
/** /**
* Retrieves user data for a specific username. * Retrieves user data for a specific username.
@ -327,7 +485,10 @@ export class MeetStorageService<
* @returns A promise that resolves to the user data, or null if not found * @returns A promise that resolves to the user data, or null if not found
*/ */
async getUser(username: string): Promise<MUser | null> { async getUser(username: string): Promise<MUser | null> {
return this.storageProvider.getUser(username) as Promise<MUser | null>; const redisKey = RedisKeyName.USER + username;
const storageKey = this.keyBuilder.buildUserKey(username);
return await this.getFromCacheAndStorage<MUser>(redisKey, storageKey);
} }
/** /**
@ -337,15 +498,246 @@ export class MeetStorageService<
* @returns A promise that resolves to the saved user data * @returns A promise that resolves to the saved user data
*/ */
async saveUser(user: MUser): Promise<MUser> { async saveUser(user: MUser): Promise<MUser> {
this.logger.info(`Saving user data for ${user.username}`); const { username } = user;
return this.storageProvider.saveUser(user) as Promise<MUser>; const userRedisKey = RedisKeyName.USER + username;
const storageUserKey = this.keyBuilder.buildUserKey(username);
return await this.saveCacheAndStorage(userRedisKey, storageUserKey, user);
}
// ==========================================
// PRIVATE HELPER METHODS
// ==========================================
// ==========================================
// HYBRID CACHE METHODS (Redis + Storage)
// ==========================================
/**
* Saves data to both Redis cache and persistent storage with fallback handling.
*
* @param redisKey - The Redis key to store the data
* @param storageKey - The storage key/path for persistent storage
* @param data - The data to store
* @param redisTtl - Optional TTL for Redis cache (default: 1 hour)
* @returns Promise that resolves when data is saved to at least one location
*/
protected async saveCacheAndStorage<T>(redisKey: string, storageKey: string, data: T): Promise<T> {
const operations = [
// Save to Redis (fast cache)
this.redisService.set(redisKey, JSON.stringify(data)).catch((error) => {
this.logger.warn(`Redis save failed for key ${redisKey}: ${error}`);
return Promise.reject({ type: 'redis', error });
}),
// Save to persistent storage
this.storageProvider.putObject(storageKey, data).catch((error) => {
this.logger.warn(`Storage save failed for key ${storageKey}: ${error}`);
return Promise.reject({ type: 'storage', error });
})
];
try {
// Try to save to both locations
const results = await Promise.allSettled(operations);
const redisResult = results[0];
const storageResult = results[1];
// Check if at least one succeeded
const redisSuccess = redisResult.status === 'fulfilled';
const storageSuccess = storageResult.status === 'fulfilled';
if (!redisSuccess && !storageSuccess) {
// Both failed - this is critical
const redisError = (redisResult as PromiseRejectedResult).reason;
const storageError = (storageResult as PromiseRejectedResult).reason;
this.logger.error(`Save failed for both Redis and Storage:`, {
redisKey,
storageKey,
redisError: redisError.error,
storageError: storageError.error
});
throw new Error(`Failed to save data: Redis (${redisError.error}) and Storage (${storageError.error})`);
}
// Log partial failures
if (!redisSuccess) {
const redisError = (redisResult as PromiseRejectedResult).reason;
this.logger.warn(`Redis save failed but storage succeeded for key ${redisKey}:`, redisError.error);
}
if (!storageSuccess) {
const storageError = (storageResult as PromiseRejectedResult).reason;
this.logger.warn(`Storage save failed but Redis succeeded for key ${storageKey}:`, storageError.error);
}
// Success if at least one location worked
this.logger.debug(`Save completed: Redis=${redisSuccess}, Storage=${storageSuccess}`);
return data;
} catch (error) {
this.handleError(error, `Error saving keys: ${redisKey}, ${storageKey}`);
throw error;
}
}
/**
* Retrieves data from Redis cache first, falls back to storage if not found.
* Updates Redis cache if data is retrieved from storage.
*
* @param redisKey - The Redis key to check first
* @param storageKey - The storage key/path as fallback
* @returns Promise that resolves with the data or null if not found
*/
protected async getFromCacheAndStorage<T>(redisKey: string, storageKey: string): Promise<T | null> {
try {
// 1. Try Redis first (fast cache)
this.logger.debug(`Attempting to get data from Redis cache: ${redisKey}`);
const cachedData = await this.redisService.get(redisKey);
if (cachedData) {
this.logger.debug(`Cache HIT for key: ${redisKey}`);
try {
return JSON.parse(cachedData) as T;
} catch (parseError) {
this.logger.warn(`Failed to parse cached data for key ${redisKey}: ${parseError}`);
// Continue to storage fallback
}
} else {
this.logger.debug(`Cache MISS for key: ${redisKey}`);
}
// 2. Fallback to persistent storage
this.logger.debug(`Attempting to get data from storage: ${storageKey}`);
const storageData = await this.storageProvider.getObject<T>(storageKey);
if (!storageData) {
this.logger.debug(`Data not found in storage for key: ${storageKey}`);
return null;
}
// 3. Found in storage - update Redis cache for next time
this.logger.debug(`Storage HIT for key: ${storageKey}, updating cache`);
try {
await this.redisService.set(redisKey, JSON.stringify(storageData));
this.logger.debug(`Successfully updated cache for key: ${redisKey}`);
} catch (cacheUpdateError) {
// Cache update failure shouldn't affect the main operation
this.logger.warn(`Failed to update cache for key ${redisKey}: ${cacheUpdateError}`);
}
return storageData;
} catch (error) {
this.handleError(error, `Error in hybrid cache get for keys: ${redisKey}, ${storageKey}`);
throw error; // Re-throw unexpected errors
}
}
/**
* Deletes data from both Redis cache and persistent storage.
*
* @param redisKey - The Redis key to delete
* @param storageKey - The storage key to delete
* @returns Promise that resolves when deletion is attempted on both locations
*/
protected async deleteFromCacheAndStorage(redisKey: string, storageKey: string): Promise<void> {
return await this.deleteFromCacheAndStorageBatch([redisKey], [storageKey]);
}
/**
* Deletes data from both Redis cache and persistent storage in batch.
* More efficient than multiple individual delete operations.
*
* @param deletions - Array of objects containing redisKey and storageKey pairs
* @returns Promise that resolves when batch deletion is attempted on both locations
*/
protected async deleteFromCacheAndStorageBatch(redisKeys: string[], storageKeys: string[]): Promise<void> {
if (redisKeys.length === 0 && storageKeys.length === 0) {
this.logger.debug('No keys to delete in batch');
return;
}
this.logger.debug(`Batch deleting ${redisKeys.length} Redis keys and ${storageKeys.length} storage keys`);
const operations = [
// Batch delete from Redis (only if there are keys to delete)
redisKeys.length > 0
? this.redisService.delete(redisKeys).catch((error) => {
this.logger.warn(`Redis batch delete failed: ${error}`);
return Promise.reject({ type: 'redis', error, affectedKeys: redisKeys });
})
: Promise.resolve(0),
// Batch delete from storage (only if there are keys to delete)
storageKeys.length > 0
? this.storageProvider.deleteObjects(storageKeys).catch((error) => {
this.logger.warn(`Storage batch delete failed: ${error}`);
return Promise.reject({ type: 'storage', error, affectedKeys: storageKeys });
})
: Promise.resolve()
];
try {
const results = await Promise.allSettled(operations);
const redisResult = results[0];
const storageResult = results[1];
const redisSuccess = redisResult.status === 'fulfilled';
const storageSuccess = storageResult.status === 'fulfilled';
if (redisKeys.length > 0) {
if (redisSuccess) {
const deletedCount = (redisResult as PromiseFulfilledResult<number>).value;
this.logger.debug(`Redis batch delete succeeded: ${deletedCount} keys deleted`);
} else {
const redisError = (redisResult as PromiseRejectedResult).reason;
this.logger.warn(`Redis batch delete failed:`, redisError.error);
}
}
if (storageKeys.length > 0) {
if (storageSuccess) {
this.logger.debug(`Storage batch delete succeeded: ${storageKeys.length} keys deleted`);
} else {
const storageError = (storageResult as PromiseRejectedResult).reason;
this.logger.warn(`Storage batch delete failed:`, storageError.error);
}
}
this.logger.debug(`Batch delete completed: Redis=${redisSuccess}, Storage=${storageSuccess}`);
} catch (error) {
this.handleError(error, `Error in batch delete operation`);
throw error;
}
}
/**
* Invalidates Redis cache for a specific key.
* Useful when you know data has changed and want to force next read from storage.
*/
protected async invalidateCache(redisKey: string): Promise<void> {
try {
await this.redisService.delete(redisKey);
this.logger.debug(`Cache invalidated for key: ${redisKey}`);
} catch (error) {
this.logger.warn(`Failed to invalidate cache for key ${redisKey}: ${error}`);
// Don't throw - cache invalidation failure shouldn't break main flow
}
} }
/** /**
* Returns the default global preferences. * Returns the default global preferences.
* @returns {GPrefs} * @returns {GPrefs}
*/ */
protected async getDefaultPreferences(): Promise<GPrefs> { protected buildDefaultPreferences(): GPrefs {
return { return {
projectId: MEET_NAME_ID, projectId: MEET_NAME_ID,
webhooksPreferences: { webhooksPreferences: {
@ -363,16 +755,63 @@ export class MeetStorageService<
} as GPrefs; } as GPrefs;
} }
/** protected async getRecordingFileSize(key: string, recordingId: string): Promise<number> {
* Handles errors and logs them. const { contentLength: fileSize } = await this.storageProvider.getObjectHeaders(key);
* @param {any} error
* @param {string} message if (!fileSize) {
*/ this.logger.warn(`Recording media not found for recording ${recordingId}`);
protected handleError(error: OpenViduMeetError | unknown, message: string) { throw errorRecordingNotFound(recordingId);
}
return fileSize;
}
protected validateAndAdjustRange(
range: { end: number; start: number } | undefined,
fileSize: number,
recordingId: string
): { start: number; end: number } | undefined {
if (!range) return undefined;
const { start, end: originalEnd } = range;
// Validate input values
if (isNaN(start) || isNaN(originalEnd) || start < 0) {
this.logger.warn(`Invalid range values for recording ${recordingId}: start=${start}, end=${originalEnd}`);
this.logger.warn(`Returning full stream for recording ${recordingId}`);
return undefined;
}
// Check if start is beyond file size
if (start >= fileSize) {
this.logger.error(
`Invalid range: start=${start} exceeds fileSize=${fileSize} for recording ${recordingId}`
);
throw errorRecordingRangeNotSatisfiable(recordingId, fileSize);
}
// Adjust end to not exceed file bounds
const adjustedEnd = Math.min(originalEnd, fileSize - 1);
// Validate final range
if (start > adjustedEnd) {
this.logger.warn(
`Invalid range after adjustment: start=${start}, end=${adjustedEnd} for recording ${recordingId}`
);
return undefined;
}
this.logger.debug(
`Valid range for recording ${recordingId}: start=${start}, end=${adjustedEnd}, fileSize=${fileSize}`
);
return { start, end: adjustedEnd };
}
protected handleError(error: unknown, context: string): void {
if (error instanceof OpenViduMeetError) { if (error instanceof OpenViduMeetError) {
this.logger.error(`${message}: ${error.message}`); this.logger.error(`${context}: ${error.message}`);
} else { } else {
this.logger.error(`${message}: Unexpected error`); this.logger.error(`${context}: ${error}`);
} }
} }
} }

View File

@ -1,4 +1,4 @@
import { afterEach, beforeAll, describe, expect, it } from '@jest/globals'; import { beforeAll, beforeEach, describe, expect, it } from '@jest/globals';
import { expectValidationError } from '../../../helpers/assertion-helpers.js'; import { expectValidationError } from '../../../helpers/assertion-helpers.js';
import { import {
getSecurityPreferences, getSecurityPreferences,
@ -6,6 +6,8 @@ import {
updateSecurityPreferences updateSecurityPreferences
} from '../../../helpers/request-helpers.js'; } from '../../../helpers/request-helpers.js';
import { AuthMode, AuthType } from '../../../../src/typings/ce/index.js'; import { AuthMode, AuthType } from '../../../../src/typings/ce/index.js';
import { container } from '../../../../src/config/dependency-injector.config.js';
import { MeetStorageService } from '../../../../src/services/index.js';
const defaultPreferences = { const defaultPreferences = {
authentication: { authentication: {
@ -16,17 +18,18 @@ const defaultPreferences = {
} }
}; };
const restoreDefaultSecurityPreferences = async () => { const restoreDefaultGlobalPreferences = async () => {
await updateSecurityPreferences(defaultPreferences); const defaultPref = await container.get(MeetStorageService)['buildDefaultPreferences']();
await container.get(MeetStorageService).saveGlobalPreferences(defaultPref);
}; };
describe('Security Preferences API Tests', () => { describe('Security Preferences API Tests', () => {
beforeAll(() => { beforeAll(async () => {
startTestServer(); startTestServer();
}); });
afterEach(async () => { beforeEach(async () => {
await restoreDefaultSecurityPreferences(); await restoreDefaultGlobalPreferences();
}); });
describe('Update security preferences', () => { describe('Update security preferences', () => {

View File

@ -13,8 +13,9 @@ import {
import { setupMultiRecordingsTestContext } from '../../../helpers/test-scenarios'; import { setupMultiRecordingsTestContext } from '../../../helpers/test-scenarios';
describe('Recording API Tests', () => { describe('Recording API Tests', () => {
beforeAll(() => { beforeAll(async () => {
startTestServer(); startTestServer();
await Promise.all([deleteAllRooms(), deleteAllRecordings()]);
}); });
afterAll(async () => { afterAll(async () => {

View File

@ -15,8 +15,13 @@ import {
import { setupMultiRecordingsTestContext } from '../../../helpers/test-scenarios'; import { setupMultiRecordingsTestContext } from '../../../helpers/test-scenarios';
describe('Recording API Tests', () => { describe('Recording API Tests', () => {
beforeAll(() => { beforeAll(async () => {
startTestServer(); startTestServer();
await Promise.all([deleteAllRooms(), deleteAllRecordings()]);
});
afterAll(async () => {
await Promise.all([deleteAllRooms(), deleteAllRecordings()]);
}); });
describe('Delete Recording Tests', () => { describe('Delete Recording Tests', () => {

View File

@ -17,6 +17,8 @@ describe('Recording API Tests', () => {
beforeAll(async () => { beforeAll(async () => {
startTestServer(); startTestServer();
await Promise.all([deleteAllRooms(), deleteAllRecordings()]);
const testContext = await setupMultiRecordingsTestContext(1, 1, 1, '3s'); const testContext = await setupMultiRecordingsTestContext(1, 1, 1, '3s');
const roomData = testContext.getRoomByIndex(0)!; const roomData = testContext.getRoomByIndex(0)!;

View File

@ -18,7 +18,8 @@ describe('Recording API Tests', () => {
beforeAll(async () => { beforeAll(async () => {
startTestServer(); startTestServer();
await deleteAllRecordings(); await Promise.all([deleteAllRooms(), deleteAllRecordings()]);
// Create a room and join a participant // Create a room and join a participant
context = await setupMultiRecordingsTestContext(1, 1, 1); context = await setupMultiRecordingsTestContext(1, 1, 1);
({ room, moderatorCookie, recordingId = '' } = context.getRoomByIndex(0)!); ({ room, moderatorCookie, recordingId = '' } = context.getRoomByIndex(0)!);

View File

@ -1,4 +1,4 @@
import { afterAll, afterEach, beforeAll, describe, expect, it } from '@jest/globals'; import { afterAll, beforeAll, beforeEach, describe, expect, it } from '@jest/globals';
import { MeetRecordingInfo, MeetRecordingStatus, MeetRoom } from '../../../../src/typings/ce/index.js'; import { MeetRecordingInfo, MeetRecordingStatus, MeetRoom } from '../../../../src/typings/ce/index.js';
import { import {
expectSuccessListRecordingResponse, expectSuccessListRecordingResponse,
@ -28,16 +28,14 @@ describe('Recordings API Tests', () => {
beforeAll(async () => { beforeAll(async () => {
startTestServer(); startTestServer();
await deleteAllRecordings();
}); });
describe('List Recordings Tests', () => { describe('List Recordings Tests', () => {
afterEach(async () => { beforeEach(async () => {
await deleteAllRecordings(); await Promise.all([deleteAllRooms(), deleteAllRecordings()]);
const response = await getAllRecordings(); const response = await getAllRecordings();
expect(response.status).toBe(200); expect(response.status).toBe(200);
expectSuccessListRecordingResponse(response, 0, false, false); expectSuccessListRecordingResponse(response, 0, false, false);
}); });
afterAll(async () => { afterAll(async () => {

View File

@ -33,6 +33,8 @@ describe('Recording API Race Conditions Tests', () => {
beforeAll(async () => { beforeAll(async () => {
startTestServer(); startTestServer();
await Promise.all([deleteAllRooms(), deleteAllRecordings()]);
recordingService = container.get(RecordingService); recordingService = container.get(RecordingService);
}); });

View File

@ -24,15 +24,15 @@ describe('Recording API Tests', () => {
let context: TestContext | null = null; let context: TestContext | null = null;
let room: MeetRoom, moderatorCookie: string; let room: MeetRoom, moderatorCookie: string;
beforeAll(() => { beforeAll(async () => {
startTestServer(); startTestServer();
await Promise.all([deleteAllRooms(), deleteAllRecordings()]);
}); });
afterAll(async () => { afterAll(async () => {
await stopAllRecordings(moderatorCookie); await stopAllRecordings(moderatorCookie);
await disconnectFakeParticipants(); await disconnectFakeParticipants();
await deleteAllRooms(); await Promise.all([deleteAllRooms(), deleteAllRecordings()]);
await deleteAllRecordings();
}); });
describe('Start Recording Tests', () => { describe('Start Recording Tests', () => {

View File

@ -18,6 +18,8 @@ describe('Recording API Tests', () => {
beforeAll(async () => { beforeAll(async () => {
startTestServer(); startTestServer();
await Promise.all([deleteAllRooms(), deleteAllRecordings()]);
}); });
afterAll(async () => { afterAll(async () => {

View File

@ -25,14 +25,14 @@ describe('Room API Tests', () => {
}); });
describe('Room Creation Tests', () => { describe('Room Creation Tests', () => {
it('Should create a room without autoDeletionDate (default behavior)', async () => { it('Should create a room without autoDeletionDate (default behavior)', async () => {
const room = await createRoom({ const room = await createRoom({
roomIdPrefix: ' Test Room ' roomIdPrefix: ' Test Room '
}); });
expectValidRoom(room, 'TestRoom'); expectValidRoom(room, 'TestRoom');
}); });
it('Should create a room with a valid autoDeletionDate', async () => { it('Should create a room with a valid autoDeletionDate', async () => {
const room = await createRoom({ const room = await createRoom({
autoDeletionDate: validAutoDeletionDate, autoDeletionDate: validAutoDeletionDate,
roomIdPrefix: ' .,-------}{¡$#<+My Room *123 ' roomIdPrefix: ' .,-------}{¡$#<+My Room *123 '
@ -41,7 +41,7 @@ describe('Room API Tests', () => {
expectValidRoom(room, 'MyRoom123', validAutoDeletionDate); expectValidRoom(room, 'MyRoom123', validAutoDeletionDate);
}); });
it('Should create a room when sending full valid payload', async () => { it('Should create a room when sending full valid payload', async () => {
const payload = { const payload = {
roomIdPrefix: ' =Example Room&/ ', roomIdPrefix: ' =Example Room&/ ',
autoDeletionDate: validAutoDeletionDate, autoDeletionDate: validAutoDeletionDate,

View File

@ -9,7 +9,7 @@ export interface AuthenticationPreferences {
export const enum AuthMode { export const enum AuthMode {
NONE = 'none', // No authentication required NONE = 'none', // No authentication required
MODERATORS_ONLY = 'moderators_only', // Only moderators need authentication MODERATORS_ONLY = 'moderators_only', // Only moderators need authentication
ALL_USERS = 'all_users' // All users need authentication ALL_USERS = 'all_users', // All users need authentication
} }
/** /**
@ -23,7 +23,7 @@ export interface AuthMethod {
* Enum for authentication types. * Enum for authentication types.
*/ */
export const enum AuthType { export const enum AuthType {
SINGLE_USER = 'single-user' SINGLE_USER = 'single-user',
// MULTI_USER = 'multi-user', // MULTI_USER = 'multi-user',
// OAUTH_ONLY = 'oauth-only' // OAUTH_ONLY = 'oauth-only'
} }
@ -54,7 +54,8 @@ export interface SingleUserAuth extends AuthMethod {
/** /**
* Union type for allowed authentication methods. * Union type for allowed authentication methods.
*/ */
export type ValidAuthMethod = SingleUserAuth /* | MultiUserAuth | OAuthOnlyAuth */; export type ValidAuthMethod =
SingleUserAuth /* | MultiUserAuth | OAuthOnlyAuth */;
/** /**
* Configuration for OAuth authentication. * Configuration for OAuth authentication.

View File

@ -34,5 +34,5 @@ export interface ParticipantPermissions {
*/ */
export const enum ParticipantRole { export const enum ParticipantRole {
MODERATOR = 'moderator', MODERATOR = 'moderator',
PUBLISHER = 'publisher' PUBLISHER = 'publisher',
} }

View File

@ -19,7 +19,7 @@ export const enum MeetRecordingAccess {
ADMIN = 'admin', // Only admins can access the recording ADMIN = 'admin', // Only admins can access the recording
ADMIN_MODERATOR = 'admin-moderator', // Admins and moderators can access ADMIN_MODERATOR = 'admin-moderator', // Admins and moderators can access
ADMIN_MODERATOR_PUBLISHER = 'admin-moderator-publisher', // Admins, moderators and publishers can access ADMIN_MODERATOR_PUBLISHER = 'admin-moderator-publisher', // Admins, moderators and publishers can access
PUBLIC = 'public' // Everyone can access PUBLIC = 'public', // Everyone can access
} }
export interface MeetChatPreferences { export interface MeetChatPreferences {

View File

@ -7,7 +7,7 @@ export interface User {
export const enum UserRole { export const enum UserRole {
ADMIN = 'admin', ADMIN = 'admin',
USER = 'user', USER = 'user',
APP = 'app' APP = 'app',
} }
export type UserDTO = Omit<User, 'passwordHash'>; export type UserDTO = Omit<User, 'passwordHash'>;