Spaces:
Runtime error
Runtime error
| import subprocess | |
| import os | |
| from typing import List, Optional, Union | |
| from PIL import Image | |
| import numpy as np | |
| from dataclasses import dataclass | |
| import re | |
| from modules.logger_util import get_logger | |
| from modules.constants import SOUND_FILE_EXT, VIDEO_FILE_EXT, IMAGE_FILE_EXT | |
| from modules.paths import TEMP_DIR, TEMP_OUT_DIR | |
| logger = get_logger() | |
| class VideoInfo: | |
| num_frames: Optional[int] = None | |
| frame_rate: Optional[int] = None | |
| duration: Optional[float] = None | |
| has_sound: Optional[bool] = None | |
| codec: Optional[str] = None | |
| def extract_frames( | |
| vid_input: str, | |
| output_temp_dir: str = TEMP_DIR, | |
| start_number: int = 0 | |
| ): | |
| """ | |
| Extract frames as jpg files and save them into output_temp_dir. This needs FFmpeg installed. | |
| """ | |
| os.makedirs(output_temp_dir, exist_ok=True) | |
| output_path = os.path.join(output_temp_dir, "%05d.jpg") | |
| command = [ | |
| 'ffmpeg', | |
| '-y', # Enable overwriting | |
| '-i', vid_input, | |
| '-qscale:v', '2', | |
| '-vf', f'scale=iw:ih', | |
| '-start_number', str(start_number), | |
| f'{output_path}' | |
| ] | |
| try: | |
| subprocess.run(command, check=True) | |
| except subprocess.CalledProcessError as e: | |
| logger.exception("Error occurred while extracting frames from the video") | |
| raise RuntimeError(f"An error occurred: {str(e)}") | |
| return get_frames_from_dir(output_temp_dir) | |
| def extract_sound( | |
| vid_input: str, | |
| output_temp_dir: str = TEMP_DIR, | |
| ): | |
| """ | |
| Extract audio from a video file and save it as a separate sound file. This needs FFmpeg installed. | |
| """ | |
| os.makedirs(output_temp_dir, exist_ok=True) | |
| output_path = os.path.join(output_temp_dir, "sound.mp3") | |
| command = [ | |
| 'ffmpeg', | |
| '-y', # Enable overwriting | |
| '-i', vid_input, | |
| '-vn', | |
| output_path | |
| ] | |
| try: | |
| subprocess.run(command, check=True) | |
| except subprocess.CalledProcessError as e: | |
| logger.exception("Error occurred while extracting sound from the video") | |
| return output_path | |
| def get_video_info(vid_input: str) -> VideoInfo: | |
| """ | |
| Extract video information using ffmpeg. | |
| """ | |
| command = [ | |
| 'ffmpeg', | |
| '-i', vid_input, | |
| '-map', '0:v:0', | |
| '-c', 'copy', | |
| '-f', 'null', | |
| '-' | |
| ] | |
| try: | |
| result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, | |
| encoding='utf-8', errors='replace', check=True) | |
| output = result.stderr | |
| num_frames = None | |
| frame_rate = None | |
| duration = None | |
| has_sound = False | |
| codec = None | |
| for line in output.splitlines(): | |
| if 'Stream #0:0' in line and 'Video:' in line: | |
| fps_match = re.search(r'(\d+(?:\.\d+)?) fps', line) | |
| if fps_match: | |
| frame_rate = float(fps_match.group(1)) | |
| codec_match = re.search(r'Video: (\w+)', line) | |
| if codec_match: | |
| codec = codec_match.group(1) | |
| elif 'Duration:' in line: | |
| duration_match = re.search(r'Duration: (\d{2}):(\d{2}):(\d{2}\.\d{2})', line) | |
| if duration_match: | |
| h, m, s = map(float, duration_match.groups()) | |
| duration = h * 3600 + m * 60 + s | |
| elif 'Stream' in line and 'Audio:' in line: | |
| has_sound = True | |
| if frame_rate and duration: | |
| num_frames = int(frame_rate * duration) | |
| return VideoInfo( | |
| num_frames=num_frames, | |
| frame_rate=frame_rate, | |
| duration=duration, | |
| has_sound=has_sound, | |
| codec=codec | |
| ) | |
| except subprocess.CalledProcessError as e: | |
| logger.exception("Error occurred while getting info from the video") | |
| return VideoInfo() | |
| def create_video_from_frames( | |
| frames_dir: str, | |
| frame_rate: Optional[int] = None, | |
| sound_path: Optional[str] = None, | |
| output_dir: Optional[str] = None, | |
| ): | |
| """ | |
| Create a video from frames and save it to the output_path. This needs FFmpeg installed. | |
| """ | |
| if not os.path.exists(frames_dir): | |
| raise "frames_dir does not exist" | |
| if output_dir is None: | |
| output_dir = TEMP_OUT_DIR | |
| os.makedirs(output_dir, exist_ok=True) | |
| num_files = len(os.listdir(output_dir)) | |
| filename = f"{num_files:05d}.mp4" | |
| output_path = os.path.join(output_dir, filename) | |
| if sound_path is None: | |
| temp_sound = os.path.join(TEMP_DIR, "sound.mp3") | |
| if os.path.exists(temp_sound): | |
| sound_path = temp_sound | |
| if frame_rate is None: | |
| frame_rate = 25 # Default frame rate for ffmpeg | |
| command = [ | |
| 'ffmpeg', | |
| '-y', | |
| '-framerate', str(frame_rate), | |
| '-i', os.path.join(frames_dir, "%05d.jpg"), | |
| '-c:v', 'libx264', | |
| '-pix_fmt', 'yuv420p', | |
| output_path | |
| ] | |
| if sound_path is not None: | |
| command += [ | |
| '-i', sound_path, | |
| '-c:a', 'aac', | |
| '-strict', 'experimental', | |
| '-b:a', '192k', | |
| '-shortest' | |
| ] | |
| try: | |
| subprocess.run(command, check=True) | |
| except subprocess.CalledProcessError as e: | |
| logger.exception("Error occurred while creating video from frames") | |
| return output_path | |
| def get_frames_from_dir(vid_dir: str, | |
| available_extensions: Optional[Union[List, str]] = None, | |
| as_numpy: bool = False) -> List: | |
| """Get image file paths list from the dir""" | |
| if available_extensions is None: | |
| available_extensions = [".jpg", ".jpeg", ".JPG", ".JPEG"] | |
| if isinstance(available_extensions, str): | |
| available_extensions = [available_extensions] | |
| frame_names = [ | |
| p for p in os.listdir(vid_dir) | |
| if os.path.splitext(p)[-1] in available_extensions | |
| ] | |
| if not frame_names: | |
| return [] | |
| frame_names.sort(key=lambda x: int(os.path.splitext(x)[0])) | |
| frames = [os.path.join(vid_dir, name) for name in frame_names] | |
| if as_numpy: | |
| frames = [np.array(Image.open(frame)) for frame in frames] | |
| return frames | |
| def clean_temp_dir(temp_dir: Optional[str] = None): | |
| """Removes media files from the directory.""" | |
| if temp_dir is None: | |
| temp_dir = TEMP_DIR | |
| temp_out_dir = TEMP_OUT_DIR | |
| else: | |
| temp_out_dir = os.path.join(temp_dir, "out") | |
| clean_files_with_extension(temp_dir, SOUND_FILE_EXT) | |
| clean_files_with_extension(temp_dir, IMAGE_FILE_EXT) | |
| clean_files_with_extension(temp_out_dir, IMAGE_FILE_EXT) | |
| def clean_files_with_extension(dir_path: str, extensions: List): | |
| """Remove files with the given extensions from the directory.""" | |
| for filename in os.listdir(dir_path): | |
| if filename.lower().endswith(tuple(extensions)): | |
| file_path = os.path.join(dir_path, filename) | |
| try: | |
| os.remove(file_path) | |
| except Exception as e: | |
| logger.exception("Error while removing image files") | |