1415 lines
59 KiB
Python

import os
import json
import subprocess
import requests
import time
import re
import tempfile
import glob
from fastapi import FastAPI, HTTPException, UploadFile, File
from typing import List, Dict, Any, cast
# Try to import youtube_transcript_api as an optional fallback transcript source.
try:
    from youtube_transcript_api import YouTubeTranscriptApi
    from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound
    YOUTUBE_TRANSCRIPT_API_AVAILABLE = True
except Exception:
    # Define placeholders to avoid NameError when the library is not installed.
    YouTubeTranscriptApi = None
    class TranscriptsDisabled(Exception):
        pass
    class NoTranscriptFound(Exception):
        pass
    YOUTUBE_TRANSCRIPT_API_AVAILABLE = False
# Import CookieManager from yt_wrap to provide cookiefile paths per request
from yt_wrap import CookieManager
app = FastAPI(title="TubeScript API Pro - JSON Cleaner")
# Cookies path is configurable via the API_COOKIES_PATH environment variable.
# Defaults to ./data/cookies.txt so configuration files are grouped under ./data.
DEFAULT_COOKIES_PATH = './data/cookies.txt'
# Optional proxy for requests/yt-dlp (e.g. socks5h://127.0.0.1:9050).
DEFAULT_PROXY = os.getenv('API_PROXY', '')
def clean_youtube_json(raw_json: Dict) -> List[Dict]:
    """Transform YouTube's complex 'json3' caption format into a flat list.

    Args:
        raw_json: Parsed json3 payload; text events live under the 'events' key.

    Returns:
        List of dicts like ``[{'start': 0.0, 'duration': 2.0, 'text': 'Hola'}]``
        with times converted from milliseconds to seconds.
    """
    clean_data = []
    # YouTube stores the text events under the 'events' key.
    for event in raw_json.get('events', []):
        # Only events carrying text segments are relevant (others are styling
        # or window records).
        if 'segs' not in event:
            continue
        # Bug fix: some segments lack the 'utf8' key; .get avoids a KeyError
        # that previously dropped the whole transcript.
        text = "".join(seg.get('utf8', '') for seg in event['segs']).strip()
        if text and text != '\n':
            clean_data.append({
                "start": event.get('tStartMs', 0) / 1000.0,  # ms → seconds
                "duration": event.get('dDurationMs', 0) / 1000.0,
                "text": text.replace('\n', ' ')
            })
    return clean_data
def parse_subtitle_format(content: str, format_type: str) -> List[Dict]:
    """Parse subtitle payloads (json3, srv3, vtt) into the standard segment list.

    Args:
        content: Raw subtitle payload (string or pre-parsed dict).
        format_type: One of 'json3', 'srv3', 'vtt'; anything else is tried as JSON.

    Returns:
        List of ``{'start', 'duration', 'text'}`` dicts; empty list on failure.
    """
    try:
        if format_type == 'json3':
            # YouTube's JSON3 format.
            data = json.loads(content) if isinstance(content, str) else content
            return clean_youtube_json(data)
        elif format_type in ['srv3', 'vtt']:
            # srv3 is often JSON with the same 'events' structure as json3.
            try:
                data = json.loads(content) if isinstance(content, str) else content
                if 'events' in data:
                    return clean_youtube_json(data)
            except (ValueError, TypeError):
                pass
            # Not JSON: parse as plain-text VTT.
            clean_data = []
            lines = content.split('\n') if isinstance(content, str) else []
            current_time = 0.0
            for line in lines:
                line = line.strip()
                if not line or line.startswith('WEBVTT') or '-->' in line:
                    if '-->' in line:
                        # Extract the cue start time from "HH:MM:SS.mmm --> ...".
                        try:
                            time_parts = line.split('-->')[0].strip().split(':')
                            if len(time_parts) >= 2:
                                current_time = float(time_parts[-2]) * 60 + float(time_parts[-1])
                                # Bug fix: account for the hours field, which the
                                # previous version silently dropped.
                                if len(time_parts) >= 3:
                                    current_time += float(time_parts[-3]) * 3600
                        except ValueError:
                            pass
                    continue
                # Skip bare cue-number lines; everything else is caption text.
                if line and not line.isdigit():
                    clean_data.append({
                        "start": current_time,
                        "duration": 2.0,  # approximate duration per line
                        "text": line
                    })
                    current_time += 2.0
            return clean_data
        else:
            # Unknown format: last-resort attempt as JSON with 'events'.
            data = json.loads(content) if isinstance(content, str) else content
            if 'events' in data:
                return clean_youtube_json(data)
            return []
    except Exception as e:
        print(f"Error parsing subtitle format {format_type}: {e}")
        return []
def extract_video_id(video_id_or_url: str) -> str:
    """Normalize the input and pull out the video id if a full URL was given.

    Accepts: https://www.youtube.com/watch?v=ID, youtu.be/ID, or a bare ID.
    Unrecognized input is returned stripped (and will fail downstream).
    """
    if not video_id_or_url:
        return ""
    candidate = video_id_or_url.strip()
    # Already looks like a bare id (8-20 chars of letters, digits, '-', '_').
    if re.match(r'^[A-Za-z0-9_-]{8,20}$', candidate):
        return candidate
    # Try the known URL shapes in order: watch?v=, youtu.be/, then /v/ or /embed/.
    url_patterns = (
        r'[?&]v=([A-Za-z0-9_-]{8,20})',
        r'youtu\.be/([A-Za-z0-9_-]{8,20})',
        r'(?:/v/|/embed/)([A-Za-z0-9_-]{8,20})',
    )
    for pattern in url_patterns:
        match = re.search(pattern, candidate)
        if match:
            return match.group(1)
    # Nothing recognized: hand back the stripped input unchanged.
    return candidate
def format_segments_text(segments: List[Dict]) -> List[str]:
    """Return a 'format_text' list of cleaned phrases extracted from segments.

    Cleaning steps:
      - strip 'Kind: captions'-style prefixes
      - drop bracketed/parenthesised content
      - drop HTML tags and decorative symbols
      - collapse whitespace
      - split on line breaks so each phrase is an independent entry
    """
    def _scrub(raw: str) -> str:
        if not raw:
            return ''
        text = str(raw).strip()
        text = re.sub(r'^\s*Kind\s*:\s*.*$', '', text, flags=re.IGNORECASE).strip()
        # Remove bracketed content, parens, HTML tags, then decorative symbols.
        for pattern in (r'\[[^\]]*\]', r'\([^\)]*\)', r'<[^>]+>', r'[♪★■◆►▶◀•–—]'):
            text = re.sub(pattern, '', text)
        return re.sub(r'\s+', ' ', text).strip()

    phrases: List[str] = []
    for segment in segments or []:
        scrubbed = _scrub(segment.get('text', ''))
        if not scrubbed:
            continue
        phrases.extend(p.strip() for p in re.split(r'[\n\r]+', scrubbed) if p.strip())
    return phrases
# Absolute path to the Node.js binary handed to yt-dlp's --js-runtimes flag.
NODE_PATH = "/usr/bin/node"

def _yt_client_args(has_cookies: bool, for_stream: bool = False) -> list:
    """Return --extractor-args and --js-runtimes flags for metadata/streams.

    Strategy (based on real-world testing, 2026-03-05):
      - without cookies → android (no n-challenge, no Node.js needed)
      - with cookies → web + Node.js (web accepts cookies; Node solves the
        n-challenge/signature)
      - for_stream → android (best HLS compatibility for lives)

    Diagnosis of the alternatives:
      - mweb with cookies → requires a GVS PO Token (unavailable)
      - android with cookies → yt-dlp skips the client (cookies unsupported)
      - web with cookies + --js-runtimes node → works
    """
    prefer_android = for_stream or not has_cookies
    if prefer_android:
        return ["--extractor-args", "youtube:player_client=android"]
    return [
        "--extractor-args", "youtube:player_client=web",
        "--js-runtimes", f"node:{NODE_PATH}",
    ]
def _yt_subs_args(has_cookies: bool) -> list:
    """Return --extractor-args for subtitle downloads.

    The android client is always used for subtitles:
      - android without cookies → works, fetches auto-subs with no n-challenge
      - android with cookies → yt-dlp skips the client, yet still downloads
        cookieless
      - web with cookies → fails on non-exact sub-langs (e.g. en vs en-US)

    `has_cookies` is intentionally ignored: android is the most reliable
    client for subtitles in every case.
    """
    return ["--extractor-args", "youtube:player_client=android"]
# Helper: thumbnail URLs for a video — static direct URLs, no yt-dlp involved.
def get_video_thumbnails(video_id: str) -> List[str]:
    """Return thumbnail URLs without invoking yt-dlp (fast, never blocks the
    transcript path). YouTube always serves these URLs for any public video."""
    variants = ("maxresdefault", "sddefault", "hqdefault", "mqdefault", "default")
    return [f"https://img.youtube.com/vi/{video_id}/{name}.jpg" for name in variants]
def get_transcript_data(video_id: str, lang: str = "es"):
    """Download and parse subtitles for a YouTube video.

    Strategy (in order):
      0. yt-dlp direct subtitle download (android client, no cookies).
      1. yt-dlp metadata (--dump-json) to discover subtitle URLs.
      2. HTTP download of the discovered subtitle URL (with retries and an
         M3U8/timedtext playlist expansion path).
      3. yt-dlp fallback downloads for auto/manual subs.

    Args:
        video_id: Bare video id or full YouTube URL.
        lang: Preferred language code (default "es").

    Returns:
        3-tuple ``(segments, thumbnails, error)``: on success
        ``(list, list, None)``, on failure ``(None, list, str)``.
        Bug fix: two code paths previously returned 2-tuples, which made
        callers crash while unpacking three values.
    """
    video_id = extract_video_id(video_id)
    if not video_id:
        return None, [], "video_id inválido o vacío"
    url = f"https://www.youtube.com/watch?v={video_id}"

    # CookieManager may hand us a per-request temporary cookiefile (or None).
    cookie_mgr = CookieManager()
    cookiefile_path = cookie_mgr.get_cookiefile_path()
    # Prefer the temporary cookiefile; otherwise fall back to the env path.
    cookies_path = cookiefile_path or os.getenv('API_COOKIES_PATH', DEFAULT_COOKIES_PATH)
    # Optional proxy shared by requests and yt-dlp.
    proxy = os.getenv('API_PROXY', DEFAULT_PROXY) or None
    proxies = {'http': proxy, 'https': proxy} if proxy else None

    def load_cookies_from_file(path: str) -> dict:
        """Parse a Netscape-format cookies.txt into a dict usable by requests."""
        cookies = {}
        try:
            if not path or not os.path.exists(path):
                return cookies
            with open(path, 'r', encoding='utf-8', errors='ignore') as fh:
                for line in fh:
                    line = line.strip()
                    if not line or line.startswith('#'):
                        continue
                    parts = line.split('\t')
                    # Netscape format: domain, flag, path, secure, expiration, name, value.
                    if len(parts) >= 7:
                        name = parts[5].strip()
                        value = parts[6].strip()
                        if name:
                            cookies[name] = value
                    elif '=' in line:
                        # Fallback: simple "name=value" lines.
                        k, v = line.split('=', 1)
                        cookies[k.strip()] = v.strip()
        except Exception:
            return {}
        return cookies

    cookies_for_requests = load_cookies_from_file(cookies_path) if cookies_path else {}

    # ── 0) Fast, reliable attempt: yt-dlp downloads subs (auto or manual) to a tmpdir ──
    try:
        with tempfile.TemporaryDirectory() as tmpdl:
            # Broad list of language variants; yt-dlp matches exact codes.
            sub_langs = [lang]
            if lang == "en":
                sub_langs = ["en", "en-US", "en-en", "en-GB", "en-CA", "en-AU"]
            elif lang == "es":
                sub_langs = ["es", "es-419", "es-MX", "es-ES", "es-LA", "es-en"]
            elif len(lang) == 2:
                sub_langs = [lang, f"{lang}-{lang.upper()}", f"{lang}-419", f"{lang}-en"]
            # Always android for subtitles — do NOT pass --cookies: yt-dlp skips
            # the android client when it receives cookies and downloads nothing.
            ytdlp_cmd = [
                "yt-dlp",
                url,
                "--skip-download",
                "--write-auto-sub",
                "--write-sub",
                "--sub-format", "vtt/json3/srv3/best",
                "-o", os.path.join(tmpdl, "%(id)s.%(ext)s"),
                "--no-warnings",
                "--sub-lang", ",".join(sub_langs),
            ] + _yt_subs_args(False)
            if proxy:
                ytdlp_cmd.extend(['--proxy', proxy])
            try:
                result = subprocess.run(ytdlp_cmd, capture_output=True, text=True, timeout=120)
                # Bug fix: the stderr/auth check was duplicated verbatim; a single
                # pass now maps each well-known failure to a clear user message.
                stderr = (result.stderr or "").lower()
                if result.returncode != 0:
                    if 'sign in' in stderr or 'confirm you' in stderr or 'bot' in stderr:
                        return None, get_video_thumbnails(video_id), "YouTube requiere autenticación para este video. Sube un cookies.txt válido con /upload_cookies."
                    if 'http error 429' in stderr or 'too many requests' in stderr:
                        return None, get_video_thumbnails(video_id), "YouTube está limitando las peticiones al descargar subtítulos (HTTP 429). Agrega un cookies.txt válido exportado desde tu navegador y monta en el contenedor, o espera unos minutos."
                    if 'http error 403' in stderr or 'forbidden' in stderr:
                        return None, get_video_thumbnails(video_id), "Acceso denegado al descargar subtítulos (HTTP 403). El video puede tener restricciones. Usa cookies.txt con una cuenta autorizada."
            except subprocess.TimeoutExpired:
                pass
            # yt-dlp writes double extensions (ID.lang.vtt); glob "ID*" catches
            # ID.en.vtt, ID.en-en.vtt, etc., which "ID.*" would miss.
            files = glob.glob(os.path.join(tmpdl, f"{video_id}*"))
            # Keep only text subtitle files.
            files = [f for f in files if os.path.isfile(f) and
                     any(f.endswith(ext) for ext in ('.vtt', '.json3', '.srv3', '.srt', '.ttml'))]
            if files:
                combined = []
                seen_content = set()
                for fpath in files:
                    try:
                        with open(fpath, 'r', encoding='utf-8') as fh:
                            content = fh.read()
                        # Dedupe files with identical content (en.vtt vs en-en.vtt).
                        content_hash = hash(content[:500])
                        if content_hash not in seen_content:
                            seen_content.add(content_hash)
                            combined.append(content)
                    except Exception:
                        continue
                if combined:
                    vtt_combined = "\n".join(combined)
                    parsed = parse_subtitle_format(vtt_combined, 'vtt')
                    # Drop VTT header noise that survives parsing.
                    _noise = {'kind: captions', 'language:', 'webvtt', 'position:', 'align:'}
                    parsed = [s for s in parsed if s.get('text') and
                              not any(s['text'].lower().startswith(n) for n in _noise)]
                    if parsed:
                        return parsed, get_video_thumbnails(video_id), None
    finally:
        # Clean up any temp cookiefile created for this request.
        try:
            cookie_mgr.cleanup()
        except Exception:
            pass

    # ── 1) Main attempt: fetch metadata with yt-dlp ──
    _has_ck = os.path.exists(cookies_path)
    command = [
        "yt-dlp",
        "--skip-download",
        "--dump-json",
        "--no-warnings",
    ] + _yt_client_args(_has_ck) + [url]
    if _has_ck:
        command.extend(["--cookies", cookies_path])
    if proxy:
        command.extend(['--proxy', proxy])
    video_metadata = None
    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=60)
        # Do not abort on failure: the yt-dlp subtitle fallback below still runs.
        if result.returncode == 0 and result.stdout.strip():
            try:
                video_metadata = json.loads(result.stdout)
            except Exception:
                video_metadata = None
    except subprocess.TimeoutExpired:
        video_metadata = None
    except FileNotFoundError:
        return None, get_video_thumbnails(video_id), "yt-dlp no está instalado en el contenedor/entorno. Instala yt-dlp y vuelve a intentar."
    except Exception:
        video_metadata = None

    requested_subs = {}
    if video_metadata:
        requested_subs = video_metadata.get('requested_subtitles', {}) or {}
        # Look in automatic_captions and subtitles when requested_subtitles is empty.
        if not requested_subs:
            automatic_captions = video_metadata.get('automatic_captions', {}) or {}
            for lang_key, formats in automatic_captions.items():
                if lang in lang_key or lang_key.startswith(lang):
                    if formats:
                        requested_subs = {lang_key: formats[0]}
                        break
        if not requested_subs:
            subtitles = video_metadata.get('subtitles', {}) or {}
            for lang_key, formats in subtitles.items():
                if lang in lang_key or lang_key.startswith(lang):
                    if formats:
                        requested_subs = {lang_key: formats[0]}
                        break

    # ── 2) If metadata exposed a subtitle URL, download it via requests ──
    if requested_subs:
        lang_key = next(iter(requested_subs))
        sub_url = requested_subs[lang_key].get('url')
        if sub_url:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                'Accept': 'application/json, text/plain, */*',
                'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8',
                'Referer': 'https://www.youtube.com/',
            }
            max_retries = 3
            response = None
            rate_limited = False
            for attempt in range(max_retries):
                try:
                    response = requests.get(sub_url, headers=headers, timeout=30, cookies=cookies_for_requests, proxies=proxies)
                    if response.status_code == 200:
                        break
                    elif response.status_code == 429:
                        rate_limited = True
                        if attempt < max_retries - 1:
                            time.sleep(2 * (attempt + 1))  # linear backoff
                            continue
                        # Exhausted retries: leave the loop; yt-dlp fallback below.
                        break
                    elif response.status_code == 403:
                        return None, get_video_thumbnails(video_id), "Acceso denegado (HTTP 403). El video puede tener restricciones de edad o región. Intenta con cookies.txt."
                    elif response.status_code == 404:
                        # Expected URL not found; try the later fallbacks.
                        response = None
                        break
                    else:
                        return None, get_video_thumbnails(video_id), f"Error al descargar subtítulos desde YouTube (HTTP {response.status_code})."
                except requests.exceptions.Timeout:
                    if attempt < max_retries - 1:
                        continue
                    return None, get_video_thumbnails(video_id), "Timeout al descargar subtítulos. Intenta nuevamente."
                except requests.exceptions.RequestException as e:
                    return None, get_video_thumbnails(video_id), f"Error de conexión al descargar subtítulos: {str(e)[:100]}"

            # Got a 200: process it. On rate limiting, fall through to yt-dlp.
            if response is not None and response.status_code == 200:
                subtitle_format = requested_subs[lang_key].get('ext', 'json3')
                # Bug fix: formatted_transcript was unbound on the non-playlist
                # path, raising NameError; it is now initialized and the plain
                # payload is actually parsed with its advertised format.
                formatted_transcript = None
                try:
                    text_body = response.text if isinstance(response.text, str) else None
                    if text_body and ('#EXTM3U' in text_body or 'timedtext' in text_body or text_body.strip().lower().startswith('#extm3u')):
                        # M3U8 playlist / timedtext links: fetch each URL and
                        # concatenate the VTT bodies before parsing.
                        urls = re.findall(r'^(https?://\S+)', text_body, flags=re.M)
                        combined = []
                        for idx, u in enumerate(urls):
                            # Attempt 1: plain requests (with mounted cookies, if any).
                            try:
                                r2 = requests.get(u, headers=headers, timeout=20, cookies=cookies_for_requests, proxies=proxies)
                                if r2.status_code == 200 and r2.text:
                                    combined.append(r2.text)
                                    continue
                                if r2.status_code == 429:
                                    # Force the yt-dlp fallback below.
                                    raise Exception('rate_limited')
                            except Exception:
                                pass
                            # Attempt 2 (fallback): yt-dlp downloads this URL to a tmpdir.
                            try:
                                with tempfile.TemporaryDirectory() as tdir:
                                    out_template = os.path.join(tdir, f"timedtext_{idx}.%(ext)s")
                                    ytdlp_cmd = [
                                        "yt-dlp",
                                        u,
                                        "-o", out_template,
                                        "--no-warnings",
                                    ]
                                    if os.path.exists(cookies_path):
                                        ytdlp_cmd.extend(["--cookies", cookies_path])
                                    if proxy:
                                        ytdlp_cmd.extend(['--proxy', proxy])
                                    try:
                                        res2 = subprocess.run(ytdlp_cmd, capture_output=True, text=True, timeout=60)
                                        stderr2 = (res2.stderr or "").lower()
                                        if res2.returncode != 0 and ('http error 429' in stderr2 or 'too many requests' in stderr2):
                                            return None, get_video_thumbnails(video_id), "YouTube está limitando las peticiones al descargar subtítulos (HTTP 429). Agrega cookies.txt válido o intenta más tarde."
                                        if res2.returncode != 0 and ('http error 403' in stderr2 or 'forbidden' in stderr2):
                                            return None, get_video_thumbnails(video_id), "Acceso denegado al descargar subtítulos (HTTP 403). Intenta con cookies.txt o una cuenta con permisos."
                                    except Exception:
                                        pass
                                    # Collect whatever files yt-dlp produced.
                                    for fpath in glob.glob(os.path.join(tdir, "timedtext_*.*")):
                                        try:
                                            with open(fpath, 'r', encoding='utf-8') as fh:
                                                txt = fh.read()
                                            if txt:
                                                combined.append(txt)
                                        except Exception:
                                            continue
                            except Exception:
                                continue
                        if combined:
                            formatted_transcript = parse_subtitle_format("\n".join(combined), 'vtt')
                    else:
                        # Plain subtitle payload: parse with the advertised format.
                        formatted_transcript = parse_subtitle_format(text_body or '', subtitle_format)
                except Exception as e:
                    return None, get_video_thumbnails(video_id), f"Error al procesar los subtítulos: {str(e)[:200]}"
                if formatted_transcript:
                    # Bug fix: was a 2-tuple return; callers expect 3 values.
                    return formatted_transcript, get_video_thumbnails(video_id), None
                return None, get_video_thumbnails(video_id), "Los subtítulos están vacíos o no se pudieron procesar."

            # ── Rate limited: try yt-dlp directly on the subtitle URL ──
            if rate_limited and (response is None or response.status_code != 200):
                try:
                    with tempfile.TemporaryDirectory() as tdir:
                        out_template = os.path.join(tdir, "sub.%(ext)s")
                        ytdlp_cmd = [
                            "yt-dlp",
                            sub_url,
                            "-o", out_template,
                            "--no-warnings",
                        ]
                        if os.path.exists(cookies_path):
                            ytdlp_cmd.extend(["--cookies", cookies_path])
                        if proxy:
                            ytdlp_cmd.extend(['--proxy', proxy])
                        res = subprocess.run(ytdlp_cmd, capture_output=True, text=True, timeout=90)
                        stderr = (res.stderr or "").lower()
                        if res.returncode != 0 and ('http error 429' in stderr or 'too many requests' in stderr):
                            return None, get_video_thumbnails(video_id), "YouTube está limitando las peticiones al descargar subtítulos (HTTP 429). Agrega cookies.txt válido o intenta más tarde."
                        combined = []
                        for fpath in glob.glob(os.path.join(tdir, "*.*")):
                            try:
                                with open(fpath, 'r', encoding='utf-8') as fh:
                                    txt = fh.read()
                                if txt:
                                    combined.append(txt)
                            except Exception:
                                continue
                        if combined:
                            formatted_transcript = parse_subtitle_format("\n".join(combined), 'vtt')
                            if formatted_transcript:
                                # Bug fix: was a 2-tuple return; callers expect 3 values.
                                return formatted_transcript, get_video_thumbnails(video_id), None
                except FileNotFoundError:
                    return None, get_video_thumbnails(video_id), "yt-dlp no está instalado en el contenedor/entorno. Instala yt-dlp y vuelve a intentar."
                except Exception:
                    # Continue with the remaining fallbacks.
                    pass

    # ── 3) Fallback: yt-dlp subtitle download into a temp dir ──
    # (Covers the case where metadata did not include requested_subtitles.)
    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            downloaded = None
            # Auto-generated subs first, manual subs second.
            for flag in ("--write-auto-sub", "--write-sub"):
                cmd = [
                    "yt-dlp",
                    url,
                    "--skip-download",
                    flag,
                    "--sub-lang", lang,
                    "--sub-format", "json3/vtt/srv3/best",
                    "-o", os.path.join(tmpdir, "%(id)s.%(ext)s"),
                    "--no-warnings",
                ] + _yt_subs_args(False)
                # No cookies with android (unsupported; yt-dlp would skip the client).
                if proxy:
                    cmd.extend(['--proxy', proxy])
                subprocess.run(cmd, capture_output=True, text=True, timeout=120)
                # Check for created files (double extension: ID.en.vtt).
                files = glob.glob(os.path.join(tmpdir, f"{video_id}*"))
                files = [f for f in files if os.path.isfile(f) and
                         any(f.endswith(e) for e in ('.vtt', '.json3', '.srv3', '.srt', '.ttml'))]
                if files:
                    downloaded = files[0]
                    break
            if downloaded:
                ext = os.path.splitext(downloaded)[1].lstrip('.')
                try:
                    with open(downloaded, 'r', encoding='utf-8') as fh:
                        content = fh.read()
                except Exception as e:
                    return None, get_video_thumbnails(video_id), f"Error leyendo archivo de subtítulos descargado: {str(e)[:200]}"
                # Map the file extension to a known parse format.
                fmt = 'json3' if ext in ('json', 'json3') else ('vtt' if ext == 'vtt' else 'srv3')
                formatted_transcript = parse_subtitle_format(content, fmt)
                if formatted_transcript:
                    return formatted_transcript, get_video_thumbnails(video_id), None
                return None, get_video_thumbnails(video_id), "Se descargaron subtítulos pero no se pudieron procesar."
    except FileNotFoundError:
        return None, get_video_thumbnails(video_id), "yt-dlp no está instalado. Instala yt-dlp en el contenedor/entorno y vuelve a intentar."
    except Exception as e:
        # Never crash the endpoint: return a generic message instead.
        return None, get_video_thumbnails(video_id), f"Error al intentar descargar subtítulos con yt-dlp: {str(e)[:200]}"

    return None, get_video_thumbnails(video_id), (
        f"No se encontraron subtítulos para este video en idioma '{lang}'. "
        "Puede que el video no tenga subtítulos, estén en otro idioma, o requiera autenticación. "
        "Prueba: ?lang=en | /debug/fetch_subs/{video_id} | sube cookies con /upload_cookies"
    )
# ── Exact NewPipeExtractor client definitions (ClientsConstants.java dev 2026-03-05) ──
_NP_IOS = {
    "clientName": "IOS", "clientVersion": "21.03.2",
    "clientScreen": "WATCH", "platform": "MOBILE",
    "deviceMake": "Apple", "deviceModel": "iPhone16,2",
    "osName": "iOS", "osVersion": "18.7.2.22H124",
    "userAgent": "com.google.ios.youtube/21.03.2 (iPhone16,2; U; CPU iOS 18_7_2 like Mac OS X;)",
}
_NP_ANDROID = {
    "clientName": "ANDROID", "clientVersion": "21.03.36",
    "clientScreen": "WATCH", "platform": "MOBILE",
    "osName": "Android", "osVersion": "16", "androidSdkVersion": 36,
    "userAgent": "com.google.android.youtube/21.03.36 (Linux; U; Android 16) gzip",
}
# GAPIS: youtubei.googleapis.com — NewPipe uses it for iOS and Android (YoutubeStreamHelper.java)
_GAPIS_BASE = "https://youtubei.googleapis.com/youtubei/v1"
def _np_build_ctx(client: dict, visitor_data: str = "") -> dict:
    """Build ``context.client`` exactly like prepareJsonBuilder in
    YoutubeParsingHelper.java."""
    ctx = {
        "clientName": client["clientName"],
        "clientVersion": client["clientVersion"],
        "clientScreen": client.get("clientScreen", "WATCH"),
        "platform": client.get("platform", "MOBILE"),
        "hl": "en",
        "gl": "US",
        "utcOffsetMinutes": 0,
    }
    if visitor_data:
        ctx["visitorData"] = visitor_data
    # Device/OS fields are copied through only when present and truthy.
    optional_keys = ("deviceMake", "deviceModel", "osName", "osVersion", "androidSdkVersion")
    ctx.update({key: client[key] for key in optional_keys if client.get(key)})
    return ctx
def _np_get_visitor_data(client: dict, proxies: dict = None) -> str:
    """POST /visitor_id → responseContext.visitorData (mirrors NewPipe's
    getVisitorDataFromInnertube). Returns "" on any failure."""
    try:
        payload = {
            "context": {
                "client": _np_build_ctx(client),
                "request": {"internalExperimentFlags": [], "useSsl": True},
                "user": {"lockedSafetyMode": False},
            }
        }
        headers = {
            "User-Agent": client["userAgent"],
            "X-Goog-Api-Format-Version": "2",
            "Content-Type": "application/json",
        }
        resp = requests.post(
            f"{_GAPIS_BASE}/visitor_id?prettyPrint=false",
            json=payload, headers=headers, timeout=8, proxies=proxies,
        )
        if resp.status_code == 200:
            return resp.json().get("responseContext", {}).get("visitorData", "")
    except Exception:
        pass
    return ""
def _np_call_player(video_id: str, client: dict,
                    visitor_data: str = "", proxies: dict = None) -> dict:
    """POST /player, mirroring getIosPlayerResponse/getAndroidPlayerResponse
    from NewPipe. Returns the parsed JSON body, or {} on any failure."""
    import string as _str
    # Encode the current unix timestamp in base-36 for the `t` query parameter.
    alphabet = _str.digits + _str.ascii_lowercase
    remaining = int(time.time())
    token = ""
    while remaining:
        remaining, digit = divmod(remaining, 36)
        token = alphabet[digit] + token
    url = f"{_GAPIS_BASE}/player?prettyPrint=false&t={token or '0'}&id={video_id}"
    payload = {
        "context": {
            "client": _np_build_ctx(client, visitor_data),
            "request": {"internalExperimentFlags": [], "useSsl": True},
            "user": {"lockedSafetyMode": False},
        },
        "videoId": video_id,
        "contentCheckOk": True,
        "racyCheckOk": True,
    }
    headers = {
        "User-Agent": client["userAgent"],
        "X-Goog-Api-Format-Version": "2",
        "Content-Type": "application/json",
    }
    try:
        resp = requests.post(url, json=payload, headers=headers, timeout=15, proxies=proxies)
        if resp.status_code == 200:
            return resp.json()
    except Exception:
        pass
    return {}
def innertube_get_stream(video_id: str, proxy: str = None) -> dict:
    """Resolve a stream URL by replicating NewPipeExtractor exactly:

    1. visitorData via /visitor_id (for both clients)
    2. iOS /player → iosStreamingData.hlsManifestUrl (preferred for lives)
    3. Android /player → direct formats (regular videos)

    No cookies | no JS signature | no PO Token | no server-side bot-check.
    """
    result = {
        "title": None, "description": None,
        "is_live": False, "hls_url": None,
        "formats": [], "error": None,
    }
    proxies = {"http": proxy, "https": proxy} if proxy else None

    visitor_ios = _np_get_visitor_data(_NP_IOS, proxies)
    visitor_android = _np_get_visitor_data(_NP_ANDROID, proxies)

    # iOS first — the preferred source of hlsManifestUrl for lives, as NewPipe does.
    ios_resp = _np_call_player(video_id, _NP_IOS, visitor_ios, proxies)
    playability = ios_resp.get("playabilityStatus") or {}
    if playability.get("status") == "LOGIN_REQUIRED":
        result["error"] = f"Login requerido: {playability.get('reason','')}"
        return result

    details = ios_resp.get("videoDetails") or {}
    result["title"] = details.get("title")
    result["description"] = details.get("shortDescription")
    result["is_live"] = bool(details.get("isLive") or details.get("isLiveContent"))

    ios_streaming = ios_resp.get("streamingData") or {}
    ios_hls = ios_streaming.get("hlsManifestUrl")
    if ios_hls:
        result["hls_url"] = ios_hls
        candidates = ios_streaming.get("formats", []) + ios_streaming.get("adaptiveFormats", [])
        result["formats"] = [
            {"itag": f.get("itag"), "mimeType": f.get("mimeType"), "quality": f.get("quality")}
            for f in candidates[:8]
        ]
        return result

    # Android — regular videos, or when iOS offered no HLS manifest.
    android_resp = _np_call_player(video_id, _NP_ANDROID, visitor_android, proxies)
    if not result["title"]:
        details2 = android_resp.get("videoDetails") or {}
        result["title"] = details2.get("title")
        result["description"] = details2.get("shortDescription")
        result["is_live"] = bool(details2.get("isLive") or details2.get("isLiveContent"))

    android_streaming = android_resp.get("streamingData") or {}
    android_hls = android_streaming.get("hlsManifestUrl")
    if android_hls:
        result["hls_url"] = android_hls
        return result

    # No HLS manifest anywhere: pick the highest-bitrate direct format.
    every_format = android_streaming.get("formats", []) + android_streaming.get("adaptiveFormats", [])
    playable = sorted(
        (f for f in every_format if f.get("url")),
        key=lambda fmt: fmt.get("bitrate", 0),
        reverse=True,
    )
    if playable:
        result["hls_url"] = playable[0]["url"]
        result["formats"] = [
            {"itag": f.get("itag"), "mimeType": f.get("mimeType"), "quality": f.get("quality")}
            for f in playable[:8]
        ]
        return result

    result["error"] = (
        "Innertube no devolvió streamingData. "
        "Puede ser DRM, región bloqueada, privado, o YouTube actualizó su API."
    )
    return result
def get_stream_url(video_id: str):
    """Resolve the m3u8/HLS streaming URL for a video or live.

    Returns:
        Tuple ``(stream_url, title, description, is_live, error)``.

    Strategy:
        1. innertube_get_stream() — NewPipe technique, no cookies, no bot-check
        2. yt-dlp fallback if Innertube fails
    """
    video_id = extract_video_id(video_id)
    proxy = os.getenv('API_PROXY', DEFAULT_PROXY) or None
    # ── 1. Direct Innertube (NewPipe) ─────────────────────────────────────────
    it = innertube_get_stream(video_id, proxy=proxy)
    if it.get("hls_url"):
        return (it["hls_url"], it.get("title"), it.get("description"),
                it.get("is_live", False), None)
    # Keep whatever metadata Innertube did return for the fallback responses.
    title = it.get("title")
    description = it.get("description")
    is_live = it.get("is_live", False)
    # ── 2. yt-dlp fallback ────────────────────────────────────────────────────
    cookie_mgr = CookieManager()
    cookiefile_path = cookie_mgr.get_cookiefile_path()
    cookies_path_env = os.getenv('API_COOKIES_PATH', DEFAULT_COOKIES_PATH)
    # Prefer the per-request temp cookiefile; otherwise the env path if it exists.
    effective_cookie = cookiefile_path or (
        cookies_path_env if os.path.exists(cookies_path_env) else None)
    has_ck = bool(effective_cookie)
    yt_url = f"https://www.youtube.com/watch?v={video_id}"
    # Substrings in yt-dlp stderr that indicate YouTube's bot-detection page.
    BOT_MARKERS = ("sign in to confirm", "not a bot", "sign in to")
    def _is_bot(s: str) -> bool:
        # True when stderr text matches any known bot-check marker.
        return any(m in s.lower() for m in BOT_MARKERS)
    def _build_args(client: str) -> list:
        # Common yt-dlp flags for a player client; the web client additionally
        # gets the Node.js runtime (n-challenge/signature) and cookies when present.
        args = ["--no-warnings", "--no-check-certificate", "--no-playlist",
                "--extractor-args", f"youtube:player_client={client}"]
        if client == "web":
            args += ["--js-runtimes", f"node:{NODE_PATH}"]
        if effective_cookie and client == "web":
            args += ["--cookies", effective_cookie]
        if proxy:
            args += ["--proxy", proxy]
        return args
    def _ytdlp_url(fmt: str, client: str):
        # Ask yt-dlp (-g) for a direct URL; returns (url_or_None, bot_detected).
        cmd = ["yt-dlp", "-g", "-f", fmt] + _build_args(client) + [yt_url]
        try:
            res = subprocess.run(cmd, capture_output=True, text=True, check=False, timeout=90)
            if res.returncode == 0 and res.stdout.strip():
                for line in res.stdout.strip().splitlines():
                    line = line.strip()
                    if line.startswith("http"):
                        return line, False
            return None, _is_bot(res.stderr or "")
        except Exception:
            return None, False
    # Cookieless clients first; the web client is tried only when cookies exist.
    clients = ["android", "ios"] + (["web"] if has_ck else [])
    # Lives prefer the HLS itags (91-96); VOD prefers m3u8 variants, then mp4.
    fmts = (["91", "92", "93", "94", "95", "96",
             "best[protocol=m3u8_native]", "best[protocol=m3u8]", "best"]
            if is_live else
            ["best[ext=m3u8]", "best[protocol=m3u8_native]",
             "best[protocol=m3u8]", "best", "best[ext=mp4]"])
    got_bot = False
    try:
        for client in clients:
            for fmt in fmts:
                u, is_b = _ytdlp_url(fmt, client)
                if u:
                    return u, title, description, is_live, None
                if is_b:
                    got_bot = True
    finally:
        # Always remove the per-request temp cookiefile, even on early return.
        try:
            cookie_mgr.cleanup()
        except Exception:
            pass
    if got_bot:
        return None, title, description, is_live, (
            "YouTube detectó actividad de bot. "
            "Sube cookies.txt: curl -X POST http://localhost:8282/upload_cookies -F 'file=@cookies.txt'"
        )
    return None, title, description, is_live, (
        it.get("error") or
        "No se pudo obtener la URL del stream. "
        "Si es un live, verifica que esté EN VIVO (🔴) ahora mismo."
    )
# ...existing code (old get_stream_url body — reemplazado arriba) — ELIMINAR...
@app.get("/transcript/{video_id}")
def transcript_endpoint(video_id: str, lang: str = "es"):
    """Return transcript segments, plain text, cleaned phrases and thumbnails."""
    data, thumbnails, error = get_transcript_data(video_id, lang)
    # Automatic fallback to English when the requested language has no subs
    # (but not when the failure is an authentication problem).
    wants_en_fallback = (
        bool(error)
        and lang != "en"
        and "No se encontraron" in (error or "")
        and "autenticación" not in (error or "")
    )
    if wants_en_fallback:
        data_en, thumbnails_en, error_en = get_transcript_data(video_id, "en")
        if data_en and not error_en:
            data, thumbnails, error = data_en, thumbnails_en, None
    if error:
        raise HTTPException(status_code=400, detail=error)
    # Concatenated plain text alongside the raw segments.
    try:
        combined_text = "\n".join(seg.get('text', '') for seg in data if seg.get('text'))
    except Exception:
        combined_text = ""
    # format_text: one cleaned phrase per element, ready for agent processing.
    try:
        format_text = format_segments_text(data)
    except Exception:
        format_text = []
    return {
        "video_id": video_id,
        "count": len(data),
        "segments": data,
        "text": combined_text,
        "format_text": format_text,
        "thumbnails": thumbnails
    }
@app.get('/transcript_vtt/{video_id}')
def transcript_vtt(video_id: str, lang: str = 'es'):
    """Download subtitles with yt-dlp and return the raw VTT, parsed segments,
    concatenated text, and video thumbnails."""
    vtt_text, error = fetch_vtt_subtitles(video_id, lang)
    if error:
        raise HTTPException(status_code=400, detail=error)

    # Parse the VTT payload into the standard segment format (empty when no text).
    segments = parse_subtitle_format(vtt_text, 'vtt') if vtt_text else []
    combined_text = '\n'.join(s.get('text', '') for s in segments)

    return {
        'video_id': video_id,
        'vtt': vtt_text,
        'count': len(segments),
        'segments': segments,
        'text': combined_text,
        # Cleaned text list ready for agent-side processing.
        'format_text': format_segments_text(segments),
        'thumbnails': get_video_thumbnails(video_id)
    }
@app.get("/stream/{video_id}")
def stream_endpoint(video_id: str):
    """
    Resolve the streaming URL (m3u8/HLS) of a YouTube video or live broadcast.
    - For live broadcasts (🔴): returns a direct HLS URL usable with FFmpeg/VLC.
    - For regular videos: returns the best available video URL.
    FFmpeg example:
      ffmpeg -re -i "URL_M3U8" -c copy -f flv rtmp://destino/stream_key
    """
    stream_url, title, description, is_live, error = get_stream_url(video_id)
    if error:
        raise HTTPException(status_code=400, detail=error)

    thumbnails = get_video_thumbnails(video_id)
    is_hls = bool(stream_url) and "m3u8" in stream_url.lower()
    url_type = "m3u8/hls" if is_hls else "direct/mp4"

    return {
        "video_id": video_id,
        "title": title,
        "description": description,
        "is_live": is_live,
        "stream_url": stream_url,
        "url_type": url_type,
        "youtube_url": f"https://www.youtube.com/watch?v={video_id}",
        "ffmpeg_example": f'ffmpeg -re -i "{stream_url}" -c copy -f flv rtmp://destino/stream_key',
        "thumbnails": thumbnails,
        "usage": {
            "description": "Usa stream_url con FFmpeg para retransmitir",
            "command_template": "ffmpeg -re -i \"{stream_url}\" -c copy -f flv {rtmp_url}/{stream_key}",
            "platforms": {
                "youtube": "rtmp://a.rtmp.youtube.com/live2/YOUR_STREAM_KEY",
                "facebook": "rtmps://live-api-s.facebook.com:443/rtmp/YOUR_STREAM_KEY",
                "twitch": "rtmp://live.twitch.tv/app/YOUR_STREAM_KEY",
                "twitter": "rtmps://fa.contribute.live-video.net/app/YOUR_STREAM_KEY"
            }
        }
    }
@app.post('/upload_cookies')
async def upload_cookies(file: UploadFile = File(...)):
    """Upload a cookies.txt file and persist it on the server.

    The target path comes from API_COOKIES_PATH (default: ./data/cookies.txt).
    Returns the absolute path where the file was stored.
    Raises 400 for an empty upload and 500 for any write failure.
    """
    try:
        content = await file.read()
        if not content:
            raise HTTPException(status_code=400, detail='Archivo vacío')
        # Resolve the target path from the environment variable.
        target = os.getenv('API_COOKIES_PATH', DEFAULT_COOKIES_PATH)
        target_dir = os.path.dirname(target) or '.'
        # Create the directory if it does not exist.
        try:
            os.makedirs(target_dir, exist_ok=True)
        except Exception:
            # If the directory cannot be created, fall back to the working dir.
            target = os.path.basename(target)
        # Save the uploaded bytes.
        with open(target, 'wb') as fh:
            fh.write(content)
        return {"detail": "cookies.txt guardado correctamente", "path": os.path.abspath(target)}
    except HTTPException:
        # BUGFIX: re-raise intended HTTP errors (e.g. the 400 for an empty file)
        # instead of letting the blanket handler below convert them into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f'Error al guardar cookies: {str(e)[:200]}')
@app.get("/debug/metadata/{video_id}")
def debug_metadata(video_id: str):
    """Debug endpoint: run `yt-dlp --dump-json` for a video.

    Returns the interesting parts of the metadata (automatic_captions,
    subtitles, requested_subtitles, a sample of formats) for inspection.
    Raises 500/504 HTTP errors when yt-dlp is missing, times out, or fails.
    """
    # Try to use a dynamic per-request cookiefile.
    cookie_mgr = CookieManager()
    cookiefile_path = cookie_mgr.get_cookiefile_path()
    cookies_path = cookiefile_path or os.getenv('API_COOKIES_PATH', DEFAULT_COOKIES_PATH)
    proxy = os.getenv('API_PROXY', DEFAULT_PROXY) or None
    url = f"https://www.youtube.com/watch?v={video_id}"
    cmd = [
        "yt-dlp",
        "--skip-download",
        "--dump-json",
        "--no-warnings",
        url
    ] + _yt_client_args(os.path.exists(cookies_path))
    if os.path.exists(cookies_path):
        cmd.extend(["--cookies", cookies_path])
    if proxy:
        cmd.extend(['--proxy', proxy])

    # Single try/finally guarantees cookie cleanup on every exit path,
    # replacing the previous per-branch duplicated cleanup blocks.
    try:
        try:
            proc = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
        except FileNotFoundError:
            raise HTTPException(status_code=500, detail="yt-dlp no está instalado en el contenedor/entorno.")
        except subprocess.TimeoutExpired:
            raise HTTPException(status_code=504, detail="yt-dlp demoró demasiado en responder.")
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e)[:300])
        if proc.returncode != 0:
            stderr = proc.stderr or ''
            raise HTTPException(status_code=500, detail=f"yt-dlp error: {stderr[:1000]}")
        try:
            metadata = json.loads(proc.stdout)
        except Exception:
            raise HTTPException(status_code=500, detail="No se pudo parsear la salida JSON de yt-dlp.")
    finally:
        try:
            cookie_mgr.cleanup()
        except Exception:
            pass

    # Return only the parts useful for debugging.
    debug_info = {
        'id': metadata.get('id'),
        'title': metadata.get('title'),
        'uploader': metadata.get('uploader'),
        'is_live': metadata.get('is_live'),
        'automatic_captions': metadata.get('automatic_captions'),
        'subtitles': metadata.get('subtitles'),
        'requested_subtitles': metadata.get('requested_subtitles'),
        'formats_sample': metadata.get('formats')[:5] if metadata.get('formats') else None,
    }
    return debug_info
@app.get('/debug/fetch_subs/{video_id}')
def debug_fetch_subs(video_id: str, lang: str = 'es'):
    """Debug endpoint: try to download subtitles with yt-dlp and return the
    process log plus a partial sample of any generated files.

    Uses the cookies provided by CookieManager (API_COOKIES_PATH).
    """
    cookie_mgr = CookieManager()
    cookiefile_path = cookie_mgr.get_cookiefile_path()
    out_dir = tempfile.mkdtemp(prefix='subs_')
    out_template = os.path.join(out_dir, '%(id)s.%(ext)s')
    url = f"https://www.youtube.com/watch?v={video_id}"
    cmd = [
        'yt-dlp',
        '--verbose',
        '--skip-download',
        '--write-auto-sub',
        '--write-sub',
        '--sub-lang', lang,
        '--sub-format', 'json3/vtt/srv3/best',
        '--output', out_template,
        url
    ] + _yt_subs_args(bool(cookiefile_path))
    if cookiefile_path:
        cmd.extend(['--cookies', cookiefile_path])

    # try/finally guarantees cookie cleanup on every exit path.
    try:
        try:
            proc = subprocess.run(cmd, capture_output=True, text=True, timeout=240)
        except FileNotFoundError:
            raise HTTPException(status_code=500, detail='yt-dlp no está instalado en el contenedor.')
        except subprocess.TimeoutExpired:
            raise HTTPException(status_code=504, detail='La ejecución de yt-dlp demoró demasiado.')
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e)[:300])

        stdout = proc.stdout or ''
        stderr = proc.stderr or ''
        rc = proc.returncode

        # Collect generated files (yt-dlp uses a double extension: ID.lang.vtt).
        generated = []
        for path in glob.glob(os.path.join(out_dir, f"{video_id}*")):
            entry = {'path': path, 'size': 0, 'sample': None}
            try:
                entry['size'] = os.path.getsize(path)
            except Exception:
                pass
            # Sample at most the first 200 lines so huge files are not returned.
            # A bounded readline loop replaces the previous next()/StopIteration dance.
            try:
                with open(path, 'r', encoding='utf-8', errors='ignore') as fh:
                    head = []
                    for _ in range(200):
                        line = fh.readline()
                        if not line:
                            break
                        head.append(line)
                    entry['sample'] = ''.join(head)
            except Exception:
                pass
            generated.append(entry)

        return {
            'video_id': video_id,
            'rc': rc,
            'stdout_tail': stdout[-2000:],
            'stderr_tail': stderr[-2000:],
            'generated': generated,
            'out_dir': out_dir
        }
    finally:
        try:
            cookie_mgr.cleanup()
        except Exception:
            pass
# Helper to download a VTT subtitle file directly and return it as text.
def fetch_vtt_subtitles(video_id: str, lang: str = 'es'):
    """Download subtitles in VTT format using yt-dlp and return their content.

    Returns (vtt_text, None) on success or (None, error_message) on failure.
    """
    url = f"https://www.youtube.com/watch?v={video_id}"
    cookie_mgr = CookieManager()
    cookiefile_path = cookie_mgr.get_cookiefile_path()
    # Single try/finally guarantees cookie cleanup on every return path,
    # replacing the previous eight duplicated cleanup blocks.
    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            out_template = os.path.join(tmpdir, '%(id)s.%(ext)s')
            cmd = [
                'yt-dlp',
                '--skip-download',
                '--write-auto-sub',
                '--write-sub',
                '--sub-lang', lang,
                '--sub-format', 'vtt',
                '--output', out_template,
                url
            ] + _yt_subs_args(bool(cookiefile_path))
            if cookiefile_path:
                cmd.extend(['--cookies', cookiefile_path])
            try:
                proc = subprocess.run(cmd, capture_output=True, text=True, timeout=180)
            except FileNotFoundError:
                return None, 'yt-dlp no está instalado en el entorno.'
            except subprocess.TimeoutExpired:
                return None, 'La descarga de subtítulos tardó demasiado.'
            except Exception as e:
                return None, f'Error ejecutando yt-dlp: {str(e)[:200]}'

            stderr = (proc.stderr or '').lower()
            if proc.returncode != 0:
                # Map common HTTP failures to actionable Spanish messages.
                if 'http error 429' in stderr or 'too many requests' in stderr:
                    return None, 'YouTube está limitando las peticiones al descargar subtítulos (HTTP 429). Revisa cookies.txt o prueba desde otra IP.'
                if 'http error 403' in stderr or 'forbidden' in stderr:
                    return None, 'Acceso denegado al descargar subtítulos (HTTP 403). Usa cookies.txt con una cuenta autorizada.'
                return None, f'yt-dlp error: {proc.stderr[:1000]}'

            # Find generated files (double extension: ID.lang.vtt).
            files = glob.glob(os.path.join(tmpdir, f"{video_id}*"))
            files = [f for f in files if os.path.isfile(f) and
                     any(f.endswith(e) for e in ('.vtt', '.json3', '.srv3', '.srt', '.ttml'))]
            if not files:
                return None, 'No se generaron archivos de subtítulos.'

            # Prefer a .vtt file; otherwise take the first match.
            vtt_path = next((f for f in files if f.lower().endswith('.vtt')), files[0])
            try:
                with open(vtt_path, 'r', encoding='utf-8', errors='ignore') as fh:
                    return fh.read(), None
            except Exception as e:
                return None, f'Error leyendo archivo de subtítulos: {str(e)[:200]}'
    finally:
        try:
            cookie_mgr.cleanup()
        except Exception:
            pass
@app.post('/upload_vtt/{video_id}')
async def upload_vtt(video_id: str, file: UploadFile = File(...)):
    """Upload a VTT file for a video and return parsed segments and text.

    Stores the file under ./data/{video_id}.vtt (overwriting any existing file).
    Raises 400 for an empty upload and 500 for save/parse failures.
    """
    try:
        content = await file.read()
        if not content:
            raise HTTPException(status_code=400, detail='Archivo vacío')
        target_dir = os.path.join(os.getcwd(), 'data')
        os.makedirs(target_dir, exist_ok=True)
        target_path = os.path.join(target_dir, f"{video_id}.vtt")
        with open(target_path, 'wb') as fh:
            fh.write(content)
        # Decode as text for parsing into segments.
        text = content.decode('utf-8', errors='ignore')
        segments = parse_subtitle_format(text, 'vtt') if text else []
        combined_text = '\n'.join(s.get('text', '') for s in segments)
        format_text = format_segments_text(segments)
        return {
            'video_id': video_id,
            'path': target_path,
            'count': len(segments),
            'segments': segments,
            'text': combined_text,
            'format_text': format_text
        }
    except HTTPException:
        # BUGFIX: re-raise intended HTTP errors (e.g. the 400 for an empty file)
        # instead of letting the blanket handler below convert them into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f'Error al guardar/parsear VTT: {str(e)[:200]}')
@app.get('/transcript_alt/{video_id}')
def transcript_alt(video_id: str, lang: str = 'es'):
    """Alternative transcript fetch using youtube-transcript-api (if installed).

    Returns segments in the same format as get_transcript_data for consistency.
    Raises 501 when the library is missing, 400 for a bad video id, 404/403 for
    missing/disabled transcripts, and 500 for other library errors.
    """
    if not YOUTUBE_TRANSCRIPT_API_AVAILABLE:
        raise HTTPException(status_code=501, detail='youtube-transcript-api no está instalado en el entorno.')
    vid = extract_video_id(video_id)
    if not vid:
        raise HTTPException(status_code=400, detail='video_id inválido')
    # Build the list of language codes to try (e.g. 'es' plus regional 'es-419').
    langs = [lang]
    if len(lang) == 2:
        langs.append(f"{lang}-419")
    try:
        # get_transcript may raise when no transcript exists.
        # cast() silences the static analyzer, which cannot infer the guard above.
        transcript_api = cast(Any, YouTubeTranscriptApi)
        transcript_list = transcript_api.get_transcript(vid, languages=langs)
    except NoTranscriptFound:
        raise HTTPException(status_code=404, detail='No se encontró transcript con youtube-transcript-api')
    except TranscriptsDisabled:
        raise HTTPException(status_code=403, detail='Los transcripts están deshabilitados para este video')
    except Exception as e:
        raise HTTPException(status_code=500, detail=f'Error youtube-transcript-api: {str(e)[:300]}')
    # transcript_list items carry the keys: text, start, duration.
    segments = [
        {
            'start': float(item.get('start', 0)),
            'duration': float(item.get('duration', 0)),
            'text': item.get('text', '').strip()
        }
        for item in transcript_list
    ]
    combined_text = '\n'.join(s['text'] for s in segments if s.get('text'))
    format_text = format_segments_text(segments)
    thumbnails = get_video_thumbnails(vid)
    return {
        'video_id': vid,
        'count': len(segments),
        'segments': segments,
        'text': combined_text,
        'format_text': format_text,
        # BUGFIX: thumbnails was computed but never returned; include it for
        # consistency with /transcript and /transcript_vtt.
        'thumbnails': thumbnails,
        'source': 'youtube-transcript-api'
    }
# Allow running the API directly (development): `python thisfile.py` serves on port 8000.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)