From 451b19150ba18efe5b5c8d402ffb56b231590161 Mon Sep 17 00:00:00 2001 From: Albert Santoni Date: Thu, 13 Mar 2014 13:35:48 -0400 Subject: [PATCH] CC-5709: Airtime Analyzer * Notify airtime_analyzer of new uploads with RabbitMQ * Use a durable exchange for airtime-uploads --- airtime_mvc/application/models/RabbitMq.php | 24 ++++++++++++------- .../rest/controllers/MediaController.php | 17 ++++++++++--- .../airtime_analyzer/message_listener.py | 2 +- .../airtime_analyzer/tools/message_sender.php | 2 +- 4 files changed, 31 insertions(+), 14 deletions(-) diff --git a/airtime_mvc/application/models/RabbitMq.php b/airtime_mvc/application/models/RabbitMq.php index d14d8249f..a178fb585 100644 --- a/airtime_mvc/application/models/RabbitMq.php +++ b/airtime_mvc/application/models/RabbitMq.php @@ -13,7 +13,7 @@ class Application_Model_RabbitMq self::$doPush = true; } - private static function sendMessage($exchange, $data) + private static function sendMessage($exchange, $exchangeType, $autoDeleteExchange, $data, $queue="") { $CC_CONFIG = Config::getConfig(); @@ -31,7 +31,9 @@ class Application_Model_RabbitMq $channel->access_request($CC_CONFIG["rabbitmq"]["vhost"], false, false, true, true); - $channel->exchange_declare($exchange, 'direct', false, true); + //I'm pretty sure we DON'T want to autodelete ANY exchanges but I'm keeping the code + //the way it is just so I don't accidentally break anything when I add the Analyzer code in. -- Albert, March 13, 2014 + $channel->exchange_declare($exchange, $exchangeType, false, true, $autoDeleteExchange); $msg = new AMQPMessage($data, array('content_type' => 'text/plain')); @@ -46,7 +48,7 @@ class Application_Model_RabbitMq $exchange = 'airtime-pypo'; $data = json_encode($md, JSON_FORCE_OBJECT); - self::sendMessage($exchange, $data); + self::sendMessage($exchange, 'direct', true, $data); } public static function SendMessageToMediaMonitor($event_type, $md) @@ -55,7 +57,7 @@ class Application_Model_RabbitMq $exchange = 'airtime-media-monitor'; $data = json_encode($md); - self::sendMessage($exchange, $data); + self::sendMessage($exchange, 'direct', true, $data); } public static function SendMessageToShowRecorder($event_type) @@ -74,14 +76,18 @@ class Application_Model_RabbitMq } $data = json_encode($temp); - self::sendMessage($exchange, $data); + self::sendMessage($exchange, 'direct', true, $data); } - public static function SendMessageToAnalyzer() + public static function SendMessageToAnalyzer($tmpFilePath, $finalDirectory, $callbackUrl, $apiKey) { $exchange = 'airtime-uploads'; - //$data = json_encode($md); - //TODO: Finish me - //self::sendMessage($exchange, $data); + $data['tmp_file_path'] = $tmpFilePath; + $data['final_directory'] = $finalDirectory; + $data['callback_url'] = $callbackUrl; + $data['api_key'] = $apiKey; + + $jsonData = json_encode($data); + self::sendMessage($exchange, 'topic', false, $jsonData, 'airtime-uploads'); } } diff --git a/airtime_mvc/application/modules/rest/controllers/MediaController.php b/airtime_mvc/application/modules/rest/controllers/MediaController.php index d7bd9132c..f0eaf7b53 100644 --- a/airtime_mvc/application/modules/rest/controllers/MediaController.php +++ b/airtime_mvc/application/modules/rest/controllers/MediaController.php @@ -66,7 +66,7 @@ class Rest_MediaController extends Zend_Rest_Controller return; } - $this->processUploadedFile(); + $this->processUploadedFile($this->getRequest()->getRequestUri()); //TODO: Strip or sanitize the JSON output $file = new CcFiles(); @@ -179,13 +179,24 @@ class Rest_MediaController extends Zend_Rest_Controller $resp->appendBody("ERROR: Media not found."); } - private function processUploadedFile() + private function processUploadedFile($callbackUrl) { + $CC_CONFIG = Config::getConfig(); + $apiKey = $CC_CONFIG["apiKey"][0]; + $upload_dir = ini_get("upload_tmp_dir") . DIRECTORY_SEPARATOR . "plupload"; $tempFilePath = Application_Model_StoredFile::uploadFile($upload_dir); $tempFileName = basename($tempFilePath); - //TODO: Dispatch a message to airtime_analyzer through RabbitMQ! + //TODO: Remove copyFileToStor from StoredFile... + + $storDir = Application_Model_MusicDir::getStorDir(); + $finalDestinationDir = $storDir->getDirectory() . "/organize"; + + //Dispatch a message to airtime_analyzer through RabbitMQ, + //notifying it that there's a new upload to process! + Application_Model_RabbitMq::SendMessageToAnalyzer($tempFilePath, + $finalDestinationDir, $callbackUrl, $apiKey); } } \ No newline at end of file diff --git a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py index 64b1b8ea9..d25219ecd 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py @@ -54,7 +54,7 @@ class MessageListener: port=self._port, virtual_host=self._vhost, credentials=pika.credentials.PlainCredentials(self._username, self._password))) self._channel = self._connection.channel() - self._channel.exchange_declare(exchange=EXCHANGE, type=EXCHANGE_TYPE) + self._channel.exchange_declare(exchange=EXCHANGE, type=EXCHANGE_TYPE, durable=True) result = self._channel.queue_declare(queue=QUEUE, durable=True) self._channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY) diff --git a/python_apps/airtime_analyzer/tools/message_sender.php b/python_apps/airtime_analyzer/tools/message_sender.php index d5c9171c1..59677673d 100644 --- a/python_apps/airtime_analyzer/tools/message_sender.php +++ b/python_apps/airtime_analyzer/tools/message_sender.php @@ -36,7 +36,7 @@ $channel = $connection->channel(); $channel->queue_declare($queue, false, true, false, false); // declare/create the exchange as a topic exchange. -$channel->exchange_declare($exchange, $exchangeType, false, false, false); +$channel->exchange_declare($exchange, $exchangeType, false, true, false); $msg = new AMQPMessage($message, array("content_type" => "text/plain"));