From e69f1dfb4b9e5f3d1e667a92d2734f8f86faae49 Mon Sep 17 00:00:00 2001 From: Carlos Santos <4a.santos@gmail.com> Date: Thu, 20 Mar 2025 19:51:18 +0100 Subject: [PATCH] backend: Add recording request validation middleware and refactor recording metadata handling --- backend/openapi/openvidu-meet-api.yaml | 132 +++++--- .../src/controllers/recording.controller.ts | 15 +- backend/src/helpers/recording.helper.ts | 43 ++- backend/src/middlewares/index.ts | 1 + .../recording-validator.middleware.ts | 80 +++++ .../src/services/livekit-webhook.service.ts | 11 +- backend/src/services/recording.service.ts | 311 +++++++++++------- backend/src/services/s3.service.ts | 109 +++++- 8 files changed, 505 insertions(+), 197 deletions(-) create mode 100644 backend/src/middlewares/request-validators/recording-validator.middleware.ts diff --git a/backend/openapi/openvidu-meet-api.yaml b/backend/openapi/openvidu-meet-api.yaml index 7ada585..23b1778 100644 --- a/backend/openapi/openvidu-meet-api.yaml +++ b/backend/openapi/openvidu-meet-api.yaml @@ -156,15 +156,45 @@ paths: For example: "roomName,preferences". Only allowed fields will be returned. schema: type: string + - name: page + in: query + required: false + description: The page number for pagination (default is 1). + schema: + type: integer + default: 1 + - name: limit + in: query + required: false + description: The number of rooms per page (default is 10). + schema: + type: integer + default: 10 responses: '200': description: Successfully retrieved the list of OpenVidu Meet rooms content: application/json: schema: - type: array - items: - $ref: '#/components/schemas/OpenViduMeetRoom' + type: object + properties: + rooms: + type: array + items: + $ref: '#/components/schemas/OpenViduMeetRoom' + pagination: + type: object + properties: + totalItems: + type: integer + description: Total number of rooms. + totalPages: + type: integer + description: Total number of pages. + currentPage: + type: integer + description: Current page number. + '401': description: Unauthorized — The API key is missing or invalid content: @@ -398,7 +428,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/OpenViduMeetRecording' + $ref: '#/components/schemas/MeetRecording' '401': description: Unauthorized — The API key is missing or invalid content: @@ -469,23 +499,44 @@ paths: in: query required: false description: | - The status of the recordings to retrieve. Possible values are "STARTING", "STARTED", "STOPPING", "STOPPED", "FAILED" and "READY". + Filter recordings by their status. + + Possible values: + - `STARTING` + - `ACTIVE` + - `ENDING` + - `COMPLETE` + - `FAILED` + - `ABORTED` + - `LIMITED_REACHED` + + You can provide multiple statuses as a comma-separated list (e.g., `status=ACTIVE,FAILED`). + If not specified, recordings with any status will be returned. + + > ⚠️ **Note:** Using this filter with multiple values or partial matches may impact performance for large datasets. schema: type: string - - name: page + - name: roomId in: query required: false - description: The page number for pagination (default is 1). + description: | + The unique identifier of the room for which you want to retrieve recordings. + If not provided, recordings from all rooms will be returned. schema: - type: integer - default: 1 - - name: limit + type: string + - name: maxItems in: query required: false description: The number of recordings per page (default is 10). Maximum is 100. schema: type: integer default: 10 + - name: nextPageToken + in: query + required: false + description: The token to retrieve the next page of recordings. + schema: + type: string responses: '200': description: Successfully retrieved the list of OpenVidu Meet recordings @@ -497,19 +548,16 @@ paths: recordings: type: array items: - $ref: '#/components/schemas/OpenViduMeetRecording' + $ref: '#/components/schemas/MeetRecording' pagination: type: object properties: - totalItems: - type: integer - description: Total number of recordings. - totalPages: - type: integer - description: Total number of pages. - currentPage: - type: integer - description: Current page number. + isTruncated: + type: boolean + description: Indicates if there are more recordings to retrieve. + nextPageToken: + type: string + description: The token to retrieve the next page of recordings. '401': description: Unauthorized — The API key is missing or invalid @@ -642,7 +690,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/OpenViduMeetRecording' + $ref: '#/components/schemas/MeetRecording' '401': description: Unauthorized — The API key is missing or invalid @@ -1067,24 +1115,19 @@ components: description: > The URL for the viewer to join the room. The viewer has read-only permissions to watch the room and participants. - OpenViduMeetRecording: + MeetRecording: type: object properties: - id: + recordingId: type: string - example: 'recording_123456' + example: 'room-123--EG_XYZ--XX445' description: > The unique identifier of the recording. - roomName: + roomId: type: string - example: 'OpenVidu-123456' + example: 'room-123' description: > - The name of the room where the recording was made. - # roomId: - # type: string - # example: '123456' - # description: > - # The unique identifier of the room where the recording was made. + The ID of the room where the recording was made. outputMode: type: string example: 'COMPOSED' @@ -1092,20 +1135,20 @@ components: The output mode of the recording. Possible values are "COMPOSED". status: type: string - example: 'READY' + example: 'ACTIVE' description: > - The status of the recording. Possible values are "STARTING", "STARTED", "STOPPING", "STOPPED", "FAILED" and "READY". + The status of the recording. Possible values are "STARTING", "ACTIVE", "ENDING", "COMPLETE", "FAILED", "ABORTED" and "LIMITED_REACHED". filename: type: string - example: 'recording_123456.mp4' + example: 'room-123--XX445.mp4' description: > The name of the recording file. - startedAt: + startDate: type: number example: 1620000000000 description: > The date when the recording was started in milliseconds since the Unix epoch. - endedAt: + endDate: type: number example: 1620000000000 description: > @@ -1120,6 +1163,21 @@ components: example: 1024 description: > The size of the recording file in bytes. + errorCode: + type: number + example: 100 + description: > + The error code of the recording. + error: + type: string + example: 'error' + description: > + The error message of the recording. + details: + type: string + example: 'Stopped using API' + description: > + Additional details about the recording. Error: type: object required: diff --git a/backend/src/controllers/recording.controller.ts b/backend/src/controllers/recording.controller.ts index ba36864..e088484 100644 --- a/backend/src/controllers/recording.controller.ts +++ b/backend/src/controllers/recording.controller.ts @@ -32,13 +32,10 @@ export const getRecordings = async (req: Request, res: Response) => { try { logger.info('Getting all recordings'); - const { status, page, limit } = req.query; + const queryParams = req.query; - // const continuationToken = req.query.continuationToken as string; - const response = await recordingService.getAllRecordings(); - return res - .status(200) - .json({ recordings: response.recordingInfo, continuationToken: response.continuationToken }); + const response = await recordingService.getAllRecordings(queryParams); + return res.status(200).json(response); } catch (error) { if (error instanceof OpenViduMeetError) { logger.error(`Error getting all recordings: ${error.message}`); @@ -58,8 +55,7 @@ export const bulkDeleteRecordings = async (req: Request, res: Response) => { const recordingService = container.get(RecordingService); // TODO: Check role to determine if the request is from an admin or a participant - const role = req.body.payload.metadata.role; - await recordingService.bulkDeleteRecordings(recordingIds, role); + await recordingService.bulkDeleteRecordings(recordingIds); return res.status(204).json(); } catch (error) { @@ -121,8 +117,7 @@ export const deleteRecording = async (req: Request, res: Response) => { const recordingService = container.get(RecordingService); // TODO: Check role to determine if the request is from an admin or a participant - const role = req.body.payload.metadata.role; - const recordingInfo = await recordingService.deleteRecording(recordingId, role); + const recordingInfo = await recordingService.deleteRecording(recordingId); return res.status(204).json(recordingInfo); } catch (error) { diff --git a/backend/src/helpers/recording.helper.ts b/backend/src/helpers/recording.helper.ts index 6974fed..540740e 100644 --- a/backend/src/helpers/recording.helper.ts +++ b/backend/src/helpers/recording.helper.ts @@ -11,9 +11,10 @@ export class RecordingHelper { const startDateMs = RecordingHelper.extractStartDate(egressInfo); const endDateMs = RecordingHelper.extractEndDate(egressInfo); const filename = RecordingHelper.extractFilename(egressInfo); + const uid = RecordingHelper.extractUidFromFilename(filename); const { egressId, roomName, errorCode, error, details } = egressInfo; return { - recordingId: egressId, + recordingId: `${roomName}--${egressId}--${uid}`, roomId: roomName, outputMode, status, @@ -76,16 +77,19 @@ export class RecordingHelper { return MeetRecordingOutputMode.COMPOSED; } - static extractFilename(recordingInfo: MeetRecordingInfo): string | undefined; + /** + * Extracts the filename/path for storing the recording. + * For EgressInfo, returns the last segment of the fileResults. + * For MeetRecordingInfo, returns a combination of roomId and filename. + */ + static extractFilename(recordingInfo: MeetRecordingInfo): string; - static extractFilename(egressInfo: EgressInfo): string | undefined; - - static extractFilename(info: MeetRecordingInfo | EgressInfo): string | undefined { - if (!info) return undefined; + static extractFilename(egressInfo: EgressInfo): string; + static extractFilename(info: MeetRecordingInfo | EgressInfo): string { if ('request' in info) { // EgressInfo - return info.fileResults?.[0]?.filename.split('/').pop(); + return info.fileResults[0]!.filename.split('/').pop()!; } else { // MeetRecordingInfo const { filename, roomId } = info; @@ -94,6 +98,31 @@ export class RecordingHelper { } } + /** + * Extracts the UID from the given filename. + * + * @param filename room-123--{uid}.mp4 + * @returns + */ + static extractUidFromFilename(filename: string): string { + const uidWithExtension = filename.split('--')[1]; + return uidWithExtension.split('.')[0]; + } + + /** + * Extracts the room name, egressId, and UID from the given recordingId. + * @param recordingId ${roomId}--${egressId}--${uid} + */ + static extractInfoFromRecordingId(recordingId: string): { roomId: string; egressId: string; uid: string } { + const [roomId, egressId, uid] = recordingId.split('--'); + + if (!roomId || !egressId || !uid) { + throw new Error(`Invalid recordingId format: ${recordingId}`); + } + + return { roomId, egressId, uid }; + } + /** * Extracts the duration from the given egress information. * If the duration is not available, it returns 0. diff --git a/backend/src/middlewares/index.ts b/backend/src/middlewares/index.ts index a5e3008..ddf3c22 100644 --- a/backend/src/middlewares/index.ts +++ b/backend/src/middlewares/index.ts @@ -4,3 +4,4 @@ export * from './content-type.middleware.js'; export * from './openapi.middleware.js'; export * from './request-validators/participant-validator.middleware.js'; export * from './request-validators/room-validator.middleware.js'; +export * from './request-validators/recording-validator.middleware.js'; diff --git a/backend/src/middlewares/request-validators/recording-validator.middleware.ts b/backend/src/middlewares/request-validators/recording-validator.middleware.ts new file mode 100644 index 0000000..6a9e7d6 --- /dev/null +++ b/backend/src/middlewares/request-validators/recording-validator.middleware.ts @@ -0,0 +1,80 @@ +import { Request, Response, NextFunction } from 'express'; +import { z } from 'zod'; + +const RecordingPostRequestSchema = z.object({ + roomId: z + .string() + .min(1, { message: 'roomId is required and cannot be empty' }) + .transform((val) => val.trim().replace(/\s+/g, '-')) +}); + +const getRecordingsSchema = z.object({ + maxItems: z.coerce + .number() + .int() + .optional() + .transform((val = 10) => (val > 100 ? 100 : val)) + .default(10), + status: z.string().optional(), + roomId: z.string().optional(), + nextPageToken: z.string().optional() +}); + +/** + * Middleware to validate the recording post request. + * + * This middleware uses the `RecordingPostRequestSchema` to validate the request body. + * If the validation fails, it rejects the request with an error response. + * If the validation succeeds, it passes control to the next middleware or route handler. + * + */ +export const withValidRecordingPostRequest = (req: Request, res: Response, next: NextFunction) => { + const { success, error } = RecordingPostRequestSchema.safeParse(req.body); + + if (!success) { + return rejectRequest(res, error); + } + + next(); +}; + +export const withValidGetRecordingsRequest = (req: Request, res: Response, next: NextFunction) => { + const { success, error, data } = getRecordingsSchema.safeParse(req.query); + + if (!success) { + return rejectRequest(res, error); + } + + req.query = { + ...data, + maxItems: data.maxItems?.toString() + }; + next(); +}; + +export const withValidRecordingBulkDeleteRequest = (req: Request, res: Response, next: NextFunction) => { + const { success, error } = z + .array(z.string().min(1, { message: 'recordingIds must be a non-empty string' })) + .safeParse(req.body); + + if (!success) { + return rejectRequest(res, error); + } + + next(); +}; + +const rejectRequest = (res: Response, error: z.ZodError) => { + const errors = error.errors.map((error) => ({ + field: error.path.join('.'), + message: error.message + })); + + console.log(errors); + + return res.status(422).json({ + error: 'Unprocessable Entity', + message: 'Invalid request body', + details: errors + }); +}; diff --git a/backend/src/services/livekit-webhook.service.ts b/backend/src/services/livekit-webhook.service.ts index b218918..6cfa4c1 100644 --- a/backend/src/services/livekit-webhook.service.ts +++ b/backend/src/services/livekit-webhook.service.ts @@ -150,8 +150,8 @@ export class LivekitWebhookService { this.logger.debug(`Processing recording ${webhookAction} webhook.`); const recordingInfo: MeetRecordingInfo = RecordingHelper.toRecordingInfo(egressInfo); - const metadataPath = this.generateMetadataPath(recordingInfo); const { roomId, recordingId, status } = recordingInfo; + const metadataPath = this.generateMetadataPath(recordingId); this.logger.debug(`Recording '${recordingId}' for room '${roomId}' is in status '${status}'`); @@ -189,10 +189,9 @@ export class LivekitWebhookService { } } - protected generateMetadataPath(recordingInfo: MeetRecordingInfo): string { - const { roomId, recordingId } = recordingInfo; - // Remove file extension from filename - const recordingFilename = recordingInfo.filename?.split('.')[0]; - return `${MEET_S3_RECORDINGS_PREFIX}/.metadata/${roomId}/${recordingFilename}-${recordingId}.json`; + protected generateMetadataPath(recordingId: string): string { + const { roomId, egressId, uid } = RecordingHelper.extractInfoFromRecordingId(recordingId); + + return `${MEET_S3_RECORDINGS_PREFIX}/.metadata/${roomId}/${egressId}/${uid}.json`; } } diff --git a/backend/src/services/recording.service.ts b/backend/src/services/recording.service.ts index fb826a3..f6b2e45 100644 --- a/backend/src/services/recording.service.ts +++ b/backend/src/services/recording.service.ts @@ -21,6 +21,13 @@ import { RedisLockName } from '../models/index.js'; import ms from 'ms'; import { OpenViduComponentsAdapterHelper } from '../helpers/ov-components-adapter.helper.js'; +type GetAllRecordingsParams = { + maxItems?: number; + nextPageToken?: string; + roomId?: string; + status?: string; +}; + @injectable() export class RecordingService { protected readonly RECORDING_ACTIVE_LOCK_TTL = ms('6h'); @@ -32,39 +39,41 @@ export class RecordingService { @inject(LoggerService) protected logger: LoggerService ) {} - async startRecording(roomName: string): Promise { + async startRecording(roomId: string): Promise { let acquiredLock: RedisLock | null = null; try { // Attempt to acquire lock. // Note: using a high TTL to prevent expiration during a long recording. - acquiredLock = await this.acquireRoomRecordingActiveLock(roomName); + acquiredLock = await this.acquireRoomRecordingActiveLock(roomId); - if (!acquiredLock) throw errorRecordingAlreadyStarted(roomName); + if (!acquiredLock) throw errorRecordingAlreadyStarted(roomId); - const room = await this.roomService.getOpenViduRoom(roomName); + const room = await this.roomService.getOpenViduRoom(roomId); - if (!room) throw errorRoomNotFound(roomName); + if (!room) throw errorRoomNotFound(roomId); const options = this.generateCompositeOptionsFromRequest(); - const output = this.generateFileOutputFromRequest(roomName); - const egressInfo = await this.livekitService.startRoomComposite(roomName, output, options); + const output = this.generateFileOutputFromRequest(roomId); + const egressInfo = await this.livekitService.startRoomComposite(roomId, output, options); // Return recording info without releasing the lock here, // as it will be released in handleEgressEnded on successful completion. return RecordingHelper.toRecordingInfo(egressInfo); } catch (error) { - this.logger.error(`Error starting recording in room ${roomName}: ${error}`); + this.logger.error(`Error starting recording in room ${roomId}: ${error}`); - if (acquiredLock) await this.releaseRoomRecordingActiveLock(roomName); + if (acquiredLock) await this.releaseRoomRecordingActiveLock(roomId); throw error; } } - async stopRecording(egressId: string): Promise { + async stopRecording(recordingId: string): Promise { try { - const egressArray = await this.livekitService.getActiveEgress(undefined, egressId); + const { roomId, egressId } = RecordingHelper.extractInfoFromRecordingId(recordingId); + + const egressArray = await this.livekitService.getActiveEgress(roomId, egressId); if (egressArray.length === 0) { throw errorRecordingNotFound(egressId); @@ -74,121 +83,144 @@ export class RecordingService { return RecordingHelper.toRecordingInfo(egressInfo); } catch (error) { - this.logger.error(`Error stopping recording ${egressId}: ${error}`); + this.logger.error(`Error stopping recording ${recordingId}: ${error}`); throw error; } } - // TODO: Implement deleteRecording method - async deleteRecording(egressId: string, role: string): Promise { - try { - const { metadataFilePath, recordingInfo } = await this.getMeetRecordingInfoFromMetadata(egressId); - - if ( - recordingInfo.status === MeetRecordingStatus.STARTING || - recordingInfo.status === MeetRecordingStatus.ACTIVE || - recordingInfo.status === MeetRecordingStatus.ENDING - ) { - throw errorRecordingNotStopped(egressId); - } - - const recordingPath = RecordingHelper.extractFilename(recordingInfo); - - if (!recordingPath) throw internalError(`Error extracting path from recording ${egressId}`); - - this.logger.info(`Deleting recording from S3 ${recordingPath}`); - - await Promise.all([ - this.s3Service.deleteObject(metadataFilePath), - this.s3Service.deleteObject(recordingPath) - ]); - - return recordingInfo; - } catch (error) { - this.logger.error(`Error deleting recording ${egressId}: ${error}`); - throw error; - } - } - - // TODO: Implement bulkDeleteRecordings method - async bulkDeleteRecordings(egressIds: string[], role: string): Promise { - const promises = egressIds.map((egressId) => this.deleteRecording(egressId, role)); - return Promise.all(promises); - } - /** - * Retrieves the list of all recordings. - * @returns A promise that resolves to an array of RecordingInfo objects. + * Deletes a recording from the S3 bucket based on the provided egress ID. + * + * The recording is deleted only if it is not in progress state (STARTING, ACTIVE, ENDING). + * @param recordingId - The egress ID of the recording. */ - //TODO: Implement getAllRecordings method - async getAllRecordings(): Promise<{ recordingInfo: MeetRecordingInfo[]; continuationToken?: string }> { + async deleteRecording(recordingId: string): Promise { try { - const allEgress = await this.s3Service.listObjects('.metadata', '.json'); - const promises: Promise[] = []; + // Get the recording metada and recording info from the S3 bucket + const { filesToDelete, recordingInfo } = await this.getDeletableRecordingData(recordingId); - allEgress.Contents?.forEach((item) => { - if (item?.Key?.includes('.json')) { + this.logger.verbose( + `Deleting recording from S3. Files: ${filesToDelete.join(', ')} for recordingId ${recordingId}` + ); + await this.s3Service.deleteObjects(filesToDelete); + this.logger.info(`Deletion successful for recording ${recordingId}`); + + return recordingInfo; + } catch (error) { + this.logger.error(`Error deleting recording ${recordingId}: ${error}`); + throw error; + } + } + + /** + * Deletes multiple recordings in bulk from S3. + * For each provided egressId, the metadata and recording file are deleted (only if the status is stopped). + * + * @param egressIds Array of recording identifiers. + * @returns An array with the MeetRecordingInfo of the successfully deleted recordings. + */ + async bulkDeleteRecordings(egressIds: string[]): Promise { + const keysToDelete: string[] = []; + const deletedRecordings: MeetRecordingInfo[] = []; + + for (const egressId of egressIds) { + try { + const { filesToDelete, recordingInfo } = await this.getDeletableRecordingData(egressId); + keysToDelete.push(...filesToDelete); + deletedRecordings.push(recordingInfo); + this.logger.verbose(`BulkDelete: Prepared recording ${egressId} for deletion.`); + } catch (error) { + this.logger.error(`BulkDelete: Error processing recording ${egressId}: ${error}`); + } + } + + if (keysToDelete.length > 0) { + try { + await this.s3Service.deleteObjects(keysToDelete); + this.logger.info(`BulkDelete: Successfully deleted ${keysToDelete.length} objects from S3.`); + } catch (error) { + this.logger.error(`BulkDelete: Error performing bulk deletion: ${error}`); + throw error; + } + } else { + this.logger.warn(`BulkDelete: No eligible recordings found for deletion.`); + } + + return deletedRecordings; + } + + /** + * Retrieves the recording information for a given recording ID. + * @param recordingId - The unique identifier of the recording. + * @returns A promise that resolves to a MeetRecordingInfo object. + */ + async getRecording(recordingId: string): Promise { + const { recordingInfo } = await this.getMeetRecordingInfoFromMetadata(recordingId); + + return recordingInfo; + } + + /** + * Retrieves a paginated list of all recordings stored in the S3 bucket. + * + * @param maxItems - The maximum number of items to retrieve in a single request. + * @param nextPageToken - (Optional) A token to retrieve the next page of results. + * @returns A promise that resolves to an object containing: + * - `recordings`: An array of `MeetRecordingInfo` objects representing the recordings. + * - `isTruncated`: A boolean indicating whether there are more items to retrieve. + * - `nextPageToken`: (Optional) A token to retrieve the next page of results, if available. + * @throws Will throw an error if there is an issue retrieving the recordings. + */ + async getAllRecordings({ maxItems, nextPageToken, roomId, status }: GetAllRecordingsParams): Promise<{ + recordings: MeetRecordingInfo[]; + isTruncated: boolean; + nextPageToken?: string; + }> { + try { + const roomPrefix = roomId ? `/${roomId}` : ''; + const { Contents, IsTruncated, NextContinuationToken } = await this.s3Service.listObjectsPaginated( + `${MEET_S3_RECORDINGS_PREFIX}/.metadata${roomPrefix}`, + maxItems, + nextPageToken + ); + + if (!Contents) { + this.logger.verbose('No recordings found. Returning an empty array.'); + return { recordings: [], isTruncated: false }; + } + + const promises: Promise[] = []; + Contents.forEach((item) => { + if (item?.Key && item.Key.endsWith('.json')) { promises.push(this.s3Service.getObjectAsJson(item.Key) as Promise); } }); - return { recordingInfo: await Promise.all(promises), continuationToken: undefined }; - } catch (error) { - this.logger.error(`Error getting recordings: ${error}`); - throw error; - } - } + let recordings: MeetRecordingInfo[] = await Promise.all(promises); - /** - * Retrieves all recordings for a given room. - * - * @param roomName - The name of the room. - * @param roomId - The ID of the room. - * @returns A promise that resolves to an array of MeetRecordingInfo objects. - * @throws If there is an error retrieving the recordings. - */ - //TODO: Implement getAllRecordingsByRoom method - async getAllRecordingsByRoom(roomName: string, roomId: string): Promise { - try { - // Get all recordings that match the room name and room ID from the S3 bucket - const roomNameSanitized = this.sanitizeRegExp(roomName); - const roomIdSanitized = this.sanitizeRegExp(roomId); - // Match the room name and room ID in any order - const regexPattern = `${roomNameSanitized}.*${roomIdSanitized}|${roomIdSanitized}.*${roomNameSanitized}\\.json`; - const metadatagObject = await this.s3Service.listObjects('.metadata', regexPattern); + if (status) { + // Filter recordings by status + const statusArray = status + .split(',') + .map((s) => s.trim()) + .filter(Boolean) + .map((s) => new RegExp(this.sanitizeRegExp(s))); - if (!metadatagObject.Contents || metadatagObject.Contents.length === 0) { - this.logger.verbose(`No recordings found for room ${roomName}. Returning an empty array.`); - return []; + + recordings = recordings.filter((recording) => + statusArray.some((regex) => regex.test(recording.status)) + ); } - const promises: Promise[] = []; - metadatagObject.Contents?.forEach((item) => { - promises.push(this.s3Service.getObjectAsJson(item.Key!) as Promise); - }); + this.logger.info(`Retrieved ${recordings.length} recordings.`); - return Promise.all(promises); + return { recordings, isTruncated: !!IsTruncated, nextPageToken: NextContinuationToken }; } catch (error) { this.logger.error(`Error getting recordings: ${error}`); throw error; } } - //TODO: Implement getRecording method - async getRecording(egressId: string): Promise { - const egressIdSanitized = this.sanitizeRegExp(egressId); - const regexPattern = `.*${egressIdSanitized}.*\\.json`; - const metadataObject = await this.s3Service.listObjects('.metadata', regexPattern); - - if (!metadataObject.Contents || metadataObject.Contents.length === 0) { - throw errorRecordingNotFound(egressId); - } - - const recording = (await this.s3Service.getObjectAsJson(metadataObject.Contents[0].Key!)) as MeetRecordingInfo; - return recording; - // return RecordingHelper.toRecordingInfo(recording); - } - //TODO: Implement getRecordingAsStream method async getRecordingAsStream( recordingId: string, @@ -196,7 +228,7 @@ export class RecordingService { ): Promise<{ fileSize: number | undefined; fileStream: Readable; start?: number; end?: number }> { const RECORDING_FILE_PORTION_SIZE = 5 * 1024 * 1024; // 5MB const recordingInfo: MeetRecordingInfo = await this.getRecording(recordingId); - const recordingPath = RecordingHelper.extractFilename(recordingInfo); + const recordingPath = `${MEET_S3_RECORDINGS_PREFIX}/${RecordingHelper.extractFilename(recordingInfo)}`; if (!recordingPath) throw new Error(`Error extracting path from recording ${recordingId}`); @@ -265,25 +297,49 @@ export class RecordingService { return this.roomService.sendSignal(roomName, payload, options); } - private async getMeetRecordingInfoFromMetadata( - egressId: string + /** + * Retrieves the data required to delete a recording, including the file paths + * to be deleted and the recording's metadata information. + * + * @param recordingId - The unique identifier of the recording egress. + */ + protected async getDeletableRecordingData( + recordingId: string + ): Promise<{ filesToDelete: string[]; recordingInfo: MeetRecordingInfo }> { + const { metadataFilePath, recordingInfo } = await this.getMeetRecordingInfoFromMetadata(recordingId); + + if ( + recordingInfo.status === MeetRecordingStatus.STARTING || + recordingInfo.status === MeetRecordingStatus.ACTIVE || + recordingInfo.status === MeetRecordingStatus.ENDING + ) { + throw errorRecordingNotStopped(recordingId); + } + + const recordingPath = RecordingHelper.extractFilename(recordingInfo); + + if (!recordingPath) { + throw internalError(`Error extracting path from recording ${recordingId}`); + } + + return { filesToDelete: [metadataFilePath, recordingPath], recordingInfo }; + } + + protected async getMeetRecordingInfoFromMetadata( + recordingId: string ): Promise<{ metadataFilePath: string; recordingInfo: MeetRecordingInfo }> { - // Get the recording object from the S3 bucket - const metadataObject = await this.s3Service.listObjects('.metadata', `.*${egressId}.*.json`); - - const content = metadataObject.Contents?.[0]; - - if (!content) { - throw errorRecordingNotFound(egressId); - } - - const metadataPath = content.Key; - - if (!metadataPath) { - throw errorRecordingNotFound(egressId); - } + const { roomId, egressId, uid } = RecordingHelper.extractInfoFromRecordingId(recordingId); + const metadataPath = `${MEET_S3_RECORDINGS_PREFIX}/.metadata/${roomId}/${egressId}/${uid}.json`; + this.logger.debug(`Retrieving metadata for recording ${recordingId} from ${metadataPath}`); const recordingInfo = (await this.s3Service.getObjectAsJson(metadataPath)) as MeetRecordingInfo; + + if (!recordingInfo) { + throw errorRecordingNotFound(recordingId); + } + + this.logger.verbose(`Retrieved metadata for recording ${recordingId} from ${metadataPath}`); + return { recordingInfo, metadataFilePath: metadataPath }; } @@ -302,12 +358,12 @@ export class RecordingService { * @param fileName - The name of the file (default is 'recording'). * @returns The generated file output object. */ - private generateFileOutputFromRequest(roomName: string): EncodedFileOutput { + private generateFileOutputFromRequest(roomId: string): EncodedFileOutput { // Added unique identifier to the file path for avoiding overwriting - const recordingName = `${roomName}-${uid(10)}`; + const recordingName = `${roomId}--${uid(10)}`; // Generate the file path with the openviud-meet subbucket and the recording prefix - const filepath = `${MEET_S3_SUBBUCKET}/${MEET_S3_RECORDINGS_PREFIX}/${roomName}/${recordingName}`; + const filepath = `${MEET_S3_SUBBUCKET}/${MEET_S3_RECORDINGS_PREFIX}/${roomId}/${recordingName}`; return new EncodedFileOutput({ fileType: EncodedFileType.DEFAULT_FILETYPE, @@ -316,6 +372,15 @@ export class RecordingService { }); } + /** + * Escapes special characters in a string to make it safe for use in a regular expression. + * This method ensures that characters with special meaning in regular expressions + * (e.g., `.`, `*`, `+`, `?`, `^`, `$`, `{`, `}`, `(`, `)`, `|`, `[`, `]`, `\`) are + * properly escaped. + * + * @param str - The input string to sanitize for use in a regular expression. + * @returns A new string with special characters escaped. + */ private sanitizeRegExp(str: string) { return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); } diff --git a/backend/src/services/s3.service.ts b/backend/src/services/s3.service.ts index 9f2fe28..04dcb32 100644 --- a/backend/src/services/s3.service.ts +++ b/backend/src/services/s3.service.ts @@ -2,6 +2,8 @@ import { _Object, DeleteObjectCommand, DeleteObjectCommandOutput, + DeleteObjectsCommand, + DeleteObjectsCommandOutput, GetObjectCommand, GetObjectCommandOutput, HeadObjectCommand, @@ -129,6 +131,82 @@ export class S3Service { } } + /** + * Bulk deletes objects from S3. + * @param keys Array of object keys to delete. Estos keys deben incluir el subbucket (se obtiene con getFullKey). + * @param bucket S3 bucket name (default: MEET_S3_BUCKET) + */ + async deleteObjects(keys: string[], bucket: string = MEET_S3_BUCKET): Promise { + try { + this.logger.info(`S3 delete: attempting to delete ${keys.length} objects from bucket ${bucket}`); + const command = new DeleteObjectsCommand({ + Bucket: bucket, + Delete: { + Objects: keys.map((key) => ({ Key: this.getFullKey(key) })), + Quiet: false + } + }); + console.log( + 'command', + keys.map((key) => ({ Key: key })) + ); + const result = await this.run(command); + this.logger.info(`S3 bulk delete: successfully deleted objects from bucket ${bucket}`); + return result; + } catch (error: any) { + this.logger.error(`S3 bulk delete: error deleting objects in bucket ${bucket}: ${error}`); + throw internalError(error); + } + } + + /** + * List objects with pagination. + * + * @param additionalPrefix Additional prefix relative to the subbucket. + * Por ejemplo, para listar metadata se pasa ".metadata/". + * @param searchPattern Optional regex pattern to filter keys. + * @param bucket Optional bucket name. + * @param maxKeys Maximum number of objects to return. + * @param continuationToken Token to retrieve the next page. + * + * @returns The ListObjectsV2CommandOutput with Keys and NextContinuationToken. + */ + async listObjectsPaginated( + additionalPrefix = '', + maxKeys = 50, + continuationToken?: string, + searchPattern = '', + bucket: string = MEET_S3_BUCKET + ): Promise { + // Se construye el prefijo completo combinando el subbucket y el additionalPrefix. + // Ejemplo: si s3Subbucket es "recordings" y additionalPrefix es ".metadata/", + // se listarán los objetos con key que empiece por "recordings/.metadata/". + const basePrefix = this.getFullKey(additionalPrefix); + this.logger.verbose(`S3 listObjectsPaginated: listing objects with prefix "${basePrefix}"`); + + const command = new ListObjectsV2Command({ + Bucket: bucket, + Prefix: basePrefix, + MaxKeys: maxKeys, + ContinuationToken: continuationToken + }); + + try { + const response: ListObjectsV2CommandOutput = await this.s3.send(command); + + // Si se ha proporcionado searchPattern, se filtran los resultados. + if (searchPattern) { + const regex = new RegExp(searchPattern); + response.Contents = (response.Contents || []).filter((item) => item.Key && regex.test(item.Key)); + } + + return response; + } catch (error: any) { + this.logger.error(`S3 listObjectsPaginated: error listing objects with prefix "${basePrefix}": ${error}`); + throw internalError(error); + } + } + /** * Lists all objects in an S3 bucket with optional subbucket and search pattern filtering. * @@ -207,19 +285,17 @@ export class S3Service { } async getObjectAsJson(name: string, bucket: string = MEET_S3_BUCKET): Promise { - const fullKey = this.getFullKey(name); - try { - const obj = await this.getObject(fullKey, bucket); + const obj = await this.getObject(name, bucket); const str = await obj.Body?.transformToString(); const parsed = JSON.parse(str as string); - this.logger.info( - `S3 getObjectAsJson: successfully retrieved and parsed object ${fullKey} from bucket ${bucket}` + this.logger.verbose( + `S3 getObjectAsJson: successfully retrieved and parsed object ${name} from bucket ${bucket}` ); return parsed; } catch (error: any) { if (error.name === 'NoSuchKey') { - this.logger.warn(`S3 getObjectAsJson: object '${fullKey}' does not exist in bucket ${bucket}`); + this.logger.warn(`S3 getObjectAsJson: object '${name}' does not exist in bucket ${bucket}`); return undefined; } @@ -227,7 +303,7 @@ export class S3Service { throw errorS3NotAvailable(error); } - this.logger.error(`S3 getObjectAsJson: error retrieving object ${fullKey} from bucket ${bucket}: ${error}`); + this.logger.error(`S3 getObjectAsJson: error retrieving object ${name} from bucket ${bucket}: ${error}`); throw internalError(error); } } @@ -237,10 +313,8 @@ export class S3Service { bucket: string = MEET_S3_BUCKET, range?: { start: number; end: number } ): Promise { - const fullKey = this.getFullKey(name); - try { - const obj = await this.getObject(fullKey, bucket, range); + const obj = await this.getObject(name, bucket, range); if (obj.Body) { this.logger.info( @@ -253,7 +327,7 @@ export class S3Service { } } catch (error: any) { this.logger.error( - `S3 getObjectAsStream: error retrieving stream for object ${fullKey} from bucket ${bucket}: ${error}` + `S3 getObjectAsStream: error retrieving stream for object ${name} from bucket ${bucket}: ${error}` ); if (error.code === 'ECONNREFUSED') { @@ -288,11 +362,18 @@ export class S3Service { } /** - * Prepares a full key path by prefixing the object's name with the subbucket. - * All operations are performed under MEET_S3_BUCKET/MEET_S3_SUBBUCKET. + * Constructs the full key for an S3 object by ensuring it includes the specified sub-bucket prefix. + * If the provided name already starts with the prefix, it is returned as-is. + * Otherwise, the prefix is prepended to the name. */ protected getFullKey(name: string): string { - return `${MEET_S3_SUBBUCKET}/${name}`; + const prefix = `${MEET_S3_SUBBUCKET}`; + + if (name.startsWith(prefix)) { + return name; + } + + return `${prefix}/${name}`; } protected async getObject(