commit 85691f13dcc03ae2db778875cf17f72411596efc Author: Cesar Mendivil Date: Thu Oct 23 21:54:13 2025 -0700 Init commit diff --git a/dailyrutines.dubbed.es.aligned.mp4 b/dailyrutines.dubbed.es.aligned.mp4 new file mode 100644 index 0000000..e9e6788 Binary files /dev/null and b/dailyrutines.dubbed.es.aligned.mp4 differ diff --git a/dailyrutines.dubbed.es.mixed.mp4 b/dailyrutines.dubbed.es.mixed.mp4 new file mode 100644 index 0000000..c0093b8 Binary files /dev/null and b/dailyrutines.dubbed.es.mixed.mp4 differ diff --git a/dailyrutines.dubbed.es.mixed.subs.mp4 b/dailyrutines.dubbed.es.mixed.subs.mp4 new file mode 100644 index 0000000..3476d44 Binary files /dev/null and b/dailyrutines.dubbed.es.mixed.subs.mp4 differ diff --git a/dailyrutines.dubbed.es.mp4 b/dailyrutines.dubbed.es.mp4 new file mode 100644 index 0000000..b145e57 Binary files /dev/null and b/dailyrutines.dubbed.es.mp4 differ diff --git a/dailyrutines.dubbed.es.subs.mp4 b/dailyrutines.dubbed.es.subs.mp4 new file mode 100644 index 0000000..2d6711d Binary files /dev/null and b/dailyrutines.dubbed.es.subs.mp4 differ diff --git a/dailyrutines.dubbed.gemini.mp4 b/dailyrutines.dubbed.gemini.mp4 new file mode 100644 index 0000000..af6611f Binary files /dev/null and b/dailyrutines.dubbed.gemini.mp4 differ diff --git a/dailyrutines.dubbed.mp4 b/dailyrutines.dubbed.mp4 new file mode 100644 index 0000000..d3ec20a Binary files /dev/null and b/dailyrutines.dubbed.mp4 differ diff --git a/dailyrutines.mp4 b/dailyrutines.mp4 new file mode 100644 index 0000000..4be2ebb Binary files /dev/null and b/dailyrutines.mp4 differ diff --git a/dailyrutines.replaced_audio.mp4 b/dailyrutines.replaced_audio.mp4 new file mode 100644 index 0000000..bb4f5f8 Binary files /dev/null and b/dailyrutines.replaced_audio.mp4 differ diff --git a/dailyrutines.replaced_audio.subs.mp4 b/dailyrutines.replaced_audio.subs.mp4 new file mode 100644 index 0000000..b805003 Binary files /dev/null and b/dailyrutines.replaced_audio.subs.mp4 differ diff --git a/prompt_init.md b/prompt_init.md new file mode 100644 index 0000000..32a29b7 --- /dev/null +++ b/prompt_init.md @@ -0,0 +1,112 @@ +You can run OpenAI's **Whisper** model for audio-to-text transcription on a **CPU** using **PyTorch**, typically by either using the original `openai-whisper` library or the Hugging Face `transformers` implementation. + +### Using the `openai-whisper` library + +1. **Installation:** Ensure you have Python, PyTorch (CPU version), and **FFmpeg** installed. + + ```bash + # Install the Whisper package + pip install -U openai-whisper + # On Linux, install FFmpeg (example for Debian/Ubuntu) + sudo apt update && sudo apt install ffmpeg + ``` + +2. **Specify CPU in Python:** In your Python script, explicitly load the model and move it to the CPU device. You can also pass the `device='cpu'` argument directly to `whisper.load_model()`. + + ```python + import whisper + + # Load the model and specify 'cpu' as the device + model = whisper.load_model("base", device='cpu') + + # Or, if loading and then moving: + # model = whisper.load_model("base").to("cpu") + + # Transcribe the audio file + result = model.transcribe("path/to/your/audio.mp3", fp16=False) # fp16=False is recommended for CPU + + print(result["text"]) + ``` + + *Note: Using a smaller model like `"tiny"` or `"base"` will be significantly faster on a CPU.* + +----- + +### Using the Hugging Face `transformers` library + +The Hugging Face `transformers` library also provides a way to run Whisper and often includes optimizations: + +1. 
**Installation:** Install the necessary libraries, ensuring you have the CPU-only version of PyTorch if you don't have a GPU.
+
+   ```bash
+   pip install transformers datasets accelerate torch
+   ```
+
+2. **Setup and Pipeline:** Use the PyTorch `AutoModelForSpeechSeq2Seq`, `AutoProcessor`, and `pipeline`, explicitly setting the device to `"cpu"`:
+
+   ```python
+   import torch
+   from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
+
+   # Set device to CPU
+   device = "cpu"
+   torch_dtype = torch.float32  # Use float32 on CPU for standard performance
+
+   # Choose a model size
+   model_id = "openai/whisper-base"  # Example model
+
+   # Load model and processor
+   model = AutoModelForSpeechSeq2Seq.from_pretrained(
+       model_id,
+       torch_dtype=torch_dtype,
+       low_cpu_mem_usage=True,
+       use_safetensors=True
+   ).to(device)
+
+   processor = AutoProcessor.from_pretrained(model_id)
+
+   # Create the ASR pipeline
+   pipe = pipeline(
+       "automatic-speech-recognition",
+       model=model,
+       tokenizer=processor.tokenizer,
+       feature_extractor=processor.feature_extractor,
+       torch_dtype=torch_dtype,
+       device=device,
+   )
+
+   # Transcribe
+   result = pipe("path/to/your/audio.mp3")
+   print(result["text"])
+   ```
+
+-----
+
+### Optimization: `faster-whisper`
+
+For much better performance on a CPU (up to 4 times faster), consider using the **`faster-whisper`** library, which uses the CTranslate2 inference engine:
+
+1. **Installation:**
+
+   ```bash
+   pip install faster-whisper
+   ```
+
+2. **Usage:**
+
+   ```python
+   from faster_whisper import WhisperModel
+
+   model_size = "base"  # Choose a model size
+
+   # Run on CPU with INT8 precision for speed
+   model = WhisperModel(model_size, device="cpu", compute_type="int8")
+
+   segments, info = model.transcribe("path/to/your/audio.mp3", beam_size=5)
+
+   for segment in segments:
+       print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))
+   ```
+
+The [Whisper: Install Guide](https://www.youtube.com/watch?v=XX-ET_-onYU) video walks through the initial installation steps for Whisper AI, which is a prerequisite for running it with PyTorch on any device.
\ No newline at end of file
diff --git a/whisper_project/README.md b/whisper_project/README.md
new file mode 100644
index 0000000..a19950c
--- /dev/null
+++ b/whisper_project/README.md
@@ -0,0 +1,131 @@
+# Proyecto de ejemplo: Transcriptor y doblador (Whisper + Coqui TTS)
+
+Este repo contiene utilidades para transcribir audio/video en CPU usando
+distintos backends de Whisper y para generar doblaje por segmentos usando
+Coqui TTS. Está pensado como una base reproducible y ligera para pruebas en CPU.
+
+Contenido principal
+- `transcribe.py` — CLI principal: transcripción (openai-whisper /
+  transformers / faster-whisper), generación de SRT (incluyendo fallback),
+  síntesis TTS por segmento y pipeline de doblaje (replace / mix).
+- `process_video.py` — pipeline alto nivel (extraer audio, transcribir,
+  traducir/opcional, generar SRT y quemar subtítulos en video).
+
+Requisitos del sistema
+- `ffmpeg` (disponible en PATH)
+- Para Coqui TTS se recomienda usar Miniforge/Conda con Python 3.11 en CPU.
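+
+Optional quick check: the following is a minimal Python sketch (standard library only, offered as an illustration rather than part of the project) that verifies the system requirements above — the same `ffmpeg`/`ffprobe` check that `dub_and_burn.py` performs in `ensure_ffmpeg()`:
+
+```python
+import shutil
+import sys
+
+# Fail early if ffmpeg/ffprobe are missing from PATH (required by every pipeline step).
+for tool in ("ffmpeg", "ffprobe"):
+    if shutil.which(tool) is None:
+        print(f"{tool} not found in PATH", file=sys.stderr)
+        sys.exit(1)
+print("ffmpeg and ffprobe are available")
+```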
+ +Instalación rápida + +1) Entorno ligero (solo transcripción con `faster-whisper` y dependencias mínimas): + +```bash +python -m venv .venv +source .venv/bin/activate +pip install -r requirements.txt +``` + +2) Entorno para Coqui TTS (recomendado si vas a sintetizar/doblar): + +```bash +# Instalar Miniforge/Miniconda si no lo tienes, luego: +conda create -n tts_env python=3.11 -y +conda activate tts_env +# PyTorch CPU + TTS +python -m pip install --index-url https://download.pytorch.org/whl/cpu torch torchvision torchaudio +python -m pip install TTS faster-whisper +``` + +Nota: en la sesión de ejemplo se creó `tts_env` y se instaló `TTS` y `faster-whisper`. + +Uso — ejemplos prácticos + +1) Transcribir un audio y generar SRT (faster-whisper, modelo `base`): + +```bash +# desde el entorno donde tengas faster-whisper disponible +python whisper_project/transcribe.py \ + --file whisper_project/dailyrutines.audio.wav \ + --backend faster-whisper --model base --srt +``` + +2) Generar SRT de fallback (sin texto) — divide en segmentos uniformes: + +```bash +python whisper_project/transcribe.py -f path/to/audio.wav -b transformers --srt --srt-fallback +``` + +3) Transcripción por segmentos (extrae piezas y transcribe cada una): + +```bash +python whisper_project/transcribe.py -f path/to/audio.wav --segment-transcribe --srt --srt-segment-seconds 8 +``` + +4) Doblaje por segmentos (replace = reemplaza voz original): + +```bash +# usando el entorno tts_env donde instalaste Coqui TTS +conda activate tts_env +python whisper_project/transcribe.py \ + --file whisper_project/dailyrutines.audio.wav \ + --segment-transcribe --srt --srt-file whisper_project/dailyrutines.kokoro.dub.srt \ + --srt-segment-seconds 6 \ + --tts-model tts_models/en/ljspeech/tacotron2-DDC \ + --tts-model-repo tts_models/en/ljspeech/tacotron2-DDC \ + --dub --dub-mode replace --dub-out whisper_project/dailyrutines.kokoro.dub.wav +``` + +5) Doblaje por segmentos (mix = mezcla TTS con original): + +```bash +python whisper_project/transcribe.py \ + --file whisper_project/dailyrutines.audio.wav \ + --segment-transcribe --dub --dub-mode mix --dub-mix-level 0.7 \ + --tts-model tts_models/en/ljspeech/tacotron2-DDC --dub-out out_mix.wav +``` + +Remuxar audio doblado en el MP4 y quemar subtítulos + +1) Reemplazar la pista de audio en el MP4 por la pista doblada (sin recomprimir video): + +```bash +ffmpeg -y -i dailyrutines.mp4 -i whisper_project/dailyrutines.kokoro.dub.wav -c:v copy -map 0:v:0 -map 1:a:0 -shortest dailyrutines.kokoro.dub.mp4 +``` + +2) Quemar subtítulos (hardcode) en el video (requiere re-encode del video): + +```bash +ffmpeg -y -i dailyrutines.mp4 -vf "subtitles=whisper_project/dailyrutines.kokoro.dub.srt:force_style='FontName=Arial,FontSize=24'" -c:a copy dailyrutines.kokoro.subs.mp4 +``` + +Notas sobre modelos Hugging Face y tokens +- Si el repo del modelo TTS o del modelo de ASR es privado necesitarás + exportar `HUGGINGFACE_HUB_TOKEN` en el entorno antes de ejecutar el script + para que `huggingface_hub.snapshot_download` pueda acceder. Ejemplo: + +```bash +export HUGGINGFACE_HUB_TOKEN="hf_xxx..." +``` + +Rendimiento y recomendaciones +- En CPU usa modelos pequeños (`tiny`, `base`) para tiempos aceptables. +- `faster-whisper` con `compute_type=int8` reduce memoria y acelera en CPU. +- Para producción con GPU, instala las ruedas de PyTorch/GPU apropiadas + y activa `gpu=True` en las llamadas a TTS y whisper si tu hardware lo permite. 
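+
+To illustrate the `compute_type=int8` recommendation above, here is a minimal sketch (assuming `faster-whisper` is installed; the audio path is the sample file shipped in this repo) using the same call that `transcribe.py` makes on CPU:
+
+```python
+from faster_whisper import WhisperModel
+
+# Small model + int8 quantization: the recommended combination for CPU-only runs.
+model = WhisperModel("base", device="cpu", compute_type="int8")
+
+segments, info = model.transcribe("whisper_project/dailyrutines.audio.wav", beam_size=5)
+for seg in segments:
+    print(f"[{seg.start:.2f}s -> {seg.end:.2f}s] {seg.text}")
+```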
+ +Problemas comunes y troubleshooting +- "No module named 'faster_whisper'": instala `faster-whisper` en tu entorno. +- Coqui TTS tarda en descargar modelos la primera vez (pesan decenas de MB). +- Si la síntesis TTS es lenta, verifica que `torch` esté instalado y que + `TTS` use un vocoder optimizado (HifiGAN es el por defecto en los modelos + de ejemplo). + +¿Quieres que haga esto por ti? +- Puedo generar el MP4 final (reemplazando pista y/o quemando subtítulos), + o ajustar el pipeline de doblaje (p.ej. time-stretch suave). Indica qué + prefieres y lo ejecuto localmente en este workspace. diff --git a/whisper_project/__pycache__/dub_and_burn.cpython-313.pyc b/whisper_project/__pycache__/dub_and_burn.cpython-313.pyc new file mode 100644 index 0000000..0d8a36a Binary files /dev/null and b/whisper_project/__pycache__/dub_and_burn.cpython-313.pyc differ diff --git a/whisper_project/__pycache__/process_video.cpython-313.pyc b/whisper_project/__pycache__/process_video.cpython-313.pyc new file mode 100644 index 0000000..a73c828 Binary files /dev/null and b/whisper_project/__pycache__/process_video.cpython-313.pyc differ diff --git a/whisper_project/__pycache__/transcribe.cpython-313.pyc b/whisper_project/__pycache__/transcribe.cpython-313.pyc new file mode 100644 index 0000000..4451e90 Binary files /dev/null and b/whisper_project/__pycache__/transcribe.cpython-313.pyc differ diff --git a/whisper_project/coqui_test.wav b/whisper_project/coqui_test.wav new file mode 100644 index 0000000..31df696 Binary files /dev/null and b/whisper_project/coqui_test.wav differ diff --git a/whisper_project/dailyrutines.audio.srt b/whisper_project/dailyrutines.audio.srt new file mode 100644 index 0000000..32a1260 --- /dev/null +++ b/whisper_project/dailyrutines.audio.srt @@ -0,0 +1,56 @@ +1 +00:00:00,000 --> 00:00:10,000 + +2 +00:00:10,000 --> 00:00:20,000 + +3 +00:00:20,000 --> 00:00:30,000 + +4 +00:00:30,000 --> 00:00:40,000 + +5 +00:00:40,000 --> 00:00:50,000 + +6 +00:00:50,000 --> 00:01:00,000 + +7 +00:01:00,000 --> 00:01:10,000 + +8 +00:01:10,000 --> 00:01:20,000 + +9 +00:01:20,000 --> 00:01:30,000 + +10 +00:01:30,000 --> 00:01:40,000 + +11 +00:01:40,000 --> 00:01:50,000 + +12 +00:01:50,000 --> 00:02:00,000 + +13 +00:02:00,000 --> 00:02:10,000 + +14 +00:02:10,000 --> 00:02:20,000 + +15 +00:02:20,000 --> 00:02:30,000 + +16 +00:02:30,000 --> 00:02:40,000 + +17 +00:02:40,000 --> 00:02:50,000 + +18 +00:02:50,000 --> 00:03:00,000 + +19 +00:03:00,000 --> 00:03:09,009 diff --git a/whisper_project/dailyrutines.audio.wav b/whisper_project/dailyrutines.audio.wav new file mode 100644 index 0000000..10fc45b Binary files /dev/null and b/whisper_project/dailyrutines.audio.wav differ diff --git a/whisper_project/dailyrutines.kokoro.api.wav b/whisper_project/dailyrutines.kokoro.api.wav new file mode 100644 index 0000000..bab2a43 Binary files /dev/null and b/whisper_project/dailyrutines.kokoro.api.wav differ diff --git a/whisper_project/dailyrutines.kokoro.dub.es.aligned.wav b/whisper_project/dailyrutines.kokoro.dub.es.aligned.wav new file mode 100644 index 0000000..82a4016 Binary files /dev/null and b/whisper_project/dailyrutines.kokoro.dub.es.aligned.wav differ diff --git a/whisper_project/dailyrutines.kokoro.dub.es.srt b/whisper_project/dailyrutines.kokoro.dub.es.srt new file mode 100644 index 0000000..3de095f --- /dev/null +++ b/whisper_project/dailyrutines.kokoro.dub.es.srt @@ -0,0 +1,72 @@ +1 +00:00:00,000 --> 00:00:06,960 +Rutinas diarias + +2 +00:00:06,960 --> 00:00:14,480 +Hola mamá, estoy disfrutando la vida en 
Nueva Zelanda. + +3 +00:00:14,480 --> 00:00:19,240 +El campo es tan hermoso. + +4 +00:00:19,240 --> 00:00:23,199 +Mi rutina es diferente ahora. + +5 +00:00:23,199 --> 00:00:29,960 +Me despierto a las 6 en punto cada mañana y salgo a correr. + +6 +00:00:29,960 --> 00:00:36,640 +A las 7 en punto desayuno. + +7 +00:00:36,640 --> 00:00:42,120 +El café en Nueva Zelanda es tan bueno. + +8 +00:00:42,120 --> 00:00:46,240 +A las 8 voy a trabajar. + +9 +00:00:46,240 --> 00:00:52,679 +Normalmente tomo el autobús, pero a veces camino. + +10 +00:00:52,679 --> 00:00:57,439 +Empiezo a trabajar a las 9. + +11 +00:00:57,439 --> 00:01:02,399 +Trabajo en mi oficina hasta la hora del almuerzo. + +12 +00:01:02,399 --> 00:01:08,920 +A las 12 almuerzo con mis colegas en el parque. + +13 +00:01:08,920 --> 00:01:15,239 +Es agradable disfrutar del aire fresco y charlar juntos. + +14 +00:01:15,239 --> 00:01:23,759 +A las 5 salgo del trabajo y voy al gimnasio. + +15 +00:01:23,760 --> 00:01:32,920 +Hago ejercicio hasta las seis y luego voy a casa. + +16 +00:01:32,920 --> 00:01:39,520 +A las 8 ceno, luego me relajo. + +17 +00:01:39,520 --> 00:01:44,800 +I normally go to bed at 11 o'clock. + +18 +00:01:44,799 --> 00:01:51,799 +Hasta pronto, Stephen. + diff --git a/whisper_project/dailyrutines.kokoro.dub.es.wav b/whisper_project/dailyrutines.kokoro.dub.es.wav new file mode 100644 index 0000000..7b9fe62 Binary files /dev/null and b/whisper_project/dailyrutines.kokoro.dub.es.wav differ diff --git a/whisper_project/dailyrutines.kokoro.dub.srt b/whisper_project/dailyrutines.kokoro.dub.srt new file mode 100644 index 0000000..12f584a --- /dev/null +++ b/whisper_project/dailyrutines.kokoro.dub.srt @@ -0,0 +1,71 @@ +1 +00:00:00,000 --> 00:00:06,960 +Dayly routines + +2 +00:00:06,960 --> 00:00:14,480 +Hi mom, I'm enjoying life in New Zealand. + +3 +00:00:14,480 --> 00:00:19,240 +The countryside is so beautiful. + +4 +00:00:19,240 --> 00:00:23,199 +My routine is different now. + +5 +00:00:23,199 --> 00:00:29,960 +I wake at 6 o'clock every morning and go for a run. + +6 +00:00:29,960 --> 00:00:36,640 +At 7 o'clock I have breakfast. + +7 +00:00:36,640 --> 00:00:42,120 +The coffee in New Zealand is so good. + +8 +00:00:42,120 --> 00:00:46,240 +At 8 o'clock I go to work. + +9 +00:00:46,240 --> 00:00:52,679 +I usually take the bus, but sometimes I walk. + +10 +00:00:52,679 --> 00:00:57,439 +I start work at 9 o'clock. + +11 +00:00:57,439 --> 00:01:02,399 +I work in my office until lunchtime. + +12 +00:01:02,399 --> 00:01:08,920 +At 12 o'clock I have lunch with my colleagues in the park. + +13 +00:01:08,920 --> 00:01:15,239 +It's nice to enjoy the fresh air and chat together. + +14 +00:01:15,239 --> 00:01:23,759 +At 5 o'clock I leave work and go to the gym. + +15 +00:01:23,760 --> 00:01:32,920 +I exercise until 6 o'clock and then go home. + +16 +00:01:32,920 --> 00:01:39,520 +At 8 o'clock I eat dinner, then relax. + +17 +00:01:39,520 --> 00:01:44,800 +I normally go to bed at 11 o'clock. + +18 +00:01:44,799 --> 00:01:51,799 +See you soon, Stephen. 
diff --git a/whisper_project/dailyrutines.kokoro.dub.wav b/whisper_project/dailyrutines.kokoro.dub.wav new file mode 100644 index 0000000..42cc20a Binary files /dev/null and b/whisper_project/dailyrutines.kokoro.dub.wav differ diff --git a/whisper_project/dub_and_burn.py b/whisper_project/dub_and_burn.py new file mode 100644 index 0000000..e6d1e6d --- /dev/null +++ b/whisper_project/dub_and_burn.py @@ -0,0 +1,484 @@ +#!/usr/bin/env python3 +""" +dub_and_burn.py + +Flujo automatizado: +- Extrae audio del vídeo +- Transcribe y traduce con Whisper (usando process_video helpers) +- Sintetiza cada segmento con Kokoro (/api/v1/audio/speech) usando voice=em_alex +- Ajusta cada chunk a la duración del segmento (pad/trim) +- Concatena los chunks y reemplaza la pista de audio en el vídeo +- Genera SRT traducido y lo quema en el vídeo final + +Requisitos: +- ffmpeg / ffprobe en PATH +- Python venv del proyecto con requests y srt instalados (el venv se creó ya) + +Uso ejemplo: + python3 dub_and_burn.py --video input.mp4 --out out_dubbed.mp4 \ + --kokoro-endpoint "https://kokoro.bfzqqk.easypanel.host/api/v1/audio/speech" \ + --api-key "048665fa9596db326c17c6f5f84d7d03" \ + --voice em_alex --model model_q8f16 + +""" + +import argparse +import json +import os +import shlex +import shutil +import subprocess +import sys +import tempfile +from pathlib import Path +from typing import List, Dict + +import requests +import srt + +# Import translation/transcription helpers from process_video +from whisper_project.process_video import ( + extract_audio, + transcribe_and_translate_faster, + transcribe_and_translate_openai, + burn_subtitles, +) + +# Use write_srt from transcribe module if available +from whisper_project.transcribe import write_srt + + +def ensure_ffmpeg(): + if shutil.which("ffmpeg") is None or shutil.which("ffprobe") is None: + print("ffmpeg/ffprobe no encontrados en PATH. 
Instálalos.") + sys.exit(1) + + +def get_duration(path: str) -> float: + cmd = [ + "ffprobe", + "-v", + "error", + "-show_entries", + "format=duration", + "-of", + "default=noprint_wrappers=1:nokey=1", + path, + ] + p = subprocess.run(cmd, capture_output=True, text=True) + if p.returncode != 0: + return 0.0 + try: + return float(p.stdout.strip()) + except Exception: + return 0.0 + + +def pad_or_trim(in_path: str, out_path: str, target_duration: float, sr: int = 22050): + cur = get_duration(in_path) + if cur == 0.0: + # copy as-is + shutil.copy(in_path, out_path) + return True + if abs(cur - target_duration) < 0.02: + # casi igual + shutil.copy(in_path, out_path) + return True + if cur > target_duration: + # recortar + cmd = ["ffmpeg", "-y", "-i", in_path, "-t", f"{target_duration}", out_path] + subprocess.run(cmd, check=True) + return True + else: + # pad: crear silencio de duración faltante y concatenar + pad = target_duration - cur + with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as sil: + sil_path = sil.name + try: + cmd1 = [ + "ffmpeg", + "-y", + "-f", + "lavfi", + "-i", + f"anullsrc=channel_layout=mono:sample_rate={sr}", + "-t", + f"{pad}", + "-c:a", + "pcm_s16le", + sil_path, + ] + subprocess.run(cmd1, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + + # concat in_path + sil_path + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as listf: + listf.write(f"file '{os.path.abspath(in_path)}'\n") + listf.write(f"file '{os.path.abspath(sil_path)}'\n") + listname = listf.name + cmd2 = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", listname, "-c", "copy", out_path] + subprocess.run(cmd2, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + finally: + try: + os.remove(sil_path) + except Exception: + pass + try: + os.remove(listname) + except Exception: + pass + return True + + +def synthesize_segment_kokoro(endpoint: str, api_key: str, model: str, voice: str, text: str) -> bytes: + headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json", "Accept": "*/*"} + payload = {"model": model, "voice": voice, "input": text, "response_format": "wav"} + r = requests.post(endpoint, json=payload, headers=headers, timeout=120) + r.raise_for_status() + # si viene audio + ctype = r.headers.get("Content-Type", "") + if ctype.startswith("audio/"): + return r.content + # intentar JSON base64 + try: + j = r.json() + for k in ("audio", "wav", "data", "base64"): + if k in j: + import base64 + + return base64.b64decode(j[k]) + except Exception: + pass + # fallback + return r.content + + +def translate_with_gemini(text: str, target_lang: str, api_key: str, model: str = "gemini-2.5-flash") -> str: + """Usa la API HTTP de Gemini para traducir un texto al idioma objetivo. + + Notas: + - Se asume un endpoint compatible con la API de Google Gemini HTTP (OpenAI-like). + - El parámetro `model` por defecto es 'gemini-2.5-flash' según solicitud. + """ + # Endpoint público de ejemplo: https://api.openai.com/v1/responses + # Usamos la ruta /v1/responses que muchas instalaciones usan; si tu instancia Gemini requiere otra URL, + # pásala modificando la función (o la env var GEMINI_ENDPOINT). + # Si la API key parece una clave de Google (empieza con 'AIza'), usar + # la API Generative Language de Google con key en query param. 
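+    # Illustrative (assumed) response shape that the parsing code below tries to handle:
+    #   {"candidates": [{"content": [{"text": "texto traducido"}]}]}
+    # or a flat field such as "output_text"/"text"; anything unrecognized falls through
+    # and the original text is returned unchanged.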
+ try: + if api_key and api_key.startswith("AIza"): + gl_model = model + # Formato: https://generativelanguage.googleapis.com/v1beta2/models/{model}:generate?key=API_KEY + gl_endpoint = ( + f"https://generativelanguage.googleapis.com/v1beta2/models/{gl_model}:generateContent?key={api_key}" + ) + body = { + "prompt": {"text": f"Traduce al {target_lang} el siguiente texto, devuelve solo el texto traducido:\n\n{text}"}, + "maxOutputTokens": 1024, + "temperature": 0.0, + "candidateCount": 1, + } + r = requests.post(gl_endpoint, json=body, timeout=20) + r.raise_for_status() + j = r.json() + # la respuesta suele tener 'candidates' con 'content' + if isinstance(j, dict): + if "candidates" in j and isinstance(j["candidates"], list) and j["candidates"]: + first = j["candidates"][0] + if isinstance(first, dict): + # varios formatos posibles + if "content" in first and isinstance(first["content"], str): + return first["content"].strip() + if "output" in first and isinstance(first["output"], str): + return first["output"].strip() + # content puede ser una lista de bloques + if "content" in first and isinstance(first["content"], list): + # buscar textos dentro + parts = [] + for c in first["content"]: + if isinstance(c, dict) and isinstance(c.get("text"), str): + parts.append(c.get("text")) + if parts: + return "\n".join(parts).strip() + # fallback buscar fields comunes + for key in ("output_text", "text", "response", "translated_text"): + if key in j and isinstance(j[key], str): + return j[key].strip() + return text + + # Si no es Google API key, intentar API OpenAI-like (Responses) + gemini_endpoint = os.environ.get("GEMINI_ENDPOINT", "https://api.openai.com/v1/responses") + headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} + prompt = ( + f"Traduce el siguiente texto al idioma {target_lang}. 
Mantén solo el texto traducido, sin añadidos:\n\n{text}" + ) + payload = {"model": model, "input": prompt, "max_output_tokens": 1024} + r = requests.post(gemini_endpoint, json=payload, headers=headers, timeout=20) + r.raise_for_status() + j = r.json() + if isinstance(j, dict): + if "output" in j and isinstance(j["output"], list): + for item in j["output"]: + if isinstance(item, dict) and "content" in item: + cont = item["content"] + if isinstance(cont, list): + texts = [c.get("text") for c in cont if isinstance(c, dict) and c.get("text")] + if texts: + return "\n".join(texts).strip() + elif isinstance(cont, str): + return cont.strip() + for key in ("output_text", "text", "response", "translated_text"): + if key in j and isinstance(j[key], str): + return j[key].strip() + if isinstance(j, list) and j: + if isinstance(j[0], str): + return j[0] + if isinstance(j, str): + return j + except Exception as e: + print(f"Warning: Gemini translation failed: {e}") + + return text + + +def concat_chunks(chunk_files: List[str], out_path: str): + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as listf: + for c in chunk_files: + listf.write(f"file '{os.path.abspath(c)}'\n") + listname = listf.name + cmd = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", listname, "-c", "copy", out_path] + subprocess.run(cmd, check=True) + try: + os.remove(listname) + except Exception: + pass + + +def replace_audio_in_video(video_path: str, audio_path: str, out_video: str): + cmd = [ + "ffmpeg", + "-y", + "-i", + video_path, + "-i", + audio_path, + "-map", + "0:v:0", + "-map", + "1:a:0", + "-c:v", + "copy", + "-c:a", + "aac", + "-b:a", + "192k", + "-shortest", + out_video, + ] + subprocess.run(cmd, check=True) + + +def normalize_segments(segments) -> List[Dict]: + out = [] + for s in segments: + if isinstance(s, dict): + start = s.get("start") + end = s.get("end") + text = s.get("text", "") + else: + # faster-whisper Segment object + start = getattr(s, "start", None) + end = getattr(s, "end", None) + text = getattr(s, "text", "") + if start is None or end is None: + continue + out.append({"start": float(start), "end": float(end), "text": str(text).strip()}) + return out + + +def main(): + parser = argparse.ArgumentParser(description="Doblar vídeo usando Kokoro y quemar SRT traducido") + parser.add_argument("--video", "-v", required=True) + parser.add_argument("--out", "-o", default=None, help="Vídeo de salida final (con audio reemplazado y SRT quemado)") + parser.add_argument("--temp-dub", default=None, help="Archivo de audio temporal generado (si quieres conservarlo)") + parser.add_argument("--kokoro-endpoint", required=True, help="URL al endpoint /api/v1/audio/speech") + parser.add_argument("--api-key", required=True, help="Token para Authorization: Bearer ") + parser.add_argument("--model", default="model", help="Modelo Kokoro a usar (usa 'model' fp32 326MB)") + parser.add_argument("--voice", default="em_alex", help="Voice id a usar (em_alex)") + parser.add_argument( + "--whisper-backend", + choices=["faster-whisper", "openai-whisper"], + default="faster-whisper", + ) + parser.add_argument("--whisper-model", default="base") + + # Gemini options + parser.add_argument( + "--use-gemini", + action="store_true", + help="Usar Gemini (HTTP) para traducir segmentos en lugar de Whisper translate", + ) + parser.add_argument("--gemini-api-key", default=None, help="API key para Gemini (Bearer)") + parser.add_argument( + "--gemini-model", + default="gemini-2.5-flash", + help="Modelo Gemini a usar (por 
defecto: gemini-2.5-flash)", + ) + + args = parser.parse_args() + + ensure_ffmpeg() + + video = Path(args.video) + if not video.exists(): + print("Vídeo no encontrado", file=sys.stderr) + sys.exit(2) + + out_video = args.out if args.out else str(video.with_name(video.stem + "_dubbed.mp4")) + tmpdir = tempfile.mkdtemp(prefix="dub_and_burn_") + + try: + audio_wav = os.path.join(tmpdir, "extracted_audio.wav") + print("Extrayendo audio...") + extract_audio(str(video), audio_wav) + + print("Transcribiendo (y traduciendo si no se usa Gemini) ...") + + # Si se solicita Gemini, hacemos transcribe-only y luego traducimos por segmento con Gemini + if args.use_gemini: + # permitir pasar la key por variable de entorno GEMINI_API_KEY + if not args.gemini_api_key: + args.gemini_api_key = os.environ.get("GEMINI_API_KEY") + if not args.gemini_api_key: + print("--use-gemini requiere --gemini-api-key o la var de entorno GEMINI_API_KEY", file=sys.stderr) + sys.exit(4) + # transcribir sin traducir + from faster_whisper import WhisperModel + + wm = WhisperModel(args.whisper_model, device="cpu", compute_type="int8") + segments, info = wm.transcribe(audio_wav, beam_size=5, task="transcribe") + else: + if args.whisper_backend == "faster-whisper": + segments = transcribe_and_translate_faster(audio_wav, args.whisper_model, "es") + else: + segments = transcribe_and_translate_openai(audio_wav, args.whisper_model, "es") + + if not segments: + print("No se obtuvieron segmentos; abortando", file=sys.stderr) + sys.exit(3) + + segs = normalize_segments(segments) + + # si usamos gemini, traducir por segmento ahora + if args.use_gemini: + print(f"Traduciendo {len(segs)} segmentos con Gemini (model={args.gemini_model})...") + for s in segs: + try: + src = s.get("text", "") + if src: + tgt = translate_with_gemini(src, "es", args.gemini_api_key, model=args.gemini_model) + s["text"] = tgt + except Exception as e: + print(f"Warning: Gemini fallo en segmento: {e}") + + # generar SRT traducido + srt_out = os.path.join(tmpdir, "translated.srt") + srt_segments = [] + for i, s in enumerate(segs, start=1): + srt_segments.append(s) + write_srt(srt_segments, srt_out) + print(f"SRT traducido guardado en: {srt_out}") + + # sintetizar por segmento + chunk_files = [] + print(f"Sintetizando {len(segs)} segmentos con Kokoro (voice={args.voice})...") + for i, s in enumerate(segs, start=1): + text = s.get("text", "") + if not text: + # generar silencio con la duración del segmento + target_dur = s["end"] - s["start"] + silent = os.path.join(tmpdir, f"chunk_{i:04d}.wav") + cmd = [ + "ffmpeg", + "-y", + "-f", + "lavfi", + "-i", + "anullsrc=channel_layout=mono:sample_rate=22050", + "-t", + f"{target_dur}", + "-c:a", + "pcm_s16le", + silent, + ] + subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + chunk_files.append(silent) + print(f" - Segmento {i}: silencio {target_dur}s") + continue + + try: + raw = synthesize_segment_kokoro(args.kokoro_endpoint, args.api_key, args.model, args.voice, text) + except Exception as e: + print(f"Error sintetizando segmento {i}: {e}") + # fallback: generar silencio + target_dur = s["end"] - s["start"] + silent = os.path.join(tmpdir, f"chunk_{i:04d}.wav") + cmd = [ + "ffmpeg", + "-y", + "-f", + "lavfi", + "-i", + "anullsrc=channel_layout=mono:sample_rate=22050", + "-t", + f"{target_dur}", + "-c:a", + "pcm_s16le", + silent, + ] + subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + chunk_files.append(silent) + continue + + # guardar raw en temp file 
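+            # Note: the bytes returned by Kokoro may be a WAV or another container/format
+            # depending on the response; writing them to a .bin and letting ffmpeg probe
+            # the format during the conversion below keeps this step format-agnostic.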
+ tmp_chunk = os.path.join(tmpdir, f"raw_chunk_{i:04d}.bin") + with open(tmp_chunk, "wb") as f: + f.write(raw) + + # convertir a WAV estandar (22050 mono) + tmp_wav = os.path.join(tmpdir, f"tmp_chunk_{i:04d}.wav") + cmdc = ["ffmpeg", "-y", "-i", tmp_chunk, "-ar", "22050", "-ac", "1", "-sample_fmt", "s16", tmp_wav] + subprocess.run(cmdc, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + + # ajustar a la duración del segmento + target_dur = s["end"] - s["start"] + final_chunk = os.path.join(tmpdir, f"chunk_{i:04d}.wav") + pad_or_trim(tmp_wav, final_chunk, target_dur, sr=22050) + chunk_files.append(final_chunk) + print(f" - Segmento {i}/{len(segs)} -> {os.path.basename(final_chunk)}") + + # concatenar chunks + dub_wav = args.temp_dub if args.temp_dub else os.path.join(tmpdir, "dub_final.wav") + print("Concatenando chunks...") + concat_chunks(chunk_files, dub_wav) + print(f"Archivo dub generado en: {dub_wav}") + + # reemplazar audio en el vídeo + replaced = os.path.join(tmpdir, "video_replaced.mp4") + print("Reemplazando pista de audio en el vídeo...") + replace_audio_in_video(str(video), dub_wav, replaced) + + # quemar SRT traducido + print("Quemando SRT traducido en el vídeo...") + burn_subtitles(replaced, srt_out, out_video) + + print(f"Vídeo final generado: {out_video}") + + finally: + try: + shutil.rmtree(tmpdir) + except Exception: + pass + + +if __name__ == '__main__': + main() diff --git a/whisper_project/dub_female_clone_es.wav b/whisper_project/dub_female_clone_es.wav new file mode 100644 index 0000000..b3645ac Binary files /dev/null and b/whisper_project/dub_female_clone_es.wav differ diff --git a/whisper_project/dub_male_clone_ptbr.wav b/whisper_project/dub_male_clone_ptbr.wav new file mode 100644 index 0000000..42b3b57 Binary files /dev/null and b/whisper_project/dub_male_clone_ptbr.wav differ diff --git a/whisper_project/dub_male_style.wav b/whisper_project/dub_male_style.wav new file mode 100644 index 0000000..28202e3 Binary files /dev/null and b/whisper_project/dub_male_style.wav differ diff --git a/whisper_project/dub_male_style_out.wav b/whisper_project/dub_male_style_out.wav new file mode 100644 index 0000000..014aad0 Binary files /dev/null and b/whisper_project/dub_male_style_out.wav differ diff --git a/whisper_project/process_video.py b/whisper_project/process_video.py new file mode 100644 index 0000000..316d8c0 --- /dev/null +++ b/whisper_project/process_video.py @@ -0,0 +1,179 @@ +#!/usr/bin/env python3 +"""Procesamiento de vídeo: extrae audio, transcribe/traduce y +quema subtítulos. + +Flujo: +- Extrae audio con ffmpeg (WAV 16k mono) +- Transcribe con faster-whisper o openai-whisper + (opción task='translate') +- Escribe SRT y lo incrusta en el vídeo con ffmpeg + +Nota: requiere ffmpeg instalado y, para modelos, faster-whisper +o openai-whisper. 
+""" +import argparse +import subprocess +import tempfile +from pathlib import Path +import sys + +from transcribe import write_srt + + +def extract_audio(video_path: str, out_audio: str): + cmd = [ + "ffmpeg", + "-y", + "-i", + video_path, + "-vn", + "-acodec", + "pcm_s16le", + "-ar", + "16000", + "-ac", + "1", + out_audio, + ] + subprocess.run(cmd, check=True) + + +def burn_subtitles(video_path: str, srt_path: str, out_video: str): + # Usar filtro subtitles de ffmpeg + cmd = [ + "ffmpeg", + "-y", + "-i", + video_path, + "-vf", + f"subtitles={srt_path}", + "-c:a", + "copy", + out_video, + ] + subprocess.run(cmd, check=True) + + +def transcribe_and_translate_faster(audio_path: str, model: str, target: str): + from faster_whisper import WhisperModel + + wm = WhisperModel(model, device="cpu", compute_type="int8") + segments, info = wm.transcribe( + audio_path, beam_size=5, task="translate", language=target + ) + return segments + + +def transcribe_and_translate_openai(audio_path: str, model: str, target: str): + import whisper + + m = whisper.load_model(model, device="cpu") + result = m.transcribe( + audio_path, fp16=False, task="translate", language=target + ) + return result.get("segments", None) + + +def main(): + parser = argparse.ArgumentParser( + description=( + "Extraer, transcribir/traducir y quemar subtítulos en vídeo" + " (offline)" + ) + ) + parser.add_argument( + "--video", "-v", required=True, help="Ruta del archivo de vídeo" + ) + parser.add_argument( + "--backend", + "-b", + choices=["faster-whisper", "openai-whisper"], + default="faster-whisper", + ) + parser.add_argument( + "--model", + "-m", + default="base", + help="Modelo de whisper a usar (tiny, base, etc.)", + ) + parser.add_argument( + "--to", "-t", default="es", help="Idioma de destino para traducción" + ) + parser.add_argument( + "--out", + "-o", + default=None, + help=( + "Ruta del vídeo de salida (si no se especifica," + " se usa input_burned.mp4)" + ), + ) + parser.add_argument( + "--srt", + default=None, + help=( + "Ruta SRT a escribir (si no se especifica," + " se usa input.srt)" + ), + ) + + args = parser.parse_args() + + video = Path(args.video) + if not video.exists(): + print("Vídeo no encontrado", file=sys.stderr) + sys.exit(2) + + out_video = ( + args.out + if args.out + else str(video.with_name(video.stem + "_burned.mp4")) + ) + srt_path = args.srt if args.srt else str(video.with_suffix('.srt')) + + with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp: + audio_path = tmp.name + + try: + print("Extrayendo audio con ffmpeg...") + extract_audio(str(video), audio_path) + + print( + f"Transcribiendo y traduciendo a '{args.to}'" + f" usando {args.backend}..." + ) + if args.backend == "faster-whisper": + segments = transcribe_and_translate_faster( + audio_path, args.model, args.to + ) + else: + segments = transcribe_and_translate_openai( + audio_path, args.model, args.to + ) + + if not segments: + print( + "No se obtuvieron segmentos de la transcripción", + file=sys.stderr, + ) + sys.exit(3) + + print(f"Escribiendo SRT en {srt_path}...") + write_srt(segments, srt_path) + + print( + f"Quemando subtítulos en el vídeo -> {out_video}" + f" (esto puede tardar)..." 
+ ) + burn_subtitles(str(video), srt_path, out_video) + + print("Proceso completado.") + finally: + try: + Path(audio_path).unlink() + except Exception: + pass + + +if __name__ == "__main__": + main() diff --git a/whisper_project/ref_female_es.wav b/whisper_project/ref_female_es.wav new file mode 100644 index 0000000..839bd04 Binary files /dev/null and b/whisper_project/ref_female_es.wav differ diff --git a/whisper_project/requirements.txt b/whisper_project/requirements.txt new file mode 100644 index 0000000..c841123 --- /dev/null +++ b/whisper_project/requirements.txt @@ -0,0 +1,12 @@ +# Dependencias básicas para ejecutar Whisper en CPU +torch>=1.12.0 +ffmpeg-python +numpy +# Optional backends (comment/uncomment as needed) +openai-whisper +transformers +faster-whisper +# TTS (opcional) +TTS +pyttsx3 +huggingface-hub diff --git a/whisper_project/run_xtts_clone.py b/whisper_project/run_xtts_clone.py new file mode 100644 index 0000000..7cc5149 --- /dev/null +++ b/whisper_project/run_xtts_clone.py @@ -0,0 +1,17 @@ +import os, traceback +from TTS.api import TTS + +out='whisper_project/dub_female_xtts_es.wav' +speaker='whisper_project/ref_female_es.wav' +text='Hola, esta es una prueba de clonación usando xtts_v2 en español latino.' +model='tts_models/multilingual/multi-dataset/xtts_v2' + +try: + print('Cargando modelo:', model) + tts = TTS(model_name=model, progress_bar=True, gpu=False) + print('Llamando a tts_to_file con speaker_wav=', speaker) + tts.tts_to_file(text=text, file_path=out, speaker_wav=speaker, language='es') + print('Generado:', out, 'size=', os.path.getsize(out)) +except Exception as e: + print('Error durante la clonación:') + traceback.print_exc() diff --git a/whisper_project/srt_to_kokoro.py b/whisper_project/srt_to_kokoro.py new file mode 100644 index 0000000..64e611d --- /dev/null +++ b/whisper_project/srt_to_kokoro.py @@ -0,0 +1,492 @@ +#!/usr/bin/env python3 +""" +srt_to_kokoro.py + +Leer un archivo .srt y sintetizar cada subtítulo usando una API OpenAPI-compatible (p. ej. Kokoro). +- Intenta autodetectar un endpoint de síntesis en `--openapi` (URL JSON) buscando paths que contengan 'synth'|'tts'|'text' y que acepten POST. +- Alternativamente usa `--endpoint` y un `--payload-template` con {text} como placeholder. +- Guarda fragmentos temporales y los concatena con ffmpeg en un único WAV de salida. + +Dependencias: requests, srt (pip install requests srt) +Requiere ffmpeg en PATH. + +Ejemplos: + python srt_to_kokoro.py --srt subs.srt --openapi "https://kokoro.../openapi.json" --voice "alloy" --out out.wav --api-key "TOKEN" + python srt_to_kokoro.py --srt subs.srt --endpoint "https://kokoro.../v1/synthesize" --payload-template '{"text": "{text}", "voice": "alloy"}' --out out.wav + +""" + +import argparse +import json +import os +import re +import shutil +import subprocess +import sys +import tempfile +from typing import Optional + +try: + import requests +except Exception as e: + print("Este script requiere la librería 'requests'. Instálala con: pip install requests") + raise + +try: + import srt +except Exception: + print("Este script requiere la librería 'srt'. 
Instálala con: pip install srt") + raise + + +def find_synthesis_endpoint(openapi_url: str) -> Optional[str]: + """Intento heurístico: baja openapi.json y busca paths con 'synth'|'tts'|'text' que soporten POST.""" + try: + r = requests.get(openapi_url, timeout=20) + r.raise_for_status() + spec = r.json() + except Exception as e: + print(f"No pude leer openapi.json desde {openapi_url}: {e}") + return None + + paths = spec.get("paths", {}) + candidate = None + for path, methods in paths.items(): + lname = path.lower() + if any(k in lname for k in ("synth", "tts", "text", "synthesize")): + for method, op in methods.items(): + if method.lower() == "post": + # candidato + candidate = path + break + if candidate: + break + + if not candidate: + # fallback: scan operationId or summary + for path, methods in paths.items(): + for method, op in methods.items(): + meta = json.dumps(op).lower() + if any(k in meta for k in ("synth", "tts", "text", "synthesize")) and method.lower() == "post": + candidate = path + break + if candidate: + break + + if not candidate: + return None + + # Construir base url desde openapi_url + from urllib.parse import urlparse, urljoin + p = urlparse(openapi_url) + base = f"{p.scheme}://{p.netloc}" + return urljoin(base, candidate) + + +def parse_srt_file(path: str): + with open(path, "r", encoding="utf-8") as f: + raw = f.read() + subs = list(srt.parse(raw)) + return subs + + +def synth_chunk(endpoint: str, text: str, headers: dict, payload_template: Optional[str], timeout=60): + """Envía la solicitud y devuelve bytes de audio. Maneja respuestas audio/* o JSON con campo base64.""" + # Construir payload + if payload_template: + body = payload_template.replace("{text}", text) + try: + json_body = json.loads(body) + except Exception: + # enviar como texto plano + json_body = {"text": text} + else: + json_body = {"text": text} + + # Realizar POST + r = requests.post(endpoint, json=json_body, headers=headers, timeout=timeout) + r.raise_for_status() + + ctype = r.headers.get("Content-Type", "") + if ctype.startswith("audio/"): + return r.content + # Si viene JSON con base64 + try: + j = r.json() + # buscar campos con 'audio' o 'wav' o 'base64' + for k in ("audio", "wav", "data", "base64"): + if k in j: + val = j[k] + # si es base64 + import base64 + try: + return base64.b64decode(val) + except Exception: + # tal vez ya es bytes hex u otra cosa + pass + except Exception: + pass + + # Fallback: devolver raw bytes + return r.content + + +def ensure_ffmpeg(): + if shutil.which("ffmpeg") is None: + print("ffmpeg no está disponible en PATH. 
Instálalo para poder concatenar/convertir audios.") + sys.exit(1) + + +def convert_and_save(raw_bytes: bytes, target_path: str): + """Guarda bytes a un archivo temporal y convierte a WAV PCM 16k mono usando ffmpeg.""" + with tempfile.NamedTemporaryFile(delete=False, suffix=".bin") as tmp: + tmp.write(raw_bytes) + tmp.flush() + tmp_path = tmp.name + + # Convertir con ffmpeg a WAV 22050 Hz mono 16-bit + cmd = [ + "ffmpeg", "-y", "-i", tmp_path, + "-ar", "22050", "-ac", "1", "-sample_fmt", "s16", target_path + ] + try: + subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + except subprocess.CalledProcessError as e: + print(f"ffmpeg falló al convertir chunk: {e}") + # como fallback, escribir los bytes "crudos" + with open(target_path, "wb") as out: + out.write(raw_bytes) + finally: + try: + os.remove(tmp_path) + except Exception: + pass + + +def create_silence(duration: float, out_path: str, sr: int = 22050): + """Create a silent wav of given duration (seconds) at sr and save to out_path.""" + # use ffmpeg anullsrc + cmd = [ + "ffmpeg", + "-y", + "-f", + "lavfi", + "-i", + f"anullsrc=channel_layout=mono:sample_rate={sr}", + "-t", + f"{duration}", + "-c:a", + "pcm_s16le", + out_path, + ] + try: + subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + except subprocess.CalledProcessError: + # fallback: write tiny silence by creating zero bytes + try: + with open(out_path, "wb") as fh: + fh.write(b"\x00" * 1024) + except Exception: + pass + + +def pad_or_trim_wav(in_path: str, out_path: str, target_duration: float, sr: int = 22050): + """Pad with silence or trim input wav to match target_duration (seconds).""" + # get duration + try: + p = subprocess.run([ + "ffprobe", + "-v", + "error", + "-show_entries", + "format=duration", + "-of", + "default=noprint_wrappers=1:nokey=1", + in_path, + ], capture_output=True, text=True) + cur = float(p.stdout.strip()) + except Exception: + cur = 0.0 + + if cur == 0.0: + shutil.copy(in_path, out_path) + return + + if abs(cur - target_duration) < 0.02: + shutil.copy(in_path, out_path) + return + + if cur > target_duration: + cmd = ["ffmpeg", "-y", "-i", in_path, "-t", f"{target_duration}", out_path] + subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + return + + # pad: create silence of missing duration and concat + pad = target_duration - cur + with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as sil: + sil_path = sil.name + try: + create_silence(pad, sil_path, sr=sr) + # concat in_path + sil_path + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as listf: + listf.write(f"file '{os.path.abspath(in_path)}'\n") + listf.write(f"file '{os.path.abspath(sil_path)}'\n") + listname = listf.name + cmd2 = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", listname, "-c", "copy", out_path] + subprocess.run(cmd2, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + finally: + try: + os.remove(sil_path) + except Exception: + pass + try: + os.remove(listname) + except Exception: + pass + + +def concat_chunks(chunks: list, out_path: str): + # Crear lista para ffmpeg concat demuxer + ensure_ffmpeg() + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as listf: + for c in chunks: + listf.write(f"file '{os.path.abspath(c)}'\n") + listname = listf.name + + cmd = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", listname, "-c", "copy", out_path] + try: + subprocess.run(cmd, check=True) + except 
subprocess.CalledProcessError: + # fallback: concatenar mediante reconversión + tmp_concat = out_path + ".tmp.wav" + cmd2 = ["ffmpeg", "-y", "-i", f"concat:{'|'.join(chunks)}", "-c", "copy", tmp_concat] + subprocess.run(cmd2) + shutil.move(tmp_concat, out_path) + finally: + try: + os.remove(listname) + except Exception: + pass + + +def main(): + p = argparse.ArgumentParser() + p.add_argument("--srt", required=True, help="Ruta al archivo .srt traducido") + p.add_argument("--openapi", required=False, help="URL al openapi.json de Kokoro (intenta autodetectar endpoint)") + p.add_argument("--endpoint", required=False, help="URL directa del endpoint de síntesis (usa esto si autodetección falla)") + p.add_argument( + "--payload-template", + required=False, + help='Plantilla JSON para el payload con {text} como placeholder, ejemplo: "{\"text\": \"{text}\", \"voice\": \"alloy\"}"', + ) + p.add_argument("--api-key", required=False, help="Valor para autorización (se envía como header Authorization: Bearer )") + p.add_argument("--voice", required=False, help="Nombre de voz si aplica (se añade al payload si se usa template)") + p.add_argument("--out", required=True, help="Ruta de salida WAV final") + p.add_argument( + "--video", + required=False, + help="Ruta al vídeo original (necesario si quieres mezclar el audio con la pista original).", + ) + p.add_argument( + "--mix-with-original", + action="store_true", + help="Mezclar el WAV generado con la pista de audio original del vídeo (usa --video).", + ) + p.add_argument( + "--mix-background-volume", + type=float, + default=0.2, + help="Volumen de la pista original al mezclar (0.0-1.0), por defecto 0.2", + ) + p.add_argument( + "--replace-original", + action="store_true", + help="Reemplazar la pista de audio del vídeo original por el WAV generado (usa --video).", + ) + p.add_argument( + "--align", + action="store_true", + help="Generar silencios para alinear segmentos con los timestamps del SRT (inserta gaps entre segmentos).", + ) + p.add_argument( + "--keep-chunks", + action="store_true", + help="Conservar los WAV de cada segmento en el directorio temporal (útil para debugging).", + ) + args = p.parse_args() + + headers = {"Accept": "*/*"} + if args.api_key: + headers["Authorization"] = f"Bearer {args.api_key}" + + endpoint = args.endpoint + if not endpoint and args.openapi: + print("Intentando detectar endpoint desde openapi.json...") + endpoint = find_synthesis_endpoint(args.openapi) + if endpoint: + print(f"Usando endpoint detectado: {endpoint}") + else: + print("No se detectó endpoint automáticamente. 
Pasa --endpoint o --payload-template.") + sys.exit(1) + + if not endpoint: + print("Debes proporcionar --endpoint o --openapi para que el script funcione.") + sys.exit(1) + + subs = parse_srt_file(args.srt) + tmpdir = tempfile.mkdtemp(prefix="srt_kokoro_") + chunk_files = [] + + print(f"Sintetizando {len(subs)} segmentos...") + prev_end = 0.0 + for i, sub in enumerate(subs, start=1): + text = re.sub(r"\s+", " ", sub.content.strip()) + if not text: + prev_end = sub.end.total_seconds() + continue + + start_sec = sub.start.total_seconds() + end_sec = sub.end.total_seconds() + duration = end_sec - start_sec + + # if align requested, insert silence for gap between previous end and current start + if args.align: + gap = start_sec - prev_end + if gap > 0.01: + sil_target = os.path.join(tmpdir, f"sil_{i:04d}.wav") + create_silence(gap, sil_target) + chunk_files.append(sil_target) + + try: + raw = synth_chunk(endpoint, text, headers, args.payload_template) + except Exception as e: + print(f"Error al sintetizar segmento {i}: {e}") + prev_end = end_sec + continue + + target = os.path.join(tmpdir, f"chunk_{i:04d}.wav") + convert_and_save(raw, target) + + # If align: pad or trim to subtitle duration, otherwise keep raw chunk + if args.align: + aligned = os.path.join(tmpdir, f"chunk_{i:04d}.aligned.wav") + pad_or_trim_wav(target, aligned, duration) + # replace target with aligned file in list + chunk_files.append(aligned) + # remove original raw chunk unless keep-chunks + if not args.keep_chunks: + try: + os.remove(target) + except Exception: + pass + else: + chunk_files.append(target) + + prev_end = end_sec + print(f" - Segmento {i}/{len(subs)} -> {os.path.basename(chunk_files[-1])}") + + if not chunk_files: + print("No se generaron fragmentos de audio. Abortando.") + shutil.rmtree(tmpdir, ignore_errors=True) + sys.exit(1) + + print("Concatenando fragments...") + concat_chunks(chunk_files, args.out) + print(f"Archivo final generado en: {args.out}") + + # Si el usuario pidió mezclar con la pista original del vídeo + if args.mix_with_original: + if not args.video: + print("--mix-with-original requiere que pases --video con la ruta del vídeo original.") + else: + # extraer audio del vídeo original a wav temporal (mono 22050) + orig_tmp = os.path.join(tempfile.gettempdir(), f"orig_audio_{os.getpid()}.wav") + mixed_tmp = os.path.join(tempfile.gettempdir(), f"mixed_audio_{os.getpid()}.wav") + try: + cmd_ext = [ + "ffmpeg", + "-y", + "-i", + args.video, + "-vn", + "-ar", + "22050", + "-ac", + "1", + "-sample_fmt", + "s16", + orig_tmp, + ] + subprocess.run(cmd_ext, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + + # Mezclar: new audio (args.out) en primer plano, original a volumen reducido + vol = float(args.mix_background_volume) + # construir filtro: [0:a]volume=1[a1];[1:a]volume=vol[a0];[a1][a0]amix=inputs=2:duration=first:weights=1 vol [mix] + filter_complex = f"[0:a]volume=1[a1];[1:a]volume={vol}[a0];[a1][a0]amix=inputs=2:duration=first:weights=1 {vol}[mix]" + # usar ffmpeg para mezclar y generar mixed_tmp + cmd_mix = [ + "ffmpeg", + "-y", + "-i", + args.out, + "-i", + orig_tmp, + "-filter_complex", + f"[0:a]volume=1[a1];[1:a]volume={vol}[a0];[a1][a0]amix=inputs=2:duration=first:dropout_transition=0[mix]", + "-map", + "[mix]", + "-c:a", + "pcm_s16le", + mixed_tmp, + ] + subprocess.run(cmd_mix, check=True) + + # reemplazar args.out con mixed_tmp + shutil.move(mixed_tmp, args.out) + print(f"Archivo mezclado generado en: {args.out}") + except subprocess.CalledProcessError as e: + 
print(f"Error al mezclar audio con la pista original: {e}") + finally: + try: + if os.path.exists(orig_tmp): + os.remove(orig_tmp) + except Exception: + pass + + # Si se solicita reemplazar la pista original en el vídeo + if args.replace_original: + if not args.video: + print("--replace-original requiere que pases --video con la ruta del vídeo original.") + else: + out_video = os.path.splitext(args.video)[0] + ".replaced_audio.mp4" + try: + cmd_rep = [ + "ffmpeg", + "-y", + "-i", + args.video, + "-i", + args.out, + "-map", + "0:v:0", + "-map", + "1:a:0", + "-c:v", + "copy", + "-c:a", + "aac", + "-b:a", + "192k", + out_video, + ] + subprocess.run(cmd_rep, check=True) + print(f"Vídeo con audio reemplazado generado: {out_video}") + except subprocess.CalledProcessError as e: + print(f"Error al reemplazar audio en el vídeo: {e}") + + # limpieza + shutil.rmtree(tmpdir, ignore_errors=True) + + +if __name__ == '__main__': + main() diff --git a/whisper_project/transcribe.py b/whisper_project/transcribe.py new file mode 100644 index 0000000..40d59e8 --- /dev/null +++ b/whisper_project/transcribe.py @@ -0,0 +1,890 @@ +#!/usr/bin/env python3 +"""Transcribe audio usando distintos backends de Whisper. + +Soportados: openai-whisper, transformers, faster-whisper +""" +import argparse +import sys +from pathlib import Path + + +def transcribe_openai_whisper(file: str, model: str): + import whisper + + print(f"Cargando openai-whisper modelo={model} en CPU...") + m = whisper.load_model(model, device="cpu") + print("Transcribiendo...") + result = m.transcribe(file, fp16=False) + # openai-whisper devuelve 'segments' con start, end y text + segments = result.get("segments", None) + if segments: + for seg in segments: + print(seg.get("text", "")) + return segments + else: + print(result.get("text", "")) + return None + + +def transcribe_transformers(file: str, model: str): + import torch + from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline + + device = "cpu" + torch_dtype = torch.float32 + + print(f"Cargando transformers modelo={model} en CPU...") + model_obj = AutoModelForSpeechSeq2Seq.from_pretrained(model, torch_dtype=torch_dtype, low_cpu_mem_usage=True) + model_obj.to(device) + processor = AutoProcessor.from_pretrained(model) + + pipe = pipeline( + "automatic-speech-recognition", + model=model_obj, + tokenizer=processor.tokenizer, + feature_extractor=processor.feature_extractor, + device=-1, + ) + + print("Transcribiendo...") + result = pipe(file) + # result puede ser dict o str dependiendo de la versión + if isinstance(result, dict): + print(result.get("text", "")) + else: + print(result) + # transformers pipeline normalmente no devuelve segmentos temporales + return None + + +def transcribe_faster_whisper(file: str, model: str, compute_type: str = "int8"): + from faster_whisper import WhisperModel + + print(f"Cargando faster-whisper modelo={model} en CPU compute_type={compute_type}...") + model_obj = WhisperModel(model, device="cpu", compute_type=compute_type) + print("Transcribiendo...") + segments_gen, info = model_obj.transcribe(file, beam_size=5) + # faster-whisper may return a generator; convert to list to allow multiple passes + segments = list(segments_gen) + text = "".join([seg.text for seg in segments]) + print(text) + # segments es una lista de objetos con .start, .end, .text + return segments + + +def main(): + parser = argparse.ArgumentParser( + description="Transcribe audio usando Whisper (varios backends)" + ) + parser.add_argument( + "--file", "-f", required=True, 
help="Ruta al archivo de audio" + ) + parser.add_argument( + "--backend", + "-b", + choices=["openai-whisper", "transformers", "faster-whisper"], + default="faster-whisper", + help="Backend a usar", + ) + parser.add_argument( + "--model", + "-m", + default="base", + help="Nombre del modelo (ej: tiny, base)", + ) + parser.add_argument( + "--compute-type", + "-c", + default="int8", + help="compute_type para faster-whisper", + ) + parser.add_argument( + "--srt", + action="store_true", + help="Generar archivo SRT con timestamps (si el backend lo soporta)", + ) + parser.add_argument( + "--srt-file", + default=None, + help=( + "Ruta del archivo SRT de salida. Por defecto: mismo nombre" + " que el audio con extensión .srt" + ), + ) + parser.add_argument( + "--srt-fallback", + action="store_true", + help=( + "Generar SRT aproximado si backend no devuelve segmentos." + ), + ) + parser.add_argument( + "--segment-transcribe", + action="store_true", + help=( + "Cuando se usa --srt-fallback, transcribir cada segmento usando" + " archivos temporales para rellenar el texto" + ), + ) + parser.add_argument( + "--segment-overlap", + type=float, + default=0.2, + help=( + "Superposición en segundos entre segmentos al transcribir por" + " segmentos (por defecto: 0.2)" + ), + ) + parser.add_argument( + "--srt-segment-seconds", + type=float, + default=10.0, + help=( + "Duración en segundos de cada segmento para el SRT de fallback." + " Por defecto: 10.0" + ), + ) + parser.add_argument( + "--tts", + action="store_true", + help="Generar audio TTS a partir del texto transcrito", + ) + parser.add_argument( + "--tts-model", + default="kokoro", + help="Nombre del modelo TTS a usar (ej: kokoro)", + ) + parser.add_argument( + "--tts-model-repo", + default=None, + help=( + "Repo de Hugging Face para el modelo TTS (ej: user/kokoro)." + " Si se especifica, se descargará automáticamente." + ), + ) + parser.add_argument( + "--dub", + action="store_true", + help=( + "Generar pista doblada (por segmentos) a partir del texto transcrito" + ), + ) + parser.add_argument( + "--dub-out", + default=None, + help=("Ruta de salida para el audio doblado (WAV). Por defecto: mismo nombre + .dub.wav"), + ) + parser.add_argument( + "--dub-mode", + choices=["replace", "mix"], + default="replace", + help=("Modo de doblaje: 'replace' reemplaza voz original por TTS; 'mix' mezcla ambas pistas"), + ) + parser.add_argument( + "--dub-mix-level", + type=float, + default=0.75, + help=("Cuando --dub-mode=mix, nivel de volumen del TTS relativo (0-1)."), + ) + + args = parser.parse_args() + + path = Path(args.file) + if not path.exists(): + print(f"Archivo no encontrado: {args.file}", file=sys.stderr) + sys.exit(2) + + # Shortcut: si el usuario solo quiere SRT de fallback sin transcribir + # por segmentos, no necesitamos cargar ningún backend (evita errores + # si faster-whisper/whisper no están instalados). 
+ if args.srt and args.srt_fallback and not args.segment_transcribe: + duration = get_audio_duration(args.file) + if duration is None: + print( + "No se pudo obtener duración; no se puede generar SRT de fallback.", + file=sys.stderr, + ) + sys.exit(4) + fallback_segments = make_uniform_segments(duration, args.srt_segment_seconds) + srt_file_arg = args.srt_file + srt_path = ( + srt_file_arg + if srt_file_arg + else str(path.with_suffix('.srt')) + ) + # crear segmentos vacíos + filled_segments = [ + {"start": s["start"], "end": s["end"], "text": ""} + for s in fallback_segments + ] + write_srt(filled_segments, srt_path) + print(f"SRT de fallback guardado en: {srt_path}") + sys.exit(0) + + try: + segments = None + if args.backend == "openai-whisper": + segments = transcribe_openai_whisper(args.file, args.model) + elif args.backend == "transformers": + segments = transcribe_transformers(args.file, args.model) + else: + segments = transcribe_faster_whisper( + args.file, args.model, compute_type=args.compute_type + ) + + # Si se pide SRT y tenemos segmentos, escribir archivo SRT + if args.srt: + if segments: + # determinar nombre del srt + # determinar nombre del srt + srt_file_arg = args.srt_file + srt_path = ( + srt_file_arg + if srt_file_arg + else str(path.with_suffix('.srt')) + ) + segments_to_write = dedupe_adjacent_segments(segments) + write_srt(segments_to_write, srt_path) + print(f"SRT guardado en: {srt_path}") + else: + if args.srt_fallback: + # intentar generar SRT aproximado + duration = get_audio_duration(args.file) + if duration is None: + print( + "No se pudo obtener duración;" + " no se puede generar SRT de fallback.", + file=sys.stderr, + ) + sys.exit(4) + fallback_segments = make_uniform_segments( + duration, args.srt_segment_seconds + ) + # Para cada segmento intentamos obtener transcripción + # parcial. + filled_segments = [] + if args.segment_transcribe: + # extraer cada segmento a un archivo temporal + # y transcribir + filled = transcribe_segmented_with_tempfiles( + args.file, + fallback_segments, + backend=args.backend, + model=args.model, + compute_type=args.compute_type, + overlap=args.segment_overlap, + ) + filled_segments = filled + else: + for seg in fallback_segments: + seg_obj = { + "start": seg["start"], + "end": seg["end"], + "text": "", + } + filled_segments.append(seg_obj) + srt_file_arg = args.srt_file + srt_path = ( + srt_file_arg + if srt_file_arg + else str(path.with_suffix('.srt')) + ) + segments_to_write = dedupe_adjacent_segments( + filled_segments + ) + write_srt(segments_to_write, srt_path) + print(f"SRT de fallback guardado en: {srt_path}") + print( + "Nota: para SRT con texto, habilite transcripción" + " por segmento o use un backend que devuelva" + " segmentos." 
+ ) + sys.exit(0) + else: + print( + "El backend elegido no devolvió segmentos temporales;" + " no se puede generar SRT.", + file=sys.stderr, + ) + sys.exit(3) + except Exception as e: + print(f"Error durante la transcripción: {e}", file=sys.stderr) + sys.exit(1) + + # Bloque TTS: sintetizar texto completo si se solicitó + if args.tts: + # si se especificó un repo, asegurar modelo descargado + if args.tts_model_repo: + model_path = ensure_tts_model(args.tts_model_repo) + # usar la ruta local como modelo + args.tts_model = model_path + + all_text = None + if segments: + all_text = "\n".join( + [ + s.get("text", "") if isinstance(s, dict) else s.text + for s in segments + ] + ) + if all_text: + tts_out = str(path.with_suffix(".tts.wav")) + ok = tts_synthesize( + all_text, tts_out, model=args.tts_model + ) + if ok: + print(f"TTS guardado en: {tts_out}") + else: + print( + "Error al sintetizar TTS; comprueba dependencias.", + file=sys.stderr, + ) + sys.exit(5) + + # Bloque de doblaje por segmentos: sintetizar cada segmento y generar + # un archivo WAV concatenado con la pista doblada. El audio resultante + # mantiene la duración de los segmentos originales (paddings/recortes + # simples) para poder reemplazar o mezclar con la pista original. + if args.dub: + # decidir ruta de salida + dub_out = ( + args.dub_out + if args.dub_out + else str(Path(args.file).with_suffix(".dub.wav")) + ) + + # si no tenemos segmentos, intentar fallback con transcripción por segmentos + use_segments = segments + if not use_segments: + duration = get_audio_duration(args.file) + if duration is None: + print( + "No se pudo obtener la duración del audio; no se puede doblar.", + file=sys.stderr, + ) + sys.exit(6) + fallback_segments = make_uniform_segments(duration, args.srt_segment_seconds) + if args.segment_transcribe: + print("Obteniendo transcripciones por segmento para doblaje...") + use_segments = transcribe_segmented_with_tempfiles( + args.file, + fallback_segments, + backend=args.backend, + model=args.model, + compute_type=args.compute_type, + overlap=args.segment_overlap, + ) + else: + # crear segmentos vacíos (no tiene texto) + use_segments = [ + {"start": s["start"], "end": s["end"], "text": ""} + for s in fallback_segments + ] + + # asegurar modelo TTS local si se indicó repo + if args.tts_model_repo: + model_path = ensure_tts_model(args.tts_model_repo) + args.tts_model = model_path + + ok = synthesize_dubbed_audio( + src_audio=args.file, + segments=use_segments, + tts_model=args.tts_model, + out_path=dub_out, + mode=args.dub_mode, + mix_level=args.dub_mix_level, + ) + if ok: + print(f"Audio doblado guardado en: {dub_out}") + else: + print("Error generando audio doblado.", file=sys.stderr) + sys.exit(7) + + + + + +def _format_timestamp(seconds: float) -> str: + """Formatea segundos en timestamp SRT hh:mm:ss,mmm""" + millis = int((seconds - int(seconds)) * 1000) + h = int(seconds // 3600) + m = int((seconds % 3600) // 60) + s = int(seconds % 60) + return f"{h:02d}:{m:02d}:{s:02d},{millis:03d}" + + +def write_srt(segments, out_path: str): + """Escribe una lista de segmentos en formato SRT. 
+ + segments: iterable de objetos o dicts con .start, .end y .text + """ + lines = [] + for i, seg in enumerate(segments, start=1): + # soportar objetos con atributos o dicts + if hasattr(seg, "start"): + start = float(seg.start) + end = float(seg.end) + text = seg.text if hasattr(seg, "text") else str(seg) + else: + start = float(seg.get("start", 0.0)) + end = float(seg.get("end", 0.0)) + text = seg.get("text", "") + + start_ts = _format_timestamp(start) + end_ts = _format_timestamp(end) + lines.append(str(i)) + lines.append(f"{start_ts} --> {end_ts}") + # normalize text newlines + for line in str(text).strip().splitlines(): + lines.append(line) + lines.append("") + + Path(out_path).write_text("\n".join(lines), encoding="utf-8") + + +def dedupe_adjacent_segments(segments): + """Eliminar duplicados simples entre segmentos adyacentes. + + Estrategia simple: si el final de un segmento y el inicio del + siguiente comparten una secuencia de palabras, eliminamos la + duplicación del inicio del siguiente. + """ + if not segments: + return segments + + # Normalize incoming segments to a list of dicts with keys start,end,text + norm = [] + for s in segments: + if hasattr(s, "start"): + norm.append({"start": float(s.start), "end": float(s.end), "text": getattr(s, "text", "")}) + else: + # assume mapping-like + norm.append({"start": float(s.get("start", 0.0)), "end": float(s.get("end", 0.0)), "text": s.get("text", "")}) + + out = [norm[0].copy()] + for seg in norm[1:]: + prev = out[-1] + a = (prev.get("text") or "").strip() + b = (seg.get("text") or "").strip() + if not a or not b: + out.append(seg.copy()) + continue + + # tokenizar en palabras (espacios) y buscar la mayor superposición + a_words = a.split() + b_words = b.split() + max_ol = 0 + max_k = min(len(a_words), len(b_words), 10) + for k in range(1, max_k + 1): + if a_words[-k:] == b_words[:k]: + max_ol = k + + if max_ol > 0: + # quitar las primeras max_ol palabras de b + new_b = " ".join(b_words[max_ol:]).strip() + new_seg = seg.copy() + new_seg["text"] = new_b + out.append(new_seg) + else: + out.append(seg.copy()) + + return out + + +def get_audio_duration(file_path: str): + """Obtiene la duración del audio en segundos usando ffprobe. + + Devuelve float (segundos) o None si no se puede obtener. + """ + try: + import subprocess + + cmd = [ + "ffprobe", + "-v", + "error", + "-show_entries", + "format=duration", + "-of", + "default=noprint_wrappers=1:nokey=1", + file_path, + ] + out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL) + return float(out.strip()) + except Exception: + return None + + +def make_uniform_segments(duration: float, seg_seconds: float): + """Genera una lista de segmentos uniformes [{start, end}, ...].""" + segments = [] + if duration <= 0 or seg_seconds <= 0: + return segments + start = 0.0 + idx = 0 + while start < duration: + end = min(start + seg_seconds, duration) + segments.append({"start": round(start, 3), "end": round(end, 3)}) + idx += 1 + start = end + return segments + + +def transcribe_segmented_with_tempfiles( + src_file: str, + segments: list, + backend: str = "faster-whisper", + model: str = "base", + compute_type: str = "int8", + overlap: float = 0.2, +): + """Recorta `src_file` en segmentos y transcribe cada uno. + + Retorna lista de dicts {'start','end','text'} para cada segmento. 
+ """ + import subprocess + import tempfile + + results = [] + for seg in segments: + start = max(0.0, float(seg["start"]) - overlap) + end = float(seg["end"]) + overlap + duration = end - start + + with tempfile.NamedTemporaryFile(suffix=".wav", delete=True) as tmp: + tmp_path = tmp.name + cmd = [ + "ffmpeg", + "-y", + "-ss", + str(start), + "-t", + str(duration), + "-i", + src_file, + "-ar", + "16000", + "-ac", + "1", + tmp_path, + ] + try: + subprocess.run( + cmd, + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + except Exception: + # si falla el recorte, dejar texto vacío + results.append( + {"start": seg["start"], "end": seg["end"], "text": ""} + ) + continue + + # transcribir tmp_path con el backend + try: + if backend == "openai-whisper": + import whisper + + m = whisper.load_model(model, device="cpu") + res = m.transcribe(tmp_path, fp16=False) + text = res.get("text", "") + elif backend == "transformers": + # pipeline de transformers + import torch + from transformers import ( + AutoModelForSpeechSeq2Seq, + AutoProcessor, + pipeline, + ) + + torch_dtype = torch.float32 + model_obj = AutoModelForSpeechSeq2Seq.from_pretrained( + model, torch_dtype=torch_dtype, low_cpu_mem_usage=True + ) + model_obj.to("cpu") + processor = AutoProcessor.from_pretrained(model) + pipe = pipeline( + "automatic-speech-recognition", + model=model_obj, + tokenizer=processor.tokenizer, + feature_extractor=processor.feature_extractor, + device=-1, + ) + out = pipe(tmp_path) + text = out["text"] if isinstance(out, dict) else str(out) + else: + # faster-whisper + from faster_whisper import WhisperModel + + wmodel = WhisperModel( + model, device="cpu", compute_type=compute_type + ) + segs_gen, info = wmodel.transcribe(tmp_path, beam_size=5) + segs = list(segs_gen) + text = "".join([s.text for s in segs]) + + except Exception: + text = "" + + results.append( + {"start": seg["start"], "end": seg["end"], "text": text} + ) + + return results + + +def tts_synthesize(text: str, out_path: str, model: str = "kokoro"): + """Sintetiza `text` a `out_path` usando Coqui TTS si está disponible, + o pyttsx3 como fallback simple. + """ + try: + # Intentar Coqui TTS + from TTS.api import TTS + + # El usuario debe tener el modelo descargado o especificar el id + tts = TTS(model_name=model, progress_bar=False, gpu=False) + tts.tts_to_file(text=text, file_path=out_path) + return True + except Exception: + try: + # Fallback a pyttsx3 (menos natural, offline) + import pyttsx3 + + engine = pyttsx3.init() + engine.save_to_file(text, out_path) + engine.runAndWait() + return True + except Exception: + return False + + +def ensure_tts_model(repo_id: str): + """Descarga un repo de Hugging Face y devuelve la ruta local. + + Usa huggingface_hub.snapshot_download. Si la descarga falla, devuelve + el repo_id tal cual (se intentará usar como id remoto). + """ + try: + from huggingface_hub import snapshot_download + + print(f"Descargando modelo TTS desde: {repo_id} ...") + try: + # intentar descarga explícita como 'model' (útil para ids con '/'). 
+ local_dir = snapshot_download(repo_id, repo_type="model") + except Exception: + # fallback al comportamiento por defecto + local_dir = snapshot_download(repo_id) + print(f"Modelo descargado en: {local_dir}") + return local_dir + except Exception as e: + print(f"No se pudo descargar el modelo {repo_id}: {e}") + return repo_id + + +def _pad_or_trim_wav(in_path: str, out_path: str, target_duration: float): + """Pad or trim `in_path` WAV to `target_duration` seconds using ffmpeg. + + Creates `out_path` with exactly target_duration seconds. If input is + shorter, pads with silence; if longer, trims. + """ + import subprocess + + # ffmpeg -y -i in.wav -af apad=pad_dur=...,atrim=duration=... -ar 16000 -ac 1 out.wav + try: + # Use apad then atrim to ensure exact duration + cmd = [ + "ffmpeg", + "-y", + "-i", + in_path, + "-af", + f"apad=pad_dur={max(0, target_duration)}", + "-t", + f"{target_duration}", + "-ar", + "16000", + "-ac", + "1", + out_path, + ] + subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + return True + except Exception: + return False + + +def synthesize_segment_tts(text: str, model: str, dur: float, out_wav: str) -> bool: + """Sintetiza `text` en `out_wav` y ajusta su duración a `dur` segundos. + + - Primero genera un WAV temporal con `tts_synthesize`. + - Luego lo pad/recorta a `dur` usando ffmpeg. + """ + import tempfile + import os + + try: + with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp: + tmp_path = tmp.name + + ok = tts_synthesize(text, tmp_path, model=model) + if not ok: + # cleanup + try: + os.remove(tmp_path) + except Exception: + pass + return False + + # ajustar duración + adjusted = _pad_or_trim_wav(tmp_path, out_wav, target_duration=dur) + try: + os.remove(tmp_path) + except Exception: + pass + return adjusted + except Exception: + return False + + +def synthesize_dubbed_audio( + src_audio: str, + segments: list, + tts_model: str, + out_path: str, + mode: str = "replace", + mix_level: float = 0.75, +): + """Genera una pista doblada a partir de `segments` y el audio fuente. + + - segments: lista de dicts con 'start','end','text' (en segundos). + - mode: 'replace' (devuelve solo TTS concatenado) o 'mix' (mezcla TTS y original). + - mix_level: volumen relativo del TTS cuando se mezcla (0-1). + + Retorna True si se generó correctamente `out_path`. 
+ """ + import tempfile + import os + import subprocess + + # Normalizar segmentos a lista de dicts {'start','end','text'} + norm_segments = [] + for s in segments: + if hasattr(s, "start"): + norm_segments.append({"start": float(s.start), "end": float(s.end), "text": getattr(s, "text", "")}) + else: + norm_segments.append({"start": float(s.get("start", 0.0)), "end": float(s.get("end", 0.0)), "text": s.get("text", "")}) + + # crear carpeta temporal para segmentos TTS + with tempfile.TemporaryDirectory() as tmpdir: + tts_segment_paths = [] + for i, seg in enumerate(norm_segments): + start = float(seg.get("start", 0.0)) + end = float(seg.get("end", start)) + dur = max(0.001, end - start) + text = (seg.get("text") or "").strip() + + out_seg = os.path.join(tmpdir, f"seg_{i:04d}.wav") + + if not text: + # crear silencio de duración dur + try: + cmd = [ + "ffmpeg", + "-y", + "-f", + "lavfi", + "-i", + f"anullsrc=channel_layout=mono:sample_rate=16000", + "-t", + f"{dur}", + "-ar", + "16000", + "-ac", + "1", + out_seg, + ] + subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + tts_segment_paths.append(out_seg) + except Exception: + return False + continue + + ok = synthesize_segment_tts(text, tts_model, dur, out_seg) + if not ok: + return False + tts_segment_paths.append(out_seg) + + # crear lista de concatenación + concat_list = os.path.join(tmpdir, "concat.txt") + with open(concat_list, "w", encoding="utf-8") as f: + for p in tts_segment_paths: + f.write(f"file '{p}'\n") + + # concatenar segmentos en un WAV final temporal + final_tmp = os.path.join(tmpdir, "tts_full.wav") + try: + cmd = [ + "ffmpeg", + "-y", + "-f", + "concat", + "-safe", + "0", + "-i", + concat_list, + "-c", + "copy", + final_tmp, + ] + subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + except Exception: + return False + + # si el modo es replace, mover final_tmp a out_path (con conversión si es necesario) + try: + if mode == "replace": + # convertir a WAV 16k mono si no lo está + cmd = [ + "ffmpeg", + "-y", + "-i", + final_tmp, + "-ar", + "16000", + "-ac", + "1", + out_path, + ] + subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + return True + + # modo mix: mezclar pista TTS con la original en out_path + # ajustar volumen del TTS + # ffmpeg -i original -i tts -filter_complex "[1:a]volume=LEVEL[a1];[0:a][a1]amix=inputs=2:normalize=0[out]" -map "[out]" out.wav + tts_level = float(max(0.0, min(1.0, mix_level))) + cmd = [ + "ffmpeg", + "-y", + "-i", + src_audio, + "-i", + final_tmp, + "-filter_complex", + f"[1:a]volume={tts_level}[a1];[0:a][a1]amix=inputs=2:duration=longest:dropout_transition=0", + "-ar", + "16000", + "-ac", + "1", + out_path, + ] + subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + return True + except Exception: + return False + + +if __name__ == "__main__": + main() + diff --git a/whisper_project/translate_srt_with_gemini.py b/whisper_project/translate_srt_with_gemini.py new file mode 100644 index 0000000..8d822f2 --- /dev/null +++ b/whisper_project/translate_srt_with_gemini.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python3 +"""translate_srt_with_gemini.py +Lee un .srt, traduce cada bloque de texto con Gemini (Google Generative API) y +escribe un nuevo .srt manteniendo índices y timestamps. + +Uso: + export GEMINI_API_KEY="..." 
+ .venv/bin/python whisper_project/translate_srt_with_gemini.py \ + --in whisper_project/dailyrutines.kokoro.dub.srt \ + --out whisper_project/dailyrutines.kokoro.dub.es.srt \ + --model gemini-2.5-flash + +Si no pasas --gemini-api-key, se usará la variable de entorno GEMINI_API_KEY. +""" +import argparse +import json +import os +import time +from typing import List + +import requests +import srt +# Intentar usar la librería oficial si está instalada (mejor compatibilidad) +try: + import google.generativeai as genai # type: ignore +except Exception: + genai = None + + +def translate_text_google_gl(text: str, api_key: str, model: str = "gemini-2.5-flash") -> str: + """Llamada a la API Generative Language de Google (generateContent). + Devuelve el texto traducido (o el texto original si falla). + """ + if not api_key: + raise ValueError("gemini api key required") + # Si la librería oficial está disponible, usarla (maneja internamente los endpoints) + if genai is not None: + try: + genai.configure(api_key=api_key) + model_obj = genai.GenerativeModel(model) + # la librería acepta un prompt simple o lista; pedimos texto traducido explícitamente + prompt = f"Traduce al español el siguiente texto y devuelve solo el texto traducido:\n\n{text}" + resp = model_obj.generate_content(prompt, generation_config={"max_output_tokens": 1024, "temperature": 0.0}) + # resp.text está disponible en la respuesta wrapper + if hasattr(resp, "text") and resp.text: + return resp.text.strip() + # fallback: revisar candidates + if hasattr(resp, "candidates") and resp.candidates: + c = resp.candidates[0] + if hasattr(c, "content") and hasattr(c.content, "parts"): + parts = [p.text for p in c.content.parts if getattr(p, "text", None)] + if parts: + return "\n".join(parts).strip() + except Exception as e: + print(f"Warning: genai library translate failed: {e}") + + # Fallback HTTP (legacy/path-variant). Intentamos v1 y v1beta2 según disponibilidad. 
+ for prefix in ("v1", "v1beta2"): + endpoint = ( + f"https://generativelanguage.googleapis.com/{prefix}/models/{model}:generateContent?key={api_key}" + ) + body = { + "prompt": {"text": f"Traduce al español el siguiente texto y devuelve solo el texto traducido:\n\n{text}"}, + "maxOutputTokens": 1024, + "temperature": 0.0, + "candidateCount": 1, + } + try: + r = requests.post(endpoint, json=body, timeout=30) + r.raise_for_status() + j = r.json() + # buscar candidatos + if isinstance(j, dict) and "candidates" in j and isinstance(j["candidates"], list) and j["candidates"]: + first = j["candidates"][0] + if isinstance(first, dict): + if "content" in first and isinstance(first["content"], str): + return first["content"].strip() + if "output" in first and isinstance(first["output"], str): + return first["output"].strip() + if "content" in first and isinstance(first["content"], list): + parts = [] + for c in first["content"]: + if isinstance(c, dict) and isinstance(c.get("text"), str): + parts.append(c.get("text")) + if parts: + return "\n".join(parts).strip() + for key in ("output_text", "text", "response", "translated_text"): + if key in j and isinstance(j[key], str): + return j[key].strip() + except Exception as e: + print(f"Warning: GL translate failed ({prefix}): {e}") + + return text + + +def translate_srt_file(in_path: str, out_path: str, api_key: str, model: str): + with open(in_path, "r", encoding="utf-8") as fh: + subs = list(srt.parse(fh.read())) + + for i, sub in enumerate(subs, start=1): + text = sub.content.strip() + if not text: + continue + # llamar a la API + try: + translated = translate_text_google_gl(text, api_key, model=model) + except Exception as e: + print(f"Warning: translate failed for index {sub.index}: {e}") + translated = text + # asignar traducido + sub.content = translated + # pequeño delay para no golpear la API demasiado rápido + time.sleep(0.15) + print(f"Translated {i}/{len(subs)}") + + out_s = srt.compose(subs) + with open(out_path, "w", encoding="utf-8") as fh: + fh.write(out_s) + print(f"Wrote translated SRT to: {out_path}") + + +def main(): + p = argparse.ArgumentParser() + p.add_argument("--in", dest="in_srt", required=True) + p.add_argument("--out", dest="out_srt", required=True) + p.add_argument("--gemini-api-key", default=None) + p.add_argument("--model", default="gemini-2.5-flash") + args = p.parse_args() + + key = args.gemini_api_key or os.environ.get("GEMINI_API_KEY") + if not key: + print("Provide --gemini-api-key or set GEMINI_API_KEY env var", flush=True) + raise SystemExit(2) + + translate_srt_file(args.in_srt, args.out_srt, key, args.model) + + +if __name__ == '__main__': + main()