backend: fixed recordiing timeout handling for returning expected error
This commit is contained in:
parent
fd4c035fc0
commit
0d19028b46
@ -5,7 +5,7 @@ import ms from 'ms';
|
|||||||
import { Readable } from 'stream';
|
import { Readable } from 'stream';
|
||||||
import { uid } from 'uid';
|
import { uid } from 'uid';
|
||||||
import INTERNAL_CONFIG from '../config/internal-config.js';
|
import INTERNAL_CONFIG from '../config/internal-config.js';
|
||||||
import { MEET_S3_BUCKET, MEET_S3_SUBBUCKET } from '../environment.js';
|
import { MEET_S3_SUBBUCKET } from '../environment.js';
|
||||||
import { MeetLock, OpenViduComponentsAdapterHelper, RecordingHelper, UtilsHelper } from '../helpers/index.js';
|
import { MeetLock, OpenViduComponentsAdapterHelper, RecordingHelper, UtilsHelper } from '../helpers/index.js';
|
||||||
import {
|
import {
|
||||||
errorRecordingAlreadyStarted,
|
errorRecordingAlreadyStarted,
|
||||||
@ -62,6 +62,7 @@ export class RecordingService {
|
|||||||
let eventListener!: (info: Record<string, unknown>) => void;
|
let eventListener!: (info: Record<string, unknown>) => void;
|
||||||
let recordingId = '';
|
let recordingId = '';
|
||||||
let timeoutId: NodeJS.Timeout | undefined;
|
let timeoutId: NodeJS.Timeout | undefined;
|
||||||
|
let isOperationCompleted = false;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Attempt to acquire lock. If the lock is not acquired, the recording is already active.
|
// Attempt to acquire lock. If the lock is not acquired, the recording is already active.
|
||||||
@ -69,32 +70,28 @@ export class RecordingService {
|
|||||||
|
|
||||||
if (!acquiredLock) throw errorRecordingAlreadyStarted(roomId);
|
if (!acquiredLock) throw errorRecordingAlreadyStarted(roomId);
|
||||||
|
|
||||||
const room = await this.roomService.getMeetRoom(roomId);
|
await this.validateRoomForStartRecording(roomId);
|
||||||
|
|
||||||
if (!room) throw errorRoomNotFound(roomId);
|
const timeoutPromise = new Promise<never>((_, reject) => {
|
||||||
|
|
||||||
//TODO: Check if the room has participants before starting the recording
|
|
||||||
//room.numParticipants === 0 ? throw errorNoParticipants(roomId);
|
|
||||||
const lkRoom = await this.livekitService.getRoom(roomId);
|
|
||||||
|
|
||||||
if (!lkRoom) throw errorRoomNotFound(roomId);
|
|
||||||
|
|
||||||
const hasParticipants = await this.livekitService.roomHasParticipants(roomId);
|
|
||||||
|
|
||||||
if (!hasParticipants) throw errorRoomHasNoParticipants(roomId);
|
|
||||||
|
|
||||||
const startTimeoutPromise = new Promise<never>((_, reject) => {
|
|
||||||
timeoutId = setTimeout(() => {
|
timeoutId = setTimeout(() => {
|
||||||
|
if (isOperationCompleted) return;
|
||||||
|
|
||||||
|
isOperationCompleted = true;
|
||||||
|
|
||||||
|
//Clean up the event listener and timeout
|
||||||
this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener);
|
this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener);
|
||||||
this.handleRecordingLockTimeout(recordingId, roomId, reject);
|
this.handleRecordingLockTimeout(recordingId, roomId).catch(() => {});
|
||||||
|
reject(errorRecordingStartTimeout(roomId));
|
||||||
}, ms(INTERNAL_CONFIG.RECORDING_STARTED_TIMEOUT));
|
}, ms(INTERNAL_CONFIG.RECORDING_STARTED_TIMEOUT));
|
||||||
});
|
});
|
||||||
|
|
||||||
const eventReceivedPromise = new Promise<MeetRecordingInfo>((resolve) => {
|
const activeEgressEventPromise = new Promise<MeetRecordingInfo>((resolve) => {
|
||||||
eventListener = (info: Record<string, unknown>) => {
|
eventListener = (info: Record<string, unknown>) => {
|
||||||
// Process the event only if it belongs to the current room.
|
// Process the event only if it belongs to the current room.
|
||||||
// Each room has only ONE active recording at the same time
|
// Each room has only ONE active recording at the same time
|
||||||
if (info?.roomId !== roomId) return;
|
if (info?.roomId !== roomId || isOperationCompleted) return;
|
||||||
|
|
||||||
|
isOperationCompleted = true;
|
||||||
|
|
||||||
clearTimeout(timeoutId);
|
clearTimeout(timeoutId);
|
||||||
this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener);
|
this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener);
|
||||||
@ -104,22 +101,52 @@ export class RecordingService {
|
|||||||
this.systemEventService.on(SystemEventType.RECORDING_ACTIVE, eventListener);
|
this.systemEventService.on(SystemEventType.RECORDING_ACTIVE, eventListener);
|
||||||
});
|
});
|
||||||
|
|
||||||
const options = this.generateCompositeOptionsFromRequest();
|
const startRecordingPromise = (async (): Promise<MeetRecordingInfo> => {
|
||||||
const output = this.generateFileOutputFromRequest(roomId);
|
try {
|
||||||
const egressInfo = await this.livekitService.startRoomComposite(roomId, output, options);
|
const options = this.generateCompositeOptionsFromRequest();
|
||||||
const recordingInfo = RecordingHelper.toRecordingInfo(egressInfo);
|
const output = this.generateFileOutputFromRequest(roomId);
|
||||||
recordingId = recordingInfo.recordingId;
|
const egressInfo = await this.livekitService.startRoomComposite(roomId, output, options);
|
||||||
|
|
||||||
if (recordingInfo.status === MeetRecordingStatus.ACTIVE) {
|
// Check if operation was completed while we were waiting
|
||||||
clearTimeout(timeoutId);
|
if (isOperationCompleted) {
|
||||||
this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener);
|
this.logger.warn(`startRoomComposite completed after timeout for room ${roomId}`);
|
||||||
return recordingInfo;
|
throw errorRecordingStartTimeout(roomId);
|
||||||
}
|
}
|
||||||
|
|
||||||
return await Promise.race([eventReceivedPromise, startTimeoutPromise]);
|
const recordingInfo = RecordingHelper.toRecordingInfo(egressInfo);
|
||||||
|
recordingId = recordingInfo.recordingId;
|
||||||
|
|
||||||
|
// If the recording is already active, we can resolve the promise immediately.
|
||||||
|
if (recordingInfo.status === MeetRecordingStatus.ACTIVE) {
|
||||||
|
if (!isOperationCompleted) {
|
||||||
|
isOperationCompleted = true;
|
||||||
|
clearTimeout(timeoutId);
|
||||||
|
this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener);
|
||||||
|
return recordingInfo;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for RECORDING_ACTIVE event
|
||||||
|
return await activeEgressEventPromise;
|
||||||
|
} catch (error) {
|
||||||
|
if (isOperationCompleted) {
|
||||||
|
this.logger.warn(`startRoomComposite failed after timeout: ${error}`);
|
||||||
|
throw errorRecordingStartTimeout(roomId);
|
||||||
|
}
|
||||||
|
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
|
// Prevent UnhandledPromiseRejection from late failures
|
||||||
|
startRecordingPromise.catch((error) => {
|
||||||
|
if (!isOperationCompleted) {
|
||||||
|
this.logger.error(`Unhandled error in startRecordingPromise: ${error}`);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return await Promise.race([startRecordingPromise, timeoutPromise]);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.logger.error(`Error starting recording in room '${roomId}': ${error}`);
|
this.logger.error(`Error starting recording in room '${roomId}': ${error}`);
|
||||||
|
|
||||||
throw error;
|
throw error;
|
||||||
} finally {
|
} finally {
|
||||||
try {
|
try {
|
||||||
@ -443,6 +470,22 @@ export class RecordingService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected async validateRoomForStartRecording(roomId: string): Promise<void> {
|
||||||
|
const room = await this.roomService.getMeetRoom(roomId);
|
||||||
|
|
||||||
|
if (!room) throw errorRoomNotFound(roomId);
|
||||||
|
|
||||||
|
//TODO: Check if the room has participants before starting the recording
|
||||||
|
//room.numParticipants === 0 ? throw errorNoParticipants(roomId);
|
||||||
|
const lkRoom = await this.livekitService.getRoom(roomId);
|
||||||
|
|
||||||
|
if (!lkRoom) throw errorRoomNotFound(roomId);
|
||||||
|
|
||||||
|
const hasParticipants = await this.livekitService.roomHasParticipants(roomId);
|
||||||
|
|
||||||
|
if (!hasParticipants) throw errorRoomHasNoParticipants(roomId);
|
||||||
|
}
|
||||||
|
|
||||||
protected async getFullStreamResponse(
|
protected async getFullStreamResponse(
|
||||||
recordingPath: string,
|
recordingPath: string,
|
||||||
fileSize: number
|
fileSize: number
|
||||||
@ -461,7 +504,7 @@ export class RecordingService {
|
|||||||
*
|
*
|
||||||
* @param roomId - The name of the room to acquire the lock for.
|
* @param roomId - The name of the room to acquire the lock for.
|
||||||
*/
|
*/
|
||||||
async acquireRoomRecordingActiveLock(roomId: string): Promise<RedisLock | null> {
|
protected async acquireRoomRecordingActiveLock(roomId: string): Promise<RedisLock | null> {
|
||||||
const lockName = MeetLock.getRecordingActiveLock(roomId);
|
const lockName = MeetLock.getRecordingActiveLock(roomId);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -601,20 +644,23 @@ export class RecordingService {
|
|||||||
* @param recordingId
|
* @param recordingId
|
||||||
* @param roomId
|
* @param roomId
|
||||||
*/
|
*/
|
||||||
protected async handleRecordingLockTimeout(
|
protected async handleRecordingLockTimeout(recordingId: string, roomId: string) {
|
||||||
recordingId: string,
|
|
||||||
roomId: string,
|
|
||||||
rejectRequest: (reason?: unknown) => void
|
|
||||||
) {
|
|
||||||
this.logger.debug(`Recording cleanup timer triggered for room '${roomId}'.`);
|
this.logger.debug(`Recording cleanup timer triggered for room '${roomId}'.`);
|
||||||
|
|
||||||
let shouldReleaseLock = false;
|
let shouldReleaseLock = false;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await this.updateRecordingStatus(recordingId, MeetRecordingStatus.FAILED);
|
if (!recordingId || recordingId.trim() === '') {
|
||||||
await this.stopRecording(recordingId);
|
this.logger.warn(
|
||||||
// The recording was stopped successfully
|
`Timeout triggered but recordingId is empty for room '${roomId}'. Recording likely failed to start.`
|
||||||
// the cleanup timer will be cancelled when the egress_ended event is received.
|
);
|
||||||
|
shouldReleaseLock = true;
|
||||||
|
} else {
|
||||||
|
await this.updateRecordingStatus(recordingId, MeetRecordingStatus.FAILED);
|
||||||
|
await this.stopRecording(recordingId);
|
||||||
|
// The recording was stopped successfully
|
||||||
|
// the cleanup timer will be cancelled when the egress_ended event is received.
|
||||||
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
if (error instanceof OpenViduMeetError) {
|
if (error instanceof OpenViduMeetError) {
|
||||||
// The recording is already stopped or not found in LiveKit.
|
// The recording is already stopped or not found in LiveKit.
|
||||||
@ -647,9 +693,6 @@ export class RecordingService {
|
|||||||
this.logger.error(`Error releasing active recording lock for room ${roomId}: ${releaseError}`);
|
this.logger.error(`Error releasing active recording lock for room ${roomId}: ${releaseError}`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reject the REST request with a timeout error.
|
|
||||||
rejectRequest(errorRecordingStartTimeout(roomId));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user