import os | |
import cv2 | |
import numpy as np | |
import torch | |
import random | |
from os import path as osp | |
from torchvision.utils import make_grid | |
import sys | |
from pathlib import Path | |
import six | |
from collections import OrderedDict | |
import math | |
import glob | |
import av | |
import io | |
CAP_PROP_POS_FRAMES, VideoWriter_fourcc) | |
if sys.version_info <= (3, 3): | |
FileNotFoundError = IOError | |
else: | |
FileNotFoundError = FileNotFoundError | |
def is_str(x): | |
"""Whether the input is an string instance.""" | |
return isinstance(x, six.string_types) | |
def is_filepath(x): | |
return is_str(x) or isinstance(x, Path) | |
def fopen(filepath, *args, **kwargs): | |
if is_str(filepath): | |
return open(filepath, *args, **kwargs) | |
elif isinstance(filepath, Path): | |
return*args, **kwargs) | |
raise ValueError('`filepath` should be a string or a Path') | |
def check_file_exist(filename, msg_tmpl='file "{}" does not exist'): | |
if not osp.isfile(filename): | |
raise FileNotFoundError(msg_tmpl.format(filename)) | |
def mkdir_or_exist(dir_name, mode=0o777): | |
if dir_name == '': | |
return | |
dir_name = osp.expanduser(dir_name) | |
os.makedirs(dir_name, mode=mode, exist_ok=True) | |
def symlink(src, dst, overwrite=True, **kwargs): | |
if os.path.lexists(dst) and overwrite: | |
os.remove(dst) | |
os.symlink(src, dst, **kwargs) | |
def scandir(dir_path, suffix=None, recursive=False, case_sensitive=True): | |
"""Scan a directory to find the interested files. | |
Args: | |
dir_path (str | :obj:`Path`): Path of the directory. | |
suffix (str | tuple(str), optional): File suffix that we are | |
interested in. Default: None. | |
recursive (bool, optional): If set to True, recursively scan the | |
directory. Default: False. | |
case_sensitive (bool, optional) : If set to False, ignore the case of | |
suffix. Default: True. | |
Returns: | |
A generator for all the interested files with relative paths. | |
""" | |
if isinstance(dir_path, (str, Path)): | |
dir_path = str(dir_path) | |
else: | |
raise TypeError('"dir_path" must be a string or Path object') | |
if (suffix is not None) and not isinstance(suffix, (str, tuple)): | |
raise TypeError('"suffix" must be a string or tuple of strings') | |
if suffix is not None and not case_sensitive: | |
suffix = suffix.lower() if isinstance(suffix, str) else tuple( | |
item.lower() for item in suffix) | |
root = dir_path | |
def _scandir(dir_path, suffix, recursive, case_sensitive): | |
for entry in os.scandir(dir_path): | |
if not'.') and entry.is_file(): | |
rel_path = osp.relpath(entry.path, root) | |
_rel_path = rel_path if case_sensitive else rel_path.lower() | |
if suffix is None or _rel_path.endswith(suffix): | |
yield rel_path | |
elif recursive and os.path.isdir(entry.path): | |
# scan recursively if entry.path is a directory | |
yield from _scandir(entry.path, suffix, recursive, | |
case_sensitive) | |
return _scandir(dir_path, suffix, recursive, case_sensitive) | |
class Cache: | |
def __init__(self, capacity): | |
self._cache = OrderedDict() | |
self._capacity = int(capacity) | |
if capacity <= 0: | |
raise ValueError('capacity must be a positive integer') | |
def capacity(self): | |
return self._capacity | |
def size(self): | |
return len(self._cache) | |
def put(self, key, val): | |
if key in self._cache: | |
return | |
if len(self._cache) >= self.capacity: | |
self._cache.popitem(last=False) | |
self._cache[key] = val | |
def get(self, key, default=None): | |
val = self._cache[key] if key in self._cache else default | |
return val | |
class VideoReader: | |
"""Video class with similar usage to a list object. | |
This video warpper class provides convenient apis to access frames. | |
There exists an issue of OpenCV's VideoCapture class that jumping to a | |
certain frame may be inaccurate. It is fixed in this class by checking | |
the position after jumping each time. | |
Cache is used when decoding videos. So if the same frame is visited for | |
the second time, there is no need to decode again if it is stored in the | |
cache. | |
""" | |
def __init__(self, filename, cache_capacity=10): | |
# Check whether the video path is a url | |
if not filename.startswith(('https://', 'http://')): | |
check_file_exist(filename, 'Video file not found: ' + filename) | |
self._vcap = cv2.VideoCapture(filename) | |
assert cache_capacity > 0 | |
self._cache = Cache(cache_capacity) | |
self._position = 0 | |
# get basic info | |
self._width = int(self._vcap.get(CAP_PROP_FRAME_WIDTH)) | |
self._height = int(self._vcap.get(CAP_PROP_FRAME_HEIGHT)) | |
self._fps = self._vcap.get(CAP_PROP_FPS) | |
self._frame_cnt = int(self._vcap.get(CAP_PROP_FRAME_COUNT)) | |
self._fourcc = self._vcap.get(CAP_PROP_FOURCC) | |
def vcap(self): | |
""":obj:`cv2.VideoCapture`: The raw VideoCapture object.""" | |
return self._vcap | |
def opened(self): | |
"""bool: Indicate whether the video is opened.""" | |
return self._vcap.isOpened() | |
def width(self): | |
"""int: Width of video frames.""" | |
return self._width | |
def height(self): | |
"""int: Height of video frames.""" | |
return self._height | |
def resolution(self): | |
"""tuple: Video resolution (width, height).""" | |
return (self._width, self._height) | |
def fps(self): | |
"""float: FPS of the video.""" | |
return self._fps | |
def frame_cnt(self): | |
"""int: Total frames of the video.""" | |
return self._frame_cnt | |
def fourcc(self): | |
"""str: "Four character code" of the video.""" | |
return self._fourcc | |
def position(self): | |
"""int: Current cursor position, indicating frame decoded.""" | |
return self._position | |
def _get_real_position(self): | |
return int(round(self._vcap.get(CAP_PROP_POS_FRAMES))) | |
def _set_real_position(self, frame_id): | |
self._vcap.set(CAP_PROP_POS_FRAMES, frame_id) | |
pos = self._get_real_position() | |
for _ in range(frame_id - pos): | | | |
self._position = frame_id | |
def read(self): | |
"""Read the next frame. | |
If the next frame have been decoded before and in the cache, then | |
return it directly, otherwise decode, cache and return it. | |
Returns: | |
ndarray or None: Return the frame if successful, otherwise None. | |
""" | |
# pos = self._position | |
if self._cache: | |
img = self._cache.get(self._position) | |
if img is not None: | |
ret = True | |
else: | |
if self._position != self._get_real_position(): | |
self._set_real_position(self._position) | |
ret, img = | |
if ret: | |
self._cache.put(self._position, img) | |
else: | |
ret, img = | |
if ret: | |
self._position += 1 | |
return img | |
def get_frame(self, frame_id): | |
"""Get frame by index. | |
Args: | |
frame_id (int): Index of the expected frame, 0-based. | |
Returns: | |
ndarray or None: Return the frame if successful, otherwise None. | |
""" | |
if frame_id < 0 or frame_id >= self._frame_cnt: | |
raise IndexError( | |
f'"frame_id" must be between 0 and {self._frame_cnt - 1}') | |
if frame_id == self._position: | |
return | |
if self._cache: | |
img = self._cache.get(frame_id) | |
if img is not None: | |
self._position = frame_id + 1 | |
return img | |
self._set_real_position(frame_id) | |
ret, img = | |
if ret: | |
if self._cache: | |
self._cache.put(self._position, img) | |
self._position += 1 | |
return img | |
def current_frame(self): | |
"""Get the current frame (frame that is just visited). | |
Returns: | |
ndarray or None: If the video is fresh, return None, otherwise | |
return the frame. | |
""" | |
if self._position == 0: | |
return None | |
return self._cache.get(self._position - 1) | |
def cvt2frames(self, | |
frame_dir, | |
file_start=0, | |
filename_tmpl='{:06d}.jpg', | |
start=0, | |
max_num=0, | |
show_progress=False): | |
"""Convert a video to frame images. | |
Args: | |
frame_dir (str): Output directory to store all the frame images. | |
file_start (int): Filenames will start from the specified number. | |
filename_tmpl (str): Filename template with the index as the | |
placeholder. | |
start (int): The starting frame index. | |
max_num (int): Maximum number of frames to be written. | |
show_progress (bool): Whether to show a progress bar. | |
""" | |
mkdir_or_exist(frame_dir) | |
if max_num == 0: | |
task_num = self.frame_cnt - start | |
else: | |
task_num = min(self.frame_cnt - start, max_num) | |
if task_num <= 0: | |
raise ValueError('start must be less than total frame number') | |
if start > 0: | |
self._set_real_position(start) | |
def write_frame(file_idx): | |
img = | |
if img is None: | |
return | |
filename = osp.join(frame_dir, filename_tmpl.format(file_idx)) | |
cv2.imwrite(filename, img) | |
if show_progress: | |
pass | |
#track_progress(write_frame, range(file_start,file_start + task_num)) | |
else: | |
for i in range(task_num): | |
write_frame(file_start + i) | |
def __len__(self): | |
return self.frame_cnt | |
def __getitem__(self, index): | |
if isinstance(index, slice): | |
return [ | |
self.get_frame(i) | |
for i in range(*index.indices(self.frame_cnt)) | |
] | |
# support negative indexing | |
if index < 0: | |
index += self.frame_cnt | |
if index < 0: | |
raise IndexError('index out of range') | |
return self.get_frame(index) | |
def __iter__(self): | |
self._set_real_position(0) | |
return self | |
def __next__(self): | |
img = | |
if img is not None: | |
return img | |
else: | |
raise StopIteration | |
next = __next__ | |
def __enter__(self): | |
return self | |
def __exit__(self, exc_type, exc_value, traceback): | |
self._vcap.release() | |
def frames2video(frame_dir, | |
video_file, | |
fps=30, | |
fourcc='XVID', | |
filename_tmpl='{:06d}.jpg', | |
start=0, | |
end=0, | |
show_progress=False): | |
"""Read the frame images from a directory and join them as a video. | |
Args: | |
frame_dir (str): The directory containing video frames. | |
video_file (str): Output filename. | |
fps (float): FPS of the output video. | |
fourcc (str): Fourcc of the output video, this should be compatible | |
with the output file type. | |
filename_tmpl (str): Filename template with the index as the variable. | |
start (int): Starting frame index. | |
end (int): Ending frame index. | |
show_progress (bool): Whether to show a progress bar. | |
""" | |
if end == 0: | |
ext = filename_tmpl.split('.')[-1] | |
end = len([name for name in scandir(frame_dir, ext)]) | |
first_file = osp.join(frame_dir, filename_tmpl.format(start)) | |
check_file_exist(first_file, 'The start frame not found: ' + first_file) | |
img = cv2.imread(first_file) | |
height, width = img.shape[:2] | |
resolution = (width, height) | |
vwriter = cv2.VideoWriter(video_file, VideoWriter_fourcc(*fourcc), fps, | |
resolution) | |
def write_frame(file_idx): | |
filename = osp.join(frame_dir, filename_tmpl.format(file_idx)) | |
img = cv2.imread(filename) | |
vwriter.write(img) | |
if show_progress: | |
pass | |
# track_progress(write_frame, range(start, end)) | |
else: | |
for i in range(start, end): | |
write_frame(i) | |
vwriter.release() | |
def video2images(video_path, output_dir): | |
vidcap = cv2.VideoCapture(video_path) | |
in_fps = vidcap.get(cv2.CAP_PROP_FPS) | |
print('video fps:', in_fps) | |
if not os.path.isdir(output_dir): | |
os.makedirs(output_dir) | |
loaded, frame = | |
total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
print(f'number of total frames is: {total_frames:06}') | |
for i_frame in range(total_frames): | |
if i_frame % 100 == 0: | |
print(f'{i_frame:06} / {total_frames:06}') | |
frame_name = os.path.join(output_dir, f'{i_frame:06}' + '.png') | |
cv2.imwrite(frame_name, frame) | |
loaded, frame = | |
def images2video(image_dir, video_path, fps=24, image_ext='png'): | |
''' | |
#codec = cv2.VideoWriter_fourcc(*'XVID') | |
#codec = cv2.VideoWriter_fourcc('A','V','C','1') | |
#codec = cv2.VideoWriter_fourcc('Y','U','V','1') | |
#codec = cv2.VideoWriter_fourcc('P','I','M','1') | |
#codec = cv2.VideoWriter_fourcc('M','J','P','G') | |
codec = cv2.VideoWriter_fourcc('M','P','4','2') | |
#codec = cv2.VideoWriter_fourcc('D','I','V','3') | |
#codec = cv2.VideoWriter_fourcc('D','I','V','X') | |
#codec = cv2.VideoWriter_fourcc('U','2','6','3') | |
#codec = cv2.VideoWriter_fourcc('I','2','6','3') | |
#codec = cv2.VideoWriter_fourcc('F','L','V','1') | |
#codec = cv2.VideoWriter_fourcc('H','2','6','4') | |
#codec = cv2.VideoWriter_fourcc('A','Y','U','V') | |
#codec = cv2.VideoWriter_fourcc('I','U','Y','V') | |
编码器常用的几种: | |
cv2.VideoWriter_fourcc("I", "4", "2", "0") | |
压缩的yuv颜色编码器,4:2:0色彩度子采样 兼容性好,产生很大的视频 avi | |
cv2.VideoWriter_fourcc("P", I", "M", "1") | |
采用mpeg-1编码,文件为avi | |
cv2.VideoWriter_fourcc("X", "V", "T", "D") | |
采用mpeg-4编码,得到视频大小平均 拓展名avi | |
cv2.VideoWriter_fourcc("T", "H", "E", "O") | |
Ogg Vorbis, 拓展名为ogv | |
cv2.VideoWriter_fourcc("F", "L", "V", "1") | |
FLASH视频,拓展名为.flv | |
''' | |
image_files = sorted(glob.glob(os.path.join(image_dir, '*.{}'.format(image_ext)))) | |
print(len(image_files)) | |
height, width, _ = cv2.imread(image_files[0]).shape | |
out_fourcc = cv2.VideoWriter_fourcc('M', 'J', 'P', 'G') # cv2.VideoWriter_fourcc(*'MP4V') | |
out_video = cv2.VideoWriter(video_path, out_fourcc, fps, (width, height)) | |
for image_file in image_files: | |
img = cv2.imread(image_file) | |
img = cv2.resize(img, (width, height), interpolation=3) | |
out_video.write(img) | |
out_video.release() | |
def add_video_compression(imgs): | |
codec_type = ['libx264', 'h264', 'mpeg4'] | |
codec_prob = [1 / 3., 1 / 3., 1 / 3.] | |
codec = random.choices(codec_type, codec_prob)[0] | |
# codec = 'mpeg4' | |
bitrate = [1e4, 1e5] | |
bitrate = np.random.randint(bitrate[0], bitrate[1] + 1) | |
buf = io.BytesIO() | |
with, 'w', 'mp4') as container: | |
stream = container.add_stream(codec, rate=1) | |
stream.height = imgs[0].shape[0] | |
stream.width = imgs[0].shape[1] | |
stream.pix_fmt = 'yuv420p' | |
stream.bit_rate = bitrate | |
for img in imgs: | |
img = np.uint8((img.clip(0, 1)*255.).round()) | |
frame = av.VideoFrame.from_ndarray(img, format='rgb24') | |
frame.pict_type = 'NONE' | |
# pdb.set_trace() | |
for packet in stream.encode(frame): | |
container.mux(packet) | |
# Flush stream | |
for packet in stream.encode(): | |
container.mux(packet) | |
outputs = [] | |
with, 'r', 'mp4') as container: | |
if | |
for frame in container.decode(**{'video': 0}): | |
outputs.append( | |
frame.to_rgb().to_ndarray().astype(np.float32) / 255.) | |
#outputs = np.stack(outputs, axis=0) | |
return outputs | |
if __name__ == '__main__': | |
# ----------------------------------- | |
# test VideoReader(filename, cache_capacity=10) | |
# ----------------------------------- | |
# video_reader = VideoReader('utils/test.mp4') | |
# from utils import utils_image as util | |
# inputs = [] | |
# for frame in video_reader: | |
# print(frame.dtype) | |
# util.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) | |
# #util.imshow(np.flip(frame, axis=2)) | |
# ----------------------------------- | |
# test video2images(video_path, output_dir) | |
# ----------------------------------- | |
# video2images('utils/test.mp4', 'frames') | |
# ----------------------------------- | |
# test images2video(image_dir, video_path, fps=24, image_ext='png') | |
# ----------------------------------- | |
# images2video('frames', 'video_02.mp4', fps=30, image_ext='png') | |
# ----------------------------------- | |
# test frames2video(frame_dir, video_file, fps=30, fourcc='XVID', filename_tmpl='{:06d}.png') | |
# ----------------------------------- | |
# frames2video('frames', 'video_01.mp4', filename_tmpl='{:06d}.png') | |
# ----------------------------------- | |
# test add_video_compression(imgs) | |
# ----------------------------------- | |
# imgs = [] | |
# image_ext = 'png' | |
# frames = 'frames' | |
# from utils import utils_image as util | |
# image_files = sorted(glob.glob(os.path.join(frames, '*.{}'.format(image_ext)))) | |
# for i, image_file in enumerate(image_files): | |
# if i < 7: | |
# img = util.imread_uint(image_file, 3) | |
# img = util.uint2single(img) | |
# imgs.append(img) | |
# | |
# results = add_video_compression(imgs) | |
# for i, img in enumerate(results): | |
# util.imshow(util.single2uint(img)) | |
# util.imsave(util.single2uint(img),f'{i:05}.png') | |
# run utils/ | |