import { CronJob } from 'cron'; import { inject, injectable } from 'inversify'; import ms from 'ms'; import INTERNAL_CONFIG from '../config/internal-config.js'; import { MeetLock } from '../helpers/index.js'; import { LoggerService, MutexService, SystemEventService } from './index.js'; export type TaskType = 'cron' | 'timeout'; export interface IScheduledTask { name: string; type: TaskType; scheduleOrDelay: ms.StringValue; callback: () => Promise; } @injectable() export class TaskSchedulerService { private taskRegistry: IScheduledTask[] = []; private scheduledTasks = new Map(); private started = false; constructor( @inject(LoggerService) protected logger: LoggerService, @inject(SystemEventService) protected systemEventService: SystemEventService, @inject(MutexService) protected mutexService: MutexService ) { this.systemEventService.onRedisReady(() => { this.logger.debug('Starting all registered tasks...'); this.taskRegistry.forEach((task) => { this.scheduleTask(task); }); this.started = true; this.systemEventService.onceRedisError(() => { this.logger.debug('Redis shutdown detected. Cancelling all scheduled tasks...'); this.scheduledTasks.forEach((task, name) => { this.cancelTask(name); }); this.started = false; }); }); } /** * Registers a new task to be scheduled. * If the task is already registered, it will not be added again. */ public registerTask(task: IScheduledTask): void { if (this.taskRegistry.find((t) => t.name === task.name)) { this.logger.error(`Task with name "${task.name}" already exists.`); return; } this.logger.debug(`Registering task "${task.name}".`); this.taskRegistry.push(task); if (this.started) { this.scheduleTask(task); } } protected async scheduleTask(task: IScheduledTask): Promise { const { name, type, scheduleOrDelay, callback } = task; if (this.scheduledTasks.has(name)) { this.logger.debug(`Task "${name}" already scheduled.`); return; } if (type === 'cron') { this.logger.debug(`Scheduling cron task "${name}" with schedule "${scheduleOrDelay}"`); const cronExpression = this.msStringToCronExpression(scheduleOrDelay); const lockDuration = Math.max(ms(scheduleOrDelay) - ms('1m'), ms(INTERNAL_CONFIG.CRON_JOB_MIN_LOCK_TTL)); const job = new CronJob(cronExpression, async () => { try { this.logger.debug(`Attempting to acquire lock for cron task "${name}"`); const lock = await this.mutexService.acquire(MeetLock.getScheduledTaskLock(name), lockDuration); if (!lock) { this.logger.debug(`Task "${name}" skipped: another instance holds the lock.`); return; } this.logger.debug(`Running cron task "${name}"...`); await callback(); } catch (error) { this.logger.error(`Error running cron task "${name}":`, error); } }); // Start the job immediately await callback(); job.start(); this.scheduledTasks.set(name, job); } else if (type === 'timeout') { this.logger.debug(`Scheduling timeout task "${name}" with delay ${scheduleOrDelay}`); const timeoutId = setTimeout( async () => { try { this.scheduledTasks.delete(name); await callback(); } catch (error) { this.logger.error(`Error running timeout task "${name}":`, error); } }, ms(scheduleOrDelay as ms.StringValue) ); this.scheduledTasks.set(name, timeoutId); } } /** * Cancel the scheduled task with the given name. */ public cancelTask(name: string): void { const scheduled = this.scheduledTasks.get(name); if (scheduled) { if (scheduled instanceof CronJob) { scheduled.stop(); } else { clearTimeout(scheduled as NodeJS.Timeout); } this.scheduledTasks.delete(name); this.taskRegistry = this.taskRegistry.filter((task) => task.name !== name); this.logger.debug(`Task "${name}" cancelled.`); } } /** * Converts a human-readable time string to a cron expression. * * This method takes a string representation of a time duration (e.g., '1h', '30m', '1d') * and converts it to an equivalent cron expression that would trigger at that interval. * The conversion uses different cron patterns based on the duration magnitude: * - For days: Runs at midnight every X days * - For hours: Runs at the start of every X hours * - For minutes: Runs every X minutes * - For seconds ≥ 30: Runs every minute * - For seconds < 30: Runs every X seconds * * @param msString - A string representing time duration (parsed by the 'ms' library) * @returns A cron expression string that represents the equivalent scheduling interval * */ protected msStringToCronExpression(msString: ms.StringValue): string { const milliseconds = ms(msString); const totalSeconds = Math.floor(milliseconds / 1000); const seconds = totalSeconds % 60; const minutes = Math.floor(totalSeconds / 60) % 60; const hours = Math.floor(totalSeconds / 3600) % 24; const days = Math.floor(totalSeconds / 86400); if (days > 0) { return `0 0 */${days} * *`; } else if (hours > 0) { return `0 0 */${hours} * * *`; } else if (minutes > 0) { return `0 */${minutes} * * * *`; } else if (seconds >= 30) { return `0 * * * * *`; } else { return `*/${Math.max(seconds, 1)} * * * * *`; } } }