#!/usr/bin/env python3
"""Transcribe audio using different Whisper backends.

Supported backends: openai-whisper, transformers, faster-whisper
"""

import argparse
import sys
from pathlib import Path


def transcribe_openai_whisper(file: str, model: str):
    import whisper

    print(f"Loading openai-whisper model={model} on CPU...")
    m = whisper.load_model(model, device="cpu")
    print("Transcribing...")
    result = m.transcribe(file, fp16=False)
    # openai-whisper returns 'segments' with start, end and text
    segments = result.get("segments", None)
    if segments:
        for seg in segments:
            print(seg.get("text", ""))
        return segments
    else:
        print(result.get("text", ""))
        return None


def transcribe_transformers(file: str, model: str):
    import torch
    from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline

    device = "cpu"
    torch_dtype = torch.float32
    print(f"Loading transformers model={model} on CPU...")
    model_obj = AutoModelForSpeechSeq2Seq.from_pretrained(
        model, torch_dtype=torch_dtype, low_cpu_mem_usage=True
    )
    model_obj.to(device)
    processor = AutoProcessor.from_pretrained(model)
    pipe = pipeline(
        "automatic-speech-recognition",
        model=model_obj,
        tokenizer=processor.tokenizer,
        feature_extractor=processor.feature_extractor,
        device=-1,
    )
    print("Transcribing...")
    result = pipe(file)
    # result may be a dict or a str depending on the transformers version
    if isinstance(result, dict):
        print(result.get("text", ""))
    else:
        print(result)
    # the transformers pipeline normally does not return timestamped segments
    return None


def transcribe_faster_whisper(file: str, model: str, compute_type: str = "int8"):
    from faster_whisper import WhisperModel

    print(f"Loading faster-whisper model={model} on CPU compute_type={compute_type}...")
    model_obj = WhisperModel(model, device="cpu", compute_type=compute_type)
    print("Transcribing...")
    segments_gen, info = model_obj.transcribe(file, beam_size=5)
    # faster-whisper returns a generator; convert to a list to allow multiple passes
    segments = list(segments_gen)
    text = "".join([seg.text for seg in segments])
    print(text)
    # segments is a list of objects with .start, .end, .text
    return segments


def main():
    parser = argparse.ArgumentParser(
        description="Transcribe audio using Whisper (multiple backends)"
    )
    parser.add_argument(
        "--file", "-f", required=True, help="Path to the audio file"
    )
    parser.add_argument(
        "--backend", "-b",
        choices=["openai-whisper", "transformers", "faster-whisper"],
        default="faster-whisper",
        help="Backend to use",
    )
    parser.add_argument(
        "--model", "-m",
        default="base",
        help="Model name (e.g. tiny, base)",
    )
    parser.add_argument(
        "--compute-type", "-c",
        default="int8",
        help="compute_type for faster-whisper",
    )
    parser.add_argument(
        "--srt",
        action="store_true",
        help="Generate an SRT file with timestamps (if the backend supports it)",
    )
    parser.add_argument(
        "--srt-file",
        default=None,
        help=(
            "Path of the output SRT file. Default: same name as the audio"
            " file with the .srt extension"
        ),
    )
    parser.add_argument(
        "--srt-fallback",
        action="store_true",
        help=(
            "Generate an approximate SRT if the backend does not return segments."
        ),
    )
    parser.add_argument(
        "--segment-transcribe",
        action="store_true",
        help=(
            "When --srt-fallback is used, transcribe each segment using"
            " temporary files to fill in the text"
        ),
    )
    parser.add_argument(
        "--segment-overlap",
        type=float,
        default=0.2,
        help=(
            "Overlap in seconds between segments when transcribing per"
            " segment (default: 0.2)"
        ),
    )
    parser.add_argument(
        "--srt-segment-seconds",
        type=float,
        default=10.0,
        help=(
            "Duration in seconds of each segment for the fallback SRT."
            " Default: 10.0"
        ),
    )
    parser.add_argument(
        "--tts",
        action="store_true",
        help="Generate TTS audio from the transcribed text",
    )
    parser.add_argument(
        "--tts-model",
        default="kokoro",
        help="Name of the TTS model to use (e.g. kokoro)",
    )
    parser.add_argument(
        "--tts-model-repo",
        default=None,
        help=(
            "Hugging Face repo for the TTS model (e.g. user/kokoro)."
            " If given, it will be downloaded automatically."
        ),
    )
    parser.add_argument(
        "--dub",
        action="store_true",
        help="Generate a dubbed track (per segment) from the transcribed text",
    )
    parser.add_argument(
        "--dub-out",
        default=None,
        help="Output path for the dubbed audio (WAV). Default: same name + .dub.wav",
    )
    parser.add_argument(
        "--dub-mode",
        choices=["replace", "mix"],
        default="replace",
        help="Dubbing mode: 'replace' replaces the original voice with TTS; 'mix' mixes both tracks",
    )
    parser.add_argument(
        "--dub-mix-level",
        type=float,
        default=0.75,
        help="When --dub-mode=mix, relative volume level of the TTS track (0-1).",
    )
    args = parser.parse_args()

    path = Path(args.file)
    if not path.exists():
        print(f"File not found: {args.file}", file=sys.stderr)
        sys.exit(2)

    # Shortcut: if the user only wants the fallback SRT without per-segment
    # transcription, no backend needs to be loaded (this avoids errors if
    # faster-whisper/whisper are not installed).
    if args.srt and args.srt_fallback and not args.segment_transcribe:
        duration = get_audio_duration(args.file)
        if duration is None:
            print(
                "Could not determine duration; cannot generate fallback SRT.",
                file=sys.stderr,
            )
            sys.exit(4)
        fallback_segments = make_uniform_segments(duration, args.srt_segment_seconds)
        srt_file_arg = args.srt_file
        srt_path = srt_file_arg if srt_file_arg else str(path.with_suffix(".srt"))
        # create empty segments
        filled_segments = [
            {"start": s["start"], "end": s["end"], "text": ""}
            for s in fallback_segments
        ]
        write_srt(filled_segments, srt_path)
        print(f"Fallback SRT saved to: {srt_path}")
        sys.exit(0)

    try:
        segments = None
        if args.backend == "openai-whisper":
            segments = transcribe_openai_whisper(args.file, args.model)
        elif args.backend == "transformers":
            segments = transcribe_transformers(args.file, args.model)
        else:
            segments = transcribe_faster_whisper(
                args.file, args.model, compute_type=args.compute_type
            )

        # If SRT was requested and we have segments, write the SRT file
        if args.srt:
            if segments:
                # determine the SRT file name
                srt_file_arg = args.srt_file
                srt_path = srt_file_arg if srt_file_arg else str(path.with_suffix(".srt"))
                segments_to_write = dedupe_adjacent_segments(segments)
                write_srt(segments_to_write, srt_path)
                print(f"SRT saved to: {srt_path}")
            else:
                if args.srt_fallback:
                    # try to generate an approximate SRT
                    duration = get_audio_duration(args.file)
                    if duration is None:
                        print(
                            "Could not determine duration;"
                            " cannot generate fallback SRT.",
                            file=sys.stderr,
                        )
                        sys.exit(4)
                    fallback_segments = make_uniform_segments(
                        duration, args.srt_segment_seconds
                    )
                    # For each segment, try to obtain a partial transcription.
                    filled_segments = []
                    if args.segment_transcribe:
                        # extract each segment to a temporary file and transcribe it
                        filled = transcribe_segmented_with_tempfiles(
                            args.file,
                            fallback_segments,
                            backend=args.backend,
                            model=args.model,
                            compute_type=args.compute_type,
                            overlap=args.segment_overlap,
                        )
                        filled_segments = filled
                    else:
                        for seg in fallback_segments:
                            seg_obj = {
                                "start": seg["start"],
                                "end": seg["end"],
                                "text": "",
                            }
                            filled_segments.append(seg_obj)
                    srt_file_arg = args.srt_file
                    srt_path = srt_file_arg if srt_file_arg else str(path.with_suffix(".srt"))
                    segments_to_write = dedupe_adjacent_segments(filled_segments)
                    write_srt(segments_to_write, srt_path)
                    print(f"Fallback SRT saved to: {srt_path}")
                    print(
                        "Note: for an SRT with text, enable per-segment"
                        " transcription or use a backend that returns"
                        " segments."
                    )
                    sys.exit(0)
                else:
                    print(
                        "The selected backend did not return timestamped segments;"
                        " cannot generate SRT.",
                        file=sys.stderr,
                    )
                    sys.exit(3)
    except Exception as e:
        print(f"Error during transcription: {e}", file=sys.stderr)
        sys.exit(1)

    # TTS block: synthesize the full text if requested
    if args.tts:
        # if a repo was given, make sure the model is downloaded
        if args.tts_model_repo:
            model_path = ensure_tts_model(args.tts_model_repo)
            # use the local path as the model
            args.tts_model = model_path
        all_text = None
        if segments:
            all_text = "\n".join(
                [
                    s.get("text", "") if isinstance(s, dict) else s.text
                    for s in segments
                ]
            )
        if all_text:
            tts_out = str(path.with_suffix(".tts.wav"))
            ok = tts_synthesize(all_text, tts_out, model=args.tts_model)
            if ok:
                print(f"TTS saved to: {tts_out}")
            else:
                print(
                    "Error synthesizing TTS; check dependencies.",
                    file=sys.stderr,
                )
                sys.exit(5)

    # Per-segment dubbing block: synthesize each segment and produce a
    # concatenated WAV file with the dubbed track. The resulting audio keeps
    # the duration of the original segments (simple padding/trimming) so it
    # can replace, or be mixed with, the original track.
    if args.dub:
        # decide the output path
        dub_out = (
            args.dub_out
            if args.dub_out
            else str(Path(args.file).with_suffix(".dub.wav"))
        )
        # if we have no segments, fall back to per-segment transcription
        use_segments = segments
        if not use_segments:
            duration = get_audio_duration(args.file)
            if duration is None:
                print(
                    "Could not determine the audio duration; cannot dub.",
                    file=sys.stderr,
                )
                sys.exit(6)
            fallback_segments = make_uniform_segments(duration, args.srt_segment_seconds)
            if args.segment_transcribe:
                print("Getting per-segment transcriptions for dubbing...")
                use_segments = transcribe_segmented_with_tempfiles(
                    args.file,
                    fallback_segments,
                    backend=args.backend,
                    model=args.model,
                    compute_type=args.compute_type,
                    overlap=args.segment_overlap,
                )
            else:
                # create empty segments (no text available)
                use_segments = [
                    {"start": s["start"], "end": s["end"], "text": ""}
                    for s in fallback_segments
                ]
        # make sure the TTS model is local if a repo was given
        if args.tts_model_repo:
            model_path = ensure_tts_model(args.tts_model_repo)
            args.tts_model = model_path
        ok = synthesize_dubbed_audio(
            src_audio=args.file,
            segments=use_segments,
            tts_model=args.tts_model,
            out_path=dub_out,
            mode=args.dub_mode,
            mix_level=args.dub_mix_level,
        )
        if ok:
            print(f"Dubbed audio saved to: {dub_out}")
        else:
            print("Error generating dubbed audio.", file=sys.stderr)
            sys.exit(7)


def _format_timestamp(seconds: float) -> str:
    """Format seconds as an SRT timestamp hh:mm:ss,mmm."""
    millis = int((seconds - int(seconds)) * 1000)
    h = int(seconds // 3600)
    m = int((seconds % 3600) // 60)
    s = int(seconds % 60)
    return f"{h:02d}:{m:02d}:{s:02d},{millis:03d}"


def write_srt(segments, out_path: str):
    """Write a list of segments in SRT format.

    segments: iterable of objects or dicts with .start, .end and .text
    """
    lines = []
    for i, seg in enumerate(segments, start=1):
        # support objects with attributes as well as plain dicts
        if hasattr(seg, "start"):
            start = float(seg.start)
            end = float(seg.end)
            text = seg.text if hasattr(seg, "text") else str(seg)
        else:
            start = float(seg.get("start", 0.0))
            end = float(seg.get("end", 0.0))
            text = seg.get("text", "")
        start_ts = _format_timestamp(start)
        end_ts = _format_timestamp(end)
        lines.append(str(i))
        lines.append(f"{start_ts} --> {end_ts}")
        # normalize text newlines
        for line in str(text).strip().splitlines():
            lines.append(line)
        lines.append("")
    Path(out_path).write_text("\n".join(lines), encoding="utf-8")


def dedupe_adjacent_segments(segments):
    """Remove simple duplicates between adjacent segments.

    Simple strategy: if the end of one segment and the start of the next
    share a sequence of words, drop the duplicated words from the start of
    the next segment.
    """
    if not segments:
        return segments
    # Normalize incoming segments to a list of dicts with keys start, end, text
    norm = []
    for s in segments:
        if hasattr(s, "start"):
            norm.append(
                {"start": float(s.start), "end": float(s.end), "text": getattr(s, "text", "")}
            )
        else:
            # assume mapping-like
            norm.append(
                {
                    "start": float(s.get("start", 0.0)),
                    "end": float(s.get("end", 0.0)),
                    "text": s.get("text", ""),
                }
            )
    out = [norm[0].copy()]
    for seg in norm[1:]:
        prev = out[-1]
        a = (prev.get("text") or "").strip()
        b = (seg.get("text") or "").strip()
        if not a or not b:
            out.append(seg.copy())
            continue
        # tokenize on whitespace and look for the largest word overlap
        a_words = a.split()
        b_words = b.split()
        max_ol = 0
        max_k = min(len(a_words), len(b_words), 10)
        for k in range(1, max_k + 1):
            if a_words[-k:] == b_words[:k]:
                max_ol = k
        if max_ol > 0:
            # drop the first max_ol words of b
            new_b = " ".join(b_words[max_ol:]).strip()
            new_seg = seg.copy()
            new_seg["text"] = new_b
            out.append(new_seg)
        else:
            out.append(seg.copy())
    return out


def get_audio_duration(file_path: str):
    """Get the audio duration in seconds using ffprobe.

    Returns a float (seconds) or None if it cannot be determined.
    """
    try:
        import subprocess

        cmd = [
            "ffprobe",
            "-v", "error",
            "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1",
            file_path,
        ]
        out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
        return float(out.strip())
    except Exception:
        return None


def make_uniform_segments(duration: float, seg_seconds: float):
    """Generate a list of uniform segments [{start, end}, ...]."""
    segments = []
    if duration <= 0 or seg_seconds <= 0:
        return segments
    start = 0.0
    while start < duration:
        end = min(start + seg_seconds, duration)
        segments.append({"start": round(start, 3), "end": round(end, 3)})
        start = end
    return segments


def transcribe_segmented_with_tempfiles(
    src_file: str,
    segments: list,
    backend: str = "faster-whisper",
    model: str = "base",
    compute_type: str = "int8",
    overlap: float = 0.2,
):
    """Cut `src_file` into segments and transcribe each one.

    Returns a list of dicts {'start', 'end', 'text'}, one per segment.
""" import subprocess import tempfile results = [] for seg in segments: start = max(0.0, float(seg["start"]) - overlap) end = float(seg["end"]) + overlap duration = end - start with tempfile.NamedTemporaryFile(suffix=".wav", delete=True) as tmp: tmp_path = tmp.name cmd = [ "ffmpeg", "-y", "-ss", str(start), "-t", str(duration), "-i", src_file, "-ar", "16000", "-ac", "1", tmp_path, ] try: subprocess.run( cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) except Exception: # si falla el recorte, dejar texto vacío results.append( {"start": seg["start"], "end": seg["end"], "text": ""} ) continue # transcribir tmp_path con el backend try: if backend == "openai-whisper": import whisper m = whisper.load_model(model, device="cpu") res = m.transcribe(tmp_path, fp16=False) text = res.get("text", "") elif backend == "transformers": # pipeline de transformers import torch from transformers import ( AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline, ) torch_dtype = torch.float32 model_obj = AutoModelForSpeechSeq2Seq.from_pretrained( model, torch_dtype=torch_dtype, low_cpu_mem_usage=True ) model_obj.to("cpu") processor = AutoProcessor.from_pretrained(model) pipe = pipeline( "automatic-speech-recognition", model=model_obj, tokenizer=processor.tokenizer, feature_extractor=processor.feature_extractor, device=-1, ) out = pipe(tmp_path) text = out["text"] if isinstance(out, dict) else str(out) else: # faster-whisper from faster_whisper import WhisperModel wmodel = WhisperModel( model, device="cpu", compute_type=compute_type ) segs_gen, info = wmodel.transcribe(tmp_path, beam_size=5) segs = list(segs_gen) text = "".join([s.text for s in segs]) except Exception: text = "" results.append( {"start": seg["start"], "end": seg["end"], "text": text} ) return results def tts_synthesize(text: str, out_path: str, model: str = "kokoro"): """Sintetiza `text` a `out_path` usando Coqui TTS si está disponible, o pyttsx3 como fallback simple. """ try: # Intentar Coqui TTS from TTS.api import TTS # El usuario debe tener el modelo descargado o especificar el id tts = TTS(model_name=model, progress_bar=False, gpu=False) tts.tts_to_file(text=text, file_path=out_path) return True except Exception: try: # Fallback a pyttsx3 (menos natural, offline) import pyttsx3 engine = pyttsx3.init() engine.save_to_file(text, out_path) engine.runAndWait() return True except Exception: return False def ensure_tts_model(repo_id: str): """Descarga un repo de Hugging Face y devuelve la ruta local. Usa huggingface_hub.snapshot_download. Si la descarga falla, devuelve el repo_id tal cual (se intentará usar como id remoto). """ try: from huggingface_hub import snapshot_download print(f"Descargando modelo TTS desde: {repo_id} ...") try: # intentar descarga explícita como 'model' (útil para ids con '/'). local_dir = snapshot_download(repo_id, repo_type="model") except Exception: # fallback al comportamiento por defecto local_dir = snapshot_download(repo_id) print(f"Modelo descargado en: {local_dir}") return local_dir except Exception as e: print(f"No se pudo descargar el modelo {repo_id}: {e}") return repo_id def _pad_or_trim_wav(in_path: str, out_path: str, target_duration: float): """Pad or trim `in_path` WAV to `target_duration` seconds using ffmpeg. Creates `out_path` with exactly target_duration seconds. If input is shorter, pads with silence; if longer, trims. """ import subprocess # ffmpeg -y -i in.wav -af apad=pad_dur=...,atrim=duration=... 
    #   -ar 16000 -ac 1 out.wav
    try:
        # Use apad plus -t to guarantee the exact duration
        cmd = [
            "ffmpeg", "-y",
            "-i", in_path,
            "-af", f"apad=pad_dur={max(0, target_duration)}",
            "-t", f"{target_duration}",
            "-ar", "16000",
            "-ac", "1",
            out_path,
        ]
        subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        return True
    except Exception:
        return False


def synthesize_segment_tts(text: str, model: str, dur: float, out_wav: str) -> bool:
    """Synthesize `text` into `out_wav` and adjust its duration to `dur` seconds.

    - First generates a temporary WAV with `tts_synthesize`.
    - Then pads/trims it to `dur` using ffmpeg.
    """
    import os
    import tempfile

    try:
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            tmp_path = tmp.name
        ok = tts_synthesize(text, tmp_path, model=model)
        if not ok:
            # cleanup
            try:
                os.remove(tmp_path)
            except Exception:
                pass
            return False
        # adjust the duration
        adjusted = _pad_or_trim_wav(tmp_path, out_wav, target_duration=dur)
        try:
            os.remove(tmp_path)
        except Exception:
            pass
        return adjusted
    except Exception:
        return False


def synthesize_dubbed_audio(
    src_audio: str,
    segments: list,
    tts_model: str,
    out_path: str,
    mode: str = "replace",
    mix_level: float = 0.75,
):
    """Generate a dubbed track from `segments` and the source audio.

    - segments: list of dicts with 'start', 'end', 'text' (in seconds).
    - mode: 'replace' (only the concatenated TTS) or 'mix' (mixes TTS and original).
    - mix_level: relative volume of the TTS when mixing (0-1).

    Returns True if `out_path` was generated successfully.
    """
    import os
    import subprocess
    import tempfile

    # Normalize segments to a list of dicts {'start', 'end', 'text'}
    norm_segments = []
    for s in segments:
        if hasattr(s, "start"):
            norm_segments.append(
                {"start": float(s.start), "end": float(s.end), "text": getattr(s, "text", "")}
            )
        else:
            norm_segments.append(
                {
                    "start": float(s.get("start", 0.0)),
                    "end": float(s.get("end", 0.0)),
                    "text": s.get("text", ""),
                }
            )

    # create a temporary folder for the TTS segments
    with tempfile.TemporaryDirectory() as tmpdir:
        tts_segment_paths = []
        for i, seg in enumerate(norm_segments):
            start = float(seg.get("start", 0.0))
            end = float(seg.get("end", start))
            dur = max(0.001, end - start)
            text = (seg.get("text") or "").strip()
            out_seg = os.path.join(tmpdir, f"seg_{i:04d}.wav")
            if not text:
                # create silence of duration dur
                try:
                    cmd = [
                        "ffmpeg", "-y",
                        "-f", "lavfi",
                        "-i", "anullsrc=channel_layout=mono:sample_rate=16000",
                        "-t", f"{dur}",
                        "-ar", "16000",
                        "-ac", "1",
                        out_seg,
                    ]
                    subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                    tts_segment_paths.append(out_seg)
                except Exception:
                    return False
                continue
            ok = synthesize_segment_tts(text, tts_model, dur, out_seg)
            if not ok:
                return False
            tts_segment_paths.append(out_seg)

        # create the concatenation list
        concat_list = os.path.join(tmpdir, "concat.txt")
        with open(concat_list, "w", encoding="utf-8") as f:
            for p in tts_segment_paths:
                f.write(f"file '{p}'\n")

        # concatenate the segments into a temporary final WAV
        final_tmp = os.path.join(tmpdir, "tts_full.wav")
        try:
            cmd = [
                "ffmpeg", "-y",
                "-f", "concat",
                "-safe", "0",
                "-i", concat_list,
                "-c", "copy",
                final_tmp,
            ]
            subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except Exception:
            return False

        # in replace mode, convert final_tmp into out_path (re-encoding if needed)
        try:
            if mode == "replace":
                # convert to 16 kHz mono WAV if it is not already
                cmd = [
                    "ffmpeg", "-y",
                    "-i", final_tmp,
                    "-ar", "16000",
                    "-ac", "1",
                    out_path,
                ]
                subprocess.run(
                    cmd,
                    check=True,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
                return True

            # mix mode: mix the TTS track with the original into out_path,
            # lowering the TTS volume, e.g.:
            #   ffmpeg -i original -i tts -filter_complex
            #     "[1:a]volume=LEVEL[a1];[0:a][a1]amix=inputs=2:duration=longest:dropout_transition=0" out.wav
            tts_level = float(max(0.0, min(1.0, mix_level)))
            cmd = [
                "ffmpeg", "-y",
                "-i", src_audio,
                "-i", final_tmp,
                "-filter_complex",
                f"[1:a]volume={tts_level}[a1];"
                "[0:a][a1]amix=inputs=2:duration=longest:dropout_transition=0",
                "-ar", "16000",
                "-ac", "1",
                out_path,
            ]
            subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            return True
        except Exception:
            return False


if __name__ == "__main__":
    main()
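
# Example invocations (a sketch only: the script filename and the audio paths
# below are placeholders, not part of the project):
#
#   python transcribe.py -f interview.wav --backend faster-whisper --model base --srt
#   python transcribe.py -f interview.wav --srt --srt-fallback --segment-transcribe
#   python transcribe.py -f interview.wav --dub --dub-mode mix --dub-mix-level 0.6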