#347 On-demand channels.

This commit is contained in:
vexorian 2021-08-06 11:39:11 -04:00
parent 1002b0dc76
commit 5d072b76bb
12 changed files with 551 additions and 11 deletions

View File

@ -23,6 +23,10 @@ const FillerDB = require("./src/dao/filler-db");
const CustomShowDB = require("./src/dao/custom-show-db");
const TVGuideService = require("./src/services/tv-guide-service");
const EventService = require("./src/services/event-service");
const OnDemandService = require("./src/services/on-demand-service");
const ProgrammingService = require("./src/services/programming-service");
const ActiveChannelService = require('./src/services/active-channel-service')
const onShutdown = require("node-graceful-shutdown").onShutdown;
console.log(
@ -171,6 +175,10 @@ let xmltvInterval = {
xmltvInterval.updateXML()
xmltvInterval.startInterval()
onDemandService = new OnDemandService(channelCache, channelDB, xmltvInterval);
programmingService = new ProgrammingService(onDemandService);
activeChannelService = new ActiveChannelService(onDemandService, channelCache, channelDB);
let hdhr = HDHR(db, channelDB)
let app = express()
eventService.setup(app);
@ -208,10 +216,10 @@ app.use('/favicon.svg', express.static(
app.use('/custom.css', express.static(path.join(process.env.DATABASE, 'custom.css')))
// API Routers
app.use(api.router(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService, m3uService, eventService ))
app.use(api.router(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService, m3uService, eventService, onDemandService, activeChannelService ))
app.use('/api/cache/images', cacheImageService.apiRouters())
app.use(video.router( channelDB, fillerDB, db))
app.use(video.router( channelDB, fillerDB, db, programmingService, activeChannelService ))
app.use(hdhr.router)
app.listen(process.env.PORT, () => {
console.log(`HTTP server running on port: http://*:${process.env.PORT}`)
@ -304,4 +312,7 @@ onShutdown("log" , [], async() => {
onShutdown("xmltv-writer" , [], async() => {
await xmltv.shutdown();
} );
onShutdown("active-channels", [], async() => {
await activeChannelService.shutdown();
} );

View File

@ -26,7 +26,7 @@ function safeString(object) {
}
module.exports = { router: api }
function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService, _m3uService, eventService ) {
function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService, _m3uService, eventService, onDemandService, activeChannelService ) {
let m3uService = _m3uService;
const router = express.Router()
const plexServerDB = new PlexServerDB(channelDB, channelCache, fillerDB, customShowDB, db);
@ -331,10 +331,12 @@ function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService
res.status(500).send("error");
}
})
// we urgently need an actual channel service
router.post('/api/channel', async (req, res) => {
try {
await m3uService.clearCache();
cleanUpChannel(req.body);
onDemandService.fixupChannelBeforeSave( req.body, activeChannelService.isActive(req.body.number) );
await channelDB.saveChannel( req.body.number, req.body );
channelCache.clear();
res.send( { number: req.body.number} )
@ -348,6 +350,7 @@ function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService
try {
await m3uService.clearCache();
cleanUpChannel(req.body);
onDemandService.fixupChannelBeforeSave( req.body, activeChannelService.isActive(req.body.number) );
await channelDB.saveChannel( req.body.number, req.body );
channelCache.clear();
res.send( { number: req.body.number} )

View File

@ -40,6 +40,7 @@ async function getAllChannels(channelDB) {
function saveChannelConfig(number, channel ) {
configCache[number] = [channel];
delete cache[number];
}
function getCurrentLineupItem(channelId, t1) {
@ -153,6 +154,7 @@ module.exports = {
clear: clear,
getProgramLastPlayTime: getProgramLastPlayTime,
getAllChannels: getAllChannels,
getAllNumbers: getAllNumbers,
getChannelConfig: getChannelConfig,
saveChannelConfig: saveChannelConfig,
getFillerLastPlayTime: getFillerLastPlayTime,

View File

@ -5,5 +5,22 @@ module.exports = {
TVGUIDE_MAXIMUM_FLEX_DURATION : 6 * 60 * 60 * 1000,
TOO_FREQUENT: 100,
// if a channel is stopped while something is playing, subtract
// this amount of milliseconds from the last-played timestamp, because
// video playback has latency and also because maybe the user wants
// the last 30 seconds to remember what was going on...
FORGETFULNESS_BUFFER: 30 * 1000,
// When a channel stops playing, this is a grace period before the channel is
// considered offline. It could be that the client halted the playback for some
// reason and is about to start playing again. Or maybe the user switched
// devices or something. Otherwise we would have on-demand channels constantly
// reseting on their own.
MAX_CHANNEL_IDLE: 60*1000,
// there's a timer that checks all active channels to see if they really are
// staying active, it checks every 5 seconds
PLAYED_MONITOR_CHECK_FREQUENCY: 5*1000,
VERSION_NAME: "1.5.0-development"
}

View File

@ -0,0 +1,143 @@
const constants = require("../constants");
/* Keeps track of which channels are being played, calls on-demand service
when they stop playing.
*/
class ActiveChannelService
{
/****
*
**/
constructor(onDemandService, channelCache, channelDB) {
console.log("DEFINE THIS.CACHE");
this.cache = {};
this.channelDB = channelDB;
this.onDemandService = onDemandService;
this.channelCache = channelCache;
this.timeNoDelta = new Date().getTime();
this.loadChannelsForFirstTry();
this.setupTimer();
}
loadChannelsForFirstTry() {
let fun = async() => {
try {
let numbers = await this.channelCache.getAllNumbers(this.channelDB);
numbers.forEach( (number) => {
this.ensure(this.timeNoDelta, number);
} );
this.checkChannels();
} catch (err) {
console.error("Unexpected error when checking channels for the first time.", err);
}
}
fun();
}
async shutdown() {
try {
let t = new Date().getTime() - constants.FORGETFULNESS_BUFFER;
for (const [channelNumber, value] of Object.entries(this.cache)) {
console.log("Forcefully registering channel " + channelNumber + " as stopped...");
delete this.cache[ channelNumber ];
await this.onDemandService.registerChannelStopped( channelNumber, t , true);
}
} catch (err) {
console.error("Unexpected error when shutting down active channels service.", err);
}
}
setupTimer() {
this.handle = setTimeout( () => this.timerLoop(), constants.PLAYED_MONITOR_CHECK_FREQUENCY );
}
checkChannel(t, channelNumber, value) {
if (value.active === 0) {
let delta = t - value.lastUpdate;
if ( (delta >= constants.MAX_CHANNEL_IDLE) || (value.lastUpdate <= this.timeNoDelta) ) {
console.log("Channel : " + channelNumber + " is not playing...");
onDemandService.registerChannelStopped(channelNumber, value.stopTime);
delete this.cache[channelNumber];
}
}
}
checkChannels() {
let t = new Date().getTime();
for (const [channelNumber, value] of Object.entries(this.cache)) {
this.checkChannel(t, channelNumber, value);
}
}
timerLoop() {
try {
this.checkChannels();
} catch (err) {
console.error("There was an error in active channel timer loop", err);
} finally {
this.setupTimer();
}
}
registerChannelActive(t, channelNumber) {
this.ensure(t, channelNumber);
console.log("Register that channel is being played: " + channelNumber );
this.cache[channelNumber].active++;
this.cache[channelNumber].stopTime = 0;
this.cache[channelNumber].lastUpdate = new Date().getTime();
}
registerChannelStopped(t, channelNumber) {
this.ensure(t, channelNumber);
console.log("Register that channel is no longer being played: " + channelNumber );
if (this.cache[channelNumber].active === 0) {
console.error("Serious issue with channel active service, double delete");
} else {
this.cache[channelNumber].active--;
this.cache[channelNumber].stopTime = t;
this.cache[channelNumber].lastUpdate = new Date().getTime();
}
}
ensure(t, channelNumber) {
if (typeof(this.cache[channelNumber]) === 'undefined') {
this.cache[channelNumber] = {
active: 0,
stopTime: t,
lastUpdate: t,
}
}
}
peekChannel(t, channelNumber) {
this.ensure(t, channelNumber);
}
isActiveWrapped(channelNumber) {
if (typeof(this.cache[channelNumber]) === 'undefined') {
return false;
}
if (typeof(this.cache[channelNumber].active) !== 'number') {
return false;
}
return (this.cache[channelNumber].active !== 0);
}
isActive(channelNumber) {
let bol = this.isActiveWrapped(channelNumber);
console.log( "channelNumber = " + channelNumber + " active? " + bol);
return bol;
}
}
module.exports = ActiveChannelService

View File

@ -0,0 +1,218 @@
const constants = require("../constants");
const SLACK = constants.SLACK;
class OnDemandService
{
/****
*
**/
constructor(channelCache, channelDB, xmltvInterval) {
this.channelCache = channelCache;
this.channelDB = channelDB;
this.xmltvInterval = xmltvInterval;
}
activateChannelIfNeeded(moment, channel) {
if ( this.isOnDemandChannelPaused(channel) ) {
channel = this.resumeOnDemandChannel(moment, channel);
this.updateChannelAsync(channel);
}
return channel;
}
async registerChannelStopped(channelNumber, stopTime, waitForSave) {
try {
let channel = await this.channelCache.getChannelConfig( this.channelDB, channelNumber);
if (channel.length === 0) {
console.error("Could not stop channel " + channelNumber + " because it apparently no longer exists"); // I guess if someone deletes the channel just in the grace period?
return
}
channel = channel[0];
if ( (typeof(channel.onDemand) !== 'undefined') && channel.onDemand.isOnDemand && ! channel.onDemand.paused) {
//pause the channel
channel = this.pauseOnDemandChannel( channel , stopTime );
if (waitForSave) {
await this.updateChannelSync(channel);
} else {
this.updateChannelAsync(channel);
}
}
} catch (err) {
console.error("Error stopping channel", err);
}
}
updateXmltv() {
this.xmltvInterval.updateXML()
this.xmltvInterval.restartInterval()
}
pauseOnDemandChannel(originalChannel, stopTime) {
console.log("Pause on-demand channel : " + originalChannel.number);
let channel = clone(originalChannel);
// first find what the heck is playing
let t = stopTime;
let s = new Date(channel.startTime).getTime();
let onDemand = channel.onDemand;
onDemand.paused = true;
if ( channel.programs.length == 0) {
console.log("On-demand channel has no programs. That doesn't really make a lot of sense...");
onDemand.firstProgramModulo = s % onDemand.modulo;
onDemand.playedOffset = 0;
} else if (t < s) {
// the first program didn't even play.
onDemand.firstProgramModulo = s % onDemand.modulo;
onDemand.playedOffset = 0;
} else {
let i = 0;
let total = 0;
while (true) {
let d = channel.programs[i].duration;
if ( (s + total <= t) && (t < s + total + d) ) {
break;
}
total += d;
i = (i + 1) % channel.programs.length;
}
// rotate
let programs = [];
for (let j = i; j < channel.programs.length; j++) {
programs.push( channel.programs[j] );
}
for (let j = 0; j <i; j++) {
programs.push( channel.programs[j] );
}
onDemand.firstProgramModulo = (s + total) % onDemand.modulo;
onDemand.playedOffset = t - (s + total);
channel.programs = programs;
channel.startTime = new Date(s + total).toISOString();
}
return channel;
}
async updateChannelSync(channel) {
try {
this.channelCache.saveChannelConfig(channel.number, channel );
await this.channelDB.saveChannel(channel.number, channel);
console.log("Channel " + channel.number + " saved by on-demand service...");
} catch (err) {
console.error("Error saving resumed channel: " + channel.number, err);
}
}
updateChannelAsync(channel) {
this.updateChannelSync(channel);
this.updateXmltv();
}
fixupChannelBeforeSave(channel, isActive) {
if (typeof(channel.onDemand) === 'undefined') {
channel.onDemand = {};
}
if (typeof(channel.onDemand.isOnDemand) !== 'boolean') {
channel.onDemand.isOnDemand = false;
}
if ( channel.onDemand.isOnDemand !== true ) {
channel.onDemand.modulo = 1;
channel.onDemand.firstProgramModulo = 1;
channel.onDemand.playedOffset = 0;
channel.onDemand.paused = false;
} else {
if ( typeof(channel.onDemand.modulo) !== 'number') {
channel.onDemand.modulo = 1;
}
if (isActive) {
// if it is active, the channel isn't paused
channel.onDemand.paused = false;
} else {
let s = new Date(channel.startTime).getTime();
channel.onDemand.paused = true;
channel.onDemand.firstProgramModulo = s % channel.onDemand.modulo;
channel.onDemand.playedOffset = 0;
}
}
}
resumeOnDemandChannel(t, originalChannel) {
let channel = clone(originalChannel);
console.log("Resume on-demand channel: " + channel.name);
let programs = channel.programs;
let onDemand = channel.onDemand;
onDemand.paused = false; //should be the invariant
if (programs.length == 0) {
console.log("On-demand channel is empty. This doesn't make a lot of sense...");
return channel;
}
let i = 0;
let backupFo = onDemand.firstProgramModulo;
while (i < programs.length) {
let program = programs[i];
if ( program.isOffline && (program.type !== 'redirect') ) {
//skip flex
i++;
onDemand.playedOffset = 0;
onDemand.firstProgramModulo = ( onDemand.firstProgramModulo + program.duration ) % onDemand.modulo;
} else {
break;
}
}
if (i == programs.length) {
console.log("Everything in the channel is flex... This doesn't really make a lot of sense for an onDemand channel, you know...");
i = 0;
onDemand.playedOffset = 0;
onDemand.firstProgramModulo = backupFo;
}
// Last we've seen this channel, it was playing program #i , played the first playedOffset milliseconds.
// move i to the beginning of the program list
let newPrograms = []
for (let j = i; j < programs.length; j++) {
newPrograms.push( programs[j] );
}
for (let j = 0; j < i; j++) {
newPrograms.push( programs[j] );
}
// now the start program is 0, and the "only" thing to do now is change the start time
let startTime = t - onDemand.playedOffset;
// with this startTime, it would work perfectly if modulo is 1. But what about other cases?
let tm = t % onDemand.modulo;
let pm = (onDemand.firstProgramModulo + onDemand.playedOffset) % onDemand.modulo;
if (tm < pm) {
startTime += (pm - tm);
} else {
let o = (tm - pm);
startTime = startTime - o;
if (o >= SLACK) {
startTime += onDemand.modulo;
}
}
channel.startTime = (new Date(startTime)).toISOString();
channel.programs = newPrograms;
return channel;
}
isOnDemandChannelPaused(channel) {
return (
(typeof(channel.onDemand) !== 'undefined')
&&
(channel.onDemand.isOnDemand === true)
&&
(channel.onDemand.paused === true)
);
}
}
function clone(channel) {
return JSON.parse( JSON.stringify(channel) );
}
module.exports = OnDemandService

View File

@ -0,0 +1,35 @@
const helperFuncs = require("../helperFuncs");
/* Tells us what is or should be playing in some channel
If the channel is a an on-demand channel and is paused, resume the channel.
Before running the logic.
This hub for the programming logic used to be helperFuncs.getCurrentProgramAndTimeElapsed.
This class will still call that function, but this should be the entry point
for that logic.
Eventually it looks like a good idea to move that logic here.
*/
class ProgrammingService
{
/****
*
**/
constructor(onDemandService) {
this.onDemandService = onDemandService;
}
getCurrentProgramAndTimeElapsed(moment, channel) {
channel = onDemandService.activateChannelIfNeeded(moment, channel);
return helperFuncs.getCurrentProgramAndTimeElapsed(moment, channel);
}
}
module.exports = ProgrammingService

View File

@ -81,6 +81,17 @@ class TVGuideService
async getCurrentPlayingIndex(channel, t) {
let s = (new Date(channel.startTime)).getTime();
if ( (typeof(channel.onDemand) !== 'undefined') && channel.onDemand.isOnDemand && channel.onDemand.paused ) {
// it's as flex
return {
index : -1,
start : t,
program : {
isOffline : true,
duration : 12*60*1000,
}
}
}
if (t < s) {
//it's flex time
return {

View File

@ -7,9 +7,7 @@ function equalItems(a, b) {
if ( (typeof(a) === 'undefined') || a.isOffline || b.isOffline ) {
return false;
}
console.log("no idea how to compare this: " + JSON.stringify(a) );
console.log(" with this: " + JSON.stringify(b) );
return true;
return ( a.type === b.type);
}

View File

@ -7,12 +7,13 @@ const fs = require('fs')
const ProgramPlayer = require('./program-player');
const channelCache = require('./channel-cache')
const wereThereTooManyAttempts = require('./throttler');
const constants = require('./constants');
module.exports = { router: video }
let StreamCount = 0;
function video( channelDB , fillerDB, db) {
function video( channelDB , fillerDB, db, programmingService, activeChannelService ) {
var router = express.Router()
router.get('/setup', (req, res) => {
@ -181,8 +182,9 @@ function video( channelDB , fillerDB, db) {
start: 0,
};
} else if (lineupItem == null) {
prog = helperFuncs.getCurrentProgramAndTimeElapsed(t0, channel);
prog = programmingService.getCurrentProgramAndTimeElapsed(t0, channel);
activeChannelService.peekChannel(t0, channel.number);
while (true) {
redirectChannels.push( brandChannel );
upperBounds.push( prog.program.duration - prog.timeElapsed );
@ -223,7 +225,8 @@ function video( channelDB , fillerDB, db) {
lineupItem = JSON.parse( JSON.stringify(lineupItem)) ;
break;
} else {
prog = helperFuncs.getCurrentProgramAndTimeElapsed(t0, newChannel);
prog = programmingService.getCurrentProgramAndTimeElapsed(t0, newChannel);
activeChannelService.peekChannel(t0, newChannel.number);
}
}
}
@ -332,8 +335,12 @@ function video( channelDB , fillerDB, db) {
'Content-Type': 'video/mp2t'
});
let t1;
try {
playerObj = await player.play(res);
t1 = (new Date()).getTime();
console.log("Latency: (" + (t1- t0) );
} catch (err) {
console.log("Error when attempting to play video: " +err.stack);
try {
@ -345,7 +352,35 @@ function video( channelDB , fillerDB, db) {
return;
}
if (! isLoading) {
//setup end event to mark the channel as not playing anymore
let t0 = new Date().getTime();
let b = 0;
let stopDetected = false;
if (typeof(lineupItem.beginningOffset) !== 'undefined') {
b = lineupItem.beginningOffset;
t0 -= b;
}
// we have to do it for every single redirected channel...
for (let i = redirectChannels.length-1; i >= 0; i--) {
activeChannelService.registerChannelActive(t0, redirectChannels[i].number);
}
let oldStop = stop;
stop = () => {
if (!stopDetected) {
stopDetected = true;
let t1 = new Date().getTime();
t1 = Math.max( t0 + 1, t1 - constants.FORGETFULNESS_BUFFER - b );
for (let i = redirectChannels.length-1; i >= 0; i--) {
activeChannelService.registerChannelStopped(t1, redirectChannels[i].number);
}
}
oldStop();
};
}
let stream = playerObj;
@ -354,9 +389,13 @@ function video( channelDB , fillerDB, db) {
stream.on("end", () => {
let t2 = (new Date()).getTime();
console.log("Played video for: " + (t2 - t1) + " ms");
stop();
});
res.on("close", () => {
let t2 = (new Date()).getTime();
console.log("Played video for: " + (t2 - t1) + " ms");
console.log("Client Closed");
stop();
});

View File

@ -86,6 +86,10 @@ module.exports = function ($timeout, $location, dizquetv, resolutionOptions, get
scope.channel.transcoding = {
targetResolution: "",
}
scope.channel.onDemand = {
isOnDemand : false,
modulo: 1,
}
} else {
scope.beforeEditChannelNumber = scope.channel.number
@ -142,6 +146,16 @@ module.exports = function ($timeout, $location, dizquetv, resolutionOptions, get
scope.channel.transcoding.targetResolution = "";
}
if (typeof(scope.channel.onDemand) === 'undefined') {
scope.channel.onDemand = {};
}
if (typeof(scope.channel.onDemand.isOnDemand) !== 'boolean') {
scope.channel.onDemand.isOnDemand = false;
}
if (typeof(scope.channel.onDemand.modulo) !== 'number') {
scope.channel.onDemand.modulo = 1;
}
adjustStartTimeToCurrentProgram();
updateChannelDuration();
@ -163,6 +177,22 @@ module.exports = function ($timeout, $location, dizquetv, resolutionOptions, get
let t = Date.now();
let originalStart = scope.channel.startTime.getTime();
let n = scope.channel.programs.length;
if (
(scope.channel.onDemand.isOnDemand === true)
&&
(scope.channel.onDemand.paused === true)
) {
originalStart = new Date().getTime();
originalStart -= scope.channel.onDemand.playedOffset;
let m = scope.channel.onDemand.firstProgramModulo;
let n = originalStart % scope.channel.onDemand.modulo;
if (n < m) {
originalStart += (m - n);
} else if (n > m) {
originalStart -= (n - m) - scope.channel.onDemand.modulo;
}
}
//scope.channel.totalDuration might not have been initialized
let totalDuration = 0;
for (let i = 0; i < n; i++) {
@ -220,6 +250,7 @@ module.exports = function ($timeout, $location, dizquetv, resolutionOptions, get
{ name: "Flex", id: "flex" },
{ name: "EPG", id: "epg" },
{ name: "FFmpeg", id: "ffmpeg" },
{ name: "On-demand", id: "ondemand" },
];
scope.setTab = (tab) => {
scope.tab = tab;

View File

@ -83,7 +83,7 @@
<input id="channelEndTime" class="form-control form-control-sm col-md-auto" type="datetime-local" ng-model="endTime" ng-disabled="true" aria-describedby="endTimeHelp"></input>
</div>
<div class='col-md-auto'>
<small class="text-muted form-text" id='endTimeHelp'>Programming will restart from the beginning.</small>
<small class="text-muted form-text" id='endTimeHelp'>Programming will restart from the beginning. </small><small ng-show='channel.onDemand.isOnDemand' class="text-muted form-text" id='endTimeHelp'>For on-demand channels, the times in the schedule are tentative. </small>
</div>
</div>
@ -838,6 +838,38 @@
</div>
<!--
============= TAB: ON-DEMAND =========================
-->
<div class="modal-body" ng-if="tab == 'ondemand'">
<div class="form-check">
<input type="checkbox" class="form-check-input" id="onDemand" aria-describedby="onDemandHelp" ng-model='channel.onDemand.isOnDemand'>
<label class="form-check-label" for="onDemand">On-Demand</label>
<span class='text-muted' id="stealthHelp">(The channel's programming will be paused when it is not being played. No programs will appear in the TV-guide while the channel is paused.)</span>
</div>
<br></br>
<div class='form-group' ng-show='channel.onDemand.isOnDemand'>
<label class='form-label' for="segmentLength" >Segment Length:</label>
<select class="form-control custom-select" id="segmentLength" ng-model="channel.onDemand.modulo" convert-to-number >
<option ng-value="1">Instant</option>
<option ng-value="300000">5 minutes</option>
<option ng-value="600000">10 minutes</option>
<option ng-value="900000">15 minutes</option>
<option ng-value="1800000">30 minutes</option>
<option ng-value="6000000">1 hour</option>
</select>
<small id='guideFlexHelp' class="text-muted" for='guideFlex'>Channel will be divided in segments. For example, if you use padding or time slots in your channel so that everything starts at 0:00 or 0:30 , you want a 30 minutes-segment. Use no segment if you want the channel to play exactly where you left it. Flex time will be added if necessary for padding.</small>
</div>
</div>
<div class="modal-footer">
<span class="pull-right text-danger" ng-show="error.any"> <i class='fa fa-exclamation-triangle'></i> There were errors. Please review the form.</span>
<span class="pull-right text-info" ng-show='! hasPrograms() && (tab != "programming")'> <i class='fas fa-info-circle'></i> Use the &quot;Programming&quot; tab to add programs to the channel.</span>