From 58aa84d19f0bda4077d2ec1e18060b91e6965082 Mon Sep 17 00:00:00 2001 From: vexorian Date: Sun, 8 Aug 2021 21:42:15 -0400 Subject: [PATCH 1/3] Do not clone whole channel just to combine the channel --- src/helperFuncs.js | 22 ++++++++++++++++++++++ src/video.js | 2 +- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/helperFuncs.js b/src/helperFuncs.js index 343d02c..4cf2301 100644 --- a/src/helperFuncs.js +++ b/src/helperFuncs.js @@ -2,6 +2,7 @@ module.exports = { getCurrentProgramAndTimeElapsed: getCurrentProgramAndTimeElapsed, createLineup: createLineup, getWatermark: getWatermark, + generateChannelContext: generateChannelContext, } let channelCache = require('./channel-cache'); @@ -10,6 +11,17 @@ const randomJS = require("random-js"); const Random = randomJS.Random; const random = new Random( randomJS.MersenneTwister19937.autoSeed() ); +const CHANNEL_CONTEXT_KEYS = [ + "disableFillerOverlay", + "watermark", + "icon", + "offlinePicture", + "offlineSoundtrack", + "name", + "transcoding", + "number", +]; + module.exports.random = random; function getCurrentProgramAndTimeElapsed(date, channel) { @@ -260,6 +272,7 @@ function pickRandomWithMaxDuration(channel, fillers, maxDuration) { } } +// any channel thing used here should be added to channel context function getWatermark( ffmpegSettings, channel, type) { if (! ffmpegSettings.enableFFMPEGTranscoding || ffmpegSettings.disableChannelOverlay ) { return null; @@ -301,3 +314,12 @@ function getWatermark( ffmpegSettings, channel, type) { return result; } + +function generateChannelContext(channel) { + let channelContext = {}; + for (let i = 0; i < CHANNEL_CONTEXT_KEYS.length; i++) { + let key = CHANNEL_CONTEXT_KEYS[i]; + channelContext[key] = JSON.parse( JSON.stringify(channel[key] ) ); + } + return channelContext; +} diff --git a/src/video.js b/src/video.js index c2d7897..9ed736d 100644 --- a/src/video.js +++ b/src/video.js @@ -305,7 +305,7 @@ function video( channelDB , fillerDB, db) { }; } - let combinedChannel = JSON.parse( JSON.stringify(brandChannel) ); + let combinedChannel = helperFuncs.generateChannelContext(brandChannel); combinedChannel.transcoding = channel.transcoding; let playerContext = { From 9fb4db8d8682d519b92383d7ef2437c92b019605 Mon Sep 17 00:00:00 2001 From: vexorian Date: Sat, 7 Aug 2021 11:01:30 -0400 Subject: [PATCH 2/3] Fix rewind/fast forward not working correctly in on-demand channels. --- web/directives/channel-config.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/web/directives/channel-config.js b/web/directives/channel-config.js index a731d78..1bdec64 100644 --- a/web/directives/channel-config.js +++ b/web/directives/channel-config.js @@ -49,6 +49,7 @@ module.exports = function ($timeout, $location, dizquetv, resolutionOptions, get scope.episodeMemory = { saved : false, }; + scope.fixedOnDemand = false; if (typeof scope.channel === 'undefined' || scope.channel == null) { scope.channel = {} scope.channel.programs = [] @@ -182,7 +183,11 @@ module.exports = function ($timeout, $location, dizquetv, resolutionOptions, get (scope.channel.onDemand.isOnDemand === true) && (scope.channel.onDemand.paused === true) + && + ! scope.fixedOnDemand ) { + //this should only happen once per channel + scope.fixedOnDemand = true; originalStart = new Date().getTime(); originalStart -= scope.channel.onDemand.playedOffset; let m = scope.channel.onDemand.firstProgramModulo; From 30252b7d0736469a87c1c0b208da7218e7af3ab2 Mon Sep 17 00:00:00 2001 From: vexorian Date: Mon, 9 Aug 2021 00:15:22 -0400 Subject: [PATCH 3/3] Channel Service refactor. Editing a channel that's currently playing will also change its current stream. --- index.js | 74 +++++++++++------ src/api.js | 94 +++++----------------- src/channel-cache.js | 34 +++++++- src/constants.js | 6 ++ src/dao/filler-db.js | 15 ++-- src/dao/plex-server-db.js | 15 ++-- src/services/active-channel-service.js | 25 +++--- src/services/channel-service.js | 101 +++++++++++++++++++++++ src/services/m3u-service.js | 10 ++- src/services/on-demand-service.js | 38 +++++---- src/services/tv-guide-service.js | 84 ++++++++++++++++---- src/video.js | 106 ++++++++++++++++++++----- 12 files changed, 418 insertions(+), 184 deletions(-) create mode 100644 src/services/channel-service.js diff --git a/index.js b/index.js index 74a1c8f..fc06018 100644 --- a/index.js +++ b/index.js @@ -15,10 +15,10 @@ const video = require('./src/video') const HDHR = require('./src/hdhr') const FileCacheService = require('./src/services/file-cache-service'); const CacheImageService = require('./src/services/cache-image-service'); +const ChannelService = require("./src/services/channel-service"); const xmltv = require('./src/xmltv') const Plex = require('./src/plex'); -const channelCache = require('./src/channel-cache'); const constants = require('./src/constants') const ChannelDB = require("./src/dao/channel-db"); const M3uService = require("./src/services/m3u-service"); @@ -81,20 +81,26 @@ if(!fs.existsSync(path.join(process.env.DATABASE, 'cache','images'))) { channelDB = new ChannelDB( path.join(process.env.DATABASE, 'channels') ); -fillerDB = new FillerDB( path.join(process.env.DATABASE, 'filler') , channelDB, channelCache ); - -customShowDB = new CustomShowDB( path.join(process.env.DATABASE, 'custom-shows') ); db.connect(process.env.DATABASE, ['channels', 'plex-servers', 'ffmpeg-settings', 'plex-settings', 'xmltv-settings', 'hdhr-settings', 'db-version', 'client-id', 'cache-images', 'settings']) +initDB(db, channelDB) + +channelService = new ChannelService(channelDB); + +fillerDB = new FillerDB( path.join(process.env.DATABASE, 'filler') , channelService ); +customShowDB = new CustomShowDB( path.join(process.env.DATABASE, 'custom-shows') ); + fileCache = new FileCacheService( path.join(process.env.DATABASE, 'cache') ); cacheImageService = new CacheImageService(db, fileCache); -m3uService = new M3uService(channelDB, fileCache, channelCache) +m3uService = new M3uService(fileCache, channelService) + +onDemandService = new OnDemandService(channelService); +programmingService = new ProgrammingService(onDemandService); +activeChannelService = new ActiveChannelService(onDemandService, channelService); + eventService = new EventService(); -initDB(db, channelDB) - - i18next .use(i18nextBackend) .use(i18nextMiddleware.LanguageDetector) @@ -118,29 +124,27 @@ let xmltvInterval = { interval: null, lastRefresh: null, updateXML: async () => { - let getChannelsCached = async() => { - let channelNumbers = await channelDB.getAllChannelNumbers(); - return await Promise.all( channelNumbers.map( async (x) => { - return (await channelCache.getChannelConfig(channelDB, x))[0]; - }) ); - } let channels = []; try { - channels = await getChannelsCached(); + channels = await channelService.getAllChannels(); let xmltvSettings = db['xmltv-settings'].find()[0]; let t = guideService.prepareRefresh(channels, xmltvSettings.cache*60*60*1000); channels = null; - await guideService.refresh(t); - xmltvInterval.lastRefresh = new Date() - console.log('XMLTV Updated at ', xmltvInterval.lastRefresh.toLocaleString()); + guideService.refresh(t); } catch (err) { console.error("Unable to update TV guide?", err); return; } - channels = await getChannelsCached(); + }, + + notifyPlex: async() => { + xmltvInterval.lastRefresh = new Date() + console.log('XMLTV Updated at ', xmltvInterval.lastRefresh.toLocaleString()); + + channels = await channelService.getAllChannels(); let plexServers = db['plex-servers'].find() for (let i = 0, l = plexServers.length; i < l; i++) { // Foreach plex server @@ -171,6 +175,7 @@ let xmltvInterval = { } } }, + startInterval: () => { let xmltvSettings = db['xmltv-settings'].find()[0] if (xmltvSettings.refresh !== 0) { @@ -190,12 +195,30 @@ let xmltvInterval = { } } +guideService.on("xmltv-updated", (data) => { + try { + xmltvInterval.notifyPlex(); + } catch (err) { + console.error("Unexpected issue when reacting to xmltv update", err); + } +} ); + xmltvInterval.updateXML() xmltvInterval.startInterval() -onDemandService = new OnDemandService(channelCache, channelDB, xmltvInterval); -programmingService = new ProgrammingService(onDemandService); -activeChannelService = new ActiveChannelService(onDemandService, channelCache, channelDB); + +//setup xmltv update +channelService.on("channel-update", (data) => { + try { + console.log("Updating TV Guide due to channel update..."); + //TODO: this could be smarter, like avoid updating 3 times if the channel was saved three times in a short time interval... + xmltvInterval.updateXML() + xmltvInterval.restartInterval() + } catch (err) { + console.error("Unexpected error issuing TV Guide udpate", err); + } +} ); + let hdhr = HDHR(db, channelDB) let app = express() @@ -238,10 +261,10 @@ app.use('/favicon.svg', express.static( app.use('/custom.css', express.static(path.join(process.env.DATABASE, 'custom.css'))) // API Routers -app.use(api.router(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService, m3uService, eventService, onDemandService, activeChannelService )) +app.use(api.router(db, channelService, fillerDB, customShowDB, xmltvInterval, guideService, m3uService, eventService )) app.use('/api/cache/images', cacheImageService.apiRouters()) -app.use(video.router( channelDB, fillerDB, db, programmingService, activeChannelService )) +app.use(video.router( channelService, fillerDB, db, programmingService, activeChannelService )) app.use(hdhr.router) app.listen(process.env.PORT, () => { console.log(`HTTP server running on port: http://*:${process.env.PORT}`) @@ -338,3 +361,6 @@ onShutdown("active-channels", [], async() => { await activeChannelService.shutdown(); } ); +onShutdown("video", [], async() => { + await video.shutdown(); +} ); diff --git a/src/api.js b/src/api.js index 4782fb1..cc17398 100644 --- a/src/api.js +++ b/src/api.js @@ -3,7 +3,6 @@ const express = require('express') const path = require('path') const fs = require('fs') const databaseMigration = require('./database-migration'); -const channelCache = require('./channel-cache') const constants = require('./constants'); const JSONStream = require('JSONStream'); const FFMPEGInfo = require('./ffmpeg-info'); @@ -26,10 +25,10 @@ function safeString(object) { } module.exports = { router: api } -function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService, _m3uService, eventService, onDemandService, activeChannelService ) { +function api(db, channelService, fillerDB, customShowDB, xmltvInterval, guideService, _m3uService, eventService ) { let m3uService = _m3uService; const router = express.Router() - const plexServerDB = new PlexServerDB(channelDB, channelCache, fillerDB, customShowDB, db); + const plexServerDB = new PlexServerDB(channelService, fillerDB, customShowDB, db); router.get('/api/version', async (req, res) => { try { @@ -222,7 +221,7 @@ function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService // Channels router.get('/api/channels', async (req, res) => { try { - let channels = await channelDB.getAllChannels(); + let channels = await channelService.getAllChannelNumbers(); channels.sort((a, b) => { return a.number < b.number ? -1 : 1 }) res.send(channels) } catch(err) { @@ -233,10 +232,9 @@ function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService router.get('/api/channel/:number', async (req, res) => { try { let number = parseInt(req.params.number, 10); - let channel = await channelCache.getChannelConfig(channelDB, number); + let channel = await channelService.getChannel(number); - if (channel.length == 1) { - channel = channel[0]; + if (channel != null) { res.send(channel); } else { return res.status(404).send("Channel not found"); @@ -249,10 +247,9 @@ function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService router.get('/api/channel/programless/:number', async (req, res) => { try { let number = parseInt(req.params.number, 10); - let channel = await channelCache.getChannelConfig(channelDB, number); + let channel = await channelService.getChannel(number); - if (channel.length == 1) { - channel = channel[0]; + if (channel != null) { let copy = {}; Object.keys(channel).forEach( (key) => { if (key != 'programs') { @@ -272,10 +269,9 @@ function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService router.get('/api/channel/programs/:number', async (req, res) => { try { let number = parseInt(req.params.number, 10); - let channel = await channelCache.getChannelConfig(channelDB, number); + let channel = await channelService.getChannel(number); - if (channel.length == 1) { - channel = channel[0]; + if (channel != null) { let programs = channel.programs; if (typeof(programs) === 'undefined') { return res.status(404).send("Channel doesn't have programs?"); @@ -304,9 +300,8 @@ function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService router.get('/api/channel/description/:number', async (req, res) => { try { let number = parseInt(req.params.number, 10); - let channel = await channelCache.getChannelConfig(channelDB, number); - if (channel.length == 1) { - channel = channel[0]; + let channel = await channelService.getChannel(number); + if (channel != null) { res.send({ number: channel.number, icon: channel.icon, @@ -323,7 +318,7 @@ function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService }) router.get('/api/channelNumbers', async (req, res) => { try { - let channels = await channelDB.getAllChannelNumbers(); + let channels = await channelService.getAllChannelNumbers(); channels.sort( (a,b) => { return parseInt(a) - parseInt(b) } ); res.send(channels) } catch(err) { @@ -334,39 +329,27 @@ function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService // we urgently need an actual channel service router.post('/api/channel', async (req, res) => { try { - await m3uService.clearCache(); - cleanUpChannel(req.body); - onDemandService.fixupChannelBeforeSave( req.body, activeChannelService.isActive(req.body.number) ); - await channelDB.saveChannel( req.body.number, req.body ); - channelCache.clear(); + await channelService.saveChannel( req.body.number, req.body ); res.send( { number: req.body.number} ) - updateXmltv() } catch(err) { console.error(err); - res.status(500).send("error"); + res.status(500).send("error"); } }) router.put('/api/channel', async (req, res) => { try { - await m3uService.clearCache(); - cleanUpChannel(req.body); - onDemandService.fixupChannelBeforeSave( req.body, activeChannelService.isActive(req.body.number) ); - await channelDB.saveChannel( req.body.number, req.body ); - channelCache.clear(); + await channelService.saveChannel( req.body.number, req.body ); res.send( { number: req.body.number} ) - updateXmltv() } catch(err) { console.error(err); - res.status(500).send("error"); + res.status(500).send("error"); } + }) router.delete('/api/channel', async (req, res) => { try { - await m3uService.clearCache(); - await channelDB.deleteChannel( req.body.number ); - channelCache.clear(); + await channelService.deleteChannel(req.body.number); res.send( { number: req.body.number} ) - updateXmltv() } catch(err) { console.error(err); res.status(500).send("error"); @@ -1049,47 +1032,6 @@ function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService xmltvInterval.updateXML() xmltvInterval.restartInterval() } - - function cleanUpProgram(program) { - delete program.start - delete program.stop - delete program.streams; - delete program.durationStr; - delete program.commercials; - if ( - (typeof(program.duration) === 'undefined') - || - (program.duration <= 0) - ) { - console.error(`Input contained a program with invalid duration: ${program.duration}. This program has been deleted`); - return []; - } - if (! Number.isInteger(program.duration) ) { - console.error(`Input contained a program with invalid duration: ${program.duration}. Duration got fixed to be integer.`); - program.duration = Math.ceil(program.duration); - } - return [ program ]; - } - - function cleanUpChannel(channel) { - if ( - (typeof(channel.groupTitle) === 'undefined') - || - (channel.groupTitle === '') - ) { - channel.groupTitle = "dizqueTV"; - } - channel.programs = channel.programs.flatMap( cleanUpProgram ); - delete channel.fillerContent; - delete channel.filler; - channel.fallback = channel.fallback.flatMap( cleanUpProgram ); - channel.duration = 0; - for (let i = 0; i < channel.programs.length; i++) { - channel.duration += channel.programs[i].duration; - } - - } - async function streamToolResult(toolRes, res) { let programs = toolRes.programs; delete toolRes.programs; diff --git a/src/channel-cache.js b/src/channel-cache.js index cf2568a..5c7b624 100644 --- a/src/channel-cache.js +++ b/src/channel-cache.js @@ -24,7 +24,7 @@ async function getChannelConfig(channelDB, channelId) { async function getAllNumbers(channelDB) { if (numbers === null) { - let n = channelDB.getAllChannelNumbers(); + let n = await channelDB.getAllChannelNumbers(); numbers = n; } return numbers; @@ -32,15 +32,41 @@ async function getAllNumbers(channelDB) { async function getAllChannels(channelDB) { let channelNumbers = await getAllNumbers(channelDB); - return await Promise.all( channelNumbers.map( async (x) => { + return (await Promise.all( channelNumbers.map( async (x) => { return (await getChannelConfig(channelDB, x))[0]; - }) ); + }) )).filter( (channel) => { + if (channel == null) { + console.error("Found a null channel " + JSON.stringify(channelNumbers) ); + return false; + } + if ( typeof(channel) === "undefined") { + console.error("Found a undefined channel " + JSON.stringify(channelNumbers) ); + return false; + } + if ( typeof(channel.number) === "undefined") { + console.error("Found a channel without number " + JSON.stringify(channelNumbers) ); + return false; + } + + return true; + } ); } function saveChannelConfig(number, channel ) { configCache[number] = [channel]; - delete cache[number]; + + // flush the item played cache for the channel and any channel in its + // redirect chain + if (typeof(cache[number]) !== 'undefined') { + let lineupItem = cache[number].lineupItem; + for (let i = 0; i < lineupItem.redirectChannels.length; i++) { + delete cache[ lineupItem.redirectChannels[i].number ]; + } + delete cache[number]; + + } + numbers = null; } function getCurrentLineupItem(channelId, t1) { diff --git a/src/constants.js b/src/constants.js index 6beaba8..b2563f6 100644 --- a/src/constants.js +++ b/src/constants.js @@ -5,6 +5,12 @@ module.exports = { TVGUIDE_MAXIMUM_FLEX_DURATION : 6 * 60 * 60 * 1000, TOO_FREQUENT: 100, + //when a channel is forcibly stopped due to an update, let's mark it as active + // for a while during the transaction just in case. + CHANNEL_STOP_SHIELD : 5000, + + START_CHANNEL_GRACE_PERIOD: 15 * 1000, + // if a channel is stopped while something is playing, subtract // this amount of milliseconds from the last-played timestamp, because // video playback has latency and also because maybe the user wants diff --git a/src/dao/filler-db.js b/src/dao/filler-db.js index fcd2d3e..262913b 100644 --- a/src/dao/filler-db.js +++ b/src/dao/filler-db.js @@ -4,11 +4,10 @@ let fs = require('fs'); class FillerDB { - constructor(folder, channelDB, channelCache) { + constructor(folder, channelService) { this.folder = folder; this.cache = {}; - this.channelDB = channelDB; - this.channelCache = channelCache; + this.channelService = channelService; } @@ -79,10 +78,10 @@ class FillerDB { } async getFillerChannels(id) { - let numbers = await this.channelDB.getAllChannelNumbers(); + let numbers = await this.channelService.getAllChannelNumbers(); let channels = []; await Promise.all( numbers.map( async(number) => { - let ch = await this.channelDB.getChannel(number); + let ch = await this.channelService.getChannel(number); let name = ch.name; let fillerCollections = ch.fillerCollections; for (let i = 0 ; i < fillerCollections.length; i++) { @@ -105,13 +104,13 @@ class FillerDB { let channels = await this.getFillerChannels(id); await Promise.all( channels.map( async(channel) => { console.log(`Updating channel ${channel.number} , remove filler: ${id}`); - let json = await channelDB.getChannel(channel.number); + let json = await channelService.getChannel(channel.number); json.fillerCollections = json.fillerCollections.filter( (col) => { return col.id != id; } ); - await this.channelDB.saveChannel( channel.number, json ); + await this.channelService.saveChannel( channel.number, json ); } ) ); - this.channelCache.clear(); + let f = path.join(this.folder, `${id}.json` ); await new Promise( (resolve, reject) => { fs.unlink(f, function (err) { diff --git a/src/dao/plex-server-db.js b/src/dao/plex-server-db.js index 02d19ed..ed5853f 100644 --- a/src/dao/plex-server-db.js +++ b/src/dao/plex-server-db.js @@ -4,20 +4,21 @@ const ICON_REGEX = /https?:\/\/.*(\/library\/metadata\/\d+\/thumb\/\d+).X-Plex-T const ICON_FIELDS = ["icon", "showIcon", "seasonIcon", "episodeIcon"]; +// DB is a misnomer here, this is closer to a service class PlexServerDB { - constructor(channelDB, channelCache, fillerDB, showDB, db) { - this.channelDB = channelDB; + constructor(channelService, fillerDB, showDB, db) { + this.channelService = channelService; this.db = db; - this.channelCache = channelCache; + this.fillerDB = fillerDB; this.showDB = showDB; } async fixupAllChannels(name, newServer) { - let channelNumbers = await this.channelDB.getAllChannelNumbers(); + let channelNumbers = await this.channelService.getAllChannelNumbers(); let report = await Promise.all( channelNumbers.map( async (i) => { - let channel = await this.channelDB.getChannel(i); + let channel = await this.channelService.getChannel(i); let channelReport = { channelNumber : channel.number, channelName : channel.name, @@ -38,10 +39,10 @@ class PlexServerDB } } this.fixupProgramArray(channel.fallback, name,newServer, channelReport); - await this.channelDB.saveChannel(i, channel); + await this.channelService.saveChannel(i, channel); return channelReport; }) ); - this.channelCache.clear(); + return report; } diff --git a/src/services/active-channel-service.js b/src/services/active-channel-service.js index 75d7293..c64bd80 100644 --- a/src/services/active-channel-service.js +++ b/src/services/active-channel-service.js @@ -10,12 +10,11 @@ class ActiveChannelService /**** * **/ - constructor(onDemandService, channelCache, channelDB) { - console.log("DEFINE THIS.CACHE"); + constructor(onDemandService, channelService ) { this.cache = {}; - this.channelDB = channelDB; this.onDemandService = onDemandService; - this.channelCache = channelCache; + this.onDemandService.setActiveChannelService(this); + this.channelService = channelService; this.timeNoDelta = new Date().getTime(); this.loadChannelsForFirstTry(); @@ -25,7 +24,7 @@ class ActiveChannelService loadChannelsForFirstTry() { let fun = async() => { try { - let numbers = await this.channelCache.getAllNumbers(this.channelDB); + let numbers = await this.channelService.getAllChannelNumbers(); numbers.forEach( (number) => { this.ensure(this.timeNoDelta, number); } ); @@ -86,20 +85,29 @@ class ActiveChannelService registerChannelActive(t, channelNumber) { this.ensure(t, channelNumber); - console.log("Register that channel is being played: " + channelNumber ); + if (this.cache[channelNumber].active === 0) { + console.log("Channel is being played: " + channelNumber ); + } this.cache[channelNumber].active++; + //console.log(channelNumber + " ++active=" + this.cache[channelNumber].active ); this.cache[channelNumber].stopTime = 0; this.cache[channelNumber].lastUpdate = new Date().getTime(); } registerChannelStopped(t, channelNumber) { this.ensure(t, channelNumber); - console.log("Register that channel is no longer being played: " + channelNumber ); + if (this.cache[channelNumber].active === 1) { + console.log("Register that channel is no longer being played: " + channelNumber ); + } if (this.cache[channelNumber].active === 0) { console.error("Serious issue with channel active service, double delete"); } else { this.cache[channelNumber].active--; - this.cache[channelNumber].stopTime = t; + //console.log(channelNumber + " --active=" + this.cache[channelNumber].active ); + let s = this.cache[channelNumber].stopTime; + if ( (typeof(s) === 'undefined') || (s < t) ) { + this.cache[channelNumber].stopTime = t; + } this.cache[channelNumber].lastUpdate = new Date().getTime(); } } @@ -131,7 +139,6 @@ class ActiveChannelService isActive(channelNumber) { let bol = this.isActiveWrapped(channelNumber); - console.log( "channelNumber = " + channelNumber + " active? " + bol); return bol; diff --git a/src/services/channel-service.js b/src/services/channel-service.js new file mode 100644 index 0000000..b981f9c --- /dev/null +++ b/src/services/channel-service.js @@ -0,0 +1,101 @@ +const events = require('events') +const channelCache = require("../channel-cache"); + +class ChannelService extends events.EventEmitter { + + constructor(channelDB) { + super(); + this.channelDB = channelDB; + this.onDemandService = null; + } + + setOnDemandService(onDemandService) { + this.onDemandService = onDemandService; + } + + async saveChannel(number, channelJson, options) { + + let channel = cleanUpChannel(channelJson); + if ( + (this.onDemandService != null) + && + ( (typeof(options) === 'undefined') || (options.ignoreOnDemand !== true) ) + ) { + this.onDemandService.fixupChannelBeforeSave( channel ); + } + channelCache.saveChannelConfig( number, channel); + await channelDB.saveChannel( number, channel ); + + this.emit('channel-update', { channelNumber: number, channel: channel} ); + } + + async deleteChannel(number) { + await channelDB.deleteChannel( number ); + this.emit('channel-update', { channelNumber: number, channel: null} ); + + channelCache.clear(); + } + + async getChannel(number) { + let lis = await channelCache.getChannelConfig(this.channelDB, number) + if ( lis == null || lis.length !== 1) { + return null; + } + return lis[0]; + } + + async getAllChannelNumbers() { + return await channelCache.getAllNumbers(this.channelDB); + } + + async getAllChannels() { + return await channelCache.getAllChannels(this.channelDB); + } + + +} + + +function cleanUpProgram(program) { + delete program.start + delete program.stop + delete program.streams; + delete program.durationStr; + delete program.commercials; + if ( + (typeof(program.duration) === 'undefined') + || + (program.duration <= 0) + ) { + console.error(`Input contained a program with invalid duration: ${program.duration}. This program has been deleted`); + return []; + } + if (! Number.isInteger(program.duration) ) { + console.error(`Input contained a program with invalid duration: ${program.duration}. Duration got fixed to be integer.`); + program.duration = Math.ceil(program.duration); + } + return [ program ]; +} + +function cleanUpChannel(channel) { + if ( + (typeof(channel.groupTitle) === 'undefined') + || + (channel.groupTitle === '') + ) { + channel.groupTitle = "dizqueTV"; + } + channel.programs = channel.programs.flatMap( cleanUpProgram ); + delete channel.fillerContent; + delete channel.filler; + channel.fallback = channel.fallback.flatMap( cleanUpProgram ); + channel.duration = 0; + for (let i = 0; i < channel.programs.length; i++) { + channel.duration += channel.programs[i].duration; + } + return channel; + +} + + +module.exports = ChannelService \ No newline at end of file diff --git a/src/services/m3u-service.js b/src/services/m3u-service.js index f3563b5..c58c536 100644 --- a/src/services/m3u-service.js +++ b/src/services/m3u-service.js @@ -4,11 +4,13 @@ * @class M3uService */ class M3uService { - constructor(dataBase, fileCacheService, channelCache) { - this.dataBase = dataBase; + constructor(fileCacheService, channelService) { + this.channelService = channelService; this.cacheService = fileCacheService; - this.channelCache = channelCache; this.cacheReady = false; + this.channelService.on("channel-update", (data) => { + this.clearCache(); + } ); } /** @@ -37,7 +39,7 @@ class M3uService { return this.replaceHostOnM3u(host, cachedM3U); } } - let channels = await this.channelCache.getAllChannels(this.dataBase); + let channels = await this.channelService.getAllChannels(); channels.sort((a, b) => { diff --git a/src/services/on-demand-service.js b/src/services/on-demand-service.js index 22f9a6f..c2ac3cd 100644 --- a/src/services/on-demand-service.js +++ b/src/services/on-demand-service.js @@ -9,10 +9,14 @@ class OnDemandService /**** * **/ - constructor(channelCache, channelDB, xmltvInterval) { - this.channelCache = channelCache; - this.channelDB = channelDB; - this.xmltvInterval = xmltvInterval; + constructor(channelService) { + this.channelService = channelService; + this.channelService.setOnDemandService(this); + this.activeChannelService = null; + } + + setActiveChannelService(activeChannelService) { + this.activeChannelService = activeChannelService; } activateChannelIfNeeded(moment, channel) { @@ -25,12 +29,12 @@ class OnDemandService async registerChannelStopped(channelNumber, stopTime, waitForSave) { try { - let channel = await this.channelCache.getChannelConfig( this.channelDB, channelNumber); - if (channel.length === 0) { + let channel = await this.channelService.getChannel(channelNumber); + if (channel == null) { console.error("Could not stop channel " + channelNumber + " because it apparently no longer exists"); // I guess if someone deletes the channel just in the grace period? return } - channel = channel[0]; + if ( (typeof(channel.onDemand) !== 'undefined') && channel.onDemand.isOnDemand && ! channel.onDemand.paused) { //pause the channel channel = this.pauseOnDemandChannel( channel , stopTime ); @@ -46,10 +50,6 @@ class OnDemandService } - updateXmltv() { - this.xmltvInterval.updateXML() - this.xmltvInterval.restartInterval() - } pauseOnDemandChannel(originalChannel, stopTime) { @@ -98,20 +98,26 @@ class OnDemandService async updateChannelSync(channel) { try { - this.channelCache.saveChannelConfig(channel.number, channel ); - await this.channelDB.saveChannel(channel.number, channel); + await this.channelService.saveChannel( + channel.number, + channel, + {ignoreOnDemand: true} + ); console.log("Channel " + channel.number + " saved by on-demand service..."); - } catch (err) { + } catch (err) { console.error("Error saving resumed channel: " + channel.number, err); } } updateChannelAsync(channel) { this.updateChannelSync(channel); - this.updateXmltv(); } - fixupChannelBeforeSave(channel, isActive) { + fixupChannelBeforeSave(channel) { + let isActive = false; + if (this.activeChannelService != null && this.activeChannelService.isActive(channel.number) ) { + isActive = true; + } if (typeof(channel.onDemand) === 'undefined') { channel.onDemand = {}; } diff --git a/src/services/tv-guide-service.js b/src/services/tv-guide-service.js index 0a5ce93..09e3e8f 100644 --- a/src/services/tv-guide-service.js +++ b/src/services/tv-guide-service.js @@ -1,14 +1,15 @@ - +const events = require('events') const constants = require("../constants"); const FALLBACK_ICON = "https://raw.githubusercontent.com/vexorain/dizquetv/main/resources/dizquetv.png"; const throttle = require('./throttle'); -class TVGuideService +class TVGuideService extends events.EventEmitter { /**** * **/ constructor(xmltv, db, cacheImageService, eventService, i18next) { + super(); this.cached = null; this.lastUpdate = 0; this.updateTime = 0; @@ -42,7 +43,8 @@ class TVGuideService async refresh(t) { while( this.lastUpdate < t) { - if (this.currentUpdate == -1) { + await _wait(5000); + if ( ( this.lastUpdate < t) && (this.currentUpdate == -1) ) { this.currentUpdate = this.updateTime; this.currentLimit = this.updateLimit; this.currentChannels = this.updateChannels; @@ -61,7 +63,6 @@ class TVGuideService await this.buildIt(); } - await _wait(100); } return await this.get(); } @@ -74,7 +75,17 @@ class TVGuideService let arr = new Array( channel.programs.length + 1); arr[0] = 0; for (let i = 0; i < n; i++) { - arr[i+1] = arr[i] + channel.programs[i].duration; + let d = channel.programs[i].duration; + if (d == 0) { + console.log("Found program with duration 0, correcting it"); + d = 1; + } + if (! Number.isInteger(d) ) { + console.log( `Found program in channel ${channel.number} with non-integer duration ${d}, correcting it`); + d = Math.ceil(d); + } + channel.programs[i].duration = d; + arr[i+1] = arr[i] + d; await this._throttle(); } return arr; @@ -108,6 +119,17 @@ class TVGuideService if (typeof(accumulate) === 'undefined') { throw Error(channel.number + " wasn't preprocesed correctly???!?"); } + if (accumulate[channel.programs.length] === 0) { + console.log("[tv-guide] for some reason the total channel length is 0"); + return { + index : -1, + start: t, + program: { + isOffline: true, + duration: 15*60*1000, + } + } + } let hi = channel.programs.length; let lo = 0; let d = (t - s) % (accumulate[channel.programs.length]); @@ -121,9 +143,18 @@ class TVGuideService } } - if (epoch + accumulate[lo+1] <= t) { - throw Error("General algorithm error, completely unexpected"); + if ( (lo < 0) || (lo >= channel.programs.length) || (accumulate[lo+1] <= d) ) { + console.log("[tv-guide] The binary search algorithm is messed up. Replacing with flex..."); + return { + index : -1, + start: t, + program: { + isOffline: true, + duration: 15*60*1000, + } + } } + await this._throttle(); return { index: lo, @@ -177,11 +208,24 @@ class TVGuideService console.error("Redirrect to an unknown channel found! Involved channels = " + JSON.stringify(depth) ); } else { let otherPlaying = await this.getChannelPlaying( channel2, undefined, t, depth ); - let start = Math.max(playing.start, otherPlaying.start); - let duration = Math.min( - (playing.start + playing.program.duration) - start, - (otherPlaying.start + otherPlaying.program.duration) - start - ); + let a1 = playing.start; + let b1 = a1 + playing.program.duration; + + let a2 = otherPlaying.start; + let b2 = a2 + otherPlaying.program.duration; + + if ( !(a1 <= t && t < b1) ) { + console.error("[tv-guide] algorithm error1 : " + a1 + ", " + t + ", " + b1 ); + } + if ( !(a2 <= t && t < b2) ) { + console.error("[tv-guide] algorithm error2 : " + a2 + ", " + t + ", " + b2 ); + } + + let a = Math.max( a1, a2 ); + let b = Math.min( b1, b2 ); + + let start = a; + let duration = b - a; let program2 = clone( otherPlaying.program ); program2.duration = duration; playing = { @@ -266,7 +310,12 @@ class TVGuideService x.program.duration -= d; } if (x.program.duration == 0) { - console.error("There's a program with duration 0?"); + console.error(channel.number + " There's a program with duration 0? " + JSON.stringify(x.program) + " ; " + t1 ); + x.program.duration = 5 * 60 * 1000; + } else if ( ! Number.isInteger( x.program.duration ) ) { + console.error(channel.number + " There's a program with non-integer duration?? " + JSON.stringify(x.program) + " ; " + t1 ); + x.program = JSON.parse( JSON.stringify(x.program) ); + x.program.duration = Math.ceil(x.program.duration ); } } result.programs = []; @@ -352,14 +401,18 @@ class TVGuideService return result; } - async buildIt() { + async buildIt(lastRetry) { try { this.cached = await this.buildItManaged(); console.log("Internal TV Guide data refreshed at " + (new Date()).toLocaleString() ); await this.refreshXML(); } catch(err) { console.error("Unable to update internal guide data", err); - await _wait(100); + let w = 100; + if (typeof(lastRetry) !== 'undefined') { + w = Math.min(w*2, 5 * 60 * 1000); + } + await _wait(w); console.error("Retrying TV guide..."); await this.buildIt(); @@ -374,6 +427,7 @@ class TVGuideService let xmltvSettings = this.db['xmltv-settings'].find()[0]; await this.xmltv.WriteXMLTV(this.cached, xmltvSettings, async() => await this._throttle(), this.cacheImageService); let t = "" + ( (new Date()) ); + this.emit("xmltv-updated", { time: t } ); eventService.push( "xmltv", { diff --git a/src/video.js b/src/video.js index 041cb48..39dab90 100644 --- a/src/video.js +++ b/src/video.js @@ -2,18 +2,23 @@ const express = require('express') const helperFuncs = require('./helperFuncs') const FFMPEG = require('./ffmpeg') const FFMPEG_TEXT = require('./ffmpegText') -const PlexTranscoder = require('./plexTranscoder') const fs = require('fs') const ProgramPlayer = require('./program-player'); const channelCache = require('./channel-cache') const wereThereTooManyAttempts = require('./throttler'); const constants = require('./constants'); -module.exports = { router: video } +module.exports = { router: video, shutdown: shutdown } let StreamCount = 0; -function video( channelDB , fillerDB, db, programmingService, activeChannelService ) { +let stopPlayback = false; + +async function shutdown() { + stopPlayback = true; +} + +function video( channelService, fillerDB, db, programmingService, activeChannelService ) { var router = express.Router() router.get('/setup', (req, res) => { @@ -47,18 +52,22 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi }) // Continuously stream video to client. Leverage ffmpeg concat for piecing together videos let concat = async (req, res, audioOnly) => { + if (stopPlayback) { + res.status(503).send("Server is shutting down.") + return; + } + // Check if channel queried is valid if (typeof req.query.channel === 'undefined') { res.status(500).send("No Channel Specified") return } let number = parseInt(req.query.channel, 10); - let channel = await channelCache.getChannelConfig(channelDB, number); - if (channel.length === 0) { + let channel = await channelService.getChannel(number); + if (channel == null) { res.status(500).send("Channel doesn't exist") return } - channel = channel[0] let ffmpegSettings = db['ffmpeg-settings'].find()[0] @@ -123,6 +132,11 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi // Stream individual video to ffmpeg concat above. This is used by the server, NOT the client router.get('/stream', async (req, res) => { + if (stopPlayback) { + res.status(503).send("Server is shutting down.") + return; + } + // Check if channel queried is valid res.on("error", (e) => { console.error("There was an unexpected error in stream.", e); @@ -137,9 +151,9 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi let session = parseInt(req.query.session); let m3u8 = (req.query.m3u8 === '1'); let number = parseInt(req.query.channel); - let channel = await channelCache.getChannelConfig(channelDB, number); + let channel = await channelService.getChannel( number); - if (channel.length === 0) { + if (channel == null) { res.status(404).send("Channel doesn't exist") return } @@ -152,7 +166,6 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi if ( (typeof req.query.first !== 'undefined') && (req.query.first=='1') ) { isFirst = true; } - channel = channel[0] let ffmpegSettings = db['ffmpeg-settings'].find()[0] @@ -181,12 +194,14 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi duration: 40, start: 0, }; - } else if (lineupItem == null) { + } else if (lineupItem != null) { + redirectChannels = lineupItem.redirectChannels; + } else { prog = programmingService.getCurrentProgramAndTimeElapsed(t0, channel); activeChannelService.peekChannel(t0, channel.number); while (true) { - redirectChannels.push( brandChannel ); + redirectChannels.push( helperFuncs.generateChannelContext(brandChannel) ); upperBounds.push( prog.program.duration - prog.timeElapsed ); if ( !(prog.program.isOffline) || (prog.program.type != 'redirect') ) { @@ -203,9 +218,9 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi let newChannelNumber= prog.program.channel; - let newChannel = await channelCache.getChannelConfig(channelDB, newChannelNumber); + let newChannel = await channelService.getChannel(newChannelNumber); - if (newChannel.length == 0) { + if (newChannel == null) { let err = Error("Invalid redirect to a channel that doesn't exist"); console.error("Invalid redirect to channel that doesn't exist.", err); prog = { @@ -218,7 +233,6 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi } continue; } - newChannel = newChannel[0]; brandChannel = newChannel; lineupItem = channelCache.getCurrentLineupItem( newChannel.number, t0); if (lineupItem != null) { @@ -269,6 +283,7 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi //adjust upper bounds and record playbacks for (let i = redirectChannels.length-1; i >= 0; i--) { lineupItem = JSON.parse( JSON.stringify(lineupItem )); + lineupItem.redirectChannels = redirectChannels; let u = upperBounds[i] + beginningOffset; if (typeof(u) !== 'undefined') { let u2 = upperBound; @@ -335,6 +350,8 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi 'Content-Type': 'video/mp2t' }); + shieldActiveChannels(redirectChannels, t0, constants.START_CHANNEL_GRACE_PERIOD); + let t1; try { @@ -367,9 +384,29 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi for (let i = redirectChannels.length-1; i >= 0; i--) { activeChannelService.registerChannelActive(t0, redirectChannels[i].number); } - + let listener = (data) => { + let shouldStop = false; + try { + for (let i = 0; i < redirectChannels.length; i++) { + if (redirectChannels[i].number == data.channelNumber) { + shouldStop = true; + } + } + if (shouldStop) { + console.log("Playing channel has received an update."); + shieldActiveChannels( redirectChannels, t0, constants.CHANNEL_STOP_SHIELD ) + setTimeout(stop, 100); + } + } catch (error) { + console.err("Unexpected error when processing channel change during playback", error); + } + + }; + channelService.on("channel-update", listener); + let oldStop = stop; stop = () => { + channelService.removeListener("channel-update", listener); if (!stopDetected) { stopDetected = true; let t1 = new Date().getTime(); @@ -403,6 +440,12 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi router.get('/m3u8', async (req, res) => { + if (stopPlayback) { + res.status(503).send("Server is shutting down.") + return; + } + + let sessionId = StreamCount++; //res.type('application/vnd.apple.mpegurl') @@ -415,8 +458,8 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi } let channelNum = parseInt(req.query.channel, 10) - let channel = await channelCache.getChannelConfig(channelDB, channelNum ); - if (channel.length === 0) { + let channel = await channelService.getChannel(channelNum ); + if (channel == null) { res.status(500).send("Channel doesn't exist") return } @@ -451,6 +494,12 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi res.send(data) }) router.get('/playlist', async (req, res) => { + if (stopPlayback) { + res.status(503).send("Server is shutting down.") + return; + } + + res.type('text') // Check if channel queried is valid @@ -460,8 +509,8 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi } let channelNum = parseInt(req.query.channel, 10) - let channel = await channelCache.getChannelConfig(channelDB, channelNum ); - if (channel.length === 0) { + let channel = await channelService.getChannel(channelNum ); + if (channel == null) { res.status(500).send("Channel doesn't exist") return } @@ -496,10 +545,25 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi res.send(data) }) + let shieldActiveChannels = (channelList, t0, timeout) => { + // because of channel redirects, it's possible that multiple channels + // are being played at once. Mark all of them as being played + // this is a grave period of 30 + //mark all channels being played as active: + for (let i = channelList.length-1; i >= 0; i--) { + activeChannelService.registerChannelActive(t0, channelList[i].number); + } + setTimeout( () => { + for (let i = channelList.length-1; i >= 0; i--) { + activeChannelService.registerChannelStopped(t0, channelList[i].number); + } + }, timeout ); + } + let mediaPlayer = async(channelNum, path, req, res) => { - let channel = await channelCache.getChannelConfig(channelDB, channelNum ); - if (channel.length === 0) { + let channel = await channelService.getChannel(channelNum ); + if (channel === null) { res.status(404).send("Channel not found."); return; }