From eadf68cb6106d2db3423516dde0e68cf356b4060 Mon Sep 17 00:00:00 2001 From: Martin Konecny Date: Fri, 10 Feb 2012 18:43:40 -0500 Subject: [PATCH] CC-3318: When changing stream settings (Liquidsoap + Pypo restart), sometimes Airtime does not resume playback -fixed --- .../controllers/PreferenceController.php | 2 - airtime_mvc/application/models/Preference.php | 12 +++- .../application/models/StreamSetting.php | 10 ++- .../pypo/liquidsoap_scripts/ls_lib.liq | 3 +- python_apps/pypo/pypofetch.py | 70 +++++++++++++------ python_apps/pypo/pypopush.py | 9 +-- 6 files changed, 71 insertions(+), 35 deletions(-) diff --git a/airtime_mvc/application/controllers/PreferenceController.php b/airtime_mvc/application/controllers/PreferenceController.php index b297700b0..98f0e62c7 100644 --- a/airtime_mvc/application/controllers/PreferenceController.php +++ b/airtime_mvc/application/controllers/PreferenceController.php @@ -219,8 +219,6 @@ class PreferenceController extends Zend_Controller_Action for($i=1;$i<=$num_of_stream;$i++){ Application_Model_StreamSetting::setLiquidsoapError($i, "waiting"); } - // this goes into cc_pref table - Application_Model_Preference::SetStreamLabelFormat($values['streamFormat']); // store stream update timestamp Application_Model_Preference::SetStreamUpdateTimestamp(); Application_Model_RabbitMq::SendMessageToPypo("update_stream_setting", $data); diff --git a/airtime_mvc/application/models/Preference.php b/airtime_mvc/application/models/Preference.php index 3e77a93d8..180ab0fa4 100644 --- a/airtime_mvc/application/models/Preference.php +++ b/airtime_mvc/application/models/Preference.php @@ -102,7 +102,6 @@ class Application_Model_Preference public static function SetHeadTitle($title, $view=null){ self::SetValue("station_name", $title); - Application_Model_RabbitMq::PushSchedule(); // in case this is called from airtime-saas script if($view !== null){ @@ -111,6 +110,11 @@ class Application_Model_Preference $view->headTitle()->exchangeArray(array()); //clear headTitle ArrayObject $view->headTitle(self::GetHeadTitle()); } + + $eventType = "update_station_name"; + $md = array("station_name"=>$title); + + Application_Model_RabbitMq::SendMessageToPypo($eventType, $md); } /** @@ -153,7 +157,11 @@ class Application_Model_Preference public static function SetStreamLabelFormat($type){ self::SetValue("stream_label_format", $type); - Application_Model_RabbitMq::PushSchedule(); + + $eventType = "update_stream_format"; + $md = array("stream_format"=>$type); + + Application_Model_RabbitMq::SendMessageToPypo($eventType, $md); } public static function GetStreamLabelFormat(){ diff --git a/airtime_mvc/application/models/StreamSetting.php b/airtime_mvc/application/models/StreamSetting.php index 1e4d77956..0ae84a009 100644 --- a/airtime_mvc/application/models/StreamSetting.php +++ b/airtime_mvc/application/models/StreamSetting.php @@ -83,8 +83,12 @@ class Application_Model_StreamSetting { $CC_DBC->query($sql); } else if ($key == "output_sound_device_type") { $sql = "UPDATE cc_stream_setting SET value='$d' WHERE keyname='$key'"; - $CC_DBC->query($sql); - } else { + $CC_DBC->query($sql); + } else if ($key == "streamFormat"){ + // this goes into cc_pref table + Logging::log("Insert stream label format $d"); + Application_Model_Preference::SetStreamLabelFormat($d); + } else if (is_array($d)) { $temp = explode('_', $key); $prefix = $temp[0]; foreach ($d as $k=>$v) { @@ -96,6 +100,8 @@ class Application_Model_StreamSetting { $sql = "UPDATE cc_stream_setting SET value='$v' WHERE keyname='$keyname'"; $CC_DBC->query($sql); } + } else { + Logging::log("Warning unexpected value: ".$key); } } } diff --git a/python_apps/pypo/liquidsoap_scripts/ls_lib.liq b/python_apps/pypo/liquidsoap_scripts/ls_lib.liq index b1a98f3c6..74c289f84 100644 --- a/python_apps/pypo/liquidsoap_scripts/ls_lib.liq +++ b/python_apps/pypo/liquidsoap_scripts/ls_lib.liq @@ -5,10 +5,9 @@ end # A function applied to each metadata chunk def append_title(m) = + log("Using stream_format #{!stream_metadata_type}") if !stream_metadata_type == 1 then [("artist","#{!show_name} - #{m['artist']}")] - #####elsif !stream_metadata_type == 2 then - ##### [("artist", ""), ("title", !show_name)] elsif !stream_metadata_type == 2 then [("artist",!station_name), ("title", !show_name)] else diff --git a/python_apps/pypo/pypofetch.py b/python_apps/pypo/pypofetch.py index 4add31cd8..e148490ac 100644 --- a/python_apps/pypo/pypofetch.py +++ b/python_apps/pypo/pypofetch.py @@ -11,7 +11,7 @@ import json import telnetlib import math import socket -from threading import Thread +from threading import Thread, Lock from subprocess import Popen, PIPE from datetime import datetime from datetime import timedelta @@ -44,11 +44,12 @@ except Exception, e: class PypoFetch(Thread): def __init__(self, q): Thread.__init__(self) - logger = logging.getLogger('fetch') + self.lock = Lock() self.api_client = api_client.api_client_factory(config) self.set_export_source('scheduler') self.queue = q self.schedule_data = [] + logger = logging.getLogger('fetch') logger.info("PypoFetch: init complete") def init_rabbit_mq(self): @@ -74,6 +75,12 @@ class PypoFetch(Thread): """ def handle_message(self, body, message): try: + + #Acquire Lock because multiple rabbitmq messages can be sent simultaneously + #and therefore we can have multiple threads inside this function. This causes + #multiple telnet connections to Liquidsoap which causes problems (refused connections). + self.lock.acquire() + logger = logging.getLogger('fetch') logger.info("Received event from RabbitMQ: " + message.body) @@ -87,14 +94,21 @@ class PypoFetch(Thread): elif command == 'update_stream_setting': logger.info("Updating stream setting...") self.regenerateLiquidsoapConf(m['setting']) + elif command == 'update_stream_format': + logger.info("Updating stream format...") + self.update_liquidsoap_stream_format(m['stream_format']) + elif command == 'update_station_name': + logger.info("Updating station name...") + self.update_liquidsoap_station_name(m['station_name']) elif command == 'cancel_current_show': logger.info("Cancel current show command received...") self.stop_current_show() except Exception, e: logger.error("Exception in handling RabbitMQ message: %s", e) finally: - # ACK the message to take it off the queue + self.lock.release() try: + # ACK the message to take it off the queue message.ack() except MessageStateError, m: logger.error("Message ACK error: %s", m) @@ -257,6 +271,39 @@ class PypoFetch(Thread): self.cache_dir = config["cache_dir"] + self.export_source + '/' logger.info("Creating cache directory at %s", self.cache_dir) + + def update_liquidsoap_stream_format(self, stream_format): + # Push stream metadata to liquidsoap + # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!! + try: + logger = logging.getLogger('fetch') + logger.info(LS_HOST) + logger.info(LS_PORT) + tn = telnetlib.Telnet(LS_HOST, LS_PORT) + command = ('vars.stream_metadata_type %s\n' % stream_format).encode('utf-8') + logger.info(command) + tn.write(command) + tn.write('exit\n') + tn.read_all() + except Exception, e: + logger.error("Exception %s", e) + + def update_liquidsoap_station_name(self, station_name): + # Push stream metadata to liquidsoap + # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!! + try: + logger = logging.getLogger('fetch') + logger.info(LS_HOST) + logger.info(LS_PORT) + tn = telnetlib.Telnet(LS_HOST, LS_PORT) + command = ('vars.station_name %s\n' % station_name).encode('utf-8') + logger.info(command) + tn.write(command) + tn.write('exit\n') + tn.read_all() + except Exception, e: + logger.error("Exception %s", e) + """ Process the schedule - Reads the scheduled entries of a given range (actual time +/- "prepare_ahead" / "cache_for") @@ -269,22 +316,6 @@ class PypoFetch(Thread): logger = logging.getLogger('fetch') playlists = schedule_data["playlists"] - # Push stream metadata to liquidsoap - # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!! - stream_metadata = schedule_data['stream_metadata'] - try: - logger.info(LS_HOST) - logger.info(LS_PORT) - tn = telnetlib.Telnet(LS_HOST, LS_PORT) - #encode in latin-1 due to telnet protocol not supporting utf-8 - tn.write(('vars.stream_metadata_type %s\n' % stream_metadata['format']).encode('latin-1')) - tn.write(('vars.station_name %s\n' % stream_metadata['station_name']).encode('latin-1')) - tn.write('exit\n') - tn.read_all() - except Exception, e: - logger.error("Exception %s", e) - status = 0 - # Download all the media and put playlists in liquidsoap "annotate" format try: liquidsoap_playlists = self.prepare_playlists(playlists, bootstrapping) @@ -294,7 +325,6 @@ class PypoFetch(Thread): scheduled_data = dict() scheduled_data['liquidsoap_playlists'] = liquidsoap_playlists scheduled_data['schedule'] = playlists - scheduled_data['stream_metadata'] = schedule_data["stream_metadata"] self.queue.put(scheduled_data) # cleanup diff --git a/python_apps/pypo/pypopush.py b/python_apps/pypo/pypopush.py index d616cf8cc..24f48c7cb 100644 --- a/python_apps/pypo/pypopush.py +++ b/python_apps/pypo/pypopush.py @@ -39,7 +39,6 @@ class PypoPush(Thread): self.schedule = dict() self.playlists = dict() - self.stream_metadata = dict() self.liquidsoap_state_play = True self.push_ahead = 10 @@ -58,7 +57,7 @@ class PypoPush(Thread): def push(self, export_source): logger = logging.getLogger('push') - + timenow = time.time() # get a new schedule from pypo-fetch if not self.queue.empty(): # make sure we get the latest schedule @@ -67,16 +66,12 @@ class PypoPush(Thread): logger.debug("Received data from pypo-fetch") self.schedule = scheduled_data['schedule'] self.playlists = scheduled_data['liquidsoap_playlists'] - self.stream_metadata = scheduled_data['stream_metadata'] logger.debug('schedule %s' % json.dumps(self.schedule)) logger.debug('playlists %s' % json.dumps(self.playlists)) schedule = self.schedule playlists = self.playlists - - timenow = time.time() - logger.debug('timenow %s' % timenow) currently_on_air = False if schedule: @@ -169,7 +164,7 @@ class PypoPush(Thread): #Sending schedule table row id string. logger.debug("vars.pypo_data %s\n"%(liquidsoap_data["schedule_id"])) - tn.write(("vars.pypo_data %s\n"%liquidsoap_data["schedule_id"]).encode('latin-1')) + tn.write(("vars.pypo_data %s\n"%liquidsoap_data["schedule_id"]).encode('utf-8')) logger.debug('Preparing to push playlist %s' % pkey) for item in playlist: