From 0d6838019d417afe64b7cfc5b11f9fa4abd0eec5 Mon Sep 17 00:00:00 2001 From: juancarmore Date: Fri, 21 Nov 2025 11:32:34 +0100 Subject: [PATCH] backend: enhance recording management with active recordings retrieval and stale cleanup improvements --- meet-ce/backend/README.md | 61 +++++++------- .../src/repositories/recording.repository.ts | 14 +++- .../src/repositories/room.repository.ts | 30 +++---- .../backend/src/services/recording.service.ts | 81 ++++++++++--------- 4 files changed, 106 insertions(+), 80 deletions(-) diff --git a/meet-ce/backend/README.md b/meet-ce/backend/README.md index 5efef45f..6a9fcbad 100644 --- a/meet-ce/backend/README.md +++ b/meet-ce/backend/README.md @@ -196,10 +196,10 @@ flowchart TD L -- "Error (recording not found, already stopped,\nor unknown error)" --> O["Reject Request"] --> J ``` -4. **Failure handling**: +3. **Failure handling**: If an OpenVidu instance crashes while a recording is active, the lock remains in place. This scenario can block subsequent recording attempts if the lock is not released promptly. To mitigate this issue, a lock garbage collector is implemented to periodically clean up orphaned locks. - The garbage collector runs when the OpenVidu deployment starts, and then every 30 minutes. + The garbage collector runs when the OpenVidu deployment starts, and then every 15 minutes. ```mermaid graph TD; @@ -225,46 +225,53 @@ graph TD; L --> M M -->|More rooms| E M -->|No more rooms| N[Process completed] - ``` -5. **Stale recordings cleanup**: - To handle recordings that become stale due to network issues, LiveKit or Egress crashes, or other unexpected situations, a separate cleanup process runs every 15 minutes to identify and abort recordings that haven't been updated within a configured threshold (5 minutes by default). +4. **Stale recordings cleanup**: + To handle recordings that become stale due to network issues, LiveKit or Egress crashes, or other unexpected situations, a separate cleanup process runs every 14 minutes to identify and abort recordings that haven't been updated within a configured threshold (5 minutes by default). ```mermaid graph TD; - A[Initiate stale recordings cleanup] --> B[Get all in-progress recordings from LiveKit] + A[Initiate stale recordings cleanup] --> B[Get all active recordings from database
ACTIVE or ENDING status] B -->|Error| C[Log error and exit] B -->|No recordings found| D[Log and exit] B -->|Recordings found| E[Process recordings in batches of 10] E --> F[For each recording in batch] - F --> G[Extract recording ID and updatedAt] - G --> H[Get recording status from storage] + F --> G[Extract recordingId, roomId and egressId] + G --> H[Check for corresponding egress in LiveKit] - H -->|Recording already ABORTED| I[Mark as already processed] - H -->|Recording active| J[Check if updatedAt exists] + H -->|No egress found| I[Recording is stale - no egress exists] + H -->|Egress exists| J[Extract updatedAt from egress] - J -->|No updatedAt timestamp| K[Keep as fresh - log warning] - J -->|Has updatedAt| L[Calculate if stale] + I --> K[Update status to ABORTED in database] + K --> L[Log successful abort - no egress found] - L -->|Still fresh| M[Log as fresh] - L -->|Is stale| N[Abort stale recording] + J -->|No updatedAt timestamp| M[Keep as fresh - log warning] + J -->|Has updatedAt| N[Check if recording age is stale] - N --> O[Update status to ABORTED in storage] - N --> P[Stop egress in LiveKit] - O --> Q[Log successful abort] - P --> Q + N -->|Age not stale| O[Log as fresh] + N -->|Age is stale| P[Check room existence] - I --> R[Continue to next recording] - K --> R - M --> R - Q --> R + P -->|Room does not exist| Q[Mark as stale] + P -->|Room exists| R[Check if room has participants] - R -->|More recordings in batch| F - R -->|Batch complete| S[Process next batch] - S -->|More batches| E - S -->|All batches processed| T[Log completion metrics] - T --> U[Process completed] + R -->|No participants| Q + R -->|Has participants| O + Q --> S[Update status to ABORTED in database] + Q --> T[Stop egress in LiveKit] + S --> U[Log successful abort] + T --> U + + L --> V[Continue to next recording] + M --> V + O --> V + U --> V + + V -->|More recordings in batch| F + V -->|Batch complete| W[Process next batch] + W -->|More batches| E + W -->|All batches processed| X[Log completion metrics] + X --> Y[Process completed] ``` diff --git a/meet-ce/backend/src/repositories/recording.repository.ts b/meet-ce/backend/src/repositories/recording.repository.ts index 1c27c18b..e5d9db2f 100644 --- a/meet-ce/backend/src/repositories/recording.repository.ts +++ b/meet-ce/backend/src/repositories/recording.repository.ts @@ -1,4 +1,4 @@ -import { MeetRecordingInfo } from '@openvidu-meet/typings'; +import { MeetRecordingInfo, MeetRecordingStatus } from '@openvidu-meet/typings'; import { inject, injectable } from 'inversify'; import { uid as secureUid } from 'uid/secure'; import { MeetRecordingDocument, MeetRecordingModel } from '../models/mongoose-schemas/index.js'; @@ -173,6 +173,18 @@ export class RecordingRepository { + return await this.findAll({ + status: { $in: [MeetRecordingStatus.ACTIVE, MeetRecordingStatus.ENDING] } + }); + } + /** * Deletes a recording by its recordingId. * diff --git a/meet-ce/backend/src/repositories/room.repository.ts b/meet-ce/backend/src/repositories/room.repository.ts index 1db40742..0510424d 100644 --- a/meet-ce/backend/src/repositories/room.repository.ts +++ b/meet-ce/backend/src/repositories/room.repository.ts @@ -119,6 +119,21 @@ export class RoomRepository extends BaseRepos }; } + /** + * Finds all rooms that have expired (autoDeletionDate < now). + * Returns all expired rooms without pagination. + * + * @returns Array of expired rooms with enriched URLs + */ + async findExpiredRooms(): Promise { + const now = Date.now(); + + // Find all rooms where autoDeletionDate exists and is less than now + return await this.findAll({ + autoDeletionDate: { $exists: true, $lt: now } + }); + } + /** * Deletes a room by its roomId. * @@ -139,21 +154,6 @@ export class RoomRepository extends BaseRepos await this.deleteMany({ roomId: { $in: roomIds } }); } - /** - * Finds all rooms that have expired (autoDeletionDate < now). - * Returns all expired rooms without pagination. - * - * @returns Array of expired rooms with enriched URLs - */ - async findExpiredRooms(): Promise { - const now = Date.now(); - - // Find all rooms where autoDeletionDate exists and is less than now - return await this.findAll({ - autoDeletionDate: { $exists: true, $lt: now } - }); - } - /** * Counts the total number of rooms. */ diff --git a/meet-ce/backend/src/services/recording.service.ts b/meet-ce/backend/src/services/recording.service.ts index 672ff22f..10ab0899 100644 --- a/meet-ce/backend/src/services/recording.service.ts +++ b/meet-ce/backend/src/services/recording.service.ts @@ -1,6 +1,6 @@ import { MeetRecordingFilters, MeetRecordingInfo, MeetRecordingStatus } from '@openvidu-meet/typings'; import { inject, injectable } from 'inversify'; -import { EgressInfo, EgressStatus, EncodedFileOutput, EncodedFileType, RoomCompositeOptions } from 'livekit-server-sdk'; +import { EgressStatus, EncodedFileOutput, EncodedFileType, RoomCompositeOptions } from 'livekit-server-sdk'; import ms from 'ms'; import { Readable } from 'stream'; import { uid } from 'uid'; @@ -894,43 +894,39 @@ export class RecordingService { * Performs garbage collection for stale recordings in the system. * * This method identifies and aborts recordings that have become stale by: - * 1. Getting all recordings in progress from LiveKit - * 2. Checking their last update time - * 3. Aborting recordings that have exceeded the stale threshold - * 4. Updating their status in storage + * 1. Getting all active recordings from database (ACTIVE or ENDING status) + * 2. Checking if there's a corresponding in-progress egress in LiveKit + * 3. If no egress exists, marking the recording as ABORTED + * 4. If egress exists, checking last update time and aborting if stale * * Stale recordings can occur when: * - Network issues prevent normal completion * - LiveKit egress process hangs or crashes - * - Room is forcibly deleted while recording is active - * - * @returns {Promise} A promise that resolves when the cleanup process completes - * @protected */ protected async performStaleRecordingsGC(): Promise { this.logger.debug('Starting stale recordings cleanup process'); try { - // Get all in-progress recordings from LiveKit (across all rooms) - const allInProgressRecordings = await this.livekitService.getInProgressRecordingsEgress(); + // Get all active recordings from database (ACTIVE or ENDING status) + const activeRecordings = await this.recordingRepository.findActiveRecordings(); - if (allInProgressRecordings.length === 0) { - this.logger.debug('No in-progress recordings found'); + if (activeRecordings.length === 0) { + this.logger.debug('No active recordings found in database'); return; } - this.logger.debug(`Found ${allInProgressRecordings.length} in-progress recordings to check`); + this.logger.debug(`Found ${activeRecordings.length} active recordings in database to check`); // Process in batches to avoid overwhelming the system const BATCH_SIZE = 10; let totalProcessed = 0; let totalAborted = 0; - for (let i = 0; i < allInProgressRecordings.length; i += BATCH_SIZE) { - const batch = allInProgressRecordings.slice(i, i + BATCH_SIZE); + for (let i = 0; i < activeRecordings.length; i += BATCH_SIZE) { + const batch = activeRecordings.slice(i, i + BATCH_SIZE); const results = await Promise.allSettled( - batch.map((egressInfo: EgressInfo) => this.evaluateAndAbortStaleRecording(egressInfo)) + batch.map((recording: MeetRecordingInfo) => this.evaluateAndAbortStaleRecording(recording)) ); results.forEach((result: PromiseSettledResult, index: number) => { @@ -939,8 +935,8 @@ export class RecordingService { if (result.status === 'fulfilled' && result.value) { totalAborted++; } else if (result.status === 'rejected') { - const recordingId = RecordingHelper.extractRecordingIdFromEgress(batch[index]); - this.logger.error(`Failed to process stale recording ${recordingId}:`, result.reason); + const recordingId = batch[index].recordingId; + this.logger.error(`Failed to process recording ${recordingId}:`, result.reason); } }); } @@ -955,34 +951,48 @@ export class RecordingService { /** * Evaluates whether a recording is stale and aborts it if necessary. - * A recording is considered stale if it has not been updated within the configured stale period - * and either the associated LiveKit room does not exist or has no publishers. - * If the recording is already aborted or has no updatedAt timestamp, it is kept as fresh. + * First checks if there's a corresponding egress in LiveKit. If not, the recording is immediately + * considered stale and aborted. If an egress exists, checks if it has been updated within the + * configured stale period and whether the associated room exists or has publishers. * - * @param egressInfo - The egress information containing details about the recording. + * @param recording - The recording information from MongoDB. * @returns A promise that resolves to `true` if the recording was aborted, `false` otherwise. - * @throws Will throw an error if there is an issue retrieving recording status, checking room existence, + * @throws Will throw an error if there is an issue checking egress existence, room existence, * or aborting the recording. */ - protected async evaluateAndAbortStaleRecording(egressInfo: EgressInfo): Promise { - const recordingId = RecordingHelper.extractRecordingIdFromEgress(egressInfo); - const { roomId } = RecordingHelper.extractInfoFromRecordingId(recordingId); - const updatedAt = RecordingHelper.extractUpdatedDate(egressInfo); + protected async evaluateAndAbortStaleRecording(recording: MeetRecordingInfo): Promise { + const recordingId = recording.recordingId; + const roomId = recording.roomId; + const { egressId } = RecordingHelper.extractInfoFromRecordingId(recordingId); const staleAfterMs = ms(INTERNAL_CONFIG.RECORDING_STALE_GRACE_PERIOD); try { - const { status } = await this.getRecording(recordingId); + // Check if there's a corresponding egress in LiveKit for this room + const inProgressRecordings = await this.livekitService.getInProgressRecordingsEgress(roomId); + const egressInfo = inProgressRecordings.find((egress) => egress.egressId === egressId); - if (status === MeetRecordingStatus.ABORTED) { - this.logger.warn(`Recording ${recordingId} is already aborted`); + if (!egressInfo) { + // No egress found in LiveKit, recording is stale + this.logger.warn( + `Recording ${recordingId} has no corresponding egress in LiveKit, marking as stale and aborting...` + ); + + await this.updateRecordingStatus(recordingId, MeetRecordingStatus.ABORTED); + this.logger.info(`Successfully aborted stale recording ${recordingId}`); return true; } + // Egress exists, check if it's stale based on updatedAt timestamp + const updatedAt = RecordingHelper.extractUpdatedDate(egressInfo); + if (!updatedAt) { this.logger.warn(`Recording ${recordingId} has no updatedAt timestamp, keeping it as fresh`); return false; } + this.logger.debug(`Recording ${recordingId} last updated at ${new Date(updatedAt).toISOString()}`); + + // Check if recording has not been updated recently const lkRoomExists = await this.livekitService.roomExists(roomId); const ageIsStale = updatedAt < Date.now() - staleAfterMs; let isRecordingStale = false; @@ -996,19 +1006,16 @@ export class RecordingService { } } - // Check if recording has not been updated recently - this.logger.debug(`Recording ${recordingId} last updated at ${new Date(updatedAt).toISOString()}`); - if (!isRecordingStale) { this.logger.debug(`Recording ${recordingId} is still fresh`); return false; } - this.logger.warn(`Room ${roomId} does not exist or has no participants and recording ${recordingId} is stale, aborting...`); + this.logger.warn( + `Room ${roomId} does not exist or has no participants and recording ${recordingId} is stale, aborting...` + ); // Abort the recording - const { egressId } = RecordingHelper.extractInfoFromRecordingId(recordingId); - await Promise.all([ this.updateRecordingStatus(recordingId, MeetRecordingStatus.ABORTED), this.livekitService.stopEgress(egressId)