From 2c2bb86698fe83ac739186e9e458c51b9d0f6b7b Mon Sep 17 00:00:00 2001 From: martin Date: Tue, 13 Sep 2011 14:56:24 -0400 Subject: [PATCH] CC-2750: Ability to query health status for pypo, liquidsoap, media monitor, and recorder -fixed rabbitmq not delivering messages --- .../controllers/plugins/RabbitMqPlugin.php | 3 +- airtime_mvc/application/models/RabbitMq.php | 2 - .../application/models/Systemstatus.php | 3 ++ python_apps/pypo/pypofetch.py | 46 +++++++++++-------- 4 files changed, 32 insertions(+), 22 deletions(-) diff --git a/airtime_mvc/application/controllers/plugins/RabbitMqPlugin.php b/airtime_mvc/application/controllers/plugins/RabbitMqPlugin.php index 1a55522ae..0d5c9ab62 100644 --- a/airtime_mvc/application/controllers/plugins/RabbitMqPlugin.php +++ b/airtime_mvc/application/controllers/plugins/RabbitMqPlugin.php @@ -7,6 +7,7 @@ class RabbitMqPlugin extends Zend_Controller_Plugin_Abstract if (RabbitMq::$doPush) { $md = array('schedule' => Schedule::GetScheduledPlaylists()); RabbitMq::SendMessageToPypo("update_schedule", $md); + RabbitMq::SendMessageToShowRecorder("update_schedule"); } } -} \ No newline at end of file +} diff --git a/airtime_mvc/application/models/RabbitMq.php b/airtime_mvc/application/models/RabbitMq.php index daf9fa123..ed94f1a2f 100644 --- a/airtime_mvc/application/models/RabbitMq.php +++ b/airtime_mvc/application/models/RabbitMq.php @@ -34,8 +34,6 @@ class RabbitMq $channel->basic_publish($msg, $EXCHANGE); $channel->close(); $conn->close(); - - self::SendMessageToShowRecorder("update_schedule"); } public static function SendMessageToMediaMonitor($event_type, $md) diff --git a/airtime_mvc/application/models/Systemstatus.php b/airtime_mvc/application/models/Systemstatus.php index f8a31038d..ba9249e37 100644 --- a/airtime_mvc/application/models/Systemstatus.php +++ b/airtime_mvc/application/models/Systemstatus.php @@ -4,6 +4,9 @@ class Application_Model_Systemstatus { public static function GetPypoStatus(){ + + RabbitMq::SendMessageToPypo("get_status", array()); + return array( "process_id"=>500, "uptime_seconds"=>3600 diff --git a/python_apps/pypo/pypofetch.py b/python_apps/pypo/pypofetch.py index 76c5b0570..1d497b834 100644 --- a/python_apps/pypo/pypofetch.py +++ b/python_apps/pypo/pypofetch.py @@ -20,6 +20,7 @@ import filecmp # For RabbitMQ from kombu.connection import BrokerConnection from kombu.messaging import Exchange, Queue, Consumer, Producer +from kombu.exceptions import MessageStateError from api_clients import api_client @@ -72,26 +73,33 @@ class PypoFetch(Thread): Hopefully there is a better way to do this. """ def handle_message(self, body, message): - logger = logging.getLogger('fetch') - logger.info("Received event from RabbitMQ: " + message.body) + try: + logger = logging.getLogger('fetch') + logger.info("Received event from RabbitMQ: " + message.body) + + m = json.loads(message.body) + command = m['event_type'] + logger.info("Handling command: " + command) - m = json.loads(message.body) - command = m['event_type'] - logger.info("Handling command: " + command) - - if command == 'update_schedule': - self.schedule_data = m['schedule'] - self.process_schedule(self.schedule_data, "scheduler", False) - elif command == 'update_stream_setting': - logger.info("Updating stream setting...") - self.regenerateLiquidsoapConf(m['setting']) - elif command == 'cancel_current_show': - logger.info("Cancel current show command received...") - self.stop_current_show() - elif command == 'get_status': - self.get_status() - # ACK the message to take it off the queue - message.ack() + if command == 'update_schedule': + self.schedule_data = m['schedule'] + self.process_schedule(self.schedule_data, "scheduler", False) + elif command == 'update_stream_setting': + logger.info("Updating stream setting...") + self.regenerateLiquidsoapConf(m['setting']) + elif command == 'cancel_current_show': + logger.info("Cancel current show command received...") + self.stop_current_show() + elif command == 'get_status': + self.get_status() + except Exception, e: + logger.error("Exception in handling RabbitMQ message: %s", e) + finally: + # ACK the message to take it off the queue + try: + message.ack() + except MessageStateError, m: + logger.error("Message ACK error: %s", m); def get_status(self): logger = logging.getLogger('fetch')