TubeScript-API/main.py

361 lines
14 KiB
Python

import os
import json
import subprocess
import requests
import time
from fastapi import FastAPI, HTTPException, Query
from typing import List, Dict
app = FastAPI(title="TubeScript API Pro - JSON Cleaner")
def clean_youtube_json(raw_json: Dict) -> List[Dict]:
"""
Transforma el formato complejo 'json3' de YouTube a un formato
simple: [{'start': 0.0, 'duration': 2.0, 'text': 'Hola'}]
"""
clean_data = []
# YouTube guarda los eventos de texto en la llave 'events'
events = raw_json.get('events', [])
for event in events:
# Solo procesamos eventos que tengan segmentos de texto
if 'segs' in event:
text = "".join([seg['utf8'] for seg in event['segs']]).strip()
if text and text != '\n':
clean_data.append({
"start": event.get('tStartMs', 0) / 1000.0, # Convertir a segundos
"duration": event.get('dDurationMs', 0) / 1000.0,
"text": text.replace('\n', ' ')
})
return clean_data
def parse_subtitle_format(content: str, format_type: str) -> List[Dict]:
"""
Parsea diferentes formatos de subtítulos (json3, srv3, vtt) al formato estándar
"""
try:
if format_type == 'json3':
# Formato JSON3 de YouTube
data = json.loads(content) if isinstance(content, str) else content
return clean_youtube_json(data)
elif format_type in ['srv3', 'vtt']:
# Para srv3 y vtt, intentar parsear como JSON primero
try:
data = json.loads(content) if isinstance(content, str) else content
# srv3 también tiene estructura similar a json3
if 'events' in data:
return clean_youtube_json(data)
except:
pass
# Si no es JSON, intentar parsear como texto VTT
clean_data = []
lines = content.split('\n') if isinstance(content, str) else []
current_time = 0.0
current_text = ""
for line in lines:
line = line.strip()
if not line or line.startswith('WEBVTT') or '-->' in line:
if '-->' in line:
# Extraer tiempo de inicio
try:
time_parts = line.split('-->')[0].strip().split(':')
if len(time_parts) >= 2:
current_time = float(time_parts[-2]) * 60 + float(time_parts[-1])
except:
pass
continue
if line and not line.isdigit():
current_text = line
if current_text:
clean_data.append({
"start": current_time,
"duration": 2.0, # Duración aproximada
"text": current_text
})
current_time += 2.0
return clean_data if clean_data else []
else:
# Formato desconocido, intentar como JSON
data = json.loads(content) if isinstance(content, str) else content
if 'events' in data:
return clean_youtube_json(data)
return []
except Exception as e:
print(f"Error parsing subtitle format {format_type}: {e}")
return []
def get_transcript_data(video_id: str, lang: str):
url = f"https://www.youtube.com/watch?v={video_id}"
cookies_path = "cookies.txt"
# Comando ultra-simplificado - SOLO metadatos, sin opciones adicionales
command = [
"yt-dlp",
"--skip-download",
"--dump-json",
"--no-warnings",
url
]
# Agregar cookies solo si el archivo existe
if os.path.exists(cookies_path):
command.extend(["--cookies", cookies_path])
try:
# 1. Obtener metadatos con yt-dlp
result = subprocess.run(command, capture_output=True, text=True, timeout=60)
if result.returncode != 0:
error_msg = result.stderr if result.stderr else "Error desconocido"
return None, f"Error de yt-dlp al obtener metadatos: {error_msg[:300]}"
if not result.stdout.strip():
return None, "No se obtuvieron datos del video. Verifica que el video_id sea correcto."
video_metadata = json.loads(result.stdout)
# 2. Buscar subtítulos de forma muy flexible
requested_subs = video_metadata.get('requested_subtitles', {})
# Si no hay requested_subtitles, buscar en cualquier fuente disponible
if not requested_subs:
# Intentar con automatic_captions primero
automatic_captions = video_metadata.get('automatic_captions', {})
if automatic_captions:
# Buscar idiomas que contengan el código solicitado
for lang_key in automatic_captions.keys():
if lang in lang_key or lang_key.startswith(lang):
# Tomar el PRIMER formato disponible
if automatic_captions[lang_key]:
requested_subs = {lang_key: automatic_captions[lang_key][0]}
break
# Si no, intentar con subtitles manuales
if not requested_subs:
subtitles = video_metadata.get('subtitles', {})
if subtitles:
for lang_key in subtitles.keys():
if lang in lang_key or lang_key.startswith(lang):
if subtitles[lang_key]:
requested_subs = {lang_key: subtitles[lang_key][0]}
break
if not requested_subs:
return None, f"No se encontraron subtítulos para el idioma '{lang}'. El video puede no tener subtítulos disponibles."
# Obtenemos la URL del primer idioma que coincida
lang_key = next(iter(requested_subs))
sub_url = requested_subs[lang_key].get('url')
if not sub_url:
return None, "No se pudo obtener la URL de los subtítulos."
# 3. Descargar el JSON crudo de los servidores de YouTube con headers
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8',
'Referer': 'https://www.youtube.com/',
}
# Intentar descargar con retry en caso de rate limiting
max_retries = 3
response = None
for attempt in range(max_retries):
try:
response = requests.get(sub_url, headers=headers, timeout=30)
if response.status_code == 200:
break
elif response.status_code == 429:
# Rate limiting - esperar y reintentar
if attempt < max_retries - 1:
import time
time.sleep(2 * (attempt + 1)) # Espera incremental: 2s, 4s, 6s
continue
else:
return None, "YouTube está limitando las peticiones (HTTP 429). Por favor espera unos minutos e intenta nuevamente, o agrega cookies.txt válidas."
elif response.status_code == 403:
return None, f"Acceso denegado (HTTP 403). El video puede tener restricciones geográficas o de edad. Intenta agregar cookies.txt."
elif response.status_code == 404:
return None, f"Subtítulos no encontrados (HTTP 404). El video puede no tener subtítulos disponibles."
else:
return None, f"Error al descargar subtítulos desde YouTube (HTTP {response.status_code}). El video puede tener restricciones."
except requests.exceptions.Timeout:
if attempt < max_retries - 1:
continue
return None, "Timeout al descargar subtítulos. Intenta nuevamente."
except requests.exceptions.RequestException as e:
return None, f"Error de conexión al descargar subtítulos: {str(e)[:100]}"
if not response or response.status_code != 200:
return None, f"No se pudieron obtener los subtítulos después de {max_retries} intentos."
# 4. Detectar el formato de subtítulo
subtitle_format = requested_subs[lang_key].get('ext', 'json3')
# 5. Limpiar y formatear según el tipo
try:
# Intentar parsear como JSON primero
try:
subtitle_data = response.json()
formatted_transcript = parse_subtitle_format(subtitle_data, subtitle_format)
except json.JSONDecodeError:
# Si no es JSON, tratar como texto (VTT)
formatted_transcript = parse_subtitle_format(response.text, subtitle_format)
except Exception as e:
return None, f"Error al procesar los subtítulos: {str(e)[:100]}"
if not formatted_transcript:
return None, "Los subtítulos están vacíos o no se pudieron procesar."
return formatted_transcript, None
except subprocess.TimeoutExpired:
return None, "Timeout al intentar obtener los subtítulos. Intenta nuevamente."
except subprocess.CalledProcessError as e:
return None, f"YouTube bloqueó la petición: {e.stderr[:200]}"
except json.JSONDecodeError:
return None, "Error al procesar los datos de YouTube. El formato de respuesta no es válido."
except Exception as e:
return None, f"Error inesperado: {str(e)[:200]}"
def get_stream_url(video_id: str):
"""
Obtiene la URL de transmisión m3u8 del video usando yt-dlp con cookies y estrategias de fallback
"""
url = f"https://www.youtube.com/watch?v={video_id}"
cookies_path = "cookies.txt"
# Lista de formatos a intentar en orden de prioridad
format_strategies = [
("best[ext=m3u8]", "Mejor calidad m3u8"),
("best", "Mejor calidad disponible"),
("best[ext=mp4]", "Mejor calidad MP4"),
("bestvideo+bestaudio/best", "Mejor video y audio"),
]
for format_spec, description in format_strategies:
# Comando optimizado para obtener la mejor URL disponible
command = [
"yt-dlp",
"-g", # Obtener solo la URL
"-f", format_spec,
"--no-warnings", # Sin advertencias
"--no-check-certificate", # Ignorar errores de certificado
"--extractor-args", "youtube:player_client=android", # Usar cliente Android
]
# Agregar cookies solo si el archivo existe
if os.path.exists(cookies_path):
command.extend(["--cookies", cookies_path])
command.append(url)
try:
result = subprocess.run(command, capture_output=True, text=True, check=False, timeout=60)
if result.returncode == 0 and result.stdout.strip():
# Obtener todas las URLs (puede haber video y audio separados)
urls = result.stdout.strip().split('\n')
# Buscar la URL m3u8 o googlevideo
stream_url = None
for url_line in urls:
if url_line and url_line.strip():
# Preferir URLs con m3u8
if 'm3u8' in url_line.lower():
stream_url = url_line.strip()
break
# O URLs de googlevideo
elif 'googlevideo.com' in url_line:
stream_url = url_line.strip()
break
# Si no encontramos ninguna específica, usar la primera URL válida
if not stream_url and urls:
for url_line in urls:
if url_line and url_line.strip() and url_line.startswith('http'):
stream_url = url_line.strip()
break
if stream_url:
return stream_url, None
# Este formato falló, intentar el siguiente
continue
except subprocess.TimeoutExpired:
continue
except Exception as e:
continue
# Si todos los formatos fallaron
return None, "No se pudo obtener la URL del stream. Verifica que el video esté EN VIVO (🔴) y no tenga restricciones."
@app.get("/transcript/{video_id}")
def transcript_endpoint(video_id: str, lang: str = "es"):
data, error = get_transcript_data(video_id, lang)
if error:
raise HTTPException(status_code=400, detail=error)
return {
"video_id": video_id,
"count": len(data),
"segments": data
}
@app.get("/stream/{video_id}")
def stream_endpoint(video_id: str):
"""
Endpoint para obtener la URL de transmisión en vivo de un video de YouTube
Retorna la URL m3u8 que se puede usar directamente con FFmpeg para retransmitir
a redes sociales usando RTMP.
Ejemplo de uso con FFmpeg:
ffmpeg -re -i "URL_M3U8" -c copy -f flv rtmp://destino/stream_key
"""
stream_url, error = get_stream_url(video_id)
if error:
raise HTTPException(status_code=400, detail=error)
# Determinar el tipo de URL obtenida
url_type = "unknown"
if "m3u8" in stream_url.lower():
url_type = "m3u8/hls"
elif "googlevideo.com" in stream_url:
url_type = "direct/mp4"
return {
"video_id": video_id,
"stream_url": stream_url,
"url_type": url_type,
"youtube_url": f"https://www.youtube.com/watch?v={video_id}",
"ffmpeg_example": f'ffmpeg -re -i "{stream_url}" -c copy -f flv rtmp://destino/stream_key',
"usage": {
"description": "Usa stream_url con FFmpeg para retransmitir",
"command_template": "ffmpeg -re -i \"{stream_url}\" -c copy -f flv {rtmp_url}/{stream_key}",
"platforms": {
"youtube": "rtmp://a.rtmp.youtube.com/live2/YOUR_STREAM_KEY",
"facebook": "rtmps://live-api-s.facebook.com:443/rtmp/YOUR_STREAM_KEY",
"twitch": "rtmp://live.twitch.tv/app/YOUR_STREAM_KEY",
"twitter": "rtmps://fa.contribute.live-video.net/app/YOUR_STREAM_KEY"
}
}
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)