backend: Implement system event handling and recording cleanup logic

This commit is contained in:
Carlos Santos 2025-03-28 10:24:01 +01:00
parent 7dffc6b60a
commit 147d558af5
11 changed files with 520 additions and 139 deletions

View File

@ -81,6 +81,7 @@ export const MEET_S3_RECORDINGS_PREFIX = 'recordings';
// Time to live for the active recording lock in Redis
export const MEET_RECORDING_LOCK_TTL = '6h';
export const MEET_RECORDING_CLEANUP_TIMEOUT = '30s';
export function checkModuleEnabled() {
if (MODULES_FILE) {

View File

@ -0,0 +1,28 @@
import { RedisLockName, RedisLockPrefix } from '../models/redis.model.js';
export class MeetLock {
private constructor() {
// Prevent instantiation of this utility class
}
static getRecordingActiveLock(roomId: string): string {
if (!roomId) {
throw new Error('roomId must be a non-empty string');
}
return `${RedisLockPrefix.BASE}${roomId}_${RedisLockName.RECORDING_ACTIVE}`;
}
static getRegistryLock(lockName: string): string {
if (!lockName) {
throw new Error('lockName must be a non-empty string');
}
return `${RedisLockPrefix.REGISTRY}${lockName}`;
}
static getRoomGarbageCollectorLock(): string {
return `${RedisLockPrefix.BASE}${RedisLockName.ROOM_GARBAGE_COLLECTOR}`;
}
}

View File

@ -63,10 +63,6 @@ export const errorRecordingNotStopped = (recordingId: string): OpenViduMeetError
return new OpenViduMeetError('Recording Error', `Recording '${recordingId}' is not stopped yet`, 409);
};
export const errorRecordingNotReady = (recordingId: string): OpenViduMeetError => {
return new OpenViduMeetError('Recording Error', `Recording '${recordingId}' is not ready yet`, 409);
};
export const errorRecordingAlreadyStopped = (recordingId: string): OpenViduMeetError => {
return new OpenViduMeetError('Recording Error', `Recording '${recordingId}' is already stopped`, 409);
};
@ -79,6 +75,30 @@ export const errorRecordingAlreadyStarted = (roomName: string): OpenViduMeetErro
return new OpenViduMeetError('Recording Error', `The room '${roomName}' is already being recorded`, 409);
};
const isMatchingError = (error: OpenViduMeetError, originalError: OpenViduMeetError): boolean => {
return (
error instanceof OpenViduMeetError &&
error.name === originalError.name &&
error.statusCode === originalError.statusCode &&
error.message === originalError.message
);
};
export const isErrorRecordingAlreadyStopped = (error: OpenViduMeetError, recordingId: string): boolean => {
return isMatchingError(error, errorRecordingAlreadyStopped(recordingId));
};
export const isErrorRecordingNotFound = (error: OpenViduMeetError, recordingId: string): boolean => {
return isMatchingError(error, errorRecordingNotFound(recordingId));
};
export const isErrorRecordingCannotBeStoppedWhileStarting = (
error: OpenViduMeetError,
recordingId: string
): boolean => {
return isMatchingError(error, errorRecordingCannotBeStoppedWhileStarting(recordingId));
};
// Room errors
export const errorRoomNotFound = (roomName: string): OpenViduMeetError => {
return new OpenViduMeetError('Room Error', `The room '${roomName}' does not exist`, 404);

View File

@ -1,4 +1,10 @@
export const enum RedisLockPrefix {
BASE = 'ov_meet_lock:',
REGISTRY = 'ov_meet_lock_registry:'
}
export const enum RedisLockName {
GARBAGE_COLLECTOR = 'room_garbage_collector',
RECORDING_ACTIVE = 'recording_active',
}
ROOM_GARBAGE_COLLECTOR = 'room_garbage_collector',
RECORDING_ACTIVE = 'recording_active'
}

View File

@ -0,0 +1,11 @@
export const enum SystemEventType {
/**
* Event emitted when a egress is active.
*/
RECORDING_ACTIVE = 'recording_active'
}
export interface SystemEventPayload {
eventType: string;
payload: Record<string, unknown>;
}

View File

@ -2,7 +2,7 @@ import { inject, injectable } from '../config/dependency-injector.config.js';
import { EgressInfo, ParticipantInfo, Room, WebhookEvent, WebhookReceiver } from 'livekit-server-sdk';
import { RecordingHelper } from '../helpers/recording.helper.js';
import { LiveKitService } from './livekit.service.js';
import { MeetRecordingInfo } from '@typings-ce';
import { MeetRecordingInfo, MeetRecordingStatus } from '@typings-ce';
import { LIVEKIT_API_KEY, LIVEKIT_API_SECRET, MEET_NAME_ID, MEET_S3_RECORDINGS_PREFIX } from '../environment.js';
import { LoggerService } from './logger.service.js';
import { RoomService } from './room.service.js';
@ -10,6 +10,8 @@ import { S3Service } from './s3.service.js';
import { RecordingService } from './recording.service.js';
import { OpenViduWebhookService } from './openvidu-webhook.service.js';
import { MutexService } from './mutex.service.js';
import { SystemEventService } from './system-event.service.js';
import { SystemEventType } from '../models/system-event.model.js';
@injectable()
export class LivekitWebhookService {
@ -21,6 +23,7 @@ export class LivekitWebhookService {
@inject(RoomService) protected roomService: RoomService,
@inject(OpenViduWebhookService) protected openViduWebhookService: OpenViduWebhookService,
@inject(MutexService) protected mutexService: MutexService,
@inject(SystemEventService) protected systemEventService: SystemEventService,
@inject(LoggerService) protected logger: LoggerService
) {
this.webhookReceiver = new WebhookReceiver(LIVEKIT_API_KEY, LIVEKIT_API_SECRET);
@ -33,7 +36,12 @@ export class LivekitWebhookService {
* @returns The WebhookEvent extracted from the request body.
*/
async getEventFromWebhook(body: string, auth?: string): Promise<WebhookEvent> {
return await this.webhookReceiver.receive(body, auth);
try {
return await this.webhookReceiver.receive(body, auth);
} catch (error) {
this.logger.error('Error receiving webhook event', error);
throw error;
}
}
/**
@ -79,10 +87,20 @@ export class LivekitWebhookService {
}
}
/**
* Handles the 'room_created' event by sending a webhook notification indicating that the room has been created.
* If an error occurs while sending the webhook, it logs the error.
* @param room - Information about the room that was created.
*/
async handleEgressStarted(egressInfo: EgressInfo) {
await this.processRecordingEgress(egressInfo, 'started');
}
/**
* Handles the 'egress_updated' event by gathering relevant room and recording information,
* updating the recording metadata, and sending a data payload with recording information to the room.
* @param egressInfo - Information about the updated recording egress.
*/
async handleEgressUpdated(egressInfo: EgressInfo) {
await this.processRecordingEgress(egressInfo, 'updated');
}
@ -104,15 +122,13 @@ export class LivekitWebhookService {
* @param participant - Information about the newly joined participant.
*/
async handleParticipantJoined(room: Room, participant: ParticipantInfo) {
try {
// Skip if the participant is an egress participant
if (this.livekitService.isEgressParticipant(participant)) {
return;
}
// Skip if the participant is an egress participant
if (this.livekitService.isEgressParticipant(participant)) return;
try {
await this.roomService.sendRoomStatusSignalToOpenViduComponents(room.name, participant.sid);
} catch (error) {
this.logger.error(`Error sending data on participant joined: ${error}`);
this.logger.error('Error sending room status signal on participant join:', error);
}
}
@ -125,7 +141,7 @@ export class LivekitWebhookService {
* @param {Room} room - The room object that has finished.
* @returns {Promise<void>} A promise that resolves when the webhook has been sent.
*/
async handleMeetingFinished(room: Room) {
async handleMeetingFinished(room: Room): Promise<void> {
try {
await Promise.all([
this.recordingService.releaseRoomRecordingActiveLock(room.name),
@ -150,49 +166,60 @@ export class LivekitWebhookService {
): Promise<void> {
if (!RecordingHelper.isRecordingEgress(egressInfo)) return;
this.logger.debug(`Processing recording ${webhookAction} webhook.`);
this.logger.debug(`Handling recording_${webhookAction} webhook.`);
const recordingInfo: MeetRecordingInfo = RecordingHelper.toRecordingInfo(egressInfo);
const { roomId, recordingId, status } = recordingInfo;
const metadataPath = this.generateMetadataPath(recordingId);
const metadataPath = this.buildMetadataFilePath(recordingId);
this.logger.debug(`Recording '${recordingId}' for room '${roomId}' is in status '${status}'`);
this.logger.debug(`Recording '${recordingId}' status: '${status}'`);
const promises: Promise<unknown>[] = [];
const tasks: Promise<unknown>[] = [];
// Update recording metadata
tasks.push(
this.s3Service.saveObject(metadataPath, recordingInfo),
this.recordingService.sendRecordingSignalToOpenViduComponents(roomId, recordingInfo)
);
// Send webhook notification
switch (webhookAction) {
case 'started':
tasks.push(this.openViduWebhookService.sendRecordingStartedWebhook(recordingInfo));
break;
case 'updated':
tasks.push(this.openViduWebhookService.sendRecordingUpdatedWebhook(recordingInfo));
if (recordingInfo.status === MeetRecordingStatus.ACTIVE) {
// Send system event for active recording with the aim of cancelling the cleanup timer
tasks.push(
this.systemEventService.publishEvent(SystemEventType.RECORDING_ACTIVE, {
roomId,
recordingId
})
);
}
break;
case 'ended':
tasks.push(
this.openViduWebhookService.sendRecordingEndedWebhook(recordingInfo),
this.recordingService.releaseRoomRecordingActiveLock(roomId)
);
break;
}
try {
// Update recording metadata
promises.push(
this.s3Service.saveObject(metadataPath, recordingInfo),
this.recordingService.sendRecordingSignalToOpenViduComponents(roomId, recordingInfo)
);
// Send webhook notification
switch (webhookAction) {
case 'started':
promises.push(this.openViduWebhookService.sendRecordingStartedWebhook(recordingInfo));
break;
case 'updated':
promises.push(this.openViduWebhookService.sendRecordingUpdatedWebhook(recordingInfo));
break;
case 'ended':
promises.push(
this.openViduWebhookService.sendRecordingEndedWebhook(recordingInfo),
this.recordingService.releaseRoomRecordingActiveLock(roomId)
);
break;
}
// Wait for all promises to resolve
await Promise.all(promises);
await Promise.all(tasks);
} catch (error) {
this.logger.warn(
`Error sending recording ${webhookAction} webhook for egress ${egressInfo.egressId}: ${error}`
`Error processing recording ${webhookAction} webhook for egress ${egressInfo.egressId}: ${error}`
);
}
}
protected generateMetadataPath(recordingId: string): string {
protected buildMetadataFilePath(recordingId: string): string {
const { roomId, egressId, uid } = RecordingHelper.extractInfoFromRecordingId(recordingId);
return `${MEET_S3_RECORDINGS_PREFIX}/.metadata/${roomId}/${egressId}/${uid}.json`;

View File

@ -3,14 +3,13 @@ import Redlock, { Lock } from 'redlock';
import { inject, injectable } from 'inversify';
import { RedisService } from './redis.service.js';
import { LoggerService } from './logger.service.js';
import { MeetLock } from '../helpers/redis.helper.js';
export type RedisLock = Lock;
@injectable()
export class MutexService {
protected redlockWithoutRetry: Redlock;
protected readonly TTL_MS = ms('1m');
protected LOCK_KEY_PREFIX = 'ov_meet_lock:';
protected LOCK_REGISTRY_PREFIX = 'ov_meet_lock_registry:';
constructor(
@inject(RedisService) protected redisService: RedisService,
@ -22,16 +21,15 @@ export class MutexService {
/**
* Acquires a lock for the specified resource.
* @param resource The resource to acquire a lock for.
* @param key The resource to acquire a lock for.
* @param ttl The time-to-live (TTL) for the lock in milliseconds. Defaults to the TTL value of the MutexService.
* @returns A Promise that resolves to the acquired Lock object.
*/
async acquire(resource: string, ttl: number = this.TTL_MS): Promise<Lock | null> {
const key = this.getLockKey(resource);
const registryKey = this.getLockRegistryKey(resource);
async acquire(key: string, ttl: number = this.TTL_MS): Promise<Lock | null> {
const registryKey = MeetLock.getRegistryLock(key);
try {
this.logger.debug(`Acquiring lock for resource: ${resource}`);
this.logger.debug(`Requesting lock: ${key}`);
const lock = await this.redlockWithoutRetry.acquire([key], ttl);
// Store Lock data in Redis registry for support HA and release lock
@ -54,72 +52,56 @@ export class MutexService {
/**
* Releases a lock on a resource.
*
* @param resource - The resource to release the lock on.
* @param key - The resource to release the lock on.
* @returns A Promise that resolves when the lock is released.
*/
async release(resource: string): Promise<void> {
const key = this.getLockKey(resource);
const lock = await this.getLockData(resource);
async release(key: string): Promise<void> {
const registryKey = MeetLock.getRegistryLock(key);
const lock = await this.getLockData(key);
if (!lock) {
return;
}
if (lock) {
this.logger.debug(`Releasing lock for resource: ${resource}`);
this.logger.debug(`Releasing lock for resource: ${key}`);
try {
await lock.release();
} catch (error) {
this.logger.error(`Error releasing lock for key ${key}:`, error);
} finally {
await this.redisService.delete(this.getLockRegistryKey(resource));
await this.redisService.delete(registryKey);
}
}
}
/**
* Returns the complete key used to acquire the lock in Redis.
*/
protected getLockKey(resource: string): string {
return `${this.LOCK_KEY_PREFIX}${resource}`;
}
/**
* Generates a unique key for the lock registry by combining a predefined prefix
* with the specified resource identifier.
*/
protected getLockRegistryKey(resource: string): string {
return `${this.LOCK_REGISTRY_PREFIX}${resource}`;
}
/**
* Retrieves the lock data for a given resource.
*
* This method first attempts to retrieve the lock from Redis. If the lock data is successfully retrieved from Redis,
* it constructs a new `Lock` instance and returns it. If the lock data cannot be found the method returns `null`.
*
* @param resource - The identifier of the resource for which the lock data is being retrieved.
* @param key - The identifier of the resource for which the lock data is being retrieved.
* @returns A promise that resolves to the `Lock` instance if found, or `null` if the lock data is not available.
*/
protected async getLockData(resource: string): Promise<Lock | null> {
const registryKey = this.getLockRegistryKey(resource);
protected async getLockData(key: string): Promise<Lock | null> {
const registryKey = MeetLock.getRegistryLock(key);
try {
this.logger.debug(`Getting lock data in Redis for resource: ${resource}`);
this.logger.debug(`Getting lock data in Redis for resource: ${key}`);
// Try to get lock from Redis
const redisLockData = await this.redisService.get(registryKey);
if (!redisLockData) {
this.logger.error(`Cannot release lock. Lock not found for resource: ${resource}.`);
this.logger.error(`Cannot release lock. Lock not found for resource: ${key}.`);
return null;
}
const { resources, value, expiration } = JSON.parse(redisLockData);
return new Lock(this.redlockWithoutRetry, resources, value, [], expiration);
} catch (error) {
this.logger.error(`Cannot release lock. Lock not found for resource: ${resource}.`);
this.logger.error(`Cannot release lock. Lock not found for resource: ${key}.`);
return null;
}
}

View File

@ -1,5 +1,6 @@
import { EgressStatus, EncodedFileOutput, EncodedFileType, RoomCompositeOptions } from 'livekit-server-sdk';
import { uid } from 'uid';
import ms from 'ms';
import { Readable } from 'stream';
import { LiveKitService } from './livekit.service.js';
import {
@ -10,19 +11,29 @@ import {
errorRecordingNotStopped,
errorRoomNotFound,
internalError,
isErrorRecordingAlreadyStopped,
isErrorRecordingCannotBeStoppedWhileStarting,
isErrorRecordingNotFound,
OpenViduMeetError
} from '../models/error.model.js';
import { S3Service } from './s3.service.js';
import { LoggerService } from './logger.service.js';
import { MeetRecordingFilters, MeetRecordingInfo, MeetRecordingStatus } from '@typings-ce';
import { RecordingHelper } from '../helpers/recording.helper.js';
import { MEET_RECORDING_LOCK_TTL, MEET_S3_BUCKET, MEET_S3_RECORDINGS_PREFIX, MEET_S3_SUBBUCKET } from '../environment.js';
import {
MEET_RECORDING_LOCK_TTL,
MEET_S3_BUCKET,
MEET_S3_RECORDINGS_PREFIX,
MEET_S3_SUBBUCKET
} from '../environment.js';
import { RoomService } from './room.service.js';
import { inject, injectable } from '../config/dependency-injector.config.js';
import { MutexService, RedisLock } from './mutex.service.js';
import { RedisLockName } from '../models/index.js';
import ms from 'ms';
import { OpenViduComponentsAdapterHelper } from '../helpers/ov-components-adapter.helper.js';
import { MeetLock } from '../helpers/redis.helper.js';
import { TaskSchedulerService } from './task-scheduler.service.js';
import { SystemEventService } from './system-event.service.js';
import { SystemEventType } from '../models/system-event.model.js';
@injectable()
export class RecordingService {
@ -31,6 +42,8 @@ export class RecordingService {
@inject(LiveKitService) protected livekitService: LiveKitService,
@inject(RoomService) protected roomService: RoomService,
@inject(MutexService) protected mutexService: MutexService,
@inject(TaskSchedulerService) protected taskSchedulerService: TaskSchedulerService,
@inject(SystemEventService) protected systemEventService: SystemEventService,
@inject(LoggerService) protected logger: LoggerService
) {}
@ -42,6 +55,9 @@ export class RecordingService {
if (!room) throw errorRoomNotFound(roomId);
//TODO: Check if the room has participants before starting the recording
//room.numParticipants === 0 ? throw errorNoParticipants(roomId);
// Attempt to acquire lock. If the lock is not acquired, the recording is already active.
acquiredLock = await this.acquireRoomRecordingActiveLock(roomId);
@ -50,10 +66,33 @@ export class RecordingService {
const options = this.generateCompositeOptionsFromRequest();
const output = this.generateFileOutputFromRequest(roomId);
const egressInfo = await this.livekitService.startRoomComposite(roomId, output, options);
const recordingInfo = RecordingHelper.toRecordingInfo(egressInfo);
const { recordingId } = recordingInfo;
return RecordingHelper.toRecordingInfo(egressInfo);
const recordingPromise = new Promise<MeetRecordingInfo>((resolve, reject) => {
this.taskSchedulerService.scheduleRecordingCleanupTimer(
roomId,
this.handleRecordingLockTimeout.bind(this, recordingId, roomId, reject)
);
this.systemEventService.once(SystemEventType.RECORDING_ACTIVE, (payload: Record<string, unknown>) => {
// This listener is triggered only for the instance that started the recording.
// Check if the recording ID matches the one that was started
const isEventForCurrentRecording =
payload?.recordingId === recordingId && payload?.roomId === roomId;
if (isEventForCurrentRecording) {
this.taskSchedulerService.cancelRecordingCleanupTimer(roomId);
resolve(recordingInfo);
} else {
this.logger.error('Received recording active event with mismatched recording ID:', payload);
}
});
});
return await recordingPromise;
} catch (error) {
this.logger.error(`Error starting recording in room ${roomId}: ${error}`);
this.logger.error(`Error starting recording in room '${roomId}': ${error}`);
if (acquiredLock) await this.releaseRoomRecordingActiveLock(roomId);
@ -71,6 +110,11 @@ export class RecordingService {
throw errorRecordingNotFound(egressId);
}
// Cancel the recording cleanup timer if it is running
this.taskSchedulerService.cancelRecordingCleanupTimer(roomId);
// Remove the listener for the EGRESS_STARTED event.
this.systemEventService.off(SystemEventType.RECORDING_ACTIVE);
switch (egress.status) {
case EgressStatus.EGRESS_ACTIVE:
// Everything is fine, the recording can be stopped.
@ -85,9 +129,10 @@ export class RecordingService {
const egressInfo = await this.livekitService.stopEgress(egressId);
this.logger.info(`Recording stopped successfully for room '${roomId}'.`);
return RecordingHelper.toRecordingInfo(egressInfo);
} catch (error) {
this.logger.error(`Error stopping recording ${recordingId}: ${error}`);
this.logger.error(`Error stopping recording '${recordingId}': ${error}`);
throw error;
}
}
@ -275,7 +320,7 @@ export class RecordingService {
* @param roomId - The name of the room to acquire the lock for.
*/
async acquireRoomRecordingActiveLock(roomId: string): Promise<RedisLock | null> {
const lockName = `${roomId}_${RedisLockName.RECORDING_ACTIVE}`;
const lockName = MeetLock.getRecordingActiveLock(roomId);
try {
const lock = await this.mutexService.acquire(lockName, ms(MEET_RECORDING_LOCK_TTL));
@ -292,14 +337,24 @@ export class RecordingService {
* This method attempts to release a lock associated with the active recording
* of a given room.
*/
async releaseRoomRecordingActiveLock(roomName: string): Promise<void> {
if (roomName) {
const lockName = `${roomName}_${RedisLockName.RECORDING_ACTIVE}`;
async releaseRoomRecordingActiveLock(roomId: string): Promise<void> {
if (roomId) {
const lockName = MeetLock.getRecordingActiveLock(roomId);
const egress = await this.livekitService.getActiveEgress(roomId);
if (egress.length > 0) {
this.logger.verbose(
`Active egress found for room ${roomId}: ${egress.map((e) => e.egressId).join(', ')}`
);
this.logger.error(`Cannot release recorgin lock for room '${roomId}'.`);
return;
}
try {
await this.mutexService.release(lockName);
this.logger.verbose(`Recording active lock released for room '${roomId}'.`);
} catch (error) {
this.logger.warn(`Error releasing lock ${lockName} on egress ended: ${error}`);
this.logger.warn(`Error releasing recording lock for room '${roomId}' on egress ended: ${error}`);
}
}
}
@ -362,7 +417,7 @@ export class RecordingService {
return { recordingInfo, metadataFilePath: metadataPath };
}
private generateCompositeOptionsFromRequest(layout = 'speaker'): RoomCompositeOptions {
protected generateCompositeOptionsFromRequest(layout = 'speaker'): RoomCompositeOptions {
return {
layout: layout
// customBaseUrl: customLayout,
@ -377,7 +432,7 @@ export class RecordingService {
* @param fileName - The name of the file (default is 'recording').
* @returns The generated file output object.
*/
private generateFileOutputFromRequest(roomId: string): EncodedFileOutput {
protected generateFileOutputFromRequest(roomId: string): EncodedFileOutput {
// Added unique identifier to the file path for avoiding overwriting
const recordingName = `${roomId}--${uid(10)}`;
@ -400,7 +455,72 @@ export class RecordingService {
* @param str - The input string to sanitize for use in a regular expression.
* @returns A new string with special characters escaped.
*/
private sanitizeRegExp(str: string) {
protected sanitizeRegExp(str: string) {
return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}
/**
* Callback function to release the active recording lock after a timeout.
* This function is scheduled by the recording cleanup timer when a recording is started.
*
* @param recordingId
* @param roomId
*/
protected async handleRecordingLockTimeout(
recordingId: string,
roomId: string,
rejectRequest: (reason?: unknown) => void
) {
this.logger.debug(`Recording cleanup timer triggered for room '${roomId}'.`);
let shouldReleaseLock = false;
try {
await this.stopRecording(recordingId);
// The recording was stopped successfully
// the cleanup timer will be cancelled when the egress_ended event is received.
} catch (error) {
if (error instanceof OpenViduMeetError) {
// The recording is already stopped or not found in LiveKit.
const isRecordingAlreadyStopped = isErrorRecordingAlreadyStopped(error, recordingId);
const isRecordingNotFound = isErrorRecordingNotFound(error, recordingId);
if (isRecordingAlreadyStopped || isRecordingNotFound) {
this.logger.verbose(`Recording ${recordingId} is already stopped or not found.`);
this.logger.verbose(' Proceeding to release the recording active lock.');
shouldReleaseLock = true;
} else if (isErrorRecordingCannotBeStoppedWhileStarting(error, recordingId)) {
// The recording is still starting, the cleanup timer will be cancelled.
this.logger.warn(
`Recording ${recordingId} is still starting. Skipping recording active lock release.`
);
} else {
// An error occurred while stopping the recording.
this.logger.error(`Error stopping recording ${recordingId}: ${error.message}`);
shouldReleaseLock = true;
}
} else {
this.logger.error(`Unexpected error while run recording cleanup timer:`, error);
}
} finally {
if (shouldReleaseLock) {
try {
await this.releaseRoomRecordingActiveLock(roomId);
this.logger.debug(`Recording active lock released for room ${roomId}.`);
} catch (releaseError) {
this.logger.error(`Error releasing active recording lock for room ${roomId}: ${releaseError}`);
}
}
// Reject the REST request with a timeout error.
rejectRequest(new Error(`Timeout waiting for '${SystemEventType.RECORDING_ACTIVE}' event in room '${roomId}'`));
}
}
protected async deleteOrphanLocks() {
// TODO: Get all recordings active locks from redis
// TODO: Extract all rooms ids from the active locks
// TODO: Check each room id if it has an active recording
// TODO: Remove the lock if the room has no active recording
}
}

View File

@ -16,20 +16,29 @@ import { LoggerService } from './logger.service.js';
import { EventEmitter } from 'events';
import Redlock from 'redlock';
import ms from 'ms';
import { SystemEventPayload } from '../models/system-event.model.js';
@injectable()
export class RedisService {
export class RedisService extends EventEmitter {
protected readonly DEFAULT_TTL: number = ms('32 days');
protected redis: Redis;
protected EVENT_CHANNEL = 'ov_meet_channel';
protected redisPublisher: Redis;
protected redisSubscriber: Redis;
protected isConnected = false;
public events: EventEmitter;
protected eventHandler?: (event: SystemEventPayload) => void;
constructor(@inject(LoggerService) protected logger: LoggerService) {
this.events = new EventEmitter();
const redisOptions = this.loadRedisConfig();
this.redis = new Redis(redisOptions);
super();
this.redis.on('connect', () => {
const redisOptions = this.loadRedisConfig();
this.redisPublisher = new Redis(redisOptions);
this.redisSubscriber = new Redis(redisOptions);
this.setupEventHandlers();
}
protected setupEventHandlers(): void {
const onConnect = () => {
if (!this.isConnected) {
this.logger.verbose('Connected to Redis');
} else {
@ -37,18 +46,29 @@ export class RedisService {
}
this.isConnected = true;
this.events.emit('redisConnected');
});
this.redis.on('error', (e) => this.logger.error('Error Redis', e));
this.emit('redisConnected');
};
this.redis.on('end', () => {
const onError = (error: Error) => {
this.logger.error('Redis Error', error);
};
const onDisconnect = () => {
this.isConnected = false;
this.logger.warn('Redis disconnected');
});
this.emit('redisDisconnected');
};
this.redisPublisher.on('connect', onConnect);
this.redisSubscriber.on('connect', onConnect);
this.redisPublisher.on('error', onError);
this.redisSubscriber.on('error', onError);
this.redisPublisher.on('end', onDisconnect);
this.redisSubscriber.on('end', onDisconnect);
}
createRedlock(retryCount = -1, retryDelay = 200) {
return new Redlock([this.redis], {
return new Redlock([this.redisPublisher], {
driftFactor: 0.01,
retryCount,
retryDelay,
@ -61,7 +81,62 @@ export class RedisService {
callback();
}
this.events.on('redisConnected', callback);
this.on('redisConnected', callback);
}
/**
* Publishes a message to a specified Redis channel.
*
* @param channel - The name of the Redis channel to publish the message to.
* @param message - The message to be published to the channel.
* @returns A promise that resolves when the message has been successfully published.
*/
async publishEvent(channel: string, message: string) {
try {
await this.redisPublisher.publish(channel, message);
} catch (error) {
this.logger.error('Error publishing message to Redis', error);
}
}
/**
* Subscribes to a Redis channel.
*
* @param channel - The channel to subscribe to.
* @param callback - The callback function to execute when a message is received on the channel.
*/
subscribe(channel: string, callback: (message: string) => void) {
this.logger.verbose(`Subscribing to Redis channel: ${channel}`);
this.redisSubscriber.subscribe(channel, (err, count) => {
if (err) {
this.logger.error('Error subscribing to Redis channel', err);
return;
}
this.logger.verbose(`Subscribed to ${channel}. Now subscribed to ${count} channel(s).`);
});
this.redisSubscriber.on('message', (receivedChannel, message) => {
if (receivedChannel === channel) {
callback(message);
}
});
}
/**
* Unsubscribes from a Redis channel.
*
* @param channel - The channel to unsubscribe from.
*/
unsubscribe(channel: string) {
this.redisSubscriber.unsubscribe(channel, (err, count) => {
if (err) {
this.logger.error('Error unsubscribing from Redis channel', err);
return;
}
this.logger.verbose(`Unsubscribed from channel ${channel}. Now subscribed to ${count} channel(s).`);
});
}
/**
@ -76,7 +151,7 @@ export class RedisService {
const keys: Set<string> = new Set();
do {
const [nextCursor, partialKeys] = await this.redis.scan(cursor, 'MATCH', pattern);
const [nextCursor, partialKeys] = await this.redisPublisher.scan(cursor, 'MATCH', pattern);
partialKeys.forEach((key) => keys.add(key));
cursor = nextCursor;
} while (cursor !== '0');
@ -98,9 +173,9 @@ export class RedisService {
get(key: string, hashKey?: string): Promise<string | null> {
try {
if (hashKey) {
return this.redis.hget(key, hashKey);
return this.redisPublisher.hget(key, hashKey);
} else {
return this.redis.get(key);
return this.redisPublisher.get(key);
}
} catch (error) {
this.logger.error('Error getting value from Redis', error);
@ -108,24 +183,6 @@ export class RedisService {
}
}
// getAll(key: string): Promise<Record<string, string>> {
// try {
// return this.redis.hgetall(key);
// } catch (error) {
// this.logger.error('Error getting value from Redis', error);
// throw internalError(error);
// }
// }
// getDel(key: string): Promise<string | null> {
// try {
// return this.redis.getdel(key);
// } catch (error) {
// this.logger.error('Error getting and deleting value from Redis', error);
// throw internalError(error);
// }
// }
/**
* Sets a value in Redis with an optional TTL (time-to-live).
*
@ -141,14 +198,14 @@ export class RedisService {
if (valueType === 'string' || valueType === 'number' || valueType === 'boolean') {
if (withTTL) {
await this.redis.set(key, value, 'EX', this.DEFAULT_TTL);
await this.redisPublisher.set(key, value, 'EX', this.DEFAULT_TTL);
} else {
await this.redis.set(key, value);
await this.redisPublisher.set(key, value);
}
} else if (valueType === 'object') {
await this.redis.hmset(key, value);
await this.redisPublisher.hmset(key, value);
if (withTTL) await this.redis.expire(key, this.DEFAULT_TTL);
if (withTTL) await this.redisPublisher.expire(key, this.DEFAULT_TTL);
} else {
throw new Error('Invalid value type');
}
@ -169,9 +226,9 @@ export class RedisService {
delete(key: string, hashKey?: string): Promise<number> {
try {
if (hashKey) {
return this.redis.hdel(key, hashKey);
return this.redisPublisher.hdel(key, hashKey);
} else {
return this.redis.del(key);
return this.redisPublisher.del(key);
}
} catch (error) {
throw internalError(`Error deleting key from Redis ${error}`);
@ -179,11 +236,11 @@ export class RedisService {
}
quit() {
this.redis.quit();
this.redisPublisher.quit();
}
async checkHealth() {
return (await this.redis.ping()) === 'PONG';
return (await this.redisPublisher.ping()) === 'PONG';
}
private loadRedisConfig(): RedisOptions {

View File

@ -1,15 +1,100 @@
import { inject, injectable } from 'inversify';
import { RedisService } from './redis.service.js';
import { LoggerService } from './logger.service.js';
import { EventEmitter } from 'events';
import { SystemEventPayload, SystemEventType } from '../models/system-event.model.js';
@injectable()
export class SystemEventService {
protected emitter: EventEmitter = new EventEmitter();
protected readonly OPENVIDU_MEET_CHANNEL = 'ov_meet_channel';
constructor(
@inject(LoggerService) protected logger: LoggerService,
@inject(RedisService) protected redisService: RedisService
) {}
) {
this.emitter.setMaxListeners(Infinity);
this.redisService.subscribe(this.OPENVIDU_MEET_CHANNEL, this.handleRedisMessage.bind(this));
}
/**
* Subscribes to a specific system event.
*
* @param event The event type to subscribe to.
* @param listener The callback to invoke when the event is emitted.
*/
on(event: SystemEventType, listener: (payload: Record<string, unknown>) => void): void {
this.emitter.on(event, listener);
}
/**
* Subscribes to a specific system event once.
*
* @param event The event type to subscribe to.
* @param listener The callback to invoke when the event is emitted.
*/
once(event: SystemEventType, listener: (payload: Record<string, unknown>) => void): void {
this.emitter.once(event, listener);
}
/**
* Unsubscribes from a specific system event.
*
* @param event The event type to unsubscribe from.
* @param listener Optional: the specific listener to remove. If not provided, all listeners for that event are removed.
*/
off(event: SystemEventType, listener?: (payload: Record<string, unknown>) => void): void {
if (listener) {
this.emitter.off(event, listener);
} else {
this.emitter.removeAllListeners(event);
}
}
/**
* Publishes a system event to the central Redis channel.
* This method can be used by any part of the application to send a system-wide event.
*
* @param type The event type.
* @param payload The event payload.
*/
async publishEvent(eventType: SystemEventType, payload: Record<string, unknown>): Promise<void> {
const message = JSON.stringify({ eventType, payload });
this.logger.verbose(`Publishing system event: ${eventType}`, payload);
await this.redisService.publishEvent(this.OPENVIDU_MEET_CHANNEL, message);
}
/**
* Registers a callback function to be executed when the Redis connection is ready.
*
* @param callback - A function to be called when the Redis connection is ready.
*/
onRedisReady(callback: () => void) {
this.redisService.onReady(callback);
}
/**
* Handles incoming messages from Redis, parses them as system events,
* and emits the corresponding event to all registered listeners.
*
* @param message - The raw message string received from Redis.
* @throws Will log an error if the message cannot be parsed as JSON.
*/
protected handleRedisMessage(message: string): void {
try {
const eventData: SystemEventPayload = JSON.parse(message);
const { eventType, payload } = eventData;
if (!eventType) {
this.logger.warn('Received an event without type from Redis:', message);
return;
}
this.logger.verbose(`Emitting system event: ${eventType}`, payload);
// Forward the event to all listeners
this.emitter.emit(eventType, payload);
} catch (error) {
this.logger.error('Error parsing redis message in SystemEventsService:', error);
}
}
}

View File

@ -3,11 +3,14 @@ import { LoggerService } from './index.js';
import { SystemEventService } from './system-event.service.js';
import { CronJob } from 'cron';
import { MutexService } from './mutex.service.js';
import { RedisLockName } from '../models/redis.model.js';
import { MeetLock } from '../helpers/redis.helper.js';
import ms from 'ms';
import { MEET_RECORDING_CLEANUP_TIMEOUT } from '../environment.js';
@injectable()
export class TaskSchedulerService {
protected roomGarbageCollectorJob: CronJob | null = null;
private recordingCleanupTimers: Map<string, NodeJS.Timeout> = new Map();
constructor(
@inject(LoggerService) protected logger: LoggerService,
@ -53,4 +56,45 @@ export class TaskSchedulerService {
this.logger.debug('Starting room garbage collector');
this.roomGarbageCollectorJob.start();
}
/**
* Schedules a cleanup timer for a recording that has just started.
*
* If the egress_started webhook is not received before the timer expires,
* this timer will execute a cleanup callback by stopping the recording and releasing
* the active lock for the specified room.
*/
async scheduleRecordingCleanupTimer(roomId: string, cleanupCallback: () => Promise<void>): Promise<void> {
this.logger.debug(`Recording cleanup timer (${MEET_RECORDING_CLEANUP_TIMEOUT}) scheduled for room ${roomId}.`);
// Schedule a timeout to run the cleanup callback after a specified time
const timeoutMs = ms(MEET_RECORDING_CLEANUP_TIMEOUT);
const timer = setTimeout(async () => {
this.logger.warn(`Recording cleanup timer expired for room ${roomId}. Initiating cleanup process.`);
this.recordingCleanupTimers.delete(roomId);
await cleanupCallback();
}, timeoutMs);
this.recordingCleanupTimers.set(roomId, timer);
}
cancelRecordingCleanupTimer(roomId: string): void {
const timer = this.recordingCleanupTimers.get(roomId);
if (timer) {
clearTimeout(timer);
this.recordingCleanupTimers.delete(roomId);
this.logger.info(`Recording cleanup timer cancelled for room ${roomId}`);
}
}
async startRecordingLockGarbageCollector(callbackFn: () => Promise<void>): Promise<void> {
// Create a cron job to run every minute
const recordingLockGarbageCollectorJob = new CronJob('0 * * * * *', async () => {
try {
await callbackFn();
} catch (error) {
this.logger.error('Error running recording lock garbage collection:', error);
}
});
}
}