backend: Enhance recording media streaming with range support and error handling
This commit is contained in:
parent
2a7d23be7d
commit
bdddeb34c5
@ -1,8 +1,9 @@
|
|||||||
import { Request, Response } from 'express';
|
import { Request, Response } from 'express';
|
||||||
import { container } from '../config/index.js';
|
import { container } from '../config/index.js';
|
||||||
import INTERNAL_CONFIG from '../config/internal-config.js';
|
import INTERNAL_CONFIG from '../config/internal-config.js';
|
||||||
import { OpenViduMeetError } from '../models/error.model.js';
|
import { internalError, OpenViduMeetError } from '../models/error.model.js';
|
||||||
import { LoggerService, RecordingService } from '../services/index.js';
|
import { LoggerService, RecordingService } from '../services/index.js';
|
||||||
|
import { Readable } from 'stream';
|
||||||
|
|
||||||
export const startRecording = async (req: Request, res: Response) => {
|
export const startRecording = async (req: Request, res: Response) => {
|
||||||
const logger = container.get(LoggerService);
|
const logger = container.get(LoggerService);
|
||||||
@ -170,43 +171,84 @@ export const getRecordingMedia = async (req: Request, res: Response) => {
|
|||||||
|
|
||||||
const recordingId = req.params.recordingId;
|
const recordingId = req.params.recordingId;
|
||||||
const range = req.headers.range;
|
const range = req.headers.range;
|
||||||
|
let fileStream: Readable | undefined;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
logger.info(`Streaming recording ${recordingId}`);
|
logger.info(`Streaming recording ${recordingId}`);
|
||||||
const recordingService = container.get(RecordingService);
|
const recordingService = container.get(RecordingService);
|
||||||
|
|
||||||
const { fileSize, fileStream, start, end } = await recordingService.getRecordingAsStream(recordingId, range);
|
const result = await recordingService.getRecordingAsStream(recordingId, range);
|
||||||
|
const { fileSize, start, end } = result;
|
||||||
|
fileStream = result.fileStream;
|
||||||
|
|
||||||
|
fileStream.on('error', (streamError) => {
|
||||||
|
logger.error(`Error streaming recording ${recordingId}: ${streamError.message}`);
|
||||||
|
|
||||||
|
if (!res.headersSent) {
|
||||||
|
const error = internalError(streamError);
|
||||||
|
res.status(error.statusCode).json({ name: 'Recording Error', message: error.message });
|
||||||
|
}
|
||||||
|
|
||||||
|
res.end();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Handle client disconnection
|
||||||
|
req.on('close', () => {
|
||||||
|
if (fileStream && !fileStream.destroyed) {
|
||||||
|
logger.debug(`Client closed connection for recording media ${recordingId}`);
|
||||||
|
fileStream.destroy();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Handle partial requests (HTTP Range requests)
|
||||||
if (range && fileSize && start !== undefined && end !== undefined) {
|
if (range && fileSize && start !== undefined && end !== undefined) {
|
||||||
const contentLength = end - start + 1;
|
const contentLength = end - start + 1;
|
||||||
|
|
||||||
|
// Set headers for partial content response
|
||||||
res.writeHead(206, {
|
res.writeHead(206, {
|
||||||
'Content-Range': `bytes ${start}-${end}/${fileSize}`,
|
'Content-Range': `bytes ${start}-${end}/${fileSize}`,
|
||||||
'Accept-Ranges': 'bytes',
|
'Accept-Ranges': 'bytes',
|
||||||
'Content-Length': contentLength,
|
'Content-Length': contentLength,
|
||||||
'Content-Type': 'video/mp4'
|
'Content-Type': 'video/mp4',
|
||||||
|
'Cache-Control': 'public, max-age=3600'
|
||||||
});
|
});
|
||||||
|
|
||||||
fileStream.on('error', (streamError) => {
|
|
||||||
logger.error(`Error while streaming the file: ${streamError.message}`);
|
|
||||||
res.end();
|
|
||||||
});
|
|
||||||
|
|
||||||
fileStream.pipe(res).on('finish', () => res.end());
|
|
||||||
} else {
|
} else {
|
||||||
res.setHeader('Accept-Ranges', 'bytes');
|
// Set headers for full content response
|
||||||
res.setHeader('Content-Type', 'video/mp4');
|
res.writeHead(200, {
|
||||||
|
'Accept-Ranges': 'bytes',
|
||||||
if (fileSize) res.setHeader('Content-Length', fileSize);
|
'Content-Type': 'video/mp4',
|
||||||
|
'Content-Length': fileSize || undefined,
|
||||||
fileStream.pipe(res).on('finish', () => res.end());
|
'Cache-Control': 'public, max-age=3600'
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fileStream
|
||||||
|
.pipe(res)
|
||||||
|
.on('finish', () => {
|
||||||
|
logger.debug(`Finished streaming recording ${recordingId}`);
|
||||||
|
|
||||||
|
res.end();
|
||||||
|
})
|
||||||
|
.on('error', (err) => {
|
||||||
|
logger.error(`Error in response stream for ${recordingId}: ${err.message}`);
|
||||||
|
|
||||||
|
if (!res.headersSent) {
|
||||||
|
res.status(500).end();
|
||||||
|
}
|
||||||
|
});
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
if (fileStream && !fileStream.destroyed) {
|
||||||
|
fileStream.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
if (error instanceof OpenViduMeetError) {
|
if (error instanceof OpenViduMeetError) {
|
||||||
logger.error(`Error streaming recording: ${error.message}`);
|
logger.error(`Error streaming recording: ${error.message}`);
|
||||||
return res.status(error.statusCode).json({ name: error.name, message: error.message });
|
return res.status(error.statusCode).json({ name: error.name, message: error.message });
|
||||||
}
|
}
|
||||||
|
|
||||||
return res.status(500).json({ name: 'Recording Error', message: 'Unexpected error streaming recording' });
|
logger.error(`Unexpected error streaming recording ${recordingId}: ${error}`);
|
||||||
|
return res
|
||||||
|
.status(500)
|
||||||
|
.json({ name: 'Recording Error', message: 'An unexpected error occurred while processing the recording' });
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|||||||
@ -77,6 +77,22 @@ const BulkDeleteRecordingsSchema = z.object({
|
|||||||
)
|
)
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const GetRecordingMediaSchema = z.object({
|
||||||
|
params: z.object({
|
||||||
|
recordingId: nonEmptySanitizedRecordingId('recordingId')
|
||||||
|
}),
|
||||||
|
headers: z
|
||||||
|
.object({
|
||||||
|
range: z
|
||||||
|
.string()
|
||||||
|
.regex(/^bytes=\d+-\d*$/, {
|
||||||
|
message: 'Invalid range header format. Expected: bytes=start-end'
|
||||||
|
})
|
||||||
|
.optional()
|
||||||
|
})
|
||||||
|
.passthrough() // Allow other headers to pass through
|
||||||
|
});
|
||||||
|
|
||||||
const GetRecordingsFiltersSchema: z.ZodType<MeetRecordingFilters> = z.object({
|
const GetRecordingsFiltersSchema: z.ZodType<MeetRecordingFilters> = z.object({
|
||||||
maxItems: z.coerce
|
maxItems: z.coerce
|
||||||
.number()
|
.number()
|
||||||
@ -141,6 +157,21 @@ export const withValidRecordingBulkDeleteRequest = (req: Request, res: Response,
|
|||||||
next();
|
next();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
export const withValidGetMediaRequest = (req: Request, res: Response, next: NextFunction) => {
|
||||||
|
const { success, error, data } = GetRecordingMediaSchema.safeParse({
|
||||||
|
params: req.params,
|
||||||
|
headers: req.headers
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!success) {
|
||||||
|
return rejectRequest(res, error);
|
||||||
|
}
|
||||||
|
|
||||||
|
req.params.recordingId = data.params.recordingId;
|
||||||
|
req.headers.range = data.headers.range;
|
||||||
|
next();
|
||||||
|
};
|
||||||
|
|
||||||
const rejectRequest = (res: Response, error: z.ZodError) => {
|
const rejectRequest = (res: Response, error: z.ZodError) => {
|
||||||
const errors = error.errors.map((error) => ({
|
const errors = error.errors.map((error) => ({
|
||||||
field: error.path.join('.'),
|
field: error.path.join('.'),
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
type StatusError = 400 | 401 | 403 | 404 | 406 | 409 | 422 | 500 | 503;
|
type StatusError = 400 | 401 | 403 | 404 | 406 | 409 | 416 | 422 | 500 | 503;
|
||||||
export class OpenViduMeetError extends Error {
|
export class OpenViduMeetError extends Error {
|
||||||
name: string;
|
name: string;
|
||||||
statusCode: StatusError;
|
statusCode: StatusError;
|
||||||
@ -79,6 +79,14 @@ export const errorRecordingStartTimeout = (roomId: string): OpenViduMeetError =>
|
|||||||
return new OpenViduMeetError('Recording Error', `Recording in room '${roomId}' timed out while starting`, 503);
|
return new OpenViduMeetError('Recording Error', `Recording in room '${roomId}' timed out while starting`, 503);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
export const errorRecordingRangeNotSatisfiable = (recordingId: string, fileSize: number): OpenViduMeetError => {
|
||||||
|
return new OpenViduMeetError(
|
||||||
|
'Recording Error',
|
||||||
|
`Recording '${recordingId}' range not satisfiable. File size: ${fileSize}`,
|
||||||
|
416
|
||||||
|
);
|
||||||
|
};
|
||||||
|
|
||||||
export const errorRoomHasNoParticipants = (roomId: string): OpenViduMeetError => {
|
export const errorRoomHasNoParticipants = (roomId: string): OpenViduMeetError => {
|
||||||
return new OpenViduMeetError('Recording Error', `The room '${roomId}' has no participants`, 409);
|
return new OpenViduMeetError('Recording Error', `The room '${roomId}' has no participants`, 409);
|
||||||
};
|
};
|
||||||
|
|||||||
@ -13,6 +13,7 @@ import {
|
|||||||
withCanRecordPermission,
|
withCanRecordPermission,
|
||||||
withCanRetrieveRecordingsPermission,
|
withCanRetrieveRecordingsPermission,
|
||||||
withRecordingEnabled,
|
withRecordingEnabled,
|
||||||
|
withValidGetMediaRequest,
|
||||||
withValidRecordingBulkDeleteRequest,
|
withValidRecordingBulkDeleteRequest,
|
||||||
withValidRecordingFiltersRequest,
|
withValidRecordingFiltersRequest,
|
||||||
withValidRecordingId,
|
withValidRecordingId,
|
||||||
|
|||||||
@ -13,6 +13,7 @@ import {
|
|||||||
errorRecordingCannotBeStoppedWhileStarting,
|
errorRecordingCannotBeStoppedWhileStarting,
|
||||||
errorRecordingNotFound,
|
errorRecordingNotFound,
|
||||||
errorRecordingNotStopped,
|
errorRecordingNotStopped,
|
||||||
|
errorRecordingRangeNotSatisfiable,
|
||||||
errorRecordingStartTimeout,
|
errorRecordingStartTimeout,
|
||||||
errorRoomHasNoParticipants,
|
errorRoomHasNoParticipants,
|
||||||
errorRoomNotFound,
|
errorRoomNotFound,
|
||||||
@ -356,8 +357,13 @@ export class RecordingService {
|
|||||||
recordingId: string,
|
recordingId: string,
|
||||||
range?: string
|
range?: string
|
||||||
): Promise<{ fileSize: number | undefined; fileStream: Readable; start?: number; end?: number }> {
|
): Promise<{ fileSize: number | undefined; fileStream: Readable; start?: number; end?: number }> {
|
||||||
const RECORDING_FILE_PORTION_SIZE = 5 * 1024 * 1024; // 5MB
|
const DEFAULT_RECORDING_FILE_PORTION_SIZE = 5 * 1024 * 1024; // 5MB
|
||||||
const recordingInfo: MeetRecordingInfo = await this.getRecording(recordingId);
|
const recordingInfo: MeetRecordingInfo = await this.getRecording(recordingId);
|
||||||
|
|
||||||
|
if (recordingInfo.status !== MeetRecordingStatus.COMPLETE) {
|
||||||
|
throw errorRecordingNotStopped(recordingId);
|
||||||
|
}
|
||||||
|
|
||||||
const recordingPath = `${INTERNAL_CONFIG.S3_RECORDINGS_PREFIX}/${RecordingHelper.extractFilename(recordingInfo)}`;
|
const recordingPath = `${INTERNAL_CONFIG.S3_RECORDINGS_PREFIX}/${RecordingHelper.extractFilename(recordingInfo)}`;
|
||||||
|
|
||||||
if (!recordingPath) throw new Error(`Error extracting path from recording ${recordingId}`);
|
if (!recordingPath) throw new Error(`Error extracting path from recording ${recordingId}`);
|
||||||
@ -365,23 +371,59 @@ export class RecordingService {
|
|||||||
const data = await this.s3Service.getHeaderObject(recordingPath);
|
const data = await this.s3Service.getHeaderObject(recordingPath);
|
||||||
const fileSize = data.ContentLength;
|
const fileSize = data.ContentLength;
|
||||||
|
|
||||||
if (range && fileSize) {
|
if (!fileSize) {
|
||||||
|
this.logger.error(`Error getting file size for recording ${recordingId}`);
|
||||||
|
throw internalError(`Error getting file size for recording ${recordingId}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (range) {
|
||||||
// Parse the range header
|
// Parse the range header
|
||||||
const parts = range.replace(/bytes=/, '').split('-');
|
const matches = range.match(/^bytes=(\d+)-(\d*)$/)!;
|
||||||
const start = parseInt(parts[0], 10);
|
|
||||||
const endRange = parts[1] ? parseInt(parts[1], 10) : start + RECORDING_FILE_PORTION_SIZE;
|
const start = parseInt(matches[1], 10);
|
||||||
const end = Math.min(endRange, fileSize - 1);
|
let end = matches[2] ? parseInt(matches[2], 10) : start + DEFAULT_RECORDING_FILE_PORTION_SIZE;
|
||||||
|
|
||||||
|
// Validate the range values
|
||||||
|
if (isNaN(start) || isNaN(end) || start < 0) {
|
||||||
|
this.logger.warn(`Invalid range values for recording ${recordingId}: start=${start}, end=${end}`);
|
||||||
|
this.logger.warn(`Returning full stream for recording ${recordingId}`);
|
||||||
|
return this.getFullStreamResponse(recordingPath, fileSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (start >= fileSize) {
|
||||||
|
this.logger.error(
|
||||||
|
`Invalid range values for recording ${recordingId}: start=${start}, end=${end}, fileSize=${fileSize}`
|
||||||
|
);
|
||||||
|
throw errorRecordingRangeNotSatisfiable(recordingId, fileSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Adjust the end value to ensure it doesn't exceed the file size
|
||||||
|
end = Math.min(end, fileSize - 1);
|
||||||
|
|
||||||
|
// If the start is greater than the end, return the full stream
|
||||||
|
if (start > end) {
|
||||||
|
this.logger.warn(`Invalid range values after adjustment: start=${start}, end=${end}`);
|
||||||
|
return this.getFullStreamResponse(recordingPath, fileSize);
|
||||||
|
}
|
||||||
|
|
||||||
const fileStream = await this.s3Service.getObjectAsStream(recordingPath, MEET_S3_BUCKET, {
|
const fileStream = await this.s3Service.getObjectAsStream(recordingPath, MEET_S3_BUCKET, {
|
||||||
start,
|
start,
|
||||||
end
|
end
|
||||||
});
|
});
|
||||||
return { fileSize, fileStream, start, end };
|
return { fileSize, fileStream, start, end };
|
||||||
} else {
|
} else {
|
||||||
const fileStream = await this.s3Service.getObjectAsStream(recordingPath);
|
return this.getFullStreamResponse(recordingPath, fileSize);
|
||||||
return { fileSize, fileStream };
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected async getFullStreamResponse(
|
||||||
|
recordingPath: string,
|
||||||
|
fileSize: number
|
||||||
|
): Promise<{ fileSize: number; fileStream: Readable }> {
|
||||||
|
const fileStream = await this.s3Service.getObjectAsStream(recordingPath, MEET_S3_BUCKET);
|
||||||
|
return { fileSize, fileStream };
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Acquires a Redis-based lock to indicate that a recording is active for a specific room.
|
* Acquires a Redis-based lock to indicate that a recording is active for a specific room.
|
||||||
*
|
*
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user