TubeScript-API/tools/generate_proxy_whitelist.py
Cesar Mendivil 2923510c51 Add Playwright tools for extracting M3U8 URLs and proxy management
- Introduced `playwright_extract_m3u8.py` to extract M3U8 URLs from YouTube videos using Playwright.
- Added `README_PLAYWRIGHT.md` for usage instructions and requirements.
- Created `expand_and_test_proxies.py` to expand user-provided proxies and test their validity.
- Implemented `generate_proxy_whitelist.py` to generate a whitelist of working proxies based on testing results.
- Added sample proxy files: `user_proxies.txt` for user-defined proxies and `proxies_sample.txt` as a template.
- Generated `expanded_proxies.txt`, `whitelist.json`, and `whitelist.txt` for storing expanded and valid proxies.
- Included error handling and logging for proxy testing results.
2026-03-17 00:29:51 -07:00

243 lines
9.5 KiB
Python

#!/usr/bin/env python3
"""
generate_proxy_whitelist.py

Read a list of proxies from a file (proxies.txt), test each proxy with yt-dlp
by attempting to fetch minimal YouTube metadata, measure latency, and produce:
- whitelist.json : structured list of proxies with status and metrics
- whitelist.txt  : only the working proxies, sorted by latency

proxies.txt format: one URL per line, for example:
socks5h://127.0.0.1:1080
http://10.0.0.1:3128

Usage:
python3 tools/generate_proxy_whitelist.py --input tools/proxies.txt --out tools/whitelist.json --test-url https://www.youtube.com/watch?v=dQw4w9WgXcQ

Notes:
- Requires `yt-dlp` to be installed in the environment running this script.
- yt-dlp is used because it directly validates that the proxy works for
  YouTube calls (including yt-dlp's JS/signature handling when applicable).
- Adjust timeouts and concurrency settings to your needs.
"""
import argparse
import json
import subprocess
import time
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse
import requests
# Substrings of yt-dlp stderr output that indicate a YouTube block / bot check.
# Compared case-insensitively in test_proxy().
BOT_MARKERS = ("sign in to confirm", "not a bot", "sign in to", "HTTP Error 403", "HTTP Error 429")
def test_proxy(proxy: str, test_url: str, timeout: int = 25) -> dict:
"""Prueba un proxy ejecutando yt-dlp --dump-json sobre test_url.
Retorna dict con info: proxy, ok, rc, stderr, elapsed_ms, stdout_preview
"""
proxy = proxy.strip()
if not proxy:
return {"proxy": proxy, "ok": False, "error": "empty"}
cmd = [
"yt-dlp",
"--skip-download",
"--dump-json",
"--no-warnings",
"--extractor-args", "youtube:player_client=tv_embedded",
"--socket-timeout", "10",
test_url,
"--proxy", proxy,
]
start = time.perf_counter()
try:
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
elapsed = (time.perf_counter() - start) * 1000.0
stdout = proc.stdout or ""
stderr = proc.stderr or ""
rc = proc.returncode
# heurística de éxito: rc == 0 y stdout no vacío y no markers de bot en stderr
stderr_low = stderr.lower()
bot_hit = any(m.lower() in stderr_low for m in BOT_MARKERS)
ok = (rc == 0 and stdout.strip() != "" and not bot_hit)
return {
"proxy": proxy,
"ok": ok,
"rc": rc,
"elapsed_ms": int(elapsed),
"bot_detected": bool(bot_hit),
"stderr_preview": stderr[:1000],
"stdout_preview": stdout[:2000],
}
except subprocess.TimeoutExpired:
elapsed = (time.perf_counter() - start) * 1000.0
return {"proxy": proxy, "ok": False, "error": "timeout", "elapsed_ms": int(elapsed)}
except FileNotFoundError:
return {"proxy": proxy, "ok": False, "error": "yt-dlp-not-found"}
except Exception as e:
elapsed = (time.perf_counter() - start) * 1000.0
return {"proxy": proxy, "ok": False, "error": str(e), "elapsed_ms": int(elapsed)}
def generate_whitelist(input_file: str, out_json: str, out_txt: str, test_url: str, concurrency: int = 6):
    """Test every proxy listed in input_file and write the results to disk.

    Writes out_json (full structured results) and out_txt (working proxies
    only, fastest first). Returns the summary dict and the sorted valid list.
    """
    # Collect candidate proxies, skipping blank lines and '#' comments.
    candidates = []
    with open(input_file, 'r', encoding='utf-8') as src:
        for raw in src:
            entry = raw.strip()
            if entry and not entry.startswith('#'):
                candidates.append(entry)

    outcomes = []
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        pending = {pool.submit(test_proxy, candidate, test_url): candidate for candidate in candidates}
        for done in as_completed(pending):
            try:
                outcome = done.result()
            except Exception as exc:
                outcome = {"proxy": pending[done], "ok": False, "error": str(exc)}
            outcomes.append(outcome)
            print(f"Tested: {outcome.get('proxy')} ok={outcome.get('ok')} rc={outcome.get('rc', '-')} elapsed={outcome.get('elapsed_ms','-')}ms")

    # Working proxies first, ordered by latency (missing latency sorts last).
    usable = sorted(
        (o for o in outcomes if o.get('ok')),
        key=lambda o: o.get('elapsed_ms', 999999),
    )

    # Persist the full structured report.
    summary = {"tested_at": int(time.time()), "test_url": test_url, "results": outcomes, "valid_count": len(usable)}
    with open(out_json, 'w', encoding='utf-8') as dst:
        json.dump(summary, dst, indent=2, ensure_ascii=False)

    # Persist the plain-text whitelist in preferred order.
    with open(out_txt, 'w', encoding='utf-8') as dst:
        for o in usable:
            dst.write(o['proxy'] + '\n')
    return summary, usable
def _extract_proxies_from_json(obj):
"""Dado un objeto JSON (parsed), intenta extraer una lista de proxies en forma de URLs.
Soporta varias estructuras comunes:
- lista simple de strings: ["socks5h://1.2.3.4:1080", ...]
- lista de objetos con keys como ip, port, protocol
- objetos anidados con 'proxy' o 'url' o 'address'
"""
proxies = []
if isinstance(obj, list):
for item in obj:
if isinstance(item, str):
proxies.append(item.strip())
elif isinstance(item, dict):
# intentar keys comunes
# ejemplos: {"ip":"1.2.3.4","port":1080, "protocol":"socks5"}
ip = item.get('ip') or item.get('host') or item.get('address') or item.get('ip_address')
port = item.get('port') or item.get('p')
proto = item.get('protocol') or item.get('proto') or item.get('type') or item.get('scheme')
if ip and port:
proto = proto or 'http'
proxies.append(f"{proto}://{ip}:{port}")
continue
# buscar valores en keys que puedan contener url
for k in ('proxy','url','address','connect'):
v = item.get(k)
if isinstance(v, str) and v.strip():
proxies.append(v.strip())
break
elif isinstance(obj, dict):
# encontrar listas dentro del dict
for v in obj.values():
if isinstance(v, (list, dict)):
proxies.extend(_extract_proxies_from_json(v))
# si el dict mismo tiene un campo 'proxy' o similar
for k in ('proxies','list','data'):
if k in obj and isinstance(obj[k], (list,dict)):
proxies.extend(_extract_proxies_from_json(obj[k]))
return [p for p in proxies if p]
def download_and_write_proxies(url: str, out_file: str) -> int:
    """Fetch a JSON document of proxies from `url` and write them to `out_file`.

    Entries are normalized to full URLs (a bare 'ip:port' becomes
    'http://ip:port') and deduplicated while preserving order.
    Returns the number of proxies written.

    Raises RuntimeError if the download/parse fails or yields no proxies.
    """
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        payload = response.json()
    except Exception as exc:
        raise RuntimeError(f"Error descargando/parsing JSON desde {url}: {exc}")

    known_schemes = ('http://', 'https://', 'socks5://', 'socks5h://', 'socks4://')
    seen = set()
    deduped = []
    for candidate in _extract_proxies_from_json(payload):
        candidate = candidate.strip()
        if not candidate:
            continue
        # A bare 'ip:port' entry is assumed to be a plain HTTP proxy.
        if ':' in candidate and not candidate.lower().startswith(known_schemes):
            candidate = 'http://' + candidate
        if candidate in seen:
            continue
        seen.add(candidate)
        deduped.append(candidate)

    if not deduped:
        raise RuntimeError(f"No se extrajeron proxies del JSON: {url}")

    with open(out_file, 'w', encoding='utf-8') as fh:
        for candidate in deduped:
            fh.write(candidate + '\n')
    return len(deduped)
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Test a list of proxies with yt-dlp and generate a whitelist')
    parser.add_argument('--input', default='tools/proxies.txt', help='Input file with proxies (one per line)')
    parser.add_argument('--out-json', default='tools/whitelist.json', help='Output JSON results')
    parser.add_argument('--out-txt', default='tools/whitelist.txt', help='Output whitelist (one proxy per line)')
    parser.add_argument('--test-url', default='https://www.youtube.com/watch?v=dQw4w9WgXcQ', help='YouTube test URL to use')
    parser.add_argument('--concurrency', type=int, default=6, help='Concurrent workers')
    parser.add_argument('--from-url', default='', help='Download a JSON of proxies from a URL and use it as input')
    args = parser.parse_args()

    input_file = args.input
    # If --from-url is given, download the JSON and (over)write the input
    # file first; the file is kept on disk afterwards for inspection.
    # (The original no-op try/finally and the unused `temp_written` flag
    # have been removed — they had no effect.)
    if args.from_url:
        print(f"Downloading proxies JSON from: {args.from_url}")
        written = download_and_write_proxies(args.from_url, input_file)
        print(f"Wrote {written} proxies to {input_file}")
    if not os.path.exists(input_file):
        print(f"Input file {input_file} not found. Create it with one proxy per line or use --from-url.")
        raise SystemExit(1)
    out, valid_sorted = generate_whitelist(input_file, args.out_json, args.out_txt, args.test_url, args.concurrency)
    print('\nSummary:')
    print(f" Tested: {len(out['results'])}, Valid: {len(valid_sorted)}")
    print(f" JSON: {args.out_json}, TXT whitelist: {args.out_txt}")