Merge pull request #355 from vexorian/20210809_dev

20210809 dev
This commit is contained in:
vexorian 2021-08-09 00:24:12 -04:00 committed by GitHub
commit 247c6902e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 446 additions and 185 deletions

View File

@ -15,10 +15,10 @@ const video = require('./src/video')
const HDHR = require('./src/hdhr')
const FileCacheService = require('./src/services/file-cache-service');
const CacheImageService = require('./src/services/cache-image-service');
const ChannelService = require("./src/services/channel-service");
const xmltv = require('./src/xmltv')
const Plex = require('./src/plex');
const channelCache = require('./src/channel-cache');
const constants = require('./src/constants')
const ChannelDB = require("./src/dao/channel-db");
const M3uService = require("./src/services/m3u-service");
@ -81,20 +81,26 @@ if(!fs.existsSync(path.join(process.env.DATABASE, 'cache','images'))) {
channelDB = new ChannelDB( path.join(process.env.DATABASE, 'channels') );
fillerDB = new FillerDB( path.join(process.env.DATABASE, 'filler') , channelDB, channelCache );
customShowDB = new CustomShowDB( path.join(process.env.DATABASE, 'custom-shows') );
db.connect(process.env.DATABASE, ['channels', 'plex-servers', 'ffmpeg-settings', 'plex-settings', 'xmltv-settings', 'hdhr-settings', 'db-version', 'client-id', 'cache-images', 'settings'])
initDB(db, channelDB)
channelService = new ChannelService(channelDB);
fillerDB = new FillerDB( path.join(process.env.DATABASE, 'filler') , channelService );
customShowDB = new CustomShowDB( path.join(process.env.DATABASE, 'custom-shows') );
fileCache = new FileCacheService( path.join(process.env.DATABASE, 'cache') );
cacheImageService = new CacheImageService(db, fileCache);
m3uService = new M3uService(channelDB, fileCache, channelCache)
m3uService = new M3uService(fileCache, channelService)
onDemandService = new OnDemandService(channelService);
programmingService = new ProgrammingService(onDemandService);
activeChannelService = new ActiveChannelService(onDemandService, channelService);
eventService = new EventService();
initDB(db, channelDB)
i18next
.use(i18nextBackend)
.use(i18nextMiddleware.LanguageDetector)
@ -118,29 +124,27 @@ let xmltvInterval = {
interval: null,
lastRefresh: null,
updateXML: async () => {
let getChannelsCached = async() => {
let channelNumbers = await channelDB.getAllChannelNumbers();
return await Promise.all( channelNumbers.map( async (x) => {
return (await channelCache.getChannelConfig(channelDB, x))[0];
}) );
}
let channels = [];
try {
channels = await getChannelsCached();
channels = await channelService.getAllChannels();
let xmltvSettings = db['xmltv-settings'].find()[0];
let t = guideService.prepareRefresh(channels, xmltvSettings.cache*60*60*1000);
channels = null;
await guideService.refresh(t);
xmltvInterval.lastRefresh = new Date()
console.log('XMLTV Updated at ', xmltvInterval.lastRefresh.toLocaleString());
guideService.refresh(t);
} catch (err) {
console.error("Unable to update TV guide?", err);
return;
}
channels = await getChannelsCached();
},
notifyPlex: async() => {
xmltvInterval.lastRefresh = new Date()
console.log('XMLTV Updated at ', xmltvInterval.lastRefresh.toLocaleString());
channels = await channelService.getAllChannels();
let plexServers = db['plex-servers'].find()
for (let i = 0, l = plexServers.length; i < l; i++) { // Foreach plex server
@ -171,6 +175,7 @@ let xmltvInterval = {
}
}
},
startInterval: () => {
let xmltvSettings = db['xmltv-settings'].find()[0]
if (xmltvSettings.refresh !== 0) {
@ -190,12 +195,30 @@ let xmltvInterval = {
}
}
guideService.on("xmltv-updated", (data) => {
try {
xmltvInterval.notifyPlex();
} catch (err) {
console.error("Unexpected issue when reacting to xmltv update", err);
}
} );
xmltvInterval.updateXML()
xmltvInterval.startInterval()
onDemandService = new OnDemandService(channelCache, channelDB, xmltvInterval);
programmingService = new ProgrammingService(onDemandService);
activeChannelService = new ActiveChannelService(onDemandService, channelCache, channelDB);
//setup xmltv update
channelService.on("channel-update", (data) => {
try {
console.log("Updating TV Guide due to channel update...");
//TODO: this could be smarter, like avoid updating 3 times if the channel was saved three times in a short time interval...
xmltvInterval.updateXML()
xmltvInterval.restartInterval()
} catch (err) {
console.error("Unexpected error issuing TV Guide udpate", err);
}
} );
let hdhr = HDHR(db, channelDB)
let app = express()
@ -238,10 +261,10 @@ app.use('/favicon.svg', express.static(
app.use('/custom.css', express.static(path.join(process.env.DATABASE, 'custom.css')))
// API Routers
app.use(api.router(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService, m3uService, eventService, onDemandService, activeChannelService ))
app.use(api.router(db, channelService, fillerDB, customShowDB, xmltvInterval, guideService, m3uService, eventService ))
app.use('/api/cache/images', cacheImageService.apiRouters())
app.use(video.router( channelDB, fillerDB, db, programmingService, activeChannelService ))
app.use(video.router( channelService, fillerDB, db, programmingService, activeChannelService ))
app.use(hdhr.router)
app.listen(process.env.PORT, () => {
console.log(`HTTP server running on port: http://*:${process.env.PORT}`)
@ -338,3 +361,6 @@ onShutdown("active-channels", [], async() => {
await activeChannelService.shutdown();
} );
onShutdown("video", [], async() => {
await video.shutdown();
} );

View File

@ -3,7 +3,6 @@ const express = require('express')
const path = require('path')
const fs = require('fs')
const databaseMigration = require('./database-migration');
const channelCache = require('./channel-cache')
const constants = require('./constants');
const JSONStream = require('JSONStream');
const FFMPEGInfo = require('./ffmpeg-info');
@ -26,10 +25,10 @@ function safeString(object) {
}
module.exports = { router: api }
function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService, _m3uService, eventService, onDemandService, activeChannelService ) {
function api(db, channelService, fillerDB, customShowDB, xmltvInterval, guideService, _m3uService, eventService ) {
let m3uService = _m3uService;
const router = express.Router()
const plexServerDB = new PlexServerDB(channelDB, channelCache, fillerDB, customShowDB, db);
const plexServerDB = new PlexServerDB(channelService, fillerDB, customShowDB, db);
router.get('/api/version', async (req, res) => {
try {
@ -222,7 +221,7 @@ function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService
// Channels
router.get('/api/channels', async (req, res) => {
try {
let channels = await channelDB.getAllChannels();
let channels = await channelService.getAllChannelNumbers();
channels.sort((a, b) => { return a.number < b.number ? -1 : 1 })
res.send(channels)
} catch(err) {
@ -233,10 +232,9 @@ function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService
router.get('/api/channel/:number', async (req, res) => {
try {
let number = parseInt(req.params.number, 10);
let channel = await channelCache.getChannelConfig(channelDB, number);
let channel = await channelService.getChannel(number);
if (channel.length == 1) {
channel = channel[0];
if (channel != null) {
res.send(channel);
} else {
return res.status(404).send("Channel not found");
@ -249,10 +247,9 @@ function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService
router.get('/api/channel/programless/:number', async (req, res) => {
try {
let number = parseInt(req.params.number, 10);
let channel = await channelCache.getChannelConfig(channelDB, number);
let channel = await channelService.getChannel(number);
if (channel.length == 1) {
channel = channel[0];
if (channel != null) {
let copy = {};
Object.keys(channel).forEach( (key) => {
if (key != 'programs') {
@ -272,10 +269,9 @@ function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService
router.get('/api/channel/programs/:number', async (req, res) => {
try {
let number = parseInt(req.params.number, 10);
let channel = await channelCache.getChannelConfig(channelDB, number);
let channel = await channelService.getChannel(number);
if (channel.length == 1) {
channel = channel[0];
if (channel != null) {
let programs = channel.programs;
if (typeof(programs) === 'undefined') {
return res.status(404).send("Channel doesn't have programs?");
@ -304,9 +300,8 @@ function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService
router.get('/api/channel/description/:number', async (req, res) => {
try {
let number = parseInt(req.params.number, 10);
let channel = await channelCache.getChannelConfig(channelDB, number);
if (channel.length == 1) {
channel = channel[0];
let channel = await channelService.getChannel(number);
if (channel != null) {
res.send({
number: channel.number,
icon: channel.icon,
@ -323,7 +318,7 @@ function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService
})
router.get('/api/channelNumbers', async (req, res) => {
try {
let channels = await channelDB.getAllChannelNumbers();
let channels = await channelService.getAllChannelNumbers();
channels.sort( (a,b) => { return parseInt(a) - parseInt(b) } );
res.send(channels)
} catch(err) {
@ -334,39 +329,27 @@ function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService
// we urgently need an actual channel service
router.post('/api/channel', async (req, res) => {
try {
await m3uService.clearCache();
cleanUpChannel(req.body);
onDemandService.fixupChannelBeforeSave( req.body, activeChannelService.isActive(req.body.number) );
await channelDB.saveChannel( req.body.number, req.body );
channelCache.clear();
await channelService.saveChannel( req.body.number, req.body );
res.send( { number: req.body.number} )
updateXmltv()
} catch(err) {
console.error(err);
res.status(500).send("error");
res.status(500).send("error");
}
})
router.put('/api/channel', async (req, res) => {
try {
await m3uService.clearCache();
cleanUpChannel(req.body);
onDemandService.fixupChannelBeforeSave( req.body, activeChannelService.isActive(req.body.number) );
await channelDB.saveChannel( req.body.number, req.body );
channelCache.clear();
await channelService.saveChannel( req.body.number, req.body );
res.send( { number: req.body.number} )
updateXmltv()
} catch(err) {
console.error(err);
res.status(500).send("error");
res.status(500).send("error");
}
})
router.delete('/api/channel', async (req, res) => {
try {
await m3uService.clearCache();
await channelDB.deleteChannel( req.body.number );
channelCache.clear();
await channelService.deleteChannel(req.body.number);
res.send( { number: req.body.number} )
updateXmltv()
} catch(err) {
console.error(err);
res.status(500).send("error");
@ -1049,47 +1032,6 @@ function api(db, channelDB, fillerDB, customShowDB, xmltvInterval, guideService
xmltvInterval.updateXML()
xmltvInterval.restartInterval()
}
function cleanUpProgram(program) {
delete program.start
delete program.stop
delete program.streams;
delete program.durationStr;
delete program.commercials;
if (
(typeof(program.duration) === 'undefined')
||
(program.duration <= 0)
) {
console.error(`Input contained a program with invalid duration: ${program.duration}. This program has been deleted`);
return [];
}
if (! Number.isInteger(program.duration) ) {
console.error(`Input contained a program with invalid duration: ${program.duration}. Duration got fixed to be integer.`);
program.duration = Math.ceil(program.duration);
}
return [ program ];
}
function cleanUpChannel(channel) {
if (
(typeof(channel.groupTitle) === 'undefined')
||
(channel.groupTitle === '')
) {
channel.groupTitle = "dizqueTV";
}
channel.programs = channel.programs.flatMap( cleanUpProgram );
delete channel.fillerContent;
delete channel.filler;
channel.fallback = channel.fallback.flatMap( cleanUpProgram );
channel.duration = 0;
for (let i = 0; i < channel.programs.length; i++) {
channel.duration += channel.programs[i].duration;
}
}
async function streamToolResult(toolRes, res) {
let programs = toolRes.programs;
delete toolRes.programs;

View File

@ -24,7 +24,7 @@ async function getChannelConfig(channelDB, channelId) {
async function getAllNumbers(channelDB) {
if (numbers === null) {
let n = channelDB.getAllChannelNumbers();
let n = await channelDB.getAllChannelNumbers();
numbers = n;
}
return numbers;
@ -32,15 +32,41 @@ async function getAllNumbers(channelDB) {
async function getAllChannels(channelDB) {
let channelNumbers = await getAllNumbers(channelDB);
return await Promise.all( channelNumbers.map( async (x) => {
return (await Promise.all( channelNumbers.map( async (x) => {
return (await getChannelConfig(channelDB, x))[0];
}) );
}) )).filter( (channel) => {
if (channel == null) {
console.error("Found a null channel " + JSON.stringify(channelNumbers) );
return false;
}
if ( typeof(channel) === "undefined") {
console.error("Found a undefined channel " + JSON.stringify(channelNumbers) );
return false;
}
if ( typeof(channel.number) === "undefined") {
console.error("Found a channel without number " + JSON.stringify(channelNumbers) );
return false;
}
return true;
} );
}
function saveChannelConfig(number, channel ) {
configCache[number] = [channel];
delete cache[number];
// flush the item played cache for the channel and any channel in its
// redirect chain
if (typeof(cache[number]) !== 'undefined') {
let lineupItem = cache[number].lineupItem;
for (let i = 0; i < lineupItem.redirectChannels.length; i++) {
delete cache[ lineupItem.redirectChannels[i].number ];
}
delete cache[number];
}
numbers = null;
}
function getCurrentLineupItem(channelId, t1) {

View File

@ -5,6 +5,12 @@ module.exports = {
TVGUIDE_MAXIMUM_FLEX_DURATION : 6 * 60 * 60 * 1000,
TOO_FREQUENT: 100,
//when a channel is forcibly stopped due to an update, let's mark it as active
// for a while during the transaction just in case.
CHANNEL_STOP_SHIELD : 5000,
START_CHANNEL_GRACE_PERIOD: 15 * 1000,
// if a channel is stopped while something is playing, subtract
// this amount of milliseconds from the last-played timestamp, because
// video playback has latency and also because maybe the user wants

View File

@ -4,11 +4,10 @@ let fs = require('fs');
class FillerDB {
constructor(folder, channelDB, channelCache) {
constructor(folder, channelService) {
this.folder = folder;
this.cache = {};
this.channelDB = channelDB;
this.channelCache = channelCache;
this.channelService = channelService;
}
@ -79,10 +78,10 @@ class FillerDB {
}
async getFillerChannels(id) {
let numbers = await this.channelDB.getAllChannelNumbers();
let numbers = await this.channelService.getAllChannelNumbers();
let channels = [];
await Promise.all( numbers.map( async(number) => {
let ch = await this.channelDB.getChannel(number);
let ch = await this.channelService.getChannel(number);
let name = ch.name;
let fillerCollections = ch.fillerCollections;
for (let i = 0 ; i < fillerCollections.length; i++) {
@ -105,13 +104,13 @@ class FillerDB {
let channels = await this.getFillerChannels(id);
await Promise.all( channels.map( async(channel) => {
console.log(`Updating channel ${channel.number} , remove filler: ${id}`);
let json = await channelDB.getChannel(channel.number);
let json = await channelService.getChannel(channel.number);
json.fillerCollections = json.fillerCollections.filter( (col) => {
return col.id != id;
} );
await this.channelDB.saveChannel( channel.number, json );
await this.channelService.saveChannel( channel.number, json );
} ) );
this.channelCache.clear();
let f = path.join(this.folder, `${id}.json` );
await new Promise( (resolve, reject) => {
fs.unlink(f, function (err) {

View File

@ -4,20 +4,21 @@ const ICON_REGEX = /https?:\/\/.*(\/library\/metadata\/\d+\/thumb\/\d+).X-Plex-T
const ICON_FIELDS = ["icon", "showIcon", "seasonIcon", "episodeIcon"];
// DB is a misnomer here, this is closer to a service
class PlexServerDB
{
constructor(channelDB, channelCache, fillerDB, showDB, db) {
this.channelDB = channelDB;
constructor(channelService, fillerDB, showDB, db) {
this.channelService = channelService;
this.db = db;
this.channelCache = channelCache;
this.fillerDB = fillerDB;
this.showDB = showDB;
}
async fixupAllChannels(name, newServer) {
let channelNumbers = await this.channelDB.getAllChannelNumbers();
let channelNumbers = await this.channelService.getAllChannelNumbers();
let report = await Promise.all( channelNumbers.map( async (i) => {
let channel = await this.channelDB.getChannel(i);
let channel = await this.channelService.getChannel(i);
let channelReport = {
channelNumber : channel.number,
channelName : channel.name,
@ -38,10 +39,10 @@ class PlexServerDB
}
}
this.fixupProgramArray(channel.fallback, name,newServer, channelReport);
await this.channelDB.saveChannel(i, channel);
await this.channelService.saveChannel(i, channel);
return channelReport;
}) );
this.channelCache.clear();
return report;
}

View File

@ -2,6 +2,7 @@ module.exports = {
getCurrentProgramAndTimeElapsed: getCurrentProgramAndTimeElapsed,
createLineup: createLineup,
getWatermark: getWatermark,
generateChannelContext: generateChannelContext,
}
let channelCache = require('./channel-cache');
@ -10,6 +11,17 @@ const randomJS = require("random-js");
const Random = randomJS.Random;
const random = new Random( randomJS.MersenneTwister19937.autoSeed() );
const CHANNEL_CONTEXT_KEYS = [
"disableFillerOverlay",
"watermark",
"icon",
"offlinePicture",
"offlineSoundtrack",
"name",
"transcoding",
"number",
];
module.exports.random = random;
function getCurrentProgramAndTimeElapsed(date, channel) {
@ -260,6 +272,7 @@ function pickRandomWithMaxDuration(channel, fillers, maxDuration) {
}
}
// any channel thing used here should be added to channel context
function getWatermark( ffmpegSettings, channel, type) {
if (! ffmpegSettings.enableFFMPEGTranscoding || ffmpegSettings.disableChannelOverlay ) {
return null;
@ -301,3 +314,12 @@ function getWatermark( ffmpegSettings, channel, type) {
return result;
}
function generateChannelContext(channel) {
let channelContext = {};
for (let i = 0; i < CHANNEL_CONTEXT_KEYS.length; i++) {
let key = CHANNEL_CONTEXT_KEYS[i];
channelContext[key] = JSON.parse( JSON.stringify(channel[key] ) );
}
return channelContext;
}

View File

@ -10,12 +10,11 @@ class ActiveChannelService
/****
*
**/
constructor(onDemandService, channelCache, channelDB) {
console.log("DEFINE THIS.CACHE");
constructor(onDemandService, channelService ) {
this.cache = {};
this.channelDB = channelDB;
this.onDemandService = onDemandService;
this.channelCache = channelCache;
this.onDemandService.setActiveChannelService(this);
this.channelService = channelService;
this.timeNoDelta = new Date().getTime();
this.loadChannelsForFirstTry();
@ -25,7 +24,7 @@ class ActiveChannelService
loadChannelsForFirstTry() {
let fun = async() => {
try {
let numbers = await this.channelCache.getAllNumbers(this.channelDB);
let numbers = await this.channelService.getAllChannelNumbers();
numbers.forEach( (number) => {
this.ensure(this.timeNoDelta, number);
} );
@ -86,20 +85,29 @@ class ActiveChannelService
registerChannelActive(t, channelNumber) {
this.ensure(t, channelNumber);
console.log("Register that channel is being played: " + channelNumber );
if (this.cache[channelNumber].active === 0) {
console.log("Channel is being played: " + channelNumber );
}
this.cache[channelNumber].active++;
//console.log(channelNumber + " ++active=" + this.cache[channelNumber].active );
this.cache[channelNumber].stopTime = 0;
this.cache[channelNumber].lastUpdate = new Date().getTime();
}
registerChannelStopped(t, channelNumber) {
this.ensure(t, channelNumber);
console.log("Register that channel is no longer being played: " + channelNumber );
if (this.cache[channelNumber].active === 1) {
console.log("Register that channel is no longer being played: " + channelNumber );
}
if (this.cache[channelNumber].active === 0) {
console.error("Serious issue with channel active service, double delete");
} else {
this.cache[channelNumber].active--;
this.cache[channelNumber].stopTime = t;
//console.log(channelNumber + " --active=" + this.cache[channelNumber].active );
let s = this.cache[channelNumber].stopTime;
if ( (typeof(s) === 'undefined') || (s < t) ) {
this.cache[channelNumber].stopTime = t;
}
this.cache[channelNumber].lastUpdate = new Date().getTime();
}
}
@ -131,7 +139,6 @@ class ActiveChannelService
isActive(channelNumber) {
let bol = this.isActiveWrapped(channelNumber);
console.log( "channelNumber = " + channelNumber + " active? " + bol);
return bol;

View File

@ -0,0 +1,101 @@
const events = require('events')
const channelCache = require("../channel-cache");
class ChannelService extends events.EventEmitter {
constructor(channelDB) {
super();
this.channelDB = channelDB;
this.onDemandService = null;
}
setOnDemandService(onDemandService) {
this.onDemandService = onDemandService;
}
async saveChannel(number, channelJson, options) {
let channel = cleanUpChannel(channelJson);
if (
(this.onDemandService != null)
&&
( (typeof(options) === 'undefined') || (options.ignoreOnDemand !== true) )
) {
this.onDemandService.fixupChannelBeforeSave( channel );
}
channelCache.saveChannelConfig( number, channel);
await channelDB.saveChannel( number, channel );
this.emit('channel-update', { channelNumber: number, channel: channel} );
}
async deleteChannel(number) {
await channelDB.deleteChannel( number );
this.emit('channel-update', { channelNumber: number, channel: null} );
channelCache.clear();
}
async getChannel(number) {
let lis = await channelCache.getChannelConfig(this.channelDB, number)
if ( lis == null || lis.length !== 1) {
return null;
}
return lis[0];
}
async getAllChannelNumbers() {
return await channelCache.getAllNumbers(this.channelDB);
}
async getAllChannels() {
return await channelCache.getAllChannels(this.channelDB);
}
}
function cleanUpProgram(program) {
delete program.start
delete program.stop
delete program.streams;
delete program.durationStr;
delete program.commercials;
if (
(typeof(program.duration) === 'undefined')
||
(program.duration <= 0)
) {
console.error(`Input contained a program with invalid duration: ${program.duration}. This program has been deleted`);
return [];
}
if (! Number.isInteger(program.duration) ) {
console.error(`Input contained a program with invalid duration: ${program.duration}. Duration got fixed to be integer.`);
program.duration = Math.ceil(program.duration);
}
return [ program ];
}
function cleanUpChannel(channel) {
if (
(typeof(channel.groupTitle) === 'undefined')
||
(channel.groupTitle === '')
) {
channel.groupTitle = "dizqueTV";
}
channel.programs = channel.programs.flatMap( cleanUpProgram );
delete channel.fillerContent;
delete channel.filler;
channel.fallback = channel.fallback.flatMap( cleanUpProgram );
channel.duration = 0;
for (let i = 0; i < channel.programs.length; i++) {
channel.duration += channel.programs[i].duration;
}
return channel;
}
module.exports = ChannelService

View File

@ -4,11 +4,13 @@
* @class M3uService
*/
class M3uService {
constructor(dataBase, fileCacheService, channelCache) {
this.dataBase = dataBase;
constructor(fileCacheService, channelService) {
this.channelService = channelService;
this.cacheService = fileCacheService;
this.channelCache = channelCache;
this.cacheReady = false;
this.channelService.on("channel-update", (data) => {
this.clearCache();
} );
}
/**
@ -37,7 +39,7 @@ class M3uService {
return this.replaceHostOnM3u(host, cachedM3U);
}
}
let channels = await this.channelCache.getAllChannels(this.dataBase);
let channels = await this.channelService.getAllChannels();
channels.sort((a, b) => {

View File

@ -9,10 +9,14 @@ class OnDemandService
/****
*
**/
constructor(channelCache, channelDB, xmltvInterval) {
this.channelCache = channelCache;
this.channelDB = channelDB;
this.xmltvInterval = xmltvInterval;
constructor(channelService) {
this.channelService = channelService;
this.channelService.setOnDemandService(this);
this.activeChannelService = null;
}
setActiveChannelService(activeChannelService) {
this.activeChannelService = activeChannelService;
}
activateChannelIfNeeded(moment, channel) {
@ -25,12 +29,12 @@ class OnDemandService
async registerChannelStopped(channelNumber, stopTime, waitForSave) {
try {
let channel = await this.channelCache.getChannelConfig( this.channelDB, channelNumber);
if (channel.length === 0) {
let channel = await this.channelService.getChannel(channelNumber);
if (channel == null) {
console.error("Could not stop channel " + channelNumber + " because it apparently no longer exists"); // I guess if someone deletes the channel just in the grace period?
return
}
channel = channel[0];
if ( (typeof(channel.onDemand) !== 'undefined') && channel.onDemand.isOnDemand && ! channel.onDemand.paused) {
//pause the channel
channel = this.pauseOnDemandChannel( channel , stopTime );
@ -46,10 +50,6 @@ class OnDemandService
}
updateXmltv() {
this.xmltvInterval.updateXML()
this.xmltvInterval.restartInterval()
}
pauseOnDemandChannel(originalChannel, stopTime) {
@ -98,20 +98,26 @@ class OnDemandService
async updateChannelSync(channel) {
try {
this.channelCache.saveChannelConfig(channel.number, channel );
await this.channelDB.saveChannel(channel.number, channel);
await this.channelService.saveChannel(
channel.number,
channel,
{ignoreOnDemand: true}
);
console.log("Channel " + channel.number + " saved by on-demand service...");
} catch (err) {
} catch (err) {
console.error("Error saving resumed channel: " + channel.number, err);
}
}
updateChannelAsync(channel) {
this.updateChannelSync(channel);
this.updateXmltv();
}
fixupChannelBeforeSave(channel, isActive) {
fixupChannelBeforeSave(channel) {
let isActive = false;
if (this.activeChannelService != null && this.activeChannelService.isActive(channel.number) ) {
isActive = true;
}
if (typeof(channel.onDemand) === 'undefined') {
channel.onDemand = {};
}

View File

@ -1,14 +1,15 @@
const events = require('events')
const constants = require("../constants");
const FALLBACK_ICON = "https://raw.githubusercontent.com/vexorain/dizquetv/main/resources/dizquetv.png";
const throttle = require('./throttle');
class TVGuideService
class TVGuideService extends events.EventEmitter
{
/****
*
**/
constructor(xmltv, db, cacheImageService, eventService, i18next) {
super();
this.cached = null;
this.lastUpdate = 0;
this.updateTime = 0;
@ -42,7 +43,8 @@ class TVGuideService
async refresh(t) {
while( this.lastUpdate < t) {
if (this.currentUpdate == -1) {
await _wait(5000);
if ( ( this.lastUpdate < t) && (this.currentUpdate == -1) ) {
this.currentUpdate = this.updateTime;
this.currentLimit = this.updateLimit;
this.currentChannels = this.updateChannels;
@ -61,7 +63,6 @@ class TVGuideService
await this.buildIt();
}
await _wait(100);
}
return await this.get();
}
@ -74,7 +75,17 @@ class TVGuideService
let arr = new Array( channel.programs.length + 1);
arr[0] = 0;
for (let i = 0; i < n; i++) {
arr[i+1] = arr[i] + channel.programs[i].duration;
let d = channel.programs[i].duration;
if (d == 0) {
console.log("Found program with duration 0, correcting it");
d = 1;
}
if (! Number.isInteger(d) ) {
console.log( `Found program in channel ${channel.number} with non-integer duration ${d}, correcting it`);
d = Math.ceil(d);
}
channel.programs[i].duration = d;
arr[i+1] = arr[i] + d;
await this._throttle();
}
return arr;
@ -108,6 +119,17 @@ class TVGuideService
if (typeof(accumulate) === 'undefined') {
throw Error(channel.number + " wasn't preprocesed correctly???!?");
}
if (accumulate[channel.programs.length] === 0) {
console.log("[tv-guide] for some reason the total channel length is 0");
return {
index : -1,
start: t,
program: {
isOffline: true,
duration: 15*60*1000,
}
}
}
let hi = channel.programs.length;
let lo = 0;
let d = (t - s) % (accumulate[channel.programs.length]);
@ -121,9 +143,18 @@ class TVGuideService
}
}
if (epoch + accumulate[lo+1] <= t) {
throw Error("General algorithm error, completely unexpected");
if ( (lo < 0) || (lo >= channel.programs.length) || (accumulate[lo+1] <= d) ) {
console.log("[tv-guide] The binary search algorithm is messed up. Replacing with flex...");
return {
index : -1,
start: t,
program: {
isOffline: true,
duration: 15*60*1000,
}
}
}
await this._throttle();
return {
index: lo,
@ -177,11 +208,24 @@ class TVGuideService
console.error("Redirrect to an unknown channel found! Involved channels = " + JSON.stringify(depth) );
} else {
let otherPlaying = await this.getChannelPlaying( channel2, undefined, t, depth );
let start = Math.max(playing.start, otherPlaying.start);
let duration = Math.min(
(playing.start + playing.program.duration) - start,
(otherPlaying.start + otherPlaying.program.duration) - start
);
let a1 = playing.start;
let b1 = a1 + playing.program.duration;
let a2 = otherPlaying.start;
let b2 = a2 + otherPlaying.program.duration;
if ( !(a1 <= t && t < b1) ) {
console.error("[tv-guide] algorithm error1 : " + a1 + ", " + t + ", " + b1 );
}
if ( !(a2 <= t && t < b2) ) {
console.error("[tv-guide] algorithm error2 : " + a2 + ", " + t + ", " + b2 );
}
let a = Math.max( a1, a2 );
let b = Math.min( b1, b2 );
let start = a;
let duration = b - a;
let program2 = clone( otherPlaying.program );
program2.duration = duration;
playing = {
@ -266,7 +310,12 @@ class TVGuideService
x.program.duration -= d;
}
if (x.program.duration == 0) {
console.error("There's a program with duration 0?");
console.error(channel.number + " There's a program with duration 0? " + JSON.stringify(x.program) + " ; " + t1 );
x.program.duration = 5 * 60 * 1000;
} else if ( ! Number.isInteger( x.program.duration ) ) {
console.error(channel.number + " There's a program with non-integer duration?? " + JSON.stringify(x.program) + " ; " + t1 );
x.program = JSON.parse( JSON.stringify(x.program) );
x.program.duration = Math.ceil(x.program.duration );
}
}
result.programs = [];
@ -352,14 +401,18 @@ class TVGuideService
return result;
}
async buildIt() {
async buildIt(lastRetry) {
try {
this.cached = await this.buildItManaged();
console.log("Internal TV Guide data refreshed at " + (new Date()).toLocaleString() );
await this.refreshXML();
} catch(err) {
console.error("Unable to update internal guide data", err);
await _wait(100);
let w = 100;
if (typeof(lastRetry) !== 'undefined') {
w = Math.min(w*2, 5 * 60 * 1000);
}
await _wait(w);
console.error("Retrying TV guide...");
await this.buildIt();
@ -374,6 +427,7 @@ class TVGuideService
let xmltvSettings = this.db['xmltv-settings'].find()[0];
await this.xmltv.WriteXMLTV(this.cached, xmltvSettings, async() => await this._throttle(), this.cacheImageService);
let t = "" + ( (new Date()) );
this.emit("xmltv-updated", { time: t } );
eventService.push(
"xmltv",
{

View File

@ -2,18 +2,23 @@ const express = require('express')
const helperFuncs = require('./helperFuncs')
const FFMPEG = require('./ffmpeg')
const FFMPEG_TEXT = require('./ffmpegText')
const PlexTranscoder = require('./plexTranscoder')
const fs = require('fs')
const ProgramPlayer = require('./program-player');
const channelCache = require('./channel-cache')
const wereThereTooManyAttempts = require('./throttler');
const constants = require('./constants');
module.exports = { router: video }
module.exports = { router: video, shutdown: shutdown }
let StreamCount = 0;
function video( channelDB , fillerDB, db, programmingService, activeChannelService ) {
let stopPlayback = false;
async function shutdown() {
stopPlayback = true;
}
function video( channelService, fillerDB, db, programmingService, activeChannelService ) {
var router = express.Router()
router.get('/setup', (req, res) => {
@ -47,18 +52,22 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi
})
// Continuously stream video to client. Leverage ffmpeg concat for piecing together videos
let concat = async (req, res, audioOnly) => {
if (stopPlayback) {
res.status(503).send("Server is shutting down.")
return;
}
// Check if channel queried is valid
if (typeof req.query.channel === 'undefined') {
res.status(500).send("No Channel Specified")
return
}
let number = parseInt(req.query.channel, 10);
let channel = await channelCache.getChannelConfig(channelDB, number);
if (channel.length === 0) {
let channel = await channelService.getChannel(number);
if (channel == null) {
res.status(500).send("Channel doesn't exist")
return
}
channel = channel[0]
let ffmpegSettings = db['ffmpeg-settings'].find()[0]
@ -123,6 +132,11 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi
// Stream individual video to ffmpeg concat above. This is used by the server, NOT the client
router.get('/stream', async (req, res) => {
if (stopPlayback) {
res.status(503).send("Server is shutting down.")
return;
}
// Check if channel queried is valid
res.on("error", (e) => {
console.error("There was an unexpected error in stream.", e);
@ -137,9 +151,9 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi
let session = parseInt(req.query.session);
let m3u8 = (req.query.m3u8 === '1');
let number = parseInt(req.query.channel);
let channel = await channelCache.getChannelConfig(channelDB, number);
let channel = await channelService.getChannel( number);
if (channel.length === 0) {
if (channel == null) {
res.status(404).send("Channel doesn't exist")
return
}
@ -152,7 +166,6 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi
if ( (typeof req.query.first !== 'undefined') && (req.query.first=='1') ) {
isFirst = true;
}
channel = channel[0]
let ffmpegSettings = db['ffmpeg-settings'].find()[0]
@ -181,12 +194,14 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi
duration: 40,
start: 0,
};
} else if (lineupItem == null) {
} else if (lineupItem != null) {
redirectChannels = lineupItem.redirectChannels;
} else {
prog = programmingService.getCurrentProgramAndTimeElapsed(t0, channel);
activeChannelService.peekChannel(t0, channel.number);
while (true) {
redirectChannels.push( brandChannel );
redirectChannels.push( helperFuncs.generateChannelContext(brandChannel) );
upperBounds.push( prog.program.duration - prog.timeElapsed );
if ( !(prog.program.isOffline) || (prog.program.type != 'redirect') ) {
@ -203,9 +218,9 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi
let newChannelNumber= prog.program.channel;
let newChannel = await channelCache.getChannelConfig(channelDB, newChannelNumber);
let newChannel = await channelService.getChannel(newChannelNumber);
if (newChannel.length == 0) {
if (newChannel == null) {
let err = Error("Invalid redirect to a channel that doesn't exist");
console.error("Invalid redirect to channel that doesn't exist.", err);
prog = {
@ -218,7 +233,6 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi
}
continue;
}
newChannel = newChannel[0];
brandChannel = newChannel;
lineupItem = channelCache.getCurrentLineupItem( newChannel.number, t0);
if (lineupItem != null) {
@ -269,6 +283,7 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi
//adjust upper bounds and record playbacks
for (let i = redirectChannels.length-1; i >= 0; i--) {
lineupItem = JSON.parse( JSON.stringify(lineupItem ));
lineupItem.redirectChannels = redirectChannels;
let u = upperBounds[i] + beginningOffset;
if (typeof(u) !== 'undefined') {
let u2 = upperBound;
@ -308,7 +323,7 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi
};
}
let combinedChannel = JSON.parse( JSON.stringify(brandChannel) );
let combinedChannel = helperFuncs.generateChannelContext(brandChannel);
combinedChannel.transcoding = channel.transcoding;
let playerContext = {
@ -335,6 +350,8 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi
'Content-Type': 'video/mp2t'
});
shieldActiveChannels(redirectChannels, t0, constants.START_CHANNEL_GRACE_PERIOD);
let t1;
try {
@ -367,9 +384,29 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi
for (let i = redirectChannels.length-1; i >= 0; i--) {
activeChannelService.registerChannelActive(t0, redirectChannels[i].number);
}
let listener = (data) => {
let shouldStop = false;
try {
for (let i = 0; i < redirectChannels.length; i++) {
if (redirectChannels[i].number == data.channelNumber) {
shouldStop = true;
}
}
if (shouldStop) {
console.log("Playing channel has received an update.");
shieldActiveChannels( redirectChannels, t0, constants.CHANNEL_STOP_SHIELD )
setTimeout(stop, 100);
}
} catch (error) {
console.err("Unexpected error when processing channel change during playback", error);
}
};
channelService.on("channel-update", listener);
let oldStop = stop;
stop = () => {
channelService.removeListener("channel-update", listener);
if (!stopDetected) {
stopDetected = true;
let t1 = new Date().getTime();
@ -403,6 +440,12 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi
router.get('/m3u8', async (req, res) => {
if (stopPlayback) {
res.status(503).send("Server is shutting down.")
return;
}
let sessionId = StreamCount++;
//res.type('application/vnd.apple.mpegurl')
@ -415,8 +458,8 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi
}
let channelNum = parseInt(req.query.channel, 10)
let channel = await channelCache.getChannelConfig(channelDB, channelNum );
if (channel.length === 0) {
let channel = await channelService.getChannel(channelNum );
if (channel == null) {
res.status(500).send("Channel doesn't exist")
return
}
@ -451,6 +494,12 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi
res.send(data)
})
router.get('/playlist', async (req, res) => {
if (stopPlayback) {
res.status(503).send("Server is shutting down.")
return;
}
res.type('text')
// Check if channel queried is valid
@ -460,8 +509,8 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi
}
let channelNum = parseInt(req.query.channel, 10)
let channel = await channelCache.getChannelConfig(channelDB, channelNum );
if (channel.length === 0) {
let channel = await channelService.getChannel(channelNum );
if (channel == null) {
res.status(500).send("Channel doesn't exist")
return
}
@ -496,10 +545,25 @@ function video( channelDB , fillerDB, db, programmingService, activeChannelServi
res.send(data)
})
let shieldActiveChannels = (channelList, t0, timeout) => {
// because of channel redirects, it's possible that multiple channels
// are being played at once. Mark all of them as being played
// this is a grave period of 30
//mark all channels being played as active:
for (let i = channelList.length-1; i >= 0; i--) {
activeChannelService.registerChannelActive(t0, channelList[i].number);
}
setTimeout( () => {
for (let i = channelList.length-1; i >= 0; i--) {
activeChannelService.registerChannelStopped(t0, channelList[i].number);
}
}, timeout );
}
let mediaPlayer = async(channelNum, path, req, res) => {
let channel = await channelCache.getChannelConfig(channelDB, channelNum );
if (channel.length === 0) {
let channel = await channelService.getChannel(channelNum );
if (channel === null) {
res.status(404).send("Channel not found.");
return;
}

View File

@ -49,6 +49,7 @@ module.exports = function ($timeout, $location, dizquetv, resolutionOptions, get
scope.episodeMemory = {
saved : false,
};
scope.fixedOnDemand = false;
if (typeof scope.channel === 'undefined' || scope.channel == null) {
scope.channel = {}
scope.channel.programs = []
@ -182,7 +183,11 @@ module.exports = function ($timeout, $location, dizquetv, resolutionOptions, get
(scope.channel.onDemand.isOnDemand === true)
&&
(scope.channel.onDemand.paused === true)
&&
! scope.fixedOnDemand
) {
//this should only happen once per channel
scope.fixedOnDemand = true;
originalStart = new Date().getTime();
originalStart -= scope.channel.onDemand.playedOffset;
let m = scope.channel.onDemand.firstProgramModulo;