test: refactor code in tests for garbage collection of orphaned locks, stale recordings and expired rooms

This commit is contained in:
juancarmore 2025-11-24 20:17:10 +01:00
parent f71b567823
commit 8ccc5d1a8b
6 changed files with 347 additions and 474 deletions

View File

@ -28,7 +28,7 @@ import { createApp, registerDependencies } from '../../src/server.js';
import { ApiKeyService } from '../../src/services/api-key.service.js';
import { GlobalConfigService } from '../../src/services/global-config.service.js';
import { RecordingService } from '../../src/services/recording.service.js';
import { RoomService } from '../../src/services/room.service.js';
import { RoomScheduledTasksService } from '../../src/services/room-scheduled-tasks.service.js';
const CREDENTIALS = {
admin: {
@ -352,15 +352,15 @@ export const deleteAllRooms = async () => {
/**
* Runs the expired rooms garbage collector.
*
* This function retrieves the RoomService from the dependency injection container
* This function retrieves the RoomScheduledTasksService from the dependency injection container
* and calls its deleteExpiredRooms method to clean up expired rooms.
* It then waits for 1 second before completing.
*/
export const runExpiredRoomsGC = async () => {
checkAppIsRunning();
const roomService = container.get(RoomService);
await (roomService as any)['deleteExpiredRooms']();
const roomTaskScheduler = container.get(RoomScheduledTasksService);
await roomTaskScheduler['deleteExpiredRooms']();
await sleep('1s');
};

View File

@ -1,438 +0,0 @@
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, jest } from '@jest/globals';
import { Lock } from '@sesamecare-oss/redlock';
import { EgressInfo, EgressStatus, Room } from 'livekit-server-sdk';
import ms from 'ms';
import { container } from '../../../../src/config/dependency-injector.config.js';
import { INTERNAL_CONFIG } from '../../../../src/config/internal-config.js';
import { MeetLock } from '../../../../src/helpers/redis.helper.js';
import { LiveKitService } from '../../../../src/services/livekit.service.js';
import { LoggerService } from '../../../../src/services/logger.service.js';
import { MutexService, RedisLock } from '../../../../src/services/mutex.service.js';
import { RecordingService } from '../../../../src/services/recording.service.js';
import { startTestServer } from '../../../helpers/request-helpers.js';
describe('Recording Garbage Collector Tests', () => {
let recordingService: RecordingService;
let mutexService: MutexService;
let livekitService: LiveKitService;
const getRecordingLock = (roomId: string) => MeetLock.getRecordingActiveLock(roomId);
const testRooms = {
recentLock: 'room-recent-lock',
withPublishers: 'room-with-publishers',
withoutPublishersWithRecording: 'room-without-publishers-with-recording',
withoutPublishersNoRecording: 'room-without-publishers-no-recording',
nonExistentWithRecording: 'room-non-existent-with-recording',
nonExistentNoRecording: 'room-non-existent-no-recording'
};
beforeAll(async () => {
await startTestServer();
recordingService = container.get(RecordingService);
mutexService = container.get(MutexService);
livekitService = container.get(LiveKitService);
// Mute logs for the test
const logger = container.get(LoggerService);
jest.spyOn(logger, 'debug').mockImplementation(() => {});
jest.spyOn(logger, 'verbose').mockImplementation(() => {});
jest.spyOn(logger, 'info').mockImplementation(() => {});
jest.spyOn(logger, 'warn').mockImplementation(() => {});
jest.spyOn(logger, 'error').mockImplementation(() => {});
});
beforeEach(async () => {
// Clean up any existing locks before each test
for (const roomId of Object.values(testRooms)) {
try {
await mutexService.release(getRecordingLock(roomId));
} catch (e) {
// Ignore errors if the lock does not exist
}
}
// Setup spies
jest.spyOn(mutexService, 'getLocksByPrefix');
jest.spyOn(mutexService, 'lockExists');
jest.spyOn(mutexService, 'getLockCreatedAt');
jest.spyOn(mutexService, 'release');
jest.spyOn(livekitService, 'roomExists');
jest.spyOn(livekitService, 'getRoom');
jest.spyOn(livekitService, 'getInProgressRecordingsEgress');
jest.spyOn(recordingService as never, 'performActiveRecordingLocksGC');
jest.spyOn(recordingService as never, 'evaluateAndReleaseOrphanedLock');
jest.clearAllMocks();
// Do not set up global mocks here to improve test isolation
});
afterEach(async () => {
// Clean up all spies and its invocations
jest.clearAllMocks();
jest.restoreAllMocks();
// Explicitly restore the mock behavior for getLockCreatedAt
if (mutexService.getLockCreatedAt && jest.isMockFunction(mutexService.getLockCreatedAt)) {
(mutexService.getLockCreatedAt as jest.Mock).mockReset();
}
});
afterAll(async () => {
// Clean up all test locks
for (const roomId of Object.values(testRooms)) {
try {
await mutexService.release(getRecordingLock(roomId));
} catch (e) {
// Ignore errors if the lock does not exist
}
}
// Restore all mocks
jest.restoreAllMocks();
});
/**
* Creates a test lock with a specified age
*/
async function createTestLock(roomId: string, ageMs = 0): Promise<Lock | null> {
const lockName = getRecordingLock(roomId);
const lock = await mutexService.acquire(lockName, ms(INTERNAL_CONFIG.RECORDING_ACTIVE_LOCK_TTL));
if (ageMs > 0) {
// Mock getLockCreatedAt to simulate lock age
(mutexService.getLockCreatedAt as jest.Mock).mockImplementationOnce((...args) => {
const lockKey = args[0] as string;
if (lockKey === lockName) {
return Date.now() - ageMs;
}
return Date.now(); // Default for other locks
});
}
return lock;
}
describe('Perform Recording Locks Garbage Collection', () => {
it('should not process any locks when the system has no active recording locks', async () => {
// Simulate empty response from lock service
(mutexService.getLocksByPrefix as jest.Mock).mockResolvedValueOnce([] as never);
// Execute the garbage collector
await recordingService['performActiveRecordingLocksGC']();
// Verify that we checked for locks but didn't attempt to process any
expect(mutexService.getLocksByPrefix).toHaveBeenCalled();
expect((recordingService as any).evaluateAndReleaseOrphanedLock).not.toHaveBeenCalled();
});
it('should gracefully handle database errors during lock retrieval', async () => {
// Simulate database connection failure or other error
(mutexService.getLocksByPrefix as jest.Mock).mockRejectedValueOnce(
new Error('Failed to retrieve locks') as never
);
// Execute the garbage collector - should not throw
await recordingService['performActiveRecordingLocksGC']();
// Verify the error was handled properly without further processing
expect(mutexService.getLocksByPrefix).toHaveBeenCalled();
expect((recordingService as any).evaluateAndReleaseOrphanedLock).not.toHaveBeenCalled();
});
it('should process each recording lock to detect and clean orphaned resources', async () => {
// Create mock locks representing different recording scenarios
const testLockResources = [
getRecordingLock(testRooms.withPublishers),
getRecordingLock(testRooms.withoutPublishersNoRecording),
getRecordingLock(testRooms.nonExistentNoRecording)
];
// Simulate existing locks in the system
(mutexService.getLocksByPrefix as jest.Mock).mockResolvedValueOnce(
testLockResources.map((resource) => ({ resources: [resource] }) as RedisLock) as never
);
// Execute the garbage collector
await recordingService['performActiveRecordingLocksGC']();
// Verify that each lock was processed individually
expect((recordingService as any).evaluateAndReleaseOrphanedLock).toHaveBeenCalledTimes(3);
expect((recordingService as any).evaluateAndReleaseOrphanedLock).toHaveBeenCalledWith(
testRooms.withPublishers,
expect.any(String)
);
expect((recordingService as any).evaluateAndReleaseOrphanedLock).toHaveBeenCalledWith(
testRooms.withoutPublishersNoRecording,
expect.any(String)
);
expect((recordingService as any).evaluateAndReleaseOrphanedLock).toHaveBeenCalledWith(
testRooms.nonExistentNoRecording,
expect.any(String)
);
});
});
describe('Evaluate and Release Orphaned Lock', () => {
it('should skip processing if the lock no longer exists', async () => {
// Simulate lock does not exist
(mutexService.lockExists as jest.Mock).mockResolvedValueOnce(false as never);
const roomId = testRooms.withPublishers;
// Execute evaluateAndReleaseOrphanedLock
await recordingService['evaluateAndReleaseOrphanedLock'](roomId, 'prefix_');
const lockKey = `prefix_${roomId}`;
expect(mutexService.lockExists).toHaveBeenCalledWith(lockKey);
expect(mutexService.lockExists).toReturnWith(Promise.resolve(false));
// Verify that no further checks were performed
expect(mutexService.getLockCreatedAt).not.toHaveBeenCalled();
expect(livekitService.roomExists).not.toHaveBeenCalled();
expect(mutexService.release).not.toHaveBeenCalled();
});
it('should skip processing if the lock is too recent', async () => {
// Simulate lock exists
(mutexService.lockExists as jest.Mock).mockResolvedValueOnce(true as never);
// Simulate lock is recent (20 seconds old)
(mutexService.getLockCreatedAt as jest.Mock).mockResolvedValueOnce((Date.now() - 20000) as never);
// Execute evaluateAndReleaseOrphanedLock
await recordingService['evaluateAndReleaseOrphanedLock'](testRooms.recentLock, 'prefix_');
// Verify that lock age was checked but no further processing occurred
expect(mutexService.getLockCreatedAt).toHaveBeenCalled();
expect(livekitService.roomExists).not.toHaveBeenCalled();
expect(mutexService.release).not.toHaveBeenCalled();
});
it('should release lock for a room with no publishers and no active recordings', async () => {
const roomId = testRooms.withoutPublishersNoRecording;
// Simulate lock exists and is old enough
(mutexService.lockExists as jest.Mock).mockResolvedValue(true as never);
(mutexService.getLockCreatedAt as jest.Mock).mockResolvedValueOnce((Date.now() - ms('5m')) as never); // 5 minutes old
// Configure mocks específicos para este test
(livekitService.roomExists as jest.Mock).mockResolvedValueOnce(true as never);
(livekitService.getRoom as jest.Mock).mockResolvedValueOnce({
numParticipants: 0,
numPublishers: 0
} as Room as never);
(livekitService.getInProgressRecordingsEgress as jest.Mock).mockResolvedValueOnce([] as never);
// Create actual test lock
await createTestLock(roomId, ms('5m'));
// Execute evaluateAndReleaseOrphanedLock
await recordingService['evaluateAndReleaseOrphanedLock'](
roomId,
getRecordingLock(roomId).replace(/[^:]+$/, '')
);
// Check that release was called with correct lock name
expect(livekitService.roomExists).toHaveBeenCalledWith(roomId);
expect(livekitService.getRoom).toHaveBeenCalledWith(roomId);
expect(livekitService.getInProgressRecordingsEgress).toHaveBeenCalledWith(roomId);
expect(mutexService.release).toHaveBeenCalled();
});
it('should release the lock for a room with active recordings and lack of publishers', async () => {
const roomId = testRooms.withoutPublishersWithRecording;
// Simulate lock exists and is old enough
(mutexService.lockExists as jest.Mock).mockResolvedValue(true as never);
jest.useFakeTimers();
const now = 1_000_000;
jest.setSystemTime(now);
(mutexService.getLockCreatedAt as jest.Mock).mockResolvedValueOnce((now - ms('5m')) as never); // 5 minutes ago
// Configure mocks específicos para este test
(livekitService.roomExists as jest.Mock).mockResolvedValue(true as never);
(livekitService.getRoom as jest.Mock).mockResolvedValue({
numParticipants: 0,
numPublishers: 0
} as Room as never);
(livekitService.getInProgressRecordingsEgress as jest.Mock).mockResolvedValue([
{
egressId: `EG_${roomId}`,
status: EgressStatus.EGRESS_ACTIVE
} as EgressInfo
] as never);
// Create actual test lock
await createTestLock(roomId, ms('5m'));
// Execute evaluateAndReleaseOrphanedLock
await recordingService['evaluateAndReleaseOrphanedLock'](
roomId,
getRecordingLock(roomId).replace(/[^:]+$/, '')
);
expect(mutexService.getLockCreatedAt).toHaveBeenCalled();
expect(mutexService.lockExists).toHaveBeenCalled();
// Verify lock is kept (release not called)
expect(livekitService.roomExists).toHaveBeenCalledWith(roomId);
expect(livekitService.roomExists).toReturnWith(Promise.resolve(true));
expect(livekitService.getRoom).toHaveBeenCalledWith(roomId);
expect(livekitService.getInProgressRecordingsEgress).toHaveBeenCalledWith(roomId);
expect(livekitService.getInProgressRecordingsEgress).toReturnWith(
Promise.resolve([
{
egressId: `EG_${roomId}`,
status: EgressStatus.EGRESS_ACTIVE
}
])
);
expect(mutexService.release).toHaveBeenCalled();
});
it('should keep lock for a room with active recordings and with publishers', async () => {
const roomId = testRooms.withoutPublishersWithRecording;
// Simulate lock exists and is old enough
(mutexService.lockExists as jest.Mock).mockResolvedValueOnce(true as never);
(mutexService.getLockCreatedAt as jest.Mock).mockResolvedValueOnce((Date.now() - ms('5m')) as never);
// Configure specific mocks for this test
(livekitService.roomExists as jest.Mock).mockResolvedValueOnce(true as never);
(livekitService.getRoom as jest.Mock).mockResolvedValueOnce({
numParticipants: 0,
numPublishers: 0
} as Room as never);
(livekitService.getInProgressRecordingsEgress as jest.Mock).mockResolvedValueOnce([
{
egressId: `EG_${roomId}`,
status: EgressStatus.EGRESS_ACTIVE
} as EgressInfo
] as never);
// Create actual test lock
await createTestLock(roomId, ms('5m'));
// Execute evaluateAndReleaseOrphanedLock
await recordingService['evaluateAndReleaseOrphanedLock'](
roomId,
getRecordingLock(roomId).replace(/[^:]+$/, '')
);
expect(mutexService.getLockCreatedAt).toHaveBeenCalled();
expect(mutexService.lockExists).toHaveBeenCalled();
// Verify lock is kept (release not called)
expect(livekitService.roomExists).toHaveBeenCalledWith(roomId);
expect(livekitService.roomExists).toReturnWith(Promise.resolve(true));
expect(livekitService.getRoom).toHaveBeenCalledWith(roomId);
expect(livekitService.getInProgressRecordingsEgress).toHaveBeenCalledWith(roomId);
expect(livekitService.getInProgressRecordingsEgress).toReturnWith(
Promise.resolve([
{
egressId: `EG_${roomId}`,
status: EgressStatus.EGRESS_ACTIVE
}
])
);
expect(mutexService.release).not.toHaveBeenCalled();
});
it('should release the lock for a non-existent room with active recordings', async () => {
const roomId = testRooms.nonExistentWithRecording;
// Simulate lock exists and is old enough
(mutexService.lockExists as jest.Mock).mockResolvedValue(true as never);
(mutexService.getLockCreatedAt as jest.Mock).mockResolvedValueOnce((Date.now() - ms('5m')) as never);
// Configure specific mocks for this test
(livekitService.roomExists as jest.Mock).mockResolvedValueOnce(false as never);
(livekitService.getInProgressRecordingsEgress as jest.Mock).mockResolvedValueOnce([
{
egressId: `EG_${roomId}`,
status: EgressStatus.EGRESS_ACTIVE
} as EgressInfo
] as never);
// Create actual test lock
await createTestLock(roomId, ms('5m'));
// Execute evaluateAndReleaseOrphanedLock
await recordingService['evaluateAndReleaseOrphanedLock'](
roomId,
getRecordingLock(roomId).replace(/[^:]+$/, '')
);
// Verify lock is released despite room not existing
expect(livekitService.roomExists).toHaveBeenCalledWith(roomId);
expect(livekitService.getRoom).not.toHaveBeenCalled(); // Room doesn't exist
expect(livekitService.getInProgressRecordingsEgress).toHaveBeenCalledWith(roomId);
expect(mutexService.release).toHaveBeenCalled();
});
it('should release lock for a non-existent room with no active recordings', async () => {
// Simulate lock exists and is old enough
(mutexService.lockExists as jest.Mock).mockResolvedValue(true as never);
(mutexService.getLockCreatedAt as jest.Mock).mockResolvedValueOnce((Date.now() - ms('5m')) as never);
// Configure specific mocks for this test
(livekitService.roomExists as jest.Mock).mockResolvedValueOnce(false as never);
(livekitService.getInProgressRecordingsEgress as jest.Mock).mockResolvedValueOnce([] as never);
// Create actual test lock
await createTestLock(testRooms.nonExistentNoRecording, ms('5m'));
// Execute evaluateAndReleaseOrphanedLock
await recordingService['evaluateAndReleaseOrphanedLock'](
testRooms.nonExistentNoRecording,
getRecordingLock(testRooms.nonExistentNoRecording).replace(/[^:]+$/, '')
);
// Verify lock is released for non-existent room with no recordings
expect(livekitService.roomExists).toHaveBeenCalledWith(testRooms.nonExistentNoRecording);
expect(livekitService.getRoom).not.toHaveBeenCalled(); // Room doesn't exist
expect(livekitService.getInProgressRecordingsEgress).toHaveBeenCalledWith(testRooms.nonExistentNoRecording);
expect(mutexService.release).toHaveBeenCalled();
});
it('should handle errors during room existence check', async () => {
// Simulate lock exists and is old enough
(mutexService.lockExists as jest.Mock).mockResolvedValueOnce(true as never);
(mutexService.getLockCreatedAt as jest.Mock).mockResolvedValueOnce((Date.now() - ms('5m')) as never);
// Simulate error during roomExists check
(livekitService.roomExists as jest.Mock).mockRejectedValueOnce(new Error('Failed to check room') as never);
// Execute evaluateAndReleaseOrphanedLock and expect error to propagate
await expect(
recordingService['evaluateAndReleaseOrphanedLock'](testRooms.withPublishers, 'prefix_')
).rejects.toThrow('Failed to check room');
// Verify that process stopped at roomExists
expect(livekitService.getRoom).not.toHaveBeenCalled();
expect(mutexService.release).not.toHaveBeenCalled();
});
it('should handle errors during lock release', async () => {
// Simulate lock exists and is old enough
(mutexService.lockExists as jest.Mock).mockResolvedValue(true as never);
jest.useFakeTimers();
const now = 1_000_000;
jest.setSystemTime(now);
(mutexService.getLockCreatedAt as jest.Mock).mockResolvedValueOnce((now - ms('5m')) as never);
// Configure for lock release scenario
(livekitService.roomExists as jest.Mock).mockResolvedValueOnce(false as never);
(livekitService.getInProgressRecordingsEgress as jest.Mock).mockResolvedValueOnce([] as never);
// Simulate error during release
(mutexService.release as jest.Mock).mockRejectedValueOnce(new Error('Failed to release lock') as never);
// Execute evaluateAndReleaseOrphanedLock and expect error to propagate
await expect(
recordingService['evaluateAndReleaseOrphanedLock'](testRooms.nonExistentNoRecording, 'prefix_')
).rejects.toThrow('Failed to release lock');
});
});
});

View File

@ -0,0 +1,305 @@
import { afterEach, beforeAll, describe, expect, it, jest } from '@jest/globals';
import { Lock } from '@sesamecare-oss/redlock';
import { SpiedFunction } from 'jest-mock';
import { EgressInfo, Room } from 'livekit-server-sdk';
import ms from 'ms';
import { container } from '../../../../src/config/dependency-injector.config.js';
import { MeetLock } from '../../../../src/helpers/redis.helper.js';
import { LiveKitService } from '../../../../src/services/livekit.service.js';
import { LoggerService } from '../../../../src/services/logger.service.js';
import { MutexService } from '../../../../src/services/mutex.service.js';
import { RecordingScheduledTasksService } from '../../../../src/services/recording-scheduled-tasks.service.js';
import { startTestServer } from '../../../helpers/request-helpers.js';
describe('Orphaned Active Recording Locks GC Tests', () => {
let recordingTaskScheduler: RecordingScheduledTasksService;
let mutexService: MutexService;
let livekitService: LiveKitService;
// Mock functions
let getLocksByPrefixMock: SpiedFunction<(pattern: string) => Promise<Lock[]>>;
let lockExistsMock: SpiedFunction<(key: string) => Promise<boolean>>;
let getLockCreatedAtMock: SpiedFunction<(key: string) => Promise<number | null>>;
let releaseMock: SpiedFunction<(key: string) => Promise<void>>;
let roomExistsMock: SpiedFunction<(roomName: string) => Promise<boolean>>;
let getRoomMock: SpiedFunction<(roomName: string) => Promise<Room>>;
let getInProgressRecordingsEgressMock: SpiedFunction<(roomName?: string) => Promise<EgressInfo[]>>;
let evaluateAndReleaseOrphanedLockMock: SpiedFunction<(roomId: string, lockPrefix: string) => Promise<void>>;
beforeAll(async () => {
await startTestServer();
recordingTaskScheduler = container.get(RecordingScheduledTasksService);
mutexService = container.get(MutexService);
livekitService = container.get(LiveKitService);
// Mute logs for the test
const logger = container.get(LoggerService);
jest.spyOn(logger, 'debug').mockImplementation(() => {});
jest.spyOn(logger, 'verbose').mockImplementation(() => {});
jest.spyOn(logger, 'info').mockImplementation(() => {});
jest.spyOn(logger, 'warn').mockImplementation(() => {});
jest.spyOn(logger, 'error').mockImplementation(() => {});
// Setup spies and store mock references
getLocksByPrefixMock = jest.spyOn(mutexService, 'getLocksByPrefix');
lockExistsMock = jest.spyOn(mutexService, 'lockExists');
getLockCreatedAtMock = jest.spyOn(mutexService, 'getLockCreatedAt');
releaseMock = jest.spyOn(mutexService, 'release');
roomExistsMock = jest.spyOn(livekitService, 'roomExists');
getRoomMock = jest.spyOn(livekitService, 'getRoom');
getInProgressRecordingsEgressMock = jest.spyOn(livekitService, 'getInProgressRecordingsEgress');
evaluateAndReleaseOrphanedLockMock = jest.spyOn(
recordingTaskScheduler as never,
'evaluateAndReleaseOrphanedLock'
);
// Default mock implementations
releaseMock.mockResolvedValue();
});
afterEach(() => {
jest.clearAllMocks();
});
describe('performActiveRecordingLocksGC', () => {
it('should not process any locks when the system has no active recording locks', async () => {
// Simulate empty response from lock service
getLocksByPrefixMock.mockResolvedValueOnce([]);
// Execute the garbage collector
await recordingTaskScheduler['performActiveRecordingLocksGC']();
// Verify that we checked for locks but didn't attempt to process any
expect(getLocksByPrefixMock).toHaveBeenCalled();
expect(evaluateAndReleaseOrphanedLockMock).not.toHaveBeenCalled();
});
it('should gracefully handle database errors during lock retrieval', async () => {
// Simulate database connection failure or other error
getLocksByPrefixMock.mockRejectedValueOnce(new Error('Failed to retrieve locks'));
// Execute the garbage collector - should not throw
await recordingTaskScheduler['performActiveRecordingLocksGC']();
// Verify the error was handled properly without further processing
expect(getLocksByPrefixMock).toHaveBeenCalled();
expect(evaluateAndReleaseOrphanedLockMock).not.toHaveBeenCalled();
});
it('should process each recording lock to detect and clean orphaned resources', async () => {
// Create mock locks representing different recording scenarios
const testLockResources = [
MeetLock.getRecordingActiveLock('room-1'),
MeetLock.getRecordingActiveLock('room-2'),
MeetLock.getRecordingActiveLock('room-3')
];
// Simulate existing locks in the system
getLocksByPrefixMock.mockResolvedValueOnce(
testLockResources.map((resource) => ({ resources: [resource] }) as Lock)
);
// Execute the garbage collector
await recordingTaskScheduler['performActiveRecordingLocksGC']();
// Verify that each lock was processed individually
expect(evaluateAndReleaseOrphanedLockMock).toHaveBeenCalledTimes(3);
expect(evaluateAndReleaseOrphanedLockMock).toHaveBeenCalledWith('room-1', expect.any(String));
expect(evaluateAndReleaseOrphanedLockMock).toHaveBeenCalledWith('room-2', expect.any(String));
expect(evaluateAndReleaseOrphanedLockMock).toHaveBeenCalledWith('room-3', expect.any(String));
});
});
describe('evaluateAndReleaseOrphanedLock', () => {
it('should skip processing if the lock no longer exists', async () => {
const roomId = 'test-room';
// Simulate lock does not exist
lockExistsMock.mockResolvedValueOnce(false);
// Execute evaluateAndReleaseOrphanedLock
await recordingTaskScheduler['evaluateAndReleaseOrphanedLock'](roomId, 'prefix_');
const lockKey = `prefix_${roomId}`;
expect(lockExistsMock).toHaveBeenCalledWith(lockKey);
// Verify that no further checks were performed
expect(getLockCreatedAtMock).not.toHaveBeenCalled();
expect(roomExistsMock).not.toHaveBeenCalled();
expect(releaseMock).not.toHaveBeenCalled();
});
it('should skip processing if the lock is too recent', async () => {
const roomId = 'test-room';
// Simulate lock exists
lockExistsMock.mockResolvedValueOnce(true);
// Simulate lock is recent (20 seconds old)
getLockCreatedAtMock.mockResolvedValueOnce(Date.now() - 20000);
// Execute evaluateAndReleaseOrphanedLock
await recordingTaskScheduler['evaluateAndReleaseOrphanedLock'](roomId, 'prefix_');
// Verify that lock age was checked but no further processing occurred
expect(getLockCreatedAtMock).toHaveBeenCalled();
expect(roomExistsMock).not.toHaveBeenCalled();
expect(releaseMock).not.toHaveBeenCalled();
});
it('should release lock for a room with no publishers and no active recordings', async () => {
const roomId = 'test-room';
// Simulate lock exists and is old enough
lockExistsMock.mockResolvedValue(true);
getLockCreatedAtMock.mockResolvedValueOnce(Date.now() - ms('5m')); // 5 minutes old
// Configure specific mocks for this test
roomExistsMock.mockResolvedValueOnce(true);
getRoomMock.mockResolvedValueOnce({
numPublishers: 0
} as Room);
getInProgressRecordingsEgressMock.mockResolvedValueOnce([]);
// Execute evaluateAndReleaseOrphanedLock
await recordingTaskScheduler['evaluateAndReleaseOrphanedLock'](roomId, 'prefix_');
// Check that release was called with correct lock name
expect(roomExistsMock).toHaveBeenCalledWith(roomId);
expect(getRoomMock).toHaveBeenCalledWith(roomId);
expect(getInProgressRecordingsEgressMock).toHaveBeenCalledWith(roomId);
expect(releaseMock).toHaveBeenCalledWith(`prefix_${roomId}`);
});
it('should release the lock for a room with active recordings and lack of publishers', async () => {
const roomId = 'test-room';
// Simulate lock exists and is old enough
lockExistsMock.mockResolvedValue(true);
getLockCreatedAtMock.mockResolvedValueOnce(Date.now() - ms('5m')); // 5 minutes ago
// Configure specific mocks for this test
roomExistsMock.mockResolvedValue(true);
getRoomMock.mockResolvedValue({
numPublishers: 0
} as Room);
getInProgressRecordingsEgressMock.mockResolvedValue([{} as EgressInfo]);
// Execute evaluateAndReleaseOrphanedLock
await recordingTaskScheduler['evaluateAndReleaseOrphanedLock'](roomId, 'prefix_');
// Check that release was called with correct lock name
expect(roomExistsMock).toHaveBeenCalledWith(roomId);
expect(getRoomMock).toHaveBeenCalledWith(roomId);
expect(getInProgressRecordingsEgressMock).toHaveBeenCalledWith(roomId);
expect(releaseMock).toHaveBeenCalledWith(`prefix_${roomId}`);
});
it('should keep lock for a room with active recordings and with publishers', async () => {
const roomId = 'test-room';
// Simulate lock exists and is old enough
lockExistsMock.mockResolvedValueOnce(true);
getLockCreatedAtMock.mockResolvedValueOnce(Date.now() - ms('5m'));
// Configure specific mocks for this test
roomExistsMock.mockResolvedValueOnce(true);
getRoomMock.mockResolvedValueOnce({
numPublishers: 1
} as Room);
getInProgressRecordingsEgressMock.mockResolvedValueOnce([{} as EgressInfo]);
// Execute evaluateAndReleaseOrphanedLock
await recordingTaskScheduler['evaluateAndReleaseOrphanedLock'](roomId, 'prefix_');
// Verify lock is kept (release not called)
expect(roomExistsMock).toHaveBeenCalledWith(roomId);
expect(getRoomMock).toHaveBeenCalledWith(roomId);
expect(getInProgressRecordingsEgressMock).toHaveBeenCalledWith(roomId);
expect(releaseMock).not.toHaveBeenCalled();
});
it('should release the lock for a non-existent room with active recordings', async () => {
const roomId = 'test-room';
// Simulate lock exists and is old enough
lockExistsMock.mockResolvedValue(true);
getLockCreatedAtMock.mockResolvedValueOnce(Date.now() - ms('5m'));
// Configure specific mocks for this test
roomExistsMock.mockResolvedValueOnce(false);
getInProgressRecordingsEgressMock.mockResolvedValueOnce([{} as EgressInfo]);
// Execute evaluateAndReleaseOrphanedLock
await recordingTaskScheduler['evaluateAndReleaseOrphanedLock'](roomId, 'prefix_');
// Check that release was called with correct lock name
expect(roomExistsMock).toHaveBeenCalledWith(roomId);
expect(getRoomMock).not.toHaveBeenCalled(); // Room doesn't exist
expect(getInProgressRecordingsEgressMock).toHaveBeenCalledWith(roomId);
expect(releaseMock).toHaveBeenCalledWith(`prefix_${roomId}`);
});
it('should release lock for a non-existent room with no active recordings', async () => {
const roomId = 'test-room';
// Simulate lock exists and is old enough
lockExistsMock.mockResolvedValue(true);
getLockCreatedAtMock.mockResolvedValueOnce(Date.now() - ms('5m'));
// Configure specific mocks for this test
roomExistsMock.mockResolvedValueOnce(false);
getInProgressRecordingsEgressMock.mockResolvedValueOnce([]);
// Execute evaluateAndReleaseOrphanedLock
await recordingTaskScheduler['evaluateAndReleaseOrphanedLock'](roomId, 'prefix_');
// Check that release was called with correct lock name
expect(roomExistsMock).toHaveBeenCalledWith(roomId);
expect(getRoomMock).not.toHaveBeenCalled(); // Room doesn't exist
expect(getInProgressRecordingsEgressMock).toHaveBeenCalledWith(roomId);
expect(releaseMock).toHaveBeenCalledWith(`prefix_${roomId}`);
});
it('should handle errors during room existence check', async () => {
const roomId = 'test-room';
// Simulate lock exists and is old enough
lockExistsMock.mockResolvedValueOnce(true);
getLockCreatedAtMock.mockResolvedValueOnce(Date.now() - ms('5m'));
// Simulate error during roomExists check
roomExistsMock.mockRejectedValueOnce(new Error('Failed to check room'));
getInProgressRecordingsEgressMock.mockResolvedValueOnce([]);
// Execute evaluateAndReleaseOrphanedLock and expect error to propagate
await expect(recordingTaskScheduler['evaluateAndReleaseOrphanedLock'](roomId, 'prefix_')).rejects.toThrow(
'Failed to check room'
);
// Verify that process stopped at roomExists
expect(getRoomMock).not.toHaveBeenCalled();
expect(releaseMock).not.toHaveBeenCalled();
});
it('should handle errors during lock release', async () => {
const roomId = 'test-room';
// Simulate lock exists and is old enough
lockExistsMock.mockResolvedValue(true);
getLockCreatedAtMock.mockResolvedValueOnce(Date.now() - ms('5m'));
// Configure specific mocks for this test
roomExistsMock.mockResolvedValueOnce(false);
getInProgressRecordingsEgressMock.mockResolvedValueOnce([]);
// Simulate error during release
releaseMock.mockRejectedValueOnce(new Error('Failed to release lock'));
// Execute evaluateAndReleaseOrphanedLock and expect error to propagate
await expect(recordingTaskScheduler['evaluateAndReleaseOrphanedLock'](roomId, 'prefix_')).rejects.toThrow(
'Failed to release lock'
);
});
});
});

View File

@ -1,6 +1,9 @@
import { afterEach, beforeAll, describe, expect, it, jest } from '@jest/globals';
import { container } from '../../../../src/config/dependency-injector.config.js';
import { setInternalConfig } from '../../../../src/config/internal-config.js';
import { DistributedEventType } from '../../../../src/models/distributed-event.model.js';
import { RecordingScheduledTasksService } from '../../../../src/services/recording-scheduled-tasks.service.js';
import { RecordingService } from '../../../../src/services/recording.service.js';
import {
expectValidStartRecordingResponse,
expectValidStopRecordingResponse
@ -25,8 +28,6 @@ import {
setupMultiRoomTestContext,
TestContext
} from '../../../helpers/test-scenarios';
import { container } from '../../../../src/config/dependency-injector.config.js';
import { RecordingService } from '../../../../src/services/recording.service.js';
describe('Recording API Race Conditions Tests', () => {
let context: TestContext | null = null;
@ -63,7 +64,7 @@ describe('Recording API Race Conditions Tests', () => {
});
const eventServiceOffSpy = jest.spyOn(recordingService['systemEventService'], 'off');
const handleRecordingLockTimeoutSpy = jest.spyOn(recordingService as any, 'handleRecordingTimeout');
const releaseLockSpy = jest.spyOn(recordingService as any, 'releaseRecordingLockIfNoEgress');
const releaseLockSpy = jest.spyOn(recordingService, 'releaseRecordingLockIfNoEgress');
try {
// Attempt to start recording
@ -117,7 +118,7 @@ describe('Recording API Race Conditions Tests', () => {
// Mock the handleRecordingLockTimeout method to prevent actual timeout handling
const handleTimeoutSpy = jest.spyOn(recordingService as any, 'handleRecordingTimeout');
// Mock the releaseRecordingLockIfNoEgress method to prevent actual lock release
const releaseLockSpy = jest.spyOn(recordingService as any, 'releaseRecordingLockIfNoEgress');
const releaseLockSpy = jest.spyOn(recordingService, 'releaseRecordingLockIfNoEgress');
const eventServiceOffSpy = jest.spyOn(recordingService['systemEventService'], 'off');
try {
@ -392,7 +393,8 @@ describe('Recording API Race Conditions Tests', () => {
context = await setupMultiRoomTestContext(1, true);
const roomData = context.getRoomByIndex(0)!;
const gcSpy = jest.spyOn(recordingService as any, 'performActiveRecordingLocksGC');
const recordingTaskScheduler = container.get(RecordingScheduledTasksService);
const gcSpy = jest.spyOn(recordingTaskScheduler as any, 'performActiveRecordingLocksGC');
const startResponse = await startRecording(roomData.room.roomId, roomData.moderatorToken);
expectValidStartRecordingResponse(startResponse, roomData.room.roomId, roomData.room.roomName);
@ -400,7 +402,7 @@ describe('Recording API Race Conditions Tests', () => {
// Execute garbage collection while stopping the recording
const stopPromise = stopRecording(recordingId, roomData.moderatorToken);
const gcPromise = recordingService['performActiveRecordingLocksGC']();
const gcPromise = recordingTaskScheduler['performActiveRecordingLocksGC']();
// Both operations should complete

View File

@ -1,4 +1,4 @@
import { afterEach, beforeAll, beforeEach, describe, expect, it, jest } from '@jest/globals';
import { afterEach, beforeAll, describe, expect, it, jest } from '@jest/globals';
import { MeetRecordingInfo, MeetRecordingStatus } from '@openvidu-meet/typings';
import { SpiedFunction } from 'jest-mock';
import { EgressInfo, EgressStatus } from 'livekit-server-sdk';
@ -8,29 +8,32 @@ import { INTERNAL_CONFIG } from '../../../../src/config/internal-config.js';
import { RecordingRepository } from '../../../../src/repositories/recording.repository.js';
import { LiveKitService } from '../../../../src/services/livekit.service.js';
import { LoggerService } from '../../../../src/services/logger.service.js';
import { RecordingScheduledTasksService } from '../../../../src/services/recording-scheduled-tasks.service.js';
import { RecordingService } from '../../../../src/services/recording.service.js';
import { startTestServer } from '../../../helpers/request-helpers.js';
describe('Stale Recordings GC Tests', () => {
let recordingService: RecordingService;
let recordingTaskScheduler: RecordingScheduledTasksService;
// Mock functions
let findActiveRecordingsMock: SpiedFunction<() => Promise<MeetRecordingInfo[]>>;
let roomExistsMock: SpiedFunction<(roomId: string) => Promise<boolean>>;
let roomHasParticipantsMock: SpiedFunction<(roomId: string) => Promise<boolean>>;
let getInProgressRecordingsEgressMock: SpiedFunction<() => Promise<EgressInfo[]>>;
let roomExistsMock: SpiedFunction<(roomName: string) => Promise<boolean>>;
let roomHasParticipantsMock: SpiedFunction<(roomName: string) => Promise<boolean>>;
let getInProgressRecordingsEgressMock: SpiedFunction<(roomName?: string) => Promise<EgressInfo[]>>;
let stopEgressMock: SpiedFunction<(egressId: string) => Promise<EgressInfo>>;
let evaluateAndAbortStaleRecordingMock: SpiedFunction<(recording: MeetRecordingInfo) => Promise<boolean>>;
let updateRecordingStatusMock: SpiedFunction<(recordingId: string, status: MeetRecordingStatus) => Promise<void>>;
beforeAll(async () => {
await startTestServer();
recordingService = container.get(RecordingService);
const logger = container.get(LoggerService);
const recordingService = container.get(RecordingService);
const recordingRepository = container.get(RecordingRepository);
const livekitService = container.get(LiveKitService);
recordingTaskScheduler = container.get(RecordingScheduledTasksService);
// Mute logs for the test
const logger = container.get(LoggerService);
jest.spyOn(logger, 'debug').mockImplementation(() => {});
jest.spyOn(logger, 'verbose').mockImplementation(() => {});
jest.spyOn(logger, 'info').mockImplementation(() => {});
@ -43,12 +46,13 @@ describe('Stale Recordings GC Tests', () => {
roomHasParticipantsMock = jest.spyOn(livekitService, 'roomHasParticipants');
getInProgressRecordingsEgressMock = jest.spyOn(livekitService, 'getInProgressRecordingsEgress');
stopEgressMock = jest.spyOn(livekitService, 'stopEgress');
evaluateAndAbortStaleRecordingMock = jest.spyOn(recordingService as never, 'evaluateAndAbortStaleRecording');
evaluateAndAbortStaleRecordingMock = jest.spyOn(
recordingTaskScheduler as never,
'evaluateAndAbortStaleRecording'
);
updateRecordingStatusMock = jest.spyOn(recordingService as never, 'updateRecordingStatus');
});
beforeEach(() => {
// Reset common mocks to default implementations
// Default mock implementations
updateRecordingStatusMock.mockResolvedValue();
stopEgressMock.mockResolvedValue({} as EgressInfo);
});
@ -113,7 +117,7 @@ describe('Stale Recordings GC Tests', () => {
findActiveRecordingsMock.mockResolvedValueOnce([]);
// Execute the stale recordings cleanup
await recordingService['performStaleRecordingsGC']();
await recordingTaskScheduler['performStaleRecordingsGC']();
// Verify that we checked for recordings but didn't attempt to process any
expect(findActiveRecordingsMock).toHaveBeenCalled();
@ -125,7 +129,7 @@ describe('Stale Recordings GC Tests', () => {
findActiveRecordingsMock.mockRejectedValueOnce(new Error('Failed to retrieve recordings'));
// Execute the stale recordings cleanup - should not throw
await recordingService['performStaleRecordingsGC']();
await recordingTaskScheduler['performStaleRecordingsGC']();
// Verify the error was handled properly without further processing
expect(findActiveRecordingsMock).toHaveBeenCalled();
@ -146,7 +150,7 @@ describe('Stale Recordings GC Tests', () => {
getInProgressRecordingsEgressMock.mockResolvedValue([]);
// Execute the stale recordings cleanup
await recordingService['performStaleRecordingsGC']();
await recordingTaskScheduler['performStaleRecordingsGC']();
// Verify that each recording was processed individually
expect(evaluateAndAbortStaleRecordingMock).toHaveBeenCalledTimes(3);
@ -166,7 +170,7 @@ describe('Stale Recordings GC Tests', () => {
getInProgressRecordingsEgressMock.mockResolvedValueOnce([]);
// Execute evaluateAndAbortStaleRecording
const result = await recordingService['evaluateAndAbortStaleRecording'](recording);
const result = await recordingTaskScheduler['evaluateAndAbortStaleRecording'](recording);
// Verify that the recording was aborted without calling stopEgress
expect(result).toBe(true);
@ -187,7 +191,7 @@ describe('Stale Recordings GC Tests', () => {
getInProgressRecordingsEgressMock.mockResolvedValueOnce([egressInfo]);
// Execute evaluateAndAbortStaleRecording
const result = await recordingService['evaluateAndAbortStaleRecording'](recording);
const result = await recordingTaskScheduler['evaluateAndAbortStaleRecording'](recording);
// Verify that the method returned false (kept as fresh)
expect(result).toBe(false);
@ -210,7 +214,7 @@ describe('Stale Recordings GC Tests', () => {
roomExistsMock.mockResolvedValueOnce(true);
// Execute evaluateAndAbortStaleRecording
const result = await recordingService['evaluateAndAbortStaleRecording'](recording);
const result = await recordingTaskScheduler['evaluateAndAbortStaleRecording'](recording);
// Verify that the method returned false (still fresh)
expect(result).toBe(false);
@ -233,7 +237,7 @@ describe('Stale Recordings GC Tests', () => {
roomExistsMock.mockResolvedValueOnce(false);
// Execute evaluateAndAbortStaleRecording
const result = await recordingService['evaluateAndAbortStaleRecording'](recording);
const result = await recordingTaskScheduler['evaluateAndAbortStaleRecording'](recording);
// Verify that the recording was aborted
expect(result).toBe(true);
@ -257,7 +261,7 @@ describe('Stale Recordings GC Tests', () => {
roomHasParticipantsMock.mockResolvedValueOnce(false);
// Execute evaluateAndAbortStaleRecording
const result = await recordingService['evaluateAndAbortStaleRecording'](recording);
const result = await recordingTaskScheduler['evaluateAndAbortStaleRecording'](recording);
// Verify that the recording was aborted
expect(result).toBe(true);
@ -282,7 +286,7 @@ describe('Stale Recordings GC Tests', () => {
roomHasParticipantsMock.mockResolvedValueOnce(true);
// Execute evaluateAndAbortStaleRecording
const result = await recordingService['evaluateAndAbortStaleRecording'](recording);
const result = await recordingTaskScheduler['evaluateAndAbortStaleRecording'](recording);
// Verify that the recording was kept fresh (not aborted)
expect(result).toBe(false);
@ -311,7 +315,7 @@ describe('Stale Recordings GC Tests', () => {
roomExistsMock.mockResolvedValueOnce(false);
// Execute evaluateAndAbortStaleRecording
const result = await recordingService['evaluateAndAbortStaleRecording'](recording);
const result = await recordingTaskScheduler['evaluateAndAbortStaleRecording'](recording);
// Verify that the recording was kept fresh (threshold is not inclusive)
expect(result).toBe(false);
@ -330,7 +334,7 @@ describe('Stale Recordings GC Tests', () => {
getInProgressRecordingsEgressMock.mockRejectedValueOnce(new Error('LiveKit service unavailable'));
// Execute evaluateAndAbortStaleRecording and expect error to propagate
await expect(recordingService['evaluateAndAbortStaleRecording'](recording)).rejects.toThrow(
await expect(recordingTaskScheduler['evaluateAndAbortStaleRecording'](recording)).rejects.toThrow(
'LiveKit service unavailable'
);
@ -351,7 +355,7 @@ describe('Stale Recordings GC Tests', () => {
stopEgressMock.mockRejectedValueOnce(new Error('Failed to stop egress'));
// Execute evaluateAndAbortStaleRecording and expect error to propagate
await expect(recordingService['evaluateAndAbortStaleRecording'](recording)).rejects.toThrow(
await expect(recordingTaskScheduler['evaluateAndAbortStaleRecording'](recording)).rejects.toThrow(
'Failed to stop egress'
);
@ -373,7 +377,7 @@ describe('Stale Recordings GC Tests', () => {
roomExistsMock.mockResolvedValueOnce(true);
// Execute evaluateAndAbortStaleRecording and expect it to resolve to false
const result = await recordingService['evaluateAndAbortStaleRecording'](recording);
const result = await recordingTaskScheduler['evaluateAndAbortStaleRecording'](recording);
expect(result).toBe(false);
expect(getInProgressRecordingsEgressMock).toHaveBeenCalledWith(roomId);
@ -399,7 +403,7 @@ describe('Stale Recordings GC Tests', () => {
roomExistsMock.mockResolvedValueOnce(false);
// Execute evaluateAndAbortStaleRecording
const result = await recordingService['evaluateAndAbortStaleRecording'](recording);
const result = await recordingTaskScheduler['evaluateAndAbortStaleRecording'](recording);
// Verify that the correct egress was targeted
expect(result).toBe(true);

View File

@ -18,7 +18,7 @@ import {
startTestServer
} from '../../../helpers/request-helpers.js';
describe('Room Garbage Collector Tests', () => {
describe('Expired Rooms GC Tests', () => {
beforeAll(async () => {
setInternalConfig({
MIN_ROOM_AUTO_DELETE_DURATION: '0s'