### Description The playout exchange was changed to `fanout` without migrating the previous `direct` exchange. This cause issues during upgrades, such as: ``` amqp.exceptions.PreconditionFailed: Exchange.declare: (406) PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'airtime-pypo' in vhost '/libretime': received 'fanout' but current is 'direct' ``` This is documented upstream in https://www.rabbitmq.com/docs/queues#property-equivalence This change provides an upgrade path by renaming the exchange, and leave the old exchange behind. Loosing messages is not a concern for the playout queue. ### Testing Notes - Checkout version `4.4.0` - Run `make dev` - Checkout 9e55d3bb6f2fa0f9a4dc858359a99e9d50c826a4 - Run `make dev` - See the exception in playout: `docker compose logs -f playout` - Checkout this PR - Run `make dev` - See playout fine working. ### **Links** https://github.com/libretime/libretime/pull/3161
105 lines
3.2 KiB
Python
105 lines
3.2 KiB
Python
import json
|
|
import logging
|
|
from queue import Queue as ThreadQueue
|
|
from signal import SIGTERM, signal
|
|
from time import sleep
|
|
from typing import Any, Dict
|
|
|
|
# For RabbitMQ
|
|
from kombu.connection import Connection
|
|
from kombu.message import Message
|
|
from kombu.messaging import Exchange, Queue
|
|
from kombu.mixins import ConsumerMixin
|
|
|
|
from .config import Config
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MessageHandler(ConsumerMixin):
|
|
def __init__(
|
|
self,
|
|
connection: Connection,
|
|
fetch_queue: "ThreadQueue[Dict[str, Any]]",
|
|
):
|
|
self.connection = connection
|
|
|
|
self.fetch_queue = fetch_queue
|
|
|
|
def get_consumers(self, Consumer, channel):
|
|
exchange = Exchange("playout", "fanout", durable=True, auto_delete=True)
|
|
# RabbitMQ says to avoid temporary queues with well-known names
|
|
# https://www.rabbitmq.com/docs/queues#shared-temporary-queues
|
|
# A server named queue that expires is used so that if the service
|
|
# gets temporarily disconnected, no events are lost, but the queue is
|
|
# automatically cleaned up on shutdown.
|
|
queues = [Queue("", exchange=exchange, expires=30.0)]
|
|
|
|
return [
|
|
Consumer(queues, callbacks=[self.on_message], accept=["text/plain"]),
|
|
]
|
|
|
|
def on_message(self, body, message: Message) -> None:
|
|
logger.debug("received message: %s", body)
|
|
try:
|
|
try:
|
|
body = body.decode()
|
|
except (UnicodeDecodeError, AttributeError):
|
|
pass
|
|
|
|
payload: dict = json.loads(body)
|
|
command = payload["event_type"]
|
|
logger.info("handling event %s: %s", command, payload)
|
|
|
|
if command in (
|
|
"update_schedule",
|
|
"reset_liquidsoap_bootstrap",
|
|
"update_stream_format",
|
|
"update_message_offline",
|
|
"update_station_name",
|
|
"switch_source",
|
|
"update_transition_fade",
|
|
"disconnect_source",
|
|
):
|
|
self.fetch_queue.put(payload)
|
|
else:
|
|
logger.warning("invalid command: %s", command)
|
|
|
|
except Exception as exception: # pylint: disable=broad-exception-caught
|
|
logger.exception(exception)
|
|
|
|
message.ack()
|
|
|
|
|
|
# pylint: disable=too-few-public-methods
|
|
class MessageListener:
|
|
def __init__(
|
|
self,
|
|
config: Config,
|
|
fetch_queue: "ThreadQueue[Dict[str, Any]]",
|
|
) -> None:
|
|
self.config = config
|
|
self.fetch_queue = fetch_queue
|
|
|
|
def run_forever(self) -> None:
|
|
while True:
|
|
with Connection(
|
|
self.config.rabbitmq.url,
|
|
heartbeat=5,
|
|
transport_options={"client_properties": {"connection_name": "playout"}},
|
|
) as connection:
|
|
handler = MessageHandler(
|
|
connection=connection,
|
|
fetch_queue=self.fetch_queue,
|
|
)
|
|
|
|
def shutdown(_signum, _frame):
|
|
raise SystemExit()
|
|
|
|
signal(SIGTERM, shutdown)
|
|
|
|
logger.info("starting message handler")
|
|
handler.run()
|
|
|
|
sleep(5)
|