|
import asyncio
import gc
import logging
import os
import subprocess
import tempfile

import psutil
# The wildcard import keeps its original position relative to the other
# third-party imports so it cannot shadow any name they bind.
from moviepy.editor import *
import edge_tts
import gradio as gr
from pydub import AudioSegment
|
|
|
|
|
# Timestamped, level-tagged log lines for the whole application.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Media assets, referenced by relative path.
INTRO_VIDEO = "introvideo.mp4"
OUTRO_VIDEO = "outrovideo.mp4"
MUSIC_BG = "musicafondo.mp3"
EJEMPLO_VIDEO = "ejemplo.mp4"

# Largest accepted upload, in bytes (200 MiB).
MAX_VIDEO_SIZE = 200 * 1024 * 1024

# Timing parameters, in seconds.
SEGMENT_DURATION = 30        # length of each segment between transitions
TRANSITION_DURATION = 1.5    # length of the transition between two segments
PROCESSING_CHUNK = 120       # length of each independently rendered block

# Fail fast at import time when a required asset is missing.
for file in (INTRO_VIDEO, OUTRO_VIDEO, MUSIC_BG, EJEMPLO_VIDEO):
    if os.path.exists(file):
        continue
    logging.error(f"Falta archivo necesario: {file}")
    raise FileNotFoundError(f"Falta: {file}")
|
|
|
def mostrar_uso_memoria():
    """Log the resident set size (RSS) of the current process in megabytes."""
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    memoria_uso = rss_bytes / 1024 / 1024
    logging.info(f"Uso de memoria: {memoria_uso:.2f} MB")
|
|
|
def eliminar_archivo_tiempo(ruta, delay=3600):
    """Schedule the deletion of ``ruta`` after ``delay`` seconds.

    The deletion runs on a daemon timer thread, so a pending deletion never
    keeps the interpreter alive at shutdown.

    Args:
        ruta: Path of the file to delete.
        delay: Seconds to wait before deleting. Defaults to one hour.

    Returns:
        The started ``threading.Timer``; callers may ``cancel()`` or
        ``join()`` it. Existing callers that ignore the result are unaffected.
    """
    from threading import Timer

    def eliminar():
        # EAFP: attempt the removal directly instead of checking existence
        # first, which would leave a window for the file to vanish in between.
        try:
            os.remove(ruta)
        except FileNotFoundError:
            # Already gone: nothing to do and nothing worth logging.
            return
        except Exception as e:
            logging.error(f"Error al eliminar {ruta}: {e}")
        else:
            logging.info(f"Archivo eliminado: {ruta}")

    temporizador = Timer(delay, eliminar)
    # Must be set before start(); a non-daemon timer would block process exit
    # until the delay has elapsed.
    temporizador.daemon = True
    temporizador.start()
    return temporizador
|
|
|
def validar_video(video_path):
    """Check that ``video_path`` is within the size limit and opens as a video.

    Args:
        video_path: Path of the file to inspect.

    Returns:
        ``True`` when the file is no larger than ``MAX_VIDEO_SIZE`` and
        ``VideoFileClip`` can open it; ``False`` otherwise. Any exception
        raised along the way is logged and reported as ``False``.
    """
    try:
        file_size = os.path.getsize(video_path)
        if file_size > MAX_VIDEO_SIZE:
            logging.warning(
                f"El video excede el tamaño máximo: {file_size/1024/1024:.2f}MB > {MAX_VIDEO_SIZE/1024/1024}MB"
            )
            return False

        clip = VideoFileClip(video_path)
        try:
            duracion = clip.duration
            fps = clip.fps
        finally:
            # Release the clip even if reading its metadata fails.
            clip.close()

        logging.info(f"Video validado: duración={duracion}s, fps={fps}")
        return True
    except Exception as e:
        logging.error(f"El video no es válido: {e}")
        return False
|
|
|
def obtener_info_video(video_path):
    """Return basic video metadata: FPS, duration and frame size.

    Args:
        video_path: Path of the video to open.

    Returns:
        A dict with the keys ``"fps"``, ``"duration"`` and ``"size"``. If the
        video cannot be read, the error is logged and a fallback of 30 fps,
        zero duration and a 640x360 size is returned.
    """
    respaldo = {"fps": 30, "duration": 0, "size": (640, 360)}
    try:
        video = VideoFileClip(video_path)
        datos = dict(fps=video.fps, duration=video.duration, size=video.size)
        video.close()
    except Exception as e:
        logging.error(f"Error al obtener info del video: {e}")
        return respaldo
    return datos
|
|
|
def _transcodificar_con_ffmpeg(origen, destino, fps, crf, audio_bitrate):
    """Re-encode ``origen`` into ``destino`` (H.264 + AAC); return True on exit status 0."""
    # An argument list with no shell: the uploaded file name is passed to
    # ffmpeg verbatim and is never interpreted by a shell.
    comando = [
        "ffmpeg", "-i", origen,
        "-c:v", "libx264", "-crf", str(crf), "-preset", "ultrafast",
        "-r", str(fps),
        "-c:a", "aac", "-b:a", audio_bitrate,
        destino, "-y",
    ]
    return subprocess.run(comando).returncode == 0


def convertir_video(video_path):
    """Re-encode ``video_path`` to a temporary H.264/AAC ``.mp4`` file.

    A first pass uses CRF 28 with 96k audio. If ffmpeg fails or the result
    does not pass ``validar_video``, a second pass re-encodes the original
    input with CRF 32 and 64k audio.

    Args:
        video_path: Path of the source video.

    Returns:
        Path of the converted temporary file. The caller owns it and is
        responsible for deleting it.

    Raises:
        RuntimeError: If the second ffmpeg pass also fails.
        Exception: Any other error is logged and re-raised.
    """
    output_path = None
    try:
        info = obtener_info_video(video_path)
        fps = info["fps"] if info["fps"] else 30

        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_converted:
            output_path = tmp_converted.name

        if _transcodificar_con_ffmpeg(video_path, output_path, fps, 28, "96k") and validar_video(output_path):
            return output_path

        # Second attempt with stronger compression. It reads the original
        # input (not the output that just failed validation) and writes to a
        # path ending in ".mp4" so ffmpeg can infer the container format.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_retry:
            retry_path = tmp_retry.name
        try:
            if not _transcodificar_con_ffmpeg(video_path, retry_path, fps, 32, "64k"):
                raise RuntimeError(f"ffmpeg no pudo convertir el video: {video_path}")
            os.replace(retry_path, output_path)
        finally:
            # After a successful os.replace the retry path no longer exists.
            if os.path.exists(retry_path):
                os.remove(retry_path)

        return output_path
    except Exception as e:
        logging.error(f"Error al convertir el video: {e}")
        if output_path is not None:
            # Best-effort cleanup so a failed conversion leaves no temp file.
            try:
                os.remove(output_path)
            except OSError:
                pass
        raise
|
|
|
async def generar_tts(texto, voz, duracion_total):
    """Synthesize ``texto`` with edge-tts and return it as an audio clip.

    Args:
        texto: Text to narrate. Only the first 1000 characters are used.
        voz: Voice name passed to ``edge_tts.Communicate``.
        duracion_total: Maximum clip length; longer narration is cut to it.

    Returns:
        A ``(clip, path)`` tuple: the ``AudioFileClip`` and the path of the
        temporary ``.mp3`` backing it. The caller must close the clip and
        delete the file.

    Raises:
        ValueError: If ``texto`` is ``None``, empty or only whitespace.
        Exception: Any synthesis or decoding error is logged and re-raised;
            the temporary file is removed in that case.
    """
    tts_path = None
    try:
        # `not texto` also covers None, which has no .strip().
        if not texto or not texto.strip():
            raise ValueError("El texto para TTS no puede estar vacío.")

        if len(texto) > 1000:
            texto = texto[:1000]
            logging.info("Texto para TTS truncado a 1000 caracteres para optimizar rendimiento")

        logging.info(f"Generando TTS con voz: {voz}")
        communicate = edge_tts.Communicate(texto, voz)

        # Reserve a path and close our handle before anything else writes to
        # or reads from it.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_tts:
            tts_path = tmp_tts.name

        await communicate.save(tts_path)
        tts_audio = AudioFileClip(tts_path)
        if tts_audio.duration > duracion_total:
            tts_audio = tts_audio.subclip(0, duracion_total)
        return tts_audio, tts_path
    except Exception as e:
        logging.error(f"Fallo en TTS: {str(e)}")
        if tts_path is not None:
            # The caller never receives the path on failure, so remove it here.
            try:
                os.remove(tts_path)
            except OSError:
                pass
        raise
|
|
|
def crear_musica_fondo(duracion_total):
    """Build a background-music clip lasting ``duracion_total`` seconds.

    The track in ``MUSIC_BG`` is repeated until it covers the requested
    length, cut to that length, given a one-second fade-out and exported to a
    temporary ``.mp3``.

    Args:
        duracion_total: Desired length in seconds.

    Returns:
        A ``(clip, path)`` tuple: the ``AudioFileClip`` scaled to 15% volume
        and the path of the temporary file backing it.
    """
    pista = AudioSegment.from_mp3(MUSIC_BG)
    objetivo_ms = int(duracion_total * 1000)
    bucles = objetivo_ms // len(pista) + 1
    pista = (pista * bucles)[:objetivo_ms].fade_out(1000)

    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_bg:
        ruta = tmp_bg.name
        pista.export(ruta, format="mp3")
        clip = AudioFileClip(ruta).volumex(0.15)
        return clip, ruta
|
|
|
def create_slide_transition(clip1, clip2, duration=TRANSITION_DURATION):
    """Compose a transition from the end of ``clip1`` into the start of ``clip2``.

    The last ``duration`` seconds of ``clip1`` fade out while the first
    ``duration`` seconds of ``clip2`` fade in, with a vertical position that
    moves from ``height`` to 0 over the transition.

    Args:
        clip1: Outgoing clip; its size defines the size of the result.
        clip2: Incoming clip.
        duration: Transition length in seconds.

    Returns:
        A ``CompositeVideoClip`` lasting ``duration`` seconds.
    """
    ancho, alto = clip1.size

    def posicion_entrante(t):
        # Horizontally centred; the vertical offset shrinks linearly with t.
        return ('center', alto - (alto * (t / duration)))

    saliente = clip1.subclip(clip1.duration - duration).fx(vfx.fadeout, duration)
    entrante = (
        clip2.subclip(0, duration)
        .fx(vfx.fadein, duration)
        .set_position(posicion_entrante)
    )
    compuesto = CompositeVideoClip([saliente, entrante], size=(ancho, alto))
    return compuesto.set_duration(duration)
|
|
|
def liberar_memoria(objetos_cerrar=None):
    """Close the given objects, run the garbage collector and log memory usage.

    Args:
        objetos_cerrar: Optional iterable of objects exposing ``close()``.
            ``None`` entries are skipped.
    """
    for obj in objetos_cerrar or ():
        if obj is None:
            continue
        try:
            obj.close()
        except Exception as e:
            # Closing is best-effort, but only ordinary errors are ignored:
            # KeyboardInterrupt and SystemExit must still propagate.
            logging.debug(f"No se pudo cerrar {type(obj).__name__}: {e}")

    gc.collect()
    mostrar_uso_memoria()
|
|
|
def _crear_ruta_temporal(sufijo):
    """Create an empty temporary file ending in ``sufijo`` and return its path."""
    with tempfile.NamedTemporaryFile(delete=False, suffix=sufijo) as tmp:
        return tmp.name


def _exportar_clip(clip, ruta, fps):
    """Render ``clip`` to ``ruta`` with the encoder settings shared by all segments."""
    clip.write_videofile(
        ruta,
        codec="libx264",
        audio_codec="aac",
        preset="ultrafast",
        bitrate="1M",
        fps=fps,
        ffmpeg_params=["-crf", "28"],
        verbose=False,
    )


def _reencodificar_clip_fijo(ruta_origen, ruta_destino, fps):
    """Re-encode a bundled clip (intro/outro) to ``ruta_destino`` at ``fps``."""
    clip = VideoFileClip(ruta_origen)
    try:
        if fps and clip.fps != fps:
            clip = clip.set_fps(fps)
        _exportar_clip(clip, ruta_destino, fps)
    finally:
        clip.close()


def _mezclar_audio_bloque(chunk_video, inicio, fin, tts_audio, bg_audio):
    """Mix background music, the chunk's own audio and narration for [inicio, fin]."""
    pistas = [bg_audio.subclip(inicio, fin)]
    if chunk_video.audio:
        pistas.append(chunk_video.audio.volumex(0.5))
    # The narration may be shorter than the video; only chunks that start
    # before it ends get a narration track.
    if inicio < tts_audio.duration:
        narracion = tts_audio.subclip(inicio, min(fin, tts_audio.duration))
        pistas.append(narracion.volumex(0.85))
    return CompositeAudioClip(pistas)


def _unir_segmentos_con_transiciones(chunk_video, duracion_bloque):
    """Split a chunk into SEGMENT_DURATION pieces joined by slide transitions."""
    cantidad = int(duracion_bloque // SEGMENT_DURATION) + \
        (1 if duracion_bloque % SEGMENT_DURATION > 0 else 0)

    piezas = []
    for i in range(cantidad):
        seg_start = i * SEGMENT_DURATION
        seg_end = min(seg_start + SEGMENT_DURATION, duracion_bloque)
        segment = chunk_video.subclip(seg_start, seg_end)

        if piezas:
            prev_segment = piezas[-1]
            transition = create_slide_transition(prev_segment, segment)

            # Trim the tail of the previous segment that the transition replaces.
            prev_end = prev_segment.duration - TRANSITION_DURATION
            if prev_end > 0:
                piezas[-1] = prev_segment.subclip(0, prev_end)
            piezas.append(transition)

        # NOTE(review): the full segment follows a transition that already
        # shows its first TRANSITION_DURATION seconds, so that span plays
        # twice. Kept as in the original; confirm this is intended.
        piezas.append(segment)

    return concatenate_videoclips(piezas, method="compose")


async def procesar_video(video_input, texto_tts, voz_seleccionada, progress=gr.Progress()):
    """Produce the final video: intro + narrated, scored source video + outro.

    The source is rendered in blocks of ``PROCESSING_CHUNK`` seconds, each
    mixed with background music and TTS narration and split into
    ``SEGMENT_DURATION`` segments joined by transitions. The rendered blocks
    are then concatenated with the re-encoded intro and outro by ffmpeg.

    Args:
        video_input: Path of the uploaded video.
        texto_tts: Text to narrate.
        voz_seleccionada: edge-tts voice name.
        progress: Gradio progress tracker.

    Returns:
        Path of the final ``.mp4``. It is scheduled for deletion after one hour.

    Raises:
        ValueError: If no video was provided or its duration is not positive.
        RuntimeError: If the ffmpeg concatenation step fails.
        Exception: Any other failure is logged and re-raised. Intermediate
            files are removed in every case.
    """
    temp_files = []
    segmentos_temp = []
    video_original = tts_audio = bg_audio = None
    output_path = None
    exito = False

    try:
        mostrar_uso_memoria()
        logging.info("Iniciando procesamiento")
        progress(0, desc="Validando video")

        if not video_input:
            raise ValueError("Debes subir un video antes de procesar.")

        original_info = obtener_info_video(video_input)
        original_fps = original_info["fps"]
        logging.info(f"Video original - FPS: {original_fps}, Tamaño: {original_info['size']}")

        if not validar_video(video_input):
            progress(0.05, desc="Optimizando formato de video")
            video_input = convertir_video(video_input)
            temp_files.append(video_input)

        progress(0.1, desc="Preparando video")

        # Opened only to read duration and size; each block reopens the file.
        video_original = VideoFileClip(video_input)
        duracion_video = video_original.duration
        original_size = video_original.size
        video_original.close()

        logging.info(f"Duración total del video: {duracion_video} segundos")
        logging.info(f"Resolución original: {original_size[0]}x{original_size[1]}")

        if duracion_video <= 0:
            raise ValueError("El video debe tener una duración mayor que cero.")

        progress(0.2, desc="Generando narración (TTS)")
        tts_audio, tts_path = await generar_tts(texto_tts, voz_seleccionada, duracion_video)
        temp_files.append(tts_path)

        progress(0.3, desc="Preparando música de fondo")
        bg_audio, bg_path = crear_musica_fondo(duracion_video)
        temp_files.append(bg_path)

        num_chunks = int(duracion_video // PROCESSING_CHUNK) + (1 if duracion_video % PROCESSING_CHUNK > 0 else 0)
        logging.info(f"Procesando video en {num_chunks} bloques")

        for chunk_idx in range(num_chunks):
            chunk_start = chunk_idx * PROCESSING_CHUNK
            chunk_end = min((chunk_idx + 1) * PROCESSING_CHUNK, duracion_video)

            progress(0.35 + (0.45 * chunk_idx / num_chunks),
                     desc=f"Procesando bloque {chunk_idx+1}/{num_chunks} ({chunk_start:.1f}s - {chunk_end:.1f}s)")

            # Register the output path before rendering so it is cleaned up
            # even if the render fails half-way.
            chunk_path = _crear_ruta_temporal(f"_chunk{chunk_idx}.mp4")
            segmentos_temp.append(chunk_path)

            chunk_video = VideoFileClip(video_input).subclip(chunk_start, chunk_end)
            chunk_processed = None
            try:
                if original_fps and chunk_video.fps != original_fps:
                    logging.info(f"Ajustando FPS del chunk {chunk_idx+1} a {original_fps}")
                    chunk_video = chunk_video.set_fps(original_fps)

                chunk_audio_final = _mezclar_audio_bloque(
                    chunk_video, chunk_start, chunk_end, tts_audio, bg_audio
                )
                chunk_video = chunk_video.set_audio(chunk_audio_final)

                duracion_bloque = chunk_end - chunk_start
                if duracion_bloque > SEGMENT_DURATION:
                    chunk_processed = _unir_segmentos_con_transiciones(chunk_video, duracion_bloque)
                else:
                    chunk_processed = chunk_video

                _exportar_clip(chunk_processed, chunk_path, original_fps)
            finally:
                # Release the block's clips whether or not rendering succeeded.
                liberar_memoria([chunk_video, chunk_processed])

        liberar_memoria([tts_audio, bg_audio])
        tts_audio = bg_audio = None

        progress(0.85, desc="Preparando intro y outro")

        intro_path = _crear_ruta_temporal("_intro.mp4")
        segmentos_temp.insert(0, intro_path)
        _reencodificar_clip_fijo(INTRO_VIDEO, intro_path, original_fps)

        outro_path = _crear_ruta_temporal("_outro.mp4")
        segmentos_temp.append(outro_path)
        _reencodificar_clip_fijo(OUTRO_VIDEO, outro_path, original_fps)

        progress(0.9, desc="Generando video final")

        # File list for ffmpeg's concat demuxer, one "file '<path>'" per line.
        concat_path = _crear_ruta_temporal(".txt")
        temp_files.append(concat_path)
        with open(concat_path, "w", encoding="utf-8") as concat_file:
            for segment in segmentos_temp:
                # A single quote inside a quoted path is written as '\''.
                ruta_escapada = segment.replace("'", "'\\''")
                concat_file.write(f"file '{ruta_escapada}'\n")

        output_path = _crear_ruta_temporal(".mp4")

        # Argument list, no shell; the exit status is checked so that a failed
        # concatenation is reported instead of returning an empty file.
        cmd = [
            "ffmpeg", "-f", "concat", "-safe", "0", "-i", concat_path,
            "-vsync", "cfr", "-c", "copy", output_path, "-y",
        ]
        logging.info(f"Ejecutando comando FFmpeg: {' '.join(cmd)}")
        resultado = subprocess.run(cmd)
        if resultado.returncode != 0:
            raise RuntimeError(
                f"FFmpeg terminó con código {resultado.returncode} al unir los segmentos"
            )

        # obtener_info_video logs and returns fallback values on failure, so
        # no extra error handling is needed around it.
        final_info = obtener_info_video(output_path)
        logging.info(f"Video final - Duración: {final_info['duration']}s, FPS: {final_info['fps']}")

        eliminar_archivo_tiempo(output_path, 3600)
        progress(1.0, desc="¡Video listo!")
        logging.info(f"Video final guardado: {output_path}")
        mostrar_uso_memoria()
        exito = True
        return output_path
    except Exception as e:
        logging.error(f"Fallo general: {str(e)}")
        raise
    finally:
        try:
            liberar_memoria([video_original, tts_audio, bg_audio])
        except Exception as e:
            logging.warning(f"Error al cerrar recursos: {str(e)}")

        pendientes = temp_files + segmentos_temp
        if not exito and output_path is not None:
            # Nothing was handed to the caller, so the output is ours to remove.
            pendientes.append(output_path)
        for ruta in pendientes:
            try:
                os.remove(ruta)
            except FileNotFoundError:
                pass
            except OSError as e:
                logging.warning(f"Error limpiando {ruta}: {e}")
|
|
|
|
|
# Gradio interface: upload a video, enter the narration text, pick a voice and
# run procesar_video.
# NOTE(review): the nesting of the components below was reconstructed; the
# source this was recovered from carried no indentation. Confirm the layout.
with gr.Blocks() as demo:
    gr.Markdown("# Editor de Video con IA")
    with gr.Tab("Principal"):
        video_input = gr.Video(label="Subir video")
        texto_tts = gr.Textbox(
            label="Texto para TTS (máx. 1000 caracteres)",
            lines=3,
            placeholder="Escribe aquí tu texto..."
        )
        voz_seleccionada = gr.Dropdown(
            label="Voz",
            choices=[
                "es-ES-AlvaroNeural", "es-MX-BeatrizNeural",
                "es-ES-ElviraNeural", "es-MX-JavierNeural",
                "es-AR-ElenaNeural", "es-AR-TomasNeural",
                "es-CL-CatalinaNeural", "es-CL-LorenzoNeural",
                "es-CO-SofiaNeural", "es-CO-GonzaloNeural",
                "es-PE-CamilaNeural", "es-PE-AlexNeural",
                "es-VE-MariaNeural", "es-VE-ManuelNeural",
                "es-US-AlonsoNeural", "es-US-PalomaNeural",
                "es-ES-AbrilNeural", "es-ES-DarioNeural",
                "es-ES-HelenaRUS", "es-ES-LauraNeural",
                "es-ES-PabloNeural", "es-ES-TriniNeural",
                "en-US-AriaNeural", "en-US-GuyNeural",
                "en-US-JennyNeural", "en-US-AmberNeural",
                "en-US-AnaNeural", "en-US-AshleyNeural",
                "en-US-BrandonNeural", "en-US-ChristopherNeural",
                "en-US-CoraNeural", "en-US-DavisNeural",
                "en-US-ElizabethNeural", "en-US-EricNeural",
                "en-US-GinaNeural", "en-US-JacobNeural",
                "en-US-JaneNeural", "en-US-JasonNeural",
                "en-US-MichelleNeural", "en-US-MonicaNeural",
                "en-US-SaraNeural", "en-US-SteffanNeural",
                "en-US-TonyNeural", "en-US-YaraNeural",
                "fr-FR-AlainNeural", "fr-FR-BrigitteNeural",
                "fr-FR-CelesteNeural", "fr-FR-ClaudeNeural",
                "fr-FR-CoralieNeural", "fr-FR-DeniseNeural",
                "fr-FR-EloiseNeural", "fr-FR-HenriNeural",
                "fr-FR-JacquelineNeural", "fr-FR-JeromeNeural",
                "fr-FR-JosephineNeural", "fr-FR-MauriceNeural",
                "fr-FR-YvesNeural", "fr-FR-YvetteNeural",
                "de-DE-AmalaNeural", "de-DE-BerndNeural",
                "de-DE-ChristophNeural", "de-DE-ConradNeural",
                "de-DE-ElkeNeural", "de-DE-GiselaNeural",
                "de-DE-KasperNeural", "de-DE-KatjaNeural",
                "de-DE-KillianNeural", "de-DE-KlarissaNeural",
                "de-DE-KlausNeural", "de-DE-LouisaNeural",
                "de-DE-MajaNeural", "de-DE-RalfNeural",
                "de-DE-TanjaNeural", "de-DE-ViktoriaNeural",
                "it-IT-BenignoNeural", "it-IT-CalimeroNeural",
                "it-IT-CataldoNeural", "it-IT-DiegoNeural",
                "it-IT-ElsaNeural", "it-IT-FabiolaNeural",
                "it-IT-GianniNeural", "it-IT-ImeldaNeural",
                "it-IT-IrmaNeural", "it-IT-IsabellaNeural",
                "it-IT-LisandroNeural", "it-IT-PalmiraNeural",
                "it-IT-PierinaNeural", "it-IT-RinaldoNeural",
                "ja-JP-AoiNeural", "ja-JP-DaichiNeural",
                "ja-JP-HarukaNeural", "ja-JP-KeitaNeural",
                "ja-JP-MayuNeural", "ja-JP-NanamiNeural",
                "ja-JP-NaokiNeural", "ja-JP-ShioriNeural"
            ],
            value="es-ES-AlvaroNeural"
        )
        procesar_btn = gr.Button("Generar Video (Modo Optimizado)")
        video_output = gr.Video(label="Video Procesado")

    with gr.Accordion("Ejemplos de Uso", open=False):
        gr.Examples(
            examples=[[EJEMPLO_VIDEO, "¡Hola! Esto es una prueba. Suscríbete al canal."]],
            inputs=[video_input, texto_tts],
            label="Ejemplos"
        )

    procesar_btn.click(
        procesar_video,
        inputs=[video_input, texto_tts, voz_seleccionada],
        outputs=video_output
    )

    gr.Markdown("""
        ### ℹ️ Notas importantes:
        - **Optimizaciones para Hugging Face Spaces:**
          - Procesamiento por bloques para videos largos
          - Máximo tamaño de archivo: 200MB
          - Mantiene la resolución original del video
          - Mantiene la velocidad original del video (FPS)
          - Texto TTS limitado a 1000 caracteres
        - Las transiciones ocurren cada 30 segundos
        - El video contiene intro y outro predefinidos
        - El archivo generado se elimina después de 1 hora
        - Para videos de alta calidad, considera usar este código localmente
    """)
|
|
|
if __name__ == "__main__":
    # psutil is imported unconditionally at the top of this module, so a
    # missing package fails there, before this point can be reached. The
    # former "pip install psutil" fallback was unreachable and is removed.
    demo.queue().launch()