import os import json import subprocess import requests import time import re import tempfile import glob from fastapi import FastAPI, HTTPException, UploadFile, File from typing import List, Dict, Any, cast # Intentar importar youtube_transcript_api como fallback try: from youtube_transcript_api import YouTubeTranscriptApi from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound YOUTUBE_TRANSCRIPT_API_AVAILABLE = True except Exception: # definir placeholders para evitar NameError si la librería no está instalada YouTubeTranscriptApi = None class TranscriptsDisabled(Exception): pass class NoTranscriptFound(Exception): pass YOUTUBE_TRANSCRIPT_API_AVAILABLE = False # Import CookieManager from yt_wrap to provide cookiefile paths per request from yt_wrap import CookieManager app = FastAPI(title="TubeScript API Pro - JSON Cleaner") # Ruta de cookies configurable vía variable de entorno: API_COOKIES_PATH DEFAULT_COOKIES_PATH = os.getenv('API_COOKIES_PATH', './cookies.txt') # Proxy opcional para requests/yt-dlp (ej. socks5h://127.0.0.1:9050) DEFAULT_PROXY = os.getenv('API_PROXY', '') def clean_youtube_json(raw_json: Dict) -> List[Dict]: """ Transforma el formato complejo 'json3' de YouTube a un formato simple: [{'start': 0.0, 'duration': 2.0, 'text': 'Hola'}] """ clean_data = [] # YouTube guarda los eventos de texto en la llave 'events' events = raw_json.get('events', []) for event in events: # Solo procesamos eventos que tengan segmentos de texto if 'segs' in event: text = "".join([seg['utf8'] for seg in event['segs']]).strip() if text and text != '\n': clean_data.append({ "start": event.get('tStartMs', 0) / 1000.0, # Convertir a segundos "duration": event.get('dDurationMs', 0) / 1000.0, "text": text.replace('\n', ' ') }) return clean_data def parse_subtitle_format(content: str, format_type: str) -> List[Dict]: """ Parsea diferentes formatos de subtítulos (json3, srv3, vtt) al formato estándar """ try: if format_type == 'json3': # Formato JSON3 de YouTube data = json.loads(content) if isinstance(content, str) else content return clean_youtube_json(data) elif format_type in ['srv3', 'vtt']: # Para srv3 y vtt, intentar parsear como JSON primero try: data = json.loads(content) if isinstance(content, str) else content # srv3 también tiene estructura similar a json3 if 'events' in data: return clean_youtube_json(data) except: pass # Si no es JSON, intentar parsear como texto VTT clean_data = [] lines = content.split('\n') if isinstance(content, str) else [] current_time = 0.0 current_text = "" for line in lines: line = line.strip() if not line or line.startswith('WEBVTT') or '-->' in line: if '-->' in line: # Extraer tiempo de inicio try: time_parts = line.split('-->')[0].strip().split(':') if len(time_parts) >= 2: current_time = float(time_parts[-2]) * 60 + float(time_parts[-1]) except: pass continue if line and not line.isdigit(): current_text = line if current_text: clean_data.append({ "start": current_time, "duration": 2.0, # Duración aproximada "text": current_text }) current_time += 2.0 return clean_data if clean_data else [] else: # Formato desconocido, intentar como JSON data = json.loads(content) if isinstance(content, str) else content if 'events' in data: return clean_youtube_json(data) return [] except Exception as e: print(f"Error parsing subtitle format {format_type}: {e}") return [] def extract_video_id(video_id_or_url: str) -> str: """ Normaliza la entrada y extrae el video_id si se recibe una URL completa. Acepta: https://www.youtube.com/watch?v=ID, youtu.be/ID, o el propio ID. """ if not video_id_or_url: return "" s = video_id_or_url.strip() # Si ya parece un id (11-20 caracteres alfanuméricos y -, _), retornarlo if re.match(r'^[A-Za-z0-9_-]{8,20}$', s): return s # Intentar extraer de URL completa # watch?v= m = re.search(r'[?&]v=([A-Za-z0-9_-]{8,20})', s) if m: return m.group(1) # youtu.be/ m = re.search(r'youtu\.be/([A-Za-z0-9_-]{8,20})', s) if m: return m.group(1) # /v/ or /embed/ m = re.search(r'(?:/v/|/embed/)([A-Za-z0-9_-]{8,20})', s) if m: return m.group(1) # Si no se detecta, devolver la entrada original (fallará después si es inválida) return s def format_segments_text(segments: List[Dict]) -> List[str]: """Devuelve una lista 'format_text' con textos limpios extraídos de segments. - elimina prefijos tipo 'Kind: captions' - elimina contenido en corchetes/paréntesis - elimina etiquetas HTML - normaliza espacios - divide por saltos de línea para obtener frases independientes """ def _clean_text(t: str) -> str: if not t: return '' s = str(t).strip() s = re.sub(r'^\s*Kind\s*:\s*.*$', '', s, flags=re.IGNORECASE).strip() # eliminar contenido entre corchetes (no-greedy) s = re.sub(r'\[.*?\]', '', s) s = re.sub(r'\(.*?\)', '', s) s = re.sub(r'<[^>]+>', '', s) s = re.sub(r'[♪★■◆►▶◀•–—]', '', s) s = re.sub(r'\s+', ' ', s).strip() return s output: List[str] = [] for seg in segments or []: raw = seg.get('text', '') cleaned = _clean_text(raw) if not cleaned: continue parts = [p.strip() for p in re.split(r'[\n\r]+', cleaned) if p.strip()] output.extend(parts) return output # Nuevo helper: obtener thumbnails para un video (intenta yt-dlp --dump-json, fallback a URLs estándar) def get_video_thumbnails(video_id: str) -> List[str]: """Devuelve una lista de URLs de thumbnail para el video. Primero intenta obtener metadata con yt-dlp y extraer 'thumbnails' o 'thumbnail'. Si falla, construye una lista de URLs por defecto (maxresdefault, sddefault, hqdefault, mqdefault, default). """ thumbs: List[str] = [] url = f"https://www.youtube.com/watch?v={video_id}" cookie_mgr = CookieManager() cookiefile_path = cookie_mgr.get_cookiefile_path() cookies_path = cookiefile_path or os.getenv('API_COOKIES_PATH', DEFAULT_COOKIES_PATH) proxy = os.getenv('API_PROXY', DEFAULT_PROXY) or None cmd = [ "yt-dlp", "--skip-download", "--dump-json", "--no-warnings", url ] if os.path.exists(cookies_path): cmd.extend(["--cookies", cookies_path]) if proxy: cmd.extend(['--proxy', proxy]) try: proc = subprocess.run(cmd, capture_output=True, text=True, timeout=30) if proc.returncode == 0 and proc.stdout: try: meta = json.loads(proc.stdout) # thumbnails puede ser lista de dicts con 'url' t = meta.get('thumbnails') or meta.get('thumbnail') if isinstance(t, list): for item in t: if isinstance(item, dict) and item.get('url'): thumbs.append(item.get('url')) elif isinstance(item, str): thumbs.append(item) elif isinstance(t, dict) and t.get('url'): thumbs.append(t.get('url')) elif isinstance(t, str): thumbs.append(t) except Exception: pass except Exception: pass finally: try: cookie_mgr.cleanup() except Exception: pass # Si no obtuvimos thumbnails desde metadata, construir URLs estándar if not thumbs: thumbs = [ f"https://i.ytimg.com/vi/{video_id}/maxresdefault.jpg", f"https://i.ytimg.com/vi/{video_id}/sddefault.jpg", f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg", f"https://i.ytimg.com/vi/{video_id}/mqdefault.jpg", f"https://i.ytimg.com/vi/{video_id}/default.jpg", ] # deduplicate while preserving order seen = set() unique_thumbs = [] for t in thumbs: if t and t not in seen: seen.add(t) unique_thumbs.append(t) return unique_thumbs def get_transcript_data(video_id: str, lang: str = "es"): video_id = extract_video_id(video_id) if not video_id: return None, [], "video_id inválido o vacío" url = f"https://www.youtube.com/watch?v={video_id}" # Use CookieManager to get a cookiefile path per request (may be None) cookie_mgr = CookieManager() cookiefile_path = cookie_mgr.get_cookiefile_path() # cookies_path: prefer the temporary cookiefile if present, otherwise fall back to env path cookies_path = cookiefile_path or os.getenv('API_COOKIES_PATH', DEFAULT_COOKIES_PATH) # proxy support proxy = os.getenv('API_PROXY', DEFAULT_PROXY) or None proxies = {'http': proxy, 'https': proxy} if proxy else None def load_cookies_from_file(path: str) -> dict: """Parsea un cookies.txt en formato Netscape a un dict usable por requests.""" cookies = {} try: if not path or not os.path.exists(path): return cookies with open(path, 'r', encoding='utf-8', errors='ignore') as fh: for line in fh: line = line.strip() if not line or line.startswith('#'): continue parts = line.split('\t') # formato Netscape: domain, flag, path, secure, expiration, name, value if len(parts) >= 7: name = parts[5].strip() value = parts[6].strip() if name: cookies[name] = value else: # fallback: intento simple name=value if '=' in line: k, v = line.split('=', 1) cookies[k.strip()] = v.strip() except Exception: return {} return cookies cookies_for_requests = load_cookies_from_file(cookies_path) if cookies_path else {} # Intento rápido y fiable: usar yt-dlp para descargar subtítulos (auto o manual) al tmpdir try: with tempfile.TemporaryDirectory() as tmpdl: # probar variantes de idioma (ej. es y es-419) para cubrir casos regionales sub_langs = [lang] if len(lang) == 2: sub_langs.append(f"{lang}-419") ytdlp_cmd = [ "yt-dlp", url, "--skip-download", "--write-auto-sub", "--write-sub", "--sub-format", "vtt/json3/srv3/best", "-o", os.path.join(tmpdl, "%(id)s.%(ext)s"), "--no-warnings", ] # agregar sub-lang si hay variantes if sub_langs: ytdlp_cmd.extend(["--sub-lang", ",".join(sub_langs)]) # attach cookiefile if exists if cookiefile_path: ytdlp_cmd.extend(["--cookies", cookiefile_path]) # attach proxy if configured if proxy: ytdlp_cmd.extend(['--proxy', proxy]) try: result = subprocess.run(ytdlp_cmd, capture_output=True, text=True, timeout=120) # Si yt-dlp falló por rate limiting, devolver mensaje claro stderr = (result.stderr or "").lower() if result.returncode != 0 and ('http error 429' in stderr or 'too many requests' in stderr): return None, get_video_thumbnails(video_id), "YouTube está limitando las peticiones al descargar subtítulos (HTTP 429). Agrega un cookies.txt válido exportado desde tu navegador y monta en el contenedor, o espera unos minutos." if result.returncode != 0 and ('http error 403' in stderr or 'forbidden' in stderr): return None, get_video_thumbnails(video_id), "Acceso denegado al descargar subtítulos (HTTP 403). El video puede tener restricciones. Usa cookies.txt con una cuenta autorizada." except subprocess.TimeoutExpired: pass # revisar archivos creados files = glob.glob(os.path.join(tmpdl, f"{video_id}.*")) if files: combined = [] for fpath in files: try: with open(fpath, 'r', encoding='utf-8') as fh: combined.append(fh.read()) except Exception: continue if combined: vtt_combined = "\n".join(combined) parsed = parse_subtitle_format(vtt_combined, 'vtt') if parsed: return parsed, get_video_thumbnails(video_id), None finally: # cleanup any temp cookiefile created for this request try: cookie_mgr.cleanup() except Exception: pass # ...existing code continues... # 1) Intento principal: obtener metadata con yt-dlp command = [ "yt-dlp", "--skip-download", "--dump-json", "--no-warnings", url ] if os.path.exists(cookies_path): command.extend(["--cookies", cookies_path]) # attach proxy if configured if proxy: command.extend(['--proxy', proxy]) try: result = subprocess.run(command, capture_output=True, text=True, timeout=60) if result.returncode != 0: error_msg = result.stderr if result.stderr else "Error desconocido from yt-dlp" # Si yt-dlp reporta algo, enviar mensaje útil # No abortar inmediatamente: intentaremos fallback descargando subs con yt-dlp video_metadata = None else: if not result.stdout.strip(): video_metadata = None else: try: video_metadata = json.loads(result.stdout) except Exception: video_metadata = None except subprocess.TimeoutExpired: video_metadata = None except FileNotFoundError: return None, get_video_thumbnails(video_id), "yt-dlp no está instalado en el contenedor/entorno. Instala yt-dlp y vuelve a intentar." except Exception as e: video_metadata = None requested_subs = {} if video_metadata: requested_subs = video_metadata.get('requested_subtitles', {}) or {} # Buscar en automatic_captions y subtitles si requested_subs está vacío if not requested_subs: automatic_captions = video_metadata.get('automatic_captions', {}) or {} for lang_key, formats in automatic_captions.items(): if lang in lang_key or lang_key.startswith(lang): if formats: requested_subs = {lang_key: formats[0]} break if not requested_subs: subtitles = video_metadata.get('subtitles', {}) or {} for lang_key, formats in subtitles.items(): if lang in lang_key or lang_key.startswith(lang): if formats: requested_subs = {lang_key: formats[0]} break # Si requested_subs está disponible, intentar descargar vía requests la URL proporcionada if requested_subs: lang_key = next(iter(requested_subs)) sub_url = requested_subs[lang_key].get('url') if sub_url: headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8', 'Referer': 'https://www.youtube.com/', } max_retries = 3 response = None rate_limited = False for attempt in range(max_retries): try: response = requests.get(sub_url, headers=headers, timeout=30, cookies=cookies_for_requests, proxies=proxies) if response.status_code == 200: break elif response.status_code == 429: rate_limited = True if attempt < max_retries - 1: time.sleep(2 * (attempt + 1)) continue else: # salir del loop y usar fallback con yt-dlp más abajo break elif response.status_code == 403: return None, get_video_thumbnails(video_id), "Acceso denegado (HTTP 403). El video puede tener restricciones de edad o región. Intenta con cookies.txt." elif response.status_code == 404: # No encontramos la URL esperada; intentar fallback response = None break else: return None, get_video_thumbnails(video_id), f"Error al descargar subtítulos desde YouTube (HTTP {response.status_code})." except requests.exceptions.Timeout: if attempt < max_retries - 1: continue return None, get_video_thumbnails(video_id), "Timeout al descargar subtítulos. Intenta nuevamente." except requests.exceptions.RequestException as e: return None, get_video_thumbnails(video_id), f"Error de conexión al descargar subtítulos: {str(e)[:100]}" # Si obtuvimos un 200, procesarlo; si hubo rate limiting, intentar fallback con yt-dlp if response and response.status_code == 200: subtitle_format = requested_subs[lang_key].get('ext', 'json3') try: # Si la respuesta parece ser una playlist M3U8 o contiene enlaces a timedtext, # extraer las URLs y concatenar su contenido (VTT) antes de parsear. text_body = response.text if isinstance(response.text, str) else None if text_body and ('#EXTM3U' in text_body or 'timedtext' in text_body or text_body.strip().lower().startswith('#extm3u')): # Extraer URLs (líneas que empiecen con http) urls = re.findall(r'^(https?://\S+)', text_body, flags=re.M) # Intento 1: descargar cada URL con requests (usa cookies montadas si aplican) combined = [] for idx, u in enumerate(urls): try: r2 = requests.get(u, headers=headers, timeout=20, cookies=cookies_for_requests, proxies=proxies) if r2.status_code == 200 and r2.text: combined.append(r2.text) continue # Si recibimos 429, 403, o falló, intentaremos con yt-dlp (fallback) if r2.status_code == 429: # fallback a yt-dlp raise Exception('rate_limited') except Exception: # fallthrough al fallback con yt-dlp pass # Intento 2 (fallback): usar yt-dlp para descargar ese timedtext/url a un archivo temporal try: with tempfile.TemporaryDirectory() as tdir: out_template = os.path.join(tdir, f"timedtext_{idx}.%(ext)s") ytdlp_cmd = [ "yt-dlp", u, "-o", out_template, "--no-warnings", ] if os.path.exists(cookies_path): ytdlp_cmd.extend(["--cookies", cookies_path]) # pasar proxy a yt-dlp si está configurado if proxy: ytdlp_cmd.extend(['--proxy', proxy]) try: res2 = subprocess.run(ytdlp_cmd, capture_output=True, text=True, timeout=60) stderr2 = (res2.stderr or "").lower() if res2.returncode != 0 and ('http error 429' in stderr2 or 'too many requests' in stderr2): # rate limit cuando intentamos descargar timedtext return None, get_video_thumbnails(video_id), "YouTube está limitando las peticiones al descargar subtítulos (HTTP 429). Agrega cookies.txt válido o intenta más tarde." if res2.returncode != 0 and ('http error 403' in stderr2 or 'forbidden' in stderr2): return None, get_video_thumbnails(video_id), "Acceso denegado al descargar subtítulos (HTTP 403). Intenta con cookies.txt o una cuenta con permisos." except Exception: pass # leer cualquier archivo creado en el tempdir for fpath in glob.glob(os.path.join(tdir, "timedtext_*.*")): try: with open(fpath, 'r', encoding='utf-8') as fh: txt = fh.read() if txt: combined.append(txt) except Exception: continue except Exception: continue if combined: vtt_combined = "\n".join(combined) formatted_transcript = parse_subtitle_format(vtt_combined, 'vtt') if formatted_transcript: return formatted_transcript, get_video_thumbnails(video_id), None try: subtitle_data = response.json() formatted_transcript = parse_subtitle_format(subtitle_data, subtitle_format) except json.JSONDecodeError: formatted_transcript = parse_subtitle_format(response.text, subtitle_format) except Exception as e: return None, get_video_thumbnails(video_id), f"Error al procesar los subtítulos: {str(e)[:200]}" if not formatted_transcript: return None, get_video_thumbnails(video_id), "Los subtítulos están vacíos o no se pudieron procesar." return formatted_transcript, get_video_thumbnails(video_id), None # Si hubo rate limiting, intentar fallback con yt-dlp para descargar la URL de subtítulos if rate_limited and (not response or response.status_code != 200): # Intentar descargar la URL de subtítulos directamente con yt-dlp (usa cookies si existen) try: with tempfile.TemporaryDirectory() as tdir: out_template = os.path.join(tdir, "sub.%(ext)s") ytdlp_cmd = [ "yt-dlp", sub_url, "-o", out_template, "--no-warnings", ] if os.path.exists(cookies_path): ytdlp_cmd.extend(["--cookies", cookies_path]) if proxy: ytdlp_cmd.extend(['--proxy', proxy]) res = subprocess.run(ytdlp_cmd, capture_output=True, text=True, timeout=90) stderr = (res.stderr or "").lower() if res.returncode != 0 and ('http error 429' in stderr or 'too many requests' in stderr): return None, get_video_thumbnails(video_id), "YouTube está limitando las peticiones al descargar subtítulos (HTTP 429). Agrega cookies.txt válido o intenta más tarde." # Leer archivos generados combined = [] for fpath in glob.glob(os.path.join(tdir, "*.*")): try: with open(fpath, 'r', encoding='utf-8') as fh: txt = fh.read() if txt: combined.append(txt) except Exception: continue if combined: vtt_combined = "\n".join(combined) formatted_transcript = parse_subtitle_format(vtt_combined, 'vtt') if formatted_transcript: return formatted_transcript, get_video_thumbnails(video_id), None except FileNotFoundError: return None, get_video_thumbnails(video_id), "yt-dlp no está instalado en el contenedor/entorno. Instala yt-dlp y vuelve a intentar." except Exception: # seguir con otros fallbacks pass # si no logró con yt-dlp, continuar y dejar que los fallbacks posteriores manejen el caso # Fallback: intentarlo descargando subtítulos con yt-dlp a un directorio temporal # (esto cubre casos en que la metadata no incluye requested_subs) try: with tempfile.TemporaryDirectory() as tmpdir: # Intentar con auto-sub primero, luego con sub (manual) ytdlp_variants = [ ("--write-auto-sub", "auto"), ("--write-sub", "manual") ] downloaded = None for flag, label in ytdlp_variants: cmd = [ "yt-dlp", url, "--skip-download", flag, "--sub-lang", lang, "--sub-format", "json3/vtt/srv3/best", "-o", os.path.join(tmpdir, "%(id)s.%(ext)s"), "--no-warnings", ] if os.path.exists(cookies_path): cmd.extend(["--cookies", cookies_path]) # añadir proxy a la llamada de yt-dlp si está configurado if proxy: cmd.extend(['--proxy', proxy]) r = subprocess.run(cmd, capture_output=True, text=True, timeout=120) # Revisar si se creó algún archivo en tmpdir files = glob.glob(os.path.join(tmpdir, f"{video_id}.*")) if files: # Tomar el primero válido downloaded = files[0] break if downloaded: ext = os.path.splitext(downloaded)[1].lstrip('.') try: with open(downloaded, 'r', encoding='utf-8') as fh: content = fh.read() except Exception as e: return None, get_video_thumbnails(video_id), f"Error leyendo archivo de subtítulos descargado: {str(e)[:200]}" # Intentar parsear según extensión conocida fmt = 'json3' if ext in ('json', 'json3') else ('vtt' if ext == 'vtt' else 'srv3') formatted_transcript = parse_subtitle_format(content, fmt) if formatted_transcript: return formatted_transcript, get_video_thumbnails(video_id), None else: return None, get_video_thumbnails(video_id), "Se descargaron subtítulos pero no se pudieron procesar." except FileNotFoundError: return None, get_video_thumbnails(video_id), "yt-dlp no está instalado. Instala yt-dlp en el contenedor/entorno y vuelve a intentar." except Exception as e: # No hacer crash, retornar mensaje general return None, get_video_thumbnails(video_id), f"Error al intentar descargar subtítulos con yt-dlp: {str(e)[:200]}" return None, get_video_thumbnails(video_id), "No se encontraron subtítulos para este video (o el video no tiene subtítulos disponibles). Intenta con otro video en vivo o agrega cookies.txt si hay restricciones." def get_stream_url(video_id: str): """ Obtiene la URL de transmisión m3u8 del video usando yt-dlp con cookies y estrategias de fallback """ url = f"https://www.youtube.com/watch?v={video_id}" # dynamically get cookiefile for this request cookie_mgr = CookieManager() cookiefile_path = cookie_mgr.get_cookiefile_path() try: # Lista de formatos a intentar en orden de prioridad format_strategies = [ ("best[ext=m3u8]", "Mejor calidad m3u8"), ("best", "Mejor calidad disponible"), ("best[ext=mp4]", "Mejor calidad MP4"), ("bestvideo+bestaudio/best", "Mejor video y audio"), ] for format_spec, description in format_strategies: command = [ "yt-dlp", "-g", "-f", format_spec, "--no-warnings", "--no-check-certificate", "--extractor-args", "youtube:player_client=android", ] if cookiefile_path: command.extend(["--cookies", cookiefile_path]) command.append(url) try: result = subprocess.run(command, capture_output=True, text=True, check=False, timeout=60) if result.returncode == 0 and result.stdout.strip(): # Obtener todas las URLs (puede haber video y audio separados) urls = result.stdout.strip().split('\n') # Buscar la URL m3u8 o googlevideo stream_url = None for url_line in urls: if url_line and url_line.strip(): # Preferir URLs con m3u8 if 'm3u8' in url_line.lower(): stream_url = url_line.strip() break # O URLs de googlevideo elif 'googlevideo.com' in url_line: stream_url = url_line.strip() break # Si no encontramos ninguna específica, usar la primera URL válida if not stream_url and urls: for url_line in urls: if url_line and url_line.strip() and url_line.startswith('http'): stream_url = url_line.strip() break if stream_url: return stream_url, None continue except subprocess.TimeoutExpired: continue except Exception: continue return None, "No se pudo obtener la URL del stream. Verifica que el video esté EN VIVO (🔴) y no tenga restricciones." finally: try: cookie_mgr.cleanup() except Exception: pass @app.get("/transcript/{video_id}") def transcript_endpoint(video_id: str, lang: str = "es"): data, thumbnails, error = get_transcript_data(video_id, lang) if error: raise HTTPException(status_code=400, detail=error) # Concatenar texto de segmentos para mostrar como texto plano además de los segmentos try: combined_text = "\n".join([seg.get('text', '') for seg in data if seg.get('text')]) except Exception: combined_text = "" # Nuevo: arreglo format_text con cada segmento como elemento (texto limpio) try: format_text_list = format_segments_text(data) except Exception: format_text_list = [] format_text = format_text_list return { "video_id": video_id, "count": len(data), "segments": data, "text": combined_text, "format_text": format_text, "thumbnails": thumbnails } @app.get('/transcript_vtt/{video_id}') def transcript_vtt(video_id: str, lang: str = 'es'): """Descarga (con yt-dlp) y devuelve subtítulos en VTT, además de segmentos parseados y texto concatenado.""" vtt_text, error = fetch_vtt_subtitles(video_id, lang) if error: raise HTTPException(status_code=400, detail=error) # parsear VTT a segmentos usando parse_subtitle_format segments = parse_subtitle_format(vtt_text, 'vtt') if vtt_text else [] combined_text = '\n'.join([s.get('text','') for s in segments]) # format_text con texto limpio listo para procesamiento por agentes format_text = format_segments_text(segments) thumbnails = get_video_thumbnails(video_id) return { 'video_id': video_id, 'vtt': vtt_text, 'count': len(segments), 'segments': segments, 'text': combined_text, 'format_text': format_text, 'thumbnails': thumbnails } @app.get("/stream/{video_id}") def stream_endpoint(video_id: str): """ Endpoint para obtener la URL de transmisión en vivo de un video de YouTube Retorna la URL m3u8 que se puede usar directamente con FFmpeg para retransmitir a redes sociales usando RTMP. Ejemplo de uso con FFmpeg: ffmpeg -re -i "URL_M3U8" -c copy -f flv rtmp://destino/stream_key """ stream_url, error = get_stream_url(video_id) if error: raise HTTPException(status_code=400, detail=error) thumbnails = get_video_thumbnails(video_id) # Determinar el tipo de URL obtenida url_type = "unknown" if "m3u8" in stream_url.lower(): url_type = "m3u8/hls" elif "googlevideo.com" in stream_url: url_type = "direct/mp4" return { "video_id": video_id, "stream_url": stream_url, "url_type": url_type, "youtube_url": f"https://www.youtube.com/watch?v={video_id}", "ffmpeg_example": f'ffmpeg -re -i "{stream_url}" -c copy -f flv rtmp://destino/stream_key', "thumbnails": thumbnails, "usage": { "description": "Usa stream_url con FFmpeg para retransmitir", "command_template": "ffmpeg -re -i \"{stream_url}\" -c copy -f flv {rtmp_url}/{stream_key}", "platforms": { "youtube": "rtmp://a.rtmp.youtube.com/live2/YOUR_STREAM_KEY", "facebook": "rtmps://live-api-s.facebook.com:443/rtmp/YOUR_STREAM_KEY", "twitch": "rtmp://live.twitch.tv/app/YOUR_STREAM_KEY", "twitter": "rtmps://fa.contribute.live-video.net/app/YOUR_STREAM_KEY" } } } @app.post('/upload_cookies') async def upload_cookies(file: UploadFile = File(...)): """Endpoint para subir cookies.txt y guardarlo en el servidor en /app/cookies.txt""" try: content = await file.read() if not content: raise HTTPException(status_code=400, detail='Archivo vacío') target = 'cookies.txt' # Guardar con permisos de escritura with open(target, 'wb') as fh: fh.write(content) return {"detail": "cookies.txt guardado correctamente", "path": os.path.abspath(target)} except Exception as e: raise HTTPException(status_code=500, detail=f'Error al guardar cookies: {str(e)[:200]}') @app.get("/debug/metadata/{video_id}") def debug_metadata(video_id: str): """Endpoint de depuración: obtiene --dump-json de yt-dlp para un video. Devuelve la metadata (automatic_captions, subtitles, requested_subtitles) para inspección. """ # try to use dynamic cookiefile per request cookie_mgr = CookieManager() cookiefile_path = cookie_mgr.get_cookiefile_path() cookies_path = cookiefile_path or os.getenv('API_COOKIES_PATH', DEFAULT_COOKIES_PATH) proxy = os.getenv('API_PROXY', DEFAULT_PROXY) or None url = f"https://www.youtube.com/watch?v={video_id}" cmd = [ "yt-dlp", "--skip-download", "--dump-json", "--no-warnings", "--extractor-args", "youtube:player_client=android", url ] if os.path.exists(cookies_path): cmd.extend(["--cookies", cookies_path]) if proxy: cmd.extend(['--proxy', proxy]) try: proc = subprocess.run(cmd, capture_output=True, text=True, timeout=60) except FileNotFoundError: try: cookie_mgr.cleanup() except Exception: pass raise HTTPException(status_code=500, detail="yt-dlp no está instalado en el contenedor/entorno.") except subprocess.TimeoutExpired: try: cookie_mgr.cleanup() except Exception: pass raise HTTPException(status_code=504, detail="yt-dlp demoró demasiado en responder.") except Exception as e: try: cookie_mgr.cleanup() except Exception: pass raise HTTPException(status_code=500, detail=str(e)[:300]) if proc.returncode != 0: stderr = proc.stderr or '' try: cookie_mgr.cleanup() except Exception: pass raise HTTPException(status_code=500, detail=f"yt-dlp error: {stderr[:1000]}") try: metadata = json.loads(proc.stdout) except Exception: try: cookie_mgr.cleanup() except Exception: pass raise HTTPException(status_code=500, detail="No se pudo parsear la salida JSON de yt-dlp.") try: cookie_mgr.cleanup() except Exception: pass # Devolver solo las partes útiles para depuración debug_info = { 'id': metadata.get('id'), 'title': metadata.get('title'), 'uploader': metadata.get('uploader'), 'is_live': metadata.get('is_live'), 'automatic_captions': metadata.get('automatic_captions'), 'subtitles': metadata.get('subtitles'), 'requested_subtitles': metadata.get('requested_subtitles'), 'formats_sample': metadata.get('formats')[:5] if metadata.get('formats') else None, } return debug_info @app.get('/debug/fetch_subs/{video_id}') def debug_fetch_subs(video_id: str, lang: str = 'es'): """Intenta descargar subtítulos con yt-dlp dentro del entorno y devuelve el log y el contenido (parcial) si existe. Usa cookies definidas en API_COOKIES_PATH. """ cookie_mgr = CookieManager() cookiefile_path = cookie_mgr.get_cookiefile_path() out_dir = tempfile.mkdtemp(prefix='subs_') out_template = os.path.join(out_dir, '%(id)s.%(ext)s') url = f"https://www.youtube.com/watch?v={video_id}" cmd = [ 'yt-dlp', '--verbose', '--skip-download', '--write-auto-sub', '--write-sub', '--sub-lang', lang, '--sub-format', 'json3/vtt/srv3/best', '--output', out_template, url ] if cookiefile_path: cmd.extend(['--cookies', cookiefile_path]) try: proc = subprocess.run(cmd, capture_output=True, text=True, timeout=240) except FileNotFoundError: try: cookie_mgr.cleanup() except Exception: pass raise HTTPException(status_code=500, detail='yt-dlp no está instalado en el contenedor.') except subprocess.TimeoutExpired: try: cookie_mgr.cleanup() except Exception: pass raise HTTPException(status_code=504, detail='La ejecución de yt-dlp demoró demasiado.') except Exception as e: try: cookie_mgr.cleanup() except Exception: pass raise HTTPException(status_code=500, detail=str(e)[:300]) stdout = proc.stdout or '' stderr = proc.stderr or '' rc = proc.returncode # Buscar archivos generados generated = [] for f in glob.glob(os.path.join(out_dir, f"{video_id}.*")): size = None try: size = os.path.getsize(f) # tomar las primeras 200 líneas para no retornar archivos enormes with open(f, 'r', encoding='utf-8', errors='ignore') as fh: sample = ''.join([next(fh) for _ in range(200)]) if size > 0 else '' generated.append({ 'path': f, 'size': size, 'sample': sample }) except StopIteration: # menos de 200 líneas try: with open(f, 'r', encoding='utf-8', errors='ignore') as fh: sample = fh.read() except Exception: sample = None if size is None: try: size = os.path.getsize(f) except Exception: size = 0 generated.append({'path': f, 'size': size, 'sample': sample}) except Exception: if size is None: try: size = os.path.getsize(f) except Exception: size = 0 generated.append({'path': f, 'size': size, 'sample': None}) try: cookie_mgr.cleanup() except Exception: pass return { 'video_id': video_id, 'rc': rc, 'stdout_tail': stdout[-2000:], 'stderr_tail': stderr[-2000:], 'generated': generated, 'out_dir': out_dir } # Nuevo helper para descargar VTT directamente y retornarlo como texto def fetch_vtt_subtitles(video_id: str, lang: str = 'es'): """Descarga subtítulos en formato VTT usando yt-dlp y devuelve su contenido. Retorna (vtt_text, None) en caso de éxito o (None, error_message) en caso de error. """ url = f"https://www.youtube.com/watch?v={video_id}" cookie_mgr = CookieManager() cookiefile_path = cookie_mgr.get_cookiefile_path() with tempfile.TemporaryDirectory() as tmpdir: out_template = os.path.join(tmpdir, '%(id)s.%(ext)s') cmd = [ 'yt-dlp', '--skip-download', '--write-auto-sub', '--write-sub', '--sub-lang', lang, '--sub-format', 'vtt', '--output', out_template, url ] if cookiefile_path: cmd.extend(['--cookies', cookiefile_path]) try: proc = subprocess.run(cmd, capture_output=True, text=True, timeout=180) except FileNotFoundError: try: cookie_mgr.cleanup() except Exception: pass return None, 'yt-dlp no está instalado en el entorno.' except subprocess.TimeoutExpired: try: cookie_mgr.cleanup() except Exception: pass return None, 'La descarga de subtítulos tardó demasiado.' except Exception as e: try: cookie_mgr.cleanup() except Exception: pass return None, f'Error ejecutando yt-dlp: {str(e)[:200]}' stderr = (proc.stderr or '').lower() if proc.returncode != 0: try: cookie_mgr.cleanup() except Exception: pass if 'http error 429' in stderr or 'too many requests' in stderr: return None, 'YouTube está limitando las peticiones al descargar subtítulos (HTTP 429). Revisa cookies.txt o prueba desde otra IP.' if 'http error 403' in stderr or 'forbidden' in stderr: return None, 'Acceso denegado al descargar subtítulos (HTTP 403). Usa cookies.txt con una cuenta autorizada.' return None, f'yt-dlp error: {proc.stderr[:1000]}' files = glob.glob(os.path.join(tmpdir, f"{video_id}.*")) if not files: try: cookie_mgr.cleanup() except Exception: pass return None, 'No se generaron archivos de subtítulos.' # intentar preferir .vtt vtt_path = None for f in files: if f.lower().endswith('.vtt'): vtt_path = f break if not vtt_path: vtt_path = files[0] try: with open(vtt_path, 'r', encoding='utf-8', errors='ignore') as fh: content = fh.read() try: cookie_mgr.cleanup() except Exception: pass return content, None except Exception as e: try: cookie_mgr.cleanup() except Exception: pass return None, f'Error leyendo archivo de subtítulos: {str(e)[:200]}' # Nuevo endpoint que devuelve VTT crudo, segmentos parseados y texto concatenado @app.get('/transcript_vtt/{video_id}') def transcript_vtt(video_id: str, lang: str = 'es'): """Descarga (con yt-dlp) y devuelve subtítulos en VTT, además de segmentos parseados y texto concatenado.""" vtt_text, error = fetch_vtt_subtitles(video_id, lang) if error: raise HTTPException(status_code=400, detail=error) # parsear VTT a segmentos usando parse_subtitle_format segments = parse_subtitle_format(vtt_text, 'vtt') if vtt_text else [] combined_text = '\n'.join([s.get('text','') for s in segments]) # format_text con texto limpio listo para procesamiento por agentes format_text = format_segments_text(segments) thumbnails = get_video_thumbnails(video_id) return { 'video_id': video_id, 'vtt': vtt_text, 'count': len(segments), 'segments': segments, 'text': combined_text, 'format_text': format_text, 'thumbnails': thumbnails } @app.post('/upload_vtt/{video_id}') async def upload_vtt(video_id: str, file: UploadFile = File(...)): """Permite subir un archivo VTT para un video y devuelve segmentos parseados y texto. Guarda el archivo en /app/data/{video_id}.vtt (sobrescribe si existe). """ try: content = await file.read() if not content: raise HTTPException(status_code=400, detail='Archivo vacío') target_dir = os.path.join(os.getcwd(), 'data') os.makedirs(target_dir, exist_ok=True) target_path = os.path.join(target_dir, f"{video_id}.vtt") with open(target_path, 'wb') as fh: fh.write(content) # Leer como texto para parsear text = content.decode('utf-8', errors='ignore') segments = parse_subtitle_format(text, 'vtt') if text else [] combined_text = '\n'.join([s.get('text','') for s in segments]) format_text = format_segments_text(segments) return { 'video_id': video_id, 'path': target_path, 'count': len(segments), 'segments': segments, 'text': combined_text, 'format_text': format_text } except Exception as e: raise HTTPException(status_code=500, detail=f'Error al guardar/parsear VTT: {str(e)[:200]}') @app.get('/transcript_alt/{video_id}') def transcript_alt(video_id: str, lang: str = 'es'): """Intento alternativo de obtener transcript usando youtube-transcript-api (si está disponible). Retorna segmentos en el mismo formato que get_transcript_data para mantener consistencia. """ if not YOUTUBE_TRANSCRIPT_API_AVAILABLE: raise HTTPException(status_code=501, detail='youtube-transcript-api no está instalado en el entorno.') vid = extract_video_id(video_id) if not vid: raise HTTPException(status_code=400, detail='video_id inválido') # preparar idiomas a probar langs = [lang] if len(lang) == 2: langs.append(f"{lang}-419") try: # get_transcript puede lanzar excepciones si no hay transcript # Usar cast para silenciar el analizador estático que no infiere la comprobación previa transcript_api = cast(Any, YouTubeTranscriptApi) transcript_list = transcript_api.get_transcript(vid, languages=langs) except NoTranscriptFound: raise HTTPException(status_code=404, detail='No se encontró transcript con youtube-transcript-api') except TranscriptsDisabled: raise HTTPException(status_code=403, detail='Los transcripts están deshabilitados para este video') except Exception as e: raise HTTPException(status_code=500, detail=f'Error youtube-transcript-api: {str(e)[:300]}') # transcript_list tiene items con keys: text, start, duration segments = [] for item in transcript_list: segments.append({ 'start': float(item.get('start', 0)), 'duration': float(item.get('duration', 0)), 'text': item.get('text', '').strip() }) combined_text = '\n'.join([s['text'] for s in segments if s.get('text')]) format_text = format_segments_text(segments) thumbnails = get_video_thumbnails(vid) return { 'video_id': vid, 'count': len(segments), 'segments': segments, 'text': combined_text, 'format_text': format_text, 'source': 'youtube-transcript-api' } if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)