backend: extend storage provider to manage user data with new methods for retrieving and saving users
This commit is contained in:
parent
55bc8726d0
commit
7d128ed699
@ -58,5 +58,5 @@ export const registerDependencies = () => {
|
||||
export const initializeEagerServices = async () => {
|
||||
// Force the creation of services that need to be initialized at startup
|
||||
container.get(RecordingService);
|
||||
await container.get(MeetStorageService).initializeGlobalPreferences();
|
||||
await container.get(MeetStorageService).initialize();
|
||||
};
|
||||
|
||||
@ -24,6 +24,7 @@ const INTERNAL_CONFIG = {
|
||||
S3_INITIAL_RETRY_DELAY_MS: '100',
|
||||
S3_ROOMS_PREFIX: 'rooms',
|
||||
S3_RECORDINGS_PREFIX: 'recordings',
|
||||
S3_USERS_PREFIX: 'users',
|
||||
|
||||
// Garbage collection and recording lock intervals
|
||||
ROOM_GC_INTERVAL: '1h' as StringValue, // e.g. garbage collector interval for rooms
|
||||
|
||||
@ -4,7 +4,8 @@ export const enum RedisKeyPrefix {
|
||||
|
||||
export const enum RedisKeyName {
|
||||
GLOBAL_PREFERENCES = `${RedisKeyPrefix.BASE}global_preferences`,
|
||||
ROOM = `${RedisKeyPrefix.BASE}room:`
|
||||
ROOM = `${RedisKeyPrefix.BASE}room:`,
|
||||
USER = `${RedisKeyPrefix.BASE}user:`,
|
||||
}
|
||||
|
||||
export const enum RedisLockPrefix {
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
import { PutObjectCommandOutput } from '@aws-sdk/client-s3';
|
||||
import { GlobalPreferences, MeetRecordingInfo, MeetRoom } from '@typings-ce';
|
||||
import { GlobalPreferences, MeetRecordingInfo, MeetRoom, User } from '@typings-ce';
|
||||
import { inject, injectable } from 'inversify';
|
||||
import INTERNAL_CONFIG from '../../../config/internal-config.js';
|
||||
import { errorRecordingNotFound, OpenViduMeetError, RedisKeyName } from '../../../models/index.js';
|
||||
@ -11,8 +11,8 @@ import { Readable } from 'stream';
|
||||
* Implementation of the StorageProvider interface using AWS S3 for persistent storage
|
||||
* with Redis caching for improved performance.
|
||||
*
|
||||
* This class provides operations for storing and retrieving application preferences and room data
|
||||
* with a two-tiered storage approach:
|
||||
* This class provides operations for storing and retrieving application preferences,
|
||||
* rooms, recordings metadata and users with a two-tiered storage approach:
|
||||
* - Redis is used as a primary cache for fast access
|
||||
* - S3 serves as the persistent storage layer and fallback when data is not in Redis
|
||||
*
|
||||
@ -21,6 +21,8 @@ import { Readable } from 'stream';
|
||||
*
|
||||
* @template GPrefs - Type for global preferences data, defaults to GlobalPreferences
|
||||
* @template MRoom - Type for room data, defaults to MeetRoom
|
||||
* @template MRec - Type for recording metadata, defaults to MeetRecordingInfo
|
||||
* @template MUser - Type for user data, defaults to User
|
||||
*
|
||||
* @implements {StorageProvider}
|
||||
*/
|
||||
@ -28,10 +30,12 @@ import { Readable } from 'stream';
|
||||
export class S3StorageProvider<
|
||||
GPrefs extends GlobalPreferences = GlobalPreferences,
|
||||
MRoom extends MeetRoom = MeetRoom,
|
||||
MRec extends MeetRecordingInfo = MeetRecordingInfo
|
||||
MRec extends MeetRecordingInfo = MeetRecordingInfo,
|
||||
MUser extends User = User
|
||||
> implements StorageProvider
|
||||
{
|
||||
protected readonly S3_GLOBAL_PREFERENCES_KEY = `global-preferences.json`;
|
||||
|
||||
constructor(
|
||||
@inject(LoggerService) protected logger: LoggerService,
|
||||
@inject(S3Service) protected s3Service: S3Service,
|
||||
@ -479,15 +483,13 @@ export class S3StorageProvider<
|
||||
*/
|
||||
async getRecordingMetadataByPath(recordingPath: string): Promise<MRec> {
|
||||
try {
|
||||
return await this.s3Service.getObjectAsJson(recordingPath) as MRec;
|
||||
return (await this.s3Service.getObjectAsJson(recordingPath)) as MRec;
|
||||
} catch (error) {
|
||||
this.handleError(error, `Error fetching recording metadata for path ${recordingPath}`);
|
||||
throw error;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
async saveRecordingMetadata(recordingInfo: MRec): Promise<MRec> {
|
||||
try {
|
||||
const metadataPath = RecordingHelper.buildMetadataFilePath(recordingInfo.recordingId);
|
||||
@ -516,6 +518,77 @@ export class S3StorageProvider<
|
||||
}
|
||||
}
|
||||
|
||||
async getUser(username: string): Promise<MUser | null> {
|
||||
try {
|
||||
const userKey = RedisKeyName.USER + username;
|
||||
const user: MUser | null = await this.getFromRedis<MUser>(userKey);
|
||||
|
||||
if (!user) {
|
||||
this.logger.debug(`User ${username} not found in Redis. Fetching from S3...`);
|
||||
const s3Path = `${INTERNAL_CONFIG.S3_USERS_PREFIX}/${username}.json`;
|
||||
return await this.getFromS3<MUser>(s3Path);
|
||||
}
|
||||
|
||||
this.logger.debug(`User ${username} retrieved from Redis`);
|
||||
return user;
|
||||
} catch (error) {
|
||||
this.handleError(error, `Error fetching user ${username}`);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Saves a user object to both S3 storage and Redis cache atomically.
|
||||
*
|
||||
* This method attempts to persist the user data in S3 (as a JSON file) and in Redis (as a serialized string).
|
||||
* If both operations succeed, the user is considered saved and the method returns the user object.
|
||||
* If either operation fails, the method attempts to roll back any successful operation to maintain consistency.
|
||||
* In case of failure, the error is logged and rethrown after rollback attempts.
|
||||
*
|
||||
* @param user - The user object to be saved.
|
||||
* @returns A promise that resolves to the saved user object if both operations succeed.
|
||||
* @throws An error if either the S3 or Redis operation fails, after attempting rollback.
|
||||
*/
|
||||
async saveUser(user: MUser): Promise<MUser> {
|
||||
const userKey = RedisKeyName.USER + user.username;
|
||||
const redisPayload = JSON.stringify(user);
|
||||
const s3Path = `${INTERNAL_CONFIG.S3_USERS_PREFIX}/${user.username}.json`;
|
||||
|
||||
const [s3Result, redisResult] = await Promise.allSettled([
|
||||
this.s3Service.saveObject(s3Path, user),
|
||||
this.redisService.set(userKey, redisPayload, false)
|
||||
]);
|
||||
|
||||
if (s3Result.status === 'fulfilled' && redisResult.status === 'fulfilled') {
|
||||
this.logger.info(`User ${user.username} saved successfully`);
|
||||
return user;
|
||||
}
|
||||
|
||||
// Rollback any changes made by the successful operation
|
||||
if (s3Result.status === 'fulfilled') {
|
||||
try {
|
||||
await this.s3Service.deleteObjects([s3Path]);
|
||||
} catch (rollbackError) {
|
||||
this.logger.error(`Error rolling back S3 save for user ${user.username}: ${rollbackError}`);
|
||||
}
|
||||
}
|
||||
|
||||
if (redisResult.status === 'fulfilled') {
|
||||
try {
|
||||
await this.redisService.delete(userKey);
|
||||
} catch (rollbackError) {
|
||||
this.logger.error(`Error rolling back Redis set for user ${user.username}: ${rollbackError}`);
|
||||
}
|
||||
}
|
||||
|
||||
// Return the error that occurred first
|
||||
const failedOperation: PromiseRejectedResult =
|
||||
s3Result.status === 'rejected' ? s3Result : (redisResult as PromiseRejectedResult);
|
||||
const error = failedOperation.reason;
|
||||
this.handleError(error, `Error saving user ${user.username}`);
|
||||
throw error;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves an object of type U from Redis by the given key.
|
||||
* Returns null if the key is not found or an error occurs.
|
||||
|
||||
@ -1,21 +1,24 @@
|
||||
import { GlobalPreferences, MeetRecordingInfo, MeetRoom } from '@typings-ce';
|
||||
import { GlobalPreferences, MeetRecordingInfo, MeetRoom, User } from '@typings-ce';
|
||||
import { Readable } from 'stream';
|
||||
|
||||
/**
|
||||
* An interface that defines the contract for storage providers in the OpenVidu Meet application.
|
||||
* Storage providers handle persistence of global application preferences and meeting room data.
|
||||
* Storage providers handle persistence of global application preferences, rooms, recordings metadata and users.
|
||||
*
|
||||
* @template GPrefs - The type of global preferences, extending GlobalPreferences
|
||||
* @template MRoom - The type of room data, extending MeetRoom
|
||||
* @template MRec - The type of recording metadata, extending MeetRecordingInfo
|
||||
* @template MUser - The type of user data, extending User
|
||||
*
|
||||
* Implementations of this interface should handle the persistent storage
|
||||
* of application settings and room information, which could be backed by
|
||||
* various storage solutions (database, file system, cloud storage, etc.).
|
||||
* of application settings, room information, recording metadata, and user data,
|
||||
* which could be backed by various storage solutions (database, file system, cloud storage, etc.).
|
||||
*/
|
||||
export interface StorageProvider<
|
||||
GPrefs extends GlobalPreferences = GlobalPreferences,
|
||||
MRoom extends MeetRoom = MeetRoom,
|
||||
MRec extends MeetRecordingInfo = MeetRecordingInfo
|
||||
MRec extends MeetRecordingInfo = MeetRecordingInfo,
|
||||
MUser extends User = User
|
||||
> {
|
||||
/**
|
||||
* Initializes the storage with default preferences if they are not already set.
|
||||
@ -77,9 +80,9 @@ export interface StorageProvider<
|
||||
*
|
||||
* @param maxItems - The maximum number of items to retrieve. If not provided, all items will be retrieved.
|
||||
* @param nextPageToken - The token for the next page of results. If not provided, the first page will be retrieved.
|
||||
* @returns A promise that resolves to an object containing ç
|
||||
* - the retrieved rooms,
|
||||
* - a boolean indicating if there are more items to retrieve
|
||||
* @returns A promise that resolves to an object containing:
|
||||
* - the retrieved rooms.
|
||||
* - a boolean indicating if there are more items to retrieve.
|
||||
* - an optional next page token.
|
||||
*/
|
||||
getMeetRooms(
|
||||
@ -94,7 +97,7 @@ export interface StorageProvider<
|
||||
/**
|
||||
* Retrieves the {@link MeetRoom}.
|
||||
*
|
||||
* @param roomId - The name of the room to retrieve.
|
||||
* @param roomId - The identifier of the room to retrieve.
|
||||
* @returns A promise that resolves to the OpenVidu Room, or null if not found.
|
||||
**/
|
||||
getMeetRoom(roomId: string): Promise<MRoom | null>;
|
||||
@ -103,14 +106,14 @@ export interface StorageProvider<
|
||||
* Saves the OpenVidu Meet Room.
|
||||
*
|
||||
* @param meetRoom - The OpenVidu Room to save.
|
||||
* @returns A promise that resolves to the saved
|
||||
* @returns A promise that resolves to the saved OpenVidu Room.
|
||||
**/
|
||||
saveMeetRoom(meetRoom: MRoom): Promise<MRoom>;
|
||||
|
||||
/**
|
||||
* Deletes OpenVidu Meet Rooms.
|
||||
*
|
||||
* @param roomIds - The room names to delete.
|
||||
* @param roomIds - The room IDs to delete.
|
||||
* @returns A promise that resolves when the room have been deleted.
|
||||
**/
|
||||
deleteMeetRooms(roomIds: string[]): Promise<void>;
|
||||
@ -194,6 +197,7 @@ export interface StorageProvider<
|
||||
start: number;
|
||||
}
|
||||
): Promise<Readable>;
|
||||
|
||||
/**
|
||||
* Deletes multiple recording binary files by their paths.
|
||||
*
|
||||
@ -201,4 +205,20 @@ export interface StorageProvider<
|
||||
* @returns A promise that resolves when the recording binary files have been deleted.
|
||||
*/
|
||||
deleteRecordingBinaryFilesByPaths(recordingPaths: string[]): Promise<void>;
|
||||
|
||||
/**
|
||||
* Retrieves the user data for a specific username.
|
||||
*
|
||||
* @param username - The username of the user to retrieve.
|
||||
* @returns A promise that resolves to the user data, or null if not found.
|
||||
*/
|
||||
getUser(username: string): Promise<MUser | null>;
|
||||
|
||||
/**
|
||||
* Saves the user data.
|
||||
*
|
||||
* @param user - The user data to save.
|
||||
* @returns A promise that resolves to the saved user data.
|
||||
*/
|
||||
saveUser(user: MUser): Promise<MUser>;
|
||||
}
|
||||
|
||||
@ -1,11 +1,17 @@
|
||||
import { AuthMode, AuthType, GlobalPreferences, MeetRecordingInfo, MeetRoom } from '@typings-ce';
|
||||
import { AuthMode, AuthType, GlobalPreferences, MeetRecordingInfo, MeetRoom, User, UserRole } from '@typings-ce';
|
||||
import { inject, injectable } from 'inversify';
|
||||
import ms from 'ms';
|
||||
import { MEET_NAME_ID, MEET_SECRET, MEET_USER, MEET_WEBHOOK_ENABLED, MEET_WEBHOOK_URL } from '../../environment.js';
|
||||
import { Readable } from 'stream';
|
||||
import {
|
||||
MEET_ADMIN_SECRET,
|
||||
MEET_ADMIN_USER,
|
||||
MEET_NAME_ID,
|
||||
MEET_WEBHOOK_ENABLED,
|
||||
MEET_WEBHOOK_URL
|
||||
} from '../../environment.js';
|
||||
import { MeetLock, PasswordHelper } from '../../helpers/index.js';
|
||||
import { errorRoomNotFound, internalError, OpenViduMeetError } from '../../models/error.model.js';
|
||||
import { LoggerService, MutexService, StorageFactory, StorageProvider } from '../index.js';
|
||||
import { Readable } from 'stream';
|
||||
|
||||
/**
|
||||
* A service for managing storage operations related to OpenVidu Meet rooms and preferences.
|
||||
@ -20,9 +26,11 @@ import { Readable } from 'stream';
|
||||
export class MeetStorageService<
|
||||
GPrefs extends GlobalPreferences = GlobalPreferences,
|
||||
MRoom extends MeetRoom = MeetRoom,
|
||||
MRec extends MeetRecordingInfo = MeetRecordingInfo
|
||||
MRec extends MeetRecordingInfo = MeetRecordingInfo,
|
||||
MUser extends User = User
|
||||
> {
|
||||
protected storageProvider: StorageProvider;
|
||||
|
||||
constructor(
|
||||
@inject(LoggerService) protected logger: LoggerService,
|
||||
@inject(StorageFactory) protected storageFactory: StorageFactory,
|
||||
@ -68,10 +76,10 @@ export class MeetStorageService<
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes default preferences if not already initialized.
|
||||
* Initializes default preferences if not already initialized and saves the admin user.
|
||||
* @returns {Promise<GPrefs>} Default global preferences.
|
||||
*/
|
||||
async initializeGlobalPreferences(): Promise<void> {
|
||||
async initialize(): Promise<void> {
|
||||
try {
|
||||
// Acquire a global lock to prevent multiple initializations at the same time when running in HA mode
|
||||
const lock = await this.mutexService.acquire(MeetLock.getGlobalPreferencesLock(), ms('30s'));
|
||||
@ -83,10 +91,17 @@ export class MeetStorageService<
|
||||
return;
|
||||
}
|
||||
|
||||
const preferences = await this.getDefaultPreferences();
|
||||
|
||||
this.logger.verbose('Initializing global preferences with default values');
|
||||
const preferences = await this.getDefaultPreferences();
|
||||
await this.storageProvider.initialize(preferences);
|
||||
|
||||
// Save the default admin user
|
||||
const admin = {
|
||||
username: MEET_ADMIN_USER,
|
||||
passwordHash: await PasswordHelper.hashPassword(MEET_ADMIN_SECRET),
|
||||
roles: [UserRole.ADMIN, UserRole.USER]
|
||||
} as MUser;
|
||||
await this.saveUser(admin);
|
||||
} catch (error) {
|
||||
this.handleError(error, 'Error initializing default preferences');
|
||||
}
|
||||
@ -101,7 +116,7 @@ export class MeetStorageService<
|
||||
|
||||
if (preferences) return preferences as GPrefs;
|
||||
|
||||
await this.initializeGlobalPreferences();
|
||||
await this.initialize();
|
||||
preferences = await this.storageProvider.getGlobalPreferences();
|
||||
|
||||
if (!preferences) {
|
||||
@ -125,8 +140,8 @@ export class MeetStorageService<
|
||||
/**
|
||||
* Saves the meet room to the storage provider.
|
||||
*
|
||||
* @param meetRoom - The meeting room object to be saved
|
||||
* @returns A promise that resolves to the saved meeting room object
|
||||
* @param meetRoom - The room object to be saved
|
||||
* @returns A promise that resolves to the saved room object
|
||||
*/
|
||||
async saveMeetRoom(meetRoom: MRoom): Promise<MRoom> {
|
||||
this.logger.info(`Saving OpenVidu room ${meetRoom.roomId}`);
|
||||
@ -134,12 +149,12 @@ export class MeetStorageService<
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves a paginated list of meeting rooms from the storage provider.
|
||||
* Retrieves a paginated list of rooms from the storage provider.
|
||||
*
|
||||
* @param maxItems - Optional maximum number of rooms to retrieve in a single request
|
||||
* @param nextPageToken - Optional token for pagination to get the next page of results
|
||||
* @returns A promise that resolves to an object containing:
|
||||
* - rooms: Array of MRoom objects representing the meeting rooms
|
||||
* - rooms: Array of MRoom objects representing the rooms
|
||||
* - isTruncated: Boolean indicating if there are more results available
|
||||
* - nextPageToken: Optional token for retrieving the next page of results
|
||||
*/
|
||||
@ -159,7 +174,7 @@ export class MeetStorageService<
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the preferences associated with a specific room.
|
||||
* Retrieves the room by its unique identifier.
|
||||
*
|
||||
* @param roomId - The unique identifier for the room.
|
||||
* @returns A promise that resolves to the room's preferences.
|
||||
@ -177,7 +192,7 @@ export class MeetStorageService<
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes multiple meeting rooms from storage.
|
||||
* Deletes multiple rooms from storage.
|
||||
*
|
||||
* @param roomIds - Array of room identifiers to be deleted
|
||||
* @returns A promise that resolves when all rooms have been successfully deleted
|
||||
@ -305,6 +320,27 @@ export class MeetStorageService<
|
||||
return this.storageProvider.deleteRecordingBinaryFilesByPaths(recordingPaths);
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves user data for a specific username.
|
||||
*
|
||||
* @param username - The username of the user to retrieve
|
||||
* @returns A promise that resolves to the user data, or null if not found
|
||||
*/
|
||||
async getUser(username: string): Promise<MUser | null> {
|
||||
return this.storageProvider.getUser(username) as Promise<MUser | null>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Saves user data to the storage provider.
|
||||
*
|
||||
* @param user - The user data to be saved
|
||||
* @returns A promise that resolves to the saved user data
|
||||
*/
|
||||
async saveUser(user: MUser): Promise<MUser> {
|
||||
this.logger.info(`Saving user data for ${user.username}`);
|
||||
return this.storageProvider.saveUser(user) as Promise<MUser>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the default global preferences.
|
||||
* @returns {GPrefs}
|
||||
@ -317,19 +353,11 @@ export class MeetStorageService<
|
||||
url: MEET_WEBHOOK_URL
|
||||
},
|
||||
securityPreferences: {
|
||||
roomCreationPolicy: {
|
||||
allowRoomCreation: true,
|
||||
requireAuthentication: true
|
||||
},
|
||||
authentication: {
|
||||
authMode: AuthMode.NONE,
|
||||
method: {
|
||||
type: AuthType.SINGLE_USER,
|
||||
credentials: {
|
||||
username: MEET_USER,
|
||||
passwordHash: await PasswordHelper.hashPassword(MEET_SECRET)
|
||||
}
|
||||
}
|
||||
authMethod: {
|
||||
type: AuthType.SINGLE_USER
|
||||
},
|
||||
authModeToAccessRoom: AuthMode.NONE
|
||||
}
|
||||
}
|
||||
} as GPrefs;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user