backend: Add scheduled task support and recording GC

This commit is contained in:
Carlos Santos 2025-03-31 12:44:35 +02:00
parent de9caec62a
commit 58e79e2e7e
9 changed files with 466 additions and 49 deletions

View File

@ -15,6 +15,7 @@
"cookie-parser": "1.4.7",
"cors": "2.8.5",
"cron": "^4.1.0",
"cron-parser": "^5.1.0",
"dotenv": "16.4.7",
"express": "4.21.2",
"express-openapi-validator": "^5.4.2",
@ -6265,6 +6266,18 @@
"node": ">=18.x"
}
},
"node_modules/cron-parser": {
"version": "5.1.0",
"resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-5.1.0.tgz",
"integrity": "sha512-dnR1hHR7iGJe+Jev72sareuGO+/AzzbVGoy95lkXuw+ftvDkERdnXRnJMJ+ArxKId5QnLXGckFgVGCCwAMhlRw==",
"license": "MIT",
"dependencies": {
"luxon": "^3.5.0"
},
"engines": {
"node": ">=18"
}
},
"node_modules/cross-env": {
"version": "7.0.3",
"resolved": "https://registry.npmjs.org/cross-env/-/cross-env-7.0.3.tgz",

View File

@ -47,6 +47,7 @@
"cookie-parser": "1.4.7",
"cors": "2.8.5",
"cron": "^4.1.0",
"cron-parser": "^5.1.0",
"dotenv": "16.4.7",
"express": "4.21.2",
"express-openapi-validator": "^5.4.2",

View File

@ -1,5 +1,6 @@
import dotenv from 'dotenv';
import chalk from 'chalk';
import ms from 'ms';
const envPath = process.env.MEET_CONFIG_DIR
? process.env.MEET_CONFIG_DIR
@ -80,8 +81,9 @@ export const MEET_S3_ROOMS_PREFIX = 'rooms';
export const MEET_S3_RECORDINGS_PREFIX = 'recordings';
// Time to live for the active recording lock in Redis
export const MEET_RECORDING_LOCK_TTL = '6h';
export const MEET_RECORDING_CLEANUP_TIMEOUT = '30s';
export const MEET_RECORDING_LOCK_TTL: ms.StringValue = '6h';
export const MEET_RECORDING_STARTED_TIMEOUT: ms.StringValue = '30s';
export const MEET_RECORDING_LOCK_GC_INTERVAL: ms.StringValue = '30m';
export function checkModuleEnabled() {
if (MODULES_FILE) {

View File

@ -10,7 +10,7 @@ export class MeetLock {
throw new Error('roomId must be a non-empty string');
}
return `${RedisLockPrefix.BASE}${roomId}_${RedisLockName.RECORDING_ACTIVE}`;
return `${RedisLockPrefix.BASE}${RedisLockName.RECORDING_ACTIVE}_${roomId}`;
}
static getRegistryLock(lockName: string): string {
@ -21,8 +21,15 @@ export class MeetLock {
return `${RedisLockPrefix.REGISTRY}${lockName}`;
}
/**
 * Builds the Redis lock key used to guard a scheduled task.
 *
 * @param taskName - Name of the scheduled task; must be a non-empty string.
 * @returns The base lock prefix followed by the scheduled-task lock name, an underscore and `taskName`.
 * @throws Error if `taskName` is empty.
 */
static getScheduledTaskLock(taskName: string): string {
	if (taskName) {
		const lockName = `${RedisLockName.SCHEDULED_TASK}_${taskName}`;
		return `${RedisLockPrefix.BASE}${lockName}`;
	}

	throw new Error('taskName must be a non-empty string');
}
/**
 * Returns the Redis lock key for the room garbage collector.
 *
 * The key takes no arguments: it is the base lock prefix followed by the
 * ROOM_GARBAGE_COLLECTOR lock name.
 */
static getRoomGarbageCollectorLock(): string {
return `${RedisLockPrefix.BASE}${RedisLockName.ROOM_GARBAGE_COLLECTOR}`;
}
}

View File

@ -5,6 +5,6 @@ export const enum RedisLockPrefix {
export const enum RedisLockName {
ROOM_GARBAGE_COLLECTOR = 'room_garbage_collector',
RECORDING_ACTIVE = 'recording_active'
RECORDING_ACTIVE = 'recording_active',
SCHEDULED_TASK = 'scheduled_task'
}

View File

@ -30,6 +30,7 @@ import {
internalError
} from '../models/error.model.js';
import { ParticipantPermissions, ParticipantRole, TokenOptions } from '@typings-ce';
import { RecordingHelper } from '../helpers/recording.helper.js';
@injectable()
export class LiveKitService {
@ -178,11 +179,12 @@ export class LiveKitService {
* @returns {Promise<EgressInfo[]>} A promise that resolves to an array of EgressInfo objects.
* @throws Will throw an error if there is an issue retrieving the egress information.
*/
async getEgress(roomName?: string, egressId?: string): Promise<EgressInfo[]> {
async getEgress(roomName?: string, egressId?: string, active?: boolean): Promise<EgressInfo[]> {
try {
const options: ListEgressOptions = {
roomName,
egressId
egressId,
active
};
return await this.egressClient.listEgress(options);
} catch (error: any) {
@ -203,25 +205,77 @@ export class LiveKitService {
* @throws Will throw an error if there is an issue retrieving the egress information.
*/
async getActiveEgress(roomName?: string, egressId?: string): Promise<EgressInfo[]> {
	// Request only egress flagged as active by the server.
	const candidates = await this.getEgress(roomName, egressId, true);

	// The "active" listing can still include egress whose status is ENDING
	// (still running, but already stopping). Keep only the ones that are
	// strictly in the EGRESS_ACTIVE status.
	const strictlyActive: EgressInfo[] = [];

	for (const item of candidates) {
		if (item.status === EgressStatus.EGRESS_ACTIVE) {
			strictlyActive.push(item);
		}
	}

	return strictlyActive;
}
/**
 * Lists the egress sessions that are recordings, for one room or for every room.
 *
 * @param roomName - Optional room name; when omitted no room filter is applied.
 * @returns A promise resolving to the egress entries accepted by `RecordingHelper.isRecordingEgress`
 *          (an empty array when there are none).
 * @throws Propagates any error raised while retrieving the egress list.
 */
async getRecordingsEgress(roomName?: string): Promise<EgressInfo[]> {
	const allEgress = await this.getEgress(roomName);

	// Filtering an empty list already yields an empty list, so no special case is needed.
	return allEgress.filter((item) => RecordingHelper.isRecordingEgress(item));
}
/**
 * Lists the recording egress sessions whose status is EGRESS_ACTIVE, for one room or for every room.
 *
 * @param roomName - Optional room name; when omitted no room filter is applied.
 * @returns A promise resolving to the active recording egress entries (an empty array when there are none).
 * @throws Propagates any error raised while retrieving the egress list.
 */
async getActiveRecordingsEgress(roomName?: string): Promise<EgressInfo[]> {
	// Start from every recording egress, whatever its status.
	const recordings = await this.getRecordingsEgress(roomName);
	const activeRecordings: EgressInfo[] = [];

	for (const recording of recordings) {
		if (recording.status === EgressStatus.EGRESS_ACTIVE) {
			activeRecordings.push(recording);
		}
	}

	return activeRecordings;
}
/**
* Retrieves all in-progress recording egress sessions for a specific room or all rooms.
*
* This method returns the recordings that are in any "in-progress" state: EGRESS_STARTING, EGRESS_ACTIVE, or EGRESS_ENDING.
*
* @param {string} [roomName] - Optional room name to filter recordings by room
* @returns {Promise<EgressInfo[]>} A promise that resolves to an array of in-progress recording EgressInfo objects
* @throws Will throw an error if there is an issue retrieving the egress information
*/
async getInProgressRecordingsEgress(roomName?: string): Promise<EgressInfo[]> {
try {
const options: ListEgressOptions = {
roomName,
egressId,
active: true
};
const egress = await this.egressClient.listEgress(options);
const egressArray = await this.getEgress(roomName);
return egressArray.filter((egress) => {
if (!RecordingHelper.isRecordingEgress(egress)) {
return false;
}
// In some cases, the egress list may contain egress that their status is ENDINDG
// which means that the egress is still active but it is in the process of stopping.
// We need to filter those out.
return egress.filter((e) => e.status === EgressStatus.EGRESS_ACTIVE);
} catch (error: any) {
if (error.message.includes('404')) {
return [];
}
this.logger.error(`Error getting egress: ${JSON.stringify(error)}`);
throw internalError(`Error getting egress: ${error}`);
// Check if recording is in any "in-progress" state
return [EgressStatus.EGRESS_STARTING, EgressStatus.EGRESS_ACTIVE, EgressStatus.EGRESS_ENDING].includes(
egress.status
);
});
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
this.logger.error(`Error getting in-progress recordings: ${errorMessage}`);
throw internalError(`Error getting in-progress recordings: ${errorMessage}`);
}
}

View File

@ -38,7 +38,8 @@ export class MutexService {
JSON.stringify({
resources: lock.resources,
value: lock.value,
expiration: lock.expiration
expiration: lock.expiration,
createdAt: Date.now()
}),
true
);
@ -57,17 +58,17 @@ export class MutexService {
*/
async release(key: string): Promise<void> {
const registryKey = MeetLock.getRegistryLock(key);
const lock = await this.getLockData(key);
const lock = await this.getLockData(registryKey);
if (!lock) {
this.logger.warn(`Lock not found for resource: ${key}. May be expired or released by another process.`);
return;
}
if (lock) {
this.logger.debug(`Releasing lock for resource: ${key}`);
try {
await lock.release();
this.logger.verbose(`Lock ${key} successfully released.`);
} catch (error) {
this.logger.error(`Error releasing lock for key ${key}:`, error);
} finally {
@ -77,31 +78,75 @@ export class MutexService {
}
/**
* Retrieves the lock data for a given resource.
* Retrieves all locks for a given prefix.
*
* This method first attempts to retrieve the lock from Redis. If the lock data is successfully retrieved from Redis,
* it constructs a new `Lock` instance and returns it. If the lock data cannot be found the method returns `null`.
* This method retrieves all keys from Redis that match the specified prefix and returns an array of `Lock` instances.
*
* @param key - The identifier of the resource for which the lock data is being retrieved.
* @returns A promise that resolves to the `Lock` instance if found, or `null` if the lock data is not available.
* @param pattern - The prefix to filter the keys in Redis.
* @returns A promise that resolves to an array of `Lock` instances.
*/
protected async getLockData(key: string): Promise<Lock | null> {
async getLocksByPrefix(pattern: string): Promise<Lock[]> {
	// Locks are looked up through their registry entries, so the pattern is
	// translated to the registry key space before querying Redis.
	const registryKeys = await this.redisService.getKeys(MeetLock.getRegistryLock(pattern));
	this.logger.debug(`Found ${registryKeys.length} registry keys for pattern "${pattern}".`);

	if (!registryKeys.length) {
		return [];
	}

	// Resolve every registry entry in parallel; entries that cannot be
	// resolved come back as null and are dropped.
	const resolved = await Promise.all(registryKeys.map((registryKey) => this.getLockData(registryKey)));
	const found: Lock[] = [];

	for (const candidate of resolved) {
		if (candidate !== null) {
			found.push(candidate);
		}
	}

	return found;
}
/**
 * Checks whether a registry entry exists in Redis for the given lock key.
 *
 * @param key - The lock key (without the registry prefix).
 * @returns A promise with the result of the Redis existence check on the registry key.
 */
lockExists(key: string): Promise<boolean> {
	return this.redisService.exists(MeetLock.getRegistryLock(key));
}
/**
 * Retrieves the creation timestamp of a lock identified by the given key.
 *
 * The timestamp is read from the `createdAt` field of the lock's registry entry.
 *
 * @param key - The unique identifier for the lock (without the registry prefix)
 * @returns A Promise that resolves to the creation timestamp (epoch milliseconds), or null if the
 *          registry entry is missing, cannot be parsed, or has no numeric `createdAt`
 */
async getLockCreatedAt(key: string): Promise<number | null> {
	const registryKey = MeetLock.getRegistryLock(key);
	const redisLockData = await this.redisService.get(registryKey);

	if (!redisLockData) {
		this.logger.warn(
			`Lock not found for resource: ${registryKey}. May be expired or released by another process.`
		);
		return null;
	}

	try {
		const parsed: unknown = JSON.parse(redisLockData);
		const createdAt =
			typeof parsed === 'object' && parsed !== null
				? (parsed as { createdAt?: unknown }).createdAt
				: undefined;

		if (typeof createdAt === 'number' && Number.isFinite(createdAt)) {
			return createdAt;
		}

		// Entries written without a creation timestamp must honour the
		// `number | null` contract instead of leaking `undefined`.
		this.logger.debug(`Lock registry entry ${registryKey} has no valid creation timestamp.`);
		return null;
	} catch (error) {
		// A corrupt registry entry should not crash the caller.
		this.logger.error(`Error parsing lock data for resource: ${registryKey}:`, error);
		return null;
	}
}
/**
* Retrieves the lock data for a given resource key.
*
* @param registryKey - The resource key to retrieve the lock data for.
* @returns A promise that resolves to a `Lock` instance or null if not found.
*/
protected async getLockData(registryKey: string): Promise<Lock | null> {
try {
this.logger.debug(`Getting lock data in Redis for resource: ${key}`);
// Try to get lock from Redis
const redisLockData = await this.redisService.get(registryKey);
if (!redisLockData) {
this.logger.error(`Cannot release lock. Lock not found for resource: ${key}.`);
return null;
}
const { resources, value, expiration } = JSON.parse(redisLockData);
return new Lock(this.redlockWithoutRetry, resources, value, [], expiration);
} catch (error) {
this.logger.error(`Cannot release lock. Lock not found for resource: ${key}.`);
return null;
}
}

View File

@ -21,6 +21,7 @@ import { LoggerService } from './logger.service.js';
import { MeetRecordingFilters, MeetRecordingInfo, MeetRecordingStatus } from '@typings-ce';
import { RecordingHelper } from '../helpers/recording.helper.js';
import {
MEET_RECORDING_LOCK_GC_INTERVAL,
MEET_RECORDING_LOCK_TTL,
MEET_S3_BUCKET,
MEET_S3_RECORDINGS_PREFIX,
@ -31,7 +32,7 @@ import { inject, injectable } from '../config/dependency-injector.config.js';
import { MutexService, RedisLock } from './mutex.service.js';
import { OpenViduComponentsAdapterHelper } from '../helpers/ov-components-adapter.helper.js';
import { MeetLock } from '../helpers/redis.helper.js';
import { TaskSchedulerService } from './task-scheduler.service.js';
import { IScheduledTask, TaskSchedulerService } from './task-scheduler.service.js';
import { SystemEventService } from './system-event.service.js';
import { SystemEventType } from '../models/system-event.model.js';
@ -45,7 +46,16 @@ export class RecordingService {
@inject(TaskSchedulerService) protected taskSchedulerService: TaskSchedulerService,
@inject(SystemEventService) protected systemEventService: SystemEventService,
@inject(LoggerService) protected logger: LoggerService
) {}
) {
// Register the recording garbage collector task
const recordingGarbageCollectorTask: IScheduledTask = {
name: 'activeRecordingGarbageCollector',
type: 'cron',
scheduleOrDelay: MEET_RECORDING_LOCK_GC_INTERVAL,
callback: this.deleteOrphanLocks.bind(this)
};
this.taskSchedulerService.registerTask(recordingGarbageCollectorTask);
}
async startRecording(roomId: string): Promise<MeetRecordingInfo> {
let acquiredLock: RedisLock | null = null;
@ -504,10 +514,131 @@ export class RecordingService {
}
}
protected async deleteOrphanLocks() {
// TODO: Get all recordings active locks from redis
// TODO: Extract all rooms ids from the active locks
// TODO: Check each room id if it has an active recording
// TODO: Remove the lock if the room has no active recording
/**
 * Cleans up orphaned recording locks in the system.
 *
 * The method:
 * 1. Builds a wildcard pattern matching every "recording active" lock
 * 2. Retrieves the matching locks through the mutex service
 * 3. Derives the room id of each lock from its first resource name
 * 4. Delegates the keep/release decision for each room to `processOrphanLock`
 *
 * Rooms are processed one after another. A failure while processing one room
 * is logged and does not prevent the remaining rooms from being processed.
 *
 * @returns {Promise<void>} A promise that resolves when the cleanup process completes.
 *          The method never rejects: retrieval and processing errors are logged.
 * @protected
 */
protected async deleteOrphanLocks(): Promise<void> {
	this.logger.debug('Starting orphaned recording locks cleanup process');

	// Build the key for a placeholder room id and swap the placeholder for a wildcard
	const lockPattern = MeetLock.getRecordingActiveLock('roomId').replace('roomId', '*');
	this.logger.debug(`Searching for locks with pattern: ${lockPattern}`);

	let recordingLocks: RedisLock[];

	try {
		recordingLocks = await this.mutexService.getLocksByPrefix(lockPattern);
	} catch (error) {
		this.logger.error('Error retrieving recording locks:', error);
		return;
	}

	if (recordingLocks.length === 0) {
		this.logger.debug('No active recording locks found');
		return;
	}

	// Everything before the wildcard is the common prefix shared by all recording locks
	const lockPrefix = lockPattern.replace('*', '');

	for (const lock of recordingLocks) {
		const resource = lock.resources[0];

		if (!resource) {
			this.logger.warn('Found a recording lock without resources, skipping');
			continue;
		}

		const roomId = resource.replace(lockPrefix, '');

		try {
			await this.processOrphanLock(roomId, lockPrefix);
		} catch (error) {
			// Isolate failures so a single problematic room does not abort the whole sweep
			this.logger.error(`Error processing recording lock for room ${roomId}:`, error);
		}
	}
}
/**
 * Decides whether the recording lock of a room is orphaned and releases it if so.
 *
 * Decision flow:
 * - The lock no longer exists: nothing to do.
 * - The lock is younger than the grace period (1 minute), or its creation time is unknown: keep it.
 * - The room has at least one in-progress recording (starting, active or ending): keep it.
 * - Otherwise (room missing, room without publishers, or room without in-progress recordings): release it.
 *
 * @param roomId - The ID of the room associated with the lock.
 * @param lockPrefix - The prefix used to identify the lock; the lock key is `lockPrefix + roomId`.
 * @throws Rethrows errors raised while the room exists (or while looking it up), except a 404
 *         `OpenViduMeetError`, which is handled as "room not found". Errors raised while handling
 *         a missing room are logged and not rethrown.
 */
protected async processOrphanLock(roomId: string, lockPrefix: string): Promise<void> {
	const lockKey = `${lockPrefix}${roomId}`;
	const LOCK_GRACE_PERIOD = ms('1m');

	try {
		// Verify if the lock still exists before proceeding to check the room
		const lockExists = await this.mutexService.lockExists(lockKey);

		if (!lockExists) {
			this.logger.debug(`Lock for room ${roomId} no longer exists, skipping`);
			return;
		}

		// Verify if the lock is too recent. An unknown creation time yields an
		// age of 0, so such a lock is conservatively treated as recent.
		const createdAt = await this.mutexService.getLockCreatedAt(lockKey);
		const lockAge = Date.now() - (createdAt ?? Date.now());

		if (lockAge < LOCK_GRACE_PERIOD) {
			this.logger.debug(`Lock for room ${roomId} is too recent (${ms(lockAge)}), skipping orphan lock cleanup`);
			return;
		}

		// Throws a 404 OpenViduMeetError when the room does not exist (handled below)
		const room = await this.livekitService.getRoom(roomId);
		const roomIsEmpty = room.numPublishers === 0;
		const inProgressRecordings = await this.livekitService.getInProgressRecordingsEgress(roomId);

		if (inProgressRecordings.length > 0) {
			if (roomIsEmpty) {
				this.logger.debug(
					`Room ${roomId} has ${inProgressRecordings.length} recordings in transition, keeping lock`
				);
			}

			return;
		}

		if (roomIsEmpty) {
			this.logger.info(
				`Room ${roomId} has no publishers and no in-progress recordings, releasing orphaned lock`
			);
		} else {
			this.logger.info(`No active recordings for room ${roomId}, releasing orphaned lock`);
		}

		await this.mutexService.release(lockKey);
	} catch (error) {
		const roomNotFound = error instanceof OpenViduMeetError && error.statusCode === 404;

		if (!roomNotFound) {
			throw error;
		}

		try {
			// The room is gone, but a recording may still be finishing
			const inProgressRecordings = await this.livekitService.getInProgressRecordingsEgress(roomId);

			if (inProgressRecordings.length > 0) {
				this.logger.debug(
					`Room ${roomId} has ${inProgressRecordings.length} recordings in transition, keeping lock`
				);
				return;
			}

			this.logger.info(`Room ${roomId} no longer exists, releasing orphaned lock`);
			await this.mutexService.release(lockKey);
		} catch (releaseError) {
			this.logger.error(`Error releasing lock for room ${roomId}:`, releaseError);
		}
	}
}
}

View File

@ -5,18 +5,40 @@ import { CronJob } from 'cron';
import { MutexService } from './mutex.service.js';
import { MeetLock } from '../helpers/redis.helper.js';
import ms from 'ms';
import { MEET_RECORDING_CLEANUP_TIMEOUT } from '../environment.js';
import { CronExpressionParser } from 'cron-parser';
import { MEET_RECORDING_STARTED_TIMEOUT } from '../environment.js';
/**
 * How a task is scheduled: 'cron' tasks run through a CronJob,
 * 'timeout' tasks run once through setTimeout.
 */
export type TaskType = 'cron' | 'timeout';
/**
 * Description of a task handled by the TaskSchedulerService.
 */
export interface IScheduledTask {
// Task name; used to detect duplicate registrations and to build the task's lock key.
name: string;
// Scheduling strategy for the task.
type: TaskType;
// Duration string in `ms` format (e.g. '30m'). For 'cron' tasks it is converted
// to a cron expression; for 'timeout' tasks it is the delay before the single run.
scheduleOrDelay: ms.StringValue;
// Work to run. It is awaited by the scheduler, which logs any error it raises.
callback: () => Promise<void>;
}
@injectable()
export class TaskSchedulerService {
protected roomGarbageCollectorJob: CronJob | null = null;
private recordingCleanupTimers: Map<string, NodeJS.Timeout> = new Map();
private taskRegistry: IScheduledTask[] = [];
private scheduledTasks = new Map<string, CronJob | NodeJS.Timeout>();
private started = false;
constructor(
@inject(LoggerService) protected logger: LoggerService,
@inject(SystemEventService) protected systemEventService: SystemEventService,
@inject(MutexService) protected mutexService: MutexService
) {}
) {
this.systemEventService.onRedisReady(() => {
this.logger.debug('Starting all registered tasks...');
this.taskRegistry.forEach((task) => {
this.scheduleTask(task);
});
this.started = true;
});
}
/**
* Starts the room garbage collector which runs a specified callback function every hour.
@ -64,10 +86,10 @@ export class TaskSchedulerService {
* the active lock for the specified room.
*/
async scheduleRecordingCleanupTimer(roomId: string, cleanupCallback: () => Promise<void>): Promise<void> {
this.logger.debug(`Recording cleanup timer (${MEET_RECORDING_CLEANUP_TIMEOUT}) scheduled for room ${roomId}.`);
this.logger.debug(`Recording cleanup timer (${MEET_RECORDING_STARTED_TIMEOUT}) scheduled for room ${roomId}.`);
// Schedule a timeout to run the cleanup callback after a specified time
const timeoutMs = ms(MEET_RECORDING_CLEANUP_TIMEOUT);
const timeoutMs = ms(MEET_RECORDING_STARTED_TIMEOUT);
const timer = setTimeout(async () => {
this.logger.warn(`Recording cleanup timer expired for room ${roomId}. Initiating cleanup process.`);
this.recordingCleanupTimers.delete(roomId);
@ -96,4 +118,146 @@ export class TaskSchedulerService {
}
});
}
/**
 * Adds a task to the registry so it can be scheduled.
 *
 * A task whose name is already registered is rejected (an error is logged and
 * nothing changes). If the scheduler has already started, the new task is
 * scheduled right away; otherwise it stays in the registry until startup.
 *
 * @param task - The task definition to register.
 */
public registerTask(task: IScheduledTask): void {
	const alreadyRegistered = this.taskRegistry.some((registered) => registered.name === task.name);

	if (alreadyRegistered) {
		this.logger.error(`Task with name "${task.name}" already exists.`);
		return;
	}

	this.logger.debug(`Registering task "${task.name}".`);
	this.taskRegistry.push(task);

	// Before startup the registry is only filled; scheduling happens later.
	if (!this.started) {
		return;
	}

	this.scheduleTask(task);
}
/**
 * Schedules a registered task according to its type.
 *
 * - 'cron': the duration is converted to a cron expression and run through a CronJob.
 *   On every tick a lock named after the task is requested; the callback only runs
 *   when the lock is obtained. The lock is not released here.
 * - 'timeout': the callback runs once after the given delay.
 *
 * A task whose name already has a scheduled handle is left untouched.
 * Errors thrown by the callback are logged, never propagated.
 *
 * @param task - The task definition to schedule.
 */
protected scheduleTask(task: IScheduledTask): void {
	const { name, type, scheduleOrDelay, callback } = task;

	if (this.scheduledTasks.has(name)) {
		this.logger.debug(`Task "${name}" already scheduled.`);
		return;
	}

	switch (type) {
		case 'cron': {
			this.logger.debug(`Scheduling cron task "${name}" with schedule "${scheduleOrDelay}"`);

			const cronExpression = this.msStringToCronExpression(scheduleOrDelay);
			const lockTtl = this.getCronIntervalDuration(cronExpression);

			const onTick = async (): Promise<void> => {
				try {
					this.logger.debug(`Attempting to acquire lock for cron task "${name}"`);

					const taskLock = await this.mutexService.acquire(MeetLock.getScheduledTaskLock(name), lockTtl);

					if (taskLock) {
						this.logger.debug(`Running cron task "${name}"...`);
						await callback();
					} else {
						this.logger.debug(`Task "${name}" skipped: another instance holds the lock.`);
					}
				} catch (error) {
					this.logger.error(`Error running cron task "${name}":`, error);
				}
			};

			const cronJob = new CronJob(cronExpression, onTick);
			cronJob.start();
			this.scheduledTasks.set(name, cronJob);
			break;
		}

		case 'timeout': {
			this.logger.debug(`Scheduling timeout task "${name}" with delay ${scheduleOrDelay}`);

			const onTimeout = async (): Promise<void> => {
				try {
					await callback();
				} catch (error) {
					this.logger.error(`Error running timeout task "${name}":`, error);
				}
			};

			const timeoutHandle = setTimeout(onTimeout, ms(scheduleOrDelay));
			this.scheduledTasks.set(name, timeoutHandle);
			break;
		}
	}
}
/**
 * Cancels the scheduled task with the given name, if there is one.
 *
 * Cron jobs are stopped and pending timeouts are cleared; the handle is then
 * removed from the scheduled-task map. Unknown names are ignored.
 *
 * @param name - Name of the task to cancel.
 */
public cancelTask(name: string): void {
	const handle = this.scheduledTasks.get(name);

	if (!handle) {
		return;
	}

	if (handle instanceof CronJob) {
		handle.stop();
	} else {
		clearTimeout(handle as NodeJS.Timeout);
	}

	this.scheduledTasks.delete(name);
	this.logger.debug(`Task "${name}" cancelled.`);
}
/**
 * Computes how long the lock of a cron task should be held, in milliseconds.
 *
 * The interval is measured as the gap between the next two occurrences of the
 * cron expression. The lock duration is that interval minus one minute, with a
 * floor of 10 seconds, so the lock expires before the next iteration. When the
 * interval is so short that this value would not be shorter than the interval
 * itself, half of the interval is used instead, so the lock can never outlive
 * the schedule and cause later ticks to be skipped.
 *
 * @param cronExpression - The cron expression to analyse.
 * @returns The lock duration in milliseconds (always shorter than the interval).
 * @throws Error ('Invalid cron expression') if the expression cannot be parsed.
 */
protected getCronIntervalDuration(cronExpression: string): number {
	try {
		// Parse the cron expression using cron-parser
		const interval = CronExpressionParser.parse(cronExpression);

		// Each call to next() advances the iterator: take two consecutive occurrences
		const firstRun = interval.next().getTime();
		const secondRun = interval.next().getTime();

		// Calculate the interval duration in milliseconds
		const intervalMs = secondRun - firstRun;

		// Interval minus 1 minute, so the lock expires before the next iteration
		const preferredDuration = Math.max(intervalMs - ms('1m'), ms('10s'));

		if (preferredDuration < intervalMs) {
			return preferredDuration;
		}

		// Very short interval: the 10s floor would cover the next tick(s)
		return Math.max(Math.floor(intervalMs / 2), 1);
	} catch (error) {
		this.logger.error(`Error parsing cron expression "${cronExpression}":`, error);
		throw new Error('Invalid cron expression');
	}
}
/**
 * Converts a human-readable time string to a cron expression.
 *
 * This method takes a string representation of a time duration (e.g., '1h', '30m', '1d')
 * and converts it to a six-field cron expression (seconds first) that triggers at
 * approximately that interval. Only the largest unit of the duration is used; any
 * remainder in smaller units is dropped (e.g. '90m' is scheduled as every hour):
 * - For days: Runs at midnight every X days
 * - For hours: Runs at the start of every X hours
 * - For minutes: Runs every X minutes
 * - For seconds >= 30: Runs every minute
 * - For seconds < 30: Runs every X seconds (at least every second)
 *
 * @param msString - A string representing time duration (parsed by the 'ms' library)
 * @returns A cron expression string that represents the equivalent scheduling interval
 * @throws Error if the string does not resolve to a finite, positive duration
 */
protected msStringToCronExpression(msString: ms.StringValue): string {
	const milliseconds = ms(msString);

	// Reject values that would otherwise yield a broken ("*/NaN") or meaningless schedule
	if (typeof milliseconds !== 'number' || !Number.isFinite(milliseconds) || milliseconds <= 0) {
		throw new Error(`Invalid schedule duration: "${msString}"`);
	}

	const totalSeconds = Math.floor(milliseconds / 1000);

	const seconds = totalSeconds % 60;
	const minutes = Math.floor(totalSeconds / 60) % 60;
	const hours = Math.floor(totalSeconds / 3600) % 24;
	const days = Math.floor(totalSeconds / 86400);

	if (days > 0) {
		// Six fields like the other branches: second 0, minute 0, hour 0, every X days
		return `0 0 0 */${days} * *`;
	}

	if (hours > 0) {
		return `0 0 */${hours} * * *`;
	}

	if (minutes > 0) {
		return `0 */${minutes} * * * *`;
	}

	if (seconds >= 30) {
		return `0 * * * * *`;
	}

	// Sub-second durations resolve to 0 seconds; fall back to every second
	return `*/${Math.max(seconds, 1)} * * * * *`;
}
}