backend: enhance recording management with orphaned lock grace period and stale recording handling

Carlos Santos 2025-08-12 17:12:11 +02:00
parent 0daa884ed0
commit 5546e92fed
5 changed files with 221 additions and 42 deletions

View File

@@ -166,3 +166,44 @@ graph TD;
M -->|No more rooms| N[Process completed]
```
5. **Stale recordings cleanup**:
To handle recordings that become stale due to network issues, LiveKit or Egress crashes, or other unexpected situations, a separate cleanup process runs every 15 minutes to identify and abort recordings that have not been updated within a configured threshold (5 minutes by default). The staleness check is sketched right after the diagram below.
```mermaid
graph TD;
A[Initiate stale recordings cleanup] --> B[Get all in-progress recordings from LiveKit]
B -->|Error| C[Log error and exit]
B -->|No recordings found| D[Log and exit]
B -->|Recordings found| E[Process recordings in batches of 10]
E --> F[For each recording in batch]
F --> G[Extract recording ID and updatedAt]
G --> H[Get recording status from storage]
H -->|Recording already ABORTED| I[Mark as already processed]
H -->|Recording active| J[Check if updatedAt exists]
J -->|No updatedAt timestamp| K[Keep as fresh - log warning]
J -->|Has updatedAt| L[Calculate if stale]
L -->|Still fresh| M[Log as fresh]
L -->|Is stale| N[Abort stale recording]
N --> O[Update status to ABORTED in storage]
N --> P[Stop egress in LiveKit]
O --> Q[Log successful abort]
P --> Q
I --> R[Continue to next recording]
K --> R
M --> R
Q --> R
R -->|More recordings in batch| F
R -->|Batch complete| S[Process next batch]
S -->|More batches| E
S -->|All batches processed| T[Log completion metrics]
T --> U[Process completed]
```
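The core of the staleness decision is a timestamp comparison against the configured threshold. A minimal sketch, assuming the `ms` package and a millisecond `updatedAt` value taken from the egress info; the helper name and standalone constant are illustrative:
```typescript
import ms from 'ms';

// Illustrative threshold mirroring RECORDING_STALE_AFTER ('5m' by default).
const STALE_AFTER = ms('5m');

function isRecordingStale(updatedAt: number | undefined, now: number = Date.now()): boolean {
    // Recordings without an updatedAt timestamp are kept as fresh (the service only logs a warning).
    if (!updatedAt) {
        return false;
    }

    // Stale when the last update is older than the allowed threshold.
    return updatedAt < now - STALE_AFTER;
}
```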

View File

@@ -33,11 +33,14 @@ const INTERNAL_CONFIG = {
S3_USERS_PREFIX: 'users',
S3_API_KEYS_PREFIX: 'api_keys',
// Garbage collection and recording lock intervals
// Garbage collection and recording intervals
ROOM_GC_INTERVAL: '1h' as StringValue, // e.g. garbage collector interval for rooms
RECORDING_LOCK_TTL: '6h' as StringValue, // TTL for recording lock in Redis
RECORDING_STARTED_TIMEOUT: '20s' as StringValue, // Timeout for recording start
RECORDING_LOCK_GC_INTERVAL: '30m' as StringValue, // Garbage collection interval for recording locks
RECORDING_ORPHANED_LOCK_GRACE_PERIOD: '1m' as StringValue, // Grace period for orphaned recording locks
RECORDING_STALE_CLEANUP_INTERVAL: '15m' as StringValue, // Cleanup interval for stale recordings
RECORDING_STALE_AFTER: '5m' as StringValue, // Maximum allowed time since the last recording update before marking as stale
CRON_JOB_MIN_LOCK_TTL: '59s' as StringValue, // Minimum TTL for cron job locks
// Additional intervals
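These `StringValue` durations are parsed into milliseconds with the `ms` package at their call sites (for example, `ms(INTERNAL_CONFIG.RECORDING_STALE_AFTER)` in the recording service). A small sketch of that conversion; the `StringValue` import location is assumed from the casts above:
```typescript
import ms, { StringValue } from 'ms';

// Hypothetical excerpt of the config object; only the values shown in the diff are taken from it.
const INTERNAL_CONFIG = {
    RECORDING_STALE_CLEANUP_INTERVAL: '15m' as StringValue,
    RECORDING_STALE_AFTER: '5m' as StringValue
};

// Human-readable durations become plain millisecond numbers where they are consumed.
const cleanupIntervalMs = ms(INTERNAL_CONFIG.RECORDING_STALE_CLEANUP_INTERVAL); // 900000
const staleAfterMs = ms(INTERNAL_CONFIG.RECORDING_STALE_AFTER); // 300000
```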

View File

@@ -18,14 +18,14 @@ export class RecordingHelper {
const startDateMs = RecordingHelper.extractStartDate(egressInfo);
const endDateMs = RecordingHelper.extractEndDate(egressInfo);
const filename = RecordingHelper.extractFilename(egressInfo);
const uid = RecordingHelper.extractUidFromFilename(filename);
const { egressId, roomName: roomId, errorCode, error, details } = egressInfo;
const recordingId = RecordingHelper.extractRecordingIdFromEgress(egressInfo);
const { roomName: roomId, errorCode, error, details } = egressInfo;
const roomService = container.get(RoomService);
const { roomName } = await roomService.getMeetRoom(roomId);
return {
recordingId: `${roomId}--${egressId}--${uid}`,
recordingId,
roomId,
roomName,
// outputMode,
@@ -132,6 +132,13 @@ export class RecordingHelper {
return uidWithExtension.split('.')[0];
}
static extractRecordingIdFromEgress(egressInfo: EgressInfo): string {
const { roomName: meetRoomId, egressId } = egressInfo;
const filename = RecordingHelper.extractFilename(egressInfo);
const uid = RecordingHelper.extractUidFromFilename(filename);
return `${meetRoomId}--${egressId}--${uid}`;
}
/**
* Extracts the room name, egressId, and UID from the given recordingId.
* @param recordingId ${roomId}--${egressId}--${uid}
@@ -181,6 +188,11 @@ export class RecordingHelper {
return this.toMilliseconds(Number(createdAt));
}
static extractUpdatedDate(egressInfo: EgressInfo): number | undefined {
const updatedAt = egressInfo.updatedAt;
return updatedAt ? this.toMilliseconds(Number(updatedAt)) : undefined;
}
/**
* Extracts the size from the given EgressInfo object.
* If the size is not available, it returns 0.
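The new `extractRecordingIdFromEgress` helper centralizes the `${roomId}--${egressId}--${uid}` format documented above, and `extractInfoFromRecordingId` reverses it. A round-trip sketch; the parsing shown here is illustrative, not the actual helper implementation:
```typescript
// Compose a recording ID from its parts, following the documented format.
function buildRecordingId(roomId: string, egressId: string, uid: string): string {
    return `${roomId}--${egressId}--${uid}`;
}

// Split a recording ID back into its parts (illustrative; assumes none of the parts contain '--').
function parseRecordingId(recordingId: string): { roomId: string; egressId: string; uid: string } {
    const [roomId, egressId, uid] = recordingId.split('--');
    return { roomId, egressId, uid };
}

// Example with made-up values:
// parseRecordingId('room-abc--EG_123--x7kq') -> { roomId: 'room-abc', egressId: 'EG_123', uid: 'x7kq' }
```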

View File

@@ -193,16 +193,14 @@ export class LiveKitService {
* @returns A Promise that resolves when the metadata has been successfully updated
* @throws An internal error if there is an issue updating the metadata
*/
async updateParticipantMetadata(
roomName: string,
participantName: string,
metadata: string
): Promise<void> {
async updateParticipantMetadata(roomName: string, participantName: string, metadata: string): Promise<void> {
try {
await this.roomClient.updateParticipant(roomName, participantName, metadata);
this.logger.verbose(`Updated metadata for participant ${participantName} in room ${roomName}`);
} catch (error) {
this.logger.error(`Error updating metadata for participant ${participantName} in room ${roomName}: ${error}`);
this.logger.error(
`Error updating metadata for participant ${participantName} in room ${roomName}: ${error}`
);
throw internalError(`updating metadata for participant '${participantName}' in room '${roomName}'`);
}
}
@@ -351,12 +349,9 @@ export class LiveKitService {
*/
async getInProgressRecordingsEgress(roomName?: string): Promise<EgressInfo[]> {
try {
const egressArray = await this.getEgress(roomName);
return egressArray.filter((egress) => {
if (!RecordingHelper.isRecordingEgress(egress)) {
return false;
}
const egressArray = await this.getRecordingsEgress(roomName);
return egressArray.filter((egress) => {
// Check if recording is in any "in-progress" state
return [EgressStatus.EGRESS_STARTING, EgressStatus.EGRESS_ACTIVE, EgressStatus.EGRESS_ENDING].includes(
egress.status
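The hunk above is cut off by the diff view; the in-progress check reduces to a status filter along these lines (a sketch, not the verbatim service code):
```typescript
import { EgressInfo, EgressStatus } from 'livekit-server-sdk';

// Egress statuses that count as "in progress" for recording purposes.
const IN_PROGRESS_STATUSES = [
    EgressStatus.EGRESS_STARTING,
    EgressStatus.EGRESS_ACTIVE,
    EgressStatus.EGRESS_ENDING
];

// Predicate applied to each recording egress returned by getRecordingsEgress().
function isInProgress(egress: EgressInfo): boolean {
    return IN_PROGRESS_STATUSES.includes(egress.status);
}
```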

View File

@@ -1,6 +1,6 @@
import { MeetRecordingFilters, MeetRecordingInfo, MeetRecordingStatus } from '@typings-ce';
import { inject, injectable } from 'inversify';
import { EgressStatus, EncodedFileOutput, EncodedFileType, RoomCompositeOptions } from 'livekit-server-sdk';
import { EgressInfo, EgressStatus, EncodedFileOutput, EncodedFileType, RoomCompositeOptions } from 'livekit-server-sdk';
import ms from 'ms';
import { Readable } from 'stream';
import { uid } from 'uid';
@@ -55,6 +55,15 @@ export class RecordingService {
callback: this.performRecordingLocksGarbageCollection.bind(this)
};
this.taskSchedulerService.registerTask(recordingGarbageCollectorTask);
// Register the stale recordings cleanup task
const staleRecordingsCleanupTask: IScheduledTask = {
name: 'staleRecordingsCleanup',
type: 'cron',
scheduleOrDelay: INTERNAL_CONFIG.RECORDING_STALE_CLEANUP_INTERVAL,
callback: this.performStaleRecordingsCleanup.bind(this)
};
this.taskSchedulerService.registerTask(staleRecordingsCleanupTask);
}
async startRecording(roomId: string): Promise<MeetRecordingInfo> {
@@ -680,10 +689,20 @@ export class RecordingService {
const roomIds = recordingLocks.map((lock) => lock.resources[0].replace(lockPrefix, ''));
// Check whether each room ID still exists in LiveKit
// If the room no longer exists, release the lock
for (const roomId of roomIds) {
await this.evaluateAndReleaseOrphanedLock(roomId, lockPrefix);
const BATCH_SIZE = 10;
for (let i = 0; i < roomIds.length; i += BATCH_SIZE) {
const batch = roomIds.slice(i, i + BATCH_SIZE);
const results = await Promise.allSettled(
batch.map((roomId) => this.evaluateAndReleaseOrphanedLock(roomId, lockPrefix))
);
results.forEach((result, index) => {
if (result.status === 'rejected') {
this.logger.error(`Failed to process lock for room ${batch[index]}:`, result.reason);
}
});
}
} catch (error) {
this.logger.error('Error retrieving recording locks:', error);
@@ -698,11 +717,14 @@ export class RecordingService {
*/
protected async evaluateAndReleaseOrphanedLock(roomId: string, lockPrefix: string): Promise<void> {
const lockKey = `${lockPrefix}${roomId}`;
const LOCK_GRACE_PERIOD = ms('1m');
const gracePeriodMs = ms(INTERNAL_CONFIG.RECORDING_ORPHANED_LOCK_GRACE_PERIOD);
try {
// Verify if the lock still exists before proceeding to check the room
const lockExists = await this.mutexService.lockExists(lockKey);
const [lockExists, lockCreatedAt] = await Promise.all([
this.mutexService.lockExists(lockKey),
this.mutexService.getLockCreatedAt(lockKey)
]);
if (!lockExists) {
this.logger.debug(`Lock for room ${roomId} no longer exists, skipping cleanup`);
@@ -710,45 +732,35 @@ export class RecordingService {
}
// Verify if the lock is too recent
const createdAt = await this.mutexService.getLockCreatedAt(lockKey);
const lockAge = Date.now() - (createdAt || Date.now());
const lockAge = Date.now() - (lockCreatedAt || Date.now());
if (lockAge < LOCK_GRACE_PERIOD) {
if (lockAge < gracePeriodMs) {
this.logger.debug(
`Lock for room ${roomId} is too recent (${ms(lockAge)}), skipping orphan lock cleanup`
);
return;
}
const roomExists = await this.livekitService.roomExists(roomId);
const [lkRoomExists, inProgressRecordings] = await Promise.all([
this.livekitService.roomExists(roomId),
this.livekitService.getInProgressRecordingsEgress(roomId)
]);
if (roomExists) {
// Room exists, check if it has publishers
this.logger.debug(`Room ${roomId} exists, checking for publishers`);
const room = await this.livekitService.getRoom(roomId);
const hasPublishers = room.numPublishers > 0;
if (hasPublishers) {
// Room has publishers, but no in-progress recordings
this.logger.debug(`Room ${roomId} has publishers, checking for in-progress recordings`);
} else {
// Room has no publishers
this.logger.debug(`Room ${roomId} has no publishers, checking for in-progress recordings`);
}
if (lkRoomExists) {
this.logger.debug(`Room ${roomId} exists, checking recordings`);
} else {
// Room does not exist
this.logger.debug(`Room ${roomId} no longer exists, checking for in-progress recordings`);
}
// Verify if in-progress recordings exist
const inProgressRecordings = await this.livekitService.getInProgressRecordingsEgress(roomId);
const hasInProgressRecordings = inProgressRecordings.length > 0;
if (hasInProgressRecordings) {
this.logger.debug(`Room ${roomId} has in-progress recordings, skipping cleanup`);
this.logger.debug(`Room ${roomId} has in-progress recordings, keeping lock`);
return;
}
// No in-progress recordings, releasing orphaned lock
this.logger.info(`Room ${roomId} has no in-progress recordings, releasing orphaned lock`);
await this.mutexService.release(lockKey);
} catch (error) {
@@ -756,4 +768,120 @@ export class RecordingService {
throw error;
}
}
/**
* Performs cleanup of stale recordings across the system.
*
* This method identifies and aborts recordings that have become stale by:
* 1. Getting all recordings in progress from LiveKit
* 2. Checking their last update time
* 3. Aborting recordings that have exceeded the stale threshold
* 4. Updating their status in storage
*
* Stale recordings can occur when:
* - Network issues prevent normal completion
* - LiveKit egress process hangs or crashes
* - Room is forcibly deleted while recording is active
*
* @returns {Promise<void>} A promise that resolves when the cleanup process completes
* @protected
*/
protected async performStaleRecordingsCleanup(): Promise<void> {
this.logger.debug('Starting stale recordings cleanup process');
try {
// Get all in-progress recordings from LiveKit (across all rooms)
const allInProgressRecordings = await this.livekitService.getInProgressRecordingsEgress();
if (allInProgressRecordings.length === 0) {
this.logger.debug('No in-progress recordings found');
return;
}
this.logger.debug(`Found ${allInProgressRecordings.length} in-progress recordings to check`);
// Process in batches to avoid overwhelming the system
const BATCH_SIZE = 10;
let totalProcessed = 0;
let totalAborted = 0;
for (let i = 0; i < allInProgressRecordings.length; i += BATCH_SIZE) {
const batch = allInProgressRecordings.slice(i, i + BATCH_SIZE);
const results = await Promise.allSettled(
batch.map((egressInfo: EgressInfo) => this.evaluateAndAbortStaleRecording(egressInfo))
);
results.forEach((result: PromiseSettledResult<boolean>, index: number) => {
totalProcessed++;
if (result.status === 'fulfilled' && result.value) {
totalAborted++;
} else if (result.status === 'rejected') {
const recordingId = RecordingHelper.extractRecordingIdFromEgress(batch[index]);
this.logger.error(`Failed to process stale recording ${recordingId}:`, result.reason);
}
});
}
this.logger.info(
`Stale recordings cleanup completed: processed=${totalProcessed}, aborted=${totalAborted}`
);
} catch (error) {
this.logger.error('Error in stale recordings cleanup:', error);
}
}
/**
* Evaluates a single recording and aborts it if it's considered stale.
*
* @param egressInfo - The egress information for the recording to evaluate
* @returns {Promise<boolean>} True if the recording was aborted, false if it's still fresh
* @protected
*/
protected async evaluateAndAbortStaleRecording(egressInfo: EgressInfo): Promise<boolean> {
const recordingId = RecordingHelper.extractRecordingIdFromEgress(egressInfo);
const updatedAt = RecordingHelper.extractUpdatedDate(egressInfo);
const staleAfterMs = ms(INTERNAL_CONFIG.RECORDING_STALE_AFTER);
try {
const { status } = await this.getRecording(recordingId);
if (status === MeetRecordingStatus.ABORTED) {
this.logger.warn(`Recording ${recordingId} is already aborted`);
return true;
}
if (!updatedAt) {
this.logger.warn(`Recording ${recordingId} has no updatedAt timestamp, keeping it as fresh`);
return false;
}
// Check if recording has not been updated recently
const isStale = updatedAt < Date.now() - staleAfterMs;
this.logger.debug(`Recording ${recordingId} last updated at ${new Date(updatedAt).toISOString()}`);
if (!isStale) {
this.logger.debug(`Recording ${recordingId} is still fresh`);
return false;
}
this.logger.warn(`Recording ${recordingId} is stale, aborting...`);
// Abort the recording
const { egressId } = RecordingHelper.extractInfoFromRecordingId(recordingId);
await Promise.all([
this.updateRecordingStatus(recordingId, MeetRecordingStatus.ABORTED),
this.livekitService.stopEgress(egressId)
]);
this.logger.info(`Successfully aborted stale recording ${recordingId}`);
return true;
} catch (error) {
this.logger.error(`Error processing stale recording ${recordingId}:`, error);
throw error;
}
}
}