import os import json import subprocess import requests from fastapi import FastAPI, HTTPException, Query from typing import List, Dict app = FastAPI(title="TubeScript API Pro - JSON Cleaner") def clean_youtube_json(raw_json: Dict) -> List[Dict]: """ Transforma el formato complejo 'json3' de YouTube a un formato simple: [{'start': 0.0, 'duration': 2.0, 'text': 'Hola'}] """ clean_data = [] # YouTube guarda los eventos de texto en la llave 'events' events = raw_json.get('events', []) for event in events: # Solo procesamos eventos que tengan segmentos de texto if 'segs' in event: text = "".join([seg['utf8'] for seg in event['segs']]).strip() if text and text != '\n': clean_data.append({ "start": event.get('tStartMs', 0) / 1000.0, # Convertir a segundos "duration": event.get('dDurationMs', 0) / 1000.0, "text": text.replace('\n', ' ') }) return clean_data def get_transcript_data(video_id: str, lang: str): url = f"https://www.youtube.com/watch?v={video_id}" cookies_path = "cookies.txt" # Comando yt-dlp optimizado command = [ "yt-dlp", "--skip-download", "--write-auto-subs", # Si no hay manuales, trae los de IA "--sub-format", "json3", "--sub-langs", f"{lang}.*", # Acepta variantes como es-419 "--cookies", cookies_path if os.path.exists(cookies_path) else "", "--dump-json", url ] try: # 1. Obtener metadatos con yt-dlp result = subprocess.run(command, capture_output=True, text=True, check=True) video_metadata = json.loads(result.stdout) # 2. Buscar la URL de los subtítulos requested_subs = video_metadata.get('requested_subtitles', {}) if not requested_subs: return None, "No se encontraron subtítulos para este idioma." # Obtenemos la URL del primer idioma que coincida lang_key = next(iter(requested_subs)) sub_url = requested_subs[lang_key]['url'] # 3. Descargar el JSON crudo de los servidores de YouTube response = requests.get(sub_url) if response.status_code != 200: return None, "Error al descargar el archivo de subtítulos desde YouTube." # 4. Limpiar y formatear formatted_transcript = clean_youtube_json(response.json()) return formatted_transcript, None except subprocess.CalledProcessError as e: return None, f"YouTube bloqueó la petición: {e.stderr[:200]}" except Exception as e: return None, str(e) def get_stream_url(video_id: str): """ Obtiene la URL de transmisión m3u8 del video usando yt-dlp con cookies """ url = f"https://www.youtube.com/watch?v={video_id}" cookies_path = "cookies.txt" # Comando optimizado para obtener la mejor URL disponible command = [ "yt-dlp", "-g", # Obtener solo la URL "-f", "best[ext=m3u8]/best", # Mejor calidad disponible "--no-warnings", # Sin advertencias "--no-check-certificate", # Ignorar errores de certificado ] # Agregar cookies solo si el archivo existe if os.path.exists(cookies_path): command.extend(["--cookies", cookies_path]) command.append(url) try: result = subprocess.run(command, capture_output=True, text=True, check=False, timeout=60) if result.returncode == 0 and result.stdout.strip(): # Obtener todas las URLs (puede haber video y audio separados) urls = result.stdout.strip().split('\n') # Buscar la URL m3u8 o googlevideo stream_url = None for url_line in urls: if url_line and url_line.strip(): # Preferir URLs con m3u8 if 'm3u8' in url_line.lower(): stream_url = url_line.strip() break # O URLs de googlevideo elif 'googlevideo.com' in url_line: stream_url = url_line.strip() break # Si no encontramos ninguna específica, usar la primera URL válida if not stream_url and urls: for url_line in urls: if url_line and url_line.strip() and url_line.startswith('http'): stream_url = url_line.strip() break if not stream_url: return None, "No se pudo obtener la URL de transmisión" return stream_url, None # Error en la ejecución error_msg = result.stderr if result.stderr else "No se pudo obtener la URL" return None, f"Error de yt-dlp: {error_msg[:200]}" except subprocess.CalledProcessError as e: error_msg = e.stderr if e.stderr else str(e) return None, f"Error al obtener la URL: {error_msg[:200]}" except Exception as e: return None, str(e) @app.get("/transcript/{video_id}") def transcript_endpoint(video_id: str, lang: str = "es"): data, error = get_transcript_data(video_id, lang) if error: raise HTTPException(status_code=400, detail=error) return { "video_id": video_id, "count": len(data), "segments": data } @app.get("/stream/{video_id}") def stream_endpoint(video_id: str): """ Endpoint para obtener la URL de transmisión en vivo de un video de YouTube Retorna la URL m3u8 que se puede usar directamente con FFmpeg para retransmitir a redes sociales usando RTMP. Ejemplo de uso con FFmpeg: ffmpeg -re -i "URL_M3U8" -c copy -f flv rtmp://destino/stream_key """ stream_url, error = get_stream_url(video_id) if error: raise HTTPException(status_code=400, detail=error) # Determinar el tipo de URL obtenida url_type = "unknown" if "m3u8" in stream_url.lower(): url_type = "m3u8/hls" elif "googlevideo.com" in stream_url: url_type = "direct/mp4" return { "video_id": video_id, "stream_url": stream_url, "url_type": url_type, "youtube_url": f"https://www.youtube.com/watch?v={video_id}", "ffmpeg_example": f'ffmpeg -re -i "{stream_url}" -c copy -f flv rtmp://destino/stream_key', "usage": { "description": "Usa stream_url con FFmpeg para retransmitir", "command_template": "ffmpeg -re -i \"{stream_url}\" -c copy -f flv {rtmp_url}/{stream_key}", "platforms": { "youtube": "rtmp://a.rtmp.youtube.com/live2/YOUR_STREAM_KEY", "facebook": "rtmps://live-api-s.facebook.com:443/rtmp/YOUR_STREAM_KEY", "twitch": "rtmp://live.twitch.tv/app/YOUR_STREAM_KEY", "twitter": "rtmps://fa.contribute.live-video.net/app/YOUR_STREAM_KEY" } } } if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)