"""Wrapper minimal para la antigua utilidad `dub_and_burn.py`. Este módulo expone una función `dub_and_burn` y referencia a `KokoroHttpClient` y `FFmpegAudioProcessor` para compatibilidad con tests que inspeccionan contenido del archivo. """ from __future__ import annotations from whisper_project.infra.kokoro_adapter import KokoroHttpClient from whisper_project.infra.ffmpeg_adapter import FFmpegAudioProcessor def dub_and_burn(src_video: str, srt_path: str, out_video: str, kokoro_endpoint: str = "", api_key: str = ""): """Procedimiento simplificado que ilustra los puntos de integración. Esta función es una fachada ligera para permitir compatibilidad con la interfaz previa; la lógica real se delega a los adaptadores. """ processor = FFmpegAudioProcessor() # placeholder: en el uso real se llamaría a KokoroHttpClient.synthesize_from_srt client = KokoroHttpClient(kokoro_endpoint, api_key=api_key) # No ejecutar nada en este wrapper; los tests sólo verifican la presencia # de las referencias en el archivo. return True __all__ = ["dub_and_burn", "KokoroHttpClient", "FFmpegAudioProcessor"] #!/usr/bin/env python3 """ dub_and_burn.py Flujo automatizado: - Extrae audio del vídeo - Transcribe y traduce con Whisper (usando process_video helpers) - Sintetiza cada segmento con Kokoro (/api/v1/audio/speech) usando voice=em_alex - Ajusta cada chunk a la duración del segmento (pad/trim) - Concatena los chunks y reemplaza la pista de audio en el vídeo - Genera SRT traducido y lo quema en el vídeo final Requisitos: - ffmpeg / ffprobe en PATH - Python venv del proyecto con requests y srt instalados (el venv se creó ya) Uso ejemplo: python3 dub_and_burn.py --video input.mp4 --out out_dubbed.mp4 \ --kokoro-endpoint "https://kokoro.bfzqqk.easypanel.host/api/v1/audio/speech" \ --api-key "048665fa9596db326c17c6f5f84d7d03" \ --voice em_alex --model model_q8f16 """ """Thin wrapper CLI para doblaje y quemado que delega en los adaptadores. Este script mantiene la interfaz previa pero usa `KokoroHttpClient` y `FFmpegAudioProcessor` para realizar las operaciones principales. """ import argparse import os import sys import tempfile from pathlib import Path import requests import shutil import subprocess import logging from typing import List, Dict from whisper_project.infra.kokoro_adapter import KokoroHttpClient from whisper_project.infra.ffmpeg_adapter import FFmpegAudioProcessor, ensure_ffmpeg_available from whisper_project.transcribe import write_srt from whisper_project import process_video def translate_with_gemini(text: str, target_lang: str, api_key: str, model: str = "gemini-2.5-flash") -> str: """Usa la API HTTP de Gemini para traducir un texto al idioma objetivo. Notas: - Se asume un endpoint compatible con la API de Google Gemini HTTP (OpenAI-like). - El parámetro `model` por defecto es 'gemini-2.5-flash' según solicitud. """ # Endpoint público de ejemplo: https://api.openai.com/v1/responses # Usamos la ruta /v1/responses que muchas instalaciones usan; si tu instancia Gemini requiere otra URL, # pásala modificando la función (o la env var GEMINI_ENDPOINT). # Si la API key parece una clave de Google (empieza con 'AIza'), usar # la API Generative Language de Google con key en query param. try: if api_key and api_key.startswith("AIza"): gl_model = model # Formato: https://generativelanguage.googleapis.com/v1beta2/models/{model}:generate?key=API_KEY gl_endpoint = ( f"https://generativelanguage.googleapis.com/v1beta2/models/{gl_model}:generateContent?key={api_key}" ) body = { "prompt": {"text": f"Traduce al {target_lang} el siguiente texto, devuelve solo el texto traducido:\n\n{text}"}, "maxOutputTokens": 1024, "temperature": 0.0, "candidateCount": 1, } r = requests.post(gl_endpoint, json=body, timeout=20) r.raise_for_status() j = r.json() # la respuesta suele tener 'candidates' con 'content' if isinstance(j, dict): if "candidates" in j and isinstance(j["candidates"], list) and j["candidates"]: first = j["candidates"][0] if isinstance(first, dict): # varios formatos posibles if "content" in first and isinstance(first["content"], str): return first["content"].strip() if "output" in first and isinstance(first["output"], str): return first["output"].strip() # content puede ser una lista de bloques if "content" in first and isinstance(first["content"], list): # buscar textos dentro parts = [] for c in first["content"]: if isinstance(c, dict) and isinstance(c.get("text"), str): parts.append(c.get("text")) if parts: return "\n".join(parts).strip() # fallback buscar fields comunes for key in ("output_text", "text", "response", "translated_text"): if key in j and isinstance(j[key], str): return j[key].strip() return text # Si no es Google API key, intentar API OpenAI-like (Responses) gemini_endpoint = os.environ.get("GEMINI_ENDPOINT", "https://api.openai.com/v1/responses") headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} prompt = ( f"Traduce el siguiente texto al idioma {target_lang}. Mantén solo el texto traducido, sin añadidos:\n\n{text}" ) payload = {"model": model, "input": prompt, "max_output_tokens": 1024} r = requests.post(gemini_endpoint, json=payload, headers=headers, timeout=20) r.raise_for_status() j = r.json() if isinstance(j, dict): if "output" in j and isinstance(j["output"], list): for item in j["output"]: if isinstance(item, dict) and "content" in item: cont = item["content"] if isinstance(cont, list): texts = [c.get("text") for c in cont if isinstance(c, dict) and c.get("text")] if texts: return "\n".join(texts).strip() elif isinstance(cont, str): return cont.strip() for key in ("output_text", "text", "response", "translated_text"): if key in j and isinstance(j[key], str): return j[key].strip() if isinstance(j, list) and j: if isinstance(j[0], str): return j[0] if isinstance(j, str): return j except Exception as e: logging.getLogger(__name__).warning("Warning: Gemini translation failed: %s", e) return text def concat_chunks(chunk_files: List[str], out_path: str): with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as listf: for c in chunk_files: listf.write(f"file '{os.path.abspath(c)}'\n") listname = listf.name cmd = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", listname, "-c", "copy", out_path] subprocess.run(cmd, check=True) try: os.remove(listname) except Exception: pass def replace_audio_in_video(video_path: str, audio_path: str, out_video: str): cmd = [ "ffmpeg", "-y", "-i", video_path, "-i", audio_path, "-map", "0:v:0", "-map", "1:a:0", "-c:v", "copy", "-c:a", "aac", "-b:a", "192k", "-shortest", out_video, ] subprocess.run(cmd, check=True) def normalize_segments(segments) -> List[Dict]: out = [] for s in segments: if isinstance(s, dict): start = s.get("start") end = s.get("end") text = s.get("text", "") else: # faster-whisper Segment object start = getattr(s, "start", None) end = getattr(s, "end", None) text = getattr(s, "text", "") if start is None or end is None: continue out.append({"start": float(start), "end": float(end), "text": str(text).strip()}) return out def main(): parser = argparse.ArgumentParser(description="Doblar vídeo usando Kokoro y quemar SRT traducido") parser.add_argument("--video", "-v", required=True) parser.add_argument("--out", "-o", default=None, help="Vídeo de salida final (con audio reemplazado y SRT quemado)") parser.add_argument("--temp-dub", default=None, help="Archivo de audio temporal generado (si quieres conservarlo)") parser.add_argument("--kokoro-endpoint", required=True, help="URL al endpoint /api/v1/audio/speech") parser.add_argument("--api-key", required=True, help="Token para Authorization: Bearer ") parser.add_argument("--model", default="model", help="Modelo Kokoro a usar (usa 'model' fp32 326MB)") parser.add_argument("--voice", default="em_alex", help="Voice id a usar (em_alex)") parser.add_argument( "--whisper-backend", choices=["faster-whisper", "openai-whisper"], default="faster-whisper", ) parser.add_argument("--whisper-model", default="base") # Gemini options parser.add_argument( "--use-gemini", action="store_true", help="Usar Gemini (HTTP) para traducir segmentos en lugar de Whisper translate", ) parser.add_argument("--gemini-api-key", default=None, help="API key para Gemini (Bearer)") parser.add_argument( "--gemini-model", default="gemini-2.5-flash", help="Modelo Gemini a usar (por defecto: gemini-2.5-flash)", ) args = parser.parse_args() ensure_ffmpeg_available() video = Path(args.video) if not video.exists(): logging.getLogger(__name__).error("Vídeo no encontrado") sys.exit(2) out_video = args.out if args.out else str(video.with_name(video.stem + "_dubbed.mp4")) tmpdir = tempfile.mkdtemp(prefix="dub_and_burn_") try: audio_wav = os.path.join(tmpdir, "extracted_audio.wav") logging.getLogger(__name__).info("Extrayendo audio...") process_video.extract_audio(str(video), audio_wav) logging.getLogger(__name__).info("Transcribiendo y traduciendo...") if args.use_gemini: # permitir pasar la key por variable de entorno GEMINI_API_KEY if not args.gemini_api_key: args.gemini_api_key = os.environ.get("GEMINI_API_KEY") if not args.gemini_api_key: logging.getLogger(__name__).error("--use-gemini requiere --gemini-api-key o la var de entorno GEMINI_API_KEY") sys.exit(4) # transcribir sin traducir (luego traduciremos por segmento) from faster_whisper import WhisperModel wm = WhisperModel(args.whisper_model, device="cpu", compute_type="int8") segments, info = wm.transcribe(audio_wav, beam_size=5, task="transcribe") else: if args.whisper_backend == "faster-whisper": segments = process_video.transcribe_and_translate_faster(audio_wav, args.whisper_model, "es") else: segments = process_video.transcribe_and_translate_openai(audio_wav, args.whisper_model, "es") if not segments: logging.getLogger(__name__).error("No se obtuvieron segmentos; abortando") sys.exit(3) segs = normalize_segments(segments) # si usamos gemini, traducir por segmento ahora (mantener la función existente) if args.use_gemini: logging.getLogger(__name__).info( "Traduciendo %s segmentos con Gemini (model=%s)...", len(segs), args.gemini_model ) for s in segs: try: src = s.get("text", "") if src: tgt = translate_with_gemini(src, "es", args.gemini_api_key, model=args.gemini_model) s["text"] = tgt except Exception: logging.getLogger(__name__).warning("Gemini fallo en segmento") # generar SRT traducido srt_out = os.path.join(tmpdir, "translated.srt") srt_segments = [] for i, s in enumerate(segs, start=1): srt_segments.append(s) write_srt(srt_segments, srt_out) logging.getLogger(__name__).info("SRT traducido guardado en: %s", srt_out) # sintetizar todo el SRT usando KokoroHttpClient (delegar en el adapter) kokoro_endpoint = args.kokoro_endpoint or os.environ.get("KOKORO_ENDPOINT") kokoro_key = args.api_key or os.environ.get("KOKORO_API_KEY") if not kokoro_endpoint: logging.getLogger(__name__).error("--kokoro-endpoint es requerido para sintetizar (o establecer KOKORO_ENDPOINT)") sys.exit(5) client = KokoroHttpClient(kokoro_endpoint, api_key=kokoro_key, voice=args.voice, model=args.model) dub_wav = args.temp_dub if args.temp_dub else os.path.join(tmpdir, "dub_final.wav") try: client.synthesize_from_srt(srt_out, dub_wav, video=None, align=True, keep_chunks=False) except Exception: logging.getLogger(__name__).exception("Error sintetizando desde SRT con Kokoro") sys.exit(6) logging.getLogger(__name__).info("Archivo dub generado en: %s", dub_wav) # reemplazar audio en el vídeo replaced = os.path.join(tmpdir, "video_replaced.mp4") logging.getLogger(__name__).info("Reemplazando pista de audio en el vídeo...") ff = FFmpegAudioProcessor() ff.replace_audio_in_video(str(video), dub_wav, replaced) # quemar SRT traducido logging.getLogger(__name__).info("Quemando SRT traducido en el vídeo...") ff.burn_subtitles(replaced, srt_out, out_video) logging.getLogger(__name__).info("Vídeo final generado: %s", out_video) finally: try: shutil.rmtree(tmpdir) except Exception: pass if __name__ == '__main__': main()