From 55c22846bfbe148fcd409f11b36212cbf6bab13f Mon Sep 17 00:00:00 2001 From: vexorian Date: Thu, 20 Aug 2020 13:06:30 -0400 Subject: [PATCH] Use stream.pipe instead of transporting the bytes manually. Hopefully this improves playback. --- src/ffmpeg.js | 14 +++++----- src/offline-player.js | 11 ++++---- src/plex-player.js | 9 +++---- src/program-player.js | 25 +++++------------ src/video.js | 63 +++++++++---------------------------------- 5 files changed, 34 insertions(+), 88 deletions(-) diff --git a/src/ffmpeg.js b/src/ffmpeg.js index 0d0d4bf..de332c3 100644 --- a/src/ffmpeg.js +++ b/src/ffmpeg.js @@ -32,10 +32,10 @@ class FFMPEG extends events.EventEmitter { this.volumePercent = this.opts.audioVolumePercent; } async spawnConcat(streamUrl) { - this.spawn(streamUrl, undefined, undefined, undefined, true, false, undefined, true) + return await this.spawn(streamUrl, undefined, undefined, undefined, true, false, undefined, true) } async spawnStream(streamUrl, streamStats, startTime, duration, enableIcon, type) { - this.spawn(streamUrl, streamStats, startTime, duration, true, enableIcon, type, false); + return await this.spawn(streamUrl, streamStats, startTime, duration, true, enableIcon, type, false); } async spawnError(title, subtitle, duration) { if (! this.opts.enableFFMPEGTranscoding || this.opts.errorScreen == 'kill') { @@ -54,7 +54,7 @@ class FFMPEG extends events.EventEmitter { videoHeight : this.wantedH, duration : duration, }; - this.spawn({ errorTitle: title , subtitle: subtitle }, streamStats, undefined, `${streamStats.duration}ms`, true, false, 'error', false) + return await this.spawn({ errorTitle: title , subtitle: subtitle }, streamStats, undefined, `${streamStats.duration}ms`, true, false, 'error', false) } async spawnOffline(duration) { if (! this.opts.enableFFMPEGTranscoding) { @@ -68,7 +68,7 @@ class FFMPEG extends events.EventEmitter { videoHeight : this.wantedH, duration : duration, }; - this.spawn( {errorTitle: 'offline'}, streamStats, undefined, `${duration}ms`, true, false, 'offline', false); + return await this.spawn( {errorTitle: 'offline'}, streamStats, undefined, `${duration}ms`, true, false, 'offline', false); } async spawn(streamUrl, streamStats, startTime, duration, limitRead, enableIcon, type, isConcatPlaylist) { @@ -342,10 +342,7 @@ class FFMPEG extends events.EventEmitter { let doLogs = this.opts.logFfmpeg && !isConcatPlaylist; this.ffmpeg = spawn(this.ffmpegPath, ffmpegArgs, { stdio: ['ignore', 'pipe', (doLogs?process.stderr:"ignore") ] } ); - this.ffmpeg.stdout.on('data', (chunk) => { - this.sentData = true; - this.emit('data', chunk) - }) + this.ffmpeg.on('close', (code) => { if (code === null) { this.emit('close', code) @@ -360,6 +357,7 @@ class FFMPEG extends events.EventEmitter { this.emit('error', { code: code, cmd: `${this.opts.ffmpegPath} ${ffmpegArgs.join(' ')}` }) } }) + return this.ffmpeg.stdout; } kill() { if (typeof this.ffmpeg != "undefined") { diff --git a/src/offline-player.js b/src/offline-player.js index 00d5b20..4cd0361 100644 --- a/src/offline-player.js +++ b/src/offline-player.js @@ -25,21 +25,20 @@ class OfflinePlayer { this.ffmpeg.kill(); } - async play() { + async play(outStream) { try { let emitter = new EventEmitter(); let ffmpeg = this.ffmpeg; let lineupItem = this.context.lineupItem; let duration = lineupItem.streamDuration - lineupItem.start; + let ff; if (this.error) { - ffmpeg.spawnError(duration); + ff = await ffmpeg.spawnError(duration); } else { - ffmpeg.spawnOffline(duration); + ff = await ffmpeg.spawnOffline(duration); } + ff.pipe(outStream); - ffmpeg.on('data', (data) => { - emitter.emit('data', data); - }); ffmpeg.on('end', () => { emitter.emit('end'); }); diff --git a/src/plex-player.js b/src/plex-player.js index d1c4e59..9ceb4ea 100644 --- a/src/plex-player.js +++ b/src/plex-player.js @@ -41,7 +41,7 @@ class PlexPlayer { } } - async play() { + async play(outStream) { let lineupItem = this.context.lineupItem; let ffmpegSettings = this.context.ffmpegSettings; let db = this.context.db; @@ -73,13 +73,12 @@ class PlexPlayer { let emitter = new EventEmitter(); //setTimeout( () => { - ffmpeg.spawnStream(stream.streamUrl, stream.streamStats, streamStart, streamDuration, enableChannelIcon, lineupItem.type); // Spawn the ffmpeg process + let ff = await ffmpeg.spawnStream(stream.streamUrl, stream.streamStats, streamStart, streamDuration, enableChannelIcon, lineupItem.type); // Spawn the ffmpeg process + ff.pipe(outStream); //}, 100); plexTranscoder.startUpdatingPlex(); - ffmpeg.on('data', (data) => { - emitter.emit('data', data); - }); + ffmpeg.on('end', () => { emitter.emit('end'); }); diff --git a/src/program-player.js b/src/program-player.js index b3b727b..30e1283 100644 --- a/src/program-player.js +++ b/src/program-player.js @@ -58,24 +58,13 @@ class ProgramPlayer { this.delegate.cleanUp(); } - async playDelegate() { + async playDelegate(outStream) { return await new Promise( async (accept, reject) => { - setTimeout( () => { - reject( Error("program player timed out before receiving any data.") ); - }, 30000); - + try { - let stream = await this.delegate.play(); - let first = true; + let stream = await this.delegate.play(outStream); + accept(stream); let emitter = new EventEmitter(); - stream.on("data", (data) => { - if (first) { - accept( {stream: emitter, data: data} ); - first = false; - } else { - emitter.emit("data", data); - } - }); function end() { reject( Error("Stream ended with no data") ); stream.removeAllListeners("data"); @@ -95,9 +84,9 @@ class ProgramPlayer { } }) } - async play() { + async play(outStream) { try { - return await this.playDelegate(); + return await this.playDelegate(outStream); } catch(err) { if (! (err instanceof Error) ) { err= Error("Program player had an error before receiving any data. " + JSON.stringify(err) ); @@ -115,7 +104,7 @@ class ProgramPlayer { } this.delegate.cleanUp(); this.delegate = new OfflinePlayer(true, this.context); - return await this.play(); + return await this.play(outStream); } } } diff --git a/src/video.js b/src/video.js index 7899d3f..5ffd2ae 100644 --- a/src/video.js +++ b/src/video.js @@ -71,7 +71,6 @@ function video(db) { console.log(`\r\nStream starting. Channel: ${channel.number} (${channel.name})`) - let lastWrite = (new Date()).getTime(); let ffmpeg = new FFMPEG(ffmpegSettings, channel); // Set the transcoder options let stopped = false; @@ -84,28 +83,9 @@ function video(db) { ffmpeg.kill(); } } - let watcher = () => { - let t1 = (new Date()).getTime(); - if (t1 - lastWrite >= 30000) { - console.log("Client timed out, stop stream."); - //way too long without writes, time out - stop(); - } - if (! stopped) { - setTimeout(watcher, 5000); - } - }; - setTimeout(watcher, 5000); - ffmpeg.on('data', (data) => { - if (! stopped) { - lastWrite = (new Date()).getTime(); - res.write(data) - } - }) - ffmpeg.on('error', (err) => { console.error("FFMPEG ERROR", err); //status was already sent @@ -126,7 +106,8 @@ function video(db) { }) let channelNum = parseInt(req.query.channel, 10) - ffmpeg.spawnConcat(`http://localhost:${process.env.PORT}/playlist?channel=${channelNum}`); + let ff = await ffmpeg.spawnConcat(`http://localhost:${process.env.PORT}/playlist?channel=${channelNum}`); + ff.pipe(res); }) // Stream individual video to ffmpeg concat above. This is used by the server, NOT the client router.get('/stream', async (req, res) => { @@ -242,8 +223,11 @@ function video(db) { } }; var playerObj = null; + res.writeHead(200, { + 'Content-Type': 'video/mp2t' + }); try { - playerObj = await player.play(); + playerObj = await player.play(res); } catch (err) { console.log("Error when attempting to play video: " +err.stack); try { @@ -254,38 +238,15 @@ function video(db) { stop(); return; } - let lastWrite = (new Date()).getTime(); - let watcher = () => { - let t1 = (new Date()).getTime(); - if (t1 - lastWrite >= 30000) { - console.log("Demux ffmpeg timed out, stop stream."); - //way too long without writes, time out - stop(); - } - if (! stopped) { - setTimeout(watcher, 5000); - } - }; - setTimeout(watcher, 5000); - let stream = playerObj.stream; - res.writeHead(200, { - 'Content-Type': 'video/mp2t' - }); - res.write(playerObj.data); + let stream = playerObj; + + + + //res.write(playerObj.data); + - stream.on("data", (data) => { - try { - if (! stopped) { - lastWrite = (new Date()).getTime(); - res.write(data); - } - } catch (err) { - console.log("I/O Error: " + err.stack); - stop(); - } - }); stream.on("end", () => { stop(); });