TubeScript-API/main.py

615 lines
26 KiB
Python

import os
import json
import subprocess
import requests
import time
import re
import tempfile
import glob
from fastapi import FastAPI, HTTPException, UploadFile, File
from typing import List, Dict
app = FastAPI(title="TubeScript API Pro - JSON Cleaner")
# Ruta de cookies configurable vía variable de entorno: API_COOKIES_PATH
DEFAULT_COOKIES_PATH = os.getenv('API_COOKIES_PATH', './cookies.txt')
def clean_youtube_json(raw_json: Dict) -> List[Dict]:
"""
Transforma el formato complejo 'json3' de YouTube a un formato
simple: [{'start': 0.0, 'duration': 2.0, 'text': 'Hola'}]
"""
clean_data = []
# YouTube guarda los eventos de texto en la llave 'events'
events = raw_json.get('events', [])
for event in events:
# Solo procesamos eventos que tengan segmentos de texto
if 'segs' in event:
text = "".join([seg['utf8'] for seg in event['segs']]).strip()
if text and text != '\n':
clean_data.append({
"start": event.get('tStartMs', 0) / 1000.0, # Convertir a segundos
"duration": event.get('dDurationMs', 0) / 1000.0,
"text": text.replace('\n', ' ')
})
return clean_data
def parse_subtitle_format(content: str, format_type: str) -> List[Dict]:
"""
Parsea diferentes formatos de subtítulos (json3, srv3, vtt) al formato estándar
"""
try:
if format_type == 'json3':
# Formato JSON3 de YouTube
data = json.loads(content) if isinstance(content, str) else content
return clean_youtube_json(data)
elif format_type in ['srv3', 'vtt']:
# Para srv3 y vtt, intentar parsear como JSON primero
try:
data = json.loads(content) if isinstance(content, str) else content
# srv3 también tiene estructura similar a json3
if 'events' in data:
return clean_youtube_json(data)
except:
pass
# Si no es JSON, intentar parsear como texto VTT
clean_data = []
lines = content.split('\n') if isinstance(content, str) else []
current_time = 0.0
current_text = ""
for line in lines:
line = line.strip()
if not line or line.startswith('WEBVTT') or '-->' in line:
if '-->' in line:
# Extraer tiempo de inicio
try:
time_parts = line.split('-->')[0].strip().split(':')
if len(time_parts) >= 2:
current_time = float(time_parts[-2]) * 60 + float(time_parts[-1])
except:
pass
continue
if line and not line.isdigit():
current_text = line
if current_text:
clean_data.append({
"start": current_time,
"duration": 2.0, # Duración aproximada
"text": current_text
})
current_time += 2.0
return clean_data if clean_data else []
else:
# Formato desconocido, intentar como JSON
data = json.loads(content) if isinstance(content, str) else content
if 'events' in data:
return clean_youtube_json(data)
return []
except Exception as e:
print(f"Error parsing subtitle format {format_type}: {e}")
return []
def extract_video_id(video_id_or_url: str) -> str:
"""
Normaliza la entrada y extrae el video_id si se recibe una URL completa.
Acepta: https://www.youtube.com/watch?v=ID, youtu.be/ID, o el propio ID.
"""
if not video_id_or_url:
return ""
s = video_id_or_url.strip()
# Si ya parece un id (11-20 caracteres alfanuméricos y -, _), retornarlo
if re.match(r'^[A-Za-z0-9_-]{8,20}$', s):
return s
# Intentar extraer de URL completa
# watch?v=
m = re.search(r'[?&]v=([A-Za-z0-9_-]{8,20})', s)
if m:
return m.group(1)
# youtu.be/
m = re.search(r'youtu\.be/([A-Za-z0-9_-]{8,20})', s)
if m:
return m.group(1)
# /v/ or /embed/
m = re.search(r'(?:/v/|/embed/)([A-Za-z0-9_-]{8,20})', s)
if m:
return m.group(1)
# Si no se detecta, devolver la entrada original (fallará después si es inválida)
return s
def get_transcript_data(video_id: str, lang: str = "es"):
video_id = extract_video_id(video_id)
if not video_id:
return None, "video_id inválido o vacío"
url = f"https://www.youtube.com/watch?v={video_id}"
# Leer la ruta de cookies desde la variable de entorno al invocar (permite override en runtime)
cookies_path = os.getenv('API_COOKIES_PATH', DEFAULT_COOKIES_PATH)
def load_cookies_from_file(path: str) -> dict:
"""Parsea un cookies.txt en formato Netscape a un dict usable por requests."""
cookies = {}
try:
if not os.path.exists(path):
return cookies
with open(path, 'r', encoding='utf-8', errors='ignore') as fh:
for line in fh:
line = line.strip()
if not line or line.startswith('#'):
continue
parts = line.split('\t')
# formato Netscape: domain, flag, path, secure, expiration, name, value
if len(parts) >= 7:
name = parts[5].strip()
value = parts[6].strip()
if name:
cookies[name] = value
else:
# fallback: intento simple name=value
if '=' in line:
k, v = line.split('=', 1)
cookies[k.strip()] = v.strip()
except Exception:
return {}
return cookies
cookies_for_requests = load_cookies_from_file(cookies_path)
# Intento rápido y fiable: usar yt-dlp para descargar subtítulos (auto o manual) al tmpdir
try:
with tempfile.TemporaryDirectory() as tmpdl:
# probar variantes de idioma (ej. es y es-419) para cubrir casos regionales
sub_langs = [lang]
if len(lang) == 2:
sub_langs.append(f"{lang}-419")
ytdlp_cmd = [
"yt-dlp",
url,
"--skip-download",
"--write-auto-sub",
"--write-sub",
"--sub-format", "vtt/json3/srv3/best",
"-o", os.path.join(tmpdl, "%(id)s.%(ext)s"),
"--no-warnings",
]
# agregar sub-lang si hay variantes
if sub_langs:
ytdlp_cmd.extend(["--sub-lang", ",".join(sub_langs)])
if os.path.exists(cookies_path):
ytdlp_cmd.extend(["--cookies", cookies_path])
try:
result = subprocess.run(ytdlp_cmd, capture_output=True, text=True, timeout=120)
# Si yt-dlp falló por rate limiting, devolver mensaje claro
stderr = (result.stderr or "").lower()
if result.returncode != 0 and ('http error 429' in stderr or 'too many requests' in stderr):
return None, "YouTube está limitando las peticiones al descargar subtítulos (HTTP 429). Agrega un cookies.txt válido exportado desde tu navegador y monta en el contenedor, o espera unos minutos."
if result.returncode != 0 and ('http error 403' in stderr or 'forbidden' in stderr):
return None, "Acceso denegado al descargar subtítulos (HTTP 403). El video puede tener restricciones. Usa cookies.txt con una cuenta autorizada."
except subprocess.TimeoutExpired:
pass
# revisar archivos creados
files = glob.glob(os.path.join(tmpdl, f"{video_id}.*"))
if files:
combined = []
for fpath in files:
try:
with open(fpath, 'r', encoding='utf-8') as fh:
combined.append(fh.read())
except Exception:
continue
if combined:
vtt_combined = "\n".join(combined)
parsed = parse_subtitle_format(vtt_combined, 'vtt')
if parsed:
return parsed, None
except FileNotFoundError:
# yt-dlp no instalado, seguiremos con los métodos previos
pass
except Exception:
pass
# 1) Intento principal: obtener metadata con yt-dlp
command = [
"yt-dlp",
"--skip-download",
"--dump-json",
"--no-warnings",
url
]
if os.path.exists(cookies_path):
command.extend(["--cookies", cookies_path])
try:
result = subprocess.run(command, capture_output=True, text=True, timeout=60)
if result.returncode != 0:
error_msg = result.stderr if result.stderr else "Error desconocido from yt-dlp"
# Si yt-dlp reporta algo, enviar mensaje útil
# No abortar inmediatamente: intentaremos fallback descargando subs con yt-dlp
video_metadata = None
else:
if not result.stdout.strip():
video_metadata = None
else:
try:
video_metadata = json.loads(result.stdout)
except Exception:
video_metadata = None
except subprocess.TimeoutExpired:
video_metadata = None
except FileNotFoundError:
return None, "yt-dlp no está instalado en el contenedor/entorno. Instala yt-dlp y vuelve a intentar."
except Exception as e:
video_metadata = None
requested_subs = {}
if video_metadata:
requested_subs = video_metadata.get('requested_subtitles', {}) or {}
# Buscar en automatic_captions y subtitles si requested_subs está vacío
if not requested_subs:
automatic_captions = video_metadata.get('automatic_captions', {}) or {}
for lang_key, formats in automatic_captions.items():
if lang in lang_key or lang_key.startswith(lang):
if formats:
requested_subs = {lang_key: formats[0]}
break
if not requested_subs:
subtitles = video_metadata.get('subtitles', {}) or {}
for lang_key, formats in subtitles.items():
if lang in lang_key or lang_key.startswith(lang):
if formats:
requested_subs = {lang_key: formats[0]}
break
# Si requested_subs está disponible, intentar descargar vía requests la URL proporcionada
if requested_subs:
lang_key = next(iter(requested_subs))
sub_url = requested_subs[lang_key].get('url')
if sub_url:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8',
'Referer': 'https://www.youtube.com/',
}
max_retries = 3
response = None
for attempt in range(max_retries):
try:
response = requests.get(sub_url, headers=headers, timeout=30, cookies=cookies_for_requests)
if response.status_code == 200:
break
elif response.status_code == 429:
if attempt < max_retries - 1:
time.sleep(2 * (attempt + 1))
continue
else:
return None, "YouTube está limitando las peticiones (HTTP 429). Agrega cookies.txt o espera unos minutos."
elif response.status_code == 403:
return None, "Acceso denegado (HTTP 403). El video puede tener restricciones de edad o región. Intenta con cookies.txt."
elif response.status_code == 404:
# No encontramos la URL esperada; intentar fallback
response = None
break
else:
return None, f"Error al descargar subtítulos desde YouTube (HTTP {response.status_code})."
except requests.exceptions.Timeout:
if attempt < max_retries - 1:
continue
return None, "Timeout al descargar subtítulos. Intenta nuevamente."
except requests.exceptions.RequestException as e:
return None, f"Error de conexión al descargar subtítulos: {str(e)[:100]}"
if response and response.status_code == 200:
subtitle_format = requested_subs[lang_key].get('ext', 'json3')
try:
# Si la respuesta parece ser una playlist M3U8 o contiene enlaces a timedtext,
# extraer las URLs y concatenar su contenido (VTT) antes de parsear.
text_body = response.text if isinstance(response.text, str) else None
if text_body and ('#EXTM3U' in text_body or 'timedtext' in text_body or text_body.strip().lower().startswith('#extm3u')):
# Extraer URLs (líneas que empiecen con http)
urls = re.findall(r'^(https?://\S+)', text_body, flags=re.M)
# Intento 1: descargar cada URL con requests (usa cookies montadas si aplican)
combined = []
for idx, u in enumerate(urls):
try:
r2 = requests.get(u, headers=headers, timeout=20, cookies=cookies_for_requests)
if r2.status_code == 200 and r2.text:
combined.append(r2.text)
continue
except Exception:
# fallthrough al fallback
pass
# Intento 2 (fallback): usar yt-dlp para descargar ese timedtext/url a un archivo temporal
try:
with tempfile.TemporaryDirectory() as tdir:
out_template = os.path.join(tdir, f"timedtext_{idx}.%(ext)s")
ytdlp_cmd = [
"yt-dlp",
u,
"-o", out_template,
"--no-warnings",
]
if os.path.exists(cookies_path):
ytdlp_cmd.extend(["--cookies", cookies_path])
try:
res2 = subprocess.run(ytdlp_cmd, capture_output=True, text=True, timeout=60)
stderr2 = (res2.stderr or "").lower()
if res2.returncode != 0 and ('http error 429' in stderr2 or 'too many requests' in stderr2):
# rate limit cuando intentamos descargar timedtext
return None, "YouTube está limitando las peticiones al descargar subtítulos (HTTP 429). Agrega cookies.txt válido o intenta más tarde."
if res2.returncode != 0 and ('http error 403' in stderr2 or 'forbidden' in stderr2):
return None, "Acceso denegado al descargar subtítulos (HTTP 403). Intenta con cookies.txt o una cuenta con permisos."
except Exception:
pass
# leer cualquier archivo creado en el tempdir
for fpath in glob.glob(os.path.join(tdir, "timedtext_*.*")):
try:
with open(fpath, 'r', encoding='utf-8') as fh:
txt = fh.read()
if txt:
combined.append(txt)
except Exception:
continue
except Exception:
continue
if combined:
vtt_combined = "\n".join(combined)
formatted_transcript = parse_subtitle_format(vtt_combined, 'vtt')
if formatted_transcript:
return formatted_transcript, None
try:
subtitle_data = response.json()
formatted_transcript = parse_subtitle_format(subtitle_data, subtitle_format)
except json.JSONDecodeError:
formatted_transcript = parse_subtitle_format(response.text, subtitle_format)
except Exception as e:
return None, f"Error al procesar los subtítulos: {str(e)[:200]}"
if not formatted_transcript:
return None, "Los subtítulos están vacíos o no se pudieron procesar."
return formatted_transcript, None
# Fallback: intentarlo descargando subtítulos con yt-dlp a un directorio temporal
# (esto cubre casos en que la metadata no incluye requested_subtitles)
try:
with tempfile.TemporaryDirectory() as tmpdir:
# Intentar con auto-sub primero, luego con sub (manual)
ytdlp_variants = [
("--write-auto-sub", "auto"),
("--write-sub", "manual")
]
downloaded = None
for flag, label in ytdlp_variants:
cmd = [
"yt-dlp",
url,
"--skip-download",
flag,
"--sub-lang", lang,
"--sub-format", "json3/vtt/srv3/best",
"-o", os.path.join(tmpdir, "%(id)s.%(ext)s"),
"--no-warnings",
]
if os.path.exists(cookies_path):
cmd.extend(["--cookies", cookies_path])
try:
r = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
except subprocess.TimeoutExpired:
r = None
# Revisar si se creó algún archivo en tmpdir
files = glob.glob(os.path.join(tmpdir, f"{video_id}.*"))
if files:
# Tomar el primero válido
downloaded = files[0]
break
if downloaded:
ext = os.path.splitext(downloaded)[1].lstrip('.')
try:
with open(downloaded, 'r', encoding='utf-8') as fh:
content = fh.read()
except Exception as e:
return None, f"Error leyendo archivo de subtítulos descargado: {str(e)[:200]}"
# Intentar parsear según extensión conocida
fmt = 'json3' if ext in ('json', 'json3') else ('vtt' if ext == 'vtt' else 'srv3')
formatted_transcript = parse_subtitle_format(content, fmt)
if formatted_transcript:
return formatted_transcript, None
else:
return None, "Se descargaron subtítulos pero no se pudieron procesar."
except FileNotFoundError:
return None, "yt-dlp no está instalado. Instala yt-dlp en el contenedor/entorno y vuelve a intentar."
except Exception as e:
# No hacer crash, retornar mensaje general
return None, f"Error al intentar descargar subtítulos con yt-dlp: {str(e)[:200]}"
return None, "No se encontraron subtítulos para este video (o el video no tiene subtítulos disponibles). Intenta con otro video en vivo o agrega cookies.txt si hay restricciones."
def get_stream_url(video_id: str):
"""
Obtiene la URL de transmisión m3u8 del video usando yt-dlp con cookies y estrategias de fallback
"""
url = f"https://www.youtube.com/watch?v={video_id}"
# Leer la ruta de cookies desde la variable de entorno (si no está, usar valor por defecto)
cookies_path = os.getenv('API_COOKIES_PATH', DEFAULT_COOKIES_PATH)
# Lista de formatos a intentar en orden de prioridad
format_strategies = [
("best[ext=m3u8]", "Mejor calidad m3u8"),
("best", "Mejor calidad disponible"),
("best[ext=mp4]", "Mejor calidad MP4"),
("bestvideo+bestaudio/best", "Mejor video y audio"),
]
for format_spec, description in format_strategies:
# Comando optimizado para obtener la mejor URL disponible
command = [
"yt-dlp",
"-g", # Obtener solo la URL
"-f", format_spec,
"--no-warnings", # Sin advertencias
"--no-check-certificate", # Ignorar errores de certificado
"--extractor-args", "youtube:player_client=android", # Usar cliente Android
]
# Agregar cookies solo si el archivo existe
if os.path.exists(cookies_path):
command.extend(["--cookies", cookies_path])
command.append(url)
try:
result = subprocess.run(command, capture_output=True, text=True, check=False, timeout=60)
if result.returncode == 0 and result.stdout.strip():
# Obtener todas las URLs (puede haber video y audio separados)
urls = result.stdout.strip().split('\n')
# Buscar la URL m3u8 o googlevideo
stream_url = None
for url_line in urls:
if url_line and url_line.strip():
# Preferir URLs con m3u8
if 'm3u8' in url_line.lower():
stream_url = url_line.strip()
break
# O URLs de googlevideo
elif 'googlevideo.com' in url_line:
stream_url = url_line.strip()
break
# Si no encontramos ninguna específica, usar la primera URL válida
if not stream_url and urls:
for url_line in urls:
if url_line and url_line.strip() and url_line.startswith('http'):
stream_url = url_line.strip()
break
if stream_url:
return stream_url, None
# Este formato falló, intentar el siguiente
continue
except subprocess.TimeoutExpired:
continue
except Exception as e:
continue
# Si todos los formatos fallaron
return None, "No se pudo obtener la URL del stream. Verifica que el video esté EN VIVO (🔴) y no tenga restricciones."
@app.get("/transcript/{video_id}")
def transcript_endpoint(video_id: str, lang: str = "es"):
data, error = get_transcript_data(video_id, lang)
if error:
raise HTTPException(status_code=400, detail=error)
# Concatenar texto de segmentos para mostrar como texto plano además de los segmentos
try:
combined_text = "\n".join([seg.get('text', '') for seg in data if seg.get('text')])
except Exception:
combined_text = ""
return {
"video_id": video_id,
"count": len(data),
"segments": data,
"text": combined_text
}
@app.get("/stream/{video_id}")
def stream_endpoint(video_id: str):
"""
Endpoint para obtener la URL de transmisión en vivo de un video de YouTube
Retorna la URL m3u8 que se puede usar directamente con FFmpeg para retransmitir
a redes sociales usando RTMP.
Ejemplo de uso con FFmpeg:
ffmpeg -re -i "URL_M3U8" -c copy -f flv rtmp://destino/stream_key
"""
stream_url, error = get_stream_url(video_id)
if error:
raise HTTPException(status_code=400, detail=error)
# Determinar el tipo de URL obtenida
url_type = "unknown"
if "m3u8" in stream_url.lower():
url_type = "m3u8/hls"
elif "googlevideo.com" in stream_url:
url_type = "direct/mp4"
return {
"video_id": video_id,
"stream_url": stream_url,
"url_type": url_type,
"youtube_url": f"https://www.youtube.com/watch?v={video_id}",
"ffmpeg_example": f'ffmpeg -re -i "{stream_url}" -c copy -f flv rtmp://destino/stream_key',
"usage": {
"description": "Usa stream_url con FFmpeg para retransmitir",
"command_template": "ffmpeg -re -i \"{stream_url}\" -c copy -f flv {rtmp_url}/{stream_key}",
"platforms": {
"youtube": "rtmp://a.rtmp.youtube.com/live2/YOUR_STREAM_KEY",
"facebook": "rtmps://live-api-s.facebook.com:443/rtmp/YOUR_STREAM_KEY",
"twitch": "rtmp://live.twitch.tv/app/YOUR_STREAM_KEY",
"twitter": "rtmps://fa.contribute.live-video.net/app/YOUR_STREAM_KEY"
}
}
}
@app.post('/upload_cookies')
async def upload_cookies(file: UploadFile = File(...)):
"""Endpoint para subir cookies.txt y guardarlo en el servidor en /app/cookies.txt"""
try:
content = await file.read()
if not content:
raise HTTPException(status_code=400, detail='Archivo vacío')
target = 'cookies.txt'
# Guardar con permisos de escritura
with open(target, 'wb') as fh:
fh.write(content)
return {"detail": "cookies.txt guardado correctamente", "path": os.path.abspath(target)}
except Exception as e:
raise HTTPException(status_code=500, detail=f'Error al guardar cookies: {str(e)[:200]}')
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)