262 lines
7.4 KiB
Python
262 lines
7.4 KiB
Python
"""Utilidades reutilizables para síntesis a partir de SRT.
|
|
|
|
Contiene parsing del SRT, llamada HTTP al endpoint TTS y helpers ffmpeg
|
|
para convertir/concatenar/padear segmentos. Estas funciones eran previamente
|
|
parte de `srt_to_kokoro.py` y se mueven aquí para ser reutilizables por
|
|
adaptadores y tests.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
from typing import Optional
|
|
|
|
try:
|
|
import requests
|
|
except Exception:
|
|
# Dejar que el import falle en tiempo de uso (cliente perezoso) si no está instalado
|
|
requests = None
|
|
|
|
try:
|
|
import srt
|
|
except Exception:
|
|
srt = None
|
|
|
|
|
|
def find_synthesis_endpoint(openapi_url: str) -> Optional[str]:
    """Heuristically locate a text-to-speech POST endpoint in an OpenAPI spec.

    Downloads the JSON document at ``openapi_url`` and searches its ``paths``
    in two passes:

    1. the first path whose name contains one of the keywords and that
       declares a POST operation;
    2. failing that, the first path with a POST operation whose
       JSON-serialized metadata contains one of the keywords.

    Args:
        openapi_url: URL of the OpenAPI JSON document.

    Returns:
        The candidate path joined onto the scheme and host of
        ``openapi_url``, or ``None`` when the document cannot be fetched or
        decoded, is not shaped like an OpenAPI object, or has no candidate.

    Raises:
        RuntimeError: if the ``requests`` package could not be imported.
    """
    from urllib.parse import urljoin, urlparse

    if requests is None:
        raise RuntimeError("'requests' no está disponible")

    try:
        response = requests.get(openapi_url, timeout=20)
        response.raise_for_status()
        spec = response.json()
    except Exception:
        # Best effort: any network, HTTP or decoding failure means "not found".
        return None

    # A malformed document (JSON array, null "paths", ...) is "not found"
    # rather than an AttributeError.
    paths = spec.get("paths") if isinstance(spec, dict) else None
    if not isinstance(paths, dict):
        return None

    # "synthesize" needs no entry of its own: it already contains "synth".
    keywords = ("synth", "tts", "text")

    def mentions_keyword(haystack):
        return any(keyword in haystack for keyword in keywords)

    def post_operations(path_item):
        # Yield the operation objects registered under POST (any casing).
        if not isinstance(path_item, dict):
            return
        for method, operation in path_item.items():
            if method.lower() == "post":
                yield operation

    candidate = None

    # Pass 1: the keyword appears in the path itself.
    for path, path_item in paths.items():
        if mentions_keyword(path.lower()) and any(True for _ in post_operations(path_item)):
            candidate = path
            break

    # Pass 2: the keyword appears anywhere in the POST operation metadata.
    if candidate is None:
        for path, path_item in paths.items():
            if not path:
                # An empty path can never be returned as a result.
                continue
            if any(
                mentions_keyword(json.dumps(operation).lower())
                for operation in post_operations(path_item)
            ):
                candidate = path
                break

    if candidate is None:
        return None

    origin = urlparse(openapi_url)
    return urljoin(f"{origin.scheme}://{origin.netloc}", candidate)
|
|
|
|
|
|
def parse_srt_file(path: str) -> list:
    """Read an SRT file and return its parsed subtitles as a list.

    Args:
        path: Location of the ``.srt`` file, expected to be UTF-8 encoded
            (with or without a byte-order mark).

    Returns:
        A list with every item produced by ``srt.parse`` for the file text.

    Raises:
        RuntimeError: if the ``srt`` package could not be imported.
    """
    if srt is None:
        raise RuntimeError("El paquete 'srt' no está instalado")
    # "utf-8-sig" decodes plain UTF-8 identically but also drops a leading
    # BOM, so it does not end up as a stray character before the first cue.
    with open(path, "r", encoding="utf-8-sig") as f:
        raw = f.read()
    return list(srt.parse(raw))
|
|
|
|
|
|
def synth_chunk(endpoint: str, text: str, headers: dict, payload_template: Optional[str], timeout=60):
    """Send one synthesis request and return the audio bytes.

    The request body is built from ``payload_template`` (a JSON document in
    which the literal ``{text}`` is replaced by ``text``) or, when no usable
    template is given, is simply ``{"text": text}``.

    The response is interpreted as follows:

    * ``Content-Type: audio/*`` -> the raw response body;
    * a JSON object with a string under ``audio``, ``wav``, ``data`` or
      ``base64`` (checked in that order) -> that value base64-decoded;
    * anything else -> the raw response body.

    Args:
        endpoint: URL that receives the POST request.
        text: Text to synthesize.
        headers: HTTP headers sent with the request.
        payload_template: Optional JSON template containing ``{text}``.
        timeout: Request timeout in seconds, passed through to ``requests``.

    Returns:
        The audio payload as ``bytes``.

    Raises:
        RuntimeError: if the ``requests`` package could not be imported.
        requests.HTTPError: raised by ``raise_for_status`` on an error status.
    """
    import base64

    if requests is None:
        raise RuntimeError("El paquete 'requests' no está instalado")

    json_body = {"text": text}
    if payload_template:
        # Substitute the JSON-escaped text first: quotes, backslashes and
        # newlines in the subtitle would otherwise corrupt the document and
        # silently discard every other field of the template. A non-str
        # value is passed through so str.replace reports it as before.
        escaped = json.dumps(text)[1:-1] if isinstance(text, str) else text
        bodies = [payload_template.replace("{text}", escaped)]
        if escaped != text:
            # Raw substitution is kept as a second attempt for templates
            # with an unquoted placeholder fed with pre-serialized JSON.
            bodies.append(payload_template.replace("{text}", text))
        for body in bodies:
            try:
                json_body = json.loads(body)
            except ValueError:
                continue
            break

    r = requests.post(endpoint, json=json_body, headers=headers, timeout=timeout)
    r.raise_for_status()

    # Media types are case-insensitive.
    ctype = r.headers.get("Content-Type", "")
    if ctype.strip().lower().startswith("audio/"):
        return r.content

    try:
        payload = r.json()
    except ValueError:
        # Not JSON: assume the body already is the audio.
        return r.content

    if isinstance(payload, dict):
        for key in ("audio", "wav", "data", "base64"):
            value = payload.get(key)
            if not isinstance(value, str):
                continue
            try:
                return base64.b64decode(value)
            except ValueError:
                # Not valid base64 under this key; try the next one.
                continue

    return r.content
|
|
|
|
|
|
def ensure_ffmpeg():
    """Raise ``RuntimeError`` unless an ``ffmpeg`` executable is on ``PATH``."""
    ffmpeg_location = shutil.which("ffmpeg")
    if ffmpeg_location is not None:
        return
    raise RuntimeError("ffmpeg no está disponible en PATH")
|
|
|
|
|
|
def convert_and_save(raw_bytes: bytes, target_path: str, sr: int = 22050):
    """Convert an audio payload to mono 16-bit audio at ``sr`` Hz via ffmpeg.

    The bytes are written to a temporary file that ffmpeg reads as input;
    the temporary file is always removed afterwards.

    Args:
        raw_bytes: Audio payload in any container/codec ffmpeg can probe.
        target_path: Output file; ffmpeg picks the container from its name.
        sr: Output sample rate in Hz. Defaults to 22050, the value that was
            previously hard-coded.

    Note:
        If ffmpeg exits with an error, ``raw_bytes`` are written to
        ``target_path`` unchanged as a best-effort fallback. A failure to
        launch ffmpeg at all is not caught here.
    """
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".bin")
    tmp_path = tmp.name
    try:
        # Close the handle before ffmpeg opens the file; the write sits
        # inside the try so a failed write cannot leak the temp file.
        with tmp:
            tmp.write(raw_bytes)

        cmd = [
            "ffmpeg",
            "-y",
            "-i",
            tmp_path,
            "-ar",
            str(sr),
            "-ac",
            "1",
            "-sample_fmt",
            "s16",
            target_path,
        ]
        try:
            subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError:
            # ffmpeg rejected the input: keep the original bytes.
            with open(target_path, "wb") as out:
                out.write(raw_bytes)
    finally:
        try:
            os.remove(tmp_path)
        except OSError:
            pass
|
|
|
|
|
|
def create_silence(duration: float, out_path: str, sr: int = 22050):
    """Write ``duration`` seconds of mono 16-bit PCM silence to ``out_path``.

    ffmpeg's ``anullsrc`` source is tried first. If ffmpeg exits with an
    error or cannot be launched, an equivalent WAV file is written with the
    standard-library ``wave`` module, so the result is a well-formed audio
    file either way.

    Args:
        duration: Length of the silence in seconds.
        out_path: Destination file.
        sr: Sample rate in Hz.

    Note:
        Errors while writing the fallback file are ignored (best effort).
    """
    import wave

    cmd = [
        "ffmpeg",
        "-y",
        "-f",
        "lavfi",
        "-i",
        f"anullsrc=channel_layout=mono:sample_rate={sr}",
        "-t",
        f"{duration}",
        "-c:a",
        "pcm_s16le",
        out_path,
    ]
    try:
        subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        return
    except (subprocess.CalledProcessError, OSError):
        # OSError covers an ffmpeg binary that cannot be started.
        pass

    try:
        total_frames = max(0, int(round(float(duration) * sr)))
    except (TypeError, ValueError, OverflowError):
        total_frames = 0

    try:
        with wave.open(out_path, "wb") as wf:
            wf.setnchannels(1)
            wf.setsampwidth(2)  # 16-bit samples, matching pcm_s16le
            wf.setframerate(sr)
            # Emit at most ~1 second per write to bound memory use.
            frames_per_write = max(1, int(sr))
            block = b"\x00\x00" * min(total_frames, frames_per_write)
            remaining = total_frames
            while remaining > 0:
                count = min(remaining, frames_per_write)
                wf.writeframes(block[: 2 * count])
                remaining -= count
    except (OSError, wave.Error, TypeError, ValueError):
        # Deliberate best effort, as before: callers get no exception here.
        pass
|
|
|
|
|
|
def pad_or_trim_wav(in_path: str, out_path: str, target_duration: float, sr: int = 22050):
    """Produce a copy of ``in_path`` that lasts ``target_duration`` seconds.

    The current duration is read with ffprobe, then:

    * unknown duration (ffprobe failed) or already within 20 ms of the
      target -> the file is copied unchanged;
    * longer than the target -> cut with ``ffmpeg -t``;
    * shorter than the target -> silence from ``create_silence`` is appended
      using ffmpeg's concat demuxer with stream copy.

    Args:
        in_path: Source audio file.
        out_path: Destination file.
        target_duration: Desired length in seconds.
        sr: Sample rate used for the generated silence.

    Raises:
        subprocess.CalledProcessError: if the trimming or concatenating
            ffmpeg command fails.
    """
    try:
        probe = subprocess.run(
            [
                "ffprobe",
                "-v",
                "error",
                "-show_entries",
                "format=duration",
                "-of",
                "default=noprint_wrappers=1:nokey=1",
                in_path,
            ],
            capture_output=True,
            text=True,
            check=True,
        )
        cur = float(probe.stdout.strip())
    except (OSError, subprocess.SubprocessError, ValueError):
        # ffprobe missing, failing, or printing something non-numeric.
        cur = 0.0

    # "not cur > 0.0" also catches NaN, which "== 0.0" would let through.
    if not cur > 0.0 or abs(cur - target_duration) < 0.02:
        # shutil.copy raises SameFileError when both names are one file;
        # in that case the file is already where it has to be.
        if os.path.abspath(in_path) != os.path.abspath(out_path):
            shutil.copy(in_path, out_path)
        return

    if cur > target_duration:
        cmd = ["ffmpeg", "-y", "-i", in_path, "-t", f"{target_duration}", out_path]
        subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        return

    pad = target_duration - cur
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as sil:
        sil_path = sil.name
    listname = None
    try:
        create_silence(pad, sil_path, sr=sr)
        # Written as UTF-8 explicitly instead of relying on the locale
        # encoding, so non-ASCII paths are not mangled.
        with tempfile.NamedTemporaryFile(
            mode="w", encoding="utf-8", delete=False, suffix=".txt"
        ) as listf:
            listname = listf.name
            for piece in (in_path, sil_path):
                # Inside a single-quoted concat entry a quote is written
                # as '\'' (close quote, escaped quote, reopen quote).
                quoted = os.path.abspath(piece).replace("'", "'\\''")
                listf.write(f"file '{quoted}'\n")
        cmd2 = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", listname, "-c", "copy", out_path]
        subprocess.run(cmd2, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    finally:
        for leftover in (sil_path, listname):
            if not leftover:
                continue
            try:
                os.remove(leftover)
            except OSError:
                pass
|
|
|
|
|
|
def concat_chunks(chunks: list, out_path: str):
    """Concatenate audio files, in order, into ``out_path`` using ffmpeg.

    The concat demuxer with stream copy is tried first. If that command
    fails, ffmpeg's ``concat:`` protocol is tried, writing to
    ``out_path + ".tmp.wav"`` which is then moved onto ``out_path``.

    Args:
        chunks: Paths of the files to join.
        out_path: Destination file.

    Raises:
        RuntimeError: from ``ensure_ffmpeg`` when ffmpeg is not on ``PATH``.
        subprocess.CalledProcessError: if the fallback command fails too.
            Previously its exit status was ignored and the failure surfaced
            as an error from ``shutil.move`` on the missing temporary file.
    """
    ensure_ffmpeg()

    listname = None
    try:
        # Written as UTF-8 explicitly instead of relying on the locale
        # encoding, so non-ASCII paths are not mangled.
        with tempfile.NamedTemporaryFile(
            mode="w", encoding="utf-8", delete=False, suffix=".txt"
        ) as listf:
            listname = listf.name
            for c in chunks:
                # Inside a single-quoted concat entry a quote is written
                # as '\'' (close quote, escaped quote, reopen quote).
                quoted = os.path.abspath(c).replace("'", "'\\''")
                listf.write(f"file '{quoted}'\n")

        cmd = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", listname, "-c", "copy", out_path]
        try:
            subprocess.run(cmd, check=True)
        except subprocess.CalledProcessError:
            tmp_concat = out_path + ".tmp.wav"
            joined = "|".join(os.fspath(c) for c in chunks)
            cmd2 = ["ffmpeg", "-y", "-i", f"concat:{joined}", "-c", "copy", tmp_concat]
            try:
                subprocess.run(cmd2, check=True)
                shutil.move(tmp_concat, out_path)
            finally:
                # Only present here if the fallback failed part-way.
                if os.path.exists(tmp_concat):
                    try:
                        os.remove(tmp_concat)
                    except OSError:
                        pass
    finally:
        if listname:
            try:
                os.remove(listname)
            except OSError:
                pass
|