Add format_segments_text function to clean and format transcript segments

This commit is contained in:
cesarmendivil 2026-02-22 16:02:36 -07:00
parent f7cb65cbc0
commit f8924a2965

61
main.py
View File

@ -7,7 +7,7 @@ import re
import tempfile
import glob
from fastapi import FastAPI, HTTPException, UploadFile, File
from typing import List, Dict
from typing import List, Dict, Any, cast
# Intentar importar youtube_transcript_api como fallback
try:
@ -147,6 +147,39 @@ def extract_video_id(video_id_or_url: str) -> str:
return s
# Patterns used by format_segments_text, compiled once at import time.
# A line that starts with a "Kind: ..." header (e.g. "Kind: captions").
_FMT_KIND_LINE_RE = re.compile(r'\s*Kind\s*:', re.IGNORECASE)
# Noise removed from the whole segment text, in this order:
# [bracketed] content, (parenthesised) content, HTML-like tags, decorative symbols.
_FMT_NOISE_RES = (
    re.compile(r'\[[^\]]*\]'),
    re.compile(r'\(.*?\)'),
    re.compile(r'<[^>]+>'),
    re.compile(r'[♪★■◆►▶◀•–—]'),
)
_FMT_LINE_BREAK_RE = re.compile(r'[\n\r]+')
_FMT_WHITESPACE_RE = re.compile(r'\s+')


def format_segments_text(segments: List[Dict]) -> List[str]:
    """Return a flat list of cleaned text phrases extracted from ``segments``.

    For each segment the value under the ``'text'`` key is processed as follows:

    - content inside square brackets or parentheses is removed
    - HTML-like tags and decorative symbols (♪, ★, •, ...) are removed
    - the text is split on line breaks so each line becomes its own phrase
    - lines that start with a ``Kind: ...`` header are dropped
    - runs of whitespace inside each line are collapsed to a single space

    Args:
        segments: Iterable of dict-like segments; ``None`` or an empty list
            yields an empty result. Segments whose ``'text'`` is missing or
            falsy are skipped.

    Returns:
        The cleaned, non-empty phrases in their original order.
    """
    output: List[str] = []
    for seg in segments or []:
        raw = seg.get('text', '')
        if not raw:
            continue
        text = str(raw)

        # Strip noise on the whole text first so that constructs spanning a
        # line break (e.g. "[Music\nplaying]") are still removed.
        for pattern in _FMT_NOISE_RES:
            text = pattern.sub('', text)

        # Split BEFORE normalising whitespace: collapsing "\s+" first would
        # turn every line break into a space and make the split a no-op.
        for line in _FMT_LINE_BREAK_RE.split(text):
            # Checked per line so a "Kind:" header on the first line does not
            # survive just because more text follows on later lines.
            if _FMT_KIND_LINE_RE.match(line):
                continue
            line = _FMT_WHITESPACE_RE.sub(' ', line).strip()
            if line:
                output.append(line)
    return output
def get_transcript_data(video_id: str, lang: str = "es"):
video_id = extract_video_id(video_id)
if not video_id:
@ -642,11 +675,20 @@ def transcript_endpoint(video_id: str, lang: str = "es"):
except Exception:
combined_text = ""
# Nuevo: arreglo format_text con cada segmento como elemento (texto limpio)
try:
format_text_list = format_segments_text(data)
except Exception:
format_text_list = []
format_text = format_text_list
return {
"video_id": video_id,
"count": len(data),
"segments": data,
"text": combined_text
"text": combined_text,
"format_text": format_text
}
@app.get("/stream/{video_id}")
@ -989,13 +1031,16 @@ def transcript_vtt(video_id: str, lang: str = 'es'):
segments = parse_subtitle_format(vtt_text, 'vtt') if vtt_text else []
combined_text = '\n'.join([s.get('text','') for s in segments])
# format_text con texto limpio listo para procesamiento por agentes
format_text = format_segments_text(segments)
return {
'video_id': video_id,
'vtt': vtt_text,
'count': len(segments),
'segments': segments,
'text': combined_text
'text': combined_text,
'format_text': format_text
}
@app.post('/upload_vtt/{video_id}')
@ -1019,13 +1064,15 @@ async def upload_vtt(video_id: str, file: UploadFile = File(...)):
text = content.decode('utf-8', errors='ignore')
segments = parse_subtitle_format(text, 'vtt') if text else []
combined_text = '\n'.join([s.get('text','') for s in segments])
format_text = format_segments_text(segments)
return {
'video_id': video_id,
'path': target_path,
'count': len(segments),
'segments': segments,
'text': combined_text
'text': combined_text,
'format_text': format_text
}
except Exception as e:
@ -1050,7 +1097,9 @@ def transcript_alt(video_id: str, lang: str = 'es'):
try:
# get_transcript puede lanzar excepciones si no hay transcript
transcript_list = YouTubeTranscriptApi.get_transcript(vid, languages=langs)
# Usar cast para silenciar el analizador estático que no infiere la comprobación previa
transcript_api = cast(Any, YouTubeTranscriptApi)
transcript_list = transcript_api.get_transcript(vid, languages=langs)
except NoTranscriptFound:
raise HTTPException(status_code=404, detail='No se encontró transcript con youtube-transcript-api')
except TranscriptsDisabled:
@ -1068,12 +1117,14 @@ def transcript_alt(video_id: str, lang: str = 'es'):
})
combined_text = '\n'.join([s['text'] for s in segments if s.get('text')])
format_text = format_segments_text(segments)
return {
'video_id': vid,
'count': len(segments),
'segments': segments,
'text': combined_text,
'format_text': format_text,
'source': 'youtube-transcript-api'
}