#!/usr/bin/env python3
"""Transcribe audio usando distintos backends de Whisper.
Soportados: openai-whisper, transformers, faster-whisper
"""
import argparse
import sys
from pathlib import Path
def transcribe_openai_whisper(file: str, model: str):
import whisper
print(f"Cargando openai-whisper modelo={model} en CPU...")
m = whisper.load_model(model, device="cpu")
print("Transcribiendo...")
result = m.transcribe(file, fp16=False)
    # openai-whisper returns 'segments' with start, end and text
segments = result.get("segments", None)
if segments:
for seg in segments:
print(seg.get("text", ""))
return segments
else:
print(result.get("text", ""))
return None
def transcribe_transformers(file: str, model: str):
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
device = "cpu"
torch_dtype = torch.float32
print(f"Cargando transformers modelo={model} en CPU...")
model_obj = AutoModelForSpeechSeq2Seq.from_pretrained(model, torch_dtype=torch_dtype, low_cpu_mem_usage=True)
model_obj.to(device)
processor = AutoProcessor.from_pretrained(model)
pipe = pipeline(
"automatic-speech-recognition",
model=model_obj,
tokenizer=processor.tokenizer,
feature_extractor=processor.feature_extractor,
device=-1,
)
print("Transcribiendo...")
result = pipe(file)
    # result may be a dict or a str depending on the version
if isinstance(result, dict):
print(result.get("text", ""))
else:
print(result)
    # the transformers pipeline normally does not return time-stamped segments
return None
def transcribe_faster_whisper(file: str, model: str, compute_type: str = "int8"):
from faster_whisper import WhisperModel
print(f"Cargando faster-whisper modelo={model} en CPU compute_type={compute_type}...")
model_obj = WhisperModel(model, device="cpu", compute_type=compute_type)
print("Transcribiendo...")
segments_gen, info = model_obj.transcribe(file, beam_size=5)
# faster-whisper may return a generator; convert to list to allow multiple passes
segments = list(segments_gen)
text = "".join([seg.text for seg in segments])
print(text)
    # segments is a list of objects with .start, .end and .text
return segments
def main():
parser = argparse.ArgumentParser(
description="Transcribe audio usando Whisper (varios backends)"
)
parser.add_argument(
"--file", "-f", required=True, help="Ruta al archivo de audio"
)
parser.add_argument(
"--backend",
"-b",
choices=["openai-whisper", "transformers", "faster-whisper"],
default="faster-whisper",
help="Backend a usar",
)
parser.add_argument(
"--model",
"-m",
default="base",
help="Nombre del modelo (ej: tiny, base)",
)
parser.add_argument(
"--compute-type",
"-c",
default="int8",
help="compute_type para faster-whisper",
)
parser.add_argument(
"--srt",
action="store_true",
help="Generar archivo SRT con timestamps (si el backend lo soporta)",
)
parser.add_argument(
"--srt-file",
default=None,
help=(
"Ruta del archivo SRT de salida. Por defecto: mismo nombre"
" que el audio con extensión .srt"
),
)
parser.add_argument(
"--srt-fallback",
action="store_true",
help=(
"Generar SRT aproximado si backend no devuelve segmentos."
),
)
parser.add_argument(
"--segment-transcribe",
action="store_true",
help=(
"Cuando se usa --srt-fallback, transcribir cada segmento usando"
" archivos temporales para rellenar el texto"
),
)
parser.add_argument(
"--segment-overlap",
type=float,
default=0.2,
help=(
"Superposición en segundos entre segmentos al transcribir por"
" segmentos (por defecto: 0.2)"
),
)
parser.add_argument(
"--srt-segment-seconds",
type=float,
default=10.0,
help=(
"Duración en segundos de cada segmento para el SRT de fallback."
" Por defecto: 10.0"
),
)
parser.add_argument(
"--tts",
action="store_true",
help="Generar audio TTS a partir del texto transcrito",
)
parser.add_argument(
"--tts-model",
default="kokoro",
help="Nombre del modelo TTS a usar (ej: kokoro)",
)
parser.add_argument(
"--tts-model-repo",
default=None,
help=(
"Repo de Hugging Face para el modelo TTS (ej: user/kokoro)."
" Si se especifica, se descargará automáticamente."
),
)
parser.add_argument(
"--dub",
action="store_true",
help=(
"Generar pista doblada (por segmentos) a partir del texto transcrito"
),
)
parser.add_argument(
"--dub-out",
default=None,
help=("Ruta de salida para el audio doblado (WAV). Por defecto: mismo nombre + .dub.wav"),
)
parser.add_argument(
"--dub-mode",
choices=["replace", "mix"],
default="replace",
help=("Modo de doblaje: 'replace' reemplaza voz original por TTS; 'mix' mezcla ambas pistas"),
)
parser.add_argument(
"--dub-mix-level",
type=float,
default=0.75,
help=("Cuando --dub-mode=mix, nivel de volumen del TTS relativo (0-1)."),
)
args = parser.parse_args()
path = Path(args.file)
if not path.exists():
print(f"Archivo no encontrado: {args.file}", file=sys.stderr)
sys.exit(2)
    # Shortcut: if the user only wants a fallback SRT without per-segment
    # transcription, there is no need to load any backend (this avoids errors
    # when faster-whisper/whisper are not installed).
if args.srt and args.srt_fallback and not args.segment_transcribe:
duration = get_audio_duration(args.file)
if duration is None:
print(
"No se pudo obtener duración; no se puede generar SRT de fallback.",
file=sys.stderr,
)
sys.exit(4)
fallback_segments = make_uniform_segments(duration, args.srt_segment_seconds)
srt_file_arg = args.srt_file
srt_path = (
srt_file_arg
if srt_file_arg
else str(path.with_suffix('.srt'))
)
        # create empty segments
filled_segments = [
{"start": s["start"], "end": s["end"], "text": ""}
for s in fallback_segments
]
write_srt(filled_segments, srt_path)
print(f"SRT de fallback guardado en: {srt_path}")
sys.exit(0)
try:
segments = None
if args.backend == "openai-whisper":
segments = transcribe_openai_whisper(args.file, args.model)
elif args.backend == "transformers":
segments = transcribe_transformers(args.file, args.model)
else:
segments = transcribe_faster_whisper(
args.file, args.model, compute_type=args.compute_type
)
        # If SRT was requested and we have segments, write the SRT file
if args.srt:
if segments:
                # determine the SRT file name
srt_file_arg = args.srt_file
srt_path = (
srt_file_arg
if srt_file_arg
else str(path.with_suffix('.srt'))
)
segments_to_write = dedupe_adjacent_segments(segments)
write_srt(segments_to_write, srt_path)
print(f"SRT guardado en: {srt_path}")
else:
if args.srt_fallback:
                    # try to generate an approximate SRT
duration = get_audio_duration(args.file)
if duration is None:
print(
"No se pudo obtener duración;"
" no se puede generar SRT de fallback.",
file=sys.stderr,
)
sys.exit(4)
fallback_segments = make_uniform_segments(
duration, args.srt_segment_seconds
)
                    # For each segment, try to obtain a partial transcription.
filled_segments = []
if args.segment_transcribe:
                        # extract each segment to a temporary file
                        # and transcribe it
filled = transcribe_segmented_with_tempfiles(
args.file,
fallback_segments,
backend=args.backend,
model=args.model,
compute_type=args.compute_type,
overlap=args.segment_overlap,
)
filled_segments = filled
else:
for seg in fallback_segments:
seg_obj = {
"start": seg["start"],
"end": seg["end"],
"text": "",
}
filled_segments.append(seg_obj)
srt_file_arg = args.srt_file
srt_path = (
srt_file_arg
if srt_file_arg
else str(path.with_suffix('.srt'))
)
segments_to_write = dedupe_adjacent_segments(
filled_segments
)
write_srt(segments_to_write, srt_path)
print(f"SRT de fallback guardado en: {srt_path}")
print(
"Nota: para SRT con texto, habilite transcripción"
" por segmento o use un backend que devuelva"
" segmentos."
)
sys.exit(0)
else:
print(
"El backend elegido no devolvió segmentos temporales;"
" no se puede generar SRT.",
file=sys.stderr,
)
sys.exit(3)
except Exception as e:
print(f"Error durante la transcripción: {e}", file=sys.stderr)
sys.exit(1)
    # TTS block: synthesize the full transcribed text if requested
if args.tts:
        # if a repo was specified, make sure the model is downloaded
if args.tts_model_repo:
model_path = ensure_tts_model(args.tts_model_repo)
            # use the local path as the model
args.tts_model = model_path
all_text = None
if segments:
all_text = "\n".join(
[
s.get("text", "") if isinstance(s, dict) else s.text
for s in segments
]
)
if all_text:
tts_out = str(path.with_suffix(".tts.wav"))
ok = tts_synthesize(
all_text, tts_out, model=args.tts_model
)
if ok:
print(f"TTS guardado en: {tts_out}")
else:
print(
"Error al sintetizar TTS; comprueba dependencias.",
file=sys.stderr,
)
sys.exit(5)
    # Per-segment dubbing block: synthesize each segment and produce a
    # concatenated WAV file with the dubbed track. The resulting audio keeps
    # the duration of the original segments (simple padding/trimming) so it
    # can replace or be mixed with the original track.
if args.dub:
        # decide the output path
dub_out = (
args.dub_out
if args.dub_out
else str(Path(args.file).with_suffix(".dub.wav"))
)
        # if there are no segments, try a fallback with per-segment transcription
use_segments = segments
if not use_segments:
duration = get_audio_duration(args.file)
if duration is None:
print(
"No se pudo obtener la duración del audio; no se puede doblar.",
file=sys.stderr,
)
sys.exit(6)
fallback_segments = make_uniform_segments(duration, args.srt_segment_seconds)
if args.segment_transcribe:
print("Obteniendo transcripciones por segmento para doblaje...")
use_segments = transcribe_segmented_with_tempfiles(
args.file,
fallback_segments,
backend=args.backend,
model=args.model,
compute_type=args.compute_type,
overlap=args.segment_overlap,
)
else:
                # create empty segments (no text)
use_segments = [
{"start": s["start"], "end": s["end"], "text": ""}
for s in fallback_segments
]
        # make sure the TTS model is available locally if a repo was given
if args.tts_model_repo:
model_path = ensure_tts_model(args.tts_model_repo)
args.tts_model = model_path
ok = synthesize_dubbed_audio(
src_audio=args.file,
segments=use_segments,
tts_model=args.tts_model,
out_path=dub_out,
mode=args.dub_mode,
mix_level=args.dub_mix_level,
)
if ok:
print(f"Audio doblado guardado en: {dub_out}")
else:
print("Error generando audio doblado.", file=sys.stderr)
sys.exit(7)
def _format_timestamp(seconds: float) -> str:
"""Formatea segundos en timestamp SRT hh:mm:ss,mmm"""
millis = int((seconds - int(seconds)) * 1000)
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
s = int(seconds % 60)
return f"{h:02d}:{m:02d}:{s:02d},{millis:03d}"
def write_srt(segments, out_path: str):
"""Escribe una lista de segmentos en formato SRT.
segments: iterable de objetos o dicts con .start, .end y .text
"""
lines = []
for i, seg in enumerate(segments, start=1):
        # support objects with attributes or dicts
if hasattr(seg, "start"):
start = float(seg.start)
end = float(seg.end)
text = seg.text if hasattr(seg, "text") else str(seg)
else:
start = float(seg.get("start", 0.0))
end = float(seg.get("end", 0.0))
text = seg.get("text", "")
start_ts = _format_timestamp(start)
end_ts = _format_timestamp(end)
lines.append(str(i))
lines.append(f"{start_ts} --> {end_ts}")
# normalize text newlines
for line in str(text).strip().splitlines():
lines.append(line)
lines.append("")
Path(out_path).write_text("\n".join(lines), encoding="utf-8")
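# For reference, write_srt emits standard SRT blocks, one per segment, e.g.:
#   1
#   00:00:00,000 --> 00:00:10,000
#   text of the first segment
#   (blank line between entries)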
def dedupe_adjacent_segments(segments):
"""Eliminar duplicados simples entre segmentos adyacentes.
Estrategia simple: si el final de un segmento y el inicio del
siguiente comparten una secuencia de palabras, eliminamos la
duplicación del inicio del siguiente.
"""
if not segments:
return segments
# Normalize incoming segments to a list of dicts with keys start,end,text
norm = []
for s in segments:
if hasattr(s, "start"):
norm.append({"start": float(s.start), "end": float(s.end), "text": getattr(s, "text", "")})
else:
# assume mapping-like
norm.append({"start": float(s.get("start", 0.0)), "end": float(s.get("end", 0.0)), "text": s.get("text", "")})
out = [norm[0].copy()]
for seg in norm[1:]:
prev = out[-1]
a = (prev.get("text") or "").strip()
b = (seg.get("text") or "").strip()
if not a or not b:
out.append(seg.copy())
continue
        # tokenize into words (split on whitespace) and find the largest overlap
a_words = a.split()
b_words = b.split()
max_ol = 0
max_k = min(len(a_words), len(b_words), 10)
for k in range(1, max_k + 1):
if a_words[-k:] == b_words[:k]:
max_ol = k
if max_ol > 0:
            # drop the first max_ol words from b
new_b = " ".join(b_words[max_ol:]).strip()
new_seg = seg.copy()
new_seg["text"] = new_b
out.append(new_seg)
else:
out.append(seg.copy())
return out
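# Illustrative behavior of dedupe_adjacent_segments: if one segment ends with
# "... going to the" and the next starts with "going to the market", the shared
# words are trimmed and the second segment's text becomes "market".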
def get_audio_duration(file_path: str):
"""Obtiene la duración del audio en segundos usando ffprobe.
Devuelve float (segundos) o None si no se puede obtener.
"""
try:
import subprocess
cmd = [
"ffprobe",
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
file_path,
]
out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
return float(out.strip())
except Exception:
return None
def make_uniform_segments(duration: float, seg_seconds: float):
"""Genera una lista de segmentos uniformes [{start, end}, ...]."""
segments = []
if duration <= 0 or seg_seconds <= 0:
return segments
start = 0.0
idx = 0
while start < duration:
end = min(start + seg_seconds, duration)
segments.append({"start": round(start, 3), "end": round(end, 3)})
idx += 1
start = end
return segments
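# Example: make_uniform_segments(25.0, 10.0) returns
#   [{"start": 0.0, "end": 10.0}, {"start": 10.0, "end": 20.0}, {"start": 20.0, "end": 25.0}]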
def transcribe_segmented_with_tempfiles(
src_file: str,
segments: list,
backend: str = "faster-whisper",
model: str = "base",
compute_type: str = "int8",
overlap: float = 0.2,
):
"""Recorta `src_file` en segmentos y transcribe cada uno.
Retorna lista de dicts {'start','end','text'} para cada segmento.
"""
import subprocess
import tempfile
results = []
for seg in segments:
start = max(0.0, float(seg["start"]) - overlap)
end = float(seg["end"]) + overlap
duration = end - start
with tempfile.NamedTemporaryFile(suffix=".wav", delete=True) as tmp:
tmp_path = tmp.name
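            # NOTE: ffmpeg overwrites this still-open temp file path; this works
            # on POSIX systems but may fail on Windows, where the open handle
            # keeps the file locked (an assumption worth keeping in mind).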
cmd = [
"ffmpeg",
"-y",
"-ss",
str(start),
"-t",
str(duration),
"-i",
src_file,
"-ar",
"16000",
"-ac",
"1",
tmp_path,
]
try:
subprocess.run(
cmd,
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
except Exception:
                # if the cut fails, leave the text empty
results.append(
{"start": seg["start"], "end": seg["end"], "text": ""}
)
continue
            # transcribe tmp_path with the selected backend
try:
if backend == "openai-whisper":
import whisper
m = whisper.load_model(model, device="cpu")
res = m.transcribe(tmp_path, fp16=False)
text = res.get("text", "")
elif backend == "transformers":
                    # transformers pipeline
import torch
from transformers import (
AutoModelForSpeechSeq2Seq,
AutoProcessor,
pipeline,
)
torch_dtype = torch.float32
model_obj = AutoModelForSpeechSeq2Seq.from_pretrained(
model, torch_dtype=torch_dtype, low_cpu_mem_usage=True
)
model_obj.to("cpu")
processor = AutoProcessor.from_pretrained(model)
pipe = pipeline(
"automatic-speech-recognition",
model=model_obj,
tokenizer=processor.tokenizer,
feature_extractor=processor.feature_extractor,
device=-1,
)
out = pipe(tmp_path)
text = out["text"] if isinstance(out, dict) else str(out)
else:
# faster-whisper
from faster_whisper import WhisperModel
wmodel = WhisperModel(
model, device="cpu", compute_type=compute_type
)
segs_gen, info = wmodel.transcribe(tmp_path, beam_size=5)
segs = list(segs_gen)
text = "".join([s.text for s in segs])
except Exception:
text = ""
results.append(
{"start": seg["start"], "end": seg["end"], "text": text}
)
return results
def tts_synthesize(text: str, out_path: str, model: str = "kokoro"):
"""Sintetiza `text` a `out_path` usando Coqui TTS si está disponible,
o pyttsx3 como fallback simple.
"""
try:
        # Try Coqui TTS first
from TTS.api import TTS
        # The user must already have the model downloaded or specify its id
tts = TTS(model_name=model, progress_bar=False, gpu=False)
tts.tts_to_file(text=text, file_path=out_path)
return True
except Exception:
try:
            # Fall back to pyttsx3 (less natural, offline)
import pyttsx3
engine = pyttsx3.init()
engine.save_to_file(text, out_path)
engine.runAndWait()
return True
except Exception:
return False
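# Note on model naming (an assumption about typical setups, not enforced here):
# Coqui TTS expects a model name such as "tts_models/en/ljspeech/tacotron2-DDC",
# while the pyttsx3 fallback ignores `model` and uses the system voice.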
def ensure_tts_model(repo_id: str):
"""Descarga un repo de Hugging Face y devuelve la ruta local.
Usa huggingface_hub.snapshot_download. Si la descarga falla, devuelve
el repo_id tal cual (se intentará usar como id remoto).
"""
try:
from huggingface_hub import snapshot_download
print(f"Descargando modelo TTS desde: {repo_id} ...")
try:
            # try an explicit download as 'model' (useful for ids containing '/').
local_dir = snapshot_download(repo_id, repo_type="model")
except Exception:
            # fall back to the default behavior
local_dir = snapshot_download(repo_id)
print(f"Modelo descargado en: {local_dir}")
return local_dir
except Exception as e:
print(f"No se pudo descargar el modelo {repo_id}: {e}")
return repo_id
def _pad_or_trim_wav(in_path: str, out_path: str, target_duration: float):
"""Pad or trim `in_path` WAV to `target_duration` seconds using ffmpeg.
Creates `out_path` with exactly target_duration seconds. If input is
shorter, pads with silence; if longer, trims.
"""
import subprocess
    # ffmpeg -y -i in.wav -af apad=pad_dur=... -t <duration> -ar 16000 -ac 1 out.wav
try:
        # Use apad followed by -t to guarantee the exact duration
cmd = [
"ffmpeg",
"-y",
"-i",
in_path,
"-af",
f"apad=pad_dur={max(0, target_duration)}",
"-t",
f"{target_duration}",
"-ar",
"16000",
"-ac",
"1",
out_path,
]
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return True
except Exception:
return False
def synthesize_segment_tts(text: str, model: str, dur: float, out_wav: str) -> bool:
"""Sintetiza `text` en `out_wav` y ajusta su duración a `dur` segundos.
- Primero genera un WAV temporal con `tts_synthesize`.
- Luego lo pad/recorta a `dur` usando ffmpeg.
"""
import tempfile
import os
try:
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
tmp_path = tmp.name
ok = tts_synthesize(text, tmp_path, model=model)
if not ok:
# cleanup
try:
os.remove(tmp_path)
except Exception:
pass
return False
        # adjust the duration
adjusted = _pad_or_trim_wav(tmp_path, out_wav, target_duration=dur)
try:
os.remove(tmp_path)
except Exception:
pass
return adjusted
except Exception:
return False
def synthesize_dubbed_audio(
src_audio: str,
segments: list,
tts_model: str,
out_path: str,
mode: str = "replace",
mix_level: float = 0.75,
):
"""Genera una pista doblada a partir de `segments` y el audio fuente.
- segments: lista de dicts con 'start','end','text' (en segundos).
- mode: 'replace' (devuelve solo TTS concatenado) o 'mix' (mezcla TTS y original).
- mix_level: volumen relativo del TTS cuando se mezcla (0-1).
Retorna True si se generó correctamente `out_path`.
"""
import tempfile
import os
import subprocess
    # Normalize segments into a list of dicts {'start', 'end', 'text'}
norm_segments = []
for s in segments:
if hasattr(s, "start"):
norm_segments.append({"start": float(s.start), "end": float(s.end), "text": getattr(s, "text", "")})
else:
norm_segments.append({"start": float(s.get("start", 0.0)), "end": float(s.get("end", 0.0)), "text": s.get("text", "")})
    # create a temporary directory for the TTS segments
with tempfile.TemporaryDirectory() as tmpdir:
tts_segment_paths = []
for i, seg in enumerate(norm_segments):
start = float(seg.get("start", 0.0))
end = float(seg.get("end", start))
dur = max(0.001, end - start)
text = (seg.get("text") or "").strip()
out_seg = os.path.join(tmpdir, f"seg_{i:04d}.wav")
if not text:
                # create silence of duration dur
try:
cmd = [
"ffmpeg",
"-y",
"-f",
"lavfi",
"-i",
f"anullsrc=channel_layout=mono:sample_rate=16000",
"-t",
f"{dur}",
"-ar",
"16000",
"-ac",
"1",
out_seg,
]
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
tts_segment_paths.append(out_seg)
except Exception:
return False
continue
ok = synthesize_segment_tts(text, tts_model, dur, out_seg)
if not ok:
return False
tts_segment_paths.append(out_seg)
        # build the concat list
concat_list = os.path.join(tmpdir, "concat.txt")
with open(concat_list, "w", encoding="utf-8") as f:
for p in tts_segment_paths:
f.write(f"file '{p}'\n")
        # concatenate the segments into a temporary final WAV
final_tmp = os.path.join(tmpdir, "tts_full.wav")
try:
cmd = [
"ffmpeg",
"-y",
"-f",
"concat",
"-safe",
"0",
"-i",
concat_list,
"-c",
"copy",
final_tmp,
]
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except Exception:
return False
        # if mode is replace, move final_tmp to out_path (converting if needed)
try:
if mode == "replace":
                # convert to 16 kHz mono WAV if it is not already
cmd = [
"ffmpeg",
"-y",
"-i",
final_tmp,
"-ar",
"16000",
"-ac",
"1",
out_path,
]
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return True
            # mix mode: mix the TTS track with the original into out_path,
            # adjusting the TTS volume. Roughly:
            # ffmpeg -i original -i tts -filter_complex "[1:a]volume=LEVEL[a1];[0:a][a1]amix=inputs=2" out.wav
tts_level = float(max(0.0, min(1.0, mix_level)))
cmd = [
"ffmpeg",
"-y",
"-i",
src_audio,
"-i",
final_tmp,
"-filter_complex",
f"[1:a]volume={tts_level}[a1];[0:a][a1]amix=inputs=2:duration=longest:dropout_transition=0",
"-ar",
"16000",
"-ac",
"1",
out_path,
]
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return True
except Exception:
return False
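# Minimal usage sketch for synthesize_dubbed_audio (hypothetical paths and values,
# shown only to illustrate the expected call shape):
#   segs = [{"start": 0.0, "end": 4.0, "text": "Hello"}, {"start": 4.0, "end": 8.0, "text": "world"}]
#   synthesize_dubbed_audio("input.wav", segs, tts_model="kokoro", out_path="input.dub.wav", mode="mix", mix_level=0.5)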
if __name__ == "__main__":
main()