Jonas L. 6588eb0939
refactor(playout): revert prefer datetime.now(timezone.utc) over datetime.utcnow() (#3169)
This reverts commit 8bd2db16617bb90d9a3b00ec48a8836a6bafa4f1.

The API does not support timezone-aware query parameters.
2025-06-14 15:16:47 +02:00

80 lines
2.5 KiB
Python

import logging
from collections import deque
from datetime import datetime
from queue import Empty, Queue
from threading import Thread
from typing import Any, Dict
from ..utils import seconds_between
from .events import AnyEvent
from .liquidsoap import Liquidsoap
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class PypoLiqQueue(Thread):
    """
    Worker thread that plays scheduled events through Liquidsoap.

    The thread blocks on ``future_queue``. Every mapping received from it
    replaces the pending schedule: its values are queued in ascending key
    order. When the wait for the first pending event times out, that event is
    handed to ``liquidsoap.play`` and the wait is recomputed for the next one.
    """

    # Class-level values that take the place of the ``Thread.name`` and
    # ``Thread.daemon`` properties for every instance of this class.
    name = "liquidsoap_queue"
    daemon = True

    def __init__(
        self,
        future_queue: "Queue[Dict[str, Any]]",
        liquidsoap: "Liquidsoap",
    ) -> None:
        """
        Store the collaborators used by :meth:`main`.

        Args:
            future_queue: Queue from which replacement schedules are read.
            liquidsoap: Object whose ``play`` method is called with each event
                once its wait has elapsed.
        """
        super().__init__()
        self.queue = future_queue
        self.liquidsoap = liquidsoap

    @staticmethod
    def _seconds_until_next(schedule_deque: "deque[AnyEvent]") -> "float | None":
        """
        Return how long to wait before the first pending event.

        Returns ``None`` when ``schedule_deque`` is empty; otherwise the result
        of ``seconds_between`` from the current time to the ``start`` of the
        first event in the deque.
        """
        if not schedule_deque:
            return None
        # Naive ``datetime.utcnow()`` is kept on purpose.
        # NOTE(review): event ``start`` values are presumably naive UTC
        # datetimes; confirm before switching to a timezone-aware "now".
        return seconds_between(datetime.utcnow(), schedule_deque[0].start)

    def main(self) -> None:
        """
        Run the scheduling loop; this method never returns normally.

        Each iteration waits on the queue, either indefinitely (nothing is
        pending) or for the time remaining until the first pending event. A
        timeout plays that event; a received schedule replaces everything that
        was pending.
        """
        schedule_deque: deque[AnyEvent] = deque()
        time_until_next_play = None

        while True:
            try:
                if time_until_next_play is None:
                    logger.info("waiting indefinitely for schedule")
                    media_schedule = self.queue.get(block=True)
                else:
                    logger.info(
                        "waiting %ss until next scheduled item", time_until_next_play
                    )
                    # NOTE(review): ``Queue.get`` raises ``ValueError`` for a
                    # negative timeout; this presumably relies on
                    # ``seconds_between`` never returning a negative value.
                    media_schedule = self.queue.get(
                        block=True, timeout=time_until_next_play
                    )
            except Empty:
                # The wait elapsed without a new schedule: time to push the
                # first pending item. A timeout is only used while the deque
                # is non-empty, so ``popleft`` has an item to return.
                self.liquidsoap.play(schedule_deque.popleft())
            else:
                logger.info("New schedule received")
                # New schedule received. Replace the old one with it, ordered
                # by key.
                schedule_deque.clear()
                schedule_deque.extend(
                    media_schedule[key] for key in sorted(media_schedule)
                )

            # Both outcomes leave the next wait determined by the deque head.
            time_until_next_play = self._seconds_until_next(schedule_deque)

    def run(self) -> None:
        """
        Thread entry point: run :meth:`main` and log any exception it raises.

        The exception is logged rather than propagated; the method then
        returns, which ends the thread.
        """
        try:
            self.main()
        except Exception as exception:  # pylint: disable=broad-exception-caught
            logger.exception(exception)