backend: enhance recording management with active recordings retrieval and stale cleanup improvements
This commit is contained in:
parent
253b435fbe
commit
0d6838019d
@ -196,10 +196,10 @@ flowchart TD
|
||||
L -- "Error (recording not found, already stopped,\nor unknown error)" --> O["Reject Request"] --> J
|
||||
```
|
||||
|
||||
4. **Failure handling**:
|
||||
3. **Failure handling**:
|
||||
If an OpenVidu instance crashes while a recording is active, the lock remains in place. This scenario can block subsequent recording attempts if the lock is not released promptly. To mitigate this issue, a lock garbage collector is implemented to periodically clean up orphaned locks.
|
||||
|
||||
The garbage collector runs when the OpenVidu deployment starts, and then every 30 minutes.
|
||||
The garbage collector runs when the OpenVidu deployment starts, and then every 15 minutes.
|
||||
|
||||
```mermaid
|
||||
graph TD;
|
||||
@ -225,46 +225,53 @@ graph TD;
|
||||
L --> M
|
||||
M -->|More rooms| E
|
||||
M -->|No more rooms| N[Process completed]
|
||||
|
||||
```
|
||||
|
||||
5. **Stale recordings cleanup**:
|
||||
To handle recordings that become stale due to network issues, LiveKit or Egress crashes, or other unexpected situations, a separate cleanup process runs every 15 minutes to identify and abort recordings that haven't been updated within a configured threshold (5 minutes by default).
|
||||
4. **Stale recordings cleanup**:
|
||||
To handle recordings that become stale due to network issues, LiveKit or Egress crashes, or other unexpected situations, a separate cleanup process runs every 14 minutes to identify and abort recordings that haven't been updated within a configured threshold (5 minutes by default).
|
||||
|
||||
```mermaid
|
||||
graph TD;
|
||||
A[Initiate stale recordings cleanup] --> B[Get all in-progress recordings from LiveKit]
|
||||
A[Initiate stale recordings cleanup] --> B[Get all active recordings from database<br/>ACTIVE or ENDING status]
|
||||
B -->|Error| C[Log error and exit]
|
||||
B -->|No recordings found| D[Log and exit]
|
||||
B -->|Recordings found| E[Process recordings in batches of 10]
|
||||
|
||||
E --> F[For each recording in batch]
|
||||
F --> G[Extract recording ID and updatedAt]
|
||||
G --> H[Get recording status from storage]
|
||||
F --> G[Extract recordingId, roomId and egressId]
|
||||
G --> H[Check for corresponding egress in LiveKit]
|
||||
|
||||
H -->|Recording already ABORTED| I[Mark as already processed]
|
||||
H -->|Recording active| J[Check if updatedAt exists]
|
||||
H -->|No egress found| I[Recording is stale - no egress exists]
|
||||
H -->|Egress exists| J[Extract updatedAt from egress]
|
||||
|
||||
J -->|No updatedAt timestamp| K[Keep as fresh - log warning]
|
||||
J -->|Has updatedAt| L[Calculate if stale]
|
||||
I --> K[Update status to ABORTED in database]
|
||||
K --> L[Log successful abort - no egress found]
|
||||
|
||||
L -->|Still fresh| M[Log as fresh]
|
||||
L -->|Is stale| N[Abort stale recording]
|
||||
J -->|No updatedAt timestamp| M[Keep as fresh - log warning]
|
||||
J -->|Has updatedAt| N[Check if recording age is stale]
|
||||
|
||||
N --> O[Update status to ABORTED in storage]
|
||||
N --> P[Stop egress in LiveKit]
|
||||
O --> Q[Log successful abort]
|
||||
P --> Q
|
||||
N -->|Age not stale| O[Log as fresh]
|
||||
N -->|Age is stale| P[Check room existence]
|
||||
|
||||
I --> R[Continue to next recording]
|
||||
K --> R
|
||||
M --> R
|
||||
Q --> R
|
||||
P -->|Room does not exist| Q[Mark as stale]
|
||||
P -->|Room exists| R[Check if room has participants]
|
||||
|
||||
R -->|More recordings in batch| F
|
||||
R -->|Batch complete| S[Process next batch]
|
||||
S -->|More batches| E
|
||||
S -->|All batches processed| T[Log completion metrics]
|
||||
T --> U[Process completed]
|
||||
R -->|No participants| Q
|
||||
R -->|Has participants| O
|
||||
|
||||
Q --> S[Update status to ABORTED in database]
|
||||
Q --> T[Stop egress in LiveKit]
|
||||
S --> U[Log successful abort]
|
||||
T --> U
|
||||
|
||||
L --> V[Continue to next recording]
|
||||
M --> V
|
||||
O --> V
|
||||
U --> V
|
||||
|
||||
V -->|More recordings in batch| F
|
||||
V -->|Batch complete| W[Process next batch]
|
||||
W -->|More batches| E
|
||||
W -->|All batches processed| X[Log completion metrics]
|
||||
X --> Y[Process completed]
|
||||
```
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import { MeetRecordingInfo } from '@openvidu-meet/typings';
|
||||
import { MeetRecordingInfo, MeetRecordingStatus } from '@openvidu-meet/typings';
|
||||
import { inject, injectable } from 'inversify';
|
||||
import { uid as secureUid } from 'uid/secure';
|
||||
import { MeetRecordingDocument, MeetRecordingModel } from '../models/mongoose-schemas/index.js';
|
||||
@ -173,6 +173,18 @@ export class RecordingRepository<TRecording extends MeetRecordingInfo = MeetReco
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds all active recordings (status 'ACTIVE' or 'ENDING').
|
||||
* Returns all active recordings without pagination.
|
||||
*
|
||||
* @returns Array of active recordings
|
||||
*/
|
||||
async findActiveRecordings(): Promise<TRecording[]> {
|
||||
return await this.findAll({
|
||||
status: { $in: [MeetRecordingStatus.ACTIVE, MeetRecordingStatus.ENDING] }
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes a recording by its recordingId.
|
||||
*
|
||||
|
||||
@ -119,6 +119,21 @@ export class RoomRepository<TRoom extends MeetRoom = MeetRoom> extends BaseRepos
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds all rooms that have expired (autoDeletionDate < now).
|
||||
* Returns all expired rooms without pagination.
|
||||
*
|
||||
* @returns Array of expired rooms with enriched URLs
|
||||
*/
|
||||
async findExpiredRooms(): Promise<TRoom[]> {
|
||||
const now = Date.now();
|
||||
|
||||
// Find all rooms where autoDeletionDate exists and is less than now
|
||||
return await this.findAll({
|
||||
autoDeletionDate: { $exists: true, $lt: now }
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes a room by its roomId.
|
||||
*
|
||||
@ -139,21 +154,6 @@ export class RoomRepository<TRoom extends MeetRoom = MeetRoom> extends BaseRepos
|
||||
await this.deleteMany({ roomId: { $in: roomIds } });
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds all rooms that have expired (autoDeletionDate < now).
|
||||
* Returns all expired rooms without pagination.
|
||||
*
|
||||
* @returns Array of expired rooms with enriched URLs
|
||||
*/
|
||||
async findExpiredRooms(): Promise<TRoom[]> {
|
||||
const now = Date.now();
|
||||
|
||||
// Find all rooms where autoDeletionDate exists and is less than now
|
||||
return await this.findAll({
|
||||
autoDeletionDate: { $exists: true, $lt: now }
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Counts the total number of rooms.
|
||||
*/
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import { MeetRecordingFilters, MeetRecordingInfo, MeetRecordingStatus } from '@openvidu-meet/typings';
|
||||
import { inject, injectable } from 'inversify';
|
||||
import { EgressInfo, EgressStatus, EncodedFileOutput, EncodedFileType, RoomCompositeOptions } from 'livekit-server-sdk';
|
||||
import { EgressStatus, EncodedFileOutput, EncodedFileType, RoomCompositeOptions } from 'livekit-server-sdk';
|
||||
import ms from 'ms';
|
||||
import { Readable } from 'stream';
|
||||
import { uid } from 'uid';
|
||||
@ -894,43 +894,39 @@ export class RecordingService {
|
||||
* Performs garbage collection for stale recordings in the system.
|
||||
*
|
||||
* This method identifies and aborts recordings that have become stale by:
|
||||
* 1. Getting all recordings in progress from LiveKit
|
||||
* 2. Checking their last update time
|
||||
* 3. Aborting recordings that have exceeded the stale threshold
|
||||
* 4. Updating their status in storage
|
||||
* 1. Getting all active recordings from database (ACTIVE or ENDING status)
|
||||
* 2. Checking if there's a corresponding in-progress egress in LiveKit
|
||||
* 3. If no egress exists, marking the recording as ABORTED
|
||||
* 4. If egress exists, checking last update time and aborting if stale
|
||||
*
|
||||
* Stale recordings can occur when:
|
||||
* - Network issues prevent normal completion
|
||||
* - LiveKit egress process hangs or crashes
|
||||
* - Room is forcibly deleted while recording is active
|
||||
*
|
||||
* @returns {Promise<void>} A promise that resolves when the cleanup process completes
|
||||
* @protected
|
||||
*/
|
||||
protected async performStaleRecordingsGC(): Promise<void> {
|
||||
this.logger.debug('Starting stale recordings cleanup process');
|
||||
|
||||
try {
|
||||
// Get all in-progress recordings from LiveKit (across all rooms)
|
||||
const allInProgressRecordings = await this.livekitService.getInProgressRecordingsEgress();
|
||||
// Get all active recordings from database (ACTIVE or ENDING status)
|
||||
const activeRecordings = await this.recordingRepository.findActiveRecordings();
|
||||
|
||||
if (allInProgressRecordings.length === 0) {
|
||||
this.logger.debug('No in-progress recordings found');
|
||||
if (activeRecordings.length === 0) {
|
||||
this.logger.debug('No active recordings found in database');
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.debug(`Found ${allInProgressRecordings.length} in-progress recordings to check`);
|
||||
this.logger.debug(`Found ${activeRecordings.length} active recordings in database to check`);
|
||||
|
||||
// Process in batches to avoid overwhelming the system
|
||||
const BATCH_SIZE = 10;
|
||||
let totalProcessed = 0;
|
||||
let totalAborted = 0;
|
||||
|
||||
for (let i = 0; i < allInProgressRecordings.length; i += BATCH_SIZE) {
|
||||
const batch = allInProgressRecordings.slice(i, i + BATCH_SIZE);
|
||||
for (let i = 0; i < activeRecordings.length; i += BATCH_SIZE) {
|
||||
const batch = activeRecordings.slice(i, i + BATCH_SIZE);
|
||||
|
||||
const results = await Promise.allSettled(
|
||||
batch.map((egressInfo: EgressInfo) => this.evaluateAndAbortStaleRecording(egressInfo))
|
||||
batch.map((recording: MeetRecordingInfo) => this.evaluateAndAbortStaleRecording(recording))
|
||||
);
|
||||
|
||||
results.forEach((result: PromiseSettledResult<boolean>, index: number) => {
|
||||
@ -939,8 +935,8 @@ export class RecordingService {
|
||||
if (result.status === 'fulfilled' && result.value) {
|
||||
totalAborted++;
|
||||
} else if (result.status === 'rejected') {
|
||||
const recordingId = RecordingHelper.extractRecordingIdFromEgress(batch[index]);
|
||||
this.logger.error(`Failed to process stale recording ${recordingId}:`, result.reason);
|
||||
const recordingId = batch[index].recordingId;
|
||||
this.logger.error(`Failed to process recording ${recordingId}:`, result.reason);
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -955,34 +951,48 @@ export class RecordingService {
|
||||
|
||||
/**
|
||||
* Evaluates whether a recording is stale and aborts it if necessary.
|
||||
* A recording is considered stale if it has not been updated within the configured stale period
|
||||
* and either the associated LiveKit room does not exist or has no publishers.
|
||||
* If the recording is already aborted or has no updatedAt timestamp, it is kept as fresh.
|
||||
* First checks if there's a corresponding egress in LiveKit. If not, the recording is immediately
|
||||
* considered stale and aborted. If an egress exists, checks if it has been updated within the
|
||||
* configured stale period and whether the associated room exists or has publishers.
|
||||
*
|
||||
* @param egressInfo - The egress information containing details about the recording.
|
||||
* @param recording - The recording information from MongoDB.
|
||||
* @returns A promise that resolves to `true` if the recording was aborted, `false` otherwise.
|
||||
* @throws Will throw an error if there is an issue retrieving recording status, checking room existence,
|
||||
* @throws Will throw an error if there is an issue checking egress existence, room existence,
|
||||
* or aborting the recording.
|
||||
*/
|
||||
protected async evaluateAndAbortStaleRecording(egressInfo: EgressInfo): Promise<boolean> {
|
||||
const recordingId = RecordingHelper.extractRecordingIdFromEgress(egressInfo);
|
||||
const { roomId } = RecordingHelper.extractInfoFromRecordingId(recordingId);
|
||||
const updatedAt = RecordingHelper.extractUpdatedDate(egressInfo);
|
||||
protected async evaluateAndAbortStaleRecording(recording: MeetRecordingInfo): Promise<boolean> {
|
||||
const recordingId = recording.recordingId;
|
||||
const roomId = recording.roomId;
|
||||
const { egressId } = RecordingHelper.extractInfoFromRecordingId(recordingId);
|
||||
const staleAfterMs = ms(INTERNAL_CONFIG.RECORDING_STALE_GRACE_PERIOD);
|
||||
|
||||
try {
|
||||
const { status } = await this.getRecording(recordingId);
|
||||
// Check if there's a corresponding egress in LiveKit for this room
|
||||
const inProgressRecordings = await this.livekitService.getInProgressRecordingsEgress(roomId);
|
||||
const egressInfo = inProgressRecordings.find((egress) => egress.egressId === egressId);
|
||||
|
||||
if (status === MeetRecordingStatus.ABORTED) {
|
||||
this.logger.warn(`Recording ${recordingId} is already aborted`);
|
||||
if (!egressInfo) {
|
||||
// No egress found in LiveKit, recording is stale
|
||||
this.logger.warn(
|
||||
`Recording ${recordingId} has no corresponding egress in LiveKit, marking as stale and aborting...`
|
||||
);
|
||||
|
||||
await this.updateRecordingStatus(recordingId, MeetRecordingStatus.ABORTED);
|
||||
this.logger.info(`Successfully aborted stale recording ${recordingId}`);
|
||||
return true;
|
||||
}
|
||||
|
||||
// Egress exists, check if it's stale based on updatedAt timestamp
|
||||
const updatedAt = RecordingHelper.extractUpdatedDate(egressInfo);
|
||||
|
||||
if (!updatedAt) {
|
||||
this.logger.warn(`Recording ${recordingId} has no updatedAt timestamp, keeping it as fresh`);
|
||||
return false;
|
||||
}
|
||||
|
||||
this.logger.debug(`Recording ${recordingId} last updated at ${new Date(updatedAt).toISOString()}`);
|
||||
|
||||
// Check if recording has not been updated recently
|
||||
const lkRoomExists = await this.livekitService.roomExists(roomId);
|
||||
const ageIsStale = updatedAt < Date.now() - staleAfterMs;
|
||||
let isRecordingStale = false;
|
||||
@ -996,19 +1006,16 @@ export class RecordingService {
|
||||
}
|
||||
}
|
||||
|
||||
// Check if recording has not been updated recently
|
||||
this.logger.debug(`Recording ${recordingId} last updated at ${new Date(updatedAt).toISOString()}`);
|
||||
|
||||
if (!isRecordingStale) {
|
||||
this.logger.debug(`Recording ${recordingId} is still fresh`);
|
||||
return false;
|
||||
}
|
||||
|
||||
this.logger.warn(`Room ${roomId} does not exist or has no participants and recording ${recordingId} is stale, aborting...`);
|
||||
this.logger.warn(
|
||||
`Room ${roomId} does not exist or has no participants and recording ${recordingId} is stale, aborting...`
|
||||
);
|
||||
|
||||
// Abort the recording
|
||||
const { egressId } = RecordingHelper.extractInfoFromRecordingId(recordingId);
|
||||
|
||||
await Promise.all([
|
||||
this.updateRecordingStatus(recordingId, MeetRecordingStatus.ABORTED),
|
||||
this.livekitService.stopEgress(egressId)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user