diff --git a/backend/README.md b/backend/README.md
index 49f46a3..2582d93 100644
--- a/backend/README.md
+++ b/backend/README.md
@@ -166,3 +166,44 @@ graph TD;
     M -->|No more rooms| N[Process completed]
 ```
+
+5. **Stale recordings cleanup**:
+   To handle recordings that become stale due to network issues, LiveKit or Egress crashes, or other unexpected situations, a separate cleanup process runs every 15 minutes and aborts any recording whose last update is older than a configured threshold (5 minutes by default).
+
+```mermaid
+graph TD;
+    A[Initiate stale recordings cleanup] --> B[Get all in-progress recordings from LiveKit]
+    B -->|Error| C[Log error and exit]
+    B -->|No recordings found| D[Log and exit]
+    B -->|Recordings found| E[Process recordings in batches of 10]
+
+    E --> F[For each recording in batch]
+    F --> G[Extract recording ID and updatedAt]
+    G --> H[Get recording status from storage]
+
+    H -->|Recording already ABORTED| I[Mark as already processed]
+    H -->|Recording active| J[Check if updatedAt exists]
+
+    J -->|No updatedAt timestamp| K[Keep as fresh - log warning]
+    J -->|Has updatedAt| L[Calculate if stale]
+
+    L -->|Still fresh| M[Log as fresh]
+    L -->|Is stale| N[Abort stale recording]
+
+    N --> O[Update status to ABORTED in storage]
+    N --> P[Stop egress in LiveKit]
+    O --> Q[Log successful abort]
+    P --> Q
+
+    I --> R[Continue to next recording]
+    K --> R
+    M --> R
+    Q --> R
+
+    R -->|More recordings in batch| F
+    R -->|Batch complete| S[Process next batch]
+    S -->|More batches| E
+    S -->|All batches processed| T[Log completion metrics]
+    T --> U[Process completed]
+
+```
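The staleness rule in the diagram reduces to a single timestamp comparison. A minimal sketch of that check, using the `ms` package the backend already depends on; the constant name is illustrative and mirrors the `INTERNAL_CONFIG.RECORDING_STALE_AFTER` entry added below:

```typescript
import ms from 'ms';

// Default from the description above: stale after 5 minutes without updates.
const STALE_AFTER_MS = ms('5m'); // 300000

// A recording is stale when its last egress update is older than the threshold.
// Recordings with no updatedAt timestamp are kept as fresh, matching the diagram.
function isRecordingStale(updatedAtMs: number | undefined, nowMs = Date.now()): boolean {
    if (updatedAtMs === undefined) {
        return false;
    }

    return updatedAtMs < nowMs - STALE_AFTER_MS;
}
```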
diff --git a/backend/src/config/internal-config.ts b/backend/src/config/internal-config.ts
index 431a046..6ff1915 100644
--- a/backend/src/config/internal-config.ts
+++ b/backend/src/config/internal-config.ts
@@ -33,11 +33,14 @@ const INTERNAL_CONFIG = {
     S3_USERS_PREFIX: 'users',
     S3_API_KEYS_PREFIX: 'api_keys',
 
-    // Garbage collection and recording lock intervals
+    // Garbage collection and recording intervals
     ROOM_GC_INTERVAL: '1h' as StringValue, // Garbage collector interval for rooms
     RECORDING_LOCK_TTL: '6h' as StringValue, // TTL for recording lock in Redis
     RECORDING_STARTED_TIMEOUT: '20s' as StringValue, // Timeout for recording start
     RECORDING_LOCK_GC_INTERVAL: '30m' as StringValue, // Garbage collection interval for recording locks
+    RECORDING_ORPHANED_LOCK_GRACE_PERIOD: '1m' as StringValue, // Grace period for orphaned recording locks
+    RECORDING_STALE_CLEANUP_INTERVAL: '15m' as StringValue, // Cleanup interval for stale recordings
+    RECORDING_STALE_AFTER: '5m' as StringValue, // Maximum allowed time since the last recording update before marking it as stale
     CRON_JOB_MIN_LOCK_TTL: '59s' as StringValue, // Minimum TTL for cron job locks
 
     // Additional intervals
diff --git a/backend/src/helpers/recording.helper.ts b/backend/src/helpers/recording.helper.ts
index 23b8be3..c02332b 100644
--- a/backend/src/helpers/recording.helper.ts
+++ b/backend/src/helpers/recording.helper.ts
@@ -18,14 +18,14 @@ export class RecordingHelper {
         const startDateMs = RecordingHelper.extractStartDate(egressInfo);
         const endDateMs = RecordingHelper.extractEndDate(egressInfo);
         const filename = RecordingHelper.extractFilename(egressInfo);
-        const uid = RecordingHelper.extractUidFromFilename(filename);
-        const { egressId, roomName: roomId, errorCode, error, details } = egressInfo;
+        const recordingId = RecordingHelper.extractRecordingIdFromEgress(egressInfo);
+        const { roomName: roomId, errorCode, error, details } = egressInfo;
 
         const roomService = container.get(RoomService);
         const { roomName } = await roomService.getMeetRoom(roomId);
 
         return {
-            recordingId: `${roomId}--${egressId}--${uid}`,
+            recordingId,
             roomId,
             roomName,
             // outputMode,
@@ -132,6 +132,13 @@ export class RecordingHelper {
         return uidWithExtension.split('.')[0];
     }
 
+    static extractRecordingIdFromEgress(egressInfo: EgressInfo): string {
+        const { roomName: meetRoomId, egressId } = egressInfo;
+        const filename = RecordingHelper.extractFilename(egressInfo);
+        const uid = RecordingHelper.extractUidFromFilename(filename);
+        return `${meetRoomId}--${egressId}--${uid}`;
+    }
+
     /**
      * Extracts the room name, egressId, and UID from the given recordingId.
      * @param recordingId ${roomId}--${egressId}--${uid}
@@ -181,6 +188,11 @@ export class RecordingHelper {
         return this.toMilliseconds(Number(createdAt));
     }
 
+    static extractUpdatedDate(egressInfo: EgressInfo): number | undefined {
+        const updatedAt = egressInfo.updatedAt;
+        return updatedAt ? this.toMilliseconds(Number(updatedAt)) : undefined;
+    }
+
     /**
      * Extracts the size from the given EgressInfo object.
     * If the size is not available, it returns 0.
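The new `extractRecordingIdFromEgress` helper centralizes the recording ID contract that `extractInfoFromRecordingId` documents: `${roomId}--${egressId}--${uid}`. A standalone sketch of that round trip; the functions below are illustrative, not part of the codebase, and assume no segment contains `--`:

```typescript
function composeRecordingId(roomId: string, egressId: string, uid: string): string {
    return `${roomId}--${egressId}--${uid}`;
}

function parseRecordingId(recordingId: string): { roomId: string; egressId: string; uid: string } {
    const [roomId, egressId, uid] = recordingId.split('--');
    return { roomId, egressId, uid };
}

// Round trip:
const recordingId = composeRecordingId('room-a', 'EG_42', 'x7kq'); // 'room-a--EG_42--x7kq'
const { egressId } = parseRecordingId(recordingId); // 'EG_42'
```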
diff --git a/backend/src/services/livekit.service.ts b/backend/src/services/livekit.service.ts
index dbfa84a..e9859c5 100644
--- a/backend/src/services/livekit.service.ts
+++ b/backend/src/services/livekit.service.ts
@@ -193,16 +193,14 @@ export class LiveKitService {
      * @returns A Promise that resolves when the metadata has been successfully updated
      * @throws An internal error if there is an issue updating the metadata
      */
-    async updateParticipantMetadata(
-        roomName: string,
-        participantName: string,
-        metadata: string
-    ): Promise<void> {
+    async updateParticipantMetadata(roomName: string, participantName: string, metadata: string): Promise<void> {
         try {
             await this.roomClient.updateParticipant(roomName, participantName, metadata);
             this.logger.verbose(`Updated metadata for participant ${participantName} in room ${roomName}`);
         } catch (error) {
-            this.logger.error(`Error updating metadata for participant ${participantName} in room ${roomName}: ${error}`);
+            this.logger.error(
+                `Error updating metadata for participant ${participantName} in room ${roomName}: ${error}`
+            );
             throw internalError(`updating metadata for participant '${participantName}' in room '${roomName}'`);
         }
     }
@@ -351,12 +349,9 @@ export class LiveKitService {
      */
     async getInProgressRecordingsEgress(roomName?: string): Promise<EgressInfo[]> {
         try {
-            const egressArray = await this.getEgress(roomName);
-            return egressArray.filter((egress) => {
-                if (!RecordingHelper.isRecordingEgress(egress)) {
-                    return false;
-                }
+            const egressArray = await this.getRecordingsEgress(roomName);
+            return egressArray.filter((egress) => {
                 // Check if recording is in any "in-progress" state
                 return [EgressStatus.EGRESS_STARTING, EgressStatus.EGRESS_ACTIVE, EgressStatus.EGRESS_ENDING].includes(
                     egress.status
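With the `isRecordingEgress` pre-filter folded into `getRecordingsEgress`, callers of `getInProgressRecordingsEgress` only ever see recording egresses. A hedged usage sketch; the wrapper function and import path are hypothetical, while `LiveKitService` and its method come from this codebase:

```typescript
import { LiveKitService } from './livekit.service';

// Hypothetical caller: does a room still have a recording that is
// starting, active, or ending?
async function roomHasActiveRecording(livekit: LiveKitService, roomName: string): Promise<boolean> {
    const inProgress = await livekit.getInProgressRecordingsEgress(roomName);
    return inProgress.length > 0;
}

// Omitting the room name scans all rooms: this is how the stale recordings
// cleanup below collects its candidates.
```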
diff --git a/backend/src/services/recording.service.ts b/backend/src/services/recording.service.ts
index 34ab814..8dc3f5e 100644
--- a/backend/src/services/recording.service.ts
+++ b/backend/src/services/recording.service.ts
@@ -1,6 +1,6 @@
 import { MeetRecordingFilters, MeetRecordingInfo, MeetRecordingStatus } from '@typings-ce';
 import { inject, injectable } from 'inversify';
-import { EgressStatus, EncodedFileOutput, EncodedFileType, RoomCompositeOptions } from 'livekit-server-sdk';
+import { EgressInfo, EgressStatus, EncodedFileOutput, EncodedFileType, RoomCompositeOptions } from 'livekit-server-sdk';
 import ms from 'ms';
 import { Readable } from 'stream';
 import { uid } from 'uid';
@@ -55,6 +55,15 @@ export class RecordingService {
             callback: this.performRecordingLocksGarbageCollection.bind(this)
         };
         this.taskSchedulerService.registerTask(recordingGarbageCollectorTask);
+
+        // Register the stale recordings cleanup task
+        const staleRecordingsCleanupTask: IScheduledTask = {
+            name: 'staleRecordingsCleanup',
+            type: 'cron',
+            scheduleOrDelay: INTERNAL_CONFIG.RECORDING_STALE_CLEANUP_INTERVAL,
+            callback: this.performStaleRecordingsCleanup.bind(this)
+        };
+        this.taskSchedulerService.registerTask(staleRecordingsCleanupTask);
     }
 
     async startRecording(roomId: string): Promise<MeetRecordingInfo> {
@@ -680,10 +689,20 @@ export class RecordingService {
 
         const roomIds = recordingLocks.map((lock) => lock.resources[0].replace(lockPrefix, ''));
 
-        // Check each room id if it exists in LiveKit
-        // If the room does not exist, release the lock
-        for (const roomId of roomIds) {
-            await this.evaluateAndReleaseOrphanedLock(roomId, lockPrefix);
+        const BATCH_SIZE = 10;
+
+        for (let i = 0; i < roomIds.length; i += BATCH_SIZE) {
+            const batch = roomIds.slice(i, i + BATCH_SIZE);
+
+            const results = await Promise.allSettled(
+                batch.map((roomId) => this.evaluateAndReleaseOrphanedLock(roomId, lockPrefix))
+            );
+
+            results.forEach((result, index) => {
+                if (result.status === 'rejected') {
+                    this.logger.error(`Failed to process lock for room ${batch[index]}:`, result.reason);
+                }
+            });
         }
     } catch (error) {
         this.logger.error('Error retrieving recording locks:', error);
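Both this garbage collector and the stale recordings cleanup further down share the same batching pattern: fixed-size slices processed with `Promise.allSettled`, so a single failed item is logged without sinking its whole batch. The generic shape of that pattern, as an illustrative helper that is not part of the diff:

```typescript
async function processInBatches<T, R>(
    items: T[],
    batchSize: number,
    worker: (item: T) => Promise<R>
): Promise<PromiseSettledResult<R>[]> {
    const settled: PromiseSettledResult<R>[] = [];

    for (let i = 0; i < items.length; i += batchSize) {
        const batch = items.slice(i, i + batchSize);
        // allSettled never rejects: each item yields a fulfilled or rejected entry.
        settled.push(...(await Promise.allSettled(batch.map(worker))));
    }

    return settled;
}
```

`Promise.all` would reject on the first failed evaluation and drop the remaining results; `allSettled` keeps the sweep exhaustive.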
@@ -698,11 +717,14 @@ export class RecordingService {
      */
     protected async evaluateAndReleaseOrphanedLock(roomId: string, lockPrefix: string): Promise<void> {
         const lockKey = `${lockPrefix}${roomId}`;
-        const LOCK_GRACE_PERIOD = ms('1m');
+        const gracePeriodMs = ms(INTERNAL_CONFIG.RECORDING_ORPHANED_LOCK_GRACE_PERIOD);
 
         try {
             // Verify if the lock still exists before proceeding to check the room
-            const lockExists = await this.mutexService.lockExists(lockKey);
+            const [lockExists, lockCreatedAt] = await Promise.all([
+                this.mutexService.lockExists(lockKey),
+                this.mutexService.getLockCreatedAt(lockKey)
+            ]);
 
             if (!lockExists) {
                 this.logger.debug(`Lock for room ${roomId} no longer exists, skipping cleanup`);
@@ -710,45 +732,35 @@ export class RecordingService {
             }
 
             // Verify if the lock is too recent
-            const createdAt = await this.mutexService.getLockCreatedAt(lockKey);
-            const lockAge = Date.now() - (createdAt || Date.now());
+            const lockAge = Date.now() - (lockCreatedAt || Date.now());
 
-            if (lockAge < LOCK_GRACE_PERIOD) {
+            if (lockAge < gracePeriodMs) {
                 this.logger.debug(
                     `Lock for room ${roomId} is too recent (${ms(lockAge)}), skipping orphan lock cleanup`
                 );
                 return;
             }
 
-            const roomExists = await this.livekitService.roomExists(roomId);
+            const [lkRoomExists, inProgressRecordings] = await Promise.all([
+                this.livekitService.roomExists(roomId),
+                this.livekitService.getInProgressRecordingsEgress(roomId)
+            ]);
 
-            if (roomExists) {
-                // Room exists, check if it has publishers
-                this.logger.debug(`Room ${roomId} exists, checking for publishers`);
-                const room = await this.livekitService.getRoom(roomId);
-                const hasPublishers = room.numPublishers > 0;
-
-                if (hasPublishers) {
-                    // Room has publishers, but no in-progress recordings
-                    this.logger.debug(`Room ${roomId} has publishers, checking for in-progress recordings`);
-                } else {
-                    // Room has no publishers
-                    this.logger.debug(`Room ${roomId} has no publishers, checking for in-progress recordings`);
-                }
+            if (lkRoomExists) {
+                this.logger.debug(`Room ${roomId} exists, checking recordings`);
             } else {
-                // Room does not exist
                 this.logger.debug(`Room ${roomId} no longer exists, checking for in-progress recordings`);
             }
 
             // Verify if in-progress recordings exist
-            const inProgressRecordings = await this.livekitService.getInProgressRecordingsEgress(roomId);
             const hasInProgressRecordings = inProgressRecordings.length > 0;
 
             if (hasInProgressRecordings) {
-                this.logger.debug(`Room ${roomId} has in-progress recordings, skipping cleanup`);
+                this.logger.debug(`Room ${roomId} has in-progress recordings, keeping lock`);
                 return;
             }
 
+            // No in-progress recordings, releasing orphaned lock
             this.logger.info(`Room ${roomId} has no in-progress recordings, releasing orphaned lock`);
             await this.mutexService.release(lockKey);
         } catch (error) {
@@ -756,4 +768,120 @@ export class RecordingService {
             throw error;
         }
     }
+
+    /**
+     * Performs cleanup of stale recordings across the system.
+     *
+     * This method identifies and aborts recordings that have become stale by:
+     * 1. Getting all recordings in progress from LiveKit
+     * 2. Checking their last update time
+     * 3. Aborting recordings that have exceeded the stale threshold
+     * 4. Updating their status in storage
+     *
+     * Stale recordings can occur when:
+     * - Network issues prevent normal completion
+     * - LiveKit egress process hangs or crashes
+     * - Room is forcibly deleted while recording is active
+     *
+     * @returns {Promise<void>} A promise that resolves when the cleanup process completes
+     * @protected
+     */
+    protected async performStaleRecordingsCleanup(): Promise<void> {
+        this.logger.debug('Starting stale recordings cleanup process');
+
+        try {
+            // Get all in-progress recordings from LiveKit (across all rooms)
+            const allInProgressRecordings = await this.livekitService.getInProgressRecordingsEgress();
+
+            if (allInProgressRecordings.length === 0) {
+                this.logger.debug('No in-progress recordings found');
+                return;
+            }
+
+            this.logger.debug(`Found ${allInProgressRecordings.length} in-progress recordings to check`);
+
+            // Process in batches to avoid overwhelming the system
+            const BATCH_SIZE = 10;
+            let totalProcessed = 0;
+            let totalAborted = 0;
+
+            for (let i = 0; i < allInProgressRecordings.length; i += BATCH_SIZE) {
+                const batch = allInProgressRecordings.slice(i, i + BATCH_SIZE);
+
+                const results = await Promise.allSettled(
+                    batch.map((egressInfo: EgressInfo) => this.evaluateAndAbortStaleRecording(egressInfo))
+                );
+
+                results.forEach((result: PromiseSettledResult<boolean>, index: number) => {
+                    totalProcessed++;
+
+                    if (result.status === 'fulfilled' && result.value) {
+                        totalAborted++;
+                    } else if (result.status === 'rejected') {
+                        const recordingId = RecordingHelper.extractRecordingIdFromEgress(batch[index]);
+                        this.logger.error(`Failed to process stale recording ${recordingId}:`, result.reason);
+                    }
+                });
+            }
+
+            this.logger.info(
+                `Stale recordings cleanup completed: processed=${totalProcessed}, aborted=${totalAborted}`
+            );
+        } catch (error) {
+            this.logger.error('Error in stale recordings cleanup:', error);
+        }
+    }
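The completion metrics fall out of the settled results directly: a fulfilled `true` means a recording was aborted, a fulfilled `false` means it was still fresh, and rejections are logged but still counted as processed. The same counting in isolation, with hand-written results for illustration:

```typescript
const results: PromiseSettledResult<boolean>[] = [
    { status: 'fulfilled', value: true }, // stale, aborted
    { status: 'fulfilled', value: false }, // still fresh
    { status: 'rejected', reason: new Error('LiveKit timeout') } // logged as failed
];

const totalProcessed = results.length; // 3
const totalAborted = results.filter(
    (result): result is PromiseFulfilledResult<boolean> => result.status === 'fulfilled' && result.value
).length; // 1
```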
+
+    /**
+     * Evaluates a single recording and aborts it if it's considered stale.
+     *
+     * @param egressInfo - The egress information for the recording to evaluate
+     * @returns {Promise<boolean>} True if the recording was aborted, false if it's still fresh
+     * @protected
+     */
+    protected async evaluateAndAbortStaleRecording(egressInfo: EgressInfo): Promise<boolean> {
+        const recordingId = RecordingHelper.extractRecordingIdFromEgress(egressInfo);
+        const updatedAt = RecordingHelper.extractUpdatedDate(egressInfo);
+        const staleAfterMs = ms(INTERNAL_CONFIG.RECORDING_STALE_AFTER);
+
+        try {
+            const { status } = await this.getRecording(recordingId);
+
+            if (status === MeetRecordingStatus.ABORTED) {
+                this.logger.warn(`Recording ${recordingId} is already aborted`);
+                return true;
+            }
+
+            if (!updatedAt) {
+                this.logger.warn(`Recording ${recordingId} has no updatedAt timestamp, keeping it as fresh`);
+                return false;
+            }
+
+            // Check if recording has not been updated recently
+            const isStale = updatedAt < Date.now() - staleAfterMs;
+
+            this.logger.debug(`Recording ${recordingId} last updated at ${new Date(updatedAt).toISOString()}`);
+
+            if (!isStale) {
+                this.logger.debug(`Recording ${recordingId} is still fresh`);
+                return false;
+            }
+
+            this.logger.warn(`Recording ${recordingId} is stale, aborting...`);
+
+            // Abort the recording
+            const { egressId } = RecordingHelper.extractInfoFromRecordingId(recordingId);
+
+            await Promise.all([
+                this.updateRecordingStatus(recordingId, MeetRecordingStatus.ABORTED),
+                this.livekitService.stopEgress(egressId)
+            ]);
+
+            this.logger.info(`Successfully aborted stale recording ${recordingId}`);
+            return true;
+        } catch (error) {
+            this.logger.error(`Error processing stale recording ${recordingId}:`, error);
+            throw error;
+        }
+    }
 }
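One observation on the abort step: `Promise.all` starts both operations but surfaces only the first rejection, so if the status update and `stopEgress` both fail, one of the two failures goes unreported. If both outcomes should be logged individually, an `allSettled` variant inside `evaluateAndAbortStaleRecording` might look like the sketch below; this is an alternative, not a claim that the current behavior is wrong:

```typescript
// Both operations still run to completion; each outcome is inspected separately.
const [statusResult, egressResult] = await Promise.allSettled([
    this.updateRecordingStatus(recordingId, MeetRecordingStatus.ABORTED),
    this.livekitService.stopEgress(egressId)
]);

if (statusResult.status === 'rejected') {
    this.logger.error(`Failed to mark recording ${recordingId} as ABORTED:`, statusResult.reason);
}

if (egressResult.status === 'rejected') {
    this.logger.error(`Failed to stop egress ${egressId}:`, egressResult.reason);
}
```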