diff --git a/legacy/application/models/RabbitMq.php b/legacy/application/models/RabbitMq.php index 3c2ea47f5..a6303e5b5 100644 --- a/legacy/application/models/RabbitMq.php +++ b/legacy/application/models/RabbitMq.php @@ -57,7 +57,7 @@ class Application_Model_RabbitMq $exchange = 'airtime-pypo'; $data = json_encode($md, JSON_FORCE_OBJECT); - self::sendMessage($exchange, 'direct', true, $data); + self::sendMessage($exchange, 'fanout', true, $data); } public static function SendMessageToMediaMonitor($event_type, $md) @@ -88,7 +88,7 @@ class Application_Model_RabbitMq } $data = json_encode($temp); - self::sendMessage($exchange, 'direct', true, $data); + self::sendMessage($exchange, 'fanout', true, $data); } public static function SendMessageToAnalyzer( diff --git a/playout/libretime_playout/message_handler.py b/playout/libretime_playout/message_handler.py index 6dc48417d..5f524b80c 100644 --- a/playout/libretime_playout/message_handler.py +++ b/playout/libretime_playout/message_handler.py @@ -27,8 +27,13 @@ class MessageHandler(ConsumerMixin): self.fetch_queue = fetch_queue def get_consumers(self, Consumer, channel): - exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True) - queues = [Queue("pypo-fetch", exchange=exchange, key="foo")] + exchange = Exchange("airtime-pypo", "fanout", durable=True, auto_delete=True) + # RabbitMQ says to avoid temporary queues with well-known names + # https://www.rabbitmq.com/docs/queues#shared-temporary-queues + # A server named queue that expires is used so that if the service + # gets temporarily disconnected, no events are lost, but the queue is + # automatically cleaned up on shutdown. + queues = [Queue("", exchange=exchange, expires=30.0)] return [ Consumer(queues, callbacks=[self.on_message], accept=["text/plain"]),