diff --git a/playout/libretime_playout/liquidsoap/1.4/ls_lib.liq b/playout/libretime_playout/liquidsoap/1.4/ls_lib.liq index 9e55fcdae..af1a10e80 100644 --- a/playout/libretime_playout/liquidsoap/1.4/ls_lib.liq +++ b/playout/libretime_playout/liquidsoap/1.4/ls_lib.liq @@ -152,7 +152,7 @@ def input.http_restart(~id,~initial_url="http://dummy/url") log(string_of(server.execute("#{id}.start"))) ; (-1.) else 0.5 end}) - "OK" + "Done" end) source diff --git a/playout/libretime_playout/liquidsoap/1.4/ls_script.liq b/playout/libretime_playout/liquidsoap/1.4/ls_script.liq index c7a481703..6be680cf7 100644 --- a/playout/libretime_playout/liquidsoap/1.4/ls_script.liq +++ b/playout/libretime_playout/liquidsoap/1.4/ls_script.liq @@ -201,29 +201,29 @@ server.register(namespace="sources", description="Stop main input source.", usage="stop_input_main", "stop_input_main", - fun (s) -> begin log("sources.stop_input_main") stop_input_main() "Done." end) + fun (s) -> begin log("sources.stop_input_main") stop_input_main() "Done" end) server.register(namespace="sources", description="Start main input source.", usage="start_input_main", "start_input_main", - fun (s) -> begin log("sources.start_input_main") start_input_main() "Done." end) + fun (s) -> begin log("sources.start_input_main") start_input_main() "Done" end) server.register(namespace="sources", description="Stop show input source.", usage="stop_input_show", "stop_input_show", - fun (s) -> begin log("sources.stop_input_show") stop_input_show() "Done." end) + fun (s) -> begin log("sources.stop_input_show") stop_input_show() "Done" end) server.register(namespace="sources", description="Start show input source.", usage="start_input_show", "start_input_show", - fun (s) -> begin log("sources.start_input_show") start_input_show() "Done." end) + fun (s) -> begin log("sources.start_input_show") start_input_show() "Done" end) server.register(namespace="sources", description="Stop schedule source.", usage="stop_schedule", "stop_schedule", - fun (s) -> begin log("sources.stop_schedule") stop_schedule() "Done." end) + fun (s) -> begin log("sources.stop_schedule") stop_schedule() "Done" end) server.register(namespace="sources", description="Start schedule source.", usage="start_schedule", "start_schedule", - fun (s) -> begin log("sources.start_schedule") start_schedule() "Done." end) + fun (s) -> begin log("sources.start_schedule") start_schedule() "Done" end) diff --git a/playout/libretime_playout/liquidsoap/2.0/ls_lib.liq b/playout/libretime_playout/liquidsoap/2.0/ls_lib.liq index 9bc6ef3d9..26ed6c420 100644 --- a/playout/libretime_playout/liquidsoap/2.0/ls_lib.liq +++ b/playout/libretime_playout/liquidsoap/2.0/ls_lib.liq @@ -153,7 +153,7 @@ def input.http_restart(~id,~initial_url="http://dummy/url") log(string_of(server.execute("#{id}.start"))) ; (-1.) else 0.5 end}) - "OK" + "Done" end) source diff --git a/playout/libretime_playout/liquidsoap/2.0/ls_script.liq b/playout/libretime_playout/liquidsoap/2.0/ls_script.liq index cd934fcd5..1a0ccb5ae 100644 --- a/playout/libretime_playout/liquidsoap/2.0/ls_script.liq +++ b/playout/libretime_playout/liquidsoap/2.0/ls_script.liq @@ -190,29 +190,29 @@ server.register(namespace="sources", description="Stop main input source.", usage="stop_input_main", "stop_input_main", - fun (s) -> begin log("sources.stop_input_main") stop_input_main() "Done." end) + fun (s) -> begin log("sources.stop_input_main") stop_input_main() "Done" end) server.register(namespace="sources", description="Start main input source.", usage="start_input_main", "start_input_main", - fun (s) -> begin log("sources.start_input_main") start_input_main() "Done." end) + fun (s) -> begin log("sources.start_input_main") start_input_main() "Done" end) server.register(namespace="sources", description="Stop show input source.", usage="stop_input_show", "stop_input_show", - fun (s) -> begin log("sources.stop_input_show") stop_input_show() "Done." end) + fun (s) -> begin log("sources.stop_input_show") stop_input_show() "Done" end) server.register(namespace="sources", description="Start show input source.", usage="start_input_show", "start_input_show", - fun (s) -> begin log("sources.start_input_show") start_input_show() "Done." end) + fun (s) -> begin log("sources.start_input_show") start_input_show() "Done" end) server.register(namespace="sources", description="Stop schedule source.", usage="stop_schedule", "stop_schedule", - fun (s) -> begin log("sources.stop_schedule") stop_schedule() "Done." end) + fun (s) -> begin log("sources.stop_schedule") stop_schedule() "Done" end) server.register(namespace="sources", description="Start schedule source.", usage="start_schedule", "start_schedule", - fun (s) -> begin log("sources.start_schedule") start_schedule() "Done." end) + fun (s) -> begin log("sources.start_schedule") start_schedule() "Done" end) diff --git a/playout/libretime_playout/liquidsoap/client/_client.py b/playout/libretime_playout/liquidsoap/client/_client.py index d06c28b7c..9a13392b1 100644 --- a/playout/libretime_playout/liquidsoap/client/_client.py +++ b/playout/libretime_playout/liquidsoap/client/_client.py @@ -20,6 +20,8 @@ class LiquidsoapClientError(Exception): class LiquidsoapClient: """ A client to communicate with a running Liquidsoap server. + + The client is not thread safe. """ conn: LiquidsoapConnection @@ -45,7 +47,7 @@ class LiquidsoapClient: self.conn.write(f"var.set {name} = {value}") result = self.conn.read() if f"Variable {name} set" not in result: - logger.error(result) + logger.error("unexpected response: %s", result) def version(self) -> Tuple[int, int, int]: with self.conn: @@ -69,6 +71,7 @@ class LiquidsoapClient: with self.conn: for queue_id in queues: self.conn.write(f"queues.s{queue_id}_skip") + self.conn.read() # Flush def queue_push(self, queue_id: int, entry: str, show_name: str) -> None: with self.conn: @@ -84,21 +87,28 @@ class LiquidsoapClient: def web_stream_start(self) -> None: with self.conn: self.conn.write("sources.start_schedule") + self.conn.read() # Flush self.conn.write("sources.start_web_stream") + self.conn.read() # Flush def web_stream_start_buffer(self, schedule_id: int, uri: str) -> None: with self.conn: self.conn.write(f"web_stream.set_id {schedule_id}") + self.conn.read() # Flush self.conn.write(f"http.restart {uri}") + self.conn.read() # Flush def web_stream_stop(self) -> None: with self.conn: self.conn.write("sources.stop_web_stream") + self.conn.read() # Flush def web_stream_stop_buffer(self) -> None: with self.conn: self.conn.write("http.stop") + self.conn.read() # Flush self.conn.write("web_stream.set_id -1") + self.conn.read() # Flush def source_switch_status( self, @@ -113,6 +123,7 @@ class LiquidsoapClient: action = "start" if streaming else "stop" with self.conn: self.conn.write(f"sources.{action}_{name_map[name]}") + self.conn.read() # Flush def settings_update( self, diff --git a/playout/libretime_playout/liquidsoap/client/_connection.py b/playout/libretime_playout/liquidsoap/client/_connection.py index 57607dcde..93ec120f1 100644 --- a/playout/libretime_playout/liquidsoap/client/_connection.py +++ b/playout/libretime_playout/liquidsoap/client/_connection.py @@ -1,7 +1,6 @@ import logging +import socket from pathlib import Path -from socket import AF_UNIX, SOCK_STREAM, create_connection, socket -from threading import Lock from typing import Optional logger = logging.getLogger(__name__) @@ -19,8 +18,7 @@ class LiquidsoapConnection: _path: Optional[Path] = None _timeout: int - _lock: Lock - _sock: Optional[socket] = None + _sock: Optional[socket.socket] = None _eof = b"END" def __init__( @@ -40,7 +38,6 @@ class LiquidsoapConnection: socket instead of the host and port address. Defaults to None. timeout: Socket timeout. Defaults to 5. """ - self._lock = Lock() self._path = path self._host = host self._port = port @@ -50,47 +47,43 @@ class LiquidsoapConnection: return f"{self._host}:{self._port}" if self._path is None else str(self._path) def __enter__(self): - try: - self.connect() - return self - except OSError as exception: - self._sock = None - self._lock.release() - raise exception + self.connect() + return self def __exit__(self, exc_type, exc_value, _traceback): self.close() def connect(self): - logger.debug("trying to acquire lock") - # pylint: disable=consider-using-with - self._lock.acquire() - logger.debug("connecting to %s", self.address()) + try: + logger.debug("connecting to %s", self.address()) - if self._path is not None: - self._sock = socket(AF_UNIX, SOCK_STREAM) - self._sock.settimeout(self._timeout) - self._sock.connect(str(self._path)) - else: - self._sock = create_connection( - address=(self._host, self._port), - timeout=self._timeout, - ) + if self._path is not None: + self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self._sock.settimeout(self._timeout) + self._sock.connect(str(self._path)) + else: + self._sock = socket.create_connection( + address=(self._host, self._port), + timeout=self._timeout, + ) + + except (OSError, ConnectionError): + self._sock = None + raise def close(self): if self._sock is not None: logger.debug("closing connection to %s", self.address()) - self.write("exit") - # Reading for clean exit - while self._sock.recv(1024): - continue + try: + self.write("exit") + # Reading for clean exit + while self._sock.recv(1024): + continue - sock = self._sock - self._sock = None - sock.close() - - self._lock.release() + finally: + self._sock.close() + self._sock = None def write(self, *messages: str): if self._sock is None: diff --git a/playout/libretime_playout/main.py b/playout/libretime_playout/main.py index 31354162a..bc14034b6 100644 --- a/playout/libretime_playout/main.py +++ b/playout/libretime_playout/main.py @@ -111,11 +111,12 @@ def cli( ) wait_for_legacy(legacy_client) - liq_client = LiquidsoapClient( - host=config.playout.liquidsoap_host, - port=config.playout.liquidsoap_port, + wait_for_liquidsoap( + LiquidsoapClient( + host=config.playout.liquidsoap_host, + port=config.playout.liquidsoap_port, + ) ) - wait_for_liquidsoap(liq_client) fetch_queue: "Queue[Dict[str, Any]]" = Queue() push_queue: "Queue[Events]" = Queue() @@ -125,7 +126,12 @@ def cli( # priority, and retrieve it. file_queue: "Queue[FileEvents]" = Queue() - liquidsoap = Liquidsoap(liq_client) + liquidsoap = Liquidsoap( + LiquidsoapClient( + host=config.playout.liquidsoap_host, + port=config.playout.liquidsoap_port, + ) + ) PypoFile(file_queue, api_client).start() @@ -133,7 +139,10 @@ def cli( fetch_queue, push_queue, file_queue, - liq_client, + LiquidsoapClient( + host=config.playout.liquidsoap_host, + port=config.playout.liquidsoap_port, + ), liquidsoap, config, api_client,