import os import json import subprocess import requests import time import re import tempfile import glob from fastapi import FastAPI, HTTPException, UploadFile, File from typing import List, Dict, Any, cast # Intentar importar youtube_transcript_api como fallback try: from youtube_transcript_api import YouTubeTranscriptApi from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound YOUTUBE_TRANSCRIPT_API_AVAILABLE = True except Exception: # definir placeholders para evitar NameError si la librería no está instalada YouTubeTranscriptApi = None class TranscriptsDisabled(Exception): pass class NoTranscriptFound(Exception): pass YOUTUBE_TRANSCRIPT_API_AVAILABLE = False # Import CookieManager from yt_wrap to provide cookiefile paths per request from yt_wrap import CookieManager app = FastAPI(title="TubeScript API Pro - JSON Cleaner") # Ruta de cookies configurable vía variable de entorno: API_COOKIES_PATH # Por defecto, usar ./data/cookies.txt para agrupar configuraciones en la carpeta data DEFAULT_COOKIES_PATH = './data/cookies.txt' # Proxy opcional para requests/yt-dlp (ej. socks5h://127.0.0.1:9050) DEFAULT_PROXY = os.getenv('API_PROXY', '') def clean_youtube_json(raw_json: Dict) -> List[Dict]: """ Transforma el formato complejo 'json3' de YouTube a un formato simple: [{'start': 0.0, 'duration': 2.0, 'text': 'Hola'}] """ clean_data = [] # YouTube guarda los eventos de texto en la llave 'events' events = raw_json.get('events', []) for event in events: # Solo procesamos eventos que tengan segmentos de texto if 'segs' in event: text = "".join([seg['utf8'] for seg in event['segs']]).strip() if text and text != '\n': clean_data.append({ "start": event.get('tStartMs', 0) / 1000.0, # Convertir a segundos "duration": event.get('dDurationMs', 0) / 1000.0, "text": text.replace('\n', ' ') }) return clean_data def parse_subtitle_format(content: str, format_type: str) -> List[Dict]: """ Parsea diferentes formatos de subtítulos (json3, srv3, vtt) al formato estándar """ try: if format_type == 'json3': # Formato JSON3 de YouTube data = json.loads(content) if isinstance(content, str) else content return clean_youtube_json(data) elif format_type in ['srv3', 'vtt']: # Para srv3 y vtt, intentar parsear como JSON primero try: data = json.loads(content) if isinstance(content, str) else content # srv3 también tiene estructura similar a json3 if 'events' in data: return clean_youtube_json(data) except: pass # Si no es JSON, intentar parsear como texto VTT clean_data = [] lines = content.split('\n') if isinstance(content, str) else [] current_time = 0.0 current_text = "" for line in lines: line = line.strip() if not line or line.startswith('WEBVTT') or '-->' in line: if '-->' in line: # Extraer tiempo de inicio try: time_parts = line.split('-->')[0].strip().split(':') if len(time_parts) >= 2: current_time = float(time_parts[-2]) * 60 + float(time_parts[-1]) except: pass continue if line and not line.isdigit(): current_text = line if current_text: clean_data.append({ "start": current_time, "duration": 2.0, # Duración aproximada "text": current_text }) current_time += 2.0 return clean_data if clean_data else [] else: # Formato desconocido, intentar como JSON data = json.loads(content) if isinstance(content, str) else content if 'events' in data: return clean_youtube_json(data) return [] except Exception as e: print(f"Error parsing subtitle format {format_type}: {e}") return [] def extract_video_id(video_id_or_url: str) -> str: """ Normaliza la entrada y extrae el video_id si se recibe una URL completa. Acepta: https://www.youtube.com/watch?v=ID, youtu.be/ID, o el propio ID. """ if not video_id_or_url: return "" s = video_id_or_url.strip() # Si ya parece un id (11-20 caracteres alfanuméricos y -, _), retornarlo if re.match(r'^[A-Za-z0-9_-]{8,20}$', s): return s # Intentar extraer de URL completa # watch?v= m = re.search(r'[?&]v=([A-Za-z0-9_-]{8,20})', s) if m: return m.group(1) # youtu.be/ m = re.search(r'youtu\.be/([A-Za-z0-9_-]{8,20})', s) if m: return m.group(1) # /v/ or /embed/ m = re.search(r'(?:/v/|/embed/)([A-Za-z0-9_-]{8,20})', s) if m: return m.group(1) # Si no se detecta, devolver la entrada original (fallará después si es inválida) return s def format_segments_text(segments: List[Dict]) -> List[str]: """Devuelve una lista 'format_text' con textos limpios extraídos de segments. - elimina prefijos tipo 'Kind: captions' - elimina contenido en corchetes/paréntesis - elimina etiquetas HTML - normaliza espacios - divide por saltos de línea para obtener frases independientes """ def _clean_text(t: str) -> str: if not t: return '' s = str(t).strip() s = re.sub(r'^\s*Kind\s*:\s*.*$', '', s, flags=re.IGNORECASE).strip() # eliminar contenido entre corchetes (no-greedy) s = re.sub(r'\[[^\]]*\]', '', s) s = re.sub(r'\([^\)]*\)', '', s) s = re.sub(r'<[^>]+>', '', s) s = re.sub(r'[♪★■◆►▶◀•–—]', '', s) s = re.sub(r'\s+', ' ', s).strip() return s output: List[str] = [] for seg in segments or []: raw = seg.get('text', '') cleaned = _clean_text(raw) if not cleaned: continue parts = [p.strip() for p in re.split(r'[\n\r]+', cleaned) if p.strip()] output.extend(parts) return output NODE_PATH = "/usr/bin/node" def _yt_client_args(has_cookies: bool, for_stream: bool = False) -> list: """Devuelve --extractor-args y --js-runtimes para metadata/streams. Estrategia (basada en pruebas reales 2026-03-05): - Sin cookies → android (sin n-challenge, sin Node.js) - Con cookies → web + Node.js (web acepta cookies; Node resuelve n-challenge/signature) - for_stream → android (mejor compatibilidad HLS en lives) Diagnóstico: - mweb con cookies → requiere GVS PO Token (no disponible) - android con cookies → yt-dlp lo salta (no soporta cookies) - web con cookies + --js-runtimes node → ✅ funciona """ if for_stream or not has_cookies: return ["--extractor-args", "youtube:player_client=android"] else: return [ "--extractor-args", "youtube:player_client=web", "--js-runtimes", f"node:{NODE_PATH}", ] def _yt_subs_args(has_cookies: bool) -> list: """Devuelve --extractor-args para descarga de subtítulos. Para subtítulos siempre usamos android: - android sin cookies → ✅ funciona, obtiene auto-subs sin n-challenge - android con cookies → yt-dlp lo salta pero descarga igual sin cookies - web con cookies → falla en sub-langs no exactos (ej: en vs en-US) Resultado: android es siempre el cliente más fiable para subtítulos. """ return ["--extractor-args", "youtube:player_client=android"] # Nuevo helper: obtener thumbnails para un video — usa URLs estáticas directas (sin yt-dlp) def get_video_thumbnails(video_id: str) -> List[str]: """Devuelve URLs de thumbnail sin llamar yt-dlp (rápido, sin bloquear el transcript). YouTube siempre tiene estas URLs disponibles para cualquier video público. """ return [ f"https://img.youtube.com/vi/{video_id}/maxresdefault.jpg", f"https://img.youtube.com/vi/{video_id}/sddefault.jpg", f"https://img.youtube.com/vi/{video_id}/hqdefault.jpg", f"https://img.youtube.com/vi/{video_id}/mqdefault.jpg", f"https://img.youtube.com/vi/{video_id}/default.jpg", ] def get_transcript_data(video_id: str, lang: str = "es"): video_id = extract_video_id(video_id) if not video_id: return None, [], "video_id inválido o vacío" url = f"https://www.youtube.com/watch?v={video_id}" # Use CookieManager to get a cookiefile path per request (may be None) cookie_mgr = CookieManager() cookiefile_path = cookie_mgr.get_cookiefile_path() # cookies_path: prefer the temporary cookiefile if present, otherwise fall back to env path cookies_path = cookiefile_path or os.getenv('API_COOKIES_PATH', DEFAULT_COOKIES_PATH) # proxy support proxy = os.getenv('API_PROXY', DEFAULT_PROXY) or None proxies = {'http': proxy, 'https': proxy} if proxy else None def load_cookies_from_file(path: str) -> dict: """Parsea un cookies.txt en formato Netscape a un dict usable por requests.""" cookies = {} try: if not path or not os.path.exists(path): return cookies with open(path, 'r', encoding='utf-8', errors='ignore') as fh: for line in fh: line = line.strip() if not line or line.startswith('#'): continue parts = line.split('\t') # formato Netscape: domain, flag, path, secure, expiration, name, value if len(parts) >= 7: name = parts[5].strip() value = parts[6].strip() if name: cookies[name] = value else: # fallback: intento simple name=value if '=' in line: k, v = line.split('=', 1) cookies[k.strip()] = v.strip() except Exception: return {} return cookies cookies_for_requests = load_cookies_from_file(cookies_path) if cookies_path else {} # Intento rápido y fiable: usar yt-dlp para descargar subtítulos (auto o manual) al tmpdir try: with tempfile.TemporaryDirectory() as tmpdl: # Construir lista amplia de variantes de idioma # yt-dlp usa códigos exactos; cubrimos las variantes más comunes sub_langs = [lang] if lang == "en": sub_langs = ["en", "en-US", "en-en", "en-GB", "en-CA", "en-AU"] elif lang == "es": sub_langs = ["es", "es-419", "es-MX", "es-ES", "es-LA", "es-en"] elif len(lang) == 2: sub_langs = [lang, f"{lang}-{lang.upper()}", f"{lang}-419", f"{lang}-en"] # siempre android para subtítulos — NO pasar --cookies porque android no las soporta # (yt-dlp salta el cliente android si recibe cookies → no descarga nada) ytdlp_cmd = [ "yt-dlp", url, "--skip-download", "--write-auto-sub", "--write-sub", "--sub-format", "vtt/json3/srv3/best", "-o", os.path.join(tmpdl, "%(id)s.%(ext)s"), "--no-warnings", "--sub-lang", ",".join(sub_langs), ] + _yt_subs_args(False) # NO se pasan cookies con android (android no las soporta en yt-dlp) # attach proxy if configured if proxy: ytdlp_cmd.extend(['--proxy', proxy]) try: result = subprocess.run(ytdlp_cmd, capture_output=True, text=True, timeout=120) stderr = (result.stderr or "").lower() # Error: YouTube pide autenticación if result.returncode != 0 and ('sign in' in stderr or 'confirm you' in stderr or 'bot' in stderr): return None, get_video_thumbnails(video_id), "YouTube requiere autenticación para este video. Sube un cookies.txt válido con /upload_cookies." # Si yt-dlp falló por rate limiting, devolver mensaje claro stderr = (result.stderr or "").lower() if result.returncode != 0 and ('sign in' in stderr or 'confirm you' in stderr or 'bot' in stderr): return None, get_video_thumbnails(video_id), "YouTube requiere autenticación para este video. Sube un cookies.txt válido con /upload_cookies." if result.returncode != 0 and ('http error 429' in stderr or 'too many requests' in stderr): return None, get_video_thumbnails(video_id), "YouTube está limitando las peticiones al descargar subtítulos (HTTP 429). Agrega un cookies.txt válido exportado desde tu navegador y monta en el contenedor, o espera unos minutos." if result.returncode != 0 and ('http error 403' in stderr or 'forbidden' in stderr): return None, get_video_thumbnails(video_id), "Acceso denegado al descargar subtítulos (HTTP 403). El video puede tener restricciones. Usa cookies.txt con una cuenta autorizada." except subprocess.TimeoutExpired: pass # revisar archivos creados — yt-dlp genera nombres con doble extensión: ID.lang.vtt # glob "ID.*" no hace match; usar "ID*" para cubrir ID.en.vtt, ID.en-en.vtt, etc. files = glob.glob(os.path.join(tmpdl, f"{video_id}*")) # filtrar solo archivos de texto (vtt, json3, srv3, ttml, srt) files = [f for f in files if os.path.isfile(f) and any(f.endswith(ext) for ext in ('.vtt', '.json3', '.srv3', '.srt', '.ttml'))] if files: combined = [] seen_content = set() for fpath in files: try: with open(fpath, 'r', encoding='utf-8') as fh: content = fh.read() # desduplicar archivos con mismo contenido (en.vtt vs en-en.vtt) content_hash = hash(content[:500]) if content_hash not in seen_content: seen_content.add(content_hash) combined.append(content) except Exception: continue if combined: vtt_combined = "\n".join(combined) parsed = parse_subtitle_format(vtt_combined, 'vtt') # filtrar segmentos de ruido del header VTT _noise = {'kind: captions', 'language:', 'webvtt', 'position:', 'align:'} parsed = [s for s in parsed if s.get('text') and not any(s['text'].lower().startswith(n) for n in _noise)] if parsed: return parsed, get_video_thumbnails(video_id), None finally: # cleanup any temp cookiefile created for this request try: cookie_mgr.cleanup() except Exception: pass # ...existing code continues... # 1) Intento principal: obtener metadata con yt-dlp _has_ck = os.path.exists(cookies_path) command = [ "yt-dlp", "--skip-download", "--dump-json", "--no-warnings", ] + _yt_client_args(_has_ck) + [url] if _has_ck: command.extend(["--cookies", cookies_path]) if proxy: command.extend(['--proxy', proxy]) try: result = subprocess.run(command, capture_output=True, text=True, timeout=60) if result.returncode != 0: error_msg = result.stderr if result.stderr else "Error desconocido from yt-dlp" # Si yt-dlp reporta algo, enviar mensaje útil # No abortar inmediatamente: intentaremos fallback descargando subs con yt-dlp video_metadata = None else: if not result.stdout.strip(): video_metadata = None else: try: video_metadata = json.loads(result.stdout) except Exception: video_metadata = None except subprocess.TimeoutExpired: video_metadata = None except FileNotFoundError: return None, get_video_thumbnails(video_id), "yt-dlp no está instalado en el contenedor/entorno. Instala yt-dlp y vuelve a intentar." except Exception as e: video_metadata = None requested_subs = {} if video_metadata: requested_subs = video_metadata.get('requested_subtitles', {}) or {} # Buscar en automatic_captions y subtitles si requested_subs está vacío if not requested_subs: automatic_captions = video_metadata.get('automatic_captions', {}) or {} for lang_key, formats in automatic_captions.items(): if lang in lang_key or lang_key.startswith(lang): if formats: requested_subs = {lang_key: formats[0]} break if not requested_subs: subtitles = video_metadata.get('subtitles', {}) or {} for lang_key, formats in subtitles.items(): if lang in lang_key or lang_key.startswith(lang): if formats: requested_subs = {lang_key: formats[0]} break # Si requested_subs está disponible, intentar descargar vía requests la URL proporcionada if requested_subs: lang_key = next(iter(requested_subs)) sub_url = requested_subs[lang_key].get('url') if sub_url: headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8', 'Referer': 'https://www.youtube.com/', } max_retries = 3 response = None rate_limited = False for attempt in range(max_retries): try: response = requests.get(sub_url, headers=headers, timeout=30, cookies=cookies_for_requests, proxies=proxies) if response.status_code == 200: break elif response.status_code == 429: rate_limited = True if attempt < max_retries - 1: time.sleep(2 * (attempt + 1)) continue else: # salir del loop y usar fallback con yt-dlp más abajo break elif response.status_code == 403: return None, get_video_thumbnails(video_id), "Acceso denegado (HTTP 403). El video puede tener restricciones de edad o región. Intenta con cookies.txt." elif response.status_code == 404: # No encontramos la URL esperada; intentar fallback response = None break else: return None, get_video_thumbnails(video_id), f"Error al descargar subtítulos desde YouTube (HTTP {response.status_code})." except requests.exceptions.Timeout: if attempt < max_retries - 1: continue return None, get_video_thumbnails(video_id), "Timeout al descargar subtítulos. Intenta nuevamente." except requests.exceptions.RequestException as e: return None, get_video_thumbnails(video_id), f"Error de conexión al descargar subtítulos: {str(e)[:100]}" # Si obtuvimos un 200, procesarlo; si hubo rate limiting, intentar fallback con yt-dlp if response and response.status_code == 200: subtitle_format = requested_subs[lang_key].get('ext', 'json3') try: # Si la respuesta parece ser una playlist M3U8 o contiene enlaces a timedtext, # extraer las URLs y concatenar su contenido (VTT) antes de parsear. text_body = response.text if isinstance(response.text, str) else None if text_body and ('#EXTM3U' in text_body or 'timedtext' in text_body or text_body.strip().lower().startswith('#extm3u')): # Extraer URLs (líneas que empiecen con http) urls = re.findall(r'^(https?://\S+)', text_body, flags=re.M) # Intento 1: descargar cada URL con requests (usa cookies montadas si aplican) combined = [] for idx, u in enumerate(urls): try: r2 = requests.get(u, headers=headers, timeout=20, cookies=cookies_for_requests, proxies=proxies) if r2.status_code == 200 and r2.text: combined.append(r2.text) continue # Si recibimos 429, 403, o falló, intentaremos con yt-dlp (fallback) if r2.status_code == 429: # fallback a yt-dlp raise Exception('rate_limited') except Exception: # fallthrough al fallback con yt-dlp pass # Intento 2 (fallback): usar yt-dlp para descargar ese timedtext/url a un archivo temporal try: with tempfile.TemporaryDirectory() as tdir: out_template = os.path.join(tdir, f"timedtext_{idx}.%(ext)s") ytdlp_cmd = [ "yt-dlp", u, "-o", out_template, "--no-warnings", ] if os.path.exists(cookies_path): ytdlp_cmd.extend(["--cookies", cookies_path]) # pasar proxy a yt-dlp si está configurado if proxy: ytdlp_cmd.extend(['--proxy', proxy]) try: res2 = subprocess.run(ytdlp_cmd, capture_output=True, text=True, timeout=60) stderr2 = (res2.stderr or "").lower() if res2.returncode != 0 and ('http error 429' in stderr2 or 'too many requests' in stderr2): # rate limit cuando intentamos descargar timedtext return None, get_video_thumbnails(video_id), "YouTube está limitando las peticiones al descargar subtítulos (HTTP 429). Agrega cookies.txt válido o intenta más tarde." if res2.returncode != 0 and ('http error 403' in stderr2 or 'forbidden' in stderr2): return None, get_video_thumbnails(video_id), "Acceso denegado al descargar subtítulos (HTTP 403). Intenta con cookies.txt o una cuenta con permisos." except Exception: pass # leer cualquier archivo creado en el tempdir for fpath in glob.glob(os.path.join(tdir, "timedtext_*.*")): try: with open(fpath, 'r', encoding='utf-8') as fh: txt = fh.read() if txt: combined.append(txt) except Exception: continue except Exception: continue if combined: vtt_combined = "\n".join(combined) formatted_transcript = parse_subtitle_format(vtt_combined, 'vtt') if formatted_transcript: return formatted_transcript, get_video_thumbnails(video_id) except Exception as e: return None, get_video_thumbnails(video_id), f"Error al procesar los subtítulos: {str(e)[:200]}" if not formatted_transcript: return None, get_video_thumbnails(video_id), "Los subtítulos están vacíos o no se pudieron procesar." return formatted_transcript, get_video_thumbnails(video_id), None # Si hubo rate limiting, intentar fallback con yt-dlp para descargar la URL de subtítulos if rate_limited and (not response or response.status_code != 200): # Intentar descargar la URL de subtítulos directamente con yt-dlp (usa cookies si existen) try: with tempfile.TemporaryDirectory() as tdir: out_template = os.path.join(tdir, "sub.%(ext)s") ytdlp_cmd = [ "yt-dlp", sub_url, "-o", out_template, "--no-warnings", ] if os.path.exists(cookies_path): ytdlp_cmd.extend(["--cookies", cookies_path]) if proxy: ytdlp_cmd.extend(['--proxy', proxy]) res = subprocess.run(ytdlp_cmd, capture_output=True, text=True, timeout=90) stderr = (res.stderr or "").lower() if res.returncode != 0 and ('http error 429' in stderr or 'too many requests' in stderr): return None, get_video_thumbnails(video_id), "YouTube está limitando las peticiones al descargar subtítulos (HTTP 429). Agrega cookies.txt válido o intenta más tarde." # Leer archivos generados combined = [] for fpath in glob.glob(os.path.join(tdir, "*.*")): try: with open(fpath, 'r', encoding='utf-8') as fh: txt = fh.read() if txt: combined.append(txt) except Exception: continue if combined: vtt_combined = "\n".join(combined) formatted_transcript = parse_subtitle_format(vtt_combined, 'vtt') if formatted_transcript: return formatted_transcript, get_video_thumbnails(video_id) except FileNotFoundError: return None, get_video_thumbnails(video_id), "yt-dlp no está instalado en el contenedor/entorno. Instala yt-dlp y vuelve a intentar." except Exception: # seguir con otros fallbacks pass # si no logró con yt-dlp, continuar y dejar que los fallbacks posteriores manejen el caso # Fallback: intentarlo descargando subtítulos con yt-dlp a un directorio temporal # (esto cubre casos en que la metadata no incluye requested_subs) try: with tempfile.TemporaryDirectory() as tmpdir: # Intentar con auto-sub primero, luego con sub (manual) ytdlp_variants = [ ("--write-auto-sub", "auto"), ("--write-sub", "manual") ] downloaded = None for flag, label in ytdlp_variants: cmd = [ "yt-dlp", url, "--skip-download", flag, "--sub-lang", lang, "--sub-format", "json3/vtt/srv3/best", "-o", os.path.join(tmpdir, "%(id)s.%(ext)s"), "--no-warnings", ] + _yt_subs_args(False) # NO cookies con android (android no las soporta, yt-dlp lo saltaría) # añadir proxy a la llamada de yt-dlp si está configurado if proxy: cmd.extend(['--proxy', proxy]) r = subprocess.run(cmd, capture_output=True, text=True, timeout=120) # Revisar si se creó algún archivo en tmpdir (doble ext: ID.en.vtt) files = glob.glob(os.path.join(tmpdir, f"{video_id}*")) files = [f for f in files if os.path.isfile(f) and any(f.endswith(e) for e in ('.vtt', '.json3', '.srv3', '.srt', '.ttml'))] if files: # Tomar el primero válido downloaded = files[0] break if downloaded: ext = os.path.splitext(downloaded)[1].lstrip('.') try: with open(downloaded, 'r', encoding='utf-8') as fh: content = fh.read() except Exception as e: return None, get_video_thumbnails(video_id), f"Error leyendo archivo de subtítulos descargado: {str(e)[:200]}" # Intentar parsear según extensión conocida fmt = 'json3' if ext in ('json', 'json3') else ('vtt' if ext == 'vtt' else 'srv3') formatted_transcript = parse_subtitle_format(content, fmt) if formatted_transcript: return formatted_transcript, get_video_thumbnails(video_id), None else: return None, get_video_thumbnails(video_id), "Se descargaron subtítulos pero no se pudieron procesar." except FileNotFoundError: return None, get_video_thumbnails(video_id), "yt-dlp no está instalado. Instala yt-dlp en el contenedor/entorno y vuelve a intentar." except Exception as e: # No hacer crash, retornar mensaje general return None, get_video_thumbnails(video_id), f"Error al intentar descargar subtítulos con yt-dlp: {str(e)[:200]}" return None, get_video_thumbnails(video_id), ( f"No se encontraron subtítulos para este video en idioma '{lang}'. " "Puede que el video no tenga subtítulos, estén en otro idioma, o requiera autenticación. " "Prueba: ?lang=en | /debug/fetch_subs/{video_id} | sube cookies con /upload_cookies" ) # ── Clientes exactos de NewPipeExtractor (ClientsConstants.java dev 2026-03-05) ── _NP_IOS = { "clientName": "IOS", "clientVersion": "21.03.2", "clientScreen": "WATCH", "platform": "MOBILE", "deviceMake": "Apple", "deviceModel": "iPhone16,2", "osName": "iOS", "osVersion": "18.7.2.22H124", "userAgent": "com.google.ios.youtube/21.03.2 (iPhone16,2; U; CPU iOS 18_7_2 like Mac OS X;)", } _NP_ANDROID = { "clientName": "ANDROID", "clientVersion": "21.03.36", "clientScreen": "WATCH", "platform": "MOBILE", "osName": "Android", "osVersion": "16", "androidSdkVersion": 36, "userAgent": "com.google.android.youtube/21.03.36 (Linux; U; Android 16) gzip", } # GAPIS: youtubei.googleapis.com — NewPipe lo usa para iOS y Android (YoutubeStreamHelper.java) _GAPIS_BASE = "https://youtubei.googleapis.com/youtubei/v1" def _np_build_ctx(client: dict, visitor_data: str = "") -> dict: """context.client igual que prepareJsonBuilder de YoutubeParsingHelper.java.""" ctx = { "clientName": client["clientName"], "clientVersion": client["clientVersion"], "clientScreen": client.get("clientScreen", "WATCH"), "platform": client.get("platform", "MOBILE"), "hl": "en", "gl": "US", "utcOffsetMinutes": 0, } if visitor_data: ctx["visitorData"] = visitor_data for k in ("deviceMake", "deviceModel", "osName", "osVersion", "androidSdkVersion"): if client.get(k): ctx[k] = client[k] return ctx def _np_get_visitor_data(client: dict, proxies: dict = None) -> str: """POST /visitor_id → responseContext.visitorData (getVisitorDataFromInnertube).""" try: ctx = _np_build_ctx(client) payload = { "context": { "client": ctx, "request": {"internalExperimentFlags": [], "useSsl": True}, "user": {"lockedSafetyMode": False}, } } headers = { "User-Agent": client["userAgent"], "X-Goog-Api-Format-Version": "2", "Content-Type": "application/json", } r = requests.post( f"{_GAPIS_BASE}/visitor_id?prettyPrint=false", json=payload, headers=headers, timeout=8, proxies=proxies, ) if r.status_code == 200: return r.json().get("responseContext", {}).get("visitorData", "") except Exception: pass return "" def _np_call_player(video_id: str, client: dict, visitor_data: str = "", proxies: dict = None) -> dict: """POST /player igual que getIosPlayerResponse/getAndroidPlayerResponse de NewPipe.""" import string as _str n = int(time.time()) chars = _str.digits + _str.ascii_lowercase t = "" while n: t = chars[n % 36] + t n //= 36 url = f"{_GAPIS_BASE}/player?prettyPrint=false&t={t or '0'}&id={video_id}" ctx = _np_build_ctx(client, visitor_data) payload = { "context": { "client": ctx, "request": {"internalExperimentFlags": [], "useSsl": True}, "user": {"lockedSafetyMode": False}, }, "videoId": video_id, "contentCheckOk": True, "racyCheckOk": True, } headers = { "User-Agent": client["userAgent"], "X-Goog-Api-Format-Version": "2", "Content-Type": "application/json", } try: r = requests.post(url, json=payload, headers=headers, timeout=15, proxies=proxies) if r.status_code == 200: return r.json() except Exception: pass return {} def innertube_get_stream(video_id: str, proxy: str = None) -> dict: """ Obtiene URL de stream replicando exactamente NewPipeExtractor: 1. visitorData via /visitor_id (para ambos clientes) 2. iOS /player → iosStreamingData.hlsManifestUrl (prioritario para lives) 3. Android /player → formats directas (videos normales) Sin cookies | Sin firma JS | Sin PO Token | Sin bot-check desde servidores """ result = { "title": None, "description": None, "is_live": False, "hls_url": None, "formats": [], "error": None, } proxies = {"http": proxy, "https": proxy} if proxy else None vd_ios = _np_get_visitor_data(_NP_IOS, proxies) vd_android = _np_get_visitor_data(_NP_ANDROID, proxies) # iOS — preferido para hlsManifestUrl en lives (como hace NewPipe) ios = _np_call_player(video_id, _NP_IOS, vd_ios, proxies) ps = ios.get("playabilityStatus") or {} if ps.get("status") == "LOGIN_REQUIRED": result["error"] = f"Login requerido: {ps.get('reason','')}" return result vd_meta = ios.get("videoDetails") or {} result["title"] = vd_meta.get("title") result["description"] = vd_meta.get("shortDescription") result["is_live"] = bool(vd_meta.get("isLive") or vd_meta.get("isLiveContent")) ios_sd = ios.get("streamingData") or {} hls = ios_sd.get("hlsManifestUrl") if hls: result["hls_url"] = hls result["formats"] = [ {"itag": f.get("itag"), "mimeType": f.get("mimeType"), "quality": f.get("quality")} for f in (ios_sd.get("formats", []) + ios_sd.get("adaptiveFormats", []))[:8] ] return result # Android — para videos normales o si iOS no dio HLS android = _np_call_player(video_id, _NP_ANDROID, vd_android, proxies) if not result["title"]: vd2 = android.get("videoDetails") or {} result["title"] = vd2.get("title") result["description"] = vd2.get("shortDescription") result["is_live"] = bool(vd2.get("isLive") or vd2.get("isLiveContent")) android_sd = android.get("streamingData") or {} hls = android_sd.get("hlsManifestUrl") if hls: result["hls_url"] = hls return result all_fmts = android_sd.get("formats", []) + android_sd.get("adaptiveFormats", []) best = sorted([f for f in all_fmts if f.get("url")], key=lambda x: x.get("bitrate", 0), reverse=True) if best: result["hls_url"] = best[0]["url"] result["formats"] = [ {"itag": f.get("itag"), "mimeType": f.get("mimeType"), "quality": f.get("quality")} for f in best[:8] ] return result result["error"] = ( "Innertube no devolvió streamingData. " "Puede ser DRM, región bloqueada, privado, o YouTube actualizó su API." ) return result def get_stream_url(video_id: str): """ Obtiene la URL de transmisión m3u8/HLS. Devuelve: (stream_url, title, description, is_live, error) Estrategia: 1. innertube_get_stream() — técnica NewPipe, sin cookies, sin bot-check 2. Fallback yt-dlp si Innertube falla """ video_id = extract_video_id(video_id) proxy = os.getenv('API_PROXY', DEFAULT_PROXY) or None # ── 1. Innertube directo (NewPipe) ──────────────────────────────────────── it = innertube_get_stream(video_id, proxy=proxy) if it.get("hls_url"): return (it["hls_url"], it.get("title"), it.get("description"), it.get("is_live", False), None) title = it.get("title") description = it.get("description") is_live = it.get("is_live", False) # ── 2. Fallback yt-dlp ──────────────────────────────────────────────────── cookie_mgr = CookieManager() cookiefile_path = cookie_mgr.get_cookiefile_path() cookies_path_env = os.getenv('API_COOKIES_PATH', DEFAULT_COOKIES_PATH) effective_cookie = cookiefile_path or ( cookies_path_env if os.path.exists(cookies_path_env) else None) has_ck = bool(effective_cookie) yt_url = f"https://www.youtube.com/watch?v={video_id}" BOT_MARKERS = ("sign in to confirm", "not a bot", "sign in to") def _is_bot(s: str) -> bool: return any(m in s.lower() for m in BOT_MARKERS) def _build_args(client: str) -> list: args = ["--no-warnings", "--no-check-certificate", "--no-playlist", "--extractor-args", f"youtube:player_client={client}"] if client == "web": args += ["--js-runtimes", f"node:{NODE_PATH}"] if effective_cookie and client == "web": args += ["--cookies", effective_cookie] if proxy: args += ["--proxy", proxy] return args def _ytdlp_url(fmt: str, client: str): cmd = ["yt-dlp", "-g", "-f", fmt] + _build_args(client) + [yt_url] try: res = subprocess.run(cmd, capture_output=True, text=True, check=False, timeout=90) if res.returncode == 0 and res.stdout.strip(): for line in res.stdout.strip().splitlines(): line = line.strip() if line.startswith("http"): return line, False return None, _is_bot(res.stderr or "") except Exception: return None, False clients = ["android", "ios"] + (["web"] if has_ck else []) fmts = (["91", "92", "93", "94", "95", "96", "best[protocol=m3u8_native]", "best[protocol=m3u8]", "best"] if is_live else ["best[ext=m3u8]", "best[protocol=m3u8_native]", "best[protocol=m3u8]", "best", "best[ext=mp4]"]) got_bot = False try: for client in clients: for fmt in fmts: u, is_b = _ytdlp_url(fmt, client) if u: return u, title, description, is_live, None if is_b: got_bot = True finally: try: cookie_mgr.cleanup() except Exception: pass if got_bot: return None, title, description, is_live, ( "YouTube detectó actividad de bot. " "Sube cookies.txt: curl -X POST http://localhost:8282/upload_cookies -F 'file=@cookies.txt'" ) return None, title, description, is_live, ( it.get("error") or "No se pudo obtener la URL del stream. " "Si es un live, verifica que esté EN VIVO (🔴) ahora mismo." ) # ...existing code (old get_stream_url body — reemplazado arriba) — ELIMINAR... @app.get("/transcript/{video_id}") def transcript_endpoint(video_id: str, lang: str = "es"): data, thumbnails, error = get_transcript_data(video_id, lang) # Fallback automático a 'en' si no hay subs en el idioma pedido if (error and lang != "en" and "No se encontraron" in (error or "") and "autenticación" not in (error or "")): data_en, thumbnails_en, error_en = get_transcript_data(video_id, "en") if data_en and not error_en: data, thumbnails, error = data_en, thumbnails_en, None if error: raise HTTPException(status_code=400, detail=error) # Concatenar texto de segmentos para mostrar como texto plano además de los segmentos try: combined_text = "\n".join([seg.get('text', '') for seg in data if seg.get('text')]) except Exception: combined_text = "" # Nuevo: arreglo format_text con cada segmento como elemento (texto limpio) try: format_text_list = format_segments_text(data) except Exception: format_text_list = [] format_text = format_text_list return { "video_id": video_id, "count": len(data), "segments": data, "text": combined_text, "format_text": format_text, "thumbnails": thumbnails } @app.get('/transcript_vtt/{video_id}') def transcript_vtt(video_id: str, lang: str = 'es'): """Descarga (con yt-dlp) y devuelve subtítulos en VTT, además de segmentos parseados y texto concatenado.""" vtt_text, error = fetch_vtt_subtitles(video_id, lang) if error: raise HTTPException(status_code=400, detail=error) # parsear VTT a segmentos usando parse_subtitle_format segments = parse_subtitle_format(vtt_text, 'vtt') if vtt_text else [] combined_text = '\n'.join([s.get('text','') for s in segments]) # format_text con texto limpio listo para procesamiento por agentes format_text = format_segments_text(segments) thumbnails = get_video_thumbnails(video_id) return { 'video_id': video_id, 'vtt': vtt_text, 'count': len(segments), 'segments': segments, 'text': combined_text, 'format_text': format_text, 'thumbnails': thumbnails } @app.get("/stream/{video_id}") def stream_endpoint(video_id: str): """ Obtiene la URL de transmisión (m3u8/HLS) de un video/live de YouTube. - Para lives en vivo (🔴): devuelve URL HLS directa usable con FFmpeg/VLC. - Para videos normales: devuelve la mejor URL de video disponible. Ejemplo FFmpeg: ffmpeg -re -i "URL_M3U8" -c copy -f flv rtmp://destino/stream_key """ stream_url, title, description, is_live, error = get_stream_url(video_id) if error: raise HTTPException(status_code=400, detail=error) thumbnails = get_video_thumbnails(video_id) url_type = "m3u8/hls" if stream_url and "m3u8" in stream_url.lower() else "direct/mp4" return { "video_id": video_id, "title": title, "description": description, "is_live": is_live, "stream_url": stream_url, "url_type": url_type, "youtube_url": f"https://www.youtube.com/watch?v={video_id}", "ffmpeg_example": f'ffmpeg -re -i "{stream_url}" -c copy -f flv rtmp://destino/stream_key', "thumbnails": thumbnails, "usage": { "description": "Usa stream_url con FFmpeg para retransmitir", "command_template": "ffmpeg -re -i \"{stream_url}\" -c copy -f flv {rtmp_url}/{stream_key}", "platforms": { "youtube": "rtmp://a.rtmp.youtube.com/live2/YOUR_STREAM_KEY", "facebook": "rtmps://live-api-s.facebook.com:443/rtmp/YOUR_STREAM_KEY", "twitch": "rtmp://live.twitch.tv/app/YOUR_STREAM_KEY", "twitter": "rtmps://fa.contribute.live-video.net/app/YOUR_STREAM_KEY" } } } @app.post('/upload_cookies') async def upload_cookies(file: UploadFile = File(...)): """Endpoint para subir cookies.txt y guardarlo en el servidor en /app/cookies.txt""" try: content = await file.read() if not content: raise HTTPException(status_code=400, detail='Archivo vacío') # Determinar ruta objetivo a partir de la variable de entorno target = os.getenv('API_COOKIES_PATH', DEFAULT_COOKIES_PATH) target_dir = os.path.dirname(target) or '.' # Crear directorio si no existe try: os.makedirs(target_dir, exist_ok=True) except Exception: # Si no se puede crear el directorio, intentamos escribir en el working dir como fallback target = os.path.basename(target) # Guardar con permisos de escritura with open(target, 'wb') as fh: fh.write(content) return {"detail": "cookies.txt guardado correctamente", "path": os.path.abspath(target)} except Exception as e: raise HTTPException(status_code=500, detail=f'Error al guardar cookies: {str(e)[:200]}') @app.get("/debug/metadata/{video_id}") def debug_metadata(video_id: str): """Endpoint de depuración: obtiene --dump-json de yt-dlp para un video. Devuelve la metadata (automatic_captions, subtitles, requested_subtitles) para inspección. """ # try to use dynamic cookiefile per request cookie_mgr = CookieManager() cookiefile_path = cookie_mgr.get_cookiefile_path() cookies_path = cookiefile_path or os.getenv('API_COOKIES_PATH', DEFAULT_COOKIES_PATH) proxy = os.getenv('API_PROXY', DEFAULT_PROXY) or None url = f"https://www.youtube.com/watch?v={video_id}" cmd = [ "yt-dlp", "--skip-download", "--dump-json", "--no-warnings", url ] + _yt_client_args(os.path.exists(cookies_path)) if os.path.exists(cookies_path): cmd.extend(["--cookies", cookies_path]) if proxy: cmd.extend(['--proxy', proxy]) try: proc = subprocess.run(cmd, capture_output=True, text=True, timeout=60) except FileNotFoundError: try: cookie_mgr.cleanup() except Exception: pass raise HTTPException(status_code=500, detail="yt-dlp no está instalado en el contenedor/entorno.") except subprocess.TimeoutExpired: try: cookie_mgr.cleanup() except Exception: pass raise HTTPException(status_code=504, detail="yt-dlp demoró demasiado en responder.") except Exception as e: try: cookie_mgr.cleanup() except Exception: pass raise HTTPException(status_code=500, detail=str(e)[:300]) if proc.returncode != 0: stderr = proc.stderr or '' try: cookie_mgr.cleanup() except Exception: pass raise HTTPException(status_code=500, detail=f"yt-dlp error: {stderr[:1000]}") try: metadata = json.loads(proc.stdout) except Exception: try: cookie_mgr.cleanup() except Exception: pass raise HTTPException(status_code=500, detail="No se pudo parsear la salida JSON de yt-dlp.") try: cookie_mgr.cleanup() except Exception: pass # Devolver solo las partes útiles para depuración debug_info = { 'id': metadata.get('id'), 'title': metadata.get('title'), 'uploader': metadata.get('uploader'), 'is_live': metadata.get('is_live'), 'automatic_captions': metadata.get('automatic_captions'), 'subtitles': metadata.get('subtitles'), 'requested_subtitles': metadata.get('requested_subtitles'), 'formats_sample': metadata.get('formats')[:5] if metadata.get('formats') else None, } return debug_info @app.get('/debug/fetch_subs/{video_id}') def debug_fetch_subs(video_id: str, lang: str = 'es'): """Intenta descargar subtítulos con yt-dlp dentro del entorno y devuelve el log y el contenido (parcial) si existe. Usa cookies definidas en API_COOKIES_PATH. """ cookie_mgr = CookieManager() cookiefile_path = cookie_mgr.get_cookiefile_path() out_dir = tempfile.mkdtemp(prefix='subs_') out_template = os.path.join(out_dir, '%(id)s.%(ext)s') url = f"https://www.youtube.com/watch?v={video_id}" cmd = [ 'yt-dlp', '--verbose', '--skip-download', '--write-auto-sub', '--write-sub', '--sub-lang', lang, '--sub-format', 'json3/vtt/srv3/best', '--output', out_template, url ] + _yt_subs_args(bool(cookiefile_path)) if cookiefile_path: cmd.extend(['--cookies', cookiefile_path]) try: proc = subprocess.run(cmd, capture_output=True, text=True, timeout=240) except FileNotFoundError: try: cookie_mgr.cleanup() except Exception: pass raise HTTPException(status_code=500, detail='yt-dlp no está instalado en el contenedor.') except subprocess.TimeoutExpired: try: cookie_mgr.cleanup() except Exception: pass raise HTTPException(status_code=504, detail='La ejecución de yt-dlp demoró demasiado.') except Exception as e: try: cookie_mgr.cleanup() except Exception: pass raise HTTPException(status_code=500, detail=str(e)[:300]) stdout = proc.stdout or '' stderr = proc.stderr or '' rc = proc.returncode # Buscar archivos generados (yt-dlp usa doble extensión: ID.lang.vtt) generated = [] for f in glob.glob(os.path.join(out_dir, f"{video_id}*")): size = None try: size = os.path.getsize(f) # tomar las primeras 200 líneas para no retornar archivos enormes with open(f, 'r', encoding='utf-8', errors='ignore') as fh: sample = ''.join([next(fh) for _ in range(200)]) if size > 0 else '' generated.append({ 'path': f, 'size': size, 'sample': sample }) except StopIteration: # menos de 200 líneas try: with open(f, 'r', encoding='utf-8', errors='ignore') as fh: sample = fh.read() except Exception: sample = None if size is None: try: size = os.path.getsize(f) except Exception: size = 0 generated.append({'path': f, 'size': size, 'sample': sample}) except Exception: if size is None: try: size = os.path.getsize(f) except Exception: size = 0 generated.append({'path': f, 'size': size, 'sample': None}) try: cookie_mgr.cleanup() except Exception: pass return { 'video_id': video_id, 'rc': rc, 'stdout_tail': stdout[-2000:], 'stderr_tail': stderr[-2000:], 'generated': generated, 'out_dir': out_dir } # Nuevo helper para descargar VTT directamente y retornarlo como texto def fetch_vtt_subtitles(video_id: str, lang: str = 'es'): """Descarga subtítulos en formato VTT usando yt-dlp y devuelve su contenido. Retorna (vtt_text, None) en caso de éxito o (None, error_message) en caso de error. """ url = f"https://www.youtube.com/watch?v={video_id}" cookie_mgr = CookieManager() cookiefile_path = cookie_mgr.get_cookiefile_path() with tempfile.TemporaryDirectory() as tmpdir: out_template = os.path.join(tmpdir, '%(id)s.%(ext)s') cmd = [ 'yt-dlp', '--skip-download', '--write-auto-sub', '--write-sub', '--sub-lang', lang, '--sub-format', 'vtt', '--output', out_template, url ] + _yt_subs_args(bool(cookiefile_path)) if cookiefile_path: cmd.extend(['--cookies', cookiefile_path]) try: proc = subprocess.run(cmd, capture_output=True, text=True, timeout=180) except FileNotFoundError: try: cookie_mgr.cleanup() except Exception: pass return None, 'yt-dlp no está instalado en el entorno.' except subprocess.TimeoutExpired: try: cookie_mgr.cleanup() except Exception: pass return None, 'La descarga de subtítulos tardó demasiado.' except Exception as e: try: cookie_mgr.cleanup() except Exception: pass return None, f'Error ejecutando yt-dlp: {str(e)[:200]}' stderr = (proc.stderr or '').lower() if proc.returncode != 0: try: cookie_mgr.cleanup() except Exception: pass if 'http error 429' in stderr or 'too many requests' in stderr: return None, 'YouTube está limitando las peticiones al descargar subtítulos (HTTP 429). Revisa cookies.txt o prueba desde otra IP.' if 'http error 403' in stderr or 'forbidden' in stderr: return None, 'Acceso denegado al descargar subtítulos (HTTP 403). Usa cookies.txt con una cuenta autorizada.' return None, f'yt-dlp error: {proc.stderr[:1000]}' # buscar archivos generados (doble extensión: ID.lang.vtt) files = glob.glob(os.path.join(tmpdir, f"{video_id}*")) files = [f for f in files if os.path.isfile(f) and any(f.endswith(e) for e in ('.vtt', '.json3', '.srv3', '.srt', '.ttml'))] if not files: try: cookie_mgr.cleanup() except Exception: pass return None, 'No se generaron archivos de subtítulos.' # intentar preferir .vtt vtt_path = None for f in files: if f.lower().endswith('.vtt'): vtt_path = f break if not vtt_path: vtt_path = files[0] try: with open(vtt_path, 'r', encoding='utf-8', errors='ignore') as fh: content = fh.read() try: cookie_mgr.cleanup() except Exception: pass return content, None except Exception as e: try: cookie_mgr.cleanup() except Exception: pass return None, f'Error leyendo archivo de subtítulos: {str(e)[:200]}' @app.post('/upload_vtt/{video_id}') async def upload_vtt(video_id: str, file: UploadFile = File(...)): """Permite subir un archivo VTT para un video y devuelve segmentos parseados y texto. Guarda el archivo en /app/data/{video_id}.vtt (sobrescribe si existe). """ try: content = await file.read() if not content: raise HTTPException(status_code=400, detail='Archivo vacío') target_dir = os.path.join(os.getcwd(), 'data') os.makedirs(target_dir, exist_ok=True) target_path = os.path.join(target_dir, f"{video_id}.vtt") with open(target_path, 'wb') as fh: fh.write(content) # Leer como texto para parsear text = content.decode('utf-8', errors='ignore') segments = parse_subtitle_format(text, 'vtt') if text else [] combined_text = '\n'.join([s.get('text','') for s in segments]) format_text = format_segments_text(segments) return { 'video_id': video_id, 'path': target_path, 'count': len(segments), 'segments': segments, 'text': combined_text, 'format_text': format_text } except Exception as e: raise HTTPException(status_code=500, detail=f'Error al guardar/parsear VTT: {str(e)[:200]}') @app.get('/transcript_alt/{video_id}') def transcript_alt(video_id: str, lang: str = 'es'): """Intento alternativo de obtener transcript usando youtube-transcript-api (si está disponible). Retorna segmentos en el mismo formato que get_transcript_data para mantener consistencia. """ if not YOUTUBE_TRANSCRIPT_API_AVAILABLE: raise HTTPException(status_code=501, detail='youtube-transcript-api no está instalado en el entorno.') vid = extract_video_id(video_id) if not vid: raise HTTPException(status_code=400, detail='video_id inválido') # preparar idiomas a probar langs = [lang] if len(lang) == 2: langs.append(f"{lang}-419") try: # get_transcript puede lanzar excepciones si no hay transcript # Usar cast para silenciar el analizador estático que no infiere la comprobación previa transcript_api = cast(Any, YouTubeTranscriptApi) transcript_list = transcript_api.get_transcript(vid, languages=langs) except NoTranscriptFound: raise HTTPException(status_code=404, detail='No se encontró transcript con youtube-transcript-api') except TranscriptsDisabled: raise HTTPException(status_code=403, detail='Los transcripts están deshabilitados para este video') except Exception as e: raise HTTPException(status_code=500, detail=f'Error youtube-transcript-api: {str(e)[:300]}') # transcript_list tiene items con keys: text, start, duration segments = [] for item in transcript_list: segments.append({ 'start': float(item.get('start', 0)), 'duration': float(item.get('duration', 0)), 'text': item.get('text', '').strip() }) combined_text = '\n'.join([s['text'] for s in segments if s.get('text')]) format_text = format_segments_text(segments) thumbnails = get_video_thumbnails(vid) return { 'video_id': vid, 'count': len(segments), 'segments': segments, 'text': combined_text, 'format_text': format_text, 'source': 'youtube-transcript-api' } if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)