gnosticdev's picture
Update app.py
b6c6048 verified
raw
history blame
19 kB
import tempfile
import logging
import os
import asyncio
import gc
import psutil
from moviepy.editor import *
import edge_tts
import gradio as gr
from pydub import AudioSegment
# Configuración de Logs
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
# CONSTANTES DE ARCHIVOS
INTRO_VIDEO = "introvideo.mp4"
OUTRO_VIDEO = "outrovideo.mp4"
MUSIC_BG = "musicafondo.mp3"
EJEMPLO_VIDEO = "ejemplo.mp4"
# CONSTANTES DE LIMITACIONES
MAX_VIDEO_SIZE = 200 * 1024 * 1024 # Tamaño máximo en bytes (200MB)
MAX_RESOLUTION = (640, 360) # Resolución máxima (360p para optimizar)
# Configuración de chunks
SEGMENT_DURATION = 30 # Duración exacta entre transiciones (sin overlap)
TRANSITION_DURATION = 1.5 # Duración del efecto slide
PROCESSING_CHUNK = 120 # Procesar en bloques de 2 minutos para optimizar memoria
# Validar existencia de archivos
for file in [INTRO_VIDEO, OUTRO_VIDEO, MUSIC_BG, EJEMPLO_VIDEO]:
if not os.path.exists(file):
logging.error(f"Falta archivo necesario: {file}")
raise FileNotFoundError(f"Falta: {file}")
def mostrar_uso_memoria():
proceso = psutil.Process(os.getpid())
memoria_uso = proceso.memory_info().rss / 1024 / 1024
logging.info(f"Uso de memoria: {memoria_uso:.2f} MB")
def eliminar_archivo_tiempo(ruta, delay=3600):
def eliminar():
try:
if os.path.exists(ruta):
os.remove(ruta)
logging.info(f"Archivo eliminado: {ruta}")
except Exception as e:
logging.error(f"Error al eliminar {ruta}: {e}")
from threading import Timer
Timer(delay, eliminar).start()
def validar_video(video_path):
try:
# Comprobar tamaño del archivo
file_size = os.path.getsize(video_path)
if file_size > MAX_VIDEO_SIZE:
logging.warning(f"El video excede el tamaño máximo: {file_size/1024/1024:.2f}MB > {MAX_VIDEO_SIZE/1024/1024}MB")
return False
# Validar que es un video
clip = VideoFileClip(video_path)
duracion = clip.duration
clip.close()
return True
except Exception as e:
logging.error(f"El video no es válido: {e}")
return False
def convertir_video(video_path):
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_converted:
output_path = tmp_converted.name
# Convertir a un formato más eficiente y con menor resolución para optimizar
os.system(f'ffmpeg -i "{video_path}" -vf "scale={MAX_RESOLUTION[0]}:{MAX_RESOLUTION[1]}" -c:v libx264 -crf 28 -preset ultrafast -c:a aac -b:a 96k "{output_path}" -y')
# Comprobar si ahora cumple las limitaciones de tamaño
if not validar_video(output_path):
# Si sigue sin cumplir, aumentar la compresión
os.system(f'ffmpeg -i "{output_path}" -vf "scale={MAX_RESOLUTION[0]}:{MAX_RESOLUTION[1]}" -c:v libx264 -crf 32 -preset ultrafast -c:a aac -b:a 64k "{output_path}.tmp" -y')
os.remove(output_path)
os.rename(f"{output_path}.tmp", output_path)
return output_path
except Exception as e:
logging.error(f"Error al convertir el video: {e}")
raise
async def generar_tts(texto, voz, duracion_total):
try:
if not texto.strip():
raise ValueError("El texto para TTS no puede estar vacío.")
# Limitar el texto a 1000 caracteres para procesar más rápido
if len(texto) > 1000:
texto = texto[:1000]
logging.info("Texto para TTS truncado a 1000 caracteres para optimizar rendimiento")
logging.info(f"Generando TTS con voz: {voz}")
communicate = edge_tts.Communicate(texto, voz)
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_tts:
await communicate.save(tmp_tts.name)
tts_audio = AudioFileClip(tmp_tts.name)
if tts_audio.duration > duracion_total:
tts_audio = tts_audio.subclip(0, duracion_total)
return tts_audio, tmp_tts.name
except Exception as e:
logging.error(f"Fallo en TTS: {str(e)}")
raise
def crear_musica_fondo(duracion_total):
bg_music = AudioSegment.from_mp3(MUSIC_BG)
needed_ms = int(duracion_total * 1000)
repeticiones = needed_ms // len(bg_music) + 1
bg_music = bg_music * repeticiones
bg_music = bg_music[:needed_ms].fade_out(1000)
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_bg:
bg_music.export(tmp_bg.name, format="mp3")
return AudioFileClip(tmp_bg.name).volumex(0.15), tmp_bg.name
def create_slide_transition(clip1, clip2, duration=TRANSITION_DURATION):
part1 = clip1.subclip(clip1.duration - duration)
part2 = clip2.subclip(0, duration)
transition = CompositeVideoClip([
part1.fx(vfx.fadeout, duration),
part2.fx(vfx.fadein, duration).set_position(
lambda t: ('center', MAX_RESOLUTION[1] - (MAX_RESOLUTION[1] * (t/duration)))
)
], size=MAX_RESOLUTION).set_duration(duration) # Reducido para optimizar
return transition
def liberar_memoria(objetos_cerrar=None):
"""Forzar liberación de memoria cerrando objetos y llamando al recolector de basura"""
if objetos_cerrar:
for obj in objetos_cerrar:
if obj is not None:
try:
obj.close()
except:
pass
# Forzar recolección de basura
gc.collect()
mostrar_uso_memoria()
async def procesar_video(video_input, texto_tts, voz_seleccionada, progress=gr.Progress()):
temp_files = []
intro, outro, video_original = None, None, None
segmentos_temp = []
try:
mostrar_uso_memoria()
logging.info("Iniciando procesamiento")
progress(0, desc="Validando video")
if not validar_video(video_input):
progress(0.05, desc="Optimizando formato de video")
video_input = convertir_video(video_input)
temp_files.append(video_input)
progress(0.1, desc="Preparando video")
# Reducir resolución para optimizar procesamiento
video_original = VideoFileClip(video_input)
duracion_video = video_original.duration
video_original.close() # Cerrar para liberar memoria
# Información importante sobre el video original
logging.info(f"Duración total del video: {duracion_video} segundos")
if duracion_video <= 0:
raise ValueError("El video debe tener una duración mayor que cero.")
progress(0.2, desc="Generando narración (TTS)")
tts_audio, tts_path = await generar_tts(texto_tts, voz_seleccionada, duracion_video)
temp_files.append(tts_path)
progress(0.3, desc="Preparando música de fondo")
bg_audio, bg_path = crear_musica_fondo(duracion_video)
temp_files.append(bg_path)
# Procesar por bloques para optimizar memoria
num_chunks = int(duracion_video // PROCESSING_CHUNK) + (1 if duracion_video % PROCESSING_CHUNK > 0 else 0)
logging.info(f"Procesando video en {num_chunks} bloques")
for chunk_idx in range(num_chunks):
chunk_start = chunk_idx * PROCESSING_CHUNK
chunk_end = min((chunk_idx + 1) * PROCESSING_CHUNK, duracion_video)
progress(0.35 + (0.45 * chunk_idx / num_chunks),
desc=f"Procesando bloque {chunk_idx+1}/{num_chunks} ({chunk_start:.1f}s - {chunk_end:.1f}s)")
# Cargar solo la porción del video que necesitamos
chunk_video = VideoFileClip(video_input).subclip(chunk_start, chunk_end)
# Extraer la porción de audio correspondiente a este bloque
# FIX: Corrección para evitar acceder a tiempo más allá de la duración del audio TTS
tts_chunk_end = min(chunk_end, tts_audio.duration)
chunk_tts = None
if chunk_start < tts_audio.duration:
chunk_tts = tts_audio.subclip(chunk_start, tts_chunk_end)
chunk_bg = bg_audio.subclip(chunk_start, chunk_end)
# Crear la mezcla de audio para este bloque
audio_chunks = [chunk_bg]
if chunk_video.audio:
audio_chunks.append(chunk_video.audio.volumex(0.5))
if chunk_tts:
audio_chunks.append(chunk_tts.volumex(0.85))
chunk_audio_final = CompositeAudioClip(audio_chunks)
chunk_video = chunk_video.set_audio(chunk_audio_final)
# Procesar las transiciones dentro de este chunk si es necesario
if chunk_end - chunk_start > SEGMENT_DURATION:
segments_in_chunk = []
segments_count = int((chunk_end - chunk_start) // SEGMENT_DURATION) + \
(1 if (chunk_end - chunk_start) % SEGMENT_DURATION > 0 else 0)
for i in range(segments_count):
seg_start = i * SEGMENT_DURATION
seg_end = min(seg_start + SEGMENT_DURATION, chunk_end - chunk_start)
segment = chunk_video.subclip(seg_start, seg_end)
if i == 0:
segments_in_chunk.append(segment)
else:
prev_segment = segments_in_chunk[-1]
transition = create_slide_transition(prev_segment, segment)
prev_end = prev_segment.duration - TRANSITION_DURATION
if prev_end > 0:
segments_in_chunk[-1] = prev_segment.subclip(0, prev_end)
segments_in_chunk.append(transition)
segments_in_chunk.append(segment)
chunk_processed = concatenate_videoclips(segments_in_chunk, method="compose")
else:
chunk_processed = chunk_video
# Guardar este chunk procesado como archivo temporal
with tempfile.NamedTemporaryFile(delete=False, suffix=f"_chunk{chunk_idx}.mp4") as chunk_file:
chunk_path = chunk_file.name
chunk_processed.write_videofile(
chunk_path,
codec="libx264",
audio_codec="aac",
preset="ultrafast",
bitrate="1M",
ffmpeg_params=["-crf", "28"],
verbose=False
)
segmentos_temp.append(chunk_path)
# Liberar memoria
chunk_video.close()
chunk_processed.close()
liberar_memoria()
# Liberar memoria antes de procesar intro/outro
liberar_memoria([tts_audio, bg_audio])
tts_audio = bg_audio = None
# Añadir intro y outro
progress(0.85, desc="Preparando intro y outro")
intro = VideoFileClip(INTRO_VIDEO, target_resolution=MAX_RESOLUTION)
with tempfile.NamedTemporaryFile(delete=False, suffix="_intro.mp4") as tmp_intro:
intro.write_videofile(
tmp_intro.name,
codec="libx264",
audio_codec="aac",
preset="ultrafast",
bitrate="1M",
ffmpeg_params=["-crf", "28"],
verbose=False
)
segmentos_temp.insert(0, tmp_intro.name) # Intro al principio
intro.close()
outro = VideoFileClip(OUTRO_VIDEO, target_resolution=MAX_RESOLUTION)
with tempfile.NamedTemporaryFile(delete=False, suffix="_outro.mp4") as tmp_outro:
outro.write_videofile(
tmp_outro.name,
codec="libx264",
audio_codec="aac",
preset="ultrafast",
bitrate="1M",
ffmpeg_params=["-crf", "28"],
verbose=False
)
segmentos_temp.append(tmp_outro.name) # Outro al final
outro.close()
# Unir todos los segmentos con ffmpeg
progress(0.9, desc="Generando video final")
with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as concat_file:
# Escribir archivo de lista para concatenación
for segment in segmentos_temp:
concat_file.write(f"file '{segment}'\n".encode())
concat_path = concat_file.name
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_final:
output_path = tmp_final.name
os.system(f'ffmpeg -f concat -safe 0 -i "{concat_path}" -c copy "{output_path}" -y')
# Limpiar archivos temporales
os.remove(concat_path)
for segment in segmentos_temp:
if os.path.exists(segment):
os.remove(segment)
eliminar_archivo_tiempo(output_path, 3600) # Eliminación después de 1 hora
progress(1.0, desc="¡Video listo!")
logging.info(f"Video final guardado: {output_path}")
mostrar_uso_memoria()
return output_path
except Exception as e:
logging.error(f"Fallo general: {str(e)}")
raise
finally:
try:
liberar_memoria([video_original, intro, outro])
for file in temp_files:
try:
if os.path.exists(file):
os.remove(file)
except Exception as e:
logging.warning(f"Error limpiando {file}: {e}")
for segment in segmentos_temp:
try:
if os.path.exists(segment):
os.remove(segment)
except Exception as e:
logging.warning(f"Error limpiando segmento {segment}: {e}")
except Exception as e:
logging.warning(f"Error al cerrar recursos: {str(e)}")
# Interfaz Gradio
with gr.Blocks() as demo:
gr.Markdown("# Editor de Video con IA")
with gr.Tab("Principal"):
video_input = gr.Video(label="Subir video")
texto_tts = gr.Textbox(
label="Texto para TTS (máx. 1000 caracteres)",
lines=3,
placeholder="Escribe aquí tu texto..."
)
voz_seleccionada = gr.Dropdown(
label="Voz",
choices=[
"es-ES-AlvaroNeural", "es-MX-BeatrizNeural",
"es-ES-ElviraNeural", "es-MX-JavierNeural",
"es-AR-ElenaNeural", "es-AR-TomasNeural",
"es-CL-CatalinaNeural", "es-CL-LorenzoNeural",
"es-CO-SofiaNeural", "es-CO-GonzaloNeural",
"es-PE-CamilaNeural", "es-PE-AlexNeural",
"es-VE-MariaNeural", "es-VE-ManuelNeural",
"es-US-AlonsoNeural", "es-US-PalomaNeural",
"es-ES-AbrilNeural", "es-ES-DarioNeural",
"es-ES-HelenaRUS", "es-ES-LauraNeural",
"es-ES-PabloNeural", "es-ES-TriniNeural",
"en-US-AriaNeural", "en-US-GuyNeural",
"en-US-JennyNeural", "en-US-AmberNeural",
"en-US-AnaNeural", "en-US-AshleyNeural",
"en-US-BrandonNeural", "en-US-ChristopherNeural",
"en-US-CoraNeural", "en-US-DavisNeural",
"en-US-ElizabethNeural", "en-US-EricNeural",
"en-US-GinaNeural", "en-US-JacobNeural",
"en-US-JaneNeural", "en-US-JasonNeural",
"en-US-MichelleNeural", "en-US-MonicaNeural",
"en-US-SaraNeural", "en-US-SteffanNeural",
"en-US-TonyNeural", "en-US-YaraNeural",
"fr-FR-AlainNeural", "fr-FR-BrigitteNeural",
"fr-FR-CelesteNeural", "fr-FR-ClaudeNeural",
"fr-FR-CoralieNeural", "fr-FR-DeniseNeural",
"fr-FR-EloiseNeural", "fr-FR-HenriNeural",
"fr-FR-JacquelineNeural", "fr-FR-JeromeNeural",
"fr-FR-JosephineNeural", "fr-FR-MauriceNeural",
"fr-FR-YvesNeural", "fr-FR-YvetteNeural",
"de-DE-AmalaNeural", "de-DE-BerndNeural",
"de-DE-ChristophNeural", "de-DE-ConradNeural",
"de-DE-ElkeNeural", "de-DE-GiselaNeural",
"de-DE-KasperNeural", "de-DE-KatjaNeural",
"de-DE-KillianNeural", "de-DE-KlarissaNeural",
"de-DE-KlausNeural", "de-DE-LouisaNeural",
"de-DE-MajaNeural", "de-DE-RalfNeural",
"de-DE-TanjaNeural", "de-DE-ViktoriaNeural",
"it-IT-BenignoNeural", "it-IT-CalimeroNeural",
"it-IT-CataldoNeural", "it-IT-DiegoNeural",
"it-IT-ElsaNeural", "it-IT-FabiolaNeural",
"it-IT-GianniNeural", "it-IT-ImeldaNeural",
"it-IT-IrmaNeural", "it-IT-IsabellaNeural",
"it-IT-LisandroNeural", "it-IT-PalmiraNeural",
"it-IT-PierinaNeural", "it-IT-RinaldoNeural",
"ja-JP-AoiNeural", "ja-JP-DaichiNeural",
"ja-JP-HarukaNeural", "ja-JP-KeitaNeural",
"ja-JP-MayuNeural", "ja-JP-NanamiNeural",
"ja-JP-NaokiNeural", "ja-JP-ShioriNeural"
],
value="es-ES-AlvaroNeural"
)
procesar_btn = gr.Button("Generar Video (Modo Optimizado)")
video_output = gr.Video(label="Video Procesado")
with gr.Accordion("Ejemplos de Uso", open=False):
gr.Examples(
examples=[[EJEMPLO_VIDEO, "¡Hola! Esto es una prueba. Suscríbete al canal."]],
inputs=[video_input, texto_tts],
label="Ejemplos"
)
procesar_btn.click(
procesar_video,
inputs=[video_input, texto_tts, voz_seleccionada],
outputs=video_output
)
gr.Markdown("""
### ℹ️ Notas importantes:
- **Optimizaciones para Hugging Face Spaces:**
- Procesamiento por bloques para videos largos
- Máximo tamaño de archivo: 200MB
- Resolución reducida a 640x360 para procesamiento más rápido
- Texto TTS limitado a 1000 caracteres
- Las transiciones ocurren cada 30 segundos
- El video contiene intro y outro predefinidos
- El archivo generado se elimina después de 1 hora
- Para videos de alta calidad, considera usar este código localmente
""")
if __name__ == "__main__":
# Instalar psutil si no está disponible
try:
import psutil
except ImportError:
os.system("pip install psutil")
import psutil
demo.queue().launch()