backend: enhance orphaned lock handling in recording service and improve test coverage
This commit is contained in:
parent
8dbdceccee
commit
53a3c236ad
@ -719,20 +719,36 @@ export class RecordingService {
|
||||
const lockKey = `${lockPrefix}${roomId}`;
|
||||
const gracePeriodMs = ms(INTERNAL_CONFIG.RECORDING_ORPHANED_LOCK_GRACE_PERIOD);
|
||||
|
||||
const safeLockRelease = async (lockKey: string) => {
|
||||
const stillExists = await this.mutexService.lockExists(lockKey);
|
||||
|
||||
if (stillExists) {
|
||||
await this.mutexService.release(lockKey);
|
||||
}
|
||||
};
|
||||
|
||||
try {
|
||||
// Verify if the lock still exists before proceeding to check the room
|
||||
const [lockExists, lockCreatedAt] = await Promise.all([
|
||||
this.mutexService.lockExists(lockKey),
|
||||
this.mutexService.getLockCreatedAt(lockKey)
|
||||
]);
|
||||
// Verify if the lock still exists
|
||||
const lockExists = await this.mutexService.lockExists(lockKey);
|
||||
|
||||
if (!lockExists) {
|
||||
this.logger.debug(`Lock for room ${roomId} no longer exists, skipping cleanup`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Get the lock creation timestamp
|
||||
const lockCreatedAt = await this.mutexService.getLockCreatedAt(lockKey);
|
||||
|
||||
if (lockCreatedAt == null) {
|
||||
this.logger.warn(
|
||||
`Lock for room ${roomId} reported as existing but has no creation date. Treating as orphaned.`
|
||||
);
|
||||
await safeLockRelease(lockKey);
|
||||
return;
|
||||
}
|
||||
|
||||
// Verify if the lock is too recent
|
||||
const lockAge = Date.now() - (lockCreatedAt || Date.now());
|
||||
const lockAge = Date.now() - lockCreatedAt;
|
||||
|
||||
if (lockAge < gracePeriodMs) {
|
||||
this.logger.debug(
|
||||
@ -747,22 +763,27 @@ export class RecordingService {
|
||||
]);
|
||||
|
||||
if (lkRoomExists) {
|
||||
this.logger.debug(`Room ${roomId} exists, checking recordings`);
|
||||
} else {
|
||||
this.logger.debug(`Room ${roomId} no longer exists, checking for in-progress recordings`);
|
||||
const lkRoom = await this.livekitService.getRoom(roomId);
|
||||
const hasPublishers = lkRoom.numPublishers > 0;
|
||||
|
||||
if (hasPublishers) {
|
||||
this.logger.debug(`Room ${roomId} exists, checking recordings`);
|
||||
const hasInProgressRecordings = inProgressRecordings.length > 0;
|
||||
|
||||
if (hasInProgressRecordings) {
|
||||
this.logger.debug(`Room ${roomId} has in-progress recordings, keeping lock`);
|
||||
return;
|
||||
}
|
||||
|
||||
// No in-progress recordings, releasing orphaned lock
|
||||
this.logger.info(`Room ${roomId} has no in-progress recordings, releasing orphaned lock`);
|
||||
await safeLockRelease(lockKey);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Verify if in-progress recordings exist
|
||||
const hasInProgressRecordings = inProgressRecordings.length > 0;
|
||||
|
||||
if (hasInProgressRecordings) {
|
||||
this.logger.debug(`Room ${roomId} has in-progress recordings, keeping lock`);
|
||||
return;
|
||||
}
|
||||
|
||||
// No in-progress recordings, releasing orphaned lock
|
||||
this.logger.info(`Room ${roomId} has no in-progress recordings, releasing orphaned lock`);
|
||||
await this.mutexService.release(lockKey);
|
||||
this.logger.debug(`Room ${roomId} no longer exists or has no publishers, releasing orphaned lock`);
|
||||
await safeLockRelease(lockKey);
|
||||
} catch (error) {
|
||||
this.logger.error(`Error processing orphan lock for room ${roomId}:`, error);
|
||||
throw error;
|
||||
|
||||
@ -2,7 +2,7 @@ import { EventEmitter } from 'events';
|
||||
import { inject, injectable } from 'inversify';
|
||||
import { Redis, RedisOptions, SentinelAddress } from 'ioredis';
|
||||
import ms from 'ms';
|
||||
import { Redlock } from "@sesamecare-oss/redlock";
|
||||
import { Redlock } from '@sesamecare-oss/redlock';
|
||||
import {
|
||||
checkModuleEnabled,
|
||||
REDIS_DB,
|
||||
@ -176,8 +176,12 @@ export class RedisService extends EventEmitter {
|
||||
* @returns {Promise<boolean>} - A promise that resolves to `true` if the key exists, otherwise `false`.
|
||||
*/
|
||||
async exists(key: string): Promise<boolean> {
|
||||
const result = await this.get(key);
|
||||
return !!result;
|
||||
try {
|
||||
const result = await this.get(key);
|
||||
return !!result;
|
||||
} catch (error) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
get(key: string, hashKey?: string): Promise<string | null> {
|
||||
@ -261,7 +265,6 @@ export class RedisService extends EventEmitter {
|
||||
this.logger.verbose('Redis connections cleaned up');
|
||||
}
|
||||
|
||||
|
||||
private loadRedisConfig(): RedisOptions {
|
||||
// Check if openviduCall module is enabled. If not, exit the process
|
||||
checkModuleEnabled();
|
||||
@ -292,7 +295,7 @@ export class RedisService extends EventEmitter {
|
||||
password: REDIS_PASSWORD,
|
||||
name: REDIS_SENTINEL_MASTER_NAME,
|
||||
db: Number(REDIS_DB),
|
||||
maxRetriesPerRequest: null, // Infinite retries
|
||||
maxRetriesPerRequest: null // Infinite retries
|
||||
};
|
||||
} else {
|
||||
this.logger.verbose('Using Redis standalone');
|
||||
@ -302,7 +305,7 @@ export class RedisService extends EventEmitter {
|
||||
username: REDIS_USERNAME,
|
||||
password: REDIS_PASSWORD,
|
||||
db: Number(REDIS_DB),
|
||||
maxRetriesPerRequest: null, // Infinite retries
|
||||
maxRetriesPerRequest: null // Infinite retries
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@ -184,8 +184,13 @@ describe('Recording Garbage Collector Tests', () => {
|
||||
// Simulate lock does not exist
|
||||
(mutexService.lockExists as jest.Mock).mockResolvedValueOnce(false as never);
|
||||
|
||||
const roomId = testRooms.withPublishers;
|
||||
// Execute evaluateAndReleaseOrphanedLock
|
||||
await recordingService['evaluateAndReleaseOrphanedLock'](testRooms.withPublishers, 'prefix_');
|
||||
await recordingService['evaluateAndReleaseOrphanedLock'](roomId, 'prefix_');
|
||||
|
||||
const lockKey = `prefix_${roomId}`;
|
||||
expect(mutexService.lockExists).toHaveBeenCalledWith(lockKey);
|
||||
expect(mutexService.lockExists).toReturnWith(Promise.resolve(false));
|
||||
|
||||
// Verify that no further checks were performed
|
||||
expect(mutexService.getLockCreatedAt).not.toHaveBeenCalled();
|
||||
@ -210,8 +215,9 @@ describe('Recording Garbage Collector Tests', () => {
|
||||
});
|
||||
|
||||
it('should release lock for a room with no publishers and no active recordings', async () => {
|
||||
const roomId = testRooms.withoutPublishersNoRecording;
|
||||
// Simulate lock exists and is old enough
|
||||
(mutexService.lockExists as jest.Mock).mockResolvedValueOnce(true as never);
|
||||
(mutexService.lockExists as jest.Mock).mockResolvedValue(true as never);
|
||||
(mutexService.getLockCreatedAt as jest.Mock).mockResolvedValueOnce((Date.now() - ms('5m')) as never); // 5 minutes old
|
||||
|
||||
// Configure mocks específicos para este test
|
||||
@ -223,24 +229,75 @@ describe('Recording Garbage Collector Tests', () => {
|
||||
(livekitService.getInProgressRecordingsEgress as jest.Mock).mockResolvedValueOnce([] as never);
|
||||
|
||||
// Create actual test lock
|
||||
await createTestLock(testRooms.withoutPublishersNoRecording, ms('5m'));
|
||||
await createTestLock(roomId, ms('5m'));
|
||||
|
||||
// Execute evaluateAndReleaseOrphanedLock
|
||||
await recordingService['evaluateAndReleaseOrphanedLock'](
|
||||
testRooms.withoutPublishersNoRecording,
|
||||
getRecordingLock(testRooms.withoutPublishersNoRecording).replace(/[^:]+$/, '')
|
||||
roomId,
|
||||
getRecordingLock(roomId).replace(/[^:]+$/, '')
|
||||
);
|
||||
|
||||
// Check that release was called with correct lock name
|
||||
expect(livekitService.roomExists).toHaveBeenCalledWith(testRooms.withoutPublishersNoRecording);
|
||||
expect(livekitService.getRoom).toHaveBeenCalledWith(testRooms.withoutPublishersNoRecording);
|
||||
expect(livekitService.getInProgressRecordingsEgress).toHaveBeenCalledWith(
|
||||
testRooms.withoutPublishersNoRecording
|
||||
expect(livekitService.roomExists).toHaveBeenCalledWith(roomId);
|
||||
expect(livekitService.getRoom).toHaveBeenCalledWith(roomId);
|
||||
|
||||
expect(livekitService.getInProgressRecordingsEgress).toHaveBeenCalledWith(roomId);
|
||||
expect(mutexService.release).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should release the lock for a room with active recordings and lack of publishers', async () => {
|
||||
const roomId = testRooms.withoutPublishersWithRecording;
|
||||
// Simulate lock exists and is old enough
|
||||
(mutexService.lockExists as jest.Mock).mockResolvedValue(true as never);
|
||||
|
||||
jest.useFakeTimers();
|
||||
const now = 1_000_000;
|
||||
jest.setSystemTime(now);
|
||||
|
||||
(mutexService.getLockCreatedAt as jest.Mock).mockResolvedValueOnce((now - ms('5m')) as never); // 5 minutos ago
|
||||
|
||||
// Configure mocks específicos para este test
|
||||
(livekitService.roomExists as jest.Mock).mockResolvedValue(true as never);
|
||||
(livekitService.getRoom as jest.Mock).mockResolvedValue({
|
||||
numParticipants: 0,
|
||||
numPublishers: 0
|
||||
} as Room as never);
|
||||
(livekitService.getInProgressRecordingsEgress as jest.Mock).mockResolvedValue([
|
||||
{
|
||||
egressId: `EG_${roomId}`,
|
||||
status: EgressStatus.EGRESS_ACTIVE
|
||||
} as EgressInfo
|
||||
] as never);
|
||||
|
||||
// Create actual test lock
|
||||
await createTestLock(roomId, ms('5m'));
|
||||
|
||||
// Execute evaluateAndReleaseOrphanedLock
|
||||
await recordingService['evaluateAndReleaseOrphanedLock'](
|
||||
roomId,
|
||||
getRecordingLock(roomId).replace(/[^:]+$/, '')
|
||||
);
|
||||
|
||||
expect(mutexService.getLockCreatedAt).toHaveBeenCalled();
|
||||
expect(mutexService.lockExists).toHaveBeenCalled();
|
||||
// Verify lock is kept (release not called)
|
||||
expect(livekitService.roomExists).toHaveBeenCalledWith(roomId);
|
||||
expect(livekitService.roomExists).toReturnWith(Promise.resolve(true));
|
||||
expect(livekitService.getRoom).toHaveBeenCalledWith(roomId);
|
||||
expect(livekitService.getInProgressRecordingsEgress).toHaveBeenCalledWith(roomId);
|
||||
expect(livekitService.getInProgressRecordingsEgress).toReturnWith(
|
||||
Promise.resolve([
|
||||
{
|
||||
egressId: `EG_${roomId}`,
|
||||
status: EgressStatus.EGRESS_ACTIVE
|
||||
}
|
||||
])
|
||||
);
|
||||
expect(mutexService.release).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should keep lock for a room with active recordings regardless of publishers', async () => {
|
||||
it('should keep lock for a room with active recordings and with publishers', async () => {
|
||||
const roomId = testRooms.withoutPublishersWithRecording;
|
||||
// Simulate lock exists and is old enough
|
||||
(mutexService.lockExists as jest.Mock).mockResolvedValueOnce(true as never);
|
||||
(mutexService.getLockCreatedAt as jest.Mock).mockResolvedValueOnce((Date.now() - ms('5m')) as never);
|
||||
@ -253,64 +310,72 @@ describe('Recording Garbage Collector Tests', () => {
|
||||
} as Room as never);
|
||||
(livekitService.getInProgressRecordingsEgress as jest.Mock).mockResolvedValueOnce([
|
||||
{
|
||||
egressId: `EG_${testRooms.withoutPublishersWithRecording}`,
|
||||
egressId: `EG_${roomId}`,
|
||||
status: EgressStatus.EGRESS_ACTIVE
|
||||
} as EgressInfo
|
||||
] as never);
|
||||
|
||||
// Create actual test lock
|
||||
await createTestLock(testRooms.withoutPublishersWithRecording, ms('5m'));
|
||||
await createTestLock(roomId, ms('5m'));
|
||||
|
||||
// Execute evaluateAndReleaseOrphanedLock
|
||||
await recordingService['evaluateAndReleaseOrphanedLock'](
|
||||
testRooms.withoutPublishersWithRecording,
|
||||
getRecordingLock(testRooms.withoutPublishersWithRecording).replace(/[^:]+$/, '')
|
||||
roomId,
|
||||
getRecordingLock(roomId).replace(/[^:]+$/, '')
|
||||
);
|
||||
|
||||
expect(mutexService.getLockCreatedAt).toHaveBeenCalled();
|
||||
expect(mutexService.lockExists).toHaveBeenCalled();
|
||||
// Verify lock is kept (release not called)
|
||||
expect(livekitService.roomExists).toHaveBeenCalledWith(testRooms.withoutPublishersWithRecording);
|
||||
expect(livekitService.getRoom).toHaveBeenCalledWith(testRooms.withoutPublishersWithRecording);
|
||||
expect(livekitService.getInProgressRecordingsEgress).toHaveBeenCalledWith(
|
||||
testRooms.withoutPublishersWithRecording
|
||||
expect(livekitService.roomExists).toHaveBeenCalledWith(roomId);
|
||||
expect(livekitService.roomExists).toReturnWith(Promise.resolve(true));
|
||||
expect(livekitService.getRoom).toHaveBeenCalledWith(roomId);
|
||||
expect(livekitService.getInProgressRecordingsEgress).toHaveBeenCalledWith(roomId);
|
||||
expect(livekitService.getInProgressRecordingsEgress).toReturnWith(
|
||||
Promise.resolve([
|
||||
{
|
||||
egressId: `EG_${roomId}`,
|
||||
status: EgressStatus.EGRESS_ACTIVE
|
||||
}
|
||||
])
|
||||
);
|
||||
expect(mutexService.release).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should keep lock for a non-existent room with active recordings', async () => {
|
||||
it('should release the lock for a non-existent room with active recordings', async () => {
|
||||
const roomId = testRooms.nonExistentWithRecording;
|
||||
// Simulate lock exists and is old enough
|
||||
(mutexService.lockExists as jest.Mock).mockResolvedValueOnce(true as never);
|
||||
(mutexService.lockExists as jest.Mock).mockResolvedValue(true as never);
|
||||
(mutexService.getLockCreatedAt as jest.Mock).mockResolvedValueOnce((Date.now() - ms('5m')) as never);
|
||||
|
||||
// Configure mocks específicos para este test
|
||||
(livekitService.roomExists as jest.Mock).mockResolvedValueOnce(false as never);
|
||||
(livekitService.getInProgressRecordingsEgress as jest.Mock).mockResolvedValueOnce([
|
||||
{
|
||||
egressId: `EG_${testRooms.nonExistentWithRecording}`,
|
||||
egressId: `EG_${roomId}`,
|
||||
status: EgressStatus.EGRESS_ACTIVE
|
||||
} as EgressInfo
|
||||
] as never);
|
||||
|
||||
// Create actual test lock
|
||||
await createTestLock(testRooms.nonExistentWithRecording, ms('5m'));
|
||||
await createTestLock(roomId, ms('5m'));
|
||||
|
||||
// Execute evaluateAndReleaseOrphanedLock
|
||||
await recordingService['evaluateAndReleaseOrphanedLock'](
|
||||
testRooms.nonExistentWithRecording,
|
||||
getRecordingLock(testRooms.nonExistentWithRecording).replace(/[^:]+$/, '')
|
||||
roomId,
|
||||
getRecordingLock(roomId).replace(/[^:]+$/, '')
|
||||
);
|
||||
|
||||
// Verify lock is kept despite room not existing
|
||||
expect(livekitService.roomExists).toHaveBeenCalledWith(testRooms.nonExistentWithRecording);
|
||||
// Verify lock is released despite room not existing
|
||||
expect(livekitService.roomExists).toHaveBeenCalledWith(roomId);
|
||||
expect(livekitService.getRoom).not.toHaveBeenCalled(); // Room doesn't exist
|
||||
expect(livekitService.getInProgressRecordingsEgress).toHaveBeenCalledWith(
|
||||
testRooms.nonExistentWithRecording
|
||||
);
|
||||
expect(mutexService.release).not.toHaveBeenCalled();
|
||||
expect(livekitService.getInProgressRecordingsEgress).toHaveBeenCalledWith(roomId);
|
||||
expect(mutexService.release).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should release lock for a non-existent room with no active recordings', async () => {
|
||||
// Simulate lock exists and is old enough
|
||||
(mutexService.lockExists as jest.Mock).mockResolvedValueOnce(true as never);
|
||||
(mutexService.lockExists as jest.Mock).mockResolvedValue(true as never);
|
||||
(mutexService.getLockCreatedAt as jest.Mock).mockResolvedValueOnce((Date.now() - ms('5m')) as never);
|
||||
|
||||
// Configure mocks específicos para este test
|
||||
@ -353,8 +418,12 @@ describe('Recording Garbage Collector Tests', () => {
|
||||
|
||||
it('should handle errors during lock release', async () => {
|
||||
// Simulate lock exists and is old enough
|
||||
(mutexService.lockExists as jest.Mock).mockResolvedValueOnce(true as never);
|
||||
(mutexService.getLockCreatedAt as jest.Mock).mockResolvedValueOnce((Date.now() - ms('5m')) as never);
|
||||
(mutexService.lockExists as jest.Mock).mockResolvedValue(true as never);
|
||||
jest.useFakeTimers();
|
||||
const now = 1_000_000;
|
||||
jest.setSystemTime(now);
|
||||
|
||||
(mutexService.getLockCreatedAt as jest.Mock).mockResolvedValueOnce((now - ms('5m')) as never);
|
||||
|
||||
// Configure for lock release scenario
|
||||
(livekitService.roomExists as jest.Mock).mockResolvedValueOnce(false as never);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user