TubeScript-API/main.py

1279 lines
51 KiB
Python

import os
import json
import subprocess
import requests
import time
import re
import tempfile
import glob
from fastapi import FastAPI, HTTPException, UploadFile, File
from typing import List, Dict, Any, cast
# Intentar importar youtube_transcript_api como fallback
try:
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound
YOUTUBE_TRANSCRIPT_API_AVAILABLE = True
except Exception:
# definir placeholders para evitar NameError si la librería no está instalada
YouTubeTranscriptApi = None
class TranscriptsDisabled(Exception):
pass
class NoTranscriptFound(Exception):
pass
YOUTUBE_TRANSCRIPT_API_AVAILABLE = False
# Import CookieManager from yt_wrap to provide cookiefile paths per request
from yt_wrap import CookieManager
app = FastAPI(title="TubeScript API Pro - JSON Cleaner")
# Ruta de cookies configurable vía variable de entorno: API_COOKIES_PATH
DEFAULT_COOKIES_PATH = os.getenv('API_COOKIES_PATH', './cookies.txt')
# Proxy opcional para requests/yt-dlp (ej. socks5h://127.0.0.1:9050)
DEFAULT_PROXY = os.getenv('API_PROXY', '')
def clean_youtube_json(raw_json: Dict) -> List[Dict]:
"""
Transforma el formato complejo 'json3' de YouTube a un formato
simple: [{'start': 0.0, 'duration': 2.0, 'text': 'Hola'}]
"""
clean_data = []
# YouTube guarda los eventos de texto en la llave 'events'
events = raw_json.get('events', [])
for event in events:
# Solo procesamos eventos que tengan segmentos de texto
if 'segs' in event:
text = "".join([seg['utf8'] for seg in event['segs']]).strip()
if text and text != '\n':
clean_data.append({
"start": event.get('tStartMs', 0) / 1000.0, # Convertir a segundos
"duration": event.get('dDurationMs', 0) / 1000.0,
"text": text.replace('\n', ' ')
})
return clean_data
def parse_subtitle_format(content: str, format_type: str) -> List[Dict]:
"""
Parsea diferentes formatos de subtítulos (json3, srv3, vtt) al formato estándar
"""
try:
if format_type == 'json3':
# Formato JSON3 de YouTube
data = json.loads(content) if isinstance(content, str) else content
return clean_youtube_json(data)
elif format_type in ['srv3', 'vtt']:
# Para srv3 y vtt, intentar parsear como JSON primero
try:
data = json.loads(content) if isinstance(content, str) else content
# srv3 también tiene estructura similar a json3
if 'events' in data:
return clean_youtube_json(data)
except:
pass
# Si no es JSON, intentar parsear como texto VTT
clean_data = []
lines = content.split('\n') if isinstance(content, str) else []
current_time = 0.0
current_text = ""
for line in lines:
line = line.strip()
if not line or line.startswith('WEBVTT') or '-->' in line:
if '-->' in line:
# Extraer tiempo de inicio
try:
time_parts = line.split('-->')[0].strip().split(':')
if len(time_parts) >= 2:
current_time = float(time_parts[-2]) * 60 + float(time_parts[-1])
except:
pass
continue
if line and not line.isdigit():
current_text = line
if current_text:
clean_data.append({
"start": current_time,
"duration": 2.0, # Duración aproximada
"text": current_text
})
current_time += 2.0
return clean_data if clean_data else []
else:
# Formato desconocido, intentar como JSON
data = json.loads(content) if isinstance(content, str) else content
if 'events' in data:
return clean_youtube_json(data)
return []
except Exception as e:
print(f"Error parsing subtitle format {format_type}: {e}")
return []
def extract_video_id(video_id_or_url: str) -> str:
"""
Normaliza la entrada y extrae el video_id si se recibe una URL completa.
Acepta: https://www.youtube.com/watch?v=ID, youtu.be/ID, o el propio ID.
"""
if not video_id_or_url:
return ""
s = video_id_or_url.strip()
# Si ya parece un id (11-20 caracteres alfanuméricos y -, _), retornarlo
if re.match(r'^[A-Za-z0-9_-]{8,20}$', s):
return s
# Intentar extraer de URL completa
# watch?v=
m = re.search(r'[?&]v=([A-Za-z0-9_-]{8,20})', s)
if m:
return m.group(1)
# youtu.be/
m = re.search(r'youtu\.be/([A-Za-z0-9_-]{8,20})', s)
if m:
return m.group(1)
# /v/ or /embed/
m = re.search(r'(?:/v/|/embed/)([A-Za-z0-9_-]{8,20})', s)
if m:
return m.group(1)
# Si no se detecta, devolver la entrada original (fallará después si es inválida)
return s
def format_segments_text(segments: List[Dict]) -> List[str]:
"""Devuelve una lista 'format_text' con textos limpios extraídos de segments.
- elimina prefijos tipo 'Kind: captions'
- elimina contenido en corchetes/paréntesis
- elimina etiquetas HTML
- normaliza espacios
- divide por saltos de línea para obtener frases independientes
"""
def _clean_text(t: str) -> str:
if not t:
return ''
s = str(t).strip()
s = re.sub(r'^\s*Kind\s*:\s*.*$', '', s, flags=re.IGNORECASE).strip()
# eliminar contenido entre corchetes (no-greedy)
s = re.sub(r'\[.*?\]', '', s)
s = re.sub(r'\(.*?\)', '', s)
s = re.sub(r'<[^>]+>', '', s)
s = re.sub(r'[♪★■◆►▶◀•–—]', '', s)
s = re.sub(r'\s+', ' ', s).strip()
return s
output: List[str] = []
for seg in segments or []:
raw = seg.get('text', '')
cleaned = _clean_text(raw)
if not cleaned:
continue
parts = [p.strip() for p in re.split(r'[\n\r]+', cleaned) if p.strip()]
output.extend(parts)
return output
# Nuevo helper: obtener thumbnails para un video (intenta yt-dlp --dump-json, fallback a URLs estándar)
def get_video_thumbnails(video_id: str) -> List[str]:
"""Devuelve una lista de URLs de thumbnail para el video.
Primero intenta obtener metadata con yt-dlp y extraer 'thumbnails' o 'thumbnail'.
Si falla, construye una lista de URLs por defecto (maxresdefault, sddefault, hqdefault, mqdefault, default).
"""
thumbs: List[str] = []
url = f"https://www.youtube.com/watch?v={video_id}"
cookie_mgr = CookieManager()
cookiefile_path = cookie_mgr.get_cookiefile_path()
cookies_path = cookiefile_path or os.getenv('API_COOKIES_PATH', DEFAULT_COOKIES_PATH)
proxy = os.getenv('API_PROXY', DEFAULT_PROXY) or None
cmd = [
"yt-dlp",
"--skip-download",
"--dump-json",
"--no-warnings",
url
]
if os.path.exists(cookies_path):
cmd.extend(["--cookies", cookies_path])
if proxy:
cmd.extend(['--proxy', proxy])
try:
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if proc.returncode == 0 and proc.stdout:
try:
meta = json.loads(proc.stdout)
# thumbnails puede ser lista de dicts con 'url'
t = meta.get('thumbnails') or meta.get('thumbnail')
if isinstance(t, list):
for item in t:
if isinstance(item, dict) and item.get('url'):
thumbs.append(item.get('url'))
elif isinstance(item, str):
thumbs.append(item)
elif isinstance(t, dict) and t.get('url'):
thumbs.append(t.get('url'))
elif isinstance(t, str):
thumbs.append(t)
except Exception:
pass
except Exception:
pass
finally:
try:
cookie_mgr.cleanup()
except Exception:
pass
# Si no obtuvimos thumbnails desde metadata, construir URLs estándar
if not thumbs:
thumbs = [
f"https://i.ytimg.com/vi/{video_id}/maxresdefault.jpg",
f"https://i.ytimg.com/vi/{video_id}/sddefault.jpg",
f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg",
f"https://i.ytimg.com/vi/{video_id}/mqdefault.jpg",
f"https://i.ytimg.com/vi/{video_id}/default.jpg",
]
# deduplicate while preserving order
seen = set()
unique_thumbs = []
for t in thumbs:
if t and t not in seen:
seen.add(t)
unique_thumbs.append(t)
return unique_thumbs
def get_transcript_data(video_id: str, lang: str = "es"):
video_id = extract_video_id(video_id)
if not video_id:
return None, [], "video_id inválido o vacío"
url = f"https://www.youtube.com/watch?v={video_id}"
# Use CookieManager to get a cookiefile path per request (may be None)
cookie_mgr = CookieManager()
cookiefile_path = cookie_mgr.get_cookiefile_path()
# cookies_path: prefer the temporary cookiefile if present, otherwise fall back to env path
cookies_path = cookiefile_path or os.getenv('API_COOKIES_PATH', DEFAULT_COOKIES_PATH)
# proxy support
proxy = os.getenv('API_PROXY', DEFAULT_PROXY) or None
proxies = {'http': proxy, 'https': proxy} if proxy else None
def load_cookies_from_file(path: str) -> dict:
"""Parsea un cookies.txt en formato Netscape a un dict usable por requests."""
cookies = {}
try:
if not path or not os.path.exists(path):
return cookies
with open(path, 'r', encoding='utf-8', errors='ignore') as fh:
for line in fh:
line = line.strip()
if not line or line.startswith('#'):
continue
parts = line.split('\t')
# formato Netscape: domain, flag, path, secure, expiration, name, value
if len(parts) >= 7:
name = parts[5].strip()
value = parts[6].strip()
if name:
cookies[name] = value
else:
# fallback: intento simple name=value
if '=' in line:
k, v = line.split('=', 1)
cookies[k.strip()] = v.strip()
except Exception:
return {}
return cookies
cookies_for_requests = load_cookies_from_file(cookies_path) if cookies_path else {}
# Intento rápido y fiable: usar yt-dlp para descargar subtítulos (auto o manual) al tmpdir
try:
with tempfile.TemporaryDirectory() as tmpdl:
# probar variantes de idioma (ej. es y es-419) para cubrir casos regionales
sub_langs = [lang]
if len(lang) == 2:
sub_langs.append(f"{lang}-419")
ytdlp_cmd = [
"yt-dlp",
url,
"--skip-download",
"--write-auto-sub",
"--write-sub",
"--sub-format", "vtt/json3/srv3/best",
"-o", os.path.join(tmpdl, "%(id)s.%(ext)s"),
"--no-warnings",
]
# agregar sub-lang si hay variantes
if sub_langs:
ytdlp_cmd.extend(["--sub-lang", ",".join(sub_langs)])
# attach cookiefile if exists
if cookiefile_path:
ytdlp_cmd.extend(["--cookies", cookiefile_path])
# attach proxy if configured
if proxy:
ytdlp_cmd.extend(['--proxy', proxy])
try:
result = subprocess.run(ytdlp_cmd, capture_output=True, text=True, timeout=120)
# Si yt-dlp falló por rate limiting, devolver mensaje claro
stderr = (result.stderr or "").lower()
if result.returncode != 0 and ('http error 429' in stderr or 'too many requests' in stderr):
return None, get_video_thumbnails(video_id), "YouTube está limitando las peticiones al descargar subtítulos (HTTP 429). Agrega un cookies.txt válido exportado desde tu navegador y monta en el contenedor, o espera unos minutos."
if result.returncode != 0 and ('http error 403' in stderr or 'forbidden' in stderr):
return None, get_video_thumbnails(video_id), "Acceso denegado al descargar subtítulos (HTTP 403). El video puede tener restricciones. Usa cookies.txt con una cuenta autorizada."
except subprocess.TimeoutExpired:
pass
# revisar archivos creados
files = glob.glob(os.path.join(tmpdl, f"{video_id}.*"))
if files:
combined = []
for fpath in files:
try:
with open(fpath, 'r', encoding='utf-8') as fh:
combined.append(fh.read())
except Exception:
continue
if combined:
vtt_combined = "\n".join(combined)
parsed = parse_subtitle_format(vtt_combined, 'vtt')
if parsed:
return parsed, get_video_thumbnails(video_id), None
finally:
# cleanup any temp cookiefile created for this request
try:
cookie_mgr.cleanup()
except Exception:
pass
# ...existing code continues...
# 1) Intento principal: obtener metadata con yt-dlp
command = [
"yt-dlp",
"--skip-download",
"--dump-json",
"--no-warnings",
url
]
if os.path.exists(cookies_path):
command.extend(["--cookies", cookies_path])
# attach proxy if configured
if proxy:
command.extend(['--proxy', proxy])
try:
result = subprocess.run(command, capture_output=True, text=True, timeout=60)
if result.returncode != 0:
error_msg = result.stderr if result.stderr else "Error desconocido from yt-dlp"
# Si yt-dlp reporta algo, enviar mensaje útil
# No abortar inmediatamente: intentaremos fallback descargando subs con yt-dlp
video_metadata = None
else:
if not result.stdout.strip():
video_metadata = None
else:
try:
video_metadata = json.loads(result.stdout)
except Exception:
video_metadata = None
except subprocess.TimeoutExpired:
video_metadata = None
except FileNotFoundError:
return None, get_video_thumbnails(video_id), "yt-dlp no está instalado en el contenedor/entorno. Instala yt-dlp y vuelve a intentar."
except Exception as e:
video_metadata = None
requested_subs = {}
if video_metadata:
requested_subs = video_metadata.get('requested_subtitles', {}) or {}
# Buscar en automatic_captions y subtitles si requested_subs está vacío
if not requested_subs:
automatic_captions = video_metadata.get('automatic_captions', {}) or {}
for lang_key, formats in automatic_captions.items():
if lang in lang_key or lang_key.startswith(lang):
if formats:
requested_subs = {lang_key: formats[0]}
break
if not requested_subs:
subtitles = video_metadata.get('subtitles', {}) or {}
for lang_key, formats in subtitles.items():
if lang in lang_key or lang_key.startswith(lang):
if formats:
requested_subs = {lang_key: formats[0]}
break
# Si requested_subs está disponible, intentar descargar vía requests la URL proporcionada
if requested_subs:
lang_key = next(iter(requested_subs))
sub_url = requested_subs[lang_key].get('url')
if sub_url:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8',
'Referer': 'https://www.youtube.com/',
}
max_retries = 3
response = None
rate_limited = False
for attempt in range(max_retries):
try:
response = requests.get(sub_url, headers=headers, timeout=30, cookies=cookies_for_requests, proxies=proxies)
if response.status_code == 200:
break
elif response.status_code == 429:
rate_limited = True
if attempt < max_retries - 1:
time.sleep(2 * (attempt + 1))
continue
else:
# salir del loop y usar fallback con yt-dlp más abajo
break
elif response.status_code == 403:
return None, get_video_thumbnails(video_id), "Acceso denegado (HTTP 403). El video puede tener restricciones de edad o región. Intenta con cookies.txt."
elif response.status_code == 404:
# No encontramos la URL esperada; intentar fallback
response = None
break
else:
return None, get_video_thumbnails(video_id), f"Error al descargar subtítulos desde YouTube (HTTP {response.status_code})."
except requests.exceptions.Timeout:
if attempt < max_retries - 1:
continue
return None, get_video_thumbnails(video_id), "Timeout al descargar subtítulos. Intenta nuevamente."
except requests.exceptions.RequestException as e:
return None, get_video_thumbnails(video_id), f"Error de conexión al descargar subtítulos: {str(e)[:100]}"
# Si obtuvimos un 200, procesarlo; si hubo rate limiting, intentar fallback con yt-dlp
if response and response.status_code == 200:
subtitle_format = requested_subs[lang_key].get('ext', 'json3')
try:
# Si la respuesta parece ser una playlist M3U8 o contiene enlaces a timedtext,
# extraer las URLs y concatenar su contenido (VTT) antes de parsear.
text_body = response.text if isinstance(response.text, str) else None
if text_body and ('#EXTM3U' in text_body or 'timedtext' in text_body or text_body.strip().lower().startswith('#extm3u')):
# Extraer URLs (líneas que empiecen con http)
urls = re.findall(r'^(https?://\S+)', text_body, flags=re.M)
# Intento 1: descargar cada URL con requests (usa cookies montadas si aplican)
combined = []
for idx, u in enumerate(urls):
try:
r2 = requests.get(u, headers=headers, timeout=20, cookies=cookies_for_requests, proxies=proxies)
if r2.status_code == 200 and r2.text:
combined.append(r2.text)
continue
# Si recibimos 429, 403, o falló, intentaremos con yt-dlp (fallback)
if r2.status_code == 429:
# fallback a yt-dlp
raise Exception('rate_limited')
except Exception:
# fallthrough al fallback con yt-dlp
pass
# Intento 2 (fallback): usar yt-dlp para descargar ese timedtext/url a un archivo temporal
try:
with tempfile.TemporaryDirectory() as tdir:
out_template = os.path.join(tdir, f"timedtext_{idx}.%(ext)s")
ytdlp_cmd = [
"yt-dlp",
u,
"-o", out_template,
"--no-warnings",
]
if os.path.exists(cookies_path):
ytdlp_cmd.extend(["--cookies", cookies_path])
# pasar proxy a yt-dlp si está configurado
if proxy:
ytdlp_cmd.extend(['--proxy', proxy])
try:
res2 = subprocess.run(ytdlp_cmd, capture_output=True, text=True, timeout=60)
stderr2 = (res2.stderr or "").lower()
if res2.returncode != 0 and ('http error 429' in stderr2 or 'too many requests' in stderr2):
# rate limit cuando intentamos descargar timedtext
return None, get_video_thumbnails(video_id), "YouTube está limitando las peticiones al descargar subtítulos (HTTP 429). Agrega cookies.txt válido o intenta más tarde."
if res2.returncode != 0 and ('http error 403' in stderr2 or 'forbidden' in stderr2):
return None, get_video_thumbnails(video_id), "Acceso denegado al descargar subtítulos (HTTP 403). Intenta con cookies.txt o una cuenta con permisos."
except Exception:
pass
# leer cualquier archivo creado en el tempdir
for fpath in glob.glob(os.path.join(tdir, "timedtext_*.*")):
try:
with open(fpath, 'r', encoding='utf-8') as fh:
txt = fh.read()
if txt:
combined.append(txt)
except Exception:
continue
except Exception:
continue
if combined:
vtt_combined = "\n".join(combined)
formatted_transcript = parse_subtitle_format(vtt_combined, 'vtt')
if formatted_transcript:
return formatted_transcript, get_video_thumbnails(video_id), None
try:
subtitle_data = response.json()
formatted_transcript = parse_subtitle_format(subtitle_data, subtitle_format)
except json.JSONDecodeError:
formatted_transcript = parse_subtitle_format(response.text, subtitle_format)
except Exception as e:
return None, get_video_thumbnails(video_id), f"Error al procesar los subtítulos: {str(e)[:200]}"
if not formatted_transcript:
return None, get_video_thumbnails(video_id), "Los subtítulos están vacíos o no se pudieron procesar."
return formatted_transcript, get_video_thumbnails(video_id), None
# Si hubo rate limiting, intentar fallback con yt-dlp para descargar la URL de subtítulos
if rate_limited and (not response or response.status_code != 200):
# Intentar descargar la URL de subtítulos directamente con yt-dlp (usa cookies si existen)
try:
with tempfile.TemporaryDirectory() as tdir:
out_template = os.path.join(tdir, "sub.%(ext)s")
ytdlp_cmd = [
"yt-dlp",
sub_url,
"-o", out_template,
"--no-warnings",
]
if os.path.exists(cookies_path):
ytdlp_cmd.extend(["--cookies", cookies_path])
if proxy:
ytdlp_cmd.extend(['--proxy', proxy])
res = subprocess.run(ytdlp_cmd, capture_output=True, text=True, timeout=90)
stderr = (res.stderr or "").lower()
if res.returncode != 0 and ('http error 429' in stderr or 'too many requests' in stderr):
return None, get_video_thumbnails(video_id), "YouTube está limitando las peticiones al descargar subtítulos (HTTP 429). Agrega cookies.txt válido o intenta más tarde."
# Leer archivos generados
combined = []
for fpath in glob.glob(os.path.join(tdir, "*.*")):
try:
with open(fpath, 'r', encoding='utf-8') as fh:
txt = fh.read()
if txt:
combined.append(txt)
except Exception:
continue
if combined:
vtt_combined = "\n".join(combined)
formatted_transcript = parse_subtitle_format(vtt_combined, 'vtt')
if formatted_transcript:
return formatted_transcript, get_video_thumbnails(video_id), None
except FileNotFoundError:
return None, get_video_thumbnails(video_id), "yt-dlp no está instalado en el contenedor/entorno. Instala yt-dlp y vuelve a intentar."
except Exception:
# seguir con otros fallbacks
pass
# si no logró con yt-dlp, continuar y dejar que los fallbacks posteriores manejen el caso
# Fallback: intentarlo descargando subtítulos con yt-dlp a un directorio temporal
# (esto cubre casos en que la metadata no incluye requested_subs)
try:
with tempfile.TemporaryDirectory() as tmpdir:
# Intentar con auto-sub primero, luego con sub (manual)
ytdlp_variants = [
("--write-auto-sub", "auto"),
("--write-sub", "manual")
]
downloaded = None
for flag, label in ytdlp_variants:
cmd = [
"yt-dlp",
url,
"--skip-download",
flag,
"--sub-lang", lang,
"--sub-format", "json3/vtt/srv3/best",
"-o", os.path.join(tmpdir, "%(id)s.%(ext)s"),
"--no-warnings",
]
if os.path.exists(cookies_path):
cmd.extend(["--cookies", cookies_path])
# añadir proxy a la llamada de yt-dlp si está configurado
if proxy:
cmd.extend(['--proxy', proxy])
r = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
# Revisar si se creó algún archivo en tmpdir
files = glob.glob(os.path.join(tmpdir, f"{video_id}.*"))
if files:
# Tomar el primero válido
downloaded = files[0]
break
if downloaded:
ext = os.path.splitext(downloaded)[1].lstrip('.')
try:
with open(downloaded, 'r', encoding='utf-8') as fh:
content = fh.read()
except Exception as e:
return None, get_video_thumbnails(video_id), f"Error leyendo archivo de subtítulos descargado: {str(e)[:200]}"
# Intentar parsear según extensión conocida
fmt = 'json3' if ext in ('json', 'json3') else ('vtt' if ext == 'vtt' else 'srv3')
formatted_transcript = parse_subtitle_format(content, fmt)
if formatted_transcript:
return formatted_transcript, get_video_thumbnails(video_id), None
else:
return None, get_video_thumbnails(video_id), "Se descargaron subtítulos pero no se pudieron procesar."
except FileNotFoundError:
return None, get_video_thumbnails(video_id), "yt-dlp no está instalado. Instala yt-dlp en el contenedor/entorno y vuelve a intentar."
except Exception as e:
# No hacer crash, retornar mensaje general
return None, get_video_thumbnails(video_id), f"Error al intentar descargar subtítulos con yt-dlp: {str(e)[:200]}"
return None, get_video_thumbnails(video_id), "No se encontraron subtítulos para este video (o el video no tiene subtítulos disponibles). Intenta con otro video en vivo o agrega cookies.txt si hay restricciones."
def get_stream_url(video_id: str):
"""
Obtiene la URL de transmisión m3u8 del video usando yt-dlp con cookies y estrategias de fallback
"""
url = f"https://www.youtube.com/watch?v={video_id}"
# dynamically get cookiefile for this request
cookie_mgr = CookieManager()
cookiefile_path = cookie_mgr.get_cookiefile_path()
try:
# Lista de formatos a intentar en orden de prioridad
format_strategies = [
("best[ext=m3u8]", "Mejor calidad m3u8"),
("best", "Mejor calidad disponible"),
("best[ext=mp4]", "Mejor calidad MP4"),
("bestvideo+bestaudio/best", "Mejor video y audio"),
]
for format_spec, description in format_strategies:
command = [
"yt-dlp",
"-g",
"-f", format_spec,
"--no-warnings",
"--no-check-certificate",
"--extractor-args", "youtube:player_client=android",
]
if cookiefile_path:
command.extend(["--cookies", cookiefile_path])
command.append(url)
try:
result = subprocess.run(command, capture_output=True, text=True, check=False, timeout=60)
if result.returncode == 0 and result.stdout.strip():
# Obtener todas las URLs (puede haber video y audio separados)
urls = result.stdout.strip().split('\n')
# Buscar la URL m3u8 o googlevideo
stream_url = None
for url_line in urls:
if url_line and url_line.strip():
# Preferir URLs con m3u8
if 'm3u8' in url_line.lower():
stream_url = url_line.strip()
break
# O URLs de googlevideo
elif 'googlevideo.com' in url_line:
stream_url = url_line.strip()
break
# Si no encontramos ninguna específica, usar la primera URL válida
if not stream_url and urls:
for url_line in urls:
if url_line and url_line.strip() and url_line.startswith('http'):
stream_url = url_line.strip()
break
if stream_url:
return stream_url, None
continue
except subprocess.TimeoutExpired:
continue
except Exception:
continue
return None, "No se pudo obtener la URL del stream. Verifica que el video esté EN VIVO (🔴) y no tenga restricciones."
finally:
try:
cookie_mgr.cleanup()
except Exception:
pass
@app.get("/transcript/{video_id}")
def transcript_endpoint(video_id: str, lang: str = "es"):
data, thumbnails, error = get_transcript_data(video_id, lang)
if error:
raise HTTPException(status_code=400, detail=error)
# Concatenar texto de segmentos para mostrar como texto plano además de los segmentos
try:
combined_text = "\n".join([seg.get('text', '') for seg in data if seg.get('text')])
except Exception:
combined_text = ""
# Nuevo: arreglo format_text con cada segmento como elemento (texto limpio)
try:
format_text_list = format_segments_text(data)
except Exception:
format_text_list = []
format_text = format_text_list
return {
"video_id": video_id,
"count": len(data),
"segments": data,
"text": combined_text,
"format_text": format_text,
"thumbnails": thumbnails
}
@app.get('/transcript_vtt/{video_id}')
def transcript_vtt(video_id: str, lang: str = 'es'):
"""Descarga (con yt-dlp) y devuelve subtítulos en VTT, además de segmentos parseados y texto concatenado."""
vtt_text, error = fetch_vtt_subtitles(video_id, lang)
if error:
raise HTTPException(status_code=400, detail=error)
# parsear VTT a segmentos usando parse_subtitle_format
segments = parse_subtitle_format(vtt_text, 'vtt') if vtt_text else []
combined_text = '\n'.join([s.get('text','') for s in segments])
# format_text con texto limpio listo para procesamiento por agentes
format_text = format_segments_text(segments)
thumbnails = get_video_thumbnails(video_id)
return {
'video_id': video_id,
'vtt': vtt_text,
'count': len(segments),
'segments': segments,
'text': combined_text,
'format_text': format_text,
'thumbnails': thumbnails
}
@app.get("/stream/{video_id}")
def stream_endpoint(video_id: str):
"""
Endpoint para obtener la URL de transmisión en vivo de un video de YouTube
Retorna la URL m3u8 que se puede usar directamente con FFmpeg para retransmitir
a redes sociales usando RTMP.
Ejemplo de uso con FFmpeg:
ffmpeg -re -i "URL_M3U8" -c copy -f flv rtmp://destino/stream_key
"""
stream_url, error = get_stream_url(video_id)
if error:
raise HTTPException(status_code=400, detail=error)
thumbnails = get_video_thumbnails(video_id)
# Determinar el tipo de URL obtenida
url_type = "unknown"
if stream_url and "m3u8" in stream_url.lower():
url_type = "m3u8/hls"
elif stream_url and "googlevideo.com" in stream_url:
url_type = "direct/mp4"
# Obtener title y description con yt-dlp --dump-json
title = None
description = None
try:
_cookie_mgr = CookieManager()
_cookiefile = _cookie_mgr.get_cookiefile_path()
_cookies_path = _cookiefile or os.getenv('API_COOKIES_PATH', DEFAULT_COOKIES_PATH)
_proxy = os.getenv('API_PROXY', DEFAULT_PROXY) or None
_cmd = [
"yt-dlp",
"--skip-download",
"--dump-json",
"--no-warnings",
"--extractor-args", "youtube:player_client=android",
f"https://www.youtube.com/watch?v={video_id}"
]
if _cookiefile:
_cmd.extend(["--cookies", _cookiefile])
if _proxy:
_cmd.extend(["--proxy", _proxy])
_proc = subprocess.run(_cmd, capture_output=True, text=True, timeout=60)
if _proc.returncode == 0 and _proc.stdout:
_meta = json.loads(_proc.stdout)
title = _meta.get("title")
description = _meta.get("description")
except Exception:
pass
finally:
try:
_cookie_mgr.cleanup()
except Exception:
pass
return {
"video_id": video_id,
"title": title,
"description": description,
"stream_url": stream_url,
"url_type": url_type,
"youtube_url": f"https://www.youtube.com/watch?v={video_id}",
"ffmpeg_example": f'ffmpeg -re -i "{stream_url}" -c copy -f flv rtmp://destino/stream_key',
"thumbnails": thumbnails,
"usage": {
"description": "Usa stream_url con FFmpeg para retransmitir",
"command_template": "ffmpeg -re -i \"{stream_url}\" -c copy -f flv {rtmp_url}/{stream_key}",
"platforms": {
"youtube": "rtmp://a.rtmp.youtube.com/live2/YOUR_STREAM_KEY",
"facebook": "rtmps://live-api-s.facebook.com:443/rtmp/YOUR_STREAM_KEY",
"twitch": "rtmp://live.twitch.tv/app/YOUR_STREAM_KEY",
"twitter": "rtmps://fa.contribute.live-video.net/app/YOUR_STREAM_KEY"
}
}
}
@app.post('/upload_cookies')
async def upload_cookies(file: UploadFile = File(...)):
"""Endpoint para subir cookies.txt y guardarlo en el servidor en /app/cookies.txt"""
try:
content = await file.read()
if not content:
raise HTTPException(status_code=400, detail='Archivo vacío')
target = 'cookies.txt'
# Guardar con permisos de escritura
with open(target, 'wb') as fh:
fh.write(content)
return {"detail": "cookies.txt guardado correctamente", "path": os.path.abspath(target)}
except Exception as e:
raise HTTPException(status_code=500, detail=f'Error al guardar cookies: {str(e)[:200]}')
@app.get("/debug/metadata/{video_id}")
def debug_metadata(video_id: str):
"""Endpoint de depuración: obtiene --dump-json de yt-dlp para un video.
Devuelve la metadata (automatic_captions, subtitles, requested_subtitles) para inspección.
"""
# try to use dynamic cookiefile per request
cookie_mgr = CookieManager()
cookiefile_path = cookie_mgr.get_cookiefile_path()
cookies_path = cookiefile_path or os.getenv('API_COOKIES_PATH', DEFAULT_COOKIES_PATH)
proxy = os.getenv('API_PROXY', DEFAULT_PROXY) or None
url = f"https://www.youtube.com/watch?v={video_id}"
cmd = [
"yt-dlp",
"--skip-download",
"--dump-json",
"--no-warnings",
"--extractor-args", "youtube:player_client=android",
url
]
if os.path.exists(cookies_path):
cmd.extend(["--cookies", cookies_path])
if proxy:
cmd.extend(['--proxy', proxy])
try:
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
except FileNotFoundError:
try:
cookie_mgr.cleanup()
except Exception:
pass
raise HTTPException(status_code=500, detail="yt-dlp no está instalado en el contenedor/entorno.")
except subprocess.TimeoutExpired:
try:
cookie_mgr.cleanup()
except Exception:
pass
raise HTTPException(status_code=504, detail="yt-dlp demoró demasiado en responder.")
except Exception as e:
try:
cookie_mgr.cleanup()
except Exception:
pass
raise HTTPException(status_code=500, detail=str(e)[:300])
if proc.returncode != 0:
stderr = proc.stderr or ''
try:
cookie_mgr.cleanup()
except Exception:
pass
raise HTTPException(status_code=500, detail=f"yt-dlp error: {stderr[:1000]}")
try:
metadata = json.loads(proc.stdout)
except Exception:
try:
cookie_mgr.cleanup()
except Exception:
pass
raise HTTPException(status_code=500, detail="No se pudo parsear la salida JSON de yt-dlp.")
try:
cookie_mgr.cleanup()
except Exception:
pass
# Devolver solo las partes útiles para depuración
debug_info = {
'id': metadata.get('id'),
'title': metadata.get('title'),
'uploader': metadata.get('uploader'),
'is_live': metadata.get('is_live'),
'automatic_captions': metadata.get('automatic_captions'),
'subtitles': metadata.get('subtitles'),
'requested_subtitles': metadata.get('requested_subtitles'),
'formats_sample': metadata.get('formats')[:5] if metadata.get('formats') else None,
}
return debug_info
@app.get('/debug/fetch_subs/{video_id}')
def debug_fetch_subs(video_id: str, lang: str = 'es'):
"""Intenta descargar subtítulos con yt-dlp dentro del entorno y devuelve el log y el contenido (parcial) si existe.
Usa cookies definidas en API_COOKIES_PATH.
"""
cookie_mgr = CookieManager()
cookiefile_path = cookie_mgr.get_cookiefile_path()
out_dir = tempfile.mkdtemp(prefix='subs_')
out_template = os.path.join(out_dir, '%(id)s.%(ext)s')
url = f"https://www.youtube.com/watch?v={video_id}"
cmd = [
'yt-dlp',
'--verbose',
'--skip-download',
'--write-auto-sub',
'--write-sub',
'--sub-lang', lang,
'--sub-format', 'json3/vtt/srv3/best',
'--output', out_template,
url
]
if cookiefile_path:
cmd.extend(['--cookies', cookiefile_path])
try:
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=240)
except FileNotFoundError:
try:
cookie_mgr.cleanup()
except Exception:
pass
raise HTTPException(status_code=500, detail='yt-dlp no está instalado en el contenedor.')
except subprocess.TimeoutExpired:
try:
cookie_mgr.cleanup()
except Exception:
pass
raise HTTPException(status_code=504, detail='La ejecución de yt-dlp demoró demasiado.')
except Exception as e:
try:
cookie_mgr.cleanup()
except Exception:
pass
raise HTTPException(status_code=500, detail=str(e)[:300])
stdout = proc.stdout or ''
stderr = proc.stderr or ''
rc = proc.returncode
# Buscar archivos generados
generated = []
for f in glob.glob(os.path.join(out_dir, f"{video_id}.*")):
size = None
try:
size = os.path.getsize(f)
# tomar las primeras 200 líneas para no retornar archivos enormes
with open(f, 'r', encoding='utf-8', errors='ignore') as fh:
sample = ''.join([next(fh) for _ in range(200)]) if size > 0 else ''
generated.append({
'path': f,
'size': size,
'sample': sample
})
except StopIteration:
# menos de 200 líneas
try:
with open(f, 'r', encoding='utf-8', errors='ignore') as fh:
sample = fh.read()
except Exception:
sample = None
if size is None:
try:
size = os.path.getsize(f)
except Exception:
size = 0
generated.append({'path': f, 'size': size, 'sample': sample})
except Exception:
if size is None:
try:
size = os.path.getsize(f)
except Exception:
size = 0
generated.append({'path': f, 'size': size, 'sample': None})
try:
cookie_mgr.cleanup()
except Exception:
pass
return {
'video_id': video_id,
'rc': rc,
'stdout_tail': stdout[-2000:],
'stderr_tail': stderr[-2000:],
'generated': generated,
'out_dir': out_dir
}
# Nuevo helper para descargar VTT directamente y retornarlo como texto
def fetch_vtt_subtitles(video_id: str, lang: str = 'es'):
"""Descarga subtítulos en formato VTT usando yt-dlp y devuelve su contenido.
Retorna (vtt_text, None) en caso de éxito o (None, error_message) en caso de error.
"""
url = f"https://www.youtube.com/watch?v={video_id}"
cookie_mgr = CookieManager()
cookiefile_path = cookie_mgr.get_cookiefile_path()
with tempfile.TemporaryDirectory() as tmpdir:
out_template = os.path.join(tmpdir, '%(id)s.%(ext)s')
cmd = [
'yt-dlp',
'--skip-download',
'--write-auto-sub',
'--write-sub',
'--sub-lang', lang,
'--sub-format', 'vtt',
'--output', out_template,
url
]
if cookiefile_path:
cmd.extend(['--cookies', cookiefile_path])
try:
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=180)
except FileNotFoundError:
try:
cookie_mgr.cleanup()
except Exception:
pass
return None, 'yt-dlp no está instalado en el entorno.'
except subprocess.TimeoutExpired:
try:
cookie_mgr.cleanup()
except Exception:
pass
return None, 'La descarga de subtítulos tardó demasiado.'
except Exception as e:
try:
cookie_mgr.cleanup()
except Exception:
pass
return None, f'Error ejecutando yt-dlp: {str(e)[:200]}'
stderr = (proc.stderr or '').lower()
if proc.returncode != 0:
try:
cookie_mgr.cleanup()
except Exception:
pass
if 'http error 429' in stderr or 'too many requests' in stderr:
return None, 'YouTube está limitando las peticiones al descargar subtítulos (HTTP 429). Revisa cookies.txt o prueba desde otra IP.'
if 'http error 403' in stderr or 'forbidden' in stderr:
return None, 'Acceso denegado al descargar subtítulos (HTTP 403). Usa cookies.txt con una cuenta autorizada.'
return None, f'yt-dlp error: {proc.stderr[:1000]}'
files = glob.glob(os.path.join(tmpdir, f"{video_id}.*"))
if not files:
try:
cookie_mgr.cleanup()
except Exception:
pass
return None, 'No se generaron archivos de subtítulos.'
# intentar preferir .vtt
vtt_path = None
for f in files:
if f.lower().endswith('.vtt'):
vtt_path = f
break
if not vtt_path:
vtt_path = files[0]
try:
with open(vtt_path, 'r', encoding='utf-8', errors='ignore') as fh:
content = fh.read()
try:
cookie_mgr.cleanup()
except Exception:
pass
return content, None
except Exception as e:
try:
cookie_mgr.cleanup()
except Exception:
pass
return None, f'Error leyendo archivo de subtítulos: {str(e)[:200]}'
# Nuevo endpoint que devuelve VTT crudo, segmentos parseados y texto concatenado
@app.get('/transcript_vtt/{video_id}')
def transcript_vtt(video_id: str, lang: str = 'es'):
"""Descarga (con yt-dlp) y devuelve subtítulos en VTT, además de segmentos parseados y texto concatenado."""
vtt_text, error = fetch_vtt_subtitles(video_id, lang)
if error:
raise HTTPException(status_code=400, detail=error)
# parsear VTT a segmentos usando parse_subtitle_format
segments = parse_subtitle_format(vtt_text, 'vtt') if vtt_text else []
combined_text = '\n'.join([s.get('text','') for s in segments])
# format_text con texto limpio listo para procesamiento por agentes
format_text = format_segments_text(segments)
thumbnails = get_video_thumbnails(video_id)
return {
'video_id': video_id,
'vtt': vtt_text,
'count': len(segments),
'segments': segments,
'text': combined_text,
'format_text': format_text,
'thumbnails': thumbnails
}
@app.post('/upload_vtt/{video_id}')
async def upload_vtt(video_id: str, file: UploadFile = File(...)):
"""Permite subir un archivo VTT para un video y devuelve segmentos parseados y texto.
Guarda el archivo en /app/data/{video_id}.vtt (sobrescribe si existe).
"""
try:
content = await file.read()
if not content:
raise HTTPException(status_code=400, detail='Archivo vacío')
target_dir = os.path.join(os.getcwd(), 'data')
os.makedirs(target_dir, exist_ok=True)
target_path = os.path.join(target_dir, f"{video_id}.vtt")
with open(target_path, 'wb') as fh:
fh.write(content)
# Leer como texto para parsear
text = content.decode('utf-8', errors='ignore')
segments = parse_subtitle_format(text, 'vtt') if text else []
combined_text = '\n'.join([s.get('text','') for s in segments])
format_text = format_segments_text(segments)
return {
'video_id': video_id,
'path': target_path,
'count': len(segments),
'segments': segments,
'text': combined_text,
'format_text': format_text
}
except Exception as e:
raise HTTPException(status_code=500, detail=f'Error al guardar/parsear VTT: {str(e)[:200]}')
@app.get('/transcript_alt/{video_id}')
def transcript_alt(video_id: str, lang: str = 'es'):
"""Intento alternativo de obtener transcript usando youtube-transcript-api (si está disponible).
Retorna segmentos en el mismo formato que get_transcript_data para mantener consistencia.
"""
if not YOUTUBE_TRANSCRIPT_API_AVAILABLE:
raise HTTPException(status_code=501, detail='youtube-transcript-api no está instalado en el entorno.')
vid = extract_video_id(video_id)
if not vid:
raise HTTPException(status_code=400, detail='video_id inválido')
# preparar idiomas a probar
langs = [lang]
if len(lang) == 2:
langs.append(f"{lang}-419")
try:
# get_transcript puede lanzar excepciones si no hay transcript
# Usar cast para silenciar el analizador estático que no infiere la comprobación previa
transcript_api = cast(Any, YouTubeTranscriptApi)
transcript_list = transcript_api.get_transcript(vid, languages=langs)
except NoTranscriptFound:
raise HTTPException(status_code=404, detail='No se encontró transcript con youtube-transcript-api')
except TranscriptsDisabled:
raise HTTPException(status_code=403, detail='Los transcripts están deshabilitados para este video')
except Exception as e:
raise HTTPException(status_code=500, detail=f'Error youtube-transcript-api: {str(e)[:300]}')
# transcript_list tiene items con keys: text, start, duration
segments = []
for item in transcript_list:
segments.append({
'start': float(item.get('start', 0)),
'duration': float(item.get('duration', 0)),
'text': item.get('text', '').strip()
})
combined_text = '\n'.join([s['text'] for s in segments if s.get('text')])
format_text = format_segments_text(segments)
thumbnails = get_video_thumbnails(vid)
return {
'video_id': vid,
'count': len(segments),
'segments': segments,
'text': combined_text,
'format_text': format_text,
'source': 'youtube-transcript-api'
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)