From 10181c69ee269f043a4abccc7d2f4a698e19ef44 Mon Sep 17 00:00:00 2001 From: Carlos Santos <4a.santos@gmail.com> Date: Mon, 31 Mar 2025 13:14:30 +0200 Subject: [PATCH] backend: replace recording cleanup timer with task scheduler registration --- backend/src/services/recording.service.ts | 19 +++++--- .../src/services/task-scheduler.service.ts | 43 +------------------ 2 files changed, 13 insertions(+), 49 deletions(-) diff --git a/backend/src/services/recording.service.ts b/backend/src/services/recording.service.ts index fb39337..523f62a 100644 --- a/backend/src/services/recording.service.ts +++ b/backend/src/services/recording.service.ts @@ -23,6 +23,7 @@ import { RecordingHelper } from '../helpers/recording.helper.js'; import { MEET_RECORDING_LOCK_GC_INTERVAL, MEET_RECORDING_LOCK_TTL, + MEET_RECORDING_STARTED_TIMEOUT, MEET_S3_BUCKET, MEET_S3_RECORDINGS_PREFIX, MEET_S3_SUBBUCKET @@ -80,10 +81,12 @@ export class RecordingService { const { recordingId } = recordingInfo; const recordingPromise = new Promise((resolve, reject) => { - this.taskSchedulerService.scheduleRecordingCleanupTimer( - roomId, - this.handleRecordingLockTimeout.bind(this, recordingId, roomId, reject) - ); + this.taskSchedulerService.registerTask({ + name: `${roomId}_recording_timeout`, + type: 'timeout', + scheduleOrDelay: MEET_RECORDING_STARTED_TIMEOUT, + callback: this.handleRecordingLockTimeout.bind(this, recordingId, roomId, reject) + }); this.systemEventService.once(SystemEventType.RECORDING_ACTIVE, (payload: Record) => { // This listener is triggered only for the instance that started the recording. @@ -92,7 +95,7 @@ export class RecordingService { payload?.recordingId === recordingId && payload?.roomId === roomId; if (isEventForCurrentRecording) { - this.taskSchedulerService.cancelRecordingCleanupTimer(roomId); + this.taskSchedulerService.cancelTask(`${roomId}_recording_timeout`); resolve(recordingInfo); } else { this.logger.error('Received recording active event with mismatched recording ID:', payload); @@ -121,7 +124,7 @@ export class RecordingService { } // Cancel the recording cleanup timer if it is running - this.taskSchedulerService.cancelRecordingCleanupTimer(roomId); + this.taskSchedulerService.cancelTask(`${roomId}_recording_timeout`); // Remove the listener for the EGRESS_STARTED event. this.systemEventService.off(SystemEventType.RECORDING_ACTIVE); @@ -587,7 +590,9 @@ export class RecordingService { const lockAge = Date.now() - (createdAt || Date.now()); if (lockAge < LOCK_GRACE_PERIOD) { - this.logger.debug(`Lock for room ${roomId} is too recent (${ms(lockAge)}), skipping orphan lock cleanup`); + this.logger.debug( + `Lock for room ${roomId} is too recent (${ms(lockAge)}), skipping orphan lock cleanup` + ); return; } diff --git a/backend/src/services/task-scheduler.service.ts b/backend/src/services/task-scheduler.service.ts index 4fccb97..e0fd198 100644 --- a/backend/src/services/task-scheduler.service.ts +++ b/backend/src/services/task-scheduler.service.ts @@ -6,7 +6,6 @@ import { MutexService } from './mutex.service.js'; import { MeetLock } from '../helpers/redis.helper.js'; import ms from 'ms'; import { CronExpressionParser } from 'cron-parser'; -import { MEET_RECORDING_STARTED_TIMEOUT } from '../environment.js'; export type TaskType = 'cron' | 'timeout'; @@ -78,47 +77,6 @@ export class TaskSchedulerService { this.roomGarbageCollectorJob.start(); } - /** - * Schedules a cleanup timer for a recording that has just started. - * - * If the egress_started webhook is not received before the timer expires, - * this timer will execute a cleanup callback by stopping the recording and releasing - * the active lock for the specified room. - */ - async scheduleRecordingCleanupTimer(roomId: string, cleanupCallback: () => Promise): Promise { - this.logger.debug(`Recording cleanup timer (${MEET_RECORDING_STARTED_TIMEOUT}) scheduled for room ${roomId}.`); - - // Schedule a timeout to run the cleanup callback after a specified time - const timeoutMs = ms(MEET_RECORDING_STARTED_TIMEOUT); - const timer = setTimeout(async () => { - this.logger.warn(`Recording cleanup timer expired for room ${roomId}. Initiating cleanup process.`); - this.recordingCleanupTimers.delete(roomId); - await cleanupCallback(); - }, timeoutMs); - this.recordingCleanupTimers.set(roomId, timer); - } - - cancelRecordingCleanupTimer(roomId: string): void { - const timer = this.recordingCleanupTimers.get(roomId); - - if (timer) { - clearTimeout(timer); - this.recordingCleanupTimers.delete(roomId); - this.logger.info(`Recording cleanup timer cancelled for room ${roomId}`); - } - } - - async startRecordingLockGarbageCollector(callbackFn: () => Promise): Promise { - // Create a cron job to run every minute - const recordingLockGarbageCollectorJob = new CronJob('0 * * * * *', async () => { - try { - await callbackFn(); - } catch (error) { - this.logger.error('Error running recording lock garbage collection:', error); - } - }); - } - /** * Registers a new task to be scheduled. * If the task is already registered, it will not be added again. @@ -176,6 +134,7 @@ export class TaskSchedulerService { const timeoutId = setTimeout( async () => { try { + this.scheduledTasks.delete(name); await callback(); } catch (error) { this.logger.error(`Error running timeout task "${name}":`, error);