From 78b33b9e239d8c2b9db2d30369d214b480a2d411 Mon Sep 17 00:00:00 2001 From: "paul.baranowski" Date: Wed, 23 Mar 2011 01:09:27 -0400 Subject: [PATCH] CC-2084: Integrate RabbitMQ for immediate schedule updates and commands Implemented RabbitMQ on the pypo side. Schedule updates are now almost instantaneous and we are only polling the server once per hour if we aren't updated in that time. Canceling a show happens right away. --- application/models/RabbitMq.php | 34 +-- .../demo/amqp_airtime_consumer.php | 54 +++++ pypo/config.cfg | 11 +- pypo/pypofetch.py | 211 +++++++++++------- pypo/pypopush.py | 17 +- 5 files changed, 218 insertions(+), 109 deletions(-) create mode 100644 library/php-amqplib/demo/amqp_airtime_consumer.php diff --git a/application/models/RabbitMq.php b/application/models/RabbitMq.php index d251116bf..af5bffa02 100644 --- a/application/models/RabbitMq.php +++ b/application/models/RabbitMq.php @@ -10,23 +10,23 @@ class RabbitMq * in the future. */ public static function PushSchedule() { -// global $CC_CONFIG; -// $conn = new AMQPConnection($CC_CONFIG["rabbitmq"]["host"], -// $CC_CONFIG["rabbitmq"]["port"], -// $CC_CONFIG["rabbitmq"]["user"], -// $CC_CONFIG["rabbitmq"]["password"]); -// $channel = $conn->channel(); -// $channel->access_request($CC_CONFIG["rabbitmq"]["vhost"], false, false, true, true); -// -// $EXCHANGE = 'airtime-schedule'; -// $channel->exchange_declare($EXCHANGE, 'direct', false, false, false); -// -// $data = json_encode(Schedule::ExportRangeAsJson()); -// $msg = new AMQPMessage($data, array('content_type' => 'text/plain')); -// -// $channel->basic_publish($msg, $EXCHANGE); -// $channel->close(); -// $conn->close(); + global $CC_CONFIG; + $conn = new AMQPConnection($CC_CONFIG["rabbitmq"]["host"], + $CC_CONFIG["rabbitmq"]["port"], + $CC_CONFIG["rabbitmq"]["user"], + $CC_CONFIG["rabbitmq"]["password"]); + $channel = $conn->channel(); + $channel->access_request($CC_CONFIG["rabbitmq"]["vhost"], false, false, true, true); + + $EXCHANGE = 'airtime-schedule'; + $channel->exchange_declare($EXCHANGE, 'direct', false, true); + + $data = json_encode(Schedule::ExportRangeAsJson()); + $msg = new AMQPMessage($data, array('content_type' => 'text/plain')); + + $channel->basic_publish($msg, $EXCHANGE); + $channel->close(); + $conn->close(); } } diff --git a/library/php-amqplib/demo/amqp_airtime_consumer.php b/library/php-amqplib/demo/amqp_airtime_consumer.php new file mode 100644 index 000000000..bb5f8bcc8 --- /dev/null +++ b/library/php-amqplib/demo/amqp_airtime_consumer.php @@ -0,0 +1,54 @@ +#!/usr/bin/php + + */ + +require_once('../amqp.inc'); + +$HOST = 'localhost'; +$PORT = 5672; +$USER = 'guest'; +$PASS = 'guest'; +$VHOST = '/'; +$EXCHANGE = 'airtime-schedule'; +$QUEUE = 'airtime-schedule-msgs'; +$CONSUMER_TAG = 'airtime-consumer'; + +$conn = new AMQPConnection($HOST, $PORT, $USER, $PASS); +$ch = $conn->channel(); +$ch->access_request($VHOST, false, false, true, true); + +$ch->queue_declare($QUEUE); +$ch->exchange_declare($EXCHANGE, 'direct', false, false, false); +$ch->queue_bind($QUEUE, $EXCHANGE); + +function process_message($msg) { + global $ch, $CONSUMER_TAG; + + echo "\n--------\n"; + echo $msg->body; + echo "\n--------\n"; + + $ch->basic_ack($msg->delivery_info['delivery_tag']); + + // Cancel callback + if ($msg->body === 'quit') { + $ch->basic_cancel($CONSUMER_TAG); + } +} + +$ch->basic_consume($QUEUE, $CONSUMER_TAG, false, false, false, false, 'process_message'); + +// Loop as long as the channel has callbacks registered +echo "Waiting for messages...\n"; +while(count($ch->callbacks)) { + $ch->wait(); +} + +$ch->close(); +$conn->close(); +?> diff --git a/pypo/config.cfg b/pypo/config.cfg index 638558fbf..b4dc9424e 100644 --- a/pypo/config.cfg +++ b/pypo/config.cfg @@ -26,6 +26,13 @@ base_url = 'http://localhost/' ls_host = '127.0.0.1' ls_port = '1234' +############################################ +# RabbitMQ settings # +############################################ +rabbitmq_host = 'localhost' +rabbitmq_user = 'guest' +rabbitmq_password = 'guest' + ############################################ # pypo preferences # ############################################ @@ -42,7 +49,7 @@ cache_for = 24 #how long to hold the cache, in hours # the time you expect to "lock-in" your schedule. So if your schedule is set # 24 hours in advance, this can be set to poll every 12 hours. # -poll_interval = 5 # in seconds. +poll_interval = 3600 # in seconds. # Push interval in seconds. @@ -52,7 +59,7 @@ poll_interval = 5 # in seconds. # # It's hard to imagine a situation where this should be more than 1 second. # -push_interval = 2 # in seconds +push_interval = 1 # in seconds # 'pre' or 'otf'. 'pre' cues while playlist preparation # while 'otf' (on the fly) cues while loading into ls diff --git a/pypo/pypofetch.py b/pypo/pypofetch.py index 0ca468198..c31bd094a 100644 --- a/pypo/pypofetch.py +++ b/pypo/pypofetch.py @@ -12,6 +12,10 @@ import telnetlib import math from threading import Thread +# For RabbitMQ +from kombu.connection import BrokerConnection +from kombu.messaging import Exchange, Queue, Consumer, Producer + from api_clients import api_client from util import CueFile @@ -25,91 +29,101 @@ try: config = ConfigObj('config.cfg') LS_HOST = config['ls_host'] LS_PORT = config['ls_port'] - POLL_INTERVAL = 5 + POLL_INTERVAL = int(config['poll_interval']) except Exception, e: print 'Error loading config file: ', e sys.exit() +# Yuk - using a global, i know! +SCHEDULE_PUSH_MSG = [] + +""" +Handle a message from RabbitMQ, put it into our yucky global var. +Hopefully there is a better way to do this. +""" +def handle_message(body, message): + logger = logging.getLogger('fetch') + global SCHEDULE_PUSH_MSG + logger.info("Received schedule from RabbitMQ: " + message.body) + SCHEDULE_PUSH_MSG = json.loads(message.body) + # ACK the message to take it off the queue + message.ack() + + class PypoFetch(Thread): def __init__(self, q): Thread.__init__(self) + logger = logging.getLogger('fetch') self.api_client = api_client.api_client_factory(config) self.cue_file = CueFile() self.set_export_source('scheduler') self.queue = q + logger.info("Initializing RabbitMQ stuff") + schedule_exchange = Exchange("airtime-schedule", "direct", durable=True, auto_delete=True) + schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo") + self.connection = BrokerConnection(config["rabbitmq_host"], config["rabbitmq_user"], config["rabbitmq_password"], "/") + channel = self.connection.channel() + consumer = Consumer(channel, schedule_queue) + consumer.register_callback(handle_message) + consumer.consume() + + logger.info("PypoFetch: init complete"); + + def set_export_source(self, export_source): self.export_source = export_source self.cache_dir = config["cache_dir"] + self.export_source + '/' + """ - Fetching part of pypo - - Reads the scheduled entries of a given range (actual time +/- "prepare_ahead" / "cache_for") - - Saves a serialized file of the schedule - - playlists are prepared. (brought to liquidsoap format) and, if not mounted via nsf, files are copied - to the cache dir (Folder-structure: cache/YYYY-MM-DD-hh-mm-ss) - - runs the cleanup routine, to get rid of unused cashed files + Process the schedule + - Reads the scheduled entries of a given range (actual time +/- "prepare_ahead" / "cache_for") + - Saves a serialized file of the schedule + - playlists are prepared. (brought to liquidsoap format) and, if not mounted via nsf, files are copied + to the cache dir (Folder-structure: cache/YYYY-MM-DD-hh-mm-ss) + - runs the cleanup routine, to get rid of unused cashed files """ - def fetch(self, export_source): - #wrapper script for fetching the whole schedule (in json) + def process_schedule(self, schedule_data, export_source): logger = logging.getLogger('fetch') - - try: os.mkdir(self.cache_dir) - except Exception, e: pass - - # get schedule + self.schedule = schedule_data["playlists"] + + # Push stream metadata to liquidsoap + # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!! + stream_metadata = schedule_data['stream_metadata'] try: - while self.get_schedule() != 1: - logger.warning("failed to read from export url") - time.sleep(1) + tn = telnetlib.Telnet(LS_HOST, LS_PORT) + #encode in latin-1 due to telnet protocol not supporting utf-8 + tn.write(('vars.stream_metadata_type %s\n' % stream_metadata['format']).encode('latin-1')) + tn.write(('vars.station_name %s\n' % stream_metadata['station_name']).encode('latin-1')) + tn.write('exit\n') + tn.read_all() + except Exception, e: + logger.error("Exception %s", e) + status = 0 + # Download all the media and put playlists in liquidsoap format + try: + playlists = self.prepare_playlists() except Exception, e: logger.error("%s", e) - # prepare the playlists - try: - playlists = self.prepare_playlists() - except Exception, e: logger.error("%s", e) - - + # Send the data to pypo-push scheduled_data = dict() scheduled_data['playlists'] = playlists scheduled_data['schedule'] = self.schedule + scheduled_data['stream_metadata'] = schedule_data["stream_metadata"] self.queue.put(scheduled_data) # cleanup try: self.cleanup(self.export_source) except Exception, e: logger.error("%s", e) - def get_schedule(self): - logger = logging.getLogger('fetch') - status, response = self.api_client.get_schedule() - - if status == 1: - schedule = response['playlists'] - stream_metadata = response['stream_metadata'] - try: - self.schedule = schedule - tn = telnetlib.Telnet(LS_HOST, LS_PORT) - - #encode in latin-1 due to telnet protocol not supporting utf-8 - tn.write(('vars.stream_metadata_type %s\n' % stream_metadata['format']).encode('latin-1')) - tn.write(('vars.station_name %s\n' % stream_metadata['station_name']).encode('latin-1')) - - tn.write('exit\n') - tn.read_all() - - except Exception, e: - logger.error("Exception %s", e) - status = 0 - - return status """ - Alternative version of playout preparation. Every playlist entry is - pre-cued if neccessary (cue_in/cue_out != 0) and stored in the - playlist folder. - file is eg 2010-06-23-15-00-00/17_cue_10.132-123.321.mp3 + In this function every audio file is cut as necessary (cue_in/cue_out != 0) + and stored in a playlist folder. + file is e.g. 2010-06-23-15-00-00/17_cue_10.132-123.321.mp3 """ def prepare_playlists(self): logger = logging.getLogger('fetch') @@ -126,7 +140,7 @@ class PypoFetch(Thread): try: for pkey in scheduleKeys: - logger.info("found playlist at %s", pkey) + logger.info("Playlist starting at %s", pkey) playlist = schedule[pkey] # create playlist directory @@ -135,15 +149,15 @@ class PypoFetch(Thread): except Exception, e: pass - logger.debug('*****************************************') - logger.debug('pkey: ' + str(pkey)) - logger.debug('cached at : ' + self.cache_dir + str(pkey)) - logger.debug('subtype: ' + str(playlist['subtype'])) - logger.debug('played: ' + str(playlist['played'])) - logger.debug('schedule id: ' + str(playlist['schedule_id'])) - logger.debug('duration: ' + str(playlist['duration'])) - logger.debug('source id: ' + str(playlist['x_ident'])) - logger.debug('*****************************************') + #logger.debug('*****************************************') + #logger.debug('pkey: ' + str(pkey)) + #logger.debug('cached at : ' + self.cache_dir + str(pkey)) + #logger.debug('subtype: ' + str(playlist['subtype'])) + #logger.debug('played: ' + str(playlist['played'])) + #logger.debug('schedule id: ' + str(playlist['schedule_id'])) + #logger.debug('duration: ' + str(playlist['duration'])) + #logger.debug('source id: ' + str(playlist['x_ident'])) + #logger.debug('*****************************************') if int(playlist['played']) == 1: logger.info("playlist %s already played / sent to liquidsoap, so will ignore it", pkey) @@ -156,11 +170,13 @@ class PypoFetch(Thread): logger.info("%s", e) return playlists + + """ + Download and cache the media files. + This handles both remote and local files. + Returns an updated ls_playlist string. + """ def handle_media_file(self, playlist, pkey): - """ - This handles both remote and local files. - Returns an updated ls_playlist string. - """ ls_playlist = [] logger = logging.getLogger('fetch') @@ -170,11 +186,11 @@ class PypoFetch(Thread): fileExt = os.path.splitext(media['uri'])[1] try: if str(media['cue_in']) == '0' and str(media['cue_out']) == '0': - logger.debug('No cue in/out detected for this file') + #logger.debug('No cue in/out detected for this file') dst = "%s%s/%s%s" % (self.cache_dir, str(pkey), str(media['id']), str(fileExt)) do_cue = False else: - logger.debug('Cue in/out detected') + #logger.debug('Cue in/out detected') dst = "%s%s/%s_cue_%s-%s%s" % \ (self.cache_dir, str(pkey), str(media['id']), str(float(media['cue_in']) / 1000), str(float(media['cue_out']) / 1000), str(fileExt)) do_cue = True @@ -199,7 +215,7 @@ class PypoFetch(Thread): % (str(media['export_source']), media['id'], 0, str(float(media['fade_in']) / 1000), \ str(float(media['fade_out']) / 1000), media['row_id'],dst) - logger.debug(pl_entry) + #logger.debug(pl_entry) """ Tracks are only added to the playlist if they are accessible @@ -213,7 +229,7 @@ class PypoFetch(Thread): entry['show_name'] = playlist['show_name'] ls_playlist.append(entry) - logger.debug("everything ok, adding %s to playlist", pl_entry) + #logger.debug("everything ok, adding %s to playlist", pl_entry) else: print 'zero-file: ' + dst + ' from ' + media['uri'] logger.warning("zero-size file - skipping %s. will not add it to playlist", dst) @@ -225,11 +241,15 @@ class PypoFetch(Thread): return ls_playlist + """ + Download a file from a remote server and store it in the cache. + """ def handle_remote_file(self, media, dst, do_cue): logger = logging.getLogger('fetch') if do_cue == False: if os.path.isfile(dst): - logger.debug("file already in cache: %s", dst) + pass + #logger.debug("file already in cache: %s", dst) else: logger.debug("try to download %s", media['uri']) self.api_client.get_media(media['uri'], dst) @@ -270,11 +290,11 @@ class PypoFetch(Thread): logger.error("%s", e) + """ + Cleans up folders in cache_dir. Look for modification date older than "now - CACHE_FOR" + and deletes them. + """ def cleanup(self, export_source): - """ - Cleans up folders in cache_dir. Look for modification date older than "now - CACHE_FOR" - and deletes them. - """ logger = logging.getLogger('fetch') offset = 3600 * int(config["cache_for"]) @@ -297,18 +317,41 @@ class PypoFetch(Thread): print e logger.error("%s", e) + + """ + Main loop of the thread: + Wait for schedule updates from RabbitMQ, but in case there arent any, + poll the server to get the upcoming schedule. + """ def run(self): - loops = 0 - heartbeat_period = math.floor(30/POLL_INTERVAL) logger = logging.getLogger('fetch') - + + try: os.mkdir(self.cache_dir) + except Exception, e: pass + + # Bootstrap: since we are just starting up, we need to grab the + # most recent schedule. After that we can just wait for updates. + status, schedule_data = self.api_client.get_schedule() + if status == 1: + self.process_schedule(schedule_data, "scheduler") + logger.info("Bootstrap complete: got initial copy of the schedule") + + loops = 1 while True: - if loops % heartbeat_period == 0: - logger.info("heartbeat") - loops = 0 - try: self.fetch('scheduler') - except Exception, e: - logger.error('Pypo Fetch Error, exiting: %s', e) - sys.exit() - time.sleep(POLL_INTERVAL) + logger.info("Loop #"+str(loops)) + try: + # Wait for messages from RabbitMQ. Timeout if we + # dont get any after POLL_INTERVAL. + self.connection.drain_events(timeout=POLL_INTERVAL) + # Hooray for globals! + schedule_data = SCHEDULE_PUSH_MSG + status = 1 + except: + # We didnt get a message for a while, so poll the server + # to get an updated schedule. + status, schedule_data = self.api_client.get_schedule() + + if status == 1: + self.process_schedule(schedule_data, "scheduler") loops += 1 + diff --git a/pypo/pypopush.py b/pypo/pypopush.py index 62203fc59..25bfddbf1 100644 --- a/pypo/pypopush.py +++ b/pypo/pypopush.py @@ -38,6 +38,7 @@ class PypoPush(Thread): self.schedule = dict() self.playlists = dict() + self.stream_metadata = dict() """ push_ahead2 MUST be < push_ahead. The difference in these two values @@ -53,18 +54,21 @@ class PypoPush(Thread): self.schedule_tracker_file = self.cache_dir + "schedule_tracker.pickle" """ - The Push Loop - the push loop periodically (minimal 1/2 of the playlist-grid) - checks if there is a playlist that should be scheduled at the current time. - If yes, the temporary liquidsoap playlist gets replaced with the corresponding one, + The Push Loop - the push loop periodically checks if there is a playlist + that should be scheduled at the current time. + If yes, the current liquidsoap playlist gets replaced with the corresponding one, then liquidsoap is asked (via telnet) to reload and immediately play it. """ def push(self, export_source): logger = logging.getLogger('push') + # get a new schedule from pypo-fetch if not self.queue.empty(): scheduled_data = self.queue.get() + logger.debug("Received data from pypo-fetch") self.schedule = scheduled_data['schedule'] self.playlists = scheduled_data['playlists'] + self.stream_metadata = scheduled_data['stream_metadata'] schedule = self.schedule playlists = self.playlists @@ -120,7 +124,8 @@ class PypoPush(Thread): if start <= str_tnow_s and str_tnow_s < end: currently_on_air = True else: - logger.debug('Empty schedule') + pass + #logger.debug('Empty schedule') if not currently_on_air: tn = telnetlib.Telnet(LS_HOST, LS_PORT) @@ -184,7 +189,7 @@ class PypoPush(Thread): def load_schedule_tracker(self): logger = logging.getLogger('push') - logger.debug('load_schedule_tracker') + #logger.debug('load_schedule_tracker') playedItems = dict() # create the file if it doesnt exist @@ -197,7 +202,7 @@ class PypoPush(Thread): except Exception, e: logger.error('Error creating schedule tracker file: %s', e) else: - logger.debug('schedule tracker file exists, opening: ' + self.schedule_tracker_file) + #logger.debug('schedule tracker file exists, opening: ' + self.schedule_tracker_file) try: schedule_tracker = open(self.schedule_tracker_file, "r") playedItems = pickle.load(schedule_tracker)