diff --git a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py index 97613e81f..9b890321c 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py @@ -120,8 +120,13 @@ class MessageListener: def disconnect_from_messaging_server(self): '''Stop consuming RabbitMQ messages and disconnect''' - self._channel.stop_consuming() - self._connection.close() + # If you try to close a connection that's already closed, you're going to have a bad time. + # We're breaking EAFP because this can be called multiple times depending on exception + # handling flow here. + if not self._channel.is_closed and not self._channel.is_closing: + self._channel.stop_consuming() + if not self._connection.is_closed and not self._connection.is_closing: + self._connection.close() def graceful_shutdown(self, signum, frame): '''Disconnect and break out of the message listening loop''' diff --git a/python_apps/airtime_analyzer/airtime_analyzer/status_reporter.py b/python_apps/airtime_analyzer/airtime_analyzer/status_reporter.py index ade3c5bdb..ebf9a12d5 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/status_reporter.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/status_reporter.py @@ -38,9 +38,9 @@ def process_http_requests(ipc_queue, http_retry_queue_path): # retried later: retry_queue = collections.deque() shutdown = False - - # Unpickle retry_queue from disk so that we won't have lost any uploads - # if airtime_analyzer is shut down while the web server is down or unreachable, + + # Unpickle retry_queue from disk so that we won't have lost any uploads + # if airtime_analyzer is shut down while the web server is down or unreachable, # and there were failed HTTP requests pending, waiting to be retried. try: with open(http_retry_queue_path, 'rb') as pickle_file: @@ -57,33 +57,42 @@ def process_http_requests(ipc_queue, http_retry_queue_path): logging.error("Failed to unpickle %s. Continuing..." % http_retry_queue_path) pass - - while not shutdown: + while True: try: - request = ipc_queue.get(block=True, timeout=5) - if isinstance(request, str) and request == "shutdown": # Bit of a cheat - shutdown = True - break - if not isinstance(request, PicklableHttpRequest): - raise TypeError("request must be a PicklableHttpRequest. Was of type " + type(request).__name__) - except Queue.Empty: - request = None - - # If there's no new HTTP request we need to execute, let's check our "retry - # queue" and see if there's any failed HTTP requests we can retry: - if request: - send_http_request(request, retry_queue) - else: - # Using a for loop instead of while so we only iterate over all the requests once! - for i in range(len(retry_queue)): - request = retry_queue.popleft() - send_http_request(request, retry_queue) + while not shutdown: + try: + request = ipc_queue.get(block=True, timeout=5) + if isinstance(request, str) and request == "shutdown": # Bit of a cheat + shutdown = True + break + if not isinstance(request, PicklableHttpRequest): + raise TypeError("request must be a PicklableHttpRequest. Was of type " + type(request).__name__) + except Queue.Empty: + request = None + + # If there's no new HTTP request we need to execute, let's check our "retry + # queue" and see if there's any failed HTTP requests we can retry: + if request: + send_http_request(request, retry_queue) + else: + # Using a for loop instead of while so we only iterate over all the requests once! + for i in range(len(retry_queue)): + request = retry_queue.popleft() + send_http_request(request, retry_queue) + + logging.info("Shutting down status_reporter") + # Pickle retry_queue to disk so that we don't lose uploads if we're shut down while + # while the web server is down or unreachable. + with open(http_retry_queue_path, 'wb') as pickle_file: + pickle.dump(retry_queue, pickle_file) + except Exception as e: # Terrible top-level exception handler to prevent the thread from dying, just in case. + if shutdown: + return + logging.exception("Unhandled exception in StatusReporter") + logging.exception(e) + logging.info("Restarting StatusReporter thread") + time.sleep(2) # Throttle it - logging.info("Shutting down status_reporter") - # Pickle retry_queue to disk so that we don't lose uploads if we're shut down while - # while the web server is down or unreachable. - with open(http_retry_queue_path, 'wb') as pickle_file: - pickle.dump(retry_queue, pickle_file) def send_http_request(picklable_request, retry_queue): if not isinstance(picklable_request, PicklableHttpRequest):