From 9897436df2dd57055db8af9ffd5cb8ca0b4a023d Mon Sep 17 00:00:00 2001 From: Carlos Santos <4a.santos@gmail.com> Date: Thu, 24 Apr 2025 11:41:53 +0200 Subject: [PATCH] backend: update recording start method for improving its clarity and avoding race conditions receiving events --- .../src/services/livekit-webhook.service.ts | 4 +- backend/src/services/recording.service.ts | 123 ++++++++---------- .../src/services/task-scheduler.service.ts | 1 + backend/tests/utils/helpers.ts | 2 +- 4 files changed, 61 insertions(+), 69 deletions(-) diff --git a/backend/src/services/livekit-webhook.service.ts b/backend/src/services/livekit-webhook.service.ts index 31073b7..fe4458b 100644 --- a/backend/src/services/livekit-webhook.service.ts +++ b/backend/src/services/livekit-webhook.service.ts @@ -152,7 +152,7 @@ export class LivekitWebhookService { try { const [meetRoom] = await Promise.all([ this.roomService.getMeetRoom(room.name), - this.recordingService.releaseRoomRecordingActiveLock(room.name), + this.recordingService.releaseRecordingLockIfNoEgress(room.name), this.openViduWebhookService.sendRoomFinishedWebhook(room) ]); @@ -223,7 +223,7 @@ export class LivekitWebhookService { case 'ended': tasks.push( this.openViduWebhookService.sendRecordingEndedWebhook(recordingInfo), - this.recordingService.releaseRoomRecordingActiveLock(roomId) + this.recordingService.releaseRecordingLockIfNoEgress(roomId) ); break; } diff --git a/backend/src/services/recording.service.ts b/backend/src/services/recording.service.ts index fdcdcce..0af7aa9 100644 --- a/backend/src/services/recording.service.ts +++ b/backend/src/services/recording.service.ts @@ -58,36 +58,49 @@ export class RecordingService { async startRecording(roomId: string): Promise { let acquiredLock: RedisLock | null = null; let eventListener!: (info: Record) => void; + let recordingId = ''; + let timeoutId: NodeJS.Timeout | undefined; try { - await this.validateRoomsPreconditions(roomId); + const room = await this.roomService.getMeetRoom(roomId); + + if (!room) throw errorRoomNotFound(roomId); + + //TODO: Check if the room has participants before starting the recording + //room.numParticipants === 0 ? throw errorNoParticipants(roomId); + const lkRoom = await this.livekitService.getRoom(roomId); + + if (!lkRoom) throw errorRoomNotFound(roomId); + + const hasParticipants = await this.livekitService.roomHasParticipants(roomId); + + if (!hasParticipants) throw errorRoomHasNoParticipants(roomId); + // Attempt to acquire lock. If the lock is not acquired, the recording is already active. acquiredLock = await this.acquireRoomRecordingActiveLock(roomId); if (!acquiredLock) throw errorRecordingAlreadyStarted(roomId); - let resolveRecording!: (r: MeetRecordingInfo) => void; - let rejectRecording!: (e: unknown) => void; - const recordingPromise = new Promise((res, rej) => { - resolveRecording = res; - rejectRecording = rej; - }); - let recordingId = ''; - - eventListener = (info: Record) => { - // This listener is triggered only for the instance that started the recording. - // Check if the recording ID matches the one that was started - const isEventForCurrentRecording = info?.recordingId === recordingId && info?.roomId === roomId; - - if (isEventForCurrentRecording) { - this.taskSchedulerService.cancelTask(`${roomId}_recording_timeout`); + const startTimeoutPromise = new Promise((_, reject) => { + timeoutId = setTimeout(() => { this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener); - resolveRecording(info as unknown as MeetRecordingInfo); - } - }; + this.handleRecordingLockTimeout(recordingId, roomId, reject); + }, ms(INTERNAL_CONFIG.RECORDING_STARTED_TIMEOUT)); + }); - this.systemEventService.on(SystemEventType.RECORDING_ACTIVE, eventListener); - this.registerRecordingTimeout(roomId, recordingId, eventListener, rejectRecording); + const eventReceivedPromise = new Promise((resolve) => { + eventListener = (info: Record) => { + // Process the event only if it belongs to the current room. + // Each room has only ONE active recording at the same time + if (info?.roomId !== roomId) return; + + clearTimeout(timeoutId); + this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener); + resolve(info as unknown as MeetRecordingInfo); + }; + + this.systemEventService.on(SystemEventType.RECORDING_ACTIVE, eventListener); + }); const options = this.generateCompositeOptionsFromRequest(); const output = this.generateFileOutputFromRequest(roomId); @@ -95,17 +108,26 @@ export class RecordingService { const recordingInfo = RecordingHelper.toRecordingInfo(egressInfo); recordingId = recordingInfo.recordingId; - return await recordingPromise; + if (recordingInfo.status === MeetRecordingStatus.ACTIVE) { + clearTimeout(timeoutId); + this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener); + return recordingInfo; + } + + return await Promise.race([eventReceivedPromise, startTimeoutPromise]); } catch (error) { this.logger.error(`Error starting recording in room '${roomId}': ${error}`); - if (acquiredLock) await this.releaseRoomRecordingActiveLock(roomId); - - if (eventListener) this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener); - - this.taskSchedulerService.cancelTask(`${roomId}_recording_timeout`); - throw error; + } finally { + try { + clearTimeout(timeoutId); + this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener); + + if (acquiredLock) await this.releaseRecordingLockIfNoEgress(roomId); + } catch (e) { + this.logger.warn(`Failed to release recording lock: ${e}`); + } } } @@ -327,12 +349,13 @@ export class RecordingService { } /** - * Releases the active recording lock for a specific room. + * Releases the active recording lock for a specified room, but only if there are no active egress operations. * - * This method attempts to release a lock associated with the active recording - * of a given room. + * This method first checks for any ongoing egress operations for the room. + * If active egress operations are found, the lock isn't released as recording is still considered active. + * Otherwise, it proceeds to release the mutex lock associated with the room's recording. */ - async releaseRoomRecordingActiveLock(roomId: string): Promise { + async releaseRecordingLockIfNoEgress(roomId: string): Promise { if (roomId) { const lockName = MeetLock.getRecordingActiveLock(roomId); const egress = await this.livekitService.getActiveEgress(roomId); @@ -341,7 +364,7 @@ export class RecordingService { this.logger.verbose( `Active egress found for room ${roomId}: ${egress.map((e) => e.egressId).join(', ')}` ); - this.logger.error(`Cannot release recording lock for room '${roomId}'.`); + this.logger.debug(`Cannot release recording lock for room '${roomId}'. Recording is still active.`); return; } @@ -366,22 +389,6 @@ export class RecordingService { return this.roomService.sendSignal(roomId, payload, options); } - protected async validateRoomsPreconditions(roomId: string): Promise { - const room = await this.roomService.getMeetRoom(roomId); - - if (!room) throw errorRoomNotFound(roomId); - - //TODO: Check if the room has participants before starting the recording - //room.numParticipants === 0 ? throw errorNoParticipants(roomId); - const lkRoom = await this.livekitService.getRoom(roomId); - - if (!lkRoom) throw errorRoomNotFound(roomId); - - const hasParticipants = await this.livekitService.roomHasParticipants(roomId); - - if (!hasParticipants) throw errorRoomHasNoParticipants(roomId); - } - /** * Retrieves the data required to delete a recording, including the file paths * to be deleted and the recording's metadata information. @@ -498,10 +505,8 @@ export class RecordingService { protected async handleRecordingLockTimeout( recordingId: string, roomId: string, - listener: (info: Record) => void, rejectRequest: (reason?: unknown) => void ) { - this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, listener); this.logger.debug(`Recording cleanup timer triggered for room '${roomId}'.`); let shouldReleaseLock = false; @@ -537,7 +542,7 @@ export class RecordingService { } finally { if (shouldReleaseLock) { try { - await this.releaseRoomRecordingActiveLock(roomId); + await this.releaseRecordingLockIfNoEgress(roomId); this.logger.debug(`Recording active lock released for room ${roomId}.`); } catch (releaseError) { this.logger.error(`Error releasing active recording lock for room ${roomId}: ${releaseError}`); @@ -621,20 +626,6 @@ export class RecordingService { await this.s3Service.saveObject(metadataPath, recordingInfo); } - protected registerRecordingTimeout( - roomId: string, - recordingId: string, - eventListener: (info: Record) => void, - reject: (reason?: unknown) => void - ): void { - this.taskSchedulerService.registerTask({ - name: `${roomId}_recording_timeout`, - type: 'timeout', - scheduleOrDelay: INTERNAL_CONFIG.RECORDING_STARTED_TIMEOUT, - callback: this.handleRecordingLockTimeout.bind(this, recordingId, roomId, eventListener, reject) - }); - } - /** * Cleans up orphaned recording locks in the system. * diff --git a/backend/src/services/task-scheduler.service.ts b/backend/src/services/task-scheduler.service.ts index db56953..35231d3 100644 --- a/backend/src/services/task-scheduler.service.ts +++ b/backend/src/services/task-scheduler.service.ts @@ -127,6 +127,7 @@ export class TaskSchedulerService { } this.scheduledTasks.delete(name); + this.taskRegistry = this.taskRegistry.filter((task) => task.name !== name); this.logger.debug(`Task "${name}" cancelled.`); } } diff --git a/backend/tests/utils/helpers.ts b/backend/tests/utils/helpers.ts index 6ca3f69..e6c18a9 100644 --- a/backend/tests/utils/helpers.ts +++ b/backend/tests/utils/helpers.ts @@ -207,7 +207,7 @@ export const runReleaseActiveRecordingLock = async (roomId: string) => { } const recordingService = container.get(RecordingService); - await recordingService.releaseRoomRecordingActiveLock(roomId); + await recordingService.releaseRecordingLockIfNoEgress(roomId); }; /**