diff --git a/backend/src/services/recording.service.ts b/backend/src/services/recording.service.ts index eeb1ef8..eb0ed7a 100644 --- a/backend/src/services/recording.service.ts +++ b/backend/src/services/recording.service.ts @@ -5,7 +5,7 @@ import ms from 'ms'; import { Readable } from 'stream'; import { uid } from 'uid'; import INTERNAL_CONFIG from '../config/internal-config.js'; -import { MEET_S3_BUCKET, MEET_S3_SUBBUCKET } from '../environment.js'; +import { MEET_S3_SUBBUCKET } from '../environment.js'; import { MeetLock, OpenViduComponentsAdapterHelper, RecordingHelper, UtilsHelper } from '../helpers/index.js'; import { errorRecordingAlreadyStarted, @@ -62,6 +62,7 @@ export class RecordingService { let eventListener!: (info: Record) => void; let recordingId = ''; let timeoutId: NodeJS.Timeout | undefined; + let isOperationCompleted = false; try { // Attempt to acquire lock. If the lock is not acquired, the recording is already active. @@ -69,32 +70,28 @@ export class RecordingService { if (!acquiredLock) throw errorRecordingAlreadyStarted(roomId); - const room = await this.roomService.getMeetRoom(roomId); + await this.validateRoomForStartRecording(roomId); - if (!room) throw errorRoomNotFound(roomId); - - //TODO: Check if the room has participants before starting the recording - //room.numParticipants === 0 ? throw errorNoParticipants(roomId); - const lkRoom = await this.livekitService.getRoom(roomId); - - if (!lkRoom) throw errorRoomNotFound(roomId); - - const hasParticipants = await this.livekitService.roomHasParticipants(roomId); - - if (!hasParticipants) throw errorRoomHasNoParticipants(roomId); - - const startTimeoutPromise = new Promise((_, reject) => { + const timeoutPromise = new Promise((_, reject) => { timeoutId = setTimeout(() => { + if (isOperationCompleted) return; + + isOperationCompleted = true; + + //Clean up the event listener and timeout this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener); - this.handleRecordingLockTimeout(recordingId, roomId, reject); + this.handleRecordingLockTimeout(recordingId, roomId).catch(() => {}); + reject(errorRecordingStartTimeout(roomId)); }, ms(INTERNAL_CONFIG.RECORDING_STARTED_TIMEOUT)); }); - const eventReceivedPromise = new Promise((resolve) => { + const activeEgressEventPromise = new Promise((resolve) => { eventListener = (info: Record) => { // Process the event only if it belongs to the current room. // Each room has only ONE active recording at the same time - if (info?.roomId !== roomId) return; + if (info?.roomId !== roomId || isOperationCompleted) return; + + isOperationCompleted = true; clearTimeout(timeoutId); this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener); @@ -104,22 +101,52 @@ export class RecordingService { this.systemEventService.on(SystemEventType.RECORDING_ACTIVE, eventListener); }); - const options = this.generateCompositeOptionsFromRequest(); - const output = this.generateFileOutputFromRequest(roomId); - const egressInfo = await this.livekitService.startRoomComposite(roomId, output, options); - const recordingInfo = RecordingHelper.toRecordingInfo(egressInfo); - recordingId = recordingInfo.recordingId; + const startRecordingPromise = (async (): Promise => { + try { + const options = this.generateCompositeOptionsFromRequest(); + const output = this.generateFileOutputFromRequest(roomId); + const egressInfo = await this.livekitService.startRoomComposite(roomId, output, options); - if (recordingInfo.status === MeetRecordingStatus.ACTIVE) { - clearTimeout(timeoutId); - this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener); - return recordingInfo; - } + // Check if operation was completed while we were waiting + if (isOperationCompleted) { + this.logger.warn(`startRoomComposite completed after timeout for room ${roomId}`); + throw errorRecordingStartTimeout(roomId); + } - return await Promise.race([eventReceivedPromise, startTimeoutPromise]); + const recordingInfo = RecordingHelper.toRecordingInfo(egressInfo); + recordingId = recordingInfo.recordingId; + + // If the recording is already active, we can resolve the promise immediately. + if (recordingInfo.status === MeetRecordingStatus.ACTIVE) { + if (!isOperationCompleted) { + isOperationCompleted = true; + clearTimeout(timeoutId); + this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener); + return recordingInfo; + } + } + + // Wait for RECORDING_ACTIVE event + return await activeEgressEventPromise; + } catch (error) { + if (isOperationCompleted) { + this.logger.warn(`startRoomComposite failed after timeout: ${error}`); + throw errorRecordingStartTimeout(roomId); + } + + throw error; + } + })(); + + // Prevent UnhandledPromiseRejection from late failures + startRecordingPromise.catch((error) => { + if (!isOperationCompleted) { + this.logger.error(`Unhandled error in startRecordingPromise: ${error}`); + } + }); + return await Promise.race([startRecordingPromise, timeoutPromise]); } catch (error) { this.logger.error(`Error starting recording in room '${roomId}': ${error}`); - throw error; } finally { try { @@ -443,6 +470,22 @@ export class RecordingService { } } + protected async validateRoomForStartRecording(roomId: string): Promise { + const room = await this.roomService.getMeetRoom(roomId); + + if (!room) throw errorRoomNotFound(roomId); + + //TODO: Check if the room has participants before starting the recording + //room.numParticipants === 0 ? throw errorNoParticipants(roomId); + const lkRoom = await this.livekitService.getRoom(roomId); + + if (!lkRoom) throw errorRoomNotFound(roomId); + + const hasParticipants = await this.livekitService.roomHasParticipants(roomId); + + if (!hasParticipants) throw errorRoomHasNoParticipants(roomId); + } + protected async getFullStreamResponse( recordingPath: string, fileSize: number @@ -461,7 +504,7 @@ export class RecordingService { * * @param roomId - The name of the room to acquire the lock for. */ - async acquireRoomRecordingActiveLock(roomId: string): Promise { + protected async acquireRoomRecordingActiveLock(roomId: string): Promise { const lockName = MeetLock.getRecordingActiveLock(roomId); try { @@ -601,20 +644,23 @@ export class RecordingService { * @param recordingId * @param roomId */ - protected async handleRecordingLockTimeout( - recordingId: string, - roomId: string, - rejectRequest: (reason?: unknown) => void - ) { + protected async handleRecordingLockTimeout(recordingId: string, roomId: string) { this.logger.debug(`Recording cleanup timer triggered for room '${roomId}'.`); let shouldReleaseLock = false; try { - await this.updateRecordingStatus(recordingId, MeetRecordingStatus.FAILED); - await this.stopRecording(recordingId); - // The recording was stopped successfully - // the cleanup timer will be cancelled when the egress_ended event is received. + if (!recordingId || recordingId.trim() === '') { + this.logger.warn( + `Timeout triggered but recordingId is empty for room '${roomId}'. Recording likely failed to start.` + ); + shouldReleaseLock = true; + } else { + await this.updateRecordingStatus(recordingId, MeetRecordingStatus.FAILED); + await this.stopRecording(recordingId); + // The recording was stopped successfully + // the cleanup timer will be cancelled when the egress_ended event is received. + } } catch (error) { if (error instanceof OpenViduMeetError) { // The recording is already stopped or not found in LiveKit. @@ -647,9 +693,6 @@ export class RecordingService { this.logger.error(`Error releasing active recording lock for room ${roomId}: ${releaseError}`); } } - - // Reject the REST request with a timeout error. - rejectRequest(errorRecordingStartTimeout(roomId)); } }