diff --git a/index.js b/index.js index da5ee00..96fe298 100644 --- a/index.js +++ b/index.js @@ -29,6 +29,7 @@ const EventService = require("./src/services/event-service"); const OnDemandService = require("./src/services/on-demand-service"); const ProgrammingService = require("./src/services/programming-service"); const ActiveChannelService = require('./src/services/active-channel-service') +const ProgramPlayTimeDB = require('./src/dao/program-play-time-db') const onShutdown = require("node-graceful-shutdown").onShutdown; @@ -95,7 +96,19 @@ channelService = new ChannelService(channelDB); fillerDB = new FillerDB( path.join(process.env.DATABASE, 'filler') , channelService ); customShowDB = new CustomShowDB( path.join(process.env.DATABASE, 'custom-shows') ); +let programPlayTimeDB = new ProgramPlayTimeDB( path.join(process.env.DATABASE, 'play-cache') ); +async function initializeProgramPlayTimeDB() { + try { + let t0 = new Date().getTime(); + await programPlayTimeDB.load(); + let t1 = new Date().getTime(); + console.log(`Program Play Time Cache loaded in ${t1-t0} milliseconds.`); + } catch (err) { + console.log(err); + } +} +initializeProgramPlayTimeDB(); fileCache = new FileCacheService( path.join(process.env.DATABASE, 'cache') ); cacheImageService = new CacheImageService(db, fileCache); @@ -270,7 +283,7 @@ app.use('/custom.css', express.static(path.join(process.env.DATABASE, 'custom.cs app.use(api.router(db, channelService, fillerDB, customShowDB, xmltvInterval, guideService, m3uService, eventService )) app.use('/api/cache/images', cacheImageService.apiRouters()) -app.use(video.router( channelService, fillerDB, db, programmingService, activeChannelService )) +app.use(video.router( channelService, fillerDB, db, programmingService, activeChannelService, programPlayTimeDB )) app.use(hdhr.router) app.listen(process.env.PORT, () => { console.log(`HTTP server running on port: http://*:${process.env.PORT}`) diff --git a/src/channel-cache.js b/src/channel-cache.js index 302510a..9485ffb 100644 --- a/src/channel-cache.js +++ b/src/channel-cache.js @@ -1,7 +1,7 @@ const SLACK = require('./constants').SLACK; let cache = {}; -let programPlayTimeCache = {}; + let fillerPlayTimeCache = {}; let configCache = {}; let numbers = null; @@ -14,11 +14,9 @@ async function getChannelConfig(channelDB, channelId) { if (channel == null) { configCache[channelId] = []; } else { - //console.log("channel=" + JSON.stringify(channel) ); configCache[channelId] = [channel]; } } - //console.log("channel=" + JSON.stringify(configCache[channelId]).slice(0,200) ); return configCache[channelId]; } @@ -106,7 +104,7 @@ function getCurrentLineupItem(channelId, t1) { return lineupItem; } -function getKey(channelId, program) { +function getProgramKey(program) { let serverKey = "!unknown!"; if (typeof(program.serverKey) !== 'undefined') { if (typeof(program.serverKey) !== 'undefined') { @@ -117,36 +115,37 @@ function getKey(channelId, program) { if (typeof(program.key) !== 'undefined') { programKey = program.key; } - return channelId + "|" + serverKey + "|" + programKey; - + return serverKey + "|" + programKey; } + function getFillerKey(channelId, fillerId) { return channelId + "|" + fillerId; } -function recordProgramPlayTime(channelId, lineupItem, t0) { +function recordProgramPlayTime(programPlayTime, channelId, lineupItem, t0) { let remaining; if ( typeof(lineupItem.streamDuration) !== 'undefined') { remaining = lineupItem.streamDuration; } else { remaining = lineupItem.duration - lineupItem.start; } - programPlayTimeCache[ getKey(channelId, lineupItem) ] = t0 + remaining; + setProgramLastPlayTime(programPlayTime, channelId, lineupItem, t0 + remaining); if (typeof(lineupItem.fillerId) !== 'undefined') { fillerPlayTimeCache[ getFillerKey(channelId, lineupItem.fillerId) ] = t0 + remaining; } } -function getProgramLastPlayTime(channelId, program) { - let v = programPlayTimeCache[ getKey(channelId, program) ]; - if (typeof(v) === 'undefined') { - return 0; - } else { - return v; - } +function setProgramLastPlayTime(programPlayTime, channelId, lineupItem, t) { + let programKey = getProgramKey(lineupItem); + programPlayTime.update(channelId, programKey, t); +} + +function getProgramLastPlayTime(programPlayTime, channelId, program) { + let programKey = getProgramKey(program); + return programPlayTime.getProgramLastPlayTime(channelId, programKey); } function getFillerLastPlayTime(channelId, fillerId) { @@ -158,8 +157,8 @@ function getFillerLastPlayTime(channelId, fillerId) { } } -function recordPlayback(channelId, t0, lineupItem) { - recordProgramPlayTime(channelId, lineupItem, t0); +function recordPlayback(programPlayTime, channelId, t0, lineupItem) { + recordProgramPlayTime(programPlayTime, channelId, lineupItem, t0); cache[channelId] = { t0: t0, diff --git a/src/dao/program-play-time-db.js b/src/dao/program-play-time-db.js new file mode 100644 index 0000000..f285e74 --- /dev/null +++ b/src/dao/program-play-time-db.js @@ -0,0 +1,90 @@ +const path = require('path'); +var fs = require('fs'); + +class ProgramPlayTimeDB { + + constructor(dir) { + this.dir = dir; + this.programPlayTimeCache = {}; + } + + + async load() { + try { + if (! (await fs.promises.stat(this.dir)).isDirectory()) { + return; + } + } catch (err) { + return; + } + let files = await fs.promises.readdir(this.dir); + + let processSubFileName = async (fileName, subDir, subFileName) => { + try { + if (subFileName.endsWith(".json")) { + let programKey64 = subFileName.substring( + 0, + subFileName.length - 4 + ); + let programKey = Buffer.from(programKey64, 'base64') + .toString('utf-8'); + + + let filePath = path.join(subDir, subFileName); + let fileContent = await fs.promises.readFile( + filePath, 'utf-8'); + let jsonData = JSON.parse(fileContent); + let key = getKey(fileName, programKey); + this.programPlayTimeCache[ key ] = jsonData["t"] + } + } catch (err) { + console.log(`When processing ${subDir}/${subFileName}`, err); + } + } + + let processFileName = async(fileName) => { + + try { + const subDir = path.join(this.dir, fileName); + let subFiles = await fs.promises.readdir( subDir ); + + await Promise.all( subFiles.map( async subFileName => { + return processSubFileName(fileName, subDir, subFileName); + }) ); + } catch (err) { + console.log(`When processing ${subDir}`, err); + } + } + + await Promise.all( files.map(processFileName) ); + } + + getProgramLastPlayTime(channelId, programKey) { + let v = this.programPlayTimeCache[ getKey(channelId, programKey) ]; + if (typeof(v) === 'undefined') { + v = 0; + } + return v; + } + + async update(channelId, programKey, t) { + + let key = getKey(channelId, programKey); + this.programPlayTimeCache[ key ] = t; + + const channelDir = path.join(this.dir, `${channelId}`); + await fs.promises.mkdir( channelDir, { recursive: true } ); + let key64 = Buffer.from(programKey, 'utf-8').toString('base64'); + let filepath = path.join(channelDir, `${key64}.json`); + let data = {t:t}; + await fs.promises.writeFile(filepath, JSON.stringify(data), 'utf-8'); + } + +} + +function getKey(channelId, programKey) { + return channelId + "|" + programKey; +} + + +module.exports = ProgramPlayTimeDB; \ No newline at end of file diff --git a/src/helperFuncs.js b/src/helperFuncs.js index da291ce..a893b14 100644 --- a/src/helperFuncs.js +++ b/src/helperFuncs.js @@ -62,7 +62,7 @@ function getCurrentProgramAndTimeElapsed(date, channel) { return { program: channel.programs[currentProgramIndex], timeElapsed: timeElapsed, programIndex: currentProgramIndex } } -function createLineup(obj, channel, fillers, isFirst) { +function createLineup(programPlayTime, obj, channel, fillers, isFirst) { let timeElapsed = obj.timeElapsed // Start time of a file is never consistent unless 0. Run time of an episode can vary. // When within 30 seconds of start time, just make the time 0 to smooth things out @@ -97,7 +97,7 @@ function createLineup(obj, channel, fillers, isFirst) { if ( (channel.offlineMode === 'clip') && (channel.fallback.length != 0) ) { special = JSON.parse(JSON.stringify(channel.fallback[0])); } - let randomResult = pickRandomWithMaxDuration(channel, fillers, remaining + (isFirst? (7*24*60*60*1000) : 0) ); + let randomResult = pickRandomWithMaxDuration(programPlayTime, channel, fillers, remaining + (isFirst? (7*24*60*60*1000) : 0) ); filler = randomResult.filler; if (filler == null && (typeof(randomResult.minimumWait) !== undefined) && (remaining > randomResult.minimumWait) ) { remaining = randomResult.minimumWait; @@ -179,7 +179,7 @@ function weighedPick(a, total) { return random.bool(a, total); } -function pickRandomWithMaxDuration(channel, fillers, maxDuration) { +function pickRandomWithMaxDuration(programPlayTime, channel, fillers, maxDuration) { let list = []; for (let i = 0; i < fillers.length; i++) { list = list.concat(fillers[i].content); @@ -196,7 +196,7 @@ function pickRandomWithMaxDuration(channel, fillers, maxDuration) { let listM = 0; let fillerId = undefined; - let median = getMedian(channelCache, channel, fillers); + let median = getMedian(programPlayTime, channelCache, channel, fillers); for (let medianCheck = 1; medianCheck >= 0; medianCheck--) { for (let j = 0; j < fillers.length; j++) { @@ -208,7 +208,7 @@ function pickRandomWithMaxDuration(channel, fillers, maxDuration) { let clip = list[i]; // a few extra milliseconds won't hurt anyone, would it? dun dun dun if (clip.duration <= maxDuration + SLACK ) { - let t1 = channelCache.getProgramLastPlayTime( channel.number, clip ); + let t1 = channelCache.getProgramLastPlayTime(programPlayTime, channel.number, clip ); if ( (medianCheck==1) && (t1 > median) ) { continue; } @@ -332,13 +332,13 @@ function getWatermark( ffmpegSettings, channel, type) { } -function getMedian(channelCache, channel, fillers) { +function getMedian(programPlayTime, channelCache, channel, fillers) { let times = []; for (let j = 0; j < fillers.length; j++) { list = fillers[j].content; for (let i = 0; i < list.length; i++) { let clip = list[i]; - let t = channelCache.getProgramLastPlayTime( channel.number, clip); + let t = channelCache.getProgramLastPlayTime(programPlayTime, channel.number, clip); times.push(t); } } diff --git a/src/video.js b/src/video.js index 11b02bc..2854880 100644 --- a/src/video.js +++ b/src/video.js @@ -18,7 +18,7 @@ async function shutdown() { stopPlayback = true; } -function video( channelService, fillerDB, db, programmingService, activeChannelService ) { +function video( channelService, fillerDB, db, programmingService, activeChannelService, programPlayTimeDB ) { var router = express.Router() router.get('/setup', (req, res) => { @@ -232,7 +232,8 @@ function video( channelService, fillerDB, db, programmingService, activeChannelS if ( !(prog.program.isOffline) || (prog.program.type != 'redirect') ) { break; } - channelCache.recordPlayback( brandChannel.number, t0, { + channelCache.recordPlayback(programPlayTimeDB, + brandChannel.number, t0, { /*type: 'offline',*/ title: 'Error', err: Error("Recursive channel redirect found"), @@ -299,7 +300,7 @@ function video( channelService, fillerDB, db, programmingService, activeChannelS } let fillers = await fillerDB.getFillersFromChannel(brandChannel); try { - let lineup = helperFuncs.createLineup(prog, brandChannel, fillers, isFirst) + let lineup = helperFuncs.createLineup(programPlayTimeDB, prog, brandChannel, fillers, isFirst) lineupItem = lineup.shift(); } catch (err) { console.log("Error when attempting to pick video: " +err.stack); @@ -331,7 +332,7 @@ function video( channelService, fillerDB, db, programmingService, activeChannelS lineupItem.streamDuration = Math.min(u2, u); upperBound = lineupItem.streamDuration; } - channelCache.recordPlayback( redirectChannels[i].number, t0, lineupItem ); + channelCache.recordPlayback( programPlayTimeDB, redirectChannels[i].number, t0, lineupItem ); } } @@ -354,7 +355,7 @@ function video( channelService, fillerDB, db, programmingService, activeChannelS console.log("========================================================="); if (! isLoading && ! isBetween) { - channelCache.recordPlayback(channel.number, t0, lineupItem); + channelCache.recordPlayback(programPlayTimeDB, channel.number, t0, lineupItem); } if (wereThereTooManyAttempts(session, lineupItem)) { console.error("There are too many attempts to play the same item in a short period of time, playing the error stream instead.");