backend: Enhance S3 service with retry mechanism and additional configuration options

This commit is contained in:
Carlos Santos 2025-03-19 17:20:26 +01:00
parent 60319cdafa
commit c05d9390f9
2 changed files with 125 additions and 29 deletions

View File

@ -35,11 +35,14 @@ export const {
// S3 configuration
MEET_S3_BUCKET = 'openvidu',
MEET_S3_SUBBUCKET = 'openvidu-meet',
MEET_S3_SERVICE_ENDPOINT = 'http://localhost:9000',
MEET_S3_ACCESS_KEY = 'minioadmin',
MEET_S3_SECRET_KEY = 'minioadmin',
MEET_AWS_REGION = 'us-east-1',
MEET_S3_WITH_PATH_STYLE_ACCESS = 'true',
MEET_S3_MAX_RETRIES_ATTEMPTS_ON_SAVE_ERROR = '5',
MEET_S3_INITIAL_RETRY_DELAY_MS = '100',
// Redis configuration
MEET_REDIS_HOST: REDIS_HOST = 'localhost',
@ -64,6 +67,8 @@ export const MEET_API_BASE_PATH_V1 = MEET_API_BASE_PATH + '/v1';
export const PARTICIPANT_TOKEN_COOKIE_NAME = 'OvMeetParticipantToken';
export const ACCESS_TOKEN_COOKIE_NAME = 'OvMeetAccessToken';
export const REFRESH_TOKEN_COOKIE_NAME = 'OvMeetRefreshToken';
export const MEET_S3_ROOMS_PREFIX = 'rooms';
export const MEET_S3_RECORDINGS_PREFIX = 'recordings';
export function checkModuleEnabled() {
if (MODULES_FILE) {
@ -122,6 +127,9 @@ export const logEnvVars = () => {
console.log('MEET S3 ACCESS KEY:', credential('****' + MEET_S3_ACCESS_KEY.slice(-3)));
console.log('MEET S3 SECRET KEY:', credential('****' + MEET_S3_SECRET_KEY.slice(-3)));
console.log('MEET AWS REGION:', text(MEET_AWS_REGION));
console.log('MEET S3 WITH PATH STYLE ACCESS:', text(MEET_S3_WITH_PATH_STYLE_ACCESS));
console.log('MEET S3 MAX RETRIES ATTEMPTS ON SAVE ERROR:', text(MEET_S3_MAX_RETRIES_ATTEMPTS_ON_SAVE_ERROR));
console.log('MEET S3 INITIAL RETRY DELAY MS:', text(MEET_S3_INITIAL_RETRY_DELAY_MS));
console.log('---------------------------------------------------------');
console.log('Redis Configuration');
console.log('---------------------------------------------------------');

View File

@ -20,7 +20,10 @@ import {
MEET_S3_BUCKET,
MEET_S3_SERVICE_ENDPOINT,
MEET_S3_SECRET_KEY,
MEET_S3_WITH_PATH_STYLE_ACCESS
MEET_S3_WITH_PATH_STYLE_ACCESS,
MEET_S3_MAX_RETRIES_ATTEMPTS_ON_SAVE_ERROR,
MEET_S3_INITIAL_RETRY_DELAY_MS,
MEET_S3_SUBBUCKET
} from '../environment.js';
import { errorS3NotAvailable, internalError } from '../models/error.model.js';
import { Readable } from 'stream';
@ -43,20 +46,19 @@ export class S3Service {
};
this.s3 = new S3Client(config);
this.logger.debug('S3 Client initialized');
}
/**
* Checks if a file exists in the specified S3 bucket.
*
* @param path - The path of the file to check.
* @param MEET_AWS_S3_BUCKET - The name of the S3 bucket.
* @returns A boolean indicating whether the file exists or not.
*/
async exists(path: string, bucket: string = MEET_S3_BUCKET) {
async exists(name: string, bucket: string = MEET_S3_BUCKET): Promise<boolean> {
try {
await this.getHeaderObject(path, bucket);
await this.getHeaderObject(name, bucket);
this.logger.verbose(`S3 exists: file ${this.getFullKey(name)} found in bucket ${bucket}`);
return true;
} catch (error) {
this.logger.warn(`S3 exists: file ${this.getFullKey(name)} not found in bucket ${bucket}`);
return false;
}
}
@ -77,16 +79,24 @@ export class S3Service {
// return this.run(command);
// }
/**
* Saves an object to a S3 bucket.
* Uses an internal retry mechanism in case of errors.
*/
async saveObject(name: string, body: any, bucket: string = MEET_S3_BUCKET): Promise<PutObjectCommandOutput> {
const fullKey = this.getFullKey(name);
try {
const command = new PutObjectCommand({
Bucket: bucket,
Key: name,
Key: fullKey,
Body: JSON.stringify(body)
});
return await this.run(command);
const result = await this.retryOperation<PutObjectCommandOutput>(() => this.run(command));
this.logger.info(`S3 saveObject: successfully saved object ${fullKey} in bucket ${bucket}`);
return result;
} catch (error: any) {
this.logger.error(`Error putting object in S3: ${error}`);
this.logger.error(`S3 saveObject: error putting object ${fullKey} in bucket ${bucket}: ${error}`);
if (error.code === 'ECONNREFUSED') {
throw errorS3NotAvailable(error);
@ -100,17 +110,21 @@ export class S3Service {
* Deletes an object from an S3 bucket.
*
* @param name - The name of the object to delete.
* @param bucket - The name of the S3 bucket (optional, defaults to MEET_S3_BUCKET).
* @param bucket - The name of the S3 bucket (optional, defaults to the `${MEET_S3_BUCKET}/${MEET_S3_SUBBUCKET}`
* @returns A promise that resolves to the result of the delete operation.
* @throws Throws an error if there was an error deleting the object.
*/
async deleteObject(name: string, bucket: string = MEET_S3_BUCKET): Promise<DeleteObjectCommandOutput> {
const fullKey = this.getFullKey(name);
try {
this.logger.verbose(`Deleting object in S3: ${name}`);
this.logger.verbose(`S3 deleteObject: attempting to delete object ${fullKey} in bucket ${bucket}`);
const command = new DeleteObjectCommand({ Bucket: bucket, Key: name });
return await this.run(command);
const result = await this.run(command);
this.logger.info(`S3 deleteObject: successfully deleted object ${fullKey} in bucket ${bucket}`);
return result;
} catch (error) {
this.logger.error(`Error deleting object in S3: ${error}`);
this.logger.error(`S3 deleteObject: error deleting object ${fullKey} in bucket ${bucket}: ${error}`);
throw internalError(error);
}
}
@ -126,22 +140,24 @@ export class S3Service {
* @throws {Error} - Throws an error if there is an issue listing the objects.
*/
async listObjects(
subbucket = '',
additionalPrefix = '',
searchPattern = '',
bucket: string = MEET_S3_BUCKET,
maxObjects = 1000
): Promise<ListObjectsV2CommandOutput> {
const prefix = subbucket ? `${subbucket}/` : '';
const basePrefix = `${MEET_S3_SUBBUCKET}/${additionalPrefix}`.replace(/\/+$/, '');
let allContents: _Object[] = [];
let continuationToken: string | undefined = undefined;
let isTruncated = true;
let fullResponse: ListObjectsV2CommandOutput | undefined = undefined;
try {
this.logger.verbose(`S3 listObjects: starting listing objects with prefix "${basePrefix}"`);
while (isTruncated) {
const command = new ListObjectsV2Command({
Bucket: bucket,
Prefix: prefix,
Prefix: basePrefix,
MaxKeys: maxObjects,
ContinuationToken: continuationToken
});
@ -166,6 +182,7 @@ export class S3Service {
// Update the loop control variables
isTruncated = response.IsTruncated ?? false;
continuationToken = response.NextContinuationToken;
this.logger.verbose(`S3 listObjects: fetched ${objects.length} objects; isTruncated=${isTruncated}`);
}
if (fullResponse) {
@ -176,9 +193,10 @@ export class S3Service {
fullResponse.KeyCount = allContents.length;
}
this.logger.info(`S3 listObjects: total objects found under prefix "${basePrefix}": ${allContents.length}`);
return fullResponse!;
} catch (error) {
this.logger.error(`Error listing objects: ${error}`);
this.logger.error(`S3 listObjects: error listing objects under prefix "${basePrefix}": ${error}`);
if ((error as any).code === 'ECONNREFUSED') {
throw errorS3NotAvailable(error);
@ -189,13 +207,19 @@ export class S3Service {
}
async getObjectAsJson(name: string, bucket: string = MEET_S3_BUCKET): Promise<Object | undefined> {
const fullKey = this.getFullKey(name);
try {
const obj = await this.getObject(name, bucket);
const obj = await this.getObject(fullKey, bucket);
const str = await obj.Body?.transformToString();
return JSON.parse(str as string);
const parsed = JSON.parse(str as string);
this.logger.info(
`S3 getObjectAsJson: successfully retrieved and parsed object ${fullKey} from bucket ${bucket}`
);
return parsed;
} catch (error: any) {
if (error.name === 'NoSuchKey') {
this.logger.warn(`Object '${name}' does not exist in S3`);
this.logger.warn(`S3 getObjectAsJson: object '${fullKey}' does not exist in bucket ${bucket}`);
return undefined;
}
@ -203,22 +227,34 @@ export class S3Service {
throw errorS3NotAvailable(error);
}
this.logger.error(`Error getting object from S3. Maybe it has been deleted: ${error}`);
this.logger.error(`S3 getObjectAsJson: error retrieving object ${fullKey} from bucket ${bucket}: ${error}`);
throw internalError(error);
}
}
async getObjectAsStream(name: string, bucket: string = MEET_S3_BUCKET, range?: { start: number; end: number }) {
async getObjectAsStream(
name: string,
bucket: string = MEET_S3_BUCKET,
range?: { start: number; end: number }
): Promise<Readable> {
const fullKey = this.getFullKey(name);
try {
const obj = await this.getObject(name, bucket, range);
const obj = await this.getObject(fullKey, bucket, range);
if (obj.Body) {
this.logger.info(
`S3 getObjectAsStream: successfully retrieved object ${name} stream from bucket ${bucket}`
);
return obj.Body as Readable;
} else {
throw new Error('Empty body response');
}
} catch (error: any) {
this.logger.error(`Error getting object from S3: ${error}`);
this.logger.error(
`S3 getObjectAsStream: error retrieving stream for object ${fullKey} from bucket ${bucket}: ${error}`
);
if (error.code === 'ECONNREFUSED') {
throw errorS3NotAvailable(error);
@ -230,31 +266,48 @@ export class S3Service {
async getHeaderObject(name: string, bucket: string = MEET_S3_BUCKET): Promise<HeadObjectCommandOutput> {
try {
const fullKey = this.getFullKey(name);
const headParams: HeadObjectCommand = new HeadObjectCommand({
Bucket: bucket,
Key: name
Key: fullKey
});
this.logger.verbose(`S3 getHeaderObject: requesting header for object ${fullKey} in bucket ${bucket}`);
return await this.run(headParams);
} catch (error) {
this.logger.error(`Error getting header object from S3 in ${name}: ${error}`);
this.logger.error(
`S3 getHeaderObject: error getting header for object ${this.getFullKey(name)} in bucket ${bucket}: ${error}`
);
throw internalError(error);
}
}
quit() {
this.s3.destroy();
this.logger.info('S3 client destroyed');
}
private async getObject(
/**
* Prepares a full key path by prefixing the object's name with the subbucket.
* All operations are performed under MEET_S3_BUCKET/MEET_S3_SUBBUCKET.
*/
protected getFullKey(name: string): string {
return `${MEET_S3_SUBBUCKET}/${name}`;
}
protected async getObject(
name: string,
bucket: string = MEET_S3_BUCKET,
range?: { start: number; end: number }
): Promise<GetObjectCommandOutput> {
const fullKey = this.getFullKey(name);
const command = new GetObjectCommand({
Bucket: bucket,
Key: name,
Key: fullKey,
Range: range ? `bytes=${range.start}-${range.end}` : undefined
});
this.logger.verbose(`S3 getObject: requesting object ${fullKey} from bucket ${bucket}`);
return await this.run(command);
}
@ -262,4 +315,39 @@ export class S3Service {
protected async run(command: any) {
return this.s3.send(command);
}
/**
* Retries a given asynchronous operation with exponential backoff.
*/
protected async retryOperation<T>(operation: () => Promise<T>): Promise<T> {
let attempt = 0;
let delayMs = Number(MEET_S3_INITIAL_RETRY_DELAY_MS);
const maxRetries = Number(MEET_S3_MAX_RETRIES_ATTEMPTS_ON_SAVE_ERROR);
while (true) {
try {
this.logger.verbose(`S3 retryOperation: attempt ${attempt + 1}`);
return await operation();
} catch (error) {
attempt++;
if (attempt >= maxRetries) {
this.logger.error(`S3 retryOperation: operation failed after ${maxRetries} attempts`);
throw error;
}
this.logger.warn(`S3 retryOperation: attempt ${attempt} failed. Retrying in ${delayMs}ms...`);
await this.sleep(delayMs);
// Exponential back off: delay increases by a factor of 2
delayMs *= 2;
}
}
}
/**
* Internal helper to delay execution.
*/
protected sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}