diff --git a/meet-ce/backend/README.md b/meet-ce/backend/README.md
index 5efef45f..6a9fcbad 100644
--- a/meet-ce/backend/README.md
+++ b/meet-ce/backend/README.md
@@ -196,10 +196,10 @@ flowchart TD
L -- "Error (recording not found, already stopped,\nor unknown error)" --> O["Reject Request"] --> J
```
-4. **Failure handling**:
+3. **Failure handling**:
If an OpenVidu instance crashes while a recording is active, the lock remains in place. This scenario can block subsequent recording attempts if the lock is not released promptly. To mitigate this issue, a lock garbage collector is implemented to periodically clean up orphaned locks.
- The garbage collector runs when the OpenVidu deployment starts, and then every 30 minutes.
+ The garbage collector runs when the OpenVidu deployment starts, and then every 15 minutes.
```mermaid
graph TD;
@@ -225,46 +225,53 @@ graph TD;
L --> M
M -->|More rooms| E
M -->|No more rooms| N[Process completed]
-
```
-5. **Stale recordings cleanup**:
- To handle recordings that become stale due to network issues, LiveKit or Egress crashes, or other unexpected situations, a separate cleanup process runs every 15 minutes to identify and abort recordings that haven't been updated within a configured threshold (5 minutes by default).
+4. **Stale recordings cleanup**:
+ To handle recordings that become stale due to network issues, LiveKit or Egress crashes, or other unexpected situations, a separate cleanup process runs every 14 minutes to identify and abort recordings that haven't been updated within a configured threshold (5 minutes by default).
```mermaid
graph TD;
- A[Initiate stale recordings cleanup] --> B[Get all in-progress recordings from LiveKit]
+ A[Initiate stale recordings cleanup] --> B["Get all active recordings from database<br/>ACTIVE or ENDING status"]
B -->|Error| C[Log error and exit]
B -->|No recordings found| D[Log and exit]
B -->|Recordings found| E[Process recordings in batches of 10]
E --> F[For each recording in batch]
- F --> G[Extract recording ID and updatedAt]
- G --> H[Get recording status from storage]
+ F --> G[Extract recordingId, roomId and egressId]
+ G --> H[Check for corresponding egress in LiveKit]
- H -->|Recording already ABORTED| I[Mark as already processed]
- H -->|Recording active| J[Check if updatedAt exists]
+ H -->|No egress found| I[Recording is stale - no egress exists]
+ H -->|Egress exists| J[Extract updatedAt from egress]
- J -->|No updatedAt timestamp| K[Keep as fresh - log warning]
- J -->|Has updatedAt| L[Calculate if stale]
+ I --> K[Update status to ABORTED in database]
+ K --> L[Log successful abort - no egress found]
- L -->|Still fresh| M[Log as fresh]
- L -->|Is stale| N[Abort stale recording]
+ J -->|No updatedAt timestamp| M[Keep as fresh - log warning]
+ J -->|Has updatedAt| N[Check if recording age is stale]
- N --> O[Update status to ABORTED in storage]
- N --> P[Stop egress in LiveKit]
- O --> Q[Log successful abort]
- P --> Q
+ N -->|Age not stale| O[Log as fresh]
+ N -->|Age is stale| P[Check room existence]
- I --> R[Continue to next recording]
- K --> R
- M --> R
- Q --> R
+ P -->|Room does not exist| Q[Mark as stale]
+ P -->|Room exists| R[Check if room has participants]
- R -->|More recordings in batch| F
- R -->|Batch complete| S[Process next batch]
- S -->|More batches| E
- S -->|All batches processed| T[Log completion metrics]
- T --> U[Process completed]
+ R -->|No participants| Q
+ R -->|Has participants| O
+ Q --> S[Update status to ABORTED in database]
+ Q --> T[Stop egress in LiveKit]
+ S --> U[Log successful abort]
+ T --> U
+
+ L --> V[Continue to next recording]
+ M --> V
+ O --> V
+ U --> V
+
+ V -->|More recordings in batch| F
+ V -->|Batch complete| W[Process next batch]
+ W -->|More batches| E
+ W -->|All batches processed| X[Log completion metrics]
+ X --> Y[Process completed]
```
diff --git a/meet-ce/backend/src/repositories/recording.repository.ts b/meet-ce/backend/src/repositories/recording.repository.ts
index 1c27c18b..e5d9db2f 100644
--- a/meet-ce/backend/src/repositories/recording.repository.ts
+++ b/meet-ce/backend/src/repositories/recording.repository.ts
@@ -1,4 +1,4 @@
-import { MeetRecordingInfo } from '@openvidu-meet/typings';
+import { MeetRecordingInfo, MeetRecordingStatus } from '@openvidu-meet/typings';
import { inject, injectable } from 'inversify';
import { uid as secureUid } from 'uid/secure';
import { MeetRecordingDocument, MeetRecordingModel } from '../models/mongoose-schemas/index.js';
@@ -173,6 +173,18 @@ export class RecordingRepository {
+ return await this.findAll({
+ status: { $in: [MeetRecordingStatus.ACTIVE, MeetRecordingStatus.ENDING] }
+ });
+ }
+
/**
* Deletes a recording by its recordingId.
*
diff --git a/meet-ce/backend/src/repositories/room.repository.ts b/meet-ce/backend/src/repositories/room.repository.ts
index 1db40742..0510424d 100644
--- a/meet-ce/backend/src/repositories/room.repository.ts
+++ b/meet-ce/backend/src/repositories/room.repository.ts
@@ -119,6 +119,21 @@ export class RoomRepository extends BaseRepos
};
}
+ /**
+ * Finds all rooms that have expired (autoDeletionDate < now).
+ * Returns all expired rooms without pagination.
+ *
+ * @returns Array of expired rooms with enriched URLs
+ */
+ async findExpiredRooms(): Promise {
+ const now = Date.now();
+
+ // Find all rooms where autoDeletionDate exists and is less than now
+ return await this.findAll({
+ autoDeletionDate: { $exists: true, $lt: now }
+ });
+ }
+
/**
* Deletes a room by its roomId.
*
@@ -139,21 +154,6 @@ export class RoomRepository extends BaseRepos
await this.deleteMany({ roomId: { $in: roomIds } });
}
- /**
- * Finds all rooms that have expired (autoDeletionDate < now).
- * Returns all expired rooms without pagination.
- *
- * @returns Array of expired rooms with enriched URLs
- */
- async findExpiredRooms(): Promise {
- const now = Date.now();
-
- // Find all rooms where autoDeletionDate exists and is less than now
- return await this.findAll({
- autoDeletionDate: { $exists: true, $lt: now }
- });
- }
-
/**
* Counts the total number of rooms.
*/
diff --git a/meet-ce/backend/src/services/recording.service.ts b/meet-ce/backend/src/services/recording.service.ts
index 672ff22f..10ab0899 100644
--- a/meet-ce/backend/src/services/recording.service.ts
+++ b/meet-ce/backend/src/services/recording.service.ts
@@ -1,6 +1,6 @@
import { MeetRecordingFilters, MeetRecordingInfo, MeetRecordingStatus } from '@openvidu-meet/typings';
import { inject, injectable } from 'inversify';
-import { EgressInfo, EgressStatus, EncodedFileOutput, EncodedFileType, RoomCompositeOptions } from 'livekit-server-sdk';
+import { EgressStatus, EncodedFileOutput, EncodedFileType, RoomCompositeOptions } from 'livekit-server-sdk';
import ms from 'ms';
import { Readable } from 'stream';
import { uid } from 'uid';
@@ -894,43 +894,39 @@ export class RecordingService {
* Performs garbage collection for stale recordings in the system.
*
* This method identifies and aborts recordings that have become stale by:
- * 1. Getting all recordings in progress from LiveKit
- * 2. Checking their last update time
- * 3. Aborting recordings that have exceeded the stale threshold
- * 4. Updating their status in storage
+ * 1. Getting all active recordings from database (ACTIVE or ENDING status)
+ * 2. Checking if there's a corresponding in-progress egress in LiveKit
+ * 3. If no egress exists, marking the recording as ABORTED
+ * 4. If egress exists, checking last update time and aborting if stale
*
* Stale recordings can occur when:
* - Network issues prevent normal completion
* - LiveKit egress process hangs or crashes
- * - Room is forcibly deleted while recording is active
- *
- * @returns {Promise} A promise that resolves when the cleanup process completes
- * @protected
*/
protected async performStaleRecordingsGC(): Promise {
this.logger.debug('Starting stale recordings cleanup process');
try {
- // Get all in-progress recordings from LiveKit (across all rooms)
- const allInProgressRecordings = await this.livekitService.getInProgressRecordingsEgress();
+ // Get all active recordings from database (ACTIVE or ENDING status)
+ const activeRecordings = await this.recordingRepository.findActiveRecordings();
- if (allInProgressRecordings.length === 0) {
- this.logger.debug('No in-progress recordings found');
+ if (activeRecordings.length === 0) {
+ this.logger.debug('No active recordings found in database');
return;
}
- this.logger.debug(`Found ${allInProgressRecordings.length} in-progress recordings to check`);
+ this.logger.debug(`Found ${activeRecordings.length} active recordings in database to check`);
// Process in batches to avoid overwhelming the system
const BATCH_SIZE = 10;
let totalProcessed = 0;
let totalAborted = 0;
- for (let i = 0; i < allInProgressRecordings.length; i += BATCH_SIZE) {
- const batch = allInProgressRecordings.slice(i, i + BATCH_SIZE);
+ for (let i = 0; i < activeRecordings.length; i += BATCH_SIZE) {
+ const batch = activeRecordings.slice(i, i + BATCH_SIZE);
const results = await Promise.allSettled(
- batch.map((egressInfo: EgressInfo) => this.evaluateAndAbortStaleRecording(egressInfo))
+ batch.map((recording: MeetRecordingInfo) => this.evaluateAndAbortStaleRecording(recording))
);
results.forEach((result: PromiseSettledResult, index: number) => {
@@ -939,8 +935,8 @@ export class RecordingService {
if (result.status === 'fulfilled' && result.value) {
totalAborted++;
} else if (result.status === 'rejected') {
- const recordingId = RecordingHelper.extractRecordingIdFromEgress(batch[index]);
- this.logger.error(`Failed to process stale recording ${recordingId}:`, result.reason);
+ const recordingId = batch[index].recordingId;
+ this.logger.error(`Failed to process recording ${recordingId}:`, result.reason);
}
});
}
@@ -955,34 +951,48 @@ export class RecordingService {
/**
* Evaluates whether a recording is stale and aborts it if necessary.
- * A recording is considered stale if it has not been updated within the configured stale period
- * and either the associated LiveKit room does not exist or has no publishers.
- * If the recording is already aborted or has no updatedAt timestamp, it is kept as fresh.
+ * First checks if there's a corresponding egress in LiveKit. If not, the recording is immediately
+ * considered stale and aborted. If an egress exists, the recording is aborted only when the egress has not
+ * been updated within the configured stale period and the associated room does not exist or has no participants.
*
- * @param egressInfo - The egress information containing details about the recording.
+ * @param recording - The recording information from MongoDB.
* @returns A promise that resolves to `true` if the recording was aborted, `false` otherwise.
- * @throws Will throw an error if there is an issue retrieving recording status, checking room existence,
+ * @throws Will throw an error if there is an issue checking egress existence, room existence,
* or aborting the recording.
*/
- protected async evaluateAndAbortStaleRecording(egressInfo: EgressInfo): Promise {
- const recordingId = RecordingHelper.extractRecordingIdFromEgress(egressInfo);
- const { roomId } = RecordingHelper.extractInfoFromRecordingId(recordingId);
- const updatedAt = RecordingHelper.extractUpdatedDate(egressInfo);
+ protected async evaluateAndAbortStaleRecording(recording: MeetRecordingInfo): Promise {
+ const recordingId = recording.recordingId;
+ const roomId = recording.roomId;
+ const { egressId } = RecordingHelper.extractInfoFromRecordingId(recordingId);
const staleAfterMs = ms(INTERNAL_CONFIG.RECORDING_STALE_GRACE_PERIOD);
try {
- const { status } = await this.getRecording(recordingId);
+ // Check if there's a corresponding egress in LiveKit for this room
+ const inProgressRecordings = await this.livekitService.getInProgressRecordingsEgress(roomId);
+ const egressInfo = inProgressRecordings.find((egress) => egress.egressId === egressId);
- if (status === MeetRecordingStatus.ABORTED) {
- this.logger.warn(`Recording ${recordingId} is already aborted`);
+ if (!egressInfo) {
+ // No egress found in LiveKit, recording is stale
+ this.logger.warn(
+ `Recording ${recordingId} has no corresponding egress in LiveKit, marking as stale and aborting...`
+ );
+
+ await this.updateRecordingStatus(recordingId, MeetRecordingStatus.ABORTED);
+ this.logger.info(`Successfully aborted stale recording ${recordingId}`);
return true;
}
+ // Egress exists, check if it's stale based on updatedAt timestamp
+ const updatedAt = RecordingHelper.extractUpdatedDate(egressInfo);
+
if (!updatedAt) {
this.logger.warn(`Recording ${recordingId} has no updatedAt timestamp, keeping it as fresh`);
return false;
}
+ this.logger.debug(`Recording ${recordingId} last updated at ${new Date(updatedAt).toISOString()}`);
+
+ // Check if recording has not been updated recently
const lkRoomExists = await this.livekitService.roomExists(roomId);
const ageIsStale = updatedAt < Date.now() - staleAfterMs;
let isRecordingStale = false;
@@ -996,19 +1006,16 @@ export class RecordingService {
}
}
- // Check if recording has not been updated recently
- this.logger.debug(`Recording ${recordingId} last updated at ${new Date(updatedAt).toISOString()}`);
-
if (!isRecordingStale) {
this.logger.debug(`Recording ${recordingId} is still fresh`);
return false;
}
- this.logger.warn(`Room ${roomId} does not exist or has no participants and recording ${recordingId} is stale, aborting...`);
+ this.logger.warn(
+ `Room ${roomId} does not exist or has no participants and recording ${recordingId} is stale, aborting...`
+ );
// Abort the recording
- const { egressId } = RecordingHelper.extractInfoFromRecordingId(recordingId);
-
await Promise.all([
this.updateRecordingStatus(recordingId, MeetRecordingStatus.ABORTED),
this.livekitService.stopEgress(egressId)