diff --git a/airtime_mvc/application/models/RabbitMq.php b/airtime_mvc/application/models/RabbitMq.php index a178fb585..7af846127 100644 --- a/airtime_mvc/application/models/RabbitMq.php +++ b/airtime_mvc/application/models/RabbitMq.php @@ -78,12 +78,14 @@ class Application_Model_RabbitMq self::sendMessage($exchange, 'direct', true, $data); } - - public static function SendMessageToAnalyzer($tmpFilePath, $finalDirectory, $callbackUrl, $apiKey) + + public static function SendMessageToAnalyzer($tmpFilePath, $importedStorageDirectory, $originalFilename, + $callbackUrl, $apiKey) { $exchange = 'airtime-uploads'; $data['tmp_file_path'] = $tmpFilePath; - $data['final_directory'] = $finalDirectory; + $data['import_directory'] = $importedStorageDirectory; + $data['original_filename'] = $originalFilename; $data['callback_url'] = $callbackUrl; $data['api_key'] = $apiKey; diff --git a/airtime_mvc/application/models/StoredFile.php b/airtime_mvc/application/models/StoredFile.php index 009572647..7f1aa7d69 100644 --- a/airtime_mvc/application/models/StoredFile.php +++ b/airtime_mvc/application/models/StoredFile.php @@ -988,48 +988,69 @@ SQL; return $freeSpace >= $fileSize; } - public static function copyFileToStor($p_targetDir, $fileName, $tempname) + /** + * Copy a newly uploaded audio file from its temporary upload directory + * on the local disk (like /tmp) over to Airtime's "stor" directory, + * which is where all ingested music/media live. + * + * This is done in PHP here on the web server rather than in airtime_analyzer because + * the airtime_analyzer might be running on a different physical computer than the web server, + * and it probably won't have access to the web server's /tmp folder. The stor/organize directory + * is, however, both accessible to the machines running airtime_analyzer and the web server + * on Airtime Pro. + * + * The file is actually copied to "stor/organize", which is a staging directory where files go + * before they're processed by airtime_analyzer, which then moves them to "stor/imported" in the final + * step. + * + * TODO: Implement better error handling here... + * + * @param string $tempFilePath + * @param string $originalFilename + * @throws Exception + * @return Ambigous + */ + public static function copyFileToStor($tempFilePath, $originalFilename) { - $audio_file = $p_targetDir . DIRECTORY_SEPARATOR . $tempname; + $audio_file = $tempFilePath; Logging::info('copyFileToStor: moving file '.$audio_file); - + $storDir = Application_Model_MusicDir::getStorDir(); $stor = $storDir->getDirectory(); // check if "organize" dir exists and if not create one if (!file_exists($stor."/organize")) { if (!mkdir($stor."/organize", 0777)) { - return array( - "code" => 109, - "message" => _("Failed to create 'organize' directory.")); + throw new Exception("Failed to create organize directory."); } } - + if (chmod($audio_file, 0644) === false) { Logging::info("Warning: couldn't change permissions of $audio_file to 0644"); } - + // Check if we have enough space before copying if (!self::isEnoughDiskSpaceToCopy($stor, $audio_file)) { $freeSpace = disk_free_space($stor); $fileSize = filesize($audio_file); - - return array("code" => 107, - "message" => sprintf(_("The file was not uploaded, there is " - ."%s MB of disk space left and the file you are " - ."uploading has a size of %s MB."), $freeSpace, $fileSize)); + + throw new Exception(sprintf(_("The file was not uploaded, there is " + ."%s MB of disk space left and the file you are " + ."uploading has a size of %s MB."), $freeSpace, $fileSize)); } - + // Check if liquidsoap can play this file + // TODO: Move this to airtime_analyzer if (!self::liquidsoapFilePlayabilityTest($audio_file)) { return array( - "code" => 110, - "message" => _("This file appears to be corrupted and will not " - ."be added to media library.")); + "code" => 110, + "message" => _("This file appears to be corrupted and will not " + ."be added to media library.")); } + // Did all the checks for real, now trying to copy $audio_stor = Application_Common_OsPath::join($stor, "organize", - $fileName); + $originalFilename); $user = Application_Model_User::getCurrentUser(); if (is_null($user)) { $uid = Application_Model_User::getFirstAdminId(); @@ -1044,7 +1065,7 @@ SQL; written)"); } else { Logging::info("Successfully written identification file for - uploaded '$audio_stor'"); + uploaded '$audio_stor'"); } //if the uploaded file is not UTF-8 encoded, let's encode it. Assuming source //encoding is ISO-8859-1 @@ -1059,18 +1080,14 @@ SQL; //is enough disk space . unlink($audio_file); //remove the file after failed rename unlink($id_file); // Also remove the identifier file - - return array( - "code" => 108, - "message" => _("The file was not uploaded, this error can occur if the computer " - ."hard drive does not have enough disk space or the stor " - ."directory does not have correct write permissions.")); + + throw new Exception("The file was not uploaded, this error can occur if the computer " + ."hard drive does not have enough disk space or the stor " + ."directory does not have correct write permissions."); } - // Now that we successfully added this file, we will add another tag - // file that will identify the user that owns it - return null; + return $audio_stor; } - + /* * Pass the file through Liquidsoap and test if it is readable. Return True if readable, and False otherwise. */ diff --git a/airtime_mvc/application/modules/rest/controllers/MediaController.php b/airtime_mvc/application/modules/rest/controllers/MediaController.php index 011f2eeff..e0f893930 100644 --- a/airtime_mvc/application/modules/rest/controllers/MediaController.php +++ b/airtime_mvc/application/modules/rest/controllers/MediaController.php @@ -73,7 +73,7 @@ class Rest_MediaController extends Zend_Rest_Controller $file->save(); $callbackUrl = $this->getRequest()->getScheme() . '://' . $this->getRequest()->getHttpHost() . $this->getRequest()->getRequestUri() . "/" . $file->getPrimaryKey(); - $this->processUploadedFile($callbackUrl); + $this->processUploadedFile($callbackUrl, $_FILES["file"]["name"], $this->getOwnerId()); $this->getResponse() ->setHttpResponseCode(201) @@ -95,7 +95,27 @@ class Rest_MediaController extends Zend_Rest_Controller { //TODO: Strip or sanitize the JSON output - $file->fromArray(json_decode($this->getRequest()->getRawBody(), true), BasePeer::TYPE_FIELDNAME); + $fileFromJson = json_decode($this->getRequest()->getRawBody(), true); + + //Our RESTful API takes "full_path" as a field, which we then split and translate to match + //our internal schema. Internally, file path is stored relative to a directory, with the directory + //as a foreign key to cc_music_dirs. + if ($fileFromJson["full_path"]) { + + $fullPath = $fileFromJson["full_path"]; + $storDir = Application_Model_MusicDir::getStorDir()->getDirectory(); + $pos = strpos($fullPath, $storDir); + + if ($pos !== FALSE) + { + assert($pos == 0); //Path must start with the stor directory path + + $filePathRelativeToStor = substr($fullPath, strlen($storDir)); + $fileFromJson["filepath"] = $filePathRelativeToStor; + $fileFromJson["directory"] = 1; //1 corresponds to the default stor/imported directory. + } + } + $file->fromArray($fileFromJson, BasePeer::TYPE_FIELDNAME); $file->save(); $this->getResponse() ->setHttpResponseCode(200) @@ -181,7 +201,7 @@ class Rest_MediaController extends Zend_Rest_Controller $resp->appendBody("ERROR: Media not found."); } - private function processUploadedFile($callbackUrl) + private function processUploadedFile($callbackUrl, $originalFilename, $ownerId) { $CC_CONFIG = Config::getConfig(); $apiKey = $CC_CONFIG["apiKey"][0]; @@ -192,14 +212,32 @@ class Rest_MediaController extends Zend_Rest_Controller //TODO: Remove copyFileToStor from StoredFile... - $storDir = Application_Model_MusicDir::getStorDir(); - $finalDestinationDir = $storDir->getDirectory() . "/organize"; + //TODO: Remove uploadFileAction from ApiController.php **IMPORTANT** - It's used by the recorder daemon? + $upload_dir = ini_get("upload_tmp_dir") . DIRECTORY_SEPARATOR . "plupload"; + $tempFilePath = $upload_dir . "/" . $tempFileName; + + $storDir = Application_Model_MusicDir::getStorDir(); + //$finalFullFilePath = $storDir->getDirectory() . "/imported/" . $ownerId . "/" . $originalFilename; + $importedStorageDirectory = $storDir->getDirectory() . "/imported/" . $ownerId; + + + try { + //Copy the temporary file over to the "organize" folder so that it's off our webserver + //and accessible by airtime_analyzer which could be running on a different machine. + $newTempFilePath = Application_Model_StoredFile::copyFileToStor($tempFilePath, $originalFilename); + } catch (Exception $e) { + Logging::error($e->getMessage()); + } + + //Logging::info("New temporary file path: " . $newTempFilePath); + //Logging::info("Final file path: " . $finalFullFilePath); + //Dispatch a message to airtime_analyzer through RabbitMQ, //notifying it that there's a new upload to process! - Application_Model_RabbitMq::SendMessageToAnalyzer($tempFilePath, - $finalDestinationDir, $callbackUrl, $apiKey); - + Application_Model_RabbitMq::SendMessageToAnalyzer($newTempFilePath, + $importedStorageDirectory, $originalFilename, + $callbackUrl, $apiKey); } private function getOwnerId() diff --git a/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py b/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py index c4a4d18a3..af880a735 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py @@ -1,7 +1,7 @@ import logging import multiprocessing import shutil -import os +import os, errno from metadata_analyzer import MetadataAnalyzer class AnalyzerPipeline: @@ -12,28 +12,59 @@ class AnalyzerPipeline: # Take message dictionary and perform the necessary analysis. @staticmethod - def run_analysis(queue, audio_file_path, final_directory): + def run_analysis(queue, audio_file_path, import_directory, original_filename): if not isinstance(queue, multiprocessing.queues.Queue): raise TypeError("queue must be a multiprocessing.Queue()") if not isinstance(audio_file_path, unicode): raise TypeError("audio_file_path must be unicode. Was of type " + type(audio_file_path).__name__ + " instead.") - if not isinstance(final_directory, unicode): - raise TypeError("final_directory must be unicode. Was of type " + type(final_directory).__name__ + " instead.") + if not isinstance(import_directory, unicode): + raise TypeError("import_directory must be unicode. Was of type " + type(import_directory).__name__ + " instead.") + if not isinstance(original_filename, unicode): + raise TypeError("original_filename must be unicode. Was of type " + type(original_filename).__name__ + " instead.") + #print ReplayGainAnalyzer.analyze("foo.mp3") # Analyze the audio file we were told to analyze: # First, we extract the ID3 tags and other metadata: - queue.put(MetadataAnalyzer.analyze(audio_file_path)) + results = MetadataAnalyzer.analyze(audio_file_path) # Note that the queue we're putting the results into is our interprocess communication # back to the main process. - #print ReplayGainAnalyzer.analyze("foo.mp3") + #Import the file over to it's final location. + + final_file_path = import_directory + if results.has_key("artist_name"): + final_file_path += "/" + results["artist_name"] + if results.has_key("album"): + final_file_path += "/" + results["album"] + final_file_path += "/" + original_filename - final_audio_file_path = final_directory + os.sep + os.path.basename(audio_file_path) - if os.path.exists(final_audio_file_path) and not os.path.samefile(audio_file_path, final_audio_file_path): - os.remove(final_audio_file_path) + #Ensure any redundant slashes are stripped + final_file_path = os.path.normpath(final_file_path) - shutil.move(audio_file_path, final_audio_file_path) + #final_audio_file_path = final_directory + os.sep + os.path.basename(audio_file_path) + if os.path.exists(final_file_path) and not os.path.samefile(audio_file_path, final_file_path): + raise Exception("File exists and will not be overwritten.") # by design + #Overwriting a file would mean Airtime's database has the wrong information... + + #Ensure the full path to the file exists + mkdir_p(os.path.dirname(final_file_path)) + + #Move the file into its final destination directory + shutil.move(audio_file_path, final_file_path) + + #Pass the full path back to Airtime + results["full_path"] = final_file_path + queue.put(results) + + +def mkdir_p(path): + try: + os.makedirs(path) + except OSError as exc: # Python >2.5 + if exc.errno == errno.EEXIST and os.path.isdir(path): + pass + else: raise diff --git a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py index d25219ecd..d1311a468 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py @@ -83,11 +83,13 @@ class MessageListener: try: msg_dict = json.loads(body) audio_file_path = msg_dict["tmp_file_path"] - final_directory = msg_dict["final_directory"] + #final_file_path = msg_dict["final_file_path"] + import_directory = msg_dict["import_directory"] + original_filename = msg_dict["original_filename"] callback_url = msg_dict["callback_url"] api_key = msg_dict["api_key"] - audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, final_directory) + audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename) StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata) except KeyError as e: @@ -123,11 +125,11 @@ class MessageListener: channel.basic_ack(delivery_tag=method_frame.delivery_tag) @staticmethod - def spawn_analyzer_process(audio_file_path, final_directory): + def spawn_analyzer_process(audio_file_path, import_directory, original_filename): q = multiprocessing.Queue() p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis, - args=(q, audio_file_path, final_directory)) + args=(q, audio_file_path, import_directory, original_filename)) p.start() p.join() if p.exitcode == 0: