sam2-playground / modules /video_utils.py
jhj0517
Add docstring
a2c5114
import subprocess
import os
from typing import List, Optional, Union
from PIL import Image
import numpy as np
from dataclasses import dataclass
import re
from modules.logger_util import get_logger
from modules.constants import SOUND_FILE_EXT, VIDEO_FILE_EXT, IMAGE_FILE_EXT
from modules.paths import TEMP_DIR, TEMP_OUT_DIR
logger = get_logger()
@dataclass
class VideoInfo:
num_frames: Optional[int] = None
frame_rate: Optional[int] = None
duration: Optional[float] = None
has_sound: Optional[bool] = None
codec: Optional[str] = None
def extract_frames(
vid_input: str,
output_temp_dir: str = TEMP_DIR,
start_number: int = 0
):
"""
Extract frames as jpg files and save them into output_temp_dir. This needs FFmpeg installed.
"""
os.makedirs(output_temp_dir, exist_ok=True)
output_path = os.path.join(output_temp_dir, "%05d.jpg")
command = [
'ffmpeg',
'-y', # Enable overwriting
'-i', vid_input,
'-qscale:v', '2',
'-vf', f'scale=iw:ih',
'-start_number', str(start_number),
f'{output_path}'
]
try:
subprocess.run(command, check=True)
except subprocess.CalledProcessError as e:
logger.exception("Error occurred while extracting frames from the video")
raise RuntimeError(f"An error occurred: {str(e)}")
return get_frames_from_dir(output_temp_dir)
def extract_sound(
vid_input: str,
output_temp_dir: str = TEMP_DIR,
):
"""
Extract audio from a video file and save it as a separate sound file. This needs FFmpeg installed.
"""
os.makedirs(output_temp_dir, exist_ok=True)
output_path = os.path.join(output_temp_dir, "sound.mp3")
command = [
'ffmpeg',
'-y', # Enable overwriting
'-i', vid_input,
'-vn',
output_path
]
try:
subprocess.run(command, check=True)
except subprocess.CalledProcessError as e:
logger.exception("Error occurred while extracting sound from the video")
return output_path
def get_video_info(vid_input: str) -> VideoInfo:
"""
Extract video information using ffmpeg.
"""
command = [
'ffmpeg',
'-i', vid_input,
'-map', '0:v:0',
'-c', 'copy',
'-f', 'null',
'-'
]
try:
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
encoding='utf-8', errors='replace', check=True)
output = result.stderr
num_frames = None
frame_rate = None
duration = None
has_sound = False
codec = None
for line in output.splitlines():
if 'Stream #0:0' in line and 'Video:' in line:
fps_match = re.search(r'(\d+(?:\.\d+)?) fps', line)
if fps_match:
frame_rate = float(fps_match.group(1))
codec_match = re.search(r'Video: (\w+)', line)
if codec_match:
codec = codec_match.group(1)
elif 'Duration:' in line:
duration_match = re.search(r'Duration: (\d{2}):(\d{2}):(\d{2}\.\d{2})', line)
if duration_match:
h, m, s = map(float, duration_match.groups())
duration = h * 3600 + m * 60 + s
elif 'Stream' in line and 'Audio:' in line:
has_sound = True
if frame_rate and duration:
num_frames = int(frame_rate * duration)
return VideoInfo(
num_frames=num_frames,
frame_rate=frame_rate,
duration=duration,
has_sound=has_sound,
codec=codec
)
except subprocess.CalledProcessError as e:
logger.exception("Error occurred while getting info from the video")
return VideoInfo()
def create_video_from_frames(
frames_dir: str,
frame_rate: Optional[int] = None,
sound_path: Optional[str] = None,
output_dir: Optional[str] = None,
):
"""
Create a video from frames and save it to the output_path. This needs FFmpeg installed.
"""
if not os.path.exists(frames_dir):
raise "frames_dir does not exist"
if output_dir is None:
output_dir = TEMP_OUT_DIR
os.makedirs(output_dir, exist_ok=True)
num_files = len(os.listdir(output_dir))
filename = f"{num_files:05d}.mp4"
output_path = os.path.join(output_dir, filename)
if sound_path is None:
temp_sound = os.path.join(TEMP_DIR, "sound.mp3")
if os.path.exists(temp_sound):
sound_path = temp_sound
if frame_rate is None:
frame_rate = 25 # Default frame rate for ffmpeg
command = [
'ffmpeg',
'-y',
'-framerate', str(frame_rate),
'-i', os.path.join(frames_dir, "%05d.jpg"),
'-c:v', 'libx264',
'-pix_fmt', 'yuv420p',
output_path
]
if sound_path is not None:
command += [
'-i', sound_path,
'-c:a', 'aac',
'-strict', 'experimental',
'-b:a', '192k',
'-shortest'
]
try:
subprocess.run(command, check=True)
except subprocess.CalledProcessError as e:
logger.exception("Error occurred while creating video from frames")
return output_path
def get_frames_from_dir(vid_dir: str,
available_extensions: Optional[Union[List, str]] = None,
as_numpy: bool = False) -> List:
"""Get image file paths list from the dir"""
if available_extensions is None:
available_extensions = [".jpg", ".jpeg", ".JPG", ".JPEG"]
if isinstance(available_extensions, str):
available_extensions = [available_extensions]
frame_names = [
p for p in os.listdir(vid_dir)
if os.path.splitext(p)[-1] in available_extensions
]
if not frame_names:
return []
frame_names.sort(key=lambda x: int(os.path.splitext(x)[0]))
frames = [os.path.join(vid_dir, name) for name in frame_names]
if as_numpy:
frames = [np.array(Image.open(frame)) for frame in frames]
return frames
def clean_temp_dir(temp_dir: Optional[str] = None):
"""Removes media files from the directory."""
if temp_dir is None:
temp_dir = TEMP_DIR
temp_out_dir = TEMP_OUT_DIR
else:
temp_out_dir = os.path.join(temp_dir, "out")
clean_files_with_extension(temp_dir, SOUND_FILE_EXT)
clean_files_with_extension(temp_dir, IMAGE_FILE_EXT)
clean_files_with_extension(temp_out_dir, IMAGE_FILE_EXT)
def clean_files_with_extension(dir_path: str, extensions: List):
"""Remove files with the given extensions from the directory."""
for filename in os.listdir(dir_path):
if filename.lower().endswith(tuple(extensions)):
file_path = os.path.join(dir_path, filename)
try:
os.remove(file_path)
except Exception as e:
logger.exception("Error while removing image files")