backend: Add recording request validation middleware and refactor recording metadata handling
This commit is contained in:
parent
4e8c3ebcdf
commit
e69f1dfb4b
@ -156,15 +156,45 @@ paths:
|
||||
For example: "roomName,preferences". Only allowed fields will be returned.
|
||||
schema:
|
||||
type: string
|
||||
- name: page
|
||||
in: query
|
||||
required: false
|
||||
description: The page number for pagination (default is 1).
|
||||
schema:
|
||||
type: integer
|
||||
default: 1
|
||||
- name: limit
|
||||
in: query
|
||||
required: false
|
||||
description: The number of rooms per page (default is 10).
|
||||
schema:
|
||||
type: integer
|
||||
default: 10
|
||||
responses:
|
||||
'200':
|
||||
description: Successfully retrieved the list of OpenVidu Meet rooms
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
type: array
|
||||
items:
|
||||
$ref: '#/components/schemas/OpenViduMeetRoom'
|
||||
type: object
|
||||
properties:
|
||||
rooms:
|
||||
type: array
|
||||
items:
|
||||
$ref: '#/components/schemas/OpenViduMeetRoom'
|
||||
pagination:
|
||||
type: object
|
||||
properties:
|
||||
totalItems:
|
||||
type: integer
|
||||
description: Total number of rooms.
|
||||
totalPages:
|
||||
type: integer
|
||||
description: Total number of pages.
|
||||
currentPage:
|
||||
type: integer
|
||||
description: Current page number.
|
||||
|
||||
'401':
|
||||
description: Unauthorized — The API key is missing or invalid
|
||||
content:
|
||||
@ -398,7 +428,7 @@ paths:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/OpenViduMeetRecording'
|
||||
$ref: '#/components/schemas/MeetRecording'
|
||||
'401':
|
||||
description: Unauthorized — The API key is missing or invalid
|
||||
content:
|
||||
@ -469,23 +499,44 @@ paths:
|
||||
in: query
|
||||
required: false
|
||||
description: |
|
||||
The status of the recordings to retrieve. Possible values are "STARTING", "STARTED", "STOPPING", "STOPPED", "FAILED" and "READY".
|
||||
Filter recordings by their status.
|
||||
|
||||
Possible values:
|
||||
- `STARTING`
|
||||
- `ACTIVE`
|
||||
- `ENDING`
|
||||
- `COMPLETE`
|
||||
- `FAILED`
|
||||
- `ABORTED`
|
||||
- `LIMITED_REACHED`
|
||||
|
||||
You can provide multiple statuses as a comma-separated list (e.g., `status=ACTIVE,FAILED`).
|
||||
If not specified, recordings with any status will be returned.
|
||||
|
||||
> ⚠️ **Note:** Using this filter with multiple values or partial matches may impact performance for large datasets.
|
||||
schema:
|
||||
type: string
|
||||
- name: page
|
||||
- name: roomId
|
||||
in: query
|
||||
required: false
|
||||
description: The page number for pagination (default is 1).
|
||||
description: |
|
||||
The unique identifier of the room for which you want to retrieve recordings.
|
||||
If not provided, recordings from all rooms will be returned.
|
||||
schema:
|
||||
type: integer
|
||||
default: 1
|
||||
- name: limit
|
||||
type: string
|
||||
- name: maxItems
|
||||
in: query
|
||||
required: false
|
||||
description: The number of recordings per page (default is 10). Maximum is 100.
|
||||
schema:
|
||||
type: integer
|
||||
default: 10
|
||||
- name: nextPageToken
|
||||
in: query
|
||||
required: false
|
||||
description: The token to retrieve the next page of recordings.
|
||||
schema:
|
||||
type: string
|
||||
responses:
|
||||
'200':
|
||||
description: Successfully retrieved the list of OpenVidu Meet recordings
|
||||
@ -497,19 +548,16 @@ paths:
|
||||
recordings:
|
||||
type: array
|
||||
items:
|
||||
$ref: '#/components/schemas/OpenViduMeetRecording'
|
||||
$ref: '#/components/schemas/MeetRecording'
|
||||
pagination:
|
||||
type: object
|
||||
properties:
|
||||
totalItems:
|
||||
type: integer
|
||||
description: Total number of recordings.
|
||||
totalPages:
|
||||
type: integer
|
||||
description: Total number of pages.
|
||||
currentPage:
|
||||
type: integer
|
||||
description: Current page number.
|
||||
isTruncated:
|
||||
type: boolean
|
||||
description: Indicates if there are more recordings to retrieve.
|
||||
nextPageToken:
|
||||
type: string
|
||||
description: The token to retrieve the next page of recordings.
|
||||
|
||||
'401':
|
||||
description: Unauthorized — The API key is missing or invalid
|
||||
@ -642,7 +690,7 @@ paths:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/OpenViduMeetRecording'
|
||||
$ref: '#/components/schemas/MeetRecording'
|
||||
|
||||
'401':
|
||||
description: Unauthorized — The API key is missing or invalid
|
||||
@ -1067,24 +1115,19 @@ components:
|
||||
description: >
|
||||
The URL for the viewer to join the room. The viewer has read-only permissions to watch the room
|
||||
and participants.
|
||||
OpenViduMeetRecording:
|
||||
MeetRecording:
|
||||
type: object
|
||||
properties:
|
||||
id:
|
||||
recordingId:
|
||||
type: string
|
||||
example: 'recording_123456'
|
||||
example: 'room-123--EG_XYZ--XX445'
|
||||
description: >
|
||||
The unique identifier of the recording.
|
||||
roomName:
|
||||
roomId:
|
||||
type: string
|
||||
example: 'OpenVidu-123456'
|
||||
example: 'room-123'
|
||||
description: >
|
||||
The name of the room where the recording was made.
|
||||
# roomId:
|
||||
# type: string
|
||||
# example: '123456'
|
||||
# description: >
|
||||
# The unique identifier of the room where the recording was made.
|
||||
The ID of the room where the recording was made.
|
||||
outputMode:
|
||||
type: string
|
||||
example: 'COMPOSED'
|
||||
@ -1092,20 +1135,20 @@ components:
|
||||
The output mode of the recording. Possible values are "COMPOSED".
|
||||
status:
|
||||
type: string
|
||||
example: 'READY'
|
||||
example: 'ACTIVE'
|
||||
description: >
|
||||
The status of the recording. Possible values are "STARTING", "STARTED", "STOPPING", "STOPPED", "FAILED" and "READY".
|
||||
The status of the recording. Possible values are "STARTING", "ACTIVE", "ENDING", "COMPLETE", "FAILED", "ABORTED" and "LIMITED_REACHED".
|
||||
filename:
|
||||
type: string
|
||||
example: 'recording_123456.mp4'
|
||||
example: 'room-123--XX445.mp4'
|
||||
description: >
|
||||
The name of the recording file.
|
||||
startedAt:
|
||||
startDate:
|
||||
type: number
|
||||
example: 1620000000000
|
||||
description: >
|
||||
The date when the recording was started in milliseconds since the Unix epoch.
|
||||
endedAt:
|
||||
endDate:
|
||||
type: number
|
||||
example: 1620000000000
|
||||
description: >
|
||||
@ -1120,6 +1163,21 @@ components:
|
||||
example: 1024
|
||||
description: >
|
||||
The size of the recording file in bytes.
|
||||
errorCode:
|
||||
type: number
|
||||
example: 100
|
||||
description: >
|
||||
The error code of the recording.
|
||||
error:
|
||||
type: string
|
||||
example: 'error'
|
||||
description: >
|
||||
The error message of the recording.
|
||||
details:
|
||||
type: string
|
||||
example: 'Stopped using API'
|
||||
description: >
|
||||
Additional details about the recording.
|
||||
Error:
|
||||
type: object
|
||||
required:
|
||||
|
||||
@ -32,13 +32,10 @@ export const getRecordings = async (req: Request, res: Response) => {
|
||||
try {
|
||||
logger.info('Getting all recordings');
|
||||
|
||||
const { status, page, limit } = req.query;
|
||||
const queryParams = req.query;
|
||||
|
||||
// const continuationToken = req.query.continuationToken as string;
|
||||
const response = await recordingService.getAllRecordings();
|
||||
return res
|
||||
.status(200)
|
||||
.json({ recordings: response.recordingInfo, continuationToken: response.continuationToken });
|
||||
const response = await recordingService.getAllRecordings(queryParams);
|
||||
return res.status(200).json(response);
|
||||
} catch (error) {
|
||||
if (error instanceof OpenViduMeetError) {
|
||||
logger.error(`Error getting all recordings: ${error.message}`);
|
||||
@ -58,8 +55,7 @@ export const bulkDeleteRecordings = async (req: Request, res: Response) => {
|
||||
const recordingService = container.get(RecordingService);
|
||||
|
||||
// TODO: Check role to determine if the request is from an admin or a participant
|
||||
const role = req.body.payload.metadata.role;
|
||||
await recordingService.bulkDeleteRecordings(recordingIds, role);
|
||||
await recordingService.bulkDeleteRecordings(recordingIds);
|
||||
|
||||
return res.status(204).json();
|
||||
} catch (error) {
|
||||
@ -121,8 +117,7 @@ export const deleteRecording = async (req: Request, res: Response) => {
|
||||
const recordingService = container.get(RecordingService);
|
||||
|
||||
// TODO: Check role to determine if the request is from an admin or a participant
|
||||
const role = req.body.payload.metadata.role;
|
||||
const recordingInfo = await recordingService.deleteRecording(recordingId, role);
|
||||
const recordingInfo = await recordingService.deleteRecording(recordingId);
|
||||
|
||||
return res.status(204).json(recordingInfo);
|
||||
} catch (error) {
|
||||
|
||||
@ -11,9 +11,10 @@ export class RecordingHelper {
|
||||
const startDateMs = RecordingHelper.extractStartDate(egressInfo);
|
||||
const endDateMs = RecordingHelper.extractEndDate(egressInfo);
|
||||
const filename = RecordingHelper.extractFilename(egressInfo);
|
||||
const uid = RecordingHelper.extractUidFromFilename(filename);
|
||||
const { egressId, roomName, errorCode, error, details } = egressInfo;
|
||||
return {
|
||||
recordingId: egressId,
|
||||
recordingId: `${roomName}--${egressId}--${uid}`,
|
||||
roomId: roomName,
|
||||
outputMode,
|
||||
status,
|
||||
@ -76,16 +77,19 @@ export class RecordingHelper {
|
||||
return MeetRecordingOutputMode.COMPOSED;
|
||||
}
|
||||
|
||||
static extractFilename(recordingInfo: MeetRecordingInfo): string | undefined;
|
||||
/**
|
||||
* Extracts the filename/path for storing the recording.
|
||||
* For EgressInfo, returns the last segment of the fileResults.
|
||||
* For MeetRecordingInfo, returns a combination of roomId and filename.
|
||||
*/
|
||||
static extractFilename(recordingInfo: MeetRecordingInfo): string;
|
||||
|
||||
static extractFilename(egressInfo: EgressInfo): string | undefined;
|
||||
|
||||
static extractFilename(info: MeetRecordingInfo | EgressInfo): string | undefined {
|
||||
if (!info) return undefined;
|
||||
static extractFilename(egressInfo: EgressInfo): string;
|
||||
|
||||
static extractFilename(info: MeetRecordingInfo | EgressInfo): string {
|
||||
if ('request' in info) {
|
||||
// EgressInfo
|
||||
return info.fileResults?.[0]?.filename.split('/').pop();
|
||||
return info.fileResults[0]!.filename.split('/').pop()!;
|
||||
} else {
|
||||
// MeetRecordingInfo
|
||||
const { filename, roomId } = info;
|
||||
@ -94,6 +98,31 @@ export class RecordingHelper {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts the UID from the given filename.
|
||||
*
|
||||
* @param filename room-123--{uid}.mp4
|
||||
* @returns
|
||||
*/
|
||||
static extractUidFromFilename(filename: string): string {
|
||||
const uidWithExtension = filename.split('--')[1];
|
||||
return uidWithExtension.split('.')[0];
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts the room name, egressId, and UID from the given recordingId.
|
||||
* @param recordingId ${roomId}--${egressId}--${uid}
|
||||
*/
|
||||
static extractInfoFromRecordingId(recordingId: string): { roomId: string; egressId: string; uid: string } {
|
||||
const [roomId, egressId, uid] = recordingId.split('--');
|
||||
|
||||
if (!roomId || !egressId || !uid) {
|
||||
throw new Error(`Invalid recordingId format: ${recordingId}`);
|
||||
}
|
||||
|
||||
return { roomId, egressId, uid };
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts the duration from the given egress information.
|
||||
* If the duration is not available, it returns 0.
|
||||
|
||||
@ -4,3 +4,4 @@ export * from './content-type.middleware.js';
|
||||
export * from './openapi.middleware.js';
|
||||
export * from './request-validators/participant-validator.middleware.js';
|
||||
export * from './request-validators/room-validator.middleware.js';
|
||||
export * from './request-validators/recording-validator.middleware.js';
|
||||
|
||||
@ -0,0 +1,80 @@
|
||||
import { Request, Response, NextFunction } from 'express';
|
||||
import { z } from 'zod';
|
||||
|
||||
const RecordingPostRequestSchema = z.object({
|
||||
roomId: z
|
||||
.string()
|
||||
.min(1, { message: 'roomId is required and cannot be empty' })
|
||||
.transform((val) => val.trim().replace(/\s+/g, '-'))
|
||||
});
|
||||
|
||||
const getRecordingsSchema = z.object({
|
||||
maxItems: z.coerce
|
||||
.number()
|
||||
.int()
|
||||
.optional()
|
||||
.transform((val = 10) => (val > 100 ? 100 : val))
|
||||
.default(10),
|
||||
status: z.string().optional(),
|
||||
roomId: z.string().optional(),
|
||||
nextPageToken: z.string().optional()
|
||||
});
|
||||
|
||||
/**
|
||||
* Middleware to validate the recording post request.
|
||||
*
|
||||
* This middleware uses the `RecordingPostRequestSchema` to validate the request body.
|
||||
* If the validation fails, it rejects the request with an error response.
|
||||
* If the validation succeeds, it passes control to the next middleware or route handler.
|
||||
*
|
||||
*/
|
||||
export const withValidRecordingPostRequest = (req: Request, res: Response, next: NextFunction) => {
|
||||
const { success, error } = RecordingPostRequestSchema.safeParse(req.body);
|
||||
|
||||
if (!success) {
|
||||
return rejectRequest(res, error);
|
||||
}
|
||||
|
||||
next();
|
||||
};
|
||||
|
||||
export const withValidGetRecordingsRequest = (req: Request, res: Response, next: NextFunction) => {
|
||||
const { success, error, data } = getRecordingsSchema.safeParse(req.query);
|
||||
|
||||
if (!success) {
|
||||
return rejectRequest(res, error);
|
||||
}
|
||||
|
||||
req.query = {
|
||||
...data,
|
||||
maxItems: data.maxItems?.toString()
|
||||
};
|
||||
next();
|
||||
};
|
||||
|
||||
export const withValidRecordingBulkDeleteRequest = (req: Request, res: Response, next: NextFunction) => {
|
||||
const { success, error } = z
|
||||
.array(z.string().min(1, { message: 'recordingIds must be a non-empty string' }))
|
||||
.safeParse(req.body);
|
||||
|
||||
if (!success) {
|
||||
return rejectRequest(res, error);
|
||||
}
|
||||
|
||||
next();
|
||||
};
|
||||
|
||||
const rejectRequest = (res: Response, error: z.ZodError) => {
|
||||
const errors = error.errors.map((error) => ({
|
||||
field: error.path.join('.'),
|
||||
message: error.message
|
||||
}));
|
||||
|
||||
console.log(errors);
|
||||
|
||||
return res.status(422).json({
|
||||
error: 'Unprocessable Entity',
|
||||
message: 'Invalid request body',
|
||||
details: errors
|
||||
});
|
||||
};
|
||||
@ -150,8 +150,8 @@ export class LivekitWebhookService {
|
||||
this.logger.debug(`Processing recording ${webhookAction} webhook.`);
|
||||
|
||||
const recordingInfo: MeetRecordingInfo = RecordingHelper.toRecordingInfo(egressInfo);
|
||||
const metadataPath = this.generateMetadataPath(recordingInfo);
|
||||
const { roomId, recordingId, status } = recordingInfo;
|
||||
const metadataPath = this.generateMetadataPath(recordingId);
|
||||
|
||||
this.logger.debug(`Recording '${recordingId}' for room '${roomId}' is in status '${status}'`);
|
||||
|
||||
@ -189,10 +189,9 @@ export class LivekitWebhookService {
|
||||
}
|
||||
}
|
||||
|
||||
protected generateMetadataPath(recordingInfo: MeetRecordingInfo): string {
|
||||
const { roomId, recordingId } = recordingInfo;
|
||||
// Remove file extension from filename
|
||||
const recordingFilename = recordingInfo.filename?.split('.')[0];
|
||||
return `${MEET_S3_RECORDINGS_PREFIX}/.metadata/${roomId}/${recordingFilename}-${recordingId}.json`;
|
||||
protected generateMetadataPath(recordingId: string): string {
|
||||
const { roomId, egressId, uid } = RecordingHelper.extractInfoFromRecordingId(recordingId);
|
||||
|
||||
return `${MEET_S3_RECORDINGS_PREFIX}/.metadata/${roomId}/${egressId}/${uid}.json`;
|
||||
}
|
||||
}
|
||||
|
||||
@ -21,6 +21,13 @@ import { RedisLockName } from '../models/index.js';
|
||||
import ms from 'ms';
|
||||
import { OpenViduComponentsAdapterHelper } from '../helpers/ov-components-adapter.helper.js';
|
||||
|
||||
type GetAllRecordingsParams = {
|
||||
maxItems?: number;
|
||||
nextPageToken?: string;
|
||||
roomId?: string;
|
||||
status?: string;
|
||||
};
|
||||
|
||||
@injectable()
|
||||
export class RecordingService {
|
||||
protected readonly RECORDING_ACTIVE_LOCK_TTL = ms('6h');
|
||||
@ -32,39 +39,41 @@ export class RecordingService {
|
||||
@inject(LoggerService) protected logger: LoggerService
|
||||
) {}
|
||||
|
||||
async startRecording(roomName: string): Promise<MeetRecordingInfo> {
|
||||
async startRecording(roomId: string): Promise<MeetRecordingInfo> {
|
||||
let acquiredLock: RedisLock | null = null;
|
||||
|
||||
try {
|
||||
// Attempt to acquire lock.
|
||||
// Note: using a high TTL to prevent expiration during a long recording.
|
||||
acquiredLock = await this.acquireRoomRecordingActiveLock(roomName);
|
||||
acquiredLock = await this.acquireRoomRecordingActiveLock(roomId);
|
||||
|
||||
if (!acquiredLock) throw errorRecordingAlreadyStarted(roomName);
|
||||
if (!acquiredLock) throw errorRecordingAlreadyStarted(roomId);
|
||||
|
||||
const room = await this.roomService.getOpenViduRoom(roomName);
|
||||
const room = await this.roomService.getOpenViduRoom(roomId);
|
||||
|
||||
if (!room) throw errorRoomNotFound(roomName);
|
||||
if (!room) throw errorRoomNotFound(roomId);
|
||||
|
||||
const options = this.generateCompositeOptionsFromRequest();
|
||||
const output = this.generateFileOutputFromRequest(roomName);
|
||||
const egressInfo = await this.livekitService.startRoomComposite(roomName, output, options);
|
||||
const output = this.generateFileOutputFromRequest(roomId);
|
||||
const egressInfo = await this.livekitService.startRoomComposite(roomId, output, options);
|
||||
|
||||
// Return recording info without releasing the lock here,
|
||||
// as it will be released in handleEgressEnded on successful completion.
|
||||
return RecordingHelper.toRecordingInfo(egressInfo);
|
||||
} catch (error) {
|
||||
this.logger.error(`Error starting recording in room ${roomName}: ${error}`);
|
||||
this.logger.error(`Error starting recording in room ${roomId}: ${error}`);
|
||||
|
||||
if (acquiredLock) await this.releaseRoomRecordingActiveLock(roomName);
|
||||
if (acquiredLock) await this.releaseRoomRecordingActiveLock(roomId);
|
||||
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async stopRecording(egressId: string): Promise<MeetRecordingInfo> {
|
||||
async stopRecording(recordingId: string): Promise<MeetRecordingInfo> {
|
||||
try {
|
||||
const egressArray = await this.livekitService.getActiveEgress(undefined, egressId);
|
||||
const { roomId, egressId } = RecordingHelper.extractInfoFromRecordingId(recordingId);
|
||||
|
||||
const egressArray = await this.livekitService.getActiveEgress(roomId, egressId);
|
||||
|
||||
if (egressArray.length === 0) {
|
||||
throw errorRecordingNotFound(egressId);
|
||||
@ -74,121 +83,144 @@ export class RecordingService {
|
||||
|
||||
return RecordingHelper.toRecordingInfo(egressInfo);
|
||||
} catch (error) {
|
||||
this.logger.error(`Error stopping recording ${egressId}: ${error}`);
|
||||
this.logger.error(`Error stopping recording ${recordingId}: ${error}`);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Implement deleteRecording method
|
||||
async deleteRecording(egressId: string, role: string): Promise<MeetRecordingInfo> {
|
||||
try {
|
||||
const { metadataFilePath, recordingInfo } = await this.getMeetRecordingInfoFromMetadata(egressId);
|
||||
|
||||
if (
|
||||
recordingInfo.status === MeetRecordingStatus.STARTING ||
|
||||
recordingInfo.status === MeetRecordingStatus.ACTIVE ||
|
||||
recordingInfo.status === MeetRecordingStatus.ENDING
|
||||
) {
|
||||
throw errorRecordingNotStopped(egressId);
|
||||
}
|
||||
|
||||
const recordingPath = RecordingHelper.extractFilename(recordingInfo);
|
||||
|
||||
if (!recordingPath) throw internalError(`Error extracting path from recording ${egressId}`);
|
||||
|
||||
this.logger.info(`Deleting recording from S3 ${recordingPath}`);
|
||||
|
||||
await Promise.all([
|
||||
this.s3Service.deleteObject(metadataFilePath),
|
||||
this.s3Service.deleteObject(recordingPath)
|
||||
]);
|
||||
|
||||
return recordingInfo;
|
||||
} catch (error) {
|
||||
this.logger.error(`Error deleting recording ${egressId}: ${error}`);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Implement bulkDeleteRecordings method
|
||||
async bulkDeleteRecordings(egressIds: string[], role: string): Promise<MeetRecordingInfo[]> {
|
||||
const promises = egressIds.map((egressId) => this.deleteRecording(egressId, role));
|
||||
return Promise.all(promises);
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the list of all recordings.
|
||||
* @returns A promise that resolves to an array of RecordingInfo objects.
|
||||
* Deletes a recording from the S3 bucket based on the provided egress ID.
|
||||
*
|
||||
* The recording is deleted only if it is not in progress state (STARTING, ACTIVE, ENDING).
|
||||
* @param recordingId - The egress ID of the recording.
|
||||
*/
|
||||
//TODO: Implement getAllRecordings method
|
||||
async getAllRecordings(): Promise<{ recordingInfo: MeetRecordingInfo[]; continuationToken?: string }> {
|
||||
async deleteRecording(recordingId: string): Promise<MeetRecordingInfo> {
|
||||
try {
|
||||
const allEgress = await this.s3Service.listObjects('.metadata', '.json');
|
||||
const promises: Promise<MeetRecordingInfo>[] = [];
|
||||
// Get the recording metada and recording info from the S3 bucket
|
||||
const { filesToDelete, recordingInfo } = await this.getDeletableRecordingData(recordingId);
|
||||
|
||||
allEgress.Contents?.forEach((item) => {
|
||||
if (item?.Key?.includes('.json')) {
|
||||
this.logger.verbose(
|
||||
`Deleting recording from S3. Files: ${filesToDelete.join(', ')} for recordingId ${recordingId}`
|
||||
);
|
||||
await this.s3Service.deleteObjects(filesToDelete);
|
||||
this.logger.info(`Deletion successful for recording ${recordingId}`);
|
||||
|
||||
return recordingInfo;
|
||||
} catch (error) {
|
||||
this.logger.error(`Error deleting recording ${recordingId}: ${error}`);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes multiple recordings in bulk from S3.
|
||||
* For each provided egressId, the metadata and recording file are deleted (only if the status is stopped).
|
||||
*
|
||||
* @param egressIds Array of recording identifiers.
|
||||
* @returns An array with the MeetRecordingInfo of the successfully deleted recordings.
|
||||
*/
|
||||
async bulkDeleteRecordings(egressIds: string[]): Promise<MeetRecordingInfo[]> {
|
||||
const keysToDelete: string[] = [];
|
||||
const deletedRecordings: MeetRecordingInfo[] = [];
|
||||
|
||||
for (const egressId of egressIds) {
|
||||
try {
|
||||
const { filesToDelete, recordingInfo } = await this.getDeletableRecordingData(egressId);
|
||||
keysToDelete.push(...filesToDelete);
|
||||
deletedRecordings.push(recordingInfo);
|
||||
this.logger.verbose(`BulkDelete: Prepared recording ${egressId} for deletion.`);
|
||||
} catch (error) {
|
||||
this.logger.error(`BulkDelete: Error processing recording ${egressId}: ${error}`);
|
||||
}
|
||||
}
|
||||
|
||||
if (keysToDelete.length > 0) {
|
||||
try {
|
||||
await this.s3Service.deleteObjects(keysToDelete);
|
||||
this.logger.info(`BulkDelete: Successfully deleted ${keysToDelete.length} objects from S3.`);
|
||||
} catch (error) {
|
||||
this.logger.error(`BulkDelete: Error performing bulk deletion: ${error}`);
|
||||
throw error;
|
||||
}
|
||||
} else {
|
||||
this.logger.warn(`BulkDelete: No eligible recordings found for deletion.`);
|
||||
}
|
||||
|
||||
return deletedRecordings;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the recording information for a given recording ID.
|
||||
* @param recordingId - The unique identifier of the recording.
|
||||
* @returns A promise that resolves to a MeetRecordingInfo object.
|
||||
*/
|
||||
async getRecording(recordingId: string): Promise<MeetRecordingInfo> {
|
||||
const { recordingInfo } = await this.getMeetRecordingInfoFromMetadata(recordingId);
|
||||
|
||||
return recordingInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves a paginated list of all recordings stored in the S3 bucket.
|
||||
*
|
||||
* @param maxItems - The maximum number of items to retrieve in a single request.
|
||||
* @param nextPageToken - (Optional) A token to retrieve the next page of results.
|
||||
* @returns A promise that resolves to an object containing:
|
||||
* - `recordings`: An array of `MeetRecordingInfo` objects representing the recordings.
|
||||
* - `isTruncated`: A boolean indicating whether there are more items to retrieve.
|
||||
* - `nextPageToken`: (Optional) A token to retrieve the next page of results, if available.
|
||||
* @throws Will throw an error if there is an issue retrieving the recordings.
|
||||
*/
|
||||
async getAllRecordings({ maxItems, nextPageToken, roomId, status }: GetAllRecordingsParams): Promise<{
|
||||
recordings: MeetRecordingInfo[];
|
||||
isTruncated: boolean;
|
||||
nextPageToken?: string;
|
||||
}> {
|
||||
try {
|
||||
const roomPrefix = roomId ? `/${roomId}` : '';
|
||||
const { Contents, IsTruncated, NextContinuationToken } = await this.s3Service.listObjectsPaginated(
|
||||
`${MEET_S3_RECORDINGS_PREFIX}/.metadata${roomPrefix}`,
|
||||
maxItems,
|
||||
nextPageToken
|
||||
);
|
||||
|
||||
if (!Contents) {
|
||||
this.logger.verbose('No recordings found. Returning an empty array.');
|
||||
return { recordings: [], isTruncated: false };
|
||||
}
|
||||
|
||||
const promises: Promise<MeetRecordingInfo>[] = [];
|
||||
Contents.forEach((item) => {
|
||||
if (item?.Key && item.Key.endsWith('.json')) {
|
||||
promises.push(this.s3Service.getObjectAsJson(item.Key) as Promise<MeetRecordingInfo>);
|
||||
}
|
||||
});
|
||||
|
||||
return { recordingInfo: await Promise.all(promises), continuationToken: undefined };
|
||||
} catch (error) {
|
||||
this.logger.error(`Error getting recordings: ${error}`);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
let recordings: MeetRecordingInfo[] = await Promise.all(promises);
|
||||
|
||||
/**
|
||||
* Retrieves all recordings for a given room.
|
||||
*
|
||||
* @param roomName - The name of the room.
|
||||
* @param roomId - The ID of the room.
|
||||
* @returns A promise that resolves to an array of MeetRecordingInfo objects.
|
||||
* @throws If there is an error retrieving the recordings.
|
||||
*/
|
||||
//TODO: Implement getAllRecordingsByRoom method
|
||||
async getAllRecordingsByRoom(roomName: string, roomId: string): Promise<MeetRecordingInfo[]> {
|
||||
try {
|
||||
// Get all recordings that match the room name and room ID from the S3 bucket
|
||||
const roomNameSanitized = this.sanitizeRegExp(roomName);
|
||||
const roomIdSanitized = this.sanitizeRegExp(roomId);
|
||||
// Match the room name and room ID in any order
|
||||
const regexPattern = `${roomNameSanitized}.*${roomIdSanitized}|${roomIdSanitized}.*${roomNameSanitized}\\.json`;
|
||||
const metadatagObject = await this.s3Service.listObjects('.metadata', regexPattern);
|
||||
if (status) {
|
||||
// Filter recordings by status
|
||||
const statusArray = status
|
||||
.split(',')
|
||||
.map((s) => s.trim())
|
||||
.filter(Boolean)
|
||||
.map((s) => new RegExp(this.sanitizeRegExp(s)));
|
||||
|
||||
if (!metadatagObject.Contents || metadatagObject.Contents.length === 0) {
|
||||
this.logger.verbose(`No recordings found for room ${roomName}. Returning an empty array.`);
|
||||
return [];
|
||||
|
||||
recordings = recordings.filter((recording) =>
|
||||
statusArray.some((regex) => regex.test(recording.status))
|
||||
);
|
||||
}
|
||||
|
||||
const promises: Promise<MeetRecordingInfo>[] = [];
|
||||
metadatagObject.Contents?.forEach((item) => {
|
||||
promises.push(this.s3Service.getObjectAsJson(item.Key!) as Promise<MeetRecordingInfo>);
|
||||
});
|
||||
this.logger.info(`Retrieved ${recordings.length} recordings.`);
|
||||
|
||||
return Promise.all(promises);
|
||||
return { recordings, isTruncated: !!IsTruncated, nextPageToken: NextContinuationToken };
|
||||
} catch (error) {
|
||||
this.logger.error(`Error getting recordings: ${error}`);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
//TODO: Implement getRecording method
|
||||
async getRecording(egressId: string): Promise<MeetRecordingInfo> {
|
||||
const egressIdSanitized = this.sanitizeRegExp(egressId);
|
||||
const regexPattern = `.*${egressIdSanitized}.*\\.json`;
|
||||
const metadataObject = await this.s3Service.listObjects('.metadata', regexPattern);
|
||||
|
||||
if (!metadataObject.Contents || metadataObject.Contents.length === 0) {
|
||||
throw errorRecordingNotFound(egressId);
|
||||
}
|
||||
|
||||
const recording = (await this.s3Service.getObjectAsJson(metadataObject.Contents[0].Key!)) as MeetRecordingInfo;
|
||||
return recording;
|
||||
// return RecordingHelper.toRecordingInfo(recording);
|
||||
}
|
||||
|
||||
//TODO: Implement getRecordingAsStream method
|
||||
async getRecordingAsStream(
|
||||
recordingId: string,
|
||||
@ -196,7 +228,7 @@ export class RecordingService {
|
||||
): Promise<{ fileSize: number | undefined; fileStream: Readable; start?: number; end?: number }> {
|
||||
const RECORDING_FILE_PORTION_SIZE = 5 * 1024 * 1024; // 5MB
|
||||
const recordingInfo: MeetRecordingInfo = await this.getRecording(recordingId);
|
||||
const recordingPath = RecordingHelper.extractFilename(recordingInfo);
|
||||
const recordingPath = `${MEET_S3_RECORDINGS_PREFIX}/${RecordingHelper.extractFilename(recordingInfo)}`;
|
||||
|
||||
if (!recordingPath) throw new Error(`Error extracting path from recording ${recordingId}`);
|
||||
|
||||
@ -265,25 +297,49 @@ export class RecordingService {
|
||||
return this.roomService.sendSignal(roomName, payload, options);
|
||||
}
|
||||
|
||||
private async getMeetRecordingInfoFromMetadata(
|
||||
egressId: string
|
||||
/**
|
||||
* Retrieves the data required to delete a recording, including the file paths
|
||||
* to be deleted and the recording's metadata information.
|
||||
*
|
||||
* @param recordingId - The unique identifier of the recording egress.
|
||||
*/
|
||||
protected async getDeletableRecordingData(
|
||||
recordingId: string
|
||||
): Promise<{ filesToDelete: string[]; recordingInfo: MeetRecordingInfo }> {
|
||||
const { metadataFilePath, recordingInfo } = await this.getMeetRecordingInfoFromMetadata(recordingId);
|
||||
|
||||
if (
|
||||
recordingInfo.status === MeetRecordingStatus.STARTING ||
|
||||
recordingInfo.status === MeetRecordingStatus.ACTIVE ||
|
||||
recordingInfo.status === MeetRecordingStatus.ENDING
|
||||
) {
|
||||
throw errorRecordingNotStopped(recordingId);
|
||||
}
|
||||
|
||||
const recordingPath = RecordingHelper.extractFilename(recordingInfo);
|
||||
|
||||
if (!recordingPath) {
|
||||
throw internalError(`Error extracting path from recording ${recordingId}`);
|
||||
}
|
||||
|
||||
return { filesToDelete: [metadataFilePath, recordingPath], recordingInfo };
|
||||
}
|
||||
|
||||
protected async getMeetRecordingInfoFromMetadata(
|
||||
recordingId: string
|
||||
): Promise<{ metadataFilePath: string; recordingInfo: MeetRecordingInfo }> {
|
||||
// Get the recording object from the S3 bucket
|
||||
const metadataObject = await this.s3Service.listObjects('.metadata', `.*${egressId}.*.json`);
|
||||
|
||||
const content = metadataObject.Contents?.[0];
|
||||
|
||||
if (!content) {
|
||||
throw errorRecordingNotFound(egressId);
|
||||
}
|
||||
|
||||
const metadataPath = content.Key;
|
||||
|
||||
if (!metadataPath) {
|
||||
throw errorRecordingNotFound(egressId);
|
||||
}
|
||||
const { roomId, egressId, uid } = RecordingHelper.extractInfoFromRecordingId(recordingId);
|
||||
|
||||
const metadataPath = `${MEET_S3_RECORDINGS_PREFIX}/.metadata/${roomId}/${egressId}/${uid}.json`;
|
||||
this.logger.debug(`Retrieving metadata for recording ${recordingId} from ${metadataPath}`);
|
||||
const recordingInfo = (await this.s3Service.getObjectAsJson(metadataPath)) as MeetRecordingInfo;
|
||||
|
||||
if (!recordingInfo) {
|
||||
throw errorRecordingNotFound(recordingId);
|
||||
}
|
||||
|
||||
this.logger.verbose(`Retrieved metadata for recording ${recordingId} from ${metadataPath}`);
|
||||
|
||||
return { recordingInfo, metadataFilePath: metadataPath };
|
||||
}
|
||||
|
||||
@ -302,12 +358,12 @@ export class RecordingService {
|
||||
* @param fileName - The name of the file (default is 'recording').
|
||||
* @returns The generated file output object.
|
||||
*/
|
||||
private generateFileOutputFromRequest(roomName: string): EncodedFileOutput {
|
||||
private generateFileOutputFromRequest(roomId: string): EncodedFileOutput {
|
||||
// Added unique identifier to the file path for avoiding overwriting
|
||||
const recordingName = `${roomName}-${uid(10)}`;
|
||||
const recordingName = `${roomId}--${uid(10)}`;
|
||||
|
||||
// Generate the file path with the openviud-meet subbucket and the recording prefix
|
||||
const filepath = `${MEET_S3_SUBBUCKET}/${MEET_S3_RECORDINGS_PREFIX}/${roomName}/${recordingName}`;
|
||||
const filepath = `${MEET_S3_SUBBUCKET}/${MEET_S3_RECORDINGS_PREFIX}/${roomId}/${recordingName}`;
|
||||
|
||||
return new EncodedFileOutput({
|
||||
fileType: EncodedFileType.DEFAULT_FILETYPE,
|
||||
@ -316,6 +372,15 @@ export class RecordingService {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Escapes special characters in a string to make it safe for use in a regular expression.
|
||||
* This method ensures that characters with special meaning in regular expressions
|
||||
* (e.g., `.`, `*`, `+`, `?`, `^`, `$`, `{`, `}`, `(`, `)`, `|`, `[`, `]`, `\`) are
|
||||
* properly escaped.
|
||||
*
|
||||
* @param str - The input string to sanitize for use in a regular expression.
|
||||
* @returns A new string with special characters escaped.
|
||||
*/
|
||||
private sanitizeRegExp(str: string) {
|
||||
return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
|
||||
}
|
||||
|
||||
@ -2,6 +2,8 @@ import {
|
||||
_Object,
|
||||
DeleteObjectCommand,
|
||||
DeleteObjectCommandOutput,
|
||||
DeleteObjectsCommand,
|
||||
DeleteObjectsCommandOutput,
|
||||
GetObjectCommand,
|
||||
GetObjectCommandOutput,
|
||||
HeadObjectCommand,
|
||||
@ -129,6 +131,82 @@ export class S3Service {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Bulk deletes objects from S3.
|
||||
* @param keys Array of object keys to delete. Estos keys deben incluir el subbucket (se obtiene con getFullKey).
|
||||
* @param bucket S3 bucket name (default: MEET_S3_BUCKET)
|
||||
*/
|
||||
async deleteObjects(keys: string[], bucket: string = MEET_S3_BUCKET): Promise<DeleteObjectsCommandOutput> {
|
||||
try {
|
||||
this.logger.info(`S3 delete: attempting to delete ${keys.length} objects from bucket ${bucket}`);
|
||||
const command = new DeleteObjectsCommand({
|
||||
Bucket: bucket,
|
||||
Delete: {
|
||||
Objects: keys.map((key) => ({ Key: this.getFullKey(key) })),
|
||||
Quiet: false
|
||||
}
|
||||
});
|
||||
console.log(
|
||||
'command',
|
||||
keys.map((key) => ({ Key: key }))
|
||||
);
|
||||
const result = await this.run(command);
|
||||
this.logger.info(`S3 bulk delete: successfully deleted objects from bucket ${bucket}`);
|
||||
return result;
|
||||
} catch (error: any) {
|
||||
this.logger.error(`S3 bulk delete: error deleting objects in bucket ${bucket}: ${error}`);
|
||||
throw internalError(error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* List objects with pagination.
|
||||
*
|
||||
* @param additionalPrefix Additional prefix relative to the subbucket.
|
||||
* Por ejemplo, para listar metadata se pasa ".metadata/".
|
||||
* @param searchPattern Optional regex pattern to filter keys.
|
||||
* @param bucket Optional bucket name.
|
||||
* @param maxKeys Maximum number of objects to return.
|
||||
* @param continuationToken Token to retrieve the next page.
|
||||
*
|
||||
* @returns The ListObjectsV2CommandOutput with Keys and NextContinuationToken.
|
||||
*/
|
||||
async listObjectsPaginated(
|
||||
additionalPrefix = '',
|
||||
maxKeys = 50,
|
||||
continuationToken?: string,
|
||||
searchPattern = '',
|
||||
bucket: string = MEET_S3_BUCKET
|
||||
): Promise<ListObjectsV2CommandOutput> {
|
||||
// Se construye el prefijo completo combinando el subbucket y el additionalPrefix.
|
||||
// Ejemplo: si s3Subbucket es "recordings" y additionalPrefix es ".metadata/",
|
||||
// se listarán los objetos con key que empiece por "recordings/.metadata/".
|
||||
const basePrefix = this.getFullKey(additionalPrefix);
|
||||
this.logger.verbose(`S3 listObjectsPaginated: listing objects with prefix "${basePrefix}"`);
|
||||
|
||||
const command = new ListObjectsV2Command({
|
||||
Bucket: bucket,
|
||||
Prefix: basePrefix,
|
||||
MaxKeys: maxKeys,
|
||||
ContinuationToken: continuationToken
|
||||
});
|
||||
|
||||
try {
|
||||
const response: ListObjectsV2CommandOutput = await this.s3.send(command);
|
||||
|
||||
// Si se ha proporcionado searchPattern, se filtran los resultados.
|
||||
if (searchPattern) {
|
||||
const regex = new RegExp(searchPattern);
|
||||
response.Contents = (response.Contents || []).filter((item) => item.Key && regex.test(item.Key));
|
||||
}
|
||||
|
||||
return response;
|
||||
} catch (error: any) {
|
||||
this.logger.error(`S3 listObjectsPaginated: error listing objects with prefix "${basePrefix}": ${error}`);
|
||||
throw internalError(error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Lists all objects in an S3 bucket with optional subbucket and search pattern filtering.
|
||||
*
|
||||
@ -207,19 +285,17 @@ export class S3Service {
|
||||
}
|
||||
|
||||
async getObjectAsJson(name: string, bucket: string = MEET_S3_BUCKET): Promise<Object | undefined> {
|
||||
const fullKey = this.getFullKey(name);
|
||||
|
||||
try {
|
||||
const obj = await this.getObject(fullKey, bucket);
|
||||
const obj = await this.getObject(name, bucket);
|
||||
const str = await obj.Body?.transformToString();
|
||||
const parsed = JSON.parse(str as string);
|
||||
this.logger.info(
|
||||
`S3 getObjectAsJson: successfully retrieved and parsed object ${fullKey} from bucket ${bucket}`
|
||||
this.logger.verbose(
|
||||
`S3 getObjectAsJson: successfully retrieved and parsed object ${name} from bucket ${bucket}`
|
||||
);
|
||||
return parsed;
|
||||
} catch (error: any) {
|
||||
if (error.name === 'NoSuchKey') {
|
||||
this.logger.warn(`S3 getObjectAsJson: object '${fullKey}' does not exist in bucket ${bucket}`);
|
||||
this.logger.warn(`S3 getObjectAsJson: object '${name}' does not exist in bucket ${bucket}`);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
@ -227,7 +303,7 @@ export class S3Service {
|
||||
throw errorS3NotAvailable(error);
|
||||
}
|
||||
|
||||
this.logger.error(`S3 getObjectAsJson: error retrieving object ${fullKey} from bucket ${bucket}: ${error}`);
|
||||
this.logger.error(`S3 getObjectAsJson: error retrieving object ${name} from bucket ${bucket}: ${error}`);
|
||||
throw internalError(error);
|
||||
}
|
||||
}
|
||||
@ -237,10 +313,8 @@ export class S3Service {
|
||||
bucket: string = MEET_S3_BUCKET,
|
||||
range?: { start: number; end: number }
|
||||
): Promise<Readable> {
|
||||
const fullKey = this.getFullKey(name);
|
||||
|
||||
try {
|
||||
const obj = await this.getObject(fullKey, bucket, range);
|
||||
const obj = await this.getObject(name, bucket, range);
|
||||
|
||||
if (obj.Body) {
|
||||
this.logger.info(
|
||||
@ -253,7 +327,7 @@ export class S3Service {
|
||||
}
|
||||
} catch (error: any) {
|
||||
this.logger.error(
|
||||
`S3 getObjectAsStream: error retrieving stream for object ${fullKey} from bucket ${bucket}: ${error}`
|
||||
`S3 getObjectAsStream: error retrieving stream for object ${name} from bucket ${bucket}: ${error}`
|
||||
);
|
||||
|
||||
if (error.code === 'ECONNREFUSED') {
|
||||
@ -288,11 +362,18 @@ export class S3Service {
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepares a full key path by prefixing the object's name with the subbucket.
|
||||
* All operations are performed under MEET_S3_BUCKET/MEET_S3_SUBBUCKET.
|
||||
* Constructs the full key for an S3 object by ensuring it includes the specified sub-bucket prefix.
|
||||
* If the provided name already starts with the prefix, it is returned as-is.
|
||||
* Otherwise, the prefix is prepended to the name.
|
||||
*/
|
||||
protected getFullKey(name: string): string {
|
||||
return `${MEET_S3_SUBBUCKET}/${name}`;
|
||||
const prefix = `${MEET_S3_SUBBUCKET}`;
|
||||
|
||||
if (name.startsWith(prefix)) {
|
||||
return name;
|
||||
}
|
||||
|
||||
return `${prefix}/${name}`;
|
||||
}
|
||||
|
||||
protected async getObject(
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user