submaster/whisper_project/infra/kokoro_utils.py

262 lines
7.4 KiB
Python

"""Utilidades reutilizables para síntesis a partir de SRT.
Contiene parsing del SRT, llamada HTTP al endpoint TTS y helpers ffmpeg
para convertir/concatenar/padear segmentos. Estas funciones eran previamente
parte de `srt_to_kokoro.py` y se mueven aquí para ser reutilizables por
adaptadores y tests.
"""
import json
import os
import re
import shutil
import subprocess
import tempfile
from typing import Optional
try:
import requests
except Exception:
# Dejar que el import falle en tiempo de uso (cliente perezoso) si no está instalado
requests = None
try:
import srt
except Exception:
srt = None
def find_synthesis_endpoint(openapi_url: str) -> Optional[str]:
    """Guess the TTS synthesis endpoint described by an OpenAPI document.

    The JSON document at ``openapi_url`` is downloaded and searched in two
    passes for an operation registered under ``POST``:

    1. a path whose own name contains one of the keywords ``synth``, ``tts``
       or ``text``;
    2. failing that, a path whose POST operation metadata (serialized back to
       JSON) contains one of those keywords.

    Args:
        openapi_url: URL of the ``openapi.json`` document.

    Returns:
        The first matching path joined onto the scheme and host of
        ``openapi_url``, or ``None`` when the document cannot be fetched or
        decoded, is not shaped like a mapping with a ``paths`` mapping, or
        contains no matching operation.

    Raises:
        RuntimeError: If the ``requests`` package is not available.
    """
    if requests is None:
        raise RuntimeError("'requests' no está disponible")

    try:
        response = requests.get(openapi_url, timeout=20)
        response.raise_for_status()
        spec = response.json()
    except Exception:
        # Deliberately best-effort: any network, HTTP or decoding failure
        # simply means "no endpoint could be discovered".
        return None

    # A malformed document (JSON array at the root, ``"paths": null``, ...)
    # must yield None instead of an AttributeError.
    paths = spec.get("paths") if isinstance(spec, dict) else None
    if not isinstance(paths, dict):
        return None

    # "synthesize" is intentionally not listed: it is already matched by "synth".
    keywords = ("synth", "tts", "text")

    def has_keyword(haystack: str) -> bool:
        lowered = haystack.lower()
        return any(keyword in lowered for keyword in keywords)

    def post_operations(path_item) -> list:
        """Return the operation objects registered under POST for a path item."""
        if not isinstance(path_item, dict):
            return []
        return [op for method, op in path_item.items() if method.lower() == "post"]

    # Pass 1: the path name itself mentions a keyword and accepts POST.
    candidate = next(
        (
            path
            for path, item in paths.items()
            if has_keyword(path) and post_operations(item)
        ),
        None,
    )

    # Pass 2: only POST operations are serialized and searched.
    if candidate is None:
        candidate = next(
            (
                path
                for path, item in paths.items()
                if any(has_keyword(json.dumps(op)) for op in post_operations(item))
            ),
            None,
        )

    if not candidate:
        return None

    from urllib.parse import urljoin, urlparse

    parsed = urlparse(openapi_url)
    base = f"{parsed.scheme}://{parsed.netloc}"
    return urljoin(base, candidate)
def parse_srt_file(path: str):
    """Read an SRT file from disk and return its parsed subtitles as a list.

    Args:
        path: Location of the ``.srt`` file, expected to be UTF-8 encoded
            (with or without a byte-order mark).

    Returns:
        A list holding every item yielded by ``srt.parse`` for the file's text.

    Raises:
        RuntimeError: If the ``srt`` package is not installed.
    """
    if srt is None:
        raise RuntimeError("El paquete 'srt' no está instalado")
    # "utf-8-sig" decodes plain UTF-8 exactly like "utf-8" but also drops a
    # leading BOM, so the marker never reaches the parser as part of the
    # first subtitle block.
    with open(path, "r", encoding="utf-8-sig") as f:
        raw = f.read()
    return list(srt.parse(raw))
def _render_payload(payload_template: Optional[str], text: str):
    """Build the JSON request body for ``text`` from an optional template."""
    default_body = {"text": text}
    if not payload_template:
        return default_body

    def fill(node):
        # Substitute the placeholder inside already-parsed strings so that
        # quotes, backslashes or newlines in ``text`` cannot break the JSON.
        if isinstance(node, str):
            return node.replace("{text}", text)
        if isinstance(node, list):
            return [fill(item) for item in node]
        if isinstance(node, dict):
            return {fill(key): fill(value) for key, value in node.items()}
        return node

    try:
        return fill(json.loads(payload_template))
    except ValueError:
        pass

    # Legacy behaviour: the template is only valid JSON once the placeholder
    # has been replaced textually (e.g. an unquoted ``{text}``).
    try:
        return json.loads(payload_template.replace("{text}", text))
    except ValueError:
        return default_body


def synth_chunk(endpoint: str, text: str, headers: dict, payload_template: Optional[str], timeout=60):
    """POST ``text`` to a TTS endpoint and return the audio bytes it answers with.

    Args:
        endpoint: URL that receives the POST request.
        text: Text to synthesize.
        headers: HTTP headers sent with the request.
        payload_template: Optional JSON template in which every ``{text}``
            placeholder is replaced by ``text``. The template is parsed first
            and the substitution happens inside its strings, so text containing
            quotes or newlines keeps the other template fields intact. If no
            usable JSON can be produced, the body is ``{"text": text}``.
        timeout: Timeout in seconds handed to ``requests.post``.

    Returns:
        The raw response body when its Content-Type starts with ``audio/``.
        Otherwise, if the response is a JSON object, the base64-decoded value
        of the first of the keys ``audio``, ``wav``, ``data``, ``base64`` that
        decodes successfully. In every other case the raw response body.

    Raises:
        RuntimeError: If the ``requests`` package is not installed.
        Exception: Whatever ``requests.post`` / ``raise_for_status`` raise for
            transport or HTTP errors.
    """
    if requests is None:
        raise RuntimeError("El paquete 'requests' no está instalado")

    import base64

    json_body = _render_payload(payload_template, text)
    response = requests.post(endpoint, json=json_body, headers=headers, timeout=timeout)
    response.raise_for_status()

    # Header values are compared case-insensitively ("Audio/WAV" is audio too).
    content_type = response.headers.get("Content-Type", "")
    if content_type.lower().startswith("audio/"):
        return response.content

    try:
        payload = response.json()
    except ValueError:
        # Not JSON: hand back whatever the server sent.
        return response.content

    if isinstance(payload, dict):
        for key in ("audio", "wav", "data", "base64"):
            if key not in payload:
                continue
            try:
                return base64.b64decode(payload[key])
            except (TypeError, ValueError):
                # Value is not base64 text; try the next candidate key.
                continue
    return response.content
def ensure_ffmpeg():
    """Verify that an ``ffmpeg`` executable can be located on ``PATH``.

    Raises:
        RuntimeError: If ``shutil.which`` finds no ``ffmpeg`` executable.
    """
    ffmpeg_location = shutil.which("ffmpeg")
    if ffmpeg_location is not None:
        return
    raise RuntimeError("ffmpeg no está disponible en PATH")
def convert_and_save(raw_bytes: bytes, target_path: str):
    """Write audio bytes to ``target_path`` as 22050 Hz mono 16-bit audio.

    The bytes are staged in a temporary ``.bin`` file and handed to ffmpeg.
    If the conversion cannot be performed (ffmpeg exits with an error, or the
    executable cannot be started at all) the bytes are written to
    ``target_path`` unchanged as a best-effort fallback.

    Args:
        raw_bytes: Audio data in whatever container the TTS service returned.
        target_path: Destination file; overwritten if it already exists.
    """
    tmp_path = None
    try:
        # Staging happens inside the try so a failed write cannot leak the
        # temporary file.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".bin") as tmp:
            tmp_path = tmp.name
            tmp.write(raw_bytes)

        cmd = [
            "ffmpeg",
            "-y",
            "-i",
            tmp_path,
            "-ar",
            "22050",
            "-ac",
            "1",
            "-sample_fmt",
            "s16",
            target_path,
        ]
        try:
            subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except (subprocess.CalledProcessError, OSError):
            # OSError covers a missing or non-executable ffmpeg, which would
            # otherwise bypass the raw-bytes fallback entirely.
            with open(target_path, "wb") as out:
                out.write(raw_bytes)
    finally:
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                pass
def create_silence(duration: float, out_path: str, sr: int = 22050):
    """Create a mono 16-bit PCM WAV file containing only silence.

    ffmpeg's ``anullsrc`` source is tried first. If ffmpeg fails or cannot be
    started, an equivalent silent WAV is written with the standard-library
    ``wave`` module, so the fallback is still a valid WAV of the requested
    length. Failures of the fallback itself are ignored (best effort).

    Args:
        duration: Length of the silence in seconds.
        out_path: Destination file; overwritten if it already exists.
        sr: Sample rate in Hz.
    """
    import wave

    cmd = [
        "ffmpeg",
        "-y",
        "-f",
        "lavfi",
        "-i",
        f"anullsrc=channel_layout=mono:sample_rate={sr}",
        "-t",
        f"{duration}",
        "-c:a",
        "pcm_s16le",
        out_path,
    ]
    try:
        subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        return
    except (subprocess.CalledProcessError, OSError):
        # Fall through to the pure-Python writer below.
        pass

    frames_per_write = 65536  # bounds memory use for long silences
    try:
        total_frames = max(0, int(round(duration * sr)))
        zero_block = b"\x00\x00" * min(total_frames, frames_per_write)
        with wave.open(out_path, "wb") as wav_out:
            wav_out.setnchannels(1)
            wav_out.setsampwidth(2)  # 16-bit samples, matching pcm_s16le
            wav_out.setframerate(sr)
            wav_out.setnframes(total_frames)
            remaining = total_frames
            while remaining > 0:
                count = min(remaining, frames_per_write)
                wav_out.writeframesraw(zero_block[: 2 * count])
                remaining -= count
    except (OSError, ValueError, OverflowError, wave.Error):
        # Best effort, like the original fallback: never raise from here.
        pass
def pad_or_trim_wav(in_path: str, out_path: str, target_duration: float, sr: int = 22050):
    """Write ``in_path`` to ``out_path`` with its length forced to ``target_duration``.

    The current duration is read with ffprobe, then:

    * if it cannot be determined (ffprobe missing, failing, or printing a
      non-numeric value) or is reported as 0, the file is copied unchanged;
    * if it is within 20 ms of the target, the file is copied unchanged;
    * if it is longer, ffmpeg cuts it to ``target_duration``;
    * if it is shorter, a silence segment generated by ``create_silence`` is
      appended through ffmpeg's concat demuxer with stream copy.

    Args:
        in_path: Source audio file.
        out_path: Destination file.
        target_duration: Desired length in seconds.
        sr: Sample rate in Hz used for the generated silence.

    Raises:
        subprocess.CalledProcessError: If the ffmpeg trim or concat step fails.
    """
    try:
        probe = subprocess.run(
            [
                "ffprobe",
                "-v",
                "error",
                "-show_entries",
                "format=duration",
                "-of",
                "default=noprint_wrappers=1:nokey=1",
                in_path,
            ],
            capture_output=True,
            text=True,
            check=True,
        )
        current = float(probe.stdout.strip())
    except (subprocess.SubprocessError, OSError, ValueError):
        current = 0.0

    if current == 0.0 or abs(current - target_duration) < 0.02:
        shutil.copy(in_path, out_path)
        return

    if current > target_duration:
        trim_cmd = ["ffmpeg", "-y", "-i", in_path, "-t", f"{target_duration}", out_path]
        subprocess.run(trim_cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        return

    def concat_entry(path: str) -> str:
        # Inside single quotes ffmpeg treats everything literally, so a quote
        # in the path must be written as: close quote, \', reopen quote.
        escaped = os.path.abspath(path).replace("'", "'\\''")
        return f"file '{escaped}'\n"

    missing = target_duration - current
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as silence_file:
        silence_path = silence_file.name
    list_path = None
    try:
        create_silence(missing, silence_path, sr=sr)
        # ffmpeg reads the list as UTF-8 regardless of the locale encoding.
        with tempfile.NamedTemporaryFile(
            mode="w", delete=False, suffix=".txt", encoding="utf-8"
        ) as list_file:
            list_path = list_file.name
            list_file.write(concat_entry(in_path))
            list_file.write(concat_entry(silence_path))
        concat_cmd = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_path, "-c", "copy", out_path]
        subprocess.run(concat_cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    finally:
        for leftover in (silence_path, list_path):
            if leftover is None:
                continue
            try:
                os.remove(leftover)
            except OSError:
                pass
def concat_chunks(chunks: list, out_path: str):
    """Concatenate audio files into ``out_path`` without re-encoding.

    ffmpeg's concat demuxer is tried first, using a temporary list file. If
    that fails, the ``concat:`` protocol is tried as a fallback, writing to
    ``out_path + ".tmp.wav"`` and then moving the result into place.

    Args:
        chunks: Paths of the audio files to join, in playback order.
        out_path: Destination file; overwritten if it already exists.

    Raises:
        ValueError: If ``chunks`` is empty.
        RuntimeError: If ffmpeg is not available (raised by ``ensure_ffmpeg``).
        subprocess.CalledProcessError: If both concatenation strategies fail.
    """
    if not chunks:
        # Fail early with a clear message instead of an ffmpeg error followed
        # by a failing file move.
        raise ValueError("No hay segmentos para concatenar")
    ensure_ffmpeg()

    # ffmpeg reads the list as UTF-8 regardless of the locale encoding.
    with tempfile.NamedTemporaryFile(
        mode="w", delete=False, suffix=".txt", encoding="utf-8"
    ) as list_file:
        list_path = list_file.name
        for chunk in chunks:
            # Inside single quotes ffmpeg treats everything literally, so a
            # quote in the path is written as: close quote, \', reopen quote.
            escaped = os.path.abspath(chunk).replace("'", "'\\''")
            list_file.write(f"file '{escaped}'\n")

    try:
        demuxer_cmd = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_path, "-c", "copy", out_path]
        try:
            subprocess.run(demuxer_cmd, check=True)
        except subprocess.CalledProcessError:
            tmp_concat = out_path + ".tmp.wav"
            protocol_cmd = ["ffmpeg", "-y", "-i", f"concat:{'|'.join(chunks)}", "-c", "copy", tmp_concat]
            try:
                # check=True so a failed fallback is reported as such rather
                # than as a missing file in the move below.
                subprocess.run(protocol_cmd, check=True)
                shutil.move(tmp_concat, out_path)
            finally:
                # After a successful move nothing is left; on failure this
                # removes the partial output.
                try:
                    if os.path.exists(tmp_concat):
                        os.remove(tmp_concat)
                except OSError:
                    pass
    finally:
        try:
            os.remove(list_path)
        except OSError:
            pass