From 273ad8c5779313cf18a0028786c5279e167561f3 Mon Sep 17 00:00:00 2001 From: Carlos Santos <4a.santos@gmail.com> Date: Fri, 4 Jul 2025 13:33:13 +0200 Subject: [PATCH] backend: replace SystemEventService with DistributedEventService and introduce distributed event model --- .../src/config/dependency-injector.config.ts | 4 ++-- ...nt.model.ts => distributed-event.model.ts} | 6 +++--- backend/src/models/index.ts | 2 +- ...ervice.ts => distributed-event.service.ts} | 20 +++++++++++-------- backend/src/services/index.ts | 2 +- .../src/services/livekit-webhook.service.ts | 10 +++++----- backend/src/services/recording.service.ts | 16 +++++++-------- backend/src/services/redis.service.ts | 4 ++-- backend/src/services/room.service.ts | 4 ++-- .../src/services/task-scheduler.service.ts | 4 ++-- .../api/recordings/race-conditions.test.ts | 6 +++--- 11 files changed, 41 insertions(+), 37 deletions(-) rename backend/src/models/{system-event.model.ts => distributed-event.model.ts} (52%) rename backend/src/services/{system-event.service.ts => distributed-event.service.ts} (77%) diff --git a/backend/src/config/dependency-injector.config.ts b/backend/src/config/dependency-injector.config.ts index de21723..a257331 100644 --- a/backend/src/config/dependency-injector.config.ts +++ b/backend/src/config/dependency-injector.config.ts @@ -20,7 +20,7 @@ import { StorageFactory, StorageKeyBuilder, StorageProvider, - SystemEventService, + DistributedEventService, TaskSchedulerService, TokenService, UserService @@ -45,7 +45,7 @@ export const registerDependencies = () => { console.log('Registering CE dependencies'); container.bind(LoggerService).toSelf().inSingletonScope(); container.bind(RedisService).toSelf().inSingletonScope(); - container.bind(SystemEventService).toSelf().inSingletonScope(); + container.bind(DistributedEventService).toSelf().inSingletonScope(); container.bind(MutexService).toSelf().inSingletonScope(); container.bind(TaskSchedulerService).toSelf().inSingletonScope(); diff --git a/backend/src/models/system-event.model.ts b/backend/src/models/distributed-event.model.ts similarity index 52% rename from backend/src/models/system-event.model.ts rename to backend/src/models/distributed-event.model.ts index 5e28153..dd21f99 100644 --- a/backend/src/models/system-event.model.ts +++ b/backend/src/models/distributed-event.model.ts @@ -1,11 +1,11 @@ -export const enum SystemEventType { +export const enum DistributedEventType { /** * Event emitted when a egress is active. */ RECORDING_ACTIVE = 'recording_active' } -export interface SystemEventPayload { - eventType: SystemEventType; +export interface DistributedEventPayload { + eventType: DistributedEventType; payload: Record; } diff --git a/backend/src/models/index.ts b/backend/src/models/index.ts index 23cc346..882d93e 100644 --- a/backend/src/models/index.ts +++ b/backend/src/models/index.ts @@ -1,3 +1,3 @@ export * from './error.model.js'; export * from './redis.model.js'; -export * from './system-event.model.js'; +export * from './distributed-event.model.js'; diff --git a/backend/src/services/system-event.service.ts b/backend/src/services/distributed-event.service.ts similarity index 77% rename from backend/src/services/system-event.service.ts rename to backend/src/services/distributed-event.service.ts index afaf4d1..1431c30 100644 --- a/backend/src/services/system-event.service.ts +++ b/backend/src/services/distributed-event.service.ts @@ -1,10 +1,14 @@ import { EventEmitter } from 'events'; import { inject, injectable } from 'inversify'; -import { SystemEventPayload, SystemEventType } from '../models/system-event.model.js'; +import { DistributedEventPayload, DistributedEventType } from '../models/distributed-event.model.js'; import { LoggerService, RedisService } from './index.js'; +/** + * Service for managing distributed events using Redis pub/sub pattern. + * Handles internal communication between services and microservices. + */ @injectable() -export class SystemEventService { +export class DistributedEventService { protected emitter: EventEmitter = new EventEmitter(); protected readonly OPENVIDU_MEET_CHANNEL = 'ov_meet_channel'; constructor( @@ -21,7 +25,7 @@ export class SystemEventService { * @param event The event type to subscribe to. * @param listener The callback to invoke when the event is emitted. */ - on(event: SystemEventType, listener: (payload: Record) => void): void { + on(event: DistributedEventType, listener: (payload: Record) => void): void { this.emitter.on(event, listener); } @@ -31,7 +35,7 @@ export class SystemEventService { * @param event The event type to subscribe to. * @param listener The callback to invoke when the event is emitted. */ - once(event: SystemEventType, listener: (payload: Record) => void): void { + once(event: DistributedEventType, listener: (payload: Record) => void): void { this.emitter.once(event, listener); } @@ -41,7 +45,7 @@ export class SystemEventService { * @param event The event type to unsubscribe from. * @param listener Optional: the specific listener to remove. If not provided, all listeners for that event are removed. */ - off(event: SystemEventType, listener?: (payload: Record) => void): void { + off(event: DistributedEventType, listener?: (payload: Record) => void): void { if (listener) { this.emitter.off(event, listener); } else { @@ -56,7 +60,7 @@ export class SystemEventService { * @param type The event type. * @param payload The event payload. */ - async publishEvent(eventType: SystemEventType, payload: Record): Promise { + async publishEvent(eventType: DistributedEventType, payload: Record): Promise { const message = JSON.stringify({ eventType, payload }); this.logger.verbose(`Publishing system event: ${eventType}`, payload); await this.redisService.publishEvent(this.OPENVIDU_MEET_CHANNEL, message); @@ -84,7 +88,7 @@ export class SystemEventService { */ protected handleRedisMessage(message: string): void { try { - const eventData: SystemEventPayload = JSON.parse(message); + const eventData: DistributedEventPayload = JSON.parse(message); const { eventType, payload } = eventData; if (!eventType) { @@ -97,7 +101,7 @@ export class SystemEventService { // Forward the event to all listeners this.emitter.emit(eventType, payload); } catch (error) { - this.logger.error('Error parsing redis message in SystemEventsService:', error); + this.logger.error('Error parsing redis message in DistributedEventService:', error); } } } diff --git a/backend/src/services/index.ts b/backend/src/services/index.ts index ed418e3..222270c 100644 --- a/backend/src/services/index.ts +++ b/backend/src/services/index.ts @@ -1,6 +1,6 @@ export * from './logger.service.js'; export * from './redis.service.js'; -export * from './system-event.service.js'; +export * from './distributed-event.service.js'; export * from './mutex.service.js'; export * from './task-scheduler.service.js'; diff --git a/backend/src/services/livekit-webhook.service.ts b/backend/src/services/livekit-webhook.service.ts index 5937d6f..fee1580 100644 --- a/backend/src/services/livekit-webhook.service.ts +++ b/backend/src/services/livekit-webhook.service.ts @@ -3,7 +3,7 @@ import { inject, injectable } from 'inversify'; import { EgressInfo, ParticipantInfo, Room, WebhookEvent, WebhookReceiver } from 'livekit-server-sdk'; import { LIVEKIT_API_KEY, LIVEKIT_API_SECRET } from '../environment.js'; import { MeetRoomHelper, RecordingHelper } from '../helpers/index.js'; -import { SystemEventType } from '../models/system-event.model.js'; +import { DistributedEventType } from '../models/distributed-event.model.js'; import { LiveKitService, LoggerService, @@ -12,7 +12,7 @@ import { OpenViduWebhookService, RecordingService, RoomService, - SystemEventService + DistributedEventService } from './index.js'; @injectable() @@ -25,7 +25,7 @@ export class LivekitWebhookService { @inject(MeetStorageService) protected storageService: MeetStorageService, @inject(OpenViduWebhookService) protected openViduWebhookService: OpenViduWebhookService, @inject(MutexService) protected mutexService: MutexService, - @inject(SystemEventService) protected systemEventService: SystemEventService, + @inject(DistributedEventService) protected distributedEventService: DistributedEventService, @inject(LoggerService) protected logger: LoggerService ) { this.webhookReceiver = new WebhookReceiver(LIVEKIT_API_KEY, LIVEKIT_API_SECRET); @@ -249,8 +249,8 @@ export class LivekitWebhookService { if (recordingInfo.status === MeetRecordingStatus.ACTIVE) { // Send system event for active recording with the aim of cancelling the cleanup timer specificTasks.push( - this.systemEventService.publishEvent( - SystemEventType.RECORDING_ACTIVE, + this.distributedEventService.publishEvent( + DistributedEventType.RECORDING_ACTIVE, recordingInfo as unknown as Record ) ); diff --git a/backend/src/services/recording.service.ts b/backend/src/services/recording.service.ts index 8d9783e..f9e45ce 100644 --- a/backend/src/services/recording.service.ts +++ b/backend/src/services/recording.service.ts @@ -20,7 +20,7 @@ import { isErrorRecordingCannotBeStoppedWhileStarting, isErrorRecordingNotFound, OpenViduMeetError, - SystemEventType + DistributedEventType } from '../models/index.js'; import { IScheduledTask, @@ -30,7 +30,7 @@ import { MutexService, RedisLock, RoomService, - SystemEventService, + DistributedEventService, TaskSchedulerService } from './index.js'; @@ -41,7 +41,7 @@ export class RecordingService { @inject(RoomService) protected roomService: RoomService, @inject(MutexService) protected mutexService: MutexService, @inject(TaskSchedulerService) protected taskSchedulerService: TaskSchedulerService, - @inject(SystemEventService) protected systemEventService: SystemEventService, + @inject(DistributedEventService) protected systemEventService: DistributedEventService, @inject(MeetStorageService) protected storageService: MeetStorageService, @inject(LoggerService) protected logger: LoggerService ) { @@ -77,7 +77,7 @@ export class RecordingService { isOperationCompleted = true; //Clean up the event listener and timeout - this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener); + this.systemEventService.off(DistributedEventType.RECORDING_ACTIVE, eventListener); this.handleRecordingLockTimeout(recordingId, roomId).catch(() => {}); reject(errorRecordingStartTimeout(roomId)); }, ms(INTERNAL_CONFIG.RECORDING_STARTED_TIMEOUT)); @@ -92,11 +92,11 @@ export class RecordingService { isOperationCompleted = true; clearTimeout(timeoutId); - this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener); + this.systemEventService.off(DistributedEventType.RECORDING_ACTIVE, eventListener); resolve(info as unknown as MeetRecordingInfo); }; - this.systemEventService.on(SystemEventType.RECORDING_ACTIVE, eventListener); + this.systemEventService.on(DistributedEventType.RECORDING_ACTIVE, eventListener); }); const startRecordingPromise = (async (): Promise => { @@ -119,7 +119,7 @@ export class RecordingService { if (!isOperationCompleted) { isOperationCompleted = true; clearTimeout(timeoutId); - this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener); + this.systemEventService.off(DistributedEventType.RECORDING_ACTIVE, eventListener); return recordingInfo; } } @@ -153,7 +153,7 @@ export class RecordingService { // This prevents unnecessary cleanup operations when the request was rejected // due to another recording already in progress in this room. clearTimeout(timeoutId); - this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener); + this.systemEventService.off(DistributedEventType.RECORDING_ACTIVE, eventListener); await this.releaseRecordingLockIfNoEgress(roomId); } } catch (e) { diff --git a/backend/src/services/redis.service.ts b/backend/src/services/redis.service.ts index 300cde5..e360b54 100644 --- a/backend/src/services/redis.service.ts +++ b/backend/src/services/redis.service.ts @@ -14,7 +14,7 @@ import { REDIS_SENTINEL_PASSWORD, REDIS_USERNAME } from '../environment.js'; -import { internalError, SystemEventPayload } from '../models/index.js'; +import { internalError, DistributedEventPayload } from '../models/index.js'; import { LoggerService } from './index.js'; @injectable() @@ -24,7 +24,7 @@ export class RedisService extends EventEmitter { protected redisPublisher: Redis; protected redisSubscriber: Redis; protected isConnected = false; - protected eventHandler?: (event: SystemEventPayload) => void; + protected eventHandler?: (event: DistributedEventPayload) => void; constructor(@inject(LoggerService) protected logger: LoggerService) { super(); diff --git a/backend/src/services/room.service.ts b/backend/src/services/room.service.ts index aca6371..a8488c4 100644 --- a/backend/src/services/room.service.ts +++ b/backend/src/services/room.service.ts @@ -26,7 +26,7 @@ import { LiveKitService, LoggerService, MeetStorageService, - SystemEventService, + DistributedEventService, TaskSchedulerService, TokenService } from './index.js'; @@ -43,7 +43,7 @@ export class RoomService { @inject(LoggerService) protected logger: LoggerService, @inject(MeetStorageService) protected storageService: MeetStorageService, @inject(LiveKitService) protected livekitService: LiveKitService, - @inject(SystemEventService) protected systemEventService: SystemEventService, + @inject(DistributedEventService) protected distributedEventService: DistributedEventService, @inject(TaskSchedulerService) protected taskSchedulerService: TaskSchedulerService, @inject(TokenService) protected tokenService: TokenService ) { diff --git a/backend/src/services/task-scheduler.service.ts b/backend/src/services/task-scheduler.service.ts index 7920aa7..2e76736 100644 --- a/backend/src/services/task-scheduler.service.ts +++ b/backend/src/services/task-scheduler.service.ts @@ -3,7 +3,7 @@ import { inject, injectable } from 'inversify'; import ms from 'ms'; import INTERNAL_CONFIG from '../config/internal-config.js'; import { MeetLock } from '../helpers/index.js'; -import { LoggerService, MutexService, SystemEventService } from './index.js'; +import { LoggerService, MutexService, DistributedEventService } from './index.js'; export type TaskType = 'cron' | 'timeout'; @@ -22,7 +22,7 @@ export class TaskSchedulerService { constructor( @inject(LoggerService) protected logger: LoggerService, - @inject(SystemEventService) protected systemEventService: SystemEventService, + @inject(DistributedEventService) protected systemEventService: DistributedEventService, @inject(MutexService) protected mutexService: MutexService ) { this.systemEventService.onRedisReady(() => { diff --git a/backend/tests/integration/api/recordings/race-conditions.test.ts b/backend/tests/integration/api/recordings/race-conditions.test.ts index e47dbd8..b172402 100644 --- a/backend/tests/integration/api/recordings/race-conditions.test.ts +++ b/backend/tests/integration/api/recordings/race-conditions.test.ts @@ -1,7 +1,7 @@ import { afterEach, beforeAll, describe, expect, it, jest } from '@jest/globals'; import { container } from '../../../../src/config/index.js'; import { setInternalConfig } from '../../../../src/config/internal-config.js'; -import { SystemEventType } from '../../../../src/models/system-event.model.js'; +import { DistributedEventType } from '../../../../src/models/distributed-event.model.js'; import { RecordingService } from '../../../../src/services'; import { expectValidStartRecordingResponse, @@ -68,7 +68,7 @@ describe('Recording API Race Conditions Tests', () => { try { // Attempt to start recording const result = await startRecording(roomData.room.roomId, roomData.moderatorCookie); - expect(eventServiceOffSpy).toHaveBeenCalledWith(SystemEventType.RECORDING_ACTIVE, expect.any(Function)); + expect(eventServiceOffSpy).toHaveBeenCalledWith(DistributedEventType.RECORDING_ACTIVE, expect.any(Function)); expect(handleRecordingLockTimeoutSpy).not.toHaveBeenCalledWith( '', // empty recordingId since it never started roomData.room.roomId @@ -121,7 +121,7 @@ describe('Recording API Race Conditions Tests', () => { // Start recording with a short timeout const result = await startRecording(roomData.room.roomId, roomData.moderatorCookie); - expect(eventServiceOffSpy).toHaveBeenCalledWith(SystemEventType.RECORDING_ACTIVE, expect.any(Function)); + expect(eventServiceOffSpy).toHaveBeenCalledWith(DistributedEventType.RECORDING_ACTIVE, expect.any(Function)); // Expect the recording to fail due to timeout expect(handleTimeoutSpy).toHaveBeenCalledWith( '', // empty recordingId since it never started