feat: use fanout queue type for playout queue (#3161)

### Description

Currently, only one service can listen to libretime schedule change
events. This change allows for as many services as desired to listen for
schedule change events.

**This is a new feature**:

Yes

**I have updated the documentation to reflect these changes**:

No, as this seems like the obvious default

### Testing Notes

**What I did:**

I created 2 playout blocks, connected them both to the fanout queue and
saw that they could all connect and receive schedule change events at
the same time.

**How you can replicate my testing:**

See testing notes
This commit is contained in:
dakriy 2025-06-06 21:05:04 -07:00 committed by GitHub
parent d3be6772de
commit 9e55d3bb6f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 9 additions and 4 deletions

View File

@ -57,7 +57,7 @@ class Application_Model_RabbitMq
$exchange = 'airtime-pypo';
$data = json_encode($md, JSON_FORCE_OBJECT);
self::sendMessage($exchange, 'direct', true, $data);
self::sendMessage($exchange, 'fanout', true, $data);
}
public static function SendMessageToMediaMonitor($event_type, $md)
@ -88,7 +88,7 @@ class Application_Model_RabbitMq
}
$data = json_encode($temp);
self::sendMessage($exchange, 'direct', true, $data);
self::sendMessage($exchange, 'fanout', true, $data);
}
public static function SendMessageToAnalyzer(

View File

@ -27,8 +27,13 @@ class MessageHandler(ConsumerMixin):
self.fetch_queue = fetch_queue
def get_consumers(self, Consumer, channel):
exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True)
queues = [Queue("pypo-fetch", exchange=exchange, key="foo")]
exchange = Exchange("airtime-pypo", "fanout", durable=True, auto_delete=True)
# RabbitMQ says to avoid temporary queues with well-known names
# https://www.rabbitmq.com/docs/queues#shared-temporary-queues
# A server named queue that expires is used so that if the service
# gets temporarily disconnected, no events are lost, but the queue is
# automatically cleaned up on shutdown.
queues = [Queue("", exchange=exchange, expires=30.0)]
return [
Consumer(queues, callbacks=[self.on_message], accept=["text/plain"]),