fix: remove potential deadlock in liquidsoap client (#3165)
### Description Remove the liquidsoap connection lock to remove any chance to run into a deadlock. With this change, the client is not thread safe anymore. Also handle connection error exception when talking to liquidsoap, e.g. when liquidsoap is restarted and closes the telnet connection. **This is a new feature**: No **I have updated the documentation to reflect these changes**: Not relevant. ### Testing Notes - Restart liquidsoap while playout is talking to it. - See the connection being dropped and playout that might run into a deadlock.
This commit is contained in:
parent
6588eb0939
commit
f33518d637
@ -152,7 +152,7 @@ def input.http_restart(~id,~initial_url="http://dummy/url")
|
|||||||
log(string_of(server.execute("#{id}.start"))) ;
|
log(string_of(server.execute("#{id}.start"))) ;
|
||||||
(-1.)
|
(-1.)
|
||||||
else 0.5 end})
|
else 0.5 end})
|
||||||
"OK"
|
"Done"
|
||||||
end)
|
end)
|
||||||
|
|
||||||
source
|
source
|
||||||
|
|||||||
@ -201,29 +201,29 @@ server.register(namespace="sources",
|
|||||||
description="Stop main input source.",
|
description="Stop main input source.",
|
||||||
usage="stop_input_main",
|
usage="stop_input_main",
|
||||||
"stop_input_main",
|
"stop_input_main",
|
||||||
fun (s) -> begin log("sources.stop_input_main") stop_input_main() "Done." end)
|
fun (s) -> begin log("sources.stop_input_main") stop_input_main() "Done" end)
|
||||||
server.register(namespace="sources",
|
server.register(namespace="sources",
|
||||||
description="Start main input source.",
|
description="Start main input source.",
|
||||||
usage="start_input_main",
|
usage="start_input_main",
|
||||||
"start_input_main",
|
"start_input_main",
|
||||||
fun (s) -> begin log("sources.start_input_main") start_input_main() "Done." end)
|
fun (s) -> begin log("sources.start_input_main") start_input_main() "Done" end)
|
||||||
server.register(namespace="sources",
|
server.register(namespace="sources",
|
||||||
description="Stop show input source.",
|
description="Stop show input source.",
|
||||||
usage="stop_input_show",
|
usage="stop_input_show",
|
||||||
"stop_input_show",
|
"stop_input_show",
|
||||||
fun (s) -> begin log("sources.stop_input_show") stop_input_show() "Done." end)
|
fun (s) -> begin log("sources.stop_input_show") stop_input_show() "Done" end)
|
||||||
server.register(namespace="sources",
|
server.register(namespace="sources",
|
||||||
description="Start show input source.",
|
description="Start show input source.",
|
||||||
usage="start_input_show",
|
usage="start_input_show",
|
||||||
"start_input_show",
|
"start_input_show",
|
||||||
fun (s) -> begin log("sources.start_input_show") start_input_show() "Done." end)
|
fun (s) -> begin log("sources.start_input_show") start_input_show() "Done" end)
|
||||||
server.register(namespace="sources",
|
server.register(namespace="sources",
|
||||||
description="Stop schedule source.",
|
description="Stop schedule source.",
|
||||||
usage="stop_schedule",
|
usage="stop_schedule",
|
||||||
"stop_schedule",
|
"stop_schedule",
|
||||||
fun (s) -> begin log("sources.stop_schedule") stop_schedule() "Done." end)
|
fun (s) -> begin log("sources.stop_schedule") stop_schedule() "Done" end)
|
||||||
server.register(namespace="sources",
|
server.register(namespace="sources",
|
||||||
description="Start schedule source.",
|
description="Start schedule source.",
|
||||||
usage="start_schedule",
|
usage="start_schedule",
|
||||||
"start_schedule",
|
"start_schedule",
|
||||||
fun (s) -> begin log("sources.start_schedule") start_schedule() "Done." end)
|
fun (s) -> begin log("sources.start_schedule") start_schedule() "Done" end)
|
||||||
|
|||||||
@ -153,7 +153,7 @@ def input.http_restart(~id,~initial_url="http://dummy/url")
|
|||||||
log(string_of(server.execute("#{id}.start"))) ;
|
log(string_of(server.execute("#{id}.start"))) ;
|
||||||
(-1.)
|
(-1.)
|
||||||
else 0.5 end})
|
else 0.5 end})
|
||||||
"OK"
|
"Done"
|
||||||
end)
|
end)
|
||||||
|
|
||||||
source
|
source
|
||||||
|
|||||||
@ -190,29 +190,29 @@ server.register(namespace="sources",
|
|||||||
description="Stop main input source.",
|
description="Stop main input source.",
|
||||||
usage="stop_input_main",
|
usage="stop_input_main",
|
||||||
"stop_input_main",
|
"stop_input_main",
|
||||||
fun (s) -> begin log("sources.stop_input_main") stop_input_main() "Done." end)
|
fun (s) -> begin log("sources.stop_input_main") stop_input_main() "Done" end)
|
||||||
server.register(namespace="sources",
|
server.register(namespace="sources",
|
||||||
description="Start main input source.",
|
description="Start main input source.",
|
||||||
usage="start_input_main",
|
usage="start_input_main",
|
||||||
"start_input_main",
|
"start_input_main",
|
||||||
fun (s) -> begin log("sources.start_input_main") start_input_main() "Done." end)
|
fun (s) -> begin log("sources.start_input_main") start_input_main() "Done" end)
|
||||||
server.register(namespace="sources",
|
server.register(namespace="sources",
|
||||||
description="Stop show input source.",
|
description="Stop show input source.",
|
||||||
usage="stop_input_show",
|
usage="stop_input_show",
|
||||||
"stop_input_show",
|
"stop_input_show",
|
||||||
fun (s) -> begin log("sources.stop_input_show") stop_input_show() "Done." end)
|
fun (s) -> begin log("sources.stop_input_show") stop_input_show() "Done" end)
|
||||||
server.register(namespace="sources",
|
server.register(namespace="sources",
|
||||||
description="Start show input source.",
|
description="Start show input source.",
|
||||||
usage="start_input_show",
|
usage="start_input_show",
|
||||||
"start_input_show",
|
"start_input_show",
|
||||||
fun (s) -> begin log("sources.start_input_show") start_input_show() "Done." end)
|
fun (s) -> begin log("sources.start_input_show") start_input_show() "Done" end)
|
||||||
server.register(namespace="sources",
|
server.register(namespace="sources",
|
||||||
description="Stop schedule source.",
|
description="Stop schedule source.",
|
||||||
usage="stop_schedule",
|
usage="stop_schedule",
|
||||||
"stop_schedule",
|
"stop_schedule",
|
||||||
fun (s) -> begin log("sources.stop_schedule") stop_schedule() "Done." end)
|
fun (s) -> begin log("sources.stop_schedule") stop_schedule() "Done" end)
|
||||||
server.register(namespace="sources",
|
server.register(namespace="sources",
|
||||||
description="Start schedule source.",
|
description="Start schedule source.",
|
||||||
usage="start_schedule",
|
usage="start_schedule",
|
||||||
"start_schedule",
|
"start_schedule",
|
||||||
fun (s) -> begin log("sources.start_schedule") start_schedule() "Done." end)
|
fun (s) -> begin log("sources.start_schedule") start_schedule() "Done" end)
|
||||||
|
|||||||
@ -20,6 +20,8 @@ class LiquidsoapClientError(Exception):
|
|||||||
class LiquidsoapClient:
|
class LiquidsoapClient:
|
||||||
"""
|
"""
|
||||||
A client to communicate with a running Liquidsoap server.
|
A client to communicate with a running Liquidsoap server.
|
||||||
|
|
||||||
|
The client is not thread safe.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
conn: LiquidsoapConnection
|
conn: LiquidsoapConnection
|
||||||
@ -45,7 +47,7 @@ class LiquidsoapClient:
|
|||||||
self.conn.write(f"var.set {name} = {value}")
|
self.conn.write(f"var.set {name} = {value}")
|
||||||
result = self.conn.read()
|
result = self.conn.read()
|
||||||
if f"Variable {name} set" not in result:
|
if f"Variable {name} set" not in result:
|
||||||
logger.error(result)
|
logger.error("unexpected response: %s", result)
|
||||||
|
|
||||||
def version(self) -> Tuple[int, int, int]:
|
def version(self) -> Tuple[int, int, int]:
|
||||||
with self.conn:
|
with self.conn:
|
||||||
@ -69,6 +71,7 @@ class LiquidsoapClient:
|
|||||||
with self.conn:
|
with self.conn:
|
||||||
for queue_id in queues:
|
for queue_id in queues:
|
||||||
self.conn.write(f"queues.s{queue_id}_skip")
|
self.conn.write(f"queues.s{queue_id}_skip")
|
||||||
|
self.conn.read() # Flush
|
||||||
|
|
||||||
def queue_push(self, queue_id: int, entry: str, show_name: str) -> None:
|
def queue_push(self, queue_id: int, entry: str, show_name: str) -> None:
|
||||||
with self.conn:
|
with self.conn:
|
||||||
@ -84,21 +87,28 @@ class LiquidsoapClient:
|
|||||||
def web_stream_start(self) -> None:
|
def web_stream_start(self) -> None:
|
||||||
with self.conn:
|
with self.conn:
|
||||||
self.conn.write("sources.start_schedule")
|
self.conn.write("sources.start_schedule")
|
||||||
|
self.conn.read() # Flush
|
||||||
self.conn.write("sources.start_web_stream")
|
self.conn.write("sources.start_web_stream")
|
||||||
|
self.conn.read() # Flush
|
||||||
|
|
||||||
def web_stream_start_buffer(self, schedule_id: int, uri: str) -> None:
|
def web_stream_start_buffer(self, schedule_id: int, uri: str) -> None:
|
||||||
with self.conn:
|
with self.conn:
|
||||||
self.conn.write(f"web_stream.set_id {schedule_id}")
|
self.conn.write(f"web_stream.set_id {schedule_id}")
|
||||||
|
self.conn.read() # Flush
|
||||||
self.conn.write(f"http.restart {uri}")
|
self.conn.write(f"http.restart {uri}")
|
||||||
|
self.conn.read() # Flush
|
||||||
|
|
||||||
def web_stream_stop(self) -> None:
|
def web_stream_stop(self) -> None:
|
||||||
with self.conn:
|
with self.conn:
|
||||||
self.conn.write("sources.stop_web_stream")
|
self.conn.write("sources.stop_web_stream")
|
||||||
|
self.conn.read() # Flush
|
||||||
|
|
||||||
def web_stream_stop_buffer(self) -> None:
|
def web_stream_stop_buffer(self) -> None:
|
||||||
with self.conn:
|
with self.conn:
|
||||||
self.conn.write("http.stop")
|
self.conn.write("http.stop")
|
||||||
|
self.conn.read() # Flush
|
||||||
self.conn.write("web_stream.set_id -1")
|
self.conn.write("web_stream.set_id -1")
|
||||||
|
self.conn.read() # Flush
|
||||||
|
|
||||||
def source_switch_status(
|
def source_switch_status(
|
||||||
self,
|
self,
|
||||||
@ -113,6 +123,7 @@ class LiquidsoapClient:
|
|||||||
action = "start" if streaming else "stop"
|
action = "start" if streaming else "stop"
|
||||||
with self.conn:
|
with self.conn:
|
||||||
self.conn.write(f"sources.{action}_{name_map[name]}")
|
self.conn.write(f"sources.{action}_{name_map[name]}")
|
||||||
|
self.conn.read() # Flush
|
||||||
|
|
||||||
def settings_update(
|
def settings_update(
|
||||||
self,
|
self,
|
||||||
|
|||||||
@ -1,7 +1,6 @@
|
|||||||
import logging
|
import logging
|
||||||
|
import socket
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from socket import AF_UNIX, SOCK_STREAM, create_connection, socket
|
|
||||||
from threading import Lock
|
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@ -19,8 +18,7 @@ class LiquidsoapConnection:
|
|||||||
_path: Optional[Path] = None
|
_path: Optional[Path] = None
|
||||||
_timeout: int
|
_timeout: int
|
||||||
|
|
||||||
_lock: Lock
|
_sock: Optional[socket.socket] = None
|
||||||
_sock: Optional[socket] = None
|
|
||||||
_eof = b"END"
|
_eof = b"END"
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
@ -40,7 +38,6 @@ class LiquidsoapConnection:
|
|||||||
socket instead of the host and port address. Defaults to None.
|
socket instead of the host and port address. Defaults to None.
|
||||||
timeout: Socket timeout. Defaults to 5.
|
timeout: Socket timeout. Defaults to 5.
|
||||||
"""
|
"""
|
||||||
self._lock = Lock()
|
|
||||||
self._path = path
|
self._path = path
|
||||||
self._host = host
|
self._host = host
|
||||||
self._port = port
|
self._port = port
|
||||||
@ -50,47 +47,43 @@ class LiquidsoapConnection:
|
|||||||
return f"{self._host}:{self._port}" if self._path is None else str(self._path)
|
return f"{self._host}:{self._port}" if self._path is None else str(self._path)
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
try:
|
self.connect()
|
||||||
self.connect()
|
return self
|
||||||
return self
|
|
||||||
except OSError as exception:
|
|
||||||
self._sock = None
|
|
||||||
self._lock.release()
|
|
||||||
raise exception
|
|
||||||
|
|
||||||
def __exit__(self, exc_type, exc_value, _traceback):
|
def __exit__(self, exc_type, exc_value, _traceback):
|
||||||
self.close()
|
self.close()
|
||||||
|
|
||||||
def connect(self):
|
def connect(self):
|
||||||
logger.debug("trying to acquire lock")
|
try:
|
||||||
# pylint: disable=consider-using-with
|
logger.debug("connecting to %s", self.address())
|
||||||
self._lock.acquire()
|
|
||||||
logger.debug("connecting to %s", self.address())
|
|
||||||
|
|
||||||
if self._path is not None:
|
if self._path is not None:
|
||||||
self._sock = socket(AF_UNIX, SOCK_STREAM)
|
self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
self._sock.settimeout(self._timeout)
|
self._sock.settimeout(self._timeout)
|
||||||
self._sock.connect(str(self._path))
|
self._sock.connect(str(self._path))
|
||||||
else:
|
else:
|
||||||
self._sock = create_connection(
|
self._sock = socket.create_connection(
|
||||||
address=(self._host, self._port),
|
address=(self._host, self._port),
|
||||||
timeout=self._timeout,
|
timeout=self._timeout,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
except (OSError, ConnectionError):
|
||||||
|
self._sock = None
|
||||||
|
raise
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
if self._sock is not None:
|
if self._sock is not None:
|
||||||
logger.debug("closing connection to %s", self.address())
|
logger.debug("closing connection to %s", self.address())
|
||||||
|
|
||||||
self.write("exit")
|
try:
|
||||||
# Reading for clean exit
|
self.write("exit")
|
||||||
while self._sock.recv(1024):
|
# Reading for clean exit
|
||||||
continue
|
while self._sock.recv(1024):
|
||||||
|
continue
|
||||||
|
|
||||||
sock = self._sock
|
finally:
|
||||||
self._sock = None
|
self._sock.close()
|
||||||
sock.close()
|
self._sock = None
|
||||||
|
|
||||||
self._lock.release()
|
|
||||||
|
|
||||||
def write(self, *messages: str):
|
def write(self, *messages: str):
|
||||||
if self._sock is None:
|
if self._sock is None:
|
||||||
|
|||||||
@ -111,11 +111,12 @@ def cli(
|
|||||||
)
|
)
|
||||||
wait_for_legacy(legacy_client)
|
wait_for_legacy(legacy_client)
|
||||||
|
|
||||||
liq_client = LiquidsoapClient(
|
wait_for_liquidsoap(
|
||||||
host=config.playout.liquidsoap_host,
|
LiquidsoapClient(
|
||||||
port=config.playout.liquidsoap_port,
|
host=config.playout.liquidsoap_host,
|
||||||
|
port=config.playout.liquidsoap_port,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
wait_for_liquidsoap(liq_client)
|
|
||||||
|
|
||||||
fetch_queue: "Queue[Dict[str, Any]]" = Queue()
|
fetch_queue: "Queue[Dict[str, Any]]" = Queue()
|
||||||
push_queue: "Queue[Events]" = Queue()
|
push_queue: "Queue[Events]" = Queue()
|
||||||
@ -125,7 +126,12 @@ def cli(
|
|||||||
# priority, and retrieve it.
|
# priority, and retrieve it.
|
||||||
file_queue: "Queue[FileEvents]" = Queue()
|
file_queue: "Queue[FileEvents]" = Queue()
|
||||||
|
|
||||||
liquidsoap = Liquidsoap(liq_client)
|
liquidsoap = Liquidsoap(
|
||||||
|
LiquidsoapClient(
|
||||||
|
host=config.playout.liquidsoap_host,
|
||||||
|
port=config.playout.liquidsoap_port,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
PypoFile(file_queue, api_client).start()
|
PypoFile(file_queue, api_client).start()
|
||||||
|
|
||||||
@ -133,7 +139,10 @@ def cli(
|
|||||||
fetch_queue,
|
fetch_queue,
|
||||||
push_queue,
|
push_queue,
|
||||||
file_queue,
|
file_queue,
|
||||||
liq_client,
|
LiquidsoapClient(
|
||||||
|
host=config.playout.liquidsoap_host,
|
||||||
|
port=config.playout.liquidsoap_port,
|
||||||
|
),
|
||||||
liquidsoap,
|
liquidsoap,
|
||||||
config,
|
config,
|
||||||
api_client,
|
api_client,
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user