From 58e79e2e7e7f6c0c4c80f94efb0d4a8c2d14b699 Mon Sep 17 00:00:00 2001 From: Carlos Santos <4a.santos@gmail.com> Date: Mon, 31 Mar 2025 12:44:35 +0200 Subject: [PATCH] backend: Add scheduled task support and recording GC --- backend/package-lock.json | 13 ++ backend/package.json | 1 + backend/src/environment.ts | 6 +- backend/src/helpers/redis.helper.ts | 11 +- backend/src/models/redis.model.ts | 4 +- backend/src/services/livekit.service.ts | 92 ++++++++-- backend/src/services/mutex.service.ts | 71 ++++++-- backend/src/services/recording.service.ts | 145 ++++++++++++++- .../src/services/task-scheduler.service.ts | 172 +++++++++++++++++- 9 files changed, 466 insertions(+), 49 deletions(-) diff --git a/backend/package-lock.json b/backend/package-lock.json index 4187a50..42aaa74 100644 --- a/backend/package-lock.json +++ b/backend/package-lock.json @@ -15,6 +15,7 @@ "cookie-parser": "1.4.7", "cors": "2.8.5", "cron": "^4.1.0", + "cron-parser": "^5.1.0", "dotenv": "16.4.7", "express": "4.21.2", "express-openapi-validator": "^5.4.2", @@ -6265,6 +6266,18 @@ "node": ">=18.x" } }, + "node_modules/cron-parser": { + "version": "5.1.0", + "resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-5.1.0.tgz", + "integrity": "sha512-dnR1hHR7iGJe+Jev72sareuGO+/AzzbVGoy95lkXuw+ftvDkERdnXRnJMJ+ArxKId5QnLXGckFgVGCCwAMhlRw==", + "license": "MIT", + "dependencies": { + "luxon": "^3.5.0" + }, + "engines": { + "node": ">=18" + } + }, "node_modules/cross-env": { "version": "7.0.3", "resolved": "https://registry.npmjs.org/cross-env/-/cross-env-7.0.3.tgz", diff --git a/backend/package.json b/backend/package.json index a92c972..78bde32 100644 --- a/backend/package.json +++ b/backend/package.json @@ -47,6 +47,7 @@ "cookie-parser": "1.4.7", "cors": "2.8.5", "cron": "^4.1.0", + "cron-parser": "^5.1.0", "dotenv": "16.4.7", "express": "4.21.2", "express-openapi-validator": "^5.4.2", diff --git a/backend/src/environment.ts b/backend/src/environment.ts index f1f3c09..f7188a0 100644 --- a/backend/src/environment.ts +++ b/backend/src/environment.ts @@ -1,5 +1,6 @@ import dotenv from 'dotenv'; import chalk from 'chalk'; +import ms from 'ms'; const envPath = process.env.MEET_CONFIG_DIR ? process.env.MEET_CONFIG_DIR @@ -80,8 +81,9 @@ export const MEET_S3_ROOMS_PREFIX = 'rooms'; export const MEET_S3_RECORDINGS_PREFIX = 'recordings'; // Time to live for the active recording lock in Redis -export const MEET_RECORDING_LOCK_TTL = '6h'; -export const MEET_RECORDING_CLEANUP_TIMEOUT = '30s'; +export const MEET_RECORDING_LOCK_TTL: ms.StringValue = '6h'; +export const MEET_RECORDING_STARTED_TIMEOUT: ms.StringValue = '30s'; +export const MEET_RECORDING_LOCK_GC_INTERVAL: ms.StringValue = '30m'; export function checkModuleEnabled() { if (MODULES_FILE) { diff --git a/backend/src/helpers/redis.helper.ts b/backend/src/helpers/redis.helper.ts index 9891d09..f5fc203 100644 --- a/backend/src/helpers/redis.helper.ts +++ b/backend/src/helpers/redis.helper.ts @@ -10,7 +10,7 @@ export class MeetLock { throw new Error('roomId must be a non-empty string'); } - return `${RedisLockPrefix.BASE}${roomId}_${RedisLockName.RECORDING_ACTIVE}`; + return `${RedisLockPrefix.BASE}${RedisLockName.RECORDING_ACTIVE}_${roomId}`; } static getRegistryLock(lockName: string): string { @@ -21,8 +21,15 @@ export class MeetLock { return `${RedisLockPrefix.REGISTRY}${lockName}`; } + static getScheduledTaskLock(taskName: string): string { + if (!taskName) { + throw new Error('taskName must be a non-empty string'); + } + + return `${RedisLockPrefix.BASE}${RedisLockName.SCHEDULED_TASK}_${taskName}`; + } + static getRoomGarbageCollectorLock(): string { return `${RedisLockPrefix.BASE}${RedisLockName.ROOM_GARBAGE_COLLECTOR}`; } - } diff --git a/backend/src/models/redis.model.ts b/backend/src/models/redis.model.ts index 448ca99..0a1adc3 100644 --- a/backend/src/models/redis.model.ts +++ b/backend/src/models/redis.model.ts @@ -5,6 +5,6 @@ export const enum RedisLockPrefix { export const enum RedisLockName { ROOM_GARBAGE_COLLECTOR = 'room_garbage_collector', - RECORDING_ACTIVE = 'recording_active' + RECORDING_ACTIVE = 'recording_active', + SCHEDULED_TASK = 'scheduled_task' } - diff --git a/backend/src/services/livekit.service.ts b/backend/src/services/livekit.service.ts index 60d2b09..1cd2546 100644 --- a/backend/src/services/livekit.service.ts +++ b/backend/src/services/livekit.service.ts @@ -30,6 +30,7 @@ import { internalError } from '../models/error.model.js'; import { ParticipantPermissions, ParticipantRole, TokenOptions } from '@typings-ce'; +import { RecordingHelper } from '../helpers/recording.helper.js'; @injectable() export class LiveKitService { @@ -178,11 +179,12 @@ export class LiveKitService { * @returns {Promise} A promise that resolves to an array of EgressInfo objects. * @throws Will throw an error if there is an issue retrieving the egress information. */ - async getEgress(roomName?: string, egressId?: string): Promise { + async getEgress(roomName?: string, egressId?: string, active?: boolean): Promise { try { const options: ListEgressOptions = { roomName, - egressId + egressId, + active }; return await this.egressClient.listEgress(options); } catch (error: any) { @@ -203,25 +205,77 @@ export class LiveKitService { * @throws Will throw an error if there is an issue retrieving the egress information. */ async getActiveEgress(roomName?: string, egressId?: string): Promise { + const egress = await this.getEgress(roomName, egressId, true); + + // In some cases, the egress list may contain egress that their status is ENDINDG + // which means that the egress is still active but it is in the process of stopping. + // We need to filter those out. + return egress.filter((e) => e.status === EgressStatus.EGRESS_ACTIVE); + } + + /** + * Retrieves all recording egress sessions for a specific room or all rooms. + * + * @param {string} [roomName] - Optional room name to filter recordings by room + * @returns {Promise} A promise that resolves to an array of recording EgressInfo objects + * @throws Will throw an error if there is an issue retrieving the egress information + */ + async getRecordingsEgress(roomName?: string): Promise { + const egressArray = await this.getEgress(roomName); + + if (egressArray.length === 0) { + return []; + } + + // Filter the egress array to include only recording egress + return egressArray.filter((egress) => RecordingHelper.isRecordingEgress(egress)); + } + + /** + * Retrieves all active recording egress sessions for a specific room or all rooms. + * + * @param {string} [roomName] - Optional room name to filter recordings by room + * @returns {Promise} A promise that resolves to an array of active recording EgressInfo objects + * @throws Will throw an error if there is an issue retrieving the egress information + */ + async getActiveRecordingsEgress(roomName?: string): Promise { + // Get all recording egress + const recordingEgress = await this.getRecordingsEgress(roomName); + + if (recordingEgress.length === 0) { + return []; + } + + // Filter the recording egress array to include only active egress + return recordingEgress.filter((egress) => egress.status === EgressStatus.EGRESS_ACTIVE); + } + + /** + * Retrieves all in-progress recording egress sessions for a specific room or all rooms. + * + * This method checks it is in any "in-progress" state, including EGRESS_STARTING, EGRESS_ACTIVE, and EGRESS_ENDING. + * + * @param {string} [roomName] - Optional room name to filter recordings by room + * @returns {Promise} A promise that resolves to an array of in-progress recording EgressInfo objects + * @throws Will throw an error if there is an issue retrieving the egress information + */ + async getInProgressRecordingsEgress(roomName?: string): Promise { try { - const options: ListEgressOptions = { - roomName, - egressId, - active: true - }; - const egress = await this.egressClient.listEgress(options); + const egressArray = await this.getEgress(roomName); + return egressArray.filter((egress) => { + if (!RecordingHelper.isRecordingEgress(egress)) { + return false; + } - // In some cases, the egress list may contain egress that their status is ENDINDG - // which means that the egress is still active but it is in the process of stopping. - // We need to filter those out. - return egress.filter((e) => e.status === EgressStatus.EGRESS_ACTIVE); - } catch (error: any) { - if (error.message.includes('404')) { - return []; - } - - this.logger.error(`Error getting egress: ${JSON.stringify(error)}`); - throw internalError(`Error getting egress: ${error}`); + // Check if recording is in any "in-progress" state + return [EgressStatus.EGRESS_STARTING, EgressStatus.EGRESS_ACTIVE, EgressStatus.EGRESS_ENDING].includes( + egress.status + ); + }); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + this.logger.error(`Error getting in-progress recordings: ${errorMessage}`); + throw internalError(`Error getting in-progress recordings: ${errorMessage}`); } } diff --git a/backend/src/services/mutex.service.ts b/backend/src/services/mutex.service.ts index a06937e..ba9be84 100644 --- a/backend/src/services/mutex.service.ts +++ b/backend/src/services/mutex.service.ts @@ -38,7 +38,8 @@ export class MutexService { JSON.stringify({ resources: lock.resources, value: lock.value, - expiration: lock.expiration + expiration: lock.expiration, + createdAt: Date.now() }), true ); @@ -57,17 +58,17 @@ export class MutexService { */ async release(key: string): Promise { const registryKey = MeetLock.getRegistryLock(key); - const lock = await this.getLockData(key); + const lock = await this.getLockData(registryKey); if (!lock) { + this.logger.warn(`Lock not found for resource: ${key}. May be expired or released by another process.`); return; } if (lock) { - this.logger.debug(`Releasing lock for resource: ${key}`); - try { await lock.release(); + this.logger.verbose(`Lock ${key} successfully released.`); } catch (error) { this.logger.error(`Error releasing lock for key ${key}:`, error); } finally { @@ -77,31 +78,75 @@ export class MutexService { } /** - * Retrieves the lock data for a given resource. + * Retrieves all locks for a given prefix. * - * This method first attempts to retrieve the lock from Redis. If the lock data is successfully retrieved from Redis, - * it constructs a new `Lock` instance and returns it. If the lock data cannot be found the method returns `null`. + * This method retrieves all keys from Redis that match the specified prefix and returns an array of `Lock` instances. * - * @param key - The identifier of the resource for which the lock data is being retrieved. - * @returns A promise that resolves to the `Lock` instance if found, or `null` if the lock data is not available. + * @param pattern - The prefix to filter the keys in Redis. + * @returns A promise that resolves to an array of `Lock` instances. */ - protected async getLockData(key: string): Promise { + async getLocksByPrefix(pattern: string): Promise { + const registryPattern = MeetLock.getRegistryLock(pattern); + const keys = await this.redisService.getKeys(registryPattern); + this.logger.debug(`Found ${keys.length} registry keys for pattern "${pattern}".`); + + if (keys.length === 0) { + return []; + } + + const lockPromises: Promise[] = keys.map((key) => this.getLockData(key)); + + const locksResult = await Promise.all(lockPromises); + + const locks = locksResult.filter((lock): lock is Lock => lock !== null); + return locks; + } + + lockExists(key: string): Promise { + const registryKey = MeetLock.getRegistryLock(key); + return this.redisService.exists(registryKey); + } + + /** + * Retrieves the creation timestamp of a lock identified by the given key. + * + * @param key - The unique identifier for the lock + * @returns A Promise that resolves to the creation timestamp (as a number) of the lock, or null if the lock doesn't exist or has expired + */ + async getLockCreatedAt(key: string): Promise { const registryKey = MeetLock.getRegistryLock(key); + const redisLockData = await this.redisService.get(registryKey); + + if (!redisLockData) { + this.logger.warn( + `Lock not found for resource: ${registryKey}. May be expired or released by another process.` + ); + return null; + } + + const { createdAt } = JSON.parse(redisLockData); + return createdAt; + } + + /** + * Retrieves the lock data for a given resource key. + * + * @param registryKey - The resource key to retrieve the lock data for. + * @returns A promise that resolves to a `Lock` instance or null if not found. + */ + protected async getLockData(registryKey: string): Promise { try { - this.logger.debug(`Getting lock data in Redis for resource: ${key}`); // Try to get lock from Redis const redisLockData = await this.redisService.get(registryKey); if (!redisLockData) { - this.logger.error(`Cannot release lock. Lock not found for resource: ${key}.`); return null; } const { resources, value, expiration } = JSON.parse(redisLockData); return new Lock(this.redlockWithoutRetry, resources, value, [], expiration); } catch (error) { - this.logger.error(`Cannot release lock. Lock not found for resource: ${key}.`); return null; } } diff --git a/backend/src/services/recording.service.ts b/backend/src/services/recording.service.ts index d5bf698..fb39337 100644 --- a/backend/src/services/recording.service.ts +++ b/backend/src/services/recording.service.ts @@ -21,6 +21,7 @@ import { LoggerService } from './logger.service.js'; import { MeetRecordingFilters, MeetRecordingInfo, MeetRecordingStatus } from '@typings-ce'; import { RecordingHelper } from '../helpers/recording.helper.js'; import { + MEET_RECORDING_LOCK_GC_INTERVAL, MEET_RECORDING_LOCK_TTL, MEET_S3_BUCKET, MEET_S3_RECORDINGS_PREFIX, @@ -31,7 +32,7 @@ import { inject, injectable } from '../config/dependency-injector.config.js'; import { MutexService, RedisLock } from './mutex.service.js'; import { OpenViduComponentsAdapterHelper } from '../helpers/ov-components-adapter.helper.js'; import { MeetLock } from '../helpers/redis.helper.js'; -import { TaskSchedulerService } from './task-scheduler.service.js'; +import { IScheduledTask, TaskSchedulerService } from './task-scheduler.service.js'; import { SystemEventService } from './system-event.service.js'; import { SystemEventType } from '../models/system-event.model.js'; @@ -45,7 +46,16 @@ export class RecordingService { @inject(TaskSchedulerService) protected taskSchedulerService: TaskSchedulerService, @inject(SystemEventService) protected systemEventService: SystemEventService, @inject(LoggerService) protected logger: LoggerService - ) {} + ) { + // Register the recording garbage collector task + const recordingGarbageCollectorTask: IScheduledTask = { + name: 'activeRecordingGarbageCollector', + type: 'cron', + scheduleOrDelay: MEET_RECORDING_LOCK_GC_INTERVAL, + callback: this.deleteOrphanLocks.bind(this) + }; + this.taskSchedulerService.registerTask(recordingGarbageCollectorTask); + } async startRecording(roomId: string): Promise { let acquiredLock: RedisLock | null = null; @@ -504,10 +514,131 @@ export class RecordingService { } } - protected async deleteOrphanLocks() { - // TODO: Get all recordings active locks from redis - // TODO: Extract all rooms ids from the active locks - // TODO: Check each room id if it has an active recording - // TODO: Remove the lock if the room has no active recording + /** + * Cleans up orphaned recording locks in the system. + * + * This method identifies and releases locks that are no longer needed by: + * 1. Finding all active recording locks in the system + * 2. Checking if the associated room still exists in LiveKit + * 3. For existing rooms, checking if they have active recordings in progress + * 4. Releasing lock if the room exists but has no participants or no active recordings + * 5. Releasing lock if the room does not exist + * + * Orphaned locks can occur when: + * - A room is deleted but its lock remains + * - A recording completes but the lock isn't released + * - System crashes during the recording process + * + * @returns {Promise} A promise that resolves when the cleanup process completes + * @throws {OpenViduMeetError} Rethrows any errors except 404 (room not found) + * @protected + */ + protected async deleteOrphanLocks(): Promise { + this.logger.debug('Starting orphaned recording locks cleanup process'); + // Create the lock pattern for finding all recording locks + const lockPattern = MeetLock.getRecordingActiveLock('roomId').replace('roomId', '*'); + this.logger.debug(`Searching for locks with pattern: ${lockPattern}`); + let recordingLocks: RedisLock[] = []; + + try { + recordingLocks = await this.mutexService.getLocksByPrefix(lockPattern); + + if (recordingLocks.length === 0) { + this.logger.debug('No active recording locks found'); + return; + } + + // Extract all rooms ids from the active locks + const lockPrefix = lockPattern.replace('*', ''); + + const roomIds = recordingLocks.map((lock) => lock.resources[0].replace(lockPrefix, '')); + + // Check each room id if it exists in LiveKit + // If the room does not exist, release the lock + for (const roomId of roomIds) { + await this.processOrphanLock(roomId, lockPrefix); + } + } catch (error) { + this.logger.error('Error retrieving recording locks:', error); + } + } + + /** + * Process an orphaned lock by checking if the associated room exists and releasing the lock if necessary. + * + * @param roomId - The ID of the room associated with the lock. + * @param lockPrefix - The prefix used to identify the lock. + */ + protected async processOrphanLock(roomId: string, lockPrefix: string): Promise { + const lockKey = `${lockPrefix}${roomId}`; + const LOCK_GRACE_PERIOD = ms('1m'); + + try { + // Verify if the lock still exists before proceeding to check the room + const lockExists = await this.mutexService.lockExists(lockKey); + + if (!lockExists) { + this.logger.debug(`Lock for room ${roomId} no longer exists, skipping`); + return; + } + + // Verify if the lock is too recent + const createdAt = await this.mutexService.getLockCreatedAt(lockKey); + const lockAge = Date.now() - (createdAt || Date.now()); + + if (lockAge < LOCK_GRACE_PERIOD) { + this.logger.debug(`Lock for room ${roomId} is too recent (${ms(lockAge)}), skipping orphan lock cleanup`); + return; + } + + const room = await this.livekitService.getRoom(roomId); + + if (room.numPublishers === 0) { + const inProgressRecordings = await this.livekitService.getInProgressRecordingsEgress(roomId); + + if (inProgressRecordings.length > 0) { + this.logger.debug( + `Room ${roomId} has ${inProgressRecordings.length} recordings in transition, keeping lock` + ); + return; + } + + this.logger.info(`Room ${roomId} no longer exists, releasing orphaned lock`); + await this.mutexService.release(lockKey); + return; + } + + // The room has participants, check if it has in-progress recordings + const inProgressRecordings = await this.livekitService.getInProgressRecordingsEgress(roomId); + + if (inProgressRecordings.length === 0) { + // If the room has no active recording, release the lock + this.logger.info(`No active recordings for room ${roomId}, releasing orphaned lock`); + await this.mutexService.release(lockKey); + } + } catch (error) { + if (error instanceof OpenViduMeetError && error.statusCode === 404) { + this.logger.info(`Room ${roomId} no longer exists, releasing orphaned lock`); + + try { + const inProgressRecordings = await this.livekitService.getInProgressRecordingsEgress(roomId); + + if (inProgressRecordings.length > 0) { + this.logger.debug( + `Room ${roomId} has ${inProgressRecordings.length} recordings in transition, keeping lock` + ); + return; + } + + await this.mutexService.release(lockKey); + } catch (error) { + this.logger.error(`Error releasing lock for room ${roomId}:`, error); + } + + return; + } + + throw error; + } } } diff --git a/backend/src/services/task-scheduler.service.ts b/backend/src/services/task-scheduler.service.ts index 465577f..b66bf40 100644 --- a/backend/src/services/task-scheduler.service.ts +++ b/backend/src/services/task-scheduler.service.ts @@ -5,18 +5,40 @@ import { CronJob } from 'cron'; import { MutexService } from './mutex.service.js'; import { MeetLock } from '../helpers/redis.helper.js'; import ms from 'ms'; -import { MEET_RECORDING_CLEANUP_TIMEOUT } from '../environment.js'; +import { CronExpressionParser } from 'cron-parser'; +import { MEET_RECORDING_STARTED_TIMEOUT } from '../environment.js'; + +export type TaskType = 'cron' | 'timeout'; + +export interface IScheduledTask { + name: string; + type: TaskType; + scheduleOrDelay: ms.StringValue; + callback: () => Promise; +} @injectable() export class TaskSchedulerService { protected roomGarbageCollectorJob: CronJob | null = null; private recordingCleanupTimers: Map = new Map(); + private taskRegistry: IScheduledTask[] = []; + private scheduledTasks = new Map(); + private started = false; + constructor( @inject(LoggerService) protected logger: LoggerService, @inject(SystemEventService) protected systemEventService: SystemEventService, @inject(MutexService) protected mutexService: MutexService - ) {} + ) { + this.systemEventService.onRedisReady(() => { + this.logger.debug('Starting all registered tasks...'); + this.taskRegistry.forEach((task) => { + this.scheduleTask(task); + }); + this.started = true; + }); + } /** * Starts the room garbage collector which runs a specified callback function every hour. @@ -64,10 +86,10 @@ export class TaskSchedulerService { * the active lock for the specified room. */ async scheduleRecordingCleanupTimer(roomId: string, cleanupCallback: () => Promise): Promise { - this.logger.debug(`Recording cleanup timer (${MEET_RECORDING_CLEANUP_TIMEOUT}) scheduled for room ${roomId}.`); + this.logger.debug(`Recording cleanup timer (${MEET_RECORDING_STARTED_TIMEOUT}) scheduled for room ${roomId}.`); // Schedule a timeout to run the cleanup callback after a specified time - const timeoutMs = ms(MEET_RECORDING_CLEANUP_TIMEOUT); + const timeoutMs = ms(MEET_RECORDING_STARTED_TIMEOUT); const timer = setTimeout(async () => { this.logger.warn(`Recording cleanup timer expired for room ${roomId}. Initiating cleanup process.`); this.recordingCleanupTimers.delete(roomId); @@ -96,4 +118,146 @@ export class TaskSchedulerService { } }); } + + /** + * Registers a new task to be scheduled. + * If the task is already registered, it will not be added again. + */ + public registerTask(task: IScheduledTask): void { + if (this.taskRegistry.find((t) => t.name === task.name)) { + this.logger.error(`Task with name "${task.name}" already exists.`); + return; + } + + this.logger.debug(`Registering task "${task.name}".`); + this.taskRegistry.push(task); + + if (this.started) { + this.scheduleTask(task); + } + } + + protected scheduleTask(task: IScheduledTask): void { + const { name, type, scheduleOrDelay, callback } = task; + + if (this.scheduledTasks.has(name)) { + this.logger.debug(`Task "${name}" already scheduled.`); + return; + } + + if (type === 'cron') { + this.logger.debug(`Scheduling cron task "${name}" with schedule "${scheduleOrDelay}"`); + const cronExpression = this.msStringToCronExpression(scheduleOrDelay); + const lockDuration = this.getCronIntervalDuration(cronExpression); + + const job = new CronJob(cronExpression, async () => { + try { + this.logger.debug(`Attempting to acquire lock for cron task "${name}"`); + + const lock = await this.mutexService.acquire(MeetLock.getScheduledTaskLock(name), lockDuration); + + if (!lock) { + this.logger.debug(`Task "${name}" skipped: another instance holds the lock.`); + return; + } + + this.logger.debug(`Running cron task "${name}"...`); + await callback(); + } catch (error) { + this.logger.error(`Error running cron task "${name}":`, error); + } + }); + job.start(); + this.scheduledTasks.set(name, job); + } else if (type === 'timeout') { + this.logger.debug(`Scheduling timeout task "${name}" with delay ${scheduleOrDelay}`); + const timeoutId = setTimeout( + async () => { + try { + await callback(); + } catch (error) { + this.logger.error(`Error running timeout task "${name}":`, error); + } + }, + ms(scheduleOrDelay as ms.StringValue) + ); + this.scheduledTasks.set(name, timeoutId); + } + } + + /** + * Cancel the scheduled task with the given name. + */ + public cancelTask(name: string): void { + const scheduled = this.scheduledTasks.get(name); + + if (scheduled) { + if (scheduled instanceof CronJob) { + scheduled.stop(); + } else { + clearTimeout(scheduled as NodeJS.Timeout); + } + + this.scheduledTasks.delete(name); + this.logger.debug(`Task "${name}" cancelled.`); + } + } + + protected getCronIntervalDuration(cronExpression: string): number { + try { + // Parse the cron expression using cron-parser + const interval = CronExpressionParser.parse(cronExpression); + + // Get the next interval time + const next = interval.next().getTime(); + + // Get the current time + const afterNext = interval.next().getTime(); + + // Calculate the interval duration in milliseconds + const intervalMs = afterNext - next; + // Return the interval duration minus 1 minute for ensuring the lock expires before the next iteration + return Math.max(intervalMs - ms('1m'), ms('10s')); + } catch (error) { + this.logger.error('Error parsing cron expression:', error); + throw new Error('Invalid cron expression'); + } + } + + /** + * Converts a human-readable time string to a cron expression. + * + * This method takes a string representation of a time duration (e.g., '1h', '30m', '1d') + * and converts it to an equivalent cron expression that would trigger at that interval. + * The conversion uses different cron patterns based on the duration magnitude: + * - For days: Runs at midnight every X days + * - For hours: Runs at the start of every X hours + * - For minutes: Runs every X minutes + * - For seconds ≥ 30: Runs every minute + * - For seconds < 30: Runs every X seconds + * + * @param msString - A string representing time duration (parsed by the 'ms' library) + * @returns A cron expression string that represents the equivalent scheduling interval + * + */ + protected msStringToCronExpression(msString: ms.StringValue): string { + const milliseconds = ms(msString); + const totalSeconds = Math.floor(milliseconds / 1000); + const seconds = totalSeconds % 60; + const minutes = Math.floor(totalSeconds / 60) % 60; + const hours = Math.floor(totalSeconds / 3600) % 24; + const days = Math.floor(totalSeconds / 86400); + + if (days > 0) { + return `0 0 */${days} * *`; + } else if (hours > 0) { + return `0 0 */${hours} * * *`; + } else if (minutes > 0) { + return `0 */${minutes} * * * *`; + } else if (seconds >= 30) { + return `0 * * * * *`; + } else { + return `*/${Math.max(seconds, 1)} * * * * *`; + } + } }