# submaster/whisper_project/infra/kokoro_adapter.py
# (157 lines, 6.4 KiB, Python)

import os
import subprocess
import shutil
import logging
from typing import Optional
# Importar funciones pesadas (parsing/synth) de forma perezosa dentro de
# `synthesize_from_srt` para evitar fallos en la importación del paquete cuando
# dependencias opcionales (p.ej. 'srt') no están instaladas.
from .ffmpeg_adapter import FFmpegAudioProcessor
logger = logging.getLogger(__name__)
class KokoroHttpClient:
    """HTTP client that synthesizes the segments of an .srt file via a compatible endpoint.

    Replaces the former subprocess invocation of `srt_to_kokoro.py`. It reuses that
    module's helpers for SRT parsing and HTTP synthesis (`parse_srt_file`,
    `synth_chunk`) and relies on FFmpegAudioProcessor for the WAV operations.
    """

    def __init__(
        self,
        endpoint: str,
        api_key: Optional[str] = None,
        voice: Optional[str] = None,
        model: Optional[str] = None,
    ):
        """Store the connection settings and create the audio processor.

        Args:
            endpoint: Endpoint handed to `synth_chunk` for every segment.
            api_key: Optional token; when set it is sent as a Bearer
                ``Authorization`` header.
            voice: Voice identifier; falls back to ``"em_alex"`` when falsy.
            model: Model identifier; falls back to ``"model"`` when falsy.
        """
        self.endpoint = endpoint
        self.api_key = api_key
        self.voice = voice or "em_alex"
        self.model = model or "model"
        self._processor = FFmpegAudioProcessor()

    def synthesize_from_srt(
        self,
        srt_path: str,
        out_wav: str,
        video: Optional[str] = None,
        align: bool = True,
        keep_chunks: bool = False,
        mix_with_original: bool = False,
        mix_background_volume: float = 0.2,
    ) -> None:
        """Synthesize every subtitle of the SRT and concatenate the result into out_wav.

        The key parameters match the previous CLI-based adapter for compatibility.

        Args:
            srt_path: Path of the subtitle file, passed to `parse_srt_file`.
            out_wav: Destination WAV. The temporary working directory
                (``.kokoro_tmp_<pid>``) is created next to it.
            video: Optional video path. When given, the processor is asked to
                write ``<video stem>.replaced_audio.mp4`` using the generated
                audio; a failure of that step is logged, not raised.
            align: When True, silence is inserted for gaps longer than 10 ms
                and each chunk is padded/trimmed to its subtitle duration.
            keep_chunks: When True, the temporary directory and the
                intermediate chunk files are kept.
            mix_with_original: When True and `video` is given, the audio
                extracted from the video is mixed under the synthesized track.
            mix_background_volume: Volume factor applied to the original audio
                in that mix.

        Raises:
            RuntimeError: If the synthesis helpers cannot be imported, or if
                no audio chunk was produced from the SRT.
            subprocess.CalledProcessError: If the ffmpeg mix command fails.
        """
        headers = {"Accept": "*/*"}
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"

        # Import the helpers only when they are actually needed, so importing
        # this package does not fail when optional dependencies are missing.
        try:
            from whisper_project.srt_to_kokoro import parse_srt_file, synth_chunk
        except ModuleNotFoundError as e:
            raise RuntimeError("Módulo requerido no encontrado para síntesis por SRT: instale 'srt' y 'requests' (pip install srt requests)") from e

        subs = parse_srt_file(srt_path)
        tmpdir = os.path.join(os.path.dirname(out_wav), f".kokoro_tmp_{os.getpid()}")
        os.makedirs(tmpdir, exist_ok=True)
        try:
            chunk_files = self._synthesize_chunks(
                subs, synth_chunk, headers, tmpdir, align, keep_chunks
            )
            if not chunk_files:
                raise RuntimeError("No se generaron fragmentos de audio desde el SRT")

            self._processor.concat_wavs(chunk_files, out_wav)

            if mix_with_original and video:
                self._mix_with_original(out_wav, video, tmpdir, mix_background_volume)

            if video:
                # Replace the original audio track of the video (best effort).
                out_video = os.path.splitext(video)[0] + ".replaced_audio.mp4"
                try:
                    self._processor.replace_audio_in_video(video, out_wav, out_video)
                except Exception:
                    logger.exception("Error al reemplazar audio en el vídeo")
        finally:
            # Clean up on failure as well, so an aborted run does not leave the
            # working directory behind; keep it only when explicitly requested.
            if not keep_chunks:
                shutil.rmtree(tmpdir, ignore_errors=True)

    def _synthesize_chunks(self, subs, synth_chunk, headers, tmpdir, align, keep_chunks):
        """Synthesize each subtitle into tmpdir and return the ordered list of WAV paths."""
        # Simple payload template in which {text} is the placeholder for the
        # subtitle text. It does not depend on the segment, so build it once.
        # NOTE(review): model/voice are interpolated without JSON escaping, and
        # how {text} is substituted is up to synth_chunk - confirm there.
        payload_template = '{"model":"%s","voice":"%s","input":"{text}","response_format":"wav"}' % (self.model, self.voice)

        chunk_files = []
        # Timeline position (seconds) covered by the audio emitted so far. It is
        # advanced only when audio is actually appended, so the span of a skipped
        # subtitle is absorbed by the next gap instead of shifting later segments.
        prev_end = 0.0
        for i, sub in enumerate(subs, start=1):
            text = "\n".join(line.strip() for line in sub.content.splitlines()).strip()
            if not text:
                continue

            start_sec = sub.start.total_seconds()
            end_sec = sub.end.total_seconds()
            duration = end_sec - start_sec

            # align: insert silence for the gap since the previous segment
            if align:
                gap = start_sec - prev_end
                if gap > 0.01:
                    sil_target = os.path.join(tmpdir, f"sil_{i:04d}.wav")
                    self._processor.create_silence(gap, sil_target)
                    chunk_files.append(sil_target)
                    prev_end = start_sec

            try:
                raw = synth_chunk(self.endpoint, text, headers, payload_template)
            except Exception:
                # Skip this segment, log the failure and carry on with the rest.
                logger.exception("Error al sintetizar segmento %s", i)
                continue

            target = os.path.join(tmpdir, f"chunk_{i:04d}.wav")
            # Convert/normalize the returned bytes into a WAV file.
            self._processor.save_bytes_as_wav(raw, target)

            if align:
                aligned = os.path.join(tmpdir, f"chunk_{i:04d}.aligned.wav")
                self._processor.pad_or_trim_wav(target, aligned, duration)
                chunk_files.append(aligned)
                if not keep_chunks:
                    try:
                        os.remove(target)
                    except OSError:
                        logger.debug("No se pudo eliminar chunk intermedio %s", target)
            else:
                chunk_files.append(target)

            prev_end = end_sec
            logger.info(" - Segmento %s/%s -> %s", i, len(subs), os.path.basename(chunk_files[-1]))
        return chunk_files

    def _mix_with_original(self, out_wav, video, tmpdir, background_volume):
        """Mix the audio extracted from video under out_wav, overwriting out_wav."""
        orig_tmp = os.path.join(tmpdir, f"orig_{os.getpid()}.wav")
        try:
            self._processor.extract_audio(video, orig_tmp, sr=22050)
            # Mix with an ffmpeg filter_complex; the result takes the length of
            # the synthesized track (amix duration=first).
            mixed_tmp = os.path.join(tmpdir, f"mixed_{os.getpid()}.wav")
            vol = float(background_volume)
            cmd = [
                "ffmpeg",
                "-y",
                "-i",
                out_wav,
                "-i",
                orig_tmp,
                "-filter_complex",
                f"[0:a]volume=1[a1];[1:a]volume={vol}[a0];[a1][a0]amix=inputs=2:duration=first:dropout_transition=0[mix]",
                "-map",
                "[mix]",
                "-c:a",
                "pcm_s16le",
                mixed_tmp,
            ]
            subprocess.run(cmd, check=True)
            shutil.move(mixed_tmp, out_wav)
        finally:
            # Best-effort removal of the extracted audio; never mask the
            # original error, but do not hide a failed removal either.
            try:
                if os.path.exists(orig_tmp):
                    os.remove(orig_tmp)
            except OSError:
                logger.debug("No se pudo eliminar audio original temporal %s", orig_tmp)