backend: introduce FrontendEventService for frontend communication and update dependencies

This commit is contained in:
Carlos Santos 2025-07-04 15:10:14 +02:00
parent 273ad8c577
commit 7361b71a7a
8 changed files with 156 additions and 70 deletions

View File

@ -23,7 +23,8 @@ import {
DistributedEventService, DistributedEventService,
TaskSchedulerService, TaskSchedulerService,
TokenService, TokenService,
UserService UserService,
FrontendEventService
} from '../services/index.js'; } from '../services/index.js';
export const container: Container = new Container(); export const container: Container = new Container();
@ -57,6 +58,7 @@ export const registerDependencies = () => {
container.bind(UserService).toSelf().inSingletonScope(); container.bind(UserService).toSelf().inSingletonScope();
container.bind(AuthService).toSelf().inSingletonScope(); container.bind(AuthService).toSelf().inSingletonScope();
container.bind(FrontendEventService).toSelf().inSingletonScope();
container.bind(LiveKitService).toSelf().inSingletonScope(); container.bind(LiveKitService).toSelf().inSingletonScope();
container.bind(RoomService).toSelf().inSingletonScope(); container.bind(RoomService).toSelf().inSingletonScope();
container.bind(ParticipantService).toSelf().inSingletonScope(); container.bind(ParticipantService).toSelf().inSingletonScope();

View File

@ -0,0 +1,103 @@
import { MeetRoom, MeetRecordingInfo } from '@typings-ce';
import { inject, injectable } from 'inversify';
import { SendDataOptions } from 'livekit-server-sdk';
import { OpenViduComponentsAdapterHelper } from '../helpers/index.js';
import { LiveKitService, LoggerService } from './index.js';
import { MeetSignalType } from '../typings/ce/event.model.js';
/**
* Service responsible for all communication with the frontend
* Centralizes all signals and events sent to the frontend
*/
@injectable()
export class FrontendEventService {
constructor(
@inject(LoggerService) protected logger: LoggerService,
@inject(LiveKitService) protected livekitService: LiveKitService
) {}
/**
* Sends a recording signal to OpenVidu Components within a specified room.
*
* This method constructs a signal with the appropriate topic and payload,
* and sends it to the OpenVidu Components in the given room. The payload
* is adapted to match the expected format for OpenVidu Components.
*/
async sendRecordingSignalToOpenViduComponents(roomId: string, recordingInfo: MeetRecordingInfo) {
this.logger.debug(`Sending recording signal to OpenVidu Components for room '${roomId}'`);
const { payload, options } = OpenViduComponentsAdapterHelper.generateRecordingSignal(recordingInfo);
try {
await this.sendSignal(roomId, payload, options);
} catch (error) {
this.logger.debug(`Error sending recording signal to OpenVidu Components for room '${roomId}': ${error}`);
}
}
/**
* Sends a room status signal to OpenVidu Components.
*
* This method checks if recording is started in the room and sends a signal
* with the room status to OpenVidu Components. If recording is not started,
* it skips sending the signal.
*/
async sendRoomStatusSignalToOpenViduComponents(roomId: string, participantSid: string) {
this.logger.debug(`Sending room status signal for room ${roomId} to OpenVidu Components.`);
try {
// Check if recording is started in the room
const activeEgressArray = await this.livekitService.getActiveEgress(roomId);
const isRecordingStarted = activeEgressArray.length > 0;
// Skip if recording is not started
if (!isRecordingStarted) {
return;
}
// Construct the payload and signal options
const { payload, options } = OpenViduComponentsAdapterHelper.generateRoomStatusSignal(
isRecordingStarted,
participantSid
);
await this.sendSignal(roomId, payload, options);
} catch (error) {
this.logger.debug(`Error sending room status signal for room ${roomId}:`, error);
}
}
/**
* Sends a signal to notify participants in a room about updated room preferences.
*/
async sendRoomPreferencesUpdatedSignal(roomId: string, updatedRoom: MeetRoom): Promise<void> {
this.logger.debug(`Sending room preferences updated signal for room ${roomId}`);
try {
const payload = {
roomId,
preferences: updatedRoom.preferences
};
const options: SendDataOptions = {
topic: MeetSignalType.MEET_ROOM_PREFERENCES_UPDATED
};
await this.sendSignal(roomId, payload, options);
} catch (error) {
this.logger.error(`Error sending room preferences updated signal for room ${roomId}:`, error);
}
}
/**
* Generic method to send signals to the frontend
*/
protected async sendSignal(
roomId: string,
rawData: Record<string, unknown>,
options: SendDataOptions
): Promise<void> {
this.logger.verbose(`Notifying participants in room ${roomId}: "${options.topic}".`);
await this.livekitService.sendData(roomId, rawData, options);
}
}

View File

@ -11,6 +11,7 @@ export * from './user.service.js';
export * from './auth.service.js'; export * from './auth.service.js';
export * from './livekit.service.js'; export * from './livekit.service.js';
export * from './frontend-event.service.js';
export * from './room.service.js'; export * from './room.service.js';
export * from './participant.service.js'; export * from './participant.service.js';
export * from './recording.service.js'; export * from './recording.service.js';

View File

@ -14,6 +14,7 @@ import {
RoomService, RoomService,
DistributedEventService DistributedEventService
} from './index.js'; } from './index.js';
import { FrontendEventService } from './frontend-event.service.js';
@injectable() @injectable()
export class LivekitWebhookService { export class LivekitWebhookService {
@ -26,6 +27,7 @@ export class LivekitWebhookService {
@inject(OpenViduWebhookService) protected openViduWebhookService: OpenViduWebhookService, @inject(OpenViduWebhookService) protected openViduWebhookService: OpenViduWebhookService,
@inject(MutexService) protected mutexService: MutexService, @inject(MutexService) protected mutexService: MutexService,
@inject(DistributedEventService) protected distributedEventService: DistributedEventService, @inject(DistributedEventService) protected distributedEventService: DistributedEventService,
@inject(FrontendEventService) protected frontendEventService: FrontendEventService,
@inject(LoggerService) protected logger: LoggerService @inject(LoggerService) protected logger: LoggerService
) { ) {
this.webhookReceiver = new WebhookReceiver(LIVEKIT_API_KEY, LIVEKIT_API_SECRET); this.webhookReceiver = new WebhookReceiver(LIVEKIT_API_KEY, LIVEKIT_API_SECRET);
@ -139,7 +141,7 @@ export class LivekitWebhookService {
if (this.livekitService.isEgressParticipant(participant)) return; if (this.livekitService.isEgressParticipant(participant)) return;
try { try {
await this.roomService.sendRoomStatusSignalToOpenViduComponents(room.name, participant.sid); await this.frontendEventService.sendRoomStatusSignalToOpenViduComponents(room.name, participant.sid);
} catch (error) { } catch (error) {
this.logger.error('Error sending room status signal on participant join:', error); this.logger.error('Error sending room status signal on participant join:', error);
} }
@ -229,7 +231,7 @@ export class LivekitWebhookService {
// Common tasks for all webhook types // Common tasks for all webhook types
const commonTasks = [ const commonTasks = [
this.storageService.saveRecordingMetadata(recordingInfo), this.storageService.saveRecordingMetadata(recordingInfo),
this.recordingService.sendRecordingSignalToOpenViduComponents(roomId, recordingInfo) this.frontendEventService.sendRecordingSignalToOpenViduComponents(roomId, recordingInfo)
]; ];
const specificTasks: Promise<unknown>[] = []; const specificTasks: Promise<unknown>[] = [];

View File

@ -6,8 +6,9 @@ import { Readable } from 'stream';
import { uid } from 'uid'; import { uid } from 'uid';
import INTERNAL_CONFIG from '../config/internal-config.js'; import INTERNAL_CONFIG from '../config/internal-config.js';
import { MEET_S3_SUBBUCKET } from '../environment.js'; import { MEET_S3_SUBBUCKET } from '../environment.js';
import { MeetLock, OpenViduComponentsAdapterHelper, RecordingHelper, UtilsHelper } from '../helpers/index.js'; import { MeetLock, RecordingHelper, UtilsHelper } from '../helpers/index.js';
import { import {
DistributedEventType,
errorRecordingAlreadyStarted, errorRecordingAlreadyStarted,
errorRecordingAlreadyStopped, errorRecordingAlreadyStopped,
errorRecordingCannotBeStoppedWhileStarting, errorRecordingCannotBeStoppedWhileStarting,
@ -19,10 +20,10 @@ import {
isErrorRecordingAlreadyStopped, isErrorRecordingAlreadyStopped,
isErrorRecordingCannotBeStoppedWhileStarting, isErrorRecordingCannotBeStoppedWhileStarting,
isErrorRecordingNotFound, isErrorRecordingNotFound,
OpenViduMeetError, OpenViduMeetError
DistributedEventType
} from '../models/index.js'; } from '../models/index.js';
import { import {
DistributedEventService,
IScheduledTask, IScheduledTask,
LiveKitService, LiveKitService,
LoggerService, LoggerService,
@ -30,7 +31,6 @@ import {
MutexService, MutexService,
RedisLock, RedisLock,
RoomService, RoomService,
DistributedEventService,
TaskSchedulerService TaskSchedulerService
} from './index.js'; } from './index.js';
@ -254,7 +254,10 @@ export class RecordingService {
if (recRoomId !== roomId) { if (recRoomId !== roomId) {
this.logger.warn(`Skipping recording '${recordingId}' as it does not belong to room '${roomId}'`); this.logger.warn(`Skipping recording '${recordingId}' as it does not belong to room '${roomId}'`);
notDeletedRecordings.add({ recordingId, error: `Recording '${recordingId}' does not belong to room '${roomId}'` }); notDeletedRecordings.add({
recordingId,
error: `Recording '${recordingId}' does not belong to room '${roomId}'`
});
continue; continue;
} }
} }
@ -496,24 +499,6 @@ export class RecordingService {
} }
} }
/**
* Sends a recording signal to OpenVidu Components within a specified room.
*
* This method constructs a signal with the appropriate topic and payload,
* and sends it to the OpenVidu Components in the given room. The payload
* is adapted to match the expected format for OpenVidu Components.
*/
async sendRecordingSignalToOpenViduComponents(roomId: string, recordingInfo: MeetRecordingInfo) {
this.logger.debug(`Sending recording signal to OpenVidu Components for room '${roomId}'`);
const { payload, options } = OpenViduComponentsAdapterHelper.generateRecordingSignal(recordingInfo);
try {
await this.roomService.sendSignal(roomId, payload, options);
} catch (error) {
this.logger.debug(`Error sending recording signal to OpenVidu Components for room '${roomId}': ${error}`);
}
}
protected generateCompositeOptionsFromRequest(layout = 'grid'): RoomCompositeOptions { protected generateCompositeOptionsFromRequest(layout = 'grid'): RoomCompositeOptions {
return { return {
layout: layout layout: layout

View File

@ -8,13 +8,13 @@ import {
RecordingPermissions RecordingPermissions
} from '@typings-ce'; } from '@typings-ce';
import { inject, injectable } from 'inversify'; import { inject, injectable } from 'inversify';
import { CreateOptions, Room, SendDataOptions } from 'livekit-server-sdk'; import { CreateOptions, Room } from 'livekit-server-sdk';
import ms from 'ms'; import ms from 'ms';
import { uid as secureUid } from 'uid/secure'; import { uid as secureUid } from 'uid/secure';
import { uid } from 'uid/single'; import { uid } from 'uid/single';
import INTERNAL_CONFIG from '../config/internal-config.js'; import INTERNAL_CONFIG from '../config/internal-config.js';
import { MEET_NAME_ID } from '../environment.js'; import { MEET_NAME_ID } from '../environment.js';
import { MeetRoomHelper, OpenViduComponentsAdapterHelper, UtilsHelper } from '../helpers/index.js'; import { MeetRoomHelper, UtilsHelper } from '../helpers/index.js';
import { import {
errorInvalidRoomSecret, errorInvalidRoomSecret,
errorRoomMetadataNotFound, errorRoomMetadataNotFound,
@ -22,13 +22,14 @@ import {
internalError internalError
} from '../models/error.model.js'; } from '../models/error.model.js';
import { import {
DistributedEventService,
IScheduledTask, IScheduledTask,
LiveKitService, LiveKitService,
LoggerService, LoggerService,
MeetStorageService, MeetStorageService,
DistributedEventService,
TaskSchedulerService, TaskSchedulerService,
TokenService TokenService,
FrontendEventService
} from './index.js'; } from './index.js';
/** /**
@ -44,6 +45,7 @@ export class RoomService {
@inject(MeetStorageService) protected storageService: MeetStorageService, @inject(MeetStorageService) protected storageService: MeetStorageService,
@inject(LiveKitService) protected livekitService: LiveKitService, @inject(LiveKitService) protected livekitService: LiveKitService,
@inject(DistributedEventService) protected distributedEventService: DistributedEventService, @inject(DistributedEventService) protected distributedEventService: DistributedEventService,
@inject(FrontendEventService) protected frontendEventService: FrontendEventService,
@inject(TaskSchedulerService) protected taskSchedulerService: TaskSchedulerService, @inject(TaskSchedulerService) protected taskSchedulerService: TaskSchedulerService,
@inject(TokenService) protected tokenService: TokenService @inject(TokenService) protected tokenService: TokenService
) { ) {
@ -131,7 +133,10 @@ export class RoomService {
await this.storageService.saveMeetRoom(room); await this.storageService.saveMeetRoom(room);
// Update the archived room metadata if it exists // Update the archived room metadata if it exists
await this.storageService.archiveRoomMetadata(roomId, true); await Promise.all([
this.storageService.archiveRoomMetadata(roomId, true),
this.frontendEventService.sendRoomPreferencesUpdatedSignal(roomId, room)
]);
return room; return room;
} }
@ -298,44 +303,6 @@ export class RoomService {
}; };
} }
async sendRoomStatusSignalToOpenViduComponents(roomId: string, participantSid: string) {
this.logger.debug(`Sending room status signal for room ${roomId} to OpenVidu Components.`);
try {
// Check if recording is started in the room
const activeEgressArray = await this.livekitService.getActiveEgress(roomId);
const isRecordingStarted = activeEgressArray.length > 0;
// Skip if recording is not started
if (!isRecordingStarted) {
return;
}
// Construct the payload and signal options
const { payload, options } = OpenViduComponentsAdapterHelper.generateRoomStatusSignal(
isRecordingStarted,
participantSid
);
await this.sendSignal(roomId, payload, options);
} catch (error) {
this.logger.debug(`Error sending room status signal for room ${roomId}:`, error);
}
}
/**
* Sends a signal to participants in a specified room.
*
* @param roomId - The name of the room where the signal will be sent.
* @param rawData - The raw data to be sent as the signal.
* @param options - Options for sending the data, including the topic and destination identities.
* @returns A promise that resolves when the signal has been sent.
*/
async sendSignal(roomId: string, rawData: Record<string, unknown>, options: SendDataOptions): Promise<void> {
this.logger.verbose(`Notifying participants in room ${roomId}: "${options.topic}".`);
await this.livekitService.sendData(roomId, rawData, options);
}
/** /**
* Classifies rooms into those that should be deleted immediately vs marked for deletion * Classifies rooms into those that should be deleted immediately vs marked for deletion
*/ */

View File

@ -1,4 +1,4 @@
import { afterEach, beforeAll, describe, expect, it } from '@jest/globals'; import { afterEach, beforeAll, describe, expect, it, jest } from '@jest/globals';
import { MeetRecordingAccess } from '../../../../src/typings/ce/index.js'; import { MeetRecordingAccess } from '../../../../src/typings/ce/index.js';
import { import {
createRoom, createRoom,
@ -7,6 +7,9 @@ import {
startTestServer, startTestServer,
updateRoomPreferences updateRoomPreferences
} from '../../../helpers/request-helpers.js'; } from '../../../helpers/request-helpers.js';
import { FrontendEventService } from '../../../../src/services/index.js';
import { container } from '../../../../src/config/index.js';
import { MeetSignalType } from '../../../../src/typings/ce/event.model.js';
describe('Room API Tests', () => { describe('Room API Tests', () => {
beforeAll(() => { beforeAll(() => {
@ -19,7 +22,15 @@ describe('Room API Tests', () => {
}); });
describe('Update Room Tests', () => { describe('Update Room Tests', () => {
let frontendEventService: FrontendEventService;
beforeAll(() => {
// Ensure the FrontendEventService is registered
frontendEventService = container.get(FrontendEventService);
});
it('should successfully update room preferences', async () => { it('should successfully update room preferences', async () => {
const sendSignalSpy = jest.spyOn(frontendEventService as any, 'sendSignal');
const createdRoom = await createRoom({ const createdRoom = await createRoom({
roomIdPrefix: 'update-test', roomIdPrefix: 'update-test',
preferences: { preferences: {
@ -44,6 +55,18 @@ describe('Room API Tests', () => {
const updateResponse = await updateRoomPreferences(createdRoom.roomId, updatedPreferences); const updateResponse = await updateRoomPreferences(createdRoom.roomId, updatedPreferences);
// Verify a method of frontend event service is called
expect(sendSignalSpy).toHaveBeenCalledWith(
createdRoom.roomId,
{
roomId: createdRoom.roomId,
preferences: updatedPreferences
},
{
topic: MeetSignalType.MEET_ROOM_PREFERENCES_UPDATED
}
);
// Verify update response // Verify update response
expect(updateResponse.status).toBe(200); expect(updateResponse.status).toBe(200);
expect(updateResponse.body).toBeDefined(); expect(updateResponse.body).toBeDefined();

View File

@ -0,0 +1,3 @@
export enum MeetSignalType {
MEET_ROOM_PREFERENCES_UPDATED = 'meet_room_preferences_updated',
}