openvidu/backend/src/services/task-scheduler.service.ts
2025-04-24 14:59:46 +02:00

170 lines
5.2 KiB
TypeScript

import { CronJob } from 'cron';
import { inject, injectable } from 'inversify';
import ms from 'ms';
import INTERNAL_CONFIG from '../config/internal-config.js';
import { MeetLock } from '../helpers/index.js';
import { LoggerService, MutexService, SystemEventService } from './index.js';
export type TaskType = 'cron' | 'timeout';
export interface IScheduledTask {
name: string;
type: TaskType;
scheduleOrDelay: ms.StringValue;
callback: () => Promise<void>;
}
@injectable()
export class TaskSchedulerService {
private taskRegistry: IScheduledTask[] = [];
private scheduledTasks = new Map<string, CronJob | NodeJS.Timeout>();
private started = false;
constructor(
@inject(LoggerService) protected logger: LoggerService,
@inject(SystemEventService) protected systemEventService: SystemEventService,
@inject(MutexService) protected mutexService: MutexService
) {
this.systemEventService.onRedisReady(() => {
this.logger.debug('Starting all registered tasks...');
this.taskRegistry.forEach((task) => {
this.scheduleTask(task);
});
this.started = true;
this.systemEventService.onceRedisError(() => {
this.logger.debug('Redis shutdown detected. Cancelling all scheduled tasks...');
this.scheduledTasks.forEach((task, name) => {
this.cancelTask(name);
});
this.started = false;
});
});
}
/**
* Registers a new task to be scheduled.
* If the task is already registered, it will not be added again.
*/
public registerTask(task: IScheduledTask): void {
if (this.taskRegistry.find((t) => t.name === task.name)) {
this.logger.error(`Task with name "${task.name}" already exists.`);
return;
}
this.logger.debug(`Registering task "${task.name}".`);
this.taskRegistry.push(task);
if (this.started) {
this.scheduleTask(task);
}
}
protected async scheduleTask(task: IScheduledTask): Promise<void> {
const { name, type, scheduleOrDelay, callback } = task;
if (this.scheduledTasks.has(name)) {
this.logger.debug(`Task "${name}" already scheduled.`);
return;
}
if (type === 'cron') {
this.logger.debug(`Scheduling cron task "${name}" with schedule "${scheduleOrDelay}"`);
const cronExpression = this.msStringToCronExpression(scheduleOrDelay);
const lockDuration = Math.max(ms(scheduleOrDelay) - ms('1m'), ms(INTERNAL_CONFIG.CRON_JOB_MIN_LOCK_TTL));
const job = new CronJob(cronExpression, async () => {
try {
this.logger.debug(`Attempting to acquire lock for cron task "${name}"`);
const lock = await this.mutexService.acquire(MeetLock.getScheduledTaskLock(name), lockDuration);
if (!lock) {
this.logger.debug(`Task "${name}" skipped: another instance holds the lock.`);
return;
}
this.logger.debug(`Running cron task "${name}"...`);
await callback();
} catch (error) {
this.logger.error(`Error running cron task "${name}":`, error);
}
});
// Start the job immediately
await callback();
job.start();
this.scheduledTasks.set(name, job);
} else if (type === 'timeout') {
this.logger.debug(`Scheduling timeout task "${name}" with delay ${scheduleOrDelay}`);
const timeoutId = setTimeout(
async () => {
try {
this.scheduledTasks.delete(name);
await callback();
} catch (error) {
this.logger.error(`Error running timeout task "${name}":`, error);
}
},
ms(scheduleOrDelay as ms.StringValue)
);
this.scheduledTasks.set(name, timeoutId);
}
}
/**
* Cancel the scheduled task with the given name.
*/
public cancelTask(name: string): void {
const scheduled = this.scheduledTasks.get(name);
if (scheduled) {
if (scheduled instanceof CronJob) {
scheduled.stop();
} else {
clearTimeout(scheduled as NodeJS.Timeout);
}
this.scheduledTasks.delete(name);
this.taskRegistry = this.taskRegistry.filter((task) => task.name !== name);
this.logger.debug(`Task "${name}" cancelled.`);
}
}
/**
* Converts a human-readable time string to a cron expression.
*
* This method takes a string representation of a time duration (e.g., '1h', '30m', '1d')
* and converts it to an equivalent cron expression that would trigger at that interval.
* The conversion uses different cron patterns based on the duration magnitude:
* - For days: Runs at midnight every X days
* - For hours: Runs at the start of every X hours
* - For minutes: Runs every X minutes
* - For seconds ≥ 30: Runs every minute
* - For seconds < 30: Runs every X seconds
*
* @param msString - A string representing time duration (parsed by the 'ms' library)
* @returns A cron expression string that represents the equivalent scheduling interval
*
*/
protected msStringToCronExpression(msString: ms.StringValue): string {
const milliseconds = ms(msString);
const totalSeconds = Math.floor(milliseconds / 1000);
const seconds = totalSeconds % 60;
const minutes = Math.floor(totalSeconds / 60) % 60;
const hours = Math.floor(totalSeconds / 3600) % 24;
const days = Math.floor(totalSeconds / 86400);
if (days > 0) {
return `0 0 */${days} * *`;
} else if (hours > 0) {
return `0 0 */${hours} * * *`;
} else if (minutes > 0) {
return `0 */${minutes} * * * *`;
} else if (seconds >= 30) {
return `0 * * * * *`;
} else {
return `*/${Math.max(seconds, 1)} * * * * *`;
}
}
}