backend: implement batch deletion of rooms
This commit is contained in:
parent
75f4f93946
commit
a718243409
@ -24,6 +24,7 @@ import {
|
||||
OpenViduMeetError
|
||||
} from '../models/error.model.js';
|
||||
import { LoggerService } from './index.js';
|
||||
import { chunkArray } from '../utils/array.utils.js';
|
||||
|
||||
@injectable()
|
||||
export class LiveKitService {
|
||||
@ -145,6 +146,27 @@ export class LiveKitService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes multiple LiveKit rooms in batches to avoid overwhelming the server.
|
||||
*
|
||||
* @param roomNames - Array of room names to delete
|
||||
* @param batchSize - Number of rooms to delete per batch (default: 10)
|
||||
* @returns Promise that resolves when all batches have been processed
|
||||
*/
|
||||
async batchDeleteRooms(roomNames: string[], batchSize = 10): Promise<void> {
|
||||
const batches = chunkArray(roomNames, batchSize);
|
||||
|
||||
for (const batch of batches) {
|
||||
try {
|
||||
await Promise.allSettled(batch.map((roomId) => this.deleteRoom(roomId)));
|
||||
this.logger.debug(`Deleted LiveKit batch: ${batch.join(', ')}`);
|
||||
} catch (error) {
|
||||
this.logger.warn(`Error deleting LiveKit batch ${batch.join(', ')}: ${error}`);
|
||||
// Continue with next batch even if this one fails
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves information about a specific participant in a LiveKit room.
|
||||
*
|
||||
|
||||
@ -204,50 +204,26 @@ export class RoomService {
|
||||
forceDelete: boolean
|
||||
): Promise<{ deleted: string[]; markedForDeletion: string[] }> {
|
||||
try {
|
||||
const results = await Promise.allSettled(
|
||||
roomIds.map(async (roomId) => {
|
||||
const hasParticipants = await this.livekitService.roomHasParticipants(roomId);
|
||||
const shouldDelete = forceDelete || !hasParticipants;
|
||||
this.logger.info(`Starting bulk deletion of ${roomIds.length} rooms (forceDelete: ${forceDelete})`);
|
||||
|
||||
if (shouldDelete) {
|
||||
this.logger.verbose(`Deleting room ${roomId}.`);
|
||||
// Classify rooms into those to delete and those to mark for deletion
|
||||
const { toDelete, toMark } = await this.classifyRoomsForDeletion(roomIds, forceDelete);
|
||||
|
||||
await Promise.all([
|
||||
this.storageService.deleteMeetRooms([roomId]),
|
||||
this.livekitService.deleteRoom(roomId)
|
||||
]);
|
||||
// Process each group in parallel
|
||||
|
||||
return { roomId, status: 'deleted' } as const;
|
||||
}
|
||||
const [deletedRooms, markedRooms] = await Promise.all([
|
||||
this.batchDeleteRooms(toDelete),
|
||||
this.batchMarkRoomsForDeletion(toMark)
|
||||
]);
|
||||
|
||||
this.logger.verbose(`Room ${roomId} has participants. Marking as deleted (graceful deletion).`);
|
||||
// Mark the room as deleted in the storage system. They will be deleted later when all participants leave.
|
||||
await this.markRoomAsDeleted(roomId);
|
||||
return { roomId, status: 'marked' } as const;
|
||||
})
|
||||
this.logger.info(
|
||||
`Bulk deletion completed: ${deletedRooms.length} deleted, ${markedRooms.length} marked for deletion`
|
||||
);
|
||||
|
||||
const deleted: string[] = [];
|
||||
const markedForDeletion: string[] = [];
|
||||
|
||||
results.forEach((result) => {
|
||||
if (result.status === 'fulfilled') {
|
||||
if (result.value.status === 'deleted') {
|
||||
deleted.push(result.value.roomId);
|
||||
} else if (result.value.status === 'marked') {
|
||||
markedForDeletion.push(result.value.roomId);
|
||||
}
|
||||
} else {
|
||||
this.logger.error(`Failed to process deletion for a room: ${result.reason}`);
|
||||
}
|
||||
});
|
||||
|
||||
if (deleted.length === 0 && markedForDeletion.length === 0) {
|
||||
this.logger.error('No rooms were deleted or marked as deleted.');
|
||||
throw internalError('while deleting rooms. No rooms were deleted or marked as deleted.');
|
||||
}
|
||||
|
||||
return { deleted, markedForDeletion };
|
||||
return {
|
||||
deleted: deletedRooms,
|
||||
markedForDeletion: markedRooms
|
||||
};
|
||||
} catch (error) {
|
||||
this.logger.error('Error deleting rooms:', error);
|
||||
throw error;
|
||||
@ -381,6 +357,130 @@ export class RoomService {
|
||||
await this.livekitService.sendData(roomId, rawData, options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Classifies rooms into those that should be deleted immediately vs marked for deletion
|
||||
*/
|
||||
protected async classifyRoomsForDeletion(
|
||||
roomIds: string[],
|
||||
forceDelete: boolean
|
||||
): Promise<{ toDelete: string[]; toMark: string[] }> {
|
||||
this.logger.debug(`Classifying ${roomIds.length} rooms for deletion strategy`);
|
||||
|
||||
// Check all rooms in parallel
|
||||
const classificationResults = await Promise.allSettled(
|
||||
roomIds.map(async (roomId) => {
|
||||
try {
|
||||
const hasParticipants = await this.livekitService.roomHasParticipants(roomId);
|
||||
const shouldDelete = forceDelete || !hasParticipants;
|
||||
|
||||
return {
|
||||
roomId,
|
||||
action: shouldDelete ? 'delete' : 'mark'
|
||||
} as const;
|
||||
} catch (error) {
|
||||
this.logger.warn(`Failed to check participants for room ${roomId}: ${error}`);
|
||||
// Default to marking for deletion if we can't check participants
|
||||
return {
|
||||
roomId,
|
||||
action: 'mark'
|
||||
} as const;
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
// Group results
|
||||
const toDelete: string[] = [];
|
||||
const toMark: string[] = [];
|
||||
|
||||
classificationResults.forEach((result, index) => {
|
||||
if (result.status === 'fulfilled') {
|
||||
if (result.value.action === 'delete') {
|
||||
toDelete.push(result.value.roomId);
|
||||
} else {
|
||||
toMark.push(result.value.roomId);
|
||||
}
|
||||
} else {
|
||||
this.logger.warn(`Failed to classify room ${roomIds[index]}: ${result.reason}`);
|
||||
// Default to marking for deletion
|
||||
toMark.push(roomIds[index]);
|
||||
}
|
||||
});
|
||||
|
||||
this.logger.debug(`Classification complete: ${toDelete.length} to delete, ${toMark.length} to mark`);
|
||||
return { toDelete, toMark };
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs batch deletion of rooms that can be deleted immediately
|
||||
*/
|
||||
protected async batchDeleteRooms(roomIds: string[]): Promise<string[]> {
|
||||
if (roomIds.length === 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
this.logger.info(`Batch deleting ${roomIds.length} rooms`);
|
||||
|
||||
try {
|
||||
await Promise.all([
|
||||
this.storageService.deleteMeetRooms(roomIds),
|
||||
this.livekitService.batchDeleteRooms(roomIds)
|
||||
]);
|
||||
|
||||
return roomIds;
|
||||
} catch (error) {
|
||||
this.logger.error(`Batch deletion failed for rooms: ${roomIds.join(', ')}`, error);
|
||||
throw internalError('Failed to delete rooms');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Marks multiple rooms for deletion in batch
|
||||
*/
|
||||
private async batchMarkRoomsForDeletion(roomIds: string[]): Promise<string[]> {
|
||||
if (roomIds.length === 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
this.logger.info(`Batch marking ${roomIds.length} rooms for deletion`);
|
||||
|
||||
try {
|
||||
// Get all rooms in parallel
|
||||
const roomResults = await Promise.allSettled(
|
||||
roomIds.map((roomId) => this.storageService.getMeetRoom(roomId))
|
||||
);
|
||||
|
||||
// Prepare rooms for batch update
|
||||
const roomsToUpdate: {roomId: string; room: MeetRoom}[] = [];
|
||||
const successfulRoomIds: string[] = [];
|
||||
|
||||
roomResults.forEach((result, index) => {
|
||||
const roomId = roomIds[index];
|
||||
|
||||
if (result.status === 'fulfilled' && result.value) {
|
||||
const room = result.value;
|
||||
room.markedForDeletion = true;
|
||||
roomsToUpdate.push({roomId, room});
|
||||
successfulRoomIds.push(roomId);
|
||||
} else {
|
||||
this.logger.warn(
|
||||
`Failed to get room ${roomId} for marking: ${result.status === 'rejected' ? result.reason : 'Room not found'}`
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
// Batch save all updated rooms
|
||||
if (roomsToUpdate.length > 0) {
|
||||
await Promise.allSettled(roomsToUpdate.map(({room}) => this.storageService.saveMeetRoom(room)));
|
||||
}
|
||||
|
||||
this.logger.info(`Successfully marked ${successfulRoomIds.length} rooms for deletion`);
|
||||
return successfulRoomIds;
|
||||
} catch (error) {
|
||||
this.logger.error(`Batch marking failed for rooms: ${roomIds.join(', ')}`, error);
|
||||
throw internalError('Failed to mark rooms for deletion');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gracefully deletes expired rooms.
|
||||
*
|
||||
|
||||
24
backend/src/utils/array.utils.ts
Normal file
24
backend/src/utils/array.utils.ts
Normal file
@ -0,0 +1,24 @@
|
||||
/**
|
||||
* Splits an array into smaller arrays (chunks) of a specified size.
|
||||
*
|
||||
* @template T - The type of elements in the array
|
||||
* @param array - The array to be split into chunks
|
||||
* @param size - The maximum size of each chunk
|
||||
* @returns An array of arrays, where each sub-array contains at most `size` elements
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* const numbers = [1, 2, 3, 4, 5, 6, 7];
|
||||
* const chunks = chunkArray(numbers, 3);
|
||||
* // Result: [[1, 2, 3], [4, 5, 6], [7]]
|
||||
* ```
|
||||
*/
|
||||
export const chunkArray = <T>(array: T[], size: number): T[][] => {
|
||||
const chunks: T[][] = [];
|
||||
|
||||
for (let i = 0; i < array.length; i += size) {
|
||||
chunks.push(array.slice(i, i + size));
|
||||
}
|
||||
|
||||
return chunks;
|
||||
};
|
||||
Loading…
x
Reference in New Issue
Block a user