backend: enhance webhook handling with locking mechanism and error handling

This commit is contained in:
Carlos Santos 2025-07-14 14:38:58 +02:00
parent bbd3c274c4
commit dc268a436c
5 changed files with 31 additions and 6 deletions

View File

@ -9,10 +9,16 @@ export const lkWebhookHandler = async (req: Request, res: Response) => {
try {
const lkWebhookService = container.get(LivekitWebhookService);
const webhookEvent: WebhookEvent = await lkWebhookService.getEventFromWebhook(
const webhookEvent: WebhookEvent | undefined = await lkWebhookService.getEventFromWebhook(
req.body,
req.get('Authorization')!
);
if (!webhookEvent) {
logger.debug(`Webhook processing skipped: May another instance is processing it`);
return res.status(200).send();
}
const { event: eventType, egressInfo, room, participant } = webhookEvent;
const belongsToOpenViduMeet = await lkWebhookService.webhookEventBelongsToOpenViduMeet(webhookEvent);

View File

@ -1,3 +1,4 @@
import { WebhookEvent } from 'livekit-server-sdk';
import { RedisLockName, RedisLockPrefix } from '../models/redis.model.js';
export class MeetLock {
@ -36,4 +37,12 @@ export class MeetLock {
static getGlobalPreferencesLock(): string {
return `${RedisLockPrefix.BASE}${RedisLockName.GLOBAL_PREFERENCES}`;
}
static getWebhookLock(webhookEvent: WebhookEvent) {
if (!webhookEvent || !webhookEvent.event) {
throw new Error('event must be a non-empty string');
}
return `${RedisLockPrefix.BASE}${RedisLockName.WEBHOOK}_${webhookEvent.event}_${webhookEvent.id}`;
}
}

View File

@ -21,5 +21,6 @@ export const enum RedisLockName {
ROOM_GARBAGE_COLLECTOR = 'room_garbage_collector',
RECORDING_ACTIVE = 'recording_active',
SCHEDULED_TASK = 'scheduled_task',
GLOBAL_PREFERENCES = 'global_preferences'
GLOBAL_PREFERENCES = 'global_preferences',
WEBHOOK = 'webhook'
}

View File

@ -2,7 +2,7 @@ import { MeetRecordingInfo, MeetRecordingStatus } from '@typings-ce';
import { inject, injectable } from 'inversify';
import { EgressInfo, ParticipantInfo, Room, WebhookEvent, WebhookReceiver } from 'livekit-server-sdk';
import { LIVEKIT_API_KEY, LIVEKIT_API_SECRET } from '../environment.js';
import { MeetRoomHelper, RecordingHelper } from '../helpers/index.js';
import { MeetLock, MeetRoomHelper, RecordingHelper } from '../helpers/index.js';
import { DistributedEventType } from '../models/distributed-event.model.js';
import {
LiveKitService,
@ -15,6 +15,7 @@ import {
DistributedEventService
} from './index.js';
import { FrontendEventService } from './frontend-event.service.js';
import ms from 'ms';
@injectable()
export class LivekitWebhookService {
@ -39,9 +40,14 @@ export class LivekitWebhookService {
* @param auth - The authentication token for verifying the webhook request.
* @returns The WebhookEvent extracted from the request body.
*/
async getEventFromWebhook(body: string, auth?: string): Promise<WebhookEvent> {
async getEventFromWebhook(body: string, auth?: string): Promise<WebhookEvent | undefined> {
try {
return await this.webhookReceiver.receive(body, auth);
const webhookEvent = await this.webhookReceiver.receive(body, auth);
const lock = await this.mutexService.acquire(MeetLock.getWebhookLock(webhookEvent), ms('5s'));
if (!lock) return undefined;
return webhookEvent;
} catch (error) {
this.logger.error('Error receiving webhook event', error);
throw error;

View File

@ -20,9 +20,12 @@ export class MutexService {
/**
* Acquires a lock for the specified resource.
* This method uses the Redlock library to acquire a distributed lock on a resource identified by the key.
* The request will return null if the lock cannot be acquired.
*
* @param key The resource to acquire a lock for.
* @param ttl The time-to-live (TTL) for the lock in milliseconds. Defaults to the TTL value of the MutexService.
* @returns A Promise that resolves to the acquired Lock object.
* @returns A Promise that resolves to the acquired Lock object. If the lock cannot be acquired, it resolves to null.
*/
async acquire(key: string, ttl: number = this.TTL_MS): Promise<Lock | null> {
const registryKey = MeetLock.getRegistryLock(key);