openvidu-recording: Refactor "/recordings" endpoint to retrieve recordings directly from S3 without using LiveKit SDK. Improve error handling

This commit is contained in:
juancarmore 2024-08-30 12:09:52 +02:00
parent 1aa3b42427
commit 55778fe01a

View File

@ -3,18 +3,12 @@ import express from "express";
import cors from "cors";
import path from "path";
import { fileURLToPath } from "url";
import {
AccessToken,
EgressClient,
EgressStatus,
EncodedFileOutput,
EncodedFileType,
WebhookReceiver
} from "livekit-server-sdk";
import { AccessToken, EgressClient, EncodedFileOutput, EncodedFileType, WebhookReceiver } from "livekit-server-sdk";
import {
S3Client,
GetObjectCommand,
DeleteObjectCommand,
ListObjectsV2Command,
HeadObjectCommand
} from "@aws-sdk/client-s3";
@ -95,14 +89,13 @@ app.post("/recordings/start", async (req, res) => {
const activeEgresses = await getActiveEgressesByRoom(roomName);
if (activeEgresses.length > 0) {
res.status(400).json({ errorMessage: "Recording already started for this room" });
res.status(409).json({ errorMessage: "Recording already started for this room" });
return;
}
const fileOutput = new EncodedFileOutput({
fileType: EncodedFileType.MP4,
filepath: `${DEFAULT_RECORDINGS_PATH}/{room_name}-{time}`,
disableManifest: true
filepath: `${DEFAULT_RECORDINGS_PATH}/{room_name}-{room_id}-{time}`
});
try {
@ -129,7 +122,7 @@ app.post("/recordings/stop", async (req, res) => {
const activeEgresses = await getActiveEgressesByRoom(roomName);
if (activeEgresses.length === 0) {
res.status(400).json({ errorMessage: "Recording not started for this room" });
res.status(409).json({ errorMessage: "Recording not started for this room" });
return;
}
@ -141,7 +134,6 @@ app.post("/recordings/stop", async (req, res) => {
const recording = {
name: file.filename.split("/").pop(),
startedAt: Number(egressInfo.startedAt) / 1_000_000,
duration: Number(file.duration) / 1_000_000_000,
size: Number(file.size)
};
res.json({ message: "Recording stopped", recording });
@ -162,29 +154,22 @@ const getActiveEgressesByRoom = async (roomName) => {
app.get("/recordings", async (req, res) => {
const roomName = req.query.roomName?.toString();
const roomId = req.query.roomId?.toString();
const command = new ListObjectsV2Command({
Bucket: S3_BUCKET,
Prefix: DEFAULT_RECORDINGS_PATH + "/"
});
try {
const egresses = await egressClient.listEgress({ roomName });
const recordings = [];
for (const egress of egresses) {
if (egress.status === EgressStatus.EGRESS_COMPLETE) {
const file = egress.fileResults[0];
const recordingExist = await checkRecordingExists(file.filename);
if (recordingExist) {
const fileName = file.filename.split("/").pop();
const recording = {
name: fileName,
startedAt: Number(egress.startedAt) / 1_000_000,
duration: Number(file.duration) / 1_000_000_000,
size: Number(file.size)
};
recordings.push(recording);
}
}
}
const { Contents: objects } = await s3Client.send(command);
const keyStart = DEFAULT_RECORDINGS_PATH + "/" + (roomName ? `${roomName}` + (roomId ? `-${roomId}` : "") : "");
const payloadKeys =
objects
?.filter((object) => object.Key.startsWith(keyStart) && object.Key.endsWith(".mp4.json"))
.map((payload) => payload.Key) ?? [];
const recordings = await Promise.all(payloadKeys.map((payloadKey) => getRecordingInfo(payloadKey)));
res.json({ recordings });
} catch (error) {
console.error("Error listing recordings.", error);
@ -192,23 +177,41 @@ app.get("/recordings", async (req, res) => {
}
});
const checkRecordingExists = async (recordingPath) => {
const command = new HeadObjectCommand({
const getRecordingInfo = async (payloadKey) => {
const objectCommand = new GetObjectCommand({
Bucket: S3_BUCKET,
Key: recordingPath
Key: payloadKey
});
const { Body } = await s3Client.send(objectCommand);
const stringifiedData = await Body.transformToString();
const data = JSON.parse(stringifiedData);
try {
await s3Client.send(command);
return true;
} catch (error) {
return false;
}
const recordingKey = payloadKey.replace(".json", "");
const headCommand = new HeadObjectCommand({
Bucket: S3_BUCKET,
Key: recordingKey
});
const { ContentLength: size } = await s3Client.send(headCommand);
const recordingName = recordingKey.split("/").pop();
const recording = {
name: recordingName,
startedAt: Number(data.started_at) / 1000000,
size: size
};
return recording;
};
app.get("/recordings/:recordingName", async (req, res) => {
const { recordingName } = req.params;
const exists = await checkRecordingExists(recordingName);
if (!exists) {
res.status(404).json({ errorMessage: "Recording not found" });
return;
}
const command = new GetObjectCommand({
Bucket: S3_BUCKET,
Key: `${DEFAULT_RECORDINGS_PATH}/${recordingName}`
@ -229,13 +232,26 @@ app.get("/recordings/:recordingName", async (req, res) => {
app.delete("/recordings/:recordingName", async (req, res) => {
const { recordingName } = req.params;
const command = new DeleteObjectCommand({
const exists = await checkRecordingExists(recordingName);
if (!exists) {
res.status(404).json({ errorMessage: "Recording not found" });
return;
}
const deleteRecordingCommand = new DeleteObjectCommand({
Bucket: S3_BUCKET,
Key: `${DEFAULT_RECORDINGS_PATH}/${recordingName}`
});
const deletePayloadCommand = new DeleteObjectCommand({
Bucket: S3_BUCKET,
Key: `${DEFAULT_RECORDINGS_PATH}/${recordingName}.json`
});
try {
await s3Client.send(command);
console.log("Deleting recording:", recordingName);
await Promise.all([s3Client.send(deleteRecordingCommand), s3Client.send(deletePayloadCommand)]);
console.log("Recording deleted:", recordingName);
res.json({ message: "Recording deleted" });
} catch (error) {
console.error("Error deleting recording.", error);
@ -243,6 +259,20 @@ app.delete("/recordings/:recordingName", async (req, res) => {
}
});
const checkRecordingExists = async (recordingName) => {
const headCommand = new HeadObjectCommand({
Bucket: S3_BUCKET,
Key: `${DEFAULT_RECORDINGS_PATH}/${recordingName}`
});
try {
await s3Client.send(headCommand);
return true;
} catch (error) {
return false;
}
};
app.listen(SERVER_PORT, () => {
console.log("Server started on port:", SERVER_PORT);
});