- Introduced `playwright_extract_m3u8.py` to extract M3U8 URLs from YouTube videos using Playwright. - Added `README_PLAYWRIGHT.md` for usage instructions and requirements. - Created `expand_and_test_proxies.py` to expand user-provided proxies and test their validity. - Implemented `generate_proxy_whitelist.py` to generate a whitelist of working proxies based on testing results. - Added sample proxy files: `user_proxies.txt` for user-defined proxies and `proxies_sample.txt` as a template. - Generated `expanded_proxies.txt`, `whitelist.json`, and `whitelist.txt` for storing expanded and valid proxies. - Included error handling and logging for proxy testing results.
2006 lines
83 KiB
Python
2006 lines
83 KiB
Python
import os
|
|
import json
|
|
import subprocess
|
|
import requests
|
|
import time
|
|
import re
|
|
import tempfile
|
|
import glob
|
|
import random
|
|
from fastapi import FastAPI, HTTPException, UploadFile, File
|
|
from typing import List, Dict, Any, cast
|
|
from fastapi.responses import JSONResponse
|
|
|
|
# Try to import youtube_transcript_api as a fallback transcript source.
try:
    from youtube_transcript_api import YouTubeTranscriptApi
    from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound
    YOUTUBE_TRANSCRIPT_API_AVAILABLE = True
except Exception:
    # Define placeholders to avoid NameError when the library is not installed.
    YouTubeTranscriptApi = None

    class TranscriptsDisabled(Exception):
        """Stand-in for youtube_transcript_api's TranscriptsDisabled."""
        pass

    class NoTranscriptFound(Exception):
        """Stand-in for youtube_transcript_api's NoTranscriptFound."""
        pass

    YOUTUBE_TRANSCRIPT_API_AVAILABLE = False
|
|
|
|
# Import CookieManager from yt_wrap to provide cookiefile paths per request
from yt_wrap import CookieManager

app = FastAPI(title="TubeScript API Pro - JSON Cleaner")

# Cookies path configurable via the API_COOKIES_PATH environment variable.
# Defaults to ./data/cookies.txt so configuration lives under the data folder.
DEFAULT_COOKIES_PATH = './data/cookies.txt'
# Optional proxy for requests/yt-dlp (e.g. socks5h://127.0.0.1:9050)
DEFAULT_PROXY = os.getenv('API_PROXY', '')

# Proxy rotation / simple selector:
# - If API_PROXY is set it is used directly.
# - If API_PROXIES is set (comma-separated list) one entry is picked at random.
# Example: API_PROXIES="socks5h://127.0.0.1:9050,http://10.0.0.1:3128"

# Default path of the proxy whitelist file (third choice in the rotation).
PROXY_WHITELIST_FILE = os.getenv('PROXY_WHITELIST_FILE', 'tools/whitelist.txt')
# In-process cache for the whitelist: 'ts' is the last load timestamp,
# 'proxies' the list last read from disk (see _load_whitelist_file).
_proxy_whitelist_cache = { 'ts': 0, 'proxies': [] }
|
|
|
|
|
|
def _load_whitelist_file(path: str, ttl: int = 30):
    """Load proxy entries from *path*, caching the result for *ttl* seconds.

    Blank lines and '#'-comment lines are ignored. Returns a (possibly
    empty) list of proxy URLs; any read error yields an empty list.
    """
    now = time.time()
    cached = _proxy_whitelist_cache['proxies']
    cache_is_fresh = (now - _proxy_whitelist_cache['ts']) < ttl
    if cached and cache_is_fresh:
        return cached

    entries = []
    try:
        if os.path.exists(path):
            with open(path, 'r', encoding='utf-8') as fh:
                for raw_line in fh:
                    candidate = raw_line.strip()
                    if candidate and not candidate.startswith('#'):
                        entries.append(candidate)
    except Exception:
        # Best-effort: an unreadable file behaves like an empty whitelist.
        entries = []

    _proxy_whitelist_cache['proxies'] = entries
    _proxy_whitelist_cache['ts'] = now
    return entries
|
|
|
|
|
|
def _get_proxy_choice() -> str | None:
    """Pick a proxy URL for the current request.

    Resolution order: API_PROXY (single, legacy) -> API_PROXIES
    (comma-separated list, random pick) -> PROXY_WHITELIST_FILE
    (random pick) -> None.
    """
    # 1) A single legacy proxy always wins.
    legacy = os.getenv('API_PROXY', '') or DEFAULT_PROXY or ''
    if legacy:
        return legacy

    # 2) Comma-separated pool from the environment.
    raw_pool = os.getenv('API_PROXIES', '') or ''
    if raw_pool:
        pool = [entry.strip() for entry in raw_pool.split(',') if entry.strip()]
        if pool:
            return random.choice(pool)

    # 3) Whitelist file on disk.
    wl_path = os.getenv('PROXY_WHITELIST_FILE', PROXY_WHITELIST_FILE)
    candidates = _load_whitelist_file(wl_path)
    if candidates:
        return random.choice(candidates)

    return None
|
|
|
|
def clean_youtube_json(raw_json: Dict) -> List[Dict]:
    """Convert YouTube's complex 'json3' caption payload into a simple list.

    Each output item has the shape
    ``{'start': 0.0, 'duration': 2.0, 'text': 'Hola'}`` with times in seconds.
    Events without text segments, or whose combined text is empty or a bare
    newline, are skipped.
    """
    clean_data = []
    # YouTube stores caption cues under the 'events' key.
    events = raw_json.get('events', [])

    for event in events:
        # Only process events that carry text segments.
        if 'segs' not in event:
            continue
        # Some segments are timing-only and lack 'utf8'; treat those as
        # empty instead of raising KeyError (bugfix: was seg['utf8']).
        text = "".join(seg.get('utf8', '') for seg in event['segs']).strip()
        if text and text != '\n':
            clean_data.append({
                "start": event.get('tStartMs', 0) / 1000.0,  # ms -> seconds
                "duration": event.get('dDurationMs', 0) / 1000.0,
                "text": text.replace('\n', ' ')
            })
    return clean_data
|
|
|
|
def parse_subtitle_format(content: str, format_type: str) -> List[Dict]:
    """Parse subtitle payloads (json3, srv3, vtt) into the standard list form.

    Returns a list of {'start', 'duration', 'text'} dicts. On any parse
    failure an empty list is returned and the error is printed.
    """
    try:
        if format_type == 'json3':
            # YouTube's JSON3 format.
            data = json.loads(content) if isinstance(content, str) else content
            return clean_youtube_json(data)

        elif format_type in ['srv3', 'vtt']:
            # srv3 payloads are often JSON with the same 'events' layout as
            # json3, so try that first.
            try:
                data = json.loads(content) if isinstance(content, str) else content
                if 'events' in data:
                    return clean_youtube_json(data)
            except Exception:
                # Not JSON — fall through to plain-text VTT parsing.
                # (bugfix: was a bare `except:` which also swallowed
                # SystemExit/KeyboardInterrupt)
                pass

            # Plain-text VTT parsing.
            clean_data = []
            lines = content.split('\n') if isinstance(content, str) else []

            current_time = 0.0
            current_text = ""

            for line in lines:
                line = line.strip()
                if not line or line.startswith('WEBVTT') or '-->' in line:
                    if '-->' in line:
                        # Extract the cue start time (mm:ss from the left side).
                        try:
                            time_parts = line.split('-->')[0].strip().split(':')
                            if len(time_parts) >= 2:
                                current_time = float(time_parts[-2]) * 60 + float(time_parts[-1])
                        except Exception:
                            # Malformed timestamp — keep the previous time.
                            # (bugfix: was a bare `except:`)
                            pass
                    continue

                if line and not line.isdigit():
                    current_text = line
                    if current_text:
                        clean_data.append({
                            "start": current_time,
                            "duration": 2.0,  # approximate per-cue duration
                            "text": current_text
                        })
                        current_time += 2.0

            return clean_data if clean_data else []

        else:
            # Unknown format: attempt JSON with an 'events' key.
            data = json.loads(content) if isinstance(content, str) else content
            if 'events' in data:
                return clean_youtube_json(data)
            return []

    except Exception as e:
        print(f"Error parsing subtitle format {format_type}: {e}")
        return []
|
|
|
|
def extract_video_id(video_id_or_url: str) -> str:
    """Normalize the input and pull the video id out of a full URL.

    Accepts: https://www.youtube.com/watch?v=ID, youtu.be/ID, /v/ID,
    /embed/ID, or a bare ID. Unrecognized input is returned unchanged
    (it will fail later if invalid).
    """
    if not video_id_or_url:
        return ""
    candidate = video_id_or_url.strip()

    # Already looks like a bare id (8-20 chars of [A-Za-z0-9_-]).
    if re.match(r'^[A-Za-z0-9_-]{8,20}$', candidate):
        return candidate

    # Try the known URL shapes in priority order.
    url_patterns = (
        r'[?&]v=([A-Za-z0-9_-]{8,20})',           # watch?v=
        r'youtu\.be/([A-Za-z0-9_-]{8,20})',       # youtu.be/
        r'(?:/v/|/embed/)([A-Za-z0-9_-]{8,20})',  # /v/ or /embed/
    )
    for pattern in url_patterns:
        found = re.search(pattern, candidate)
        if found:
            return found.group(1)

    # Nothing matched — hand back the original input.
    return candidate
|
|
|
|
|
|
def format_segments_text(segments: List[Dict]) -> List[str]:
    """Return a 'format_text' list of cleaned phrases extracted from *segments*.

    Cleaning removes 'Kind: captions'-style prefixes, bracketed and
    parenthesized content, HTML tags and decorative symbols, then
    normalizes whitespace. Cleaned entries are split on line breaks so
    each phrase stands alone.
    """
    def _scrub(raw: str) -> str:
        if not raw:
            return ''
        text = str(raw).strip()
        # Drop 'Kind: ...' header lines entirely.
        text = re.sub(r'^\s*Kind\s*:\s*.*$', '', text, flags=re.IGNORECASE).strip()
        # Strip bracketed/parenthesized asides (non-greedy), tags, symbols.
        text = re.sub(r'\[[^\]]*\]', '', text)
        text = re.sub(r'\([^\)]*\)', '', text)
        text = re.sub(r'<[^>]+>', '', text)
        text = re.sub(r'[♪★■◆►▶◀•–—]', '', text)
        # Collapse whitespace runs into single spaces.
        return re.sub(r'\s+', ' ', text).strip()

    phrases: List[str] = []
    for segment in segments or []:
        scrubbed = _scrub(segment.get('text', ''))
        if not scrubbed:
            continue
        phrases.extend(
            piece.strip()
            for piece in re.split(r'[\n\r]+', scrubbed)
            if piece.strip()
        )
    return phrases
|
|
|
|
|
|
# Absolute path to the Node.js binary handed to yt-dlp's --js-runtimes
# option so it can solve YouTube's n-challenge/signature.
NODE_PATH = "/usr/bin/node"
|
|
|
|
def _yt_client_args(has_cookies: bool, for_stream: bool = False) -> list:
|
|
"""Devuelve --extractor-args y --js-runtimes para metadata/streams.
|
|
|
|
Estrategia actualizada 2026-03-07:
|
|
- android → REQUIERE GVS PO Token desde 2026 → formatos HTTPS omitidos → HTTP 403.
|
|
YA NO SE USA para metadata ni streams.
|
|
- Sin cookies → tv_embedded (sin PO Token, sin n-challenge, funciona para metadata)
|
|
- Con cookies → web + Node.js (Node resuelve n-challenge/signature)
|
|
- for_stream → tv_embedded (más fiable para HLS/lives sin cookies)
|
|
|
|
Diagnóstico:
|
|
- android → requiere GVS PO Token (2026) → NO usar
|
|
- mweb → requiere Visitor Data PO Token → NO usar sin cookies
|
|
- tv_embedded → sin PO Token requerido → ✅ funciona para metadata/stream
|
|
- web + Node.js → ✅ funciona con cookies
|
|
"""
|
|
if for_stream or not has_cookies:
|
|
return ["--extractor-args", "youtube:player_client=tv_embedded"]
|
|
else:
|
|
return [
|
|
"--extractor-args", "youtube:player_client=web",
|
|
"--js-runtimes", f"node:{NODE_PATH}",
|
|
]
|
|
|
|
|
|
def _yt_subs_args(has_cookies: bool) -> list:
|
|
"""Devuelve --extractor-args para descarga de subtítulos.
|
|
|
|
Estrategia actualizada 2026-03-07:
|
|
- android → requiere GVS PO Token desde 2026 → subtítulos HTTP 403 → NO usar.
|
|
- tv_embedded → sin PO Token, obtiene auto-subs sin bot-check → ✅ preferido.
|
|
- mweb → fallback útil si tv_embedded no trae subs en ciertos idiomas.
|
|
- web + Node → sólo con cookies (resuelve n-challenge).
|
|
"""
|
|
if has_cookies:
|
|
return [
|
|
"--extractor-args", "youtube:player_client=web",
|
|
"--js-runtimes", f"node:{NODE_PATH}",
|
|
]
|
|
return ["--extractor-args", "youtube:player_client=tv_embedded,mweb"]
|
|
|
|
|
|
|
|
# Nuevo helper: obtener thumbnails para un video — usa URLs estáticas directas (sin yt-dlp)
|
|
def get_video_thumbnails(video_id: str) -> List[str]:
    """Return thumbnail URLs without invoking yt-dlp (fast, non-blocking).

    YouTube serves these static URLs for every public video; ordered from
    highest to lowest resolution.
    """
    qualities = ("maxresdefault", "sddefault", "hqdefault", "mqdefault", "default")
    return [
        f"https://img.youtube.com/vi/{video_id}/{quality}.jpg"
        for quality in qualities
    ]
|
|
|
|
def get_transcript_data(video_id: str, lang: str = "es"):
    """Fetch and normalize a video transcript.

    Returns a 3-tuple ``(segments, thumbnails, error)`` where *segments* is a
    list of {'start', 'duration', 'text'} dicts (or None on failure),
    *thumbnails* is a list of thumbnail URLs, and *error* is a human-readable
    message (None on success).

    Strategy:
    1. Fast path — yt-dlp downloads auto/manual subs into a temp dir.
    2. yt-dlp metadata + direct HTTP download of the subtitle URL, with
       yt-dlp fallbacks on rate limiting.
    3. Last-resort yt-dlp auto/manual subtitle download.

    Bugfixes vs. previous revision: every return is now a 3-tuple (two paths
    returned 2-tuples), a duplicated auth-error check was removed, and
    `formatted_transcript` is initialized (and the non-M3U8 payload is parsed
    with its declared format) so the post-try check no longer raises NameError.
    """
    video_id = extract_video_id(video_id)
    if not video_id:
        return None, [], "video_id inválido o vacío"

    url = f"https://www.youtube.com/watch?v={video_id}"

    # Use CookieManager to get a cookiefile path per request (may be None)
    cookie_mgr = CookieManager()
    cookiefile_path = cookie_mgr.get_cookiefile_path()

    # cookies_path: prefer the temporary cookiefile if present, otherwise fall back to env path
    cookies_path = cookiefile_path or os.getenv('API_COOKIES_PATH', DEFAULT_COOKIES_PATH)
    # proxy support
    proxy = _get_proxy_choice()
    proxies = {'http': proxy, 'https': proxy} if proxy else None

    def load_cookies_from_file(path: str) -> dict:
        """Parse a Netscape-format cookies.txt into a dict usable by requests."""
        cookies = {}
        try:
            if not path or not os.path.exists(path):
                return cookies
            with open(path, 'r', encoding='utf-8', errors='ignore') as fh:
                for line in fh:
                    line = line.strip()
                    if not line or line.startswith('#'):
                        continue
                    parts = line.split('\t')
                    # Netscape format: domain, flag, path, secure, expiration, name, value
                    if len(parts) >= 7:
                        name = parts[5].strip()
                        value = parts[6].strip()
                        if name:
                            cookies[name] = value
                    else:
                        # fallback: simple name=value attempt
                        if '=' in line:
                            k, v = line.split('=', 1)
                            cookies[k.strip()] = v.strip()
        except Exception:
            return {}
        return cookies

    cookies_for_requests = load_cookies_from_file(cookies_path) if cookies_path else {}
    _has_ck_subs = bool(cookies_path and os.path.exists(cookies_path))

    # Fast, reliable attempt: have yt-dlp download subs (auto or manual) into a temp dir
    try:
        with tempfile.TemporaryDirectory() as tmpdl:
            # Build a wide list of language variants; yt-dlp matches exact
            # codes, so cover the most common regional spellings.
            sub_langs = [lang]
            if lang == "en":
                sub_langs = ["en", "en-US", "en-en", "en-GB", "en-CA", "en-AU"]
            elif lang == "es":
                sub_langs = ["es", "es-419", "es-MX", "es-ES", "es-LA", "es-en"]
            elif len(lang) == 2:
                sub_langs = [lang, f"{lang}-{lang.upper()}", f"{lang}-419", f"{lang}-en"]

            # tv_embedded/mweb for subtitles without cookies (no PO Token needed);
            # web + Node.js when cookies exist (solves the n-challenge).
            ytdlp_cmd = [
                "yt-dlp",
                url,
                "--skip-download",
                "--write-auto-sub",
                "--write-sub",
                "--sub-format", "vtt/json3/srv3/best",
                "-o", os.path.join(tmpdl, "%(id)s.%(ext)s"),
                "--no-warnings",
                "--sub-lang", ",".join(sub_langs),
            ] + _yt_subs_args(_has_ck_subs)
            # Pass cookies only when the web client (with cookies) is used
            if _has_ck_subs:
                ytdlp_cmd.extend(["--cookies", cookies_path])

            # attach proxy if configured
            if proxy:
                ytdlp_cmd.extend(['--proxy', proxy])

            try:
                result = subprocess.run(ytdlp_cmd, capture_output=True, text=True, timeout=120)
                stderr = (result.stderr or "").lower()
                # Error: YouTube demands authentication.
                # (bugfix: this check was duplicated verbatim — one copy removed)
                if result.returncode != 0 and ('sign in' in stderr or 'confirm you' in stderr or 'bot' in stderr):
                    return None, get_video_thumbnails(video_id), "YouTube requiere autenticación para este video. Sube un cookies.txt válido con /upload_cookies."
                # If yt-dlp failed due to rate limiting, return a clear message
                if result.returncode != 0 and ('http error 429' in stderr or 'too many requests' in stderr):
                    return None, get_video_thumbnails(video_id), "YouTube está limitando las peticiones al descargar subtítulos (HTTP 429). Agrega un cookies.txt válido exportado desde tu navegador y monta en el contenedor, o espera unos minutos."
                if result.returncode != 0 and ('http error 403' in stderr or 'forbidden' in stderr):
                    return None, get_video_thumbnails(video_id), "Acceso denegado al descargar subtítulos (HTTP 403). El video puede tener restricciones. Usa cookies.txt con una cuenta autorizada."
            except subprocess.TimeoutExpired:
                pass

            # Inspect created files — yt-dlp emits double extensions (ID.lang.vtt),
            # so glob "ID*" instead of "ID.*" to cover ID.en.vtt, ID.en-en.vtt, etc.
            files = glob.glob(os.path.join(tmpdl, f"{video_id}*"))
            # keep only text subtitle files (vtt, json3, srv3, ttml, srt)
            files = [f for f in files if os.path.isfile(f) and
                     any(f.endswith(ext) for ext in ('.vtt', '.json3', '.srv3', '.srt', '.ttml'))]
            if files:
                combined = []
                seen_content = set()
                for fpath in files:
                    try:
                        with open(fpath, 'r', encoding='utf-8') as fh:
                            content = fh.read()
                        # de-duplicate files with identical content (en.vtt vs en-en.vtt)
                        content_hash = hash(content[:500])
                        if content_hash not in seen_content:
                            seen_content.add(content_hash)
                            combined.append(content)
                    except Exception:
                        continue
                if combined:
                    vtt_combined = "\n".join(combined)
                    parsed = parse_subtitle_format(vtt_combined, 'vtt')
                    # drop noise segments coming from the VTT header
                    _noise = {'kind: captions', 'language:', 'webvtt', 'position:', 'align:'}
                    parsed = [s for s in parsed if s.get('text') and
                              not any(s['text'].lower().startswith(n) for n in _noise)]
                    if parsed:
                        return parsed, get_video_thumbnails(video_id), None
    finally:
        # cleanup any temp cookiefile created for this request
        try:
            cookie_mgr.cleanup()
        except Exception:
            pass

    # 1) Main attempt: fetch metadata with yt-dlp
    _has_ck = os.path.exists(cookies_path)
    command = [
        "yt-dlp",
        "--skip-download",
        "--dump-json",
        "--no-warnings",
    ] + _yt_client_args(_has_ck) + [url]

    if _has_ck:
        command.extend(["--cookies", cookies_path])
    if proxy:
        command.extend(['--proxy', proxy])

    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=60)

        if result.returncode != 0:
            # Don't abort immediately: we'll fall back to downloading subs with yt-dlp
            video_metadata = None
        else:
            if not result.stdout.strip():
                video_metadata = None
            else:
                try:
                    video_metadata = json.loads(result.stdout)
                except Exception:
                    video_metadata = None
    except subprocess.TimeoutExpired:
        video_metadata = None
    except FileNotFoundError:
        return None, get_video_thumbnails(video_id), "yt-dlp no está instalado en el contenedor/entorno. Instala yt-dlp y vuelve a intentar."
    except Exception:
        video_metadata = None

    requested_subs = {}
    if video_metadata:
        requested_subs = video_metadata.get('requested_subtitles', {}) or {}

        # Search automatic_captions and subtitles when requested_subs is empty
        if not requested_subs:
            automatic_captions = video_metadata.get('automatic_captions', {}) or {}
            for lang_key, formats in automatic_captions.items():
                if lang in lang_key or lang_key.startswith(lang):
                    if formats:
                        requested_subs = {lang_key: formats[0]}
                        break

        if not requested_subs:
            subtitles = video_metadata.get('subtitles', {}) or {}
            for lang_key, formats in subtitles.items():
                if lang in lang_key or lang_key.startswith(lang):
                    if formats:
                        requested_subs = {lang_key: formats[0]}
                        break

    # If requested_subs is available, try downloading the given URL with requests
    if requested_subs:
        lang_key = next(iter(requested_subs))
        sub_url = requested_subs[lang_key].get('url')

        if sub_url:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                'Accept': 'application/json, text/plain, */*',
                'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8',
                'Referer': 'https://www.youtube.com/',
            }

            max_retries = 3
            response = None
            rate_limited = False
            for attempt in range(max_retries):
                try:
                    response = requests.get(sub_url, headers=headers, timeout=30, cookies=cookies_for_requests, proxies=proxies)
                    if response.status_code == 200:
                        break
                    elif response.status_code == 429:
                        rate_limited = True
                        if attempt < max_retries - 1:
                            time.sleep(2 * (attempt + 1))
                            continue
                        else:
                            # leave the loop; the yt-dlp fallback below handles it
                            break
                    elif response.status_code == 403:
                        return None, get_video_thumbnails(video_id), "Acceso denegado (HTTP 403). El video puede tener restricciones de edad o región. Intenta con cookies.txt."
                    elif response.status_code == 404:
                        # The expected URL is gone; try the fallback
                        response = None
                        break
                    else:
                        return None, get_video_thumbnails(video_id), f"Error al descargar subtítulos desde YouTube (HTTP {response.status_code})."
                except requests.exceptions.Timeout:
                    if attempt < max_retries - 1:
                        continue
                    return None, get_video_thumbnails(video_id), "Timeout al descargar subtítulos. Intenta nuevamente."
                except requests.exceptions.RequestException as e:
                    return None, get_video_thumbnails(video_id), f"Error de conexión al descargar subtítulos: {str(e)[:100]}"

            # On a 200, process the payload; on rate limiting fall back to yt-dlp below
            if response and response.status_code == 200:
                subtitle_format = requested_subs[lang_key].get('ext', 'json3')
                # bugfix: must exist even when the M3U8 branch is not taken,
                # otherwise the check after the try block raised NameError
                formatted_transcript = None
                try:
                    # If the response looks like an M3U8 playlist or contains
                    # timedtext links, fetch those URLs and concatenate their
                    # (VTT) content before parsing.
                    text_body = response.text if isinstance(response.text, str) else None

                    if text_body and ('#EXTM3U' in text_body or 'timedtext' in text_body or text_body.strip().lower().startswith('#extm3u')):
                        # Extract URLs (lines that start with http)
                        urls = re.findall(r'^(https?://\S+)', text_body, flags=re.M)

                        # Attempt 1: download each URL with requests (uses mounted cookies when present)
                        combined = []
                        for idx, u in enumerate(urls):
                            try:
                                r2 = requests.get(u, headers=headers, timeout=20, cookies=cookies_for_requests, proxies=proxies)
                                if r2.status_code == 200 and r2.text:
                                    combined.append(r2.text)
                                    continue
                                # On 429/403/failure we fall back to yt-dlp below
                                if r2.status_code == 429:
                                    # force the yt-dlp fallback
                                    raise Exception('rate_limited')
                            except Exception:
                                # fall through to the yt-dlp fallback
                                pass

                            # Attempt 2 (fallback): let yt-dlp download this timedtext URL to a temp file
                            try:
                                with tempfile.TemporaryDirectory() as tdir:
                                    out_template = os.path.join(tdir, f"timedtext_{idx}.%(ext)s")
                                    ytdlp_cmd = [
                                        "yt-dlp",
                                        u,
                                        "-o", out_template,
                                        "--no-warnings",
                                    ]
                                    if os.path.exists(cookies_path):
                                        ytdlp_cmd.extend(["--cookies", cookies_path])

                                    # hand the proxy to yt-dlp when configured
                                    if proxy:
                                        ytdlp_cmd.extend(['--proxy', proxy])
                                    try:
                                        res2 = subprocess.run(ytdlp_cmd, capture_output=True, text=True, timeout=60)
                                        stderr2 = (res2.stderr or "").lower()
                                        if res2.returncode != 0 and ('http error 429' in stderr2 or 'too many requests' in stderr2):
                                            # rate-limited while fetching timedtext
                                            return None, get_video_thumbnails(video_id), "YouTube está limitando las peticiones al descargar subtítulos (HTTP 429). Agrega cookies.txt válido o intenta más tarde."
                                        if res2.returncode != 0 and ('http error 403' in stderr2 or 'forbidden' in stderr2):
                                            return None, get_video_thumbnails(video_id), "Acceso denegado al descargar subtítulos (HTTP 403). Intenta con cookies.txt o una cuenta con permisos."
                                    except Exception:
                                        pass

                                    # read whatever files were created in the temp dir
                                    for fpath in glob.glob(os.path.join(tdir, "timedtext_*.*")):
                                        try:
                                            with open(fpath, 'r', encoding='utf-8') as fh:
                                                txt = fh.read()
                                            if txt:
                                                combined.append(txt)
                                        except Exception:
                                            continue
                            except Exception:
                                continue

                        if combined:
                            vtt_combined = "\n".join(combined)
                            formatted_transcript = parse_subtitle_format(vtt_combined, 'vtt')
                            if formatted_transcript:
                                # bugfix: honor the 3-tuple contract (was a 2-tuple)
                                return formatted_transcript, get_video_thumbnails(video_id), None
                    else:
                        # bugfix: plain (non-M3U8) payload — parse it in its
                        # declared format; this branch was missing, leaving
                        # 'subtitle_format' unused and the variable unset.
                        formatted_transcript = parse_subtitle_format(text_body or '', subtitle_format)
                except Exception as e:
                    return None, get_video_thumbnails(video_id), f"Error al procesar los subtítulos: {str(e)[:200]}"

                if not formatted_transcript:
                    return None, get_video_thumbnails(video_id), "Los subtítulos están vacíos o no se pudieron procesar."

                return formatted_transcript, get_video_thumbnails(video_id), None

            # After rate limiting, try yt-dlp on the subtitle URL directly (uses cookies if present)
            if rate_limited and (not response or response.status_code != 200):
                try:
                    with tempfile.TemporaryDirectory() as tdir:
                        out_template = os.path.join(tdir, "sub.%(ext)s")
                        ytdlp_cmd = [
                            "yt-dlp",
                            sub_url,
                            "-o", out_template,
                            "--no-warnings",
                        ]
                        if os.path.exists(cookies_path):
                            ytdlp_cmd.extend(["--cookies", cookies_path])

                        if proxy:
                            ytdlp_cmd.extend(['--proxy', proxy])
                        res = subprocess.run(ytdlp_cmd, capture_output=True, text=True, timeout=90)
                        stderr = (res.stderr or "").lower()
                        if res.returncode != 0 and ('http error 429' in stderr or 'too many requests' in stderr):
                            return None, get_video_thumbnails(video_id), "YouTube está limitando las peticiones al descargar subtítulos (HTTP 429). Agrega cookies.txt válido o intenta más tarde."
                        # Read the generated files
                        combined = []
                        for fpath in glob.glob(os.path.join(tdir, "*.*")):
                            try:
                                with open(fpath, 'r', encoding='utf-8') as fh:
                                    txt = fh.read()
                                if txt:
                                    combined.append(txt)
                            except Exception:
                                continue
                        if combined:
                            vtt_combined = "\n".join(combined)
                            formatted_transcript = parse_subtitle_format(vtt_combined, 'vtt')
                            if formatted_transcript:
                                # bugfix: honor the 3-tuple contract (was a 2-tuple)
                                return formatted_transcript, get_video_thumbnails(video_id), None
                except FileNotFoundError:
                    return None, get_video_thumbnails(video_id), "yt-dlp no está instalado en el contenedor/entorno. Instala yt-dlp y vuelve a intentar."
                except Exception:
                    # continue with the remaining fallbacks
                    pass

            # if yt-dlp didn't manage it either, fall through to the later fallbacks

    # Fallback: download subtitles with yt-dlp into a temp directory
    # (covers cases where the metadata lacks requested_subs)
    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            # Try auto-subs first, then manual subs
            ytdlp_variants = [
                ("--write-auto-sub", "auto"),
                ("--write-sub", "manual")
            ]

            downloaded = None
            for flag, label in ytdlp_variants:
                cmd = [
                    "yt-dlp",
                    url,
                    "--skip-download",
                    flag,
                    "--sub-lang", lang,
                    "--sub-format", "json3/vtt/srv3/best",
                    "-o", os.path.join(tmpdir, "%(id)s.%(ext)s"),
                    "--no-warnings",
                ] + _yt_subs_args(_has_ck_subs)
                # Pass cookies only with the web client
                if _has_ck_subs:
                    cmd.extend(["--cookies", cookies_path])

                # add the proxy to the yt-dlp call when configured
                if proxy:
                    cmd.extend(['--proxy', proxy])
                subprocess.run(cmd, capture_output=True, text=True, timeout=120)

                # Check whether a file landed in tmpdir (double ext: ID.en.vtt)
                files = glob.glob(os.path.join(tmpdir, f"{video_id}*"))
                files = [f for f in files if os.path.isfile(f) and
                         any(f.endswith(e) for e in ('.vtt', '.json3', '.srv3', '.srt', '.ttml'))]
                if files:
                    # Take the first valid one
                    downloaded = files[0]
                    break

            if downloaded:
                ext = os.path.splitext(downloaded)[1].lstrip('.')
                try:
                    with open(downloaded, 'r', encoding='utf-8') as fh:
                        content = fh.read()
                except Exception as e:
                    return None, get_video_thumbnails(video_id), f"Error leyendo archivo de subtítulos descargado: {str(e)[:200]}"

                # Parse according to the known extension
                fmt = 'json3' if ext in ('json', 'json3') else ('vtt' if ext == 'vtt' else 'srv3')
                formatted_transcript = parse_subtitle_format(content, fmt)
                if formatted_transcript:
                    return formatted_transcript, get_video_thumbnails(video_id), None
                else:
                    return None, get_video_thumbnails(video_id), "Se descargaron subtítulos pero no se pudieron procesar."
    except FileNotFoundError:
        return None, get_video_thumbnails(video_id), "yt-dlp no está instalado. Instala yt-dlp en el contenedor/entorno y vuelve a intentar."
    except Exception as e:
        # Don't crash; return a general message
        return None, get_video_thumbnails(video_id), f"Error al intentar descargar subtítulos con yt-dlp: {str(e)[:200]}"

    return None, get_video_thumbnails(video_id), (
        f"No se encontraron subtítulos para este video en idioma '{lang}'. "
        "Puede que el video no tenga subtítulos, estén en otro idioma, o requiera autenticación. "
        "Prueba: ?lang=en | /debug/fetch_subs/{video_id} | sube cookies con /upload_cookies"
    )
|
|
|
|
# ── Innertube clients (kept in sync with NewPipeExtractor + yt-dlp 2026-03) ──
_NP_IOS = {
    "clientName": "IOS", "clientVersion": "21.03.2",
    "clientScreen": "WATCH", "platform": "MOBILE",
    "deviceMake": "Apple", "deviceModel": "iPhone16,2",
    "osName": "iOS", "osVersion": "18.7.2.22H124",
    "userAgent": "com.google.ios.youtube/21.03.2 (iPhone16,2; U; CPU iOS 18_7_2 like Mac OS X;)",
}
_NP_ANDROID = {
    "clientName": "ANDROID", "clientVersion": "21.03.36",
    "clientScreen": "WATCH", "platform": "MOBILE",
    "osName": "Android", "osVersion": "16", "androidSdkVersion": 36,
    "userAgent": "com.google.android.youtube/21.03.36 (Linux; U; Android 16) gzip",
}
# tv_embedded: does NOT require a PO Token and always returns videoDetails +
# hlsManifestUrl on live streams. The most reliable client for fetching
# title/description without authentication.
_NP_TV_EMBEDDED = {
    "clientName": "TVHTML5_SIMPLY_EMBEDDED_PLAYER",
    "clientVersion": "2.0",
    "clientScreen": "EMBED",
    "platform": "TV",
    "userAgent": "Mozilla/5.0 (SMART-TV; LINUX; Tizen 6.0) AppleWebKit/538.1 (KHTML, like Gecko) Version/6.0 TV Safari/538.1",
}
# GAPIS: youtubei.googleapis.com — endpoint used by NewPipe for iOS/Android/TV.
_GAPIS_BASE = "https://youtubei.googleapis.com/youtubei/v1"
|
|
|
|
|
|
def _np_build_ctx(client: dict, visitor_data: str = "") -> dict:
|
|
"""context.client igual que prepareJsonBuilder de YoutubeParsingHelper.java."""
|
|
ctx = {
|
|
"clientName": client["clientName"],
|
|
"clientVersion": client["clientVersion"],
|
|
"clientScreen": client.get("clientScreen", "WATCH"),
|
|
"platform": client.get("platform", "MOBILE"),
|
|
"hl": "en", "gl": "US", "utcOffsetMinutes": 0,
|
|
}
|
|
if visitor_data:
|
|
ctx["visitorData"] = visitor_data
|
|
for k in ("deviceMake", "deviceModel", "osName", "osVersion", "androidSdkVersion"):
|
|
if client.get(k):
|
|
ctx[k] = client[k]
|
|
return ctx
|
|
|
|
|
|
def _np_get_visitor_data(client: dict, proxies: dict = None) -> str:
    """POST /visitor_id and return responseContext.visitorData
    (mirrors getVisitorDataFromInnertube)."""
    try:
        request_body = {
            "context": {
                "client": _np_build_ctx(client),
                "request": {"internalExperimentFlags": [], "useSsl": True},
                "user": {"lockedSafetyMode": False},
            }
        }
        request_headers = {
            "User-Agent": client["userAgent"],
            "X-Goog-Api-Format-Version": "2",
            "Content-Type": "application/json",
        }
        resp = requests.post(
            f"{_GAPIS_BASE}/visitor_id?prettyPrint=false",
            json=request_body, headers=request_headers, timeout=8, proxies=proxies,
        )
        if resp.status_code == 200:
            return resp.json().get("responseContext", {}).get("visitorData", "")
    except Exception:
        # Network/parse failures are non-fatal — visitor data is optional.
        pass
    return ""
|
|
|
|
|
|
def _np_call_player(video_id: str, client: dict,
                    visitor_data: str = "", proxies: dict = None) -> dict:
    """POST /player like NewPipe's getIosPlayerResponse/getAndroidPlayerResponse."""
    import string as _str
    # Encode the current UNIX time in base 36 for the cache-busting 't' param.
    remaining = int(time.time())
    alphabet = _str.digits + _str.ascii_lowercase
    encoded = ""
    while remaining:
        remaining, digit = divmod(remaining, 36)
        encoded = alphabet[digit] + encoded
    url = f"{_GAPIS_BASE}/player?prettyPrint=false&t={encoded or '0'}&id={video_id}"
    request_body = {
        "context": {
            "client": _np_build_ctx(client, visitor_data),
            "request": {"internalExperimentFlags": [], "useSsl": True},
            "user": {"lockedSafetyMode": False},
        },
        "videoId": video_id,
        "contentCheckOk": True,
        "racyCheckOk": True,
    }
    request_headers = {
        "User-Agent": client["userAgent"],
        "X-Goog-Api-Format-Version": "2",
        "Content-Type": "application/json",
    }
    try:
        resp = requests.post(url, json=request_body, headers=request_headers,
                             timeout=15, proxies=proxies)
        if resp.status_code == 200:
            return resp.json()
    except Exception:
        # Treat any transport/JSON failure as "no response".
        pass
    return {}
|
|
|
|
|
|
def innertube_get_stream(video_id: str, proxy: str = None) -> dict:
    """
    Resolve a playable stream URL by replicating NewPipeExtractor, with a
    tv_embedded fallback.

    Attempt order:
    1. iOS → hlsManifestUrl (preferred for live streams; carries videoDetails)
    2. Android → direct formats + videoDetails
    3. tv_embedded → needs no PO Token; supplies videoDetails and, for lives,
       an hlsManifestUrl

    No cookies | No JS signature solving | No server-side bot-check

    Returns a dict with keys: title, description, is_live, hls_url (HLS
    manifest or best direct-format URL), formats (up to 8 itag summaries),
    and error (message or None).
    """
    result = {
        "title": None, "description": None,
        "is_live": False, "hls_url": None,
        "formats": [], "error": None,
    }
    # requests-style proxy mapping; None disables proxying entirely
    proxies = {"http": proxy, "https": proxy} if proxy else None

    # Visitor tokens are client-specific; fetch one per client up front
    vd_ios = _np_get_visitor_data(_NP_IOS, proxies)
    vd_android = _np_get_visitor_data(_NP_ANDROID, proxies)

    # ── iOS — preferred source of hlsManifestUrl for live streams ────────────
    ios = _np_call_player(video_id, _NP_IOS, vd_ios, proxies)
    ps = ios.get("playabilityStatus") or {}
    if ps.get("status") == "LOGIN_REQUIRED":
        # Auth wall (private / age-gated): bail out early, other clients
        # will not do better for this status.
        result["error"] = f"Login requerido: {ps.get('reason','')}"
        return result

    vd_meta = ios.get("videoDetails") or {}
    result["title"] = vd_meta.get("title") or None
    result["description"] = vd_meta.get("shortDescription") or None
    result["is_live"] = bool(vd_meta.get("isLive") or vd_meta.get("isLiveContent"))

    ios_sd = ios.get("streamingData") or {}
    hls = ios_sd.get("hlsManifestUrl")
    if hls:
        result["hls_url"] = hls
        result["formats"] = [
            {"itag": f.get("itag"), "mimeType": f.get("mimeType"), "quality": f.get("quality")}
            for f in (ios_sd.get("formats", []) + ios_sd.get("adaptiveFormats", []))[:8]
        ]
        # Fill in videoDetails via other clients if iOS did not return them
        if not result["title"]:
            vd_android_resp = _np_call_player(video_id, _NP_ANDROID, vd_android, proxies)
            vd2 = vd_android_resp.get("videoDetails") or {}
            result["title"] = vd2.get("title") or result["title"]
            result["description"] = vd2.get("shortDescription") or result["description"]
        if not result["title"]:
            # last metadata attempt: tv_embedded
            tv = _np_call_player(video_id, _NP_TV_EMBEDDED, "", proxies)
            vd3 = tv.get("videoDetails") or {}
            result["title"] = vd3.get("title") or result["title"]
            result["description"] = vd3.get("shortDescription") or result["description"]
        return result

    # ── Android — for regular videos, or when iOS returned no HLS ────────────
    android = _np_call_player(video_id, _NP_ANDROID, vd_android, proxies)
    if not result["title"]:
        vd2 = android.get("videoDetails") or {}
        result["title"] = vd2.get("title") or None
        result["description"] = vd2.get("shortDescription") or None
        result["is_live"] = result["is_live"] or bool(
            vd2.get("isLive") or vd2.get("isLiveContent"))

    android_sd = android.get("streamingData") or {}
    hls = android_sd.get("hlsManifestUrl")
    if hls:
        result["hls_url"] = hls
        if not result["title"]:
            tv = _np_call_player(video_id, _NP_TV_EMBEDDED, "", proxies)
            vd3 = tv.get("videoDetails") or {}
            result["title"] = vd3.get("title") or result["title"]
            result["description"] = vd3.get("shortDescription") or result["description"]
        return result

    # No HLS from Android: fall back to the highest-bitrate direct-format URL
    all_fmts = android_sd.get("formats", []) + android_sd.get("adaptiveFormats", [])
    best = sorted([f for f in all_fmts if f.get("url")],
                  key=lambda x: x.get("bitrate", 0), reverse=True)
    if best:
        result["hls_url"] = best[0]["url"]
        result["formats"] = [
            {"itag": f.get("itag"), "mimeType": f.get("mimeType"), "quality": f.get("quality")}
            for f in best[:8]
        ]
        if not result["title"]:
            tv = _np_call_player(video_id, _NP_TV_EMBEDDED, "", proxies)
            vd3 = tv.get("videoDetails") or {}
            result["title"] = vd3.get("title") or result["title"]
            result["description"] = vd3.get("shortDescription") or result["description"]
        return result

    # ── tv_embedded — no PO Token required; last resort for streamingData ────
    tv = _np_call_player(video_id, _NP_TV_EMBEDDED, "", proxies)
    vd3 = tv.get("videoDetails") or {}
    if not result["title"]:
        result["title"] = vd3.get("title") or None
        result["description"] = vd3.get("shortDescription") or None
        result["is_live"] = result["is_live"] or bool(
            vd3.get("isLive") or vd3.get("isLiveContent"))

    tv_sd = tv.get("streamingData") or {}
    hls = tv_sd.get("hlsManifestUrl")
    if hls:
        result["hls_url"] = hls
        return result

    all_fmts_tv = tv_sd.get("formats", []) + tv_sd.get("adaptiveFormats", [])
    best_tv = sorted([f for f in all_fmts_tv if f.get("url")],
                     key=lambda x: x.get("bitrate", 0), reverse=True)
    if best_tv:
        result["hls_url"] = best_tv[0]["url"]
        result["formats"] = [
            {"itag": f.get("itag"), "mimeType": f.get("mimeType"), "quality": f.get("quality")}
            for f in best_tv[:8]
        ]
        return result

    # All three clients failed to return streamingData
    result["error"] = (
        "Innertube no devolvió streamingData (iOS + Android + tv_embedded). "
        "Puede ser DRM, región bloqueada, privado, o YouTube actualizó su API."
    )
    return result
|
|
|
|
|
|
def _fetch_metadata_ytdlp(video_id: str, proxy: str = None) -> dict:
|
|
"""Obtiene title, description, is_live usando yt-dlp.
|
|
|
|
Prueba clientes en orden hasta obtener título:
|
|
1. tv_embedded — sin PO Token, devuelve videoDetails completo
|
|
2. ios — HLS nativo, suele traer title
|
|
3. mweb — fallback adicional
|
|
4. --print title (rápido, último recurso)
|
|
"""
|
|
url = f"https://www.youtube.com/watch?v={video_id}"
|
|
proxy_args = ["--proxy", proxy] if proxy else []
|
|
|
|
# Intentar con --dump-json para cada cliente
|
|
for client in ("tv_embedded", "ios", "mweb"):
|
|
cmd = [
|
|
"yt-dlp", "--skip-download", "--dump-json", "--no-warnings",
|
|
"--extractor-args", f"youtube:player_client={client}",
|
|
url,
|
|
] + proxy_args
|
|
try:
|
|
res = subprocess.run(cmd, capture_output=True, text=True, timeout=25)
|
|
if res.returncode == 0 and res.stdout.strip():
|
|
d = json.loads(res.stdout.strip())
|
|
title = d.get("title") or d.get("fulltitle")
|
|
if title:
|
|
return {
|
|
"title": title,
|
|
"description": d.get("description") or None,
|
|
"is_live": bool(d.get("is_live") or d.get("was_live")),
|
|
}
|
|
except Exception:
|
|
continue
|
|
|
|
# Último recurso: --print title (muy rápido, sólo el título)
|
|
for client in ("tv_embedded", "ios", "mweb"):
|
|
cmd = [
|
|
"yt-dlp", "--skip-download", "--no-warnings",
|
|
"--print", "%(title)s\n%(is_live)s\n%(description)s",
|
|
"--extractor-args", f"youtube:player_client={client}",
|
|
url,
|
|
] + proxy_args
|
|
try:
|
|
res = subprocess.run(cmd, capture_output=True, text=True, timeout=20)
|
|
if res.returncode == 0 and res.stdout.strip():
|
|
lines = res.stdout.strip().splitlines()
|
|
title = lines[0].strip() if lines else None
|
|
if title and title.lower() not in ("none", "na", ""):
|
|
is_live = lines[1].strip().lower() in ("true", "1") if len(lines) > 1 else False
|
|
desc = "\n".join(lines[2:]).strip() if len(lines) > 2 else None
|
|
return {
|
|
"title": title,
|
|
"description": desc or None,
|
|
"is_live": is_live,
|
|
}
|
|
except Exception:
|
|
continue
|
|
|
|
return {"title": None, "description": None, "is_live": False}
|
|
|
|
|
|
|
|
def get_stream_url(video_id: str):
    """
    Resolve the m3u8/HLS broadcast URL for a video.

    Returns: (stream_url, title, description, is_live, error)

    Strategy:
    1. innertube_get_stream() — iOS + Android + tv_embedded, no cookies
    2. Fallback to yt-dlp with tv_embedded/ios/web clients
    3. title/description are always completed via _fetch_metadata_ytdlp when
       missing
    """
    video_id = extract_video_id(video_id)
    proxy = os.getenv('API_PROXY', DEFAULT_PROXY) or None

    # ── 1. Direct Innertube (NewPipe-style) ───────────────────────────────────
    it = innertube_get_stream(video_id, proxy=proxy)

    title = it.get("title")
    description = it.get("description")
    is_live = it.get("is_live", False)

    if it.get("hls_url"):
        # Complete metadata via yt-dlp if Innertube did not return it
        if not title:
            meta = _fetch_metadata_ytdlp(video_id, proxy=proxy)
            title = meta["title"] or title
            description = meta["description"] or description
            is_live = is_live or meta["is_live"]
        return it["hls_url"], title, description, is_live, None

    # ── 2. yt-dlp fallback ────────────────────────────────────────────────────
    # Per-request cookiefile (temp copy) takes precedence over the static path
    cookie_mgr = CookieManager()
    cookiefile_path = cookie_mgr.get_cookiefile_path()
    cookies_path_env = os.getenv('API_COOKIES_PATH', DEFAULT_COOKIES_PATH)
    effective_cookie = cookiefile_path or (
        cookies_path_env if os.path.exists(cookies_path_env) else None)
    has_ck = bool(effective_cookie)
    yt_url = f"https://www.youtube.com/watch?v={video_id}"
    # Substrings of yt-dlp stderr that indicate YouTube's bot-check wall
    BOT_MARKERS = ("sign in to confirm", "not a bot", "sign in to")

    def _is_bot(s: str) -> bool:
        # True when stderr suggests YouTube flagged us as a bot
        return any(m in s.lower() for m in BOT_MARKERS)

    def _build_args(client: str) -> list:
        # Common yt-dlp flags for a given player client; cookies and the
        # Node JS runtime are only relevant for the "web" client.
        args = ["--no-warnings", "--no-check-certificate", "--no-playlist",
                "--extractor-args", f"youtube:player_client={client}"]
        if client == "web":
            args += ["--js-runtimes", f"node:{NODE_PATH}"]
        if effective_cookie and client == "web":
            args += ["--cookies", effective_cookie]
        if proxy:
            args += ["--proxy", proxy]
        return args

    def _ytdlp_url(fmt: str, client: str):
        # Run `yt-dlp -g` and return (url_or_None, bot_detected_flag)
        cmd = ["yt-dlp", "-g", "-f", fmt] + _build_args(client) + [yt_url]
        try:
            res = subprocess.run(cmd, capture_output=True, text=True, check=False, timeout=90)
            if res.returncode == 0 and res.stdout.strip():
                for line in res.stdout.strip().splitlines():
                    line = line.strip()
                    if line.startswith("http"):
                        return line, False
            return None, _is_bot(res.stderr or "")
        except Exception:
            return None, False

    # tv_embedded needs no PO Token; ios yields native HLS; web+cookies solves
    # the n-challenge
    clients = ["tv_embedded", "ios"] + (["web"] if has_ck else [])
    # itags 91-96 are the standard HLS live formats; fall back to generic
    # m3u8/best selectors otherwise
    fmts = (["91", "92", "93", "94", "95", "96",
             "best[protocol=m3u8_native]", "best[protocol=m3u8]", "best"]
            if is_live else
            ["best[ext=m3u8]", "best[protocol=m3u8_native]",
             "best[protocol=m3u8]", "best", "best[ext=mp4]"])
    got_bot = False
    try:
        for client in clients:
            for fmt in fmts:
                u, is_b = _ytdlp_url(fmt, client)
                if u:
                    # Complete metadata if still missing
                    if not title:
                        meta = _fetch_metadata_ytdlp(video_id, proxy=proxy)
                        title = meta["title"] or title
                        description = meta["description"] or description
                        is_live = is_live or meta["is_live"]
                    return u, title, description, is_live, None
                if is_b:
                    got_bot = True
    finally:
        # Release the temp cookiefile regardless of how we exit
        try:
            cookie_mgr.cleanup()
        except Exception:
            pass

    # Last metadata attempt even when no stream URL was found
    if not title:
        meta = _fetch_metadata_ytdlp(video_id, proxy=proxy)
        title = meta["title"] or title
        description = meta["description"] or description

    if got_bot:
        # Try the Playwright fallback; return its m3u8 on success, otherwise
        # surface its error (or the generic bot message) to the caller.
        try:
            pw_m3u8, pw_cookies, pw_err = _attempt_playwright_fallback(video_id)
            if pw_m3u8:
                # Playwright found the m3u8: report success
                return pw_m3u8, title, description, is_live, None
            # Playwright did not succeed: include its error in the response
            detail = pw_err or 'YouTube detectó actividad de bot. Sube cookies.txt con /upload_cookies.'
        except Exception as e:
            detail = f'YouTube detectó actividad de bot. Además, Playwright fallback falló: {str(e)[:200]}'
        return None, title, description, is_live, detail
    return None, title, description, is_live, (
        "YouTube detectó actividad de bot. "
        "Sube cookies.txt: curl -X POST http://localhost:8282/upload_cookies -F 'file=@cookies.txt'"
    )
|
|
|
|
|
|
@app.get("/debug/stream/{video_id}")
def debug_stream(video_id: str):
    """Full diagnostic for the /stream endpoint: reports what each Innertube
    client (iOS, Android, tv_embedded) and yt-dlp return, separately.
    """
    video_id = extract_video_id(video_id)
    proxy = _get_proxy_choice()
    proxies = {"http": proxy, "https": proxy} if proxy else None

    def _probe(cfg: dict, label: str) -> dict:
        # Query one Innertube client and summarise its player response.
        try:
            visitor = _np_get_visitor_data(cfg, proxies)
            player = _np_call_player(video_id, cfg, visitor, proxies)
            playability = player.get("playabilityStatus") or {}
            details = player.get("videoDetails") or {}
            streaming = player.get("streamingData") or {}
            return {
                "client": label,
                "status": playability.get("status"),
                "reason": playability.get("reason", ""),
                "title": details.get("title"),
                "description_preview": str(details.get("shortDescription", "") or "")[:120],
                "isLive": details.get("isLive"),
                "isLiveContent": details.get("isLiveContent"),
                "hlsManifestUrl": (streaming.get("hlsManifestUrl") or "")[:100],
                "formats_count": len(streaming.get("formats", [])),
                "adaptiveFormats_count": len(streaming.get("adaptiveFormats", [])),
                "streamingData_keys": list(streaming.keys()),
            }
        except Exception as exc:
            return {"client": label, "error": str(exc)}

    results = [
        _probe(cfg, label)
        for cfg, label in ((_NP_IOS, "iOS"),
                           (_NP_ANDROID, "Android"),
                           (_NP_TV_EMBEDDED, "tv_embedded"))
    ]

    # yt-dlp --dump-json using the tv_embedded client
    ytdlp_meta = {}
    try:
        cmd = ["yt-dlp", "--skip-download", "--dump-json", "--no-warnings",
               "--extractor-args", "youtube:player_client=tv_embedded",
               f"https://www.youtube.com/watch?v={video_id}"]
        if proxy:
            cmd.extend(["--proxy", proxy])
        res = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        if res.returncode == 0 and res.stdout.strip():
            meta = json.loads(res.stdout.strip())
            ytdlp_meta = {
                "title": meta.get("title"),
                "description_preview": str(meta.get("description") or "")[:120],
                "is_live": meta.get("is_live"),
                "was_live": meta.get("was_live"),
            }
        else:
            ytdlp_meta = {"error": res.stderr[:500]}
    except Exception as exc:
        ytdlp_meta = {"error": str(exc)}

    return {
        "video_id": video_id,
        "innertube_clients": results,
        "ytdlp_tv_embedded": ytdlp_meta,
    }
|
|
|
|
|
|
@app.get("/transcript/{video_id}")
def transcript_endpoint(video_id: str, lang: str = "es"):
    """Return transcript segments for a video, with automatic English fallback
    when the requested language has no subtitles (except on auth errors)."""
    data, thumbnails, error = get_transcript_data(video_id, lang)

    # Automatic fallback to 'en' when the requested language had no subs and
    # the failure was not an authentication problem.
    should_retry_en = (
        bool(error)
        and lang != "en"
        and "No se encontraron" in (error or "")
        and "autenticación" not in (error or "")
    )
    if should_retry_en:
        en_data, en_thumbs, en_error = get_transcript_data(video_id, "en")
        if en_data and not en_error:
            data, thumbnails, error = en_data, en_thumbs, None

    if error:
        raise HTTPException(status_code=400, detail=error)

    # Plain-text view: join every non-empty segment text
    try:
        combined_text = "\n".join(seg.get('text', '') for seg in data if seg.get('text'))
    except Exception:
        combined_text = ""

    # format_text: per-segment cleaned text ready for agent processing
    try:
        format_text = format_segments_text(data)
    except Exception:
        format_text = []

    return {
        "video_id": video_id,
        "count": len(data),
        "segments": data,
        "text": combined_text,
        "format_text": format_text,
        "thumbnails": thumbnails
    }
|
|
|
|
@app.get('/transcript_vtt/{video_id}')
def transcript_vtt(video_id: str, lang: str = 'es'):
    """Download subtitles as VTT (via yt-dlp) and return the raw VTT along
    with parsed segments and the concatenated text."""
    vtt_text, error = fetch_vtt_subtitles(video_id, lang)
    if error:
        raise HTTPException(status_code=400, detail=error)

    # Parse the raw VTT into timed segments via parse_subtitle_format
    segments = [] if not vtt_text else parse_subtitle_format(vtt_text, 'vtt')

    joined_text = '\n'.join(s.get('text', '') for s in segments)
    # Cleaned per-segment text ready for agent processing
    cleaned_segments = format_segments_text(segments)

    thumbnails = get_video_thumbnails(video_id)

    return {
        'video_id': video_id,
        'vtt': vtt_text,
        'count': len(segments),
        'segments': segments,
        'text': joined_text,
        'format_text': cleaned_segments,
        'thumbnails': thumbnails
    }
|
|
|
|
@app.get("/stream/{video_id}")
def stream_endpoint(video_id: str):
    """
    Return the broadcast (m3u8/HLS) URL for a YouTube video or live stream.

    - Live broadcasts (🔴): a direct HLS URL usable with FFmpeg/VLC.
    - Regular videos: the best available direct video URL.

    FFmpeg example:
        ffmpeg -re -i "URL_M3U8" -c copy -f flv rtmp://destino/stream_key
    """
    stream_url, title, description, is_live, error = get_stream_url(video_id)

    if error:
        raise HTTPException(status_code=400, detail=error)

    thumbnails = get_video_thumbnails(video_id)
    looks_like_hls = bool(stream_url) and "m3u8" in stream_url.lower()

    response = {
        "video_id": video_id,
        "title": title,
        "description": description,
        "is_live": is_live,
        "stream_url": stream_url,
        "url_type": "m3u8/hls" if looks_like_hls else "direct/mp4",
        "youtube_url": f"https://www.youtube.com/watch?v={video_id}",
        "ffmpeg_example": f'ffmpeg -re -i "{stream_url}" -c copy -f flv rtmp://destino/stream_key',
        "thumbnails": thumbnails,
    }
    response["usage"] = {
        "description": "Usa stream_url con FFmpeg para retransmitir",
        "command_template": "ffmpeg -re -i \"{stream_url}\" -c copy -f flv {rtmp_url}/{stream_key}",
        "platforms": {
            "youtube": "rtmp://a.rtmp.youtube.com/live2/YOUR_STREAM_KEY",
            "facebook": "rtmps://live-api-s.facebook.com:443/rtmp/YOUR_STREAM_KEY",
            "twitch": "rtmp://live.twitch.tv/app/YOUR_STREAM_KEY",
            "twitter": "rtmps://fa.contribute.live-video.net/app/YOUR_STREAM_KEY"
        }
    }
    return response
|
|
|
|
@app.post('/upload_cookies')
async def upload_cookies(file: UploadFile = File(...)):
    """Upload a cookies.txt file and persist it at API_COOKIES_PATH.

    Returns the absolute path where the file was stored.

    Raises:
        HTTPException 400: empty upload.
        HTTPException 500: any I/O failure while saving.
    """
    try:
        content = await file.read()
        if not content:
            raise HTTPException(status_code=400, detail='Archivo vacío')
        # Target path comes from the environment (defaults to ./data/cookies.txt)
        target = os.getenv('API_COOKIES_PATH', DEFAULT_COOKIES_PATH)
        target_dir = os.path.dirname(target) or '.'
        # Create the target directory if it does not exist
        try:
            os.makedirs(target_dir, exist_ok=True)
        except Exception:
            # Directory not creatable: fall back to writing in the working dir
            target = os.path.basename(target)

        with open(target, 'wb') as fh:
            fh.write(content)

        return {"detail": "cookies.txt guardado correctamente", "path": os.path.abspath(target)}
    except HTTPException:
        # BUGFIX: re-raise deliberate HTTP errors (e.g. the 400 above) instead
        # of letting the blanket handler below rewrap them as a generic 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f'Error al guardar cookies: {str(e)[:200]}')
|
|
|
|
|
|
# ── Known browser profile locations (Linux/macOS plus docker-compose host mounts) ──
# Each entry lists candidate profile directories, checked in order by
# _find_browser_profile(); the first existing path wins.
_BROWSER_PROFILES = {
    "chrome": [
        # Linux
        os.path.expanduser("~/.config/google-chrome/Default"),
        os.path.expanduser("~/.config/google-chrome/Profile 1"),
        # Host paths mapped in via docker-compose volume mounts
        "/host-chrome/Default",
        "/host-chrome",
        # macOS
        os.path.expanduser("~/Library/Application Support/Google/Chrome/Default"),
    ],
    "chromium": [
        os.path.expanduser("~/.config/chromium/Default"),
        "/host-chromium/Default",
        "/host-chromium",
        os.path.expanduser("~/Library/Application Support/Chromium/Default"),
    ],
    "brave": [
        os.path.expanduser("~/.config/BraveSoftware/Brave-Browser/Default"),
        "/host-brave/Default",
        "/host-brave",
        os.path.expanduser("~/Library/Application Support/BraveSoftware/Brave-Browser/Default"),
    ],
    "firefox": [
        # Firefox: yt-dlp detects the profile itself with
        # --cookies-from-browser firefox
        os.path.expanduser("~/.mozilla/firefox"),
        "/host-firefox",
    ],
    "edge": [
        os.path.expanduser("~/.config/microsoft-edge/Default"),
        "/host-edge/Default",
    ],
}
|
|
|
|
|
|
def _find_browser_profile(browser: str) -> str | None:
    """Return the first existing profile path known for `browser`, or None."""
    candidates = _BROWSER_PROFILES.get(browser, [])
    return next((c for c in candidates if os.path.exists(c)), None)
|
|
|
|
|
|
def _extract_cookies_from_browser(browser: str, profile_path: str | None,
|
|
target: str, proxy: str | None = None) -> dict:
|
|
"""
|
|
Usa yt-dlp --cookies-from-browser para extraer cookies de YouTube
|
|
del perfil del navegador indicado y guardarlas en target (Netscape format).
|
|
"""
|
|
cmd = [
|
|
"yt-dlp",
|
|
"--cookies-from-browser", browser if not profile_path else f"{browser}:{profile_path}",
|
|
"--cookies", target, # exportar a archivo Netscape
|
|
"--skip-download",
|
|
"--no-warnings",
|
|
"--extractor-args", "youtube:player_client=tv_embedded",
|
|
"https://www.youtube.com/watch?v=dQw4w9WgXcQ", # video corto para forzar extracción
|
|
]
|
|
if proxy:
|
|
cmd.extend(["--proxy", proxy])
|
|
|
|
try:
|
|
res = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
|
|
stderr = res.stderr or ""
|
|
stdout = res.stdout or ""
|
|
|
|
# Verificar que el archivo fue creado y no está vacío
|
|
if os.path.exists(target) and os.path.getsize(target) > 100:
|
|
# Contar cookies de youtube.com
|
|
yt_cookies = 0
|
|
with open(target, "r", errors="ignore") as fh:
|
|
for line in fh:
|
|
if ".youtube.com" in line or "youtube.com" in line:
|
|
yt_cookies += 1
|
|
return {
|
|
"success": True,
|
|
"browser": browser,
|
|
"profile_path": profile_path,
|
|
"cookies_file": target,
|
|
"youtube_cookie_lines": yt_cookies,
|
|
"stderr_preview": stderr[:300] if stderr else "",
|
|
}
|
|
else:
|
|
return {
|
|
"success": False,
|
|
"browser": browser,
|
|
"error": "No se generó el archivo de cookies o está vacío",
|
|
"stderr": stderr[:500],
|
|
"stdout": stdout[:200],
|
|
"returncode": res.returncode,
|
|
}
|
|
except subprocess.TimeoutExpired:
|
|
return {"success": False, "browser": browser, "error": "Timeout al extraer cookies (60s)"}
|
|
except FileNotFoundError:
|
|
return {"success": False, "browser": browser, "error": "yt-dlp no encontrado"}
|
|
except Exception as e:
|
|
return {"success": False, "browser": browser, "error": str(e)[:200]}
|
|
|
|
|
|
@app.post("/extract_chrome_cookies")
def extract_chrome_cookies(browser: str = "chrome", profile_path: str = ""):
    """
    Extract YouTube cookies straight from a browser profile installed on the
    HOST (mounted as a volume) and save them to the configured cookies path.

    Query params:
    - browser: chrome | chromium | brave | firefox | edge (default: chrome)
    - profile_path: manual profile path (optional; auto-detected when empty)

    Requires a docker-compose.yml volume such as (already included):
        volumes:
          - ~/.config/google-chrome:/host-chrome:ro

    Examples:
        curl -X POST "http://localhost:8282/extract_chrome_cookies?browser=chrome"
        curl -X POST "http://localhost:8282/extract_chrome_cookies?browser=brave"
    """
    proxy = _get_proxy_choice()
    target = os.getenv('API_COOKIES_PATH', DEFAULT_COOKIES_PATH)

    # Make sure the destination directory exists
    os.makedirs(os.path.dirname(target) or ".", exist_ok=True)

    browser = browser.lower().strip()
    valid_browsers = list(_BROWSER_PROFILES.keys())
    if browser not in valid_browsers:
        raise HTTPException(
            status_code=400,
            detail=f"Navegador '{browser}' no soportado. Usa: {', '.join(valid_browsers)}"
        )

    # Explicit profile_path wins; otherwise try the known locations
    resolved_profile = profile_path.strip() or _find_browser_profile(browser)

    if not resolved_profile and browser != "firefox":
        # Firefox is the only browser yt-dlp locates on its own; the rest
        # require an explicit path. Report which profiles ARE reachable.
        available = {b: _find_browser_profile(b) for b in valid_browsers}
        found = {b: p for b, p in available.items() if p}
        raise HTTPException(
            status_code=404,
            detail=(
                f"No se encontró el perfil de '{browser}' en las rutas conocidas. "
                f"Agrega el volumen en docker-compose.yml o pasa profile_path manualmente. "
                f"Perfiles encontrados: {found if found else 'ninguno'}"
            )
        )

    result = _extract_cookies_from_browser(browser, resolved_profile, target, proxy)
    if not result["success"]:
        raise HTTPException(status_code=500, detail=result)

    return {
        "detail": f"Cookies extraídas de {browser} y guardadas en {target}",
        **result,
        "next_step": "Los endpoints /transcript y /stream usarán estas cookies automáticamente.",
    }
|
|
|
|
|
|
@app.get("/cookies/status")
def cookies_status():
    """Report the current cookies-file state and which browser profiles are reachable."""
    target = os.getenv('API_COOKIES_PATH', DEFAULT_COOKIES_PATH)
    proxy = _get_proxy_choice()

    # Inspect the configured cookies file: existence, size, YouTube entries
    cookies_info = {"path": target, "exists": False, "size_bytes": 0, "youtube_lines": 0}
    if os.path.exists(target):
        cookies_info["exists"] = True
        cookies_info["size_bytes"] = os.path.getsize(target)
        yt_count = 0
        try:
            with open(target, "r", errors="ignore") as fh:
                for row in fh:
                    if "youtube.com" in row and not row.startswith("#"):
                        yt_count += 1
        except Exception:
            # Best-effort count; keep whatever was tallied before the failure
            pass
        cookies_info["youtube_lines"] = yt_count

    # Which known browser profiles exist (in the container / mounted host paths)
    available_browsers = {}
    for name in _BROWSER_PROFILES:
        found_path = _find_browser_profile(name)
        available_browsers[name] = {
            "found": bool(found_path),
            "profile_path": found_path,
        }

    return {
        "cookies_file": cookies_info,
        "available_browsers": available_browsers,
        "extract_endpoint": "POST /extract_chrome_cookies?browser=chrome",
        "upload_endpoint": "POST /upload_cookies",
        "proxy": proxy or "no configurado",
        "note": (
            "Para usar cookies de Chrome del host, agrega en docker-compose.yml: "
            "volumes: - ~/.config/google-chrome:/host-chrome:ro"
        ),
    }
|
|
|
|
|
|
def debug_metadata(video_id: str):
    """Debug helper: run `yt-dlp --dump-json` for a video and return its
    caption-related metadata (automatic_captions, subtitles,
    requested_subtitles) for inspection.

    NOTE(review): despite the original "endpoint" wording, no @app route
    decorator is attached here, so this is not exposed over HTTP — confirm
    whether that is intentional.

    Raises:
        HTTPException 500/504: yt-dlp missing, failing, timing out, or
        producing unparseable output.
    """
    # Prefer a per-request dynamic cookiefile; fall back to the static path
    cookie_mgr = CookieManager()
    cookiefile_path = cookie_mgr.get_cookiefile_path()
    cookies_path = cookiefile_path or os.getenv('API_COOKIES_PATH', DEFAULT_COOKIES_PATH)
    proxy = _get_proxy_choice()

    url = f"https://www.youtube.com/watch?v={video_id}"

    cmd = [
        "yt-dlp",
        "--skip-download",
        "--dump-json",
        "--no-warnings",
        url
    ] + _yt_client_args(os.path.exists(cookies_path))
    if os.path.exists(cookies_path):
        cmd.extend(["--cookies", cookies_path])
    if proxy:
        cmd.extend(['--proxy', proxy])

    # IMPROVEMENT: a single try/finally replaces the original pattern of
    # repeating `cookie_mgr.cleanup()` before every raise — the temp cookie
    # file is now released on every exit path, including future ones.
    try:
        try:
            proc = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
        except FileNotFoundError:
            raise HTTPException(status_code=500, detail="yt-dlp no está instalado en el contenedor/entorno.")
        except subprocess.TimeoutExpired:
            raise HTTPException(status_code=504, detail="yt-dlp demoró demasiado en responder.")
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e)[:300])

        if proc.returncode != 0:
            stderr = proc.stderr or ''
            raise HTTPException(status_code=500, detail=f"yt-dlp error: {stderr[:1000]}")

        try:
            metadata = json.loads(proc.stdout)
        except Exception:
            raise HTTPException(status_code=500, detail="No se pudo parsear la salida JSON de yt-dlp.")
    finally:
        try:
            cookie_mgr.cleanup()
        except Exception:
            pass

    # Return only the parts useful for debugging
    debug_info = {
        'id': metadata.get('id'),
        'title': metadata.get('title'),
        'uploader': metadata.get('uploader'),
        'is_live': metadata.get('is_live'),
        'automatic_captions': metadata.get('automatic_captions'),
        'subtitles': metadata.get('subtitles'),
        'requested_subtitles': metadata.get('requested_subtitles'),
        'formats_sample': metadata.get('formats')[:5] if metadata.get('formats') else None,
    }
    return debug_info
|
|
|
|
@app.get('/debug/fetch_subs/{video_id}')
def debug_fetch_subs(video_id: str, lang: str = 'es'):
    """Try downloading subtitles with yt-dlp inside this environment; return
    the run log and a (partial) sample of any files it generated.
    Uses cookies configured via API_COOKIES_PATH.
    """
    cookie_mgr = CookieManager()
    cookiefile_path = cookie_mgr.get_cookiefile_path()
    # NOTE: mkdtemp is never removed here; out_dir is returned so the caller
    # can inspect (and eventually clean up) the generated files.
    out_dir = tempfile.mkdtemp(prefix='subs_')
    out_template = os.path.join(out_dir, '%(id)s.%(ext)s')
    url = f"https://www.youtube.com/watch?v={video_id}"

    cmd = [
        'yt-dlp',
        '--verbose',
        '--skip-download',
        '--write-auto-sub',
        '--write-sub',
        '--sub-lang', lang,
        '--sub-format', 'json3/vtt/srv3/best',
        '--output', out_template,
        url
    ] + _yt_subs_args(bool(cookiefile_path))
    if cookiefile_path:
        cmd.extend(['--cookies', cookiefile_path])

    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=240)
    except FileNotFoundError:
        try:
            cookie_mgr.cleanup()
        except Exception:
            pass
        raise HTTPException(status_code=500, detail='yt-dlp no está instalado en el contenedor.')
    except subprocess.TimeoutExpired:
        try:
            cookie_mgr.cleanup()
        except Exception:
            pass
        raise HTTPException(status_code=504, detail='La ejecución de yt-dlp demoró demasiado.')
    except Exception as e:
        try:
            cookie_mgr.cleanup()
        except Exception:
            pass
        raise HTTPException(status_code=500, detail=str(e)[:300])

    stdout = proc.stdout or ''
    stderr = proc.stderr or ''
    rc = proc.returncode

    # Collect generated files (yt-dlp uses a double extension: ID.lang.vtt)
    generated = []
    for f in glob.glob(os.path.join(out_dir, f"{video_id}*")):
        size = None
        try:
            size = os.path.getsize(f)
            # sample only the first 200 lines to avoid returning huge files
            with open(f, 'r', encoding='utf-8', errors='ignore') as fh:
                sample = ''.join([next(fh) for _ in range(200)]) if size > 0 else ''
            generated.append({
                'path': f,
                'size': size,
                'sample': sample
            })
        except StopIteration:
            # file has fewer than 200 lines: fall back to reading it whole
            try:
                with open(f, 'r', encoding='utf-8', errors='ignore') as fh:
                    sample = fh.read()
            except Exception:
                sample = None
            if size is None:
                try:
                    size = os.path.getsize(f)
                except Exception:
                    size = 0
            generated.append({'path': f, 'size': size, 'sample': sample})
        except Exception:
            # unreadable file: still report its path/size without a sample
            if size is None:
                try:
                    size = os.path.getsize(f)
                except Exception:
                    size = 0
            generated.append({'path': f, 'size': size, 'sample': None})

    try:
        cookie_mgr.cleanup()
    except Exception:
        pass

    return {
        'video_id': video_id,
        'rc': rc,
        'stdout_tail': stdout[-2000:],
        'stderr_tail': stderr[-2000:],
        'generated': generated,
        'out_dir': out_dir
    }
|
|
|
|
# Helper that downloads VTT subtitles directly and returns them as text
def fetch_vtt_subtitles(video_id: str, lang: str = 'es'):
    """Download subtitles in VTT format via yt-dlp and return their content.

    Parameters:
    - video_id: plain YouTube video id (already extracted by the caller).
    - lang: subtitle language code passed to yt-dlp's --sub-lang.

    Returns (vtt_text, None) on success or (None, error_message) on failure.
    The temporary cookie file provided by CookieManager is cleaned up on
    every exit path via the try/finally below (the previous version repeated
    the cleanup on each return and could leak it on unexpected exceptions).
    """
    url = f"https://www.youtube.com/watch?v={video_id}"

    cookie_mgr = CookieManager()
    cookiefile_path = cookie_mgr.get_cookiefile_path()

    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            out_template = os.path.join(tmpdir, '%(id)s.%(ext)s')
            cmd = [
                'yt-dlp',
                '--skip-download',
                '--write-auto-sub',
                '--write-sub',
                '--sub-lang', lang,
                '--sub-format', 'vtt',
                '--output', out_template,
                url
            ] + _yt_subs_args(bool(cookiefile_path))
            if cookiefile_path:
                cmd.extend(['--cookies', cookiefile_path])

            try:
                proc = subprocess.run(cmd, capture_output=True, text=True, timeout=180)
            except FileNotFoundError:
                return None, 'yt-dlp no está instalado en el entorno.'
            except subprocess.TimeoutExpired:
                return None, 'La descarga de subtítulos tardó demasiado.'
            except Exception as e:
                return None, f'Error ejecutando yt-dlp: {str(e)[:200]}'

            stderr = (proc.stderr or '').lower()
            if proc.returncode != 0:
                # Map well-known HTTP failures to actionable messages
                if 'http error 429' in stderr or 'too many requests' in stderr:
                    return None, 'YouTube está limitando las peticiones al descargar subtítulos (HTTP 429). Revisa cookies.txt o prueba desde otra IP.'
                if 'http error 403' in stderr or 'forbidden' in stderr:
                    return None, 'Acceso denegado al descargar subtítulos (HTTP 403). Usa cookies.txt con una cuenta autorizada.'
                return None, f'yt-dlp error: {proc.stderr[:1000]}'

            # Look for generated files (yt-dlp uses a double extension: ID.lang.vtt)
            files = glob.glob(os.path.join(tmpdir, f"{video_id}*"))
            files = [f for f in files if os.path.isfile(f) and
                     any(f.endswith(e) for e in ('.vtt', '.json3', '.srv3', '.srt', '.ttml'))]
            if not files:
                return None, 'No se generaron archivos de subtítulos.'

            # Prefer a .vtt file when one exists; otherwise fall back to the first hit
            vtt_path = next((f for f in files if f.lower().endswith('.vtt')), files[0])

            try:
                with open(vtt_path, 'r', encoding='utf-8', errors='ignore') as fh:
                    return fh.read(), None
            except Exception as e:
                return None, f'Error leyendo archivo de subtítulos: {str(e)[:200]}'
    finally:
        # Best-effort cleanup; never mask the real result with a cleanup error
        try:
            cookie_mgr.cleanup()
        except Exception:
            pass
|
|
|
|
@app.post('/upload_vtt/{video_id}')
async def upload_vtt(video_id: str, file: UploadFile = File(...)):
    """Accept an uploaded VTT file for a video and return parsed segments and text.

    The file is stored at ./data/{video_id}.vtt (overwriting any existing copy).
    Raises 400 for an empty upload and 500 for storage/parsing failures.
    """
    try:
        content = await file.read()
        if not content:
            raise HTTPException(status_code=400, detail='Archivo vacío')

        target_dir = os.path.join(os.getcwd(), 'data')
        os.makedirs(target_dir, exist_ok=True)
        target_path = os.path.join(target_dir, f"{video_id}.vtt")

        with open(target_path, 'wb') as fh:
            fh.write(content)

        # Decode as text (tolerating bad bytes) and parse into timed segments
        text = content.decode('utf-8', errors='ignore')
        segments = parse_subtitle_format(text, 'vtt') if text else []
        combined_text = '\n'.join([s.get('text','') for s in segments])
        format_text = format_segments_text(segments)

        return {
            'video_id': video_id,
            'path': target_path,
            'count': len(segments),
            'segments': segments,
            'text': combined_text,
            'format_text': format_text
        }

    except HTTPException:
        # Bug fix: re-raise intended HTTP errors (e.g. the 400 for an empty
        # file) instead of letting the broad handler below turn them into 500s.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f'Error al guardar/parsear VTT: {str(e)[:200]}')
|
|
|
|
@app.get('/transcript_alt/{video_id}')
def transcript_alt(video_id: str, lang: str = 'es'):
    """Alternative transcript fetch using youtube-transcript-api (when installed).

    Returns segments in the same shape as get_transcript_data to keep the API
    consistent. Raises 501 when the library is missing, 400 on a bad id,
    404/403 for missing/disabled transcripts and 500 on other errors.
    """
    if not YOUTUBE_TRANSCRIPT_API_AVAILABLE:
        raise HTTPException(status_code=501, detail='youtube-transcript-api no está instalado en el entorno.')

    vid = extract_video_id(video_id)
    if not vid:
        raise HTTPException(status_code=400, detail='video_id inválido')

    # Languages to try: the requested code plus the Latin-American regional
    # variant when a bare two-letter code was given (e.g. 'es' -> 'es-419')
    langs = [lang]
    if len(lang) == 2:
        langs.append(f"{lang}-419")

    try:
        # get_transcript may raise when no transcript exists.
        # cast() silences static analyzers that don't follow the availability check above.
        transcript_api = cast(Any, YouTubeTranscriptApi)
        transcript_list = transcript_api.get_transcript(vid, languages=langs)
    except NoTranscriptFound:
        raise HTTPException(status_code=404, detail='No se encontró transcript con youtube-transcript-api')
    except TranscriptsDisabled:
        raise HTTPException(status_code=403, detail='Los transcripts están deshabilitados para este video')
    except Exception as e:
        raise HTTPException(status_code=500, detail=f'Error youtube-transcript-api: {str(e)[:300]}')

    # transcript_list items carry the keys: text, start, duration
    segments = []
    for item in transcript_list:
        segments.append({
            'start': float(item.get('start', 0)),
            'duration': float(item.get('duration', 0)),
            'text': item.get('text', '').strip()
        })

    combined_text = '\n'.join([s['text'] for s in segments if s.get('text')])
    format_text = format_segments_text(segments)

    thumbnails = get_video_thumbnails(vid)

    return {
        'video_id': vid,
        'count': len(segments),
        'segments': segments,
        'text': combined_text,
        'format_text': format_text,
        # Bug fix: thumbnails were fetched above but never returned
        'thumbnails': thumbnails,
        'source': 'youtube-transcript-api'
    }
|
|
|
|
@app.get('/playwright/stream/{video_id}')
def playwright_stream(video_id: str, profile: str = '', headless: bool = True, timeout: int = 60):
    """Open the video in a real browser via tools/playwright_extract_m3u8.py
    and extract m3u8 stream URLs plus cookies.

    Parameters:
    - profile: Chrome user-data-dir path (optional). If the host profile is
      mounted at /host-chrome, pass `/host-chrome/Default`.
    - headless: run without UI when true.
    - timeout: maximum seconds to wait for the script to finish.

    Example:
        curl 'http://localhost:8282/playwright/stream/cmqVmX2UVBM?headless=false&profile=/host-chrome'

    Note: the script writes `./data/cookies.txt` when cookies are extracted.
    """
    vid = extract_video_id(video_id)
    if not vid:
        raise HTTPException(status_code=400, detail='video_id inválido')

    script = os.path.join(os.getcwd(), 'tools', 'playwright_extract_m3u8.py')
    if not os.path.exists(script):
        raise HTTPException(status_code=500, detail='Script Playwright no encontrado en tools/playwright_extract_m3u8.py')

    cmd = [
        'python3', script,
        '--video', f'https://www.youtube.com/watch?v={vid}',
        '--timeout', str(timeout),
    ]
    if headless:
        cmd.append('--headless')
    # The profile may come from the query param or the PLAYWRIGHT_PROFILE env var
    chosen_profile = profile or os.getenv('PLAYWRIGHT_PROFILE', '')
    if chosen_profile:
        cmd += ['--profile', chosen_profile]

    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout + 10)
    except subprocess.TimeoutExpired:
        raise HTTPException(status_code=504, detail='Playwright timed out')
    except Exception as e:
        raise HTTPException(status_code=500, detail=f'Error ejecutando Playwright: {str(e)[:300]}')

    if proc.returncode != 0:
        # Surface stderr (or stdout) for diagnosis
        detail = (proc.stderr or proc.stdout or 'Error desconocido')[:2000]
        return JSONResponse(status_code=500, content={"error": "Playwright error", "detail": detail})

    try:
        parsed = json.loads(proc.stdout or '{}')
    except Exception:
        return JSONResponse(status_code=500, content={"error": "No se pudo parsear salida Playwright", "raw": proc.stdout[:2000]})

    return parsed
|
|
|
|
def _attempt_playwright_fallback(video_id: str, headless: bool = True, profile: str | None = None, timeout: int = 60):
    """Run the Playwright extractor script to try to obtain m3u8 URLs and cookies.

    Returns a tuple (m3u8_url or None, cookies_saved_path or None,
    error_message or None). When cookies are produced they are copied to
    API_COOKIES_PATH so the rest of the system can pick them up.
    """
    script = os.path.join(os.getcwd(), 'tools', 'playwright_extract_m3u8.py')
    if not os.path.exists(script):
        return None, None, 'Playwright extractor script no disponible'

    cmd = [
        'python3', script,
        '--video', f'https://www.youtube.com/watch?v={video_id}',
        '--timeout', str(timeout),
    ]
    if headless:
        cmd.append('--headless')
    # The profile may come from the argument or the PLAYWRIGHT_PROFILE env var
    chosen_profile = profile or os.getenv('PLAYWRIGHT_PROFILE', '')
    if chosen_profile:
        cmd += ['--profile', chosen_profile]

    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout + 10)
    except subprocess.TimeoutExpired:
        return None, None, 'Playwright timed out'
    except Exception as e:
        return None, None, f'Error ejecutando Playwright: {str(e)[:200]}'

    if proc.returncode != 0:
        # Include stderr (or stdout) for diagnosis
        detail = (proc.stderr or proc.stdout or 'Error desconocido')[:2000]
        return None, None, f'Playwright error: {detail}'

    try:
        payload = json.loads(proc.stdout or '{}')
    except Exception:
        return None, None, 'No se pudo parsear la salida de Playwright'

    found_urls = payload.get('m3u8_urls') or []
    cookies_file = payload.get('cookies_file')

    if not found_urls:
        return None, cookies_file, 'No se encontró m3u8 via Playwright'

    # Use the first candidate URL
    m3u8 = found_urls[0]

    # If Playwright produced cookies, copy them to API_COOKIES_PATH so the
    # rest of the system uses them for subsequent requests
    if cookies_file and os.path.exists(cookies_file):
        target = os.getenv('API_COOKIES_PATH', DEFAULT_COOKIES_PATH)
        try:
            os.makedirs(os.path.dirname(target) or '.', exist_ok=True)
            with open(cookies_file, 'rb') as src, open(target, 'wb') as dst:
                dst.write(src.read())
            return m3u8, target, None
        except Exception as e:
            return m3u8, None, f'm3u8 encontrado pero no se pudo guardar cookies: {str(e)[:200]}'

    return m3u8, None, None
|