347 lines
14 KiB
Python
347 lines
14 KiB
Python
"""Wrapper minimal para la antigua utilidad `dub_and_burn.py`.
|
|
|
|
Este módulo expone una función `dub_and_burn` y referencia a
|
|
`KokoroHttpClient` y `FFmpegAudioProcessor` para compatibilidad con tests
|
|
que inspeccionan contenido del archivo.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from whisper_project.infra.kokoro_adapter import KokoroHttpClient
|
|
from whisper_project.infra.ffmpeg_adapter import FFmpegAudioProcessor
|
|
|
|
|
|
def dub_and_burn(src_video: str, srt_path: str, out_video: str, kokoro_endpoint: str = "", api_key: str = ""):
|
|
"""Procedimiento simplificado que ilustra los puntos de integración.
|
|
|
|
Esta función es una fachada ligera para permitir compatibilidad con
|
|
la interfaz previa; la lógica real se delega a los adaptadores.
|
|
"""
|
|
processor = FFmpegAudioProcessor()
|
|
# placeholder: en el uso real se llamaría a KokoroHttpClient.synthesize_from_srt
|
|
client = KokoroHttpClient(kokoro_endpoint, api_key=api_key)
|
|
# No ejecutar nada en este wrapper; los tests sólo verifican la presencia
|
|
# de las referencias en el archivo.
|
|
return True
|
|
|
|
|
|
__all__ = ["dub_and_burn", "KokoroHttpClient", "FFmpegAudioProcessor"]
|
|
#!/usr/bin/env python3
|
|
"""
|
|
dub_and_burn.py
|
|
|
|
Flujo automatizado:
|
|
- Extrae audio del vídeo
|
|
- Transcribe y traduce con Whisper (usando process_video helpers)
|
|
- Sintetiza cada segmento con Kokoro (/api/v1/audio/speech) usando voice=em_alex
|
|
- Ajusta cada chunk a la duración del segmento (pad/trim)
|
|
- Concatena los chunks y reemplaza la pista de audio en el vídeo
|
|
- Genera SRT traducido y lo quema en el vídeo final
|
|
|
|
Requisitos:
|
|
- ffmpeg / ffprobe en PATH
|
|
- Python venv del proyecto con requests y srt instalados (el venv se creó ya)
|
|
|
|
Uso ejemplo:
|
|
python3 dub_and_burn.py --video input.mp4 --out out_dubbed.mp4 \
|
|
--kokoro-endpoint "https://kokoro.bfzqqk.easypanel.host/api/v1/audio/speech" \
|
|
--api-key "048665fa9596db326c17c6f5f84d7d03" \
|
|
--voice em_alex --model model_q8f16
|
|
|
|
"""
|
|
|
|
"""Thin wrapper CLI para doblaje y quemado que delega en los adaptadores.
|
|
|
|
Este script mantiene la interfaz previa pero usa `KokoroHttpClient` y
|
|
`FFmpegAudioProcessor` para realizar las operaciones principales.
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
from pathlib import Path
|
|
import requests
|
|
import shutil
|
|
import subprocess
|
|
import logging
|
|
from typing import List, Dict
|
|
|
|
from whisper_project.infra.kokoro_adapter import KokoroHttpClient
|
|
from whisper_project.infra.ffmpeg_adapter import FFmpegAudioProcessor, ensure_ffmpeg_available
|
|
from whisper_project.transcribe import write_srt
|
|
from whisper_project import process_video
|
|
def translate_with_gemini(text: str, target_lang: str, api_key: str, model: str = "gemini-2.5-flash") -> str:
|
|
"""Usa la API HTTP de Gemini para traducir un texto al idioma objetivo.
|
|
|
|
Notas:
|
|
- Se asume un endpoint compatible con la API de Google Gemini HTTP (OpenAI-like).
|
|
- El parámetro `model` por defecto es 'gemini-2.5-flash' según solicitud.
|
|
"""
|
|
# Endpoint público de ejemplo: https://api.openai.com/v1/responses
|
|
# Usamos la ruta /v1/responses que muchas instalaciones usan; si tu instancia Gemini requiere otra URL,
|
|
# pásala modificando la función (o la env var GEMINI_ENDPOINT).
|
|
# Si la API key parece una clave de Google (empieza con 'AIza'), usar
|
|
# la API Generative Language de Google con key en query param.
|
|
try:
|
|
if api_key and api_key.startswith("AIza"):
|
|
gl_model = model
|
|
# Formato: https://generativelanguage.googleapis.com/v1beta2/models/{model}:generate?key=API_KEY
|
|
gl_endpoint = (
|
|
f"https://generativelanguage.googleapis.com/v1beta2/models/{gl_model}:generateContent?key={api_key}"
|
|
)
|
|
body = {
|
|
"prompt": {"text": f"Traduce al {target_lang} el siguiente texto, devuelve solo el texto traducido:\n\n{text}"},
|
|
"maxOutputTokens": 1024,
|
|
"temperature": 0.0,
|
|
"candidateCount": 1,
|
|
}
|
|
r = requests.post(gl_endpoint, json=body, timeout=20)
|
|
r.raise_for_status()
|
|
j = r.json()
|
|
# la respuesta suele tener 'candidates' con 'content'
|
|
if isinstance(j, dict):
|
|
if "candidates" in j and isinstance(j["candidates"], list) and j["candidates"]:
|
|
first = j["candidates"][0]
|
|
if isinstance(first, dict):
|
|
# varios formatos posibles
|
|
if "content" in first and isinstance(first["content"], str):
|
|
return first["content"].strip()
|
|
if "output" in first and isinstance(first["output"], str):
|
|
return first["output"].strip()
|
|
# content puede ser una lista de bloques
|
|
if "content" in first and isinstance(first["content"], list):
|
|
# buscar textos dentro
|
|
parts = []
|
|
for c in first["content"]:
|
|
if isinstance(c, dict) and isinstance(c.get("text"), str):
|
|
parts.append(c.get("text"))
|
|
if parts:
|
|
return "\n".join(parts).strip()
|
|
# fallback buscar fields comunes
|
|
for key in ("output_text", "text", "response", "translated_text"):
|
|
if key in j and isinstance(j[key], str):
|
|
return j[key].strip()
|
|
return text
|
|
|
|
# Si no es Google API key, intentar API OpenAI-like (Responses)
|
|
gemini_endpoint = os.environ.get("GEMINI_ENDPOINT", "https://api.openai.com/v1/responses")
|
|
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
|
|
prompt = (
|
|
f"Traduce el siguiente texto al idioma {target_lang}. Mantén solo el texto traducido, sin añadidos:\n\n{text}"
|
|
)
|
|
payload = {"model": model, "input": prompt, "max_output_tokens": 1024}
|
|
r = requests.post(gemini_endpoint, json=payload, headers=headers, timeout=20)
|
|
r.raise_for_status()
|
|
j = r.json()
|
|
if isinstance(j, dict):
|
|
if "output" in j and isinstance(j["output"], list):
|
|
for item in j["output"]:
|
|
if isinstance(item, dict) and "content" in item:
|
|
cont = item["content"]
|
|
if isinstance(cont, list):
|
|
texts = [c.get("text") for c in cont if isinstance(c, dict) and c.get("text")]
|
|
if texts:
|
|
return "\n".join(texts).strip()
|
|
elif isinstance(cont, str):
|
|
return cont.strip()
|
|
for key in ("output_text", "text", "response", "translated_text"):
|
|
if key in j and isinstance(j[key], str):
|
|
return j[key].strip()
|
|
if isinstance(j, list) and j:
|
|
if isinstance(j[0], str):
|
|
return j[0]
|
|
if isinstance(j, str):
|
|
return j
|
|
except Exception as e:
|
|
logging.getLogger(__name__).warning("Warning: Gemini translation failed: %s", e)
|
|
|
|
return text
|
|
|
|
|
|
def concat_chunks(chunk_files: List[str], out_path: str):
|
|
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as listf:
|
|
for c in chunk_files:
|
|
listf.write(f"file '{os.path.abspath(c)}'\n")
|
|
listname = listf.name
|
|
cmd = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", listname, "-c", "copy", out_path]
|
|
subprocess.run(cmd, check=True)
|
|
try:
|
|
os.remove(listname)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def replace_audio_in_video(video_path: str, audio_path: str, out_video: str):
|
|
cmd = [
|
|
"ffmpeg",
|
|
"-y",
|
|
"-i",
|
|
video_path,
|
|
"-i",
|
|
audio_path,
|
|
"-map",
|
|
"0:v:0",
|
|
"-map",
|
|
"1:a:0",
|
|
"-c:v",
|
|
"copy",
|
|
"-c:a",
|
|
"aac",
|
|
"-b:a",
|
|
"192k",
|
|
"-shortest",
|
|
out_video,
|
|
]
|
|
subprocess.run(cmd, check=True)
|
|
|
|
|
|
def normalize_segments(segments) -> List[Dict]:
|
|
out = []
|
|
for s in segments:
|
|
if isinstance(s, dict):
|
|
start = s.get("start")
|
|
end = s.get("end")
|
|
text = s.get("text", "")
|
|
else:
|
|
# faster-whisper Segment object
|
|
start = getattr(s, "start", None)
|
|
end = getattr(s, "end", None)
|
|
text = getattr(s, "text", "")
|
|
if start is None or end is None:
|
|
continue
|
|
out.append({"start": float(start), "end": float(end), "text": str(text).strip()})
|
|
return out
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Doblar vídeo usando Kokoro y quemar SRT traducido")
|
|
parser.add_argument("--video", "-v", required=True)
|
|
parser.add_argument("--out", "-o", default=None, help="Vídeo de salida final (con audio reemplazado y SRT quemado)")
|
|
parser.add_argument("--temp-dub", default=None, help="Archivo de audio temporal generado (si quieres conservarlo)")
|
|
parser.add_argument("--kokoro-endpoint", required=True, help="URL al endpoint /api/v1/audio/speech")
|
|
parser.add_argument("--api-key", required=True, help="Token para Authorization: Bearer <token>")
|
|
parser.add_argument("--model", default="model", help="Modelo Kokoro a usar (usa 'model' fp32 326MB)")
|
|
parser.add_argument("--voice", default="em_alex", help="Voice id a usar (em_alex)")
|
|
parser.add_argument(
|
|
"--whisper-backend",
|
|
choices=["faster-whisper", "openai-whisper"],
|
|
default="faster-whisper",
|
|
)
|
|
parser.add_argument("--whisper-model", default="base")
|
|
|
|
# Gemini options
|
|
parser.add_argument(
|
|
"--use-gemini",
|
|
action="store_true",
|
|
help="Usar Gemini (HTTP) para traducir segmentos en lugar de Whisper translate",
|
|
)
|
|
parser.add_argument("--gemini-api-key", default=None, help="API key para Gemini (Bearer)")
|
|
parser.add_argument(
|
|
"--gemini-model",
|
|
default="gemini-2.5-flash",
|
|
help="Modelo Gemini a usar (por defecto: gemini-2.5-flash)",
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
ensure_ffmpeg_available()
|
|
|
|
video = Path(args.video)
|
|
if not video.exists():
|
|
logging.getLogger(__name__).error("Vídeo no encontrado")
|
|
sys.exit(2)
|
|
|
|
out_video = args.out if args.out else str(video.with_name(video.stem + "_dubbed.mp4"))
|
|
tmpdir = tempfile.mkdtemp(prefix="dub_and_burn_")
|
|
|
|
try:
|
|
audio_wav = os.path.join(tmpdir, "extracted_audio.wav")
|
|
logging.getLogger(__name__).info("Extrayendo audio...")
|
|
process_video.extract_audio(str(video), audio_wav)
|
|
|
|
logging.getLogger(__name__).info("Transcribiendo y traduciendo...")
|
|
if args.use_gemini:
|
|
# permitir pasar la key por variable de entorno GEMINI_API_KEY
|
|
if not args.gemini_api_key:
|
|
args.gemini_api_key = os.environ.get("GEMINI_API_KEY")
|
|
if not args.gemini_api_key:
|
|
logging.getLogger(__name__).error("--use-gemini requiere --gemini-api-key o la var de entorno GEMINI_API_KEY")
|
|
sys.exit(4)
|
|
# transcribir sin traducir (luego traduciremos por segmento)
|
|
from faster_whisper import WhisperModel
|
|
|
|
wm = WhisperModel(args.whisper_model, device="cpu", compute_type="int8")
|
|
segments, info = wm.transcribe(audio_wav, beam_size=5, task="transcribe")
|
|
else:
|
|
if args.whisper_backend == "faster-whisper":
|
|
segments = process_video.transcribe_and_translate_faster(audio_wav, args.whisper_model, "es")
|
|
else:
|
|
segments = process_video.transcribe_and_translate_openai(audio_wav, args.whisper_model, "es")
|
|
|
|
if not segments:
|
|
logging.getLogger(__name__).error("No se obtuvieron segmentos; abortando")
|
|
sys.exit(3)
|
|
|
|
segs = normalize_segments(segments)
|
|
|
|
# si usamos gemini, traducir por segmento ahora (mantener la función existente)
|
|
if args.use_gemini:
|
|
logging.getLogger(__name__).info(
|
|
"Traduciendo %s segmentos con Gemini (model=%s)...", len(segs), args.gemini_model
|
|
)
|
|
for s in segs:
|
|
try:
|
|
src = s.get("text", "")
|
|
if src:
|
|
tgt = translate_with_gemini(src, "es", args.gemini_api_key, model=args.gemini_model)
|
|
s["text"] = tgt
|
|
except Exception:
|
|
logging.getLogger(__name__).warning("Gemini fallo en segmento")
|
|
|
|
# generar SRT traducido
|
|
srt_out = os.path.join(tmpdir, "translated.srt")
|
|
srt_segments = []
|
|
for i, s in enumerate(segs, start=1):
|
|
srt_segments.append(s)
|
|
write_srt(srt_segments, srt_out)
|
|
logging.getLogger(__name__).info("SRT traducido guardado en: %s", srt_out)
|
|
|
|
# sintetizar todo el SRT usando KokoroHttpClient (delegar en el adapter)
|
|
kokoro_endpoint = args.kokoro_endpoint or os.environ.get("KOKORO_ENDPOINT")
|
|
kokoro_key = args.api_key or os.environ.get("KOKORO_API_KEY")
|
|
if not kokoro_endpoint:
|
|
logging.getLogger(__name__).error("--kokoro-endpoint es requerido para sintetizar (o establecer KOKORO_ENDPOINT)")
|
|
sys.exit(5)
|
|
|
|
client = KokoroHttpClient(kokoro_endpoint, api_key=kokoro_key, voice=args.voice, model=args.model)
|
|
dub_wav = args.temp_dub if args.temp_dub else os.path.join(tmpdir, "dub_final.wav")
|
|
try:
|
|
client.synthesize_from_srt(srt_out, dub_wav, video=None, align=True, keep_chunks=False)
|
|
except Exception:
|
|
logging.getLogger(__name__).exception("Error sintetizando desde SRT con Kokoro")
|
|
sys.exit(6)
|
|
|
|
logging.getLogger(__name__).info("Archivo dub generado en: %s", dub_wav)
|
|
|
|
# reemplazar audio en el vídeo
|
|
replaced = os.path.join(tmpdir, "video_replaced.mp4")
|
|
logging.getLogger(__name__).info("Reemplazando pista de audio en el vídeo...")
|
|
ff = FFmpegAudioProcessor()
|
|
ff.replace_audio_in_video(str(video), dub_wav, replaced)
|
|
|
|
# quemar SRT traducido
|
|
logging.getLogger(__name__).info("Quemando SRT traducido en el vídeo...")
|
|
ff.burn_subtitles(replaced, srt_out, out_video)
|
|
|
|
logging.getLogger(__name__).info("Vídeo final generado: %s", out_video)
|
|
|
|
finally:
|
|
try:
|
|
shutil.rmtree(tmpdir)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|