backend: Implement room status validation and cleanup for active meetings
This commit is contained in:
parent
a3d4fda6ae
commit
2761e68dd8
@ -30,7 +30,7 @@ export const INTERNAL_CONFIG = {
|
||||
|
||||
// Timing and cleanup settings for room lifecycle management
|
||||
ROOM_EXPIRED_GC_INTERVAL: '1h' as StringValue, // Interval for processing and deleting expired rooms
|
||||
ROOM_INACTIVE_MEETING_GC_INTERVAL: '15m' as StringValue, // Interval for cleaning up active meetings in rooms that are no longer active
|
||||
ROOM_ACTIVE_VERIFICATION_GC_INTERVAL: '15m' as StringValue, // Interval for checking room 'active_meeting' status consistency
|
||||
|
||||
// Timing and cleanup settings for recording lifecycle management
|
||||
RECORDING_STARTED_TIMEOUT: '20s' as StringValue, // Timeout for recording to be marked as started
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import { MeetRoom } from '@openvidu-meet/typings';
|
||||
import { MeetRoom, MeetRoomStatus } from '@openvidu-meet/typings';
|
||||
import { inject, injectable } from 'inversify';
|
||||
import { MeetRoomDocument, MeetRoomModel } from '../models/mongoose-schemas/room.schema.js';
|
||||
import { LoggerService } from '../services/logger.service.js';
|
||||
@ -134,6 +134,18 @@ export class RoomRepository<TRoom extends MeetRoom = MeetRoom> extends BaseRepos
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds all rooms with active meetings.
|
||||
* Returns all active rooms without pagination.
|
||||
*
|
||||
* @returns Array of active rooms with enriched URLs
|
||||
*/
|
||||
async findActiveRooms(): Promise<TRoom[]> {
|
||||
return await this.findAll({
|
||||
status: MeetRoomStatus.ACTIVE_MEETING
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes a room by its roomId.
|
||||
*
|
||||
@ -165,7 +177,7 @@ export class RoomRepository<TRoom extends MeetRoom = MeetRoom> extends BaseRepos
|
||||
* Counts the number of rooms with active meetings.
|
||||
*/
|
||||
async countActiveRooms(): Promise<number> {
|
||||
return await this.count({ status: 'active_meeting' });
|
||||
return await this.count({ status: MeetRoomStatus.ACTIVE_MEETING });
|
||||
}
|
||||
|
||||
// ==========================================
|
||||
|
||||
@ -1,7 +1,10 @@
|
||||
import { inject, injectable } from 'inversify';
|
||||
import { Room } from 'livekit-server-sdk';
|
||||
import { INTERNAL_CONFIG } from '../config/internal-config.js';
|
||||
import { IScheduledTask } from '../models/task-scheduler.model.js';
|
||||
import { RoomRepository } from '../repositories/room.repository.js';
|
||||
import { LivekitWebhookService } from './livekit-webhook.service.js';
|
||||
import { LiveKitService } from './livekit.service.js';
|
||||
import { LoggerService } from './logger.service.js';
|
||||
import { RoomService } from './room.service.js';
|
||||
import { TaskSchedulerService } from './task-scheduler.service.js';
|
||||
@ -18,7 +21,9 @@ export class RoomScheduledTasksService {
|
||||
@inject(LoggerService) protected logger: LoggerService,
|
||||
@inject(RoomRepository) protected roomRepository: RoomRepository,
|
||||
@inject(RoomService) protected roomService: RoomService,
|
||||
@inject(TaskSchedulerService) protected taskSchedulerService: TaskSchedulerService
|
||||
@inject(TaskSchedulerService) protected taskSchedulerService: TaskSchedulerService,
|
||||
@inject(LiveKitService) protected livekitService: LiveKitService,
|
||||
@inject(LivekitWebhookService) protected livekitWebhookService: LivekitWebhookService
|
||||
) {
|
||||
this.registerScheduledTasks();
|
||||
}
|
||||
@ -34,6 +39,14 @@ export class RoomScheduledTasksService {
|
||||
callback: this.deleteExpiredRooms.bind(this)
|
||||
};
|
||||
this.taskSchedulerService.registerTask(expiredRoomsGCTask);
|
||||
|
||||
const validateRoomsStatusGCTask: IScheduledTask = {
|
||||
name: 'validateRoomsStatusGC',
|
||||
type: 'cron',
|
||||
scheduleOrDelay: INTERNAL_CONFIG.ROOM_ACTIVE_VERIFICATION_GC_INTERVAL,
|
||||
callback: this.validateRoomsStatusGC.bind(this)
|
||||
};
|
||||
this.taskSchedulerService.registerTask(validateRoomsStatusGCTask);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -61,4 +74,36 @@ export class RoomScheduledTasksService {
|
||||
this.logger.error('Error deleting expired rooms:', error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks for inconsistent rooms.
|
||||
*
|
||||
* This method checks for rooms that are marked as active in the database but do not exist in LiveKit.
|
||||
* If such a room is found, it triggers the room finished logic to clean up the room.
|
||||
*/
|
||||
protected async validateRoomsStatusGC(): Promise<void> {
|
||||
this.logger.verbose(`Checking inconsistent rooms at ${new Date(Date.now()).toISOString()}`);
|
||||
|
||||
try {
|
||||
const activeRooms = await this.roomRepository.findActiveRooms();
|
||||
|
||||
if (activeRooms.length === 0) {
|
||||
this.logger.verbose(`No active rooms found. Skipping room consistency check.`);
|
||||
return;
|
||||
}
|
||||
|
||||
for (const room of activeRooms) {
|
||||
const roomExists = await this.livekitService.roomExists(room.roomId);
|
||||
|
||||
if (!roomExists) {
|
||||
this.logger.warn(
|
||||
`Room '${room.roomId}' is active in DB but does not exist in LiveKit. Cleaning up...`
|
||||
);
|
||||
await this.livekitWebhookService.handleRoomFinished({ name: room.roomId } as unknown as Room);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error('Error checking inconsistent rooms:', error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -364,6 +364,21 @@ export const runExpiredRoomsGC = async () => {
|
||||
await sleep('1s');
|
||||
};
|
||||
|
||||
/**
|
||||
* Runs the inconsistent rooms garbage collector.
|
||||
*
|
||||
* This function retrieves the RoomScheduledTasksService from the dependency injection container
|
||||
* and calls its checkInconsistentRooms method to clean up inconsistent rooms.
|
||||
* It then waits for 1 second before completing.
|
||||
*/
|
||||
export const executeRoomStatusValidationGC = async () => {
|
||||
checkAppIsRunning();
|
||||
|
||||
const roomTaskScheduler = container.get(RoomScheduledTasksService);
|
||||
await (roomTaskScheduler as any)['validateRoomsStatusGC']();
|
||||
await sleep('1s');
|
||||
};
|
||||
|
||||
export const runReleaseActiveRecordingLock = async (roomId: string) => {
|
||||
checkAppIsRunning();
|
||||
|
||||
|
||||
@ -1,8 +1,9 @@
|
||||
import { MeetRoom, MeetRoomConfig } from '@openvidu-meet/typings';
|
||||
import { MeetRoomConfig } from '@openvidu-meet/typings';
|
||||
import express, { Request, Response } from 'express';
|
||||
import http from 'http';
|
||||
import { StringValue } from 'ms';
|
||||
import { MeetRoomHelper } from '../../src/helpers/room.helper';
|
||||
import { RoomData, TestContext } from '../interfaces/scenarios';
|
||||
import { expectValidStartRecordingResponse } from './assertion-helpers';
|
||||
import {
|
||||
createRoom,
|
||||
@ -15,21 +16,6 @@ import {
|
||||
|
||||
let mockWebhookServer: http.Server;
|
||||
|
||||
export interface RoomData {
|
||||
room: MeetRoom;
|
||||
moderatorSecret: string;
|
||||
moderatorToken: string;
|
||||
speakerSecret: string;
|
||||
speakerToken: string;
|
||||
recordingId?: string;
|
||||
}
|
||||
|
||||
export interface TestContext {
|
||||
rooms: RoomData[];
|
||||
getRoomByIndex(index: number): RoomData | undefined;
|
||||
getLastRoom(): RoomData | undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a single room with optional participant.
|
||||
*
|
||||
|
||||
@ -0,0 +1,186 @@
|
||||
import { afterAll, beforeAll, describe, expect, it, jest } from '@jest/globals';
|
||||
import { MeetRoomStatus } from '@openvidu-meet/typings';
|
||||
import { container } from '../../../../src/config/dependency-injector.config.js';
|
||||
import { RoomRepository } from '../../../../src/repositories/room.repository.js';
|
||||
import { LiveKitService } from '../../../../src/services/livekit.service.js';
|
||||
import {
|
||||
createRoom,
|
||||
deleteAllRecordings,
|
||||
deleteAllRooms,
|
||||
disconnectFakeParticipants,
|
||||
executeRoomStatusValidationGC,
|
||||
getRoom,
|
||||
startTestServer
|
||||
} from '../../../helpers/request-helpers.js';
|
||||
|
||||
describe('Active Rooms Status GC Tests', () => {
|
||||
let liveKitService: LiveKitService;
|
||||
let roomRepository: RoomRepository;
|
||||
|
||||
beforeAll(async () => {
|
||||
await startTestServer();
|
||||
liveKitService = container.get(LiveKitService);
|
||||
roomRepository = container.get(RoomRepository);
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await disconnectFakeParticipants();
|
||||
await deleteAllRooms();
|
||||
await deleteAllRecordings();
|
||||
jest.restoreAllMocks();
|
||||
});
|
||||
|
||||
it('should open an active room if it does not exist in LiveKit', async () => {
|
||||
const createdRoom = await createRoom({
|
||||
roomName: 'test-active-status-gc'
|
||||
});
|
||||
|
||||
// Force status to ACTIVE_MEETING directly in DB
|
||||
const room = await roomRepository.findByRoomId(createdRoom.roomId);
|
||||
|
||||
|
||||
if (room) {
|
||||
room.status = MeetRoomStatus.ACTIVE_MEETING;
|
||||
await roomRepository.update(room);
|
||||
}
|
||||
|
||||
let response = await getRoom(createdRoom.roomId);
|
||||
expect(response.status).toBe(200);
|
||||
expect(response.body.status).toBe(MeetRoomStatus.ACTIVE_MEETING);
|
||||
|
||||
// Mock LiveKitService.roomExists to return false
|
||||
const roomExistsSpy = jest.spyOn(liveKitService, 'roomExists').mockResolvedValue(false);
|
||||
|
||||
await executeRoomStatusValidationGC();
|
||||
|
||||
response = await getRoom(createdRoom.roomId);
|
||||
expect(response.status).toBe(200);
|
||||
// Should be OPEN because default meetingEndAction is NONE
|
||||
expect(response.body.status).toBe(MeetRoomStatus.OPEN);
|
||||
|
||||
roomExistsSpy.mockRestore();
|
||||
});
|
||||
|
||||
it('should not touch an active room if it exists in LiveKit', async () => {
|
||||
const createdRoom = await createRoom({
|
||||
roomName: 'test-consistent-gc'
|
||||
});
|
||||
|
||||
// Force status to ACTIVE_MEETING directly in DB
|
||||
const room = await roomRepository.findByRoomId(createdRoom.roomId);
|
||||
|
||||
if (room) {
|
||||
room.status = MeetRoomStatus.ACTIVE_MEETING;
|
||||
await roomRepository.update(room);
|
||||
}
|
||||
|
||||
const roomExistsSpy = jest.spyOn(liveKitService, 'roomExists').mockResolvedValue(true);
|
||||
|
||||
await executeRoomStatusValidationGC();
|
||||
|
||||
const response = await getRoom(createdRoom.roomId);
|
||||
expect(response.status).toBe(200);
|
||||
expect(response.body.status).toBe(MeetRoomStatus.ACTIVE_MEETING);
|
||||
|
||||
roomExistsSpy.mockRestore();
|
||||
});
|
||||
|
||||
it('should not run the GC if no active rooms exist', async () => {
|
||||
// Ensure DB is clean
|
||||
await deleteAllRooms();
|
||||
|
||||
// Spy on LiveKitService.roomExists to ensure it's not called
|
||||
const roomExistsSpy = jest.spyOn(liveKitService, 'roomExists');
|
||||
|
||||
// Clear any previous calls that could have been recorded by earlier test runs
|
||||
roomExistsSpy.mockClear();
|
||||
|
||||
// Run GC - it should complete without throwing even when DB has no active rooms
|
||||
await expect(executeRoomStatusValidationGC()).resolves.not.toThrow();
|
||||
|
||||
roomExistsSpy.mockRestore();
|
||||
});
|
||||
|
||||
it('should handle errors when checking room existence in LiveKit', async () => {
|
||||
const createdRoom = await createRoom({ roomName: 'test-livekit-error-gc' });
|
||||
|
||||
// Force status to ACTIVE_MEETING directly in DB
|
||||
const room = await roomRepository.findByRoomId(createdRoom.roomId);
|
||||
|
||||
if (room) {
|
||||
room.status = MeetRoomStatus.ACTIVE_MEETING;
|
||||
await roomRepository.update(room);
|
||||
}
|
||||
|
||||
// Mock LiveKitService.roomExists to throw an error
|
||||
const roomExistsSpy = jest.spyOn(liveKitService, 'roomExists').mockRejectedValue(new Error('LiveKit down'));
|
||||
|
||||
// Run GC - it should catch the error and continue without throwing
|
||||
await expect(executeRoomStatusValidationGC()).resolves.not.toThrow();
|
||||
|
||||
// Room should remain ACTIVE_MEETING because we couldn't confirm its absence
|
||||
const response = await getRoom(createdRoom.roomId);
|
||||
expect(response.status).toBe(200);
|
||||
expect(response.body.status).toBe(MeetRoomStatus.ACTIVE_MEETING);
|
||||
|
||||
roomExistsSpy.mockRestore();
|
||||
});
|
||||
|
||||
it('should not affect rooms that are not in ACTIVE_MEETING status', async () => {
|
||||
const createdRoom = await createRoom({ roomName: 'test-not-active-gc' });
|
||||
|
||||
// Ensure room is OPEN (default) and not ACTIVE_MEETING
|
||||
const response1 = await getRoom(createdRoom.roomId);
|
||||
expect(response1.status).toBe(200);
|
||||
expect(response1.body.status).not.toBe(MeetRoomStatus.ACTIVE_MEETING);
|
||||
|
||||
// Spy on LiveKitService.roomExists to ensure GC won't query rooms that aren't active
|
||||
const roomExistsSpy = jest.spyOn(liveKitService, 'roomExists');
|
||||
|
||||
await executeRoomStatusValidationGC();
|
||||
|
||||
// Since there are no ACTIVE_MEETING rooms, roomExists should not be called for this room
|
||||
// (it may be called for other test artifacts, so we just assert we didn't change the status)
|
||||
const response2 = await getRoom(createdRoom.roomId);
|
||||
expect(response2.status).toBe(200);
|
||||
expect(response2.body.status).toBe(response1.body.status);
|
||||
|
||||
roomExistsSpy.mockRestore();
|
||||
});
|
||||
|
||||
it('should handle multiple inconsistent rooms correctly', async () => {
|
||||
// Create two rooms and force them to ACTIVE_MEETING
|
||||
const r1 = await createRoom({ roomName: 'test-multi-inconsistent-1' });
|
||||
const r2 = await createRoom({ roomName: 'test-multi-inconsistent-2' });
|
||||
|
||||
const room1 = await roomRepository.findByRoomId(r1.roomId);
|
||||
const room2 = await roomRepository.findByRoomId(r2.roomId);
|
||||
|
||||
if (room1) {
|
||||
room1.status = MeetRoomStatus.ACTIVE_MEETING;
|
||||
await roomRepository.update(room1);
|
||||
}
|
||||
|
||||
if (room2) {
|
||||
room2.status = MeetRoomStatus.ACTIVE_MEETING;
|
||||
await roomRepository.update(room2);
|
||||
}
|
||||
|
||||
// Mock LiveKitService.roomExists to return false for both rooms
|
||||
const roomExistsSpy = jest.spyOn(liveKitService, 'roomExists').mockResolvedValue(false);
|
||||
|
||||
await executeRoomStatusValidationGC();
|
||||
|
||||
const resp1 = await getRoom(r1.roomId);
|
||||
const resp2 = await getRoom(r2.roomId);
|
||||
|
||||
expect(resp1.status).toBe(200);
|
||||
expect(resp2.status).toBe(200);
|
||||
|
||||
// Both should have been closed (status no longer ACTIVE_MEETING)
|
||||
expect(resp1.body.status).not.toBe(MeetRoomStatus.ACTIVE_MEETING);
|
||||
expect(resp2.body.status).not.toBe(MeetRoomStatus.ACTIVE_MEETING);
|
||||
|
||||
roomExistsSpy.mockRestore();
|
||||
});
|
||||
});
|
||||
16
meet-ce/backend/tests/interfaces/scenarios.ts
Normal file
16
meet-ce/backend/tests/interfaces/scenarios.ts
Normal file
@ -0,0 +1,16 @@
|
||||
import { MeetRoom } from '@openvidu-meet/typings';
|
||||
|
||||
export interface RoomData {
|
||||
room: MeetRoom;
|
||||
moderatorSecret: string;
|
||||
moderatorToken: string;
|
||||
speakerSecret: string;
|
||||
speakerToken: string;
|
||||
recordingId?: string;
|
||||
}
|
||||
|
||||
export interface TestContext {
|
||||
rooms: RoomData[];
|
||||
getRoomByIndex(index: number): RoomData | undefined;
|
||||
getLastRoom(): RoomData | undefined;
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user