backend(ai-assistant): implement AI assistant creation and management

- Add OpenAPI components for creating and responding to AI assistant requests.
- Implement AI assistant service for managing live captions capability.
- Create routes and controllers for AI assistant operations (create and cancel).
- Introduce request validation middleware for AI assistant requests.
- Update Redis helper to manage AI assistant locks.
- Integrate AI assistant cleanup in webhook service.
- Enhance LiveKit service to manage agent dispatch for AI assistants.
- Update token service to remove unnecessary parameters related to captions.
- Add typings for AI assistant requests and responses.
This commit is contained in:
CSantosM 2026-03-03 18:43:35 +01:00
parent 56d5126acb
commit c808e98820
26 changed files with 714 additions and 48 deletions

View File

@ -0,0 +1,6 @@
description: Create AI assistant activation request
required: true
content:
application/json:
schema:
$ref: '../../schemas/internal/ai-assistant-create-request.yaml'

View File

@ -0,0 +1,5 @@
description: Successfully created or reused AI assistant activation
content:
application/json:
schema:
$ref: '../../schemas/internal/ai-assistant-create-response.yaml'

View File

@ -0,0 +1,37 @@
type: object
required:
# - scope
- capabilities
properties:
# scope:
# type: object
# required:
# - resourceType
# - resourceIds
# properties:
# resourceType:
# type: string
# enum: ['meeting']
# description: Scope resource type where assistant will be activated.
# example: meeting
# resourceIds:
# type: array
# minItems: 1
# items:
# type: string
# minLength: 1
# description: List of target resource ids.
# example: ['meeting_123']
capabilities:
type: array
minItems: 1
items:
type: object
required:
- name
properties:
name:
type: string
enum: ['live_captions']
description: AI capability to activate.
example: live_captions

View File

@ -0,0 +1,14 @@
type: object
required:
- id
- status
properties:
id:
type: string
description: Identifier of the assistant activation.
example: asst_123
status:
type: string
enum: ['active']
description: Current assistant activation state.
example: active

View File

@ -42,6 +42,10 @@ paths:
$ref: './paths/internal/meetings.yaml#/~1meetings~1{roomId}~1participants~1{participantIdentity}'
/meetings/{roomId}/participants/{participantIdentity}/role:
$ref: './paths/internal/meetings.yaml#/~1meetings~1{roomId}~1participants~1{participantIdentity}~1role'
/ai/assistants:
$ref: './paths/internal/ai-assistant.yaml#/~1ai~1assistants'
/ai/assistants/{assistantId}:
$ref: './paths/internal/ai-assistant.yaml#/~1ai~1assistants~1{assistantId}'
/analytics:
$ref: './paths/internal/analytics.yaml#/~1analytics'
@ -63,5 +67,9 @@ components:
$ref: components/schemas/meet-room.yaml
MeetAnalytics:
$ref: components/schemas/internal/meet-analytics.yaml
AiAssistantCreateRequest:
$ref: components/schemas/internal/ai-assistant-create-request.yaml
AiAssistantCreateResponse:
$ref: components/schemas/internal/ai-assistant-create-response.yaml
Error:
$ref: components/schemas/error.yaml

View File

@ -0,0 +1,56 @@
/ai/assistants:
post:
operationId: createAiAssistant
summary: Create AI assistant
description: |
Activates AI assistance.
> Currently only meeting AI Assistand and `live_captions` capability is supported.
tags:
- Internal API - AI Assistants
security:
- roomMemberTokenHeader: []
requestBody:
$ref: '../../components/requestBodies/internal/create-ai-assistant-request.yaml'
responses:
'200':
$ref: '../../components/responses/internal/success-create-ai-assistant.yaml'
'401':
$ref: '../../components/responses/unauthorized-error.yaml'
'403':
$ref: '../../components/responses/forbidden-error.yaml'
'422':
$ref: '../../components/responses/validation-error.yaml'
'500':
$ref: '../../components/responses/internal-server-error.yaml'
/ai/assistants/{assistantId}:
delete:
operationId: cancelAiAssistant
summary: Cancel AI assistant
description: |
Cancels AI assistant.
The assistant process (live_captions) is stopped only when the last participant cancels it.
tags:
- Internal API - AI Assistants
security:
- roomMemberTokenHeader: []
parameters:
- in: path
name: assistantId
required: true
schema:
type: string
minLength: 1
description: Identifier of the assistant activation returned by create operation.
example: asst_123
responses:
'204':
description: AI assistant canceled successfully.
'401':
$ref: '../../components/responses/unauthorized-error.yaml'
'422':
$ref: '../../components/responses/validation-error.yaml'
'500':
$ref: '../../components/responses/internal-server-error.yaml'

View File

@ -16,5 +16,7 @@
description: Operations related to managing OpenVidu Meet rooms
- name: Internal API - Meetings
description: Operations related to managing meetings in OpenVidu Meet rooms
- name: Internal API - AI Assistants
description: High-level operations to manage AI assistance capabilities in meetings
- name: Internal API - Recordings
description: Operations related to managing OpenVidu Meet recordings

View File

@ -54,6 +54,7 @@ import { LivekitWebhookService } from '../services/livekit-webhook.service.js';
import { RoomScheduledTasksService } from '../services/room-scheduled-tasks.service.js';
import { RecordingScheduledTasksService } from '../services/recording-scheduled-tasks.service.js';
import { AnalyticsService } from '../services/analytics.service.js';
import { AiAssistantService } from '../services/ai-assistant.service.js';
export const container: Container = new Container();
@ -113,6 +114,7 @@ export const registerDependencies = () => {
container.bind(RoomScheduledTasksService).toSelf().inSingletonScope();
container.bind(RecordingScheduledTasksService).toSelf().inSingletonScope();
container.bind(AnalyticsService).toSelf().inSingletonScope();
container.bind(AiAssistantService).toSelf().inSingletonScope();
};
const configureStorage = (storageMode: string) => {

View File

@ -49,7 +49,7 @@ export const INTERNAL_CONFIG = {
PARTICIPANT_MAX_CONCURRENT_NAME_REQUESTS: '20', // Maximum number of request by the same name at the same time allowed
PARTICIPANT_NAME_RESERVATION_TTL: '12h' as StringValue, // Time-to-live for participant name reservations
CAPTIONS_AGENT_NAME: 'agent-speech-processing',
CAPTIONS_AGENT_NAME: 'speech-processing',
// MongoDB Schema Versions
// These define the current schema version for each collection

View File

@ -0,0 +1,69 @@
import { Request, Response } from 'express';
import { container } from '../config/dependency-injector.config.js';
import { handleError } from '../models/error.model.js';
import { AiAssistantService } from '../services/ai-assistant.service.js';
import { LoggerService } from '../services/logger.service.js';
import { RequestSessionService } from '../services/request-session.service.js';
import { TokenService } from '../services/token.service.js';
import { getRoomMemberToken } from '../utils/token.utils.js';
const getRoomMemberIdentityFromRequest = async (req: Request): Promise<string> => {
const tokenService = container.get(TokenService);
const token = getRoomMemberToken(req);
if (!token) {
throw new Error('Room member token not found');
}
const claims = await tokenService.verifyToken(token);
if (!claims.sub) {
throw new Error('Room member token does not include participant identity');
}
return claims.sub;
};
export const createAssistant = async (req: Request, res: Response) => {
const logger = container.get(LoggerService);
const requestSessionService = container.get(RequestSessionService);
const aiAssistantService = container.get(AiAssistantService);
// const payload: MeetCreateAssistantRequest = req.body;
const roomId = requestSessionService.getRoomIdFromToken();
if (!roomId) {
return handleError(res, new Error('Could not resolve room from token'), 'creating assistant');
}
try {
const participantIdentity = await getRoomMemberIdentityFromRequest(req);
logger.verbose(`Creating assistant for participant '${participantIdentity}' in room '${roomId}'`);
const assistant = await aiAssistantService.createLiveCaptionsAssistant(roomId, participantIdentity);
return res.status(200).json(assistant);
} catch (error) {
handleError(res, error, `creating assistant in room '${roomId}'`);
}
};
export const cancelAssistant = async (req: Request, res: Response) => {
const logger = container.get(LoggerService);
const requestSessionService = container.get(RequestSessionService);
const aiAssistantService = container.get(AiAssistantService);
const { assistantId } = req.params;
const roomId = requestSessionService.getRoomIdFromToken();
if (!roomId) {
return handleError(res, new Error('Could not resolve room from token'), 'canceling assistant');
}
try {
const participantIdentity = await getRoomMemberIdentityFromRequest(req);
logger.verbose(
`Canceling assistant '${assistantId}' for participant '${participantIdentity}' in room '${roomId}'`
);
await aiAssistantService.cancelAssistant(assistantId, roomId, participantIdentity);
return res.status(204).send();
} catch (error) {
handleError(res, error, `canceling assistant '${assistantId}' in room '${roomId}'`);
}
};

View File

@ -45,4 +45,16 @@ export class MeetLock {
return `${RedisLockPrefix.BASE}${RedisLockName.WEBHOOK}_${webhookEvent.event}_${webhookEvent.id}`;
}
static getAiAssistantLock(roomId: string, capabilityName: string): string {
if (!roomId) {
throw new Error('roomId must be a non-empty string');
}
if (!capabilityName) {
throw new Error('capabilityName must be a non-empty string');
}
return `${RedisLockPrefix.BASE}${RedisLockName.AI_ASSISTANT}_${roomId}_${capabilityName}`;
}
}

View File

@ -9,6 +9,7 @@ export * from './room.middleware.js';
// Request validators
export * from './request-validators/auth-validator.middleware.js';
export * from './request-validators/ai-assistant-validator.middleware.js';
export * from './request-validators/config-validator.middleware.js';
export * from './request-validators/meeting-validator.middleware.js';
export * from './request-validators/recording-validator.middleware.js';

View File

@ -0,0 +1,26 @@
import { NextFunction, Request, Response } from 'express';
import { rejectUnprocessableRequest } from '../../models/error.model.js';
import { AssistantIdSchema, CreateAssistantReqSchema } from '../../models/zod-schemas/ai-assistant.schema.js';
export const validateCreateAssistantReq = (req: Request, res: Response, next: NextFunction) => {
const { success, error, data } = CreateAssistantReqSchema.safeParse(req.body);
if (!success) {
return rejectUnprocessableRequest(res, error);
}
req.body = data;
next();
};
export const validateAssistantIdPathParam = (req: Request, res: Response, next: NextFunction) => {
const { success, error, data } = AssistantIdSchema.safeParse(req.params.assistantId);
if (!success) {
error.errors[0].path = ['assistantId'];
return rejectUnprocessableRequest(res, error);
}
req.params.assistantId = data;
next();
};

View File

@ -5,7 +5,9 @@ export const enum RedisKeyName {
ROOM_PARTICIPANTS = `${REDIS_KEY_PREFIX}room_participants:`,
// Stores released numeric suffixes (per base name) in a sorted set, so that freed numbers
// can be reused efficiently instead of always incrementing to the next highest number.
PARTICIPANT_NAME_POOL = `${REDIS_KEY_PREFIX}participant_pool:`
PARTICIPANT_NAME_POOL = `${REDIS_KEY_PREFIX}participant_pool:`,
// Tracks participant-level assistant capability state in a room.
AI_ASSISTANT_PARTICIPANT_STATE = `${REDIS_KEY_PREFIX}ai_assistant:participant_state:`
}
export const enum RedisLockPrefix {
@ -18,5 +20,6 @@ export const enum RedisLockName {
SCHEDULED_TASK = 'scheduled_task',
STORAGE_INITIALIZATION = 'storage_initialization',
MIGRATION = 'migration',
WEBHOOK = 'webhook'
WEBHOOK = 'webhook',
AI_ASSISTANT = 'ai_assistant'
}

View File

@ -0,0 +1,34 @@
import { MeetAssistantCapabilityName } from '@openvidu-meet/typings';
import { z } from 'zod';
export const CreateAssistantReqSchema = z.object({
// scope: z.object({
// resourceType: z.nativeEnum(MeetAssistantScopeResourceType),
// resourceIds: z.array(z.string().trim().min(1)).min(1)
// }),
capabilities: z
.array(
z.object({
name: z.string()
})
)
.min(1)
.transform((capabilities) => {
const validValues = Object.values(MeetAssistantCapabilityName);
// Filter out invalid capabilities
const filtered = capabilities.filter((cap) =>
validValues.includes(cap.name as MeetAssistantCapabilityName)
);
// Remove duplicates based on capability name
const unique = Array.from(new Map(filtered.map((cap) => [cap.name, cap])).values());
return unique;
})
.refine((caps) => caps.length > 0, {
message: 'At least one valid capability is required'
})
});
export const AssistantIdSchema = z.string().trim().min(1);

View File

@ -0,0 +1,26 @@
import bodyParser from 'body-parser';
import { Router } from 'express';
import * as aiAssistantCtrl from '../controllers/ai-assistant.controller.js';
import { roomMemberTokenValidator, withAuth } from '../middlewares/auth.middleware.js';
import {
validateAssistantIdPathParam,
validateCreateAssistantReq
} from '../middlewares/request-validators/ai-assistant-validator.middleware.js';
export const aiAssistantRouter: Router = Router();
aiAssistantRouter.use(bodyParser.urlencoded({ extended: true }));
aiAssistantRouter.use(bodyParser.json());
aiAssistantRouter.post(
'/assistants',
withAuth(roomMemberTokenValidator),
validateCreateAssistantReq,
aiAssistantCtrl.createAssistant
);
aiAssistantRouter.delete(
'/assistants/:assistantId',
withAuth(roomMemberTokenValidator),
validateAssistantIdPathParam,
aiAssistantCtrl.cancelAssistant
);

View File

@ -1,4 +1,5 @@
export * from './analytics.routes.js';
export * from './ai-assistant.routes.js';
export * from './api-key.routes.js';
export * from './auth.routes.js';
export * from './global-config.routes.js';

View File

@ -9,6 +9,7 @@ import { setBaseUrlFromRequest } from './middlewares/base-url.middleware.js';
import { jsonSyntaxErrorHandler } from './middlewares/content-type.middleware.js';
import { initRequestContext } from './middlewares/request-context.middleware.js';
import { analyticsRouter } from './routes/analytics.routes.js';
import { aiAssistantRouter } from './routes/ai-assistant.routes.js';
import { apiKeyRouter } from './routes/api-key.routes.js';
import { authRouter } from './routes/auth.routes.js';
import { configRouter } from './routes/global-config.routes.js';
@ -99,6 +100,7 @@ const createApp = () => {
// appRouter.use(`${INTERNAL_CONFIG.INTERNAL_API_BASE_PATH_V1}/recordings`, internalRecordingRouter);
appRouter.use(`${INTERNAL_CONFIG.INTERNAL_API_BASE_PATH_V1}/config`, configRouter);
appRouter.use(`${INTERNAL_CONFIG.INTERNAL_API_BASE_PATH_V1}/analytics`, analyticsRouter);
appRouter.use(`${INTERNAL_CONFIG.INTERNAL_API_BASE_PATH_V1}/ai`, aiAssistantRouter);
appRouter.use('/health', (_req: Request, res: Response) => res.status(200).send('OK'));

View File

@ -0,0 +1,251 @@
import { MeetAssistantCapabilityName, MeetCreateAssistantResponse } from '@openvidu-meet/typings';
import { inject, injectable } from 'inversify';
import ms from 'ms';
import { INTERNAL_CONFIG } from '../config/internal-config.js';
import { MEET_ENV } from '../environment.js';
import { MeetLock } from '../helpers/redis.helper.js';
import { errorInsufficientPermissions } from '../models/error.model.js';
import { RedisKeyName } from '../models/redis.model.js';
import { LiveKitService } from './livekit.service.js';
import { LoggerService } from './logger.service.js';
import { MutexService } from './mutex.service.js';
import { RedisService } from './redis.service.js';
import { RoomService } from './room.service.js';
@injectable()
export class AiAssistantService {
private readonly ASSISTANT_STATE_LOCK_TTL = ms('15s');
constructor(
@inject(LoggerService) protected logger: LoggerService,
@inject(RoomService) protected roomService: RoomService,
@inject(LiveKitService) protected livekitService: LiveKitService,
@inject(MutexService) protected mutexService: MutexService,
@inject(RedisService) protected redisService: RedisService
) {}
/**
* Creates a live captions assistant for the specified room.
* If an assistant already exists for the room, it will be reused.
* @param roomId
* @param participantIdentity
* @returns
*/
async createLiveCaptionsAssistant(
roomId: string,
participantIdentity: string
): Promise<MeetCreateAssistantResponse> {
// ! For now, we are assuming that the only capability is live captions.
const capability = MeetAssistantCapabilityName.LIVE_CAPTIONS;
const lockName = MeetLock.getAiAssistantLock(roomId, capability);
try {
await this.validateCreateConditions(roomId, capability);
const lock = await this.mutexService.acquire(lockName, this.ASSISTANT_STATE_LOCK_TTL);
if (!lock) {
this.logger.error(`Could not acquire lock '${lockName}' for creating assistant in room '${roomId}'`);
throw new Error('Could not acquire lock for creating assistant. Please try again.');
}
const existingAgent = await this.livekitService.getAgent(roomId, INTERNAL_CONFIG.CAPTIONS_AGENT_NAME);
if (existingAgent) {
await this.setParticipantAssistantState(roomId, participantIdentity, capability, true);
return { id: existingAgent.id, status: 'active' };
}
const assistant = await this.livekitService.createAgent(roomId, INTERNAL_CONFIG.CAPTIONS_AGENT_NAME);
await this.setParticipantAssistantState(roomId, participantIdentity, capability, true);
return {
id: assistant.id,
status: 'active'
};
} finally {
await this.mutexService.release(lockName);
}
}
/**
* Stops the specified assistant for the given participant and room.
* If the assistant is not used by any other participants in the room, it will be stopped in LiveKit.
* @param assistantId
* @param roomId
* @param participantIdentity
* @returns
*/
async cancelAssistant(assistantId: string, roomId: string, participantIdentity: string): Promise<void> {
const capability = MeetAssistantCapabilityName.LIVE_CAPTIONS;
// The lock only protects the atomic "count → stop dispatch" decision.
const lockName = MeetLock.getAiAssistantLock(roomId, capability);
try {
await this.setParticipantAssistantState(roomId, participantIdentity, capability, false);
const lock = await this.mutexService.acquire(lockName, this.ASSISTANT_STATE_LOCK_TTL);
if (!lock) {
this.logger.warn(
`Could not acquire lock '${lockName}' for stopping assistant in room '${roomId}'. Participant state saved as disabled.`
);
return;
}
const enabledParticipants = await this.getEnabledParticipantsCount(roomId, capability);
if (enabledParticipants > 0) {
this.logger.debug(
`Skipping assistant stop for room '${roomId}'. Remaining enabled participants: ${enabledParticipants}`
);
return;
}
const assistant = await this.livekitService.getAgent(roomId, assistantId);
if (!assistant) {
this.logger.warn(`Captions assistant not found in room '${roomId}'. Skipping stop request.`);
return;
}
await this.livekitService.stopAgent(assistantId, roomId);
} finally {
await this.mutexService.release(lockName);
}
}
/**
* Cleanup assistant state in a room.
* - If participantIdentity is provided, removes only that participant state.
* - If participantIdentity is omitted, removes all assistant state in the room.
*
* If no enabled participants remain after cleanup, captions agent dispatch is stopped.
*/
async cleanupState(roomId: string, participantIdentity?: string): Promise<void> {
const capability = MeetAssistantCapabilityName.LIVE_CAPTIONS;
const lockName = MeetLock.getAiAssistantLock(roomId, capability);
try {
if (participantIdentity) {
await this.setParticipantAssistantState(roomId, participantIdentity, capability, false);
}
// acquireWithRetry because this is called from webhooks (participantLeft / roomFinished).
// The agent may run indefinitely with no further opportunity to stop it.
const lock = await this.mutexService.acquireWithRetry(lockName, this.ASSISTANT_STATE_LOCK_TTL);
if (!lock) {
const scope = participantIdentity ? `participant '${participantIdentity}'` : `room '${roomId}'`;
this.logger.error(
`Could not acquire lock '${lockName}' for dispatch cleanup (${scope}) after retries. ` +
(participantIdentity
? 'Participant state was saved but dispatch stop may be skipped.'
: 'Room state cleanup and dispatch stop were skipped.')
);
return;
}
if (!participantIdentity) {
const pattern = `${RedisKeyName.AI_ASSISTANT_PARTICIPANT_STATE}${roomId}:${capability}:*`;
const keys = await this.redisService.getKeys(pattern);
if (keys.length > 0) {
await this.redisService.delete(keys);
}
}
const enabledParticipants = await this.getEnabledParticipantsCount(roomId, capability);
if (enabledParticipants > 0) {
return;
}
await this.stopCaptionsAssistantIfRunning(roomId);
} catch (error) {
this.logger.error(`Error occurred while cleaning up assistant state for room '${roomId}': ${error}`);
} finally {
await this.mutexService.release(lockName);
}
}
protected async validateCreateConditions(roomId: string, capability: MeetAssistantCapabilityName): Promise<void> {
if (capability === MeetAssistantCapabilityName.LIVE_CAPTIONS) {
if (MEET_ENV.CAPTIONS_ENABLED !== 'true') {
throw errorInsufficientPermissions();
}
const room = await this.roomService.getMeetRoom(roomId);
if (!room.config.captions.enabled) {
throw errorInsufficientPermissions();
}
}
}
/**
* Sets or clears the assistant state for a participant in Redis.
* @param roomId
* @param participantIdentity
* @param capability
* @param enabled
*/
protected async setParticipantAssistantState(
roomId: string,
participantIdentity: string,
capability: MeetAssistantCapabilityName,
enabled: boolean
): Promise<void> {
const key = this.getParticipantAssistantStateKey(roomId, participantIdentity, capability);
if (!enabled) {
await this.redisService.delete(key);
return;
}
await this.redisService.setIfNotExists(
key,
JSON.stringify({
enabled: true,
updatedAt: Date.now()
})
);
}
/**
* Gets the count of participants that have the specified assistant capability enabled in the given room.
* @param roomId
* @param capability
* @returns
*/
protected async getEnabledParticipantsCount(
roomId: string,
capability: MeetAssistantCapabilityName
): Promise<number> {
const pattern = `${RedisKeyName.AI_ASSISTANT_PARTICIPANT_STATE}${roomId}:${capability}:*`;
const keys = await this.redisService.getKeys(pattern);
return keys.length;
}
protected getParticipantAssistantStateKey(
roomId: string,
participantIdentity: string,
capability: MeetAssistantCapabilityName
): string {
return `${RedisKeyName.AI_ASSISTANT_PARTICIPANT_STATE}${roomId}:${capability}:${participantIdentity}`;
}
protected async stopCaptionsAssistantIfRunning(roomId: string): Promise<void> {
const assistants = await this.livekitService.listAgents(roomId);
const captionsAssistant = assistants.find(
(assistant) => assistant.agentName === INTERNAL_CONFIG.CAPTIONS_AGENT_NAME
);
if (!captionsAssistant) {
return;
}
await this.livekitService.stopAgent(captionsAssistant.id, roomId);
}
}

View File

@ -9,6 +9,7 @@ import { MeetRoomHelper } from '../helpers/room.helper.js';
import { DistributedEventType } from '../models/distributed-event.model.js';
import { RecordingRepository } from '../repositories/recording.repository.js';
import { RoomRepository } from '../repositories/room.repository.js';
import { AiAssistantService } from './ai-assistant.service.js';
import { DistributedEventService } from './distributed-event.service.js';
import { FrontendEventService } from './frontend-event.service.js';
import { LiveKitService } from './livekit.service.js';
@ -33,6 +34,7 @@ export class LivekitWebhookService {
@inject(DistributedEventService) protected distributedEventService: DistributedEventService,
@inject(FrontendEventService) protected frontendEventService: FrontendEventService,
@inject(RoomMemberService) protected roomMemberService: RoomMemberService,
@inject(AiAssistantService) protected aiAssistantService: AiAssistantService,
@inject(LoggerService) protected logger: LoggerService
) {
this.webhookReceiver = new WebhookReceiver(MEET_ENV.LIVEKIT_API_KEY, MEET_ENV.LIVEKIT_API_SECRET);
@ -189,8 +191,10 @@ export class LivekitWebhookService {
if (!this.livekitService.isStandardParticipant(participant)) return;
try {
// Release the participant's reserved name
await this.roomMemberService.releaseParticipantName(room.name, participant.name);
await Promise.all([
this.roomMemberService.releaseParticipantName(room.name, participant.name),
this.aiAssistantService.cleanupState(room.name, participant.identity)
]);
this.logger.verbose(`Released name for participant '${participant.name}' in room '${room.name}'`);
} catch (error) {
this.logger.error('Error releasing participant name on participant left:', error);
@ -282,7 +286,8 @@ export class LivekitWebhookService {
tasks.push(
this.roomMemberService.cleanupParticipantNames(roomId),
this.recordingService.releaseRecordingLockIfNoEgress(roomId)
this.recordingService.releaseRecordingLockIfNoEgress(roomId),
this.aiAssistantService.cleanupState(roomId)
);
await Promise.all(tasks);
} catch (error) {

View File

@ -1,6 +1,7 @@
import { ParticipantInfo_Kind } from '@livekit/protocol';
import { AgentDispatch, ParticipantInfo_Kind } from '@livekit/protocol';
import { inject, injectable } from 'inversify';
import {
AgentDispatchClient,
CreateOptions,
DataPacket_Kind,
EgressClient,
@ -31,6 +32,7 @@ import { LoggerService } from './logger.service.js';
export class LiveKitService {
private egressClient: EgressClient;
private roomClient: RoomServiceClient;
private agentClient: AgentDispatchClient;
constructor(@inject(LoggerService) protected logger: LoggerService) {
const livekitUrlHostname = MEET_ENV.LIVEKIT_URL_PRIVATE.replace(/^ws:/, 'http:').replace(/^wss:/, 'https:');
@ -40,6 +42,11 @@ export class LiveKitService {
MEET_ENV.LIVEKIT_API_KEY,
MEET_ENV.LIVEKIT_API_SECRET
);
this.agentClient = new AgentDispatchClient(
livekitUrlHostname,
MEET_ENV.LIVEKIT_API_KEY,
MEET_ENV.LIVEKIT_API_SECRET
);
}
async createRoom(options: CreateOptions): Promise<Room> {
@ -270,6 +277,67 @@ export class LiveKitService {
}
}
/**
* Start an agent for a specific room.
* @param roomName
* @param agentName
* @returns The created AgentDispatch
*/
async createAgent(
roomName: string,
agentName: string /*, options: CreateDispatchOptions*/
): Promise<AgentDispatch> {
try {
return await this.agentClient.createDispatch(roomName, agentName);
} catch (error) {
this.logger.error(`Error creating agent dispatch for room '${roomName}':`, error);
throw internalError(`creating agent dispatch for room '${roomName}'`);
}
}
/**
* Lists all agents in a LiveKit room.
* @param roomName
* @returns An array of agents in the specified room
*/
async listAgents(roomName: string): Promise<AgentDispatch[]> {
try {
return await this.agentClient.listDispatch(roomName);
} catch (error) {
this.logger.error(`Error listing agents for room '${roomName}':`, error);
throw internalError(`listing agents for room '${roomName}'`);
}
}
/**
* Gets an agent dispatch by its ID in a LiveKit room.
* @param roomName
* @param agentId
* @returns The agent if found, otherwise undefined
*/
async getAgent(roomName: string, agentId: string): Promise<AgentDispatch | undefined> {
try {
return await this.agentClient.getDispatch(agentId, roomName);
} catch (error) {
this.logger.error(`Error getting agent dispatch '${agentId}' for room '${roomName}':`, error);
throw internalError(`getting agent dispatch '${agentId}' for room '${roomName}'`);
}
}
/**
* Stops an agent in a LiveKit room.
* @param agentId
* @param roomName
*/
async stopAgent(agentId: string, roomName: string): Promise<void> {
try {
await this.agentClient.deleteDispatch(agentId, roomName);
} catch (error) {
this.logger.error(`Error deleting agent dispatch '${agentId}' for room '${roomName}':`, error);
throw internalError(`deleting agent dispatch '${agentId}' for room '${roomName}'`);
}
}
async startRoomComposite(
roomName: string,
output: EncodedFileOutput | StreamOutput,

View File

@ -105,6 +105,33 @@ export class MutexService {
return locks;
}
/**
* Attempts to acquire a lock, retrying up to `maxAttempts` times with a fixed delay between
* attempts. Intended for fire-and-forget flows (e.g. webhooks) where the caller has no
* opportunity to retry externally and a missed lock acquisition would leave the system in an
* inconsistent state.
*
* @param key - The resource to acquire a lock for.
* @param ttl - The time-to-live for the lock in milliseconds.
* @param maxAttempts - Maximum number of acquisition attempts. Defaults to 3.
* @param delayMs - Fixed delay in milliseconds between attempts. Defaults to 200.
* @returns A Promise that resolves to the acquired Lock, or null if all attempts fail.
*/
async acquireWithRetry(key: string, ttl: number = this.TTL_MS, maxAttempts = 3, delayMs = 200): Promise<Lock | null> {
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
const lock = await this.acquire(key, ttl);
if (lock) return lock;
if (attempt < maxAttempts) {
this.logger.warn(`Lock '${key}' attempt ${attempt}/${maxAttempts} failed. Retrying in ${delayMs}ms...`);
await new Promise((resolve) => setTimeout(resolve, delayMs));
}
}
return null;
}
lockExists(key: string): Promise<boolean> {
const registryKey = MeetLock.getRegistryLock(key);
return this.redisService.exists(registryKey);

View File

@ -133,10 +133,9 @@ export class RoomMemberService {
// Get participant permissions (with join meeting)
const permissions = await this.getRoomMemberPermissions(roomId, role, true);
const withCaptions = room.config.captions.enabled ?? false;
// Generate token with participant name
return this.tokenService.generateRoomMemberToken(role, permissions, participantName, participantIdentity, withCaptions);
return this.tokenService.generateRoomMemberToken(role, permissions, participantName, participantIdentity);
}
/**

View File

@ -1,4 +1,3 @@
import { RoomAgentDispatch, RoomConfiguration } from '@livekit/protocol';
import {
MeetRoomMemberPermissions,
MeetRoomMemberRole,
@ -42,8 +41,7 @@ export class TokenService {
role: MeetRoomMemberRole,
permissions: MeetRoomMemberPermissions,
participantName?: string,
participantIdentity?: string,
roomWithCaptions = false
participantIdentity?: string
): Promise<string> {
const metadata: MeetRoomMemberTokenMetadata = {
livekitUrl: MEET_ENV.LIVEKIT_URL,
@ -57,46 +55,16 @@ export class TokenService {
ttl: INTERNAL_CONFIG.ROOM_MEMBER_TOKEN_EXPIRATION,
metadata: JSON.stringify(metadata)
};
return await this.generateJwtToken(tokenOptions, permissions.livekit as VideoGrant, roomWithCaptions);
return await this.generateJwtToken(tokenOptions, permissions.livekit as VideoGrant);
}
private async generateJwtToken(
tokenOptions: AccessTokenOptions,
grants?: VideoGrant,
roomWithCaptions = false
): Promise<string> {
private async generateJwtToken(tokenOptions: AccessTokenOptions, grants?: VideoGrant): Promise<string> {
const at = new AccessToken(MEET_ENV.LIVEKIT_API_KEY, MEET_ENV.LIVEKIT_API_SECRET, tokenOptions);
if (grants) {
at.addGrant(grants);
}
const captionsEnabledGlobally = MEET_ENV.CAPTIONS_ENABLED === 'true';
const captionsEnabledInRoom = Boolean(roomWithCaptions);
// Warn if configuration is inconsistent
if (!captionsEnabledGlobally) {
if (captionsEnabledInRoom) {
this.logger.warn(
`Captions feature is disabled in environment but Room is created with captions enabled. ` +
`Please enable captions in environment by setting MEET_CAPTIONS_ENABLED=true to ensure proper functionality.`
);
}
return await at.toJwt();
}
if (captionsEnabledInRoom) {
this.logger.debug('Activating Captions Agent. Configuring Room Agent Dispatch.');
at.roomConfig = new RoomConfiguration({
agents: [
new RoomAgentDispatch({
agentName: INTERNAL_CONFIG.CAPTIONS_AGENT_NAME
})
]
});
}
return await at.toJwt();
}

View File

@ -0,0 +1,42 @@
/**
* Assistant creation options
*/
export interface MeetCreateAssistantRequest {
// scope: MeetAssistantScope;
capabilities: MeetAssistantCapability[];
}
/**
* Defines the scope of an assistant, i.e. the resource(s) it is associated with.
*/
// export interface MeetAssistantScope {
// resourceType: MeetAssistantScopeResourceType;
// resourceIds: string[];
// }
/**
* Defines a capability that an assistant can have, such as live captions.
*/
export interface MeetAssistantCapability {
name: MeetAssistantCapabilityName;
}
/**
* Enumeration of supported assistant capabilities.
*/
export enum MeetAssistantCapabilityName {
LIVE_CAPTIONS = 'live_captions',
}
/**
* Enumeration of supported resource types that an assistant can be associated with.
*/
// export enum MeetAssistantScopeResourceType {
// MEETING = 'meeting',
// }
export interface MeetCreateAssistantResponse {
id: string;
status: 'active';
}

View File

@ -1,22 +1,24 @@
export * from './api-key.js';
export * from './auth-config.js';
export * from './global-config.js';
export * from './event.model.js';
export * from './global-config.js';
export * from './permissions/livekit-permissions.js';
export * from './permissions/meet-permissions.js';
export * from './sort-pagination.js';
export * from './room-member.js';
export * from './sort-pagination.js';
export * from './user.js';
export * from './ai-assistant.js';
export * from './analytics.js';
export * from './recording.model.js';
export * from './room-config.js';
export * from './room.js';
export * from './recording.model.js';
export * from './webhook.model.js';
export * from './analytics.js';
// Webcomponent types
export * from './webcomponent/command.model.js';
export * from './webcomponent/event.model.js';
export * from './webcomponent/message.type.js';
export * from './webcomponent/properties.model.js';