"""Reusable utilities for speech synthesis driven by SRT subtitles.

Contains SRT parsing, the HTTP call to the TTS endpoint, and ffmpeg helpers
to convert, concatenate and pad audio segments.

These functions were previously part of `srt_to_kokoro.py` and were moved
here so they can be reused by adapters and tests.
"""

import base64
import json
import logging
import os
import re  # noqa: F401  (kept: other parts of the project may rely on it)
import shutil
import subprocess
import tempfile
import wave
from typing import Any, Iterable, Optional
from urllib.parse import urljoin, urlparse

try:
    import requests
except Exception:
    # Let the failure surface at call time (lazy client) if it is not installed.
    requests = None

try:
    import srt
except Exception:
    srt = None

logger = logging.getLogger(__name__)

# Substrings that mark an OpenAPI path/operation as a likely TTS endpoint.
_SYNTH_KEYWORDS = ("synth", "tts", "text", "synthesize")

# JSON response fields that may carry base64-encoded audio, in lookup order.
_AUDIO_JSON_KEYS = ("audio", "wav", "data", "base64")


def _mentions_synthesis(haystack: str) -> bool:
    """Return True when *haystack* contains any synthesis keyword (case-insensitive)."""
    lowered = haystack.lower()
    return any(keyword in lowered for keyword in _SYNTH_KEYWORDS)


def _post_operations(path_item: Any):
    """Yield the operation objects registered under the POST method of a path item.

    Non-dict path items (malformed specs, ``null`` entries) yield nothing
    instead of raising.
    """
    if not isinstance(path_item, dict):
        return
    for method, operation in path_item.items():
        if isinstance(method, str) and method.lower() == "post":
            yield operation


def select_synthesis_path(spec: Any) -> Optional[str]:
    """Pick the most likely synthesis path from a parsed OpenAPI document.

    Two passes are made over ``spec["paths"]``:

    1. the first path whose *name* contains a synthesis keyword and that
       exposes a POST operation;
    2. otherwise, the first path with a POST operation whose serialized
       metadata contains a synthesis keyword.

    Args:
        spec: The decoded OpenAPI JSON document.

    Returns:
        The matching path string, or ``None`` when nothing matches or the
        document does not have a usable ``paths`` mapping.
    """
    paths = spec.get("paths") if isinstance(spec, dict) else None
    if not isinstance(paths, dict):
        return None

    # Pass 1: keyword in the path itself.
    for path, path_item in paths.items():
        if _mentions_synthesis(str(path)):
            for _operation in _post_operations(path_item):
                return path

    # Pass 2: keyword anywhere in the POST operation metadata.
    for path, path_item in paths.items():
        for operation in _post_operations(path_item):
            if _mentions_synthesis(json.dumps(operation)):
                return path

    return None


def find_synthesis_endpoint(openapi_url: str, timeout: float = 20) -> Optional[str]:
    """Heuristically locate the synthesis endpoint from an OpenAPI document.

    Downloads ``openapi_url``, looks for a POST path that matches the
    synthesis keywords and joins it with the scheme and host of
    ``openapi_url``.

    Args:
        openapi_url: URL of the ``openapi.json`` document.
        timeout: Seconds to wait for the HTTP request.

    Returns:
        The full URL of the candidate path, or ``None`` when the document
        cannot be fetched/decoded or no candidate is found.

    Raises:
        RuntimeError: If the ``requests`` package is not available.
    """
    if requests is None:
        raise RuntimeError("'requests' no está disponible")

    try:
        response = requests.get(openapi_url, timeout=timeout)
        response.raise_for_status()
        spec = response.json()
    except Exception as exc:
        # Deliberately best-effort: discovery failing is reported as "not found".
        logger.debug("Could not load OpenAPI spec from %s: %s", openapi_url, exc)
        return None

    candidate = select_synthesis_path(spec)
    if not candidate:
        return None

    parsed = urlparse(openapi_url)
    base = f"{parsed.scheme}://{parsed.netloc}"
    return urljoin(base, candidate)


def parse_srt_file(path: str):
    """Read an SRT file and return its subtitles as a list.

    The file is decoded as ``utf-8-sig`` so that a leading byte-order mark
    is stripped before parsing; files without a BOM decode identically to
    plain UTF-8.

    Args:
        path: Path of the ``.srt`` file.

    Returns:
        A list with the items produced by ``srt.parse``.

    Raises:
        RuntimeError: If the ``srt`` package is not installed.
    """
    if srt is None:
        raise RuntimeError("El paquete 'srt' no está instalado")
    with open(path, "r", encoding="utf-8-sig") as handle:
        raw = handle.read()
    return list(srt.parse(raw))


def build_payload(text: str, payload_template: Optional[str] = None) -> Any:
    """Build the JSON body sent to the TTS endpoint.

    When a template is given, every ``{text}`` placeholder is replaced by
    *text*. The text is JSON-escaped first so that quotes, backslashes and
    newlines do not break the template; the unescaped substitution is kept
    as a second attempt for templates that relied on it.

    Args:
        text: The text to synthesize.
        payload_template: Optional JSON string containing ``{text}``.

    Returns:
        The decoded JSON body, or ``{"text": text}`` when there is no
        template or it cannot be decoded as JSON after substitution.
    """
    if not payload_template:
        return {"text": text}

    # json.dumps adds the surrounding quotes; strip them to get the escaped body.
    escaped = json.dumps(text, ensure_ascii=False)[1:-1]
    for replacement in (escaped, text):
        try:
            return json.loads(payload_template.replace("{text}", replacement))
        except ValueError:
            continue
    return {"text": text}


def _audio_from_json(payload: Any) -> Optional[bytes]:
    """Return decoded base64 audio from the first usable known field, if any."""
    if not isinstance(payload, dict):
        return None
    for key in _AUDIO_JSON_KEYS:
        if key not in payload:
            continue
        try:
            return base64.b64decode(payload[key])
        except (ValueError, TypeError):
            # Not base64 (or not a string at all): try the next field.
            continue
    return None


def synth_chunk(endpoint: str, text: str, headers: dict, payload_template: Optional[str], timeout=60):
    """Send the synthesis request and return the audio bytes.

    Handles ``audio/*`` responses as well as JSON responses carrying the
    audio as base64 in one of the fields ``audio``, ``wav``, ``data`` or
    ``base64``. Anything else is returned as the raw response body.

    Args:
        endpoint: URL of the TTS endpoint.
        text: Text to synthesize.
        headers: HTTP headers for the request.
        payload_template: Optional JSON template containing ``{text}``.
        timeout: Seconds to wait for the HTTP request.

    Returns:
        The audio as ``bytes``.

    Raises:
        RuntimeError: If the ``requests`` package is not installed.
        requests.HTTPError: If the server answers with an error status.
    """
    if requests is None:
        raise RuntimeError("El paquete 'requests' no está instalado")

    json_body = build_payload(text, payload_template)
    response = requests.post(endpoint, json=json_body, headers=headers, timeout=timeout)
    response.raise_for_status()

    content_type = response.headers.get("Content-Type", "")
    if content_type.lower().startswith("audio/"):
        return response.content

    try:
        decoded = _audio_from_json(response.json())
    except ValueError:
        # Body is not JSON; fall through to the raw content.
        decoded = None
    return decoded if decoded is not None else response.content


def ensure_ffmpeg():
    """Raise ``RuntimeError`` unless an ``ffmpeg`` executable is on PATH."""
    if shutil.which("ffmpeg") is None:
        raise RuntimeError("ffmpeg no está disponible en PATH")


def _remove_quietly(path: Optional[str]) -> None:
    """Delete *path* if it exists, ignoring filesystem errors."""
    if not path:
        return
    try:
        os.remove(path)
    except OSError:
        pass


def _write_concat_list(paths: Iterable[str]) -> str:
    """Write an ffmpeg concat-demuxer list file and return its path.

    Paths are made absolute and single quotes inside them are escaped with
    the ``'\\''`` sequence that ffmpeg expects inside a quoted string. The
    file is written as UTF-8 regardless of the locale.
    """
    with tempfile.NamedTemporaryFile(
        mode="w", delete=False, suffix=".txt", encoding="utf-8"
    ) as list_file:
        for path in paths:
            escaped = os.path.abspath(path).replace("'", "'\\''")
            list_file.write(f"file '{escaped}'\n")
        return list_file.name


def convert_and_save(raw_bytes: bytes, target_path: str, sr: int = 22050):
    """Save bytes to a temporary file and convert them to mono 16-bit PCM WAV.

    If ffmpeg rejects the input, the raw bytes are written to
    ``target_path`` unchanged so the caller still gets a file.

    Args:
        raw_bytes: Audio data in any format ffmpeg can read.
        target_path: Destination file; overwritten if it exists.
        sr: Output sample rate in Hz.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".bin") as tmp:
        tmp.write(raw_bytes)
        tmp_path = tmp.name

    # The temp file is closed before ffmpeg opens it.
    cmd = [
        "ffmpeg",
        "-y",
        "-i",
        tmp_path,
        "-ar",
        str(sr),
        "-ac",
        "1",
        "-sample_fmt",
        "s16",
        target_path,
    ]
    try:
        subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except subprocess.CalledProcessError as exc:
        logger.warning("ffmpeg conversion failed (%s); saving raw bytes to %s", exc, target_path)
        with open(target_path, "wb") as out:
            out.write(raw_bytes)
    finally:
        _remove_quietly(tmp_path)


def _write_silent_wav(duration: float, out_path: str, sr: int) -> None:
    """Write *duration* seconds of mono 16-bit PCM silence using the stdlib."""
    remaining = max(0, int(round(float(duration) * sr)))
    with wave.open(out_path, "wb") as wav:
        wav.setnchannels(1)
        wav.setsampwidth(2)
        wav.setframerate(sr)
        # Write in chunks of at most one second to bound memory use.
        chunk_frames = max(1, sr)
        while remaining > 0:
            frames = min(remaining, chunk_frames)
            wav.writeframes(b"\x00\x00" * frames)
            remaining -= frames


def create_silence(duration: float, out_path: str, sr: int = 22050):
    """Create a mono 16-bit PCM WAV file containing silence.

    ffmpeg is tried first; if it fails or is not installed, an equivalent
    file is written with the standard-library ``wave`` module. The function
    is best-effort and never raises.

    Args:
        duration: Length of the silence in seconds.
        out_path: Destination file; overwritten if it exists.
        sr: Sample rate in Hz.
    """
    cmd = [
        "ffmpeg",
        "-y",
        "-f",
        "lavfi",
        "-i",
        f"anullsrc=channel_layout=mono:sample_rate={sr}",
        "-t",
        f"{duration}",
        "-c:a",
        "pcm_s16le",
        out_path,
    ]
    try:
        subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        return
    except (subprocess.CalledProcessError, OSError) as exc:
        logger.warning("ffmpeg could not generate silence (%s); using wave fallback", exc)

    try:
        _write_silent_wav(duration, out_path, sr)
    except Exception as exc:
        # Best-effort by design: callers do not expect this function to raise.
        logger.warning("Could not write silence to %s: %s", out_path, exc)


def _probe_duration(path: str) -> float:
    """Return the duration of *path* in seconds via ffprobe, or 0.0 if unknown."""
    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        path,
    ]
    try:
        probe = subprocess.run(cmd, capture_output=True, text=True, check=True)
        return float(probe.stdout.strip())
    except (OSError, subprocess.SubprocessError, ValueError):
        return 0.0


def pad_or_trim_wav(
    in_path: str,
    out_path: str,
    target_duration: float,
    sr: int = 22050,
    tolerance: float = 0.02,
):
    """Make a WAV file last ``target_duration`` seconds.

    Longer inputs are cut with ffmpeg; shorter inputs get trailing silence
    appended through the concat demuxer. The input is copied unchanged when
    its duration cannot be determined or is already within ``tolerance``.

    Args:
        in_path: Source WAV file.
        out_path: Destination file.
        target_duration: Desired duration in seconds.
        sr: Sample rate used for the generated silence.
        tolerance: Maximum difference in seconds that is left untouched.

    Raises:
        subprocess.CalledProcessError: If an ffmpeg trim/concat command fails.
    """
    current = _probe_duration(in_path)

    if current == 0.0 or abs(current - target_duration) < tolerance:
        shutil.copy(in_path, out_path)
        return

    if current > target_duration:
        trim_cmd = ["ffmpeg", "-y", "-i", in_path, "-t", f"{target_duration}", out_path]
        subprocess.run(trim_cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        return

    missing = target_duration - current
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as silence_file:
        silence_path = silence_file.name

    list_path = None
    try:
        create_silence(missing, silence_path, sr=sr)
        list_path = _write_concat_list([in_path, silence_path])
        concat_cmd = [
            "ffmpeg", "-y", "-f", "concat", "-safe", "0",
            "-i", list_path, "-c", "copy", out_path,
        ]
        subprocess.run(concat_cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    finally:
        _remove_quietly(silence_path)
        _remove_quietly(list_path)


def concat_chunks(chunks: list, out_path: str):
    """Concatenate audio files, in order, into ``out_path``.

    The concat demuxer with stream copy is tried first; if it fails, the
    ``concat:`` protocol is used as a fallback through a temporary file.

    Args:
        chunks: Paths of the files to join.
        out_path: Destination file; overwritten if it exists.

    Raises:
        RuntimeError: If ffmpeg is not on PATH.
        ValueError: If ``chunks`` is empty.
        subprocess.CalledProcessError: If both ffmpeg strategies fail.
    """
    ensure_ffmpeg()
    chunks = list(chunks)
    if not chunks:
        raise ValueError("concat_chunks requires at least one chunk")

    list_path = _write_concat_list(chunks)
    try:
        cmd = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_path, "-c", "copy", out_path]
        subprocess.run(cmd, check=True)
    except subprocess.CalledProcessError:
        tmp_concat = out_path + ".tmp.wav"
        fallback_cmd = ["ffmpeg", "-y", "-i", f"concat:{'|'.join(chunks)}", "-c", "copy", tmp_concat]
        try:
            # check=True so a failed fallback surfaces as an ffmpeg error
            # rather than a missing-file error from the move below.
            subprocess.run(fallback_cmd, check=True)
            shutil.move(tmp_concat, out_path)
        finally:
            # No-op after a successful move; removes leftovers on failure.
            _remove_quietly(tmp_concat)
    finally:
        _remove_quietly(list_path)