diff --git a/backend/src/environment.ts b/backend/src/environment.ts index 1a58129..dcd6489 100644 --- a/backend/src/environment.ts +++ b/backend/src/environment.ts @@ -35,11 +35,14 @@ export const { // S3 configuration MEET_S3_BUCKET = 'openvidu', + MEET_S3_SUBBUCKET = 'openvidu-meet', MEET_S3_SERVICE_ENDPOINT = 'http://localhost:9000', MEET_S3_ACCESS_KEY = 'minioadmin', MEET_S3_SECRET_KEY = 'minioadmin', MEET_AWS_REGION = 'us-east-1', MEET_S3_WITH_PATH_STYLE_ACCESS = 'true', + MEET_S3_MAX_RETRIES_ATTEMPTS_ON_SAVE_ERROR = '5', + MEET_S3_INITIAL_RETRY_DELAY_MS = '100', // Redis configuration MEET_REDIS_HOST: REDIS_HOST = 'localhost', @@ -64,6 +67,8 @@ export const MEET_API_BASE_PATH_V1 = MEET_API_BASE_PATH + '/v1'; export const PARTICIPANT_TOKEN_COOKIE_NAME = 'OvMeetParticipantToken'; export const ACCESS_TOKEN_COOKIE_NAME = 'OvMeetAccessToken'; export const REFRESH_TOKEN_COOKIE_NAME = 'OvMeetRefreshToken'; +export const MEET_S3_ROOMS_PREFIX = 'rooms'; +export const MEET_S3_RECORDINGS_PREFIX = 'recordings'; export function checkModuleEnabled() { if (MODULES_FILE) { @@ -122,6 +127,9 @@ export const logEnvVars = () => { console.log('MEET S3 ACCESS KEY:', credential('****' + MEET_S3_ACCESS_KEY.slice(-3))); console.log('MEET S3 SECRET KEY:', credential('****' + MEET_S3_SECRET_KEY.slice(-3))); console.log('MEET AWS REGION:', text(MEET_AWS_REGION)); + console.log('MEET S3 WITH PATH STYLE ACCESS:', text(MEET_S3_WITH_PATH_STYLE_ACCESS)); + console.log('MEET S3 MAX RETRIES ATTEMPTS ON SAVE ERROR:', text(MEET_S3_MAX_RETRIES_ATTEMPTS_ON_SAVE_ERROR)); + console.log('MEET S3 INITIAL RETRY DELAY MS:', text(MEET_S3_INITIAL_RETRY_DELAY_MS)); console.log('---------------------------------------------------------'); console.log('Redis Configuration'); console.log('---------------------------------------------------------'); diff --git a/backend/src/services/s3.service.ts b/backend/src/services/s3.service.ts index d509bbb..9f2fe28 100644 --- a/backend/src/services/s3.service.ts +++ b/backend/src/services/s3.service.ts @@ -20,7 +20,10 @@ import { MEET_S3_BUCKET, MEET_S3_SERVICE_ENDPOINT, MEET_S3_SECRET_KEY, - MEET_S3_WITH_PATH_STYLE_ACCESS + MEET_S3_WITH_PATH_STYLE_ACCESS, + MEET_S3_MAX_RETRIES_ATTEMPTS_ON_SAVE_ERROR, + MEET_S3_INITIAL_RETRY_DELAY_MS, + MEET_S3_SUBBUCKET } from '../environment.js'; import { errorS3NotAvailable, internalError } from '../models/error.model.js'; import { Readable } from 'stream'; @@ -43,20 +46,19 @@ export class S3Service { }; this.s3 = new S3Client(config); + this.logger.debug('S3 Client initialized'); } /** * Checks if a file exists in the specified S3 bucket. - * - * @param path - The path of the file to check. - * @param MEET_AWS_S3_BUCKET - The name of the S3 bucket. - * @returns A boolean indicating whether the file exists or not. */ - async exists(path: string, bucket: string = MEET_S3_BUCKET) { + async exists(name: string, bucket: string = MEET_S3_BUCKET): Promise { try { - await this.getHeaderObject(path, bucket); + await this.getHeaderObject(name, bucket); + this.logger.verbose(`S3 exists: file ${this.getFullKey(name)} found in bucket ${bucket}`); return true; } catch (error) { + this.logger.warn(`S3 exists: file ${this.getFullKey(name)} not found in bucket ${bucket}`); return false; } } @@ -77,16 +79,24 @@ export class S3Service { // return this.run(command); // } + /** + * Saves an object to a S3 bucket. + * Uses an internal retry mechanism in case of errors. + */ async saveObject(name: string, body: any, bucket: string = MEET_S3_BUCKET): Promise { + const fullKey = this.getFullKey(name); + try { const command = new PutObjectCommand({ Bucket: bucket, - Key: name, + Key: fullKey, Body: JSON.stringify(body) }); - return await this.run(command); + const result = await this.retryOperation(() => this.run(command)); + this.logger.info(`S3 saveObject: successfully saved object ${fullKey} in bucket ${bucket}`); + return result; } catch (error: any) { - this.logger.error(`Error putting object in S3: ${error}`); + this.logger.error(`S3 saveObject: error putting object ${fullKey} in bucket ${bucket}: ${error}`); if (error.code === 'ECONNREFUSED') { throw errorS3NotAvailable(error); @@ -100,17 +110,21 @@ export class S3Service { * Deletes an object from an S3 bucket. * * @param name - The name of the object to delete. - * @param bucket - The name of the S3 bucket (optional, defaults to MEET_S3_BUCKET). + * @param bucket - The name of the S3 bucket (optional, defaults to the `${MEET_S3_BUCKET}/${MEET_S3_SUBBUCKET}` * @returns A promise that resolves to the result of the delete operation. * @throws Throws an error if there was an error deleting the object. */ async deleteObject(name: string, bucket: string = MEET_S3_BUCKET): Promise { + const fullKey = this.getFullKey(name); + try { - this.logger.verbose(`Deleting object in S3: ${name}`); + this.logger.verbose(`S3 deleteObject: attempting to delete object ${fullKey} in bucket ${bucket}`); const command = new DeleteObjectCommand({ Bucket: bucket, Key: name }); - return await this.run(command); + const result = await this.run(command); + this.logger.info(`S3 deleteObject: successfully deleted object ${fullKey} in bucket ${bucket}`); + return result; } catch (error) { - this.logger.error(`Error deleting object in S3: ${error}`); + this.logger.error(`S3 deleteObject: error deleting object ${fullKey} in bucket ${bucket}: ${error}`); throw internalError(error); } } @@ -126,22 +140,24 @@ export class S3Service { * @throws {Error} - Throws an error if there is an issue listing the objects. */ async listObjects( - subbucket = '', + additionalPrefix = '', searchPattern = '', bucket: string = MEET_S3_BUCKET, maxObjects = 1000 ): Promise { - const prefix = subbucket ? `${subbucket}/` : ''; + const basePrefix = `${MEET_S3_SUBBUCKET}/${additionalPrefix}`.replace(/\/+$/, ''); let allContents: _Object[] = []; let continuationToken: string | undefined = undefined; let isTruncated = true; let fullResponse: ListObjectsV2CommandOutput | undefined = undefined; try { + this.logger.verbose(`S3 listObjects: starting listing objects with prefix "${basePrefix}"`); + while (isTruncated) { const command = new ListObjectsV2Command({ Bucket: bucket, - Prefix: prefix, + Prefix: basePrefix, MaxKeys: maxObjects, ContinuationToken: continuationToken }); @@ -166,6 +182,7 @@ export class S3Service { // Update the loop control variables isTruncated = response.IsTruncated ?? false; continuationToken = response.NextContinuationToken; + this.logger.verbose(`S3 listObjects: fetched ${objects.length} objects; isTruncated=${isTruncated}`); } if (fullResponse) { @@ -176,9 +193,10 @@ export class S3Service { fullResponse.KeyCount = allContents.length; } + this.logger.info(`S3 listObjects: total objects found under prefix "${basePrefix}": ${allContents.length}`); return fullResponse!; } catch (error) { - this.logger.error(`Error listing objects: ${error}`); + this.logger.error(`S3 listObjects: error listing objects under prefix "${basePrefix}": ${error}`); if ((error as any).code === 'ECONNREFUSED') { throw errorS3NotAvailable(error); @@ -189,13 +207,19 @@ export class S3Service { } async getObjectAsJson(name: string, bucket: string = MEET_S3_BUCKET): Promise { + const fullKey = this.getFullKey(name); + try { - const obj = await this.getObject(name, bucket); + const obj = await this.getObject(fullKey, bucket); const str = await obj.Body?.transformToString(); - return JSON.parse(str as string); + const parsed = JSON.parse(str as string); + this.logger.info( + `S3 getObjectAsJson: successfully retrieved and parsed object ${fullKey} from bucket ${bucket}` + ); + return parsed; } catch (error: any) { if (error.name === 'NoSuchKey') { - this.logger.warn(`Object '${name}' does not exist in S3`); + this.logger.warn(`S3 getObjectAsJson: object '${fullKey}' does not exist in bucket ${bucket}`); return undefined; } @@ -203,22 +227,34 @@ export class S3Service { throw errorS3NotAvailable(error); } - this.logger.error(`Error getting object from S3. Maybe it has been deleted: ${error}`); + this.logger.error(`S3 getObjectAsJson: error retrieving object ${fullKey} from bucket ${bucket}: ${error}`); throw internalError(error); } } - async getObjectAsStream(name: string, bucket: string = MEET_S3_BUCKET, range?: { start: number; end: number }) { + async getObjectAsStream( + name: string, + bucket: string = MEET_S3_BUCKET, + range?: { start: number; end: number } + ): Promise { + const fullKey = this.getFullKey(name); + try { - const obj = await this.getObject(name, bucket, range); + const obj = await this.getObject(fullKey, bucket, range); if (obj.Body) { + this.logger.info( + `S3 getObjectAsStream: successfully retrieved object ${name} stream from bucket ${bucket}` + ); + return obj.Body as Readable; } else { throw new Error('Empty body response'); } } catch (error: any) { - this.logger.error(`Error getting object from S3: ${error}`); + this.logger.error( + `S3 getObjectAsStream: error retrieving stream for object ${fullKey} from bucket ${bucket}: ${error}` + ); if (error.code === 'ECONNREFUSED') { throw errorS3NotAvailable(error); @@ -230,31 +266,48 @@ export class S3Service { async getHeaderObject(name: string, bucket: string = MEET_S3_BUCKET): Promise { try { + const fullKey = this.getFullKey(name); const headParams: HeadObjectCommand = new HeadObjectCommand({ Bucket: bucket, - Key: name + Key: fullKey }); + this.logger.verbose(`S3 getHeaderObject: requesting header for object ${fullKey} in bucket ${bucket}`); return await this.run(headParams); } catch (error) { - this.logger.error(`Error getting header object from S3 in ${name}: ${error}`); + this.logger.error( + `S3 getHeaderObject: error getting header for object ${this.getFullKey(name)} in bucket ${bucket}: ${error}` + ); + throw internalError(error); } } quit() { this.s3.destroy(); + this.logger.info('S3 client destroyed'); } - private async getObject( + /** + * Prepares a full key path by prefixing the object's name with the subbucket. + * All operations are performed under MEET_S3_BUCKET/MEET_S3_SUBBUCKET. + */ + protected getFullKey(name: string): string { + return `${MEET_S3_SUBBUCKET}/${name}`; + } + + protected async getObject( name: string, bucket: string = MEET_S3_BUCKET, range?: { start: number; end: number } ): Promise { + const fullKey = this.getFullKey(name); + const command = new GetObjectCommand({ Bucket: bucket, - Key: name, + Key: fullKey, Range: range ? `bytes=${range.start}-${range.end}` : undefined }); + this.logger.verbose(`S3 getObject: requesting object ${fullKey} from bucket ${bucket}`); return await this.run(command); } @@ -262,4 +315,39 @@ export class S3Service { protected async run(command: any) { return this.s3.send(command); } + + /** + * Retries a given asynchronous operation with exponential backoff. + */ + protected async retryOperation(operation: () => Promise): Promise { + let attempt = 0; + let delayMs = Number(MEET_S3_INITIAL_RETRY_DELAY_MS); + const maxRetries = Number(MEET_S3_MAX_RETRIES_ATTEMPTS_ON_SAVE_ERROR); + + while (true) { + try { + this.logger.verbose(`S3 retryOperation: attempt ${attempt + 1}`); + return await operation(); + } catch (error) { + attempt++; + + if (attempt >= maxRetries) { + this.logger.error(`S3 retryOperation: operation failed after ${maxRetries} attempts`); + throw error; + } + + this.logger.warn(`S3 retryOperation: attempt ${attempt} failed. Retrying in ${delayMs}ms...`); + await this.sleep(delayMs); + // Exponential back off: delay increases by a factor of 2 + delayMs *= 2; + } + } + } + + /** + * Internal helper to delay execution. + */ + protected sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); + } }