From baec69c3dbd0bc780c2610cf8dbc3cf0b64ef2ce Mon Sep 17 00:00:00 2001 From: Carlos Santos <4a.santos@gmail.com> Date: Wed, 19 Mar 2025 19:31:26 +0100 Subject: [PATCH] backend: Remove deprecated room and signal models; enhance room service with new status signaling --- .../controllers/livekit-webhook.controller.ts | 2 + .../src/controllers/recording.controller.ts | 141 +++++++----- backend/src/helpers/index.ts | 4 +- .../helpers/ov-components-adapter.helper.ts | 96 ++++++++ backend/src/helpers/recording.helper.ts | 89 +++----- backend/src/models/index.ts | 2 - backend/src/models/room.model.ts | 6 - backend/src/models/signal.model.ts | 10 - .../src/services/livekit-webhook.service.ts | 154 ++++++------- backend/src/services/livekit.service.ts | 44 +++- backend/src/services/mutex.service.ts | 36 ++- .../src/services/openvidu-webhook.service.ts | 74 +++--- backend/src/services/recording.service.ts | 212 +++++++++++------- backend/src/services/room.service.ts | 21 ++ 14 files changed, 548 insertions(+), 343 deletions(-) create mode 100644 backend/src/helpers/ov-components-adapter.helper.ts delete mode 100644 backend/src/models/room.model.ts delete mode 100644 backend/src/models/signal.model.ts diff --git a/backend/src/controllers/livekit-webhook.controller.ts b/backend/src/controllers/livekit-webhook.controller.ts index 96c5c9b..daaba55 100644 --- a/backend/src/controllers/livekit-webhook.controller.ts +++ b/backend/src/controllers/livekit-webhook.controller.ts @@ -27,6 +27,8 @@ export const lkWebhookHandler = async (req: Request, res: Response) => { switch (eventType) { case 'egress_started': + await lkWebhookService.handleEgressStarted(egressInfo!); + break; case 'egress_updated': await lkWebhookService.handleEgressUpdated(egressInfo!); break; diff --git a/backend/src/controllers/recording.controller.ts b/backend/src/controllers/recording.controller.ts index da64ee6..ba36864 100644 --- a/backend/src/controllers/recording.controller.ts +++ b/backend/src/controllers/recording.controller.ts @@ -7,17 +7,13 @@ import { container } from '../config/dependency-injector.config.js'; export const startRecording = async (req: Request, res: Response) => { const logger = container.get(LoggerService); - const roomName = req.body.roomName; - - if (!roomName) { - return res.status(400).json({ name: 'Recording Error', message: 'Room name is required for this operation' }); - } + const { roomId } = req.body; try { - logger.info(`Starting recording in ${roomName}`); + logger.info(`Starting recording in ${roomId}`); const recordingService = container.get(RecordingService); - const recordingInfo = await recordingService.startRecording(roomName); + const recordingInfo = await recordingService.startRecording(roomId); return res.status(200).json(recordingInfo); } catch (error) { if (error instanceof OpenViduMeetError) { @@ -29,16 +25,77 @@ export const startRecording = async (req: Request, res: Response) => { } }; +export const getRecordings = async (req: Request, res: Response) => { + const logger = container.get(LoggerService); + const recordingService = container.get(RecordingService); + + try { + logger.info('Getting all recordings'); + + const { status, page, limit } = req.query; + + // const continuationToken = req.query.continuationToken as string; + const response = await recordingService.getAllRecordings(); + return res + .status(200) + .json({ recordings: response.recordingInfo, continuationToken: response.continuationToken }); + } catch (error) { + if (error instanceof OpenViduMeetError) { + logger.error(`Error getting all recordings: ${error.message}`); + return res.status(error.statusCode).json({ name: error.name, message: error.message }); + } + + return res.status(500).json({ name: 'Recording Error', message: 'Unexpected error getting recordings' }); + } +}; + +export const bulkDeleteRecordings = async (req: Request, res: Response) => { + const logger = container.get(LoggerService); + + try { + const recordingIds = req.body.recordingIds; + logger.info(`Deleting recordings: ${recordingIds}`); + const recordingService = container.get(RecordingService); + + // TODO: Check role to determine if the request is from an admin or a participant + const role = req.body.payload.metadata.role; + await recordingService.bulkDeleteRecordings(recordingIds, role); + + return res.status(204).json(); + } catch (error) { + if (error instanceof OpenViduMeetError) { + logger.error(`Error deleting recordings: ${error.message}`); + return res.status(error.statusCode).json({ name: error.name, message: error.message }); + } + + return res.status(500).json({ name: 'Recording Error', message: 'Unexpected error deleting recordings' }); + } +}; + +export const getRecording = async (req: Request, res: Response) => { + const logger = container.get(LoggerService); + + try { + const recordingId = req.params.recordingId; + logger.info(`Getting recording ${recordingId}`); + const recordingService = container.get(RecordingService); + + const recordingInfo = await recordingService.getRecording(recordingId); + return res.status(200).json(recordingInfo); + } catch (error) { + if (error instanceof OpenViduMeetError) { + logger.error(`Error getting recording: ${error.message}`); + return res.status(error.statusCode).json({ name: error.name, message: error.message }); + } + + return res.status(500).json({ name: 'Recording Error', message: 'Unexpected error getting recording' }); + } +}; + export const stopRecording = async (req: Request, res: Response) => { const logger = container.get(LoggerService); const recordingId = req.params.recordingId; - if (!recordingId) { - return res - .status(400) - .json({ name: 'Recording Error', message: 'Recording ID is required for this operation' }); - } - try { logger.info(`Stopping recording ${recordingId}`); const recordingService = container.get(RecordingService); @@ -55,44 +112,36 @@ export const stopRecording = async (req: Request, res: Response) => { } }; -/** - * Endpoint only available for the admin user - * !WARNING: This will be removed in future versions - */ -export const getAllRecordings = async (req: Request, res: Response) => { +export const deleteRecording = async (req: Request, res: Response) => { const logger = container.get(LoggerService); + const recordingId = req.params.recordingId; try { - logger.info('Getting all recordings'); + logger.info(`Deleting recording ${recordingId}`); const recordingService = container.get(RecordingService); - // const continuationToken = req.query.continuationToken as string; - const response = await recordingService.getAllRecordings(); - return res - .status(200) - .json({ recordings: response.recordingInfo, continuationToken: response.continuationToken }); + // TODO: Check role to determine if the request is from an admin or a participant + const role = req.body.payload.metadata.role; + const recordingInfo = await recordingService.deleteRecording(recordingId, role); + + return res.status(204).json(recordingInfo); } catch (error) { if (error instanceof OpenViduMeetError) { - logger.error(`Error getting all recordings: ${error.message}`); + logger.error(`Error deleting recording: ${error.message}`); return res.status(error.statusCode).json({ name: error.name, message: error.message }); } - return res.status(500).json({ name: 'Recording Error', message: 'Unexpected error getting recordings' }); + return res.status(500).json({ name: 'Recording Error', message: 'Unexpected error deleting recording' }); } }; +// Internal Recording methods export const streamRecording = async (req: Request, res: Response) => { const logger = container.get(LoggerService); const recordingId = req.params.recordingId; const range = req.headers.range; - if (!recordingId) { - return res - .status(400) - .json({ name: 'Recording Error', message: 'Recording ID is required for this operation' }); - } - try { logger.info(`Streaming recording ${recordingId}`); const recordingService = container.get(RecordingService); @@ -132,31 +181,3 @@ export const streamRecording = async (req: Request, res: Response) => { return res.status(500).json({ name: 'Recording Error', message: 'Unexpected error streaming recording' }); } }; - -export const deleteRecording = async (req: Request, res: Response) => { - const logger = container.get(LoggerService); - const recordingId = req.params.recordingId; - - if (!recordingId) { - return res - .status(400) - .json({ name: 'Recording Error', message: 'Recording ID is required for this operation' }); - } - - try { - logger.info(`Deleting recording ${recordingId}`); - const recordingService = container.get(RecordingService); - - const isRequestedByAdmin = req.url.includes('admin'); - const recordingInfo = await recordingService.deleteRecording(recordingId, isRequestedByAdmin); - - return res.status(204).json(recordingInfo); - } catch (error) { - if (error instanceof OpenViduMeetError) { - logger.error(`Error deleting recording: ${error.message}`); - return res.status(error.statusCode).json({ name: error.name, message: error.message }); - } - - return res.status(500).json({ name: 'Recording Error', message: 'Unexpected error deleting recording' }); - } -}; diff --git a/backend/src/helpers/index.ts b/backend/src/helpers/index.ts index d64950a..8b91c0a 100644 --- a/backend/src/helpers/index.ts +++ b/backend/src/helpers/index.ts @@ -1 +1,3 @@ -export * from './recording.helper.js'; \ No newline at end of file +export * from './recording.helper.js'; +export * from './ov-components-adapter.helper.js'; +export * from './room.helper.js'; \ No newline at end of file diff --git a/backend/src/helpers/ov-components-adapter.helper.ts b/backend/src/helpers/ov-components-adapter.helper.ts new file mode 100644 index 0000000..fbf361b --- /dev/null +++ b/backend/src/helpers/ov-components-adapter.helper.ts @@ -0,0 +1,96 @@ +import { MeetRecordingInfo, MeetRecordingStatus } from '@typings-ce'; +import { SendDataOptions } from 'livekit-server-sdk'; + +const enum OpenViduComponentsDataTopic { + CHAT = 'chat', + RECORDING_STARTING = 'recordingStarting', + RECORDING_STARTED = 'recordingStarted', + RECORDING_STOPPING = 'recordingStopping', + RECORDING_STOPPED = 'recordingStopped', + RECORDING_DELETED = 'recordingDeleted', + RECORDING_FAILED = 'recordingFailed', + ROOM_STATUS = 'roomStatus' +} + +export class OpenViduComponentsAdapterHelper { + static generateRecordingSignal(recordingInfo: MeetRecordingInfo) { + const options: SendDataOptions = { + destinationSids: [], + topic: OpenViduComponentsAdapterHelper.generateDataTopic(recordingInfo) + }; + const payload = OpenViduComponentsAdapterHelper.parseRecordingInfoToOpenViduComponents(recordingInfo); + + return { payload, options }; + } + + static generateRoomStatusSignal(isRecordingStarted: boolean, participantSid?: string) { + const payload = { + isRecordingStarted + }; + + const options = { + topic: OpenViduComponentsDataTopic.ROOM_STATUS, + destinationSids: participantSid ? [participantSid] : [] + }; + return { + payload, + options + }; + } + + private static parseRecordingInfoToOpenViduComponents(info: MeetRecordingInfo) { + return { + id: info.recordingId, + roomName: info.details ?? '', + roomId: info.roomId, + outputMode: info.outputMode, + status: this.mapRecordingStatus(info.status), + filename: info.filename, + startedAt: info.startDate, + endedAt: info.endDate, + duration: info.duration, + size: info.size, + location: undefined + }; + } + + private static generateDataTopic(info: MeetRecordingInfo) { + switch (info.status) { + case MeetRecordingStatus.STARTING: + return OpenViduComponentsDataTopic.RECORDING_STARTING; + case MeetRecordingStatus.ACTIVE: + return OpenViduComponentsDataTopic.RECORDING_STARTED; + case MeetRecordingStatus.ENDING: + return OpenViduComponentsDataTopic.RECORDING_STOPPING; + case MeetRecordingStatus.COMPLETE: + return OpenViduComponentsDataTopic.RECORDING_STOPPED; + case MeetRecordingStatus.FAILED: + case MeetRecordingStatus.ABORTED: + return OpenViduComponentsDataTopic.RECORDING_FAILED; + case MeetRecordingStatus.LIMITED_REACHED: + return OpenViduComponentsDataTopic.RECORDING_STOPPED; + default: + return OpenViduComponentsDataTopic.RECORDING_FAILED; + } + } + + private static mapRecordingStatus(status: MeetRecordingStatus) { + switch (status) { + case MeetRecordingStatus.STARTING: + return 'STARTING'; + case MeetRecordingStatus.ACTIVE: + return 'STARTED'; + case MeetRecordingStatus.ENDING: + return 'STOPPING'; + case MeetRecordingStatus.COMPLETE: + return 'READY'; + case MeetRecordingStatus.FAILED: + case MeetRecordingStatus.ABORTED: + return 'FAILED'; + case MeetRecordingStatus.LIMITED_REACHED: + return 'READY'; + default: + return 'FAILED'; + } + } +} diff --git a/backend/src/helpers/recording.helper.ts b/backend/src/helpers/recording.helper.ts index 9f5d7c6..6974fed 100644 --- a/backend/src/helpers/recording.helper.ts +++ b/backend/src/helpers/recording.helper.ts @@ -1,28 +1,30 @@ import { EgressInfo } from 'livekit-server-sdk'; -import { RecordingInfo, RecordingOutputMode, RecordingStatus } from '@typings-ce'; +import { MeetRecordingInfo, MeetRecordingOutputMode, MeetRecordingStatus } from '@typings-ce'; import { EgressStatus } from '@livekit/protocol'; -import { DataTopic } from '../models/signal.model.js'; export class RecordingHelper { - static toRecordingInfo(egressInfo: EgressInfo): RecordingInfo { + static toRecordingInfo(egressInfo: EgressInfo): MeetRecordingInfo { const status = RecordingHelper.extractOpenViduStatus(egressInfo.status); const size = RecordingHelper.extractSize(egressInfo); const outputMode = RecordingHelper.extractOutputMode(egressInfo); const duration = RecordingHelper.extractDuration(egressInfo); - const startedAt = RecordingHelper.extractCreatedAt(egressInfo); - const endTimeInMilliseconds = RecordingHelper.extractEndedAt(egressInfo); + const startDateMs = RecordingHelper.extractStartDate(egressInfo); + const endDateMs = RecordingHelper.extractEndDate(egressInfo); const filename = RecordingHelper.extractFilename(egressInfo); + const { egressId, roomName, errorCode, error, details } = egressInfo; return { - id: egressInfo.egressId, - roomName: egressInfo.roomName, - roomId: egressInfo.roomId, + recordingId: egressId, + roomId: roomName, outputMode, status, filename, - creationDate: startedAt, - endDate: endTimeInMilliseconds, + startDate: startDateMs, + endDate: endDateMs, duration, - size + size, + errorCode, + error, + details: details }; } @@ -36,40 +38,24 @@ export class RecordingHelper { return fileResults.length > 0 && streamResults.length === 0; } - static extractOpenViduStatus(status: EgressStatus | undefined): RecordingStatus { + static extractOpenViduStatus(status: EgressStatus | undefined): MeetRecordingStatus { switch (status) { case EgressStatus.EGRESS_STARTING: - return RecordingStatus.STARTING; + return MeetRecordingStatus.STARTING; case EgressStatus.EGRESS_ACTIVE: - return RecordingStatus.STARTED; + return MeetRecordingStatus.ACTIVE; case EgressStatus.EGRESS_ENDING: - return RecordingStatus.STOPPED; + return MeetRecordingStatus.ENDING; case EgressStatus.EGRESS_COMPLETE: - return RecordingStatus.READY; + return MeetRecordingStatus.COMPLETE; case EgressStatus.EGRESS_FAILED: + return MeetRecordingStatus.FAILED; case EgressStatus.EGRESS_ABORTED: + return MeetRecordingStatus.ABORTED; case EgressStatus.EGRESS_LIMIT_REACHED: - return RecordingStatus.FAILED; + return MeetRecordingStatus.LIMITED_REACHED; default: - return RecordingStatus.FAILED; - } - } - - static getDataTopicFromStatus(egressInfo: EgressInfo): DataTopic { - const status = RecordingHelper.extractOpenViduStatus(egressInfo.status); - - switch (status) { - case RecordingStatus.STARTING: - return DataTopic.RECORDING_STARTING; - case RecordingStatus.STARTED: - return DataTopic.RECORDING_STARTED; - case RecordingStatus.STOPPED: - case RecordingStatus.READY: - return DataTopic.RECORDING_STOPPED; - case RecordingStatus.FAILED: - return DataTopic.RECORDING_FAILED; - default: - return DataTopic.RECORDING_FAILED; + return MeetRecordingStatus.FAILED; } } @@ -81,33 +67,30 @@ export class RecordingHelper { * @param egressInfo - The egress information containing the roomComposite flag. * @returns The extracted OpenVidu output mode. */ - static extractOutputMode(egressInfo: EgressInfo): RecordingOutputMode { - if (egressInfo.request.case === 'roomComposite') { - return RecordingOutputMode.COMPOSED; - } else { - return RecordingOutputMode.INDIVIDUAL; - } + static extractOutputMode(egressInfo: EgressInfo): MeetRecordingOutputMode { + // if (egressInfo.request.case === 'roomComposite') { + // return MeetRecordingOutputMode.COMPOSED; + // } else { + // return MeetRecordingOutputMode.INDIVIDUAL; + // } + return MeetRecordingOutputMode.COMPOSED; } - static extractFilename(recordingInfo: RecordingInfo): string | undefined; + static extractFilename(recordingInfo: MeetRecordingInfo): string | undefined; static extractFilename(egressInfo: EgressInfo): string | undefined; - static extractFilename(info: RecordingInfo | EgressInfo): string | undefined { + static extractFilename(info: MeetRecordingInfo | EgressInfo): string | undefined { if (!info) return undefined; if ('request' in info) { // EgressInfo return info.fileResults?.[0]?.filename.split('/').pop(); } else { - // RecordingInfo - const { roomName, filename, roomId } = info; + // MeetRecordingInfo + const { filename, roomId } = info; - if (!filename) { - return undefined; - } - - return roomName ? `${roomName}-${roomId}/${filename}` : filename; + return `${roomId}/${filename}`; } } @@ -128,7 +111,7 @@ export class RecordingHelper { * @param egressInfo - The EgressInfo object containing the endedAt value. * @returns The endedAt value converted to milliseconds. */ - static extractEndedAt(egressInfo: EgressInfo): number { + static extractEndDate(egressInfo: EgressInfo): number { return this.toMilliseconds(Number(egressInfo.endedAt ?? 0)); } @@ -138,7 +121,7 @@ export class RecordingHelper { * @param egressInfo The EgressInfo object from which to extract the creation timestamp. * @returns The creation timestamp in milliseconds. */ - static extractCreatedAt(egressInfo: EgressInfo): number { + static extractStartDate(egressInfo: EgressInfo): number { const { startedAt, updatedAt } = egressInfo; const createdAt = startedAt && Number(startedAt) !== 0 ? startedAt : (updatedAt ?? 0); return this.toMilliseconds(Number(createdAt)); diff --git a/backend/src/models/index.ts b/backend/src/models/index.ts index 5bee462..02116f0 100644 --- a/backend/src/models/index.ts +++ b/backend/src/models/index.ts @@ -1,4 +1,2 @@ -export * from './room.model.js'; export * from './error.model.js'; -export * from './signal.model.js'; export * from './redis.model.js'; diff --git a/backend/src/models/room.model.ts b/backend/src/models/room.model.ts deleted file mode 100644 index 7cce003..0000000 --- a/backend/src/models/room.model.ts +++ /dev/null @@ -1,6 +0,0 @@ -import { RecordingInfo } from '@typings-ce'; - -export interface RoomStatusData { - isRecordingStarted: boolean; - recordingList: RecordingInfo[]; -} diff --git a/backend/src/models/signal.model.ts b/backend/src/models/signal.model.ts deleted file mode 100644 index 7f83d96..0000000 --- a/backend/src/models/signal.model.ts +++ /dev/null @@ -1,10 +0,0 @@ -export enum DataTopic { - CHAT = 'chat', - RECORDING_STARTING = 'recordingStarting', - RECORDING_STARTED = 'recordingStarted', - RECORDING_STOPPING = 'recordingStopping', - RECORDING_STOPPED = 'recordingStopped', - RECORDING_DELETED = 'recordingDeleted', - RECORDING_FAILED = 'recordingFailed', - ROOM_STATUS = 'roomStatus' -} diff --git a/backend/src/services/livekit-webhook.service.ts b/backend/src/services/livekit-webhook.service.ts index d581e65..b218918 100644 --- a/backend/src/services/livekit-webhook.service.ts +++ b/backend/src/services/livekit-webhook.service.ts @@ -1,28 +1,27 @@ import { inject, injectable } from '../config/dependency-injector.config.js'; -import { EgressInfo, ParticipantInfo, Room, SendDataOptions, WebhookEvent, WebhookReceiver } from 'livekit-server-sdk'; +import { EgressInfo, ParticipantInfo, Room, WebhookEvent, WebhookReceiver } from 'livekit-server-sdk'; import { RecordingHelper } from '../helpers/recording.helper.js'; -import { DataTopic } from '../models/signal.model.js'; import { LiveKitService } from './livekit.service.js'; -import { RecordingInfo, RecordingStatus } from '@typings-ce'; -import { LIVEKIT_API_KEY, LIVEKIT_API_SECRET, MEET_NAME_ID } from '../environment.js'; +import { MeetRecordingInfo } from '@typings-ce'; +import { LIVEKIT_API_KEY, LIVEKIT_API_SECRET, MEET_NAME_ID, MEET_S3_RECORDINGS_PREFIX } from '../environment.js'; import { LoggerService } from './logger.service.js'; import { RoomService } from './room.service.js'; import { S3Service } from './s3.service.js'; -import { RoomStatusData } from '../models/room.model.js'; import { RecordingService } from './recording.service.js'; import { OpenViduWebhookService } from './openvidu-webhook.service.js'; +import { MutexService } from './mutex.service.js'; @injectable() export class LivekitWebhookService { - private webhookReceiver: WebhookReceiver; - + protected webhookReceiver: WebhookReceiver; constructor( @inject(S3Service) protected s3Service: S3Service, @inject(RecordingService) protected recordingService: RecordingService, @inject(LiveKitService) protected livekitService: LiveKitService, @inject(RoomService) protected roomService: RoomService, - @inject(LoggerService) protected logger: LoggerService, - @inject(OpenViduWebhookService) protected openViduWebhookService: OpenViduWebhookService + @inject(OpenViduWebhookService) protected openViduWebhookService: OpenViduWebhookService, + @inject(MutexService) protected mutexService: MutexService, + @inject(LoggerService) protected logger: LoggerService ) { this.webhookReceiver = new WebhookReceiver(LIVEKIT_API_KEY, LIVEKIT_API_SECRET); } @@ -80,35 +79,12 @@ export class LivekitWebhookService { } } + async handleEgressStarted(egressInfo: EgressInfo) { + await this.processRecordingEgress(egressInfo, 'started'); + } + async handleEgressUpdated(egressInfo: EgressInfo) { - try { - const isRecording: boolean = RecordingHelper.isRecordingEgress(egressInfo); - - if (!isRecording) return; - - const { roomName } = egressInfo; - - let recordingInfo: RecordingInfo | undefined = undefined; - - this.logger.info(`Recording egress '${egressInfo.egressId}' updated: ${egressInfo.status}`); - const topic: DataTopic = RecordingHelper.getDataTopicFromStatus(egressInfo); - recordingInfo = RecordingHelper.toRecordingInfo(egressInfo); - - // Add recording metadata - const metadataPath = this.generateMetadataPath(recordingInfo); - const promises = [ - this.s3Service.saveObject(metadataPath, recordingInfo), - this.roomService.sendSignal(roomName, recordingInfo, { topic }) - ]; - - if(recordingInfo.status === RecordingStatus.STARTED) { - promises.push(this.openViduWebhookService.sendRecordingStartedWebhook(recordingInfo)); - } - - await Promise.all(promises); - } catch (error) { - this.logger.warn(`Error sending data on egress updated: ${error}`); - } + await this.processRecordingEgress(egressInfo, 'updated'); } /** @@ -117,27 +93,7 @@ export class LivekitWebhookService { * @param egressInfo - Information about the ended recording egress. */ async handleEgressEnded(egressInfo: EgressInfo) { - try { - const isRecording: boolean = RecordingHelper.isRecordingEgress(egressInfo); - - if (!isRecording) return; - - const { roomName } = egressInfo; - let payload: RecordingInfo | undefined = undefined; - - const topic: DataTopic = DataTopic.RECORDING_STOPPED; - payload = RecordingHelper.toRecordingInfo(egressInfo); - - // Update recording metadata - const metadataPath = this.generateMetadataPath(payload); - await Promise.all([ - this.s3Service.saveObject(metadataPath, payload), - this.roomService.sendSignal(roomName, payload, { topic }), - this.openViduWebhookService.sendRecordingStoppedWebhook(payload) - ]); - } catch (error) { - this.logger.warn(`Error sending data on egress ended: ${error}`); - } + await this.processRecordingEgress(egressInfo, 'ended'); } /** @@ -149,12 +105,12 @@ export class LivekitWebhookService { */ async handleParticipantJoined(room: Room, participant: ParticipantInfo) { try { - // Do not send status signal to egress participants + // Skip if the participant is an egress participant if (this.livekitService.isEgressParticipant(participant)) { return; } - await this.sendStatusSignal(room.name, room.sid, participant.sid); + await this.roomService.sendRoomStatusSignalToOpenViduComponents(room.name, participant.sid); } catch (error) { this.logger.error(`Error sending data on participant joined: ${error}`); } @@ -169,7 +125,7 @@ export class LivekitWebhookService { * @param {Room} room - The room object that has finished. * @returns {Promise} A promise that resolves when the webhook has been sent. */ - async handleRoomFinished(room: Room) { + async handleMeetingFinished(room: Room) { try { await this.openViduWebhookService.sendRoomFinishedWebhook(room); } catch (error) { @@ -177,30 +133,66 @@ export class LivekitWebhookService { } } + /** + * Processes a recording egress event by updating metadata, sending webhook notifications, + * and performing necessary cleanup actions based on the webhook action type. + * + * @param egressInfo - The information about the egress event to process. + * @param webhookAction - The type of webhook action to handle. Can be 'started', 'updated', or 'ended'. + * @returns A promise that resolves when all processing tasks are completed. + */ + protected async processRecordingEgress( + egressInfo: EgressInfo, + webhookAction: 'started' | 'updated' | 'ended' + ): Promise { + if (!RecordingHelper.isRecordingEgress(egressInfo)) return; - private async sendStatusSignal(roomName: string, roomId: string, participantSid: string) { - // Get recording list - const recordingInfo = await this.recordingService.getAllRecordingsByRoom(roomName, roomId); + this.logger.debug(`Processing recording ${webhookAction} webhook.`); - // Check if recording is started in the room - const isRecordingStarted = recordingInfo.some((rec) => rec.status === RecordingStatus.STARTED); + const recordingInfo: MeetRecordingInfo = RecordingHelper.toRecordingInfo(egressInfo); + const metadataPath = this.generateMetadataPath(recordingInfo); + const { roomId, recordingId, status } = recordingInfo; - // Construct the payload to send to the participant - const payload: RoomStatusData = { - isRecordingStarted, - recordingList: recordingInfo - }; - const signalOptions: SendDataOptions = { - topic: DataTopic.ROOM_STATUS, - destinationSids: participantSid ? [participantSid] : [] - }; - await this.roomService.sendSignal(roomName, payload, signalOptions); + this.logger.debug(`Recording '${recordingId}' for room '${roomId}' is in status '${status}'`); + + const promises: Promise[] = []; + + try { + // Update recording metadata + promises.push( + this.s3Service.saveObject(metadataPath, recordingInfo), + this.recordingService.sendRecordingSignalToOpenViduComponents(roomId, recordingInfo) + ); + + // Send webhook notification + switch (webhookAction) { + case 'started': + promises.push(this.openViduWebhookService.sendRecordingStartedWebhook(recordingInfo)); + break; + case 'updated': + promises.push(this.openViduWebhookService.sendRecordingUpdatedWebhook(recordingInfo)); + break; + case 'ended': + promises.push( + this.openViduWebhookService.sendRecordingEndedWebhook(recordingInfo), + this.recordingService.releaseRoomRecordingActiveLock(roomId) + ); + break; + } + + // Wait for all promises to resolve + await Promise.all(promises); + } catch (error) { + this.logger.warn( + `Error sending recording ${webhookAction} webhook for egress ${egressInfo.egressId}: ${error}` + ); + } } - private generateMetadataPath(payload: RecordingInfo): string { - const metadataFilename = `${payload.roomName}-${payload.roomId}`; - const recordingFilename = payload.filename?.split('.')[0]; - const egressId = payload.id; - return `.metadata/${metadataFilename}/${recordingFilename}_${egressId}.json`; + protected generateMetadataPath(recordingInfo: MeetRecordingInfo): string { + const { roomId, recordingId } = recordingInfo; + // Remove file extension from filename + const recordingFilename = recordingInfo.filename?.split('.')[0]; + return `${MEET_S3_RECORDINGS_PREFIX}/.metadata/${roomId}/${recordingFilename}-${recordingId}.json`; } } diff --git a/backend/src/services/livekit.service.ts b/backend/src/services/livekit.service.ts index e63892f..80e8f95 100644 --- a/backend/src/services/livekit.service.ts +++ b/backend/src/services/livekit.service.ts @@ -96,7 +96,7 @@ export class LiveKitService { try { return await this.roomClient.getParticipant(roomName, participantName); } catch (error) { - this.logger.error(`Error getting participant ${error}`); + this.logger.warn(`Participant ${participantName} not found in room ${roomName} ${error}`); throw internalError(`Error getting participant: ${error}`); } } @@ -170,10 +170,50 @@ export class LiveKitService { } } - async getEgress(options: ListEgressOptions): Promise { + /** + * Retrieves a list of egress information based on the provided options. + * + * @param {ListEgressOptions} options - The options to filter the egress list. + * @returns {Promise} A promise that resolves to an array of EgressInfo objects. + * @throws Will throw an error if there is an issue retrieving the egress information. + */ + async getEgress(roomName?:string, egressId?: string): Promise { try { + const options: ListEgressOptions = { + roomName, + egressId, + }; return await this.egressClient.listEgress(options); } catch (error: any) { + if (error.message.includes('404')) { + return []; + } + + this.logger.error(`Error getting egress: ${JSON.stringify(error)}`); + throw internalError(`Error getting egress: ${error}`); + } + } + + /** + * Retrieves a list of active egress information based on the provided egress ID. + * + * @param egressId - The unique identifier of the egress to retrieve. + * @returns A promise that resolves to an array of `EgressInfo` objects representing the active egress. + * @throws Will throw an error if there is an issue retrieving the egress information. + */ + async getActiveEgress(roomName?: string, egressId?: string): Promise { + try { + const options: ListEgressOptions = { + roomName, + egressId, + active: true + }; + return await this.egressClient.listEgress(options); + } catch (error: any) { + if (error.message.includes('404')) { + return []; + } + this.logger.error(`Error getting egress: ${JSON.stringify(error)}`); throw internalError(`Error getting egress: ${error}`); } diff --git a/backend/src/services/mutex.service.ts b/backend/src/services/mutex.service.ts index 7d574ae..49b962e 100644 --- a/backend/src/services/mutex.service.ts +++ b/backend/src/services/mutex.service.ts @@ -1,15 +1,22 @@ import Redlock, { Lock } from 'redlock'; import { RedisService } from './redis.service.js'; import { inject, injectable } from 'inversify'; +import ms from 'ms'; +import { LoggerService } from './logger.service.js'; +export type RedisLock = Lock; @injectable() export class MutexService { protected redlockWithoutRetry: Redlock; protected locks: Map; - protected readonly TTL_MS = 10_000; - protected LOCK_KEY_PREFIX = 'ov_meet_lock:' + protected readonly TTL_MS = ms('1m'); + protected LOCK_KEY_PREFIX = 'ov_meet_lock:'; - constructor(@inject(RedisService) protected redisService: RedisService) { + constructor( + @inject(RedisService) protected redisService: RedisService, + @inject(LoggerService) protected logger: LoggerService + ) { + // Create a Redlock instance with no retry strategy this.redlockWithoutRetry = this.redisService.createRedlock(0); this.locks = new Map(); } @@ -21,13 +28,15 @@ export class MutexService { * @returns A Promise that resolves to the acquired Lock object. */ async acquire(resource: string, ttl: number = this.TTL_MS): Promise { - resource = this.LOCK_KEY_PREFIX + resource; + const key = this.LOCK_KEY_PREFIX + resource; try { - const lock = await this.redlockWithoutRetry.acquire([resource], ttl); - this.locks.set(resource, lock); + this.logger.debug(`Acquiring lock for resource: ${resource}`); + const lock = await this.redlockWithoutRetry.acquire([key], ttl); + this.locks.set(key, lock); return lock; } catch (error) { + this.logger.error('Error acquiring lock:', error); return null; } } @@ -39,12 +48,19 @@ export class MutexService { * @returns A Promise that resolves when the lock is released. */ async release(resource: string): Promise { - resource = this.LOCK_KEY_PREFIX + resource; - const lock = this.locks.get(resource); + const key = this.LOCK_KEY_PREFIX + resource; + const lock = this.locks.get(key); if (lock) { - await lock.release(); - this.locks.delete(resource); + this.logger.debug(`Releasing lock for resource: ${resource}`); + + try { + await lock.release(); + } catch (error) { + this.logger.error(`Error releasing lock for key ${key}:`, error); + } finally { + this.locks.delete(key); + } } } } diff --git a/backend/src/services/openvidu-webhook.service.ts b/backend/src/services/openvidu-webhook.service.ts index 5644573..ea7c488 100644 --- a/backend/src/services/openvidu-webhook.service.ts +++ b/backend/src/services/openvidu-webhook.service.ts @@ -3,56 +3,56 @@ import { inject, injectable } from '../config/dependency-injector.config.js'; import { Room } from 'livekit-server-sdk'; import { LoggerService } from './logger.service.js'; import { MEET_API_KEY, MEET_WEBHOOK_ENABLED, MEET_WEBHOOK_URL } from '../environment.js'; -import { OpenViduWebhookEvent, OpenViduWebhookEventType, RecordingInfo } from '@typings-ce'; +import { MeetWebhookEvent, MeetWebhookEventType, MeetRecordingInfo, MeetWebhookPayload } from '@typings-ce'; @injectable() export class OpenViduWebhookService { constructor(@inject(LoggerService) protected logger: LoggerService) {} + // TODO: Implement Room webhooks async sendRoomFinishedWebhook(room: Room) { - const data: OpenViduWebhookEvent = { - event: OpenViduWebhookEventType.ROOM_FINISHED, - creationDate: Date.now(), - data: { - roomName: room.name - } - }; - await this.sendWebhookEvent(data); + // try { + // await this.sendWebhookEvent(MeetWebhookEventType.ROOM_FINISHED, data); + // } catch (error) { + // this.logger.error(`Error sending room finished webhook: ${error}`); + // } } - async sendRecordingStartedWebhook(recordingInfo: RecordingInfo) { - const data: OpenViduWebhookEvent = { - event: OpenViduWebhookEventType.RECORDING_STARTED, - creationDate: Date.now(), - data: { - recordingId: recordingInfo.id, - filename: recordingInfo.filename, - roomName: recordingInfo.roomName, - status: recordingInfo.status - } - }; - await this.sendWebhookEvent(data); + async sendRecordingStartedWebhook(recordingInfo: MeetRecordingInfo) { + try { + await this.sendWebhookEvent(MeetWebhookEventType.RECORDING_STARTED, recordingInfo); + } catch (error) { + this.logger.error(`Error sending recording started webhook: ${error}`); + } } - async sendRecordingStoppedWebhook(recordingInfo: RecordingInfo) { - const data: OpenViduWebhookEvent = { - event: OpenViduWebhookEventType.RECORDING_STOPPED, - creationDate: Date.now(), - data: { - recordingId: recordingInfo.id, - filename: recordingInfo.filename, - roomName: recordingInfo.roomName, - status: recordingInfo.status - } - }; - await this.sendWebhookEvent(data); + async sendRecordingUpdatedWebhook(recordingInfo: MeetRecordingInfo) { + try { + await this.sendWebhookEvent(MeetWebhookEventType.RECORDING_UPDATED, recordingInfo); + } catch (error) { + this.logger.error(`Error sending recording updated webhook: ${error}`); + } } - private async sendWebhookEvent(data: OpenViduWebhookEvent) { + async sendRecordingEndedWebhook(recordingInfo: MeetRecordingInfo) { + try { + await this.sendWebhookEvent(MeetWebhookEventType.RECORDING_ENDED, recordingInfo); + } catch (error) { + this.logger.error(`Error sending recording ended webhook: ${error}`); + } + } + + private async sendWebhookEvent(event: MeetWebhookEventType, payload: MeetWebhookPayload) { if (!this.isWebhookEnabled()) return; - const timestamp = data.creationDate; - const signature = this.generateWebhookSignature(timestamp, data); + const creationDate = Date.now(); + const data: MeetWebhookEvent = { + event, + creationDate, + data: payload + }; + + const signature = this.generateWebhookSignature(creationDate, data); this.logger.info(`Sending webhook event ${data.event}`); @@ -61,7 +61,7 @@ export class OpenViduWebhookService { method: 'POST', headers: { 'Content-Type': 'application/json', - 'X-Timestamp': timestamp.toString(), + 'X-Timestamp': creationDate.toString(), 'X-Signature': signature }, body: JSON.stringify(data) diff --git a/backend/src/services/recording.service.ts b/backend/src/services/recording.service.ts index 472ec1c..fb826a3 100644 --- a/backend/src/services/recording.service.ts +++ b/backend/src/services/recording.service.ts @@ -1,14 +1,8 @@ -import { - EncodedFileOutput, - EncodedFileType, - ListEgressOptions, - RoomCompositeOptions, - SendDataOptions -} from 'livekit-server-sdk'; +import { EncodedFileOutput, EncodedFileType, RoomCompositeOptions } from 'livekit-server-sdk'; +import { uid } from 'uid'; import { Readable } from 'stream'; import { LiveKitService } from './livekit.service.js'; import { - OpenViduMeetError, errorRecordingAlreadyStarted, errorRecordingNotFound, errorRecordingNotStopped, @@ -16,84 +10,68 @@ import { internalError } from '../models/error.model.js'; import { S3Service } from './s3.service.js'; -import { DataTopic } from '../models/signal.model.js'; import { LoggerService } from './logger.service.js'; -import { RecordingInfo, RecordingStatus } from '@typings-ce'; +import { MeetRecordingInfo, MeetRecordingStatus } from '@typings-ce'; import { RecordingHelper } from '../helpers/recording.helper.js'; -import { MEET_S3_BUCKET } from '../environment.js'; +import { MEET_S3_BUCKET, MEET_S3_RECORDINGS_PREFIX, MEET_S3_SUBBUCKET } from '../environment.js'; import { RoomService } from './room.service.js'; import { inject, injectable } from '../config/dependency-injector.config.js'; +import { MutexService, RedisLock } from './mutex.service.js'; +import { RedisLockName } from '../models/index.js'; +import ms from 'ms'; +import { OpenViduComponentsAdapterHelper } from '../helpers/ov-components-adapter.helper.js'; @injectable() export class RecordingService { + protected readonly RECORDING_ACTIVE_LOCK_TTL = ms('6h'); constructor( @inject(S3Service) protected s3Service: S3Service, @inject(LiveKitService) protected livekitService: LiveKitService, @inject(RoomService) protected roomService: RoomService, + @inject(MutexService) protected mutexService: MutexService, @inject(LoggerService) protected logger: LoggerService ) {} - async startRecording(roomName: string): Promise { + async startRecording(roomName: string): Promise { + let acquiredLock: RedisLock | null = null; + try { - const egressOptions: ListEgressOptions = { - roomName, - active: true - }; + // Attempt to acquire lock. + // Note: using a high TTL to prevent expiration during a long recording. + acquiredLock = await this.acquireRoomRecordingActiveLock(roomName); - const [activeEgressResult, roomResult] = await Promise.allSettled([ - this.livekitService.getEgress(egressOptions), - this.livekitService.getRoom(roomName) - ]); + if (!acquiredLock) throw errorRecordingAlreadyStarted(roomName); - // Get the results of the promises - const activeEgress = activeEgressResult.status === 'fulfilled' ? activeEgressResult.value : null; - const room = roomResult.status === 'fulfilled' ? roomResult.value : null; + const room = await this.roomService.getOpenViduRoom(roomName); - // If there is an active egress, it means that the recording is already started - if (!activeEgress || activeEgressResult.status === 'rejected') { - throw errorRecordingAlreadyStarted(roomName); - } + if (!room) throw errorRoomNotFound(roomName); - if (!room) { - throw errorRoomNotFound(roomName); - } - - const recordingId = `${roomName}-${room.sid || Date.now()}`; const options = this.generateCompositeOptionsFromRequest(); - const output = this.generateFileOutputFromRequest(recordingId); + const output = this.generateFileOutputFromRequest(roomName); const egressInfo = await this.livekitService.startRoomComposite(roomName, output, options); + + // Return recording info without releasing the lock here, + // as it will be released in handleEgressEnded on successful completion. return RecordingHelper.toRecordingInfo(egressInfo); } catch (error) { this.logger.error(`Error starting recording in room ${roomName}: ${error}`); - let payload = { error: error, statusCode: 500 }; - const options: SendDataOptions = { - destinationSids: [], - topic: DataTopic.RECORDING_FAILED - }; - if (error instanceof OpenViduMeetError) { - payload = { error: error.message, statusCode: error.statusCode }; - } - - this.roomService.sendSignal(roomName, payload, options); + if (acquiredLock) await this.releaseRoomRecordingActiveLock(roomName); throw error; } } - async stopRecording(egressId: string): Promise { + async stopRecording(egressId: string): Promise { try { - const options: ListEgressOptions = { - egressId, - active: true - }; - const egressArray = await this.livekitService.getEgress(options); + const egressArray = await this.livekitService.getActiveEgress(undefined, egressId); if (egressArray.length === 0) { throw errorRecordingNotFound(egressId); } const egressInfo = await this.livekitService.stopEgress(egressId); + return RecordingHelper.toRecordingInfo(egressInfo); } catch (error) { this.logger.error(`Error stopping recording ${egressId}: ${error}`); @@ -101,19 +79,16 @@ export class RecordingService { } } - async deleteRecording(egressId: string, isRequestedByAdmin: boolean): Promise { + // TODO: Implement deleteRecording method + async deleteRecording(egressId: string, role: string): Promise { try { - // Get the recording object from the S3 bucket - const metadataObject = await this.s3Service.listObjects('.metadata', `.*${egressId}.*.json`); + const { metadataFilePath, recordingInfo } = await this.getMeetRecordingInfoFromMetadata(egressId); - if (!metadataObject.Contents || metadataObject.Contents.length === 0) { - throw errorRecordingNotFound(egressId); - } - - const metadataPath = metadataObject.Contents[0].Key; - const recordingInfo = (await this.s3Service.getObjectAsJson(metadataPath!)) as RecordingInfo; - - if (recordingInfo.status === RecordingStatus.STARTED) { + if ( + recordingInfo.status === MeetRecordingStatus.STARTING || + recordingInfo.status === MeetRecordingStatus.ACTIVE || + recordingInfo.status === MeetRecordingStatus.ENDING + ) { throw errorRecordingNotStopped(egressId); } @@ -123,15 +98,10 @@ export class RecordingService { this.logger.info(`Deleting recording from S3 ${recordingPath}`); - await Promise.all([this.s3Service.deleteObject(metadataPath!), this.s3Service.deleteObject(recordingPath)]); - - if (!isRequestedByAdmin) { - const signalOptions: SendDataOptions = { - destinationSids: [], - topic: DataTopic.RECORDING_DELETED - }; - await this.roomService.sendSignal(recordingInfo.roomName, recordingInfo, signalOptions); - } + await Promise.all([ + this.s3Service.deleteObject(metadataFilePath), + this.s3Service.deleteObject(recordingPath) + ]); return recordingInfo; } catch (error) { @@ -140,18 +110,25 @@ export class RecordingService { } } + // TODO: Implement bulkDeleteRecordings method + async bulkDeleteRecordings(egressIds: string[], role: string): Promise { + const promises = egressIds.map((egressId) => this.deleteRecording(egressId, role)); + return Promise.all(promises); + } + /** * Retrieves the list of all recordings. * @returns A promise that resolves to an array of RecordingInfo objects. */ - async getAllRecordings(): Promise<{ recordingInfo: RecordingInfo[]; continuationToken?: string }> { + //TODO: Implement getAllRecordings method + async getAllRecordings(): Promise<{ recordingInfo: MeetRecordingInfo[]; continuationToken?: string }> { try { const allEgress = await this.s3Service.listObjects('.metadata', '.json'); - const promises: Promise[] = []; + const promises: Promise[] = []; allEgress.Contents?.forEach((item) => { if (item?.Key?.includes('.json')) { - promises.push(this.s3Service.getObjectAsJson(item.Key) as Promise); + promises.push(this.s3Service.getObjectAsJson(item.Key) as Promise); } }); @@ -167,10 +144,11 @@ export class RecordingService { * * @param roomName - The name of the room. * @param roomId - The ID of the room. - * @returns A promise that resolves to an array of RecordingInfo objects. + * @returns A promise that resolves to an array of MeetRecordingInfo objects. * @throws If there is an error retrieving the recordings. */ - async getAllRecordingsByRoom(roomName: string, roomId: string): Promise { + //TODO: Implement getAllRecordingsByRoom method + async getAllRecordingsByRoom(roomName: string, roomId: string): Promise { try { // Get all recordings that match the room name and room ID from the S3 bucket const roomNameSanitized = this.sanitizeRegExp(roomName); @@ -184,9 +162,9 @@ export class RecordingService { return []; } - const promises: Promise[] = []; + const promises: Promise[] = []; metadatagObject.Contents?.forEach((item) => { - promises.push(this.s3Service.getObjectAsJson(item.Key!) as Promise); + promises.push(this.s3Service.getObjectAsJson(item.Key!) as Promise); }); return Promise.all(promises); @@ -196,7 +174,8 @@ export class RecordingService { } } - private async getRecording(egressId: string): Promise { + //TODO: Implement getRecording method + async getRecording(egressId: string): Promise { const egressIdSanitized = this.sanitizeRegExp(egressId); const regexPattern = `.*${egressIdSanitized}.*\\.json`; const metadataObject = await this.s3Service.listObjects('.metadata', regexPattern); @@ -205,17 +184,18 @@ export class RecordingService { throw errorRecordingNotFound(egressId); } - const recording = (await this.s3Service.getObjectAsJson(metadataObject.Contents[0].Key!)) as RecordingInfo; + const recording = (await this.s3Service.getObjectAsJson(metadataObject.Contents[0].Key!)) as MeetRecordingInfo; return recording; // return RecordingHelper.toRecordingInfo(recording); } + //TODO: Implement getRecordingAsStream method async getRecordingAsStream( recordingId: string, range?: string ): Promise<{ fileSize: number | undefined; fileStream: Readable; start?: number; end?: number }> { const RECORDING_FILE_PORTION_SIZE = 5 * 1024 * 1024; // 5MB - const recordingInfo: RecordingInfo = await this.getRecording(recordingId); + const recordingInfo: MeetRecordingInfo = await this.getRecording(recordingId); const recordingPath = RecordingHelper.extractFilename(recordingInfo); if (!recordingPath) throw new Error(`Error extracting path from recording ${recordingId}`); @@ -240,6 +220,73 @@ export class RecordingService { } } + /** + * Acquires a Redis-based lock to indicate that a recording is active for a specific room. + */ + async acquireRoomRecordingActiveLock(roomName: string): Promise { + const lockName = `${roomName}_${RedisLockName.RECORDING_ACTIVE}`; + + try { + const lock = await this.mutexService.acquire(lockName, this.RECORDING_ACTIVE_LOCK_TTL); + return lock; + } catch (error) { + this.logger.warn(`Error acquiring lock ${lockName} on egress started: ${error}`); + return null; + } + } + + /** + * Releases the active recording lock for a specific room. + * + * This method attempts to release a lock associated with the active recording + * of a given room. + */ + async releaseRoomRecordingActiveLock(roomName: string): Promise { + if (roomName) { + const lockName = `${roomName}_${RedisLockName.RECORDING_ACTIVE}`; + + try { + await this.mutexService.release(lockName); + } catch (error) { + this.logger.warn(`Error releasing lock ${lockName} on egress ended: ${error}`); + } + } + } + + /** + * Sends a recording signal to OpenVidu Components within a specified room. + * + * This method constructs a signal with the appropriate topic and payload, + * and sends it to the OpenVidu Components in the given room. The payload + * is adapted to match the expected format for OpenVidu Components. + */ + sendRecordingSignalToOpenViduComponents(roomName: string, recordingInfo: MeetRecordingInfo) { + const { payload, options } = OpenViduComponentsAdapterHelper.generateRecordingSignal(recordingInfo); + return this.roomService.sendSignal(roomName, payload, options); + } + + private async getMeetRecordingInfoFromMetadata( + egressId: string + ): Promise<{ metadataFilePath: string; recordingInfo: MeetRecordingInfo }> { + // Get the recording object from the S3 bucket + const metadataObject = await this.s3Service.listObjects('.metadata', `.*${egressId}.*.json`); + + const content = metadataObject.Contents?.[0]; + + if (!content) { + throw errorRecordingNotFound(egressId); + } + + const metadataPath = content.Key; + + if (!metadataPath) { + throw errorRecordingNotFound(egressId); + } + + const recordingInfo = (await this.s3Service.getObjectAsJson(metadataPath)) as MeetRecordingInfo; + return { recordingInfo, metadataFilePath: metadataPath }; + } + private generateCompositeOptionsFromRequest(layout = 'speaker'): RoomCompositeOptions { return { layout: layout @@ -255,9 +302,12 @@ export class RecordingService { * @param fileName - The name of the file (default is 'recording'). * @returns The generated file output object. */ - private generateFileOutputFromRequest(recordingId: string): EncodedFileOutput { + private generateFileOutputFromRequest(roomName: string): EncodedFileOutput { // Added unique identifier to the file path for avoiding overwriting - const filepath = `${recordingId}/${recordingId}-${Date.now()}`; + const recordingName = `${roomName}-${uid(10)}`; + + // Generate the file path with the openviud-meet subbucket and the recording prefix + const filepath = `${MEET_S3_SUBBUCKET}/${MEET_S3_RECORDINGS_PREFIX}/${roomName}/${recordingName}`; return new EncodedFileOutput({ fileType: EncodedFileType.DEFAULT_FILETYPE, diff --git a/backend/src/services/room.service.ts b/backend/src/services/room.service.ts index 76fe090..526c387 100644 --- a/backend/src/services/room.service.ts +++ b/backend/src/services/room.service.ts @@ -9,6 +9,7 @@ import { OpenViduRoomHelper } from '../helpers/room.helper.js'; import { SystemEventService } from './system-event.service.js'; import { TaskSchedulerService } from './task-scheduler.service.js'; import { errorParticipantUnauthorized } from '../models/error.model.js'; +import { OpenViduComponentsAdapterHelper } from '../helpers/index.js'; /** * Service for managing OpenVidu Meet rooms. @@ -40,6 +41,7 @@ export class RoomService { } await Promise.all([ + //TODO: Livekit rooms should not be created here. They should be created when a user joins a room. this.restoreMissingLivekitRooms().catch((error) => this.logger.error('Error restoring missing rooms:', error) ), @@ -182,6 +184,25 @@ export class RoomService { } } + async sendRoomStatusSignalToOpenViduComponents(roomName: string, participantSid: string) { + // Check if recording is started in the room + const activeEgressArray = await this.livekitService.getActiveEgress(roomName); + const isRecordingStarted = activeEgressArray.length > 0; + + // Skip if recording is not started + if (!isRecordingStarted) { + return; + } + + // Construct the payload and signal options + const { payload, options } = OpenViduComponentsAdapterHelper.generateRoomStatusSignal( + isRecordingStarted, + participantSid + ); + + await this.sendSignal(roomName, payload, options); + } + /** * Sends a signal to participants in a specified room. *