import tempfile import logging import os import asyncio import gc import psutil from moviepy.editor import * import edge_tts import gradio as gr from pydub import AudioSegment # Configuración de Logs logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") # CONSTANTES DE ARCHIVOS INTRO_VIDEO = "introvideo.mp4" OUTRO_VIDEO = "outrovideo.mp4" MUSIC_BG = "musicafondo.mp3" EJEMPLO_VIDEO = "ejemplo.mp4" # CONSTANTES DE LIMITACIONES MAX_VIDEO_SIZE = 200 * 1024 * 1024 # Tamaño máximo en bytes (200MB) # Configuración de chunks SEGMENT_DURATION = 30 # Duración exacta entre transiciones (sin overlap) TRANSITION_DURATION = 1.5 # Duración del efecto slide PROCESSING_CHUNK = 120 # Procesar en bloques de 2 minutos para optimizar memoria # Validar existencia de archivos for file in [INTRO_VIDEO, OUTRO_VIDEO, MUSIC_BG, EJEMPLO_VIDEO]: if not os.path.exists(file): logging.error(f"Falta archivo necesario: {file}") raise FileNotFoundError(f"Falta: {file}") def mostrar_uso_memoria(): proceso = psutil.Process(os.getpid()) memoria_uso = proceso.memory_info().rss / 1024 / 1024 logging.info(f"Uso de memoria: {memoria_uso:.2f} MB") def eliminar_archivo_tiempo(ruta, delay=3600): def eliminar(): try: if os.path.exists(ruta): os.remove(ruta) logging.info(f"Archivo eliminado: {ruta}") except Exception as e: logging.error(f"Error al eliminar {ruta}: {e}") from threading import Timer Timer(delay, eliminar).start() def validar_video(video_path): try: # Comprobar tamaño del archivo file_size = os.path.getsize(video_path) if file_size > MAX_VIDEO_SIZE: logging.warning(f"El video excede el tamaño máximo: {file_size/1024/1024:.2f}MB > {MAX_VIDEO_SIZE/1024/1024}MB") return False # Validar que es un video clip = VideoFileClip(video_path) duracion = clip.duration fps = clip.fps logging.info(f"Video validado: duración={duracion}s, fps={fps}") clip.close() return True except Exception as e: logging.error(f"El video no es válido: {e}") return False def obtener_info_video(video_path): """Obtiene información básica del video como FPS, duración y tamaño""" try: clip = VideoFileClip(video_path) info = { "fps": clip.fps, "duration": clip.duration, "size": clip.size } clip.close() return info except Exception as e: logging.error(f"Error al obtener info del video: {e}") return {"fps": 30, "duration": 0, "size": (640, 360)} # valores por defecto def convertir_video(video_path): try: # Obtener FPS del video original para mantenerlo info = obtener_info_video(video_path) fps = info["fps"] if info["fps"] else 30 # Valor por defecto si no se puede determinar with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_converted: output_path = tmp_converted.name # Convertir a un formato más eficiente pero manteniendo la resolución original Y el framerate os.system(f'ffmpeg -i "{video_path}" -c:v libx264 -crf 28 -preset ultrafast -r {fps} -c:a aac -b:a 96k "{output_path}" -y') # Comprobar si ahora cumple las limitaciones de tamaño if not validar_video(output_path): # Si sigue sin cumplir, aumentar la compresión pero sin cambiar la resolución os.system(f'ffmpeg -i "{output_path}" -c:v libx264 -crf 32 -preset ultrafast -r {fps} -c:a aac -b:a 64k "{output_path}.tmp" -y') os.remove(output_path) os.rename(f"{output_path}.tmp", output_path) return output_path except Exception as e: logging.error(f"Error al convertir el video: {e}") raise async def generar_tts(texto, voz, duracion_total): try: if not texto.strip(): raise ValueError("El texto para TTS no puede estar vacío.") # Limitar el texto a 1000 caracteres para procesar más rápido if len(texto) > 1000: texto = texto[:1000] logging.info("Texto para TTS truncado a 1000 caracteres para optimizar rendimiento") logging.info(f"Generando TTS con voz: {voz}") communicate = edge_tts.Communicate(texto, voz) with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_tts: await communicate.save(tmp_tts.name) tts_audio = AudioFileClip(tmp_tts.name) if tts_audio.duration > duracion_total: tts_audio = tts_audio.subclip(0, duracion_total) return tts_audio, tmp_tts.name except Exception as e: logging.error(f"Fallo en TTS: {str(e)}") raise def crear_musica_fondo(duracion_total): bg_music = AudioSegment.from_mp3(MUSIC_BG) needed_ms = int(duracion_total * 1000) repeticiones = needed_ms // len(bg_music) + 1 bg_music = bg_music * repeticiones bg_music = bg_music[:needed_ms].fade_out(1000) with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_bg: bg_music.export(tmp_bg.name, format="mp3") return AudioFileClip(tmp_bg.name).volumex(0.15), tmp_bg.name def create_slide_transition(clip1, clip2, duration=TRANSITION_DURATION): # Obtener dimensiones de los clips width, height = clip1.size part1 = clip1.subclip(clip1.duration - duration) part2 = clip2.subclip(0, duration) transition = CompositeVideoClip([ part1.fx(vfx.fadeout, duration), part2.fx(vfx.fadein, duration).set_position( lambda t: ('center', height - (height * (t/duration))) ) ], size=(width, height)).set_duration(duration) return transition def liberar_memoria(objetos_cerrar=None): """Forzar liberación de memoria cerrando objetos y llamando al recolector de basura""" if objetos_cerrar: for obj in objetos_cerrar: if obj is not None: try: obj.close() except: pass # Forzar recolección de basura gc.collect() mostrar_uso_memoria() async def procesar_video(video_input, texto_tts, voz_seleccionada, progress=gr.Progress()): temp_files = [] intro, outro, video_original = None, None, None segmentos_temp = [] try: mostrar_uso_memoria() logging.info("Iniciando procesamiento") progress(0, desc="Validando video") # Obtener información del video original original_info = obtener_info_video(video_input) original_fps = original_info["fps"] logging.info(f"Video original - FPS: {original_fps}, Tamaño: {original_info['size']}") if not validar_video(video_input): progress(0.05, desc="Optimizando formato de video") video_input = convertir_video(video_input) temp_files.append(video_input) progress(0.1, desc="Preparando video") # Cargamos el video original pero respetamos su resolución video_original = VideoFileClip(video_input) duracion_video = video_original.duration # Guardamos las dimensiones originales original_size = video_original.size video_original.close() # Cerrar para liberar memoria # Información importante sobre el video original logging.info(f"Duración total del video: {duracion_video} segundos") logging.info(f"Resolución original: {original_size[0]}x{original_size[1]}") if duracion_video <= 0: raise ValueError("El video debe tener una duración mayor que cero.") progress(0.2, desc="Generando narración (TTS)") tts_audio, tts_path = await generar_tts(texto_tts, voz_seleccionada, duracion_video) temp_files.append(tts_path) progress(0.3, desc="Preparando música de fondo") bg_audio, bg_path = crear_musica_fondo(duracion_video) temp_files.append(bg_path) # Procesar por bloques para optimizar memoria num_chunks = int(duracion_video // PROCESSING_CHUNK) + (1 if duracion_video % PROCESSING_CHUNK > 0 else 0) logging.info(f"Procesando video en {num_chunks} bloques") for chunk_idx in range(num_chunks): chunk_start = chunk_idx * PROCESSING_CHUNK chunk_end = min((chunk_idx + 1) * PROCESSING_CHUNK, duracion_video) progress(0.35 + (0.45 * chunk_idx / num_chunks), desc=f"Procesando bloque {chunk_idx+1}/{num_chunks} ({chunk_start:.1f}s - {chunk_end:.1f}s)") # Cargar solo la porción del video que necesitamos chunk_video = VideoFileClip(video_input).subclip(chunk_start, chunk_end) # Aseguramos que el framerate se mantiene en todos los clips if original_fps and chunk_video.fps != original_fps: logging.info(f"Ajustando FPS del chunk {chunk_idx+1} a {original_fps}") chunk_video = chunk_video.set_fps(original_fps) # Extraer la porción de audio correspondiente a este bloque tts_chunk_end = min(chunk_end, tts_audio.duration) chunk_tts = None if chunk_start < tts_audio.duration: chunk_tts = tts_audio.subclip(chunk_start, tts_chunk_end) chunk_bg = bg_audio.subclip(chunk_start, chunk_end) # Crear la mezcla de audio para este bloque audio_chunks = [chunk_bg] if chunk_video.audio: audio_chunks.append(chunk_video.audio.volumex(0.5)) if chunk_tts: audio_chunks.append(chunk_tts.volumex(0.85)) chunk_audio_final = CompositeAudioClip(audio_chunks) chunk_video = chunk_video.set_audio(chunk_audio_final) # Procesar las transiciones dentro de este chunk si es necesario if chunk_end - chunk_start > SEGMENT_DURATION: segments_in_chunk = [] segments_count = int((chunk_end - chunk_start) // SEGMENT_DURATION) + \ (1 if (chunk_end - chunk_start) % SEGMENT_DURATION > 0 else 0) for i in range(segments_count): seg_start = i * SEGMENT_DURATION seg_end = min(seg_start + SEGMENT_DURATION, chunk_end - chunk_start) segment = chunk_video.subclip(seg_start, seg_end) if i == 0: segments_in_chunk.append(segment) else: prev_segment = segments_in_chunk[-1] transition = create_slide_transition(prev_segment, segment) prev_end = prev_segment.duration - TRANSITION_DURATION if prev_end > 0: segments_in_chunk[-1] = prev_segment.subclip(0, prev_end) segments_in_chunk.append(transition) segments_in_chunk.append(segment) chunk_processed = concatenate_videoclips(segments_in_chunk, method="compose") else: chunk_processed = chunk_video # Guardar este chunk procesado como archivo temporal with tempfile.NamedTemporaryFile(delete=False, suffix=f"_chunk{chunk_idx}.mp4") as chunk_file: chunk_path = chunk_file.name chunk_processed.write_videofile( chunk_path, codec="libx264", audio_codec="aac", preset="ultrafast", bitrate="1M", fps=original_fps, # Asegurar que se mantiene el FPS original ffmpeg_params=["-crf", "28"], verbose=False ) segmentos_temp.append(chunk_path) # Liberar memoria chunk_video.close() chunk_processed.close() liberar_memoria() # Liberar memoria antes de procesar intro/outro liberar_memoria([tts_audio, bg_audio]) tts_audio = bg_audio = None # Añadir intro y outro - conservar resolución original para consistencia progress(0.85, desc="Preparando intro y outro") # Procesamiento del intro intro = VideoFileClip(INTRO_VIDEO) # Asegurar que se utiliza el mismo FPS que el video original if original_fps and intro.fps != original_fps: intro = intro.set_fps(original_fps) with tempfile.NamedTemporaryFile(delete=False, suffix="_intro.mp4") as tmp_intro: intro.write_videofile( tmp_intro.name, codec="libx264", audio_codec="aac", preset="ultrafast", fps=original_fps, # Usar FPS original bitrate="1M", ffmpeg_params=["-crf", "28"], verbose=False ) segmentos_temp.insert(0, tmp_intro.name) # Intro al principio intro.close() # Procesamiento del outro outro = VideoFileClip(OUTRO_VIDEO) # Asegurar que se utiliza el mismo FPS que el video original if original_fps and outro.fps != original_fps: outro = outro.set_fps(original_fps) with tempfile.NamedTemporaryFile(delete=False, suffix="_outro.mp4") as tmp_outro: outro.write_videofile( tmp_outro.name, codec="libx264", audio_codec="aac", preset="ultrafast", fps=original_fps, # Usar FPS original bitrate="1M", ffmpeg_params=["-crf", "28"], verbose=False ) segmentos_temp.append(tmp_outro.name) # Outro al final outro.close() # Unir todos los segmentos con ffmpeg progress(0.9, desc="Generando video final") # Crear un archivo de metadatos para ffmpeg with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as concat_file: # Escribir archivo de lista para concatenación for segment in segmentos_temp: concat_file.write(f"file '{segment}'\n".encode()) concat_path = concat_file.name # Usar FFmpeg para concatenar todos los segmentos with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_final: output_path = tmp_final.name # Usar el parámetro -vsync para asegurar que se mantiene la sincronización de video cmd = f'ffmpeg -f concat -safe 0 -i "{concat_path}" -vsync cfr -c copy "{output_path}" -y' logging.info(f"Ejecutando comando FFmpeg: {cmd}") os.system(cmd) # Limpiar archivos temporales os.remove(concat_path) for segment in segmentos_temp: if os.path.exists(segment): os.remove(segment) # Verificar que el video final tiene la duración esperada try: final_info = obtener_info_video(output_path) logging.info(f"Video final - Duración: {final_info['duration']}s, FPS: {final_info['fps']}") except Exception as e: logging.error(f"No se pudo verificar el video final: {e}") eliminar_archivo_tiempo(output_path, 3600) # Eliminación después de 1 hora progress(1.0, desc="¡Video listo!") logging.info(f"Video final guardado: {output_path}") mostrar_uso_memoria() return output_path except Exception as e: logging.error(f"Fallo general: {str(e)}") raise finally: try: liberar_memoria([video_original, intro, outro]) for file in temp_files: try: if os.path.exists(file): os.remove(file) except Exception as e: logging.warning(f"Error limpiando {file}: {e}") for segment in segmentos_temp: try: if os.path.exists(segment): os.remove(segment) except Exception as e: logging.warning(f"Error limpiando segmento {segment}: {e}") except Exception as e: logging.warning(f"Error al cerrar recursos: {str(e)}") # Interfaz Gradio with gr.Blocks() as demo: gr.Markdown("# Editor de Video con IA") with gr.Tab("Principal"): video_input = gr.Video(label="Subir video") texto_tts = gr.Textbox( label="Texto para TTS (máx. 1000 caracteres)", lines=3, placeholder="Escribe aquí tu texto..." ) voz_seleccionada = gr.Dropdown( label="Voz", choices=[ "es-ES-AlvaroNeural", "es-MX-BeatrizNeural", "es-ES-ElviraNeural", "es-MX-JavierNeural", "es-AR-ElenaNeural", "es-AR-TomasNeural", "es-CL-CatalinaNeural", "es-CL-LorenzoNeural", "es-CO-SofiaNeural", "es-CO-GonzaloNeural", "es-PE-CamilaNeural", "es-PE-AlexNeural", "es-VE-MariaNeural", "es-VE-ManuelNeural", "es-US-AlonsoNeural", "es-US-PalomaNeural", "es-ES-AbrilNeural", "es-ES-DarioNeural", "es-ES-HelenaRUS", "es-ES-LauraNeural", "es-ES-PabloNeural", "es-ES-TriniNeural", "en-US-AriaNeural", "en-US-GuyNeural", "en-US-JennyNeural", "en-US-AmberNeural", "en-US-AnaNeural", "en-US-AshleyNeural", "en-US-BrandonNeural", "en-US-ChristopherNeural", "en-US-CoraNeural", "en-US-DavisNeural", "en-US-ElizabethNeural", "en-US-EricNeural", "en-US-GinaNeural", "en-US-JacobNeural", "en-US-JaneNeural", "en-US-JasonNeural", "en-US-MichelleNeural", "en-US-MonicaNeural", "en-US-SaraNeural", "en-US-SteffanNeural", "en-US-TonyNeural", "en-US-YaraNeural", "fr-FR-AlainNeural", "fr-FR-BrigitteNeural", "fr-FR-CelesteNeural", "fr-FR-ClaudeNeural", "fr-FR-CoralieNeural", "fr-FR-DeniseNeural", "fr-FR-EloiseNeural", "fr-FR-HenriNeural", "fr-FR-JacquelineNeural", "fr-FR-JeromeNeural", "fr-FR-JosephineNeural", "fr-FR-MauriceNeural", "fr-FR-YvesNeural", "fr-FR-YvetteNeural", "de-DE-AmalaNeural", "de-DE-BerndNeural", "de-DE-ChristophNeural", "de-DE-ConradNeural", "de-DE-ElkeNeural", "de-DE-GiselaNeural", "de-DE-KasperNeural", "de-DE-KatjaNeural", "de-DE-KillianNeural", "de-DE-KlarissaNeural", "de-DE-KlausNeural", "de-DE-LouisaNeural", "de-DE-MajaNeural", "de-DE-RalfNeural", "de-DE-TanjaNeural", "de-DE-ViktoriaNeural", "it-IT-BenignoNeural", "it-IT-CalimeroNeural", "it-IT-CataldoNeural", "it-IT-DiegoNeural", "it-IT-ElsaNeural", "it-IT-FabiolaNeural", "it-IT-GianniNeural", "it-IT-ImeldaNeural", "it-IT-IrmaNeural", "it-IT-IsabellaNeural", "it-IT-LisandroNeural", "it-IT-PalmiraNeural", "it-IT-PierinaNeural", "it-IT-RinaldoNeural", "ja-JP-AoiNeural", "ja-JP-DaichiNeural", "ja-JP-HarukaNeural", "ja-JP-KeitaNeural", "ja-JP-MayuNeural", "ja-JP-NanamiNeural", "ja-JP-NaokiNeural", "ja-JP-ShioriNeural" ], value="es-ES-AlvaroNeural" ) procesar_btn = gr.Button("Generar Video (Modo Optimizado)") video_output = gr.Video(label="Video Procesado") with gr.Accordion("Ejemplos de Uso", open=False): gr.Examples( examples=[[EJEMPLO_VIDEO, "¡Hola! Esto es una prueba. Suscríbete al canal."]], inputs=[video_input, texto_tts], label="Ejemplos" ) procesar_btn.click( procesar_video, inputs=[video_input, texto_tts, voz_seleccionada], outputs=video_output ) gr.Markdown(""" ### ℹ️ Notas importantes: - **Optimizaciones para Hugging Face Spaces:** - Procesamiento por bloques para videos largos - Máximo tamaño de archivo: 200MB - Mantiene la resolución original del video - Mantiene la velocidad original del video (FPS) - Texto TTS limitado a 1000 caracteres - Las transiciones ocurren cada 30 segundos - El video contiene intro y outro predefinidos - El archivo generado se elimina después de 1 hora - Para videos de alta calidad, considera usar este código localmente """) if __name__ == "__main__": # Instalar psutil si no está disponible try: import psutil except ImportError: os.system("pip install psutil") import psutil demo.queue().launch()