diff --git a/python_apps/pypo/liquidsoap_scripts/ls_script.liq b/python_apps/pypo/liquidsoap_scripts/ls_script.liq index 566df0d9e..75935a6dd 100644 --- a/python_apps/pypo/liquidsoap_scripts/ls_script.liq +++ b/python_apps/pypo/liquidsoap_scripts/ls_script.liq @@ -35,7 +35,27 @@ just_switched = ref false %include "ls_lib.liq" -queue = audio_to_stereo(id="queue_src", request.equeue(id="queue", length=0.5)) +sources = ref [] +source_id = ref 0 + +def create_source() + sources := list.append([request.equeue(id="s#{!source_id}", length=0.5)], !sources) + source_id := !source_id + 1 +end + +create_source() +create_source() +create_source() +create_source() + +create_source() +create_source() +create_source() +create_source() + +queue = add(!sources) + +queue = audio_to_stereo(id="queue_src", queue) queue = cue_cut(queue) queue = amplify(1., override="replay_gain", queue) diff --git a/python_apps/pypo/pypocli.py b/python_apps/pypo/pypocli.py index 941f3610a..9fed011fe 100644 --- a/python_apps/pypo/pypocli.py +++ b/python_apps/pypo/pypocli.py @@ -13,11 +13,12 @@ import signal import logging import locale import os -from Queue import Queue +from Queue import Queue from threading import Lock from pypopush import PypoPush +from pypoliqqueue import PypoLiqQueue from pypofetch import PypoFetch from pypofile import PypoFile from recorder import Recorder @@ -63,7 +64,7 @@ try: LogWriter.override_std_err(logger) except Exception, e: print "Couldn't configure logging" - sys.exit() + sys.exit(1) def configure_locale(): logger.debug("Before %s", locale.nl_langinfo(locale.CODESET)) @@ -228,8 +229,16 @@ if __name__ == '__main__': stat.daemon = True stat.start() + pypoLiq_q = Queue() + liq_queue_tracker = dict() + telnet_liquidsoap = TelnetLiquidsoap() + plq = PypoLiqQueue(pypoLiq_q, telnet_lock, logger, liq_queue_tracker, \ + telnet_liquidsoap) + plq.daemon = True + plq.start() + # all join() are commented out because we want to exit entire pypo - # if pypofetch is exiting + # if pypofetch terminates #pmh.join() #recorder.join() #pp.join() diff --git a/python_apps/pypo/pypofetch.py b/python_apps/pypo/pypofetch.py index f10c8c958..06552765e 100644 --- a/python_apps/pypo/pypofetch.py +++ b/python_apps/pypo/pypofetch.py @@ -7,16 +7,16 @@ import logging.config import json import telnetlib import copy -from threading import Thread import subprocess +import datetime from Queue import Empty +from threading import Thread +from subprocess import Popen, PIPE +from configobj import ConfigObj from api_clients import api_client from std_err_override import LogWriter -from subprocess import Popen, PIPE - -from configobj import ConfigObj # configure logging logging_cfg = os.path.join(os.path.dirname(__file__), "logging.cfg") @@ -481,6 +481,7 @@ class PypoFetch(Thread): except Exception, e: pass + media_copy = {} for key in media: media_item = media[key] if (media_item['type'] == 'file'): @@ -490,12 +491,17 @@ class PypoFetch(Thread): media_item['file_ready'] = False media_filtered[key] = media_item + media_item['start'] = datetime.strptime(media_item['start'], "%Y-%m-%d-%H-%M-%S") + media_item['end'] = datetime.strptime(media_item['end'], "%Y-%m-%d-%H-%M-%S") + media_copy[media_item['start']] = media_item + + self.media_prepare_queue.put(copy.copy(media_filtered)) except Exception, e: self.logger.error("%s", e) # Send the data to pypo-push self.logger.debug("Pushing to pypo-push") - self.push_queue.put(media) + self.push_queue.put(media_copy) # cleanup diff --git a/python_apps/pypo/pypoliqqueue.py b/python_apps/pypo/pypoliqqueue.py new file mode 100644 index 000000000..53e9bbeec --- /dev/null +++ b/python_apps/pypo/pypoliqqueue.py @@ -0,0 +1,104 @@ +from threading import Thread +from collections import deque +from datetime import datetime + +import traceback +import sys + +from Queue import Empty + +import signal +def keyboardInterruptHandler(signum, frame): + logger = logging.getLogger() + logger.info('\nKeyboard Interrupt\n') + sys.exit(0) +signal.signal(signal.SIGINT, keyboardInterruptHandler) + +class PypoLiqQueue(Thread): + def __init__(self, q, telnet_lock, logger, liq_queue_tracker, \ + telnet_liquidsoap): + Thread.__init__(self) + self.queue = q + self.telnet_lock = telnet_lock + self.logger = logger + self.liq_queue_tracker = liq_queue_tracker + self.telnet_liquidsoap = telnet_liquidsoap + + def main(self): + time_until_next_play = None + schedule_deque = deque() + media_schedule = None + + while True: + try: + if time_until_next_play is None: + media_schedule = self.queue.get(block=True) + else: + media_schedule = self.queue.get(block=True, timeout=time_until_next_play) + except Empty, e: + #Time to push a scheduled item. + media_item = schedule_deque.popleft() + self.telnet_to_liquidsoap(media_item) + if len(schedule_deque): + time_until_next_play = \ + self.date_interval_to_seconds( + schedule_deque[0]['start'] - datetime.utcnow()) + else: + time_until_next_play = None + else: + #new schedule received. Replace old one with this. + schedule_deque.clear() + + keys = sorted(media_schedule.keys()) + for i in keys: + schedule_deque.append(media_schedule[i]) + + time_until_next_play = self.date_interval_to_seconds(\ + keys[0] - datetime.utcnow()) + + def is_media_item_finished(self, media_item): + return datetime.utcnow() > media_item['end'] + + def telnet_to_liquidsoap(self, media_item): + """ + telnets to liquidsoap and pushes the media_item to its queue. Push the + show name of every media_item as well, just to keep Liquidsoap up-to-date + about which show is playing. + """ + + available_queue = None + for i in self.liq_queue_tracker: + mi = self.liq_queue_tracker[i] + if mi == None or self.is_media_item_finished(mi): + #queue "i" is available. Push to this queue + available_queue = i + + if available_queue == None: + raise NoQueueAvailableException() + + try: + self.telnet_liquidsoap.queue_push(available_queue, media_item) + self.liq_queue_tracker[available_queue] = media_item + except Exception as e: + self.logger.error(e) + raise + + def date_interval_to_seconds(self, interval): + """ + Convert timedelta object into int representing the number of seconds. If + number of seconds is less than 0, then return 0. + """ + seconds = (interval.microseconds + \ + (interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6) + if seconds < 0: seconds = 0 + + return seconds + + + def run(self): + try: self.main() + except Exception, e: + self.logger.error('PypoLiqQueue Exception: %s', traceback.format_exc()) + +class NoQueueAvailableException(Exception): + pass diff --git a/python_apps/pypo/pypopush.py b/python_apps/pypo/pypopush.py index f438b3bb1..ea51d0724 100644 --- a/python_apps/pypo/pypopush.py +++ b/python_apps/pypo/pypopush.py @@ -9,9 +9,15 @@ import logging.config import telnetlib import calendar import math +import traceback import os -from pypofetch import PypoFetch +from pypofetch import PypoFetch +from telnetliquidsoap import TelnetLiquidsoap +from pypoliqqueue import PypoLiqQueue + + +import Queue from Queue import Empty from threading import Thread @@ -58,6 +64,27 @@ class PypoPush(Thread): self.pushed_objects = {} self.logger = logging.getLogger('push') self.current_prebuffering_stream_id = None + self.queue_id = 0 + self.telnet_liquidsoap = TelnetLiquidsoap(telnet_lock, \ + self.logger,\ + LS_HOST,\ + LS_PORT\ + ) + + liq_queue_tracker = { + "s0": None, + "s1": None, + "s2": None, + "s3": None, + } + + self.pypoLiq_q = Queue() + self.plq = PypoLiqQueue(self.pypoLiq_q, \ + telnet_lock, \ + liq_queue_tracker, \ + self.telnet_liquidsoap) + plq.daemon = True + plq.start() def main(self): loops = 0 @@ -74,50 +101,9 @@ class PypoPush(Thread): media_schedule = self.queue.get(block=True) else: media_schedule = self.queue.get(block=True, timeout=time_until_next_play) - - chains = self.get_all_chains(media_schedule) - - #We get to the following lines only if a schedule was received. - liquidsoap_queue_approx = self.get_queue_items_from_liquidsoap() - liquidsoap_stream_id = self.get_current_stream_id_from_liquidsoap() - - tnow = datetime.utcnow() - current_event_chain, original_chain = self.get_current_chain(chains, tnow) - - if len(current_event_chain) > 0: - try: - chains.remove(original_chain) - except ValueError, e: - self.logger.error(str(e)) - - #At this point we know that Liquidsoap is playing something, and that something - #is scheduled. We need to verify whether the schedule we just received matches - #what Liquidsoap is playing, and if not, correct it. - - self.handle_new_schedule(media_schedule, liquidsoap_queue_approx, liquidsoap_stream_id, current_event_chain) - - - #At this point everything in the present has been taken care of and Liquidsoap - #is playing whatever is scheduled. - #Now we need to prepare ourselves for future scheduled events. - # - next_media_item_chain = self.get_next_schedule_chain(chains, tnow) - - self.logger.debug("Next schedule chain: %s", next_media_item_chain) - if next_media_item_chain is not None: - try: - chains.remove(next_media_item_chain) - except ValueError, e: - self.logger.error(str(e)) - - chain_start = datetime.strptime(next_media_item_chain[0]['start'], "%Y-%m-%d-%H-%M-%S") - time_until_next_play = self.date_interval_to_seconds(chain_start - datetime.utcnow()) - self.logger.debug("Blocking %s seconds until show start", time_until_next_play) - else: - self.logger.debug("Blocking indefinitely since no show scheduled") - time_until_next_play = None except Empty, e: #We only get here when a new chain of tracks are ready to be played. + #"timeout" has parameter has been reached. self.push_to_liquidsoap(next_media_item_chain) next_media_item_chain = self.get_next_schedule_chain(chains, datetime.utcnow()) @@ -126,7 +112,8 @@ class PypoPush(Thread): chains.remove(next_media_item_chain) except ValueError, e: self.logger.error(str(e)) - chain_start = datetime.strptime(next_media_item_chain[0]['start'], "%Y-%m-%d-%H-%M-%S") + + chain_start = next_media_item_chain[0]['start'] time_until_next_play = self.date_interval_to_seconds(chain_start - datetime.utcnow()) self.logger.debug("Blocking %s seconds until show start", time_until_next_play) else: @@ -134,12 +121,111 @@ class PypoPush(Thread): time_until_next_play = None except Exception, e: self.logger.error(str(e)) + else: + #separate media_schedule list into currently_playing and + #scheduled_for_future lists + currently_playing, scheduled_for_future = \ + self.separate_present_future(media_schedule) + + self.verify_correct_present_media(currently_playing) + + self.future_scheduled_queue.put(scheduled_for_future) + + self.pypoLiq_q.put(scheduled_for_future) + + """ + #queue.get timeout never had a chance to expire. Instead a new + #schedule was received. Let's parse this schedule and generate + #a new timeout. + try: + chains = self.get_all_chains(media_schedule) + + #We get to the following lines only if a schedule was received. + liquidsoap_queue_approx = self.get_queue_items_from_liquidsoap() + liquidsoap_stream_id = self.get_current_stream_id_from_liquidsoap() + + tnow = datetime.utcnow() + current_event_chain, original_chain = \ + self.get_current_chain(chains, tnow) + + if len(current_event_chain) > 0: + try: + chains.remove(original_chain) + except ValueError, e: + self.logger.error(str(e)) + + #At this point we know that Liquidsoap is playing something, and that something + #is scheduled. We need to verify whether the schedule we just received matches + #what Liquidsoap is playing, and if not, correct it. + self.handle_new_schedule(media_schedule, \ + liquidsoap_queue_approx, \ + liquidsoap_stream_id, \ + current_event_chain) + + #At this point everything in the present has been taken care of and Liquidsoap + #is playing whatever is scheduled. + #Now we need to prepare ourselves for future scheduled events. + next_media_item_chain = self.get_next_schedule_chain(chains, tnow) + + self.logger.debug("Next schedule chain: %s", next_media_item_chain) + if next_media_item_chain is not None: + try: + chains.remove(next_media_item_chain) + except ValueError, e: + self.logger.error(str(e)) + + chain_start = datetime.strptime(next_media_item_chain[0]['start'], "%Y-%m-%d-%H-%M-%S") + time_until_next_play = self.date_interval_to_seconds(chain_start - datetime.utcnow()) + self.logger.debug("Blocking %s seconds until show start", time_until_next_play) + else: + self.logger.debug("Blocking indefinitely since no show scheduled") + time_until_next_play = None + except Exception, e: + self.logger.error(str(e)) + """ if loops % heartbeat_period == 0: self.logger.info("heartbeat") loops = 0 loops += 1 + + def separate_present_future(self, media_schedule): + tnow = datetime.utcnow() + + present = {} + future = {} + + sorted_keys = sorted(media_schedule.keys()) + for mkey in sorted_keys: + media_item = media_schedule[mkey] + + media_item_start = media_item['start'] + diff_td = tnow - media_item_start + diff_sec = self.date_interval_to_seconds(diff_td) + + if diff_sec >= 0: + present[media_item['start']] = media_item + else: + future[media_item['start']] = media_item + + return present, future + + def verify_correct_present_media(self, currently_playing): + #verify whether Liquidsoap is currently playing the correct items. + #if we find an item that Liquidsoap is not playing, then push it + #into one of Liquidsoap's queues. If Liquidsoap is already playing + #it do nothing. If Liquidsoap is playing a track that isn't in + #currently_playing then stop it. + + #Check for Liquidsoap media we should source.skip + #get liquidsoap items for each queue. Since each queue can only have one + #item, we should have a max of 8 items. + #TODO + + #Check for media Liquidsoap should start playing + #TODO + def get_current_stream_id_from_liquidsoap(self): response = "-1" try: @@ -167,7 +253,7 @@ class PypoPush(Thread): self.telnet_lock.acquire() tn = telnetlib.Telnet(LS_HOST, LS_PORT) - msg = 'queue.queue\n' + msg = 's0.queue\n' tn.write(msg) response = tn.read_until("\r\n").strip(" \r\n") tn.write('exit\n') @@ -355,7 +441,7 @@ class PypoPush(Thread): def modify_cue_point(self, link): tnow = datetime.utcnow() - link_start = datetime.strptime(link['start'], "%Y-%m-%d-%H-%M-%S") + link_start = link['start'] diff_td = tnow - link_start diff_sec = self.date_interval_to_seconds(diff_td) @@ -399,8 +485,8 @@ class PypoPush(Thread): for chain in chains: iteration = 0 for link in chain: - link_start = datetime.strptime(link['start'], "%Y-%m-%d-%H-%M-%S") - link_end = datetime.strptime(link['end'], "%Y-%m-%d-%H-%M-%S") + link_start = link['start'] + link_end = link['end'] self.logger.debug("tnow %s, chain_start %s", tnow, link_start) if link_start <= tnow and tnow < link_end: @@ -423,10 +509,12 @@ class PypoPush(Thread): closest_start = None closest_chain = None for chain in chains: - chain_start = datetime.strptime(chain[0]['start'], "%Y-%m-%d-%H-%M-%S") - chain_end = datetime.strptime(chain[-1]['end'], "%Y-%m-%d-%H-%M-%S") + chain_start = chain[0]['start'] + chain_end = chain[-1]['end'] self.logger.debug("tnow %s, chain_start %s", tnow, chain_start) - if (closest_start == None or chain_start < closest_start) and (chain_start > tnow or (chain_start < tnow and chain_end > tnow)): + if (closest_start == None or chain_start < closest_start) and \ + (chain_start > tnow or \ + (chain_start < tnow and chain_end > tnow)): closest_start = chain_start closest_chain = chain @@ -482,6 +570,8 @@ class PypoPush(Thread): self.stop_web_stream_output(media_item) except Exception, e: self.logger.error('Pypo Push Exception: %s', e) + finally: + self.queue_id = (self.queue_id + 1) % 8 def start_web_stream_buffer(self, media_item): @@ -640,7 +730,7 @@ class PypoPush(Thread): self.logger.debug(msg) tn.write(msg) - msg = "queue.queue\n" + msg = "s0.queue\n" self.logger.debug(msg) tn.write(msg) @@ -687,10 +777,8 @@ class PypoPush(Thread): self.telnet_lock.acquire() tn = telnetlib.Telnet(LS_HOST, LS_PORT) - #tn.write(("vars.pypo_data %s\n"%liquidsoap_data["schedule_id"]).encode('utf-8')) - annotation = self.create_liquidsoap_annotation(media_item) - msg = 'queue.push %s\n' % annotation.encode('utf-8') + msg = 's%s.push %s\n' % (self.queue_id, annotation.encode('utf-8')) self.logger.debug(msg) tn.write(msg) queue_id = tn.read_until("\r\n").strip("\r\n") @@ -722,7 +810,6 @@ class PypoPush(Thread): def run(self): try: self.main() except Exception, e: - import traceback top = traceback.format_exc() self.logger.error('Pypo Push Exception: %s', top) diff --git a/python_apps/pypo/telnetliquidsoap.py b/python_apps/pypo/telnetliquidsoap.py new file mode 100644 index 000000000..5131d2c62 --- /dev/null +++ b/python_apps/pypo/telnetliquidsoap.py @@ -0,0 +1,74 @@ +import telnetlib + +def create_liquidsoap_annotation(media): + # We need liq_start_next value in the annotate. That is the value that controls overlap duration of crossfade. + return 'annotate:media_id="%s",liq_start_next="0",liq_fade_in="%s",liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",schedule_table_id="%s",replay_gain="%s dB":%s' \ + % (media['id'], float(media['fade_in']) / 1000, float(media['fade_out']) / 1000, float(media['cue_in']), float(media['cue_out']), media['row_id'], media['replay_gain'], media['dst']) + +class TelnetLiquidsoap: + + def __init__(self, telnet_lock, logger, ls_host, ls_port): + self.telnet_lock = telnet_lock + self.ls_host = ls_host + self.ls_port = ls_port + self.logger = logger + + def __connect(self): + return telnetlib.Telnet(self.ls_host, self.ls_port) + + def __is_empty(self, tn, queue_id): + return True + + + def queue_push(self, queue_id, media_item): + try: + self.telnet_lock.acquire() + tn = self.__connect() + + if not self.__is_empty(tn, queue_id): + raise QueueNotEmptyException() + + annotation = create_liquidsoap_annotation(media_item) + msg = '%s.push %s\n' % (queue_id, annotation.encode('utf-8')) + self.logger.debug(msg) + tn.write(msg) + + show_name = media_item['show_name'] + msg = 'vars.show_name %s\n' % show_name.encode('utf-8') + tn.write(msg) + self.logger.debug(msg) + + tn.write("exit\n") + self.logger.debug(tn.read_all()) + except Exception: + raise + finally: + self.telnet_lock.release() + +class DummyTelnetLiquidsoap: + + def __init__(self, telnet_lock, logger): + self.telnet_lock = telnet_lock + self.liquidsoap_mock_queues = {} + self.logger = logger + + for i in range(4): + self.liquidsoap_mock_queues["s"+str(i)] = [] + + def queue_push(self, queue_id, media_item): + try: + self.telnet_lock.acquire() + + self.logger.info("Pushing %s to queue %s" % (media_item, queue_id)) + from datetime import datetime + print "Time now: %s" % datetime.utcnow() + + annotation = create_liquidsoap_annotation(media_item) + self.liquidsoap_mock_queues[queue_id].append(annotation) + except Exception: + raise + finally: + self.telnet_lock.release() + +class QueueNotEmptyException(Exception): + pass diff --git a/python_apps/pypo/testpypoliqqueue.py b/python_apps/pypo/testpypoliqqueue.py new file mode 100644 index 000000000..f1847b34f --- /dev/null +++ b/python_apps/pypo/testpypoliqqueue.py @@ -0,0 +1,98 @@ +from pypoliqqueue import PypoLiqQueue +from telnetliquidsoap import DummyTelnetLiquidsoap, TelnetLiquidsoap + + +from Queue import Queue +from threading import Lock + +import sys +import signal +import logging +from datetime import datetime +from datetime import timedelta + +def keyboardInterruptHandler(signum, frame): + logger = logging.getLogger() + logger.info('\nKeyboard Interrupt\n') + sys.exit(0) +signal.signal(signal.SIGINT, keyboardInterruptHandler) + +# configure logging +format = '%(levelname)s - %(pathname)s - %(lineno)s - %(asctime)s - %(message)s' +logging.basicConfig(level=logging.DEBUG, format=format) +logging.captureWarnings(True) + +telnet_lock = Lock() +pypoPush_q = Queue() + + +pypoLiq_q = Queue() +liq_queue_tracker = { + "s0": None, + "s1": None, + "s2": None, + "s3": None, + } + +#dummy_telnet_liquidsoap = DummyTelnetLiquidsoap(telnet_lock, logging) +dummy_telnet_liquidsoap = TelnetLiquidsoap(telnet_lock, logging, \ + "localhost", \ + 1234) + +plq = PypoLiqQueue(pypoLiq_q, telnet_lock, logging, liq_queue_tracker, \ + dummy_telnet_liquidsoap) +plq.daemon = True +plq.start() + + +print "Time now: %s" % datetime.utcnow() + +media_schedule = {} + +start_dt = datetime.utcnow() + timedelta(seconds=1) +end_dt = datetime.utcnow() + timedelta(seconds=6) + +media_schedule[start_dt] = {"id": 5, \ + "type":"file", \ + "row_id":9, \ + "uri":"", \ + "dst":"/home/martin/Music/ipod/Hot Chocolate - You Sexy Thing.mp3", \ + "fade_in":0, \ + "fade_out":0, \ + "cue_in":0, \ + "cue_out":300, \ + "start": start_dt, \ + "end": end_dt, \ + "show_name":"Untitled", \ + "replay_gain": 0, \ + "independent_event": True \ + } + + + +start_dt = datetime.utcnow() + timedelta(seconds=2) +end_dt = datetime.utcnow() + timedelta(seconds=6) + +media_schedule[start_dt] = {"id": 5, \ + "type":"file", \ + "row_id":9, \ + "uri":"", \ + "dst":"/home/martin/Music/ipod/Good Charlotte - bloody valentine.mp3", \ + "fade_in":0, \ + "fade_out":0, \ + "cue_in":0, \ + "cue_out":300, \ + "start": start_dt, \ + "end": end_dt, \ + "show_name":"Untitled", \ + "replay_gain": 0, \ + "independent_event": True \ + } +pypoLiq_q.put(media_schedule) + +plq.join() + + + + +