import os import json import subprocess import requests from fastapi import FastAPI, HTTPException, Query from typing import List, Dict app = FastAPI(title="TubeScript API Pro - JSON Cleaner") def clean_youtube_json(raw_json: Dict) -> List[Dict]: """ Transforma el formato complejo 'json3' de YouTube a un formato simple: [{'start': 0.0, 'duration': 2.0, 'text': 'Hola'}] """ clean_data = [] # YouTube guarda los eventos de texto en la llave 'events' events = raw_json.get('events', []) for event in events: # Solo procesamos eventos que tengan segmentos de texto if 'segs' in event: text = "".join([seg['utf8'] for seg in event['segs']]).strip() if text and text != '\n': clean_data.append({ "start": event.get('tStartMs', 0) / 1000.0, # Convertir a segundos "duration": event.get('dDurationMs', 0) / 1000.0, "text": text.replace('\n', ' ') }) return clean_data def get_transcript_data(video_id: str, lang: str): url = f"https://www.youtube.com/watch?v={video_id}" cookies_path = "cookies.txt" # Comando yt-dlp optimizado command = [ "yt-dlp", "--skip-download", "--write-auto-subs", # Si no hay manuales, trae los de IA "--sub-format", "json3", "--sub-langs", f"{lang}.*", # Acepta variantes como es-419 "--cookies", cookies_path if os.path.exists(cookies_path) else "", "--dump-json", url ] try: # 1. Obtener metadatos con yt-dlp result = subprocess.run(command, capture_output=True, text=True, check=True) video_metadata = json.loads(result.stdout) # 2. Buscar la URL de los subtítulos requested_subs = video_metadata.get('requested_subtitles', {}) if not requested_subs: return None, "No se encontraron subtítulos para este idioma." # Obtenemos la URL del primer idioma que coincida lang_key = next(iter(requested_subs)) sub_url = requested_subs[lang_key]['url'] # 3. Descargar el JSON crudo de los servidores de YouTube response = requests.get(sub_url) if response.status_code != 200: return None, "Error al descargar el archivo de subtítulos desde YouTube." # 4. Limpiar y formatear formatted_transcript = clean_youtube_json(response.json()) return formatted_transcript, None except subprocess.CalledProcessError as e: return None, f"YouTube bloqueó la petición: {e.stderr[:200]}" except Exception as e: return None, str(e) def get_stream_url(video_id: str): """ Obtiene la URL de transmisión en vivo del video usando yt-dlp con cookies """ url = f"https://www.youtube.com/watch?v={video_id}" cookies_path = "cookies.txt" # Comando para obtener la URL de transmisión command = [ "yt-dlp", "-g", # Obtener solo la URL "-f", "best[ext=m3u8]/best", # Formato preferido m3u8 o mejor disponible ] # Agregar cookies solo si el archivo existe if os.path.exists(cookies_path): command.extend(["--cookies", cookies_path]) command.append(url) try: result = subprocess.run(command, capture_output=True, text=True, check=True) stream_url = result.stdout.strip() if not stream_url: return None, "No se pudo obtener la URL de transmisión" return stream_url, None except subprocess.CalledProcessError as e: error_msg = e.stderr if e.stderr else str(e) return None, f"Error al obtener la URL: {error_msg[:200]}" except Exception as e: return None, str(e) @app.get("/transcript/{video_id}") def transcript_endpoint(video_id: str, lang: str = "es"): data, error = get_transcript_data(video_id, lang) if error: raise HTTPException(status_code=400, detail=error) return { "video_id": video_id, "count": len(data), "segments": data } @app.get("/stream/{video_id}") def stream_endpoint(video_id: str): """ Endpoint para obtener la URL de transmisión en vivo de un video de YouTube """ stream_url, error = get_stream_url(video_id) if error: raise HTTPException(status_code=400, detail=error) return { "video_id": video_id, "stream_url": stream_url } if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)