backend: enhance recording signal handling in LivekitWebhookService and RecordingService

This commit is contained in:
Carlos Santos 2025-07-29 19:12:06 +02:00
parent 3c64e7a9f5
commit 2b9f8dd65e
3 changed files with 48 additions and 13 deletions

View File

@ -60,7 +60,8 @@ export class OpenViduComponentsAdapterHelper {
endedAt: info.endDate, endedAt: info.endDate,
duration: info.duration, duration: info.duration,
size: info.size, size: info.size,
location: undefined location: undefined,
error: info.error,
}; };
} }

View File

@ -235,10 +235,7 @@ export class LivekitWebhookService {
this.logger.debug(`Recording '${recordingId}' in room '${roomId}' status: '${status}'`); this.logger.debug(`Recording '${recordingId}' in room '${roomId}' status: '${status}'`);
// Common tasks for all webhook types // Common tasks for all webhook types
const commonTasks = [ const commonTasks = [this.storageService.saveRecordingMetadata(recordingInfo)];
this.storageService.saveRecordingMetadata(recordingInfo),
this.frontendEventService.sendRecordingSignalToOpenViduComponents(roomId, recordingInfo)
];
const specificTasks: Promise<unknown>[] = []; const specificTasks: Promise<unknown>[] = [];
@ -264,9 +261,16 @@ export class LivekitWebhookService {
); );
} }
specificTasks.push(
this.frontendEventService.sendRecordingSignalToOpenViduComponents(roomId, recordingInfo)
);
break; break;
case 'ended': case 'ended':
specificTasks.push(this.recordingService.releaseRecordingLockIfNoEgress(roomId)); specificTasks.push(
this.recordingService.releaseRecordingLockIfNoEgress(roomId),
this.frontendEventService.sendRecordingSignalToOpenViduComponents(roomId, recordingInfo)
);
this.openViduWebhookService.sendRecordingEndedWebhook(recordingInfo); this.openViduWebhookService.sendRecordingEndedWebhook(recordingInfo);
break; break;
} }

View File

@ -24,6 +24,7 @@ import {
} from '../models/index.js'; } from '../models/index.js';
import { import {
DistributedEventService, DistributedEventService,
FrontendEventService,
IScheduledTask, IScheduledTask,
LiveKitService, LiveKitService,
LoggerService, LoggerService,
@ -43,6 +44,7 @@ export class RecordingService {
@inject(TaskSchedulerService) protected taskSchedulerService: TaskSchedulerService, @inject(TaskSchedulerService) protected taskSchedulerService: TaskSchedulerService,
@inject(DistributedEventService) protected systemEventService: DistributedEventService, @inject(DistributedEventService) protected systemEventService: DistributedEventService,
@inject(MeetStorageService) protected storageService: MeetStorageService, @inject(MeetStorageService) protected storageService: MeetStorageService,
@inject(FrontendEventService) protected frontendEventService: FrontendEventService,
@inject(LoggerService) protected logger: LoggerService @inject(LoggerService) protected logger: LoggerService
) { ) {
// Register the recording garbage collector task // Register the recording garbage collector task
@ -70,6 +72,14 @@ export class RecordingService {
await this.validateRoomForStartRecording(roomId); await this.validateRoomForStartRecording(roomId);
// Manually send the recording signal to OpenVidu Components for avoiding missing event if timeout occurs
// and the egress_started webhook is not received.
await this.frontendEventService.sendRecordingSignalToOpenViduComponents(roomId, {
recordingId: '',
roomId,
status: MeetRecordingStatus.STARTING
});
const timeoutPromise = new Promise<never>((_, reject) => { const timeoutPromise = new Promise<never>((_, reject) => {
timeoutId = setTimeout(() => { timeoutId = setTimeout(() => {
if (isOperationCompleted) return; if (isOperationCompleted) return;
@ -78,7 +88,7 @@ export class RecordingService {
//Clean up the event listener and timeout //Clean up the event listener and timeout
this.systemEventService.off(DistributedEventType.RECORDING_ACTIVE, eventListener); this.systemEventService.off(DistributedEventType.RECORDING_ACTIVE, eventListener);
this.handleRecordingLockTimeout(recordingId, roomId).catch(() => {}); this.handleRecordingTimeout(recordingId, roomId).catch(() => {});
reject(errorRecordingStartTimeout(roomId)); reject(errorRecordingStartTimeout(roomId));
}, ms(INTERNAL_CONFIG.RECORDING_STARTED_TIMEOUT)); }, ms(INTERNAL_CONFIG.RECORDING_STARTED_TIMEOUT));
}); });
@ -178,7 +188,7 @@ export class RecordingService {
break; break;
case EgressStatus.EGRESS_STARTING: case EgressStatus.EGRESS_STARTING:
// Avoid pending egress after timeout, stop it immediately // Avoid pending egress after timeout, stop it immediately
await this.livekitService.stopEgress(egressId) await this.livekitService.stopEgress(egressId);
// The recording is still starting, it cannot be stopped yet. // The recording is still starting, it cannot be stopped yet.
throw errorRecordingCannotBeStoppedWhileStarting(recordingId); throw errorRecordingCannotBeStoppedWhileStarting(recordingId);
default: default:
@ -545,13 +555,23 @@ export class RecordingService {
} }
/** /**
* Callback function to release the active recording lock after a timeout. * Handles the timeout event for a recording session in a specific room.
* This function is scheduled by the recording cleanup timer when a recording is started.
* *
* @param recordingId * This method is triggered when a recording cleanup timer fires, indicating that a recording
* @param roomId * has either failed to start or has not been stopped within the expected timeframe.
* It attempts to update the recording status to `FAILED` and stop the recording if necessary.
*
* If the recording is already stopped, not found, or cannot be stopped because it is still starting,
* the method logs the appropriate message and determines whether to release the active recording lock.
*
* Regardless of the outcome, if the lock should be released, it attempts to release the recording lock
* for the room to allow further recordings.
*
* @param recordingId - The unique identifier of the recording session.
* @param roomId - The unique identifier of the room associated with the recording.
* @returns A promise that resolves when the timeout handling is complete.
*/ */
protected async handleRecordingLockTimeout(recordingId: string, roomId: string) { protected async handleRecordingTimeout(recordingId: string, roomId: string) {
this.logger.debug(`Recording cleanup timer triggered for room '${roomId}'.`); this.logger.debug(`Recording cleanup timer triggered for room '${roomId}'.`);
let shouldReleaseLock = false; let shouldReleaseLock = false;
@ -562,6 +582,16 @@ export class RecordingService {
`Timeout triggered but recordingId is empty for room '${roomId}'. Recording likely failed to start.` `Timeout triggered but recordingId is empty for room '${roomId}'. Recording likely failed to start.`
); );
shouldReleaseLock = true; shouldReleaseLock = true;
const recordingInfo: MeetRecordingInfo = {
recordingId,
roomId,
status: MeetRecordingStatus.FAILED,
error: `No egress service was able to register a request. Check your CPU usage or if there's any Media Node with enough CPU. Remember that by default, composite recording uses 4 CPUs for each room.`
};
// Manually send the recording FAILED signal to OpenVidu Components for avoiding missing event
// because of the egress_ended or egress_failed webhook is not received.
await this.frontendEventService.sendRecordingSignalToOpenViduComponents(roomId, recordingInfo);
} else { } else {
await this.updateRecordingStatus(recordingId, MeetRecordingStatus.FAILED); await this.updateRecordingStatus(recordingId, MeetRecordingStatus.FAILED);
await this.stopRecording(recordingId); await this.stopRecording(recordingId);