From 9e55d3bb6f2fa0f9a4dc858359a99e9d50c826a4 Mon Sep 17 00:00:00 2001 From: dakriy Date: Fri, 6 Jun 2025 21:05:04 -0700 Subject: [PATCH] feat: use fanout queue type for playout queue (#3161) ### Description Currently, only one service can listen to libretime schedule change events. This change allows for as many services as desired to listen for schedule change events. **This is a new feature**: Yes **I have updated the documentation to reflect these changes**: No, as this seems like the obvious default ### Testing Notes **What I did:** I created 2 playout blocks, connected them both to the fanout queue and saw that they could all connect and receive schedule change events at the same time. **How you can replicate my testing:** See testing notes --- legacy/application/models/RabbitMq.php | 4 ++-- playout/libretime_playout/message_handler.py | 9 +++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/legacy/application/models/RabbitMq.php b/legacy/application/models/RabbitMq.php index 3c2ea47f5..a6303e5b5 100644 --- a/legacy/application/models/RabbitMq.php +++ b/legacy/application/models/RabbitMq.php @@ -57,7 +57,7 @@ class Application_Model_RabbitMq $exchange = 'airtime-pypo'; $data = json_encode($md, JSON_FORCE_OBJECT); - self::sendMessage($exchange, 'direct', true, $data); + self::sendMessage($exchange, 'fanout', true, $data); } public static function SendMessageToMediaMonitor($event_type, $md) @@ -88,7 +88,7 @@ class Application_Model_RabbitMq } $data = json_encode($temp); - self::sendMessage($exchange, 'direct', true, $data); + self::sendMessage($exchange, 'fanout', true, $data); } public static function SendMessageToAnalyzer( diff --git a/playout/libretime_playout/message_handler.py b/playout/libretime_playout/message_handler.py index 6dc48417d..5f524b80c 100644 --- a/playout/libretime_playout/message_handler.py +++ b/playout/libretime_playout/message_handler.py @@ -27,8 +27,13 @@ class MessageHandler(ConsumerMixin): self.fetch_queue = fetch_queue def get_consumers(self, Consumer, channel): - exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True) - queues = [Queue("pypo-fetch", exchange=exchange, key="foo")] + exchange = Exchange("airtime-pypo", "fanout", durable=True, auto_delete=True) + # RabbitMQ says to avoid temporary queues with well-known names + # https://www.rabbitmq.com/docs/queues#shared-temporary-queues + # A server named queue that expires is used so that if the service + # gets temporarily disconnected, no events are lost, but the queue is + # automatically cleaned up on shutdown. + queues = [Queue("", exchange=exchange, expires=30.0)] return [ Consumer(queues, callbacks=[self.on_message], accept=["text/plain"]),