diff --git a/meet-ce/backend/openapi/components/requestBodies/internal/create-ai-assistant-request.yaml b/meet-ce/backend/openapi/components/requestBodies/internal/create-ai-assistant-request.yaml new file mode 100644 index 00000000..e8a56bb0 --- /dev/null +++ b/meet-ce/backend/openapi/components/requestBodies/internal/create-ai-assistant-request.yaml @@ -0,0 +1,6 @@ +description: Create AI assistant activation request +required: true +content: + application/json: + schema: + $ref: '../../schemas/internal/ai-assistant-create-request.yaml' diff --git a/meet-ce/backend/openapi/components/responses/internal/success-create-ai-assistant.yaml b/meet-ce/backend/openapi/components/responses/internal/success-create-ai-assistant.yaml new file mode 100644 index 00000000..caea3448 --- /dev/null +++ b/meet-ce/backend/openapi/components/responses/internal/success-create-ai-assistant.yaml @@ -0,0 +1,5 @@ +description: Successfully created or reused AI assistant activation +content: + application/json: + schema: + $ref: '../../schemas/internal/ai-assistant-create-response.yaml' diff --git a/meet-ce/backend/openapi/components/schemas/internal/ai-assistant-create-request.yaml b/meet-ce/backend/openapi/components/schemas/internal/ai-assistant-create-request.yaml new file mode 100644 index 00000000..50a085e8 --- /dev/null +++ b/meet-ce/backend/openapi/components/schemas/internal/ai-assistant-create-request.yaml @@ -0,0 +1,37 @@ +type: object +required: + # - scope + - capabilities +properties: + # scope: + # type: object + # required: + # - resourceType + # - resourceIds + # properties: + # resourceType: + # type: string + # enum: ['meeting'] + # description: Scope resource type where assistant will be activated. + # example: meeting + # resourceIds: + # type: array + # minItems: 1 + # items: + # type: string + # minLength: 1 + # description: List of target resource ids. + # example: ['meeting_123'] + capabilities: + type: array + minItems: 1 + items: + type: object + required: + - name + properties: + name: + type: string + enum: ['live_captions'] + description: AI capability to activate. + example: live_captions diff --git a/meet-ce/backend/openapi/components/schemas/internal/ai-assistant-create-response.yaml b/meet-ce/backend/openapi/components/schemas/internal/ai-assistant-create-response.yaml new file mode 100644 index 00000000..03fd8844 --- /dev/null +++ b/meet-ce/backend/openapi/components/schemas/internal/ai-assistant-create-response.yaml @@ -0,0 +1,14 @@ +type: object +required: + - id + - status +properties: + id: + type: string + description: Identifier of the assistant activation. + example: asst_123 + status: + type: string + enum: ['active'] + description: Current assistant activation state. + example: active diff --git a/meet-ce/backend/openapi/openvidu-meet-internal-api.yaml b/meet-ce/backend/openapi/openvidu-meet-internal-api.yaml index 96f13ffa..573002a1 100644 --- a/meet-ce/backend/openapi/openvidu-meet-internal-api.yaml +++ b/meet-ce/backend/openapi/openvidu-meet-internal-api.yaml @@ -42,6 +42,10 @@ paths: $ref: './paths/internal/meetings.yaml#/~1meetings~1{roomId}~1participants~1{participantIdentity}' /meetings/{roomId}/participants/{participantIdentity}/role: $ref: './paths/internal/meetings.yaml#/~1meetings~1{roomId}~1participants~1{participantIdentity}~1role' + /ai/assistants: + $ref: './paths/internal/ai-assistant.yaml#/~1ai~1assistants' + /ai/assistants/{assistantId}: + $ref: './paths/internal/ai-assistant.yaml#/~1ai~1assistants~1{assistantId}' /analytics: $ref: './paths/internal/analytics.yaml#/~1analytics' @@ -63,5 +67,9 @@ components: $ref: components/schemas/meet-room.yaml MeetAnalytics: $ref: components/schemas/internal/meet-analytics.yaml + AiAssistantCreateRequest: + $ref: components/schemas/internal/ai-assistant-create-request.yaml + AiAssistantCreateResponse: + $ref: components/schemas/internal/ai-assistant-create-response.yaml Error: $ref: components/schemas/error.yaml diff --git a/meet-ce/backend/openapi/paths/internal/ai-assistant.yaml b/meet-ce/backend/openapi/paths/internal/ai-assistant.yaml new file mode 100644 index 00000000..5454791c --- /dev/null +++ b/meet-ce/backend/openapi/paths/internal/ai-assistant.yaml @@ -0,0 +1,56 @@ +/ai/assistants: + post: + operationId: createAiAssistant + summary: Create AI assistant + description: | + Activates AI assistance. + + > Currently only meeting AI Assistand and `live_captions` capability is supported. + tags: + - Internal API - AI Assistants + security: + - roomMemberTokenHeader: [] + requestBody: + $ref: '../../components/requestBodies/internal/create-ai-assistant-request.yaml' + responses: + '200': + $ref: '../../components/responses/internal/success-create-ai-assistant.yaml' + '401': + $ref: '../../components/responses/unauthorized-error.yaml' + '403': + $ref: '../../components/responses/forbidden-error.yaml' + '422': + $ref: '../../components/responses/validation-error.yaml' + '500': + $ref: '../../components/responses/internal-server-error.yaml' + +/ai/assistants/{assistantId}: + delete: + operationId: cancelAiAssistant + summary: Cancel AI assistant + description: | + Cancels AI assistant. + + The assistant process (live_captions) is stopped only when the last participant cancels it. + tags: + - Internal API - AI Assistants + security: + - roomMemberTokenHeader: [] + parameters: + - in: path + name: assistantId + required: true + schema: + type: string + minLength: 1 + description: Identifier of the assistant activation returned by create operation. + example: asst_123 + responses: + '204': + description: AI assistant canceled successfully. + '401': + $ref: '../../components/responses/unauthorized-error.yaml' + '422': + $ref: '../../components/responses/validation-error.yaml' + '500': + $ref: '../../components/responses/internal-server-error.yaml' diff --git a/meet-ce/backend/openapi/tags/tags.yaml b/meet-ce/backend/openapi/tags/tags.yaml index 9c520434..27862614 100644 --- a/meet-ce/backend/openapi/tags/tags.yaml +++ b/meet-ce/backend/openapi/tags/tags.yaml @@ -16,5 +16,7 @@ description: Operations related to managing OpenVidu Meet rooms - name: Internal API - Meetings description: Operations related to managing meetings in OpenVidu Meet rooms +- name: Internal API - AI Assistants + description: High-level operations to manage AI assistance capabilities in meetings - name: Internal API - Recordings description: Operations related to managing OpenVidu Meet recordings diff --git a/meet-ce/backend/src/config/dependency-injector.config.ts b/meet-ce/backend/src/config/dependency-injector.config.ts index 7f889ca1..1650bb85 100644 --- a/meet-ce/backend/src/config/dependency-injector.config.ts +++ b/meet-ce/backend/src/config/dependency-injector.config.ts @@ -54,6 +54,7 @@ import { LivekitWebhookService } from '../services/livekit-webhook.service.js'; import { RoomScheduledTasksService } from '../services/room-scheduled-tasks.service.js'; import { RecordingScheduledTasksService } from '../services/recording-scheduled-tasks.service.js'; import { AnalyticsService } from '../services/analytics.service.js'; +import { AiAssistantService } from '../services/ai-assistant.service.js'; export const container: Container = new Container(); @@ -113,6 +114,7 @@ export const registerDependencies = () => { container.bind(RoomScheduledTasksService).toSelf().inSingletonScope(); container.bind(RecordingScheduledTasksService).toSelf().inSingletonScope(); container.bind(AnalyticsService).toSelf().inSingletonScope(); + container.bind(AiAssistantService).toSelf().inSingletonScope(); }; const configureStorage = (storageMode: string) => { diff --git a/meet-ce/backend/src/config/internal-config.ts b/meet-ce/backend/src/config/internal-config.ts index de7baa18..3e6d96a9 100644 --- a/meet-ce/backend/src/config/internal-config.ts +++ b/meet-ce/backend/src/config/internal-config.ts @@ -49,7 +49,7 @@ export const INTERNAL_CONFIG = { PARTICIPANT_MAX_CONCURRENT_NAME_REQUESTS: '20', // Maximum number of request by the same name at the same time allowed PARTICIPANT_NAME_RESERVATION_TTL: '12h' as StringValue, // Time-to-live for participant name reservations - CAPTIONS_AGENT_NAME: 'agent-speech-processing', + CAPTIONS_AGENT_NAME: 'speech-processing', // MongoDB Schema Versions // These define the current schema version for each collection diff --git a/meet-ce/backend/src/controllers/ai-assistant.controller.ts b/meet-ce/backend/src/controllers/ai-assistant.controller.ts new file mode 100644 index 00000000..f27d77a3 --- /dev/null +++ b/meet-ce/backend/src/controllers/ai-assistant.controller.ts @@ -0,0 +1,69 @@ +import { Request, Response } from 'express'; +import { container } from '../config/dependency-injector.config.js'; +import { handleError } from '../models/error.model.js'; +import { AiAssistantService } from '../services/ai-assistant.service.js'; +import { LoggerService } from '../services/logger.service.js'; +import { RequestSessionService } from '../services/request-session.service.js'; +import { TokenService } from '../services/token.service.js'; +import { getRoomMemberToken } from '../utils/token.utils.js'; + +const getRoomMemberIdentityFromRequest = async (req: Request): Promise => { + const tokenService = container.get(TokenService); + const token = getRoomMemberToken(req); + + if (!token) { + throw new Error('Room member token not found'); + } + + const claims = await tokenService.verifyToken(token); + + if (!claims.sub) { + throw new Error('Room member token does not include participant identity'); + } + + return claims.sub; +}; + +export const createAssistant = async (req: Request, res: Response) => { + const logger = container.get(LoggerService); + const requestSessionService = container.get(RequestSessionService); + const aiAssistantService = container.get(AiAssistantService); + // const payload: MeetCreateAssistantRequest = req.body; + const roomId = requestSessionService.getRoomIdFromToken(); + + if (!roomId) { + return handleError(res, new Error('Could not resolve room from token'), 'creating assistant'); + } + + try { + const participantIdentity = await getRoomMemberIdentityFromRequest(req); + logger.verbose(`Creating assistant for participant '${participantIdentity}' in room '${roomId}'`); + const assistant = await aiAssistantService.createLiveCaptionsAssistant(roomId, participantIdentity); + return res.status(200).json(assistant); + } catch (error) { + handleError(res, error, `creating assistant in room '${roomId}'`); + } +}; + +export const cancelAssistant = async (req: Request, res: Response) => { + const logger = container.get(LoggerService); + const requestSessionService = container.get(RequestSessionService); + const aiAssistantService = container.get(AiAssistantService); + const { assistantId } = req.params; + const roomId = requestSessionService.getRoomIdFromToken(); + + if (!roomId) { + return handleError(res, new Error('Could not resolve room from token'), 'canceling assistant'); + } + + try { + const participantIdentity = await getRoomMemberIdentityFromRequest(req); + logger.verbose( + `Canceling assistant '${assistantId}' for participant '${participantIdentity}' in room '${roomId}'` + ); + await aiAssistantService.cancelAssistant(assistantId, roomId, participantIdentity); + return res.status(204).send(); + } catch (error) { + handleError(res, error, `canceling assistant '${assistantId}' in room '${roomId}'`); + } +}; diff --git a/meet-ce/backend/src/helpers/redis.helper.ts b/meet-ce/backend/src/helpers/redis.helper.ts index 62a3b0f9..1b1ef9ea 100644 --- a/meet-ce/backend/src/helpers/redis.helper.ts +++ b/meet-ce/backend/src/helpers/redis.helper.ts @@ -45,4 +45,16 @@ export class MeetLock { return `${RedisLockPrefix.BASE}${RedisLockName.WEBHOOK}_${webhookEvent.event}_${webhookEvent.id}`; } + + static getAiAssistantLock(roomId: string, capabilityName: string): string { + if (!roomId) { + throw new Error('roomId must be a non-empty string'); + } + + if (!capabilityName) { + throw new Error('capabilityName must be a non-empty string'); + } + + return `${RedisLockPrefix.BASE}${RedisLockName.AI_ASSISTANT}_${roomId}_${capabilityName}`; + } } diff --git a/meet-ce/backend/src/middlewares/index.ts b/meet-ce/backend/src/middlewares/index.ts index 7ee12db5..2c2ca197 100644 --- a/meet-ce/backend/src/middlewares/index.ts +++ b/meet-ce/backend/src/middlewares/index.ts @@ -9,6 +9,7 @@ export * from './room.middleware.js'; // Request validators export * from './request-validators/auth-validator.middleware.js'; +export * from './request-validators/ai-assistant-validator.middleware.js'; export * from './request-validators/config-validator.middleware.js'; export * from './request-validators/meeting-validator.middleware.js'; export * from './request-validators/recording-validator.middleware.js'; diff --git a/meet-ce/backend/src/middlewares/request-validators/ai-assistant-validator.middleware.ts b/meet-ce/backend/src/middlewares/request-validators/ai-assistant-validator.middleware.ts new file mode 100644 index 00000000..1a6bc0b3 --- /dev/null +++ b/meet-ce/backend/src/middlewares/request-validators/ai-assistant-validator.middleware.ts @@ -0,0 +1,26 @@ +import { NextFunction, Request, Response } from 'express'; +import { rejectUnprocessableRequest } from '../../models/error.model.js'; +import { AssistantIdSchema, CreateAssistantReqSchema } from '../../models/zod-schemas/ai-assistant.schema.js'; + +export const validateCreateAssistantReq = (req: Request, res: Response, next: NextFunction) => { + const { success, error, data } = CreateAssistantReqSchema.safeParse(req.body); + + if (!success) { + return rejectUnprocessableRequest(res, error); + } + + req.body = data; + next(); +}; + +export const validateAssistantIdPathParam = (req: Request, res: Response, next: NextFunction) => { + const { success, error, data } = AssistantIdSchema.safeParse(req.params.assistantId); + + if (!success) { + error.errors[0].path = ['assistantId']; + return rejectUnprocessableRequest(res, error); + } + + req.params.assistantId = data; + next(); +}; diff --git a/meet-ce/backend/src/models/redis.model.ts b/meet-ce/backend/src/models/redis.model.ts index 97977c00..58d82f9d 100644 --- a/meet-ce/backend/src/models/redis.model.ts +++ b/meet-ce/backend/src/models/redis.model.ts @@ -5,7 +5,9 @@ export const enum RedisKeyName { ROOM_PARTICIPANTS = `${REDIS_KEY_PREFIX}room_participants:`, // Stores released numeric suffixes (per base name) in a sorted set, so that freed numbers // can be reused efficiently instead of always incrementing to the next highest number. - PARTICIPANT_NAME_POOL = `${REDIS_KEY_PREFIX}participant_pool:` + PARTICIPANT_NAME_POOL = `${REDIS_KEY_PREFIX}participant_pool:`, + // Tracks participant-level assistant capability state in a room. + AI_ASSISTANT_PARTICIPANT_STATE = `${REDIS_KEY_PREFIX}ai_assistant:participant_state:` } export const enum RedisLockPrefix { @@ -18,5 +20,6 @@ export const enum RedisLockName { SCHEDULED_TASK = 'scheduled_task', STORAGE_INITIALIZATION = 'storage_initialization', MIGRATION = 'migration', - WEBHOOK = 'webhook' + WEBHOOK = 'webhook', + AI_ASSISTANT = 'ai_assistant' } diff --git a/meet-ce/backend/src/models/zod-schemas/ai-assistant.schema.ts b/meet-ce/backend/src/models/zod-schemas/ai-assistant.schema.ts new file mode 100644 index 00000000..f2e17049 --- /dev/null +++ b/meet-ce/backend/src/models/zod-schemas/ai-assistant.schema.ts @@ -0,0 +1,34 @@ +import { MeetAssistantCapabilityName } from '@openvidu-meet/typings'; +import { z } from 'zod'; + +export const CreateAssistantReqSchema = z.object({ + // scope: z.object({ + // resourceType: z.nativeEnum(MeetAssistantScopeResourceType), + // resourceIds: z.array(z.string().trim().min(1)).min(1) + // }), + capabilities: z + .array( + z.object({ + name: z.string() + }) + ) + .min(1) + .transform((capabilities) => { + const validValues = Object.values(MeetAssistantCapabilityName); + + // Filter out invalid capabilities + const filtered = capabilities.filter((cap) => + validValues.includes(cap.name as MeetAssistantCapabilityName) + ); + + // Remove duplicates based on capability name + const unique = Array.from(new Map(filtered.map((cap) => [cap.name, cap])).values()); + + return unique; + }) + .refine((caps) => caps.length > 0, { + message: 'At least one valid capability is required' + }) +}); + +export const AssistantIdSchema = z.string().trim().min(1); diff --git a/meet-ce/backend/src/routes/ai-assistant.routes.ts b/meet-ce/backend/src/routes/ai-assistant.routes.ts new file mode 100644 index 00000000..90c35cce --- /dev/null +++ b/meet-ce/backend/src/routes/ai-assistant.routes.ts @@ -0,0 +1,26 @@ +import bodyParser from 'body-parser'; +import { Router } from 'express'; +import * as aiAssistantCtrl from '../controllers/ai-assistant.controller.js'; +import { roomMemberTokenValidator, withAuth } from '../middlewares/auth.middleware.js'; +import { + validateAssistantIdPathParam, + validateCreateAssistantReq +} from '../middlewares/request-validators/ai-assistant-validator.middleware.js'; + +export const aiAssistantRouter: Router = Router(); +aiAssistantRouter.use(bodyParser.urlencoded({ extended: true })); +aiAssistantRouter.use(bodyParser.json()); + +aiAssistantRouter.post( + '/assistants', + withAuth(roomMemberTokenValidator), + validateCreateAssistantReq, + aiAssistantCtrl.createAssistant +); + +aiAssistantRouter.delete( + '/assistants/:assistantId', + withAuth(roomMemberTokenValidator), + validateAssistantIdPathParam, + aiAssistantCtrl.cancelAssistant +); diff --git a/meet-ce/backend/src/routes/index.ts b/meet-ce/backend/src/routes/index.ts index 226c3aa4..94d2a2aa 100644 --- a/meet-ce/backend/src/routes/index.ts +++ b/meet-ce/backend/src/routes/index.ts @@ -1,4 +1,5 @@ export * from './analytics.routes.js'; +export * from './ai-assistant.routes.js'; export * from './api-key.routes.js'; export * from './auth.routes.js'; export * from './global-config.routes.js'; diff --git a/meet-ce/backend/src/server.ts b/meet-ce/backend/src/server.ts index 1fefb73b..56a3f63b 100644 --- a/meet-ce/backend/src/server.ts +++ b/meet-ce/backend/src/server.ts @@ -9,6 +9,7 @@ import { setBaseUrlFromRequest } from './middlewares/base-url.middleware.js'; import { jsonSyntaxErrorHandler } from './middlewares/content-type.middleware.js'; import { initRequestContext } from './middlewares/request-context.middleware.js'; import { analyticsRouter } from './routes/analytics.routes.js'; +import { aiAssistantRouter } from './routes/ai-assistant.routes.js'; import { apiKeyRouter } from './routes/api-key.routes.js'; import { authRouter } from './routes/auth.routes.js'; import { configRouter } from './routes/global-config.routes.js'; @@ -99,6 +100,7 @@ const createApp = () => { // appRouter.use(`${INTERNAL_CONFIG.INTERNAL_API_BASE_PATH_V1}/recordings`, internalRecordingRouter); appRouter.use(`${INTERNAL_CONFIG.INTERNAL_API_BASE_PATH_V1}/config`, configRouter); appRouter.use(`${INTERNAL_CONFIG.INTERNAL_API_BASE_PATH_V1}/analytics`, analyticsRouter); + appRouter.use(`${INTERNAL_CONFIG.INTERNAL_API_BASE_PATH_V1}/ai`, aiAssistantRouter); appRouter.use('/health', (_req: Request, res: Response) => res.status(200).send('OK')); diff --git a/meet-ce/backend/src/services/ai-assistant.service.ts b/meet-ce/backend/src/services/ai-assistant.service.ts new file mode 100644 index 00000000..1a8aafe0 --- /dev/null +++ b/meet-ce/backend/src/services/ai-assistant.service.ts @@ -0,0 +1,251 @@ +import { MeetAssistantCapabilityName, MeetCreateAssistantResponse } from '@openvidu-meet/typings'; +import { inject, injectable } from 'inversify'; +import ms from 'ms'; +import { INTERNAL_CONFIG } from '../config/internal-config.js'; +import { MEET_ENV } from '../environment.js'; +import { MeetLock } from '../helpers/redis.helper.js'; +import { errorInsufficientPermissions } from '../models/error.model.js'; +import { RedisKeyName } from '../models/redis.model.js'; +import { LiveKitService } from './livekit.service.js'; +import { LoggerService } from './logger.service.js'; +import { MutexService } from './mutex.service.js'; +import { RedisService } from './redis.service.js'; +import { RoomService } from './room.service.js'; + +@injectable() +export class AiAssistantService { + private readonly ASSISTANT_STATE_LOCK_TTL = ms('15s'); + + constructor( + @inject(LoggerService) protected logger: LoggerService, + @inject(RoomService) protected roomService: RoomService, + @inject(LiveKitService) protected livekitService: LiveKitService, + @inject(MutexService) protected mutexService: MutexService, + @inject(RedisService) protected redisService: RedisService + ) {} + + /** + * Creates a live captions assistant for the specified room. + * If an assistant already exists for the room, it will be reused. + * @param roomId + * @param participantIdentity + * @returns + */ + async createLiveCaptionsAssistant( + roomId: string, + participantIdentity: string + ): Promise { + // ! For now, we are assuming that the only capability is live captions. + const capability = MeetAssistantCapabilityName.LIVE_CAPTIONS; + const lockName = MeetLock.getAiAssistantLock(roomId, capability); + + try { + await this.validateCreateConditions(roomId, capability); + + const lock = await this.mutexService.acquire(lockName, this.ASSISTANT_STATE_LOCK_TTL); + + if (!lock) { + this.logger.error(`Could not acquire lock '${lockName}' for creating assistant in room '${roomId}'`); + throw new Error('Could not acquire lock for creating assistant. Please try again.'); + } + + const existingAgent = await this.livekitService.getAgent(roomId, INTERNAL_CONFIG.CAPTIONS_AGENT_NAME); + + if (existingAgent) { + await this.setParticipantAssistantState(roomId, participantIdentity, capability, true); + return { id: existingAgent.id, status: 'active' }; + } + + const assistant = await this.livekitService.createAgent(roomId, INTERNAL_CONFIG.CAPTIONS_AGENT_NAME); + + await this.setParticipantAssistantState(roomId, participantIdentity, capability, true); + + return { + id: assistant.id, + status: 'active' + }; + } finally { + await this.mutexService.release(lockName); + } + } + + /** + * Stops the specified assistant for the given participant and room. + * If the assistant is not used by any other participants in the room, it will be stopped in LiveKit. + * @param assistantId + * @param roomId + * @param participantIdentity + * @returns + */ + async cancelAssistant(assistantId: string, roomId: string, participantIdentity: string): Promise { + const capability = MeetAssistantCapabilityName.LIVE_CAPTIONS; + // The lock only protects the atomic "count → stop dispatch" decision. + const lockName = MeetLock.getAiAssistantLock(roomId, capability); + + try { + await this.setParticipantAssistantState(roomId, participantIdentity, capability, false); + + const lock = await this.mutexService.acquire(lockName, this.ASSISTANT_STATE_LOCK_TTL); + + if (!lock) { + this.logger.warn( + `Could not acquire lock '${lockName}' for stopping assistant in room '${roomId}'. Participant state saved as disabled.` + ); + return; + } + + const enabledParticipants = await this.getEnabledParticipantsCount(roomId, capability); + + if (enabledParticipants > 0) { + this.logger.debug( + `Skipping assistant stop for room '${roomId}'. Remaining enabled participants: ${enabledParticipants}` + ); + return; + } + + const assistant = await this.livekitService.getAgent(roomId, assistantId); + + if (!assistant) { + this.logger.warn(`Captions assistant not found in room '${roomId}'. Skipping stop request.`); + return; + } + + await this.livekitService.stopAgent(assistantId, roomId); + } finally { + await this.mutexService.release(lockName); + } + } + + /** + * Cleanup assistant state in a room. + * - If participantIdentity is provided, removes only that participant state. + * - If participantIdentity is omitted, removes all assistant state in the room. + * + * If no enabled participants remain after cleanup, captions agent dispatch is stopped. + */ + async cleanupState(roomId: string, participantIdentity?: string): Promise { + const capability = MeetAssistantCapabilityName.LIVE_CAPTIONS; + const lockName = MeetLock.getAiAssistantLock(roomId, capability); + + try { + if (participantIdentity) { + await this.setParticipantAssistantState(roomId, participantIdentity, capability, false); + } + + // acquireWithRetry because this is called from webhooks (participantLeft / roomFinished). + // The agent may run indefinitely with no further opportunity to stop it. + const lock = await this.mutexService.acquireWithRetry(lockName, this.ASSISTANT_STATE_LOCK_TTL); + + if (!lock) { + const scope = participantIdentity ? `participant '${participantIdentity}'` : `room '${roomId}'`; + this.logger.error( + `Could not acquire lock '${lockName}' for dispatch cleanup (${scope}) after retries. ` + + (participantIdentity + ? 'Participant state was saved but dispatch stop may be skipped.' + : 'Room state cleanup and dispatch stop were skipped.') + ); + return; + } + + if (!participantIdentity) { + const pattern = `${RedisKeyName.AI_ASSISTANT_PARTICIPANT_STATE}${roomId}:${capability}:*`; + const keys = await this.redisService.getKeys(pattern); + + if (keys.length > 0) { + await this.redisService.delete(keys); + } + } + + const enabledParticipants = await this.getEnabledParticipantsCount(roomId, capability); + + if (enabledParticipants > 0) { + return; + } + + await this.stopCaptionsAssistantIfRunning(roomId); + } catch (error) { + this.logger.error(`Error occurred while cleaning up assistant state for room '${roomId}': ${error}`); + } finally { + await this.mutexService.release(lockName); + } + } + + protected async validateCreateConditions(roomId: string, capability: MeetAssistantCapabilityName): Promise { + if (capability === MeetAssistantCapabilityName.LIVE_CAPTIONS) { + if (MEET_ENV.CAPTIONS_ENABLED !== 'true') { + throw errorInsufficientPermissions(); + } + + const room = await this.roomService.getMeetRoom(roomId); + + if (!room.config.captions.enabled) { + throw errorInsufficientPermissions(); + } + } + } + + /** + * Sets or clears the assistant state for a participant in Redis. + * @param roomId + * @param participantIdentity + * @param capability + * @param enabled + */ + protected async setParticipantAssistantState( + roomId: string, + participantIdentity: string, + capability: MeetAssistantCapabilityName, + enabled: boolean + ): Promise { + const key = this.getParticipantAssistantStateKey(roomId, participantIdentity, capability); + + if (!enabled) { + await this.redisService.delete(key); + return; + } + + await this.redisService.setIfNotExists( + key, + JSON.stringify({ + enabled: true, + updatedAt: Date.now() + }) + ); + } + + /** + * Gets the count of participants that have the specified assistant capability enabled in the given room. + * @param roomId + * @param capability + * @returns + */ + protected async getEnabledParticipantsCount( + roomId: string, + capability: MeetAssistantCapabilityName + ): Promise { + const pattern = `${RedisKeyName.AI_ASSISTANT_PARTICIPANT_STATE}${roomId}:${capability}:*`; + const keys = await this.redisService.getKeys(pattern); + return keys.length; + } + + protected getParticipantAssistantStateKey( + roomId: string, + participantIdentity: string, + capability: MeetAssistantCapabilityName + ): string { + return `${RedisKeyName.AI_ASSISTANT_PARTICIPANT_STATE}${roomId}:${capability}:${participantIdentity}`; + } + + protected async stopCaptionsAssistantIfRunning(roomId: string): Promise { + const assistants = await this.livekitService.listAgents(roomId); + const captionsAssistant = assistants.find( + (assistant) => assistant.agentName === INTERNAL_CONFIG.CAPTIONS_AGENT_NAME + ); + + if (!captionsAssistant) { + return; + } + + await this.livekitService.stopAgent(captionsAssistant.id, roomId); + } +} diff --git a/meet-ce/backend/src/services/livekit-webhook.service.ts b/meet-ce/backend/src/services/livekit-webhook.service.ts index dadaa7a0..70e90ace 100644 --- a/meet-ce/backend/src/services/livekit-webhook.service.ts +++ b/meet-ce/backend/src/services/livekit-webhook.service.ts @@ -9,6 +9,7 @@ import { MeetRoomHelper } from '../helpers/room.helper.js'; import { DistributedEventType } from '../models/distributed-event.model.js'; import { RecordingRepository } from '../repositories/recording.repository.js'; import { RoomRepository } from '../repositories/room.repository.js'; +import { AiAssistantService } from './ai-assistant.service.js'; import { DistributedEventService } from './distributed-event.service.js'; import { FrontendEventService } from './frontend-event.service.js'; import { LiveKitService } from './livekit.service.js'; @@ -33,6 +34,7 @@ export class LivekitWebhookService { @inject(DistributedEventService) protected distributedEventService: DistributedEventService, @inject(FrontendEventService) protected frontendEventService: FrontendEventService, @inject(RoomMemberService) protected roomMemberService: RoomMemberService, + @inject(AiAssistantService) protected aiAssistantService: AiAssistantService, @inject(LoggerService) protected logger: LoggerService ) { this.webhookReceiver = new WebhookReceiver(MEET_ENV.LIVEKIT_API_KEY, MEET_ENV.LIVEKIT_API_SECRET); @@ -189,8 +191,10 @@ export class LivekitWebhookService { if (!this.livekitService.isStandardParticipant(participant)) return; try { - // Release the participant's reserved name - await this.roomMemberService.releaseParticipantName(room.name, participant.name); + await Promise.all([ + this.roomMemberService.releaseParticipantName(room.name, participant.name), + this.aiAssistantService.cleanupState(room.name, participant.identity) + ]); this.logger.verbose(`Released name for participant '${participant.name}' in room '${room.name}'`); } catch (error) { this.logger.error('Error releasing participant name on participant left:', error); @@ -282,7 +286,8 @@ export class LivekitWebhookService { tasks.push( this.roomMemberService.cleanupParticipantNames(roomId), - this.recordingService.releaseRecordingLockIfNoEgress(roomId) + this.recordingService.releaseRecordingLockIfNoEgress(roomId), + this.aiAssistantService.cleanupState(roomId) ); await Promise.all(tasks); } catch (error) { diff --git a/meet-ce/backend/src/services/livekit.service.ts b/meet-ce/backend/src/services/livekit.service.ts index 72debf17..676d00a4 100644 --- a/meet-ce/backend/src/services/livekit.service.ts +++ b/meet-ce/backend/src/services/livekit.service.ts @@ -1,6 +1,7 @@ -import { ParticipantInfo_Kind } from '@livekit/protocol'; +import { AgentDispatch, ParticipantInfo_Kind } from '@livekit/protocol'; import { inject, injectable } from 'inversify'; import { + AgentDispatchClient, CreateOptions, DataPacket_Kind, EgressClient, @@ -31,6 +32,7 @@ import { LoggerService } from './logger.service.js'; export class LiveKitService { private egressClient: EgressClient; private roomClient: RoomServiceClient; + private agentClient: AgentDispatchClient; constructor(@inject(LoggerService) protected logger: LoggerService) { const livekitUrlHostname = MEET_ENV.LIVEKIT_URL_PRIVATE.replace(/^ws:/, 'http:').replace(/^wss:/, 'https:'); @@ -40,6 +42,11 @@ export class LiveKitService { MEET_ENV.LIVEKIT_API_KEY, MEET_ENV.LIVEKIT_API_SECRET ); + this.agentClient = new AgentDispatchClient( + livekitUrlHostname, + MEET_ENV.LIVEKIT_API_KEY, + MEET_ENV.LIVEKIT_API_SECRET + ); } async createRoom(options: CreateOptions): Promise { @@ -270,6 +277,67 @@ export class LiveKitService { } } + /** + * Start an agent for a specific room. + * @param roomName + * @param agentName + * @returns The created AgentDispatch + */ + async createAgent( + roomName: string, + agentName: string /*, options: CreateDispatchOptions*/ + ): Promise { + try { + return await this.agentClient.createDispatch(roomName, agentName); + } catch (error) { + this.logger.error(`Error creating agent dispatch for room '${roomName}':`, error); + throw internalError(`creating agent dispatch for room '${roomName}'`); + } + } + + /** + * Lists all agents in a LiveKit room. + * @param roomName + * @returns An array of agents in the specified room + */ + async listAgents(roomName: string): Promise { + try { + return await this.agentClient.listDispatch(roomName); + } catch (error) { + this.logger.error(`Error listing agents for room '${roomName}':`, error); + throw internalError(`listing agents for room '${roomName}'`); + } + } + + /** + * Gets an agent dispatch by its ID in a LiveKit room. + * @param roomName + * @param agentId + * @returns The agent if found, otherwise undefined + */ + async getAgent(roomName: string, agentId: string): Promise { + try { + return await this.agentClient.getDispatch(agentId, roomName); + } catch (error) { + this.logger.error(`Error getting agent dispatch '${agentId}' for room '${roomName}':`, error); + throw internalError(`getting agent dispatch '${agentId}' for room '${roomName}'`); + } + } + + /** + * Stops an agent in a LiveKit room. + * @param agentId + * @param roomName + */ + async stopAgent(agentId: string, roomName: string): Promise { + try { + await this.agentClient.deleteDispatch(agentId, roomName); + } catch (error) { + this.logger.error(`Error deleting agent dispatch '${agentId}' for room '${roomName}':`, error); + throw internalError(`deleting agent dispatch '${agentId}' for room '${roomName}'`); + } + } + async startRoomComposite( roomName: string, output: EncodedFileOutput | StreamOutput, diff --git a/meet-ce/backend/src/services/mutex.service.ts b/meet-ce/backend/src/services/mutex.service.ts index 637596a0..c6ad4be1 100644 --- a/meet-ce/backend/src/services/mutex.service.ts +++ b/meet-ce/backend/src/services/mutex.service.ts @@ -105,6 +105,33 @@ export class MutexService { return locks; } + /** + * Attempts to acquire a lock, retrying up to `maxAttempts` times with a fixed delay between + * attempts. Intended for fire-and-forget flows (e.g. webhooks) where the caller has no + * opportunity to retry externally and a missed lock acquisition would leave the system in an + * inconsistent state. + * + * @param key - The resource to acquire a lock for. + * @param ttl - The time-to-live for the lock in milliseconds. + * @param maxAttempts - Maximum number of acquisition attempts. Defaults to 3. + * @param delayMs - Fixed delay in milliseconds between attempts. Defaults to 200. + * @returns A Promise that resolves to the acquired Lock, or null if all attempts fail. + */ + async acquireWithRetry(key: string, ttl: number = this.TTL_MS, maxAttempts = 3, delayMs = 200): Promise { + for (let attempt = 1; attempt <= maxAttempts; attempt++) { + const lock = await this.acquire(key, ttl); + + if (lock) return lock; + + if (attempt < maxAttempts) { + this.logger.warn(`Lock '${key}' attempt ${attempt}/${maxAttempts} failed. Retrying in ${delayMs}ms...`); + await new Promise((resolve) => setTimeout(resolve, delayMs)); + } + } + + return null; + } + lockExists(key: string): Promise { const registryKey = MeetLock.getRegistryLock(key); return this.redisService.exists(registryKey); diff --git a/meet-ce/backend/src/services/room-member.service.ts b/meet-ce/backend/src/services/room-member.service.ts index 950ae899..6fcee613 100644 --- a/meet-ce/backend/src/services/room-member.service.ts +++ b/meet-ce/backend/src/services/room-member.service.ts @@ -133,10 +133,9 @@ export class RoomMemberService { // Get participant permissions (with join meeting) const permissions = await this.getRoomMemberPermissions(roomId, role, true); - const withCaptions = room.config.captions.enabled ?? false; // Generate token with participant name - return this.tokenService.generateRoomMemberToken(role, permissions, participantName, participantIdentity, withCaptions); + return this.tokenService.generateRoomMemberToken(role, permissions, participantName, participantIdentity); } /** diff --git a/meet-ce/backend/src/services/token.service.ts b/meet-ce/backend/src/services/token.service.ts index f0af729c..7ed0536d 100644 --- a/meet-ce/backend/src/services/token.service.ts +++ b/meet-ce/backend/src/services/token.service.ts @@ -1,4 +1,3 @@ -import { RoomAgentDispatch, RoomConfiguration } from '@livekit/protocol'; import { MeetRoomMemberPermissions, MeetRoomMemberRole, @@ -42,8 +41,7 @@ export class TokenService { role: MeetRoomMemberRole, permissions: MeetRoomMemberPermissions, participantName?: string, - participantIdentity?: string, - roomWithCaptions = false + participantIdentity?: string ): Promise { const metadata: MeetRoomMemberTokenMetadata = { livekitUrl: MEET_ENV.LIVEKIT_URL, @@ -57,46 +55,16 @@ export class TokenService { ttl: INTERNAL_CONFIG.ROOM_MEMBER_TOKEN_EXPIRATION, metadata: JSON.stringify(metadata) }; - return await this.generateJwtToken(tokenOptions, permissions.livekit as VideoGrant, roomWithCaptions); + return await this.generateJwtToken(tokenOptions, permissions.livekit as VideoGrant); } - private async generateJwtToken( - tokenOptions: AccessTokenOptions, - grants?: VideoGrant, - roomWithCaptions = false - ): Promise { + private async generateJwtToken(tokenOptions: AccessTokenOptions, grants?: VideoGrant): Promise { const at = new AccessToken(MEET_ENV.LIVEKIT_API_KEY, MEET_ENV.LIVEKIT_API_SECRET, tokenOptions); if (grants) { at.addGrant(grants); } - const captionsEnabledGlobally = MEET_ENV.CAPTIONS_ENABLED === 'true'; - const captionsEnabledInRoom = Boolean(roomWithCaptions); - - // Warn if configuration is inconsistent - if (!captionsEnabledGlobally) { - if (captionsEnabledInRoom) { - this.logger.warn( - `Captions feature is disabled in environment but Room is created with captions enabled. ` + - `Please enable captions in environment by setting MEET_CAPTIONS_ENABLED=true to ensure proper functionality.` - ); - } - - return await at.toJwt(); - } - - if (captionsEnabledInRoom) { - this.logger.debug('Activating Captions Agent. Configuring Room Agent Dispatch.'); - at.roomConfig = new RoomConfiguration({ - agents: [ - new RoomAgentDispatch({ - agentName: INTERNAL_CONFIG.CAPTIONS_AGENT_NAME - }) - ] - }); - } - return await at.toJwt(); } diff --git a/meet-ce/typings/src/ai-assistant.ts b/meet-ce/typings/src/ai-assistant.ts new file mode 100644 index 00000000..50abcd42 --- /dev/null +++ b/meet-ce/typings/src/ai-assistant.ts @@ -0,0 +1,42 @@ +/** + * Assistant creation options + */ +export interface MeetCreateAssistantRequest { + // scope: MeetAssistantScope; + capabilities: MeetAssistantCapability[]; +} + +/** + * Defines the scope of an assistant, i.e. the resource(s) it is associated with. + */ +// export interface MeetAssistantScope { +// resourceType: MeetAssistantScopeResourceType; +// resourceIds: string[]; +// } + +/** + * Defines a capability that an assistant can have, such as live captions. + */ +export interface MeetAssistantCapability { + name: MeetAssistantCapabilityName; +} + +/** + * Enumeration of supported assistant capabilities. + */ +export enum MeetAssistantCapabilityName { + LIVE_CAPTIONS = 'live_captions', +} + +/** + * Enumeration of supported resource types that an assistant can be associated with. + */ +// export enum MeetAssistantScopeResourceType { +// MEETING = 'meeting', +// } + + +export interface MeetCreateAssistantResponse { + id: string; + status: 'active'; +} diff --git a/meet-ce/typings/src/index.ts b/meet-ce/typings/src/index.ts index a10c972f..e830cddd 100644 --- a/meet-ce/typings/src/index.ts +++ b/meet-ce/typings/src/index.ts @@ -1,22 +1,24 @@ export * from './api-key.js'; export * from './auth-config.js'; -export * from './global-config.js'; export * from './event.model.js'; +export * from './global-config.js'; export * from './permissions/livekit-permissions.js'; export * from './permissions/meet-permissions.js'; -export * from './sort-pagination.js'; export * from './room-member.js'; +export * from './sort-pagination.js'; export * from './user.js'; +export * from './ai-assistant.js'; +export * from './analytics.js'; +export * from './recording.model.js'; export * from './room-config.js'; export * from './room.js'; -export * from './recording.model.js'; export * from './webhook.model.js'; -export * from './analytics.js'; // Webcomponent types export * from './webcomponent/command.model.js'; export * from './webcomponent/event.model.js'; export * from './webcomponent/message.type.js'; export * from './webcomponent/properties.model.js'; +