364 lines
14 KiB
Python
364 lines
14 KiB
Python
"""Orquestador que compone los adaptadores infra para ejecutar el pipeline.
|
|
|
|
Proporciona una clase `Orchestrator` con método `run` y soporta modo dry-run
|
|
para inspección sin ejecutar los pasos pesados.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from whisper_project.infra import process_video, transcribe
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Orchestrator:
|
|
"""Orquesta: extracción audio -> transcripción -> TTS por segmento -> reemplazo audio -> quemar subtítulos.
|
|
|
|
Nota: los pasos concretos se delegan a los adaptadores en `whisper_project.infra`.
|
|
"""
|
|
|
|
def __init__(self, dry_run: bool = False, tts_model: str = "kokoro", verbose: bool = False):
|
|
self.dry_run = dry_run
|
|
self.tts_model = tts_model
|
|
# No configurar basicConfig aquí: usar configure_logging desde el CLI/main
|
|
if verbose:
|
|
logger.setLevel(logging.DEBUG)
|
|
|
|
def run(self, src_video: str, out_dir: str, translate: bool = False) -> dict:
|
|
"""Ejecuta el pipeline.
|
|
|
|
Args:
|
|
src_video: ruta al vídeo de entrada.
|
|
out_dir: carpeta donde escribir resultados intermedios/finales.
|
|
translate: si True, intentará traducir SRT (delegado a futuras implementaciones).
|
|
|
|
Returns:
|
|
diccionario con resultados y rutas generadas.
|
|
"""
|
|
src = Path(src_video)
|
|
out = Path(out_dir)
|
|
out.mkdir(parents=True, exist_ok=True)
|
|
|
|
result = {
|
|
"input_video": str(src.resolve()),
|
|
"out_dir": str(out.resolve()),
|
|
"steps": [],
|
|
}
|
|
|
|
# 1) Extraer audio
|
|
audio_wav = out / f"{src.stem}.wav"
|
|
step = {"name": "extract_audio", "out": str(audio_wav)}
|
|
result["steps"].append(step)
|
|
if self.dry_run:
|
|
logger.info("[dry-run] extraer audio: %s -> %s", src, audio_wav)
|
|
else:
|
|
logger.info("extraer audio: %s -> %s", src, audio_wav)
|
|
process_video.extract_audio(str(src), str(audio_wav))
|
|
|
|
# 2) Transcribir (segmentado si es necesario)
|
|
srt_path = out / f"{src.stem}.srt"
|
|
step = {"name": "transcribe", "out": str(srt_path)}
|
|
result["steps"].append(step)
|
|
if self.dry_run:
|
|
logger.info("[dry-run] transcribir audio -> %s", srt_path)
|
|
segments = []
|
|
else:
|
|
logger.info("transcribir audio -> %s", srt_path)
|
|
# usamos la función delegante que el proyecto expone
|
|
segments = transcribe.transcribe_segmented_with_tempfiles(str(audio_wav), [])
|
|
transcribe.write_srt(segments, str(srt_path))
|
|
|
|
# 3) (Opcional) traducir SRT — placeholder
|
|
if translate:
|
|
step = {"name": "translate", "out": str(srt_path)}
|
|
result["steps"].append(step)
|
|
if self.dry_run:
|
|
logger.info("[dry-run] traducir SRT: %s", srt_path)
|
|
else:
|
|
logger.info("traducir SRT: %s (funcionalidad no implementada en orquestador)", srt_path)
|
|
|
|
# 4) Generar TTS segmentado en un WAV final (dub)
|
|
dubbed_wav = out / f"{src.stem}.dub.wav"
|
|
step = {"name": "tts_and_stitch", "out": str(dubbed_wav)}
|
|
result["steps"].append(step)
|
|
if self.dry_run:
|
|
logger.info("[dry-run] synthesize TTS por segmento -> %s (modelo=%s)", dubbed_wav, self.tts_model)
|
|
else:
|
|
logger.info("synthesize TTS por segmento -> %s (modelo=%s)", dubbed_wav, self.tts_model)
|
|
# por ahora usamos la función helper de transcribe para síntesis (si existe)
|
|
try:
|
|
# `segments` viene de la transcripción previa
|
|
transcribe.tts_synthesize(" ".join([s.get("text", "") for s in segments]), str(dubbed_wav), model=self.tts_model)
|
|
except Exception:
|
|
# Fallback simple: crear un silencio (no romper)
|
|
logger.exception("TTS falló, creando archivo vacío como fallback")
|
|
try:
|
|
process_video.pad_or_trim_wav(0.0, str(dubbed_wav))
|
|
except Exception:
|
|
logger.exception("No se pudo crear WAV de fallback")
|
|
|
|
# 5) Reemplazar audio en el vídeo
|
|
dubbed_video = out / f"{src.stem}.dub.mp4"
|
|
step = {"name": "replace_audio_in_video", "out": str(dubbed_video)}
|
|
result["steps"].append(step)
|
|
if self.dry_run:
|
|
logger.info("[dry-run] reemplazar audio en video: %s -> %s", src, dubbed_video)
|
|
else:
|
|
logger.info("reemplazar audio en video: %s -> %s", src, dubbed_video)
|
|
process_video.replace_audio_in_video(str(src), str(dubbed_wav), str(dubbed_video))
|
|
|
|
# 6) Quemar subtítulos en vídeo final
|
|
burned = out / f"{src.stem}.burned.mp4"
|
|
step = {"name": "burn_subtitles", "out": str(burned)}
|
|
result["steps"].append(step)
|
|
if self.dry_run:
|
|
logger.info("[dry-run] quemar subtítulos: %s + %s -> %s", dubbed_video, srt_path, burned)
|
|
else:
|
|
logger.info("quemar subtítulos: %s + %s -> %s", dubbed_video, srt_path, burned)
|
|
process_video.burn_subtitles(str(dubbed_video), str(srt_path), str(burned))
|
|
|
|
return result
|
|
|
|
|
|
__all__ = ["Orchestrator"]
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from typing import Optional
|
|
|
|
from ..core.models import PipelineResult
|
|
from ..infra import ffmpeg_adapter
|
|
from ..infra.kokoro_adapter import KokoroHttpClient
|
|
|
|
|
|
class PipelineOrchestrator:
|
|
"""Use case class that coordinates the high-level steps of the pipeline.
|
|
|
|
Esta clase mantiene la lógica de orquestación en métodos pequeños y
|
|
testables, y depende de adaptadores infra para las operaciones I/O.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
kokoro_endpoint: str,
|
|
kokoro_key: Optional[str] = None,
|
|
voice: Optional[str] = None,
|
|
kokoro_model: Optional[str] = None,
|
|
transcriber=None,
|
|
translator=None,
|
|
tts_client=None,
|
|
audio_processor=None,
|
|
):
|
|
# Si no se inyectan adaptadores, crear implementaciones por defecto
|
|
# Sólo importar adaptadores pesados si no se inyectan implementaciones.
|
|
if transcriber is None:
|
|
try:
|
|
from ..infra.faster_whisper_adapter import FasterWhisperTranscriber
|
|
|
|
self.transcriber = FasterWhisperTranscriber()
|
|
except Exception:
|
|
# dejar como None para permitir fallback a subprocess en tiempo de ejecución
|
|
self.transcriber = None
|
|
else:
|
|
self.transcriber = transcriber
|
|
|
|
if translator is None:
|
|
try:
|
|
from ..infra.marian_adapter import MarianTranslator
|
|
|
|
self.translator = MarianTranslator()
|
|
except Exception:
|
|
self.translator = None
|
|
else:
|
|
self.translator = translator
|
|
|
|
if tts_client is None:
|
|
try:
|
|
from ..infra.kokoro_adapter import KokoroHttpClient
|
|
|
|
self.tts_client = KokoroHttpClient(kokoro_endpoint, api_key=kokoro_key, voice=voice, model=kokoro_model)
|
|
except Exception:
|
|
self.tts_client = None
|
|
else:
|
|
self.tts_client = tts_client
|
|
|
|
if audio_processor is None:
|
|
try:
|
|
from ..infra.ffmpeg_adapter import FFmpegAudioProcessor
|
|
|
|
self.audio_processor = FFmpegAudioProcessor()
|
|
except Exception:
|
|
self.audio_processor = None
|
|
else:
|
|
self.audio_processor = audio_processor
|
|
|
|
def run(
|
|
self,
|
|
video: str,
|
|
srt: Optional[str],
|
|
workdir: str,
|
|
translate_method: str = "local",
|
|
gemini_api_key: Optional[str] = None,
|
|
whisper_model: str = "base",
|
|
mix: bool = False,
|
|
mix_background_volume: float = 0.2,
|
|
keep_chunks: bool = False,
|
|
dry_run: bool = False,
|
|
) -> PipelineResult:
|
|
"""Run the pipeline.
|
|
|
|
When dry_run=True the orchestrator will only print planned actions
|
|
instead of executing subprocesses or ffmpeg commands.
|
|
"""
|
|
# 0) prepare paths
|
|
if dry_run:
|
|
logger.info("[dry-run] workdir: %s", workdir)
|
|
|
|
# 1) extraer audio
|
|
audio_tmp = os.path.join(workdir, "extracted_audio.wav")
|
|
if dry_run:
|
|
logger.info("[dry-run] ffmpeg extract audio -> %s", audio_tmp)
|
|
else:
|
|
self.audio_processor.extract_audio(video, audio_tmp, sr=16000)
|
|
|
|
# 2) transcribir si es necesario
|
|
if srt:
|
|
srt_in = srt
|
|
else:
|
|
srt_in = os.path.join(workdir, "transcribed.srt")
|
|
cmd_trans = [
|
|
sys.executable,
|
|
"whisper_project/transcribe.py",
|
|
"--file",
|
|
audio_tmp,
|
|
"--backend",
|
|
"faster-whisper",
|
|
"--model",
|
|
whisper_model,
|
|
"--srt",
|
|
"--srt-file",
|
|
srt_in,
|
|
]
|
|
if dry_run:
|
|
logger.info("[dry-run] %s", " ".join(cmd_trans))
|
|
else:
|
|
# Use injected transcriber when possible
|
|
try:
|
|
self.transcriber.transcribe(audio_tmp, srt_in)
|
|
except Exception:
|
|
# Fallback to subprocess if adapter not available
|
|
subprocess.run(cmd_trans, check=True)
|
|
|
|
# 3) traducir
|
|
srt_translated = os.path.join(workdir, "translated.srt")
|
|
if translate_method == "local":
|
|
cmd_translate = [
|
|
sys.executable,
|
|
"whisper_project/translate_srt_local.py",
|
|
"--in",
|
|
srt_in,
|
|
"--out",
|
|
srt_translated,
|
|
]
|
|
if dry_run:
|
|
logger.info("[dry-run] %s", " ".join(cmd_translate))
|
|
else:
|
|
try:
|
|
self.translator.translate_srt(srt_in, srt_translated)
|
|
except Exception:
|
|
subprocess.run(cmd_translate, check=True)
|
|
elif translate_method == "gemini":
|
|
# preferir adaptador inyectado que soporte Gemini, sino usar el local wrapper
|
|
cmd_translate = [
|
|
sys.executable,
|
|
"whisper_project/translate_srt_with_gemini.py",
|
|
"--in",
|
|
srt_in,
|
|
"--out",
|
|
srt_translated,
|
|
]
|
|
if gemini_api_key:
|
|
cmd_translate += ["--gemini-api-key", gemini_api_key]
|
|
|
|
if dry_run:
|
|
logger.info("[dry-run] %s", " ".join(cmd_translate))
|
|
else:
|
|
try:
|
|
# intentar usar adaptador Gemini si está disponible
|
|
if self.translator and getattr(self.translator, "__class__", None).__name__ == "GeminiTranslator":
|
|
self.translator.translate_srt(srt_in, srt_translated)
|
|
else:
|
|
# intentar importar adaptador local
|
|
from ..infra.gemini_adapter import GeminiTranslator
|
|
|
|
gem = GeminiTranslator(api_key=gemini_api_key)
|
|
gem.translate_srt(srt_in, srt_translated)
|
|
except Exception:
|
|
subprocess.run(cmd_translate, check=True)
|
|
elif translate_method == "argos":
|
|
cmd_translate = [
|
|
sys.executable,
|
|
"whisper_project/translate_srt_argos.py",
|
|
"--in",
|
|
srt_in,
|
|
"--out",
|
|
srt_translated,
|
|
]
|
|
if dry_run:
|
|
logger.info("[dry-run] %s", " ".join(cmd_translate))
|
|
else:
|
|
try:
|
|
if self.translator and getattr(self.translator, "__class__", None).__name__ == "ArgosTranslator":
|
|
self.translator.translate_srt(srt_in, srt_translated)
|
|
else:
|
|
from ..infra.argos_adapter import ArgosTranslator
|
|
|
|
a = ArgosTranslator()
|
|
a.translate_srt(srt_in, srt_translated)
|
|
except Exception:
|
|
subprocess.run(cmd_translate, check=True)
|
|
elif translate_method == "none":
|
|
srt_translated = srt_in
|
|
else:
|
|
raise ValueError("translate_method not supported in this orchestrator")
|
|
|
|
# 4) sintetizar por segmento
|
|
dub_wav = os.path.join(workdir, "dub_final.wav")
|
|
if dry_run:
|
|
logger.info("[dry-run] synthesize from srt %s -> %s", srt_translated, dub_wav)
|
|
else:
|
|
# Use injected tts_client
|
|
self.tts_client.synthesize_from_srt(
|
|
srt_translated,
|
|
dub_wav,
|
|
video=video,
|
|
align=True,
|
|
keep_chunks=keep_chunks,
|
|
mix_with_original=mix,
|
|
mix_background_volume=mix_background_volume,
|
|
)
|
|
|
|
# 5) reemplazar audio en vídeo
|
|
replaced = os.path.splitext(video)[0] + ".replaced_audio.mp4"
|
|
if dry_run:
|
|
logger.info("[dry-run] replace audio in video -> %s", replaced)
|
|
else:
|
|
self.audio_processor.replace_audio_in_video(video, dub_wav, replaced)
|
|
|
|
# 6) quemar subtítulos
|
|
burned = os.path.splitext(video)[0] + ".replaced_audio.subs.mp4"
|
|
if dry_run:
|
|
logger.info("[dry-run] burn subtitles %s -> %s", srt_translated, burned)
|
|
else:
|
|
self.audio_processor.burn_subtitles(replaced, srt_translated, burned)
|
|
|
|
return PipelineResult(
|
|
workdir=workdir,
|
|
dub_wav=dub_wav,
|
|
replaced_video=replaced,
|
|
burned_video=burned,
|
|
)
|