openvidu-recording-improved: Refactor code
This commit is contained in:
parent
c1a90d2aa9
commit
2cc5d505da
@ -1,16 +1,7 @@
|
||||
import { Router } from "express";
|
||||
import { EgressClient, EncodedFileOutput, EncodedFileType } from "livekit-server-sdk";
|
||||
import {
|
||||
LIVEKIT_URL,
|
||||
LIVEKIT_API_KEY,
|
||||
LIVEKIT_API_SECRET,
|
||||
RECORDINGS_PATH,
|
||||
RECORDINGS_METADATA_PATH
|
||||
} from "../config.js";
|
||||
import { S3Service } from "../services/s3.service.js";
|
||||
import { RecordingService } from "../services/recording.service.js";
|
||||
|
||||
const egressClient = new EgressClient(LIVEKIT_URL, LIVEKIT_API_KEY, LIVEKIT_API_SECRET);
|
||||
const s3Service = new S3Service();
|
||||
const recordingService = new RecordingService();
|
||||
|
||||
export const recordingController = Router();
|
||||
|
||||
@ -22,28 +13,16 @@ recordingController.post("/start", async (req, res) => {
|
||||
return;
|
||||
}
|
||||
|
||||
const activeEgresses = await getActiveEgressesByRoom(roomName);
|
||||
const activeRecording = await recordingService.getActiveRecordingByRoom(roomName);
|
||||
|
||||
// Check if there is already an active egress for this room
|
||||
if (activeEgresses.length > 0) {
|
||||
if (activeRecording) {
|
||||
res.status(409).json({ errorMessage: "Recording already started for this room" });
|
||||
return;
|
||||
}
|
||||
|
||||
// Use the EncodedFileOutput to save the recording to an MP4 file
|
||||
const fileOutput = new EncodedFileOutput({
|
||||
fileType: EncodedFileType.MP4,
|
||||
filepath: `${RECORDINGS_PATH}{room_name}-{room_id}-{time}`,
|
||||
disableManifest: true
|
||||
});
|
||||
|
||||
try {
|
||||
// Start a RoomCompositeEgress to record all participants in the room
|
||||
const egressInfo = await egressClient.startRoomCompositeEgress(roomName, { file: fileOutput });
|
||||
const recording = {
|
||||
name: egressInfo.fileResults[0].filename.split("/").pop(),
|
||||
startedAt: Number(egressInfo.startedAt) / 1_000_000
|
||||
};
|
||||
const recording = recordingService.startRecording(roomName);
|
||||
res.json({ message: "Recording started", recording });
|
||||
} catch (error) {
|
||||
console.error("Error starting recording.", error);
|
||||
@ -59,26 +38,16 @@ recordingController.post("/stop", async (req, res) => {
|
||||
return;
|
||||
}
|
||||
|
||||
const activeEgresses = await getActiveEgressesByRoom(roomName);
|
||||
const activeRecording = await recordingService.getActiveRecordingByRoom(roomName);
|
||||
|
||||
// Check if there is an active egress for this room
|
||||
if (activeEgresses.length === 0) {
|
||||
if (!activeRecording) {
|
||||
res.status(409).json({ errorMessage: "Recording not started for this room" });
|
||||
return;
|
||||
}
|
||||
|
||||
const egressId = activeEgresses[0].egressId;
|
||||
|
||||
try {
|
||||
// Stop the Egress to finish the recording
|
||||
const egressInfo = await egressClient.stopEgress(egressId);
|
||||
const file = egressInfo.fileResults[0];
|
||||
const recording = {
|
||||
name: file.filename.split("/").pop(),
|
||||
startedAt: Number(egressInfo.startedAt) / 1_000_000,
|
||||
duration: Number(file.duration) / 1_000_000_000,
|
||||
size: Number(file.size)
|
||||
};
|
||||
const recording = await recordingService.stopRecording(activeRecording);
|
||||
res.json({ message: "Recording stopped", recording });
|
||||
} catch (error) {
|
||||
console.error("Error stopping recording.", error);
|
||||
@ -91,14 +60,7 @@ recordingController.get("/", async (req, res) => {
|
||||
const roomId = req.query.roomId?.toString();
|
||||
|
||||
try {
|
||||
const keyStart =
|
||||
RECORDINGS_PATH + RECORDINGS_METADATA_PATH + (roomName ? `${roomName}` + (roomId ? `-${roomId}` : "") : "");
|
||||
const keyEnd = ".json";
|
||||
const regex = new RegExp(`^${keyStart}.*${keyEnd}$`);
|
||||
|
||||
// List all Egress metadata files in the recordings path that match the regex
|
||||
const metadataKeys = await s3Service.listObjects(RECORDINGS_PATH + RECORDINGS_METADATA_PATH, regex);
|
||||
const recordings = await Promise.all(metadataKeys.map((metadataKey) => s3Service.getObjectAsJson(metadataKey)));
|
||||
const recordings = await recordingService.listRecordings(roomName, roomId);
|
||||
res.json({ recordings });
|
||||
} catch (error) {
|
||||
console.error("Error listing recordings.", error);
|
||||
@ -108,8 +70,7 @@ recordingController.get("/", async (req, res) => {
|
||||
|
||||
recordingController.get("/:recordingName", async (req, res) => {
|
||||
const { recordingName } = req.params;
|
||||
const key = RECORDINGS_PATH + recordingName;
|
||||
const exists = await s3Service.exists(key);
|
||||
const exists = await recordingService.existsRecording(recordingName);
|
||||
|
||||
if (!exists) {
|
||||
res.status(404).json({ errorMessage: "Recording not found" });
|
||||
@ -118,7 +79,7 @@ recordingController.get("/:recordingName", async (req, res) => {
|
||||
|
||||
try {
|
||||
// Get the recording file from S3
|
||||
const { body, size } = await s3Service.getObject(key);
|
||||
const { body, size } = await recordingService.getRecordingStream(recordingName);
|
||||
|
||||
// Set the response headers
|
||||
res.setHeader("Content-Type", "video/mp4");
|
||||
@ -135,9 +96,7 @@ recordingController.get("/:recordingName", async (req, res) => {
|
||||
|
||||
recordingController.delete("/:recordingName", async (req, res) => {
|
||||
const { recordingName } = req.params;
|
||||
const recordingKey = RECORDINGS_PATH + recordingName;
|
||||
const metadataKey = RECORDINGS_PATH + RECORDINGS_METADATA_PATH + recordingName.replace(".mp4", ".json");
|
||||
const exists = await s3Service.exists(recordingKey);
|
||||
const exists = await recordingService.existsRecording(recordingName);
|
||||
|
||||
if (!exists) {
|
||||
res.status(404).json({ errorMessage: "Recording not found" });
|
||||
@ -145,21 +104,10 @@ recordingController.delete("/:recordingName", async (req, res) => {
|
||||
}
|
||||
|
||||
try {
|
||||
// Delete the recording file and metadata file from S3
|
||||
await Promise.all([s3Service.deleteObject(recordingKey), s3Service.deleteObject(metadataKey)]);
|
||||
await recordingService.deleteRecording(recordingName);
|
||||
res.json({ message: "Recording deleted" });
|
||||
} catch (error) {
|
||||
console.error("Error deleting recording.", error);
|
||||
res.status(500).json({ errorMessage: "Error deleting recording" });
|
||||
}
|
||||
});
|
||||
|
||||
const getActiveEgressesByRoom = async (roomName) => {
|
||||
try {
|
||||
// List all active egresses for the room
|
||||
return await egressClient.listEgress({ roomName, active: true });
|
||||
} catch (error) {
|
||||
console.error("Error listing egresses.", error);
|
||||
return [];
|
||||
}
|
||||
};
|
||||
|
||||
@ -0,0 +1,45 @@
|
||||
import { Router } from "express";
|
||||
import { AccessToken } from "livekit-server-sdk";
|
||||
import { LIVEKIT_API_KEY, LIVEKIT_API_SECRET } from "../config.js";
|
||||
import { RoomService } from "../services/room.service.js";
|
||||
|
||||
const roomService = new RoomService();
|
||||
|
||||
export const roomController = Router();
|
||||
|
||||
roomController.post("/", async (req, res) => {
|
||||
const roomName = req.body.roomName;
|
||||
const participantName = req.body.participantName;
|
||||
|
||||
if (!roomName || !participantName) {
|
||||
res.status(400).json({ errorMessage: "roomName and participantName are required" });
|
||||
return;
|
||||
}
|
||||
|
||||
const at = new AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET, {
|
||||
identity: participantName
|
||||
});
|
||||
const permissions = {
|
||||
room: roomName,
|
||||
roomJoin: true,
|
||||
roomAdmin: true,
|
||||
roomList: true,
|
||||
roomRecord: true
|
||||
};
|
||||
at.addGrant(permissions);
|
||||
const token = await at.toJwt();
|
||||
|
||||
try {
|
||||
// Create room if it doesn't exist
|
||||
const room = await roomService.getRoom(roomName);
|
||||
|
||||
if (!room) {
|
||||
await roomService.createRoom(roomName);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error("Error creating room.", error);
|
||||
res.status(500).json({ errorMessage: "Error creating room" });
|
||||
}
|
||||
|
||||
res.json({ token });
|
||||
});
|
||||
@ -1,18 +1,12 @@
|
||||
import express, { Router } from "express";
|
||||
import { EgressStatus, RoomServiceClient, WebhookReceiver } from "livekit-server-sdk";
|
||||
import {
|
||||
LIVEKIT_URL,
|
||||
LIVEKIT_API_KEY,
|
||||
LIVEKIT_API_SECRET,
|
||||
APP_NAME,
|
||||
RECORDINGS_PATH,
|
||||
RECORDINGS_METADATA_PATH
|
||||
} from "../config.js";
|
||||
import { S3Service } from "../services/s3.service.js";
|
||||
import { WebhookReceiver } from "livekit-server-sdk";
|
||||
import { LIVEKIT_API_KEY, LIVEKIT_API_SECRET, APP_NAME } from "../config.js";
|
||||
import { RoomService } from "../services/room.service.js";
|
||||
import { RecordingService } from "../services/recording.service.js";
|
||||
|
||||
const webhookReceiver = new WebhookReceiver(LIVEKIT_API_KEY, LIVEKIT_API_SECRET);
|
||||
const s3Service = new S3Service();
|
||||
const roomClient = new RoomServiceClient(LIVEKIT_URL, LIVEKIT_API_KEY, LIVEKIT_API_SECRET);
|
||||
const roomService = new RoomService();
|
||||
const recordingService = new RecordingService();
|
||||
|
||||
export const webhookController = Router();
|
||||
webhookController.use(express.raw({ type: "application/webhook+json" }));
|
||||
@ -29,7 +23,7 @@ webhookController.post("/", async (req, res) => {
|
||||
switch (eventType) {
|
||||
case "egress_started":
|
||||
case "egress_updated":
|
||||
await handleEgressUpdated(egressInfo);
|
||||
await notifyRecordingStatusUpdate(egressInfo);
|
||||
break;
|
||||
case "egress_ended":
|
||||
await handleEgressEnded(egressInfo);
|
||||
@ -49,65 +43,34 @@ const checkWebhookRelatedToMe = async (webhookEvent) => {
|
||||
|
||||
if (!room || !room.metadata) {
|
||||
const roomName = room?.name ?? egressInfo?.roomName ?? ingressInfo?.roomName;
|
||||
const rooms = await roomClient.listRooms([roomName]);
|
||||
roomInfo = await roomService.getRoom(roomName);
|
||||
|
||||
if (rooms.length === 0) {
|
||||
if (!roomInfo) {
|
||||
return false;
|
||||
}
|
||||
|
||||
roomInfo = rooms[0];
|
||||
}
|
||||
|
||||
const metadata = roomInfo.metadata ? JSON.parse(roomInfo.metadata) : null;
|
||||
return metadata?.createdBy === APP_NAME;
|
||||
};
|
||||
|
||||
const handleEgressUpdated = async (egressInfo) => {
|
||||
await updateRecordingStatus(egressInfo);
|
||||
};
|
||||
|
||||
const handleEgressEnded = async (egressInfo) => {
|
||||
const recordingInfo = convertToRecordingInfo(egressInfo);
|
||||
const metadataName = recordingInfo.name.replace(".mp4", ".json");
|
||||
const key = RECORDINGS_PATH + RECORDINGS_METADATA_PATH + metadataName;
|
||||
await s3Service.uploadObject(key, recordingInfo);
|
||||
try {
|
||||
await recordingService.saveRecordingMetadata(egressInfo);
|
||||
} catch (error) {
|
||||
console.error("Error saving recording metadata.", error);
|
||||
}
|
||||
|
||||
await updateRecordingStatus(egressInfo);
|
||||
await notifyRecordingStatusUpdate(egressInfo);
|
||||
};
|
||||
|
||||
const convertToRecordingInfo = (egressInfo) => {
|
||||
const file = egressInfo.fileResults[0];
|
||||
return {
|
||||
name: file.filename.split("/").pop(),
|
||||
startedAt: Number(egressInfo.startedAt) / 1_000_000,
|
||||
duration: Number(file.duration) / 1_000_000_000,
|
||||
size: Number(file.size)
|
||||
};
|
||||
};
|
||||
|
||||
const updateRecordingStatus = async (egressInfo) => {
|
||||
const notifyRecordingStatusUpdate = async (egressInfo) => {
|
||||
const roomName = egressInfo.roomName;
|
||||
const recordingStatus = getRecordingStatus(egressInfo.status);
|
||||
await roomClient.updateRoomMetadata(
|
||||
roomName,
|
||||
JSON.stringify({
|
||||
createdBy: APP_NAME,
|
||||
recordingStatus
|
||||
})
|
||||
);
|
||||
};
|
||||
const recordingStatus = recordingService.getRecordingStatus(egressInfo.status);
|
||||
|
||||
const getRecordingStatus = (egressStatus) => {
|
||||
switch (egressStatus) {
|
||||
case EgressStatus.EGRESS_STARTING:
|
||||
return "STARTING";
|
||||
case EgressStatus.EGRESS_ACTIVE:
|
||||
return "STARTED";
|
||||
case EgressStatus.EGRESS_ENDING:
|
||||
return "STOPPING";
|
||||
case EgressStatus.EGRESS_COMPLETE:
|
||||
return "STOPPED";
|
||||
default:
|
||||
return "FAILED";
|
||||
try {
|
||||
await roomService.updateRoomMetadata(roomName, recordingStatus);
|
||||
} catch (error) {
|
||||
console.error("Error updating room metadata.", error);
|
||||
}
|
||||
};
|
||||
|
||||
@ -3,8 +3,8 @@ import express from "express";
|
||||
import cors from "cors";
|
||||
import path from "path";
|
||||
import { fileURLToPath } from "url";
|
||||
import { AccessToken, RoomServiceClient } from "livekit-server-sdk";
|
||||
import { LIVEKIT_URL, LIVEKIT_API_KEY, LIVEKIT_API_SECRET, SERVER_PORT, APP_NAME } from "./config.js";
|
||||
import { SERVER_PORT } from "./config.js";
|
||||
import { roomController } from "./controllers/room.controller.js";
|
||||
import { recordingController } from "./controllers/recording.controller.js";
|
||||
import { webhookController } from "./controllers/webhook.controller.js";
|
||||
|
||||
@ -18,51 +18,10 @@ const __filename = fileURLToPath(import.meta.url);
|
||||
const __dirname = path.dirname(__filename);
|
||||
app.use(express.static(path.join(__dirname, "../public")));
|
||||
|
||||
app.use("/token", roomController);
|
||||
app.use("/recordings", recordingController);
|
||||
app.use("/livekit/webhook", webhookController);
|
||||
|
||||
app.post("/token", async (req, res) => {
|
||||
const roomName = req.body.roomName;
|
||||
const participantName = req.body.participantName;
|
||||
|
||||
if (!roomName || !participantName) {
|
||||
res.status(400).json({ errorMessage: "roomName and participantName are required" });
|
||||
return;
|
||||
}
|
||||
|
||||
const at = new AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET, {
|
||||
identity: participantName
|
||||
});
|
||||
const permissions = {
|
||||
room: roomName,
|
||||
roomJoin: true,
|
||||
roomAdmin: true,
|
||||
roomList: true,
|
||||
roomRecord: true
|
||||
};
|
||||
at.addGrant(permissions);
|
||||
const token = await at.toJwt();
|
||||
|
||||
const roomClient = new RoomServiceClient(LIVEKIT_URL, LIVEKIT_API_KEY, LIVEKIT_API_SECRET);
|
||||
|
||||
// Check if room already exists
|
||||
const rooms = await roomClient.listRooms([roomName]);
|
||||
|
||||
// Create room if it doesn't exist
|
||||
if (rooms.length === 0) {
|
||||
const roomOptions = {
|
||||
name: roomName,
|
||||
metadata: JSON.stringify({
|
||||
createdBy: APP_NAME,
|
||||
recordingStatus: "STOPPED"
|
||||
})
|
||||
};
|
||||
await roomClient.createRoom(roomOptions);
|
||||
}
|
||||
|
||||
res.json({ token });
|
||||
});
|
||||
|
||||
app.listen(SERVER_PORT, () => {
|
||||
console.log("Server started on port:", SERVER_PORT);
|
||||
});
|
||||
|
||||
@ -0,0 +1,109 @@
|
||||
import { EgressClient, EgressStatus, EncodedFileOutput, EncodedFileType } from "livekit-server-sdk";
|
||||
import { LIVEKIT_URL, LIVEKIT_API_KEY, LIVEKIT_API_SECRET, RECORDINGS_PATH, RECORDINGS_METADATA_PATH } from "../config.js";
|
||||
import { S3Service } from "./s3.service.js";
|
||||
|
||||
const s3Service = new S3Service();
|
||||
|
||||
export class RecordingService {
|
||||
static instance;
|
||||
|
||||
constructor() {
|
||||
if (RecordingService.instance) {
|
||||
return RecordingService.instance;
|
||||
}
|
||||
|
||||
this.egressClient = new EgressClient(LIVEKIT_URL, LIVEKIT_API_KEY, LIVEKIT_API_SECRET);
|
||||
RecordingService.instance = this;
|
||||
return this;
|
||||
}
|
||||
|
||||
async startRecording(roomName) {
|
||||
// Use the EncodedFileOutput to save the recording to an MP4 file
|
||||
const fileOutput = new EncodedFileOutput({
|
||||
fileType: EncodedFileType.MP4,
|
||||
filepath: `${RECORDINGS_PATH}{room_name}-{room_id}-{time}`,
|
||||
disableManifest: true
|
||||
});
|
||||
// Start a RoomCompositeEgress to record all participants in the room
|
||||
const egressInfo = await this.egressClient.startRoomCompositeEgress(roomName, { file: fileOutput });
|
||||
return this.convertToRecordingInfo(egressInfo);
|
||||
}
|
||||
|
||||
async stopRecording(recordingId) {
|
||||
// Stop the Egress to finish the recording
|
||||
const egressInfo = await this.egressClient.stopEgress(recordingId);
|
||||
return this.convertToRecordingInfo(egressInfo);
|
||||
}
|
||||
|
||||
async listRecordings(roomName, roomId) {
|
||||
const keyStart =
|
||||
RECORDINGS_PATH + RECORDINGS_METADATA_PATH + (roomName ? `${roomName}` + (roomId ? `-${roomId}` : "") : "");
|
||||
const keyEnd = ".json";
|
||||
const regex = new RegExp(`^${keyStart}.*${keyEnd}$`);
|
||||
|
||||
// List all Egress metadata files in the recordings path that match the regex
|
||||
const metadataKeys = await s3Service.listObjects(RECORDINGS_PATH + RECORDINGS_METADATA_PATH, regex);
|
||||
const recordings = await Promise.all(metadataKeys.map((metadataKey) => s3Service.getObjectAsJson(metadataKey)));
|
||||
return recordings;
|
||||
}
|
||||
|
||||
async getActiveRecordingByRoom(roomName) {
|
||||
try {
|
||||
// List all active egresses for the room
|
||||
const egresses = await this.egressClient.listEgress({ roomName, active: true });
|
||||
return egresses.length > 0 ? egresses[0].egressId : null;
|
||||
} catch (error) {
|
||||
console.error("Error listing egresses.", error);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
async getRecordingStream(recordingName) {
|
||||
const key = RECORDINGS_PATH + recordingName;
|
||||
return s3Service.getObject(key);
|
||||
}
|
||||
|
||||
async existsRecording(recordingName) {
|
||||
const key = RECORDINGS_PATH + recordingName;
|
||||
return s3Service.exists(key);
|
||||
}
|
||||
|
||||
async deleteRecording(recordingName) {
|
||||
const recordingKey = RECORDINGS_PATH + recordingName;
|
||||
const metadataKey = RECORDINGS_PATH + RECORDINGS_METADATA_PATH + recordingName.replace(".mp4", ".json");
|
||||
// Delete the recording file and metadata file from S3
|
||||
await Promise.all([s3Service.deleteObject(recordingKey), s3Service.deleteObject(metadataKey)]);
|
||||
}
|
||||
|
||||
async saveRecordingMetadata(egressInfo) {
|
||||
const recordingInfo = this.convertToRecordingInfo(egressInfo);
|
||||
const metadataName = recordingInfo.name.replace(".mp4", ".json");
|
||||
const key = RECORDINGS_PATH + RECORDINGS_METADATA_PATH + metadataName;
|
||||
await s3Service.uploadObject(key, recordingInfo);
|
||||
}
|
||||
|
||||
convertToRecordingInfo(egressInfo) {
|
||||
const file = egressInfo.fileResults[0];
|
||||
return {
|
||||
name: file.filename.split("/").pop(),
|
||||
startedAt: Number(egressInfo.startedAt) / 1_000_000,
|
||||
duration: Number(file.duration) / 1_000_000_000,
|
||||
size: Number(file.size)
|
||||
};
|
||||
}
|
||||
|
||||
getRecordingStatus(egressStatus) {
|
||||
switch (egressStatus) {
|
||||
case EgressStatus.EGRESS_STARTING:
|
||||
return "STARTING";
|
||||
case EgressStatus.EGRESS_ACTIVE:
|
||||
return "STARTED";
|
||||
case EgressStatus.EGRESS_ENDING:
|
||||
return "STOPPING";
|
||||
case EgressStatus.EGRESS_COMPLETE:
|
||||
return "STOPPED";
|
||||
default:
|
||||
return "FAILED";
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,40 @@
|
||||
import { RoomServiceClient } from "livekit-server-sdk";
|
||||
import { LIVEKIT_URL, LIVEKIT_API_KEY, LIVEKIT_API_SECRET, APP_NAME } from "../config.js";
|
||||
|
||||
export class RoomService {
|
||||
static instance;
|
||||
|
||||
constructor() {
|
||||
if (RoomService.instance) {
|
||||
return RoomService.instance;
|
||||
}
|
||||
|
||||
this.roomClient = new RoomServiceClient(LIVEKIT_URL, LIVEKIT_API_KEY, LIVEKIT_API_SECRET);
|
||||
RoomService.instance = this;
|
||||
return this;
|
||||
}
|
||||
|
||||
async createRoom(roomName) {
|
||||
const roomOptions = {
|
||||
name: roomName,
|
||||
metadata: JSON.stringify({
|
||||
createdBy: APP_NAME,
|
||||
recordingStatus: "STOPPED"
|
||||
})
|
||||
};
|
||||
return this.roomClient.createRoom(roomOptions);
|
||||
}
|
||||
|
||||
async getRoom(roomName) {
|
||||
const rooms = await this.roomClient.listRooms([roomName]);
|
||||
return rooms.length > 0 ? rooms[0] : null;
|
||||
}
|
||||
|
||||
async updateRoomMetadata(roomName, recordingStatus) {
|
||||
const metadata = {
|
||||
createdBy: APP_NAME,
|
||||
recordingStatus
|
||||
};
|
||||
return this.roomClient.updateRoomMetadata(roomName, JSON.stringify(metadata));
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user