diff --git a/index.js b/index.js index cfe1f39..9f4ee46 100644 --- a/index.js +++ b/index.js @@ -23,6 +23,10 @@ const FillerDB = require("./src/dao/filler-db"); const CustomShowDB = require("./src/dao/custom-show-db"); const TVGuideService = require("./src/services/tv-guide-service"); const EventService = require("./src/services/event-service"); +const OnDemandService = require("./src/services/on-demand-service"); +const ProgrammingService = require("./src/services/programming-service"); +const ActiveChannelService = require('./src/services/active-channel-service') + const onShutdown = require("node-graceful-shutdown").onShutdown; console.log( @@ -171,6 +175,10 @@ let xmltvInterval = { xmltvInterval.updateXML() xmltvInterval.startInterval() +onDemandService = new OnDemandService(channelCache, channelDB, xmltvInterval); +programmingService = new ProgrammingService(onDemandService); +activeChannelService = new ActiveChannelService(onDemandService, channelCache, channelDB); + let hdhr = HDHR(db, channelDB) let app = express() eventService.setup(app); @@ -208,10 +216,10 @@ app.use('/favicon.svg', express.static( app.use('/custom.css', express.static(path.join(process.env.DATABASE, 'custom.css'))) // API Routers -app.use(api.router(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService, m3uService, eventService )) +app.use(api.router(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService, m3uService, eventService, onDemandService, activeChannelService )) app.use('/api/cache/images', cacheImageService.apiRouters()) -app.use(video.router( channelDB, fillerDB, db)) +app.use(video.router( channelDB, fillerDB, db, programmingService, activeChannelService )) app.use(hdhr.router) app.listen(process.env.PORT, () => { console.log(`HTTP server running on port: http://*:${process.env.PORT}`) @@ -304,4 +312,7 @@ onShutdown("log" , [], async() => { onShutdown("xmltv-writer" , [], async() => { await xmltv.shutdown(); } ); +onShutdown("active-channels", [], async() => { + await activeChannelService.shutdown(); +} ); diff --git a/src/api.js b/src/api.js index 64599bc..23b2136 100644 --- a/src/api.js +++ b/src/api.js @@ -26,7 +26,7 @@ function safeString(object) { } module.exports = { router: api } -function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService, _m3uService, eventService ) { +function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService, _m3uService, eventService, onDemandService, activeChannelService ) { let m3uService = _m3uService; const router = express.Router() const plexServerDB = new PlexServerDB(channelDB, channelCache, fillerDB, customShowDB, db); @@ -331,10 +331,12 @@ function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService res.status(500).send("error"); } }) + // we urgently need an actual channel service router.post('/api/channel', async (req, res) => { try { await m3uService.clearCache(); cleanUpChannel(req.body); + onDemandService.fixupChannelBeforeSave( req.body, activeChannelService.isActive(req.body.number) ); await channelDB.saveChannel( req.body.number, req.body ); channelCache.clear(); res.send( { number: req.body.number} ) @@ -348,6 +350,7 @@ function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService try { await m3uService.clearCache(); cleanUpChannel(req.body); + onDemandService.fixupChannelBeforeSave( req.body, activeChannelService.isActive(req.body.number) ); await channelDB.saveChannel( req.body.number, req.body ); channelCache.clear(); res.send( { number: req.body.number} ) diff --git a/src/channel-cache.js b/src/channel-cache.js index 3588c96..cf2568a 100644 --- a/src/channel-cache.js +++ b/src/channel-cache.js @@ -40,6 +40,7 @@ async function getAllChannels(channelDB) { function saveChannelConfig(number, channel ) { configCache[number] = [channel]; + delete cache[number]; } function getCurrentLineupItem(channelId, t1) { @@ -153,6 +154,7 @@ module.exports = { clear: clear, getProgramLastPlayTime: getProgramLastPlayTime, getAllChannels: getAllChannels, + getAllNumbers: getAllNumbers, getChannelConfig: getChannelConfig, saveChannelConfig: saveChannelConfig, getFillerLastPlayTime: getFillerLastPlayTime, diff --git a/src/constants.js b/src/constants.js index afc9d69..6beaba8 100644 --- a/src/constants.js +++ b/src/constants.js @@ -5,5 +5,22 @@ module.exports = { TVGUIDE_MAXIMUM_FLEX_DURATION : 6 * 60 * 60 * 1000, TOO_FREQUENT: 100, + // if a channel is stopped while something is playing, subtract + // this amount of milliseconds from the last-played timestamp, because + // video playback has latency and also because maybe the user wants + // the last 30 seconds to remember what was going on... + FORGETFULNESS_BUFFER: 30 * 1000, + + // When a channel stops playing, this is a grace period before the channel is + // considered offline. It could be that the client halted the playback for some + // reason and is about to start playing again. Or maybe the user switched + // devices or something. Otherwise we would have on-demand channels constantly + // reseting on their own. + MAX_CHANNEL_IDLE: 60*1000, + + // there's a timer that checks all active channels to see if they really are + // staying active, it checks every 5 seconds + PLAYED_MONITOR_CHECK_FREQUENCY: 5*1000, + VERSION_NAME: "1.5.0-development" } diff --git a/src/services/active-channel-service.js b/src/services/active-channel-service.js new file mode 100644 index 0000000..75d7293 --- /dev/null +++ b/src/services/active-channel-service.js @@ -0,0 +1,143 @@ + +const constants = require("../constants"); + +/* Keeps track of which channels are being played, calls on-demand service + when they stop playing. +*/ + +class ActiveChannelService +{ + /**** + * + **/ + constructor(onDemandService, channelCache, channelDB) { + console.log("DEFINE THIS.CACHE"); + this.cache = {}; + this.channelDB = channelDB; + this.onDemandService = onDemandService; + this.channelCache = channelCache; + this.timeNoDelta = new Date().getTime(); + + this.loadChannelsForFirstTry(); + this.setupTimer(); + } + + loadChannelsForFirstTry() { + let fun = async() => { + try { + let numbers = await this.channelCache.getAllNumbers(this.channelDB); + numbers.forEach( (number) => { + this.ensure(this.timeNoDelta, number); + } ); + this.checkChannels(); + } catch (err) { + console.error("Unexpected error when checking channels for the first time.", err); + } + } + fun(); + } + + async shutdown() { + try { + let t = new Date().getTime() - constants.FORGETFULNESS_BUFFER; + for (const [channelNumber, value] of Object.entries(this.cache)) { + console.log("Forcefully registering channel " + channelNumber + " as stopped..."); + delete this.cache[ channelNumber ]; + await this.onDemandService.registerChannelStopped( channelNumber, t , true); + } + } catch (err) { + console.error("Unexpected error when shutting down active channels service.", err); + } + } + + setupTimer() { + this.handle = setTimeout( () => this.timerLoop(), constants.PLAYED_MONITOR_CHECK_FREQUENCY ); + } + + checkChannel(t, channelNumber, value) { + if (value.active === 0) { + let delta = t - value.lastUpdate; + if ( (delta >= constants.MAX_CHANNEL_IDLE) || (value.lastUpdate <= this.timeNoDelta) ) { + console.log("Channel : " + channelNumber + " is not playing..."); + onDemandService.registerChannelStopped(channelNumber, value.stopTime); + delete this.cache[channelNumber]; + } + } + } + + checkChannels() { + let t = new Date().getTime(); + for (const [channelNumber, value] of Object.entries(this.cache)) { + this.checkChannel(t, channelNumber, value); + } + } + + timerLoop() { + try { + this.checkChannels(); + } catch (err) { + console.error("There was an error in active channel timer loop", err); + } finally { + this.setupTimer(); + } + + } + + + registerChannelActive(t, channelNumber) { + this.ensure(t, channelNumber); + console.log("Register that channel is being played: " + channelNumber ); + this.cache[channelNumber].active++; + this.cache[channelNumber].stopTime = 0; + this.cache[channelNumber].lastUpdate = new Date().getTime(); + } + + registerChannelStopped(t, channelNumber) { + this.ensure(t, channelNumber); + console.log("Register that channel is no longer being played: " + channelNumber ); + if (this.cache[channelNumber].active === 0) { + console.error("Serious issue with channel active service, double delete"); + } else { + this.cache[channelNumber].active--; + this.cache[channelNumber].stopTime = t; + this.cache[channelNumber].lastUpdate = new Date().getTime(); + } + } + + ensure(t, channelNumber) { + if (typeof(this.cache[channelNumber]) === 'undefined') { + this.cache[channelNumber] = { + active: 0, + stopTime: t, + lastUpdate: t, + } + } + } + + peekChannel(t, channelNumber) { + this.ensure(t, channelNumber); + } + + isActiveWrapped(channelNumber) { + if (typeof(this.cache[channelNumber]) === 'undefined') { + return false; + } + if (typeof(this.cache[channelNumber].active) !== 'number') { + return false; + } + return (this.cache[channelNumber].active !== 0); + + } + + isActive(channelNumber) { + let bol = this.isActiveWrapped(channelNumber); + console.log( "channelNumber = " + channelNumber + " active? " + bol); + return bol; + + + } + + +} + +module.exports = ActiveChannelService diff --git a/src/services/on-demand-service.js b/src/services/on-demand-service.js new file mode 100644 index 0000000..22f9a6f --- /dev/null +++ b/src/services/on-demand-service.js @@ -0,0 +1,218 @@ + +const constants = require("../constants"); + +const SLACK = constants.SLACK; + + +class OnDemandService +{ + /**** + * + **/ + constructor(channelCache, channelDB, xmltvInterval) { + this.channelCache = channelCache; + this.channelDB = channelDB; + this.xmltvInterval = xmltvInterval; + } + + activateChannelIfNeeded(moment, channel) { + if ( this.isOnDemandChannelPaused(channel) ) { + channel = this.resumeOnDemandChannel(moment, channel); + this.updateChannelAsync(channel); + } + return channel; + } + + async registerChannelStopped(channelNumber, stopTime, waitForSave) { + try { + let channel = await this.channelCache.getChannelConfig( this.channelDB, channelNumber); + if (channel.length === 0) { + console.error("Could not stop channel " + channelNumber + " because it apparently no longer exists"); // I guess if someone deletes the channel just in the grace period? + return + } + channel = channel[0]; + if ( (typeof(channel.onDemand) !== 'undefined') && channel.onDemand.isOnDemand && ! channel.onDemand.paused) { + //pause the channel + channel = this.pauseOnDemandChannel( channel , stopTime ); + if (waitForSave) { + await this.updateChannelSync(channel); + } else { + this.updateChannelAsync(channel); + } + } + } catch (err) { + console.error("Error stopping channel", err); + } + + } + + updateXmltv() { + this.xmltvInterval.updateXML() + this.xmltvInterval.restartInterval() + } + + + pauseOnDemandChannel(originalChannel, stopTime) { + console.log("Pause on-demand channel : " + originalChannel.number); + let channel = clone(originalChannel); + // first find what the heck is playing + let t = stopTime; + let s = new Date(channel.startTime).getTime(); + let onDemand = channel.onDemand; + onDemand.paused = true; + if ( channel.programs.length == 0) { + console.log("On-demand channel has no programs. That doesn't really make a lot of sense..."); + onDemand.firstProgramModulo = s % onDemand.modulo; + onDemand.playedOffset = 0; + + } else if (t < s) { + // the first program didn't even play. + onDemand.firstProgramModulo = s % onDemand.modulo; + onDemand.playedOffset = 0; + } else { + let i = 0; + let total = 0; + while (true) { + let d = channel.programs[i].duration; + if ( (s + total <= t) && (t < s + total + d) ) { + break; + } + total += d; + i = (i + 1) % channel.programs.length; + } + // rotate + let programs = []; + for (let j = i; j < channel.programs.length; j++) { + programs.push( channel.programs[j] ); + } + for (let j = 0; j = SLACK) { + startTime += onDemand.modulo; + } + } + channel.startTime = (new Date(startTime)).toISOString(); + channel.programs = newPrograms; + return channel; + } + + isOnDemandChannelPaused(channel) { + return ( + (typeof(channel.onDemand) !== 'undefined') + && + (channel.onDemand.isOnDemand === true) + && + (channel.onDemand.paused === true) + ); + } + +} +function clone(channel) { + return JSON.parse( JSON.stringify(channel) ); +} + +module.exports = OnDemandService diff --git a/src/services/programming-service.js b/src/services/programming-service.js new file mode 100644 index 0000000..8386cb7 --- /dev/null +++ b/src/services/programming-service.js @@ -0,0 +1,35 @@ + +const helperFuncs = require("../helperFuncs"); + +/* Tells us what is or should be playing in some channel + If the channel is a an on-demand channel and is paused, resume the channel. + Before running the logic. + + This hub for the programming logic used to be helperFuncs.getCurrentProgramAndTimeElapsed. + + This class will still call that function, but this should be the entry point + for that logic. + + Eventually it looks like a good idea to move that logic here. + +*/ + +class ProgrammingService +{ + /**** + * + **/ + constructor(onDemandService) { + this.onDemandService = onDemandService; + } + + getCurrentProgramAndTimeElapsed(moment, channel) { + channel = onDemandService.activateChannelIfNeeded(moment, channel); + return helperFuncs.getCurrentProgramAndTimeElapsed(moment, channel); + } + + + +} + +module.exports = ProgrammingService diff --git a/src/services/tv-guide-service.js b/src/services/tv-guide-service.js index cd3160d..74f73b9 100644 --- a/src/services/tv-guide-service.js +++ b/src/services/tv-guide-service.js @@ -81,6 +81,17 @@ class TVGuideService async getCurrentPlayingIndex(channel, t) { let s = (new Date(channel.startTime)).getTime(); + if ( (typeof(channel.onDemand) !== 'undefined') && channel.onDemand.isOnDemand && channel.onDemand.paused ) { + // it's as flex + return { + index : -1, + start : t, + program : { + isOffline : true, + duration : 12*60*1000, + } + } + } if (t < s) { //it's flex time return { diff --git a/src/throttler.js b/src/throttler.js index 543cbe9..ae660f5 100644 --- a/src/throttler.js +++ b/src/throttler.js @@ -7,9 +7,7 @@ function equalItems(a, b) { if ( (typeof(a) === 'undefined') || a.isOffline || b.isOffline ) { return false; } - console.log("no idea how to compare this: " + JSON.stringify(a) ); - console.log(" with this: " + JSON.stringify(b) ); - return true; + return ( a.type === b.type); } diff --git a/src/video.js b/src/video.js index c2d7897..262559d 100644 --- a/src/video.js +++ b/src/video.js @@ -7,12 +7,13 @@ const fs = require('fs') const ProgramPlayer = require('./program-player'); const channelCache = require('./channel-cache') const wereThereTooManyAttempts = require('./throttler'); +const constants = require('./constants'); module.exports = { router: video } let StreamCount = 0; -function video( channelDB , fillerDB, db) { +function video( channelDB , fillerDB, db, programmingService, activeChannelService ) { var router = express.Router() router.get('/setup', (req, res) => { @@ -181,8 +182,9 @@ function video( channelDB , fillerDB, db) { start: 0, }; } else if (lineupItem == null) { - prog = helperFuncs.getCurrentProgramAndTimeElapsed(t0, channel); - + prog = programmingService.getCurrentProgramAndTimeElapsed(t0, channel); + activeChannelService.peekChannel(t0, channel.number); + while (true) { redirectChannels.push( brandChannel ); upperBounds.push( prog.program.duration - prog.timeElapsed ); @@ -223,7 +225,8 @@ function video( channelDB , fillerDB, db) { lineupItem = JSON.parse( JSON.stringify(lineupItem)) ; break; } else { - prog = helperFuncs.getCurrentProgramAndTimeElapsed(t0, newChannel); + prog = programmingService.getCurrentProgramAndTimeElapsed(t0, newChannel); + activeChannelService.peekChannel(t0, newChannel.number); } } } @@ -332,8 +335,12 @@ function video( channelDB , fillerDB, db) { 'Content-Type': 'video/mp2t' }); + let t1; + try { playerObj = await player.play(res); + t1 = (new Date()).getTime(); + console.log("Latency: (" + (t1- t0) ); } catch (err) { console.log("Error when attempting to play video: " +err.stack); try { @@ -345,7 +352,35 @@ function video( channelDB , fillerDB, db) { return; } + if (! isLoading) { + //setup end event to mark the channel as not playing anymore + let t0 = new Date().getTime(); + let b = 0; + let stopDetected = false; + if (typeof(lineupItem.beginningOffset) !== 'undefined') { + b = lineupItem.beginningOffset; + t0 -= b; + } + // we have to do it for every single redirected channel... + + for (let i = redirectChannels.length-1; i >= 0; i--) { + activeChannelService.registerChannelActive(t0, redirectChannels[i].number); + } + + let oldStop = stop; + stop = () => { + if (!stopDetected) { + stopDetected = true; + let t1 = new Date().getTime(); + t1 = Math.max( t0 + 1, t1 - constants.FORGETFULNESS_BUFFER - b ); + for (let i = redirectChannels.length-1; i >= 0; i--) { + activeChannelService.registerChannelStopped(t1, redirectChannels[i].number); + } + } + oldStop(); + }; + } let stream = playerObj; @@ -354,9 +389,13 @@ function video( channelDB , fillerDB, db) { stream.on("end", () => { + let t2 = (new Date()).getTime(); + console.log("Played video for: " + (t2 - t1) + " ms"); stop(); }); res.on("close", () => { + let t2 = (new Date()).getTime(); + console.log("Played video for: " + (t2 - t1) + " ms"); console.log("Client Closed"); stop(); }); diff --git a/web/directives/channel-config.js b/web/directives/channel-config.js index 9885f29..a731d78 100644 --- a/web/directives/channel-config.js +++ b/web/directives/channel-config.js @@ -86,6 +86,10 @@ module.exports = function ($timeout, $location, dizquetv, resolutionOptions, get scope.channel.transcoding = { targetResolution: "", } + scope.channel.onDemand = { + isOnDemand : false, + modulo: 1, + } } else { scope.beforeEditChannelNumber = scope.channel.number @@ -142,6 +146,16 @@ module.exports = function ($timeout, $location, dizquetv, resolutionOptions, get scope.channel.transcoding.targetResolution = ""; } + if (typeof(scope.channel.onDemand) === 'undefined') { + scope.channel.onDemand = {}; + } + if (typeof(scope.channel.onDemand.isOnDemand) !== 'boolean') { + scope.channel.onDemand.isOnDemand = false; + } + if (typeof(scope.channel.onDemand.modulo) !== 'number') { + scope.channel.onDemand.modulo = 1; + } + adjustStartTimeToCurrentProgram(); updateChannelDuration(); @@ -163,6 +177,22 @@ module.exports = function ($timeout, $location, dizquetv, resolutionOptions, get let t = Date.now(); let originalStart = scope.channel.startTime.getTime(); let n = scope.channel.programs.length; + + if ( + (scope.channel.onDemand.isOnDemand === true) + && + (scope.channel.onDemand.paused === true) + ) { + originalStart = new Date().getTime(); + originalStart -= scope.channel.onDemand.playedOffset; + let m = scope.channel.onDemand.firstProgramModulo; + let n = originalStart % scope.channel.onDemand.modulo; + if (n < m) { + originalStart += (m - n); + } else if (n > m) { + originalStart -= (n - m) - scope.channel.onDemand.modulo; + } + } //scope.channel.totalDuration might not have been initialized let totalDuration = 0; for (let i = 0; i < n; i++) { @@ -220,6 +250,7 @@ module.exports = function ($timeout, $location, dizquetv, resolutionOptions, get { name: "Flex", id: "flex" }, { name: "EPG", id: "epg" }, { name: "FFmpeg", id: "ffmpeg" }, + { name: "On-demand", id: "ondemand" }, ]; scope.setTab = (tab) => { scope.tab = tab; diff --git a/web/public/templates/channel-config.html b/web/public/templates/channel-config.html index baf082d..da9a908 100644 --- a/web/public/templates/channel-config.html +++ b/web/public/templates/channel-config.html @@ -83,7 +83,7 @@
- Programming will restart from the beginning. + Programming will restart from the beginning. For on-demand channels, the times in the schedule are tentative.
@@ -838,6 +838,38 @@ + + + + +