AvanzaCast/packages/broadcast-panel/e2e/gemini_log_agent.py
Cesar Mendivil 8b458a3ddf feat: add initial LiveKit Meet integration with utility scripts, configs, and core components
- Add Next.js app structure with base configs, linting, and formatting
- Implement LiveKit Meet page, types, and utility functions
- Add Docker, Compose, and deployment scripts for backend and token server
- Provide E2E and smoke test scaffolding with Puppeteer and Playwright helpers
- Include CSS modules and global styles for UI
- Add postMessage and studio integration utilities
- Update package.json with dependencies and scripts for development and testing
2025-11-20 12:50:38 -07:00

273 lines
11 KiB
Python

#!/usr/bin/env python3
"""
gemini_log_agent.py
Script local que interpreta un prompt en lenguaje natural (heurístico) y devuelve
archivos de log o información de diagnóstico útiles para el flujo E2E/backend/frontend.
Diseñado para usarse junto al runner Playwright y el orquestador que hay en el repo.
Uso básico:
python3 packages/broadcast-panel/e2e/gemini_log_agent.py --prompt "muéstrame logs backend" --lines 200
python3 packages/broadcast-panel/e2e/gemini_log_agent.py --interactive
Notas:
- No llama a servicios externos por defecto. Si quieres integrar un LLM, se puede añadir
soporte para pasar el prompt a una API y usar la respuesta para decidir acciones.
- Solo ejecuta comandos seguros predefinidos (leer archivos de log, listar artefactos).
"""
from __future__ import annotations
import argparse
import shlex
import subprocess
import sys
import os
from typing import List, Tuple, Dict
# Configuration: files and directories the agent is allowed to read.

# Repo-local output directory (sibling of this script) used for artifacts and
# screenshots instead of /tmp. It is created eagerly when the module loads.
REPO_OUT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), 'out'))
os.makedirs(REPO_OUT_DIR, exist_ok=True)

# Category name -> candidate paths, in the order they are reported.
# Relative entries are resolved against the current working directory.
LOG_FILES = {
    'backend': [
        '/tmp/backend_run.log',
        '/tmp/e2e-backend.log',
        'packages/backend-api/logs/backend.log',
    ],
    'frontend': [
        '/tmp/e2e-frontend.log',
        'packages/broadcast-panel/vite-dev.log',
    ],
    'playwright': [
        '/tmp/e2e-playwright.log',
    ],
    'plugin': [
        '/tmp/e2e-plugin.log',
        '/tmp/dify-plugin-output.log',
    ],
    'prisma': [
        '/tmp/backend_prisma_generate.log',
        '/tmp/backend_api_npm_install_verbose.log',
        '/tmp/backend_api_install_verbose.log',
    ],
    # Artifacts and screenshots live in the repo-local out directory.
    'artifacts': [REPO_OUT_DIR],
    'screenshot': [
        os.path.join(REPO_OUT_DIR, shot_name)
        for shot_name in ('py-playwright-shot.png', 'dify-shot.png')
    ],
}

# Limits
MAX_OUTPUT_LINES = 500
MAX_BYTES = 200000  # 200KB cap per file
def run_cmd(cmd: List[str], timeout: int = 10) -> Tuple[int, str, str]:
    """Run *cmd* (argument list, no shell) and capture its output as text.

    Returns:
        A ``(exit_code, stdout, stderr)`` tuple. Failures are reported through
        the tuple rather than raised:

        * timeout       -> ``(124, '', 'Timeout after <timeout>s')``
        * any other ``Exception`` while launching/running -> ``(1, '', str(exc))``
    """
    try:
        completed = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return 124, '', f'Timeout after {timeout}s'
    except Exception as exc:
        return 1, '', str(exc)
    return completed.returncode, completed.stdout, completed.stderr
def tail_file(path: str, lines: int = 200) -> str:
    """Return the last *lines* lines of the file at *path* as text.

    The system ``tail`` command is tried first (via ``run_cmd``). If it fails
    or yields nothing, the file is read directly in Python; that fallback
    also returns the *end* of the file rather than its beginning.

    Args:
        path: File to read.
        lines: Number of trailing lines wanted. Values <= 0 yield ``''`` in
            the fallback path.

    Returns:
        The requested text, capped at ``MAX_BYTES`` bytes, or a parenthesised
        diagnostic string: ``'(no existe) <path>'`` when the path is missing,
        ``'(error reading <path>): <err>'`` when it cannot be read. A
        ``...[truncated by bytes]`` marker is added where data was dropped.
    """
    if not os.path.exists(path):
        return f'(no existe) {path}'

    # Fast path: let tail(1) do the work.
    code, out, _err = run_cmd(['tail', f'-n{lines}', path])
    if code == 0 and out:
        encoded = out.encode('utf-8')
        if len(encoded) > MAX_BYTES:
            # Cap the amount of text handed back to the caller.
            clipped = encoded[:MAX_BYTES].decode('utf-8', errors='replace')
            return clipped + '\n...[truncated by bytes]'
        return out

    # Fallback: read at most the final MAX_BYTES of the file ourselves.
    try:
        with open(path, 'rb') as f:
            f.seek(0, os.SEEK_END)
            size = f.tell()
            f.seek(max(0, size - MAX_BYTES))
            data = f.read(MAX_BYTES)
    except OSError as e:
        return f'(error reading {path}): {e}'

    if lines <= 0:
        return ''

    window = data.decode('utf-8', errors='replace').splitlines(keepends=True)
    text = ''.join(window[-lines:])
    if size > MAX_BYTES and len(window) <= lines:
        # The byte window ran out before `lines` lines were collected, so
        # the beginning of the requested range is missing.
        return '...[truncated by bytes]\n' + text
    return text
def list_artifact_dir(path: str) -> str:
    """Return an ``ls -la`` listing of *path*, or a parenthesised diagnostic.

    Diagnostics: ``'(no existe) <path>'`` when the path is missing,
    ``'(ls error) <stderr>'`` when ``ls`` exits non-zero, and
    ``'(error listing <path>): <err>'`` on any unexpected exception.
    """
    if os.path.exists(path):
        try:
            status, listing, problem = run_cmd(['ls', '-la', path])
            return listing if status == 0 else f'(ls error) {problem}'
        except Exception as exc:
            return f'(error listing {path}): {exc}'
    return f'(no existe) {path}'
def interpret_prompt(prompt: str) -> Dict[str, object]:
    """Map a free-text prompt to an action description using keyword heuristics.

    Matching is case-insensitive substring search; the first rule that matches
    wins, in the order written below.

    Args:
        prompt: Natural-language request (Spanish or English keywords).

    Returns:
        A dict with at least ``'action'`` (``'get_logs'``, ``'list_artifacts'``,
        ``'get_screenshot'`` or ``'run_session'``) and ``'target'``. The
        ``'run_session'`` result additionally carries ``'room'`` and
        ``'username'``. When nothing matches, all logs are requested.
    """
    p = prompt.lower()

    def mentions(*keywords: str) -> bool:
        return any(keyword in p for keyword in keywords)

    # "backend"/"api" alone select the backend logs; "session" only does so
    # when the prompt also talks about logs or errors (otherwise it may be a
    # request to create a session, handled further down). The parentheses
    # spell out the precedence explicitly.
    if mentions('backend', 'api') or (mentions('session') and mentions('log', 'error')):
        return {'action': 'get_logs', 'target': 'backend'}
    if mentions('frontend', 'vite', 'broadcast-panel'):
        return {'action': 'get_logs', 'target': 'frontend'}
    if mentions('playwright', 'browser', 'puppeteer'):
        return {'action': 'get_logs', 'target': 'playwright'}
    if mentions('plugin', 'dify'):
        return {'action': 'get_logs', 'target': 'plugin'}
    if mentions('prisma', 'db', 'database'):
        return {'action': 'get_logs', 'target': 'prisma'}
    # 'artifact' also covers 'artifacts'. 'screenshot' is deliberately NOT
    # tested here: it would shadow the screenshot rule right below and make
    # the 'get_screenshot' action unreachable for that word.
    if mentions('artifact'):
        return {'action': 'list_artifacts', 'target': 'artifacts'}
    if mentions('screenshot', 'imagen', 'captura'):
        return {'action': 'get_screenshot', 'target': 'screenshot'}
    # 'crear' also covers 'crear sesión'.
    if mentions('crear', 'create session', 'start session'):
        return {'action': 'run_session', 'target': 'backend', 'room': 'test-room', 'username': 'e2e-agent'}
    # default: fallback to all logs
    return {'action': 'get_logs', 'target': 'all'}
def _describe_screenshots() -> List[str]:
    """Return a header line plus a status line for each configured screenshot."""
    parts: List[str] = []
    for p in LOG_FILES.get('screenshot', []):
        parts.append(f'-- screenshot candidate: {p} --')
        try:
            st = os.stat(p)
        except FileNotFoundError:
            parts.append('(no existe)')
        except OSError as e:
            parts.append(f'error stat: {e}')
        else:
            parts.append(f'exists: size={st.st_size} bytes')
    return parts


def _stat_or_none(path: str):
    """Return ``os.stat(path)``, or None when the path cannot be stat'ed."""
    try:
        return os.stat(path)
    except OSError:
        return None


def handle_action(mapping: Dict[str, object], lines: int = 200) -> str:
    """Execute the action described by *mapping* and return a text report.

    Args:
        mapping: Action description as produced by ``interpret_prompt``.
            ``'action'`` selects the behaviour and ``'target'`` the log
            category. For ``'run_session'`` the optional keys ``'backend'``,
            ``'room'``, ``'username'`` and ``'ws'`` are honoured.
        lines: Number of trailing lines to show per log file.

    Returns:
        A newline-joined report. Unknown actions yield
        ``'Unknown action: <action> for target <target>'``.
    """
    action = mapping.get('action')
    target = mapping.get('target')

    if action == 'get_logs':
        output_parts: List[str] = []
        targets = list(LOG_FILES) if target == 'all' else [target]
        for t in targets:
            output_parts.append(f'==== LOGS: {t} ====')
            paths = LOG_FILES.get(t, [])
            if not paths:
                output_parts.append('(no configured paths)')
                continue
            if t == 'screenshot':
                # Screenshots are binary images: report their presence and
                # size instead of tailing them as if they were text logs.
                output_parts.extend(_describe_screenshots())
                continue
            for p in paths:
                # Directories are listed rather than tailed.
                if os.path.isdir(p):
                    output_parts.append(f'(es dir) {p}:')
                    output_parts.append(list_artifact_dir(p))
                    continue
                output_parts.append(f'-- file: {p} --')
                output_parts.append(tail_file(p, lines))
        return '\n'.join(output_parts)

    if action == 'list_artifacts':
        p = LOG_FILES.get('artifacts', ['/tmp/e2e-artifacts'])[0]
        return list_artifact_dir(p)

    if action == 'get_screenshot':
        return '\n'.join(_describe_screenshots())

    if action == 'run_session':
        # Invoke the Playwright Python runner with --create-session.
        # `or` (rather than a .get default) also covers keys present as None.
        backend = str(mapping.get('backend') or 'http://localhost:4000')
        room = str(mapping.get('room') or 'test-room')
        username = str(mapping.get('username') or 'e2e-agent')
        out = os.path.join(REPO_OUT_DIR, 'py-playwright-shot-from-agent.png')
        # A WS endpoint may come from the mapping or from the environment.
        ws = mapping.get('ws') or os.environ.get('PLAYWRIGHT_WS')
        cmd = [
            'python3', 'packages/broadcast-panel/e2e/playwright_py_runner.py',
            '--create-session',
            '--backend', backend,
            '--room', room,
            '--username', username,
            '--out', out,
        ]
        if ws:
            cmd.extend(['--ws', str(ws)])

        # Remember the state of any screenshot left by a previous run so a
        # stale file is not reported as freshly generated.
        before = _stat_or_none(out)

        output_parts = [f'Running: {shlex.join(cmd)}']
        code, outp, err = run_cmd(cmd, timeout=120)
        output_parts.append(f'Exit {code}')
        if outp:
            output_parts.append('STDOUT:')
            output_parts.append(outp)
        if err:
            output_parts.append('STDERR:')
            output_parts.append(err)

        after = _stat_or_none(out)
        is_fresh = after is not None and (
            before is None
            or after.st_mtime_ns != before.st_mtime_ns
            or after.st_size != before.st_size
        )
        if is_fresh:
            output_parts.append(f'Screenshot generated: {out} (size={after.st_size} bytes)')
        else:
            output_parts.append('No screenshot generated')
        return '\n'.join(output_parts)

    return f'Unknown action: {action} for target {target}'
def _write_log_entry(log_file: str, prompt, result: str, overwrite: bool) -> None:
    """Write one timestamped prompt/result record to *log_file* (best effort).

    Failures are reported on stderr and never propagate, so a logging problem
    cannot hide the result that was already printed.
    """
    from datetime import datetime, timezone

    try:
        log_dir = os.path.dirname(log_file) or '.'
        os.makedirs(log_dir, exist_ok=True)
        # Timezone-aware "now" (utcnow() is deprecated); tzinfo is dropped so
        # the header keeps its "<iso>Z" shape.
        stamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        with open(log_file, 'w' if overwrite else 'a', encoding='utf-8') as f:
            f.write(f"[{stamp}Z] PROMPT: {prompt}\n")
            f.write(result)
            f.write('\n')
    except Exception as e:
        print(f"[agent][log-error] Failed to write log file {log_file}: {e}", file=sys.stderr)


def main():
    """Command-line entry point: parse options, then answer one or many prompts.

    In single-prompt mode the prompt given with ``--prompt`` (or an empty
    prompt) is processed once. In ``--interactive`` mode prompts are read from
    stdin until ``exit``/``quit``/``salir``, end of input, or Ctrl+C. Every
    result is printed and recorded in the log file.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument('--prompt', '-p', help='Prompt en lenguaje natural que indica qué logs quieres')
    ap.add_argument('--lines', type=int, default=200, help='Número de líneas a mostrar por log')
    ap.add_argument('--interactive', action='store_true', help='Modo interactivo (leer prompts hasta Ctrl+C)')
    ap.add_argument('--log-file', default=None, help='Ruta de archivo donde se guardará el resultado (append). Por defecto: .tmp/gemini_agent_output.log')
    ap.add_argument('--overwrite-log', action='store_true', help='Si se usa --log-file y --overwrite-log, el archivo será sobrescrito en lugar de anexado')
    ap.add_argument('--backend', default=None, help='URL base del backend (ej. http://localhost:4000). Si se especifica, será usada por run_session')
    ap.add_argument('--ws', default=None, help='Playwright run-server WS endpoint (ej. ws://192.168.1.20:3003). Si se especifica, será usada por run_session')
    args = ap.parse_args()

    # Normalize log-file: prefer a repo-relative ./.tmp directory to avoid
    # /tmp space issues. A path under /tmp becomes ./.tmp/<basename>.
    if args.log_file:
        if os.path.isabs(args.log_file) and args.log_file.startswith('/tmp/'):
            args.log_file = os.path.join('.tmp', os.path.basename(args.log_file))
    else:
        # default relative path inside repo
        args.log_file = os.path.join('.tmp', 'gemini_agent_output.log')

    def process(prompt_text: str, logged_prompt, overwrite: bool) -> None:
        """Interpret one prompt, run it, print the result and log it."""
        mapping = interpret_prompt(prompt_text)
        # Forward the CLI overrides; handle_action reads them for run_session.
        if args.backend:
            mapping['backend'] = args.backend
        if args.ws:
            mapping['ws'] = args.ws
        result = handle_action(mapping, args.lines)
        print(result)
        _write_log_entry(args.log_file, logged_prompt, result, overwrite)

    try:
        if args.interactive:
            print("Modo interactivo. Escribe tu prompt (o 'exit' para salir):")
            # --overwrite-log truncates the file once; later prompts append.
            overwrite = args.overwrite_log
            while True:
                try:
                    prompt = input('> ')
                except (EOFError, KeyboardInterrupt):
                    # End of input or Ctrl+C: leave the loop instead of
                    # retrying forever on a closed stdin.
                    print()
                    break
                if prompt.strip().lower() in ('exit', 'quit', 'salir'):
                    break
                try:
                    process(prompt, prompt, overwrite)
                    overwrite = False
                except Exception as e:
                    print(f'Error procesando prompt: {e}', file=sys.stderr)
        else:
            # Single prompt taken from the command line.
            process(args.prompt or '', args.prompt, args.overwrite_log)
    except Exception as e:
        print(f'Error en el agente: {e}', file=sys.stderr)