backend: replace SystemEventService with DistributedEventService and introduce distributed event model

This commit is contained in:
Carlos Santos 2025-07-04 13:33:13 +02:00
parent 2aa10918f8
commit 273ad8c577
11 changed files with 41 additions and 37 deletions

View File

@ -20,7 +20,7 @@ import {
StorageFactory,
StorageKeyBuilder,
StorageProvider,
SystemEventService,
DistributedEventService,
TaskSchedulerService,
TokenService,
UserService
@ -45,7 +45,7 @@ export const registerDependencies = () => {
console.log('Registering CE dependencies');
container.bind(LoggerService).toSelf().inSingletonScope();
container.bind(RedisService).toSelf().inSingletonScope();
container.bind(SystemEventService).toSelf().inSingletonScope();
container.bind(DistributedEventService).toSelf().inSingletonScope();
container.bind(MutexService).toSelf().inSingletonScope();
container.bind(TaskSchedulerService).toSelf().inSingletonScope();

View File

@ -1,11 +1,11 @@
export const enum SystemEventType {
export const enum DistributedEventType {
/**
* Event emitted when a egress is active.
*/
RECORDING_ACTIVE = 'recording_active'
}
export interface SystemEventPayload {
eventType: SystemEventType;
export interface DistributedEventPayload {
eventType: DistributedEventType;
payload: Record<string, unknown>;
}

View File

@ -1,3 +1,3 @@
export * from './error.model.js';
export * from './redis.model.js';
export * from './system-event.model.js';
export * from './distributed-event.model.js';

View File

@ -1,10 +1,14 @@
import { EventEmitter } from 'events';
import { inject, injectable } from 'inversify';
import { SystemEventPayload, SystemEventType } from '../models/system-event.model.js';
import { DistributedEventPayload, DistributedEventType } from '../models/distributed-event.model.js';
import { LoggerService, RedisService } from './index.js';
/**
* Service for managing distributed events using Redis pub/sub pattern.
* Handles internal communication between services and microservices.
*/
@injectable()
export class SystemEventService {
export class DistributedEventService {
protected emitter: EventEmitter = new EventEmitter();
protected readonly OPENVIDU_MEET_CHANNEL = 'ov_meet_channel';
constructor(
@ -21,7 +25,7 @@ export class SystemEventService {
* @param event The event type to subscribe to.
* @param listener The callback to invoke when the event is emitted.
*/
on(event: SystemEventType, listener: (payload: Record<string, unknown>) => void): void {
on(event: DistributedEventType, listener: (payload: Record<string, unknown>) => void): void {
this.emitter.on(event, listener);
}
@ -31,7 +35,7 @@ export class SystemEventService {
* @param event The event type to subscribe to.
* @param listener The callback to invoke when the event is emitted.
*/
once(event: SystemEventType, listener: (payload: Record<string, unknown>) => void): void {
once(event: DistributedEventType, listener: (payload: Record<string, unknown>) => void): void {
this.emitter.once(event, listener);
}
@ -41,7 +45,7 @@ export class SystemEventService {
* @param event The event type to unsubscribe from.
* @param listener Optional: the specific listener to remove. If not provided, all listeners for that event are removed.
*/
off(event: SystemEventType, listener?: (payload: Record<string, unknown>) => void): void {
off(event: DistributedEventType, listener?: (payload: Record<string, unknown>) => void): void {
if (listener) {
this.emitter.off(event, listener);
} else {
@ -56,7 +60,7 @@ export class SystemEventService {
* @param type The event type.
* @param payload The event payload.
*/
async publishEvent(eventType: SystemEventType, payload: Record<string, unknown>): Promise<void> {
async publishEvent(eventType: DistributedEventType, payload: Record<string, unknown>): Promise<void> {
const message = JSON.stringify({ eventType, payload });
this.logger.verbose(`Publishing system event: ${eventType}`, payload);
await this.redisService.publishEvent(this.OPENVIDU_MEET_CHANNEL, message);
@ -84,7 +88,7 @@ export class SystemEventService {
*/
protected handleRedisMessage(message: string): void {
try {
const eventData: SystemEventPayload = JSON.parse(message);
const eventData: DistributedEventPayload = JSON.parse(message);
const { eventType, payload } = eventData;
if (!eventType) {
@ -97,7 +101,7 @@ export class SystemEventService {
// Forward the event to all listeners
this.emitter.emit(eventType, payload);
} catch (error) {
this.logger.error('Error parsing redis message in SystemEventsService:', error);
this.logger.error('Error parsing redis message in DistributedEventService:', error);
}
}
}

View File

@ -1,6 +1,6 @@
export * from './logger.service.js';
export * from './redis.service.js';
export * from './system-event.service.js';
export * from './distributed-event.service.js';
export * from './mutex.service.js';
export * from './task-scheduler.service.js';

View File

@ -3,7 +3,7 @@ import { inject, injectable } from 'inversify';
import { EgressInfo, ParticipantInfo, Room, WebhookEvent, WebhookReceiver } from 'livekit-server-sdk';
import { LIVEKIT_API_KEY, LIVEKIT_API_SECRET } from '../environment.js';
import { MeetRoomHelper, RecordingHelper } from '../helpers/index.js';
import { SystemEventType } from '../models/system-event.model.js';
import { DistributedEventType } from '../models/distributed-event.model.js';
import {
LiveKitService,
LoggerService,
@ -12,7 +12,7 @@ import {
OpenViduWebhookService,
RecordingService,
RoomService,
SystemEventService
DistributedEventService
} from './index.js';
@injectable()
@ -25,7 +25,7 @@ export class LivekitWebhookService {
@inject(MeetStorageService) protected storageService: MeetStorageService,
@inject(OpenViduWebhookService) protected openViduWebhookService: OpenViduWebhookService,
@inject(MutexService) protected mutexService: MutexService,
@inject(SystemEventService) protected systemEventService: SystemEventService,
@inject(DistributedEventService) protected distributedEventService: DistributedEventService,
@inject(LoggerService) protected logger: LoggerService
) {
this.webhookReceiver = new WebhookReceiver(LIVEKIT_API_KEY, LIVEKIT_API_SECRET);
@ -249,8 +249,8 @@ export class LivekitWebhookService {
if (recordingInfo.status === MeetRecordingStatus.ACTIVE) {
// Send system event for active recording with the aim of cancelling the cleanup timer
specificTasks.push(
this.systemEventService.publishEvent(
SystemEventType.RECORDING_ACTIVE,
this.distributedEventService.publishEvent(
DistributedEventType.RECORDING_ACTIVE,
recordingInfo as unknown as Record<string, unknown>
)
);

View File

@ -20,7 +20,7 @@ import {
isErrorRecordingCannotBeStoppedWhileStarting,
isErrorRecordingNotFound,
OpenViduMeetError,
SystemEventType
DistributedEventType
} from '../models/index.js';
import {
IScheduledTask,
@ -30,7 +30,7 @@ import {
MutexService,
RedisLock,
RoomService,
SystemEventService,
DistributedEventService,
TaskSchedulerService
} from './index.js';
@ -41,7 +41,7 @@ export class RecordingService {
@inject(RoomService) protected roomService: RoomService,
@inject(MutexService) protected mutexService: MutexService,
@inject(TaskSchedulerService) protected taskSchedulerService: TaskSchedulerService,
@inject(SystemEventService) protected systemEventService: SystemEventService,
@inject(DistributedEventService) protected systemEventService: DistributedEventService,
@inject(MeetStorageService) protected storageService: MeetStorageService,
@inject(LoggerService) protected logger: LoggerService
) {
@ -77,7 +77,7 @@ export class RecordingService {
isOperationCompleted = true;
//Clean up the event listener and timeout
this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener);
this.systemEventService.off(DistributedEventType.RECORDING_ACTIVE, eventListener);
this.handleRecordingLockTimeout(recordingId, roomId).catch(() => {});
reject(errorRecordingStartTimeout(roomId));
}, ms(INTERNAL_CONFIG.RECORDING_STARTED_TIMEOUT));
@ -92,11 +92,11 @@ export class RecordingService {
isOperationCompleted = true;
clearTimeout(timeoutId);
this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener);
this.systemEventService.off(DistributedEventType.RECORDING_ACTIVE, eventListener);
resolve(info as unknown as MeetRecordingInfo);
};
this.systemEventService.on(SystemEventType.RECORDING_ACTIVE, eventListener);
this.systemEventService.on(DistributedEventType.RECORDING_ACTIVE, eventListener);
});
const startRecordingPromise = (async (): Promise<MeetRecordingInfo> => {
@ -119,7 +119,7 @@ export class RecordingService {
if (!isOperationCompleted) {
isOperationCompleted = true;
clearTimeout(timeoutId);
this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener);
this.systemEventService.off(DistributedEventType.RECORDING_ACTIVE, eventListener);
return recordingInfo;
}
}
@ -153,7 +153,7 @@ export class RecordingService {
// This prevents unnecessary cleanup operations when the request was rejected
// due to another recording already in progress in this room.
clearTimeout(timeoutId);
this.systemEventService.off(SystemEventType.RECORDING_ACTIVE, eventListener);
this.systemEventService.off(DistributedEventType.RECORDING_ACTIVE, eventListener);
await this.releaseRecordingLockIfNoEgress(roomId);
}
} catch (e) {

View File

@ -14,7 +14,7 @@ import {
REDIS_SENTINEL_PASSWORD,
REDIS_USERNAME
} from '../environment.js';
import { internalError, SystemEventPayload } from '../models/index.js';
import { internalError, DistributedEventPayload } from '../models/index.js';
import { LoggerService } from './index.js';
@injectable()
@ -24,7 +24,7 @@ export class RedisService extends EventEmitter {
protected redisPublisher: Redis;
protected redisSubscriber: Redis;
protected isConnected = false;
protected eventHandler?: (event: SystemEventPayload) => void;
protected eventHandler?: (event: DistributedEventPayload) => void;
constructor(@inject(LoggerService) protected logger: LoggerService) {
super();

View File

@ -26,7 +26,7 @@ import {
LiveKitService,
LoggerService,
MeetStorageService,
SystemEventService,
DistributedEventService,
TaskSchedulerService,
TokenService
} from './index.js';
@ -43,7 +43,7 @@ export class RoomService {
@inject(LoggerService) protected logger: LoggerService,
@inject(MeetStorageService) protected storageService: MeetStorageService,
@inject(LiveKitService) protected livekitService: LiveKitService,
@inject(SystemEventService) protected systemEventService: SystemEventService,
@inject(DistributedEventService) protected distributedEventService: DistributedEventService,
@inject(TaskSchedulerService) protected taskSchedulerService: TaskSchedulerService,
@inject(TokenService) protected tokenService: TokenService
) {

View File

@ -3,7 +3,7 @@ import { inject, injectable } from 'inversify';
import ms from 'ms';
import INTERNAL_CONFIG from '../config/internal-config.js';
import { MeetLock } from '../helpers/index.js';
import { LoggerService, MutexService, SystemEventService } from './index.js';
import { LoggerService, MutexService, DistributedEventService } from './index.js';
export type TaskType = 'cron' | 'timeout';
@ -22,7 +22,7 @@ export class TaskSchedulerService {
constructor(
@inject(LoggerService) protected logger: LoggerService,
@inject(SystemEventService) protected systemEventService: SystemEventService,
@inject(DistributedEventService) protected systemEventService: DistributedEventService,
@inject(MutexService) protected mutexService: MutexService
) {
this.systemEventService.onRedisReady(() => {

View File

@ -1,7 +1,7 @@
import { afterEach, beforeAll, describe, expect, it, jest } from '@jest/globals';
import { container } from '../../../../src/config/index.js';
import { setInternalConfig } from '../../../../src/config/internal-config.js';
import { SystemEventType } from '../../../../src/models/system-event.model.js';
import { DistributedEventType } from '../../../../src/models/distributed-event.model.js';
import { RecordingService } from '../../../../src/services';
import {
expectValidStartRecordingResponse,
@ -68,7 +68,7 @@ describe('Recording API Race Conditions Tests', () => {
try {
// Attempt to start recording
const result = await startRecording(roomData.room.roomId, roomData.moderatorCookie);
expect(eventServiceOffSpy).toHaveBeenCalledWith(SystemEventType.RECORDING_ACTIVE, expect.any(Function));
expect(eventServiceOffSpy).toHaveBeenCalledWith(DistributedEventType.RECORDING_ACTIVE, expect.any(Function));
expect(handleRecordingLockTimeoutSpy).not.toHaveBeenCalledWith(
'', // empty recordingId since it never started
roomData.room.roomId
@ -121,7 +121,7 @@ describe('Recording API Race Conditions Tests', () => {
// Start recording with a short timeout
const result = await startRecording(roomData.room.roomId, roomData.moderatorCookie);
expect(eventServiceOffSpy).toHaveBeenCalledWith(SystemEventType.RECORDING_ACTIVE, expect.any(Function));
expect(eventServiceOffSpy).toHaveBeenCalledWith(DistributedEventType.RECORDING_ACTIVE, expect.any(Function));
// Expect the recording to fail due to timeout
expect(handleTimeoutSpy).toHaveBeenCalledWith(
'', // empty recordingId since it never started