backend: Remove deprecated room and signal models; enhance room service with new status signaling

This commit is contained in:
Carlos Santos 2025-03-19 19:31:26 +01:00
parent c05d9390f9
commit baec69c3db
14 changed files with 548 additions and 343 deletions

View File

@ -27,6 +27,8 @@ export const lkWebhookHandler = async (req: Request, res: Response) => {
switch (eventType) {
case 'egress_started':
await lkWebhookService.handleEgressStarted(egressInfo!);
break;
case 'egress_updated':
await lkWebhookService.handleEgressUpdated(egressInfo!);
break;

View File

@ -7,17 +7,13 @@ import { container } from '../config/dependency-injector.config.js';
export const startRecording = async (req: Request, res: Response) => {
const logger = container.get(LoggerService);
const roomName = req.body.roomName;
if (!roomName) {
return res.status(400).json({ name: 'Recording Error', message: 'Room name is required for this operation' });
}
const { roomId } = req.body;
try {
logger.info(`Starting recording in ${roomName}`);
logger.info(`Starting recording in ${roomId}`);
const recordingService = container.get(RecordingService);
const recordingInfo = await recordingService.startRecording(roomName);
const recordingInfo = await recordingService.startRecording(roomId);
return res.status(200).json(recordingInfo);
} catch (error) {
if (error instanceof OpenViduMeetError) {
@ -29,16 +25,77 @@ export const startRecording = async (req: Request, res: Response) => {
}
};
export const getRecordings = async (req: Request, res: Response) => {
const logger = container.get(LoggerService);
const recordingService = container.get(RecordingService);
try {
logger.info('Getting all recordings');
const { status, page, limit } = req.query;
// const continuationToken = req.query.continuationToken as string;
const response = await recordingService.getAllRecordings();
return res
.status(200)
.json({ recordings: response.recordingInfo, continuationToken: response.continuationToken });
} catch (error) {
if (error instanceof OpenViduMeetError) {
logger.error(`Error getting all recordings: ${error.message}`);
return res.status(error.statusCode).json({ name: error.name, message: error.message });
}
return res.status(500).json({ name: 'Recording Error', message: 'Unexpected error getting recordings' });
}
};
export const bulkDeleteRecordings = async (req: Request, res: Response) => {
const logger = container.get(LoggerService);
try {
const recordingIds = req.body.recordingIds;
logger.info(`Deleting recordings: ${recordingIds}`);
const recordingService = container.get(RecordingService);
// TODO: Check role to determine if the request is from an admin or a participant
const role = req.body.payload.metadata.role;
await recordingService.bulkDeleteRecordings(recordingIds, role);
return res.status(204).json();
} catch (error) {
if (error instanceof OpenViduMeetError) {
logger.error(`Error deleting recordings: ${error.message}`);
return res.status(error.statusCode).json({ name: error.name, message: error.message });
}
return res.status(500).json({ name: 'Recording Error', message: 'Unexpected error deleting recordings' });
}
};
export const getRecording = async (req: Request, res: Response) => {
const logger = container.get(LoggerService);
try {
const recordingId = req.params.recordingId;
logger.info(`Getting recording ${recordingId}`);
const recordingService = container.get(RecordingService);
const recordingInfo = await recordingService.getRecording(recordingId);
return res.status(200).json(recordingInfo);
} catch (error) {
if (error instanceof OpenViduMeetError) {
logger.error(`Error getting recording: ${error.message}`);
return res.status(error.statusCode).json({ name: error.name, message: error.message });
}
return res.status(500).json({ name: 'Recording Error', message: 'Unexpected error getting recording' });
}
};
export const stopRecording = async (req: Request, res: Response) => {
const logger = container.get(LoggerService);
const recordingId = req.params.recordingId;
if (!recordingId) {
return res
.status(400)
.json({ name: 'Recording Error', message: 'Recording ID is required for this operation' });
}
try {
logger.info(`Stopping recording ${recordingId}`);
const recordingService = container.get(RecordingService);
@ -55,44 +112,36 @@ export const stopRecording = async (req: Request, res: Response) => {
}
};
/**
* Endpoint only available for the admin user
* !WARNING: This will be removed in future versions
*/
export const getAllRecordings = async (req: Request, res: Response) => {
export const deleteRecording = async (req: Request, res: Response) => {
const logger = container.get(LoggerService);
const recordingId = req.params.recordingId;
try {
logger.info('Getting all recordings');
logger.info(`Deleting recording ${recordingId}`);
const recordingService = container.get(RecordingService);
// const continuationToken = req.query.continuationToken as string;
const response = await recordingService.getAllRecordings();
return res
.status(200)
.json({ recordings: response.recordingInfo, continuationToken: response.continuationToken });
// TODO: Check role to determine if the request is from an admin or a participant
const role = req.body.payload.metadata.role;
const recordingInfo = await recordingService.deleteRecording(recordingId, role);
return res.status(204).json(recordingInfo);
} catch (error) {
if (error instanceof OpenViduMeetError) {
logger.error(`Error getting all recordings: ${error.message}`);
logger.error(`Error deleting recording: ${error.message}`);
return res.status(error.statusCode).json({ name: error.name, message: error.message });
}
return res.status(500).json({ name: 'Recording Error', message: 'Unexpected error getting recordings' });
return res.status(500).json({ name: 'Recording Error', message: 'Unexpected error deleting recording' });
}
};
// Internal Recording methods
export const streamRecording = async (req: Request, res: Response) => {
const logger = container.get(LoggerService);
const recordingId = req.params.recordingId;
const range = req.headers.range;
if (!recordingId) {
return res
.status(400)
.json({ name: 'Recording Error', message: 'Recording ID is required for this operation' });
}
try {
logger.info(`Streaming recording ${recordingId}`);
const recordingService = container.get(RecordingService);
@ -132,31 +181,3 @@ export const streamRecording = async (req: Request, res: Response) => {
return res.status(500).json({ name: 'Recording Error', message: 'Unexpected error streaming recording' });
}
};
export const deleteRecording = async (req: Request, res: Response) => {
const logger = container.get(LoggerService);
const recordingId = req.params.recordingId;
if (!recordingId) {
return res
.status(400)
.json({ name: 'Recording Error', message: 'Recording ID is required for this operation' });
}
try {
logger.info(`Deleting recording ${recordingId}`);
const recordingService = container.get(RecordingService);
const isRequestedByAdmin = req.url.includes('admin');
const recordingInfo = await recordingService.deleteRecording(recordingId, isRequestedByAdmin);
return res.status(204).json(recordingInfo);
} catch (error) {
if (error instanceof OpenViduMeetError) {
logger.error(`Error deleting recording: ${error.message}`);
return res.status(error.statusCode).json({ name: error.name, message: error.message });
}
return res.status(500).json({ name: 'Recording Error', message: 'Unexpected error deleting recording' });
}
};

View File

@ -1 +1,3 @@
export * from './recording.helper.js';
export * from './recording.helper.js';
export * from './ov-components-adapter.helper.js';
export * from './room.helper.js';

View File

@ -0,0 +1,96 @@
import { MeetRecordingInfo, MeetRecordingStatus } from '@typings-ce';
import { SendDataOptions } from 'livekit-server-sdk';
const enum OpenViduComponentsDataTopic {
CHAT = 'chat',
RECORDING_STARTING = 'recordingStarting',
RECORDING_STARTED = 'recordingStarted',
RECORDING_STOPPING = 'recordingStopping',
RECORDING_STOPPED = 'recordingStopped',
RECORDING_DELETED = 'recordingDeleted',
RECORDING_FAILED = 'recordingFailed',
ROOM_STATUS = 'roomStatus'
}
export class OpenViduComponentsAdapterHelper {
static generateRecordingSignal(recordingInfo: MeetRecordingInfo) {
const options: SendDataOptions = {
destinationSids: [],
topic: OpenViduComponentsAdapterHelper.generateDataTopic(recordingInfo)
};
const payload = OpenViduComponentsAdapterHelper.parseRecordingInfoToOpenViduComponents(recordingInfo);
return { payload, options };
}
static generateRoomStatusSignal(isRecordingStarted: boolean, participantSid?: string) {
const payload = {
isRecordingStarted
};
const options = {
topic: OpenViduComponentsDataTopic.ROOM_STATUS,
destinationSids: participantSid ? [participantSid] : []
};
return {
payload,
options
};
}
private static parseRecordingInfoToOpenViduComponents(info: MeetRecordingInfo) {
return {
id: info.recordingId,
roomName: info.details ?? '',
roomId: info.roomId,
outputMode: info.outputMode,
status: this.mapRecordingStatus(info.status),
filename: info.filename,
startedAt: info.startDate,
endedAt: info.endDate,
duration: info.duration,
size: info.size,
location: undefined
};
}
private static generateDataTopic(info: MeetRecordingInfo) {
switch (info.status) {
case MeetRecordingStatus.STARTING:
return OpenViduComponentsDataTopic.RECORDING_STARTING;
case MeetRecordingStatus.ACTIVE:
return OpenViduComponentsDataTopic.RECORDING_STARTED;
case MeetRecordingStatus.ENDING:
return OpenViduComponentsDataTopic.RECORDING_STOPPING;
case MeetRecordingStatus.COMPLETE:
return OpenViduComponentsDataTopic.RECORDING_STOPPED;
case MeetRecordingStatus.FAILED:
case MeetRecordingStatus.ABORTED:
return OpenViduComponentsDataTopic.RECORDING_FAILED;
case MeetRecordingStatus.LIMITED_REACHED:
return OpenViduComponentsDataTopic.RECORDING_STOPPED;
default:
return OpenViduComponentsDataTopic.RECORDING_FAILED;
}
}
private static mapRecordingStatus(status: MeetRecordingStatus) {
switch (status) {
case MeetRecordingStatus.STARTING:
return 'STARTING';
case MeetRecordingStatus.ACTIVE:
return 'STARTED';
case MeetRecordingStatus.ENDING:
return 'STOPPING';
case MeetRecordingStatus.COMPLETE:
return 'READY';
case MeetRecordingStatus.FAILED:
case MeetRecordingStatus.ABORTED:
return 'FAILED';
case MeetRecordingStatus.LIMITED_REACHED:
return 'READY';
default:
return 'FAILED';
}
}
}

View File

@ -1,28 +1,30 @@
import { EgressInfo } from 'livekit-server-sdk';
import { RecordingInfo, RecordingOutputMode, RecordingStatus } from '@typings-ce';
import { MeetRecordingInfo, MeetRecordingOutputMode, MeetRecordingStatus } from '@typings-ce';
import { EgressStatus } from '@livekit/protocol';
import { DataTopic } from '../models/signal.model.js';
export class RecordingHelper {
static toRecordingInfo(egressInfo: EgressInfo): RecordingInfo {
static toRecordingInfo(egressInfo: EgressInfo): MeetRecordingInfo {
const status = RecordingHelper.extractOpenViduStatus(egressInfo.status);
const size = RecordingHelper.extractSize(egressInfo);
const outputMode = RecordingHelper.extractOutputMode(egressInfo);
const duration = RecordingHelper.extractDuration(egressInfo);
const startedAt = RecordingHelper.extractCreatedAt(egressInfo);
const endTimeInMilliseconds = RecordingHelper.extractEndedAt(egressInfo);
const startDateMs = RecordingHelper.extractStartDate(egressInfo);
const endDateMs = RecordingHelper.extractEndDate(egressInfo);
const filename = RecordingHelper.extractFilename(egressInfo);
const { egressId, roomName, errorCode, error, details } = egressInfo;
return {
id: egressInfo.egressId,
roomName: egressInfo.roomName,
roomId: egressInfo.roomId,
recordingId: egressId,
roomId: roomName,
outputMode,
status,
filename,
creationDate: startedAt,
endDate: endTimeInMilliseconds,
startDate: startDateMs,
endDate: endDateMs,
duration,
size
size,
errorCode,
error,
details: details
};
}
@ -36,40 +38,24 @@ export class RecordingHelper {
return fileResults.length > 0 && streamResults.length === 0;
}
static extractOpenViduStatus(status: EgressStatus | undefined): RecordingStatus {
static extractOpenViduStatus(status: EgressStatus | undefined): MeetRecordingStatus {
switch (status) {
case EgressStatus.EGRESS_STARTING:
return RecordingStatus.STARTING;
return MeetRecordingStatus.STARTING;
case EgressStatus.EGRESS_ACTIVE:
return RecordingStatus.STARTED;
return MeetRecordingStatus.ACTIVE;
case EgressStatus.EGRESS_ENDING:
return RecordingStatus.STOPPED;
return MeetRecordingStatus.ENDING;
case EgressStatus.EGRESS_COMPLETE:
return RecordingStatus.READY;
return MeetRecordingStatus.COMPLETE;
case EgressStatus.EGRESS_FAILED:
return MeetRecordingStatus.FAILED;
case EgressStatus.EGRESS_ABORTED:
return MeetRecordingStatus.ABORTED;
case EgressStatus.EGRESS_LIMIT_REACHED:
return RecordingStatus.FAILED;
return MeetRecordingStatus.LIMITED_REACHED;
default:
return RecordingStatus.FAILED;
}
}
static getDataTopicFromStatus(egressInfo: EgressInfo): DataTopic {
const status = RecordingHelper.extractOpenViduStatus(egressInfo.status);
switch (status) {
case RecordingStatus.STARTING:
return DataTopic.RECORDING_STARTING;
case RecordingStatus.STARTED:
return DataTopic.RECORDING_STARTED;
case RecordingStatus.STOPPED:
case RecordingStatus.READY:
return DataTopic.RECORDING_STOPPED;
case RecordingStatus.FAILED:
return DataTopic.RECORDING_FAILED;
default:
return DataTopic.RECORDING_FAILED;
return MeetRecordingStatus.FAILED;
}
}
@ -81,33 +67,30 @@ export class RecordingHelper {
* @param egressInfo - The egress information containing the roomComposite flag.
* @returns The extracted OpenVidu output mode.
*/
static extractOutputMode(egressInfo: EgressInfo): RecordingOutputMode {
if (egressInfo.request.case === 'roomComposite') {
return RecordingOutputMode.COMPOSED;
} else {
return RecordingOutputMode.INDIVIDUAL;
}
static extractOutputMode(egressInfo: EgressInfo): MeetRecordingOutputMode {
// if (egressInfo.request.case === 'roomComposite') {
// return MeetRecordingOutputMode.COMPOSED;
// } else {
// return MeetRecordingOutputMode.INDIVIDUAL;
// }
return MeetRecordingOutputMode.COMPOSED;
}
static extractFilename(recordingInfo: RecordingInfo): string | undefined;
static extractFilename(recordingInfo: MeetRecordingInfo): string | undefined;
static extractFilename(egressInfo: EgressInfo): string | undefined;
static extractFilename(info: RecordingInfo | EgressInfo): string | undefined {
static extractFilename(info: MeetRecordingInfo | EgressInfo): string | undefined {
if (!info) return undefined;
if ('request' in info) {
// EgressInfo
return info.fileResults?.[0]?.filename.split('/').pop();
} else {
// RecordingInfo
const { roomName, filename, roomId } = info;
// MeetRecordingInfo
const { filename, roomId } = info;
if (!filename) {
return undefined;
}
return roomName ? `${roomName}-${roomId}/${filename}` : filename;
return `${roomId}/${filename}`;
}
}
@ -128,7 +111,7 @@ export class RecordingHelper {
* @param egressInfo - The EgressInfo object containing the endedAt value.
* @returns The endedAt value converted to milliseconds.
*/
static extractEndedAt(egressInfo: EgressInfo): number {
static extractEndDate(egressInfo: EgressInfo): number {
return this.toMilliseconds(Number(egressInfo.endedAt ?? 0));
}
@ -138,7 +121,7 @@ export class RecordingHelper {
* @param egressInfo The EgressInfo object from which to extract the creation timestamp.
* @returns The creation timestamp in milliseconds.
*/
static extractCreatedAt(egressInfo: EgressInfo): number {
static extractStartDate(egressInfo: EgressInfo): number {
const { startedAt, updatedAt } = egressInfo;
const createdAt = startedAt && Number(startedAt) !== 0 ? startedAt : (updatedAt ?? 0);
return this.toMilliseconds(Number(createdAt));

View File

@ -1,4 +1,2 @@
export * from './room.model.js';
export * from './error.model.js';
export * from './signal.model.js';
export * from './redis.model.js';

View File

@ -1,6 +0,0 @@
import { RecordingInfo } from '@typings-ce';
export interface RoomStatusData {
isRecordingStarted: boolean;
recordingList: RecordingInfo[];
}

View File

@ -1,10 +0,0 @@
export enum DataTopic {
CHAT = 'chat',
RECORDING_STARTING = 'recordingStarting',
RECORDING_STARTED = 'recordingStarted',
RECORDING_STOPPING = 'recordingStopping',
RECORDING_STOPPED = 'recordingStopped',
RECORDING_DELETED = 'recordingDeleted',
RECORDING_FAILED = 'recordingFailed',
ROOM_STATUS = 'roomStatus'
}

View File

@ -1,28 +1,27 @@
import { inject, injectable } from '../config/dependency-injector.config.js';
import { EgressInfo, ParticipantInfo, Room, SendDataOptions, WebhookEvent, WebhookReceiver } from 'livekit-server-sdk';
import { EgressInfo, ParticipantInfo, Room, WebhookEvent, WebhookReceiver } from 'livekit-server-sdk';
import { RecordingHelper } from '../helpers/recording.helper.js';
import { DataTopic } from '../models/signal.model.js';
import { LiveKitService } from './livekit.service.js';
import { RecordingInfo, RecordingStatus } from '@typings-ce';
import { LIVEKIT_API_KEY, LIVEKIT_API_SECRET, MEET_NAME_ID } from '../environment.js';
import { MeetRecordingInfo } from '@typings-ce';
import { LIVEKIT_API_KEY, LIVEKIT_API_SECRET, MEET_NAME_ID, MEET_S3_RECORDINGS_PREFIX } from '../environment.js';
import { LoggerService } from './logger.service.js';
import { RoomService } from './room.service.js';
import { S3Service } from './s3.service.js';
import { RoomStatusData } from '../models/room.model.js';
import { RecordingService } from './recording.service.js';
import { OpenViduWebhookService } from './openvidu-webhook.service.js';
import { MutexService } from './mutex.service.js';
@injectable()
export class LivekitWebhookService {
private webhookReceiver: WebhookReceiver;
protected webhookReceiver: WebhookReceiver;
constructor(
@inject(S3Service) protected s3Service: S3Service,
@inject(RecordingService) protected recordingService: RecordingService,
@inject(LiveKitService) protected livekitService: LiveKitService,
@inject(RoomService) protected roomService: RoomService,
@inject(LoggerService) protected logger: LoggerService,
@inject(OpenViduWebhookService) protected openViduWebhookService: OpenViduWebhookService
@inject(OpenViduWebhookService) protected openViduWebhookService: OpenViduWebhookService,
@inject(MutexService) protected mutexService: MutexService,
@inject(LoggerService) protected logger: LoggerService
) {
this.webhookReceiver = new WebhookReceiver(LIVEKIT_API_KEY, LIVEKIT_API_SECRET);
}
@ -80,35 +79,12 @@ export class LivekitWebhookService {
}
}
async handleEgressStarted(egressInfo: EgressInfo) {
await this.processRecordingEgress(egressInfo, 'started');
}
async handleEgressUpdated(egressInfo: EgressInfo) {
try {
const isRecording: boolean = RecordingHelper.isRecordingEgress(egressInfo);
if (!isRecording) return;
const { roomName } = egressInfo;
let recordingInfo: RecordingInfo | undefined = undefined;
this.logger.info(`Recording egress '${egressInfo.egressId}' updated: ${egressInfo.status}`);
const topic: DataTopic = RecordingHelper.getDataTopicFromStatus(egressInfo);
recordingInfo = RecordingHelper.toRecordingInfo(egressInfo);
// Add recording metadata
const metadataPath = this.generateMetadataPath(recordingInfo);
const promises = [
this.s3Service.saveObject(metadataPath, recordingInfo),
this.roomService.sendSignal(roomName, recordingInfo, { topic })
];
if(recordingInfo.status === RecordingStatus.STARTED) {
promises.push(this.openViduWebhookService.sendRecordingStartedWebhook(recordingInfo));
}
await Promise.all(promises);
} catch (error) {
this.logger.warn(`Error sending data on egress updated: ${error}`);
}
await this.processRecordingEgress(egressInfo, 'updated');
}
/**
@ -117,27 +93,7 @@ export class LivekitWebhookService {
* @param egressInfo - Information about the ended recording egress.
*/
async handleEgressEnded(egressInfo: EgressInfo) {
try {
const isRecording: boolean = RecordingHelper.isRecordingEgress(egressInfo);
if (!isRecording) return;
const { roomName } = egressInfo;
let payload: RecordingInfo | undefined = undefined;
const topic: DataTopic = DataTopic.RECORDING_STOPPED;
payload = RecordingHelper.toRecordingInfo(egressInfo);
// Update recording metadata
const metadataPath = this.generateMetadataPath(payload);
await Promise.all([
this.s3Service.saveObject(metadataPath, payload),
this.roomService.sendSignal(roomName, payload, { topic }),
this.openViduWebhookService.sendRecordingStoppedWebhook(payload)
]);
} catch (error) {
this.logger.warn(`Error sending data on egress ended: ${error}`);
}
await this.processRecordingEgress(egressInfo, 'ended');
}
/**
@ -149,12 +105,12 @@ export class LivekitWebhookService {
*/
async handleParticipantJoined(room: Room, participant: ParticipantInfo) {
try {
// Do not send status signal to egress participants
// Skip if the participant is an egress participant
if (this.livekitService.isEgressParticipant(participant)) {
return;
}
await this.sendStatusSignal(room.name, room.sid, participant.sid);
await this.roomService.sendRoomStatusSignalToOpenViduComponents(room.name, participant.sid);
} catch (error) {
this.logger.error(`Error sending data on participant joined: ${error}`);
}
@ -169,7 +125,7 @@ export class LivekitWebhookService {
* @param {Room} room - The room object that has finished.
* @returns {Promise<void>} A promise that resolves when the webhook has been sent.
*/
async handleRoomFinished(room: Room) {
async handleMeetingFinished(room: Room) {
try {
await this.openViduWebhookService.sendRoomFinishedWebhook(room);
} catch (error) {
@ -177,30 +133,66 @@ export class LivekitWebhookService {
}
}
/**
* Processes a recording egress event by updating metadata, sending webhook notifications,
* and performing necessary cleanup actions based on the webhook action type.
*
* @param egressInfo - The information about the egress event to process.
* @param webhookAction - The type of webhook action to handle. Can be 'started', 'updated', or 'ended'.
* @returns A promise that resolves when all processing tasks are completed.
*/
protected async processRecordingEgress(
egressInfo: EgressInfo,
webhookAction: 'started' | 'updated' | 'ended'
): Promise<void> {
if (!RecordingHelper.isRecordingEgress(egressInfo)) return;
private async sendStatusSignal(roomName: string, roomId: string, participantSid: string) {
// Get recording list
const recordingInfo = await this.recordingService.getAllRecordingsByRoom(roomName, roomId);
this.logger.debug(`Processing recording ${webhookAction} webhook.`);
// Check if recording is started in the room
const isRecordingStarted = recordingInfo.some((rec) => rec.status === RecordingStatus.STARTED);
const recordingInfo: MeetRecordingInfo = RecordingHelper.toRecordingInfo(egressInfo);
const metadataPath = this.generateMetadataPath(recordingInfo);
const { roomId, recordingId, status } = recordingInfo;
// Construct the payload to send to the participant
const payload: RoomStatusData = {
isRecordingStarted,
recordingList: recordingInfo
};
const signalOptions: SendDataOptions = {
topic: DataTopic.ROOM_STATUS,
destinationSids: participantSid ? [participantSid] : []
};
await this.roomService.sendSignal(roomName, payload, signalOptions);
this.logger.debug(`Recording '${recordingId}' for room '${roomId}' is in status '${status}'`);
const promises: Promise<unknown>[] = [];
try {
// Update recording metadata
promises.push(
this.s3Service.saveObject(metadataPath, recordingInfo),
this.recordingService.sendRecordingSignalToOpenViduComponents(roomId, recordingInfo)
);
// Send webhook notification
switch (webhookAction) {
case 'started':
promises.push(this.openViduWebhookService.sendRecordingStartedWebhook(recordingInfo));
break;
case 'updated':
promises.push(this.openViduWebhookService.sendRecordingUpdatedWebhook(recordingInfo));
break;
case 'ended':
promises.push(
this.openViduWebhookService.sendRecordingEndedWebhook(recordingInfo),
this.recordingService.releaseRoomRecordingActiveLock(roomId)
);
break;
}
// Wait for all promises to resolve
await Promise.all(promises);
} catch (error) {
this.logger.warn(
`Error sending recording ${webhookAction} webhook for egress ${egressInfo.egressId}: ${error}`
);
}
}
private generateMetadataPath(payload: RecordingInfo): string {
const metadataFilename = `${payload.roomName}-${payload.roomId}`;
const recordingFilename = payload.filename?.split('.')[0];
const egressId = payload.id;
return `.metadata/${metadataFilename}/${recordingFilename}_${egressId}.json`;
protected generateMetadataPath(recordingInfo: MeetRecordingInfo): string {
const { roomId, recordingId } = recordingInfo;
// Remove file extension from filename
const recordingFilename = recordingInfo.filename?.split('.')[0];
return `${MEET_S3_RECORDINGS_PREFIX}/.metadata/${roomId}/${recordingFilename}-${recordingId}.json`;
}
}

View File

@ -96,7 +96,7 @@ export class LiveKitService {
try {
return await this.roomClient.getParticipant(roomName, participantName);
} catch (error) {
this.logger.error(`Error getting participant ${error}`);
this.logger.warn(`Participant ${participantName} not found in room ${roomName} ${error}`);
throw internalError(`Error getting participant: ${error}`);
}
}
@ -170,10 +170,50 @@ export class LiveKitService {
}
}
async getEgress(options: ListEgressOptions): Promise<EgressInfo[]> {
/**
* Retrieves a list of egress information based on the provided options.
*
* @param {ListEgressOptions} options - The options to filter the egress list.
* @returns {Promise<EgressInfo[]>} A promise that resolves to an array of EgressInfo objects.
* @throws Will throw an error if there is an issue retrieving the egress information.
*/
async getEgress(roomName?:string, egressId?: string): Promise<EgressInfo[]> {
try {
const options: ListEgressOptions = {
roomName,
egressId,
};
return await this.egressClient.listEgress(options);
} catch (error: any) {
if (error.message.includes('404')) {
return [];
}
this.logger.error(`Error getting egress: ${JSON.stringify(error)}`);
throw internalError(`Error getting egress: ${error}`);
}
}
/**
* Retrieves a list of active egress information based on the provided egress ID.
*
* @param egressId - The unique identifier of the egress to retrieve.
* @returns A promise that resolves to an array of `EgressInfo` objects representing the active egress.
* @throws Will throw an error if there is an issue retrieving the egress information.
*/
async getActiveEgress(roomName?: string, egressId?: string): Promise<EgressInfo[]> {
try {
const options: ListEgressOptions = {
roomName,
egressId,
active: true
};
return await this.egressClient.listEgress(options);
} catch (error: any) {
if (error.message.includes('404')) {
return [];
}
this.logger.error(`Error getting egress: ${JSON.stringify(error)}`);
throw internalError(`Error getting egress: ${error}`);
}

View File

@ -1,15 +1,22 @@
import Redlock, { Lock } from 'redlock';
import { RedisService } from './redis.service.js';
import { inject, injectable } from 'inversify';
import ms from 'ms';
import { LoggerService } from './logger.service.js';
export type RedisLock = Lock;
@injectable()
export class MutexService {
protected redlockWithoutRetry: Redlock;
protected locks: Map<string, Lock>;
protected readonly TTL_MS = 10_000;
protected LOCK_KEY_PREFIX = 'ov_meet_lock:'
protected readonly TTL_MS = ms('1m');
protected LOCK_KEY_PREFIX = 'ov_meet_lock:';
constructor(@inject(RedisService) protected redisService: RedisService) {
constructor(
@inject(RedisService) protected redisService: RedisService,
@inject(LoggerService) protected logger: LoggerService
) {
// Create a Redlock instance with no retry strategy
this.redlockWithoutRetry = this.redisService.createRedlock(0);
this.locks = new Map();
}
@ -21,13 +28,15 @@ export class MutexService {
* @returns A Promise that resolves to the acquired Lock object.
*/
async acquire(resource: string, ttl: number = this.TTL_MS): Promise<Lock | null> {
resource = this.LOCK_KEY_PREFIX + resource;
const key = this.LOCK_KEY_PREFIX + resource;
try {
const lock = await this.redlockWithoutRetry.acquire([resource], ttl);
this.locks.set(resource, lock);
this.logger.debug(`Acquiring lock for resource: ${resource}`);
const lock = await this.redlockWithoutRetry.acquire([key], ttl);
this.locks.set(key, lock);
return lock;
} catch (error) {
this.logger.error('Error acquiring lock:', error);
return null;
}
}
@ -39,12 +48,19 @@ export class MutexService {
* @returns A Promise that resolves when the lock is released.
*/
async release(resource: string): Promise<void> {
resource = this.LOCK_KEY_PREFIX + resource;
const lock = this.locks.get(resource);
const key = this.LOCK_KEY_PREFIX + resource;
const lock = this.locks.get(key);
if (lock) {
await lock.release();
this.locks.delete(resource);
this.logger.debug(`Releasing lock for resource: ${resource}`);
try {
await lock.release();
} catch (error) {
this.logger.error(`Error releasing lock for key ${key}:`, error);
} finally {
this.locks.delete(key);
}
}
}
}

View File

@ -3,56 +3,56 @@ import { inject, injectable } from '../config/dependency-injector.config.js';
import { Room } from 'livekit-server-sdk';
import { LoggerService } from './logger.service.js';
import { MEET_API_KEY, MEET_WEBHOOK_ENABLED, MEET_WEBHOOK_URL } from '../environment.js';
import { OpenViduWebhookEvent, OpenViduWebhookEventType, RecordingInfo } from '@typings-ce';
import { MeetWebhookEvent, MeetWebhookEventType, MeetRecordingInfo, MeetWebhookPayload } from '@typings-ce';
@injectable()
export class OpenViduWebhookService {
constructor(@inject(LoggerService) protected logger: LoggerService) {}
// TODO: Implement Room webhooks
async sendRoomFinishedWebhook(room: Room) {
const data: OpenViduWebhookEvent = {
event: OpenViduWebhookEventType.ROOM_FINISHED,
creationDate: Date.now(),
data: {
roomName: room.name
}
};
await this.sendWebhookEvent(data);
// try {
// await this.sendWebhookEvent(MeetWebhookEventType.ROOM_FINISHED, data);
// } catch (error) {
// this.logger.error(`Error sending room finished webhook: ${error}`);
// }
}
async sendRecordingStartedWebhook(recordingInfo: RecordingInfo) {
const data: OpenViduWebhookEvent = {
event: OpenViduWebhookEventType.RECORDING_STARTED,
creationDate: Date.now(),
data: {
recordingId: recordingInfo.id,
filename: recordingInfo.filename,
roomName: recordingInfo.roomName,
status: recordingInfo.status
}
};
await this.sendWebhookEvent(data);
async sendRecordingStartedWebhook(recordingInfo: MeetRecordingInfo) {
try {
await this.sendWebhookEvent(MeetWebhookEventType.RECORDING_STARTED, recordingInfo);
} catch (error) {
this.logger.error(`Error sending recording started webhook: ${error}`);
}
}
async sendRecordingStoppedWebhook(recordingInfo: RecordingInfo) {
const data: OpenViduWebhookEvent = {
event: OpenViduWebhookEventType.RECORDING_STOPPED,
creationDate: Date.now(),
data: {
recordingId: recordingInfo.id,
filename: recordingInfo.filename,
roomName: recordingInfo.roomName,
status: recordingInfo.status
}
};
await this.sendWebhookEvent(data);
async sendRecordingUpdatedWebhook(recordingInfo: MeetRecordingInfo) {
try {
await this.sendWebhookEvent(MeetWebhookEventType.RECORDING_UPDATED, recordingInfo);
} catch (error) {
this.logger.error(`Error sending recording updated webhook: ${error}`);
}
}
private async sendWebhookEvent(data: OpenViduWebhookEvent) {
async sendRecordingEndedWebhook(recordingInfo: MeetRecordingInfo) {
try {
await this.sendWebhookEvent(MeetWebhookEventType.RECORDING_ENDED, recordingInfo);
} catch (error) {
this.logger.error(`Error sending recording ended webhook: ${error}`);
}
}
private async sendWebhookEvent(event: MeetWebhookEventType, payload: MeetWebhookPayload) {
if (!this.isWebhookEnabled()) return;
const timestamp = data.creationDate;
const signature = this.generateWebhookSignature(timestamp, data);
const creationDate = Date.now();
const data: MeetWebhookEvent = {
event,
creationDate,
data: payload
};
const signature = this.generateWebhookSignature(creationDate, data);
this.logger.info(`Sending webhook event ${data.event}`);
@ -61,7 +61,7 @@ export class OpenViduWebhookService {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Timestamp': timestamp.toString(),
'X-Timestamp': creationDate.toString(),
'X-Signature': signature
},
body: JSON.stringify(data)

View File

@ -1,14 +1,8 @@
import {
EncodedFileOutput,
EncodedFileType,
ListEgressOptions,
RoomCompositeOptions,
SendDataOptions
} from 'livekit-server-sdk';
import { EncodedFileOutput, EncodedFileType, RoomCompositeOptions } from 'livekit-server-sdk';
import { uid } from 'uid';
import { Readable } from 'stream';
import { LiveKitService } from './livekit.service.js';
import {
OpenViduMeetError,
errorRecordingAlreadyStarted,
errorRecordingNotFound,
errorRecordingNotStopped,
@ -16,84 +10,68 @@ import {
internalError
} from '../models/error.model.js';
import { S3Service } from './s3.service.js';
import { DataTopic } from '../models/signal.model.js';
import { LoggerService } from './logger.service.js';
import { RecordingInfo, RecordingStatus } from '@typings-ce';
import { MeetRecordingInfo, MeetRecordingStatus } from '@typings-ce';
import { RecordingHelper } from '../helpers/recording.helper.js';
import { MEET_S3_BUCKET } from '../environment.js';
import { MEET_S3_BUCKET, MEET_S3_RECORDINGS_PREFIX, MEET_S3_SUBBUCKET } from '../environment.js';
import { RoomService } from './room.service.js';
import { inject, injectable } from '../config/dependency-injector.config.js';
import { MutexService, RedisLock } from './mutex.service.js';
import { RedisLockName } from '../models/index.js';
import ms from 'ms';
import { OpenViduComponentsAdapterHelper } from '../helpers/ov-components-adapter.helper.js';
@injectable()
export class RecordingService {
protected readonly RECORDING_ACTIVE_LOCK_TTL = ms('6h');
constructor(
@inject(S3Service) protected s3Service: S3Service,
@inject(LiveKitService) protected livekitService: LiveKitService,
@inject(RoomService) protected roomService: RoomService,
@inject(MutexService) protected mutexService: MutexService,
@inject(LoggerService) protected logger: LoggerService
) {}
async startRecording(roomName: string): Promise<RecordingInfo> {
async startRecording(roomName: string): Promise<MeetRecordingInfo> {
let acquiredLock: RedisLock | null = null;
try {
const egressOptions: ListEgressOptions = {
roomName,
active: true
};
// Attempt to acquire lock.
// Note: using a high TTL to prevent expiration during a long recording.
acquiredLock = await this.acquireRoomRecordingActiveLock(roomName);
const [activeEgressResult, roomResult] = await Promise.allSettled([
this.livekitService.getEgress(egressOptions),
this.livekitService.getRoom(roomName)
]);
if (!acquiredLock) throw errorRecordingAlreadyStarted(roomName);
// Get the results of the promises
const activeEgress = activeEgressResult.status === 'fulfilled' ? activeEgressResult.value : null;
const room = roomResult.status === 'fulfilled' ? roomResult.value : null;
const room = await this.roomService.getOpenViduRoom(roomName);
// If there is an active egress, it means that the recording is already started
if (!activeEgress || activeEgressResult.status === 'rejected') {
throw errorRecordingAlreadyStarted(roomName);
}
if (!room) throw errorRoomNotFound(roomName);
if (!room) {
throw errorRoomNotFound(roomName);
}
const recordingId = `${roomName}-${room.sid || Date.now()}`;
const options = this.generateCompositeOptionsFromRequest();
const output = this.generateFileOutputFromRequest(recordingId);
const output = this.generateFileOutputFromRequest(roomName);
const egressInfo = await this.livekitService.startRoomComposite(roomName, output, options);
// Return recording info without releasing the lock here,
// as it will be released in handleEgressEnded on successful completion.
return RecordingHelper.toRecordingInfo(egressInfo);
} catch (error) {
this.logger.error(`Error starting recording in room ${roomName}: ${error}`);
let payload = { error: error, statusCode: 500 };
const options: SendDataOptions = {
destinationSids: [],
topic: DataTopic.RECORDING_FAILED
};
if (error instanceof OpenViduMeetError) {
payload = { error: error.message, statusCode: error.statusCode };
}
this.roomService.sendSignal(roomName, payload, options);
if (acquiredLock) await this.releaseRoomRecordingActiveLock(roomName);
throw error;
}
}
async stopRecording(egressId: string): Promise<RecordingInfo> {
async stopRecording(egressId: string): Promise<MeetRecordingInfo> {
try {
const options: ListEgressOptions = {
egressId,
active: true
};
const egressArray = await this.livekitService.getEgress(options);
const egressArray = await this.livekitService.getActiveEgress(undefined, egressId);
if (egressArray.length === 0) {
throw errorRecordingNotFound(egressId);
}
const egressInfo = await this.livekitService.stopEgress(egressId);
return RecordingHelper.toRecordingInfo(egressInfo);
} catch (error) {
this.logger.error(`Error stopping recording ${egressId}: ${error}`);
@ -101,19 +79,16 @@ export class RecordingService {
}
}
async deleteRecording(egressId: string, isRequestedByAdmin: boolean): Promise<RecordingInfo> {
// TODO: Implement deleteRecording method
async deleteRecording(egressId: string, role: string): Promise<MeetRecordingInfo> {
try {
// Get the recording object from the S3 bucket
const metadataObject = await this.s3Service.listObjects('.metadata', `.*${egressId}.*.json`);
const { metadataFilePath, recordingInfo } = await this.getMeetRecordingInfoFromMetadata(egressId);
if (!metadataObject.Contents || metadataObject.Contents.length === 0) {
throw errorRecordingNotFound(egressId);
}
const metadataPath = metadataObject.Contents[0].Key;
const recordingInfo = (await this.s3Service.getObjectAsJson(metadataPath!)) as RecordingInfo;
if (recordingInfo.status === RecordingStatus.STARTED) {
if (
recordingInfo.status === MeetRecordingStatus.STARTING ||
recordingInfo.status === MeetRecordingStatus.ACTIVE ||
recordingInfo.status === MeetRecordingStatus.ENDING
) {
throw errorRecordingNotStopped(egressId);
}
@ -123,15 +98,10 @@ export class RecordingService {
this.logger.info(`Deleting recording from S3 ${recordingPath}`);
await Promise.all([this.s3Service.deleteObject(metadataPath!), this.s3Service.deleteObject(recordingPath)]);
if (!isRequestedByAdmin) {
const signalOptions: SendDataOptions = {
destinationSids: [],
topic: DataTopic.RECORDING_DELETED
};
await this.roomService.sendSignal(recordingInfo.roomName, recordingInfo, signalOptions);
}
await Promise.all([
this.s3Service.deleteObject(metadataFilePath),
this.s3Service.deleteObject(recordingPath)
]);
return recordingInfo;
} catch (error) {
@ -140,18 +110,25 @@ export class RecordingService {
}
}
// TODO: Implement bulkDeleteRecordings method
async bulkDeleteRecordings(egressIds: string[], role: string): Promise<MeetRecordingInfo[]> {
const promises = egressIds.map((egressId) => this.deleteRecording(egressId, role));
return Promise.all(promises);
}
/**
* Retrieves the list of all recordings.
* @returns A promise that resolves to an array of RecordingInfo objects.
*/
async getAllRecordings(): Promise<{ recordingInfo: RecordingInfo[]; continuationToken?: string }> {
//TODO: Implement getAllRecordings method
async getAllRecordings(): Promise<{ recordingInfo: MeetRecordingInfo[]; continuationToken?: string }> {
try {
const allEgress = await this.s3Service.listObjects('.metadata', '.json');
const promises: Promise<RecordingInfo>[] = [];
const promises: Promise<MeetRecordingInfo>[] = [];
allEgress.Contents?.forEach((item) => {
if (item?.Key?.includes('.json')) {
promises.push(this.s3Service.getObjectAsJson(item.Key) as Promise<RecordingInfo>);
promises.push(this.s3Service.getObjectAsJson(item.Key) as Promise<MeetRecordingInfo>);
}
});
@ -167,10 +144,11 @@ export class RecordingService {
*
* @param roomName - The name of the room.
* @param roomId - The ID of the room.
* @returns A promise that resolves to an array of RecordingInfo objects.
* @returns A promise that resolves to an array of MeetRecordingInfo objects.
* @throws If there is an error retrieving the recordings.
*/
async getAllRecordingsByRoom(roomName: string, roomId: string): Promise<RecordingInfo[]> {
//TODO: Implement getAllRecordingsByRoom method
async getAllRecordingsByRoom(roomName: string, roomId: string): Promise<MeetRecordingInfo[]> {
try {
// Get all recordings that match the room name and room ID from the S3 bucket
const roomNameSanitized = this.sanitizeRegExp(roomName);
@ -184,9 +162,9 @@ export class RecordingService {
return [];
}
const promises: Promise<RecordingInfo>[] = [];
const promises: Promise<MeetRecordingInfo>[] = [];
metadatagObject.Contents?.forEach((item) => {
promises.push(this.s3Service.getObjectAsJson(item.Key!) as Promise<RecordingInfo>);
promises.push(this.s3Service.getObjectAsJson(item.Key!) as Promise<MeetRecordingInfo>);
});
return Promise.all(promises);
@ -196,7 +174,8 @@ export class RecordingService {
}
}
private async getRecording(egressId: string): Promise<RecordingInfo> {
//TODO: Implement getRecording method
async getRecording(egressId: string): Promise<MeetRecordingInfo> {
const egressIdSanitized = this.sanitizeRegExp(egressId);
const regexPattern = `.*${egressIdSanitized}.*\\.json`;
const metadataObject = await this.s3Service.listObjects('.metadata', regexPattern);
@ -205,17 +184,18 @@ export class RecordingService {
throw errorRecordingNotFound(egressId);
}
const recording = (await this.s3Service.getObjectAsJson(metadataObject.Contents[0].Key!)) as RecordingInfo;
const recording = (await this.s3Service.getObjectAsJson(metadataObject.Contents[0].Key!)) as MeetRecordingInfo;
return recording;
// return RecordingHelper.toRecordingInfo(recording);
}
//TODO: Implement getRecordingAsStream method
async getRecordingAsStream(
recordingId: string,
range?: string
): Promise<{ fileSize: number | undefined; fileStream: Readable; start?: number; end?: number }> {
const RECORDING_FILE_PORTION_SIZE = 5 * 1024 * 1024; // 5MB
const recordingInfo: RecordingInfo = await this.getRecording(recordingId);
const recordingInfo: MeetRecordingInfo = await this.getRecording(recordingId);
const recordingPath = RecordingHelper.extractFilename(recordingInfo);
if (!recordingPath) throw new Error(`Error extracting path from recording ${recordingId}`);
@ -240,6 +220,73 @@ export class RecordingService {
}
}
/**
* Acquires a Redis-based lock to indicate that a recording is active for a specific room.
*/
async acquireRoomRecordingActiveLock(roomName: string): Promise<RedisLock | null> {
const lockName = `${roomName}_${RedisLockName.RECORDING_ACTIVE}`;
try {
const lock = await this.mutexService.acquire(lockName, this.RECORDING_ACTIVE_LOCK_TTL);
return lock;
} catch (error) {
this.logger.warn(`Error acquiring lock ${lockName} on egress started: ${error}`);
return null;
}
}
/**
* Releases the active recording lock for a specific room.
*
* This method attempts to release a lock associated with the active recording
* of a given room.
*/
async releaseRoomRecordingActiveLock(roomName: string): Promise<void> {
if (roomName) {
const lockName = `${roomName}_${RedisLockName.RECORDING_ACTIVE}`;
try {
await this.mutexService.release(lockName);
} catch (error) {
this.logger.warn(`Error releasing lock ${lockName} on egress ended: ${error}`);
}
}
}
/**
* Sends a recording signal to OpenVidu Components within a specified room.
*
* This method constructs a signal with the appropriate topic and payload,
* and sends it to the OpenVidu Components in the given room. The payload
* is adapted to match the expected format for OpenVidu Components.
*/
sendRecordingSignalToOpenViduComponents(roomName: string, recordingInfo: MeetRecordingInfo) {
const { payload, options } = OpenViduComponentsAdapterHelper.generateRecordingSignal(recordingInfo);
return this.roomService.sendSignal(roomName, payload, options);
}
private async getMeetRecordingInfoFromMetadata(
egressId: string
): Promise<{ metadataFilePath: string; recordingInfo: MeetRecordingInfo }> {
// Get the recording object from the S3 bucket
const metadataObject = await this.s3Service.listObjects('.metadata', `.*${egressId}.*.json`);
const content = metadataObject.Contents?.[0];
if (!content) {
throw errorRecordingNotFound(egressId);
}
const metadataPath = content.Key;
if (!metadataPath) {
throw errorRecordingNotFound(egressId);
}
const recordingInfo = (await this.s3Service.getObjectAsJson(metadataPath)) as MeetRecordingInfo;
return { recordingInfo, metadataFilePath: metadataPath };
}
private generateCompositeOptionsFromRequest(layout = 'speaker'): RoomCompositeOptions {
return {
layout: layout
@ -255,9 +302,12 @@ export class RecordingService {
* @param fileName - The name of the file (default is 'recording').
* @returns The generated file output object.
*/
private generateFileOutputFromRequest(recordingId: string): EncodedFileOutput {
private generateFileOutputFromRequest(roomName: string): EncodedFileOutput {
// Added unique identifier to the file path for avoiding overwriting
const filepath = `${recordingId}/${recordingId}-${Date.now()}`;
const recordingName = `${roomName}-${uid(10)}`;
// Generate the file path with the openviud-meet subbucket and the recording prefix
const filepath = `${MEET_S3_SUBBUCKET}/${MEET_S3_RECORDINGS_PREFIX}/${roomName}/${recordingName}`;
return new EncodedFileOutput({
fileType: EncodedFileType.DEFAULT_FILETYPE,

View File

@ -9,6 +9,7 @@ import { OpenViduRoomHelper } from '../helpers/room.helper.js';
import { SystemEventService } from './system-event.service.js';
import { TaskSchedulerService } from './task-scheduler.service.js';
import { errorParticipantUnauthorized } from '../models/error.model.js';
import { OpenViduComponentsAdapterHelper } from '../helpers/index.js';
/**
* Service for managing OpenVidu Meet rooms.
@ -40,6 +41,7 @@ export class RoomService {
}
await Promise.all([
//TODO: Livekit rooms should not be created here. They should be created when a user joins a room.
this.restoreMissingLivekitRooms().catch((error) =>
this.logger.error('Error restoring missing rooms:', error)
),
@ -182,6 +184,25 @@ export class RoomService {
}
}
async sendRoomStatusSignalToOpenViduComponents(roomName: string, participantSid: string) {
// Check if recording is started in the room
const activeEgressArray = await this.livekitService.getActiveEgress(roomName);
const isRecordingStarted = activeEgressArray.length > 0;
// Skip if recording is not started
if (!isRecordingStarted) {
return;
}
// Construct the payload and signal options
const { payload, options } = OpenViduComponentsAdapterHelper.generateRoomStatusSignal(
isRecordingStarted,
participantSid
);
await this.sendSignal(roomName, payload, options);
}
/**
* Sends a signal to participants in a specified room.
*