diff --git a/backend/src/controllers/livekit-webhook.controller.ts b/backend/src/controllers/livekit-webhook.controller.ts index fb71ae5..f18c2f9 100644 --- a/backend/src/controllers/livekit-webhook.controller.ts +++ b/backend/src/controllers/livekit-webhook.controller.ts @@ -9,10 +9,16 @@ export const lkWebhookHandler = async (req: Request, res: Response) => { try { const lkWebhookService = container.get(LivekitWebhookService); - const webhookEvent: WebhookEvent = await lkWebhookService.getEventFromWebhook( + const webhookEvent: WebhookEvent | undefined = await lkWebhookService.getEventFromWebhook( req.body, req.get('Authorization')! ); + + if (!webhookEvent) { + logger.debug(`Webhook processing skipped: May another instance is processing it`); + return res.status(200).send(); + } + const { event: eventType, egressInfo, room, participant } = webhookEvent; const belongsToOpenViduMeet = await lkWebhookService.webhookEventBelongsToOpenViduMeet(webhookEvent); diff --git a/backend/src/helpers/redis.helper.ts b/backend/src/helpers/redis.helper.ts index fc8fd8e..12a7ca4 100644 --- a/backend/src/helpers/redis.helper.ts +++ b/backend/src/helpers/redis.helper.ts @@ -1,3 +1,4 @@ +import { WebhookEvent } from 'livekit-server-sdk'; import { RedisLockName, RedisLockPrefix } from '../models/redis.model.js'; export class MeetLock { @@ -36,4 +37,12 @@ export class MeetLock { static getGlobalPreferencesLock(): string { return `${RedisLockPrefix.BASE}${RedisLockName.GLOBAL_PREFERENCES}`; } + + static getWebhookLock(webhookEvent: WebhookEvent) { + if (!webhookEvent || !webhookEvent.event) { + throw new Error('event must be a non-empty string'); + } + + return `${RedisLockPrefix.BASE}${RedisLockName.WEBHOOK}_${webhookEvent.event}_${webhookEvent.id}`; + } } diff --git a/backend/src/models/redis.model.ts b/backend/src/models/redis.model.ts index 6327a98..3b4d7b3 100644 --- a/backend/src/models/redis.model.ts +++ b/backend/src/models/redis.model.ts @@ -21,5 +21,6 @@ export const enum RedisLockName { ROOM_GARBAGE_COLLECTOR = 'room_garbage_collector', RECORDING_ACTIVE = 'recording_active', SCHEDULED_TASK = 'scheduled_task', - GLOBAL_PREFERENCES = 'global_preferences' + GLOBAL_PREFERENCES = 'global_preferences', + WEBHOOK = 'webhook' } diff --git a/backend/src/services/livekit-webhook.service.ts b/backend/src/services/livekit-webhook.service.ts index 174317d..3d35b1e 100644 --- a/backend/src/services/livekit-webhook.service.ts +++ b/backend/src/services/livekit-webhook.service.ts @@ -2,7 +2,7 @@ import { MeetRecordingInfo, MeetRecordingStatus } from '@typings-ce'; import { inject, injectable } from 'inversify'; import { EgressInfo, ParticipantInfo, Room, WebhookEvent, WebhookReceiver } from 'livekit-server-sdk'; import { LIVEKIT_API_KEY, LIVEKIT_API_SECRET } from '../environment.js'; -import { MeetRoomHelper, RecordingHelper } from '../helpers/index.js'; +import { MeetLock, MeetRoomHelper, RecordingHelper } from '../helpers/index.js'; import { DistributedEventType } from '../models/distributed-event.model.js'; import { LiveKitService, @@ -15,6 +15,7 @@ import { DistributedEventService } from './index.js'; import { FrontendEventService } from './frontend-event.service.js'; +import ms from 'ms'; @injectable() export class LivekitWebhookService { @@ -39,9 +40,14 @@ export class LivekitWebhookService { * @param auth - The authentication token for verifying the webhook request. * @returns The WebhookEvent extracted from the request body. */ - async getEventFromWebhook(body: string, auth?: string): Promise { + async getEventFromWebhook(body: string, auth?: string): Promise { try { - return await this.webhookReceiver.receive(body, auth); + const webhookEvent = await this.webhookReceiver.receive(body, auth); + const lock = await this.mutexService.acquire(MeetLock.getWebhookLock(webhookEvent), ms('5s')); + + if (!lock) return undefined; + + return webhookEvent; } catch (error) { this.logger.error('Error receiving webhook event', error); throw error; diff --git a/backend/src/services/mutex.service.ts b/backend/src/services/mutex.service.ts index e625a6f..fab876e 100644 --- a/backend/src/services/mutex.service.ts +++ b/backend/src/services/mutex.service.ts @@ -20,9 +20,12 @@ export class MutexService { /** * Acquires a lock for the specified resource. + * This method uses the Redlock library to acquire a distributed lock on a resource identified by the key. + * The request will return null if the lock cannot be acquired. + * * @param key The resource to acquire a lock for. * @param ttl The time-to-live (TTL) for the lock in milliseconds. Defaults to the TTL value of the MutexService. - * @returns A Promise that resolves to the acquired Lock object. + * @returns A Promise that resolves to the acquired Lock object. If the lock cannot be acquired, it resolves to null. */ async acquire(key: string, ttl: number = this.TTL_MS): Promise { const registryKey = MeetLock.getRegistryLock(key);