backend: update recording start method for improving its clarity and avoding race conditions receiving events

This commit is contained in:
Carlos Santos 2025-04-24 11:41:53 +02:00
parent 8c6011b5c3
commit 9897436df2
4 changed files with 61 additions and 69 deletions

View File

@ -152,7 +152,7 @@ export class LivekitWebhookService {
try {
const [meetRoom] = await Promise.all([
this.roomService.getMeetRoom(room.name),
this.recordingService.releaseRoomRecordingActiveLock(room.name),
this.recordingService.releaseRecordingLockIfNoEgress(room.name),
this.openViduWebhookService.sendRoomFinishedWebhook(room)
]);
@ -223,7 +223,7 @@ export class LivekitWebhookService {
case 'ended':
tasks.push(
this.openViduWebhookService.sendRecordingEndedWebhook(recordingInfo),
this.recordingService.releaseRoomRecordingActiveLock(roomId)
this.recordingService.releaseRecordingLockIfNoEgress(roomId)
);
break;
}

View File

@ -58,36 +58,49 @@ export class RecordingService {
async startRecording(roomId: string): Promise<MeetRecordingInfo> {
let acquiredLock: RedisLock | null = null;
let eventListener!: (info: Record<string, unknown>) => void;
let recordingId = '';
let timeoutId: NodeJS.Timeout | undefined;
try {
await this.validateRoomsPreconditions(roomId);
const room = await this.roomService.getMeetRoom(roomId);
if (!room) throw errorRoomNotFound(roomId);
//TODO: Check if the room has participants before starting the recording
//room.numParticipants === 0 ? throw errorNoParticipants(roomId);
const lkRoom = await this.livekitService.getRoom(roomId);
if (!lkRoom) throw errorRoomNotFound(roomId);
const hasParticipants = await this.livekitService.roomHasParticipants(roomId);
if (!hasParticipants) throw errorRoomHasNoParticipants(roomId);
// Attempt to acquire lock. If the lock is not acquired, the recording is already active.
acquiredLock = await this.acquireRoomRecordingActiveLock(roomId);
if (!acquiredLock) throw errorRecordingAlreadyStarted(roomId);
let resolveRecording!: (r: MeetRecordingInfo) => void;
let rejectRecording!: (e: unknown) => void;
const recordingPromise = new Promise<MeetRecordingInfo>((res, rej) => {
resolveRecording = res;
rejectRecording = rej;
});
let recordingId = '';
eventListener = (info: Record<string, unknown>) => {
// This listener is triggered only for the instance that started the recording.
// Check if the recording ID matches the one that was started
const isEventForCurrentRecording = info?.recordingId === recordingId && info?.roomId === roomId;
if (isEventForCurrentRecording) {
this.taskSchedulerService.cancelTask(`${roomId}_recording_timeout`);
const startTimeoutPromise = new Promise<never>((_, reject) => {
timeoutId = setTimeout(() => {
this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener);
resolveRecording(info as unknown as MeetRecordingInfo);
}
};
this.handleRecordingLockTimeout(recordingId, roomId, reject);
}, ms(INTERNAL_CONFIG.RECORDING_STARTED_TIMEOUT));
});
this.systemEventService.on(SystemEventType.RECORDING_ACTIVE, eventListener);
this.registerRecordingTimeout(roomId, recordingId, eventListener, rejectRecording);
const eventReceivedPromise = new Promise<MeetRecordingInfo>((resolve) => {
eventListener = (info: Record<string, unknown>) => {
// Process the event only if it belongs to the current room.
// Each room has only ONE active recording at the same time
if (info?.roomId !== roomId) return;
clearTimeout(timeoutId);
this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener);
resolve(info as unknown as MeetRecordingInfo);
};
this.systemEventService.on(SystemEventType.RECORDING_ACTIVE, eventListener);
});
const options = this.generateCompositeOptionsFromRequest();
const output = this.generateFileOutputFromRequest(roomId);
@ -95,17 +108,26 @@ export class RecordingService {
const recordingInfo = RecordingHelper.toRecordingInfo(egressInfo);
recordingId = recordingInfo.recordingId;
return await recordingPromise;
if (recordingInfo.status === MeetRecordingStatus.ACTIVE) {
clearTimeout(timeoutId);
this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener);
return recordingInfo;
}
return await Promise.race([eventReceivedPromise, startTimeoutPromise]);
} catch (error) {
this.logger.error(`Error starting recording in room '${roomId}': ${error}`);
if (acquiredLock) await this.releaseRoomRecordingActiveLock(roomId);
if (eventListener) this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener);
this.taskSchedulerService.cancelTask(`${roomId}_recording_timeout`);
throw error;
} finally {
try {
clearTimeout(timeoutId);
this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener);
if (acquiredLock) await this.releaseRecordingLockIfNoEgress(roomId);
} catch (e) {
this.logger.warn(`Failed to release recording lock: ${e}`);
}
}
}
@ -327,12 +349,13 @@ export class RecordingService {
}
/**
* Releases the active recording lock for a specific room.
* Releases the active recording lock for a specified room, but only if there are no active egress operations.
*
* This method attempts to release a lock associated with the active recording
* of a given room.
* This method first checks for any ongoing egress operations for the room.
* If active egress operations are found, the lock isn't released as recording is still considered active.
* Otherwise, it proceeds to release the mutex lock associated with the room's recording.
*/
async releaseRoomRecordingActiveLock(roomId: string): Promise<void> {
async releaseRecordingLockIfNoEgress(roomId: string): Promise<void> {
if (roomId) {
const lockName = MeetLock.getRecordingActiveLock(roomId);
const egress = await this.livekitService.getActiveEgress(roomId);
@ -341,7 +364,7 @@ export class RecordingService {
this.logger.verbose(
`Active egress found for room ${roomId}: ${egress.map((e) => e.egressId).join(', ')}`
);
this.logger.error(`Cannot release recording lock for room '${roomId}'.`);
this.logger.debug(`Cannot release recording lock for room '${roomId}'. Recording is still active.`);
return;
}
@ -366,22 +389,6 @@ export class RecordingService {
return this.roomService.sendSignal(roomId, payload, options);
}
protected async validateRoomsPreconditions(roomId: string): Promise<void> {
const room = await this.roomService.getMeetRoom(roomId);
if (!room) throw errorRoomNotFound(roomId);
//TODO: Check if the room has participants before starting the recording
//room.numParticipants === 0 ? throw errorNoParticipants(roomId);
const lkRoom = await this.livekitService.getRoom(roomId);
if (!lkRoom) throw errorRoomNotFound(roomId);
const hasParticipants = await this.livekitService.roomHasParticipants(roomId);
if (!hasParticipants) throw errorRoomHasNoParticipants(roomId);
}
/**
* Retrieves the data required to delete a recording, including the file paths
* to be deleted and the recording's metadata information.
@ -498,10 +505,8 @@ export class RecordingService {
protected async handleRecordingLockTimeout(
recordingId: string,
roomId: string,
listener: (info: Record<string, unknown>) => void,
rejectRequest: (reason?: unknown) => void
) {
this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, listener);
this.logger.debug(`Recording cleanup timer triggered for room '${roomId}'.`);
let shouldReleaseLock = false;
@ -537,7 +542,7 @@ export class RecordingService {
} finally {
if (shouldReleaseLock) {
try {
await this.releaseRoomRecordingActiveLock(roomId);
await this.releaseRecordingLockIfNoEgress(roomId);
this.logger.debug(`Recording active lock released for room ${roomId}.`);
} catch (releaseError) {
this.logger.error(`Error releasing active recording lock for room ${roomId}: ${releaseError}`);
@ -621,20 +626,6 @@ export class RecordingService {
await this.s3Service.saveObject(metadataPath, recordingInfo);
}
protected registerRecordingTimeout(
roomId: string,
recordingId: string,
eventListener: (info: Record<string, unknown>) => void,
reject: (reason?: unknown) => void
): void {
this.taskSchedulerService.registerTask({
name: `${roomId}_recording_timeout`,
type: 'timeout',
scheduleOrDelay: INTERNAL_CONFIG.RECORDING_STARTED_TIMEOUT,
callback: this.handleRecordingLockTimeout.bind(this, recordingId, roomId, eventListener, reject)
});
}
/**
* Cleans up orphaned recording locks in the system.
*

View File

@ -127,6 +127,7 @@ export class TaskSchedulerService {
}
this.scheduledTasks.delete(name);
this.taskRegistry = this.taskRegistry.filter((task) => task.name !== name);
this.logger.debug(`Task "${name}" cancelled.`);
}
}

View File

@ -207,7 +207,7 @@ export const runReleaseActiveRecordingLock = async (roomId: string) => {
}
const recordingService = container.get(RecordingService);
await recordingService.releaseRoomRecordingActiveLock(roomId);
await recordingService.releaseRecordingLockIfNoEgress(roomId);
};
/**