diff --git a/backend/src/environment.ts b/backend/src/environment.ts index b220f26..f1f3c09 100644 --- a/backend/src/environment.ts +++ b/backend/src/environment.ts @@ -81,6 +81,7 @@ export const MEET_S3_RECORDINGS_PREFIX = 'recordings'; // Time to live for the active recording lock in Redis export const MEET_RECORDING_LOCK_TTL = '6h'; +export const MEET_RECORDING_CLEANUP_TIMEOUT = '30s'; export function checkModuleEnabled() { if (MODULES_FILE) { diff --git a/backend/src/helpers/redis.helper.ts b/backend/src/helpers/redis.helper.ts new file mode 100644 index 0000000..9891d09 --- /dev/null +++ b/backend/src/helpers/redis.helper.ts @@ -0,0 +1,28 @@ +import { RedisLockName, RedisLockPrefix } from '../models/redis.model.js'; + +export class MeetLock { + private constructor() { + // Prevent instantiation of this utility class + } + + static getRecordingActiveLock(roomId: string): string { + if (!roomId) { + throw new Error('roomId must be a non-empty string'); + } + + return `${RedisLockPrefix.BASE}${roomId}_${RedisLockName.RECORDING_ACTIVE}`; + } + + static getRegistryLock(lockName: string): string { + if (!lockName) { + throw new Error('lockName must be a non-empty string'); + } + + return `${RedisLockPrefix.REGISTRY}${lockName}`; + } + + static getRoomGarbageCollectorLock(): string { + return `${RedisLockPrefix.BASE}${RedisLockName.ROOM_GARBAGE_COLLECTOR}`; + } + +} diff --git a/backend/src/models/error.model.ts b/backend/src/models/error.model.ts index 12db4f7..1cd377c 100644 --- a/backend/src/models/error.model.ts +++ b/backend/src/models/error.model.ts @@ -63,10 +63,6 @@ export const errorRecordingNotStopped = (recordingId: string): OpenViduMeetError return new OpenViduMeetError('Recording Error', `Recording '${recordingId}' is not stopped yet`, 409); }; -export const errorRecordingNotReady = (recordingId: string): OpenViduMeetError => { - return new OpenViduMeetError('Recording Error', `Recording '${recordingId}' is not ready yet`, 409); -}; - export const errorRecordingAlreadyStopped = (recordingId: string): OpenViduMeetError => { return new OpenViduMeetError('Recording Error', `Recording '${recordingId}' is already stopped`, 409); }; @@ -79,6 +75,30 @@ export const errorRecordingAlreadyStarted = (roomName: string): OpenViduMeetErro return new OpenViduMeetError('Recording Error', `The room '${roomName}' is already being recorded`, 409); }; +const isMatchingError = (error: OpenViduMeetError, originalError: OpenViduMeetError): boolean => { + return ( + error instanceof OpenViduMeetError && + error.name === originalError.name && + error.statusCode === originalError.statusCode && + error.message === originalError.message + ); +}; + +export const isErrorRecordingAlreadyStopped = (error: OpenViduMeetError, recordingId: string): boolean => { + return isMatchingError(error, errorRecordingAlreadyStopped(recordingId)); +}; + +export const isErrorRecordingNotFound = (error: OpenViduMeetError, recordingId: string): boolean => { + return isMatchingError(error, errorRecordingNotFound(recordingId)); +}; + +export const isErrorRecordingCannotBeStoppedWhileStarting = ( + error: OpenViduMeetError, + recordingId: string +): boolean => { + return isMatchingError(error, errorRecordingCannotBeStoppedWhileStarting(recordingId)); +}; + // Room errors export const errorRoomNotFound = (roomName: string): OpenViduMeetError => { return new OpenViduMeetError('Room Error', `The room '${roomName}' does not exist`, 404); diff --git a/backend/src/models/redis.model.ts b/backend/src/models/redis.model.ts index cd27d34..448ca99 100644 --- a/backend/src/models/redis.model.ts +++ b/backend/src/models/redis.model.ts @@ -1,4 +1,10 @@ +export const enum RedisLockPrefix { + BASE = 'ov_meet_lock:', + REGISTRY = 'ov_meet_lock_registry:' +} + export const enum RedisLockName { - GARBAGE_COLLECTOR = 'room_garbage_collector', - RECORDING_ACTIVE = 'recording_active', -} \ No newline at end of file + ROOM_GARBAGE_COLLECTOR = 'room_garbage_collector', + RECORDING_ACTIVE = 'recording_active' +} + diff --git a/backend/src/models/system-event.model.ts b/backend/src/models/system-event.model.ts new file mode 100644 index 0000000..26858f0 --- /dev/null +++ b/backend/src/models/system-event.model.ts @@ -0,0 +1,11 @@ +export const enum SystemEventType { + /** + * Event emitted when a egress is active. + */ + RECORDING_ACTIVE = 'recording_active' +} + +export interface SystemEventPayload { + eventType: string; + payload: Record; +} diff --git a/backend/src/services/livekit-webhook.service.ts b/backend/src/services/livekit-webhook.service.ts index bc8c80c..0fe3ff0 100644 --- a/backend/src/services/livekit-webhook.service.ts +++ b/backend/src/services/livekit-webhook.service.ts @@ -2,7 +2,7 @@ import { inject, injectable } from '../config/dependency-injector.config.js'; import { EgressInfo, ParticipantInfo, Room, WebhookEvent, WebhookReceiver } from 'livekit-server-sdk'; import { RecordingHelper } from '../helpers/recording.helper.js'; import { LiveKitService } from './livekit.service.js'; -import { MeetRecordingInfo } from '@typings-ce'; +import { MeetRecordingInfo, MeetRecordingStatus } from '@typings-ce'; import { LIVEKIT_API_KEY, LIVEKIT_API_SECRET, MEET_NAME_ID, MEET_S3_RECORDINGS_PREFIX } from '../environment.js'; import { LoggerService } from './logger.service.js'; import { RoomService } from './room.service.js'; @@ -10,6 +10,8 @@ import { S3Service } from './s3.service.js'; import { RecordingService } from './recording.service.js'; import { OpenViduWebhookService } from './openvidu-webhook.service.js'; import { MutexService } from './mutex.service.js'; +import { SystemEventService } from './system-event.service.js'; +import { SystemEventType } from '../models/system-event.model.js'; @injectable() export class LivekitWebhookService { @@ -21,6 +23,7 @@ export class LivekitWebhookService { @inject(RoomService) protected roomService: RoomService, @inject(OpenViduWebhookService) protected openViduWebhookService: OpenViduWebhookService, @inject(MutexService) protected mutexService: MutexService, + @inject(SystemEventService) protected systemEventService: SystemEventService, @inject(LoggerService) protected logger: LoggerService ) { this.webhookReceiver = new WebhookReceiver(LIVEKIT_API_KEY, LIVEKIT_API_SECRET); @@ -33,7 +36,12 @@ export class LivekitWebhookService { * @returns The WebhookEvent extracted from the request body. */ async getEventFromWebhook(body: string, auth?: string): Promise { - return await this.webhookReceiver.receive(body, auth); + try { + return await this.webhookReceiver.receive(body, auth); + } catch (error) { + this.logger.error('Error receiving webhook event', error); + throw error; + } } /** @@ -79,10 +87,20 @@ export class LivekitWebhookService { } } + /** + * Handles the 'room_created' event by sending a webhook notification indicating that the room has been created. + * If an error occurs while sending the webhook, it logs the error. + * @param room - Information about the room that was created. + */ async handleEgressStarted(egressInfo: EgressInfo) { await this.processRecordingEgress(egressInfo, 'started'); } + /** + * Handles the 'egress_updated' event by gathering relevant room and recording information, + * updating the recording metadata, and sending a data payload with recording information to the room. + * @param egressInfo - Information about the updated recording egress. + */ async handleEgressUpdated(egressInfo: EgressInfo) { await this.processRecordingEgress(egressInfo, 'updated'); } @@ -104,15 +122,13 @@ export class LivekitWebhookService { * @param participant - Information about the newly joined participant. */ async handleParticipantJoined(room: Room, participant: ParticipantInfo) { - try { - // Skip if the participant is an egress participant - if (this.livekitService.isEgressParticipant(participant)) { - return; - } + // Skip if the participant is an egress participant + if (this.livekitService.isEgressParticipant(participant)) return; + try { await this.roomService.sendRoomStatusSignalToOpenViduComponents(room.name, participant.sid); } catch (error) { - this.logger.error(`Error sending data on participant joined: ${error}`); + this.logger.error('Error sending room status signal on participant join:', error); } } @@ -125,7 +141,7 @@ export class LivekitWebhookService { * @param {Room} room - The room object that has finished. * @returns {Promise} A promise that resolves when the webhook has been sent. */ - async handleMeetingFinished(room: Room) { + async handleMeetingFinished(room: Room): Promise { try { await Promise.all([ this.recordingService.releaseRoomRecordingActiveLock(room.name), @@ -150,49 +166,60 @@ export class LivekitWebhookService { ): Promise { if (!RecordingHelper.isRecordingEgress(egressInfo)) return; - this.logger.debug(`Processing recording ${webhookAction} webhook.`); + this.logger.debug(`Handling recording_${webhookAction} webhook.`); const recordingInfo: MeetRecordingInfo = RecordingHelper.toRecordingInfo(egressInfo); const { roomId, recordingId, status } = recordingInfo; - const metadataPath = this.generateMetadataPath(recordingId); + const metadataPath = this.buildMetadataFilePath(recordingId); - this.logger.debug(`Recording '${recordingId}' for room '${roomId}' is in status '${status}'`); + this.logger.debug(`Recording '${recordingId}' status: '${status}'`); - const promises: Promise[] = []; + const tasks: Promise[] = []; + + // Update recording metadata + tasks.push( + this.s3Service.saveObject(metadataPath, recordingInfo), + this.recordingService.sendRecordingSignalToOpenViduComponents(roomId, recordingInfo) + ); + + // Send webhook notification + switch (webhookAction) { + case 'started': + tasks.push(this.openViduWebhookService.sendRecordingStartedWebhook(recordingInfo)); + break; + case 'updated': + tasks.push(this.openViduWebhookService.sendRecordingUpdatedWebhook(recordingInfo)); + + if (recordingInfo.status === MeetRecordingStatus.ACTIVE) { + // Send system event for active recording with the aim of cancelling the cleanup timer + tasks.push( + this.systemEventService.publishEvent(SystemEventType.RECORDING_ACTIVE, { + roomId, + recordingId + }) + ); + } + + break; + case 'ended': + tasks.push( + this.openViduWebhookService.sendRecordingEndedWebhook(recordingInfo), + this.recordingService.releaseRoomRecordingActiveLock(roomId) + ); + break; + } try { - // Update recording metadata - promises.push( - this.s3Service.saveObject(metadataPath, recordingInfo), - this.recordingService.sendRecordingSignalToOpenViduComponents(roomId, recordingInfo) - ); - - // Send webhook notification - switch (webhookAction) { - case 'started': - promises.push(this.openViduWebhookService.sendRecordingStartedWebhook(recordingInfo)); - break; - case 'updated': - promises.push(this.openViduWebhookService.sendRecordingUpdatedWebhook(recordingInfo)); - break; - case 'ended': - promises.push( - this.openViduWebhookService.sendRecordingEndedWebhook(recordingInfo), - this.recordingService.releaseRoomRecordingActiveLock(roomId) - ); - break; - } - // Wait for all promises to resolve - await Promise.all(promises); + await Promise.all(tasks); } catch (error) { this.logger.warn( - `Error sending recording ${webhookAction} webhook for egress ${egressInfo.egressId}: ${error}` + `Error processing recording ${webhookAction} webhook for egress ${egressInfo.egressId}: ${error}` ); } } - protected generateMetadataPath(recordingId: string): string { + protected buildMetadataFilePath(recordingId: string): string { const { roomId, egressId, uid } = RecordingHelper.extractInfoFromRecordingId(recordingId); return `${MEET_S3_RECORDINGS_PREFIX}/.metadata/${roomId}/${egressId}/${uid}.json`; diff --git a/backend/src/services/mutex.service.ts b/backend/src/services/mutex.service.ts index fc2618b..a06937e 100644 --- a/backend/src/services/mutex.service.ts +++ b/backend/src/services/mutex.service.ts @@ -3,14 +3,13 @@ import Redlock, { Lock } from 'redlock'; import { inject, injectable } from 'inversify'; import { RedisService } from './redis.service.js'; import { LoggerService } from './logger.service.js'; +import { MeetLock } from '../helpers/redis.helper.js'; export type RedisLock = Lock; @injectable() export class MutexService { protected redlockWithoutRetry: Redlock; protected readonly TTL_MS = ms('1m'); - protected LOCK_KEY_PREFIX = 'ov_meet_lock:'; - protected LOCK_REGISTRY_PREFIX = 'ov_meet_lock_registry:'; constructor( @inject(RedisService) protected redisService: RedisService, @@ -22,16 +21,15 @@ export class MutexService { /** * Acquires a lock for the specified resource. - * @param resource The resource to acquire a lock for. + * @param key The resource to acquire a lock for. * @param ttl The time-to-live (TTL) for the lock in milliseconds. Defaults to the TTL value of the MutexService. * @returns A Promise that resolves to the acquired Lock object. */ - async acquire(resource: string, ttl: number = this.TTL_MS): Promise { - const key = this.getLockKey(resource); - const registryKey = this.getLockRegistryKey(resource); + async acquire(key: string, ttl: number = this.TTL_MS): Promise { + const registryKey = MeetLock.getRegistryLock(key); try { - this.logger.debug(`Acquiring lock for resource: ${resource}`); + this.logger.debug(`Requesting lock: ${key}`); const lock = await this.redlockWithoutRetry.acquire([key], ttl); // Store Lock data in Redis registry for support HA and release lock @@ -54,72 +52,56 @@ export class MutexService { /** * Releases a lock on a resource. * - * @param resource - The resource to release the lock on. + * @param key - The resource to release the lock on. * @returns A Promise that resolves when the lock is released. */ - async release(resource: string): Promise { - const key = this.getLockKey(resource); - - const lock = await this.getLockData(resource); + async release(key: string): Promise { + const registryKey = MeetLock.getRegistryLock(key); + const lock = await this.getLockData(key); if (!lock) { return; } if (lock) { - this.logger.debug(`Releasing lock for resource: ${resource}`); + this.logger.debug(`Releasing lock for resource: ${key}`); try { await lock.release(); } catch (error) { this.logger.error(`Error releasing lock for key ${key}:`, error); } finally { - await this.redisService.delete(this.getLockRegistryKey(resource)); + await this.redisService.delete(registryKey); } } } - /** - * Returns the complete key used to acquire the lock in Redis. - */ - protected getLockKey(resource: string): string { - return `${this.LOCK_KEY_PREFIX}${resource}`; - } - - /** - * Generates a unique key for the lock registry by combining a predefined prefix - * with the specified resource identifier. - */ - protected getLockRegistryKey(resource: string): string { - return `${this.LOCK_REGISTRY_PREFIX}${resource}`; - } - /** * Retrieves the lock data for a given resource. * * This method first attempts to retrieve the lock from Redis. If the lock data is successfully retrieved from Redis, * it constructs a new `Lock` instance and returns it. If the lock data cannot be found the method returns `null`. * - * @param resource - The identifier of the resource for which the lock data is being retrieved. + * @param key - The identifier of the resource for which the lock data is being retrieved. * @returns A promise that resolves to the `Lock` instance if found, or `null` if the lock data is not available. */ - protected async getLockData(resource: string): Promise { - const registryKey = this.getLockRegistryKey(resource); + protected async getLockData(key: string): Promise { + const registryKey = MeetLock.getRegistryLock(key); try { - this.logger.debug(`Getting lock data in Redis for resource: ${resource}`); + this.logger.debug(`Getting lock data in Redis for resource: ${key}`); // Try to get lock from Redis const redisLockData = await this.redisService.get(registryKey); if (!redisLockData) { - this.logger.error(`Cannot release lock. Lock not found for resource: ${resource}.`); + this.logger.error(`Cannot release lock. Lock not found for resource: ${key}.`); return null; } const { resources, value, expiration } = JSON.parse(redisLockData); return new Lock(this.redlockWithoutRetry, resources, value, [], expiration); } catch (error) { - this.logger.error(`Cannot release lock. Lock not found for resource: ${resource}.`); + this.logger.error(`Cannot release lock. Lock not found for resource: ${key}.`); return null; } } diff --git a/backend/src/services/recording.service.ts b/backend/src/services/recording.service.ts index 118b431..c11c73b 100644 --- a/backend/src/services/recording.service.ts +++ b/backend/src/services/recording.service.ts @@ -1,5 +1,6 @@ import { EgressStatus, EncodedFileOutput, EncodedFileType, RoomCompositeOptions } from 'livekit-server-sdk'; import { uid } from 'uid'; +import ms from 'ms'; import { Readable } from 'stream'; import { LiveKitService } from './livekit.service.js'; import { @@ -10,19 +11,29 @@ import { errorRecordingNotStopped, errorRoomNotFound, internalError, + isErrorRecordingAlreadyStopped, + isErrorRecordingCannotBeStoppedWhileStarting, + isErrorRecordingNotFound, OpenViduMeetError } from '../models/error.model.js'; import { S3Service } from './s3.service.js'; import { LoggerService } from './logger.service.js'; import { MeetRecordingFilters, MeetRecordingInfo, MeetRecordingStatus } from '@typings-ce'; import { RecordingHelper } from '../helpers/recording.helper.js'; -import { MEET_RECORDING_LOCK_TTL, MEET_S3_BUCKET, MEET_S3_RECORDINGS_PREFIX, MEET_S3_SUBBUCKET } from '../environment.js'; +import { + MEET_RECORDING_LOCK_TTL, + MEET_S3_BUCKET, + MEET_S3_RECORDINGS_PREFIX, + MEET_S3_SUBBUCKET +} from '../environment.js'; import { RoomService } from './room.service.js'; import { inject, injectable } from '../config/dependency-injector.config.js'; import { MutexService, RedisLock } from './mutex.service.js'; -import { RedisLockName } from '../models/index.js'; -import ms from 'ms'; import { OpenViduComponentsAdapterHelper } from '../helpers/ov-components-adapter.helper.js'; +import { MeetLock } from '../helpers/redis.helper.js'; +import { TaskSchedulerService } from './task-scheduler.service.js'; +import { SystemEventService } from './system-event.service.js'; +import { SystemEventType } from '../models/system-event.model.js'; @injectable() export class RecordingService { @@ -31,6 +42,8 @@ export class RecordingService { @inject(LiveKitService) protected livekitService: LiveKitService, @inject(RoomService) protected roomService: RoomService, @inject(MutexService) protected mutexService: MutexService, + @inject(TaskSchedulerService) protected taskSchedulerService: TaskSchedulerService, + @inject(SystemEventService) protected systemEventService: SystemEventService, @inject(LoggerService) protected logger: LoggerService ) {} @@ -42,6 +55,9 @@ export class RecordingService { if (!room) throw errorRoomNotFound(roomId); + //TODO: Check if the room has participants before starting the recording + //room.numParticipants === 0 ? throw errorNoParticipants(roomId); + // Attempt to acquire lock. If the lock is not acquired, the recording is already active. acquiredLock = await this.acquireRoomRecordingActiveLock(roomId); @@ -50,10 +66,33 @@ export class RecordingService { const options = this.generateCompositeOptionsFromRequest(); const output = this.generateFileOutputFromRequest(roomId); const egressInfo = await this.livekitService.startRoomComposite(roomId, output, options); + const recordingInfo = RecordingHelper.toRecordingInfo(egressInfo); + const { recordingId } = recordingInfo; - return RecordingHelper.toRecordingInfo(egressInfo); + const recordingPromise = new Promise((resolve, reject) => { + this.taskSchedulerService.scheduleRecordingCleanupTimer( + roomId, + this.handleRecordingLockTimeout.bind(this, recordingId, roomId, reject) + ); + + this.systemEventService.once(SystemEventType.RECORDING_ACTIVE, (payload: Record) => { + // This listener is triggered only for the instance that started the recording. + // Check if the recording ID matches the one that was started + const isEventForCurrentRecording = + payload?.recordingId === recordingId && payload?.roomId === roomId; + + if (isEventForCurrentRecording) { + this.taskSchedulerService.cancelRecordingCleanupTimer(roomId); + resolve(recordingInfo); + } else { + this.logger.error('Received recording active event with mismatched recording ID:', payload); + } + }); + }); + + return await recordingPromise; } catch (error) { - this.logger.error(`Error starting recording in room ${roomId}: ${error}`); + this.logger.error(`Error starting recording in room '${roomId}': ${error}`); if (acquiredLock) await this.releaseRoomRecordingActiveLock(roomId); @@ -71,6 +110,11 @@ export class RecordingService { throw errorRecordingNotFound(egressId); } + // Cancel the recording cleanup timer if it is running + this.taskSchedulerService.cancelRecordingCleanupTimer(roomId); + // Remove the listener for the EGRESS_STARTED event. + this.systemEventService.off(SystemEventType.RECORDING_ACTIVE); + switch (egress.status) { case EgressStatus.EGRESS_ACTIVE: // Everything is fine, the recording can be stopped. @@ -85,9 +129,10 @@ export class RecordingService { const egressInfo = await this.livekitService.stopEgress(egressId); + this.logger.info(`Recording stopped successfully for room '${roomId}'.`); return RecordingHelper.toRecordingInfo(egressInfo); } catch (error) { - this.logger.error(`Error stopping recording ${recordingId}: ${error}`); + this.logger.error(`Error stopping recording '${recordingId}': ${error}`); throw error; } } @@ -275,7 +320,7 @@ export class RecordingService { * @param roomId - The name of the room to acquire the lock for. */ async acquireRoomRecordingActiveLock(roomId: string): Promise { - const lockName = `${roomId}_${RedisLockName.RECORDING_ACTIVE}`; + const lockName = MeetLock.getRecordingActiveLock(roomId); try { const lock = await this.mutexService.acquire(lockName, ms(MEET_RECORDING_LOCK_TTL)); @@ -292,14 +337,24 @@ export class RecordingService { * This method attempts to release a lock associated with the active recording * of a given room. */ - async releaseRoomRecordingActiveLock(roomName: string): Promise { - if (roomName) { - const lockName = `${roomName}_${RedisLockName.RECORDING_ACTIVE}`; + async releaseRoomRecordingActiveLock(roomId: string): Promise { + if (roomId) { + const lockName = MeetLock.getRecordingActiveLock(roomId); + const egress = await this.livekitService.getActiveEgress(roomId); + + if (egress.length > 0) { + this.logger.verbose( + `Active egress found for room ${roomId}: ${egress.map((e) => e.egressId).join(', ')}` + ); + this.logger.error(`Cannot release recorgin lock for room '${roomId}'.`); + return; + } try { await this.mutexService.release(lockName); + this.logger.verbose(`Recording active lock released for room '${roomId}'.`); } catch (error) { - this.logger.warn(`Error releasing lock ${lockName} on egress ended: ${error}`); + this.logger.warn(`Error releasing recording lock for room '${roomId}' on egress ended: ${error}`); } } } @@ -362,7 +417,7 @@ export class RecordingService { return { recordingInfo, metadataFilePath: metadataPath }; } - private generateCompositeOptionsFromRequest(layout = 'speaker'): RoomCompositeOptions { + protected generateCompositeOptionsFromRequest(layout = 'speaker'): RoomCompositeOptions { return { layout: layout // customBaseUrl: customLayout, @@ -377,7 +432,7 @@ export class RecordingService { * @param fileName - The name of the file (default is 'recording'). * @returns The generated file output object. */ - private generateFileOutputFromRequest(roomId: string): EncodedFileOutput { + protected generateFileOutputFromRequest(roomId: string): EncodedFileOutput { // Added unique identifier to the file path for avoiding overwriting const recordingName = `${roomId}--${uid(10)}`; @@ -400,7 +455,72 @@ export class RecordingService { * @param str - The input string to sanitize for use in a regular expression. * @returns A new string with special characters escaped. */ - private sanitizeRegExp(str: string) { + protected sanitizeRegExp(str: string) { return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); } + + /** + * Callback function to release the active recording lock after a timeout. + * This function is scheduled by the recording cleanup timer when a recording is started. + * + * @param recordingId + * @param roomId + */ + protected async handleRecordingLockTimeout( + recordingId: string, + roomId: string, + rejectRequest: (reason?: unknown) => void + ) { + this.logger.debug(`Recording cleanup timer triggered for room '${roomId}'.`); + + let shouldReleaseLock = false; + + try { + await this.stopRecording(recordingId); + // The recording was stopped successfully + // the cleanup timer will be cancelled when the egress_ended event is received. + } catch (error) { + if (error instanceof OpenViduMeetError) { + // The recording is already stopped or not found in LiveKit. + const isRecordingAlreadyStopped = isErrorRecordingAlreadyStopped(error, recordingId); + const isRecordingNotFound = isErrorRecordingNotFound(error, recordingId); + + if (isRecordingAlreadyStopped || isRecordingNotFound) { + this.logger.verbose(`Recording ${recordingId} is already stopped or not found.`); + this.logger.verbose(' Proceeding to release the recording active lock.'); + shouldReleaseLock = true; + } else if (isErrorRecordingCannotBeStoppedWhileStarting(error, recordingId)) { + // The recording is still starting, the cleanup timer will be cancelled. + this.logger.warn( + `Recording ${recordingId} is still starting. Skipping recording active lock release.` + ); + } else { + // An error occurred while stopping the recording. + this.logger.error(`Error stopping recording ${recordingId}: ${error.message}`); + shouldReleaseLock = true; + } + } else { + this.logger.error(`Unexpected error while run recording cleanup timer:`, error); + } + } finally { + if (shouldReleaseLock) { + try { + await this.releaseRoomRecordingActiveLock(roomId); + this.logger.debug(`Recording active lock released for room ${roomId}.`); + } catch (releaseError) { + this.logger.error(`Error releasing active recording lock for room ${roomId}: ${releaseError}`); + } + } + + // Reject the REST request with a timeout error. + rejectRequest(new Error(`Timeout waiting for '${SystemEventType.RECORDING_ACTIVE}' event in room '${roomId}'`)); + } + } + + protected async deleteOrphanLocks() { + // TODO: Get all recordings active locks from redis + // TODO: Extract all rooms ids from the active locks + // TODO: Check each room id if it has an active recording + // TODO: Remove the lock if the room has no active recording + } } diff --git a/backend/src/services/redis.service.ts b/backend/src/services/redis.service.ts index 7fabc04..07e6794 100644 --- a/backend/src/services/redis.service.ts +++ b/backend/src/services/redis.service.ts @@ -16,20 +16,29 @@ import { LoggerService } from './logger.service.js'; import { EventEmitter } from 'events'; import Redlock from 'redlock'; import ms from 'ms'; +import { SystemEventPayload } from '../models/system-event.model.js'; @injectable() -export class RedisService { +export class RedisService extends EventEmitter { protected readonly DEFAULT_TTL: number = ms('32 days'); - protected redis: Redis; + protected EVENT_CHANNEL = 'ov_meet_channel'; + protected redisPublisher: Redis; + protected redisSubscriber: Redis; protected isConnected = false; - public events: EventEmitter; + protected eventHandler?: (event: SystemEventPayload) => void; constructor(@inject(LoggerService) protected logger: LoggerService) { - this.events = new EventEmitter(); - const redisOptions = this.loadRedisConfig(); - this.redis = new Redis(redisOptions); + super(); - this.redis.on('connect', () => { + const redisOptions = this.loadRedisConfig(); + this.redisPublisher = new Redis(redisOptions); + this.redisSubscriber = new Redis(redisOptions); + + this.setupEventHandlers(); + } + + protected setupEventHandlers(): void { + const onConnect = () => { if (!this.isConnected) { this.logger.verbose('Connected to Redis'); } else { @@ -37,18 +46,29 @@ export class RedisService { } this.isConnected = true; - this.events.emit('redisConnected'); - }); - this.redis.on('error', (e) => this.logger.error('Error Redis', e)); + this.emit('redisConnected'); + }; - this.redis.on('end', () => { + const onError = (error: Error) => { + this.logger.error('Redis Error', error); + }; + + const onDisconnect = () => { this.isConnected = false; this.logger.warn('Redis disconnected'); - }); + this.emit('redisDisconnected'); + }; + + this.redisPublisher.on('connect', onConnect); + this.redisSubscriber.on('connect', onConnect); + this.redisPublisher.on('error', onError); + this.redisSubscriber.on('error', onError); + this.redisPublisher.on('end', onDisconnect); + this.redisSubscriber.on('end', onDisconnect); } createRedlock(retryCount = -1, retryDelay = 200) { - return new Redlock([this.redis], { + return new Redlock([this.redisPublisher], { driftFactor: 0.01, retryCount, retryDelay, @@ -61,7 +81,62 @@ export class RedisService { callback(); } - this.events.on('redisConnected', callback); + this.on('redisConnected', callback); + } + + /** + * Publishes a message to a specified Redis channel. + * + * @param channel - The name of the Redis channel to publish the message to. + * @param message - The message to be published to the channel. + * @returns A promise that resolves when the message has been successfully published. + */ + async publishEvent(channel: string, message: string) { + try { + await this.redisPublisher.publish(channel, message); + } catch (error) { + this.logger.error('Error publishing message to Redis', error); + } + } + + /** + * Subscribes to a Redis channel. + * + * @param channel - The channel to subscribe to. + * @param callback - The callback function to execute when a message is received on the channel. + */ + subscribe(channel: string, callback: (message: string) => void) { + this.logger.verbose(`Subscribing to Redis channel: ${channel}`); + this.redisSubscriber.subscribe(channel, (err, count) => { + if (err) { + this.logger.error('Error subscribing to Redis channel', err); + return; + } + + this.logger.verbose(`Subscribed to ${channel}. Now subscribed to ${count} channel(s).`); + }); + + this.redisSubscriber.on('message', (receivedChannel, message) => { + if (receivedChannel === channel) { + callback(message); + } + }); + } + + /** + * Unsubscribes from a Redis channel. + * + * @param channel - The channel to unsubscribe from. + */ + unsubscribe(channel: string) { + this.redisSubscriber.unsubscribe(channel, (err, count) => { + if (err) { + this.logger.error('Error unsubscribing from Redis channel', err); + return; + } + + this.logger.verbose(`Unsubscribed from channel ${channel}. Now subscribed to ${count} channel(s).`); + }); } /** @@ -76,7 +151,7 @@ export class RedisService { const keys: Set = new Set(); do { - const [nextCursor, partialKeys] = await this.redis.scan(cursor, 'MATCH', pattern); + const [nextCursor, partialKeys] = await this.redisPublisher.scan(cursor, 'MATCH', pattern); partialKeys.forEach((key) => keys.add(key)); cursor = nextCursor; } while (cursor !== '0'); @@ -98,9 +173,9 @@ export class RedisService { get(key: string, hashKey?: string): Promise { try { if (hashKey) { - return this.redis.hget(key, hashKey); + return this.redisPublisher.hget(key, hashKey); } else { - return this.redis.get(key); + return this.redisPublisher.get(key); } } catch (error) { this.logger.error('Error getting value from Redis', error); @@ -108,24 +183,6 @@ export class RedisService { } } - // getAll(key: string): Promise> { - // try { - // return this.redis.hgetall(key); - // } catch (error) { - // this.logger.error('Error getting value from Redis', error); - // throw internalError(error); - // } - // } - - // getDel(key: string): Promise { - // try { - // return this.redis.getdel(key); - // } catch (error) { - // this.logger.error('Error getting and deleting value from Redis', error); - // throw internalError(error); - // } - // } - /** * Sets a value in Redis with an optional TTL (time-to-live). * @@ -141,14 +198,14 @@ export class RedisService { if (valueType === 'string' || valueType === 'number' || valueType === 'boolean') { if (withTTL) { - await this.redis.set(key, value, 'EX', this.DEFAULT_TTL); + await this.redisPublisher.set(key, value, 'EX', this.DEFAULT_TTL); } else { - await this.redis.set(key, value); + await this.redisPublisher.set(key, value); } } else if (valueType === 'object') { - await this.redis.hmset(key, value); + await this.redisPublisher.hmset(key, value); - if (withTTL) await this.redis.expire(key, this.DEFAULT_TTL); + if (withTTL) await this.redisPublisher.expire(key, this.DEFAULT_TTL); } else { throw new Error('Invalid value type'); } @@ -169,9 +226,9 @@ export class RedisService { delete(key: string, hashKey?: string): Promise { try { if (hashKey) { - return this.redis.hdel(key, hashKey); + return this.redisPublisher.hdel(key, hashKey); } else { - return this.redis.del(key); + return this.redisPublisher.del(key); } } catch (error) { throw internalError(`Error deleting key from Redis ${error}`); @@ -179,11 +236,11 @@ export class RedisService { } quit() { - this.redis.quit(); + this.redisPublisher.quit(); } async checkHealth() { - return (await this.redis.ping()) === 'PONG'; + return (await this.redisPublisher.ping()) === 'PONG'; } private loadRedisConfig(): RedisOptions { diff --git a/backend/src/services/system-event.service.ts b/backend/src/services/system-event.service.ts index 921dd0c..34697eb 100644 --- a/backend/src/services/system-event.service.ts +++ b/backend/src/services/system-event.service.ts @@ -1,15 +1,100 @@ import { inject, injectable } from 'inversify'; import { RedisService } from './redis.service.js'; import { LoggerService } from './logger.service.js'; +import { EventEmitter } from 'events'; +import { SystemEventPayload, SystemEventType } from '../models/system-event.model.js'; @injectable() export class SystemEventService { + protected emitter: EventEmitter = new EventEmitter(); + protected readonly OPENVIDU_MEET_CHANNEL = 'ov_meet_channel'; constructor( @inject(LoggerService) protected logger: LoggerService, @inject(RedisService) protected redisService: RedisService - ) {} + ) { + this.emitter.setMaxListeners(Infinity); + this.redisService.subscribe(this.OPENVIDU_MEET_CHANNEL, this.handleRedisMessage.bind(this)); + } + /** + * Subscribes to a specific system event. + * + * @param event The event type to subscribe to. + * @param listener The callback to invoke when the event is emitted. + */ + on(event: SystemEventType, listener: (payload: Record) => void): void { + this.emitter.on(event, listener); + } + + /** + * Subscribes to a specific system event once. + * + * @param event The event type to subscribe to. + * @param listener The callback to invoke when the event is emitted. + */ + once(event: SystemEventType, listener: (payload: Record) => void): void { + this.emitter.once(event, listener); + } + + /** + * Unsubscribes from a specific system event. + * + * @param event The event type to unsubscribe from. + * @param listener Optional: the specific listener to remove. If not provided, all listeners for that event are removed. + */ + off(event: SystemEventType, listener?: (payload: Record) => void): void { + if (listener) { + this.emitter.off(event, listener); + } else { + this.emitter.removeAllListeners(event); + } + } + + /** + * Publishes a system event to the central Redis channel. + * This method can be used by any part of the application to send a system-wide event. + * + * @param type The event type. + * @param payload The event payload. + */ + async publishEvent(eventType: SystemEventType, payload: Record): Promise { + const message = JSON.stringify({ eventType, payload }); + this.logger.verbose(`Publishing system event: ${eventType}`, payload); + await this.redisService.publishEvent(this.OPENVIDU_MEET_CHANNEL, message); + } + + /** + * Registers a callback function to be executed when the Redis connection is ready. + * + * @param callback - A function to be called when the Redis connection is ready. + */ onRedisReady(callback: () => void) { this.redisService.onReady(callback); } + + /** + * Handles incoming messages from Redis, parses them as system events, + * and emits the corresponding event to all registered listeners. + * + * @param message - The raw message string received from Redis. + * @throws Will log an error if the message cannot be parsed as JSON. + */ + protected handleRedisMessage(message: string): void { + try { + const eventData: SystemEventPayload = JSON.parse(message); + const { eventType, payload } = eventData; + + if (!eventType) { + this.logger.warn('Received an event without type from Redis:', message); + return; + } + + this.logger.verbose(`Emitting system event: ${eventType}`, payload); + + // Forward the event to all listeners + this.emitter.emit(eventType, payload); + } catch (error) { + this.logger.error('Error parsing redis message in SystemEventsService:', error); + } + } } diff --git a/backend/src/services/task-scheduler.service.ts b/backend/src/services/task-scheduler.service.ts index 5ccdcca..9916b93 100644 --- a/backend/src/services/task-scheduler.service.ts +++ b/backend/src/services/task-scheduler.service.ts @@ -3,11 +3,14 @@ import { LoggerService } from './index.js'; import { SystemEventService } from './system-event.service.js'; import { CronJob } from 'cron'; import { MutexService } from './mutex.service.js'; -import { RedisLockName } from '../models/redis.model.js'; +import { MeetLock } from '../helpers/redis.helper.js'; +import ms from 'ms'; +import { MEET_RECORDING_CLEANUP_TIMEOUT } from '../environment.js'; @injectable() export class TaskSchedulerService { protected roomGarbageCollectorJob: CronJob | null = null; + private recordingCleanupTimers: Map = new Map(); constructor( @inject(LoggerService) protected logger: LoggerService, @@ -53,4 +56,45 @@ export class TaskSchedulerService { this.logger.debug('Starting room garbage collector'); this.roomGarbageCollectorJob.start(); } + + /** + * Schedules a cleanup timer for a recording that has just started. + * + * If the egress_started webhook is not received before the timer expires, + * this timer will execute a cleanup callback by stopping the recording and releasing + * the active lock for the specified room. + */ + async scheduleRecordingCleanupTimer(roomId: string, cleanupCallback: () => Promise): Promise { + this.logger.debug(`Recording cleanup timer (${MEET_RECORDING_CLEANUP_TIMEOUT}) scheduled for room ${roomId}.`); + + // Schedule a timeout to run the cleanup callback after a specified time + const timeoutMs = ms(MEET_RECORDING_CLEANUP_TIMEOUT); + const timer = setTimeout(async () => { + this.logger.warn(`Recording cleanup timer expired for room ${roomId}. Initiating cleanup process.`); + this.recordingCleanupTimers.delete(roomId); + await cleanupCallback(); + }, timeoutMs); + this.recordingCleanupTimers.set(roomId, timer); + } + + cancelRecordingCleanupTimer(roomId: string): void { + const timer = this.recordingCleanupTimers.get(roomId); + + if (timer) { + clearTimeout(timer); + this.recordingCleanupTimers.delete(roomId); + this.logger.info(`Recording cleanup timer cancelled for room ${roomId}`); + } + } + + async startRecordingLockGarbageCollector(callbackFn: () => Promise): Promise { + // Create a cron job to run every minute + const recordingLockGarbageCollectorJob = new CronJob('0 * * * * *', async () => { + try { + await callbackFn(); + } catch (error) { + this.logger.error('Error running recording lock garbage collection:', error); + } + }); + } }