Use stream.pipe instead of transporting the bytes manually. Hopefully this improves playback.

This commit is contained in:
vexorian 2020-08-20 13:06:30 -04:00
parent f80d763e3c
commit 55c22846bf
5 changed files with 34 additions and 88 deletions

View File

@ -32,10 +32,10 @@ class FFMPEG extends events.EventEmitter {
this.volumePercent = this.opts.audioVolumePercent;
}
async spawnConcat(streamUrl) {
this.spawn(streamUrl, undefined, undefined, undefined, true, false, undefined, true)
return await this.spawn(streamUrl, undefined, undefined, undefined, true, false, undefined, true)
}
async spawnStream(streamUrl, streamStats, startTime, duration, enableIcon, type) {
this.spawn(streamUrl, streamStats, startTime, duration, true, enableIcon, type, false);
return await this.spawn(streamUrl, streamStats, startTime, duration, true, enableIcon, type, false);
}
async spawnError(title, subtitle, duration) {
if (! this.opts.enableFFMPEGTranscoding || this.opts.errorScreen == 'kill') {
@ -54,7 +54,7 @@ class FFMPEG extends events.EventEmitter {
videoHeight : this.wantedH,
duration : duration,
};
this.spawn({ errorTitle: title , subtitle: subtitle }, streamStats, undefined, `${streamStats.duration}ms`, true, false, 'error', false)
return await this.spawn({ errorTitle: title , subtitle: subtitle }, streamStats, undefined, `${streamStats.duration}ms`, true, false, 'error', false)
}
async spawnOffline(duration) {
if (! this.opts.enableFFMPEGTranscoding) {
@ -68,7 +68,7 @@ class FFMPEG extends events.EventEmitter {
videoHeight : this.wantedH,
duration : duration,
};
this.spawn( {errorTitle: 'offline'}, streamStats, undefined, `${duration}ms`, true, false, 'offline', false);
return await this.spawn( {errorTitle: 'offline'}, streamStats, undefined, `${duration}ms`, true, false, 'offline', false);
}
async spawn(streamUrl, streamStats, startTime, duration, limitRead, enableIcon, type, isConcatPlaylist) {
@ -342,10 +342,7 @@ class FFMPEG extends events.EventEmitter {
let doLogs = this.opts.logFfmpeg && !isConcatPlaylist;
this.ffmpeg = spawn(this.ffmpegPath, ffmpegArgs, { stdio: ['ignore', 'pipe', (doLogs?process.stderr:"ignore") ] } );
this.ffmpeg.stdout.on('data', (chunk) => {
this.sentData = true;
this.emit('data', chunk)
})
this.ffmpeg.on('close', (code) => {
if (code === null) {
this.emit('close', code)
@ -360,6 +357,7 @@ class FFMPEG extends events.EventEmitter {
this.emit('error', { code: code, cmd: `${this.opts.ffmpegPath} ${ffmpegArgs.join(' ')}` })
}
})
return this.ffmpeg.stdout;
}
kill() {
if (typeof this.ffmpeg != "undefined") {

View File

@ -25,21 +25,20 @@ class OfflinePlayer {
this.ffmpeg.kill();
}
async play() {
async play(outStream) {
try {
let emitter = new EventEmitter();
let ffmpeg = this.ffmpeg;
let lineupItem = this.context.lineupItem;
let duration = lineupItem.streamDuration - lineupItem.start;
let ff;
if (this.error) {
ffmpeg.spawnError(duration);
ff = await ffmpeg.spawnError(duration);
} else {
ffmpeg.spawnOffline(duration);
ff = await ffmpeg.spawnOffline(duration);
}
ff.pipe(outStream);
ffmpeg.on('data', (data) => {
emitter.emit('data', data);
});
ffmpeg.on('end', () => {
emitter.emit('end');
});

View File

@ -41,7 +41,7 @@ class PlexPlayer {
}
}
async play() {
async play(outStream) {
let lineupItem = this.context.lineupItem;
let ffmpegSettings = this.context.ffmpegSettings;
let db = this.context.db;
@ -73,13 +73,12 @@ class PlexPlayer {
let emitter = new EventEmitter();
//setTimeout( () => {
ffmpeg.spawnStream(stream.streamUrl, stream.streamStats, streamStart, streamDuration, enableChannelIcon, lineupItem.type); // Spawn the ffmpeg process
let ff = await ffmpeg.spawnStream(stream.streamUrl, stream.streamStats, streamStart, streamDuration, enableChannelIcon, lineupItem.type); // Spawn the ffmpeg process
ff.pipe(outStream);
//}, 100);
plexTranscoder.startUpdatingPlex();
ffmpeg.on('data', (data) => {
emitter.emit('data', data);
});
ffmpeg.on('end', () => {
emitter.emit('end');
});

View File

@ -58,24 +58,13 @@ class ProgramPlayer {
this.delegate.cleanUp();
}
async playDelegate() {
async playDelegate(outStream) {
return await new Promise( async (accept, reject) => {
setTimeout( () => {
reject( Error("program player timed out before receiving any data.") );
}, 30000);
try {
let stream = await this.delegate.play();
let first = true;
let stream = await this.delegate.play(outStream);
accept(stream);
let emitter = new EventEmitter();
stream.on("data", (data) => {
if (first) {
accept( {stream: emitter, data: data} );
first = false;
} else {
emitter.emit("data", data);
}
});
function end() {
reject( Error("Stream ended with no data") );
stream.removeAllListeners("data");
@ -95,9 +84,9 @@ class ProgramPlayer {
}
})
}
async play() {
async play(outStream) {
try {
return await this.playDelegate();
return await this.playDelegate(outStream);
} catch(err) {
if (! (err instanceof Error) ) {
err= Error("Program player had an error before receiving any data. " + JSON.stringify(err) );
@ -115,7 +104,7 @@ class ProgramPlayer {
}
this.delegate.cleanUp();
this.delegate = new OfflinePlayer(true, this.context);
return await this.play();
return await this.play(outStream);
}
}
}

View File

@ -71,7 +71,6 @@ function video(db) {
console.log(`\r\nStream starting. Channel: ${channel.number} (${channel.name})`)
let lastWrite = (new Date()).getTime();
let ffmpeg = new FFMPEG(ffmpegSettings, channel); // Set the transcoder options
let stopped = false;
@ -84,28 +83,9 @@ function video(db) {
ffmpeg.kill();
}
}
let watcher = () => {
let t1 = (new Date()).getTime();
if (t1 - lastWrite >= 30000) {
console.log("Client timed out, stop stream.");
//way too long without writes, time out
stop();
}
if (! stopped) {
setTimeout(watcher, 5000);
}
};
setTimeout(watcher, 5000);
ffmpeg.on('data', (data) => {
if (! stopped) {
lastWrite = (new Date()).getTime();
res.write(data)
}
})
ffmpeg.on('error', (err) => {
console.error("FFMPEG ERROR", err);
//status was already sent
@ -126,7 +106,8 @@ function video(db) {
})
let channelNum = parseInt(req.query.channel, 10)
ffmpeg.spawnConcat(`http://localhost:${process.env.PORT}/playlist?channel=${channelNum}`);
let ff = await ffmpeg.spawnConcat(`http://localhost:${process.env.PORT}/playlist?channel=${channelNum}`);
ff.pipe(res);
})
// Stream individual video to ffmpeg concat above. This is used by the server, NOT the client
router.get('/stream', async (req, res) => {
@ -242,8 +223,11 @@ function video(db) {
}
};
var playerObj = null;
res.writeHead(200, {
'Content-Type': 'video/mp2t'
});
try {
playerObj = await player.play();
playerObj = await player.play(res);
} catch (err) {
console.log("Error when attempting to play video: " +err.stack);
try {
@ -254,38 +238,15 @@ function video(db) {
stop();
return;
}
let lastWrite = (new Date()).getTime();
let watcher = () => {
let t1 = (new Date()).getTime();
if (t1 - lastWrite >= 30000) {
console.log("Demux ffmpeg timed out, stop stream.");
//way too long without writes, time out
stop();
}
if (! stopped) {
setTimeout(watcher, 5000);
}
};
setTimeout(watcher, 5000);
let stream = playerObj.stream;
res.writeHead(200, {
'Content-Type': 'video/mp2t'
});
res.write(playerObj.data);
let stream = playerObj;
//res.write(playerObj.data);
stream.on("data", (data) => {
try {
if (! stopped) {
lastWrite = (new Date()).getTime();
res.write(data);
}
} catch (err) {
console.log("I/O Error: " + err.stack);
stop();
}
});
stream.on("end", () => {
stop();
});