import { inject, injectable } from '../config/dependency-injector.config.js';
import {
	AccessToken,
	CreateOptions,
	DataPacket_Kind,
	EgressClient,
	EgressInfo,
	EgressStatus,
	EncodedFileOutput,
	ListEgressOptions,
	ParticipantInfo,
	Room,
	RoomCompositeOptions,
	RoomServiceClient,
	SendDataOptions,
	StreamOutput
} from 'livekit-server-sdk';
import {
	LIVEKIT_API_KEY,
	LIVEKIT_API_SECRET,
	LIVEKIT_URL,
	LIVEKIT_URL_PRIVATE,
	MEET_PARTICIPANT_TOKEN_EXPIRATION
} from '../environment.js';
import { LoggerService } from './logger.service.js';
import {
	errorLivekitIsNotAvailable,
	errorParticipantNotFound,
	errorRoomNotFound,
	internalError
} from '../models/error.model.js';
import { ParticipantPermissions, ParticipantRole, TokenOptions } from '@typings-ce';

/**
 * Tells whether a caught value carries a string `message` containing `404`.
 *
 * Written defensively: a thrown value is not guaranteed to be an `Error`, so a
 * missing or non-string `message` yields `false` instead of raising a
 * `TypeError` from inside a catch handler.
 */
function isNotFoundError(error: unknown): boolean {
	const message = (error as { message?: unknown } | null | undefined)?.message;
	return typeof message === 'string' && message.includes('404');
}

/**
 * Thin wrapper around the LiveKit server SDK clients (rooms and egress).
 *
 * Most SDK failures are logged and re-thrown as `internalError`, so callers
 * deal with the application's own error model instead of raw SDK errors.
 */
@injectable()
export class LiveKitService {
	private egressClient: EgressClient;
	private roomClient: RoomServiceClient;

	constructor(@inject(LoggerService) protected logger: LoggerService) {
		// The private LiveKit URL may use a WebSocket scheme; both clients are given
		// the same URL with ws:/wss: rewritten to http:/https:.
		const livekitHttpUrl = LIVEKIT_URL_PRIVATE.replace(/^ws:/, 'http:').replace(/^wss:/, 'https:');
		this.egressClient = new EgressClient(livekitHttpUrl, LIVEKIT_API_KEY, LIVEKIT_API_SECRET);
		this.roomClient = new RoomServiceClient(livekitHttpUrl, LIVEKIT_API_KEY, LIVEKIT_API_SECRET);
	}

	/**
	 * Creates a LiveKit room.
	 *
	 * @param options - Creation options forwarded unchanged to the SDK.
	 * @returns The room returned by the SDK.
	 * @throws `internalError` if the SDK call fails.
	 */
	async createRoom(options: CreateOptions): Promise<Room> {
		try {
			return await this.roomClient.createRoom(options);
		} catch (error) {
			this.logger.error('Error creating LiveKit room:', error);
			throw internalError(`Error creating room: ${error}`);
		}
	}

	/**
	 * Looks up a single room by name.
	 *
	 * @param roomName - Name of the room to look up.
	 * @returns The first room returned by the SDK for that name.
	 * @throws `internalError` if the SDK call fails.
	 * @throws `errorRoomNotFound` if the SDK returns no room for that name.
	 */
	async getRoom(roomName: string): Promise<Room> {
		let rooms: Room[] = [];

		try {
			rooms = await this.roomClient.listRooms([roomName]);
		} catch (error) {
			this.logger.error(`Error getting room ${error}`);
			throw internalError(`Error getting room: ${error}`);
		}

		if (rooms.length === 0) {
			throw errorRoomNotFound(roomName);
		}

		return rooms[0];
	}

	/**
	 * Lists every room known to the LiveKit server.
	 *
	 * @returns The rooms returned by the SDK.
	 * @throws `internalError` if the SDK call fails.
	 */
	async listRooms(): Promise<Room[]> {
		try {
			return await this.roomClient.listRooms();
		} catch (error) {
			this.logger.error(`Error getting LiveKit rooms ${error}`);
			throw internalError(`Error getting rooms: ${error}`);
		}
	}

	/**
	 * Deletes a room, doing nothing if the room lookup fails.
	 *
	 * @param roomName - Name of the room to delete.
	 * @throws `internalError` if the delete call itself fails.
	 */
	async deleteRoom(roomName: string): Promise<void> {
		try {
			try {
				await this.getRoom(roomName);
			} catch {
				// NOTE: any getRoom failure (not only "room not found") ends up here and
				// is treated as "nothing to delete".
				this.logger.warn(`Livekit Room ${roomName} not found. Skipping deletion.`);
				return;
			}

			await this.roomClient.deleteRoom(roomName);
		} catch (error) {
			this.logger.error(`Error deleting LiveKit room ${error}`);
			throw internalError(`Error deleting room: ${error}`);
		}
	}

	/**
	 * Retrieves a participant of a room.
	 *
	 * @param roomName - Room the participant belongs to.
	 * @param participantName - Passed to the SDK as the participant identity.
	 * @returns The participant information returned by the SDK.
	 * @throws `internalError` if the SDK call fails for any reason.
	 */
	async getParticipant(roomName: string, participantName: string): Promise<ParticipantInfo> {
		try {
			return await this.roomClient.getParticipant(roomName, participantName);
		} catch (error) {
			this.logger.warn(`Participant ${participantName} not found in room ${roomName} ${error}`);
			throw internalError(`Error getting participant: ${error}`);
		}
	}

	/**
	 * Removes a participant from a room.
	 *
	 * Note the parameter order: participant first, room second (the reverse of
	 * `getParticipant`).
	 *
	 * @param participantName - Identity of the participant to remove.
	 * @param roomName - Room to remove the participant from.
	 * @throws `errorParticipantNotFound` if the participant is not listed in the room.
	 * @throws `errorLivekitIsNotAvailable` if listing participants fails with `ECONNREFUSED`.
	 */
	async deleteParticipant(participantName: string, roomName: string): Promise<void> {
		const participantExists = await this.participantExists(roomName, participantName);

		if (!participantExists) {
			throw errorParticipantNotFound(participantName, roomName);
		}

		await this.roomClient.removeParticipant(roomName, participantName);
	}

	/**
	 * Sends a JSON-serialised payload to a room as a reliable data packet.
	 *
	 * @param roomName - Target room.
	 * @param rawData - Payload; serialised with `JSON.stringify` and UTF-8 encoded.
	 * @param options - Send options forwarded unchanged to the SDK.
	 * @throws `internalError` if no room client is available or the SDK call fails.
	 */
	// NOTE(review): the type arguments of `Record` were not visible when this was
	// reviewed; `Record<string, any>` is assumed as the most permissive
	// caller-compatible form - confirm against callers.
	// eslint-disable-next-line @typescript-eslint/no-explicit-any
	async sendData(roomName: string, rawData: Record<string, any>, options: SendDataOptions): Promise<void> {
		try {
			if (this.roomClient) {
				const data: Uint8Array = new TextEncoder().encode(JSON.stringify(rawData));
				await this.roomClient.sendData(roomName, data, DataPacket_Kind.RELIABLE, options);
			} else {
				throw internalError(`No RoomServiceClient available`);
			}
		} catch (error) {
			this.logger.error(`Error sending data ${error}`);
			throw internalError(`Error sending data: ${error}`);
		}
	}

	/**
	 * Builds a signed LiveKit access token for a participant.
	 *
	 * The token metadata is a JSON string holding the public LiveKit URL, the
	 * participant role and `permissions.openvidu`; `permissions.livekit` is added
	 * as the token grant.
	 *
	 * @param options - Provides `roomName` and `participantName`.
	 * @param permissions - Source of the token grant and metadata permissions.
	 * @param role - Role stored in the token metadata.
	 * @returns The serialised JWT.
	 */
	async generateToken(
		options: TokenOptions,
		permissions: ParticipantPermissions,
		role: ParticipantRole
	): Promise<string> {
		const { roomName, participantName } = options;
		this.logger.info(`Generating token for ${participantName} in room ${roomName}`);

		const at = new AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET, {
			identity: participantName,
			name: participantName,
			ttl: MEET_PARTICIPANT_TOKEN_EXPIRATION,
			metadata: JSON.stringify({
				livekitUrl: LIVEKIT_URL,
				role,
				permissions: permissions.openvidu
			})
		});
		at.addGrant(permissions.livekit);
		return at.toJwt();
	}

	/**
	 * Starts a room composite egress.
	 *
	 * @param roomName - Room to record or stream.
	 * @param output - File or stream output forwarded to the SDK.
	 * @param options - Composite options forwarded to the SDK.
	 * @returns The egress information returned by the SDK.
	 * @throws `internalError` if the SDK call fails.
	 */
	async startRoomComposite(
		roomName: string,
		output: EncodedFileOutput | StreamOutput,
		options: RoomCompositeOptions
	): Promise<EgressInfo> {
		try {
			return await this.egressClient.startRoomCompositeEgress(roomName, output, options);
		} catch (error) {
			this.logger.error('Error starting Room Composite Egress');
			throw internalError(`Error starting Room Composite Egress: ${JSON.stringify(error)}`);
		}
	}

	/**
	 * Stops an egress.
	 *
	 * @param egressId - Identifier of the egress to stop.
	 * @returns The egress information returned by the SDK.
	 * @throws `internalError` if the SDK call fails.
	 */
	async stopEgress(egressId: string): Promise<EgressInfo> {
		try {
			this.logger.info(`Stopping ${egressId} egress`);
			return await this.egressClient.stopEgress(egressId);
		} catch (error) {
			// The error is interpolated here; previously the literal text
			// "JSON.stringify(error)" was logged.
			this.logger.error(`Error stopping egress: ${JSON.stringify(error)}`);
			throw internalError(`Error stopping egress: ${error}`);
		}
	}

	/**
	 * Retrieves egress information, optionally filtered by room and/or egress ID.
	 *
	 * @param roomName - Optional room name filter.
	 * @param egressId - Optional egress ID filter.
	 * @returns The matching `EgressInfo` objects, or an empty array when the SDK
	 * error message contains `404`.
	 * @throws `internalError` on any other failure.
	 */
	async getEgress(roomName?: string, egressId?: string): Promise<EgressInfo[]> {
		try {
			const options: ListEgressOptions = {
				roomName,
				egressId
			};
			return await this.egressClient.listEgress(options);
		} catch (error) {
			if (isNotFoundError(error)) {
				return [];
			}

			this.logger.error(`Error getting egress: ${JSON.stringify(error)}`);
			throw internalError(`Error getting egress: ${error}`);
		}
	}

	/**
	 * Retrieves the egress whose status is `EGRESS_ACTIVE`, optionally filtered
	 * by room and/or egress ID.
	 *
	 * @param roomName - Optional room name filter.
	 * @param egressId - Optional egress ID filter.
	 * @returns The matching active `EgressInfo` objects, or an empty array when
	 * the SDK error message contains `404`.
	 * @throws `internalError` on any other failure.
	 */
	async getActiveEgress(roomName?: string, egressId?: string): Promise<EgressInfo[]> {
		try {
			const options: ListEgressOptions = {
				roomName,
				egressId,
				active: true
			};
			const egress = await this.egressClient.listEgress(options);

			// In some cases the list may contain egress whose status is ENDING, which
			// means the egress is still active but in the process of stopping.
			// Those are filtered out.
			return egress.filter((e) => e.status === EgressStatus.EGRESS_ACTIVE);
		} catch (error) {
			if (isNotFoundError(error)) {
				return [];
			}

			this.logger.error(`Error getting egress: ${JSON.stringify(error)}`);
			throw internalError(`Error getting egress: ${error}`);
		}
	}

	/**
	 * Tells whether a participant is an egress (recorder) participant: its
	 * identity starts with `EG_` and its `recorder` permission is `true`.
	 *
	 * @param participant - Participant to inspect.
	 */
	isEgressParticipant(participant: ParticipantInfo): boolean {
		// TODO: Remove deprecated warning by using ParticipantInfo_Kind: participant.kind === ParticipantInfo_Kind.EGRESS;
		return participant.identity.startsWith('EG_') && participant.permission?.recorder === true;
	}

	/**
	 * Checks whether a participant identity is listed in a room.
	 *
	 * @returns `true` if listed; `false` if not listed or if listing failed for a
	 * reason other than a refused connection.
	 * @throws `errorLivekitIsNotAvailable` if the failure cause code is `ECONNREFUSED`.
	 */
	private async participantExists(roomName: string, participantName: string): Promise<boolean> {
		try {
			const participants: ParticipantInfo[] = await this.roomClient.listParticipants(roomName);
			return participants.some((participant) => participant.identity === participantName);
			// eslint-disable-next-line @typescript-eslint/no-explicit-any
		} catch (error: any) {
			this.logger.error(error);

			if (error?.cause?.code === 'ECONNREFUSED') {
				throw errorLivekitIsNotAvailable();
			}

			return false;
		}
	}
}